Kafka-Sender分析

这篇具有很好参考价值的文章主要介绍了Kafka-Sender分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

通过前面的分析我们知道,主线程通过KafkaProducer.send方法将消息放入RecordAccumulator中缓存,并没有实际的网络I/O操作。网络I/O操作是由Sender线程统一进行的。

我们先来了解一下Sender线程发送消息的整个流程:

首先,它根据RecordAccumulator的缓存情况,筛选出可以向哪些Node节点发送消息,即上一节介绍的RecordAccumulatorready方法;

然后,根据生产者与各个节点的连接情况由NetworkClient管理,过滤Node节点;

之后,生成相应的请求,这里要特别注意的是,每个Node节点只生成一个请求;

最后,调用NetWorkClient将请求发送出去。图展示了Sender依赖的三个比较关键的组件。

Kafka-Sender分析,队列,kafka,linq,分布式
Sender实现了Runnable接口,并运行在单独的ioThread中。Sender的run方法调用了其重载run(long),这才是Sender线程的核心方法,也是发送消息的关键流程,其时序图如图所示。

Kafka-Sender分析,队列,kafka,linq,分布式
下面简述run(long)方法的流程:

  1. 从Metadata获取Kafka集群元数据。
  2. 调用RecordAccumulator.ready方法,根据RecordAccumulator的缓存情况,选出可以向哪些Node节点发送消息,返回ReadyCheckResult对象。
  3. 如果ReadyCheckResult中标识有unknownLeadersExist,则调用Metadata的requestUpdate方法,标记需要更新Kafka的集群信息。
  4. 针对ReadyCheckResult中readyNodes集合,循环调用NetworkClient.ready方法,目的是检查网络I/O方面是否符合发送消息的条件,不符合条件的Node将会从readyNodes集合中删除。
  5. 针对经过步骤4处理后的readyNodes集合,调用RecordAccumulator.drain方法,获取待发送的消息集合。
  6. 调用RecordAccumulator.abortExpiredBatches()方法处理RecordAccumulator中超时的消息。
    其代码逻辑是,遍历RecordAccumulator中保存的全部RecordBatch,调用RecordBatch.maybeExpire()方法进行处理。
    如果已超时,则调用RecordBatch.done()方法,其中会触发自定义Callback,并将RecordBatch从队列中移除,释放ByteBuffer空间。
  7. 调用Sender.createProduceRequests()方法将待发送的消息封装成ClientRequest。
  8. 调用NetWorkClient.send方法,将ClientRequest写入KafkaChannel的send字段。
  9. 调用NetWorkClient.poll方法,将KafkaChannel.send字段中保存的ClientRequest发送出去,同时,还会处理服务端发回的响应、处理超时的请求、调用用户自定义Callback等。

创建请求

在Protocol类中罗列了全部请求和响应的格式,请求和响应有多个不同的版本。

首先,介绍生产者向服务端追加消息时使用的请求和响应,它们分别是ProduceRequest(Version:2)和Produce Response(Version:2),结构如图所示。

Kafka-Sender分析,队列,kafka,linq,分布式
Produce Request(Version:2)的请求头和请求体各个字段的含义如表所示.
Kafka-Sender分析,队列,kafka,linq,分布式
Produce Response(Version:2)各个字段与含义如表所示.
Kafka-Sender分析,队列,kafka,linq,分布式
Sender.sendProduceRequests()方法的功能是将待发送的消息封装成ClientRequest。

不管一个Node对应有多少个RecordBatch,也不管这些RecordBatch是发给几个分区的,每个Node至多生成一个ClientRequest对象。创建ClientRequest的核心逻辑如下:

  1. 将一个Nodeld对应的RecordBatch集合,重新整理为produceRecordsByPartition(Map<TopicPartition,ByteBuffer>) 和recordsByPartition(Map<TopicPartition,RecordBatch>)两个集合。
  2. 创建RequestSend,RequestSend是真正通过网络I/O发送的对象,其格式符合上面描述的Produce Request(Version:2)协议,其中有效负载就是produceRecordsByPartition中的数据。
  3. 创建RequestCompletionHandler作为回调对象。
  4. 将RequestSend对象和RequestCompletionHandler对象封装进ClientRequest对象中,并将其返回。

下面来看Sender.sendProduceRequests()方法的具体实现:

Kafka-Sender分析,队列,kafka,linq,分布式

KSelector

在介绍NetworkClient之前,我们先来了解NetworkClient的整个结构,以及其依赖的他组件,如图所示。

