【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

这篇具有很好参考价值的文章主要介绍了【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。

本文将介绍一些与消息队列相关的关键概念和技术,包括死信交换机(Dead Letter Exchange)、消息的 TTL(Time To Live,生存时间)、以及使用 DelayExchange 插件实现消息的延迟处理。通过深入理解这些概念和技术,将能帮助我们更好地设计和构建具有高可用性和可靠性的消息队列系统。

首先,我将介绍死信交换机以及它的作用,然后讨论如何创建死信交换机和死信队列。随后,将深入研究消息的TTL,了解它的作用和如何配置。最后,将探讨如何使用 DelayExchange 插件来实现消息的延迟处理,以满足各种应用需求。

一、死信交换机

1.1 什么是死信和死信交换机

在了解什么是死信交换机之前,让我们首先来了解一下什么是死信。 在消息队列系统中,死信(Dead Letter)是指未能被成功消费的消息。这些消息通常由于多种原因而变为死信,一些主要的原因如下:

  • 消费失败: 当消息被消费者(consumer)拒绝(reject)或未能被确认(acknowledge),并且针对与处理失败的消息没有设置重新入队(requeue)参数时,它们可能成为死信。这可能是因为消息格式错误、业务处理失败、或者其他原因导致消费者无法处理消息。

  • 消息超时: 消息在队列中等待消费,但在一定时间内未被消费者处理。这个时间限制通常由消息的 TTL(Time To Live,生存时间)来定义。当消息超过其 TTL 后,它就变为死信。

  • 队列堆积满: 当消息队列积累了大量消息,无法容纳更多消息时,最早的消息可能成为死信,因为它们无法被及时处理。

因此为了处理这些死信消息,消息队列系统引入了 死信交换机(Dead Letter Exchange)。死信交换机是一个特殊的交换机,它接收死信消息,并根据规则将这些消息路由到死信队列。通过使用死信交换机,系统可以将死信消息从正常队列中分离出来,以便进一步处理或分析。

死信交换机通常与队列绑定,当队列中的消息变为死信时,它们会被发送到与之相关联的死信交换机,然后再路由到死信队列。这种机制使得系统能够更好地处理消息的异常情况,确保消息不会被永久丢失。

给队列绑定死信交换机的方法:

  • 给队列设置dead-letter-exchange属性,指定一个交换机;
  • 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的 RoutingKey

如下图所示:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息

在上图中,simple.queue 就与死信交换机 dl.direct 绑定,最后路由到死信队列dl.queue,后续就可以编写其他逻辑来处理死信队列中的消息。

死信和死信交换机是构建可靠消息处理系统的重要组成部分,它们能够帮助我们跟踪和处理未能成功消费的消息,确保数据不会遗失,同时提供更好的可用性和可维护性。

1.2 死信交换机和死信队列的创建方式

  1. 使用 @Bean 的方式创建:
// 声明普通的 simple.queue 队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue(){
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .build();
}

// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}

// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}

// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
  1. 使用 @RabbitListener 注解的方式创建:
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "dl.queue", durable = "true"),
        exchange = @Exchange(name = "dl.direct"),
        key = "dl"
))
public void listenDLQueue(String msg) {
    log.info("消费者接收到 dl.queue 的延迟消息:" + msg);
}

在这种情况下,注意需要在创建 simple.queue 是时候,绑定死信交换机。

二、消息的 TTL

2.1 什么是消息的 TTL

消息的TTL,全称为"Time To Live",是消息队列系统中的一个重要概念。它定义了消息在队列中存活的时间,也就是消息在被发送到队列后,允许存留在队列中的时间长度。一旦消息的TTL超过设定的时间,消息将被认为已过期,消息队列系统将会将其标记为死信(Dead Letter)并将其路由到相关的死信队列。

在消息队列中,消息的超时分为两种情况:

  1. 消息所在的队列设置了储存消息的超时时间;

  2. 消息本身设置了超时时间;

