【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?

这篇具有很好参考价值的文章主要介绍了【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

* entire channel rather than each consumer

* @throws java.io.IOException if an error is encountered

*/

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

  • prefetchSize:0,单条消息大小限制,0代表不限制

  • prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。

  • global:true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。

  • 注意:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。

3、如何对消费端进行限流

  • 首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);

  • 第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);

  • 第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);

这是生产端代码,与前几章的生产端代码没有做任何改变,主要的操作集中在消费端。公众号Java精选,回复Java面试,获取消息队列面试题,支持在线刷题。

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class QosProducer {

public static void main(String[] args) throws Exception {

//1. 创建一个 ConnectionFactory 并进行设置

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(“localhost”);

factory.setVirtualHost(“/”);

factory.setUsername(“guest”);

factory.setPassword(“guest”);

//2. 通过连接工厂来创建连接

Connection connection = factory.newConnection();

//3. 通过 Connection 来创建 Channel

Channel channel = connection.createChannel();

//4. 声明

String exchangeName = “test_qos_exchange”;

String routingKey = “item.add”;

//5. 发送

String msg = “this is qos msg”;

for (int i = 0; i < 10; i++) {

String tem = msg + " : " + i;

channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());

System.out.println("Send message : " + tem);

}

//6. 关闭连接

channel.close();

connection.close();

}

}

这里我们创建一个消费者,通过以下代码来验证限流效果以及 global 参数设置为 true 时不起作用.。我们通过Thread.sleep(5000); 来让 ack 即处理消息的过程慢一些,这样我们就可以从后台管理工具中清晰观察到限流情况。

import com.rabbitmq.client.*;

import java.io.IOException;

public class QosConsumer {

public static void main(String[] args) throws Exception {

//1. 创建一个 ConnectionFactory 并进行设置

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(“localhost”);

factory.setVirtualHost(“/”);

factory.setUsername(“guest”);

factory.setPassword(“guest”);

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

//2. 通过连接工厂来创建连接

Connection connection = factory.newConnection();

//3. 通过 Connection 来创建 Channel

final Channel channel = connection.createChannel();

//4. 声明

String exchangeName = “test_qos_exchange”;

String queueName = “test_qos_queue”;

String routingKey = “item.#”;

channel.exchangeDeclare(exchangeName, “topic”, true, false, null);

channel.queueDeclare(queueName, true, false, false, null);

channel.basicQos(0, 3, false);

//一般不用代码绑定,在管理界面手动绑定

channel.queueBind(queueName, exchangeName, routingKey);

//5. 创建消费者并接收消息

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

try {

Thread.sleep(5000);

} catch (InterruptedException e) {

e.printStackTrace();

}

String message = new String(body, “UTF-8”);

System.out.println(“[x] Received '” + message + “'”);

channel.basicAck(envelope.getDeliveryTag(), true);

}

};

//6. 设置 Channel 消费者绑定队列

channel.basicConsume(queueName, false, consumer);

channel.basicConsume(queueName, false, consumer1);

}

}

我们从下图中发现 Unacked值一直都是 3 ,每过 5 秒 消费一条消息即 Ready 和 Total 都减少 3,而 Unacked的值在这里代表消费者正在处理的消息,通过我们的实验发现了消费者一次性最多处理 3 条消息,达到了消费者限流的预期功能。

【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式

当我们将void basicQos(int prefetchSize, int prefetchCount, boolean global)中的 global 设置为 true的时候我们发现并没有了限流的作用。

TTL


TTL是Time To Live的缩写,也就是生存时间。RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

这与 Redis 中的过期时间概念类似。我们应该合理使用 TTL 技术,可以有效的处理过期垃圾消息,从而降低服务器的负载,最大化的发挥服务器的性能。公众号Java精选,回复Java面试,获取消息队列面试题,支持在线刷题。

RabbitMQ allows you to set TTL (time to live) for both messages and queues. This can be done using optional queue arguments or policies (the latter option is recommended). Message TTL can be enforced for a single queue, a group of queues or applied for individual messages.

RabbitMQ允许您为消息和队列设置TTL(生存时间)。这可以使用可选的队列参数或策略来完成(建议使用后一个选项)。可以对单个队列,一组队列强制执行消息TTL,也可以为单个消息应用消息TTL。

——摘自 RabbitMQ 官方文档

1、消息的 TTL

我们在生产端发送消息的时候可以在 properties 中指定 expiration属性来对消息过期时间进行设置,单位为毫秒(ms)。

/**

* deliverMode 设置为 2 的时候代表持久化消息

* expiration 意思是设置消息的有效期,超过10秒没有被消费者接收后会被自动删除

* headers 自定义的一些属性

* */

//5. 发送

Map<String, Object> headers = new HashMap<String, Object>();

headers.put(“myhead1”, “111”);

