Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

这篇具有很好参考价值的文章主要介绍了Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kafka尚硅谷视频:

10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili

Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解,kafka,分布式

     1. producer初始化:加载默认配置,以及配置的参数,开启网络线程

     2. 拦截器拦截

     3. 序列化器进行消息key, value序列化

     4. 进行分区

     5. kafka broker集群 获取metaData

     6. 消息缓存到RecordAccumulator收集器,分配到该分区的DQueue(RecordBatch)

     7. batch.size满了,或者linker.ms到达指定时间,唤醒sender线程, 实例化networkClient

         RecordBatch ==>RequestClient 发送消息体,

      8. 与分区相同broker建立网络连接,发送到对应broker

 1. send()方法参数producerRecord对象:

Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解,kafka,分布式

    关于分区:

Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解,kafka,分布式

      a.指定分区,则发送到该分区    

      b.不指定分区,k值没有传入,使用黏性分区(sticky partition

                 第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法   

      c.不指定分区,传入k值,k值先进行hash获取hashCodeValue, 再与topic下的分区数进行求模取余,进行分区。

      如 k hash = 5 topic目前的分区数2  则 分区为:1

          k  hash =6  topic目前的分区数2  则 分区为:0

2. KafkaProducer 异步, 同步发送api:

    异步发送:

                    producer.send(producerRecord对象);

    同步发送则send()方法后面.get()Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解,kafka,分布式



kafka 的send方法核心逻辑:

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // 拦截器集合。多个拦截对象循环遍历
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }
    
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;

        // 获取集群信息metadata
        try {
            this.throwIfProducerClosed();
            long nowMs = this.time.milliseconds();

            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
            } catch (KafkaException var22) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var22);
                }

                throw var22;
            }

            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;

            // 序列化器 key序列化
            byte[] serializedKey;
            try {
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException var21) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
            }
      
            // 序列化器 value序列化
            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException var20) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
            }

            // 分区
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
            if (this.log.isTraceEnabled()) {
                this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
            }

            Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
            // RecordAccumulator.append() 添加数据转 ProducerBatch
            RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                this.partitioner.onNewBatch(record.topic(), cluster, partition);
                partition = this.partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
                }

                interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
                result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
            }

            if (this.transactionManager != null) {
                this.transactionManager.maybeAddPartition(tp);
            }

            // 判断是否满了,满了唤醒sender , sender继承了runnable
            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }

            return result.future;
        } catch (ApiException var23) {
            this.log.debug("Exception occurred during message send:", var23);
            if (tp == null) {
                tp = ProducerInterceptors.extractTopicPartition(record);
            }

            Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
            interceptCallback.onCompletion((RecordMetadata)null, var23);
            this.errors.record();
            this.interceptors.onSendError(record, tp, var23);
            return new FutureFailure(var23);
        } catch (InterruptedException var24) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var24);
            throw new InterruptException(var24);
        } catch (KafkaException var25) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var25);
            throw var25;
        } catch (Exception var26) {
            this.interceptors.onSendError(record, tp, var26);
            throw var26;
        }
    }

  Sender类 run()方法:

 public void run() {
        this.log.debug("Starting Kafka producer I/O thread.");

        while(this.running) {
            try {
                this.runOnce();
            } catch (Exception var5) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var5);
            }
        }

        this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        while(!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || this.hasPendingTransactionalRequests())) {
            try {
                this.runOnce();
            } catch (Exception var4) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var4);
            }
        }

        while(!this.forceClose && this.transactionManager != null && this.transactionManager.hasOngoingTransaction()) {
            if (!this.transactionManager.isCompleting()) {
                this.log.info("Aborting incomplete transaction due to shutdown");
                this.transactionManager.beginAbort();
            }

            try {
                this.runOnce();
            } catch (Exception var3) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var3);
            }
        }

        if (this.forceClose) {
            if (this.transactionManager != null) {
                this.log.debug("Aborting incomplete transactional requests due to forced shutdown");
                this.transactionManager.close();
            }

            this.log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }

        try {
            this.client.close();
        } catch (Exception var2) {
            this.log.error("Failed to close network client", var2);
        }

        this.log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }


    void runOnce() {
        if (this.transactionManager != null) {
            try {
                this.transactionManager.maybeResolveSequences();
                if (this.transactionManager.hasFatalError()) {
                    RuntimeException lastError = this.transactionManager.lastError();
                    if (lastError != null) {
                        this.maybeAbortBatches(lastError);
                    }

                    this.client.poll(this.retryBackoffMs, this.time.milliseconds());
                    return;
                }

                this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
                if (this.maybeSendAndPollTransactionalRequest()) {
                    return;
                }
            } catch (AuthenticationException var5) {
                this.log.trace("Authentication exception while processing transactional request", var5);
                this.transactionManager.authenticationFailed(var5);
            }
        }

        long currentTimeMs = this.time.milliseconds();
        // 发送数据
        long pollTimeout = this.sendProducerData(currentTimeMs);
        this.client.poll(pollTimeout, currentTimeMs);
    }

  sendProducerData() :

      最终转换为ClientRequest对象

         ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
         this.client.send(clientRequest, now);
