如何保证Mq消息不丢失

这篇具有很好参考价值的文章主要介绍了如何保证Mq消息不丢失。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

mq: rabbitmq, rocketmq, kafka

1.RocketMQ

RocketMQ是如何最大限度的保证消息不丢失

如何保证Mq消息不丢失

  1. 生产阶段:消息在 Producer 发送端创建出来,经过网络传输发送到 Broker 存储端。
  2. 存储阶段:消息在 Broker 端存储,如果是主备或者多副本,消息会在这个阶段被复制到其他的节点或者副本上。
  3. 消费阶段:Consumer 消费端从 Broker存储端拉取消息,经过网络传输发送到 Consumer 消费端上,并通过重试来最大限度的保证消息的消费。

Producer:

  1. 默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功

  2. 采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失

  3. RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功

Broker

  1. 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的

  2. Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中

  3. Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失

cunmser

  1. Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标

  2. 如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset

  3. 如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作

  4. 如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的

2.Kafka

如何保证Mq消息不丢失

  1. Producer:生产者,负责创建消息,然后投递到 Kafka 集群中,投递时需要指定消息所属的 Topic,同时确定好发往哪个 Partition。
  2. Consumer:消费者,会根据它所订阅的 Topic 以及所属的消费组,决定从哪些 Partition 中拉取消息。
  3. Broker:消息服务器,可水平扩展,负责分区管理、消息的持久化、故障自动转移等。
  4. Zookeeper:负责集群的元数据管理等功能,比如集群中有哪些 broker 节点以及 Topic,每个 Topic 又有哪些 Partition 等。
2.1 消息传递语义剖析

如何保证Mq消息不丢失

  • at least once: Producer 向 Broker 发送数据后,会进行 commit,如果 commit 成功,由于 Replica 副本机制的存在,则意味着消息不会丢失,但是 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那么 Producer 就无法准确判断该消息是否已经被提交(commit),这就可能造成 at least once 语义。
  • at most once: 从 Consumer 角度来剖析, 我们知道 Offset 是由 Consumer 自己来维护的, 如果 Consumer 收到消息后更新 Offset, 这时 Consumer 异常 crash 掉, 那么新的 Consumer 接管后再次重启消费,就会造成 at most once 语义(消息会丢,但不重复)。
  • 如果 Consumer 消费消息完成后, 再更新 Offset, 如果这时 Consumer crash 掉,那么新的 Consumer 接管后重新用这个 Offset 拉取消息, 这时就会造成 at least once 语义(消息不丢,但被多次重复处理)。
  • exactly once: 0.11.0.0 版本之后, Producer 支持幂等传递选项,保证重新发送不会导致消息在日志出现重复。Broker 为 Producer 分配了一个ID,并通过每条消息的序列号进行去重。

默认 Kafka 提供 「at least once」语义的消息传递,允许用户通过在处理消息之前保存 Offset 的方式提供 「at most once」 语义。如果我们可以自己实现消费幂等,理想情况下这个系统的消息传递就是严格的「exactly once」, 也就是保证不丢失、且只会被精确的处理一次,但是这样是很难做到的。

2.2 Producer 端丢失场景剖析

导致 Producer 端消息没有发送成功有以下原因:

  • 网络原因:由于网络抖动导致数据根本就没发送到 Broker 端。

  • 数据原因:消息体太大超出 Broker 承受范围而导致 Broker 拒收消息。

解决问题:

1.request.required.acks:

  • acks = 0:由于发送后就自认为发送成功,这时如果发生网络抖动, Producer 端并不会校验 ACK 自然也就丢了,且无法重试。
  • acks = 1:消息发送 Leader Parition 接收成功就表示发送成功,这时只要 Leader Partition 不 Crash 掉,就可以保证 Leader Partition 不丢数据,但是如果 Leader Partition 异常 Crash 掉了, Follower Partition 还未同步完数据且没有 ACK,这时就会丢数据。
  • acks = -1 或者 all: 消息发送需要等待 ISR 中 Leader Partition 和 所有的 Follower Partition 都确认收到消息才算发送成功, 可靠性最高, 但也不能保证不丢数据,比如当 ISR 中只剩下 Leader Partition 了, 这样就变成 acks = 1 的情况了。生成者消息确认机制

2.弃用调用发后即焚的方式,使用带回调通知函数的方法进行发送消息: Producer.send(msg, callback)生产者消息同步投递

3.重试次数 retries: Producer 端发送消息的重试次数, 设置为大于0的数

4.重试时间 retry.backoff.ms: 推荐设置为300ms。

2.3 Broker 端丢失场景剖析

Kafka Broker 集群接收到数据后会将数据进行持久化存储到磁盘: 同步刷盘和异步刷盘
如何保证Mq消息不丢失
kafka 通过「多 Partition (分区)多 Replica(副本)机制」已经可以最大限度的保证数据不丢失,如果数据已经写入 PageCache 中但是还没来得及刷写到磁盘,此时如果所在 Broker 突然宕机挂掉或者停电,极端情况还是会造成数据丢失。

多分区多副本:设置参数:
unclean.leader.election.enable:false
replication.factor >=3
min.insync.replicas > 1
replication.factor = min.insync.replicas +1

2.4 Consumer 端丢失场景剖析

拉取数据、业务逻辑处理、提交消费 Offset 位移信息。

enable.auto.commit = false, 采用手动提交位移的方式。设置为手动提交不断去尝试

对于消费消息重复的情况,业务自己保证幂等性, 保证只成功消费一次即可。

3.如何保证RabbitMQ全链路数据100%不丢失

如何保证Mq消息不丢失

3.1 生产端可靠性投递
  • 事务消息机制:事务消息机制由于会严重降低性能,使用confirm消息确认机制
  • confirm消息确认机制: 生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。

