RabbitMQ Streams 详解

这篇具有很好参考价值的文章主要介绍了RabbitMQ Streams 详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ Streams是一种持久复制数据结构,可以完成与队列相同的任务:它们缓冲来自生产者的消息,这些消息由消费者读取。然而,流与队列的区别在于两个重要方面:消息的存储和消费方式。
Streams为仅追加的消息日志建模,这些消息可以重复读取,直到过期。流始终是持久的和复制的。这种流行为的更技术性的描述是“非破坏性消费者语义”。
要从RabbitMQ中的流中读取消息,一个或多个使用者订阅它,并根据需要多次读取相同的消息。
流中的数据可以通过RabbitMQ客户端库或通过专用的二进制协议插件和关联的客户端使用。强烈建议使用后一个选项,因为它提供对所有流特定功能的访问,并提供尽可能最好的吞吐量(性能)。
现在,您可能会问以下问题:

  • 那么流会取代队列吗?
  • 我应该放弃使用队列吗?

为了回答这些问题,引入流不是为了取代队列,而是为了补充队列。Streams为新的RabbitMQ用例开辟了许多机会,这些用例在使用Streams的用例中进行了描述。
以下信息详细说明流的使用以及流的管理和维护操作。
您还应该查看流插件信息,以了解有关使用二进制RabbitMQ stream协议的流的更多信息,以及功能矩阵的流核心和流插件比较页面。 

一、使用流的用例

开发流最初是为了涵盖现有队列类型无法提供或具有缺点的4个消息传递用例:

1、大型扇形分叉

当想要将相同的消息传递给多个订阅者时,用户当前必须为每个消费者绑定一个专用队列。如果使用者数量很大,这可能会变得效率低下,特别是在需要持久性和/或复制时。流将允许任意数量的使用者以非破坏性的方式消费来自同一队列的相同消息,从而不需要绑定多个队列。流消费者还可以从副本中读取数据,从而允许读取负载在集群中分布。

2、重放(时间旅行)

由于所有当前的RabbitMQ队列类型都具有破坏性消费行为,即当消费者用完消息时,将从队列中删除消息,因此不可能重新读取已消费的消息。流将允许消费者在日志中的任何点连接并从那里读取。

3、吞吐量性能

没有任何持久队列类型能够提供可以与任何现有的基于日志的消息传递系统竞争的吞吐量。Streams的设计以性能为主要目标。

4、大量积压工作

大多数RabbitMQ队列被设计为收敛于空状态,并因此进行了优化,当给定队列上有数百万条消息时,性能可能会更差。流旨在以有效的方式存储大量数据,并将内存开销降至最低。

二、如何使用RabbitMQ Streams 

可以指定可选队列和使用者参数的AMQP 0.9.1客户端库将能够将流用作常规AMQP 09.1队列。
就像队列一样,必须首先声明流。

1、声明RabbitMQ Stream

要声明流,请将x-queue-type队列参数设置为stream(默认值为classic)。此参数必须由客户端在声明时提供;不能使用策略设置或更改它。这是因为策略定义或适用的策略可以动态更改,但队列类型不能更改。必须在声明时指定。
下面的片段显示了如何使用AMQP 0.9.1 Java客户端创建流:

ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
  "my-stream",
  true,         // durable
  false, false, // not exclusive, not auto-delete
  Collections.singletonMap("x-queue-type", "stream")
);

使用设置为stream的x-queue-type参数声明队列将在每个配置的RabbitMQ节点上创建一个具有副本的流。流是仲裁系统,因此强烈建议使用不均匀的集群大小。
流仍然是AMQP 0.9.1队列,因此它可以在创建后绑定到任何交换,就像任何其他RabbitMQ队列一样。
如果使用管理UI声明,则必须使用队列类型下拉菜单指定流类型。
流支持3个额外的队列参数,这些参数最好使用策略配置:

  • x-max-length-bytes

 设置流的最大大小(以字节为单位)。默认值:未设置。 

  • x-max-age

设置流的最长期限。默认值:未设置。

  • x-stream-max-segment-size-bytes

单位:字节。流在磁盘上被划分为固定大小的段文件。 此设置控制它们的大小。 默认值:(500000000 字节)。

以下代码片段演示如何将流的最大大小设置为 20 GB,并使用 100 MB 的段文件:

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
arguments.put("x-max-length-bytes", 20_000_000_000); // maximum stream size: 20 GB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
channel.queueDeclare(
  "my-stream",
  true,         // durable
  false, false, // not exclusive, not auto-delete
  arguments
);

三、客户端操作

1、Consuming

