rabbitmq的消息确认和消息回退

这篇具有很好参考价值的文章主要介绍了rabbitmq的消息确认和消息回退。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


theme: vue-pro

前言

前面章节我们至少知道了rabbitmq的几个核心组件, 比如 exchange queuerouting key

还有java编程方面的 channelconnection

但是这些还不够运用于生产环境

本章内容

  1. 消息确认机制(message confirm)
  2. 消息return机制

消息确认机制(confirm)

开始confirm机制, 生产者消息投递到exchange, exchange就会立即ack给生产者, 如果无法投递到exchange, 那么生产者就是产生nack

rabbitmq已经发送成功的消息怎么回滚、,springboot,Spring Cloud,rabbitmq,java-rabbitmq,rabbitmq,java

事务机制和 publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ,这里的“发送至 RabbitMQ”的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。

问: 事务机制为什么不讲?

答: 事务机制效率比较慢, P发送消息需要等待C的响应, 会导致极大的耗费性能

问: return机制或者confirm机制收到消息不成功后, 你要怎么处理呢?

答: 一般处理方式是: 重试几次, 如果还不通过, 那么打印日志, 交给人工处理

这里的重试可以考虑使用指数退避(重试频率从快到慢的过程)

异步方式消息确认

详细过程是在发送消息前保存消息到一个队列中, 在拿到遇到消息发送失败后, 我们拿取到队列中的消息, 然后尝试重试补偿

rabbitmq已经发送成功的消息怎么回滚、,springboot,Spring Cloud,rabbitmq,java-rabbitmq,rabbitmq,java

注意 Confirm 机制需要主动开启, 上图缺少了这段代码: channel.confirmSelect();

问: 这里引入redis保存这消息不合适吧?

答: 看起来这里还可以将消息保存到一个高并发队列中, 开一个定时器定时解决(立马再次发送消息大概率还是会失败, 每次等他个几百毫秒吧), 失败累计次数, 直到超过域值email通知运营人员

但是使用高并发队列只不过在内存中, 进程崩溃后改消息队列中的消息也将丢失

问: 那有别的方式吗?

答: 有, 保存到mysql或者直接持久化到queue中, 等到管理员找到原因并恢复后再次发送

同步消息确认

channel.queueDeclare(QUEUE_NAME, true, false, true, null);
channel.confirmSelect();
stopWatch.start();
for (int i = 0; i < 1000; i++) {
   channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
         String.format("hello%d", i).getBytes(StandardCharsets.UTF_8));
   // 单个消息确认机制
   boolean flag = channel.waitForConfirms();
   if (flag) {
      System.err.println("消息发送成功");
   }
}

但是1000次循环就需要调用 1000次 waitForConfirms 这效率特别低

channel.queueDeclare(QUEUE_NAME, true, false, true, null);
channel.confirmSelect();
int batchSize = 100;
int outstandingMessageCount = 0;
stopWatch.start();
for (int i = 0; i < 1000; i++) {
   channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
         String.format("hello%d", i).getBytes(StandardCharsets.UTF_8));
   outstandingMessageCount++;
   if (batchSize == outstandingMessageCount) {
      // 这是批量消息处理
      channel.waitForConfirms();
      outstandingMessageCount = 0;
   }
}

这里就改成100次消息发送, 处理一次同步操作消息等待

消息return机制

当前的消息通过exchange投递到queue不成功, 就会触发return机制的函数

rabbitmq已经发送成功的消息怎么回滚、,springboot,Spring Cloud,rabbitmq,java-rabbitmq,rabbitmq,java

mandatory参数的作用是什么?

rabbitmq已经发送成功的消息怎么回滚、,springboot,Spring Cloud,rabbitmq,java-rabbitmq,rabbitmq,java

文字描述: 在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

immediate参数的作用(已弃用)

rabbitmq已经发送成功的消息怎么回滚、,springboot,Spring Cloud,rabbitmq,java-rabbitmq,rabbitmq,java

RabbitMQ 3.0版本开始去掉了对immediate参数的支持,原因是因为immediate参数会影响镜像队列的性能、增加了代码的复杂度,建议采用TTL和DLX的方法替代。

消息回退机制代码实现

Producer的思路非常简单, 设置mandatory = true, 然后channel.addReturnListener

