RabbitMQ之消息的可靠性传递

这篇具有很好参考价值的文章主要介绍了RabbitMQ之消息的可靠性传递。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
RabbitMQ之消息的可靠性传递


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

提示:这里可以添加本文要记录的大概内容:

在当今的信息化时代,消息传递在企业级应用和分布式系统中扮演着至关重要的角色。而 RabbitMQ 作为一款强大的消息队列中间件,以其可靠性和高性能成为了众多开发者的首选。本文将深入探讨 RabbitMQ 中消息的可靠性传递机制,以及如何在实际应用中确保消息的不丢失。
通过阅读本文,您将了解到 RabbitMQ 可靠消息传递的核心概念和工作原理。我们将探讨消息确认、持久性、队列和交换机的配置以及错误处理等关键主题,以帮助您构建高度可靠的消息传递系统。
无论您是刚刚开始接触 RabbitMQ,还是已经在实际项目中使用它,本文都将为您提供有价值的见解和实用的指导。让我们一起深入了解 RabbitMQ 的可靠性特性,掌握构建可靠系统的关键技能。


提示:以下是本篇文章正文内容,下面案例可供参考

一、消息的可靠性传递的概念

在RabbitMQ中,消息投递的路径为:生产者->交换机->队列->消费者。而在消息的投递过程中,每一个环节都可能投递失败,那么RabbitMQ是通过什么方法确认消息投递成功的呢?

  • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

二、三种模式的实现

环境准备

1.首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: zhangsan
   password: zhangsan
   virtual-host: /
  
#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

2.在生产者的配置类创建交换机和队列

@Configuration
public class RabbitConfig {
  private final String EXCHANGE_NAME="my_topic_exchange";
  private final String QUEUE_NAME="my_queue";


  // 1.创建交换机
  @Bean("bootExchange")
  public Exchange getExchange(){
    return ExchangeBuilder
         .topicExchange(EXCHANGE_NAME) // 交换机类型
         .durable(true) // 是否持久化
           .build();
   }


  // 2.创建队列
  @Bean("bootQueue")
  public Queue getMessageQueue(){
    return QueueBuilder
         .durable(QUEUE_NAME) // 队列持久化
         .build();
   }


  // 3.将队列绑定到交换机
  @Bean
  public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

确认模式

1.生产者配置文件开启确认模式

spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: zhangsan
   password: zhangsan
   virtual-host: /
  # 开启确认模式
   publisher-confirm-type: correlated

2.生产者定义确认模式的回调方法

@SpringBootTest
public class ProducerTest {
  @Autowired
  private RabbitTemplate rabbitTemplate;


  @Test
  public void testConfirm(){
    // 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
      /**
       * 被调用的回调方法
       * @param correlationData 相关配置信息
       * @param ack 交换机是否成功收到了消息
       * @param cause 失败原因
       */
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
          System.out.println("confirm接受成功!");
         }else{
          System.out.println("confirm接受失败,原因为:"+cause);
          // 做一些处理。
         }
       }
     });
    rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
   }
}

退回模式

1.生产者配置文件开启退回模式

spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: zhangsan
   password: zhangsan
   virtual-host: /
  # 开启确认模式
   publisher-confirm-type: correlated
  # 开启回退模式
   publisher-returns: true

2.生产者定义退回模式的回调方法

@SpringBootTest
public class ProducerTest {
  @Autowired
  private RabbitTemplate rabbitTemplate;


  @Test
  public void testReturn(){
    // 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
      /**
       * @param returned 失败后将失败信息封装到参数中
       */
      @Override
      public void returnedMessage(ReturnedMessage returned) {
        System.out.println("消息对象:"+returned.getMessage());
        System.out.println("错误码:"+returned.getReplyCode());
        System.out.println("错误信息:"+returned.getReplyText());
        System.out.println("交换机:"+returned.getExchange());
        System.out.println("路由键:"+returned.getRoutingKey());
        // 处理消息...
       }
     });
    rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");
   }
}

消费者确认

在 RabbitMQ 中,当消费者接收到消息后,会向队列发送一个确认消息,表示已经成功接收并处理了该消息。只有当确认消息被发送后,该消息才会从队列中被移除。这种机制被称为消费者消息确认(Consumer Acknowledge,简称 Ack)。这就类似于快递员派送快递时需要我们签收一样,否则快递就会一直存在于快递公司的系统中。
消息确认分为自动确认和手动确认两种方式。自动确认意味着只要消费者接收到消息,无论是否成功处理了该消息,都会自动发送确认消息,并将消息从队列中移除。然而,在实际开发过程中,如果在接收到消息后业务处理出现异常,那么消息就可能会丢失。因此,需要设置手动确认,也就是只有在业务处理成功后,才会发送确认消息通知队列;如果出现异常,则会发送拒绝消息,让消息仍然保留在队列中。

  • 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
  • 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”

1.消费者配置开启手动签收

spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: zhangsan
   password: zhangsan
   virtual-host: /
  # 开启手动签收
   listener:
    simple:
     acknowledge-mode: manual

2.消费者处理消息时定义手动签收和拒绝签收的情况

@Component
public class AckConsumer {
  @RabbitListener(queues = "my_queue")
  public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
    // 消息投递序号,消息每次投递该值都会+1
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
      int i = 1/0; //模拟处理消息出现bug
      System.out.println("成功接受到消息:"+message);
      // 签收消息
      /**
       * 参数1:消息投递序号
       * 参数2:是否一次可以签收多条消息
       */
      channel.basicAck(deliveryTag,true);
     }catch (Exception e){
      System.out.println("消息消费失败!");
      Thread.sleep(2000);
      // 拒签消息
      /**
       * 参数1:消息投递序号
       * 参数2:是否一次可以拒签多条消息
       * 参数3:拒签后消息是否重回队列
       */
      channel.basicNack(deliveryTag,true,true);
     }
   }
}


总结

提示:这里对文章进行总结:

在 RabbitMQ 中,为了确保消息的可靠性传递,需要使用确认应答和重传机制。当生产者将消息发送到 RabbitMQ 服务器时,服务器会向生产者返回一个确认应答,表示消息已成功接收。如果生产者没有收到确认应答,它可以在一定时间内重传消息,直到收到确认应答或达到重试次数限制。
当消费者从队列中消费消息时,它会向 RabbitMQ 服务器发送一个确认应答,表示消息已成功处理。如果消费者在处理消息时出现异常或崩溃,RabbitMQ 服务器可以通过重传机制将消息重新推送给其他消费者进行处理,以确保消息不会丢失。文章来源地址https://www.toymoban.com/news/detail-806402.html

到了这里,关于RabbitMQ之消息的可靠性传递的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ消息的可靠性

    面试题: Rabbitmq怎么保证消息的可靠性? 1.消费端消息可靠性保证: 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费

    2024年04月14日
    浏览(74)
  • RabbitMQ如何保证消息可靠性

    目录 1、RabbitMQ消息丢失的可能性 1.1 生产者消息丢失场景 1.2 MQ导致消息丢失 1.3 消费者丢失 2、如何保证生产者消息的可靠性 2.1 生产者重试机制 2.2 生产者确认机制 2.3 实现生产者确认 2.3.1 配置yml开启生产者确认 2.3.2 定义ReturnCallback 2.3.3 定义ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    浏览(58)
  • RabbitMQ保证消息的可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月19日
    浏览(50)
  • RabbitMQ高级篇---消息可靠性

    1、消息可靠性: 消息从发送到消费者接受,会经历多个过程,每个消息传递的过程都可能导致消息的丢失: 常见的丢失原因: 发送时消息丢失原因: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 Rab

    2024年01月20日
    浏览(53)
  • rabbitmq消息可靠性之消息回调机制

    rabbitmq消息可靠性之消息回调机制 rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制 生产者确认机制 消息持久

    2024年02月08日
    浏览(57)
  • rabbitmq如何保证消息的可靠性

    RabbitMQ可以通过以下方式来保证消息的可靠性: 在发布消息时,可以设置消息的delivery mode为2,这样消息会被持久化存储在磁盘上,即使RabbitMQ服务器重启,消息也不会丢失。 可以创建持久化的队列,这样即使RabbitMQ服务器重启,队列也不会丢失。 在消费者端,可以 设置手动

    2024年01月23日
    浏览(54)
  • 【RabbitMQ】之消息的可靠性方案

    一、数据丢失场景 二、数据可靠性方案 1、生产者丢失消息解决方案 2、MQ 队列丢失消息解决方案 3、消费者丢失消息解决方案 MQ 消息数据完整的链路为 :从 Producer 发送消息到 RabbitMQ 服务器中,再由 Broker 服务的 Exchange 根据 Routing_Key 路由到指定的 Queue 队列中,最后投送到消

    2024年02月14日
    浏览(43)
  • RabbitMQ消息可靠性问题及解决

    说明:在RabbitMQ消息传递过程中,有以下问题: 消息没发到交换机 消息没发到队列 MQ宕机,消息在队列中丢失 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除 针对以上问题,提供以下解决方案: 消息确认:确认消息是否发送到交换机、队列;

    2024年02月16日
    浏览(46)
  • 如何保证 RabbitMQ 的消息可靠性?

    项目开发中经常会使用消息队列来 完成异步处理、应用解耦、流量控制等功能 。虽然消息队列的出现解决了一些场景下的问题,但是同时也引出了一些问题,其中使用消息队列时如何保证消息的可靠性就是一个常见的问题。 如果在项目中遇到需要保证消息一定被消费的场景

    2024年02月07日
    浏览(50)
  • RabbitMQ 能保证消息可靠性吗

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 推或拉? RabbitMQ 消费模式该如何选择 死信是什么,如何运用RabbitMQ的死信机制? 前面我们在做MQ组件选型时,提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包