Kafka篇——Kafka消费者端常见配置,涵盖自动手动提交offset、poll消息细节、健康状态检查、新消费组消费offset规则以及指定分区等技术点配置,全面无死角,一篇文章拿下!

这篇具有很好参考价值的文章主要介绍了Kafka篇——Kafka消费者端常见配置,涵盖自动手动提交offset、poll消息细节、健康状态检查、新消费组消费offset规则以及指定分区等技术点配置,全面无死角,一篇文章拿下!。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

消费者端自动和手动提交offset

一、自动提交offset
1、概念
Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前
主题-分区消费的偏移量

2、自动提交offset和手动提交offset流程图

kafka poll java,Kafka,kafka,分布式

3、在Java中实现配置

kafka poll java,Kafka,kafka,分布式
4、自动提交offset问题
自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,
于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。之前未被消费的消息就丢失掉了。

二、手动提交offset
1、在Java中配置手动提交

kafka poll java,Kafka,kafka,分布式

2、手动提交两种方式
1)手动同步提交
同步提交的时候,会阻塞等待,等待Kafka集群确认收到这个offset,并返回给消费者一个ACK后,才会继续执行下面的业务代码

kafka poll java,Kafka,kafka,分布式

2)手动异步提交
在消息消费完后,不需要等待集群返回ACK,直接执行之后的逻辑,可以设置回调方法,供Kafka的Broker集群调用

kafka poll java,Kafka,kafka,分布式

消费者poll消息细节

消费者poll消息。就是消费者与Broker建立长连接拉取数据

一、在Java中,默认的配置是一次性poll500条数据

kafka poll java,Kafka,kafka,分布式

二、一次性poll的消息可以根据消费速度的快慢设置,因为如果两次poll的事件超出了30秒的时间间隔,Kafka会认为其消费能力过弱,将其提出消费组。将分区分配给其他消费者
注意:rebalance机制,也就是重新分配的工作是比较消耗性能的,一般尽量避免出现rebalance的情况。一般都会避免出现重新分配的情况,因为重新分配也是比较消耗性能的,所以如果一开始我们发现这个消费者的消费能力比较弱,那么就降低一次拉取消息的数量

kafka poll java,Kafka,kafka,分布式

如果每隔1s内没有poll到任何的消息,则会继续的poll,循环往复,直到poll到消息。如果时间超出了1s,则此次长轮询结束
代码中设置长轮询为1000毫秒:

kafka poll java,Kafka,kafka,分布式

意味着:
1、如果一次poll到了500条,直接执行下面的for循环
2、如果这一次没有poll到500条消息,且在1秒内,那么长轮询继续poll,要么到500条,要么时间到1秒
3、如果多次poll都没有达到500条,且1秒时间到了,那么直接执行for循环

三、消费者发送心跳的时间间隔

kafka poll java,Kafka,kafka,分布式

四、Kafka如果超过10s没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他的消费者
注意:rebalance机制,也就是重新分配的工作是比较消耗性能的,一般尽量避免出现rebalance的情况

kafka poll java,Kafka,kafka,分布式

消费者健康状态检查(心跳机制)

消费者每隔1s向kafka集群发送心跳,集群发现如果有超过10s没有续约心跳的消费者,将被踢出消费组,出发该消费组的rebalance机制,
将该分区交给消费组里的其他消费者进行消费

kafka poll java,Kafka,kafka,分布式

指定分区消费

下面的代码表示指定某个主题的0号分区进行消费
命令:

kafka poll java,Kafka,kafka,分布式

Java代码实现:

kafka poll java,Kafka,kafka,分布式

消息回溯消费

所谓消息回溯消费,指的是默认情况下会从某个分区的偏移量+1进行消费,但是我们也可以指定从某个主题下的某个分区的最开始进行消费
命令:

kafka poll java,Kafka,kafka,分布式

Java代码实现:

kafka poll java,Kafka,kafka,分布式

控制台:
发现从第0个偏移量的消息也被消费了!

