RabbitMQ--重试机制

这篇具有很好参考价值的文章主要介绍了RabbitMQ--重试机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

原文网址:RabbitMQ--重试机制_IT利刃出鞘的博客-CSDN博客

简介

说明

        本文介绍RabbitMQ的重试机制。

问题描述

        消费者默认是自动提交,如果消费时出现了RuntimException,会导致消息直接重新入队,再次投递(进入队首),进入死循环,继而导致后面的消息被阻塞。

        消息阻塞带来的后果是:后边的消息无法被消费;RabbitMQ服务端继续接收消息,占内存和磁盘越来越多。

RabbitMQ的自动确认

自动确认分四种情况(第一就是正常消费,其他三种为异常情况)

  1. 消息成功被消费,没有抛出异常,则自动确认,回复ack。
    不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。
  2. 当抛出ImmediateAcknowledgeAmqpException异常的时候,则视为成功消费,确认该消息。
  3. 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
  4. 抛出其他的异常,消息会被拒绝,且requeue = true

        我遇到的是第四种情况,导致mq消息阻塞,并且消费者一直在消费同一条消息,然后抛异常,此时就进入了死循环。

消息未被确认时如下图所示:

rabbitmq 重试,MQ,rabbitmq,java

RabbitMQ的重试机制

        本处使用spring-rabbit中自带的重试功能解决上述问题。

注意

        重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系。

        不管消息被消费了之后是手动确认还是自动确认,代码中不能使用try/catch捕获异常,否则重试机制失效。

重试机制有2种情况

  1. 消息是自动确认时,如果抛出了异常导致多次重试都失败,消息被自动确认,消息就丢失了
  2. 消息是手动确认时,如果抛出了异常导致多次重试都失败,消息没被确认,也无法nack,就一直是unacked状态,导致消息积压。

RabbitMQ的重试的实例

配置

application.yml

spring:
  # RabbitMQ服务配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        # 重试机制
        retry:
          enabled: true #是否开启消费者重试
          max-attempts: 3 #最大重试次数
          initial-interval: 5000ms #重试间隔时间(单位毫秒)
          max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
          # 乘子。间隔时间*乘子=下一次的间隔时间,不能超过max-interval
          # 以本处为例:第一次间隔 5 秒,第二次间隔 10 秒,以此类推
          multiplier: 2

代码

@RabbitListener(queues = "meat_queue")
public void processMeatTwo(String message) throws InterruptedException {
    System.out.println("processMeatTwo消费了队列meat_queue的消息:" + message);
    Thread.sleep(1000);
    //模拟异常
    String is = null;
    is.toString();
}

结果

rabbitmq 重试,MQ,rabbitmq,java

        可以看到,消息重试了5次,之后会抛出ListenerExecutionFailedException的异常。后面附带着Retry Policy Exhausted,提示我们重试次数已经用尽了。

        消息重试次数用尽后,消息就会被抛弃。

重试完之后对消息的处理

概述

        消息在重试完之后,会调用MessageRecoverer接口的recover方法。MessageRecoverer接口有如下三个实现类(看它们名字即可知道含义):

  • RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列(默认)
  • RepublishMessageRecoverer:重新发布消息
  • ImmediateRequeueMessageRecoverer:立即把消息重新放入队列

处理示例

        默认情况下是RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列。我们可以使用RepublishMessageRecoverer,重新发布消息,将它发布到其他队列,后边对它进行补偿处理。

先创建一个异常队列,然后与交换机绑定进行绑定,绑定之后设置MessageRecoverer。

@Configuration
public class MQErrorConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static String errorTopicExchange = "error-topic-exchange";
    private static String errorQueue = "error-queue";
    private static String errorRoutingKey = "error-routing-key";

    //创建异常交换机
    @Bean
    public TopicExchange errorTopicExchange(){
        return new TopicExchange(errorTopicExchange, true, false);
    }

    //创建异常队列
    @Bean
    public Queue errorQueue(){
        return new Queue(errorQueue, true);
    }

    //队列与交换机进行绑定
    @Bean
    public Binding BindingErrorQueueAndExchange(Queue errorQueue, TopicExchange errorTopicExchange){
        return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(errorRoutingKey);
    }

    //设置MessageRecoverer
    @Bean
    public MessageRecoverer messageRecoverer(){
        //AmqpTemplate和RabbitTemplate都可以
        return new RepublishMessageRecoverer(rabbitTemplate, errorTopicExchange, errorRoutingKey);
    }
}

查看处理结果:

rabbitmq 重试,MQ,rabbitmq,java

        通过控制台可以看到,消息重试5次以后直接以新的routingKey发送到了配置的交换机中,此时再查看监控页面,可以看原始队列中已经没有消息了,但是配置的异常队列中存在一条消息:

rabbitmq 重试,MQ,rabbitmq,java

