系列文章目录
准备篇 RabbitMQ安装文档
第一章 RabbitMQ快速入门篇
第二章 RabbitMQ的Web管理界面详解
第三章 RabbitMQ进阶篇之死信队列
第四章 RabbitMQ进阶篇之通过插件实现延迟队列
前言
恭喜所有看到本篇文章的小伙伴,成功解锁了RabbitMQ系列之高级特性 死信队列的内容🎁通过本文,你将清楚的了解到:什么是死信?什么是死信队列?死信队列如何使用?等😄本文最后,小名将通过一个实例,来帮助大家记忆死信队列😁
一、什么是死信队列
由于某些原因消息无法被正确的投递或是无法被正常消费的消息,为了确保此类消息不会被无故的丢弃 ,一般将会将他们存在一个特殊角色的队列中,这个队列一般称之为 死信队列 。
如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。所以死信队列是RabbitMQ中为异常处理的消息提供的一种保障机制。
二、什么样的消息会变成死信
- 消息被拒绝,使用
channel.basicNack
或channel.basicReject
,并且此时requeue
属性被设置为false
。 - 消息在队列的存活时间超过设置的TTL时间
- 消息队列的消息数量已经超过最大队列长度
三、什么是死信交换机
Dead-Letter-Exchange(DLX),称之为死信交换机。当消息在一个队列中变成死信(dead messag)之后,它被重新发送到另一个交换器中,这个交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。
四、死信处理过程
DLX 也是一个正常的交换器,和一般的交换器没有区别,实际上就是设置某个队列的属性,它能在任何的队列上被指定。当这个队列中存在死信时 RabbitMQ 就会自动将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个死信队列上。
五、如何使用死信交换机
配置死信队列分为以下步骤:
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
配置代码:
//订单最多存在10s
args.put("x-message-ttl", 10 * 1000);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", "ex.go.dlx");
//声明当前队列绑定的死信路由key
args.put("x-dead-letter-routing-key", "go.dlx");
web界面:
Time To Live(TTL)
RabbitMQ可以针对队列设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时,则消息变为dead letter(死信)
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange: 出现dead letter之后将dead letter重新发送到指定exchange。
x-dead-letter-routing-key: 出现dead letter之后将dead letter重新按照指定的routing-key发送。
六、实例
演示业务:
模拟用户商城购买商品时的两种情况:1. 成功下单,2. 超时提醒
由于视频是小名分屏录制的,画面有些长,看不清内容的小伙伴劳烦 点击移步 全屏观看
业务场景:
- 用户下单
- 用户下单后展示等待付款页面
- 在页面上点击付款的按钮,如果不超时,则跳转到付款成功页面
- 如果超时,则给用户发送消息通知,询问用户尚未付款,是否还需要?
6.1 数据库表设计
文章来源:https://www.toymoban.com/news/detail-723484.html
CREATE TABLE `practice_dlx_order` (
`id` bigint(20) NOT NULL,
`name` varchar(255) DEFAULT NULL COMMENT '订单名称',
`price` decimal(10,2) DEFAULT NULL COMMENT '金额',
`timeout` tinyint(1) DEFAULT '0' COMMENT '是否已超时:1-已超时,0-未超时',
`pay` tinyint(1) DEFAULT NULL COMMENT '是否已付款:1-已付款,0-未付款',
`order_time` datetime DEFAULT NULL COMMENT '下单时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
6.2 配置文件
spring:
#thymeleaf配置
thymeleaf:
#模板的模式,支持如:HTML、XML、TEXT、JAVASCRIPT等
mode: HTML5
#编码,可不用配置
encoding: utf-8
servlet:
#内容类别,可不用配置
content-type: text/html
#开发配置为false,避免修改模板还要重启服务器
cache: false
#配置模板路径,默认就是templates,可不用配置
prefix: classpath:/templates/
rabbitmq:
host: 部署rabbitmq的服务器外网ip
port: 5672
username: 用户名
password: 密码
publisher-confirms: true
publisher-returns: true
6.3 页面
- 下单页面
<html xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<title></title>
<script type="text/javascript" src="http://code.jquery.com/jquery-1.10.0.js"></script>
</head>
<body>
<p>下单页面</p>
<form id="form1" action="/mqConsumer/practice-dlx-order/payPage">
<table border="1">
<tr>
<th>Name</th>
<th>Price</th>
<th>Pperate</th>
</tr>
<tr>
<td>MacBook</td>
<input name="form1name" value="MacBook" style="display:none">
<td>$999</td>
<input name="form1price" value="999" style="display:none">
<td><button>Buy</button></td>
</tr>
</table>
</form>
</body>
</html>
- 结账页面
<html xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<title></title>
<script type="text/javascript" src="http://code.jquery.com/jquery-1.10.0.js"></script>
</head>
<body>
<p>结账页面</p>
<a id="out"></a>
<span id="id" th:text="${id}"></span>
<form id="form1" action="/mqConsumer/practice-dlx-order/paySuccessful">
<input th:name="form1id" th:value="${id}" style="display:none">
<button>去支付</button>
</form>
</body>
</html>
<script type="text/javascript">
window.onload = function () {
var i = 10;
var out = document.getElementById('out');
var timer = setInterval(function () {
//setInterval() 方法可按照指定的周期(以毫秒计)来调用函数或计算表达式。
if (i == -1) {
//clearInterval() 方法可取消由 setInterval() 设置的 timeout。
clearInterval(timer);
document.write("支付超时");
} else {
//document.body.innerHTML = i;
out.innerHTML = i + "秒";
--i;
}
}, 1000);
}
</script>
- 支付成功页面
<html xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<title></title>
<script type="text/javascript" src="http://code.jquery.com/jquery-1.10.0.js"></script>
</head>
<body>
<p>支付成功!等待发货~</p>
</body>
</html>
6.4 队列绑定
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("ex.go", true, false);
}
@Bean
public Queue queue() {
Map<String, Object> args = new HashMap<>(3);
//订单最多存在10s
args.put("x-message-ttl", 10 * 1000);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", "ex.go.dlx");
//声明当前队列绑定的死信路由key
args.put("x-dead-letter-routing-key", "go.dlx");
return new Queue("q.go", true, false, false, args);
}
@Bean
public DirectExchange directExchangeDlx() {
return new DirectExchange("ex.go.dlx", true, false);
}
@Bean
public Queue queueDlx() {
return new Queue("q.go.dlx", true, false, false);
}
/**
* 延时队列交换机
*
* @return
*/
@Bean
public DirectExchange delayExchange() {
Map<String, Object> pros = new HashMap<>();
//设置交换机支持延迟消息推送
pros.put("x-delayed-message", "direct");
DirectExchange directExchange = new DirectExchange("ex.delay", true, false, pros);
directExchange.setDelayed(true);
return directExchange;
}
/**
* 延时队列
*
* @return
*/
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(3);
return new Queue("q.delay");
}
/**
* 给延时队列绑定交换机
*
* @return
*/
@Bean
public Binding bindingDelay(@Qualifier("delayQueue") Queue delayQueue,
@Qualifier("delayExchange") DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("q.delay");
}
@Bean
@Resource
public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("go").noargs();
}
@Bean
@Resource
public Binding bindingDlx(@Qualifier("queueDlx") Queue queueDlx, @Qualifier("directExchangeDlx") Exchange exchangeDlx) {
return BindingBuilder.bind(queueDlx).to(exchangeDlx).with("go.dlx").noargs();
}
}
6.5 创建生产者
@Controller
@RequestMapping("//practice-dlx-order")
public class PracticeDlxOrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private IPracticeDlxOrderService iPracticeDlxOrderService;
@RequestMapping("/orderPage")
public String orderPage() {
return "/dlx/orderPage";
}
@RequestMapping("/payPage")
public String payPage(HttpServletRequest httpServletRequest, ModelMap map) {
String form1name = httpServletRequest.getParameter("form1name");
String form1price = httpServletRequest.getParameter("form1price");
PracticeDlxOrder practiceDlxOrder = new PracticeDlxOrder();
practiceDlxOrder.setPay(false);//是否已付款:1-已付款,0-未付款
practiceDlxOrder.setPrice(new BigDecimal(form1price));
practiceDlxOrder.setName(form1name);
practiceDlxOrder.setOrderTime(new Date());
iPracticeDlxOrderService.getBaseMapper().insert(practiceDlxOrder);
//获取id
Long id = practiceDlxOrder.getId();
map.addAttribute("id", id);
rabbitTemplate.convertAndSend("ex.go", "go", String.valueOf(id));
return "dlx/payPage";
}
//支付成功
@RequestMapping("/paySuccessful")
public String paySuccessful(HttpServletRequest httpServletRequest) {
String id = httpServletRequest.getParameter("form1id");
UpdateWrapper<PracticeDlxOrder> dlxOrder = new UpdateWrapper<>();
dlxOrder.eq("id", Long.valueOf(id));
dlxOrder.set("pay",true);
iPracticeDlxOrderService.update(dlxOrder);
return "/dlx/paySuccessful";
}
}
6.6 创建消费者
@Component
@Slf4j
public class MqListener {
@Autowired
IPracticeDlxOrderService iPracticeDlxOrderService;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "q.go.dlx")
public void dlxListener(Message message, Channel channel) throws IOException {
System.out.println("支付超时");
Long id = Long.valueOf(new String(message.getBody(), "utf-8"));
PracticeDlxOrder order = iPracticeDlxOrderService.lambdaQuery().eq(PracticeDlxOrder::getId, id).one();
Boolean payStatue = order.getPay();
//判断是否支付
if (!payStatue) {//未支付,修改未超时
UpdateWrapper<PracticeDlxOrder> dlxOrder = new UpdateWrapper<>();
dlxOrder.eq("id", id);
dlxOrder.set("timeout", 1);
iPracticeDlxOrderService.update(dlxOrder);
log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), message, new Date().toString());
//未支付,10s后给用户发app信息
sendDelayMsg(id);
}
}
public Date dateRoll(Date date, int i, int d) {
// 获取Calendar对象并以传进来的时间为准
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
// 将现在的时间滚动固定时长,转换为Date类型赋值
calendar.add(i, d);
// 转换为Date类型再赋值
date = calendar.getTime();
return date;
}
//死信队列监听
@RabbitListener(queues = "q.delay")
public void delayListener(Message message, Channel channel) throws IOException {
System.out.println(new String(message.getBody()));
}
/**
* 未支付,10s后给用户发信息
*/
public void sendDelayMsg(Long id){
rabbitTemplate.setMandatory(true);
//id + 时间戳 全局唯一
Date date = DateUtil.getDate(new Date(),1,10);
CorrelationData correlationData = new CorrelationData(date.toString());
//发送消息时指定 header 延迟时间
rabbitTemplate.convertAndSend("ex.delay", "q.delay", "您的订单号:" + id + "尚未付款,是否还需要?",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setDelay(10*1000);
return message;
}
}, correlationData);
}
}
🎊大功告成,大家可以 点我 查看成果 🎊文章来源地址https://www.toymoban.com/news/detail-723484.html
如若您在文章中发现任何错误的地方,希望您可以在评论区给予小名批评指正🤝 如果觉得小名的文章帮助到了您,请关注小名的专栏【RabbitMQ】,支持一下小名😄,给小名的文章点赞👍、评论✍、收藏🤞谢谢大家啦~♥♥♥
到了这里,关于【外行也能看懂的RabbitMQ系列(三)】—— RabbitMQ进阶篇之死信队列(内含视频演示业务和业务代码)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!