RabbitMQ 延时消息实现

这篇具有很好参考价值的文章主要介绍了RabbitMQ 延时消息实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 实现方式

1. 设置队列过期时间:延迟队列消息过期 + 死信队列,所有消息过期时间一致
2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞
   需要额外安装 `rabbitmq_delayed_message_exchange` 插件才能解决此问题
  • 导入Spring 集成RabbitMQ MAEVN
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

2. 设置队列过期时间:延迟队列消息过期 + 死信队列

推送消息至延迟队列 -> 消息过期自动推送到死信队列 -> 消费死信队列

2.1. MQ配置信息

2.1.1. 自定义队列配置

…/bootstrap.yml文章来源地址https://www.toymoban.com/news/detail-858659.html

# rabbitmq自定义配置
rabbitmq:
  ttlExchange: medical_dev_ttl_topic_change
  ttlKey: dev_ttl
  ttlQueue: medical.dev.ttl.topic.queue
  delayExpireTime: 600
  ttlQueueSize: 10000
  deadExchange: medical_dev_dead_topic_change
  deadKey: dev_dead
  deadQueue: medical.dev.dead.topic.queue
2.1.2. 读取自定义MQ配置信息
/**
 * amqp配置文件
 */
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {

    /**
     * 延迟队列
     */
    public String ttlExchange;
    public String ttlKey;
    public String ttlQueue;
    private Integer delayExpireTime;
    public Integer ttlQueueSize;

    /**
     * 死信队列
     */
    public String deadExchange;
    public String deadKey;
    public String deadQueue;

}

2.2. 配置文件自动生成队列

2.2.1. 延迟队列
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;

/**
 * 延迟队列配置文件
 * 
 * @author mingAn.xie
 */
@Configuration
public class RabbitMQConfigTTL {

    @Resource
    MyConfigProperties myConfigProperties;

    // 1: 声明交换机
    @Bean
    public TopicExchange ttlTopicExchange(){
        return new TopicExchange(myConfigProperties.getTtlExchange());
    }

    // 2: 声明队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue ttlTopicduanxinQueue(){
        HashMap<String, Object> args = new HashMap<>();
        // 给队列设置消息过期时间:毫秒值
        args.put("x-message-ttl", mqConfigProperties.getDelayExpireTime() * 1000);
        // 设置队列最大长度
        args.put("x-max-length", myConfigProperties.getTtlQueueSize());
        // 设置死信队列交换机名称
        // 当消息在一个队列中变成死信后,它能就发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列被称之为死信队列
        // 编程死信队列的原因:消息被拒绝,消息过期,队列达到最大长度
        args.put("x-dead-letter-exchange", myConfigProperties.getDeadExchange());
        // 设置死信队列路由key
        args.put("x-dead-letter-routing-key", myConfigProperties.getDeadKey());
        return new Queue(myConfigProperties.getTtlQueue(), true, false, false, args);
    }

    // 3: 绑定对用关系
    @Bean
    public Binding ttlTopicsmsBinding(){
        return BindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey());
    }

}
2.2.2. 死信队列

import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * 死信队列配置文件
 * 
 * @author mingAn.xie
 */
@Configuration
public class RabbitMQConfigDead {

    @Resource
    MyConfigProperties myConfigProperties;

    // 1: 声明交换机
    @Bean
    public TopicExchange deadTopicExchange(){
        return new TopicExchange(myConfigProperties.getDeadExchange());
    }

    // 2: 声明队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue deadTopicduanxinQueue(){
        return new Queue(myConfigProperties.getDeadQueue(), true);
    }

    // 3: 绑定对用关系
    @Bean
    public Binding deadTopicsmsBinding(){
        return BindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey());
    }

}

2.3. 生产者推送消息

import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * RabbitMQ生产者推送消息类
 * 
 * @author xiemingan
 */
@Component
@Slf4j
public class RabbitmqProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MyConfigProperties myConfigProperties;

    /**
     * @param pushMessage 推送消息体
     */
    public void pushTtlMessage(String pushMessage) {
		// 推送消息至交换机,并指定路由key
        rabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);
        log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}", myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);
    }

}

2.4. 消费者处理消息

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @author mingAn.xie
 */
@Log4j2
@Component
public class RabbitmqConsumer {

    /**
     * 消费死信队列
     * @param message 消息体
     */
    @RabbitListener(queues = "${rabbitmq.deadQueue}")
    public void pushMessages(Message message) {

        String body = new String(message.getBody()).trim();
        if (StringUtils.isEmpty(body)){
            return;
        }
        log.info("MQ消息消费, RabbitmqConsumer.pushMessages() : {}", body);
    }

}

3. 设置消息的过期时间

设置交换机类型为 x-delayed-type,推送消息至交换机,直连队列消费

3.1. 安装插件 rabbitmq_delayed_message_exchange

前言:这里默认使用环境为 Liunx 系统 Docker 安装 RabbitMQ

具体可以参考这篇文章:Docker 安装 RabbitMQ 挂载配置文件

安装插件版本需要与RabbitMQ版本一致,否则可能会导致安装失败,可先进入RabbitMQ容器中查看其他插件版本

插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  • 这里以最新版本 v3.13.0 举例
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

# 将插件复制进容器中: rabbitmq_xxxxxx
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins

# 进入容器: rabbitmq_xxxxxx
docker exec -it rabbitmq_xxxxxx bash
cd plugins

# 查询插件列表, 此处可看到插件的版本
rabbitmq-plugins list

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 交换机类型中出现 x-delayed-type 表示安装成功

RabbitMQ 延时消息实现,RabbitMQ,rabbitmq,java

3.2. MQ配置信息

