Kafka-消费者-KafkaConsumer分析-Rebalance

这篇具有很好参考价值的文章主要介绍了Kafka-消费者-KafkaConsumer分析-Rebalance。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在开始介绍Rebalance操作的实现细节之前,我们需要明确在哪几种情况下会触发Rebalance操作:

  1. 有新的消费者加入Consumer Group。
  2. 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向GroupCoordinator发送HeartbeatRequest时,GroupCoordinator会认为消费者下线。
  3. 有消费者主动退出Consumer Group。
  4. Consumer Group订阅的任一Topic出现分区数量的变化。
  5. 消费者调用unsubscrible()取消对某Topic的订阅。

第一阶段

Rebalance操作的第一步就是查找GroupCoordinator,这个阶段消费者会向Kafka集群中的任意一个Broker发送GroupCoordinatorRequest请求,并处理返回的GroupCoordinatorResponse响应。

GroupCoordinatorRequest消息体的格式比较简单,只包含了Consumer Group的id。GroupCoordinatorResponse消息体包含了错误码(short类型)、coordinator的节点Id(int类型)、GroupCoordinator的host(String类型)、GroupCoordinator的端口号(int类型)。
发送GroupCoordinatorRequest请求的入口是ConsumerCoordinator的ensureCoordinatorReady方法,其流程如图所示。

Kafka-消费者-KafkaConsumer分析-Rebalance,队列,kafka,分布式

  1. 首先检测是否需要重新查找GroupCoordinator,主要是检查coordinator字段是否为空以及与GroupCoordinator之间的连接是否正常。

  2. 查找集群负载最低的Node节点,并创建GroupCoordinatorRequest请求。调用client.send方法将请求放入unsent队列中等待发送,并返回RequestFuture对象。返回的RequestFuture对象经过了compose方法适配,原理同HeartbeatCompletionHandler。

  3. 调用ConsumerNetworkClient.poll(future)方法,将GroupCoordinatorRequest请求发送出去。此处使用阻塞的方式发送,直到收到GroupCoordinatorResponse响应或异常完成,才从此方法返回。

  4. 检测检查RequestFuture对象的状态。如果出现RetriableException异常,则调用ConsumerNetworkClient.awaitMetadataUpdate()方法阻塞更新Metadata中记录的集群元数据后跳转到步骤1继续执行。如果不是RetriableException异常则直接报错。

  5. 如果成功找到GroupCoordinator节点,但是网络连接失败,则将其unsent中对应的请求清空,并将coordinator字段置为null,准备重新查找GroupCoordinator,退避一段时间后跳转到步骤1继续执行。

下面介绍处理GroupCoordinatorResponse的相关操作。通过对sendGroupCoordinatorRequest方法的分析我们知道,handleGroupMetadataResponse)方法是处理GroupCoordinatorResponse的入口,其步骤如下:

  1. 调用coordinatorUnknown()检测是否已经找到GroupCoordinator且成功连接。如果是则忽略此GroupCoordinatorResponse,因为在发送GroupCoordinatorRequest时并没有防止重发的机制,可能有多个GroupCoordinatorResponse;否则,继续下面的步骤。
  2. 解析GroupCoordinatorResponse得到服务端GroupCoordinator的信息。
  3. 构建Node对象赋值给coordinator字段,并尝试与GroupCoordinator建立连接。
  4. 启动HeartbeatTask定时任务。
  5. 最后,调用RequestFuture.complete()方法将正常收到GroupCoordinatorResponse的事件传播出去。
  6. 如果GroupCoordinatorResponse中的错误码不为NONE,则调用RequestFuture.raise方法将异常传播出去。最终由ensureCoordinatorReady方法中的步骤4处理。

第二阶段

在成功查找到对应的GroupCoordinator之后进入Join Group阶段。在此阶段,消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。先来了解JoinGroupRequest和JoinGroupResponse的消息体格式,如图所示。

Kafka-消费者-KafkaConsumer分析-Rebalance,队列,kafka,分布式
了解了JoinGroupRequest和JoinGroupResponse的格式之后,再来分析第二阶段的相关处理流程,其入口函数是ensurePartitionAssignment方法。

ensurePartitionAssignment方法的流程如图所示。

