RabbitMQ消息可靠性投递与ACK确认机制

这篇具有很好参考价值的文章主要介绍了RabbitMQ消息可靠性投递与ACK确认机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.RabbitMQ的消息可靠性投递

  • 什么是消息的可靠性投递
    • 保证消息百分百发送到消息队列中去
    • 保证MQ节点成功接收消息
    • 消息发送端需要接收到MQ服务端接收到消息的确认应答
    • 完善的消息补偿机制,发送失败的消息可以再感知并二次处理
  • RabbitMQ消息投递路径
    • 生产者–>交换机–>队列–>消费者
    • 通过两个节点控制消息的可靠性投递
      • 生产者到交换机:通过confirmCallback
      • 交换机到队列:通过returnCallback
  • 建议
    • 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,RabbitMQ整体效率变低,吞吐量下降严重,不是很重要的消息不建议使用消息确认机制

2.RabbitMQ消息可靠性投递confirmCallback实战

  • 生产者到交换机

    • 通过confirmCallback
    • 生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。生产者通过ACK可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心
  • 开启confirmCallback配置

    spring.rabbitmq.publisher-confirm-type=correlated
    
  • 消息发送测试

    package com.gen;
    
    import com.gen.config.RabbitMQConfig;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class GenRabbitmqApplicationTests {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        void testConfirmCallback() {
            this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                System.out.println(correlationData);
                System.out.println(cause);
                if (ack) {
                    System.out.println("发送成功");
                } else {
                    System.out.println("发送失败");
                }
            });
            this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "您有新订单!!!");
            // 模拟消息投递失败
    //        this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+"1", "order.new", "您有新订单!!!");
        }
    }
    

3.RabbitMQ消息可靠性投递returnCallback实战

  • 交换机到队列

    • 通过returnCallback
    • 消息从交换机发送到对应队列失败时触发
    • 两种模式
      • 交换机到队列不成功,则丢弃消息(默认)
      • 交换机到队列不成功,返回给消息生产者,触发returnCallback
  • 配置文件开启配置

    # 开启returnCallback配置
    spring.rabbitmq.publisher-returns=true
    # 修改交换机投递到队列失败的策略,true交换机处理消息到路由失败会返回给生产者
    spring.rabbitmq.template.mandatory=true
    
  • 消息发送测试

    package com.gen;
    
    import com.gen.config.RabbitMQConfig;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class GenRabbitmqApplicationTests {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        void testReturnCallback() {
            this.rabbitTemplate.setReturnsCallback((returned) -> {
                System.out.println(returned);
            });
            this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "您有新订单!!!");
            // 模拟消息转发队列失败
    //        this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "gen.order.new", "您有新订单!!!");
        }
    }
    

4.RabbitMQ消息确认机制ACK

  • 背景:消费者从Broker中监听消息,需要确保消息被合理处理

RabbitMQ消息可靠性投递与ACK确认机制,RabbitMQ,rabbitmq

  • RabbitMQ的ACK介绍

    • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
    • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
    • 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除
    • 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked
  • 确认方式

    • 自动确认(默认)
    • 手动确认manual
  • 配置文件开启手动确认

    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
  • 消费者代码

    package com.gen.listener;
    
    import com.gen.config.RabbitMQConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public class OrderMQListener {
    
        @RabbitHandler
        public void orderConsumer(String msg, Message message, Channel channel) throws IOException {
            System.out.println(msg);
            System.out.println(message);
            System.out.println(channel);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            System.out.println(deliveryTag);
    
            // 成功确认,消费成功
            channel.basicAck(deliveryTag, false);
            // 拒绝后重新入队
    //        channel.basicNack(deliveryTag, false,true);
        }
    }
    
  • deliveryTag介绍:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加

  • basicNack和basicReject介绍

    • basicReject一次只能拒绝接收一个消息,可以设置是否重新入队requeue
    • basicNack方法可以支持一次0个或者多个消息的拒收,可以设置是否重新入队requeue
  • 人工审核异常消息文章来源地址https://www.toymoban.com/news/detail-830590.html

    • 设置重试阈值,超过后确认消费成功,记录消息,人工处理

到了这里,关于RabbitMQ消息可靠性投递与ACK确认机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【云原生进阶之PaaS中间件】第四章RabbitMQ-4.3-如何保证消息的可靠性投递与消费

            根据RabbitMQ的工作模式,一条消息从生产者发出,到消费者消费,需要经历以下4个步骤: 生产者将消息发送给RabbitMQ的Exchange交换机; Exchange交换机根据Routing key将消息路由到指定的Queue队列; 消息在Queue中暂存,等待消费者消费消息; 消费者从Queue中取出消息消费

    2024年03月11日
    浏览(62)
  • rabbitmq消息可靠性之消息回调机制

    rabbitmq消息可靠性之消息回调机制 rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制 生产者确认机制 消息持久

    2024年02月08日
    浏览(52)
  • RabbitMQ --- 消息可靠性

    消息队列在使用过程中,面临着很多实际问题需要思考:      消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 co

    2024年02月14日
    浏览(46)
  • RabbitMQ-保证消息可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月07日
    浏览(48)
  • RabbitMQ消息的可靠性

    面试题: Rabbitmq怎么保证消息的可靠性? 1.消费端消息可靠性保证: 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费

    2024年04月14日
    浏览(69)
  • RabbitMQ保证消息的可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月19日
    浏览(45)
  • RabbitMQ如何保证消息可靠性

    目录 1、RabbitMQ消息丢失的可能性 1.1 生产者消息丢失场景 1.2 MQ导致消息丢失 1.3 消费者丢失 2、如何保证生产者消息的可靠性 2.1 生产者重试机制 2.2 生产者确认机制 2.3 实现生产者确认 2.3.1 配置yml开启生产者确认 2.3.2 定义ReturnCallback 2.3.3 定义ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    浏览(51)
  • RabbitMQ高级篇---消息可靠性

    1、消息可靠性: 消息从发送到消费者接受,会经历多个过程,每个消息传递的过程都可能导致消息的丢失: 常见的丢失原因: 发送时消息丢失原因: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 Rab

    2024年01月20日
    浏览(47)
  • RabbitMQ消息可靠性问题及解决

    说明:在RabbitMQ消息传递过程中,有以下问题: 消息没发到交换机 消息没发到队列 MQ宕机,消息在队列中丢失 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除 针对以上问题,提供以下解决方案: 消息确认:确认消息是否发送到交换机、队列;

    2024年02月16日
    浏览(43)
  • 【RabbitMQ】之消息的可靠性方案

    一、数据丢失场景 二、数据可靠性方案 1、生产者丢失消息解决方案 2、MQ 队列丢失消息解决方案 3、消费者丢失消息解决方案 MQ 消息数据完整的链路为 :从 Producer 发送消息到 RabbitMQ 服务器中,再由 Broker 服务的 Exchange 根据 Routing_Key 路由到指定的 Queue 队列中,最后投送到消

    2024年02月14日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包