3.2.1. 自定义队列配置

…/bootstrap.yml

#mq队列自定义配置
rabbitmq:
  saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchange
  saveTaskTtlKey: ey240001_pro_save_task_ttl
  saveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queue
  saveTaskTtlQueueSize: 10000
3.2.2. 读取自定义MQ配置信息
/**
 * amqp配置文件
 *
 * @author mingAn.xie
 */
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {

    /**
     * 任务待办生成延时队列
     */
    public String saveTaskTtlExchange;
    public String saveTaskTtlKey;
    public String saveTaskTtlQueue;
    public Integer saveTaskTtlQueueSize;

}

3.3. 配置文件生成 x-delayed-type 交换机

import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * x-delayed-type 交换机延迟队列配置
 * 
 * @author mingAn.xie
 */
@Configuration
public class RabbitMQConfigSaveTaskTtl {

    @Resource
    MyConfigProperties myConfigProperties;

    // 1: 声明交换机
    @Bean
    public CustomExchange saveTaskTopicExchange() {
        Map<String, Object> args = new HashMap<>();
        // 设置延迟队列插件类型:按过期时间消费
        args.put("x-delayed-type", "direct");
        // 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数
        return new CustomExchange(myConfigProperties.getSaveTaskTtlExchange(), "x-delayed-message", true, false, args);
    }

    // 2: 声明队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue saveTaskTopicduanxinQueue() {
        return new Queue(myConfigProperties.getSaveTaskTtlQueue(), true, false, false);
    }

    // 3: 绑定对用关系
    @Bean
    public Binding saveTaskTopicsmsBinding() {
        return BindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs();
    }

}

3.4. 生产者推送消息

import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 生产者推送消息类
 * 
 * @author xiemingan
 */
@Component
@Slf4j
public class RabbitmqProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private MyConfigProperties myConfigProperties;

    /**
     * @param pushMessage 推送消息体
     * @param ttlTime     延时时间(毫秒值)
     */
    public void pushTtlMessage(String pushMessage, long ttlTime) {
        ttlTime = ttlTime <= 0 ? 1000 : ttlTime;
        // 3.1.推送MQ延迟消息队列
        long finalTtlTime = ttlTime;
        MessagePostProcessor messagePostProcessor = message -> {
            // 设置延迟时间
            message.getMessageProperties().setDelay((int) finalTtlTime);
            return message;
        };
        rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor);
        log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}", myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime);
    }

}

3.5. 消费者处理消息

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @author mingAn.xie
 */
@Log4j2
@Component
public class RabbitmqConsumer {

    /**
     * 消费延时消息
     * @param message 消息体
     */
    @RabbitListener(queues = "${rabbitmq.saveTaskTtlQueue}")
    public void pushMessages(Message message) {

        String body = new String(message.getBody()).trim();
        if (StringUtils.isEmpty(body)) {
            return;
        }
        log.info("MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}", body);

    }

}

到了这里,关于RabbitMQ 延时消息实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)

    业务开发中有很多延时操作的场景,比如最常见的 超时订单自动关闭 、 延时异步处理 ,我们常用的实现方式有: 定时任务轮询 (有延时)。 借助Redission的延时队列 。 Redis的key过期事件通知机制 (需开启key过期事件通知,对Redis有性能损耗)。 RocketMQ中定时消息推送 (支

    2024年02月04日
    浏览(39)
  • Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息

    erlang 和 rabbitmq 版本说明 https://www.rabbitmq.com/which-erlang.html 确认需要安装的mq版本以及对应的erlang版本。 RabbitMQ下载地址: https://packagecloud.io/rabbitmq/rabbitmq-server Erlang下载地址: https://packagecloud.io/rabbitmq/erlang RabbitMQ延迟消息插件下载 https://github.com/rabbitmq/rabbitmq-delayed-message-exc

    2024年02月08日
    浏览(51)
  • RabbitMq应用延时消息

    一.建立绑定关系 二.建立生产者 1.消息实体 三.建立消费者 四.测试类测试 五.效果如图所示

    2024年02月12日
    浏览(38)
  • RabbitMQ延时队列的详细介绍以及Java代码实现

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的延时队列以及其详细代码实现。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小威要向诸佬学习呀 🧑个人简介:大

    2024年02月01日
    浏览(41)
  • 【Bug】.net6 cap总线+rabbitmq延时消息收不到

    我有两个服务一个叫05一个叫15 然后用的cap总线+rabbitmq 05消息队列发了一条延时消息,到时间了05服务的订阅者能收到 15服务订阅同一个消息的没收到(cap的cashboard)(手动requeue05和15都能收到) 相关回答:.net6 cap总线+rabbitmq延时消息收不到 05:连接数据库配置: 15:连接数据库

    2024年01月19日
    浏览(37)
  • RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

    我们先看一串代码,并思考一下为什么要先入库然后发MQ: 如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。 ① 消息从生产者发送到Broker 生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接

    2024年02月11日
    浏览(55)
  • RabbitMQ基于Java实现消息应答

    概念 RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站, 一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不

    2024年04月10日
    浏览(39)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(43)
  • RabbitMQ可靠性消息发送(java实现)

    本博客属于 《RabbitMQ基础组件封装—整体结构》的子博客 step1:消息落库,业务数据存库的同时,也要将消息记录存入数据库,二者要保证原子性; step2:Producer发送消息到MQ Broker; step3:Producer收到 broker 返回的确认消息; step4:更改消息记录库的状态(定义三种状态:0待确

    2024年02月04日
    浏览(70)
  • 【Java】微服务——RabbitMQ消息队列(SpringAMQP实现五种消息模型)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月08日
    浏览(65)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包