Kafka-Sender分析,队列,kafka,linq,分布式
需要注意的是,图中的Selector的类型并不是java.nio.channels.Selector,而是
org.apache.kafka.common.network.Selector,为了方便区分和描述,将其简称为KSelect。

KSelector使用NIO异步非阻塞模式实现网络I/O操作,KSelector使用一个单独的线程可以管理多条网络连接上的连接、读、写等操作。

下面介绍KSelector的核心字段和方法,如图所示。

Kafka-Sender分析,队列,kafka,linq,分布式
下面先介绍KSelector的字段。

  • nioSelector:java.nio.channels.Selector类型,用来监听网络I/O事件。
  • channels:HashMap<String,KafkaChannel>类型,维护了Nodeld与KafkaChannel之间的映射关系,表示生产者客户端与各个Node之间的网络连接。
    KafkaChannel是在SocketChannel上的又一层封装,如图所示,其中Send和NetworkReceive分别表示读和写时用的缓存,底层通过ByteBuffer实现,TransportLayer封装SocketChannel及SelectionKey,TransportLayer根据网络协议的不同,提供不同的子类,而对KafkaChannel提供统一的接口,这是策略模式很好的应用.

Kafka-Sender分析,队列,kafka,linq,分布式

  • completedSends:记录已经完全发送出去的请求。
  • completedReceives:记录已经完全接收到的请求。
  • stagedReceives:暂存一次OP_READ事件处理过程中读取到的全部请求。当一次OP_READ事件处理完成之后,会将stagedReceives集合中的请求保存到completeReceives集合中。
  • disconnected、connected:记录一次poll过程中发现的断开的连接和新建立的连接。
  • failedSends:记录向哪些Node发送的请求失败了。
  • channelBuilder:用于创建KafkaChannel的Builder。根据不同配置创建不同的TransportLayer的子类,然后创建KafkaChannel。其创建的KafkaChannel封装的是PlaintextTransportLayer。
  • IruConnections:LinkedHashMap类型,用来记录各个连接的使用情况,并据此关闭空闲时间超connectionsMaxldleNanos的连接。

下面介绍KSelector的核心方法。KSelector.connect方法主要负责创建KafkaChannel,并添加到channels集合中保存。其代码如下:

Kafka-Sender分析,队列,kafka,linq,分布式
KSelector.send方法是将之前创建的RequestSend对象缓存到KafkaChannel的send字段中,并开始关注此连接的OP_WRITE事件,并没有发生网络I/O。

在下次调用KSelector.poll时,才会将RequestSend对象发送出去。如果此KafkaChannel的send字段上还保存着一个未完全发送成功的RequestSend请求,为防止覆盖数据,则会抛出异常。也就是说,每个KafkaChannel一次poll过程中只能发送一个Send请求。

KSelectorpoll方法真正执行网络VO的地方,它会调用nioSelector.select方法等待VO事件发生。

当Channel可写时,发送KafkaChannel.send字段(切记,一次最多只发送一个RequestSend,有时候一个RequestSend也发送不完,需要多次poll才能发送完成);

Channel可读时,读取数据到KafkaChannel.receive,读取一个完整的NetworkReceive后,会将其缓存到stagedReceives中,当一次pollSelectionKeys完成后会将stagedReceives中的数据转移到completedReceives。

最后调用maybeCloseOldestConnection方法,根据IruConnections记录和connectionsMaxldleNanos最大空闲时间,关闭长期空闲的连接。

下面是KSelector.poll方法的代码:

Kafka-Sender分析,队列,kafka,linq,分布式
KSelector.pollSelectionKeys()方法是处理I/O操作的核心方法,其中会分别处理OP_CONNECT、OP_READ、OP_WRITE事件,并且会检测连接状态。下面是其代码:

Kafka-Sender分析,队列,kafka,linq,分布式
最终,读写操作还是交给了KafkaChannel,下面来分析其相关的方法:

Kafka-Sender分析,队列,kafka,linq,分布式

InFlightRequests

InFlightRequests队列的主要作用是缓存了已经发出去但没收到响应的ClientRequest.其底层是通过一个Map<String,Deque>对象实现的,key是Nodeld,value是发送到对应Node的ClientRequest对象集合。

InFlightRequests提供了很多管理这个缓存队列的方法,还通过配置参数,限制了每个连接最多缓存的ClientRequest个数。

InFlightRequests的结构如图所示。

Kafka-Sender分析,队列,kafka,linq,分布式
InFlightRequests.canSendMore()方法比较重要,NetworkClient调用此方法是用于判断是否可以向指定Node发送请求的条件之一,其代码如下:

