RabbitMQ 死信队列实现

这篇具有很好参考价值的文章主要介绍了RabbitMQ 死信队列实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

// consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量
channel.basicAck(deliveryTag, multiple);

// 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列,该方法reject后,该消费者还是会消费到该条被reject的消息
channel.basicReject(deliveryTag, requeue);

// 不确认 deliveryTag 对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。
channel.basicNack(deliveryTag, multiple, requeue);

// 是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
channel.basicRecover(false);

搭建项目

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
@Configuration
public class RabbitMQConfig {


    // 正常业务
    public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";

    public static final String NORMAL_QUEUE_A = "normal-queue-a";

    public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";


    // 死信队列
    public static final String DEAD_EXCHANGE_A = "dead-exchange-a";

    public static final String DEAD_QUEUE_A = "dead-queue-a";

    public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";


    // 声明交换机
    @Bean("businessExchange")
    public TopicExchange normalExchangeA() {
        return new TopicExchange(NORMAL_EXCHANGE_A);
    }

    @Bean("deadExchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE_A);
    }


    // 声明队列

    @Bean("businessQueueA")
    public Queue businessQueueA() {
        HashMap<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE_A);
        args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A);
        return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(args).build();
    }


    @Bean("deadQueueA")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_A).build();
    }


    // 声明绑定关系
    @Bean
    public Binding bindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_A);
    }

    @Bean
    public Binding bindingDead(@Qualifier("deadQueueA") Queue queue, @Qualifier("deadExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING_KEY_A);
    }
}
@Component
public class SmsListener {

    @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
    public void smsListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());

        System.out.println("收到消息:" + body);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (body.contains("dead")) {
            channel.basicNack(deliveryTag, false, false);
        }
        channel.basicAck(deliveryTag, false);
    }
}
@Component
public class DeadListener {

    @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE_A)
    public void deadListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());
        System.out.println("dead listener: " + body);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
@RestController
public class HelloController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/hello")
    public Boolean hello(String msg) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE_A, RabbitMQConfig.NORMAL_ROUTING_KEY_A, msg);
        return true;
    }

}

使用注解

@Configuration
public class RabbitMQConfig {


    // 正常业务
    public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";

    public static final String NORMAL_QUEUE_A = "normal-queue-a";

    public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";


    // 死信队列
    public static final String DEAD_EXCHANGE_A = "dead-exchange-a";

    public static final String DEAD_QUEUE_A = "dead-queue-a";

    public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";


    // 声明交换机
    @Bean("businessExchange")
    public TopicExchange normalExchangeA() {
        return new TopicExchange(NORMAL_EXCHANGE_A);
    }


    // 声明队列
    @Bean()
    public Queue businessQueueA() {
        HashMap<String, Object> args = new HashMap<>(2);
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE_A);
        args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A);
        return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(args).build();
    }


    // 声明绑定关系
    @Bean
    public Binding bindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_A);
    }


}

死信队列使用注解实现

@Component
public class DeadListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A),
            exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, type = ExchangeTypes.DIRECT),
            key = RabbitMQConfig.DEAD_ROUTING_KEY_A
    ))
    public void deadListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());
        System.out.println("死信队列消费消息: " + body);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
@Component
public class SmsListener {

    @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
//            exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
//            key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
//    ))
    public void smsListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());

        System.out.println("正常消费消息:" + body);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (body.contains("dead")) {
            channel.basicNack(deliveryTag, false, false);
//            return;
        }
        channel.basicAck(deliveryTag, false);
    }
}

报错:

Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    
// 由于程序编写不严谨,在 basicNack 执行后没有退出方法,导致最后还执行了 basicAck,出现了上述错误
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
            exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
            key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
    ))
    public void smsListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());

        System.out.println("正常消费消息:" + body);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (body.contains("dead")) {
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
    }
// 问题二: 控制台报错,但是也能正常消费mq消息,这里与第一种唯一的区别是在于 @RabbitListener, 我的推测是 自定义 bean 和注解生成的 bean 重复导致,看能不能使用注解绑定死信队列
2023-04-23 22:03:25.630 ERROR 8580 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'normal-queue-a' in vhost '/': received none but current is the value 'dead-exchange-a' of type 'longstr', class-id=50, method-id=10)
Broker not available; cannot force queue declarations during start: java.io.IOException
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {
    String TRUE = "true";
    String FALSE = "false";

    @AliasFor("name")
    String value() default "";

    @AliasFor("value")
    String name() default "";

    String type() default "direct";

    String durable() default "true";

    String autoDelete() default "false";

    String internal() default "false";

    String ignoreDeclarationExceptions() default "false";

    String delayed() default "false";

    Argument[] arguments() default {};

    String declare() default "true";

    String[] admins() default {};
}
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
            exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC, arguments = {
                    @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
                    @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
            }),
            key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
    ))
    public void smsListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());

        System.out.println("正常消费消息:" + body);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (body.contains("dead")) {
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
    }

可以使用注解的方式来绑定 死信队列,但是还是会报上面的错误,继续修改 参数试试

java - How to set x-dead-letter-exchange in Rabbit? - Stack Overflow

但是使用注解绑定的话好像又不生效了,问题原因,tmd将死信参数绑到交换机上了,c

