带你了解RabbitMQ:消息丢失、重复、积压的原因及其解决方案

这篇具有很好参考价值的文章主要介绍了带你了解RabbitMQ:消息丢失、重复、积压的原因及其解决方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

首先说一点,企业中最常用的实际上既不是RocketMQ,也不是Kafka,而是RabbitMQ。

RocketMQ很强大,但主要是阿里推广自己的云产品而开源出来的一款消息队列,其实中小企业用RocketMQ的没有想象中那么多。

深层次的原因在于兔宝在中小企业普及更早,经受的考验也更久,很容易产生「回头客」,当初随RabbitMQ成长的一批人才如今大部分都已成为企业中的中坚骨干,技术选型亲睐RabbitMQ的几率就更高。

至于Kafka,主要还是用在大数据和日志采集方面,除了一些公司有特定的需求会使用外,对消息收发准确率要求较高的公司依然是以RabbitMQ作为企业级消息队列的首选。

工作这么多年我自身的感受是,RabbitMQ经久不衰,除非后续其他消息中间件有与众不同的使用体验,否则依然是RabbitMQ的占有率更高。

所以准备进入软件行业的小伙伴,我建议有必要系统的先把RabbitMQ学好,然后再学习其他消息中间件扩展视野,他们的原理大同小异,是可以触类旁通的。

两个概念

RabbitMQ避免消息丢失的方法主要是利用消息确认机制和手动签收机制,所以有必要把这两个概念搞清楚。

1、消息确认机制

主要是生产者使用的机制,用来确认消息是否被成功消费。

配置如下:

spring: 
    rabbitmq:
        address: 192.168.x.x:xxxx
        virtual-host: /
        username: guest
        password: guest
        connection-timeout: 5000
        publisher-confirms: true # 消息成功确认
        publisher-returns: true # 消息失败确认
        template: 
            mandatory: true # 手动签收机制
复制代码

这样,当你实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback这两个接口的方法后,就可以针对性地进行消息确认的日志记录,之后做进一步的消息发送补偿,以达到接近100%投递的目的。

伪代码如下:

@Component
@Slf4j
public class RabbitMQSender implements RabbitTemplate.ConfirmCallback, 
RabbitTemplate.ReturnCallback {
    
    /**
     * 发送消息
     */
    public void sendOrder(Order order) {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        
        // 发送消息
        rabbitTemplate.convertAndSend(xx, xx, order, xx);
    }
    
    
    /**
     * 成功接收后的回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
    
        // 如果成功接收了,这里可以对日志表的消息收发状态做更新。
        // ....
        
    }
    
    
    /**
     * 失败后的回调
     */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    
        // 如果失败了,这里可以对日志表的消息收发状态做更新,之后通过任务调度去补偿发送。
        // ....
        
    }
}
复制代码

2、消息签收机制

RabbitMQ的消息是自动签收的,你可以理解为快递签收了,那么这个快递的状态就从发送变为已签收,唯一的区别是快递公司会对物流轨迹有记录,而MQ签收后就从队列中删除了。

企业级开发中,RabbitMQ我们基本都开启手动签收方式,这样可以有效避免消息的丢失。

前文中已经在生产者开启了手动签收机制,那么作为消费方,也要设置手动签收。

配置如下:

spring: 
    rabbitmq:
        address: 192.168.x.x:xxxx
        virtual-host: /
        username: guest
        password: guest
        connection-timeout: 5000
        listener: 
            simple: 
                concurrency: 5 # 并发数量
                max-concurrency: 10 # 最大并发数量
                acknowledge-mode: manual # 开启手动签收
                prefetch: 1 # 限制每次只消费一个(一个线程),上面配置5,也就是能一次接收5个
复制代码

消费监听时,手动签收就一行代码,伪代码如下:

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
    // ....
    
    // 手动签收
    channel.basicAck(tag, false);
    
}
复制代码

消息丢失

两个概念搞清楚后,就可以来学习消息丢失的问题和处理方案了。

1、出现原因

消息丢失的原因无非有三种:

1)、消息发出后,中途网络故障,服务器没收到;

2)、消息发出后,服务器收到了,还没持久化,服务器宕机;

3)、消息发出后,服务器收到了,消费方还未处理业务逻辑,服务却挂掉了,而消息也自动签收,等于啥也没干。

这三种情况,(1) 和 (2)是由于生产方未开启消息确认机制导致,(3)是由于消费方未开启手动签收机制导致。

2、解决方案

1)、生产方发送消息时,要try...catch,在catch中捕获异常,并将MQ发送的关键内容记录到日志表中,日志表中要有消息发送状态,若发送失败,由定时任务定期扫描重发并更新状态;

2)、生产方publisher必须要加入确认回调机制,确认成功发送并签收的消息,如果进入失败回调方法,就修改数据库消息的状态,等待定时任务重发;

