简介
安装RabbitMQ需要安装Erlang/OTP,并保持版本匹配。
RabbitMQ官网:RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ与Erlang/OTP版本对照表:Erlang Version Requirements | RabbitMQ
Erlang官网下载:Downloads - Erlang/OTP
1.Windows上安装RabbitMQ前需要安装Erlang。(下载安装不做叙述,除了需要自定义安装目录外,按照默认安装即可。安装Erlang后配置一下环境变量,与配置jdk环境变量一致)
给大家一个我用的RabbitMQ版本是3.13.1
Erlang/OTP版本是26.0。需要自取即可。
下载地址:https://share.weiyun.com/dWR1pwh3
密码:3jcsqs
1.1 RabbitMQ常用命令
管理员身份打开命令提示符。
1:cd C:\rabbitmq_server-3.13.1\sbin 到自己解压的文件路径的sbin目录下。
2:rabbitmq-service install 安装一下
3:rabbitmq-service start 启动
4:rabbitmq-service stop 停止
1.2 启动后如果没问题输入下面的url应该会出现一个登录的页面。
localhost:15672
用户名和密码都是guest
1.3 如果没有登录页面提示找不到的话按照以下操作应该就可以了。
输入命令rabbitmq-plugins list正常启动的服务应该是如下所示,E和e分别表示显性和隐性启动,如果没有E和e,这也就是你打不开localhost:15672的原因所在,可能性很大。
那么,就再输入rabbitmq-plugins enable rabbitmq_management基本就可以解决打不开的问题了。
2.环境准备完后后,上代码。
2.1 maven的pom文件。
<!-- springboot集成rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 yml文件配置。
# RabbitMQ
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
retry:
initial-interval: 5000
enabled: true
max-attempts: 3
default-requeue-rejected: false
acknowledge-mode: auto
解释一下配置文件中的内容:
host: 127.0.0.1 这指定了 RabbitMQ 服务器的主机地址,127.0.0.1 表示本地主机,即运行 RabbitMQ 的计算机。port: 5672 这是 RabbitMQ 服务器监听连接的端口号。默认情况下,RabbitMQ 使用 5672 端口。
username和password这是连接 RabbitMQ 服务器所需的用户名和密码。在默认情况下,RabbitMQ 的默认用户名和密码都是 guest。
virtual-host: / 虚拟主机的名称。RabbitMQ 允许将多个逻辑消息服务运行在单个物理主机上,每个逻辑服务都有自己的虚拟主机。
/
表示默认的虚拟主机器
listener: 这个部分定义了监听器配置。
simple: 这个监听器类型是简单的消息监听器。
retry:这个子部分定义了消息重试策略。
initial-interval: 5000:初始重试间隔,单位为毫秒。即在第一次重试失败后,将等待 5000 毫秒再次重试。
enabled: true:表示重试机制启用
max-attempts: 3:最大重试次数,即消息在被丢弃之前尝试重新传递的最大次数。
default-requeue-rejected: false:当消息被拒绝时,是否重新排队。这里设置为 false 表示不重新排队。
acknowledge-mode: auto 消息确认模式。在这里设置为 auto,表示消息处理后会自动确认,如果处理失败,则消息将会重传。
2.3 新建一个交换机配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* <p>
* 消息队列配置类
* </p>
*
* @author Lch
* @dateTime 2024/4/1 14:28
*/
@Configuration
public class OrderQueueConfig {
/**
* 普通交换机名称
*/
private static final String ORDER_EXCHANGE = "normal_order_exchange";
/**
* 死信交换机名称
*/
private static final String DEAD_EXCHANGE = "normal_dead_exchange";
/**
* 普通队列名称
*/
private static final String ORDER_QUEUE = "normal_order_queue";
/**
* 延迟队列名称
*/
private static final String DELAY_QUEUE = "normal_delay_queue";
/**
* 声名普通交换机
* @return direct(默认)交换机
*/
@Bean("orderNormalExchange")
public DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE);
}
/**
* 声名死信交换机
* @return direct(默认)交换机
*/
@Bean("deadDelayExchange")
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
/**
* 声名延迟队列
* @return direct(默认)交换机
*/
@Bean("delayDelayQueue")
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE).build();
}
/**
* 声名普通队列并与死信队列进行绑定
* @return 名为ORDER_QUEUE的持久化队列(Durable Queue)
*
* x-dead-letter-exchange:设置了死信交换机,即消息过期后将会发送到哪个交换机。在这里
* 设置为 DEAD_EXCHANGE,即名为normal_dead_exchange的交换机。
* x-dead-letter-routing-key:设置了死信的路由键,即消息过期后发送到死信交换机后的路由键。在这里,设置为 expire
* 通过这些设置,配置了一个普通队列,并且定义了当队列中的消息过期后的行为,即发送到指定的死信交换机并且指定了死信的路由键。
* 当队列中的消息过期后,会被标记为死信并发送到名为 DEAD_EXCHANGE(normal_dead_exchange)的交换机中,并且使用"expire"作为路由键。
* 所以消息过期后,将消息标注为死信,然后根据此方法配置的死信交换机和路由键进行重新路由到 normal_order_queue 队列中
* 通过 @RabbitListener(queues = {"normal_delay_queue"}) 可以监听到 DEAD_EXCHANGE(normal_order_queue)的消息。
*/
@Bean("orderNormalDelayQueue")
public Queue orderQueue() {
HashMap<String, Object> param = new HashMap<>();
// 设置死信交换机
param.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信routingKey
param.put("x-dead-letter-routing-key", "expire");
return QueueBuilder.durable(ORDER_QUEUE).withArguments(param).build();
}
/**
* 普通队列与交换机绑定
* @param queue 普通队列
* @param exchange 直连交换机
* @return Binding 对象,它代表了这个绑定关系。
*
* 这个方法的作用是创建一个绑定(Binding),将指定的队列
* (通过 @Qualifier("orderNormalDelayQueue") 注解注入的 queue 参数)
* 与指定的交换机(通过 @Qualifier("orderNormalExchange") 注解注入的 exchange 参数)绑定在一起。
* 参数 queue 是一个普通队列,参数 exchange 是一个直连交换机(Direct Exchange)。
* 这个方法通过 BindingBuilder.bind(queue).to(exchange).with("normal_order") 创建了一个绑定
* 将队列 queue 绑定到交换机 exchange,并且指定了一个路由键(Routing Key)为 "normal_order"。
*/
@Bean
public Binding orderQueueBindingToOrderExchange(@Qualifier("orderNormalDelayQueue") Queue queue,
@Qualifier("orderNormalExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("normal_order");
}
/**
* 死信队列与与死信交换机进行绑定
* @param queue 一个延迟队列
* @param exchange 直连交换机
* @return Binding 对象,它代表了这个绑定关系
*
* 这个方法的作用是创建一个绑定(Binding),将指定的队列(通过 @Qualifier("delayDelayQueue") 注解注入的 queue 参数)
* 与指定的死信交换机(通过 @Qualifier("deadDelayExchange") 注解注入的 exchange 参数)绑定在一起。
* 参数 queue 是一个延迟队列,参数 exchange 是一个直连交换机(Direct Exchange)。
* 这个方法通过 BindingBuilder.bind(queue).to(exchange).with("expire") 创建了一个绑定
* 将队列 queue 绑定到交换机 exchange,并且指定了一个路由键(Routing Key)为 "expire"。
* 当延迟队列中的消息过期后,会被发送到死信交换机中,并且使用路由键 "expire" 进行路由。
*/
@Bean
public Binding DelayQueueBingingToDeadExchange(@Qualifier("delayDelayQueue") Queue queue,
@Qualifier("deadDelayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("expire");
}
}
2.4 下单的时候发送消息
if (orderMapper.insert(orderInsertEntity)>0) {
//生产者生产消息
logger.info("当前时间:{}订单入库成功,订单Id:{},正在发送到RabbitMQ,超时时间:{}s", LocalDateTime.now(), orderInsertEntity.getId(), 30 * 60 * 1000);
try {
String orderJson = objectMapper.writeValueAsString(orderInsertEntity.getId());
rabbitTemplate.convertAndSend("normal_order_exchange", "normal_order", orderJson,message -> {
message.getMessageProperties().setExpiration(String.valueOf(30 * 60 * 1000L));
return message;
});
} catch (JsonProcessingException e) {
logger.error("EQOrderJson序列化失败");
e.printStackTrace();
}
}
说明:
message.getMessageProperties().setExpiration(String.valueOf(30 * 60 * 1000L));
30 * 60 * 1000L这个是设置时间的,我这边设置的是下单超过30分钟如果没付款就会自动取消订单,我这个业务不是卖商品所以不涉及到库存,如果涉及到减库存。如果要是在2.5中收到死信消息后记得加库存。文章来源:https://www.toymoban.com/news/detail-852599.html
2.5新建一个监听器。
这一步其实就收下单半个小时后,收到消息了,会先查看一下订单状态,如果是未付款的情况下,就把订单给取消了。
文章来源地址https://www.toymoban.com/news/detail-852599.html
import com.cx.sasmc.entity.Order;
import com.cx.sasmc.mapper.OrderMapper;
import com.cx.sasmc.orderstatus.OrderStatusConstants;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
/**
* <p>
* 创建监听器(消费者端),用于接收超时未支付的订单,并对该订单进行处理
* </p>
*
* @author Lch
* @dateTime 2024/4/1 14:29
*/
@Component
public class OrderListener {
private final Logger logger = LoggerFactory.getLogger(OrderListener.class);
private final ObjectMapper objectMapper = new ObjectMapper();
@Resource
private OrderMapper orderMapper;
@RabbitListener(queues = {"normal_delay_queue"})
public void orderExpire(Message message) throws IOException {
// 使用JackSon反序列化
String orderNo = objectMapper.readValue(message.getBody(), String.class);
orderNo = orderNo.replace("\"", "");
logger.info("----------------------------------------------------");
logger.info("当前时间:{},订单超时检查订单支付状态,订单信息:{}", LocalDateTime.now(), orderNo);
// 通过订单Id查询该订单超时后的支付状态
Order order = orderMapper.selectById(orderNo);
// 如果订单状态为未支付则将此订单状态关闭
try {
if (order.getOrderStatus() == OrderStatusConstants.UNPAID) {
logger.info("当前时间:{},订单超时关闭订单,订单信息:{}", LocalDateTime.now(), orderNo);
Order orderUp = new Order();
orderUp.setId(Long.valueOf(orderNo));
orderUp.setOrderStatus(OrderStatusConstants.CANCELED);
orderUp.setFeedback("订单超时未支付!");
orderMapper.updateById(orderUp);
}
logger.info("----------------------------------------------------");
}catch (Exception e) {
e.printStackTrace();
}
}
}
到了这里,关于SpringBoot+RabbitMQ实现超时未支付订单自动取消,localhost:15672没有登录页面。的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!