【JAVA】生产环境kafka重复消费问题记录

这篇具有很好参考价值的文章主要介绍了【JAVA】生产环境kafka重复消费问题记录。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题描述

业务系统每周都有定时任务在跑,由于是大任务因此采用分而治之思想将其拆分为多个分片小任务采用kafka异步队列消费的形式来减少服务器压力,每个小任务都会调用后台的c++算法,调用完成之后便会回写数据库的成功次数。今天观测到定时任务的分片小任务存在被重复消费的问题,表现就在于数据库中存在多余的成功次数,比如本来大任务分了10个分片,但回写了25次任务处理成功,这是不合理的,因此问题的排查便开始了。

问题排查

一般这种"回写次数"大于"任务总数"的情况,除了代码逻辑问题之外,非常有可能是因为系统出现了重复消费的问题,多次的消费导致"执行成功回写"这块逻辑被执行了多次。
后台日志确实也显示,同样的消息被消费了多次:

2024-03-31 03:17:51.926	
[2024-03-31 03:17:51:674 CST] [] [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  [] 更新分片状态成功, processId: 16, shardId: 57
2024-03-31 03:17:51.675	
[2024-03-31 03:17:51:662 CST] [] [] to update shard status, processId: 16, shardId: 57


2024-03-31 01:24:10.898	
[2024-03-31 01:24:10:803 CST] [] [] 更新分片状态成功, processId: 16, shardId: 57
2024-03-31 01:24:09.395	
[2024-03-31 01:24:09:302 CST] [] [] to update shard status, processId: 16, shardId: 57

经过后台错误日志排查,发现后台报了许多如下异常:

[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatResponseHandler.handle(AbstractCoordinator.java:1054)]: 
[Consumer clientId=consumer-consumer_group_prod-6, groupId=consumer_group_prod] 
Attempt to heartbeat failed since group is rebalancing

通过日志可以判断,系统出现了 消费者重平衡 rebalance 的问题,一般消费者组进行重平衡,可能会是以下几种原因:

  • 消费者组引入了新topic
  • 消费者组有成员进组
  • 消费者组有成员离组

由于线上系统周末没有人去新增话题或者新加实例来增加消费者,因此我往 消费者异常离组这个方向进行排查。因为当消费者离组,并重平衡重新加入消费者组后,之前 未消费的消息又会被拉取一次,符合当前的表现。

此时我查看后台,确实存在消息堆积的现象,同时根据后台c++算法的日志,发现有许多分片在算法中pending了大概两三小时,有的甚至迟迟没跑出结果。并且,我检查了目前系统的kafka配置,其中有两项,就是导致重复消费的元凶:

spring.kafka.consumer.max-poll-interval-ms=7200000
spring.kafka.consumer.max-poll-records=5

这两项配置的含义是,消费者消费间隔最高为7200秒(即2小时),而一次性消费最多会拉取5条消息。

而代码里面的消费逻辑如下:

// 代码已脱敏处理
for (InputReq inputReq : reqList) {
    boolean shardSuccess = false;

    try {
        // 调用算法A
        algoBiz.callAlgoA(inputReq.getProcessId(), inputReq.getShardId(), inputReq.getProcVersion());

        // 调用算法B
        algoBiz.callAlgoB(inputReq.getProcessId(), inputReq.getShardId(),  inputReq.getProcVersion());

        shardSuccess = true ;
    }
    catch (Exception e) {
        log.error("[] error: {}", e.getMessage());
    }

    // 更新分片状态
    Boolean updateProcShard = inputReq.getToUpdateProcShard();
    algoBiz.updateShardResult(inputReq.getProcessId(), inputReq.getShardId(), updateProcShard, shardSuccess);
}

ack.acknowledge();

代码会消费一次性拉取的最多五条消息之后,才会ack队列协调者,此时如果五条消息处理的总时间超过两小时,那么就会触发消费组重平衡,该消费者重新入队。而结合这周由于后台算法更新,运行时间大大减慢,从而导致了消息重平衡而最终导致了重复消费。

处理方式

由于一时半会儿算法优化还没那么快上线,在调度模块我们就先通过配置进行优化,先解决头疼的重复消费问题。我们先确保程序逻辑超时时间足够长,同时单次消费仅拉取一条消息即可,这样可以大大减少由于算法运行过慢导致的重平衡问题。文章来源地址https://www.toymoban.com/news/detail-848329.html

spring.kafka.consumer.max-poll-interval-ms=10800000
spring.kafka.consumer.max-poll-records=1

到了这里,关于【JAVA】生产环境kafka重复消费问题记录的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(49)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(41)
  • java生产者与消费者问题

    等待唤醒机制可以解决经典的“生产者与消费者”的问题。生产者与消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个(多个) 共享固定大小缓冲区的线程 ——即所谓的“生产者

    2024年02月15日
    浏览(31)
  • Kafka重复消费、Dubbo重复调用问题排查

            本业务为车机流量充值业务,大致流程为:收到微信、支付宝端用户支付成功回调后,将用户订单信息发送至kafka中;消费者接收到kafka中信息后进行解析,处理用户订单信息,为用户订购相关流量包(调用电信相关接口),订购成功/失败后会通过MQTT发送订购成功

    2024年03月24日
    浏览(44)
  • 解决Kafka新消费者组导致重复消费的问题

             问题描述 :在使用Kafka时,当我们向新的消费者组中添加消费者时,可能会遇到重复消费的问题。本文将介绍一些解决这个问题的方法,帮助开发者更好地处理Kafka中的消费者组和消费偏移量。         Kafka是一个强大的分布式消息队列系统,但在使用过程中

    2024年02月07日
    浏览(39)
  • kafka消费、生产性能问题分析及优化方法

    问题分析:将代码逻辑注释掉,直进行拉取数据操作,性能应为每分钟产生消息的2倍以上

    2024年02月07日
    浏览(45)
  • 记一次线上kafka重复消费的问题解决及思考

    线上ELK日志发现kafka消费者消费到重复消息 由于生产方本身就发送了重复的消息,导致消费到重复消息 消费方采用的是循环poll的模式,具体是在多线程分租户去批量处理的消息

    2024年02月10日
    浏览(44)
  • kafka消费者详解,根据实际生产解决问题

    1.首先kafka每创建一个消费者就是一个消费者组,必须指定groupip 2.两个消费者组之间不相互影响,消费同一个主题的同一个分区,两个消费者组不相互影响,各自记录自己的offset 3.在开发中如果没有指定每个消费者去消费特定的分区,那么kafka默认是按照roundRobin轮询的方式分

    2024年02月10日
    浏览(44)
  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(46)
  • 记1次生产环境java进程内存泄漏问题定位(使用Arthas)

    简介 | arthas, Alibaba 开源的 Java 诊断工具,参照文档安装使用很简单,在线下载或者离线下载后解压运行,启动arthas-boot.jar,会自动扫描jps进程,根据序号选择后进入arthas界面: 常用的是dashboard,thread命令,dashboard命令能扫面当前java进程的全局信息,包括堆栈和线程信息,

    2024年02月22日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包