Kafka-Sender分析,队列,kafka,linq,分布式
此外,队头的消息与对应KafkaChannel.send字段指向的是同一个消息,为了避免未发送的消息被覆盖,也不能让KafkaChannel.send字段指向新请求。最后queue.size<this.maxInFlightRequestsPerConnection)条件则是为了判断InFlightRequests队列中是否堆积过多请求。如果Node已经堆积了很多未响应的请求,说明这个节点负载可能较大或是网络连接有问题,继续向其发送请求,则可能导致请求超时。

MetadataUpdater

MetadataUpdater接口是一个辅助NetworkClient更新的Metadata的接口,它有两个实现类,如图所示。

Kafka-Sender分析,队列,kafka,linq,分布式
ManualMetadataUpdater是个空实现,DefaultMetadataUpdater是NetworkClient使用的默认实现,下面介绍其三个字段。

  • metadata:指向记录了集群元数据的Metadata对象。
  • metadataFetchlnProgress:用来标识是否已经发送了MetadataRequest请求更新Metadata,如果已经发送,则没必要重复发送。
  • lastNoNodeAvailableMs:当检测到没有可用节点时,会用此字段记录时间戳。
    maybeUpdate方法是DefaultMetadataUpdater的核心方法,用来判断当前的Metadata中保存的集群元数据是否需要更新。首先检测metadataFetchlnProgress字段,如果没发送,满足下面任一条件即可更新:
  • Metadata.needUpdate字段被设置为true,且退避时间已到。
  • 长时间没更新,默认5分钟更新一次。
    如果需要更新,则发送MetadataRequest请求,MetadataRequest请求的格式比较简单,其消息头部包含ApiKeys.METADATA标识,消息体中包含Topic集合表示需要获取元数据的Topic,如果Topic集合为null则表示请求全部Topic的元数据。MetadataResponse的格式略显复杂,如图所示。

Kafka-Sender分析,队列,kafka,linq,分布式
MetadataRequest请求发送之前,要将metadataFetchInProgress置为true,然后从所有Node中选择负载最小的Node节点,向其发送更新请求。

这里的负载大小是通过每个Node在InFlightRequests队列中未确认的请求决定的,未确认请求越多则认为负载越大。

剩余的步骤与普通请求的发送方式一样,先将请求添加到InFlightRequests队列中,然后设置到KafkaChannel的send字段中,通过KSelector.poll方法将MetadataRequest请求发送出去。下面是DefaultMetadataUpdater.maybeUpdate()方法的具体代码:

Kafka-Sender分析,队列,kafka,linq,分布式
Kafka-Sender分析,队列,kafka,linq,分布式
在收到MetadataResponse之后,会先调用MetaUpdater.handleSuccessfulResponse方法检测是否为MetadataResponse,如果是,则调用handleResponse()解析响应,并构造Cluster对象更新Metadata.cluster字段。

注意,Cluster是不可变对象,所以更新集群元数据的方式是:创建新的Cluster对象,并覆盖Metadata.cluster字段。具体代码如下:
Kafka-Sender分析,队列,kafka,linq,分布式

NetworkClient

NetworkClient中所有连接的状态由ClusterConnectionStates管理,它底层使用Map<String,NodeConnectionState>实现,key是Nodeld,value是NodeConnectionState对象,其中使用ConnectionState枚举表示连接状态,还记录了最近一次尝试连接的时间戳。

前面已经介绍完了NetworkClient依赖的组件,下面来看一下NetworkClient的实现。NetworkClient是一个通用的网络客户端实现,不只用于生产者发送消息,也可以用于消费者消费消息以及服务端Broker之间的通信。
下面介绍NetworkClient的核心方法。NetworkClient.ready方法用来检查Node是否准备好接收数据。首先通过NetworkClientisReady方法检查是否可以向一个Node发送请求,需要符合以下三个条件,则表示Node已准备好:

  • Metadata并未处于正在更新或需要更新的状态。
  • 已经成功建立连接且连接正常connectionStates.isConnected(node)。
  • InFlightRequests.canSendMore()返回true。
    如果NetworkClient.isReady返回false,且满足下面两个条件,则会调用initiateConnect()方法发起连接。
  • 连接不能是CONNECTING状态,必须是DISCONNECTED。
  • 为了避免网络拥塞,重连不能太频繁,两次重试之间的时间差必须大于重试的退避时间,由reconnectBackoffMs字段指定。

NetworkClient.initiateConnect方法会修改在ClusterConnectionStates中的连接状态,并调用Selectorconnect()方法发起连接。

之后调用Selector.pollSelectionKeys()方法时,判断连接是否建立。如果建立成功,则会将ConnectionState设置为CONNECTED。

