Rabbitmq死信队列及延时队列实现

这篇具有很好参考价值的文章主要介绍了Rabbitmq死信队列及延时队列实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题:什么是延迟队列

我们常说的延迟队列是指消息进入队列后不会被立即消费,只有达到指定时间后才能被消费。

但RabbitMq中并没有提供延迟队列功能。那么RabbitMQ如何实现延迟队列

通过:死信队列 + RabbitMQ的TTL特性实现。

实现原理

给一个普通带有过期功能的队列绑定一个死信队列,消息先进延时队列,过期了后消息进入死信队列,死信队列的消息会转发到对应的queue里面,我们只需要消费死信的queue里面的消息就可以了。

一、TTL特性说明

TTL就是消息或者队列的过期功能。当消息过期就会进到死信队列,死信队列和普通队列没啥区别,然后我们只需要配置一个消费者来消费死信队列里面的消息就可以了。

  1. 如果不设置x-message-ttl,则表示消息不会过期
  2. 如果x-message-ttl设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
  3. 如果x-message-ttl设置不为0,表明消息或队列中所有消息的最大存活时间(过期时间),单位是毫秒。

需要注意: RabbitMQ只会对队列头部的消息进行过期淘汰,消息是否过期是在即将投递消息到消费者之前判定的,如果队列出现消息堆积情况,则已过期的消息还是会继续存活的

比如过期时间设置在消息上,由于消息进队列是先进先出,假设先进去的消息过期时间长,后进的消息过期时间短,先进来的消息还没过期,后面进来的消息就算到了过期时间,消息也不会过期也就不会从队列中剔除,从而导致队列里面的消息积压。

二、死信队列

简单理解就是要被丢弃的消息才会放到死信队列。

1、消息什么时候变为死信

1、消息被否定接收,消费者使用basic.reject 或者 basic.nack并且requeue重回队列属性设为false
2、消息在队列里的时间超过了该消息设置的过期时间(TTL)
3、消息队列到达了它的最大长度,之后再收到的消息。

2、死信队列的原理

当一个消息在队列里变为死信,它会被重新publish到绑定的死信队列所对应的exchange交换机上,这个exchange就为DLX。因此我们只需要在声明正常的业务队列时添加一个可选的"x-dead-letter-exchange"参数,值为死信交换机,死信就会被rabbitmq重新publish到配置的这个交换机上,我们接着监听这个死信交换机所绑定队列就可以了。

下面看个案例:
正常情况,我们配置交换机-A,会为其绑定路由键-A,和路由键-A所映射的队列-A,并设置这个队列-A的死信交换机-B、死信路由键-B。 消息消费者-A,监听队列-A,基本上就没死信交换机什么事了,特殊时候队列满了,或者接收后给的回执是false才会进到死信交换机-B。现在我们把消费者-A删了,并且设置队列-A里面消息过期时间,那么所有进到队列A里面消息就自然过期,然后被推给死信交换机,那么监听死信交换机-B的消息者-B,就可以得到过期后的消息了,就实现了延时功能。

3、死信交换机

死信交换机DLX实际上也是普通的交换机,说白的就是将死信(过期消息)路由到死信队列的交换机。

4、死信队列

死信队列DLQ(Dead Letter Queue)实际上也是普通的队列,只不过它存储的是死信交换机路由过来的死信(过期消息)

三、代码

1、给队列绑定死信队列
Map<String, Object> args = new HashMap<>(2);
args.put(x-dead-letter-exchange,死信队列交换机)
args.put(x-dead-letter-routing-key,死信routeKey)
args.put(x-message-ttl,队列里面的消息最大存活时间) 
//申明一个普通队列,并且给这个普通队里绑定一个死信队列。当消息过期就会转到死信队列里面去。
Queue queue = QueueBuilder.durable(queue).withArguments(args).build();
2、设置过期时间

目前有两种方式可以设置消息的TTL。第一种是通过队列的属性设置,队列中的所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法同时设置,则TTL以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的TTL值时就会进到死信。