private long sendProducerData(long now) {
        Cluster cluster = this.metadata.fetch();
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        Iterator iter;
        if (!result.unknownLeaderTopics.isEmpty()) {
            iter = result.unknownLeaderTopics.iterator();

            while(iter.hasNext()) {
                String topic = (String)iter.next();
                this.metadata.add(topic, now);
            }

            this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;

        while(iter.hasNext()) {
            Node node = (Node)iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        this.addToInflightBatches(batches);
        List expiredBatches;
        Iterator var11;
        ProducerBatch expiredBatch;
        if (this.guaranteeMessageOrder) {
            Iterator var9 = batches.values().iterator();

            while(var9.hasNext()) {
                expiredBatches = (List)var9.next();
                var11 = expiredBatches.iterator();

                while(var11.hasNext()) {
                    expiredBatch = (ProducerBatch)var11.next();
                    this.accumulator.mutePartition(expiredBatch.topicPartition);
                }
            }
        }

        this.accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = this.getExpiredInflightBatches(now);
        expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);
        if (!expiredBatches.isEmpty()) {
            this.log.trace("Expired {} batches in accumulator", expiredBatches.size());
        }

        var11 = expiredBatches.iterator();

        while(var11.hasNext()) {
            expiredBatch = (ProducerBatch)var11.next();
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            this.failBatch(expiredBatch, (RuntimeException)(new TimeoutException(errorMessage)), false);
            if (this.transactionManager != null && expiredBatch.inRetry()) {
                this.transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }

        this.sensors.updateProduceRequestMetrics(batches);
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0L);
        if (!result.readyNodes.isEmpty()) {
            this.log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0L;
        }

        this.sendProduceRequests(batches, now);
        return pollTimeout;
    }

    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
        Iterator var4 = collated.entrySet().iterator();

        while(var4.hasNext()) {
            Map.Entry<Integer, List<ProducerBatch>> entry = (Map.Entry)var4.next();
            this.sendProduceRequest(now, (Integer)entry.getKey(), this.acks, this.requestTimeoutMs, (List)entry.getValue());
        }

    }

    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (!batches.isEmpty()) {
            Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap(batches.size());
            byte minUsedMagic = this.apiVersions.maxUsableProduceMagic();
            Iterator var9 = batches.iterator();

            while(var9.hasNext()) {
                ProducerBatch batch = (ProducerBatch)var9.next();
                if (batch.magic() < minUsedMagic) {
                    minUsedMagic = batch.magic();
                }
            }

            ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
            Iterator var16 = batches.iterator();

            while(var16.hasNext()) {
                ProducerBatch batch = (ProducerBatch)var16.next();
                TopicPartition tp = batch.topicPartition;
                MemoryRecords records = batch.records();
                if (!records.hasMatchingMagic(minUsedMagic)) {
                    records = (MemoryRecords)batch.records().downConvert(minUsedMagic, 0L, this.time).records();
                }

                ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
                if (tpData == null) {
                    tpData = (new ProduceRequestData.TopicProduceData()).setName(tp.topic());
                    tpd.add(tpData);
                }

                tpData.partitionData().add((new ProduceRequestData.PartitionProduceData()).setIndex(tp.partition()).setRecords(records));
                recordsByPartition.put(tp, batch);
            }

            String transactionalId = null;
            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                transactionalId = this.transactionManager.transactionalId();
            }

            ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, (new ProduceRequestData()).setAcks(acks).setTimeoutMs(timeout).setTransactionalId(transactionalId).setTopicData(tpd));
            RequestCompletionHandler callback = (response) -> {
                this.handleProduceResponse(response, recordsByPartition, this.time.milliseconds());
            };
            String nodeId = Integer.toString(destination);
            ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
            // this.client 为KafkaClient接口 实现类:NetworkClient对象
            this.client.send(clientRequest, now);
            this.log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
        }
    }

 NetworkClient send()方法:

 public void send(ClientRequest request, long now) {
        this.doSend(request, false, now);
    }

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
        this.ensureActive();
        String nodeId = clientRequest.destination();
        if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        } else {
            AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();

            try {
                NodeApiVersions versionInfo = this.apiVersions.get(nodeId);
                short version;
                if (versionInfo == null) {
                    version = builder.latestAllowedVersion();
                    if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {
                        this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});
                    }
                } else {
                    version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
                }

                this.doSend(clientRequest, isInternalRequest, now, builder.build(version));
            } catch (UnsupportedVersionException var9) {
                this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), var9});
                ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, var9, (AuthenticationException)null, (AbstractResponse)null);
                if (!isInternalRequest) {
                    this.abortedSends.add(clientResponse);
                } else if (clientRequest.apiKey() == ApiKeys.METADATA) {
                    this.metadataUpdater.handleFailedRequest(now, Optional.of(var9));
                }
            }

        }
    }

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", new Object[]{clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request});
        }

        Send send = request.toSend(header);
        // clientRequest convert InFlightRequest 对象
        InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
        this.inFlightRequests.add(inFlightRequest);
        // nio channel。。。selector 发送消息信息
        //this.selector is Selectable interface  KafkaChannel is implement
        this.selector.send(new NetworkSend(clientRequest.destination(), send));
    }