修改代码

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
                    @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
                    @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
            }),
            exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
            key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
    ))
    public void smsListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());

        System.out.println("正常消费消息:" + body);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (body.contains("dead")) {
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
    }

 至于问题二是由于队列和交换机默认持久化,这样就导第二次启动项目时重复

Springboot纯注解版的RabbitMq 死信队列_注解声明私信队列_lopo呀的博客-CSDN博客

全注解版

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
// 正常业务
public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";

public static final String NORMAL_QUEUE_A = "normal-queue-a";

public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";


// 死信队列
public static final String DEAD_EXCHANGE_A = "dead-exchange-a";

public static final String DEAD_QUEUE_A = "dead-queue-a";

public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
@Component
public class SmsListener {
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
                    @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
                    @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
            }),
            exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
            key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
    ))
    public void smsListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());

        System.out.println("正常消费消息:" + body);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (body.contains("dead")) {
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
    }
}
@Component
public class DeadListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A, durable = "false"),
            exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, durable = "false"),
            key = RabbitMQConfig.DEAD_ROUTING_KEY_A
    ))
    public void deadListener(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());
        System.out.println("死信队列消费消息: " + body);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
@GetMapping("/hello")
public Boolean hello(String msg) {
    System.out.println("发送消息:" + msg);
    rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE_A, RabbitMQConfig.NORMAL_ROUTING_KEY_A, msg);
    return true;
}
// conslog
发送消息:dead
正常消费消息:dead
死信队列消费消息: dead

明天在研究下回调啥的

springboot整合rabbitMQ confirm 确认模式 return 退回模式_weixin_44318244的博客-CSDN博客

回调

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式

  • return 退回模式

rabbitmq 整个消息投递的路径为:producer—>rabbitmq broker—>exchange—>queue—>consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。

  • 消息从 exchange–>queue 投递失败则会返回一个 returnCallback

我们将利用这两个 callback 控制消息的可靠性投递

消息的可靠投递小结 ➢ 设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。 ➢ 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

➢ 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。 ➢ 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到 queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

确认模式

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
    publisher-confirm-type: correlated # 发布确认属性配置
    publisher-returns: true # 开启 退回模式
	public enum ConfirmType {

		/**
		 * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
		 * within scoped operations.
		 SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或		  			 waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会			关闭channel,则接下来无法发送消息到broker;

		 */
		SIMPLE,

		/**
		 * Use with {@code CorrelationData} to correlate confirmations with sent  发布消息成功到交换器后会触发回调方法
		 * messsages.
		 */
		CORRELATED,

		/**
		 * Publisher confirms are disabled (default).
		 */
		NONE

	}
@Configuration
public class PublisherConfirmHandler implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
            System.out.println("发送消息到交换机成功!MessageId: " + correlationData.getId());
        }else {
            System.out.println("发送消息到交换机失败!MessageId: " + correlationData.getId() + ", 退回原因:" + cause);
        }
    }
}
@Resource
private PublisherConfirmHandler publisherConfirmHandler;

rabbitTemplate.setConfirmCallback(publisherConfirmHandler);

RabbitMQ 死信队列实现

回退模式  

@Configuration
public class ReturnsCallbackHandler implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("return 执行了!" + returned);
    }
}

 文章来源地址https://www.toymoban.com/news/detail-434000.html

// 
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallbackHandler);

到了这里,关于RabbitMQ 死信队列实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【初始RabbitMQ】死信队列的实现

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的 原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有 后续的处理,就变成了死

    2024年02月21日
    浏览(36)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(60)
  • RabbitMQ的消费者处理消息失败后可以重试,重试4次仍然失败发送到死信队列。

    生产者发送消息时采用雪花算法给消息设置唯一的消息id,消费者接收消息处理失败时,根据消息的唯一id统计失败次数,若没有达到失败次数限制,则让消息重回队列(在开启手动签收的前提),此时队列会再次给消费者发送消息;若达到失败次数限制,则让消息不重回队列,

    2024年02月07日
    浏览(58)
  • SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

    这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。 在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭

    2024年02月12日
    浏览(43)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • 【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

    在电商平台下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 该如何实现? 定期轮询(数据库等) 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定

    2024年01月17日
    浏览(59)
  • RabbitMQ - 死信队列,延时队列

    死信队列: DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

    2024年02月09日
    浏览(49)
  • RabbitMQ-死信交换机和死信队列

    DLX: Dead-Letter-Exchange 死信交换器,死信邮箱 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 如下图所示: 其实死信队列就是一个普通的交换机,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理

    2024年02月08日
    浏览(47)
  • RabbitMQ延迟队列,死信队列配置

    延迟和死信队列的配置 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列 异常队列配置类

    2024年02月14日
    浏览(53)
  • 【RabbitMQ笔记10】消息队列RabbitMQ之死信队列的介绍

    这篇文章,主要介绍消息队列RabbitMQ之死信队列。 目录 一、RabbitMQ死信队列 1.1、什么是死信队列 1.2、设置过期时间TTL 1.3、配置死信交换机和死信队列(代码配置) (1)设置队列过期时间 (2)设置单条消息过期时间 (3)队列设置死信交换机 (4)配置的基本思路 1.4、配置

    2024年02月16日
    浏览(82)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包