NetworkClient.send方法主要是将请求设置到KafkaChannel.send字段,同时将请求添加到InFlightRequests队列中等待响应。

NetworkClient.poll()方法调用KSelector.poll进行网络I/O(参考KSelector小节),并使用handle*()方法对KSelector.poll产生的各种数据和队列进行处理。

Kafka-Sender分析,队列,kafka,linq,分布式
下面来看一下handle*()方法的处理逻辑:

  • handleCompletedSends()方法:首先,InFlightRequests保存的是已发送但没收到响应的请求,completedSends保存的是最近一次poll方法中发送成功的请求,所以completedSends列表与InFlightRequests中对应队列的最后一个请求应该是一致的,如图所示。
    Kafka-Sender分析,队列,kafka,linq,分布式
    handleCompletedSends()方法会遍历completeSends,如果发现不需要响应的请求,则将其从InFlightRequests中删除,并向responses列表中添加对应的ClientResponse,在ClientResponse中包含一个指向ClientRequest的引用。handleCompletedSends()方法的代码如下:

Kafka-Sender分析,队列,kafka,linq,分布式

  • handleCompletedReceives()方法: 遍历completedReceives队列,并在InFlightRequests中删除对应的ClientRequest,并向responses列表中添加对应的ClientResponse。如果是Metadata更新请求的响应,则会调用MetadataUpdater中的handleSuccessfulResponse方法,更新Metadata中记录的集Kafka集群元数据。
    Kafka-Sender分析,队列,kafka,linq,分布式
  • handleDisconnections方法:遍历disconnected列表,将InFlightRequests对应节点的ClientRequest清空,对每个请求都创建ClientResponse并添加到responses列表中。这里创建的ClientResponse会标识此响应并不是服务端返回的正常响应,而是因为连接断开产生的。如果是Metadata更新请求的响应,则会调用MetadataUpdater中的handleServerDisconnect方法处理。最后将Metadata.needUpdate设置为true,标识需要更新集群元数据。
    Kafka-Sender分析,队列,kafka,linq,分布式
  • handleConnections方法:遍历connected列表,将ConnectionStates中记录的连接状态修改为CONNECTED。
  • handleTimedOutRequests方法:遍历InFlightRequests集合,获取有超时请求的Node集合,之后的处理逻辑与handleDisconnections)方法一样。

经过一系列handle*()方法处理后,NetworkClient.poll()方法中产生的全部ClientResponse已经被收集到responses列表中。

之后,遍历responses调用每个ClientRequest中记录的回调,如果是异常响应则请求重发,如果是正常响应则调用每个消息的自定义Callback。

在前面的createProduceRequests方法中提到过,这里调用的Callback回调对象,也就是RequestCompletionHandler对象,其onComplete方法最终调用Sender.handleProduceResponse()方法,其逻辑如下:

  1. 如果因为断开连接或异常而产生的响应:
  • 遍历ClientRequest中的RecordBatch,则尝试将RecordBatch重新加入RecordAccumulator,重新发送。
  • 如果异常类型不允许重试或重试次数达到上限,则执行RecordBatch.done方法,此方法会循环调用RecordBatch中每个消息的Callback函数,并将RecordBatch的produceFuture设置为“异常完成”。最后,释放RecordBatch底层的ByteBuffer。
  • 最后,根据异常类型,决定是否设置更新Metadata标志。
  1. 如果是服务端正常的响应或不需要响应的情况:
  • 解析响应。
  • 遍历对应ClientRequest中的RecordBatch,执行RecordBatch.done方法。
  • 释放RecordBatch底层的ByteBuffer。
    下面是Sender.handleProduceResponse()方法的具体代码:

Kafka-Sender分析,队列,kafka,linq,分布式文章来源地址https://www.toymoban.com/news/detail-800714.html

到了这里,关于Kafka-Sender分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【分布式技术】消息队列Kafka

    目录 一、Kafka概述 二、消息队列Kafka的好处 三、消息队列Kafka的两种模式 四、Kafka 1、Kafka 定义 2、Kafka 简介 3、Kafka 的特性 五、Kafka的系统架构 六、实操部署Kafka集群  步骤一:在每一个zookeeper节点上完成kafka部署 ​编辑 步骤二:传给其他节点 步骤三:启动3个节点 kafka管理

    2024年01月23日
    浏览(51)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(37)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(42)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(43)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(41)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(50)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(45)
  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(44)
  • 分布式应用之zookeeper集群+消息队列Kafka

           ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。为分布式框架提供协调服务的

    2024年02月06日
    浏览(60)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包