在前面介绍了Kafka中Rebalance操作的相关方案和原理。
在KafkaConsumer中通过ConsumerCoordinator组件实现与服务端的GroupCoordinator的交互,ConsumerCoordinator继承了AbstractCoordinator抽象类。
下面我们先来介绍AbstractCoordinator的核心字段,如图所示。
- heartbeat:心跳任务的辅助类,其中记录了两次发送心跳消息的间隔(interval字段)、最近发送心跳的时间(lastHeartbeatSend字段)、最后收到心跳响应的时间(lastHeartbeatReceive字段)、过期时间(timeout字段)、心跳任务重置时间(lastSessionReset字段),同时还提供了计算下次发送心跳的时间(timeToNextHeartbeat()方法)、检测是否过期的方法(sessionTimeoutExpired()方法)。
- heartbeatTask:HeartbeatTask是一个定时任务,负责定时发送心跳请求和心跳响应的处理,会被添加到前面介绍的ConsumerNetworkClient.delayedTasks定时任务队列中。
- groupld:当前消费者所属的Consumer Group的Id。
- client:ConsumerNetworkClient对象,负责网络通信和执行定时任务。
- needsJoinPrepare:标记是否需要执行发送JoinGroupRequest请求前的准备操作。
- rejoinNeeded:此字段是否重新发送JoinGroupRequest请求的条件之一。
下面先简单了解修改其值的地方和含义,如图所示。
上图①处是收到正常的JoinGroupResponse响应,将rejoinNeeded设置为false,防止重复发送JoinGroupRequest请求。
②、③、④三处分别是收到异常的SyncGroupResponse或HeartbeatResponse或消费者离开Consumer Group时执行的操作,它们都会将rejoinNeeded设置为true,表示可以重新发送JoinGroupRequest。
- coordinator:Node类型,记录服务端GroupCoordinator所在的Node节点。
- memberld:服务端GroupCoordinator返回的分配给消费者的唯一Id。
- generation:服务端GroupCoordinator返回的年代信息,用来区分两次Rebalance操作。由于网络延迟等问题,在执行Rebalance操作时可能收到上次Rebalance过程的请求,避免这种干扰,每次Rebalance操作都会递增generation的值。
下面是ConsumerCoordinator的核心字段。
-
assignors:PartitionAssignor列表。在消费者发送的JoinGroupRequest请求中包含了消费者自身支持的PartitionAssignor信息,GroupCoordinator从所有消费者都支持的分配策略中选择一个,通知Leader使用此分配策略进行分区分配。此字段的值通过partition.assignment.strategy参数配置,可以配置多个。
-
metadata:记录了Kafka集群的元数据。
-
subscriptions:SubscriptionState对象,参考SubscriptionState小节。
-
autoCommitEnabled:是否开启了自动提交offset。
-
autoCommitTask:自动提交offset的定时任务。
-
interceptors:ConsumerInterceptor集合。
-
excludeInternalTopics:标识是否排除内部Topic。
-
metadataSnapshot:用来存储Metadata的快照信息,主要用来检测Topic是否发生了分区数量的变化。在ConsumerCoordinator的构造方法中,会为Metadata添加一个监听器,当Metadata更新时会做下面几件事。
-
如果是AUTO_PATTERN模式,则使用用户自定义的正则表达式过滤Topic,得到需要订阅的Topic集合后,设置到SubscriptionState的subscription集合和groupSubscription集合中。
-
如果是AUTO_PATTERN或AUTO_TOPICS模式,为当前Metadata做一个快照,这个快照底层是使用HashMap记录每个Topic中Partition的个数。将新旧快照进行比较,发生变化的话,则表示消费者订阅的Topic发生分区数量变化,则将SubscriptionState的needsPartitionAssignment字段置为true,需要重新进行分区分配。
-
使用metadataSnapshot字段记录变化后的新快照。文章来源:https://www.toymoban.com/news/detail-800854.html
-
-
assignmentSnapshot:也是用来存储Metadata的快照信息,不过是用来检测Partition分配的过程中有没有发生分区数量变化。具体是在Leader消费者开始分区分配操作前,使用此字段记录Metadata快照;收到SyncGroupResponse后,会比较此字段记录的快照与当前Metadata是否发生变化。如果发生变化,则要重新进行分区分配。文章来源地址https://www.toymoban.com/news/detail-800854.html
到了这里,关于Kafka-消费者-KafkaConsumer分析-ConsumerCoordinator的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!