深入浅出RabbitMQ:顺序消费、死信队列和延时队列

这篇具有很好参考价值的文章主要介绍了深入浅出RabbitMQ:顺序消费、死信队列和延时队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大家好,我是小❤,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。

1. 引言

上篇文章(应对流量高峰的利器——消息中间件)中,我们已经介绍了消息中间件的用途,主要用作:解耦、削峰、异步通信、应用解耦,并介绍了业界常用的几种消息中间件,优劣对比和使用场景。

在今天的文章中,我们来聊一聊 RabbitMQ,这是小 ❤ 在工作中用的最早的消息中间件,主要用于大量数据的异步消费。

2. RabbitMQ

2.1 核心组件

RabbitMQ 是一个开源的消息中间件,它实现了高级消息队列协议(AMQP),同时提供了各种重要组件来支持消息的生产、传输和消费。

深入浅出RabbitMQ:顺序消费、死信队列和延时队列,rabbitmq,分布式

  1. Producer(生产者): 生产者是消息的发送方,负责将消息发布到 RabbitMQ 服务器。消息可以包含任何内容,例如任务、日志、通知等。

  2. Channel(信道):消息推送与接收时使用的通道。

  3. Exchange(交换机): 交换机是消息的中转站,它接收来自生产者的消息并将其路由到一个或多个队列。不同类型的交换机,如 fanout,direct,topic,headers,支持不同的路由规则。

  4. Queue(队列): 队列是消息的缓冲区,消息在发送到消费者之前存储在队列中,消费者从队列中获取消息并进行处理。

  5. Consumer(消费者): 消费者是消息的接收方,它从队列中获取消息并进行处理。消费者可以是多个,它们可以在不同的应用程序或服务器上运行。

2.2 工作流程

RabbitMQ 的工作方式是基于生产者、交换机和队列之间的协作。这是一个简单的消息传递过程:

  1. 将队列与交换机绑定(Binding)起来,定义了消息的路由规则;

  2. 生产者将消息发布到交换机,交换机根据绑定规则将消息路由到一个或多个队列;

  3. 消费者从队列中获取消息并进行处理。

这种模型具有高度的灵活性,可以轻松处理大量消息,同时确保消息的可靠传递。

2.3 特性

说到消息中间件,很多人首先想到的就是 Kafka,但 RabbitMQ 也是许多金融或互联网公司构建可靠、可伸缩和高性能系统的首选。

这是为什么呢?

主要得从 RabbitMQ 的特性说起,主要有二:一个是功能强大,另一个是可靠性!

RabbitMQ 注重消息的可靠性和灵活性,适合任务排队和消息传递。而 Kafka 是分布式流式平台,注重日志存储和数据分发。

顺序消费也是可靠性的一种,RabbitMQ 可以使用单一队列或多个单一队列来确保顺序消费。

除此之外,RabbitMQ 还提供持久性队列和消息,以确保消息在 RabbitMQ 服务器宕机后不会丢失。另外,生产者可以使用发布确认机制来确认消息是否被接收。

RabbitMQ 相对 kafka 可靠性更好,数据更不易丢失,这对于一些数据敏感型的业务来说,显然更适合用前者。

并且,RabbitMQ 中原生支持死信队列,可以更好地处理未完成的业务消息,以及实现延时队列等特性,接下来我们一一介绍。

3. 保证顺序消费

RabbitMQ 提供了多个队列模型来保证消息的顺序消费。这对于某些应用程序非常重要,例如处理订单、支付和库存管理。

消息错乱消费的场景

深入浅出RabbitMQ:顺序消费、死信队列和延时队列,rabbitmq,分布式

如上图所示,有三条业务消息分别是删除、增加和修改操作,但是 Consumer 没有按顺序消费,最终存储的顺序是增加、修改和删除,就会发生数据错乱。

针对消息有序性的问题,RabbitMQ 的解决方法是分三个阶段来保证。

1、发送消息:入队列

消息发送时,需要业务来保证顺序性,就是保证生产者入队的顺序是有序的。

在分布式的场景下如果难以保证各个服务器的入队顺序,则可以加分布式锁的方式来解决。或者在业务生产方的消息里带上消息递增 ID,以及消息产生的时间戳。

2、队列中的消息

在 RabbitMQ 的消息会保存在队列(Queue)中,在同一个队列里的消息是先进先出(FIFO)的,这个由 RabbitMQ 来帮我们保证顺序

而不同队列中的消息,RabbitMQ 无法保证其顺序性,就像我们在食堂打饭一样,站在不同的排队队列,我们也无法保证会比其他队列的人先打上饭。

深入浅出RabbitMQ:顺序消费、死信队列和延时队列,rabbitmq,分布式

