RabbitMq消息丢失原因及其解决方案

这篇具有很好参考价值的文章主要介绍了RabbitMq消息丢失原因及其解决方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMq消息丢失原因及其解决方案

一、RabbitMQ消息丢失原因

我们首先了解下一条消息从生产到消费的整个流程如下:

rabbitmq消息丢失,windows

生产-->MQ Broker --> 消费。所以这三个环节都有丢失消息的可能。

1.1、生产者丢失消息

生产者将数据发送到rabbitmq的时候,可能因为网络问题导致数据就在半路给搞丢了。

1.使用事务(性能差)

​ RabbitMQ 客户端中与事务机制相关的方法有三个: channel.txSelect 、channel.txCommit 和 channel.txRollback。channel.txSelect 用于将当前的信道设置成事务模式,channel.txCommit 用于提交事务,channel.txRollback 用于事务回滚。在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback 方法来实现事务回滚。注意这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分。

​ 事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会“吸干”RabbitMQ 的性能。

2.发送回执确认(推荐)

​ 生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理,注意辨别这里的确认和消费时候的确认之间的异同。

rabbitmq消息丢失,windows

注意要点:

  • 事务机制和 publisher confirm 机制两者是互斥的,不能共存。
  • 事务机制和 publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ,这里的“发送至 RabbitMQ”的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。

1.2、RabbitMQ弄丢了数据-开启RabbitMQ的数据持久化

​ 为了防止rabbitmq自己弄丢了数据,这个你必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。

设置持久化有两个步骤,第一个是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。

而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。

若生产者那边的confirm机制未开启的情况下,哪怕是你给rabbitmq开启了持久化机制,也有一种可能,就是这个消息写到了rabbitmq中,但是还没来得及持久化到磁盘上,结果不巧,此时rabbitmq挂了,就会导致内存里的一点点数据会丢失。

1.3、消费端弄丢了数据

​ 为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

​ 采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。

​ 这里我们可以通过RabbtiMQ 的 Web 管理平台上可以看到当前队列中的“Ready”状态和“Unacknowledged”状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。

二、模拟消费端丢失数据

2.1、初始化工程

这里不做说了,可以看我《SpringBoot集成RabbitMQ》中的相关步骤。这里我们直接写生产者和消费者。

2.2、编写生产者

我们在之前的工程上进行相应的修改,通过http方式生产消息。我们直接在IndexController中新增simpleProducer方法;

内容如下:

/**
 * @Author julyWhj
 * @Description IndexController$
 * @Date 2021/10/7 10:18 上午
 **/
@RestController
public class IndexController {

    @Autowired
    private SimpleProducer simpleProducer;

    @GetMapping("/index")
    public String index() {
        return "Hello RabbitMQ";
    }

    @GetMapping("/producer")
    public String simpleProducer() {
        for (int i = 0; i < 10; i++) {
            simpleProducer.sendOrderMessage(Simple.builder()
                    .createTime(new Date())
                    .name("JulyWhj")
                    .age(i)
                    .no("ID-000" + i)
                    .phone("138XXXXXXXX")
                    .build());
        }
        return "消息发送成功";
    }
}

我们测试下生产者,启动服务,在浏览器中访问"http://127.0.0.1:8080/producer"。

注意:

​ 这里我们使用Debug方式启动,在消费者中加上断点,因为我们的生产者和消费者在同一个服务中,生产的消息会被消费者直接消费掉。所以我们加上断点,阻止消费者消费消息。

rabbitmq消息丢失,windows

可以看到消息浏览器返回消息发送成功。

rabbitmq消息丢失,windows

我们通过RabbitMQ管理平台看下队列中消息数量:

rabbitmq消息丢失,windows

通过管理平台我们可以看到有10条消息没消费。

2.3、模拟消费者断电

我们可以看到队列中有10条消息没有消费,我们通过Debug让消费者消费一条消息。

rabbitmq消息丢失,windows

消费一条后,我们首先查看下队列中消息数量:

rabbitmq消息丢失,windows

可以看到,队列中还有9条消息,这里我们停止消费者服务。

rabbitmq消息丢失,windows

这里我们看到,队列中消息条数为0,出现了消息丢失的问题。

​ 因为这里我们使用了消费者自动消息确认机制。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息

2.4、使用手动消息确认机制

​ 我们通过配置spring.rabbitmq.listener.simple.acknowledge-mode=manual属性,开启手动ack模式。默认配置auto自动确认。

在application配置文件中新增如下配置:

spring.rabbitmq.host=host
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 开启手动ack确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

这里我们在测试下,重新启动服务,

rabbitmq消息丢失,windows

我们可以看到,这里控制台已经消费了10条消息,但是我们没有确认消费消息,我们可以看下队列中是否依然存在10条数据。

rabbitmq消息丢失,windows

我们可以看到队列中依然存在10条消息。如果我们重启服务,模拟断电,消息依然存在不会丢失。

我们将手动ack代码注释去掉,我们可以看到已经消费了一条消息。我们停止服务,模拟断电效果。

rabbitmq消息丢失,windows