Kafka-消费者-KafkaConsumer分析-Rebalance,队列,kafka,分布式

  1. 调用SubscriptionState.partitionsAutoAssigned方法,检测Consumer的订阅是否是AUTO_TOPICS或AUTO_PATTERN。因为USER_ASSIGNED不需要进行Rebalance操作,而是由用户手动指定分区。

  2. 如果订阅模式是AUTO_PATTERN,则检查Metadata是否需要更新。

    在前面提到过,在ConsumerCoordinator的构造函数中为Metadata添加了监听器。当Metadata更新时就会使用SubscriptionState中的正则表达式过滤Topic,并更改SubscriptionState中的订阅信息。同时,也会使用metadataSnapshot字段记录当前的Metadata的快照。这里要更新Metadata的原因,是为了防止因使用过期的Metadata进行Rebalance操作而导致多次连续的Rebalance操作。

  3. 调用ConsumerCoordinator.needRejoin()方法判断是要发送JoinGroupRequest加入ConsumerGroup,其实现是检测是否使用了AUTO_TOPICS或AUTO_PATTERN模式,检测rejoinNeeded和needsPartitionAssignment两个字段的值。

  4. 调用onJoinPrepare方法进行发送JoinGroupRequest请求之前的准备,做了三件事:一是如果开启了自动提交offset则进行同步提交offset,提交offset的内容后面会详细介绍,此步骤可能会阻塞线程;二是调用注册在SubscriptionState中的ConsumerRebalanceListener上的 回调方法;三是将SubscriptionState的needsPartitionAssignment字段设置为true并收缩groupSubscription集合。

  5. 再次调用needRejoin方法检测,之后调用ensureCoordinatorReady方法检测已经找到GroupCoordinator且与之建立了连接。

  6. 如果还有发往GroupCoordinator所在Node的请求,则阻塞等待这些请求全部发送完成并收到响应(即等待unsent及InFlightRequests的对应队列为空),然后返回步骤5继续执行,主要是为了避免重复发送JoinGroupRequest请求。

  7. 调用sendJoinGroupRequest方法创建JoinGroupRequest请求,并调用ConsumerNetworkClient.send方法将请求放入unsent中缓存,等待发送。

  8. 在步骤7返回的RequestFuture对象上添加RequestFutureListener。

  9. 调用ConsumerNetworkClient.poll方法发送JoinGroupRequest,这里会阻塞等待,直到收到JoinGroupResponse或出现异常。

  10. 检测RequestFuture.fail。如果出现RetriableException异常则进行重试,其他异常则报错。如果无异常,则整个第二阶段操作完成。

通过前面对JoinGroupRequest发送流程的分析,我们了解到JoinGrouResponse处理流程的入口是JoinGroupResponseHandler:handle()方法,其中还包括了SyncGroupRequest发送的操作。

JoinGrouResponse的处理流程如图所示。

Kafka-消费者-KafkaConsumer分析-Rebalance,队列,kafka,分布式

  1. 解析JoinGroupResponse,获取GroupCoordinator分配的memberld、generation等信息,更新到本地。
  2. 消费者根据leaderld检测自己是不是Leader。如果是Leader则进入onJoinLeader方法,如果不是Leader则进入onJoinFollower方法。从上面的流程图也可以看出,onJoinFollower()方法的逻辑是onJoinLeader()方法的子集,下面主要分析onJoinLeader方法。
  3. Leader根据JoinGroupResponse的group_protocol字段指定的Parition分配策略,查找相应的PartitionAssignor对象。
  4. Leader将JoinGroupResponse的members字段进行反序列化,得到ConsumerGroup中全部消费者订阅的Topic。Leader会将这些Topic信息添加到其SubscriptionState.groupSubscription集合中。而Follower则只关心自己订阅的Topic信息。
  5. 第4步可能有新的Topic添加进来,所以更新Metadata信息。
  6. 待Metadata更新完成后,会在assignmentSnapshot字段中存储一个Metadata快照(即通过Metadata的Listener创建的快照)。
  7. 调用PartitionAssignor.assign()方法进行分区分配。
  8. 将分配结果序列化,保存到Map中返回,其中key是消费者的memberld,value是分配结果序列化后的ByteBuffer。

第三阶段

完成分区分配之后就进入了Synchronizing Group State 阶段,主要逻辑是向GroupCoordinator 发送 SyncGroupRequest 请求并处理 SyncGroupResponse 响应。

先来了解SyncGroupRequest 和 SyncGroupResponse 的消息体格式。

Kafka-消费者-KafkaConsumer分析-Rebalance,队列,kafka,分布式
SyncGroupRequest 中各个字段的含义如表