3、消费消息:出队

一般来说,出队后的顺序消费交给消费者去保证。我们说的保证消费顺序,通常也是指消费者消费消息的顺序。

有多个消费者的情况下,通常是无法保证消息顺序的。

这就相当于我们在排队打饭时,有多个打饭阿姨,但是每个阿姨打饭的速度不一致,对应我们消费者的消费能力也不同。

所以,为了保证消息的顺序性,我们可以只使用一个消费者来接收业务消息。

就好比只有一个阿姨在打饭,来得早就一定能早点打上饭。但很明显,这样效率不是很高,所以在使用时我们需要权衡利弊:看业务更需要顺序性,还是更需要消费效率

优先级队列

在保证顺序消费时,另一个迂回策略是可以使用优先级队列(Priority Queue)。

在 RabbitMQ3.5 之后,当消费者数量较少,如果服务器检测到消费者不能及时消费消息的情况下,优先级队列就会生效。

具体有两种优先级策略:

  1. 设置队列的优先级

  2. 设置消息的优先级

在声明队列时,我们可以通过 x-max-priority 属性来设置队列的最大优先级,或通过 Priority 属性来设置消息的优先级,从 1~10。

Golang 实现代码如下:

// 队列属性
props := make(map[string]interface{})
// 设置队列最大优先级
props["x-max-priority"] = 10

ch.Publish(
   "tizi365",     // 交换机
   "", // 路由参数
   false,
   false,
   amqp.Publishing{
       Priority:5, // 设置消息优先级
       DeliveryMode:2,  // 消息投递模式,1代表非持久化,2代表持久化,
       ContentType: "text/plain",
       Body:       []byte(body),
  })

当优先级队列消费生效时,会首先消费高优先级队列中的优先级高的消息,以此来实现顺序消费

但需要注意的是,优先级队列触发的条件比较苛刻,在需要严格保证业务消息顺序的情况下最好不要使用!

4. 死信队列

RabbitMQ 里,当消息在队列中变成死信(消费者无法正常处理的消息)之后,它会被重新投递到一个交换机上(即死信交换机),死信交换机上绑定的消费队列就是死信队列

深入浅出RabbitMQ:顺序消费、死信队列和延时队列,rabbitmq,分布式

死信的产生

死信产生需要满足如下条件:

  1. 消息被消费者手动拒绝接收,并且 requeue(重新加入队列)策略为 False;

  2. 消息已经过期(TTL);

  3. 队列达到最大长度,消息装不下了。

死信的处理步骤

当死信产生时,如果我们定义了一个死信交换机(其实就是一个普通的交换机,只是用于处理死信,所以叫死信交换机),然后在死信交换机上绑定了一个队列(称作死信队列)。

最后,如果死信队列有消费者监听时,死信消息的处理就会和正常业务消息一样,从交换机到队列,再由死信消费者(监听死信队列的消费者)正常消费。

5. 延时队列

RabbitMQ 本身不支持延时队列,但是我们可以通过 RabbitMQ 的插件 rabbitmq-delayed-message-exchange,或者使用 死信队列 + 消息过期 的方式来实现。

5.1 应用场景

当我们在电商里购物,或者在 12306 买票时,大概都会遇到这样一个场景:每次下订单后,到支付订单中间有一段商品锁定时间,超过时间后未支付订单就会关闭

状态转换图如下:

深入浅出RabbitMQ:顺序消费、死信队列和延时队列,rabbitmq,分布式

5.2 插件实现

1.安装插件

Github 地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

从 github 的 release 页面的 assets, 下载 rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 文件,把文件放到 rabbitmq 插件目录(plugins目录)

提示:版本号可能跟本教程不一样,如果你的 rabbitmq 就是最新版本,插件也选择最新版本就行。

2.激活插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.定义交换机

通过 x-delayed-type 设置自定义交换机属性,支持发送延迟消息:

   props := make(map[string]interface{})
   //关键参数,支持发送延迟消息
   props["x-delayed-type"] = "direct"

   // 声明交换机
   err = ch.ExchangeDeclare(
       "delay.queue",   // 交换机名字
       "fanout", // 交换机类型
       true,     // 是否持久化
       false,    
       false,
       false,
       props,      // 设置属性
  )
4.发送延迟消息

通过消息头(x-delay),设置消息延迟时间。

       msgHeaders := make(map[string]interface{})
       // 通过消息头,设置消息延迟时间,单位毫秒
       msgHeaders["x-delay"] = 6000

       err = ch.Publish(
           "delay.queue",     // 交换机名字
           "", // 路由参数
           false,
           false,
           amqp.Publishing{
               Headers:msgHeaders, // 设置消息头
               ContentType: "text/plain",
               Body:       []byte(body),
          })

