Kafka某个Topic无法消费问题

这篇具有很好参考价值的文章主要介绍了Kafka某个Topic无法消费问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题描述

12月28日,公司测试环境Kafka的task.build.metadata.flow这个topic突然无法消费。

其他topic都正常使用,这个topic只有一个分区,并且只有一个消费者

查找问题原因

首先登录服务器,运行kafka的cli命令,查看消费者组的详情。

# 进入kafka安装目录下的bin目录执行
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group 消费者组名称

Kafka某个Topic无法消费问题
由上图可以发现,task.build.metadata.flow这个topic,最新offset是2,但是当前offset只到1,还有一条未消费。
并且该消费者组显示没有客户端正在消费这个topic。

查看项目日志发现以下内容:
Kafka某个Topic无法消费问题Kafka某个Topic无法消费问题
Kafka某个Topic无法消费问题分析项目日志,可以看出:

  1. 消费者是消费过数据的(有一行日志打印)。但是日志中只发现过一次消费记录。
  2. 在项目没有重新发布,没有消费者增加,分区数没有改变的情况下,kafka执行过Rebalance机制。
什么是Rebalance:
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。
简单来说,就是把 Topic 中所有的分区,按照一定的规则分配给 Consumer Group 下的所有 consumer。
例如:某 Group 下有 3 个 consumer 实例,它订阅了一个具有 9 个 partition 的 Topic。
正常情况下,kafka 会为每个 Consumer 平均的分配 3 个分区。这个分配的过程就是 Rebalance。
触发 Rebalance 的时机:
1. Consumer Group 中的 Consumer 数量发生变化时。
2. 订阅的 Topic 数量发生变化时。
3. 订阅 Topic 的分区发生变化时。
Rebalance 发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。
但是 Rebalance 过程对 consumer group 会造成比较严重的影响。
在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成

解决问题

首先,日志中显示项目曾经消费过这个topic,但是只消费一次,后续不再消费,并且之后kafka发生Rebalance。结合使用kafka cli命令查看的消费者组详情,分析出:

消费者第一次可以消费数据,但是由于某种原因,消费后,消费者下线,然后触发了Rebalance机制,导致后续使用cli查看消费者详情的时候,看不到这个topic有客户端。

基于这个思路,我们把项目中消费者的代码全部注释,只留一行日志记录。然后发布项目后,再次生产一条消息查看是否能够消费。
根据多次测试结果发现消费是正常的。
那么问题应该就出现在消费者的代码中了,应该是代码中的操作影响到了kafka,导致kafka进行Rebalance。

查阅Rebalance相关资料发现:
Kafka某个Topic无法消费问题

原文出处:https://blog.csdn.net/qq_21383435/article/details/108720155

因为涉及到的参数原理较多,这里简单描述下:
因为kafka要保证消费者一直在正常消费,所以加了心跳,超时等机制,用来判断消费者是否正常。
由于我们的消费者代码执行太慢,超时了,导致kafka服务端认为消费者已经挂掉,所以将这个消费者踢出。
踢出消费者后,因为消费者数量发生变化,所以触发了Rebalance机制。
所以我们在分析日志时可以看到,消费者是消费过一次数据的,并且消费后一段时间(kafka消费者的超时时间)触发了Rebalance机制,在之后,使用kafka cli命令查看消费者组详情的时候,就看不到对应的消费者了。

至此,kafka某一个topic消费不到数据的问题已经发现原因了。
原因找到了,至于解决方案就很多了:

  1. 调整超时时间
  2. 优化代码执行速度,或者改成线程池异步执行

因为我们的消费者代码很简单,只是调用apache atlas的api接口推送数据,但是因为未知原因,atlas接口调用比较慢,不能确定具体执行时长,所以我们选择了方案二。

按照解决方案修改代码后,部署测试,发现已经正常。

问题延伸

经过此次问题,延伸出一些kafka相关的知识需要巩固。文章来源地址https://www.toymoban.com/news/detail-437984.html

  1. kafka的Rebalance机制(为什么要有Rebalance机制,什么情况下会触发Rebalance机制)
  2. kafka消费的超时问题(session.timout.ms,heartbeat.interval.ms,max.poll.interval.ms三个参数)

到了这里,关于Kafka某个Topic无法消费问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka消费多个topic的使用

    我们在业务中难免遇到一个kafka消费多个topic的消息,本文帮助大家如何在业务中用一个类消费多个topic消息 配置类1

    2024年02月11日
    浏览(53)
  • kafka如何动态消费新增topic主题

    一、解决痛点 使用spring-kafka客户端,每次新增topic主题,都需要硬编码客户端并重新发布服务,操作麻烦耗时长。kafkaListener虽可以支持通配符消费topic,缺点是并发数需要手动改并且重启服务 。对于业务逻辑相似场景,创建新主题动态监听可以用kafka-batch-starter组件 二、组件

    2023年04月21日
    浏览(38)
  • 如何查看Kafka的Topic消费情况

    进入kafka安装目录,然后执行以下命令  2.10为Scala版本,0.10.0.2.5.3.0为kafka版本  

    2024年02月10日
    浏览(38)
  • Flink实现同时消费多个kafka topic,并输出到多个topic

    1)代码使用的 flink版本为1.16.1 ,旧版本的依赖及api可能不同,同时使用了hutool的JSON工具类,两者均可自行更换; 2)本次编写的两个方案,均只适用于 数据源topic来自同一个集群 ,且kafka消费组相同,暂未研究flink的connect算子join多条流 代码涉及 Hadoop相关环境 ,若无该环境

    2023年04月20日
    浏览(84)
  • 自定义kafka客户端消费topic

    使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。 后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic 使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没

    2024年02月02日
    浏览(51)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(53)
  • kafka2.x常用命令:创建topic,查看topic列表、分区、副本详情,删除topic,测试topic发送与消费

    原创/朱季谦 接触kafka开发已经两年多,也看过关于kafka的一些书,但一直没有怎么对它做总结,借着最近正好在看《Apache Kafka实战》一书,同时自己又搭建了三台kafka服务器,正好可以做一些总结记录。 本文主要是记录如何在kafka集群服务器上创建topic,查看topic列表、分区、

    2024年02月03日
    浏览(45)
  • Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(46)
  • Kafka3.1部署和Topic主题数据生产与消费

    本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用: 1.安装JDK配置环境变量 2.Zookeeper(zookeeper-3.7.1) zk 部署后的目录位置:D:setupapache-zookeeper-3.7.1 3.安装Kafka3.1X 3.1 下载包(kafka_2.12-3.1.2.tgz) Kafka 3.2、 解压并进入Kafka目录: 根目录:D:setupkafka3.1.2 3、 编辑

    2024年02月09日
    浏览(36)
  • Kafka - 主题Topic与消费者消息Offset日志记录机制

    可以根据业务类型,分发到不同的Topic中,对于每一个Topic,下面可以有多个分区(Partition)日志文件: kafka 下的Topic的多个分区,每一个分区实质上就是一个队列,将接收到的消息暂时存储到队列中,根据配置以及消息消费情况来对队列消息删除。 可以这么来理解Topic,Partitio

    2024年02月03日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包