RabbitMQ:死信队列

这篇具有很好参考价值的文章主要介绍了RabbitMQ:死信队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

1.死信队列

1.1死信队列基本介绍

  • 队列中不能被消费的消息称为死信队列
  • 有时候因为特殊原因,可能导致队列中的某些信息无法被消费,而队列中这些不能被消费的消息在后期没有进行处理,就会变成死信队列,死信队列中的消息称为死信
  • 应用场景:未来保证订单业务的消息数据不丢失,我们需要使用到RabbitMQ的死信队列机制,当消息消费发生异常的时候,我们就把消息投入到死信队列中,比如说用户买东西,下单成功后去支付,但是没有在指定时间支付的时候就会自动失效。
  • 死信队列,英文缩写:DLX 。DeadLetter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
  • 当消息在一个队列中变成死信后,它能被重新发布到另一个Exchange中,这个Exchange就是DLX
  • rabbitmq死信队列,消息中间件,微服务,java-rabbitmq,rabbitmq,java,死信队列,消息中间件

1.2消息成为死信的三种情况

  1. 队列消息数量到达限制;比如队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
  3. 原队列存在消息过期设置,消息到达超时时间未被消费。

1.3死信队列结构图

通常情况下,消费者是能正常消费消息的,但是出现上面说的三种情况之一,就无法正常消费信息,消息就会进入死信交换机,死信交换机会和死信队列进行绑定,最后由其他消费者来消费死信消息。

rabbitmq死信队列,消息中间件,微服务,java-rabbitmq,rabbitmq,java,死信队列,消息中间件

1.4死信的处理方式

死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,
① 丢弃,如果不是很重要,可以选择丢弃
② 记录死信入库,然后做后续的业务分析或处理
③ 通过死信队列,由负责监听死信的应用程序进行处理
综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理。
队列绑定死信交换机:
给队列设置参数:x-dead-letter-exchange 和x-dead-letter-routing-key
rabbitmq死信队列,消息中间件,微服务,java-rabbitmq,rabbitmq,java,死信队列,消息中间件

2.TTL消息过期时间

2.1基本介绍

当消息到达存活时间后,还没有被消费,就会被自动清除。RabbitMQ可以对消息或者队列设置过期时间,队列中的消息过期是成为死信队列的三种原因之一。
rabbitmq死信队列,消息中间件,微服务,java-rabbitmq,rabbitmq,java,死信队列,消息中间件

2.2生产者

public class Producer {
    //正常交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //正常队列
    public static final String NORMAL_QUEUE = "normal_queue";

    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            //声明交换机
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //声明队列
            //channel.queueDeclare(NORMAL_QUEUE, true, false, false, null);
            //把正常交换机和正常队列进行绑定
            //channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "tom");
            //设置过期时间
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

            //发送消息
            for (int i = 0; i < 10; i++) {
                String message = "消息:" + i;
                //发送消息
                channel.basicPublish(NORMAL_EXCHANGE, "tom", null, message.getBytes());

            }


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }


}

2.3消费者1

public class Consumer1 {
    //定义交换机(正常交换机,死信交换机)
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //定义队列(正常队列,死信队列)
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) {

        try {
            //创建信道对象
            Channel channel = ConnectUtil.getChannel();
            //声明交换机
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

            //设置正常队列和死信队列进行绑定,key固定不可以改变
           Map<String, Object> map = new HashMap<>();
            map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            map.put("x-dead-letter-routing-key", "jack");
            //声明正常队列
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
            //正常交换机绑定正常队列
            channel.queueBind(NORMAL_QUEUE,NORMAL_QUEUE,"tom");
            //声明死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
            //死信交换机绑定死信队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"jack");
            //消费消息

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));
                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)
            channel.basicConsume(NORMAL_QUEUE, true, consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

2.4消费者2

public class Consumer2 {
    //定义交换机(死信交换机)

    public static final String DEAD_EXCHANGE = "dead_exchange";
    //定义队列(死信队列)
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) {

        try {
            //创建信道对象
            Channel channel = ConnectUtil.getChannel();
            //声明死信队列
            channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
            //死信交换机绑定死信队列
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "jack");

            //消费消息

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));
                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)
            channel.basicConsume(DEAD_QUEUE, true, consumer);


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