5.3 死信队列 + 消息过期方案

该方案的核心思想是,先创建死信交换机、队列和消费者,来监听死信消息。

然后创建定时过期的消息,比如订单支付的时间为 30min,则将消息的 TTL(最大存活时间)设置为 30min,将消息放到一个没有消费者消费的队列中,当消息过期后就会成为死信。

死信消息被重新发送到死信交换机,然后我们在死信队列中消费该消息,根据商品 ID 判断该商品是否被支付。

如果没有支付,就取消订单,修改订单状态为待下单。如果已经支付,就将商品状态修改为已完成,并丢掉这条死信消息。

5. 小结

RabbitMQ 是一个功能强大的消息中间件,它在许多互联网应用中扮演了关键角色,比如华为摄像机 SDK 的监控图像数据上报,大部分电商系统的异步消费等等。

希望今天的文章能帮助你更深入地了解 RabbitMQ,并在工作中运用它来构建可靠的消息传递系统,下一篇文章小❤将带来 Kafka 的核心工作流程、底层原理及常见面试题,敬请期待!

深入浅出RabbitMQ:顺序消费、死信队列和延时队列,rabbitmq,分布式

如果觉得文章有所启发或收获,不妨点赞、分享,加入在看,这对我是最大的鼓励!ღ( ´・ᴗ・` )比心

深入浅出RabbitMQ:顺序消费、死信队列和延时队列,rabbitmq,分布式

xin猿意码

山回路转不见君,雪上空留码行处

如果你有任何问题或想了解更多,也随时在评论区提问,谢谢你的阅读!

我是小❤,我们下期再见。文章来源地址https://www.toymoban.com/news/detail-771273.html

到了这里,关于深入浅出RabbitMQ:顺序消费、死信队列和延时队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入浅出带你玩转栈与队列——【数据结构】

    W...Y的主页 😊 代码仓库分享 💕 目录 1.栈 1.1栈的概念及结构 1.2栈的结构特征图  ​编辑 1.3栈的实现 1.3.1栈的初始化 1.3.2进栈 1.3.3出栈 1.3.4销毁内存 1.3.5判断栈是否为空 1.3.5栈底元素的读取 1.3.6栈中大小 1.4栈实现所有接口 2.队列 2.1队列的概念 2.2队列的结构   2.3队列的实

    2024年02月11日
    浏览(62)
  • 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析

    使用系统控制读取操作的DefaultMQPushConsumer可以自动调用传入的处理方法来处理收到的消息。通过设置各种参数和传入处理消息的函数,使用DefaultMQPushConsumer的主要目的是方便配置和处理消息。在收到消息后,系统会自动保存Offset,并且如果加入了新的DefaultMQPushConsumer,系统会

    2024年02月11日
    浏览(45)
  • RabbitMQ的消费者处理消息失败后可以重试,重试4次仍然失败发送到死信队列。

    生产者发送消息时采用雪花算法给消息设置唯一的消息id,消费者接收消息处理失败时,根据消息的唯一id统计失败次数,若没有达到失败次数限制,则让消息重回队列(在开启手动签收的前提),此时队列会再次给消费者发送消息;若达到失败次数限制,则让消息不重回队列,

    2024年02月07日
    浏览(58)
  • RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

    MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。 功能1:流量消峰 功能2:应用解耦 功

    2024年02月07日
    浏览(54)
  • 【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

    消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。 本文将介绍

    2024年02月05日
    浏览(79)
  • 【深入浅出RocketMQ原理及实战】「消息队列架构分析」帮你梳理RocketMQ或Kafka的选择理由以及二者PK

    前提背景 大家都知道,市面上有许多开源的MQ,例如,RocketMQ、Kafka、RabbitMQ等等,现在Pulsar也开始发光,今天我们谈谈笔者最常用的RocketMQ和Kafka,想必大家早就知道二者之间的特点以及区别,但是在实际场景中,二者的选取有可能会范迷惑,那么今天笔者就带领大家分析一下

    2024年02月19日
    浏览(57)
  • 【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

    在电商平台下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 该如何实现? 定期轮询(数据库等) 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定

    2024年01月17日
    浏览(58)
  • RabbitMQ - 死信队列,延时队列

    死信队列: DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

    2024年02月09日
    浏览(48)
  • RabbitMQ-死信交换机和死信队列

    DLX: Dead-Letter-Exchange 死信交换器,死信邮箱 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 如下图所示: 其实死信队列就是一个普通的交换机,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理

    2024年02月08日
    浏览(47)
  • RabbitMQ延迟队列,死信队列配置

    延迟和死信队列的配置 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列 异常队列配置类

    2024年02月14日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包