mq常见问题:消息丢失、消息重复消费、消息保证顺序

这篇具有很好参考价值的文章主要介绍了mq常见问题:消息丢失、消息重复消费、消息保证顺序。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

mq常见问题:消息丢失、消息重复消费、消息保证顺序

消息丢失问题
拿rabbitmq举例来说,出现消息丢失的场景如下图

从图中可以看到一共有以下三种可能出现消息丢失的情况:

1> 生产者丢消息
生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败

2>MQ自身丢消息
未开启RabbitMQ的持久化,数据存储于内存,服务挂掉后队列数据丢失;
开启了RabbitMQ持久化,消息写入后会持久化到磁盘,但是在落盘的时候挂掉了,不过这种概率很小

3>消费者弄丢了消息
消费者刚接收到消息还没处理完成,结果消费者挂掉了…

针对以上三种情况,每种情况都有对应的处理方法:

1》生产者弄丢消息的解决方法
方法一:开启RabbitMQ的事务

rabbitmq提供了与三个事务相关的命令:select开启事务、commit提交事务、rollback回滚事务
采用该种方法由于事务机制,会导致吞吐量下降,太消耗性能。

方法二:开启confirm模式

使用springboot时在application.yml配置文件中做如下配置

spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest

发送者开启 confirm 确认机制

publisher-confirm-type: correlated

实现confirm回调接口

@Slf4j
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (!ack) {
        log.error("消息发送异常!");
        //可以进行重发等操作
    } else {
        log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
    }
}

}

生产者发送消息时设置confirm回调

@Slf4j
@Configuration
public class RabbitMqConfig {

@Bean
public ConfirmCallbackService confirmCallbackService() {
    return new ConfirmCallbackService();
}

@Bean
public RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);

    /**
     * 消费者确认收到消息后,手动ack回执回调处理
     */
    rabbitTemplate.setConfirmCallback(confirmCallbackService());
    return rabbitTemplate;
}

//其他配置代码
......

小结: 事务机制和 confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm机制是异步的,你发送个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块避免数据丢失,建议使用用 confirm 机制。

2》MQ自身弄丢消息时的解决方法
使用持久化队列,发送消息时做持久化到磁盘处理。
同时设置queue和message持久化以后,RabbitMQ 挂了再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据,保证数据不会丢失。

但是就算开启持久化机制,也有可能出现消息落盘时服务挂掉的情况。这时可以考虑结合生产者的confirm机制来处理,持久化机制开启后消息只有成功落盘时才会通过confirm回调通知生产者,所以可以考虑生产者在生产消息时维护一个正在等待消息发送确认的队列,如果超过一定时间还没从confirm中收到对应消息的反馈,自动进行重发处理。

3》消费者自身弄丢消息时的解决方法
关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费,不过我们只需要保证幂等性就好了,重复消费也不会造成问题。

在springboot中修改application.yml配置文件更改为手动ack模式

spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest

发送者开启 confirm 确认机制

publisher-confirm-type: correlated

发送者开启 return 确认机制

publisher-returns: true
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true

设置消费端手动 ack

acknowledge-mode: manual

是否支持重试

retry:
enabled: true

消费端手动ack参考代码:

