JAVA面试题分享一百六十二:Kafka消息重复消费问题?

这篇具有很好参考价值的文章主要介绍了JAVA面试题分享一百六十二:Kafka消息重复消费问题?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、原因分析

消息重复消费的根本原因都在于:已经消费了数据,但是offset没有成功提交。

其中很大一部分原因在于发生了再均衡。

1)消费者宕机、重启等。导致消息已经消费但是没有提交offset。

2)消费者使用自动提交offset,但当还没有提交的时候,有新的消费者加入或者移除,发生了rebalance(再平衡)。再次消费的时候,消费者会根据提交的偏移量来,于是重复消费了数据。

3)消息处理耗时,或者消费者拉取的消息量太多,处理耗时,超过了max.poll.interval.ms的配置时间,导致认为当前消费者已经死掉,触发再均衡。

4)  每次拉取的消息记录数max.poll.records为100,poll最大拉取间隔max.poll.interval.ms为 300s,消息处理过于耗时导致时长大于了这个值,导致再均衡发生重复消费

二、解决方案

由于网络问题,重复消费不可避免,因此,消费者需要实现消费幂等。方案:

1、消息表

2、数据库唯一索引

3、缓存消费过的消息id

4、手动提交office

5、减少每次拉取的消息记录数和增大poll之间的时间间隔

6、拉取到消息之后异步处理(保证成功消费)

提高消费者的处理速度。例如:对消息处理中比较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的Rebalance。可根据实际消息速率适当调小max.poll.records的值。   

三、新版kafka的broker幂等性具体实现原理

kafka新版本已经在broker中保证了接收消息的幂等性(比如2.4版本),只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。

kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。

PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID对用户完全是透明的。生产者如果重启则会生成新的PID。

Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。

四、ACK可靠性保证

为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

1、ack应答级别

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,没必要等ISR中的follower全部接收成功。所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。

acks参数配置

  • 0:这一操作提供了一个最低的延迟,partition的leader接收到消息还没有写入磁盘就已经返回ack,当leader故障时有可能丢失数据;
  • 1: partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
  • -1(all): partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复

2、常见配置

  • fetch.min.byte:配置Consumer一次拉取请求中能从Kafka中拉取的最小数据量,默认为1B,如果小于这个参数配置的值,就需要进行等待,直到数据量满足这个参数的配置大小。调大可以提交吞吐量,但也会造成延迟
  • fetch.max.bytes,一次拉取数据的最大数据量,默认为52428800B,也就是50M,但是如果设置的值过小,甚至小于每条消息的值,实际上也是能消费成功的
  • fetch.wait.max.ms,若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间,默认是500ms
  • max.poll.records,单次poll调用返回的最大消息记录数,如果处理逻辑很轻量,可以适当提高该值。一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完,默认值为500
  • consumer.poll(100) ,100 毫秒是一个超时时间,一旦拿到足够多的数据(fetch.min.bytes 参数设置),consumer.poll(100)会立即返回 ConsumerRecords<String, String> records。如果没有拿到足够多的数据,会阻塞100ms,但不会超过100ms就会返回
  • max.poll.interval.ms,两次拉取消息的间隔,默认5分钟;通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行再均衡操作(将分区分配给组内其他消费者成员)

若超过这个时间则报如下异常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already
 rebalanced and assigned the partitions to another member. This means that the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either by increasing the session timeout or by
 reducing the maximum size of batches returned in poll() with max.poll.records. 

即:无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多的时间来处理消息。

可以通过增加max.poll.interval.ms来解决这个问题,也可以通过减少在poll()中使用max.poll.records返回的批的最大大小来解决这个问题

max.partition.fetch.bytes:该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。

session.timeout.ms:消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s,将触发再均衡操作,对于每一个Consumer Group,Kafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事:

  1. 维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。

  2. 协调Group成员的行为。

3、poll机制

①:每次poll的消息处理完成之后再进行下一次poll,是同步操作

②:每次poll之前检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移

③:每次poll时,consumer都将尝试使用上次消费的offset作为起始offset,然后依次拉取消息

④:poll(long timeout),timeout指等待轮询缓冲区的数据所花费的时间,单位是毫秒文章来源地址https://www.toymoban.com/news/detail-772596.html