源码分析

        上面的例子在测试中发现了一个问题,就是经过5次重试以后,控制台输出了一个异常的堆栈日志,然后队列中的数据也被ack掉了(自动ack模式),首先我们看一下这个异常日志是什么。

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted

        出现消息被消费掉并且出现上述异常的原因是因为在构建SimpleRabbitListenerContainerFactoryConfigurer类时使用了MessageRecoverer接口,这个接口有一个cover方法,用来实现重试完成之后对消息的处理,源码如下:

ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
    RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
            : RetryInterceptorBuilder.stateful();
    RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
            .createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
    builder.retryOperations(retryTemplate);
    MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
            : new RejectAndDontRequeueRecoverer(); // 1
    builder.recoverer(recoverer);
    factory.setAdviceChain(builder.build());

        注意看1处的代码,默认使用的是RejectAndDontRequeueRecoverer实现类,根据实现类的名字我们就可以看出来该实现类的作用就是拒绝并且不会将消息重新发回队列,我们可以看一下这个实现类的具体内容:

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
    protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected
    @Override
    public void recover(Message message, Throwable cause) {
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Retries exhausted for message " + message, cause);
        }
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                    new AmqpRejectAndDontRequeueException(cause), message);
    }
}

        上述源码给出了异常的来源,但是未看到拒绝消息的代码,猜测应该是使用aop的方式实现的,此处不再继续深究。文章来源地址https://www.toymoban.com/news/detail-788552.html

到了这里,关于RabbitMQ--重试机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rabbitmq:retry重试机制和延迟消息的实现

    rabbitmq:retry重试机制和延迟消息的实现 在消费者消费消息的时候可能会因为网络等外部原因导致消息处理失败,这个时候如果将消息直接丢弃会导致正常的业务丢失,但是如果是一条本身就有问题的消息,那么这个时候又必须丢弃掉,如果选择用channel.basicNack 或 channel.basi

    2024年02月13日
    浏览(41)
  • 【MQ 系列】SpringBoot + RabbitMq 消息确认/事务机制的使用姿势

    我们知道 RabbitMq 提供了两种机制,来确保发送端的消息被 brocke 正确接收,本文将主要介绍,在消息确认和事物两种机制的场景下,发送消息的使用姿势 首先创建一个 SpringBoot 项目,用于后续的演示 springboot 版本为 2.2.1.RELEASE rabbitmq 版本为  3.7.5   依赖配置文件 pom.xml 在 a

    2024年01月18日
    浏览(43)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

    在现代分布式应用程序中,消息队列扮演了至关重要的角色,允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性,但也引入了一系列的挑战,其中最重要的之一是消息的可靠性。 首先让我们来了解一下,在消息队列中,消息从生产者发送

    2024年02月05日
    浏览(49)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(85)
  • 消息中间件之八股面试回答篇:一、问题概览+MQ的应用场景+RabbitMQ如何保证消息不丢失(生产者确认机制、持久化、消费者确认机制)+回答模板

    目前主流的消息队列技术(MQ技术)分为RabbitMQ和Kafka,其中深蓝色为只要是MQ,一般都会问到的问题。浅蓝色是针对RabbitMQ的特性的问题。蓝紫色为针对Kafka的特性的问题。 MQ主要提供的功能为:异步 解耦 削峰 。 展开来讲就是 异步发送(验证码、短信、邮件…) MYSQL和Redi

    2024年01月24日
    浏览(57)
  • RabbitMQ--MQ介绍和RabbitMQ安装

    微服务间通讯有同步和异步两种方式:         同步通讯:就像打电话,需要实时响应。         异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是

    2024年01月20日
    浏览(37)
  • 整合MQ-----RabbitMQ

    应用场景: 异步处理 。把消息放入消息中间件中,等到需要的时候再去处理。 流量削峰 例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃 安装rabbitMQ 管理后台 :http://IP

    2024年02月03日
    浏览(31)
  • MQ 简介-RabbitMQ

    消息队列作为高并发系统的核心组件之一,能够帮助业务系统结构提升开发效率和系统 稳定性,消息队列主要具有以下特点: 削峰填谷 :主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题 系统解耦 :解决不同重要程度、不同能力级别系统之间依赖导致一死

    2024年02月11日
    浏览(32)
  • MQ-消息队列-RabbitMQ

    MQ(Message Queue) 消息队列 ,是基础数据结构中“ 先进先出 ”的一种 数据结构 。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由

    2024年02月09日
    浏览(48)
  • MQ学习笔记--(RabbitMQ)

    初识MQ RabbitMQ快速入门 SpringAMQP 同步通讯 异步通讯 MQ常见框架 同步通讯和异步通讯 同步通讯:比如微信视频,同一时间只能跟一个人视频,其他人想跟你视频的话,得等你这个视频结束之后才可以 异步通信:比如微信发消息,发了一个人后,别人可能还没回你,但你还可以

    2024年02月08日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包