RabbitMQ - 消息堆积问题的最佳解决方案?惰性队列

这篇具有很好参考价值的文章主要介绍了RabbitMQ - 消息堆积问题的最佳解决方案?惰性队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、惰性队列

1.1、消息堆积问题

1.2、消息堆积问题的解决方法

从消费者的角度:

从队列的角度:

1.3、引入惰性队列

1.3.1、什么是惰性队列

1.3.2、惰性队列的使用

1.3.3、效果演示


一、惰性队列


1.1、消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息到达上限.

这就像是有一个蓄水池,一边往里注水,一边向外排水,但是注水速度比排水快,因此这个水池最终就会填满.

接着,最早收到的消息,就可能成为死信,默认情况下会把死信丢弃,丢弃了之后队列就有了空间,就可以新消息进入队列.

1.2、消息堆积问题的解决方法

从消费者的角度:

1. 增加更多的消费者,调高消费速度.

如果说现在只有单个消费者,处理速度是 100ms 一个消息,那么如果有三个消费者,理论上就处理速度就是 300ms 一个消息.

2.在消费者内开启一个线程池,每当一个消息过来了,就交给一个线程去处理,那么当前这个消费者内部处理消息的速度就加快了.

但是,这个方法有一个限制,就是如果消息特别多的情况下,你是不是就需要分配很多线程.  线程越多,对 cpu 来讲也是一种浪费,因为 cpu 需要在多个线程之间做上下文切换,所以这个方案比较适合消息处理业务比较耗时的情况,开多个线程,让 cpu 并行处理.

从队列的角度:

3. 扩大队列容积,提高堆积上限.

就像一个蓄水池,我把这个水池搞的像黄河那么大,你再往里进水,啥时候才能填满呢~

前两种方法我们已经是比较的熟悉的了,但是最后扩大队列容积,具体该如何实现呢?

惰性队列就可以用来解决这个问题~

1.3、引入惰性队列

1.3.1、什么是惰性队列

对于传统的队列来讲,如果没有开启消息持久化,所有接收到的消息都是放在内存里面的,目的就是为了加快消息投递的速度,这也是 RabbitMQ 的一个很优势——响应速度快.

但是他也带来了一个问题,RabbitMQ 设置了一个内存预警值(内存存储的上限,默认是 40%),如果在消息堆积的情况下,很容易就到达这个预警值,此时,RabbitMQ 就会处于一个暂停状态,会阻止生产者投递消息进来,然后把内存中的一部分消息清理出来,刷出到磁盘中,这动作也叫 “page out” .  进而导致 mq 的并发能力,忽高忽低,性能不稳定(每次page out 都会比较耗时,停顿一段时间).

而惰性队列,就是是专门用来处理消息堆积问题的~

他有以下三个特点:

1. 收到的消息会直接写入磁盘,而非内存,也因此很难触发 mq 的内存预警,几乎不会出现 page out 的情况.

2. 消费者消费惰性队列的消息,也需要先从磁盘中读取并加载到内存中.  实际上这也会使延迟稍微高一点,毕竟磁盘的性能和内存还是有很大差距的,但是也在可以接收到范围内.

3. 支持数百万的消息存储,这也是因为他是磁盘存储(空间非常大).

1.3.2、惰性队列的使用

设置一个队列为 惰性队列,有以下三种方式:

1. 在 RabbitMQ Managerment 中,我们只需要在声明队列的时候,指定 x-queue-mode 属性为 lazy 即可.

rabbitmq 消息积压怎么解决,RabbitMQ(从理论到根据源码实现),rabbitmq,分布式

2. 用 SpringAMQP 声明惰性队列,@Bean 方式如下:

@Configuration
public class LazyConfig {

    @Bean
    public Queue lazyQueue() {
        return QueueBuilder
                .durable("lazy.queue")
                .lazy() //开启 x-queue-mode 属性为 lazy
                .build();
    }

}

3. 用 SpringAMQP 声明惰性队列,注解方式如下:

    @RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
            durable = "true",
            arguments = @Argument(name = "x-queue-mode", value = "lazy")
    ))
    public void listenLazyQueue(String msg) {
        log.info("消费者接收到了惰性队列的消息! msg=" + msg);
    }

1.3.3、效果演示

a)使用 @Bean 声明一个普通队列和一个惰性队列,来对比效果.

@Configuration
public class LazyConfig {

    @Bean
    public Queue lazyQueue() {
        return QueueBuilder
                .durable("lazy.queue")
                .lazy() //开启 x-queue-mode 属性为 lazy
                .build();
    }

    @Bean
    public Queue normalQueue() {
        return QueueBuilder
                .durable("normal.queue")
                .build();
    }

}

b)使用两个生产者,分别向两个队列中发送 100000 消息.

    @Test
    public void testLazyMessage() {
        for(int i = 0; i < 1000000; i++) {
            //1.准备消息
            Message message = MessageBuilder.withBody("hello lazy message".getBytes())
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            //2.发送消息
            rabbitTemplate.convertAndSend("lazy.queue", message);
        }
    }

    @Test
    public void testNormalMessage() {
        for(int i = 0; i < 1000000; i++) {
            //1.准备消息
            Message message = MessageBuilder.withBody("hello normal message".getBytes())
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            //2.发送消息
            rabbitTemplate.convertAndSend("normal.queue", message);
        }
    }

