目录
一、概念
二、使用场景
三、RabbitMQ 中的 TTL
(一)队列设置 TTL
(二)消息设置 TTL
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目
(二)添加依赖
(三)修改配置文件
(四)添加Swagger配置类
五、队列TTL
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
六、延迟队列优化
(一)代码架构图
(二)配置文件类
(三)消息生产者
七、Rabbitmq 插件实现延迟队列
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
八、总结
一、概念
二、使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
文章来源:https://www.toymoban.com/news/detail-779933.html
三、RabbitMQ 中的 TTL
(一)队列设置 TTL
Map<String, Object> arguments = new HashMap<>();
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
(二)消息设置 TTL
另一种方式便是针对每条消息设置 TTL
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
msg.getMessageProperties().setExpiration(ttl);
return msg;
});
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目
(二)添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
(三)修改配置文件
spring.rabbitmq.host=192.168.23.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
(四)添加Swagger配置类
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webapi")
.apiInfo(webApiInfo())
.select()
.build();
}
public ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288", "http://atguigu.com",
"1551388580@qq.com"))
.build();
}
}
五、队列TTL
(一)代码架构图
(二)配置文件类
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明yExchange
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// 声明队列A
@Bean("queueA")
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>();
// 当前队列的死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
// 声明队列A绑定交换机X
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 声明队列B
@Bean("queueB")
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
// 声明队列的TTL
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
// 声明队列B绑定交换机X
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
// 声明死信队列
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
// 声明死信队列 QD 绑定关系
public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,
@Qualifier("yExchange")DirectExchange exchange) {
return BindingBuilder.bind(queueD).to(exchange).with("YD");
}
}
(三)消息生产者
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间是{},发送一条信息给两个 TTL 队列:{}", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message);
}
(四)消息消费者
@Component
@Slf4j
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
六、延迟队列优化
(一)代码架构图
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,而是由生产者设置过期时间
(二)配置文件类
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
public static final String QUEUE_C = "QC";
// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明yExchange
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// 声明队列A
@Bean("queueA")
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>();
// 当前队列的死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
// 声明队列A绑定交换机X
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 声明队列B
@Bean("queueB")
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
// 声明队列的TTL
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
// 声明队列B绑定交换机X
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
// 声明队列C
@Bean("queueC")
public Queue queueC() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
// 声明队列C绑定交换机X
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
// 声明死信队列
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
// 声明死信队列 QD 绑定关系
public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,
@Qualifier("yExchange")DirectExchange exchange) {
return BindingBuilder.bind(queueD).to(exchange).with("YD");
}
}
(三)消息生产者
@GetMapping("/sendExpirationMsg/{message}/{ttl}")
public void sendMsg(@PathVariable String message, @PathVariable String ttl) {
log.info("当前时间是{},发送一条过期信息给两个 TTL 队列:{}", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
msg.getMessageProperties().setExpiration(ttl);
return msg;
});
}
七、Rabbitmq 插件实现延迟队列
关于插件的安装可以查看这篇文章Docker安装RabbitMq延迟队列插件
(一)代码架构图
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
(二)配置文件类
(三)消息生产者
/*
* 基于插件的延迟队列和延迟交换机
*/
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 声明队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 声明自定义交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 声明队列和延迟交换机的绑定
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue")Queue delayedQueue,
@Qualifier("delayedExchange")CustomExchange exchange) {
return BindingBuilder.bind(delayedQueue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
(四)消息消费者
@Component
@Slf4j
public class DelayedQueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(String message) {
log.info("当前时间:{}, 接收到消息: {}", new Date().toString(), message);
}
}
第二个消息被先消费掉了,符合预期文章来源地址https://www.toymoban.com/news/detail-779933.html
八、总结
到了这里,关于RabbitMQ延迟队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!