前言
延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。
ex:
- 定时任务:十分钟后执行某种操作。
- 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。
一、RabbitMQ “延时队列”概念
延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 中并没有延时队列的概念,是通过 延时交换机与 死信队列实现。
二、实现RabbitMQ “延时队列”两种方式
1. 利用两个特性:TTL + DLX [A队列过期->转发给B队列] (此种方式有缺陷)
TTL,全称Time To Live,消息过期时间设置。若没有队列进行消息消费,此消息过期后将被丢弃。
但RabbitMq只会检查第一个消息是否过期,如果过期则丢到死信队列。
ex:若有两条消息,第一个消息延迟20秒执行,第二个消息延迟10秒执行,但RabbitMq只会检测队首第一条消息的过期时间。这样就会造成第二条消息延迟30秒执行。
DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
2. 采用 RabbitMq延时插件rabbitmq_delayed_message_exchange的方式。
为了解决 “队列阻塞”问题,新的插件发布了,再消息粒度上实现 消息过期控制。文章来源:https://www.toymoban.com/news/detail-714991.html
三、RabbitMQ “延时队列”项目应用
1. 引入pom文件,并配置yml
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- web相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- json相关依赖-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
2. 下载插件
插件下载官方链接: rabbitmq_delayed_message_exchange
文章来源地址https://www.toymoban.com/news/detail-714991.html
安装指南(以linux环境为例)
- 将rabbitmq_delayed_message_exchange-3.9.0.ez上传指定目录下使用unzip解压即可
安装目录: /rabbitmq/plugins
- 完成第一步解压后,执行以下图中安装操作
开启插件:
export PATH=$PATH:/opt/middle/rabbitmq/erlang/bin
cd /opt/middle/rabbitmq/sbin
./rabbitmq-pluginsenablerabbitmq_delayed_message_exchange
- 查询安装状态
./rabbitmq-plugins list
./rabbitmq-plugins list
3. 配置资源信息
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
/**
* @author liuzz
* @date 2023/5/18 0018下午 4:09
*/
@Configuration("relevancyRabbitMqConfig")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RelevancyRabbitMqConfig {
private final CachingConnectionFactory factory;
/**
* rabbitmq 下发计划延时队列
**/
public static final String RELEVANCY_DELAYED_EXCHANGE = "saas.cbs.relevancy.delayed.exchange";
/**
* rabbitmq 下发延时队列订阅路由key
**/
public static final String RELEVANCY_DELAYED_ROUTINGKEY = "saas.cbs.relevancy.delayed.routingkey";
/**
* rabbitmq 下发延时队列
**/
public static final String RELEVANCY_DELAYED_QUEUE = "saas.cbs.relevancy.delayed.queue";
@Bean("relevancyRabbitTemplate")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(){
factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
//开启发送失败退回
rabbitTemplate.setMandatory(true);
//消息转换器
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
//插件版本 -- 实现延迟队列
@Bean("relevancyDelayedQueue")
public Queue relevancyDelayedQueue() {
return new Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE);
}
//定义延时交换机 -- 插件版本
//指定交换器类型为 x-delayed-message
//设置属性 x-delayed-type
@Bean("relevancyDelayedExchange")
public CustomExchange relevancyDelayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}
/**
* 绑定延时队列与交换机信息
*/
@Bean
public Binding bindingNotify(@Qualifier("relevancyDelayedQueue") Queue relevancyDelayedQueue,
@Qualifier("relevancyDelayedExchange") CustomExchange relevancyDelayedExchange) {
return BindingBuilder
.bind(relevancyDelayedQueue)
.to(relevancyDelayedExchange)
.with(RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY).noargs();
}
}
4. 发送消息至交换机
@Slf4j
@Component
public class RelevancyExecuteMqConsume {
@Autowired
@Qualifier("relevancyRabbitTemplate")
RabbitTemplate rabbitTemplate;
/**
* @Desc: 发送下发计划过期MQ
* @param relevancyFrsMqSendMsgBo
* @param finalExpirationTime
**/
public void sendSnapshotPlanMsg(RelevancyFrsMqSendMsgBo relevancyFrsMqSendMsgBo, Integer finalExpirationTime) {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息
message.getMessageProperties().setDelay(finalExpirationTime);//消息的过期时间
//2.返回该消息
return message;
}
};
rabbitTemplate.convertAndSend(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY,relevancyFrsMqSendMsgBo,messagePostProcessor);
}
}
5. 死信队列消费
@Slf4j
@Component
public class RelevancyExecuteMqConsume {
@Autowired
@Qualifier("relevancyRabbitTemplate")
RabbitTemplate rabbitTemplate;
@RabbitListener(bindings = {
@QueueBinding(value =
@Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE),
exchange = @Exchange(name = RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,type ="x-delayed-message" ),
key= RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY)
})
public void process(Message message, Channel channel) {
//消费数据
}
}
到了这里,关于【技术分享】四、RabbitMQ “延时队列”的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!