这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。
目录
一、防止消息丢失
1.1、消息确认机制(生产者)
(1)生产者丢失消息
(2)生产者消息确认机制
1.2、消息确认机制(消费者)
(1)消费者丢失消息
(2)消费者消息确认机制
1.3、消息持久化(RabbitMQ)
(1)RabbitMQ丢失消息
(2)消息持久化机制
一、防止消息丢失
RabbitMQ消息队列,在使用的时候,可能会存在消息丢失的情况,所谓的消息丢失就是生产者发送的消息没办法被消费者正确的消费,消息队列中导致消息丢失的地方有三个,分别是:
- 第一种情况:生产者发送的消息没有正确的发送到RabbitMQ里面,导致发送的消息丢失。
- 第二种情况:消费者从RabbitMQ消费消息时候,消费失败,但是RabbitMQ认为消费成功,从而删除了消息。
- 第三种情况:RabbitMQ中保存的消息还没有被消费者消费,此时RabbitMQ服务宕机,导致内存中的消息丢失。
1.1、消息确认机制(生产者)
(1)生产者丢失消息
生产者丢失消息,是指:当生产者发送消息给RabbitMQ的时候,此时消息发送失败了,并且生产者又没有重新发送这一条消息,所以这个时候,生产者这一条失败的消息就丢失了。
既然是生产者发送消息失败导致这一条消息丢失的,那么我们在处理这个丢失消息问题的时候,就可以这样做:当生产者消息发送失败之后,可以让生产者再次发送这一条消息,这里就有一个问题啦,那就是生产者怎么知道消息有没有发送成功???
RabbitMQ给我们提供了一个机制,即:发布确认机制,大致思想是:当生产者将消息发送到RabbitMQ之后,并且RabbitMQ正确接收到消息并将其放入Queue队列里面时,RabbitMQ会返回一个ACK标识给生产者,生产者接收到ACK标识就可以认为消息发送成功啦;如果消息接收失败,RabbitMQ会返回一个NACK标识,表示接收失败。
(2)生产者消息确认机制
生产者消息确认机制,上一篇文章已经介绍了(【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式),这里就不再重复。
1.2、消息确认机制(消费者)
(1)消费者丢失消息
如果生产者已经将消息正确的发送到RabbitMQ里面了,消费者从Queue队列里面获取消息消费时候,如果消费失败,那么此时就会导致这一条消息丢失,这是因为,默认情况下,RabbitMQ将消息分发给消费者之后,消费者接收到消息时候,就会返回一个ACK标识给消息队列RabbitMQ,此时RabbitMQ就会将这一条消息从Queue队列里面删除,但是这种情况下,消费者是否正确将这条消息消费了,RabbitMQ是不知道的,所以这就有可能导致丢失。
如何解决消费者丢失消息???
- 既然丢失消息是因为消费者消费失败,并且RabbitMQ把消息删除了,那么我们就可以开启手动确认的方式来告诉RabbitMQ,消费者是否正确的消费消息,是否可以将消息从Queue队列里面删除了。
(2)消费者消息确认机制
- 消费者进行消息确认,需要关闭自动确认,将【basicConsume()】方法的第二个参数设置为【false】。
- 消息成功消费之后,主动调用【basicAck()】方法,返回ACK标识给RabbitMQ。
package com.rabbitmq.demo.dropmsg;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @version 1.0.0
* @Date: 2023/2/25 16:30
* @Copyright (C) ZhuYouBin
* @Description: 消息消费者
*/
public class Consumer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接的 RabbitMQ 服务地址
factory.setHost("127.0.0.1"); // 默认就是本机
factory.setPort(5672); // 默认就是 5672 端口
// 3、获取连接
Connection connection = null; // 连接
Channel channel = null; // 通道
try {
connection = factory.newConnection();
// 4、获取通道
channel = connection.createChannel();
// 5、声明 Exchange,如果不存在,则会创建
String exchangeName = "exchange_dropmsg_2023";
channel.exchangeDeclare(exchangeName, "direct");
// 6、指定需要操作的消息队列,如果队列不存在,则会创建
String queueName = "queue_dropmsg_2023";
channel.queueDeclare(queueName, false, false, false, null);
// 7、绑定 Exchange 和 Queue, 接收 routingKey = "info" 的消息
channel.queueBind(queueName, exchangeName, "key_2023");
// 8、消费消息
Channel finalChannel = channel;
DeliverCallback callback = new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
// 接收消息
System.out.println("这是接收的消息:" + new String(delivery.getBody()));
// TODO 消费者正确消费消息之后,主动返回 ACK 标识
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
}
};
// TODO 这第二个参数修改为 false,表示消费者需要手动发送 ACK 标识给 RabbitMQ(默认是true)
channel.basicConsume(queueName, false, callback, i->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.3、消息持久化(RabbitMQ)
(1)RabbitMQ丢失消息
上面介绍了两种丢失消息的情况,分别是生产者和消费者丢失消息,还有一种丢失消息的情况,那就是RabbitMQ消息队列将消息丢失了。假设,现在存在这一种情况,生产者已经正确将消息发送到RabbitMQ里面,正准备将消息发送给消费者的时候,此时RabbitMQ服务宕机了,导致RabbitMQ中的消息丢失了(默认情况下,RabbitMQ是将消息保存在内存中的),由于内存中的数据断电即失,所以这就导致消息丢失情况。
如何解决RabbitMQ出现的消息丢失问题呢???
- 既然RabbitMQ是将消息保存在内存中的,那么为了避免消息丢失,可以将内存中的消息保存到磁盘文件里面,这样即使RabbitMQ宕机了,重新启动的时候也可以从磁盘文件里面读取消息到内存里面。
(2)消息持久化机制
- 在调用【queueDeclare()】方法,创建Queue队列的时候,设置第二个参数等于【true】,表示消息允许持久化。
- 生产者调用【basicPublish()】方法发送消息的时候,设置消息属性等于【MessageProperties.PERSISTENT_TEXT_PLAIN】,表示文本持久化。
// 第二个参数设置为true,表示开启持久化消息
channel.queueDeclare("Queue队列名称", true, false, false, null);
// 生产者发送消息时候,设置消息属性是文本持久化
channel.basicPublish("Exchange交换机名称", "Queue队列名称", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
到此,RabbitMQ消息队列防止消息丢失的三种方式介绍完啦。文章来源:https://www.toymoban.com/news/detail-787896.html
综上,这篇文章结束了,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。文章来源地址https://www.toymoban.com/news/detail-787896.html
到了这里,关于【RabbitMQ笔记08】消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!