RabbitMQ如何保证消息的可靠性6000字详解

这篇具有很好参考价值的文章主要介绍了RabbitMQ如何保证消息的可靠性6000字详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ通过生产者、消费者以及MQ Broker达到了解耦的特点,实现了异步通讯等一些优点,但是在消息的传递中引入了MQ Broker必然会带来一些其他问题,比如如何保证消息在传输过程中可靠性(即不让数据丢失,发送一次消息就会被消费一次)?这篇博客将详细从生产者,MQ Broker以及消费者的角度讲解如何保证消息的可靠性!

1,消息丢失的情况

1.1 消息传递流程图如下

RabbitMQ如何保证消息的可靠性6000字详解,RabbitMQ,java-rabbitmq,rabbitmq,java

 Producer -> exchange ->queue -> Consumer(其中exchange和queue属于MQ Broker的组件)

1.2 消息可能丢失的情况

  • 生产者给交换机exchange的过程中发生数据丢失;
  • 交换机exchange路由给队列queue的过程中发生数据丢失;
  • 消息到达MQ的一瞬间,MQ发生了宕机的情况造成数据丢失;
  • 消费者从队列queue中取出消息进行消费的一瞬间消费者宕机了造成数据丢失。

2,生产者确认机制

生产者确认机制主要是站在生产者的角度来保证消息的可靠性,针对的是生产者给交换机发送消息以及交换机给队列发送消息的过程中数据丢失的情况!

2.1 书写配置信息

# 配置日志信息
logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    cn.itcast: debug

spring:
  rabbitmq:
    host: 123.207.72.43 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: admin
    password: 123
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    #消息发送失败时执行returnCallback回调函数
    template:
      mandatory: true
  • publisher-confirm-type表示开启publisher-confirm;这个参数有两种类型,分别是correlated和simple(correlated代表异步等待回调,类似于js中发送的ajax请求的回调函数,MQ返回结果时会执行定义的confirmCallback函数;simple代表同步等待confirm结果直到超时);
  • publisher-returns表示开启publish-return功能,同样是基于callback机制,不过是定义returnCallback;
  • template.mandatory定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。

2.2 定义return回调机制

我们使用的是SpringBoot来整合的RabbitMQ,所以不论是return回调还是confim回调都是用rabbittemplate对象进行定义的。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //记录日志
            log.error("消息发送队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                    replyCode,replyText,exchange,routingKey,message.toString());
            //如果需要的话进行消息的重发
        });
    }
}

注意:

  1. 一个RabbitTemplate只能配置一个ReturnCallback,所以需要在项目启动的时候进行定义,这样rabbitTemplate就是全局唯一的了(也可以采用PostConstruct注解中的init方法进行定义);
  2. ApplicationContextAware是Spring创建完Bean工厂之后的通知方法,当Spring创建完Bean工厂之后就可以在Spring容器中拿到RabbitTemplate对象了;
  3. 配置ReturnCallback时可以采用匿名内部类的方法简化代码,如果消息发送失败可以根据需要进行消息重发操作。

2.3 定义confirm回调机制

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同,可以通过测试方法进行定义。

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() throws InterruptedException {
        //1.准备消息
        String message = "hello spring amqp";

        //2.准备CorrelationData
        //2.1 消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //2.2 准备准备ConfirmCallback
        correlationData.getFuture().addCallback(confirm -> {
            if (confirm.isAck()) {
                log.debug("消息成功投递到交换机!消息ID:{}", correlationData.getId());
            } else {
                log.error("消息投递到交换机上失败!消息ID:{}", correlationData.getId());
                //重发消息
            }
        }, throwable -> {
            //记录日志
            log.error("发送消息失败!",throwable);
            //重发消息
        });

        //3.发送消息
        rabbitTemplate.convertAndSend("amq.topic","a.simple.hello",message,correlationData);
        //加上休眠时间 避免mq连接直接关闭
        Thread.sleep(1000);
    }

