Kafka未触发消费异常排查实录

这篇具有很好参考价值的文章主要介绍了Kafka未触发消费异常排查实录。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:

    最近生产环境系统发现一个疑难杂症,看了很久的问题但是始终无法定位到问题并处理,然后查阅了相关资料也是定位不到问题,不过资料查阅却给了个新的思路,以此为跳板最终解决了问题。

一、问题描述

    功能介绍: “主计划拆分子计划”是APS系统很常见的功能,功能大概意思是用户可选多个主计划一次性进行“展开子计划”生成子计划,因单个主计划生成子计划的逻辑相对复杂,所以单个计划耗时不能算低,故这里的批量操作使用了异步进行,这里使用了Kafka进行生产及消费消息。

    问题起因: 功能完成之后上生产系统,然而偶尔会收到客户提出少量单据卡在中间状态,导致“展开”不了的问题,前前后后查了好久也没能找到具体问题并解决。

二、问题分析

    分析数据: 通过查看用户提供的单据,发现这些数据都是卡在了某一个中间状态,这个状态是作为中转状态使用的,一开始计划的状态为“未展开”,点击执行“展开子计划”的功能之后,将计划标识为“展开中”之后再推送到Kafka消费处理,Kafka消费者接收到生产者的消息之后,将计划进行处理,处理完成之后再将状态标识为“展开完成”。

    所以从数据上分析,问题点应该出现在消息生产到消费过程这段期间,但是纵观代码,发现已经对业务逻辑做了异常处理,如果是消息消费过程发生异常,都会将错误过程记录下来,所以再次定位到问题出现在Kafka的生产消息及消费消息这个过程。

    查看日志: 根据以上分析结合日志监控的方式,确定问题数据实际上并未进行消费,所以猜想有两种情况:

  1. Kafka根本没有生产消息成功;
  2. 生产消息成功,但是Kafka未Poll到需要消费的消息。

三、进展

  1. 加addCallback回调方法
    • 生产端的kafkaTemplate对象中,封装发送消息的方法,将send()方法封装为通用方法,增加addCallback()回调方法,用于消息生产成功之后回调记录日志。以此确认生产消息是否成功.


2. 参考相关资料:

    查问题过程中,看到大佬写的文章,文章里描述了造成消费不成功的问题是因为“Kafka 内存饱和”造成的,但是实际上内存饱和造成的问题是Kafka消费服务Poll消息时候超时,相应的错误信息在我们系统日志中搜不到,最终也确认不是因为改原因造成的问题。(文章下文参考)

  • 【AGP网关】Kafka 异常排查实(内网文章)
  • 记一次kafka消费者不消费,消费组被踢出问题(外网文章)

  1. 研究kafka消费原理

    当前确认问题点应该是出现在消费端消费不了消息导致的,那么重新研究一下Kafka消费端的实现原理。

    消费者是通过KafkaConsumer对象的poll方法从Kafka队列中将消息拉取出来进行消费,这个poll方法可传入poll超时时间,超过设置的时间则会报拉取超时的异常“due to consumer poll timeout has expired.”,上文中大佬出现的报错就是提示这种拉取超时的报错,超时时间可通过配置节点【max.poll.interval.ms】进行配置;

    KafkaConsumer对象poll到数据之后取到ConsumerRecords对象,然后就可以对数据进行消费,直到取到的ConsumerRecords对象是空的(isEmpty()为true)才停止消费。

    这里发现有一个隐患的地方,当ConsumerRecords对象取到空数据才停止消费,那么这个ConsumerRecords对象是否会取到多个数据进行消费,是如何进行消费的?!

    查阅相关资料,发现Kafka的消费原理是:KafkaConsumer 对象是实时拉取消息的,但不是实时消费消息的。KafkaConsumer 在 poll() 方法中从 Kafka 集群中批量拉取数据,将多个消息封装在 ConsumerRecords 对象中返回。这些消息可以在消费者应用程序的时间间隔内处理,但poll() 方法返回的消息不是立即消费的。只有在 ConsumerRecords 中的所有消息都被处理后,才会发送下一个拉取请求。如果在处理消息时发生错误,可以根据实际需要重新处理这些消息或跳过这些消息。

    反观项目代码,发现在KafkaListener监听器拉取到数据之后,项目中仅仅只是取第一条数据进行消费,这里是不是有问题呢?(参考下面代码块)

    /**
     * 计划发布SAP-KTWKZ
     *
     * @param records
     * @param consumer
     */
    @KafkaListener(topics = {KafkaTopicConst.SCHEDULE_DAY_PLAN_RELEASE_SAP_KTWKZ_TOPIC},
            id = KafkaTopicConst.SCHEDULE_DAY_PLAN_RELEASE_SAP_KTWKZ_TOPIC, containerFactory = "batchFactory")
    public void dayPlanReleaseToSapKTWKZTopic(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
        String data = ApsKafkaUtils.getFirstRecordValues();
        log.info("mq消费-KT万颗子计划发布SAP数据:\n{}", data);
        try {
            SystemPostDTO systemPostDTO = JsonUtils.fromJson(data, SystemPostDTO.class);
            String postContent = systemPostDTO.getData();
            if (StrUtil.isEmpty(postContent)) {
                log.warn("mq消费-KT万颗子计划发布SAP数据异常警告,systemPostDTO.data数据不能为空");
                return;
            }
            pushRecordService.consumePushProductionOrderKtWKZ(postContent);
        } catch (Exception e) {
            log.error("mq消费-KT万颗子计划发布SAP数据异常:", e);
        }
    }

 

