Kafka消费者不消费数据

这篇具有很好参考价值的文章主要介绍了Kafka消费者不消费数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景:

工作往往是千篇一律,真正能学到点知识都是在上线后。使用Skywalking+Kafka+ES进行应用监控。

现象:

公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UAT环境一开始还正常,后面接入了更多的应用后出现了问题:OAP服务正常但是ES里不再有数据。

排查:

通过查看消费者消费Kafka数据的情况可以看到,数据出现了积压。

Kafka消费者不消费数据

 由于没有设置消费者的参数,所以使用的是默认值max.poll.interval.ms是5分钟、 max.poll.records是500

Kafka消费者不消费数据

 目前积压数据远大于一次拉取消费的500,所以判断是因为消费者无法在等待时间内消费完数据,​Consumer ​Group Coordination消费组判定当前消费者不在消费组内,所以查询消费者状态会出现消费者组不存在消费成员(如图符合判断)

Kafka消费者不消费数据

 目前解决方法:

max.poll.records改小或者 request.timeout.ms改大或者 request.timeout.ms改大,因为目前数据不稳定后续也只能通过数据量进行修改参数调优。重新开始消费,待后续观察。

结论:

由于数据量变大,消费者长时间不再请求数据,未向Group Coordinator发送心跳请求,所以kafka认为消费者已从消费组下线。所以不再进行消费。

学习:

之前知识浅浅了解了Rebalance,但没碰到过。所以借此好好学习一下。

一、什么是Rebalance

  • Rebalance本质上是一种协议, 规定了一个Consumer Group下的所有consumer如何达成一致,来分配订阅Topic的每个分区。

  • Rebalance发生时, 所有的Consumer Group都停止工作, 直到Rebalance 成。

二、触发条件

① 组成员个数发生变化

  • 新的消费者加入到消费组

  • 消费者主动退出消费组

  • 消费者被动下线。比如消费者长时间的GC, 网络延迟导致消费者长时间未向Group Coordinator发送心跳请求, 均会认为该消费者已经下线并踢出(本次问题出现的原因)

② 订阅的Topic的Consumer Group个数发生变化

③ Topic 的分区数发生变化

三、Rebalance的弊端

  1. rebalance的时候消费组内的所有消费者都不能处理消息
  2. 消费组内的消费者越多rebalance时间越长
  3. Rebalance 效率不高。当前 Kafka 的设计机制决定了每次 Rebalance 时,Consumer Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。

四、如何避免Rebalance

从触发条件可以看到,① 、 ② 、③基本都是可以认为尽量避免也就是提前根据数据量规划好消费者数量,主要是① 中的第三个,需要靠kafka的参数去调整

# 心跳相关
session.timeout.ms = 6s
heartbeat.interval.ms = 2s
# 消费数量(默认500)
max.poll.records


# 消费时间(默认300000)
request.timeout.msmax.poll.interval.ms=300000

Kafka消费者不消费数据

Kafka消费者不消费数据

 Kafka消费者不消费数据

Kafka消费者不消费数据

Kafka消费者不消费数据

五、Rebalance过程

Coordinator服务

  • Group Coordinator 是一个服务, 每个 Broker 在启动的时候都会启动一个该服务 Group Coordinator 的作用是用来存储 Group 的相关 Meta 信息, 并将对应 Partition 的 Offset 信息记录到 Kafka 内置 Topic(__consumer_offsets)中

  • Kafka 在0.9之前是基于 Zookeeper 来存储Partition的 offset 信息(consumers/{group}/offsets/{topic}/{partition}), 因为 Zookeeper 并不适用于频繁的写操作, 所以在0.9之后通过内置 Topic 的方式来记录对应 Partition 的 offset。

Rebalance过程分为两步:Join Group和 Sync Group

Join Group

① 概述

  • Join Group 顾名思义就是加入组。

  • 这一步中, 所有成员都向 Coordinator 发送 JoinGroup 请求, 请求加入消费组。

  • 一旦所有成员都发送了 JoinGroup 请求, Coordinator 会从中选择一个 Consumer 担任 Leader 的角色, 并把组成员信息以及订阅信息发给 Consumer Leader。

  • 注意Consumer Leader 和 Coordinator不是一个概念。Consumer Leader负责消费分配方案的制定。

