13.RocketMQ之消息的存储与发送

这篇具有很好参考价值的文章主要介绍了13.RocketMQ之消息的存储与发送。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 消息存储

1.1 消息存储

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。

13.RocketMQ之消息的存储与发送

  1. 消息生成者发送消息
  2. Broker收到消息,将消息进行持久化,在存储中新增一条记录
  3. 返回ACK给生产者
  4. Broker消息给对应的消费者,然后等待消费者返回ACK
  5. 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
  6. MQ删除消息

1.1.1 存储介质

  • 关系型数据库DB

Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障

  • 文件系统

    目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。

    消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。

    除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

1.1.2 性能对比

文件系统>关系型数据库DB

1.1.3 消息的存储和发送

1.1.3.1消息存储:顺序写

磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。

1.1.3.2消息发送

Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。

一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

1)read;读取本地文件内容;

2)write;将读取的内容通过网络发送出去。

这两个看似简单的操作,实际进行了4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复 制到运行程序用户态内存;
  3. 然后从运行程序用户态 内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。

13.RocketMQ之消息的存储与发送

通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现.

13.RocketMQ之消息的存储与发送

RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。

这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5\~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了

1.1.4 消息存储结构

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。

每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

13.RocketMQ之消息的存储与发送

  • CommitLog:存储消息的元数据
  • ConsumerQueue:消息在CommitLog的索引。 记录了minOffset和maxOffset和已经消费了的消息的偏移量consumerOffset。可以通过offset去CommitLog查找消息。
  • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程。
  • ConsumerQueue和messageQueue是一 一对应的。
1.1.5 刷盘机制

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。

RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。

消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式:同步刷盘和异步刷盘。

13.RocketMQ之消息的存储与发送

1.1.5.1同步刷盘

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。

1.1.5.2异步刷盘

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

1.1.5.3刷盘配置

同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。文章来源地址https://www.toymoban.com/news/detail-504210.html

到了这里,关于13.RocketMQ之消息的存储与发送的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(40)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(50)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(45)
  • 基于RocketMQ实现分布式事务

    在一个微服务架构的项目中,一个业务操作可能涉及到多个服务,这些服务往往是独立部署,构成一个个独立的系统。这种分布式的系统架构往往面临着分布式事务的问题。为了保证系统数据的一致性,我们需要确保这些服务中的操作要么全部成功,要么全部失败。通过使用

    2024年03月12日
    浏览(68)
  • 分布式事务,zookeeper,dubbo,rocketmq

    CAP理论是分布式领域中非常重要的一个指导理论,C(Consistency)表示强一致性,A(Availability)表示可用性,P(Partition Tolerance)表示分区容错性,CAP理论指出在目前的硬件条件下,一个分布式系统是必须要保证分区容错性的,而在这个前提下,分布式系统要么保证CP,要么保

    2024年04月12日
    浏览(45)
  • RocketMQ分布式事务 -> 最终一致性实现

    · 分布式事务的问题常在业务与面试中被提及, 近日摸鱼看到这篇文章, 阐述的非常通俗易懂, 固持久化下来我博客中, 也以便于我二刷 转载源 : 基于RocketMQ分布式事务 - 完整示例 本文代码不只是简单的demo,考虑到一些异常情况、幂等性消费和死信队列等情况,尽量向可靠业务

    2024年02月15日
    浏览(54)
  • SpringCloudAlibaba集成RocketMQ实现分布式事务事例(一)

    业务需求 用户请求订单微服务 order-service 接口删除订单(退货),删除订单时需要调用 account-service的方法给账户增加余额,一个典型的分布式事务问题。 代码实现 事务消息有三种状态: TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息 TransactionStatus.Roll

    2024年02月13日
    浏览(104)
  • 【分布式】分布式存储架构

    说到分布式存储,我们先来看一下传统的存储是怎么个样子。 传统的存储也称为集中式存储, 从概念上可以看出来是具有集中性的,也就是整个存储是集中在一个系统中的,但集中式存储并不是一个单独的设备,是集中在一套系统当中的多个设备,比如下图中的 EMC 存储就需

    2024年02月10日
    浏览(49)
  • 【分布式技术】分布式存储ceph之RBD块存储部署

    目录 创建 Ceph 块存储系统 RBD 接口 服务端操作 1、创建一个名为 rbd-demo 的专门用于 RBD 的存储池 2、将存储池转换为 RBD 模式 3、初始化存储池 4、创建镜像 5、在管理节点创建并授权一个用户可访问指定的 RBD 存储池 6、修改RBD镜像特性,CentOS7默认情况下只支持layering和stripin

    2024年01月18日
    浏览(72)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包