四、问题重现及

重现:

    从第三点的分析中,暂时确定可能是ConsumerRecords对象接收到多条消息,但是消费端仅仅消费了第一条消息导致的问题,那么通过写demo来测试批量生产消息是否会导致ConsumerRecords一次性拉取到多条消息。

生产:

    @ApiOperation("测试Kafka poll消息机制")
    @PostMapping("/v99/schedule/kafka/test")
    public ResponseEntity<String> testKafkaPollMessage(
            @RequestParam("testData") String testData) {
        String topic = "my-test-topic";
        for (int i = 0; i < 100; i++) {
            Thread.sleep(0);//参数有0,10,100
            SubassemblyOpenPlanListInfoRespDTO dto = kafkaDemo.sendMessage(testData);
        }
        return Results.success("Success");
    }

 

消费:

    @KafkaListener(topics = {"my-test-topic"},
            id = "my-test-topic", containerFactory = "batchFactory")
    public void myTestKafka(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
        List<String> recordValues = ApsKafkaUtils.getRecordValues();
        try {
            log.info("接收到的数据为【{}】{}", recordValues.size(), JsonUtils.toJson(recordValues));
        } catch (Exception e) {
            log.error("接收到的数据为异常:", e);
        }
    }

 

    以上接口,通过调用发现确实出现ConsumerRecords对象poll到多条消息的情况:

    其中,for循环中执行等待时间越长,出现一个ConsumerRecords对象拉取到多条数据的情况越少:

    那么分析为什么在实际使用过程中,【主计划拆分子计划】这个功能是偶然出现消费失败的问题,而不是稳定出现呢?

    再次通过代码ReView的方式去回顾一下这个功能,发现当前代码中,是使用了for循环将一批次计划单循环推送给MQ进行消费,单个循环里执行了一次读库一次写库的操作,一次循环耗时大概几十毫秒,与上述demo的Thread.sleep(10)场景类似,所以基本确定偶发这种问题的原因出现在这里。

解决:

    其实解决该问题很简单,只需要在消费端获取到ConsumerRecords对象之后,将拉取到的所有消息列表循环消费而不是只消费单条消息即可,之前的仅消费单条消息的场景经过沟通确认只存在某些特殊场景才需要使用,暂时不再保证该种场景。

五、总结

    本案例中,通过日志、业务场景、写Demo使用并发工具等方式来分析及重现问题,将一个生产上的疑难杂症处理掉,其中也通过参考大佬的文章,虽然问题描述和大佬描述的基本一致,也和网络上的Blog描述一致,但是产生的问题却并不一样。

    总的来说,其实解决问题不难,重要的是要了解问题,了解原理以及了解到解决问题的步骤,建议从多个方面一起查看问题。从其他参考文章描述中,可以从业务、日志、内存环境等查看问题,我这里补充一点,也可以多多结合业务来适当写demo去测试问题,可能也会有意外收获。

其他

    大家有没有遇到其他的生产上的疑难杂症呢,大家都是怎么遇到问题,最后怎么解决问题呢,这里大家不妨进行讨论,也可以列出多多的跟进方案或者工具,大家一起学习进步。文章来源地址https://www.toymoban.com/news/detail-451613.html

 
 