到了这里,关于JAVA面试题分享一百六十二:Kafka消息重复消费问题?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 每天一个数据分析题(一百六十)

    以下关于代码片段(使用sklearn)的使用和PCA(主成分分析)的描述中,哪项是正确的? A. preprocessing.scale(data)用于对数据进行归一化处理,确保PCA分析前各特征处于同一量级。 B. PCA(n_components=9)将数据降维了9个主成分。 C. pca.explained_variance_输出的是降维后各主成分的方差。

    2024年02月20日
    浏览(49)
  • 第一百六十四回 如何实现NumberPicker

    我们在上一章回中介绍了\\\"如何在任意位置显示PopupMenu\\\"相关的内容,本章回中将介绍 如何实现NumberPicker .闲话休提,让我们一起Talk Flutter吧。 我们在本章回中介绍的 NumberPicker 主要用来实现数字选择功能,比如选择年月日,当然也可以使用 YearPicker 实现,不过YearPicer是把年月

    2024年02月07日
    浏览(63)
  • 每天一个数据分析题(一百六十四)

    关于OLAP系统,下列选项不正确的是() A. 是基于数据仓库的信息进行分析处理过程 B. 用户数量相对较少,其用户主要是业务决策人员与管理人员 C. 对响应时间要求非常高。 D. 基础数据来源于生产系统的操作数据,也就是说,OLAP系统的数据来源与OLTP系统。 题目来源于CDA模

    2024年02月22日
    浏览(50)
  • PCL点云处理之多种体素滤波方法大汇总(一百六十四)

    对PCL中的基于八叉树体素滤波方法,以及在此基础上,自己进一步实现的新滤波方法,进行一个汇总,列出各自的效果和,具体的实现代码 PCL中自带的滤波方法,也是最常用的滤波方法,应该是体素中的点云重心取代原始点,但使用时要注意体素不可过小,

    2024年02月05日
    浏览(51)
  • 一百六十九、Hadoop——Hadoop退出NameNode安全模式与查看磁盘空间详情(踩坑,附截图)

    在海豚跑定时跑kettle的从Kafka到HDFS的任务时,由于Linux服务器的某个文件磁盘空间满了,导致Hadoop的NodeName进入安全模式,此时光执行 hdfs dfsadmin -safemode leave命令语句没有效果( 虽然显示Safe mode is OFF,但没效果,一旦执行还是报错 ) Caused by: org.apache.hadoop.ipc.RemoteException(org

    2024年02月10日
    浏览(44)
  • 一百六十、Kettle——Linux上安装的Kettle9.2.0连接Hive3.1.2

    Kettle9.2.0在Linux上安装好后,需要与Hive3.1.2数据库建立连接 之前已经在本地上用kettle9.2.0连上Hive3.1.2 kettle9.2.0安装包网盘链接 链接:https://pan.baidu.com/s/15Zq9wNDwyMnc3qFVxYOMXw?pwd=zwae  提取码:zwae 1、Hive312的lib里面MySQL驱动包的版本是mysql-connector-java-5.1.37.jar 2、Kettle9.2里MySQL驱动包的

    2024年02月12日
    浏览(67)
  • 一百六十五、Kettle——用海豚调度器调度Linux资源库中的kettle任务脚本(亲测、附流程截图)

    在Linux上脚本运行kettle的转换任务、无论是Linux本地还是Linux资源库都成功后,接下来就是用海豚调度Linux上kettle任务 尤其是团队开发中,基本都要使用共享资源库,所以我直接使用海豚调度Linux资源库的kettle任务脚本 1、先开启zookeeper服务 2、再开启海豚调度器服务 3、开启服

    2024年02月11日
    浏览(57)
  • 第二百六十四回

    我们在上一章回中介绍了SliverPadding组件相关的内容,本章回中将介绍Sliver综合示例.闲话休提,让我们一起Talk Flutter吧。 我们在前面的章回中介绍了各种Sliver相关的组件:SliverList,SliverGrid,SliverAppBar和SliverPadding,本章回将综合使用它们。下面是示例程序的 运行效果图。不过

    2024年01月18日
    浏览(47)
  • 第二百六十九回

    我们在上一章回中介绍了Card Widget相关的内容,本章回中将介绍国际化设置.闲话休提,让我们一起Talk Flutter吧。 我们在这里说的国际化设置是指在App设置相关操作,这样可以让不同国家的用户使用App时呈现不同的语言。总之,就是通过相关的操作,让App支持多个国家的语言。

    2024年01月18日
    浏览(45)
  • 冥想第九百六十二天

    1.周二,太忙了,来补日记,被客户批了,因为一个case没有测到,不过度谴责自己,提高就好。 2.下班跑了6公里很舒服。毫无压力。心率平稳并且低。 3.感谢父母,感谢朋友,感谢家人,感谢不断进步的自己。

    2024年02月06日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包