RabbitMq-发布确认高级(避坑指南版)

这篇具有很好参考价值的文章主要介绍了RabbitMq-发布确认高级(避坑指南版)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢?

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 rabbitmq 重启期间生产者投递失败,导致消息丢失,需要手动处理和恢复。因此为了确保rabbitmq 的消息可靠投递,特别是在这样比较极端的情况,rabbitmq 集群不可用的时候,对无法投递的消息进行处理。

废话不说直接开始撸代码!!!在代码中解决实际问题~

一、代码架构分析:

        接触到这里,对于一条完整的“rabbitmq消息”发布链的构成大家已经不陌生了。主要是由:“消息生产者”、“交换机”、“队列”、“消费者”四个方面构成,如图所示:

RabbitMq-发布确认高级(避坑指南版),rabbitmq,分布式

二、构造“配置类”代码: 

声明交换机“confirm_exchange”、声明队列“confirm_queue”、通过routing-key对交换机和队列进行绑定。

package com.example.rabbitmq_demo.fabuquerengaoji;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClaseName: ConfirmConfig$
 * @Description:配置类 发布确认(高级)
 * @Author: wuhs
 * @Date: 2023/8/16$ 14:32$
 * 快捷键ctrl+shift+u  字母大小写转化
 */
@Configuration
public class ConfirmConfig {
    // 交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    // 队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    // ROUTING-KEY
    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,
                                        @Qualifier("confirmQueue") Queue queue) {
        //一般使用在项目中使用@Qualifier来限定注入的Bean。
        return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);

    }
}

三、构建消费者代码:

通过@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)来监听队列,以此“充当”消费者。这一块也没啥好说的,直接上代码!

package com.example.rabbitmq_demo.fabuquerengaoji;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @ClaseName: Consumer$
 * @Description:消费者
 * @Author: wuhs
 * @Date: 2023/8/16$ 15:18$】
 */
@Slf4j
@Component
public class Consumer {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void reciverConfirmMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("接收到的队列confirm.queue消息:{}", msg);
    }
}

四、创建“回调”方法

        在最开始我们说到“确保rabbitmq 的消息可靠投递”的概念,那么具体如何确保呢?如果我们在消费者每次消费成功、未消费成功交换机都能进行“回调”确认,是不是就能知道哪些消息消费成功、哪些没有消费成功呢?

        在RabbitTemplate中有一个方法接口(ConfirmCallback),我们只需要实现这个接口并实现“confirm”方法,并将它注入进RabbitTemplate工具中即可创建“回调”。具体代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @ClaseName: MyCallBack$
 * @Description:
 * @Author: wuhs
 * @Date: 2023/8/16$ 16:17$
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // 注入
        rabbitTemplate.setConfirmCallback(this);

    }
   //交换机确认回调方法  
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String reason) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {//发消息 交换机接收到了消息 回调
            log.info("交换机已经收到了ID为:{}的消息", id);
        } else {//发消息 交换机没有接收到了消息 回调
            log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);

        }
    }
}

confirm方法参数介绍:

* 1. correlationData 保存回调消息的ID及相关信息
* 2. 交换机是否收到了消息 ack=true(收到)、ack=false(未收到)
* 3. reason 失败的原因

 六、配置类声明:(application.yml)

        在这里需要注意!!这也是最容易踩得坑,不知道有没有小伙伴遇没遇到,“publisher-confirm-type: correlated”也声明了,但是项目创建启动发布消息之后“没有成功回调”的情况,查看了很多的文章,很多博主只配置了publisher-confirm-type、但是并没有开启“confirm 确认机制”,所以会存在“误导”,导致一直找不到失败的原因~具体正确配置,看代码:

server:
  port: 8899

spring:
  rabbitmq:
    host: 124.221.94.214
    port: 5672
    username: xgsm
    password: xgsm123
    # 发送者开启 confirm 确认机制
    publisher-confirms: true
    publisher-confirm-type: correlated

 publisher-confirm-type参数介绍:

publisher-confirm-type这个参数一共有三种配置方法:

# NONE:禁用发布确认,是默认值。

# CORRELATED:发布消息后,交换机会触发回调方法。

# SIMPLE:有两种效果:

1:和CORRELATED一样会触发回调方法

2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,

# 要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

 七、创建Controller层(消息生产者)

        这里演示三种情况。第一种为正常情况下,发送成功后的回调;第二种消息为发送失败、当交换机不存在则发送失败(模拟发送失败),所以将交换机名称修改即可