3)、消费方要开启手动签收ACK机制,消费成功才将消息移除,失败或因异常情况而尚未处理,就重新入队。

其实这就是前面阐述两个概念时已经讲过的内容,也是接近100%消息投递的企业级方案之一,主要目的就是为了解决消息丢失的问题。

消息重复

1、出现原因

消息重复大体上有两种情况会出现:

1)、消息消费成功,事务已提交,签收时结果服务器宕机或网络原因导致签收失败,消息状态会由unack转变为ready,重新发送给其他消费方;

2)、消息消费失败,由于retry重试机制,重新入队又将消息发送出去。

2、解决方案

网上大体上能搜罗到的方法有三种:

1)、消费方业务接口做好幂等;

2)、消息日志表保存MQ发送时的唯一消息ID,消费方可以根据这个唯一ID进行判断避免消息重复;

3)、消费方的Message对象有个getRedelivered()方法返回Boolean,为TRUE就表示重复发送过来的。

我这里只推荐第一种,业务方法幂等这是最直接有效的方式,(2)还要和数据库产生交互,(3)有可能导致第一次消费失败但第二次消费成功的情况被砍掉。

消息积压

1、出现原因

消息积压出现的场景一般有两种:

1)、消费方的服务挂掉,导致一直无法消费消息;

2)、消费方的服务节点太少,导致消费能力不足,从而出现积压,这种情况极可能就是生产方的流量过大导致。

2、解决方案

1)、既然消费能力不足,那就扩展更多消费节点,提升消费能力;

2)、建立专门的队列消费服务,将消息批量取出并持久化,之后再慢慢消费。

(1)就是最直接的方式,也是消息积压最常用的解决方案,但有些企业考虑到服务器成本压力,会选择第(2)种方案进行迂回,先通过一个独立服务把要消费的消息存起来,比如存到数据库,之后再慢慢处理这些消息即可。

使用心得

这里单独讲一下本人在工作中使用RabbitMQ的一些心得,希望能有所帮助。

1)、消息丢失、消息重复、消息积压三个问题中,实际上主要解决的还是消息丢失,因为大部分公司遇不到消息积压的场景,而稍微有水准的公司核心业务都会解决幂等问题,所以几乎不存在消息重复的可能;

2)、消息丢失的最常见企业级方案之一就是定时任务补偿,因为不论是SOA还是微服务的架构,必然会有分布式任务调度的存在,自然也就成为MQ最直接的补偿方式,如果MQ一定要实现100%投递,这种是最普遍的方案。但我实际上不推荐中小企业使用该方案,因为凭空增加维护成本,而且没有一定规模的项目完全没必要,大家都小看了RabbitMQ本身的性能,比如我们公司,支撑一个三甲医院,也就是三台8核16G服务器的集群,上线至今3年毫无压力;

3)、不要迷信网上和培训机构讲解的生产者消息确认机制,也就是前面两个概念中讲到的ConfirmCallback和ReturnCallback,这种机制十分降低MQ性能,我们团队曾遇到过一次流量高峰期带来的MQ传输及消费性能大幅降低的情况,后来发现是消息确认机制导致,关闭后立马恢复正常,从此以后都不再使用这种机制,MQ运行十分顺畅。同时我们会建立后台管理实现人工补偿,通过识别业务状态判断消费方是否处理了业务逻辑,毕竟这种情况都是少数,性能和运维成本,在这一块我们选择了性能;

4)、我工作这些年使用RabbitMQ没见过自动签收方式,一定是开启手动签收;

5)、手动签收方式你在网上看到的教程几乎都是处理完业务逻辑之后再手动签收,但实际上这种用法是不科学的,在分布式的架构中,MQ用来解耦和转发是非常常见的,如果是支付业务,往往在回调通知中通过MQ转发到其他服务,其他服务如果业务处理不成功,那么手动签收也不执行,这个消息又会入队发给其他消费者,这样就可能在流量洪峰阶段因为偶然的业务处理失败造成堵塞,甚至标题所讲的三种问题同时出现,这样就会得不偿失。

不科学的用法:在处理完业务逻辑后再手动签收,否则不签收,就好比客人进店了你得买东西,否则不让走。

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
    // 处理业务
    doBusiness(order);
    
    // 手动签收
    channel.basicAck(tag, false);
    
}
复制代码

科学的用法:不论业务逻辑是否处理成功,最终都要将消息手动签收,MQ的使命不是保证客人进店了必须消费,不消费就不让走,而是客人能进来就行,哪怕是随便看看也算任务完成。

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
    try {
        // 处理业务
        doBusiness(order);
    } catch(Exception ex) {
        // 记录日志,通过后台管理或其他方式人工处理失败的业务。
    } finally {
        // 手动签收
        channel.basicAck(tag, false);
    }
    
}
复制代码

可能有人会问你这样不是和自动签收没区别吗,NO,你要知道如果自动签收,出现消息丢失你连记录日志的可能都没有。

