RabbitMQ常见问题之消息可靠性

这篇具有很好参考价值的文章主要介绍了RabbitMQ常见问题之消息可靠性。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、介绍

MQ的消息可靠性,将从以下四个方面展开并实践:

  1. 生产者消息确认
  2. 消息持久化
  3. 消费者消息确认
  4. 消费失败重试机制

二、生产者消息确认

对于publisher,如果message到达exchange与否,rabbitmq提供publiser-comfirm机制,如果message达到exchange但是是否到达queuerabbitmq提供publisher-return机制。这两种机制在代码中都可以通过配置来自定义实现。

以下操作都在publisher服务方完成。

1. 引入依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

配置说明:
publish-confirm-type:开启publisher-confirm,这里支持两种类型:

  • simple:同步等待confirm结果,直到超时
  • correlated:异步回调,定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback

publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback; false,则直接丢弃消息

2. 配置ReturnCallBack

每个RabbitTemplate只能配置一个ReturnCallBack,所以直接给IoC里面的RabbitTemplate配上,所有人都统一用。
新建配置类,实现ApplicationContextAware 接口,在接口中setReturnCallback

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
            //check if is delay message
            if (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) {
                return;
            }
            log.error("消息发送到queue失败,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",
                    replyCode, replyText, exchange, routingKey, message.toString());
        });
    }
}

3. 配置ConfirmCallBack

ConfirmCallBack在message发送时配置,每个message都可以有自己的ConfirmCallBack。

@Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        String message = "hello, spring amqp!";
        // confirm callback
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(
                result -> {
                    if (result.isAck()){
                        log.debug("消息到exchange成功, id={}", correlationData.getId());
                    }else {
                        log.error("消息到exchange失败, id={}", correlationData.getId());
                    }
                },
                throwable -> {
                    log.error("消息发送失败", throwable);
                }
        );

        rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
    }

4. 测试

将消息发送到一个不存在的exchange,模拟消息达到exchange失败,触发ConfirmCallBack,日志如下。

18:22:03:913 ERROR 23232 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aamq.topic' in vhost '/', class-id=60, method-id=40)
18:22:03:915 ERROR 23232 --- [nectionFactory1] cn.itcast.mq.spring.SpringAmqpTest       : 消息到exchange失败, id=0c0910a3-7937-43ea-9606-e5bbcdda0b5c

将消息发送到一个存在的exchange,但routekey异常,模拟消息到达exchange但没有到达queue,触发ConfirmCallBackReturnCallBack,日志如下。

18:27:22:757  INFO 20184 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#7428de63:0/SimpleConnection@6d60899e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 53662]
18:27:22:797 DEBUG 20184 --- [ 127.0.0.1:5672] cn.itcast.mq.spring.SpringAmqpTest       : 消息到exchange成功, id=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11
18:27:22:796 ERROR 20184 --- [nectionFactory1] cn.itcast.mq.config.CommonConfig         : 消息发送到queue失败,replyCode=312, reason=NO_ROUTE, exchange=amq.topic, routeKey=simplee.test, message=(Body:'hello, spring amqp!' MessageProperties [headers={spring_returned_message_correlation=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])

三、消息持久化

新版本的SpringAMQP默认开启持久化。RabbitMQ本身并不默认开启持久化。

队列持久化,通过QueueBuilder构建持久化队列,比如

	@Bean
    public Queue simpleQueue(){
        return QueueBuilder
                .durable("simple.queue")
                .build();
    }

消息持久化,在发送时可以设置,比如

@Test
public void testDurableMessage(){
    Message message = MessageBuilder.withBody("hello springcloud".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    rabbitTemplate.convertAndSend("simple.queue", message);
}

四、消费者消息确认

消费者消息确认是指,consumer收到消息后会给rabbitmq发送回执来确认消息接收状况。

SpringAMQP允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack, MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # manual auto none

但是auto有个很大的缺陷,因为rabbitmq会自动不断给有问题的listen反复投递消息,导致不断报错,所以建议使用下一章的操作。

五、消费失败重试机制

当消费者出现异常后,消息会不断requeue (重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue
,无限循环,导致mq的消息处理飙升,带来不必要的压力。

我们可以利用Springretry机制,在消费者出现异常时利用本地重试,而不是无限制的requeuemq队列。

1. 引入依赖

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 #初识的失败等待时长为1秒
          multiplier: 2 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2. 配置重试次数耗尽策略

RabbitMQ常见问题之消息可靠性,Server架构,# RabbitMQ,rabbitmq,分布式
我们采用RepublishMessageRecoverer
定义用于接收失败消息的exchangequeue以及它们之间的bindings

然后定义MessageRecoverer,比如

@Component
public class ErrorMessageConfig {
    @Bean
    public MessageRecoverer republishMessageRecover(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }
}

3. 测试

定义处理异常消息的exchangequeue,比如

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "error.queue"),
            exchange = @Exchange(name = "error.exchange"),
            key = "error"
    ))
    public void listenErrorQueue(String msg){
        log.info("消费者接收到error.queue的消息:【" + msg + "】");
    }