注意:

  1. 生产者给交换机发送的消息数据很多的,为了区分每个消息的归属,每个消息都要附属上一个ID信息,可以采用UUID的方式生成唯一身份标识;
  2. 在发送消息的时候需要增加一个correlation变量,这个变量记录了两个东西(1.每个消息的ID 2.定义的cinfirm回调机制);
  3. 加上线程休眠的操作是为了避免消息发送到交换机之后mq的连接直接关闭,这样会导致返回ack的错误。

3,消息持久化

消息持久化是站在MQ Broker的角度来保证消息的可靠性的,将交换机、队列以及消息设置成持久化的从而避免MQ宕机造成消息的丢失!

3.1 交换机持久化

@Bean
    public DirectExchange simpleDirect(){
        return new DirectExchange("simple.direct",true,false);
    }

第二个参数设置成true就是让就交换机是可持久化的,第三个参数是是否自动删除,一般设为false;

3.2 队列持久化

@Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue").build();
    }

durable的意思就是可持久化的,传入队列名称然后进行build操作,这样创建的队列就是一个可持久化的队列;

3.3 消息持久化

将交换机和队列设置为持久化的之后重启MQ服务器之后消息依然会丢失,因为发送的消息不是可持久化的,所以也需要将消息设置成可持久化的

4,消费者消息确认

消费者消息确认是站在消费者的角度来保证消息可靠性的,消息者处理完一条消息之后需要给MQ Broker返回一条ACK表示消息处理完成!

4.1 三种确认模式

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack;
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack;
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。

4.2 none模式的演示

1.修改消费者工程中的配置文件
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 关闭ack
2.监听一个队列,在监听的方法中模拟一个异常情况,观察消息是否会被删除
@RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        //这里模拟一个异常
        System.out.println(1 / 0);
        log.info("消费者处理消息成功!");
    }
3.在rabbitmq控制台模拟发送一条消息,观察抛出异常之后消息是否会重发

RabbitMQ如何保证消息的可靠性6000字详解,RabbitMQ,java-rabbitmq,rabbitmq,java

 抛出异常消费者并没有处理消息成功,再观察控制台是否将消息删除:

RabbitMQ如何保证消息的可靠性6000字详解,RabbitMQ,java-rabbitmq,rabbitmq,java

 队列中已经没有消息了,说明消息被删除了!

消费者确认机制为none的时候,只要消费者拿到消息之后MQ就会把消息删除,不关心消费者是否将消息成功处理!

4.3 auto模式的演示

1.修改消费者工程中的配置文件
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 关闭ack
 2.监听一个队列,在监听的方法中模拟一个异常情况,观察消息是否会被删除
@RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        //这里模拟一个异常
        System.out.println(1 / 0);
        log.info("消费者处理消息成功!");
    }
3.在rabbitmq控制台模拟发送一条消息,观察抛出异常之后消息是否会重发

RabbitMQ如何保证消息的可靠性6000字详解,RabbitMQ,java-rabbitmq,rabbitmq,java

RabbitMQ如何保证消息的可靠性6000字详解,RabbitMQ,java-rabbitmq,rabbitmq,java

消费者确认机制为auto的时候,消费者拿到消息之后MQ并不会立刻删除队列中的消息,只有消费者成功处理完消息之后给队列返回一个ack的时候队列才会删除消息!

5, 消费者失败重试机制

我们发现当消费者确认机制为auto时,如果代码中出现了异常,消息会进行重复入队列(requeue)的操作,重复入队的操作对于MQ来说开销会非常大,消息处理飙升,所以引入了失败重试机制:当代码中出现了异常的时候,消费者内部会进行重发的操作(可以控制重发的时间和次数),如果超过设置的重发次数消费者还未成功处理消息默认将消息丢弃!

5.1 本地重试

Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列,可以在消费者工程的yml文件中添加如下配置:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 3 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 4 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

