【技术分享】四、RabbitMQ “延时队列”

这篇具有很好参考价值的文章主要介绍了【技术分享】四、RabbitMQ “延时队列”。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。
ex:

  1. 定时任务:十分钟后执行某种操作。
  2. 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。

一、RabbitMQ “延时队列”概念

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

RabbitMQ 中并没有延时队列的概念,是通过 延时交换机与 死信队列实现。

二、实现RabbitMQ “延时队列”两种方式

1. 利用两个特性:TTL + DLX [A队列过期->转发给B队列] (此种方式有缺陷)

TTL,全称Time To Live,消息过期时间设置。若没有队列进行消息消费,此消息过期后将被丢弃。
RabbitMq只会检查第一个消息是否过期,如果过期则丢到死信队列。
ex:若有两条消息,第一个消息延迟20秒执行,第二个消息延迟10秒执行,但RabbitMq只会检测队首第一条消息的过期时间。这样就会造成第二条消息延迟30秒执行。

DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

2. 采用 RabbitMq延时插件rabbitmq_delayed_message_exchange的方式。

为了解决 “队列阻塞”问题,新的插件发布了,再消息粒度上实现 消息过期控制。

三、RabbitMQ “延时队列”项目应用

1. 引入pom文件,并配置yml

<dependencies>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- web相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--  json相关依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

2. 下载插件

插件下载官方链接: rabbitmq_delayed_message_exchange
rabbitmq 延迟消息队列,技术分享,rabbitmq,分布式,java,后端文章来源地址https://www.toymoban.com/news/detail-714991.html

安装指南(以linux环境为例)
  1. 将rabbitmq_delayed_message_exchange-3.9.0.ez上传指定目录下使用unzip解压即可
    安装目录: /rabbitmq/plugins
    rabbitmq 延迟消息队列,技术分享,rabbitmq,分布式,java,后端
  2. 完成第一步解压后,执行以下图中安装操作
    开启插件:
	export PATH=$PATH:/opt/middle/rabbitmq/erlang/bin
	cd /opt/middle/rabbitmq/sbin
	./rabbitmq-pluginsenablerabbitmq_delayed_message_exchange
  1. 查询安装状态
    ./rabbitmq-plugins list
	./rabbitmq-plugins list

3. 配置资源信息


import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.util.HashMap;
import java.util.Map;

/**
 * @author liuzz
 * @date 2023/5/18 0018下午 4:09
 */
@Configuration("relevancyRabbitMqConfig")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RelevancyRabbitMqConfig {

    private final CachingConnectionFactory factory;

    /**
     * rabbitmq 下发计划延时队列
     **/
    public static final String RELEVANCY_DELAYED_EXCHANGE = "saas.cbs.relevancy.delayed.exchange";

    /**
     * rabbitmq 下发延时队列订阅路由key
     **/
    public static final String RELEVANCY_DELAYED_ROUTINGKEY = "saas.cbs.relevancy.delayed.routingkey";

    /**
     * rabbitmq 下发延时队列
     **/
    public static final String RELEVANCY_DELAYED_QUEUE = "saas.cbs.relevancy.delayed.queue";

	
    @Bean("relevancyRabbitTemplate")
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(){
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        //开启发送失败退回
        rabbitTemplate.setMandatory(true);
        //消息转换器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    //插件版本 -- 实现延迟队列
    @Bean("relevancyDelayedQueue")
    public Queue relevancyDelayedQueue() {
        return new Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE);
    }

    //定义延时交换机 -- 插件版本
    //指定交换器类型为 x-delayed-message 
    //设置属性 x-delayed-type 
    @Bean("relevancyDelayedExchange")
    public CustomExchange relevancyDelayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
    }
	
	/**
	* 绑定延时队列与交换机信息
	*/
    @Bean
    public Binding bindingNotify(@Qualifier("relevancyDelayedQueue") Queue relevancyDelayedQueue,
                                 @Qualifier("relevancyDelayedExchange") CustomExchange relevancyDelayedExchange) {
        return BindingBuilder
                .bind(relevancyDelayedQueue)
                .to(relevancyDelayedExchange)
                .with(RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY).noargs();
    }
}


4. 发送消息至交换机

