RibbaitMq插件实现延迟队列主要是延迟消息到交换机的时间。
TTL+死信队列实现延时队列:正常消息过期没有被消费掉,进入死信队列后立即消息。
本章主要采用第一种方式。
一、前期准备工作
1.安装RabbirMQ自行百度或者参考推荐资源:
RabbitMQ部署-Windows篇 - 知乎
RabbitMQ windows下的安装与配置 - 腾讯云开发者社区-腾讯云
安装成功图
2. 下载rabbitmq_delayed_message_exchange-3.11.1插件并上传到指定文件夹中,
插件版本根据ribbitmq版本选择:Community Plugins — RabbitMQ
3. 进入sbin目录,打开管理员控制台,输入如下命令,显示类似信息即可:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4.重新双击sbin目录下的rabbitmq-server.bat文件,启动rabbitmq服务。
5.启动服务之后打开rabbitmq管理官新增交换机即可看到新的交换模式。
二、编码
1.在pom.xml
文件中添加AMQP
相关依赖
<!--rabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在application.yml
添加RabbitMQ的相关配置
rabbitmq:
port: 5672
host: localhost
username: admin
password: 123456
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
retry:
enabled: true
max-attempts: 5
initial-interval: 3000
virtual-host: newCore
publisher-confirm-type: correlated
publisher-returns: true
3.采用RibbaitMq延迟队列官方插件,新的方法实现延迟队列
package com.hxnwm.ny.diancan.common.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* RabbitMQ延时消息配置
* @time 2022/3/11 11:40
*/
@Configuration
public class RabbitMQLazyConfig {
// ----- 订单处于已完成状态后的24小时后,对应操作 此部分采用插件 延迟发送到交换机的时间实现延迟队列 wdj-----
//队列
public static final String ACT_SETTLE_DELAY_QUEUE = "act.settle.delay.queue";
//交换机
public static final String ACT_SETTLE_DELAY_EXCHANGE = "act.settle.delay.exchange";
public static final String ACT_SETTLE_DELAY_ROUTING_KEY ="act.settle.delay.routing.key";
@Bean
public Queue delaySettleQueue() {
return new Queue(ACT_SETTLE_DELAY_QUEUE, true, false, false);
}
@Bean
public CustomExchange delaySettleExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(ACT_SETTLE_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingSettleDelay() {
return BindingBuilder.bind(delaySettleQueue()).to(delaySettleExchange()).with(ACT_SETTLE_DELAY_ROUTING_KEY).noargs();
}
}
延时队列工具类
@Component
@Slf4j
//订单工具类
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class OrderUtils {
@Resource
private AmqpTemplate amqpTemplate;
/**
*订单完成后24小时,给B端发消息
*
* @param order
* @return void
* @Author wdj
* @date 2023/3/22 16:23
*/
public void delayedSettleOrder(Order order) {
this.amqpTemplate.convertAndSend(RabbitMQLazyConfig.ACT_SETTLE_DELAY_EXCHANGE, RabbitMQLazyConfig.ACT_SETTLE_DELAY_ROUTING_KEY, order, message -> {
//给消息设置延迟毫秒值
message.getMessageProperties().setDelay(1000);//24小时后发送消息到B端 改成1秒
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
});
log.info("把订单完成24小时后{},发送到rabbitmq队列中", order.getOrderSn());
}
}
service层或者controller层根据业务需求调用this.orderUtils.delayedSettleOrder(order);文章来源:https://www.toymoban.com/news/detail-408402.html
//线下扫码支付:门店+余额支付成功,发送收益到B端
this.orderUtils.delayedSettleOrder(order);
延时消息监听类:主要对消息进行消费逻辑处理。文章来源地址https://www.toymoban.com/news/detail-408402.html
/**
* 延时任务监听
*
* @author knight
* @time 2022/3/11 10:40
*/
@Slf4j
@Component
public class LazyOrderListener {
@Resource
private OrderService orderService;
@Resource
private BApiUtils bApiUtils;
/**
* @Description:订单处于已完成状态后的24小时后,将订单推送给B端
* @param order
* @Author: wdj
* @Date: 2023/3/22
*/
@RabbitListener(queues = RabbitMQLazyConfig.ACT_SETTLE_DELAY_QUEUE)
@Transactional(rollbackFor = Exception.class)
public void listenDelayedSettleOrder(Order order){
log.info("监听到订单完成 " + order.getOrderSn());
try {
/* 这部分是业务逻辑,请忽略
Order order1 = this.orderService.info(order.getId(), order.getOrderSn());
if (Objects.isNull(order1)) {
log.info("订单未找到,退出");
throw new Exception("订单未找到");
}
// 判断订单状态是否处于已完成
if (!Objects.equals(order1.getStatus(), Order.STATUS.STATUS_0020.status)) {
log.info("订单状态处于未完成状态,退出");
throw new Exception("订单状态处于未完成状态,退出");
}
//开发阶段不用一直发送消息,记得测试要放行
bApiUtils.sendDelayedSettleOrder(order.getId());
*/
log.info("监听订单处理成功");
}catch (Exception e){
e.printStackTrace();
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
log.info("监听订单处理失败回滚");
}
}
}
到了这里,关于采用RibbaitMq延迟队列官方插件实现延迟队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!