RabbitMQ如何保证消息的可靠性6000字详解,RabbitMQ,java-rabbitmq,rabbitmq,java

 4次重发之后消息还未成功处理spring抛出了AmqpRejectAndDontRequeueException异常,这是失败之后的默认处理方式,默认消费者给队列返回了ack,此时队列会将消息从队列中删除!

5.2 失败策略

失败达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认就是这种方式;
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队;
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

如果消息这个消息比较重要,达到最大重试次数之后这个消息不能被丢弃该怎么办,此时就可以使用RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

@Configuration
public class ErrorMessageConfig {
    //定义失败之后处理的交换机和队列
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }

    //将交换机和队列进行绑定
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }


    //定义一个RepublishMessageRecoverer,替换spring默认的处理机制​
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

流程图如下:

RabbitMQ如何保证消息的可靠性6000字详解,RabbitMQ,java-rabbitmq,rabbitmq,java文章来源地址https://www.toymoban.com/news/detail-577678.html

 6, 如何保证RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列;
  • 开启持久化功能,确保消息未消费前在队列中不会丢失;
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack;
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。

到了这里,关于RabbitMQ如何保证消息的可靠性6000字详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ-保证消息可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月07日
    浏览(53)
  • RabbitMQ保证消息的可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月19日
    浏览(50)
  • RabbitMQ如何保证消息可靠性,看完这篇文章佬会有新的理解

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的消息可靠性机制,如消息丢失,消息重复性消费,消息积压等问题。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小

    2024年02月03日
    浏览(53)
  • RabbitMQ 能保证消息可靠性吗

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 推或拉? RabbitMQ 消费模式该如何选择 死信是什么,如何运用RabbitMQ的死信机制? 前面我们在做MQ组件选型时,提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    浏览(48)
  • Rabbitmq怎么保证消息的可靠性?

    一、消费端消息可靠性保证 : 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费者在处理过程中发生异常或者未完成

    2024年04月14日
    浏览(57)
  • 【SpringBoot】 整合RabbitMQ 保证消息可靠性传递

    生产者端 目录结构 导入依赖 修改yml 业务逻辑 测试结果         在publisher-confirm-type中有三个确认消息接受类型:none、correlated、simple。         publisher-confirm-type: none 表示 禁用发布确认模式 。是 默认值 。使用此模式之后,不管消息有没有发送到Broker(RabbitMQ)都不会

    2024年02月10日
    浏览(50)
  • 【云原生进阶之PaaS中间件】第四章RabbitMQ-4.3-如何保证消息的可靠性投递与消费

            根据RabbitMQ的工作模式,一条消息从生产者发出,到消费者消费,需要经历以下4个步骤: 生产者将消息发送给RabbitMQ的Exchange交换机; Exchange交换机根据Routing key将消息路由到指定的Queue队列; 消息在Queue中暂存,等待消费者消费消息; 消费者从Queue中取出消息消费

    2024年03月11日
    浏览(67)
  • [rocketmq] 如何保证消息可靠性

    1、生产者发送消息到Broker时; 2、Broker内部存储消息到磁盘以及主从复制同步时; 3、Broker把消息推送给消费者或者消费者主动拉取消息时; 1.重试策略,发送消息失败后会进行一定的重试策略 重试机制:固定重试次数,同步刷盘会切换 broker 重试,异步刷盘会在同一 broker

    2024年02月11日
    浏览(46)
  • 如何保证消息的可靠性(面试题)

    面试题 :Rebbitmq怎么保证消息的可靠性 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费者在处理过程中发生异常或者未完成处理就终止运行,那

    2024年04月14日
    浏览(49)
  • RabbitMQ高级特性解析:消息投递的可靠性保证与消费者ACK机制探究

    学习RabbitMQ高级特性,涵盖消息的持久化、确认模式、退回模式以及消费者ACK机制等方面,助您构建高可靠性的消息队列系统。

    2024年01月16日
    浏览(66)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包