Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient

这篇具有很好参考价值的文章主要介绍了Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前面介绍过NetworkClient的实现,它依赖于KSelector、InFlightRequests、Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector法实现了发送请求的功能,并通过一系列handle*方法处理请求响应、超时请求以及断线重连。

ConsumerNetworkClient在NetworkClient之上进行了封装,提供了更高级的功能和更易用的API。

在图中展示了ConsumerNetworkClient的核心字段以及其依赖的组件。

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式

  • client:NetworkClient对象。
  • delayedTasks:定时任务队列,DelayedTaskQueue是Kafka提供的定时任务队列的实现,其底层是使用JDK提供的PriorityQueue实现。
    简单介绍一下PriorityQueue,这是一个非线程安全的、无界的、优先级队列,实现原理是小顶堆,底层是基于数组实现的,其对应的线程安全实现是PriorityBlockingQueue,这个定时任务队列中是心跳任务。
  • metadata:用于管理Kafka集群元数据。
  • unsent:缓冲队列,Map<Node,List类型,key是Node节点,value是发往此Node的ClientRequest集合。
  • unsentExpiryMs:ClientRequest在unsent中缓存的超时时长。
  • wakeup:由调用KafkaConsumer对象的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程。
  • wakeupDisabledCount:KafkaConsumer是否正在执行不可中断的方法。每进入一个不可中断的方法时,则增加一,退出不可中断方法时,则减少一。
    wakeupDisabledCount只会被KafkaConsumer线程修改,其他线程不能修改。

ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll方法有多个重载,最终会调用poll(long timeout,long now,boolean executeDelayedTasks)重载,这三个参数的含义分别是:

  • timeout表示执行poll方法的最长阻塞时间(单位是ms),如果为0,则表示不阻塞;
  • now表示当前时间戳;
  • executeDelayedTasks表示是否执行delayedTasks队列中的定时任务。

下面介绍其流程,其中简单回顾一下NetworkClient的功能:

  1. 调用ConsumerNetworkClient.trySend方法循环处理unsent中缓存的请求。

    具体逻辑是:对每个Node节点,循环遍历其对应的ClientRequest列表,每次循环都调用NetworkClient.ready方法检测消费者与此节点之间的连接,以及发送请求的条件。

    若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队中等待响应,也放入KafkaChannel的send字段中等待发送,并将此消息从列表中删除。实现代码如下:
    Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式

  2. 计算超时时间,此超时时间由timeout与delayedTasks队列中最近要执行的定时任务的时间共同决定。在下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时长,避免影响定时任务的执行。

  3. 调用NetworkClient.poll方法,将KafkaChannel.send字段指定的消息发送出去。除此之外,NetworkClient.poll()方法可能会更新Metadata使用一系列handle*方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数。

  4. 调用ConsumerNetworkClient.maybeTriggerWakeup方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求,则抛出WakeupException异常,中断当前ConsumerNetworkClient.poll方法。

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式

  1. 调用checkDisconnects方法检测连接状态。检测消费者与每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式

  1. 根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中的定时任务,则调用delayedTasks.poll()方法。

  2. 再次调用trySend方法。在步骤3中调用了NetworkClient.poll方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这里再次尝试调用trySend方法。

  3. 调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求。它会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合中删除。

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式
分析完poll方法的详细步骤之后,我们下面来看其实现代码:

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式
pollNoWakeup方法是poll方法的变体,表示执行不可被中断的poll方法。

具体逻辑是:在执行poll方法之前,会调用disableWakeups方法将wakeupDisabledCount加一,然后调用poll方法。这样,即使其他线程请求中断,也不会被响应。

poll(future)是poll方法的另一个实现阻塞发送请求的功能,代码如下所示。

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式
在ConsumerNetworkClient.send方法中,会将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送,具体代码如下。

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式

在这里需要重点关注的是KafkaConsumer中使用的回调对象—RequestFutureCompletionHandler,其继承关系如图所示。

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式
从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,它还继承了RequestFuture类。RequestFuture是一个泛型类,其核心字段如下所示。

  • isDone:表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为true。
  • exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空则表示出现异常,反之则表示正常完成。
  • value:记录请求正常完成时收到的响应,与exception字段互斥。此字段非空表示正常完成,反之表示出现异常。
  • listeners:RequestFutureListener集合, 用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常两种情况。

在RequestFuture中有两处典型设计模式的使用:一处是compose方法,使用了适配器模式;另一处是chain方法,使用了责任链模式。下面是compose方法的相关代码:

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式
图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。

当调用RequestFuture对象的complete()或raise()方法时,会调用RequestFutureListener的onSuccess()或onFailure()方法,然后调用RequestFutureAdapter<T,S>的对应方法,最终调用RequestFuture对象的对应方法。

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式
RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture之间传递事件。下面是其具体代码:

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,队列,kafka,分布式

RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法,代码比较简单,不再赘述了。

介绍完RequestFutureCompleteHandler之后,回到ConsumerNetworkClient的分析上来。下面简单介绍ConsumerNetworkClient中几个常用的功能,代码比较简单,就不贴出来了:文章来源地址https://www.toymoban.com/news/detail-803808.html

  • awaitMetadataUpdate()方法:循环调用poll方法,直到Metadata版本号增加,实现阻塞等待Metadata更新完成。
  • awaitPendingRequests()方法:等待unsent和InFightRequests中的请求全部完成(正常收到响应或出现异常)。
  • put()方法:向unsent中添加请求。
  • schedule()方法:向delayedTasks队列中添加定时任务。
  • leastLoadedNode()方法:查找Kafka集群中负载最低的Node。

到了这里,关于Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka-消费者-KafkaConsumer分析-Rebalance

    在开始介绍Rebalance操作的实现细节之前,我们需要明确在哪几种情况下会触发Rebalance操作: 有新的消费者加入Consumer Group。 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向GroupCoordinator发送HeartbeatRequest时,GroupCoordina

    2024年01月20日
    浏览(29)
  • Kafka-消费者-KafkaConsumer分析-PartitionAssignor

    Leader消费者在收到JoinGroupResponse后,会按照其中指定的分区分配策略进行分区分配,每个分区分配策略就是一个PartitionAssignor接口的实现。图是PartitionAssignor的继承结构及其中的组件。 PartitionAssignor接口中定义了Assignment和Subscription两个内部类。 进行分区分配需要的两方面的数

    2024年01月20日
    浏览(40)
  • 多个消费者订阅一个Kafka的Topic(使用KafkaConsumer和KafkaProducer)

    记录 :466 场景 :一个KafkaProducer在一个Topic发布消息,多个消费者KafkaConsumer订阅Kafka的Topic。每个KafkaConsumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://blog.csdn.net/zha

    2024年02月16日
    浏览(32)
  • kafka在创建KafkaConsumer消费者时,发生Exception in thread “main“ org.apache.kafka.common.KafkaException: Faile

    原因:可能是序列化和反序列化没正确使用。将以下代码修改正确再次运行。 将以上代码的 StringDeserializer 反序列化,确认无误!!!

    2024年02月13日
    浏览(41)
  • 保障效率与可用,分析Kafka的消费者组与Rebalance机制

    上手第一关,手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析,打破面试难关 防止消息丢失与消息重复——Kafka可靠性分析及优化实践 我们上一期从可靠性分析了消息

    2024年02月06日
    浏览(33)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(33)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(33)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(53)
  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(34)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包