public static final String EXCHANGE = "exchange";
   public static final String ROUTING_KEY = "";
   
   public static void main(String[] args) throws Exception {
      ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      
      channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true,
            true, null);
      
      ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
      
      channel.confirmSelect();
      channel.addConfirmListener(new ConfirmListener() {
         @Override
         public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.err.println("消息发送成功...");
            if (multiple) {
               ConcurrentNavigableMap<Long, String> headMap = map.headMap(deliveryTag);
               headMap.clear();
            } else {
               map.remove(deliveryTag);
            }
         }
         
         @Override
         public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.err.println("消息发送失败");
            // 意味着这里存在多个消息被否定, 可以使用 head 拿到小于 deliveryTag 的消息, 全部进行重试
            if (multiple) {
               ConcurrentNavigableMap<Long, String> messageHeadMap = map.headMap(deliveryTag);
               // 这里可以将消息保存到数据库或者redis中
               messageHeadMap.forEach((id, s) -> {
                  System.err.println("id: " + id + " message: " + s);
               });
               messageHeadMap.clear();
            } else {
               String messageNode = map.remove(deliveryTag);
               // 这里可以将消息保存到数据库或者redis中
               System.err.println("message: " + messageNode);
            }
         }
      });
      
      channel.addReturnListener(returnMessage -> {
         System.err.println("消息被return回来了...");
         int replyCode = returnMessage.getReplyCode();
         String replyText = returnMessage.getReplyText();
         String exchange = returnMessage.getExchange();
         String routingKey = returnMessage.getRoutingKey();
//       AMQP.BasicProperties properties = returnMessage.getProperties();
         String message = new String(returnMessage.getBody(), Charset.defaultCharset());
         System.err.println("message: " + message);
         System.err.println("exchange: " + exchange);
         System.err.println("routingKey: " + routingKey);
         System.err.println("replyText: " + replyText);
         System.err.println("replyCode: " + replyCode);
      });
      
      String message = "message";
      map.put(channel.getNextPublishSeqNo(), message);
      channel.basicPublish(EXCHANGE, ROUTING_KEY, true, false, MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes(Charset.defaultCharset()));
   }

rabbitmq已经发送成功的消息怎么回滚、,springboot,Spring Cloud,rabbitmq,java-rabbitmq,rabbitmq,java

Consumer:

public static final String QUEUE = "queue";
public static final String EXCHANGE = "exchange";

public static void main(String[] args) throws Exception {
   ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();
   
   channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true, true, null);
   channel.queueDeclare(QUEUE, true, false, true, null);
   channel.queueBind(QUEUE, EXCHANGE, "");
   
   channel.basicConsume(QUEUE, false, (consumerTag, message) -> {
      System.err.println("消息消费成功, consumeTag: " + consumerTag + "  message: "
            + new String(message.getBody(), Charset.defaultCharset()));
      channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
   }, consumerTag -> System.err.println("消息接受失败, consumeTag: " + consumerTag));
   
}

问题思考

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。

但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。

而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

另一个return机制: exchange备机

来总结下前面的问题

  1. 消息回退增加消费者代码复杂度
  2. 回退回来的消息要怎么处理? 记录日志也不知道怎么处理? 难道要手动处理?

RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?

备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,**当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。**当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

rabbitmq已经发送成功的消息怎么回滚、,springboot,Spring Cloud,rabbitmq,java-rabbitmq,rabbitmq,java

备机架构设计

rabbitmq已经发送成功的消息怎么回滚、,springboot,Spring Cloud,rabbitmq,java-rabbitmq,rabbitmq,java

备机和return机制同时存在默认走哪个机制?

备份exchange模式优先级高于消息return模式

什么情况的消息会进入备机?

消息无法投递到queue的原因, 我想想都有哪些?

  1. queue不存在
  2. queue满了
  3. 根据 routing key 找不到 queue (和第一种很像)

消息进入备机后, 我们该做什么?

答:

  1. 保存消息
  2. 报警告, 推消息告知管理员
  3. 最后在解决问题后将消息转发给正常的exchange

有哪些方法保存消息?

答:

  1. 可以保存到数据库, 但有IO上限, 不是很好的选择.
  2. 将消息持久化到queue中, 但是不创建自动消费的消费者, 我们可以在问题被发现并解决后再手动启动处理备份消息
  3. 保存到redis中, 方案不错, 但是在整个分布式系统中引入了新的变量, 导致项目不太问题

最终我还是选择了redis, 因为业务需求, 我们需要对备份的消息进行发送和查询甚至是删除操作, 持久化到queue不好操作, 当然如果你持久化到queue那么你需要保证消息

另一个原因是redis是分布式系统不可或缺的一个组件, 早晚都会用上的

我们虽然选择了redis方案, 但不意味着第二个方案(持久化到queue)不好, 我本人更加推荐使用方案二, 但是第二种方案我们需要及时处理问题, 否则可能导致消息爆满, 当然redis方案也是

发送消息的目的地是哪里?

正常是 exchange

那么怎么发送消息?

首先我们需要对消息进行包装, 不仅仅有内容还需要有发送地址

