RabbitMQ常见问题之延迟消息

这篇具有很好参考价值的文章主要介绍了RabbitMQ常见问题之延迟消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而
这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
RabbitMQ常见问题之延迟消息,Server架构,# RabbitMQ,rabbitmq,分布式

二、TTL

RabbitMQ常见问题之延迟消息,Server架构,# RabbitMQ,rabbitmq,分布式

如果messagequeue都有ttl,采用更小的一方。

1. Queue指定死信交换机并设置TTL

@Configuration
public class CommonConfig {
    @Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttl.direct");
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }

    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
}

2. 消息设置TTL

@Test
public void testTTLMessage(){
    Message message = MessageBuilder.withBody("hello ttl".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setExpiration("5000")
            .build();
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
    log.info("ttl消息已发送");
}

借助TTL机制可以用死信交换机模拟延迟队列,但是设计上比较牵强,性能不好。

三、延迟队列

这是官方提供的一些额外插件
https://www.rabbitmq.com/community-plugins.html

下载其中的DelayExchange插件,把.ez文件挂载到RabbitMQ容器的/plugins目录下,然后进入容器,执行

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
root@7c4ba266e5bc:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@7c4ba266e5bc:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@7c4ba266e5bc...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.

1. SpringAMQP创建延迟队列

基于@RabbitListener或者基于@Bean都可以。

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg){
        log.info("消费者接收到delay.queue的延迟消息:【" + msg + "】");
    }

RabbitMQ常见问题之延迟消息,Server架构,# RabbitMQ,rabbitmq,分布式

2. 设置消息延迟

这个插件只能在消息上设置延迟时间,没有队列设置延迟时间的概念,不过都是一样的。
message要在Header上添加一个x-delay

    @Test
    public void testDelayMessage(){
        Message message = MessageBuilder.withBody("hello delay".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000)
                .build();
        // confirm callback
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
        log.info("发送消息成功");
    }

3. 测试

直接运行测试,可能会报错,因为rabbitmq意识到消息到了exchange却没有立即到queue,被认为错误,回调returnback,所以我们在ReturnCallBack中绕过这个限制。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
            //check if is delay message
            if (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) {
                return;
            }
            log.error("消息发送到queue失败,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",
                    replyCode, replyText, exchange, routingKey, message.toString());
        });
    }
}

运行Test测试,可以看到Test方面,消息发送的时间为21:09:13

21:09:13:516  INFO 25468 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2063c53e:0/SimpleConnection@6415f61e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 62470]
21:09:13:557  INFO 25468 --- [           main] cn.itcast.mq.spring.SpringAmqpTest       : 发送消息成功

listener方面消息消费的时间为21:09:18,刚好5s。文章来源地址https://www.toymoban.com/news/detail-800489.html

21:08:31:952  INFO 19532 --- [           main] cn.itcast.mq.ConsumerApplication         : Started ConsumerApplication in 1.735 seconds (JVM running for 2.357)
21:09:18:583  INFO 19532 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到delay.queue的延迟消息:【hello delay】

到了这里,关于RabbitMQ常见问题之延迟消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 架构师必读:RabbitMQ常见问题与解决办法

    作者:禅与计算机程序设计艺术 Apache RabbitMQ是一个开源的消息代理中间件,它可以实现在分布式系统中应用间、跨平台和服务间通讯。本文通过常见问题解答的方式,讲述了RabbitMQ的架构、基本概念、术语、核心算法、具体操作步骤、代码实例以及未来的发展方向等内容,将

    2024年02月07日
    浏览(31)
  • RabbitMQ消息丢失、消息重复消费、消息顺序性无法保证、消息积压、一致性问题、系统可用性降低等这些常见问题怎么解决

    该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 1. 消息丢失 问题 :在生产者发送消息到MQ、MQ内部处理、消费者接收消息的任一环节都可能导致消息丢失。 解决方案 : 生产者确认机制 :确保消息

    2024年04月25日
    浏览(27)
  • RaabitMQ(三) - RabbitMQ队列类型、死信消息与死信队列、懒队列、集群模式、MQ常见消息问题

    这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性。 经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。 Durability有两个选项,Durable和Transient。 Durable表示队列会将消息保存到硬盘,这样消息的安全性更高。但是同时,由于需

    2024年02月14日
    浏览(35)
  • RabbitMQ常见问题以及实际问题解决

    ** ** 消息可靠性问题: 消息从生产者发送到Exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性? 发送时丢失: - 生产者发送的消息为送达exchange - 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 ①生产者消息确认 RabbitMQ提供

    2024年02月16日
    浏览(34)
  • 如何解决RabbitMQ中的延迟消息问题

    首先我们要知道什么是死信? 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter): 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false。 消息是一个过期消息,超时无人消费。 要投递的队列消息堆积满了,最早的消息可能成为

    2024年02月14日
    浏览(37)
  • Rabbitmq 常见问题处理

    Rabbitmq queue NaN status code 如下图: 参考文章 原因分析: Queue在mear数据库中存,但在队列列表中并不存在,所以才会存在该问题,并且是在RabbitMQ做了镜像集群的时候才会出现这样的情况。 解决 删除队列再重建。或者重启镜像机器服务。

    2024年02月09日
    浏览(30)
  • mq常见问题:消息丢失、消息重复消费、消息保证顺序

    mq常见问题:消息丢失、消息重复消费、消息保证顺序 消息丢失问题 拿rabbitmq举例来说,出现消息丢失的场景如下图 从图中可以看到一共有以下三种可能出现消息丢失的情况: 1 生产者丢消息 生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败 2MQ自身丢

    2024年02月09日
    浏览(44)
  • 消息队列常见问题(1)-如何保障不丢消息

    目录 1. 为什么消息队列会丢消息? 2. 怎么保障消息可靠传递? 2.1 生产者不丢消息 2.2 服务端不丢消息 2.3 消费者不丢消息 3. 消息丢失如何快速止损? 3.1 完善监控 3.2 完善止损工具 现在主流的消息队列都会提供完善的高可用解决方案,但是我们依然会有多种原因导致消息丢

    2024年02月14日
    浏览(25)
  • 消息中间件中常见问题

    MQ的用途 异步发送(验证码,短信,邮件) MySQL,ES,Redis之间的数据同步 分布式事务 削峰填谷 消息可能丢失的环境 消息在产生端时候生产端挂掉,消息未到达交换机,消息丢失 消息在交换机未到达队列,消息丢失 消息队列中如果队列挂掉消息也可能丢失 消费者未接收消

    2024年02月15日
    浏览(34)
  • RabbitMQ常见的应用问题

    在实际生产环境中,可能会由于网络问题导致消息接收异常产生某种影响,基于这种情况我们需要保障消息的可靠性。 RabbitMQ中的消息可靠性也称为消息补偿,如下图所示,可以保证消息的可靠性。 分为9种种步骤实现消息补偿 1、生产者处理业务逻辑,将数据写入到数据库。

    2024年02月11日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包