如何保证Mq消息不丢失

channel.confirmSelect();// 开启发送方确认模式

然后异步监听确认和未确认的消息:

channel.addConfirmListener(new ConfirmListener() {
    //消息正确到达broker
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("已收到消息");
        //做一些其他处理
    }

    //RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未确认消息,标识:" + deliveryTag);
        //做一些其他处理,比如消息重发等
    }
});

这样就可以让生产端感知到消息是否投递到RabbitMQ中了

  • 消息持久化

message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。

如何保证Mq消息不丢失
所有需要给exchange、queue和message都进行持久化:

//第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。

  • 消息入库

消息保存到数据库中

status=0: 表示生产端将消息发送给了RabbitMQ但还没收到确认。
status=1: 表示RabbitMQ已收到消息。
生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性)。

如何保证Mq消息不丢失

3.2 消费端消息不丢失
  1. 在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;
  2. 在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;
  3. 消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

如何保证Mq消息不丢失
自动ack机制改为手动ack机制。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        //接收到消息,做处理
        //手动确认
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        //出错处理,这里可以让消息重回队列重新发送或直接丢弃消息
    }
};
//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

autoAck参数置为false, 那么RabbitMQ服务端的队列分为两部分: 1.等待投递给消费端的消息 2.已经投递给消费端。如果RabbitMQ一直没有收到消费端的确认信号, RabbitMQ会安排该消息重新进入队列(放在队列头部)等待投递给下一个消费者。文章来源地址https://www.toymoban.com/news/detail-407641.html

到了这里,关于如何保证Mq消息不丢失的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 消息中间件(MQ)对比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ

    前言 在构建分布式系统时,选择适合的消息中间件是至关重要的决策。RabbitMQ、Kafka、ActiveMQ 和 RocketMQ 是当前流行的消息中间件之一,它们各自具有独特的特点和适用场景。本文将对这四种消息中间件进行综合比较,帮助您在项目中作出明智的选择。 1. RabbitMQ 特点: 消息模

    2024年02月20日
    浏览(18)
  • RabbitMQ 消息丢失的场景,如何保证消息不丢失?

    RabbitMQ 消息丢失的场景,如何保证消息不丢失?

    第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了 第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。 1.针对生

    2024年02月11日
    浏览(17)
  • RabbitMQ如何保证消息不丢失

    RabbitMQ如何保证消息不丢失

    观察整个 RabbitMQ 消息发送过程: 从上述流程我们可以得知:消息从生产者到达消费者,经过两次网络传输,并且在 RabbitMQ 服务器中进行路由。 因此我们能知道整个流程中可能会出现三种消息丢失场景: 生产者发送消息到 RabbitMQ 服务器的过程中出现消息丢失。 可能是网络波

    2024年02月21日
    浏览(8)
  • RabbitMQ如何保证消息不丢失?

    RabbitMQ如何保证消息不丢失?

    1、什么 情况会导致消息丢失 ? ​​​​​​          a.发送 时丢失:                    生产者发送的消息未送达exchange                     消息到达exchange 后未到达 queue         b.MQ宕机, queue 将消息丢失         c.consumer接收到消息后未消费就宕机

    2024年02月02日
    浏览(9)
  • 如何保证 RabbitMQ 消息不丢失?

    如何保证 RabbitMQ 消息不丢失?

      第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。 第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。   方

    2024年02月08日
    浏览(10)
  • RabbitMQ如何保证消息不丢失呢?

    RabbitMQ如何保证消息不丢失呢?

    RabbitMQ 是一个流行的消息队列系统,用于在分布式应用程序之间传递消息。要确保消息不会丢失,可以采取以下一些措施: 持久化消息: RabbitMQ 允许你将消息标记为持久化的。这意味着消息将被写入磁盘,即使 RabbitMQ 服务器崩溃,也能够在恢复后重新发送消息。要使消息持

    2024年02月07日
    浏览(9)
  • 如何保证Kafka不丢失消息

    如何保证Kafka不丢失消息

    丢失消息有 3 种不同的情况,针对每一种情况有不同的解决方案。 生产者丢失消息的情况 消费者丢失消息的情况 Kafka 弄丢了消息 生产者丢失消息的情况 生产者( Producer ) 调用 send 方法发送消息之后,消息可能因为网络问题并没有发送过去。所以,我们不能默认在调用 send(

    2024年01月16日
    浏览(8)
  • Kafka 如何保证消息不丢失

    1.1 丢失原因: kafka生产端异步发送消息后,不管broker是否响应,立即返回,伪代码producer.send(msg),由于网络抖动,导致消息压根就没有发送到broker端; kafka生产端发送消息超出大小限制,broker端接到以后没法进行存储; 1.2 解决方案: 1、生产者调用异步回调消息。伪代码如

    2024年02月13日
    浏览(5)
  • 【Kafka面试】Kafka如何保证消息不丢失?

    【Kafka面试】Kafka如何保证消息不丢失?

    使用Kafka时,在消息的收发过程中都有可能会出现消息丢失。 1. 设置异步发送 同步发送:会产生阻塞,一般使用异步发送。 异步发送:实现回调方法,消息发送失败时记录日志,或者重新发送,最终确保消息能够成功发送。 2. 设置消息重试机制 由于 网络抖动 问题,很快就

    2024年02月03日
    浏览(8)
  • 一文彻底搞懂Kafka如何保证消息不丢失

    一文彻底搞懂Kafka如何保证消息不丢失

    Producer:生产者,发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka。 Consumer:消费者,接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。 Consumer Group:将多个消费者组成一个消费者组,一个消费者组可以包含一个或多个消费者。

    2024年04月22日
    浏览(15)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包