Kafka-消费者-KafkaConsumer分析-Rebalance,队列,kafka,分布式
SyncGroupResponse 中各个字段的含义如表

Kafka-消费者-KafkaConsumer分析-Rebalance,队列,kafka,分布式
通过前面对onJoinLeader方法分析,我们知道发送 SyncGroupRequest 请求的逻辑紧接在分区分配操作之后,也是在 onJoinLeader方法中完成的。下面是其流程:

  1. 得到序列化后的分区分配结果后,Leader将其封装成 SyncGroupRequest,而Follower形成的SyncGroupRequest中这部分为空集合。
  2. 调用ConsumerNetworkClient.send方法将请求放入unsent集合中等待发送。
    对SyncGroupResponse处理的入口是SyncGroupResponseHandler.handle方法。对于正常完成的情况,解析SyncGroupResponse,从中拿到分区分配结果并将其传递出去;对于出现异常情况,将rejoinNeeded设置为true,并针对不用的错误码进行不同的处理。

从SyncGroupResponse中得到的分区分配结果最终由ConsumerCoordinator.onJoinComplete()方法处理,调用此方法的是在第二阶段ensureActiveGroup方法的步骤8中添加的RequestFutureListener中调用。onJoinComplete()方法的流程如图所示。

Kafka-消费者-KafkaConsumer分析-Rebalance,队列,kafka,分布式文章来源地址https://www.toymoban.com/news/detail-809760.html

  1. 在第二阶段Leader开始分配分区之前,Leader使用assignmentSnapshot字段记录了Metadata快照。此时在Leader中,将此快照与最新的Metadata快照进行对比。如果快照不一致则表示分区分配过程中出现了Topic增删或分区数量的变化,则将needsPartitionAssignment置为true,需重新进行分区分配。
  2. 反序列化拿到分配给当前消费者的分区,并添加到SubscriptionStata.assignment集合中,之后消费者会按照此集合指定的分区进行消费,将needsPartitionAssignment置为false。
  3. 调用PartitionAssignor的onAssignment()回调函数,默认是空实现。当用户自定义PartitionAssignor时,可以自定义此方法。
  4. 如果开启了自动提交offset的功能,则重新启动AutoCommitTask定时任务。
  5. 调用SubscriptionState中注册的ConsumerRebalanceListener。
  6. 将needsJoinPrepare重置为true,为下次Rebalance操作做准备。
  7. 重启HeartbeatTask定时任务,定时发送心跳。

到了这里,关于Kafka-消费者-KafkaConsumer分析-Rebalance的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka-消费者-KafkaConsumer分析-PartitionAssignor

    Leader消费者在收到JoinGroupResponse后,会按照其中指定的分区分配策略进行分区分配,每个分区分配策略就是一个PartitionAssignor接口的实现。图是PartitionAssignor的继承结构及其中的组件。 PartitionAssignor接口中定义了Assignment和Subscription两个内部类。 进行分区分配需要的两方面的数

    2024年01月20日
    浏览(51)
  • Kafka-消费者-KafkaConsumer分析-SubscriptionState

    KafkaConsumer从Kafka拉取消息时发送的请求是FetchRequest(具体格式后面介绍),在其中需要指定消费者希望拉取的起始消息的offset。 为了消费者快速获取这个值,KafkaConsumer使用SubscriptionState来追踪TopicPartition与offset对应关系。 图展示了SubscriptionState依赖的类以及其核心字段。 Subscrip

    2024年01月18日
    浏览(47)
  • 多个消费者订阅一个Kafka的Topic(使用KafkaConsumer和KafkaProducer)

    记录 :466 场景 :一个KafkaProducer在一个Topic发布消息,多个消费者KafkaConsumer订阅Kafka的Topic。每个KafkaConsumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://blog.csdn.net/zha

    2024年02月16日
    浏览(46)
  • kafka在创建KafkaConsumer消费者时,发生Exception in thread “main“ org.apache.kafka.common.KafkaException: Faile

    原因:可能是序列化和反序列化没正确使用。将以下代码修改正确再次运行。 将以上代码的 StringDeserializer 反序列化,确认无误!!!

    2024年02月13日
    浏览(54)
  • 保障效率与可用,分析Kafka的消费者组与Rebalance机制

    上手第一关,手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析,打破面试难关 防止消息丢失与消息重复——Kafka可靠性分析及优化实践 我们上一期从可靠性分析了消息

    2024年02月06日
    浏览(46)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(49)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(45)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(86)
  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(47)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包