另外,为什么一定要这么做,因为MQ是中间件,本身就是辅助工具,就是一个滴滴司机,保证给你送到顺便说个再见就行,没必要还下车给你搬东西。

如果强加给MQ过多压力,只会造成本身业务的畸形。我们使用MQ的目的就是解耦和转发,不再做多余的事情,保证MQ本身是流畅的、职责单一的即可。

总结

本篇主要讲了RabbitMQ的三种常见问题及解决方案,同时分享了一些作者本人工作中使用的心得,我想网上是很难找到的,如果哪一天用到了,不妨再打开看看,也许能避免一些生产环境可能出现的问题。

我总结下来就是三点:

1)、消息100%投递会增加运维成本,中小企业视情况使用,非必要不使用;

2)、消息确认机制影响性能,非必要不使用;

3)、消费者先保证消息能签收,业务处理失败可以人工补偿。

工作中怕的永远不是一个技术不会使用,而是遇到问题不知道有什么解决思路。文章来源地址https://www.toymoban.com/news/detail-765618.html

到了这里,关于带你了解RabbitMQ:消息丢失、重复、积压的原因及其解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka消息积压的原因和处理的方法

            Kafka作为目前主流的消息中间件,被广泛的应用在了生产环境中。消息积压是日常生产经常遇到的问题,下面我们来展开了说一下。 上游数据激增(生产侧原因):由于业务系统,访问量徒增,如热点事件,热门活动等,导致了大量的数据涌入业务系统,有可能导致

    2024年02月11日
    浏览(31)
  • Rabbitmq消息积压问题如何解决?

    一、 增加处理能力         优化系统架构、增加服务器资源、采用负载均衡等手段,以提高系统的 处理能力和并发处理能力 。通过增加服务器数量或者优化代码,确保系统能够及时处理所有的消息。 二、 异步处理         将消息的处理过程设计为 异步执行 ,即接

    2024年02月12日
    浏览(33)
  • RabbitMQ清除积压消息/管理界面出现 Unacked 消息

    1.问题: rabbitmq的生产者端循环产生了多条消息给消费者,而消费者无法及时将消息处理掉,在消费端积压了多条消息(消费失败的时候,消息队列会一直重复的发送消息,导致程序死循环)需要清理项目因为错误而产生的积压消息队列。 2.原因: 消息接收方因退出企业,账

    2024年02月09日
    浏览(26)
  • Rabbitmq消息积压问题如何解决以及如何进行限流

    一、增加处理能力 优化系统架构、增加服务器资源、采用负载均衡等手段,以提高系统的处理能力和并发处理能力。通过增加服务器数量或者优化代码,确保系统能够及时处理所有的消息。 二、异步处理 将消息的处理过程设计为异步执行,即接收到消息立即返回响应,然后

    2024年02月11日
    浏览(26)
  • mq常见问题:消息丢失、消息重复消费、消息保证顺序

    mq常见问题:消息丢失、消息重复消费、消息保证顺序 消息丢失问题 拿rabbitmq举例来说,出现消息丢失的场景如下图 从图中可以看到一共有以下三种可能出现消息丢失的情况: 1 生产者丢消息 生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败 2MQ自身丢

    2024年02月09日
    浏览(44)
  • 防止消息丢失与消息重复——Kafka可靠性分析及优化实践

    上手第一关,手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析,打破面试难关 在上一章内容中,我们解析了Kafka在读写层面上的原理,介绍了很多Kafka在读出与写入时的

    2024年02月08日
    浏览(32)
  • RabbitMQ 消息丢失的场景,如何保证消息不丢失?

    第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了 第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。 1.针对生

    2024年02月11日
    浏览(38)
  • Kafka消息丢失:原因、解决方案和零丢失的配置

    在使用Apache Kafka作为分布式消息系统时,消息丢失是一种常见的问题。消息丢失可能会导致数据不一致或功能故障,因此对于许多应用程序来说是不可接受的。本文将介绍Kafka消息丢失的原因、解决方案以及如何配置Kafka以实现零丢失。 Kafka消息丢失可能由多种原因引起。下面

    2024年02月13日
    浏览(25)
  • RabbitMQ消息丢失的场景,MQ消息丢失解决方案

    第一种 : (生产者) 生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种 : (服务端) RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了 第三种 : (消费者) 消费端弄丢了数据。刚消费到,还没处理

    2024年02月08日
    浏览(35)
  • MQ消息丢失的可能原因与解决方案

    当我们使用消息队列(MQ)作为分布式系统中的核心组件时,消息丢失是一个常见的问题。消息丢失可能导致数据不一致或功能故障,因此对于许多应用程序来说是不可接受的。本文将介绍几种常见的MQ消息丢失的原因,并提供相应的解决方案。 生产者在发送消息时可能会遇

    2024年02月15日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包