kafka poll java,Kafka,kafka,分布式

指定offset消费

命令:

kafka poll java,Kafka,kafka,分布式

Java代码实现:

kafka poll java,Kafka,kafka,分布式

控制台:
发现从偏移量10的地方开始消费了!

kafka poll java,Kafka,kafka,分布式

从指定时间点消费

根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition分区中找到该offset偏移量之后的消息开始消费

Java代码实现:
下面的案例是从当前时间的前1小时开始消费消息

kafka poll java,Kafka,kafka,分布式

新消费组消费offset规则

当新创建一个消费组去消费分区中的消息的时候,会有两种模式,分别是latest和earliest模式,下面是对两种模式的介绍
latest(默认):只消费自己启动后生产者发送到主题上的消息
earliest:第一次会从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBegining(每次都从头开始)

1、新消费组流程图

kafka poll java,Kafka,kafka,分布式

2、Java中配置新消费组消费方式

kafka poll java,Kafka,kafka,分布式

3、整体代码回顾

kafka poll java,Kafka,kafka,分布式

至此,关于Kafka中消费者相关的配置介绍完毕,内容较多,知识点比较分散,希望大家能够坚持反复学习。

后续还会持续更新,敬请期待~~~文章来源地址https://www.toymoban.com/news/detail-815741.html

到了这里,关于Kafka篇——Kafka消费者端常见配置,涵盖自动手动提交offset、poll消息细节、健康状态检查、新消费组消费offset规则以及指定分区等技术点配置,全面无死角,一篇文章拿下!的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Kafka】【十七】消费者poll消息的细节与消费者心跳配置

    默认情况下,消费者⼀次会poll500条消息。 代码中设置了⻓轮询的时间是1000毫秒 意味着: 如果⼀次poll到500条,就直接执⾏for循环 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s 如果多次poll都没达到500条,且1秒时间到了,那么直接执

    2024年02月09日
    浏览(45)
  • Kafka:消费者参数配置

    maven配置 springboot配置类 配置文件 参数配置列表 属性 说明 bootstrap.servers 向Kafka集群建立初始连接用到的host/port列表。 客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管 是否指定了哪个服务器用作引导。 这个列表仅影响用来发现集群所有服务器的初始

    2024年02月09日
    浏览(48)
  • 关于kafka消费者超时配置

    在Kafka中,消费者超时配置是指消费者在等待服务器响应时的超时时间。如果消费者在超时时间内未收到服务器的响应,它将重新发起请求或执行其他逻辑。 以下是关于Kafka消费者超时配置的一些常见选项: session.timeout.ms :该配置定义了消费者与Kafka集群之间的会话超时时间

    2024年02月16日
    浏览(41)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(42)
  • Kafka消费者常用超时时间配置

    https://blog.csdn.net/BHSZZY/article/details/126757295 //心跳超时时间(session超时时间)增加成25秒(之前项目设置了15秒) spring.kafka.properties.session.timeout.ms = 25000 //每次拉取的消息减少为20(之前是默认值500) spring.kafka.consumer.max-poll-records=20 //消息消费超时时间增加为10分钟 spring.kafka.p

    2024年02月03日
    浏览(92)
  • Kafka3.0.0版本——消费者(自动提交 offset)

    官网文档 参数解释 参数 描述 enable.auto.commi 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 图解分析 消费者自动提交 offset代码 消费者自动提交

    2024年02月09日
    浏览(37)
  • Kafka系列——详解创建Kafka消费者及相关配置

    参考自kafka系列文章——消费者创建与配置 在读取消息之前,需要先创建一个 KafkaConsumer 对象。 创建 KafkaConsumer 对象与创建 KafkaProducer 对象非常相似——把想要传给消费者的属性放在 Properties 对象里,后面深入讨论所有属性。这里我们只需要使用 3 个必要的属性: bootstrap.

    2024年02月09日
    浏览(42)
  • kafka生产者和消费者配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月12日
    浏览(45)
  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包