@Slf4j
@Component
public class RelevancyExecuteMqConsume {

    @Autowired
    @Qualifier("relevancyRabbitTemplate")
    RabbitTemplate rabbitTemplate;

    /**
     * @Desc: 发送下发计划过期MQ
     * @param relevancyFrsMqSendMsgBo
     * @param finalExpirationTime
     **/
    public void sendSnapshotPlanMsg(RelevancyFrsMqSendMsgBo relevancyFrsMqSendMsgBo, Integer finalExpirationTime) {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //1.设置message的信息
                message.getMessageProperties().setDelay(finalExpirationTime);//消息的过期时间
                //2.返回该消息
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY,relevancyFrsMqSendMsgBo,messagePostProcessor);
    }
}

5. 死信队列消费

@Slf4j
@Component
public class RelevancyExecuteMqConsume {

    @Autowired
    @Qualifier("relevancyRabbitTemplate")
    RabbitTemplate rabbitTemplate;

	
    @RabbitListener(bindings = {
            @QueueBinding(value =
            @Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE),
                    exchange = @Exchange(name = RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,type ="x-delayed-message" ),
                    key= RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY)
    })
    public void process(Message message, Channel channel) {
    	//消费数据
	}
}

到了这里,关于【技术分享】四、RabbitMQ “延时队列”的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(59)
  • 学会RabbitMQ的延迟队列,提高消息处理效率

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉? RabbitMQ 消费模式该如何选择 死信是什么,如何运用RabbitMQ的死信机制? 真的好用吗?鲜有人提的 RabbitMQ-RPC模式 前面

    2024年02月14日
    浏览(68)
  • rabbitmq基础7——队列和消息过期时间设置、死信队列、延迟队列、优先级队列、回调队列、惰性队列

    这里过一个知识点——过期时间,即对消息或队列设置过期时间(TTL)。一旦消息过期,消费就无法接收到这条消息,这种情况是绝不允许存在的,所以官方就出了一个对策——死信队列,死信队列最初出现的意义就是为了应对消息过期丢失情况的手段之一。 那么过期时间具

    2024年02月03日
    浏览(76)
  • .NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

    目录 一、安装mq 二、实操 1、简单模式 2、工作模式 3、fanout扇形模式(发布订阅) 4、direct路由模式也叫定向模式 5、topic主题模式也叫通配符模式(路由模式的一种) 6、header 参数匹配模式 7、延时队列(插件方式实现) 参考资料: 1、我的环境是使用VMware安装的Centos7系统。MQ部署

    2023年04月09日
    浏览(103)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • 【消息队列技术 RabbitMQ 和 Apache Kafka对比】

    消息队列技术有 RabbitMQ 和 Apache Kafka 是一个开源的消息队列实现,它采用 AMQP(高级消息队列协议)作为通信协议。RabbitMQ 的特点是可靠性高、扩展性好、功能丰富。它支持基于消息的异步通信模式,发布-订阅模式和消息分发模式,可以在不同的应用之间传递消息。RabbitMQ 在

    2024年03月10日
    浏览(43)
  • RabbitMQ - 死信队列,延时队列

    死信队列: DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

    2024年02月09日
    浏览(48)
  • 【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

    在电商平台下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 该如何实现? 定期轮询(数据库等) 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定

    2024年01月17日
    浏览(58)
  • Rabbitmq死信队列及延时队列实现

    问题:什么是延迟队列 我们常说的延迟队列是指消息进入队列后不会被立即消费,只有达到指定时间后才能被消费。 但RabbitMq中并 没有提供延迟队列功能 。那么RabbitMQ如何实现延迟队列 通过:死信队列 + RabbitMQ的TTL特性实现。 实现原理 给一个普通带有过期功能的队列绑定一

    2024年02月15日
    浏览(48)
  • RabbitMQ如何实现延时队列

    RabbitMQ是目前最为流行的消息队列之一,它的高可靠性、高可用性和高性能使得它成为众多应用场景下的首选。在实际应用中,我们经常需要实现延时队列来解决一些业务问题,比如订单超时未支付自动取消等。本文将介绍如何使用RabbitMQ实现延时队列。 1. 延时队列的概念 延

    2024年02月16日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包