到了这里,关于Kafka未触发消费异常排查实录的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink 消费Kafka每日不定时积压(非重启不能解决)问题排查解决

    1. 背景         接手了一个问题排查的工作,有个Flink任务每天不定时会出现数据积压,无论是白天还是数据量很少的夜里,且积压的数据量会越来越多,得不到缓解,只能每日在积压告警后重启,重启之后消费能力一点毛病没有,积压迅速缓解,然而,问题会周而复始的

    2024年02月09日
    浏览(32)
  • 力扣刷题实录(大厂用题)—— 前言

    力扣刷题笔记与力扣官方的解答有什么区别吗?为什么不直接去看官方的解答呢 ?并且官方的解答部分还有视频讲解。 这个问题困扰了我很长时间,我不断地怀疑自己做笔记是否有意义。 后来有一个小伙伴问我问题的时候我悟了,那时手头事情比较多,我说为什么不看官方

    2024年02月09日
    浏览(110)
  • Kafka消费异常处理策略及重试机制

    在使用Kafka进行消息传递时,消费者可能会遇到各种异常情况,例如网络故障、消息处理失败等。为了保证消息的可靠消费,我们需要实现一套有效的异常处理策略和重试机制。本文将介绍如何在Kafka消费过程中处理异常,并提供相应的源代码示例。 异常处理策略 在Kafka消费

    2024年02月04日
    浏览(40)
  • Kafka消费者异常问题解析与解决方案

    Kafka是一个分布式流处理平台,它提供了高吞吐量、容错性和可扩展性的特性。然而,有时候在使用Kafka消费者时,可能会遇到一些异常情况。本文将详细讨论几种常见的Kafka消费异常问题,并提供相应的解决方案。 问题1:消费者无法连接到Kafka集群 当消费者无法连接到Kafk

    2024年02月05日
    浏览(47)
  • 【复盘】记录一次类型不一致导致的Kafka消费异常问题

    业务主要是通过A系统向B系统写入Kafka,然后B系统消费Kafka 将结果写到Kafka中,A进行消费最终结果。 在整个流程中,A写入Kafka会写入一张 record1表记录,然后在A消费最终结果的时候也记录一张record2表。主要改动的话 只是B系统内进行写入数据,但是没有想到用的同一个Map导致

    2024年02月16日
    浏览(34)
  • Vue3 el-dialog 二次封装踩坑实录(v-model moduleValue 不触发)

    如果你封装了一个el-dialog的子组件,想通过父组件v-model的形式调用,切记不能使用v-model,要使用 v-model:[你的参数],例如 v-model:visible

    2024年04月27日
    浏览(27)
  • mysql异常占用资源排查

    通过执行日志与连接信息排查 查看是否开启日志记录 开启sql记录 查看日志位置观察异常sql 查看当前连接数 查看哪些客户端连接到了mysql 通过慢sql信息排查 查看是否开启慢sql记录 开启慢sql记录 关闭慢sql记录 修改slow_launch_time与 long_query_time slow_launch_time:定义一个客户端连接

    2024年02月10日
    浏览(36)
  • clickhouse一次异常排查记录

    clickhouse中报错 关闭了自启动,删了status,重启了clickhouse还是报错 1,排查定时执行的脚本日志(每小时第5分钟执行) INSERT INTO quality0529.previously_reported_urls (url) SELECT url FROM quality0529.hourly_data_view WHERE findUrlListLastTime = now() - INTERVAL 1 HOUR GROUP BY url 2,查看ck执行异常详细信息统计

    2024年02月11日
    浏览(43)
  • linux异常情况,排查处理中

    登录客户环境后,发现一个奇怪情况如下图,之前也遇到过,直接fuser -ck /backup操作的话,主机将会重启,因数据库运行中,等待停机维护时间,同时也在想办法不重启的情况下解决该问题 [root@db ~]#  fuser -cu /backup Specified filename /backup does not exist. [root@db ~]#  umount -l /bacup umo

    2024年01月21日
    浏览(39)
  • 异常排查 | 重复Cookie访问导致HTTP请求引发空指针异常

    近几日,遇到一个困惑了我很久的异常,是浏览器页面向Tomcat服务器发起HTTP请求时,服务器发还回来的一处异常 首先来说一下我是在做什么的过程中遇到这个问题 现在我需要实现一个监听器,去监听在线用户人数,也去 实时记录一下当前这个页面中有多少用户在线 ,这一

    2024年02月08日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包