package com.example.rabbitmq_demo.fabuquerengaoji;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClaseName: ProducerController$
 * @Description:消息生产者
 * @Author: wuhs
 * @Date: 2023/8/16$ 14:58$
 */
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // @PathVariable主要作用:映射URL绑定的占位符
    @RequestMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        //正常发送
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);

        //发送失败-交换机不存在的情况
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"2", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData2);

        log.info("发送的消息为:{}", message);
    }
}

测试结果: 

RabbitMq-发布确认高级(避坑指南版),rabbitmq,分布式 如果是routing-key错误,这种情况会触发回调嘛?让我们验证一下;修改routing-key为“错误值”

  CorrelationData correlationData3 = new CorrelationData("3");
  rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"key3", message,correlationData3);

测试结果:

RabbitMq-发布确认高级(避坑指南版),rabbitmq,分布式

         通过结果可以看出,消息发送成功了,而且也触发了“成功的回调”。但是我们知道的是,由于路由失败,这里消费者并没有对消息进行消费,这是为什么呢?那是因为,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。解决方式为:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。具体操作如下

1、application.yml文件中添加消息回退配置

# 发送者开启 return 确认机制
    publisher-returns: true

 2、实现RabbitTemplate中的方法接口ReturnCallback,并实现“returnedMessage”方法,最后将类注入到RabbitTemplate的RabbitTemplate中,详细代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @ClaseName: MyCallBack$
 * @Description:
 * @Author: wuhs
 * @Date: 2023/8/16$ 16:17$
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // 注入
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);

    }

    /**
     * 交换机确认回调方法
     * 1、发消息 交换机接收到了消息 回调
     * 1.1 correlationData 保存回调消息的ID及相关信息
     * 1.2 交换机收到消息 ack=true
     * 2、发消息 交换机接收失败了 回调
     * 2.1 correlationData 保存回调消息的ID及相关信息
     * 2.2 交换机接收到消息 ack=false
     * 2.3 reason 失败的原因
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String reason) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到了ID为:{}的消息", id);
        } else {
            log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);

        }
    }

    //可以在当消息传递的过长中不可达目的地时将消息返回给生产者
    // 只有不可待目的地的时候 才进行回退
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息{},被交换机{}退回,退回原因:{},路由key:{}", message, exchange, replyText, routingKey);
    }
}

测试结果:

2023-08-17 10:36:32.476  INFO 21108 --- [221.94.214:5672] c.e.r.fabuquerengaoji.MyCallBack : 消息(Body:'消息确认发布测试' MessageProperties [headers={spring_returned_message_correlation=3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),被交换机confirm_exchange退回,退回原因:NO_ROUTE,路由key:key1key3

问题解决!~ 文章来源地址https://www.toymoban.com/news/detail-678007.html

到了这里,关于RabbitMq-发布确认高级(避坑指南版)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略

    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列

    2024年02月21日
    浏览(37)
  • 【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认

                                                                       💧 【 R a b b i t M Q 教程】第三章—— R a b b i t M Q − 发布确认 color{#FF1493}{【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认} 【 R abbi tMQ 教程】第三章 —— R abbi tMQ − 发布确认

    2024年02月08日
    浏览(87)
  • RabbitMQ 发布确认机制

    发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段   生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。   confirm模式下的信道所发送的消息都将被应带ack或者nack一次

    2024年02月13日
    浏览(38)
  • rabbitmq的发布确认

    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式, 所有在该信道上面发布的 消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队

    2024年02月12日
    浏览(50)
  • RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现

    概念:虽然我们可以设置队列和队列中的消息持久化,但任然存在消息在持久化的过程中,即在写入磁盘的过程中,消息未完全写入,然后服务器宕机导致消息丢失的情况,发布确认就是为了解决这种情况的概念,在消息完全写入磁盘后才确认消息完全持久化了 1、发布确认

    2024年02月13日
    浏览(37)
  • 8. springboot + rabbitmq 消息发布确认机制

    在 RabbitMQ之生产者发布确认原理章节已经介绍了rabbitmq生产者是如何对消息进行发布确认保证消息不丢失的。本章节继续看下springboot整合rabbitmq后是如何保证消息不丢失的。 消息正常是通过生产者生产消息传递到交换机,然后经过交换机路由到消息队列中,最后消费者消费,

    2023年04月25日
    浏览(60)
  • RabbitMQ学习——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月13日
    浏览(73)
  • RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月10日
    浏览(54)
  • (七)「消息队列」之 RabbitMQ 发布者确认(使用 .NET 客户端)

    发布者确认 是一个 RabbitMQ 扩展,用于实现可靠的发布。当在通道上启用发布者确认时,客户端发布的消息将由代理 异步确认 ,这意味着它们已在服务器端得到处理。 先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主

    2024年02月16日
    浏览(40)
  • springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack   rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。   消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队

    2024年02月09日
    浏览(70)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包