目录
一、Linux(ubuntu)安装RabbitMQ
(1)首先确认Linux 内核版本,确定是Ubuntu还是CentOS版本。
(2)rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang。
(3)确定relang是否安装成功。
(4)更新一下软件包,确保软件最新版本。
(5)安装最新版本的RabbitMQ
(5.1)安装指定版本 (先准备环境)
(5.2)查询目前可以安装的版本
(5.3)选择版本
(6)查看状态,验证是否安装成功。
(7)启动插件,允许web访问。(很关键)
(8)重启一下RabbitMQ服务
(9)添加用户方便web访问
(10)登录
(11)注意:如果遇见没有死信队列可用,可以在官网下载 (以下为帮助文档)
二、RabbitMQ整合SpringBoot使用
(1)导入依赖
(2)配置文件
(3)声明exchange、queue
(4)发布消息到RabbitMQ
(5)创建消费者监听消息
(6)ACK
(7)添加配置文件
(8)手动ack
三、消息的可靠性
(1)RabbitMQ的事务
(2)事务模型
(3)普通Confirm方式
(4)批量Confirm方式。
(5)异步Confirm方式。
(6)Return机制
(7)SpringBoot整合实现
(8)避免消息重复消费
(9)SpringBoot实现
四、使用死信队列实现订单超时取消
一、Linux(ubuntu)安装RabbitMQ
(1)首先确认Linux 内核版本,确定是Ubuntu还是CentOS版本。
uname -a
(2)rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang。
apt-get install erlang-nox
(3)确定relang是否安装成功。
erl
(4)更新一下软件包,确保软件最新版本。
sudo apt-get update
(5)安装最新版本的RabbitMQ
sudo apt-get install rabbitmq-server
(5.1)安装指定版本 (先准备环境)
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash
(5.2)查询目前可以安装的版本
apt-cache madison rabbitmq-server
(5.3)选择版本
sudo apt-get install rabbitmq-server=3.7.7-1
(6)查看状态,验证是否安装成功。
systemctl status rabbitmq-server
(7)启动插件,允许web访问。(很关键)
rabbitmq-plugins enable rabbitmq_management
(8)重启一下RabbitMQ服务
service rabbitmq-server restart
(9)添加用户方便web访问
rabbitmqctl add_user admin adimin // 增加普通用户
abbitmqctl set_user_tags admin administrator // 给普通用户分配管理员角色
(10)登录
(11)注意:如果遇见没有死信队列可用,可以在官网下载 (以下为帮助文档)
https://blog.csdn.net/u010404909/article/details/125329160
下载好以后上传插件到Linux
// 插件安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
// 重启mq
systemctl restart rabbitmq-server
二、RabbitMQ整合SpringBoot使用
(1)导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)配置文件
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
(3)声明exchange、queue
@Configuration
public class RabbitMQConfig {
//1. 创建exchange - topic
@Bean
public TopicExchange getTopicExchange(){
return new TopicExchange("boot-topic-exchange",true,false);
}
//2. 创建queue
@Bean
public Queue getQueue(){
return new Queue("boot-queue",true,false,false,null);
}
//3. 绑定在一起
@Bean
public Binding getBinding(TopicExchange topicExchange,Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
}
}
(4)发布消息到RabbitMQ
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
}
(5)创建消费者监听消息
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")
public void getMessage(Object message){
System.out.println("接收到消息:" + message);
}
}
(6)ACK
消费者在消费消息时,为了保证消息的可靠性,可以设置ack模式,当没有设置时,为默认自动ACK。当消费成功后,会对mq进行一个响应,mq收到成功消费的响应后,才会移除消息。自动ACK是默认设置,当消费过程中没有异常并顺利执行完毕,会自动响应mq服务器,当消费过程中出现异常,则不会响应,会被mq视为ACK失败。
手动ACK:当消息成功后,需要手动ACK响应mq服务器,否则在连接未断开前,一直是unacked状态,不会重新分配消费者,直到连接断开才会回到ready状态,并重新分配消费者。
消息拒绝:当消息被拒绝时,如果设置重新入列,会继续将该消息返回到mq重新分配消费者,如果设置false,则直接丢弃消息。
ACK也支持事务,与生产者投递消息时事务代码几乎一样。但是必须设置为手动ACK才能支持事务,自动ack情形下事务是无效的。
(7)添加配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #手动确定
ACK有三种确认方式:
• 自动确认:acknowledge = none
• 手动确认:acknowledge = manual
• 根据异常情况确认:acknowledge = auto
(8)手动ack
@RabbitListener(queues = "myqueue1") // 监听mq
public void consumer1(String msg, Channel channel, Message message) throws IOException{
System.out.println("consumer1===" + msg);
// 当设置了手动ack时,没有进行ack,mq会将该消息设置为unacked状态,此时如果连接断开,会将消息重新设置为ready状态,继续分配消费者
// 消息的拒绝,会将消息直接设置为ready状态,继续分配消费者
// 参数:1,消息对象的标识, 2、requeue,是否重新回到队列
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
// 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
三、消息的可靠性
(1)RabbitMQ的事务
事务可以保证消息正确传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100。RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
(2)事务模型
在投递消息时开启事务,投递完消息无误的情况下提交事务,有误的情况下回滚事务。性能非常低下。一般情况下mq就是用来解耦进行削峰填谷,使用事务不推荐。
注意:在连接关闭前,事务都没有commit时,等同于rollback。
try (
// 获得连接
Connection connection = MQConnections.getConnection();
// 创建通道
Channel channel = connection.createChannel();
){
// 开启事务
channel.txSelect();
// 直接发送简单消息到队列
// 参数:1、交换机 2、队列名称 3、消息携带的properties 4、消息主体
channel.basicPublish("myex1", "", null, msg.getBytes());
System.out.println("消息发送成功");
// 提交事务
channel.txCommit();
// 回滚事务
// channel.txRollback();
}catch (Exception e){
e.printStackTrace();
}
(3)普通Confirm方式
开启了确认机制后,也会降低性能,大概是原来的10倍。不论生产者是否接收确认消息,消息都已经成功发送了。
//3.1 开启confirm
channel.confirmSelect();
//3.2 发送消息
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
//3.3 判断消息发送是否成功
if(channel.waitForConfirms()){
System.out.println("消息发送成功");
}else{
System.out.println("发送消息失败");
}
(4)批量Confirm方式。
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = "Hello-World!" + i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 确定批量操作是否成功
channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException
(5)异步Confirm方式。
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = "Hello-World!" + i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
}
});
消息传递可靠性
(6)Return机制
Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。而且exchange是不能持久化消息的,queue是可以持久化消息。采用Return机制来监听消息是否从exchange送到了指定的queue中
消息传递可靠性
开启Return机制,并在发送消息时,指定mandatory为true
// 开启return机制
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 当消息没有送达到queue时,才会执行。
System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
}
});
// 在发送消息时,指定mandatory参数为true
channel.basicPublish("","HelloWorld",true,null,msg.getBytes());
(7)SpringBoot整合实现
- 配置文件
spring:
rabbitmq:
publisher-confirm-type: simple
publisher-returns: true
- 开启confirm和return
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // init-method
public void initMethod(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息已经送达到Exchange");
}else{
System.out.println("消息没有送达到Exchange");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息没有送达到Queue");
}
}
(8)避免消息重复消费
重复消费消息,会对非幂等行操作造成问题。原因是消费者没有给RabbitMQ一个ack。
重复消费
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中。
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
生产者,发送消息时,指定messageId
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) //指定消息书否需要持久化 1 - 需要持久化 2 - 不需要持久化
.messageId(UUID.randomUUID().toString())
.build();
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());
消费者,在消费消息时,根据具体业务逻辑去操作redis
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Jedis jedis = new Jedis("192.168.199.109",6379);
String messageId = properties.getMessageId();
//1. setnx到Redis中,默认指定value-0
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if(result != null && result.equalsIgnoreCase("OK")) {
System.out.println("接收到消息:" + new String(body, "UTF-8"));
//2. 消费成功,set messageId 1
jedis.set(messageId,"1");
channel.basicAck(envelope.getDeliveryTag(),false);
}else {
//3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1
String s = jedis.get(messageId);
if("1".equalsIgnoreCase(s)){
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
}
};
(9)SpringBoot实现
- 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- 配置文件
spring:
redis:
host: 192.168.199.109
port: 6379
- 修改生产者
@Test
void contextLoads() throws IOException {
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
System.in.read();
}
- 修改消费者
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 获取MessageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 设置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
//2. 消费消息
System.out.println("接收到消息:" + msg);
//3. 设置key的value为1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
四、使用死信队列实现订单超时取消
死信队列:DLX,
dead-letter-exchange
利用DLX,当消息在一个队列中变成死信
(dead message)
之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
消息变成死信有以下几种情况:
消息被拒绝(basic.reject / basic.nack),并且requeue = false
消息TTL过期
队列达到最大长度
死信处理过程:
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中的消息做相应的处理。
具体实现:
上面的代码中,修改RabbitConfig类,配置延迟队列和死信队列文章来源:https://www.toymoban.com/news/detail-768339.html
@Configuration
public class RabbitConfig {
// 创建队列
@Bean
public Queue orderDelayQueue(){
// 将订单延迟队列绑定死信队列
Map map = new HashMap();
map.put("x-dead-letter-exchange", "dead_exchange");
map.put("x-dead-letter-routing-key", "dead_route_key");
return new Queue("orderDelayQueue", true, false, false, map);
}
// 创建Fanout类型交换机
@Bean
public DirectExchange orderDelayExchange(){
return new DirectExchange("orderDelayExchange");
}
// 将队列绑定到交换机
@Bean
public Binding simpleBinding(){
return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with("orderDelayKey");
}
// 创建死信队列
@Bean
public Queue deadQueue(){
// 将订单延迟队列绑定死信队列
return new Queue("deadQueue");
}
// 创建Fanout类型交换机
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("dead_exchange");
}
// 将队列绑定到交换机
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_route_key");
}
}
@RestController
public class OrderController {
@Resource
private OrderService orderService;
@Resource
private MySender mySender;
@RequestMapping("/add")
public String save(Orders order){
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
order.setNo(UUID.randomUUID().toString().replace("-", ""));
order.setStatus(0);
orderService.save(order);
// 投递到订单延迟队列中,该队列没有消费者,作用是为了超时后投递到死信队列中
mySender.sendOrder(order);
return "success";
}
}
@Component
public class MySender {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendOrder(Orders orders){
rabbitTemplate.convertAndSend("orderDelayExchange", "orderDelayKey", orders, message -> {
// 设置超时时间,单位毫秒
message.getMessageProperties().setExpiration("10000");
return message;
}
);
System.out.println(orders.getNo() + "订单已经放入队列中...");
}
}
@Component
public class MyConsumer {
@Resource
private OrderDAO orderDAO;
@RabbitListener(queues = "deadQueue")
public void consume(Orders orders){
orders.setStatus(2);
QueryWrapper queryWrapper = new QueryWrapper<>();
queryWrapper.eq("no", orders.getNo());
orderDAO.update(orders, queryWrapper);
System.out.println(orders.getNo() + "取消成功");
}
}
@Service
public class OrderService {
@Resource
private OrderDAO orderDAO;
public void save(Orders orders){
orderDAO.insert(orders);
}
}
通过controller中添加方法,测试添加一条订单,等待10秒后,看是否会取消订单。文章来源地址https://www.toymoban.com/news/detail-768339.html
到了这里,关于Java - RabbitMq的安装&使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!