我们查看下消息队列中的数据,是9条,而非0。这样消费端不会因断电等情况导致消费端数据丢失。

rabbitmq消息丢失,windows

三、RabbitMQ消息可靠性保障策略

1、生产者开启消息确认机制

2、消息队列数据持久化

3、消费者手动ack

4、生产者消息记录+定期补偿机制

5、服务幂等处理

6、消息挤压处理等

这里的内容我们在后续文章中单独展开分析,这里只是给大家提供下需要注意的事项。包括TTL和DLX队列的使用等。文章来源地址https://www.toymoban.com/news/detail-717981.html

到了这里,关于RabbitMq消息丢失原因及其解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka数据丢失原因及解决方案

    Kafka包括Producer、Broker、Consumer,因此从这三个方面分析。 丢失原因:Kafka在Producer端的消息发送采用的是异步发送的方式(还有同步发送,但是同步发送会导致消息阻塞、需要等待),丢失数据是因为消息没有到达Broker端,原因可能是网络波动导致没有回调和数据消息太大超出

    2024年02月14日
    浏览(32)
  • kafka消息丢失解决方案

    目录 一、生产端数据丢失 二、存储端消息丢失 三、消费端数据丢失 四、小结 一条消息从生产到消费完成这个过程,可以划分三个阶段,为了方便描述,我给每个阶段分别起了个名字。 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。 存储

    2023年04月26日
    浏览(40)
  • 94、Kafka消息丢失的场景及解决方案

    1、ack=0,不重试 producer发送消息完,不管结果了,如果发送失败也就丢失了。 2、ack=1,leader crash producer发送消息完,只等待 leader 写入成功就返回了,leader crash了,这时follower没来及同步,消息丢失, 3、unclean .leader .election .enable 配置true 允许选举ISR以外的副本作为leader,会导

    2024年02月16日
    浏览(44)
  • 一文读懂kafka消息丢失问题和解决方案

    今天分享一下kafka的消息丢失问题,kafka的消息丢失是一个很值得关注的问题,根据消息的重要性,消息丢失的严重性也会进行放大,如何从最大程度上保证消息不丢失,要从生产者,消费者,broker几个端来说。 kafka生产者生产好消息后,会将消息发送到broker节点,broker对数据

    2024年02月08日
    浏览(49)
  • MapReduce数据倾斜产生的原因及其解决方案

    数据倾斜就是数据的key的分化严重不均,造成一部分数据很多,一部分数据很少的局面。 数据频率倾斜 —— 某一个区域的数据量要远远大于其他区域。 数据大小倾斜 —— 部分记录的大小远远大于平均值。 (1)Hadoop框架的特性 Job数多的作业运行效率会相对比较低; count

    2023年04月08日
    浏览(37)
  • Android 内存泄漏的常见原因及其对应的解决方案

    Android应用程序中常见的内存泄漏原因有很多,以下是一些常见的原因及对应的解决方案: 1. 静态引用导致的内存泄漏: 静态变量持有对Activity或Fragment的引用,导致它们无法被垃圾回收机制释放。 解决方案: 确保不将Activity或Fragment的实例赋值给静态变量。如果确实需要使用

    2024年02月08日
    浏览(52)
  • RabbitMQ常见问题及其解决方案

    目录 RabbitMQ如何保证顺序消费 RabbitMQ消息丢失及其解决方案 RabbitMQ如何保证顺序消费 RabbitMQ重复消费及其解决方案 RabbitMQ如何保证不重复消费 RabbitMQ消息积压及其解决方案 RabbitMQ如何实现分布式事务以及保障消息最终一致性 在 RabbitMQ 中实现顺序消费可以考虑以下方法: 单一

    2024年02月09日
    浏览(44)
  • RabbitMQ--消息堆积--解决方案

    原文网址:RabbitMQ--消息堆积--解决方案_IT利刃出鞘的博客-CSDN博客 本文介绍如何处理RabbitMQ消息堆积(积压)。 对于消息队列(MQ)来说,消息丢失/消息重复/消费顺序/消息堆积(积压)是比较常见的问题,都属于消息异常,这几个问题比较重要,面试中也会经常问到。 消息堆积即

    2023年04月08日
    浏览(47)
  • Kafka消息发送失败的常见原因及解决方案

    1.1、网络故障 网络故障是Kafka消息发送失败的最常见原因之一。当网络出现故障时,Kafka就无法将消息发送到目标主题或分区。 解决方法: - 检查网络连接是否正常。 - 增加Kafka生产者的重试次数和超时时间。 1.2、分区副本不可用 如果Kafka生产者将消息发送到一个不可用的分

    2024年02月03日
    浏览(63)
  • RabbitMQ虚拟主机无法启动的原因和解决方案

    摘要: RabbitMQ是一个广泛使用的开源消息代理系统,但在使用过程中可能会遇到虚拟主机无法启动的问题。本文将探讨可能导致该问题的原因,并提供相应的解决方案,以帮助读者解决RabbitMQ虚拟主机启动失败的困扰。 在RabbitMQ中,虚拟主机(Virtual Host)是一种逻辑分区,用

    2024年02月12日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包