但是不管哪种情况,一定消息超时了,都会成为死信,如下图所示:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息对上图的简单解释:

  • 上图中,设置了ttl.queue 的超时时间为 10000 毫秒,意味着一个消息在该队列中储存的时间不会超过这么长的时间;
  • 另外,也可以在发送消息的时候给这个消息设置在队列中的超时时间,例如 5000 毫秒。
  • 无论是哪种情况,一旦消息超时了,都会发送到死信交换机,然后再路由死信队列,最后由处理死信的逻辑处理这些消息。

2.2 基于死信交换机和 TTL 实现消息的延迟

根据上面的死信交换机和 TTL 的特点,我们可以实现延迟处理消息的功能,TTL 和 死信的交换机及其队列的结构图示如下:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息
下面就使用 Spring AMQP 来声明和实现这些交换机和队列:

  1. 首先通过 @RabbitListener 注解声明一组死信交换机和死信队列,并指定处理死信的逻辑:

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue", durable = "true"),
            exchange = @Exchange(name = "dl.direct"),
            key = "dl"
    ))
    public void listenDLQueue(String msg) {
        log.info("消费者接收到 dl.queue 的延迟消息:" + msg);
    }
    
  2. 然后通过 @Bean 的方式声明一组 TTL 的交换机和队列

    /**
     * 声明 TTL 交换机
     */
    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange("ttl.direct", true, false);
    }
    
    /**
     * 声明 TTL 队列
     * 1. 指定消息的 TTL
     * 2. 指定死信交换机
     * 3. 指定死信交换机的 RoutingKey
     */
    @Bean
    public Queue ttlQueue() {
        return QueueBuilder
                .durable("ttl.queue") // 指定队列的名称
                .ttl(10_000) // 指定 TTL 为 10 秒
                .deadLetterExchange("dl.direct") // 指定死信交换机
                .deadLetterRoutingKey("dl") // 指定死信交换机的 RoutingKey
                .build();
    }
    
    /**
     * 绑定 TTL 交换机和队列
     */
    @Bean
    public Binding ttlBinding() {
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }
    
  3. 最后,在publisher 中编写发送消息的逻辑

    @Test
    public void testTTLMessage() {
        // 1. 创建消息
        Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        // 2. 创建消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3. 发送消息
        rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
        log.info("发送延迟消息成功!消息ID: {}", correlationData.getId());
    }
    
    1. 验证延迟消息

在创建ttl.queue的时候,指定了消息在队列中的 TTL 不超过 10 秒,因此预测当发送消息 10s 后,才会被消费者接收:

首先启动 consumer,并清除控制台日志,然后再发送消息:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息
通过对比控制台日志的时间,可以发现成功将消息延迟了 10 秒。

另外,也可以在发送消息时设置超时时间,可以通过 MessageBuilder 中的 setExpiration 设置消息的超时时间,这里设置为 5 秒:
【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息再次发送消息,并对比观察控制台日志的输出时间:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息可以发现,此时消息延迟了 5 秒,通过上面的对比演示可以得出结论:那就是在同时指定了消息的过期时间以及队列的超时时间,将会以短的那个时间为准。

三、基于 DelayExchang 插件实现延迟队列

3.1 安装 DelayExchang 插件

  1. 下载插件

RabbitMQ 有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html。其中包含各种各样的插件,包括我们要使用的 DelayExchange 插件:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息这里我选择的是 3.8.9 的版本:
【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息

  1. 上传插件

这里我的 RabbitMQ 是基于 Docker 安装的,因此需要先查看 RabbitMQ 的插件目录对应的数据卷:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息
然后,直接进入数据卷挂载点目录:
【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息可以发现这个目录下其实以及有很多的插件的了,然后上传刚才下载的插件到这个目录:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息

  1. 安装插件

最后就是安装了,安装时需要进入 MQ 容器内部来执行安装。我的容器名为mq,所以执行下面命令:

docker exec -it mq bash

然后执行安装的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

最后出现下面的日志,就说明安装 DelayExchang 插件成功了:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息

3.2 DelayExchang 实现消息延迟的原理

DelayExchange 是一个用于实现消息延迟发送的插件,可以在消息队列系统中非常有用。其工作原理如下:

  1. 创建 DelayExchange:首先,需要创建一个 DelayExchange,这是一个特殊的交换机,用于处理延迟消息。通常,可以使用消息队列系统的管理工具或API(如 Spring AMQP 的API)来声明和配置 DelayExchange。

  2. 发送消息到 DelayExchange:当需要发送一个延迟消息时,将消息发送到 DelayExchange,而不是直接发送到目标队列。在发送消息时,需要为消息设置一个属性,通常称为 x-delay,它表示消息的延迟时间。这个属性的值通常以毫秒为单位,定义了消息应该延迟多长时间才会被投递。

  3. DelayExchange 检查 x-delay 属性:当消息到达DelayExchange时,它会检查消息的 x-delay 属性。如果该属性存在,说明这是一个延迟消息。DelayExchange会将消息持久化到硬盘,并记录 x-delay 的值作为延迟时间。

  4. 返回 Routing Not Found:DelayExchange 会向消息的发送者返回 “Routing Not Found” 的响应,意味着消息当前没有目标队列可以接收。这是因为消息不会立即被投递,而是需要等待一定的延迟时间。因此如果设置了生产者消息确认的 publisher-returnReturnCallback,就需要进行额外的处理以避免错误的提示。

  5. 延迟时间到期:经过预定的延迟时间后,DelayExchange 会重新检查已存储的消息,查看是否有消息已经到达或超过了其设定的延迟时间。

  6. 重新投递消息:一旦消息的延迟时间到期,DelayExchange将重新投递消息到指定的目标队列,允许消费者最终接收和处理消息。

通过 DelayExchange 的这一机制,可以实现消息的延迟发送,非常适合需要进行任务调度、处理延迟任务或者在时间敏感任务的应用中使用。它有助于减轻系统负载,提高消息传递的可靠性,以及更好地满足特定的应用需求。

3.3 使用 DelayExchang 实现消息的延迟

  1. 首先,使用 @RabbitListener 注解声明一组延迟交换机和队列,以及延迟消息的处理逻辑。

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了 delay.queue 的消息:" + msg);
    }
    

这里使用 @RabbitListener 注解声明交换机和队列和前面的操作基本一致,唯一的区别在于声明交换机的时候,额外设置了一个 delayed 参数,表明声明的是一个延迟交换机。

  1. publisher 中发送延迟消息

    @Test
    public void testDelayMessage() {
        // 1. 创建消息
        Message message = MessageBuilder.withBody("hello, delay message".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000) // 添加 x-delay 头信息
                .build();
        // 2. 创建消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3. 发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
        log.info("通过延迟交换机发送延迟消息成功!消息ID: {}", correlationData.getId());
    }
    

同样,此处发送消息的逻辑也和前面基本一致,只是在 MessageBuilder 中使用 setHeader 额外设置了一个x-delay 的头信息,表明了该消息是延迟消息,同时也指定了消息的超时时间。

  1. 验证延迟消息

同样的,首先启动 consumer,清除控制台日志,然后向延迟交换机发送消息:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息
通过日志可以看到,成功发送了延迟消息,但是却出现了错误的日志信息,告诉我们是delay.direct交换机没有成功将消息路由到 delay.queue中,但是通过 consumer 的控制台在延迟 5 秒后发现成功接收并处理了这个消息:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息出现上面错误日志的原则在上文的 DelayExchang 实现消息延迟的原理中的第 4 点已经提到了,使用 DelayExchang 实现消息的延迟,是会在达到了设置延迟时间,再将消息发送给队列的。但是,由于交换机在收到消息的时候,没有立即路由给队列,在返回确认消息给生产者的就是“Routing Not Found”,因此就会使得生产者误以为路由失败了。