headers.put(“myhead2”, “222”);

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()

.deliveryMode(2)

.contentEncoding(“UTF-8”)

.expiration(“100000”)

.headers(headers)

.build();

String msg = “test message”;

channel.basicPublish(“”, queueName, properties, msg.getBytes());

我们也可以后台管理页面中进入 Exchange 发送消息指定expiration

【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式

2、队列的 TTL

我们也可以在后台管理界面中新增一个 queue,创建时可以设置 ttl,对于队列中超过该时间的消息将会被移除。

【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式

死信队列


死信队列:没有被及时消费的消息存放的队列

消息没有被及时消费的原因:

  • 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

  • TTL(time-to-live) 消息超时未消费

  • 达到最大队列长度

实现死信队列步骤

  • 首先需要设置死信队列的 exchange 和 queue,然后进行绑定:

`Exchange: dlx.exchange

Queue: dlx.queue

RoutingKey: # 代表接收所有路由 key

  • 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在普通队列加上一个参数即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式

【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式

【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式

最后

作为过来人,小编是整理了很多进阶架构视频资料、面试文档以及PDF的学习资料,针对上面一套系统大纲小编也有对应的相关进阶架构视频资料

【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式
【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式

《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!
觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)**

【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?,程序员,rabbitmq,分布式

最后

作为过来人,小编是整理了很多进阶架构视频资料、面试文档以及PDF的学习资料,针对上面一套系统大纲小编也有对应的相关进阶架构视频资料

[外链图片转存中…(img-hSn3OvRu-1712701258173)]
[外链图片转存中…(img-i6JiYKoI-1712701258173)]

《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!文章来源地址https://www.toymoban.com/news/detail-859501.html

到了这里,关于【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

    这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。 在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭

    2024年02月12日
    浏览(43)
  • RabbitMQ中的限流、return机制、死信队列

    目录 优点 缺点 1、限流 2、return机制 3、死信队列 高可用性: RabbitMQ支持集群和镜像队列等多种方式实现高可用性,保证系统稳定运行。 可靠性强: RabbitMQ使用AMQP协议作为消息传递的标准,能够确保消息传递的可靠性和有序性。 灵活性强: RabbitMQ支持多种消息模式,包括点

    2024年02月08日
    浏览(34)
  • 【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

    消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。 本文将介绍

    2024年02月05日
    浏览(79)
  • 深入浅出RabbitMQ:顺序消费、死信队列和延时队列

    大家好,我是小❤,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。 上篇文章(应对流量高峰的利器——消息中间件)中,我们已经介绍了消息中间件的用途,主要用作:解耦、削峰、异步通信、应用解耦,并介绍了业界常

    2024年02月03日
    浏览(38)
  • 浅谈RabbitMQ消费端ACK和限流

    消费者 ACK 和 消费端限流 ack指  Acknowledge ,确认。 表示消费端收到消息后的确认方式。 有三种确认方式: • 自动确认:acknowledge=\\\" none \\\" • 手动确认:acknowledge=\\\" manual \\\" • 根据异常情况确认:acknowledge=\\\" auto \\\",(这种方式使用麻烦,不作讲解) 其中自动确认是指,当消息一

    2024年02月19日
    浏览(33)
  • RabbitMQ的消费者处理消息失败后可以重试,重试4次仍然失败发送到死信队列。

    生产者发送消息时采用雪花算法给消息设置唯一的消息id,消费者接收消息处理失败时,根据消息的唯一id统计失败次数,若没有达到失败次数限制,则让消息重回队列(在开启手动签收的前提),此时队列会再次给消费者发送消息;若达到失败次数限制,则让消息不重回队列,

    2024年02月07日
    浏览(58)
  • RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

    MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。 功能1:流量消峰 功能2:应用解耦 功

    2024年02月07日
    浏览(54)
  • 如何保证消息的可靠性+延迟队列(TTL+死信队列+延迟队列)

    目录 1.如何保证消息的可靠性 1.1.消息的可靠投递 confirm机制 return机制 1.2.如何保证消息在队列中不丢失 1.3.确保消息能可靠的被消费掉 2.延迟队列 2.1.TTL 2.2.死信队列 2.3.延迟队列 3.如何防止消费者重复消费消息 在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重

    2024年02月15日
    浏览(60)
  • springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack   rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。   消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队

    2024年02月09日
    浏览(72)
  • 详细的说说mfc110u.dll丢失的解决方法分享,四种解决办法的详细步骤

    在电脑运行过程中,有时会遇到各种各样的错误提示,比如“由于找不到mfc110u.dll,无法继续执行代码”,这不仅令人困扰,也影响了我们的工作和娱乐体验。如果你也在为mfc110u.dll缺失问题感到苦恼,那么你来对地方了!今天我们就来给大家详细的说说mfc110u.dll丢失的解决方

    2024年01月20日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包