由于流永远不会删除任何消息,因此任何消费者都可以开始读取/消费 从日志中的任何一点。这由 x-stream-offset consumer 参数控制。 如果未指定,消费者将从写入的下一个偏移量开始读取 添加到使用者启动后的日志中。支持以下值:

  • first - 从日志中的第一条可用消息开始
  • last - 从最后写入的消息“块”(块 是流中使用的储运单位,简单来说就是批次 由几到几千条消息组成的消息,具体取决于入口)
  • next - 与不指定任何偏移量相同
  • 偏移量 - 一个数值,指定要附加到日志的确切偏移量。 如果此偏移量不存在,它将分别钳制到日志的开始或结束。
  • 时间戳 - 一个时间戳值,指定要附加到日志的时间点。 它将限制到最接近的偏移量,如果时间戳超出流的范围,它将分别限制日志的开始或结束。 在 AMQP 0.9.1 中,使用的时间戳是精度为 00 秒的 POSIX 时间,即自 00-00-1970 01:01:<> UTC 以来的秒数。 请注意,使用者可以接收在指定时间戳之前发布的消息。
  • 间隔 - 一个字符串值,指定相对于当前时间附加日志的时间间隔。使用与 x-max-age 相同的规范

以下代码片段演示如何使用第一个偏移量规范:

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
  "my-stream",
  false,
  Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification
  (consumerTag, message) -> {
    // message processing
    // ...
   channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
  },
  consumerTag -> { });

以下代码片段演示如何指定要从中消费的特定偏移量:

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
  "my-stream",
  false,
  Collections.singletonMap("x-stream-offset", 5000), // offset value
  (consumerTag, message) -> {
    // message processing
    // ...
   channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
  },
  consumerTag -> { });

以下代码片段演示如何指定要从中使用的特定时间戳:

 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
  "my-stream",
  false,
  Collections.singletonMap("x-stream-offset", timestamp), // timestamp offset
  (consumerTag, message) -> {
    // message processing
    // ...
   channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
  },
  consumerTag -> { });

2、其他流操作

以下操作的使用方式与经典队列和仲裁队列类似 但有些具有一些特定于队列的行为。

  • 声明
  • 队列删除
  • 发布者确认
  • 消费(订阅):消费需要 QoS 要设置的预取。acks 作为一种信用机制来推进当前 消费者的偏移量。
  • 为使用者设置 QoS 预取
  • 消费者确认(牢记 QoS 预取限制)
  • 取消消费者

四、流的单个活动消费者功能

流的单个活动使用者是 RabbitMQ 3.11 及更高版本中提供的一项功能。 它在流上提供独占消费和消费连续性。 当共享同一流和名称的多个使用者实例启用单个活动使用者时,这些实例中只有一个实例将同时处于活动状态,因此将接收消息。 其他实例将处于空闲状态。

单一活跃使用者功能提供 2 个好处:

  • 消息按顺序处理:一次只有一个使用者。
  • 保持消费连续性:如果活动消费者停止或崩溃,则该组中的消费者将接管。

五、超级流

超级流是一种通过将大型流划分为较小的流来横向扩展的方法。 它们与单个活动使用者集成,以保留分区内的消息顺序。 超级流从 RabbitMQ 3.11 开始可用。

超级流是由单个常规流组成的逻辑流。 这是一种使用 RabbitMQ 流横向扩展发布和使用的方法:将大型逻辑流划分为分区流,将存储和流量拆分到多个集群节点上。

超级流仍然是一个逻辑实体:由于客户端库的智能性,应用程序将其视为一个“大”流。 超级流的拓扑基于 AMQP 0.9.1 模型,即它们之间的交换、队列和绑定。

可以使用任何 AMQP 0.9.1 库或管理插件创建超级流的拓扑,它需要创建直接交换,即“分区”流,并将它们绑定在一起。 不过,使用 rabbitmq-streams add_super_stream 命令可能更容易。 以下是如何使用它来创建具有 3 个分区的发票超级流:

rabbitmq-streams add_super_stream invoices --partitions 3

使用 rabbitmq-streams add_super_stream --help 了解有关该命令的更多信息。

与单个流相比,超级流增加了复杂性,因此不应将其视为涉及流的所有用例的默认解决方案。 仅当您确定已达到单个流的限制时,才考虑使用超级流。

六、功能比较:常规队列与流

流不是传统意义上的真正队列,因此不是 与 AMQP 0.9.1 队列语义非常一致。其他队列类型的许多功能 不支持支持,并且永远不会由于队列类型的性质而支持。

可以使用常规队列的 AMQP 0.9.1 客户端库将能够使用流 只要它使用消费者确认。

由于流中许多功能是非破坏性的,因此它们永远不会被流支持 读取语义。

1、功能矩阵

特征

经典

非持久性队列

是的

排他性

是的

每条消息的持久性

每条消息

总是

成员资格变更

自动

手动

TTL的

是的

否(但请参阅保留期)

队列长度限制

是的

否(但请参阅保留期)

懒惰行为

