问题描述
12月28日,公司测试环境Kafka的task.build.metadata.flow这个topic突然无法消费。
其他topic都正常使用,这个topic只有一个分区,并且只有一个消费者
查找问题原因
首先登录服务器,运行kafka的cli命令,查看消费者组的详情。
# 进入kafka安装目录下的bin目录执行
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group 消费者组名称
由上图可以发现,task.build.metadata.flow这个topic,最新offset是2,但是当前offset只到1,还有一条未消费。
并且该消费者组显示没有客户端正在消费这个topic。
查看项目日志发现以下内容:
分析项目日志,可以看出:
- 消费者是消费过数据的(有一行日志打印)。但是日志中只发现过一次消费记录。
- 在项目没有重新发布,没有消费者增加,分区数没有改变的情况下,kafka执行过Rebalance机制。
什么是Rebalance:
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。
简单来说,就是把 Topic 中所有的分区,按照一定的规则分配给 Consumer Group 下的所有 consumer。
例如:某 Group 下有 3 个 consumer 实例,它订阅了一个具有 9 个 partition 的 Topic。
正常情况下,kafka 会为每个 Consumer 平均的分配 3 个分区。这个分配的过程就是 Rebalance。
触发 Rebalance 的时机:
1. Consumer Group 中的 Consumer 数量发生变化时。
2. 订阅的 Topic 数量发生变化时。
3. 订阅 Topic 的分区发生变化时。
Rebalance 发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。
但是 Rebalance 过程对 consumer group 会造成比较严重的影响。
在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成
解决问题
首先,日志中显示项目曾经消费过这个topic,但是只消费一次,后续不再消费,并且之后kafka发生Rebalance。结合使用kafka cli命令查看的消费者组详情,分析出:
消费者第一次可以消费数据,但是由于某种原因,消费后,消费者下线,然后触发了Rebalance机制,导致后续使用cli查看消费者详情的时候,看不到这个topic有客户端。
基于这个思路,我们把项目中消费者的代码全部注释,只留一行日志记录。然后发布项目后,再次生产一条消息查看是否能够消费。
根据多次测试结果发现消费是正常的。
那么问题应该就出现在消费者的代码中了,应该是代码中的操作影响到了kafka,导致kafka进行Rebalance。
查阅Rebalance相关资料发现:
原文出处:https://blog.csdn.net/qq_21383435/article/details/108720155
因为涉及到的参数原理较多,这里简单描述下:
因为kafka要保证消费者一直在正常消费,所以加了心跳,超时等机制,用来判断消费者是否正常。
由于我们的消费者代码执行太慢,超时了,导致kafka服务端认为消费者已经挂掉,所以将这个消费者踢出。
踢出消费者后,因为消费者数量发生变化,所以触发了Rebalance机制。
所以我们在分析日志时可以看到,消费者是消费过一次数据的,并且消费后一段时间(kafka消费者的超时时间)触发了Rebalance机制,在之后,使用kafka cli命令查看消费者组详情的时候,就看不到对应的消费者了。
至此,kafka某一个topic消费不到数据的问题已经发现原因了。
原因找到了,至于解决方案就很多了:
- 调整超时时间
- 优化代码执行速度,或者改成线程池异步执行
因为我们的消费者代码很简单,只是调用apache atlas的api接口推送数据,但是因为未知原因,atlas接口调用比较慢,不能确定具体执行时长,所以我们选择了方案二。
按照解决方案修改代码后,部署测试,发现已经正常。文章来源:https://www.toymoban.com/news/detail-437984.html
问题延伸
经过此次问题,延伸出一些kafka相关的知识需要巩固。文章来源地址https://www.toymoban.com/news/detail-437984.html
- kafka的Rebalance机制(为什么要有Rebalance机制,什么情况下会触发Rebalance机制)
- kafka消费的超时问题(session.timout.ms,heartbeat.interval.ms,max.poll.interval.ms三个参数)
到了这里,关于Kafka某个Topic无法消费问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!