大致结构是这样:

消息id, 消息内容, 消息创建时间, 消息优先级
  • 消息id方式消息重复消费
  • 消息创建时间检查消息超过一定域值, 还没被消费, 后续可以做严重警告
  • 消息优先级: 可以当做消息重要性, 数字越大优先级越高
public class ConfirmConsumer {
   public static final String BACKUP_EXCHANGE = "backup.exchange";
   public static final String CONFIRM_EXCHANGE = "confirm.exchange";
   public static final String CONFIRM_QUEUE = "confirm.queue";
   
   public static void main(String[] args) throws Exception {
      ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      
      HashMap<String, Object> arguments = new HashMap<>();
      // 将exchange无法发送的消息, 转发给备份exchange
      arguments.put("alternate-exchange", BACKUP_EXCHANGE);
      channel.exchangeDeclare(CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true, true, arguments);
      channel.queueDeclare(CONFIRM_QUEUE, true, false, true, null);
      channel.queueBind(CONFIRM_QUEUE, CONFIRM_EXCHANGE, "");
      
      DeliverCallback deliverCallback = (consumerTag, message) -> {
         System.err.println("消息接收时间: "
               + DatePattern.NORM_DATETIME_FORMAT.format(new Date())
               + " consumerTag: " + consumerTag + " message: " + message);
         channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
      };
      CancelCallback cancelCallback = System.err::println;
      channel.basicConsume(CONFIRM_QUEUE, false, deliverCallback, cancelCallback);
   }
}

注意上面这段代码:

HashMap<String, Object> arguments = new HashMap<>();
// 将exchange无法发送的消息, 转发给备份exchange
arguments.put("alternate-exchange", BACKUP_EXCHANGE);
channel.exchangeDeclare(CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true, true, arguments);

rabbitmq已经发送成功的消息怎么回滚、,springboot,Spring Cloud,rabbitmq,java-rabbitmq,rabbitmq,java

在正常的exchange上绑定备份exchange

public class BackupConsumer {
   public static final String BACKUP_EXCHANGE = "backup.exchange";
   public static final String BACKUP_QUEUE = "backup.queue";
   public static final String CONFIRM_EXCHANGE = "confirm.exchange";
   
   public static void main(String[] args) throws Exception {
      Scanner scanner = new Scanner(System.in);
      ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      
      channel.exchangeDeclare(BACKUP_EXCHANGE, BuiltinExchangeType.FANOUT, true, true, null);
      channel.queueDeclare(BACKUP_QUEUE, true, false, true, null);
      channel.queueBind(BACKUP_QUEUE, BACKUP_EXCHANGE, "");
      
      DeliverCallback deliverCallback = (consumerTag, message) -> {
         System.err.println("消息保存, 然后问题发现, 解决问题, 最后发送消息...");
         scanner.next();
         System.err.println("发送消息: " + ", messageId: " + message.getProperties().getMessageId()
               + ", 消息内容: " + new String(message.getBody(), Charset.defaultCharset()));
         channel.basicPublish(CONFIRM_EXCHANGE, "", message.getProperties(), message.getBody());
         channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
      };
      CancelCallback cancelCallback = System.err::println;
      channel.basicConsume(BACKUP_QUEUE, false, deliverCallback, cancelCallback);
   }
}
public class WarningConsumer {
   public static final String BACKUP_EXCHANGE = "backup.exchange";
   public static final String WARNING_QUEUE = "warning.queue";
   
   public static void main(String[] args) throws Exception {
      ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      
      channel.exchangeDeclare(BACKUP_EXCHANGE, BuiltinExchangeType.FANOUT, true, true, null);
      channel.queueDeclare(WARNING_QUEUE, true, false, true, null);
      channel.queueBind(WARNING_QUEUE, BACKUP_EXCHANGE, "");
      
      DeliverCallback deliverCallback = (consumerTag, message) -> {
         AMQP.BasicProperties properties = message.getProperties();
         Integer priority = properties.getPriority();
         String messageId = properties.getMessageId();
         Date timestamp = properties.getTimestamp();
         
         System.err.println("日志警告, 消息等级: " + priority + ", 消息id: " + messageId
               + ", 消息创建时间: " + DatePattern.NORM_DATETIME_FORMAT.format(timestamp) + ", 消息内容: "
               + new String(message.getBody(), Charset.defaultCharset()));
         channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
      };
      CancelCallback cancelCallback = System.err::println;
      channel.basicConsume(WARNING_QUEUE, false, deliverCallback, cancelCallback);
   }
}
public class Producer {
   
   public static final String BACKUP_EXCHANGE = "backup.exchange";
   public static final String CONFIRM_EXCHANGE = "confirm.exchange";
   