1、通过队列属性设置消息过期时间
Map<String, Object> arguments = Maps.newHashMap();
//设置消息发送到队列中在被丢弃之前可以存活的时间,单位:毫秒
arguments.put("x-message-ttl", 5000);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
2、发送消息的时候,对消息本身设定过期时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setExpiration("5000");//设置消息多久没有被消费在过期。
        message.getMessageProperties().setContentEncoding("UTF-8");
        return message;
    }
};
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, messagePostProcessor);

上面底层代码是:文章来源地址https://www.toymoban.com/news/detail-611951.html

AMQP.BasicProperties.Builder properties = MessageProperties.PERSISTENT_TEXT_PLAIN.builder();
properties.expiration("5000");//设置消息多久没有被消费在过期。
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, properties.build(), message.getBytes());

到了这里,关于Rabbitmq死信队列及延时队列实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MQ消息队列(主要介绍RabbitMQ)

    消息队列概念:是在消息的传输过程中保存消息的容器。 作用:异步处理、应用解耦、流量控制..... RabbitMQ:     SpringBoot继承RabbitMQ步骤:         1.加入依赖          2.配置         3.开启(如果不需要监听消息也就是不消费就不需要该注解开启)         4.创建队列、

    2024年02月11日
    浏览(38)
  • 消息队列-RabbitMQ:MQ作用分类、RabbitMQ核心概念及消息生产消费调试

    1)什么是 MQ MQ (message queue),从字面意思上看, 本质是个队列,FIFO 先入先出 ,只不过队列中存放的内容是 message 而已,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “ 逻辑解耦 + 物理解耦” 的消息通信服务 。 使用了

    2024年02月20日
    浏览(33)
  • MQ消息队列,以及RabbitMQ详细(中1)五种rabbitMQ实用模型

    书接上文,展示一下五种模型我使用的是spring could 微服务的框架 文章说明:         本文章我会分享总结5种实用的rabbitMQ的实用模型 1、hello world简单模型 2、work queues工作队列 3、Publish/Subscribe发布订阅模型 4、Routing路由模型 5、Topics 主题模型 (赠送) 6、消息转换器 Rabbi

    2024年02月05日
    浏览(40)
  • mq 消息队列 mqtt emqx ActiveMQ RabbitMQ RocketMQ

    十几年前,淘宝的notify,借鉴ActiveMQ。京东的ActiveMQ集群几百台,后面改成JMQ。 Linkedin的kafka,因为是scala,国内很多人不熟。淘宝的人把kafka用java写了一遍,取名metaq,后来再改名RocketMQ。 总的来说,三大原因,语言、潮流、生态。 MQ这种东西,当你的消息量不大的时候,用啥

    2024年02月12日
    浏览(37)
  • Rabbitmq死信队列及延时队列实现

    问题:什么是延迟队列 我们常说的延迟队列是指消息进入队列后不会被立即消费,只有达到指定时间后才能被消费。 但RabbitMq中并 没有提供延迟队列功能 。那么RabbitMQ如何实现延迟队列 通过:死信队列 + RabbitMQ的TTL特性实现。 实现原理 给一个普通带有过期功能的队列绑定一

    2024年02月15日
    浏览(33)
  • RabbitMQ - 死信队列,延时队列

    死信队列: DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

    2024年02月09日
    浏览(37)
  • .NET中使用RabbitMQ延时队列和死信队列

    延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。 延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果

    2024年02月15日
    浏览(32)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(36)
  • 深入浅出RabbitMQ:顺序消费、死信队列和延时队列

    大家好,我是小❤,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。 上篇文章(应对流量高峰的利器——消息中间件)中,我们已经介绍了消息中间件的用途,主要用作:解耦、削峰、异步通信、应用解耦,并介绍了业界常

    2024年02月03日
    浏览(28)
  • RabbitMQ --- 惰性队列、MQ集群

    当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。 解决消息堆积有三种思路: 增加更多消费者,提高消费速度。也就是我们之前说的work

    2024年02月03日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包