总结:直接阅读源码很快就能想明白kafka 生产者发送逻辑,kafka-client.jar。  核心==>

   本文第一张图片

Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解,kafka,分布式文章来源地址https://www.toymoban.com/news/detail-672545.html

到了这里,关于Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(53)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(48)
  • kafka服务端允许生产者发送最大消息体大小

            server.properties中加上的message.max.bytes配置,我目前设置为5242880,即5MB,可以根据实际情况增大。         在生产者端配置max.request.size,这是单个消息最大字节数,根据实际调整,max.request.size 必须小于 message.max.bytes 以及消费者的 max.partition.fetch.bytes。这样消息

    2024年02月15日
    浏览(45)
  • 【注意】Kafka生产者异步发送消息仍有可能阻塞

    Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用 异步 发送的方式,如下所示。 本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,

    2023年04月13日
    浏览(77)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(45)
  • Kafka中的生产者如何处理消息发送失败的情况?

    在Kafka中,生产者可以通过以下方式处理消息发送失败的情况: 同步发送模式(Sync Mode):在同步发送模式下,生产者发送消息后会阻塞等待服务器的响应。如果发送失败,生产者会抛出异常(例如 ProducerRecord 发送异常)或返回错误信息。开发者可以捕获异常并根据需要进行

    2024年02月06日
    浏览(44)
  • kafka生产者发送消息报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    报这个错误是因为kafka里的配置要修改下 在config目录下 server.properties配置文件 这下发送消息就不会一直等待,就可以发送成功了

    2024年02月06日
    浏览(45)
  • [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题

    [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题 Kafka是一个高度可扩展的分布式流平台,用于构建实时数据管道和流处理应用程序。作为一个广泛使用的消息代理系统,Kafka在数据传输方面表现出色,但是在极端情况下,它可能会出现生产者阻塞的问题。这可能会导致

    2024年02月11日
    浏览(49)
  • Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验

    1.1 生产者消息发送流程 1.1.1 发送原理 在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。 batch.size:只有数据积累到bat

    2024年02月09日
    浏览(45)
  • kafka入门,生产者异步发送、回调函数,同步发送(四)

    引入依赖 回调函数会在producer收到ack时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明信息发送失败 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。 只需在异步发送的基础上,再调用一下 get(

    2024年02月11日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包