高可用环境kafka消息未按顺序消费问题

这篇具有很好参考价值的文章主要介绍了高可用环境kafka消息未按顺序消费问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1、背景

2、问题排查

3、问题解决


1、背景

质检任务是异步执行,正常情况下任务状态扭转是    等待中》运行中》成功(失败)。在质量平台生成任务实例,此时状态是等待中,生成实例之后把具体的任务sql给到大数据平台执行,大数据平台会发运行中、成功、失败状态的kafka消息,正常情况下状态是顺序下发。

升级部署某个项目,生产环境突然出现很多任务,一直是运行中状态。

2、问题排查

(1)怀疑大数据平台,任务没有正常执行完成,所以任务一直是运行中

1、在yarn平台以及数据库中,都没有发现正在运行中的质量sql任务

2、排查质量平台服务器日志(kafka消息打印接收消息日志很必要,出问题利于排查),发现这个某个sqlId,正常返回了kafka消息,包括运行中、成功、失败等消息。

通过上面的排查,大数据平台没问题,正常执行了任务,正常按顺序给质量平台发了kafka消息

(2)排查质量平台处理kafka消息逻辑

kafka按顺序返回了状态,质量平台没按顺序消费,看质量平台代码如下。

@Slf4j
@Component("dsjAdapterListen")
public class DsjAdapterListen {

    @Autowired
    ZyslCljgService zyslCljgService;

    /**
     * kafka消费消息,需要配置kafka
     */
    @KafkaListener(groupId = "${spring.kafka.consumer.group-id:dquality}", topics =
            {"status_dquality_" + CommonConstant.ZYJH_CHANNEL})
    public void topicConsumer(String message) {
        StatusInfo statusInfo = JSON.parseObject(message, StatusInfo.class);
        log.warn("==============>KafkaListener:start作业实例结果处理,sqlId:{},zyslId:{},状态:{}", statusInfo.getSqlId(),
                statusInfo.getTaskId(), statusInfo.getStatus());
        zyslCljgService.procZyslJobData(statusInfo);
    }
}

@Slf4j
@Component("zyslCljgServiceImpl")
public class ZyslCljgServiceImpl implements ZyslCljgService {

    @Override
    @Async("zyslClThreadPool")
    public void procZyslJobData(StatusInfo statusInfo) {
        try {
            String zyslId = statusInfo.getTaskId();
            String sqlId = statusInfo.getSqlId();
          -dosomething();
            //运行中任务把检核状态更新成运行中
            if (StatusEnum.RUNNING.getCode().equals(statusInfo.getStatus().getCode())) {
                if (ZyslYxztEnum.WAITING_SUBMIT.getBm().equals(oldGxZysl.getYxzt())
               || ZyslYxztEnum.WAITING.getBm().equals(oldGxZysl.getYxzt())) {
                  //运行中
                oldGxZysl.setYxzt(ZyslYxztEnum.RUNNING.getBm());


                    gxZyslMapper.updateById(oldGxZysl);
                }
                //运行中状态
               oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.RUNNING.getBm());
                gxZyrzMapper.updateById(oldGxZyrzUpdate);
                return;
            }
            if (StatusEnum.FAILED.getCode().equals(statusInfo.getStatus().getCode())) {
                log.error("大数据job错误,执行任务失败:{},zyslid:{},参数:{}", statusInfo.getMessage(), zyslId,
                        JSON.toJSONString(statusInfo));
                //处理失败
                procFail(statusInfo, oldGxZyrzUpdate);
            }
            if (StatusEnum.FINISH.getCode().equals(statusInfo.getStatus().getCode())) {
                //正常sql和异常sql都执行完成
                oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.SUCCESS.getBm());
            }
            // 先更新状态,后处理事件
             dosomething2();
        } catch (Exception e) {
            log.error("大数据作业实例结果处理报错:{}", e.getMessage());
        }
    }
    
     public void  dosomething2(GxZyrz gxZyrz, SsZyxx zyxx) {
        if (Constants.CODE_SUCCESS.equals(gxZyrz.getYxzt())) {
              //成功状态查询es、发邮件等
             dosomething3();
        } else if (Constants.CODE_FAILED.equals(gxZyrz.getYxzt())) {
            gxZyrz.setYcs(0L);
            gxZyrz.setZyl(0L);
            gxZyrz.setSfgj("N");
        }
        gxZyrzMapper.updateById(gxZyrz);
    }

}

(1)kafka这个topic只有一个分区,数据质量服务器kafka消息是设置了消费者组,即使是高可用,现场部署多台服务器,也会只有一台服务器会消费这个topic的消息数据,所有排除是因为部署了多台服务器的原因。

(2)kafka消息被多线程异步处理了

1、如果任务sql执行成功,kafka返回运行中、执行成功消息

线程1 - 处理待运行任务

线程2- 处理成功状态

通过代码分析,线程1更新任务之前会更新另外一张表状态,假如gxZyslMapper.updateById是2秒时间,   线程2更新状态之前会dosomething3()查询es、查询数据表、发邮件,肯定超过5秒,然后更新sql任务状态。

结论: 如果sql是执行成功,这种情况,应该不会出现线程2先把任务状态更新成成功,然后线程1把状态更新是运行中。

2、如果任务sql执行失败,kafka返回运行中、执行失败消息

线程1 - 处理待运行任务

线程2- 处理失败状态