另外,在上面的错误日志中,可以发现有一个 receivedDelay 参数的值是 5000,也就是延迟的时间,我们可以根据这个参数,在 RetuenCallback 中排除发送延迟消息时产生的的错误提示:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息

然后,再次发送延迟消息到延迟交换机,就不会出现上面的错误提示了:

【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列,微服务,RabbitMQ,rabbitmq,延迟消息

至此,我们便成功使用 DelayExchang 实现了发送延迟消息的功能。可以发现,使用 DelayExchang 插件实现延迟消息比前面使用死信交换机和 TTL 来实现延迟消息更加的简单。文章来源地址https://www.toymoban.com/news/detail-744869.html

到了这里,关于【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 死信交换机&延迟队列

    说明:在MQ中,当一个队列中的消息出现以下情况时,就成为了死信(Dead Letter); 被消费者拒绝消费或者声明失败,并且requeue设置为false,即不再重新入队列; 队列中的消息存满,消息无法再入队列; 消息过期 此时,可以通过指定死信交换机,把这些消息路由到一个专门

    2024年02月16日
    浏览(41)
  • 【学习日记2023.6.19】 之 RabbitMQ服务异步通信_消息可靠性_死信交换机_惰性队列_MQ集群

    消息队列在使用过程中,面临着很多实际问题需要思考: 消息从发送,到消费者接收,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收

    2024年02月11日
    浏览(58)
  • RabbitMQ-死信交换机和死信队列

    DLX: Dead-Letter-Exchange 死信交换器,死信邮箱 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 如下图所示: 其实死信队列就是一个普通的交换机,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理

    2024年02月08日
    浏览(47)
  • RabbitMQ 备份交换机和死信交换机

      为处理生产者将消息推送到交换机中,交换机按照消息中的路由键及自身策略无法将消息投递到指定队列中造成消息丢失的问题,可以使用备份交换机。   为处理在消息队列中到达TTL的过期消息,可采用死信交换机进行消息转存。可以通过死信交换机的方式实现延迟队

    2024年02月14日
    浏览(52)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(79)
  • RabbitMQ系列之死信交换机的使用

      🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是君易--鑨,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的博客专栏《RabbitMQ系列之死信交换机的使用》。🎯🎯 🎁如果感觉还不错的话请给我关注加三连吧!🎁🎁           在我们上一期的RabbitMQ博客系列的分享中我们分

    2024年02月19日
    浏览(52)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(59)
  • RabbitMQ系列(27)--RabbitMQ使用Federation Exchange(联邦交换机)解决异地访问延迟问题

    前言: (broker北京)、(broker深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京)需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小,(Client北京)可以迅速将消息发送至exchangeA 中,就算在开启了publisherconfirm机制或

    2024年02月13日
    浏览(70)
  • rabbitmq基础7——队列和消息过期时间设置、死信队列、延迟队列、优先级队列、回调队列、惰性队列

    这里过一个知识点——过期时间,即对消息或队列设置过期时间(TTL)。一旦消息过期,消费就无法接收到这条消息,这种情况是绝不允许存在的,所以官方就出了一个对策——死信队列,死信队列最初出现的意义就是为了应对消息过期丢失情况的手段之一。 那么过期时间具

    2024年02月03日
    浏览(75)
  • 消息队列RabbitMQ.02.交换机的讲解与使用

    目录 RabbitMQ中交换机的基本概念与作用解析 交换机的作用: 交换机的类型: 直连交换机(Direct Exchange): 将消息路由到与消息中的路由键(Routing Key)完全匹配的队列。 主题交换机(Topic Exchange): 使用通配符匹配路由键,允许更灵活的消息路由。 扇形交换机(Fanout E

    2024年01月24日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包