   public static void main(String[] args) throws Exception {
      ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      
      HashMap<String, Object> arguments = new HashMap<>();
      // 将exchange无法发送的消息, 转发给备份exchange
      arguments.put("alternate-exchange", BACKUP_EXCHANGE);
      channel.exchangeDeclare(CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true, true, arguments);
      
      channel.confirmSelect();
      channel.addConfirmListener((deliveryTag, multiple) -> {
         String dateTime = DatePattern.NORM_DATETIME_FORMAT.format(new Date());
         System.err.println(deliveryTag + ": 消息应答" + " '发送'时间为: " + dateTime);
      }, (deliveryTag, multiple) -> System.err.println(deliveryTag + ": 消息未得到应答"));
      
      // 消费者提供消息超时功能
      AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
      builder.contentType("text/plain");
      builder.deliveryMode(2);
      builder.priority(0);
      for (int i = 0; i < 10; i++) {
         builder.timestamp(new Date());
         builder.messageId(UUID.fastUUID().toString(true));
         String message = "content: " + UUID.fastUUID().toString(false) + i;
         channel.basicPublish(CONFIRM_EXCHANGE, "",
               builder.build(), message.getBytes(Charset.defaultCharset()));
      }
   }
}

记住, 这里我们的备份消费者并没有做真实的操作, 而是使用

System.err.println("消息保存, 然后问题发现, 解决问题, 最后发送消息...");
scanner.next();
channel.basicPublish(CONFIRM_EXCHANGE, "", message.getProperties(), message.getBody());

模拟了管理员找问题, 解决问题最后将消息转发给正常的exchange的过程文章来源地址https://www.toymoban.com/news/detail-790756.html

到了这里,关于rabbitmq的消息确认和消息回退的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认

    SpringBoot与RabbitMQ整合后,对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致; 生产者给交换机发送消息后、若是不管了,则会出现消息丢失; 解决方案1: 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失,因此重新发送消息;

    2024年02月14日
    浏览(36)
  • RabbitMQ发送方确认机制

    RabbitMQ消息首先发送到交换机,然后通过路由键【routingKey】和【bindingKey】比较从而将消息发送到对应的队列【queue】上。在这个过程有两个地方消息可能会丢失: 消息发送到交换机的过程。 消息从交换机发送到队列的过程。 而RabbitMQ提供了类似于回调函数的机制来告诉发送

    2024年02月09日
    浏览(25)
  • 【RabbitMQ笔记08】消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)

    这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。 目录 一、防止消息丢失 1.1、消息确认机制(生产者) (1)生产者丢失消息 (2)生产者消息确认机制 1.2、消息确认机制(消费者) (1)消费者丢失消息

    2024年02月02日
    浏览(44)
  • 消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略

    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列

    2024年02月21日
    浏览(31)
  • RabbitMQ消息确认

    目录 1. 消息确认作用 2 开发示例 2.1 生产者确认 2.2 消费者确认 保证消息的可靠性主要依靠三种机制:一个是消息的持久化,一个是事务机制,一个就是消息的确认机制。 1)消息持久化 消息持久化是将消息写入本地文件,如果rabbitmq故障退出,在重启时会从本地文件系统读

    2024年02月13日
    浏览(31)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(32)
  • rabbitmq消息确认机制

    (1) publish === broker 只要broker收到消息,就会执行 confirmCallback (2) exchange === queue 如果exchange有消息没有成功发送至queue,就会执行RuturnCallback,例:routing key错误导致发送消息到队列失败 (3)RabbitmqConfig (1) queue === consumer 默认是ack,consumer只要拿到消息就会自动确认,服务端

    2024年02月13日
    浏览(31)
  • Rabbitmq的消息确认

    配置文件 消息从生产者到交换机 无论消息是否到交换机ConfirmCallback都会触发。 消息从交换机到队列 只有消息没到达队列才会触发ReturnsCallback 消息从队列到消费者 (ACK) 消息默认是自动确认的(手动确认需配置文件开启),无论消息是否被成功消费都会被确认,确认后消息

    2024年02月14日
    浏览(22)
  • RabbitMQ 消息确认机制

    为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之

    2024年02月15日
    浏览(31)
  • 【RabbitMQ】RabbitMQ如何确认消息被消费、以及保证消息的幂等

    目录 一、如何保证消息被消费 二、如何保证消息幂等性 RabbitMQ提供了消息补偿机制来保证消息被消费,当一条消费被发送后,到达队列后发给消费者。消费者消费成功后会给MQ服务器的队列发送一个确认消息,此时会有一个回调检测服务监听该接收确认消息的队列,然将消费

    2024年02月16日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包