rabbitmq之Consumer Prefetch(消费者预取)

这篇具有很好参考价值的文章主要介绍了rabbitmq之Consumer Prefetch(消费者预取)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

官方文档:
https://www.rabbitmq.com/consumer-prefetch.html
https://www.rabbitmq.com/confirms.html#channel-qos-prefetch

【问题】

测试”消息积压“场景:在消费者没有启动的情况下,生产者先生产很多消息。然后先开启一个a消费者,再开启b消费者,发现只有a消费者不断的消费旧的消息,而b消费者”无动于衷“。。。
后面再生成新消息,b消费者确实能帮忙消费一下新消息。也就是说,直到新消息产生后b队列它才开始消费。这是为什么?

这就涉及到Consumer Prefetch(消费者预取)概念。

对于大多数消费者来说,限制该窗口的大小以避免消费者端的缓冲区(堆)无限制增长问题是有意义的。
这可以通过使用 basic.qos 方法设置 "预取计数 "值来实现。该值定义了通道上允许的最大未确认交付次数。当数量达到配置的计数时,RabbitMQ 将停止在通道上交付更多消息,直到至少有一条未确认的消息被确认。
QoS 预取设置对使用 basic.get(“pull API”)获取的报文没有影响,即使在手动确认模式下也是如此。

#值为 0 表示 "无限制",允许任何数量的未确认信息:
channel.basicQos(0); // No limit for this consumer

#最多可同时接收 10 条未确认的信息:
channel.basicQos(10); // Per consumer limit 10
#QoS 设置可针对特定通道或特定消费者进行配置。
channel.basicQos(10, false); // Per consumer limit 通常我们都是写false,针对每个消费者
channel.basicQos(15, true);  // Per channel limit

有了上面这个知识补充后,继续我之前对”消息积压“测试。这次我在代码中加入:

$channel->basic_qos(null,5,false); //每个消费者最多可同时接收5条未确认的信息

然后先停止所有队列,再生产6个消息,此时积压消息数量为6。先启动a消费者,隔1秒再启动b消费者,每条消息处理时间为5秒。看到的结果是a消费者消费了5条消息,b消费者消费了1条信息。
显然,结果符合我们的预期。因为,a消费者一开始启动,就同时接收5条消息。因为每条消息处理时间为5秒。所以在a消费者第一条消息处理完成并确认之前,b消费者已启动,并接收了仅存的1条消息。

我用的扩展是php-amqplib,代码示例:

//提示: NOT_IMPLEMENTED - prefetch_size!=0 (2)(60, 10)
//$channel->basic_qos(2,2,false);
//正确用法:https://github.com/php-amqplib/php-amqplib/blob/master/demo/basic_qos.php
$channel->basic_qos(null,5,false);//每个消费者最多可同时接收 5 条未确认的信息

【消费者确认模式、预取和吞吐量】
确认模式和 QoS 预取值对消费者吞吐量有显著影响。一般来说,增加预取值会提高向用户发送信息的速度。自动确认模式可获得最佳传输速率。不过,在这两种情况下,已交付但尚未处理的信息数量也会增加,从而增加用户 RAM(内存)消耗。
应谨慎使用自动确认模式或无限制预取的手动确认模式。如果消费者在未确认的情况下消耗大量信息,将导致其所连接节点的内存消耗增长。寻找合适的预取值需要反复试验,不同的工作负载会有不同的预取值。
100 到 300 之间的值通常能提供最佳的吞吐量,而且不会给用户带来过大的压力。更高的值通常会遇到收益递减规律。
预取值为 1 是最保守的值。它会大大降低吞吐量,尤其是在消费者连接延迟较高的环境中。对于许多应用而言,更高的值是合适和最佳的。文章来源地址https://www.toymoban.com/news/detail-680106.html

到了这里,关于rabbitmq之Consumer Prefetch(消费者预取)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • [RocketMQ] Consumer消费者启动主要流程源码 (六)

    客户端常用的消费者类是DefaultMQPushConsumer, DefaultMQPushConsumer的构造器以及start方法的源码。 1.创建DefaultMQPushConsumer实例 最终都是调用下面四个参数的构造函数: 指定了命名空间、生产者组、RPC钩子和消费者之间消息分配的策略算法的构造器, 创建了一个DefaultMQPushConsumerImpl实例

    2024年02月16日
    浏览(34)
  • kafka-consumer-groups.sh消费者组管理

      先调用 MetadataRequest 拿到所有在线Broker列表 再给每个Broker发送 ListGroupsRequest 请求获取 消费者组数据。 查看指定消费组详情 --group 查看所有消费组详情 --all-groups 查询消费者成员信息 --members 查询消费者状态信息 --state 删除指定消费组 --group 删除所有消费组 --all-groups 想要

    2024年02月03日
    浏览(30)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(37)
  • kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池

    网上搜索kafka消费者通过多线程进行顺序消费的内容都不太理想,或者太过复杂,所以自己写了几个demo,供大家参考指正。         单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum(客户账号)的数据需要保证消费的顺序。 1、如果1秒钟生产

    2024年02月15日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(33)
  • RabbitMQ多消费者实例时,保证只有一个消费者进行消费(单活消费者模式)

    有一种业务场景,当人员组织结构变更时,会有大量数据进行推送。这些数据类型有的是add,有的是update,并且必须先add,才能进行update。 这时,为了保证消费顺序,需要 只有一个实例进行按顺序消费,其他实例仅提供日常对外服务 ,不进行消息消费。当唯一消费实例无法

    2024年02月11日
    浏览(35)
  • RabbitMQ 消费者

      RabbitMQ的消费模式分两种:推模式和拉模式,推模式采用Basic.Consume进行消费,拉模式则是调用Basic.Get进行消费。   消费者通过订阅队列从RabbitMQ中获取消息进行消费,为避免消息丢失可采用消费确认机制   顾名思义,拉模式就是消费者主动的从RabbitMQ中获取数据,通

    2024年02月11日
    浏览(28)
  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(35)
  • RabbitMQ-消费者确认机制

    none:不做任何处理,消息投递到消费者了之后,立即返回ACK,并且从MQ将消息删除,非常不安全,不建议使用。 manual:手动模式,需要在业务中调用api,ack或者reject。 auto:自动模式,SpringAMQP利用AOP对我们的消息处理做了环绕增强,当业务正常执行时返回ACK,执行异常时,根

    2024年01月21日
    浏览(46)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包