2.5设置TTL的两种方式

2.5.1队列设置TTL

在创建队列的时候设置队列的x-message-ttl属性,例如:

  Map<String, Object> map = new HashMap<>();
//设置队列有效期为10秒
map.put("x-message-ttl",10000);
channel.queueDeclare(queueName,durable,exclusive,autoDelete,map);

2.5.2消息设置TTL

对每条消息设置TTL

  AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
 channel.basicPublish(exchangeName,routingKey,mandatory,properties,"msg body".getBytes());

2.5.3区别

如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃
如果是消息设置了TTL属性,那么即使消息过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,那么已经过期的消息也许还能存活较长时间。

如果我们没有设置TTL,就表示消息永远不会过期,如果TTL设置为0,则表示除非此时可以直接投递到消费者,否则该消息会被丢弃。文章来源地址https://www.toymoban.com/news/detail-785699.html

到了这里,关于RabbitMQ:死信队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入详解高性能消息队列中间件 RabbitMQ

      目录 1、引言 2、什么是 RabbitMQ ? 3、RabbitMQ 优势 4、RabbitMQ 整体架构剖析 4.1、发送消息流程 4.2、消费消息流程 5、RabbitMQ 应用 5.1、广播 5.2、RPC VC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...) https://blog.csdn.net/chenlycly/article/details/124272585 C++软件异常排查从入

    2024年02月05日
    浏览(35)
  • 消息队列中间件 - Docker安装RabbitMQ、AMQP协议、和主要角色

    不管是微服务还是分布式的系统架构中,消息队列中间件都是不可缺少的一个重要环节,主流的消息队列中间件有RabbitMQ、RocketMQ等等,从这篇开始详细介绍以RabbitMQ为代表的消息队列中间件。 AMQP协议 AMQP协议是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与

    2024年02月03日
    浏览(38)
  • 基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

    场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中

    2024年02月14日
    浏览(41)
  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(37)
  • 消息中间件RabbitMQ

    1.1.1. 什么是MQ MQ(message queue) ,从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,

    2024年01月17日
    浏览(57)
  • RabbitMQ消息中间件

    RabbitMQ消息中间件 RabbitMQ简介 windows下安装RabbitMQ RabbitMQ基本概念 RabbitMQ简单模式 RabbitMQ工作队列模式 RabbitMQ发布订阅模式 RabbitMQ路由模式 RabbitMQ主题模式 RabbitMQ RPC模式 RabbitMQ发布确认模式

    2024年02月10日
    浏览(40)
  • 消息中间件之RabbitMQ

    1.基于AMQP协议Erlang语言开发的一款消息中间件,客户端语言支持比较多, 比如Python,Java,Ruby,PHP,JS,Swift.运维简单,灵活路由,但是性能不高, 可以满足一般场景下的业务需要,三高场景下吞吐量不高,消息持久化没有采取 零拷贝技术,消息堆积时,性能会下降 2.消息吞吐量在

    2024年01月19日
    浏览(74)
  • 消息中间件RabbitMQ详解

    消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。 消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件机制的系统中

    2024年02月16日
    浏览(36)
  • RabbitMQ:可靠消息传递的强大消息中间件

     消息中间件在现代分布式系统中起着关键作用,它们提供了一种可靠且高效的方法来进行异步通信和解耦。在这篇博客中,我们将重点介绍 RabbitMQ,一个广泛使用的开源消息中间件。我们将深入探讨 RabbitMQ 的特性、工作原理以及如何在应用程序中使用它来实现可靠的消息传

    2024年02月12日
    浏览(60)
  • 高性能消息中间件 RabbitMQ

    消息队列 MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的 异步通信 。 同步通信相当于两个人当面对话,你一言我一语。必须及时回复: 异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系。 消息

    2024年02月11日
    浏览(88)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包