定义如下一个listener,来模拟consumer处理消息失败触发消息重试。

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "simple.queue"),
            exchange = @Exchange(name = "simple.exchange"),
            key = "simple"
    ))
    public void listenSimpleQueue(String msg) {
        log.info("消费者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1/0);
        log.info("consumer handle message success");
    }

写一个简单的测试,往simple.exchange发送消息,比如

    @Test
    public void testSendMessageSimpleQueue() throws InterruptedException {
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend("simple.exchange", "simple", message);
    }

运行测试,consumer得到以下日志

18:51:10:164  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到simple.queue的消息:【hello, spring amqp!】
18:51:11:167  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:168  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:176  WARN 24072 --- [ntContainer#0-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.exchange' with routing key error
18:51:13:181  INFO 24072 --- [ntContainer#1-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到error.queue的消息:【hello, spring amqp!】

可以看到spring尝试2次重发,一共3次,第一次间隔1秒,第二次间隔2秒,重试次数耗尽,消息被consumer传入error.exchange,注意,是consumer传的,不是simple.queue文章来源地址https://www.toymoban.com/news/detail-802054.html

到了这里,关于RabbitMQ常见问题之消息可靠性的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ消息的可靠性

    面试题: Rabbitmq怎么保证消息的可靠性? 1.消费端消息可靠性保证: 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费

    2024年04月14日
    浏览(73)
  • rabbitmq消息可靠性之消息回调机制

    rabbitmq消息可靠性之消息回调机制 rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制 生产者确认机制 消息持久

    2024年02月08日
    浏览(56)
  • RabbitMQ保证消息的可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月19日
    浏览(49)
  • RabbitMQ高级篇---消息可靠性

    1、消息可靠性: 消息从发送到消费者接受,会经历多个过程,每个消息传递的过程都可能导致消息的丢失: 常见的丢失原因: 发送时消息丢失原因: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 Rab

    2024年01月20日
    浏览(52)
  • RabbitMQ如何保证消息可靠性

    目录 1、RabbitMQ消息丢失的可能性 1.1 生产者消息丢失场景 1.2 MQ导致消息丢失 1.3 消费者丢失 2、如何保证生产者消息的可靠性 2.1 生产者重试机制 2.2 生产者确认机制 2.3 实现生产者确认 2.3.1 配置yml开启生产者确认 2.3.2 定义ReturnCallback 2.3.3 定义ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    浏览(57)
  • RabbitMQ 能保证消息可靠性吗

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 推或拉? RabbitMQ 消费模式该如何选择 死信是什么,如何运用RabbitMQ的死信机制? 前面我们在做MQ组件选型时,提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    浏览(47)
  • RabbitMQ之消息的可靠性传递

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 RabbitMQ之消息的可靠性传递 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 在当今的信息化时代,消息传递在企业级应用和分布式

    2024年01月19日
    浏览(44)
  • 如何保证 RabbitMQ 的消息可靠性?

    项目开发中经常会使用消息队列来 完成异步处理、应用解耦、流量控制等功能 。虽然消息队列的出现解决了一些场景下的问题,但是同时也引出了一些问题,其中使用消息队列时如何保证消息的可靠性就是一个常见的问题。 如果在项目中遇到需要保证消息一定被消费的场景

    2024年02月07日
    浏览(49)
  • rabbitmq如何保证消息的可靠性

    RabbitMQ可以通过以下方式来保证消息的可靠性: 在发布消息时,可以设置消息的delivery mode为2,这样消息会被持久化存储在磁盘上,即使RabbitMQ服务器重启,消息也不会丢失。 可以创建持久化的队列,这样即使RabbitMQ服务器重启,队列也不会丢失。 在消费者端,可以 设置手动

    2024年01月23日
    浏览(54)
  • 【RabbitMQ】之消息的可靠性方案

    一、数据丢失场景 二、数据可靠性方案 1、生产者丢失消息解决方案 2、MQ 队列丢失消息解决方案 3、消费者丢失消息解决方案 MQ 消息数据完整的链路为 :从 Producer 发送消息到 RabbitMQ 服务器中,再由 Broker 服务的 Exchange 根据 Routing_Key 路由到指定的 Queue 队列中,最后投送到消

    2024年02月14日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包