普通队列情况如下:

普通队列每次往 memory 中写入一定量的数据后,暂停一段时间,向磁盘写数据(page out).

rabbitmq 消息积压怎么解决,RabbitMQ(从理论到根据源码实现),rabbitmq,分布式

lazy 队列情况如下:

lazy 一直都往磁盘上写数据,所以 total 等于 paged out.

rabbitmq 消息积压怎么解决,RabbitMQ(从理论到根据源码实现),rabbitmq,分布式

rabbitmq 消息积压怎么解决,RabbitMQ(从理论到根据源码实现),rabbitmq,分布式文章来源地址https://www.toymoban.com/news/detail-753252.html

到了这里,关于RabbitMQ - 消息堆积问题的最佳解决方案?惰性队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ消息堆积方案处理

    在消息队列中,消息堆积是生产环境中的需要考虑的问题,一旦消息产生积压,来不及消费,可能会导致MQ服务器宕机,而解决消息积压有这样一些方案解决: 1.增加消费者数量 可以根据业务情况适当添加多台服务器部署消费者服务实例,消费者数量增加,可以有效提高消息

    2024年02月11日
    浏览(55)
  • RabbitMQ常见问题之消息堆积

    当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最 早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。 解决消息堆积有三种种思路: 增加 更多消费者 ,提高消费速度 在消费者内开启 线程

    2024年01月18日
    浏览(42)
  • RabbitMq消息堆积问题及惰性队列

    当生产者发送消息的速度超过了消费者处理的速度,就会导致队列的消息堆积,知道队列存储消息达到上限。最早接受的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。 1.增加更多的消费者,提高消费者速度 2.在消费则内开启线程池加快消息处理速度 3.使用惰性

    2024年02月16日
    浏览(45)
  • RabbitMq消息丢失原因及其解决方案

    我们首先了解下一条消息从生产到消费的整个流程如下: 生产--MQ Broker -- 消费。所以这三个环节都有丢失消息的可能。 1.1、生产者丢失消息 生产者将数据发送到rabbitmq的时候,可能因为网络问题导致数据就在半路给搞丢了。 1.使用事务(性能差) ​ RabbitMQ 客户端中与事务机

    2024年02月08日
    浏览(44)
  • RabbitMQ消息丢失的场景,MQ消息丢失解决方案

    第一种 : (生产者) 生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种 : (服务端) RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了 第三种 : (消费者) 消费端弄丢了数据。刚消费到,还没处理

    2024年02月08日
    浏览(50)
  • 带你了解RabbitMQ:消息丢失、重复、积压的原因及其解决方案

    前言 首先说一点,企业中最常用的实际上既不是RocketMQ,也不是Kafka,而是RabbitMQ。 RocketMQ很强大,但主要是阿里推广自己的云产品而开源出来的一款消息队列,其实中小企业用RocketMQ的没有想象中那么多。 深层次的原因在于兔宝在中小企业普及更早,经受的考验也更久,很容

    2024年02月04日
    浏览(36)
  • 如何解决RocketMQ消息堆积的问题

    MQ消息堆积是指生产者发送的消息短时间内在Broker端大量堆积,无法被消费者及时消费,从而导致业务功能无法正常使用。 (1)新上线的消费者功能有bug,消息无法被消费 (2)消费者实例宕机或者因为网络问题暂时无法与Broker建立连接。 (3)生产者短时间内大量推送消息

    2023年04月26日
    浏览(44)
  • RabbitMQ常见问题及其解决方案

    目录 RabbitMQ如何保证顺序消费 RabbitMQ消息丢失及其解决方案 RabbitMQ如何保证顺序消费 RabbitMQ重复消费及其解决方案 RabbitMQ如何保证不重复消费 RabbitMQ消息积压及其解决方案 RabbitMQ如何实现分布式事务以及保障消息最终一致性 在 RabbitMQ 中实现顺序消费可以考虑以下方法: 单一

    2024年02月09日
    浏览(44)
  • RabbitMQ常见问题及解决方案

    目录 一、消息丢失 1、生产者重连 2、生产者确认 3、数据持久化 4、惰性队列 5、消费失败处理 二、消息重复 1、通过业务保证幂等性(优先) 2、通过消息状态去重保证幂等性 三、消息堆积 1、优化消费者处理逻辑 2、增加队列及消费者数量 3、使用惰性队列 四、保证消息顺

    2024年02月03日
    浏览(49)
  • RabbitMQ详解与常见问题解决方案

    RabbitMQ 是一个开源的消息中间件,使用 Erlang 语言开发。这种语言天生非常适合分布式场景,RabbitMQ 也就非常适用于在分布式应用程序之间传递消息。RabbitMQ 有非常多显著的特点: 消息传递模式 :RabbitMQ 支持多种消息传递模式,包括发布/订阅、点对点和工作队列等,使其更

    2024年03月15日
    浏览(78)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包