是的

固有

消息优先级

是的

消费者至上

是的

死信交换

是的

遵守政策

是的

(请参阅保留期)

对内存警报做出反应

是的

否(使用最少的 RAM)

病毒邮件处理

全局 QoS 预取

是的

2、非持久性队列

根据其假定的用例,流始终是持久的, 它们不能像常规队列那样是非持久的。

3、排他性

根据其假定的用例,流始终是持久的,它们不能像常规队列那样是排他性的。 它们不应用作临时队列。

4、懒惰模式

写入消息后,流将所有数据直接存储在磁盘上 在读取之前,它不会使用任何内存。可以这么说,流本质上是懒惰的。

5、全球服务质量

流不支持全局 QoS 预取,其中通道设置单个 使用该通道的所有使用者的预取限制。如果尝试 从启用了全局 QoS 的通道中的流中使用 将返回通道错误。

使用每使用者 QoS 预取,这是几个常用客户端中的默认设置。文章来源地址https://www.toymoban.com/news/detail-764694.html

到了这里,关于RabbitMQ Streams 详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ——解决分布式事务问题,RabbitMQ的重要作用之一!!!通过可靠生产和可靠消费来完美解决!

    分布式事务是指涉及多个独立的计算机系统(也称为节点或参与者)之间的事务处理。在分布式系统中,每个节点可能各自拥有自己的数据存储和事务管理机制。分布式事务的目标是保证在跨多个节点执行的一系列操作可以以一致和可靠的方式执行和提交,即使在面对故障或

    2024年04月23日
    浏览(46)
  • 微服务学习:SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    目录 一、高级篇 二、面试篇 ==============实用篇============== day05-Elasticsearch01 1.初识elasticsearch 1.4.安装es、kibana 1.4.1.部署单点es 1.4.2.部署kibana 1.4.3.安装IK分词器 1.4.4.总结 2.索引库操作 2.1.mapping映射属性 2.2.索引库的CRUD 2.2.1.创建索引库和映射 2.2.2.查询索引库 2.2.3.修改索引库 2.

    2024年02月02日
    浏览(54)
  • Python爬虫分布式架构 - Redis/RabbitMQ工作流程介绍

    在大规模数据采集和处理任务中,使用分布式架构可以提高效率和可扩展性。本文将介绍Python爬虫分布式架构中常用的消息队列工具Redis和RabbitMQ的工作流程,帮助你理解分布式爬虫的原理和应用。 为什么需要分布式架构? 在数据采集任务中,单机爬虫可能面临性能瓶颈和资

    2024年02月11日
    浏览(40)
  • 分布式消息队列:Kafka vs RabbitMQ vs ActiveMQ

    在现代分布式系统中,消息队列是一种常见的异步通信模式,它可以帮助系统处理高并发、高可用性以及容错等问题。在这篇文章中,我们将深入探讨三种流行的分布式消息队列:Apache Kafka、RabbitMQ和ActiveMQ。我们将讨论它们的核心概念、算法原理、特点以及使用场景。 随着

    2024年02月02日
    浏览(60)
  • (黑马出品_07)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 聚合 可以让我们极其方便

    2024年03月12日
    浏览(52)
  • 分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于实现全文搜索功能,例如商城中对商品搜索、搜索、分类搜索、订单搜索、客户搜索等。它支持复杂的查询语句、中文分词、近似搜索等功能,可以快速地搜索并返回匹配的结果。 2、日志分析 Elastic search可以用于实现实时日志分析,例

    2024年02月04日
    浏览(48)
  • 分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步

    本文参考黑马 分布式Elastic search Elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容 同步调用 方案一:同步调用 基本步骤如下: hotel-demo对外提供接口,用来修改elasticsearch中的数据 酒店管理服务在完成数据库操

    2024年04月11日
    浏览(45)
  • 【103期】RabbitMQ 实现多系统间的分布式事务,保证数据一致性

    org.springframework.boot spring-boot-starter-amqp mysql mysql-connector-java runtime org.projectlombok lombok true org.springframework.boot spring-boot-starter-jdbc com.alibaba fastjson 1.2.17 3.2.1.2配置文件内容: server: port: 8080 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useUnicode=tru

    2024年04月14日
    浏览(64)
  • (黑马出品_高级篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 消息队列在使用过程中,面

    2024年03月19日
    浏览(46)
  • RabbitMQ:高效传递消息的魔法棒,一篇带你助力构建可靠的分布式系统(上篇)

    MQ是消息队列( Message Queue )的缩写,是一种在应用程序之间传递消息的技术。通常用于 分布式系统 或 异步通信 中,其中 发送者 将消息放入队列,而 接收者 从队列中获取消息。 这种异步通信模式允许发送者和接收者在不需要实时连接的情况下进行通信,从而提高了应用

    2024年02月15日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包