@RabbitHandler
public void handlerMq(String msg, Channel channel, Message message) throws IOException {
    try {
        //业务处理代码
        ......
    
        //手动ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        
    } catch (Exception e) {
        if (message.getMessageProperties().getRedelivered()) {
            log.error("消息已重复处理失败,拒绝再次接收...", e);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
        } else {
            log.error("消息即将再次返回队列处理...", e);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }


}

消息重复消费问题
出现消息重复消费的情况

原因一
1、生产者发送给消息队列以后,消息队列会应达给生产者,但是这个过程中,消息队列出问题了没有收到消息,那么生产者就会重复发生消息,这时就产生了重复消息。
2、生产者发生消息给消息队列,消息队列由于数量太大延迟了,生产者等待响应超时了,这时生产者又会从新发生消息给消息队列。
3、生产者和消息队列因网络问题引起,生产者会发起重试。这样也会产生重复消息。
4、其实主要原因就是,消息成功进入了消息队列,但是由于各种原因消息队列没有给生产者成功的返回值,而生产者又有重试机制这种情况下就会产生重复消息。

原因二
1、消息队列推送给消费者,消费者处理消息这个过程中消费出现了问题,消息队列不知道消费者处理结果,就会在次投递。
2、消费者处理完,网络出现问题,这时没有给中间件消息队列返回结果,消息队列会在次投递消费者。
3、消费者处理超时,超过了消息队列的超时时间,这时消息队列也会再次投递。
4、消费者处理完结果返回给消息中间件,但是消息中间件出现问题,处理结果丢失了,重启后,消息中间件内部检查发现这个消息还没有处理也会在次投递给消费者。

针对该问题,一般是在消费者端做幂等处理。

如何保证消息队列消费的幂等性

这一块应该还是要结合业务来选择合适的方法,有以下几个方案:

消费数据为了单纯的写入数据库,可以先根据主键查询数据是否已经存在,如果已经存在了就没必要插入了。或者直接插入也没问题,因为可以利用主键的唯一性来保证数据不会重复插入,重复插入只会报错,但不会出现脏数据。
消费数据只是为了缓存到redis当中,这种情况就是直接往redis中set value了,天然的幂等性。
针对复杂的业务情况,可以在生产消息的时候给每个消息加一个全局唯一ID,消费者消费消息时根据这个ID去redis当中查询之前是否消费过。如果没有消费过,就进行消费并将这个消息的ID写入到redis当中。如果已经消费过了,就无需再次消费了。

消息保证顺序消费
消息在投入到queue的时候是有顺序,如果只是单个消费者来处理对应的单个queue,是不会出现消息错乱的问题。但是在消费的时候有可能多个消费者消费同一个queue,由于各个消费者处理消息的时间不同,导致消息未能按照预期的顺序处理。其实根本的问题就是如何保证消息按照预期的顺序处理完成。

出现消费顺序错乱的情况

为了提高处理效率,一个queue存在多个consumer

一个queue只存在一个consumer,但是为了提高处理效率,consumer中使用了多线程进行处理

保证消息顺序性的方法
将原来的一个queue拆分成多个queue,每个queue都有一个自己的consumer。该种方案的核心是生产者在投递消息的时候根据业务数据关键值(例如订单ID哈希值对订单队列数取模)来将需要保证先后顺序的同一类数据(同一个订单的数据) 发送到同一个queue当中。

一个queue就一个consumer,在consumer中维护多个内存队列,根据业务数据关键值(例如订单ID哈希值对内存队列数取模)将消息加入到不同的内存队列中,然后多个真正负责处理消息的线程去各自对应的内存队列当中获取消息进行消费。

RabbitMQ保证消息顺序性总结:
核心思路就是根据业务数据关键值划分成多个消息集合,而且每个消息集合中的消息数据都是有序的,每个消息集合有自己独立的一个consumer。多个消息集合的存在保证了消息消费的效率,每个有序的消息集合对应单个的consumer也保证了消息消费时的有序性。

本文总结:
消息丢失:

生产者 建议使用异步confirm(非高并发需求情况下也可以考虑rabbitmq的事务机制)

    mq:做消息持久化

    消费者:关闭自动提交offset/自动ack  使用手动处理

重复消费:很容易解决,建立去重表,做幂等处理

如何保证有序:文章来源地址https://www.toymoban.com/news/detail-493048.html

多个queue, 每个queue都有一个自己的consumer,将一类消息投递到一个queue中

    消费者维护内存队列,同一类消息hash到一个内存队列中

到了这里,关于mq常见问题:消息丢失、消息重复消费、消息保证顺序的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • RocketMQ和Kafka的区别,以及如何保证消息不丢失和重复消费

    RocketMQ和Kafka的区别,以及如何保证消息不丢失和重复消费

    性能(单台) 语言 多语言支持客户端 优缺点 RocketMQ 十万级 java java 模型简单、接口易用,在阿里有大规模应用 文档少,支持的语言少 Kafka 百万级 服务端scala,客户端java 主流语言均支持 天生分布式、性能最好,常用于大数据领域 运维难度大,对zookeeper强依赖,多副本机制

    2024年01月16日
    浏览(8)
  • MQ消息队列详解以及MQ重复消费问题

    MQ消息队列详解以及MQ重复消费问题

    https://blog.csdn.net/qq_44240587/article/details/104630567 核心的就是:解耦、异步、削锋 现有ABCDE五个系统,最初的时候BCD三个系统都要调用A系统的接口获取数据,一切都很正常,但是突然,D系统说:我不要了,你不用给我传数据了,A系统无奈,只能修改代码,将调用D系统的代码删除

    2024年04月13日
    浏览(10)
  • 阿里三面:MQ 消息丢失、重复、积压问题,如何解决?

    阿里三面:MQ 消息丢失、重复、积压问题,如何解决?

    作者:美得让人心动 来源:https://blog.csdn.net/gu131007416553/article/details/120934738 面试官在面试候选人时,如果发现候选人的简历中写了在项目中使用了 MQ 技术(如 Kafka、RabbitMQ、RocketMQ),基本都会抛出一个问题:在使用 MQ 的时候,怎么确保消息 100% 不丢失? 这个问题在实际工

    2024年02月09日
    浏览(8)
  • RabbitMq 的消息可靠性问题(二)---MQ的消息丢失和consumer消费问题

    RabbitMq 的消息可靠性问题(二)---MQ的消息丢失和consumer消费问题

    RabbitMq 消息可靠性问题(一) — publisher发送时丢失 前面我们从publisher的方向出发解决了发送时丢失的问题,那么我们在发送消息到exchange, 再由exchange转存到queue的过程中。如果MQ宕机了,那么我们的消息是如何确保可靠性的呢?当消息由队列发到对应的消费者处理时,consumer 接

    2024年02月11日
    浏览(9)
  • 什么是mq?可靠性、重复消息、重复消费、丢失、发送大文件、延迟、发送机制、重试、死信、幂等、有序、大小、过期、优先级、进了死信队列还能出来吗?

    “MQ” 指的是消息队列(Message Queue),是一种用于异步通信的技术。消息队列是一种中间件,用于在分布式系统中传递消息,使不同组件之间能够进行松散耦合的通信。它的核心思想是生产者将消息发送到队列,而消费者从队列中接收并处理消息。 消息队列的主要优点包括

    2024年02月06日
    浏览(8)
  • RaabitMQ(三) - RabbitMQ队列类型、死信消息与死信队列、懒队列、集群模式、MQ常见消息问题

    RaabitMQ(三) - RabbitMQ队列类型、死信消息与死信队列、懒队列、集群模式、MQ常见消息问题

    这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性。 经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。 Durability有两个选项,Durable和Transient。 Durable表示队列会将消息保存到硬盘,这样消息的安全性更高。但是同时,由于需

    2024年02月14日
    浏览(420)
  • 如何保证Mq消息不丢失

    如何保证Mq消息不丢失

    mq: rabbitmq, rocketmq, kafka 1.RocketMQ RocketMQ是如何最大限度的保证消息不丢失 生产阶段:消息在 Producer 发送端创建出来,经过网络传输发送到 Broker 存储端。 存储阶段:消息在 Broker 端存储,如果是主备或者多副本,消息会在这个阶段被复制到其他的节点或者副本上。 消费阶段:

    2023年04月09日
    浏览(11)
  • 如何保证消息不被重复消费?

           在Java中,可以使用消息队列来实现消息的异步处理,其中常用的消息队列有 RabbitMQ、ActiveMQ、Kafka 等。 为了避免消息被重复消费,可以使用以下几种方法: 常见的消息队列如 Kafka、RocketMQ等提供了幂等性机制,能够确保同一条消息被消费多次时只会产生一次影响。在

    2024年02月16日
    浏览(8)
  • kafka如何保证消息不被重复消费

    kafka如何保证消息不被重复消费

    (1)kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次我要是重启,就会继续从上次消费到的offset来继续消费。但是当我们直接kill进程

    2024年02月11日
    浏览(14)
  • RabbitMQ防止消息重复消费、保证异步消息的幂等性

    一、rabbitmq出现消息重复的场景 1、消费成功,没有进行ack,这时 Broker 会重新发送 2、不确认(unack)或 reject 之后,重新排队,Broker 会重新发送 3、消费成功,ack时宕机,没有ack成功,消息由unack变为ready,Broker又重新发送 4、总的来说就是 Broker 发送消息后,消费端收到消息

    2024年02月13日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包