② 流程图

Kafka消费者不消费数据

 Sync Group

① 概述

  • Consumer Leader 开始分配消费方案,即哪个 Consumer 负责消费哪些 Topic 的哪些 Partition。

  • 一旦完成分配,Consumer Leader 会将这个方案封装进SyncGroup请求中发给 Coordinator。

  • 非 Consumer Leader 也会发 SyncGroup 请求, 只是内容为空。

  • Coordinator 接收到分配方案之后会把方案塞进SyncGroup的Response中发给各个Consumer。

  • 这样组内的所有成员就都知道自己应该消费哪些分区了。

② 流程图

Kafka消费者不消费数据

参考:Kafka学习笔记 NO.004 Kafka的Rebalance(重平衡) - 墨天轮文章来源地址https://www.toymoban.com/news/detail-413284.html

到了这里,关于Kafka消费者不消费数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka消费者无法消费数据,解决

    作为一个在项目中边学边用的实习生,真的被昨天还好好的今天就不能消费数据的kafka折磨到了,下面提供一点建议,希望能对大家有所帮助。 //操作前集群都关了 1.首先去kafka-home的config目录下找到server.properties文件, 加入advertised.listeners=PLAINTEXT://ip:9092    如果有配置liste

    2024年02月17日
    浏览(50)
  • 【云原生进阶之PaaS中间件】第三章Kafka-4.4-消费者工作流程

    1.1.1 消费者群组         Kafka 里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的消息。         如上图,主题 T 有 4 个分区,群组中只有一个消费者,则该消费者将收到主题 T1 全部 4 个分区的消息。      

    2024年02月22日
    浏览(48)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题数据案例__订阅主题)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题中数据,所下图所示: 注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。 1.2、案例代码 代码 1.3、测试 在 Kafka 集群控制台,创建firstTopic主题 在 IDEA中

    2024年02月09日
    浏览(39)
  • Kafka入门,漏消费和重复消费, 消费者事务,数据积压(二十四)

    重复消费:已经消费了数据,但是offset没提交。 漏消费:先提交offset后消费,有可能会造成数据得漏消费 如果向完成consumer端得进准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如MySQ

    2024年02月15日
    浏览(39)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示: 1.2、案例代码 生产者往firstTopic主题 0 号分区发送数据代码 消费者消费firstTopic主题 0 分区数据代码 1.3、测试 在 IDEA 中执行消费者程序,如下图: 在 IDEA 中执行生产者程序 ,在控制台观察

    2024年02月09日
    浏览(45)
  • springboot kafka消费者启动/停止监听控制,启动时只消费此时之后的数据

    在springboot项目中,使用spring-kafka消费kafka数据。希望能够控制消费者(KafkaConsumer)启动或停止消费,并且在启动消费时只消费当前时刻以后生产的数据(最新生产的数据),也就是说,启动消费之前未消费的数据不再消费。 按照官方文档创建一个监听。 官方文档地址 Kafka

    2023年04月15日
    浏览(37)
  • 第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据

    重要的特性: 消息通过 队列来进行交换 每条消息仅会传递给一个消费者 消息传递有先后顺序,消息被消费后从队列删除(除非使用了消息优先级) 生产者或者消费者可以动态加入 传送模型: 异步即发即弃:生产者发送一条消息,不会等待收到一个响应 异步请求、应答:

    2024年02月20日
    浏览(36)
  • 大数据开发之Kafka(broker、消费者、eagle监控、kraft模式)

    4.1.1 Zookeeper存储的Kafka的信息 1、查看zookeeper中的kafka节点所存储的信息 启动Zookeeper客户端 通过ls命令列出kafka节点内容 2、zookeeper中存储的kafka信息 在zookeeper的服务端存储的Kafka相关信息: 1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器 2)/kafka/brokers/topics/first/partitions/0/state {“l

    2024年01月21日
    浏览(53)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(44)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包