延迟队列设计
在开发过程中涉及到延迟队列的应用,例如订单生成后有30分钟的付款时间,注册是有60秒的邮件或者短信的发送读取时间等。
常规使用rabbitmq设计延迟队列有两种方式
- 使用创建一个延迟队列阻塞消息
- 使用延迟队列插件
Dead Letter Exchanges — RabbitMQ
配置
- To set the DLX for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host:
- You may also specify a routing key to use when the messages are being dead-lettered. If the routing key is not set, the message’s own routing keys are used. args.put("x-dead-letter-routing-key", “some-routing-key”);
package com.wnhz.mq.common.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DlxConfig {
@Bean
public Queue dlxQueue(){
return new Queue("dlx_queue_test");
}
@Bean
public DirectExchange dlxExchange(){
return new DirectExchange("dlx_exchange_test");
}
@Bean
public Binding dlxBinding(){
return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
.with("dlx_routing_key");
}
@Bean
public Queue normalQueue(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange", "dlx_exchange_test");
map.put("x-dead-letter-routing-key","dlx_routing_key");
return new Queue("normal_queue_test",true,false,false,map);
}
@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal_exchange_test");
}
@Bean
public Binding normalBinding(){
return BindingBuilder.bind(normalQueue()).to(normalExchange())
.with("normal_routing_test");
}
}
server:
port: 10005
spring:
application:
name: book-consumer
autoconfigure:
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure, org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
rabbitmq:
host: 192.168.198.130
port: 5672
username: admin
password: 123
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
prefetch: 1
acknowledge-mode: auto
logging:
level:
com.wnhz.mq.consumer: debug
生产者发送信息
@Override
public void delaySendMessage() {
String uuid = UUID.randomUUID().toString();
CorrelationData data = new CorrelationData(uuid);
String msg = "hello delay";
int delayTime =5000;
rabbitTemplate.convertAndSend("normal_exchange_test", "normal_routing_test", msg,
p -> {
p.getMessageProperties().setExpiration(String.valueOf(delayTime ));
return p;
});
log.debug("发送一条消息{},当前时间:{},延迟{}秒", msg, new Date(), delayTime / 1000);
}
}
消费者消费
@RabbitListener(queues = "dlx_queue_test")
public void delayConsume(Message message){
log.debug("消费者消费信息:{},当前时间:{}",message.getBody(),new Date());
}
延迟队列插件安装
访问官网
Community Plugins — RabbitMQ
进入rabbitmq docker容器
[root@localhost ~]# docker exec -it rabbitmq bash
查询插件列表是否存在延迟插件
root@6d2342d51b11:/plugins# rabbitmq-plugins list
root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@6d2342d51b11
|/
[ ] rabbitmq_amqp1_0 3.9.11
[ ] rabbitmq_auth_backend_cache 3.9.11
[ ] rabbitmq_auth_backend_http 3.9.11
[ ] rabbitmq_auth_backend_ldap 3.9.11
[ ] rabbitmq_auth_backend_oauth2 3.9.11
[ ] rabbitmq_auth_mechanism_ssl 3.9.11
[ ] rabbitmq_consistent_hash_exchange 3.9.11
[ ] rabbitmq_event_exchange 3.9.11
[ ] rabbitmq_federation 3.9.11
[ ] rabbitmq_federation_management 3.9.11
[ ] rabbitmq_jms_topic_exchange 3.9.11
[E*] rabbitmq_management 3.9.11
[e*] rabbitmq_management_agent 3.9.11
[ ] rabbitmq_mqtt 3.9.11
[ ] rabbitmq_peer_discovery_aws 3.9.11
[ ] rabbitmq_peer_discovery_common 3.9.11
[ ] rabbitmq_peer_discovery_consul 3.9.11
[ ] rabbitmq_peer_discovery_etcd 3.9.11
[ ] rabbitmq_peer_discovery_k8s 3.9.11
[E*] rabbitmq_prometheus 3.9.11
[ ] rabbitmq_random_exchange 3.9.11
[ ] rabbitmq_recent_history_exchange 3.9.11
[ ] rabbitmq_sharding 3.9.11
[ ] rabbitmq_shovel 3.9.11
[ ] rabbitmq_shovel_management 3.9.11
[ ] rabbitmq_stomp 3.9.11
[ ] rabbitmq_stream 3.9.11
[ ] rabbitmq_stream_management 3.9.11
[ ] rabbitmq_top 3.9.11
[ ] rabbitmq_tracing 3.9.11
[ ] rabbitmq_trust_store 3.9.11
[e*] rabbitmq_web_dispatch 3.9.11
[ ] rabbitmq_web_mqtt 3.9.11
[ ] rabbitmq_web_mqtt_examples 3.9.11
[ ] rabbitmq_web_stomp 3.9.11
[ ] rabbitmq_web_stomp_examples 3.9.11
下载支持3.9.x的插件
退出容器:
root@6d2342d51b11:/plugins# exit
exit
上传到linux服务器
在/usr/local/software/下创建文件夹rabbitmq/plugins文章来源:https://www.toymoban.com/news/detail-831926.html
[root@localhost software]# mkdir -p rabbitmq/plugins
拷贝插件到容器中
[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
进入容器安装插件
[root@localhost plugins]# docker exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
打开管理页面
进入Exchange页面,下拉Type看是否已经安装成功。文章来源地址https://www.toymoban.com/news/detail-831926.html
代码实现
配置类
package com.wnhz.rabbitmq.mq.config;
public interface RabbitmqConstants {
String DELAYX_QUEUE = "mq_delayx__queue";
String DELAYX_ROUTING_KEY = "mq_delayx_routing_key";
String DELAYX_EXCHANGE = "mq_delayx__exchange";
String DELAYX_EXCHANGE_TYPE = "x-delayed-message";
}
package com.wnhz.rabbitmq.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;
import java.util.HashMap;
@Configuration
@Slf4j
public class RabbitmqConfig {
@Bean
public Queue delayxQueue() {
return new Queue(RabbitmqConstants.DELAYX_QUEUE);
}
@Bean
public CustomExchange delayRoutingExchange() {
return new CustomExchange(RabbitmqConstants.DELAYX_EXCHANGE,
RabbitmqConstants.DELAYX_EXCHANGE_TYPE,
true,
false,
new HashMap<String, Object>() {{
put("x-delayed-type","direct");
}});
}
@Bean
public Binding delayxBinding() {
return BindingBuilder.bind(delayxQueue())
.to(delayRoutingExchange())
.with(RabbitmqConstants.DELAYX_ROUTING_KEY).noargs();
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
log.debug("rabbitmq配置:{}完成", rabbitTemplate);
return rabbitTemplate;
}
}
生产者
@Service
@Slf4j
public class ProduceServiceImpl implements IProduceService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendDelayxUser(User user) {
int delayTime = 10000;
rabbitTemplate.convertAndSend(
RabbitmqConstants.DELAYX_EXCHANGE,
RabbitmqConstants.DELAYX_ROUTING_KEY,
user, mpp -> {
mpp.getMessageProperties().setDelay(delayTime);
return mpp;
});
log.debug("发送消息:{},发送时间:{},延迟:{}秒", user,new Date(),delayTime/1000);
}
}
消费者
@Slf4j
@Service
public class ConsumeServiceImpl implements IConsumeService {
@RabbitListener(queues = RabbitmqConstants.DELAYX_QUEUE)
@Override
public void receiveDelayxUser(User user) {
log.debug("消费者:接收到消息-->{},接收时间:{}",user,new Date());
}
}
到了这里,关于Rabbitmq入门与应用(五)-延迟队列的设计与实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!