通过代码分析,线程1更新任务之前会更新另外一张表状态,假如update是2秒时间,   线程2直接更新sql任务状态。

结论: 如果sql是失败成功,这种情况,如果运行中、运行失败状态消息时间建个在2秒内,应该会出现线程2先把任务状态更新成失败,然后线程1把状态更新是运行中。

代码分析之后,带着结论去现场验证,发现确实是失败状态任务状态被逆写了。

按这个结论,按理来讲部署的所有现场都会出现问题,为什么只有这个现场有问题呢?

大数据那边升级了代码,以前执行失败的任务,运行中和运行失败,他们发消息间隔至少耗时在5秒,改了逻辑之后直接失败的任务,发信息间隔在2秒内。这就验证了这个问题

3、问题解决

1、运行中状态,先更新sqlId对应的任务状态,然后更新别的数据表状态;2、更新运行中的状态不直接更新,带着状态更新    update zyrz set yxzt = '2' where id = 'xxx' and yxzt not in('0','1')文章来源地址https://www.toymoban.com/news/detail-861882.html

到了这里,关于高可用环境kafka消息未按顺序消费问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka 如何保证消息的顺序消费

    在Kafka分布式集群中,要保证消息的顺序消费,您可以采取以下措施: 分区策略 :Kafka的主题可以分为多个分区,每个分区内的消息是有序的。因此,首先要确保生产者将相关的消息发送到同一个分区。这可以通过生产者的分区策略来实现。默认情况下,Kafka会使用基于消息

    2024年02月06日
    浏览(37)
  • Kafka 如何保证消息消费的全局顺序性

    哈喽大家好,我是咸鱼 今天我们继续来讲一讲 Kafka 当有消息被生产出来的时候,如果没有指定分区或者指定 key ,那么消费会按照【轮询】的方式均匀地分配到所有可用分区中,但不一定按照分区顺序来分配 我们知道,在 Kafka 中消费者可以订阅一个或多个主题,并被分配一

    2024年02月05日
    浏览(45)
  • Kafka、RocketMQ、RabbitMQ如何保证消息的顺序消费?

    一、1个Topic(主题)只创建1个Partition (分区),这样生产者的所有数据都发送到了一个Partition (分区),保证了消息的消费顺序; 二、生产者在发送消息的时候指定要发送到哪个 Partition,这样同一个 Partition 的数据会被同一个消费者消费,从而保证了消息的消费顺序。 实现思路

    2024年02月09日
    浏览(46)
  • JAVA面试题分享一百六十二:Kafka消息重复消费问题?

    消息重复消费的根本原因都在于:已经消费了数据,但是offset没有成功提交。 其中很大一部分原因在于发生了再均衡。 1)消费者宕机、重启等。导致消息已经消费但是没有提交offset。 2)消费者使用自动提交offset,但当还没有提交的时候,有新的消费者加入或者移除,发生

    2024年02月03日
    浏览(43)
  • kafka 分布式的情况下,如何保证消息的顺序消费?

    目录 一、什么是分布式 二、kafka介绍 三、消息的顺序消费 四、如何保证消息的顺序消费   分布式是指将计算任务分散到多个计算节点上进行并行处理的一种计算模型。在分布式系统中,多台计算机通过网络互联,共同协作完成任务。每个计算节点都可以独立运行,并且可以

    2024年02月10日
    浏览(52)
  • 【JAVA】生产环境kafka重复消费问题记录

    业务系统每周都有定时任务在跑,由于是大任务因此采用分而治之思想将其拆分为多个分片小任务采用 kafka异步队列消费 的形式来减少服务器压力,每个小任务都会调用后台的c++算法,调用完成之后便会回写数据库的成功次数。今天观测到定时任务的分片小任务存在被重复消

    2024年04月12日
    浏览(41)
  • mq常见问题:消息丢失、消息重复消费、消息保证顺序

    mq常见问题:消息丢失、消息重复消费、消息保证顺序 消息丢失问题 拿rabbitmq举例来说,出现消息丢失的场景如下图 从图中可以看到一共有以下三种可能出现消息丢失的情况: 1 生产者丢消息 生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败 2MQ自身丢

    2024年02月09日
    浏览(55)
  • Kafka 消息不能正常消费问题排查

    事情的起因是用户在 app 上查不到订单了,而订单数据是从 mysql 的 order_search 表查询的,order_search 表的数据是从 oracle 的 order 表同步过来的,查不到说明同步有问题 首先重启,同步数据,问题解决,然后查找原因。首先看日志,有如下两种情况 有的容器消费消息的日志正常

    2024年01月18日
    浏览(48)
  • 【消息中心】kafka消费失败重试10次的问题

    Kafka重试的原理基于其消息重试机制。当Kafka生产者发送消息至服务端(broker)时,如果broker返回成功,则表示该消息已成功投递。然而,如果broker返回错误,生产者会根据错误类型进行处理。 可重试错误:这类错误表示可以进行重试,例如broker返回NotEnoughReplicasException异常,

    2024年02月10日
    浏览(37)
  • Java实现Kafka消费者及消息异步回调方式

    Kafka 在创建消费者进行消费数据时,由于可以理解成为是一个kafka 的单独线程,所以在Kafka消费数据时想要在外部对消费到的数据进行业务处理时是获取不到的,所以就需要实现一个消息回调的接口来进行数据的保存及使用。 消息回调接口实现代码如下 Kafka消费者代码实现如

    2024年02月06日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包