kafka消费失败重试机制

这篇具有很好参考价值的文章主要介绍了kafka消费失败重试机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

@KafkaListener(id = "eventConsumer", topics = "perception_event", groupId = "defaultConsumerGroup", containerFactory = "kafkaListenerContainerFactory")
    public void consume(List<ConsumerRecord<String, String>> consumerRecordList) {
        .......
    }

1.kafka批量消费消息,使用containerFactory 监听消费失败消息 

/**
     * 消费失败消息最大重试15次,存入到死信队列中
     *
     * @param configurer kafkaConsumerFactory kafkaTemplate
     * @return factory
     */
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();


        configurer.configure(factory, kafkaConsumerFactory);
        //最大重试15次
        RetryingBatchErrorHandler retryingBatchErrorHandler = new RetryingBatchErrorHandler(new FixedBackOff(500L, 15L),
                createConsumerRecordRecoverer());
        factory.setBatchErrorHandler(retryingBatchErrorHandler);
        return factory;
    }

    /**
     * 最终消费失败打印日志即可
     */
    private ConsumerRecordRecoverer createConsumerRecordRecoverer() {
        return (consumerRecord, exception) -> {
            log.error("consumer event last fail:{}, exception:{}", SensitiveUtils.phone(consumerRecord.toString()), exception.toString());
        };
    }

2.使用RetryingBatchErrorHandler 指定批量消费时失败消息的重试次数和时间,如果不是批量消费,则使用RetryingErrorHandler来指定重试次数和间隔时间文章来源地址https://www.toymoban.com/news/detail-515022.html

到了这里,关于kafka消费失败重试机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(89)
  • springboot kafka消息消费学习 @KafkaListener 使用

    kafka 配置类 用途:定义使用的基本 kafka 配置,以及定义Bean 下面文件是读取本地 spring 的标准配置文件的类,用于一般属性获取等操作 @Data 为其他用于控制get set 方法的,与 此处配置不是强关联,可以没有 实际 kafka 监听消费 @ConditionalOnProperty spring boot 用于判断当前类是否加

    2024年02月08日
    浏览(39)
  • 多个消费者订阅一个Kafka的Topic(使用@KafkaListener和KafkaTemplate)

    记录 :465 场景 :一个Producer在一个Topic发布消息,多个消费者Consumer订阅Kafka的Topic。每个Consumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/arti

    2024年02月15日
    浏览(48)
  • RabbitMQ的消费者处理消息失败后可以重试,重试4次仍然失败发送到死信队列。

    生产者发送消息时采用雪花算法给消息设置唯一的消息id,消费者接收消息处理失败时,根据消息的唯一id统计失败次数,若没有达到失败次数限制,则让消息重回队列(在开启手动签收的前提),此时队列会再次给消费者发送消息;若达到失败次数限制,则让消息不重回队列,

    2024年02月07日
    浏览(58)
  • SpringBoot集成RabbitMq,RabbitMq消费与生产,消费失败重发机制,发送签收确认机制

           这里spring-boot依赖版本为2.3.7版本,RabbitMq集成amqp包,版本在spring-boot中有涵盖,不单独指明版本了。 Exchange 交换机配置 队列queue配置 将队列和交换机绑定, 并设置用于匹配键 配置加载 RabbitTemplate 生产者 消费者 消息确认签收配置        消息确认签收机制不过多

    2024年01月21日
    浏览(45)
  • 什么是mq?可靠性、重复消息、重复消费、丢失、发送大文件、延迟、发送机制、重试、死信、幂等、有序、大小、过期、优先级、进了死信队列还能出来吗?

    “MQ” 指的是消息队列(Message Queue),是一种用于异步通信的技术。消息队列是一种中间件,用于在分布式系统中传递消息,使不同组件之间能够进行松散耦合的通信。它的核心思想是生产者将消息发送到队列,而消费者从队列中接收并处理消息。 消息队列的主要优点包括

    2024年02月06日
    浏览(56)
  • @KafkaListener 详解及消息消费启停控制

    参考:Kafka参数 (1) id: 默认是每个Listener实例的重要标识。 对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。 (2) goupId: 每个消费者所属的组。 每个消费者都有自己所属的组。一个组中可

    2024年01月24日
    浏览(35)
  • Spring Boot中使用Kafka时遇到“构建Kafka消费者失败“的问题

    在使用Spring Boot开发应用程序时,集成Apache Kafka作为消息队列是一种常见的做法。然而,有时候在配置和使用Kafka时可能会遇到一些问题。本文将探讨在Spring Boot应用程序中使用Kafka时可能遇到的\\\"构建Kafka消费者失败\\\"错误,并提供解决方案。 错误描述: 当尝试构建Kafka消费者时

    2024年01月17日
    浏览(48)
  • Kafka - 主题Topic与消费者消息Offset日志记录机制

    可以根据业务类型,分发到不同的Topic中,对于每一个Topic,下面可以有多个分区(Partition)日志文件: kafka 下的Topic的多个分区,每一个分区实质上就是一个队列,将接收到的消息暂时存储到队列中,根据配置以及消息消费情况来对队列消息删除。 可以这么来理解Topic,Partitio

    2024年02月03日
    浏览(54)
  • 保障效率与可用,分析Kafka的消费者组与Rebalance机制

    上手第一关,手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析,打破面试难关 防止消息丢失与消息重复——Kafka可靠性分析及优化实践 我们上一期从可靠性分析了消息

    2024年02月06日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包