【RabbitMQ】之持久化机制

这篇具有很好参考价值的文章主要介绍了【RabbitMQ】之持久化机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

  • 一、RabbitMQ 持久化机制
    • 1、RabbitMQ 持久化概述
    • 2、队列持久化
    • 3、消息持久化
    • 4、交换器持久化
  • 二、RabbitMQ 知识扩展
    • 1、内存告警与内存换页
    • 2、磁盘告警与配置
    • 3、数据写入磁盘时机
    • 4、磁盘消息格式
    • 5、磁盘文件删除机制

一、RabbitMQ 持久化机制


1、RabbitMQ 持久化概述

持久化,即将原本存在于内存中的数据写入到磁盘上永久保存数据,防止服务宕机时内存数据的丢失。

Rabbitmq 的持久化分为队列持久化消息持久化交换器持久化

对于消息来说,不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。持久化的消息会同时写入磁盘和内存(加快读取速度),非持久化消息会在内存不够用时,将消息写入磁盘(一般重启之后就没有了)。

2、队列持久化

队列的持久化是在定义队列时的通过 durable 参数来决定的,当 durable 为 true 时,才代表队列会持久化。例如:

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//第二个餐胡设置为true,代表队列持久化
channel.queueDeclare("queue.persistent.name", true, false, false, null);

关键的是第二个参数设置为 true,即 durable = true。Channel 类中 queueDeclare 的完整定义如下:

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
							Map<String, Object> arguments) throws IOException;

参数说明:

  • queue:queue 的名称
  • exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
    • 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
    • “首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
    • 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
  • autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

总结:如果将 queue 的持久化标识 durable 设置为 true,则代表是一个持久的队列。当服务重启之后,队列仍然会存在,这是因为服务会把持久化的 queue 存放在硬盘上,当服务重启的时候,会重新加载这些被持久化的 queue。

3、消息持久化

队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前 queue 里面如果还有未发出去的消息的话,重启之后,消息是否还存在队列里面就要取决于在发送消息时对消息的设置。

消息持久化的实现需要在发送消息时设置消息的持久化标识,例如:

channel.basicPublish("exchange01", "routing_key01", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_message".getBytes());

方法原型是:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

这里关键的是 BasicProperties props 这个参数,它的定义如下:

public BasicProperties(
     String contentType,//消息类型如:text/plain
     String contentEncoding,//编码
     Map<String,Object> headers,
     Integer deliveryMode,//1:nonpersistent 2:persistent
     Integer priority,//优先级
     String correlationId,
     String replyTo,//反馈队列
     String expiration,//expiration到期时间
     String messageId,
     Date timestamp,
     String type,
     String userId,
     String appId,
     String clusterId
)

其中 deliveryMode=1 代表不持久化,deliveryMode=2 代表持久化。而代码实现中的 MessageProperties.PERSISTENT_PLAIN 值是官方提供的一个将 deliveryMode 设置为 2 的 BasicProperties 的对象:

public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties(
    	"text/plain",
        null,
        null,
        2,
        0, null, null, null,
        null, null, null, null,
        null, null
	);

除此之外,我们也可以使用另一种方式来设置消息持久化标志位:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2); // 将 deliveryMode 值设置为 2 表示消息持久化
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange01", "routing_key01", properties, "persistent_message".getBytes());

至此,我们可以知道:当 broker 服务其重启后,想要消息不丢失,既需要设置队列持久化,也需要设置消息持久化

扩展知识:

basicPublish 方法还有另外两个重载方法

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;

这里有两个关键的参数:mandatoryimmediate。这两个标识位都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。下面简单讲解下这两个标识位:

1)mandatory

  • 当 mandatory 标志位设置为 true 时:如果 exchange 根据自身类型和消息 routeKey 无法找到一个符合条件的 queue,那么会调用 basic.return 方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);
  • 当 mandatory 设置为 false 时,如果出现上述情形的话,broker 会直接将消息扔掉。

2)immediate

  • 当 immediate 标志位设置为 true 时:如果 exchange 在将消息路由到 queue(s) 时发现对于的 queue 上么有消费者,那么这条消息不会放入队列中。当与消息 routeKey 关联的所有 queue(一个或者多个)都没有消费者时,该消息会通过 basic.return 方法返还给生产者。

概括来说就是:

  • mandatory 标志告诉服务器至少将该消息 route 到一个队列中,否则将消息返还给生产者;
  • immediate 标志告诉服务器如果该消息关联的 queue上有消费者,则马上将消息投递给它,如果所有 queue 都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

4、交换器持久化

对于消息的可靠性来说,只需要设置队列的持久化和消息的持久化即可。exchange 的持久化并没有什么影响,但是,如果 exchange 不设置持久化的话,当 broker 服务重启之后,exchange 将不复存在,这样会导致消息发送者 producer 无法正常发送消息。

所以,建议同样设置 exchange 的持久化。而 exchange 的持久化设置也特别简单,设置方法原型如下:

Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable)throws IOException;
  • exchange:交换器的名称;
  • type:交换器的类型,常见的如 fanout direct topic
  • durable:持久话标志位, durable 设置为 true 表示持久化, 反之为非持久 。

所以,只需要在声明的时候将 durable 字段设置为 true 即可:

channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);

二、RabbitMQ 知识扩展


1、内存告警与内存换页

1-1、内存告警

内存使用超过配置的阈值时,RabbitMQ 会暂时阻塞客户端的连接,并停止接收从客户端发来的消息,以避免服务崩溃,客户端与服务端的心跳检测也会失败。

当出现内存告警时,可以通过管理命令临时调整内存大小:

RabbitMQctl set_vm_memory_high_watermark <fraction>
  • fraction 为内存阈值,RabbitMQ 默认是 0.4,表示当 RabbitMQ 使用的内存超过总内存的 40% 时,就会产生告警并阻塞所有生产则连接。

通过此命令修改的阈值在 RabbitMQ 重启之后将会失效,如果想要设置的阈值永久有效需要修改配置文件:

# 相对值,也就是前面的fraction,建议设置在0.4~0.66之间,不要超过0.7
vm_memory_high_watermark.relative=0.4
# 绝对值,单位为KB,MB,GB,对应的临时命令是:RabbitMQctl set_vm_memory_high_watermark absolute <value>
#vm_memory_high_watermark.absolute=1GB

修改完配置文件后,需要重启服务才会生效。

1-2、内存换页

在某个 broker 节点触及内存阈值并阻塞生产者之前,它会尝试将队列内存中的消息换页存储到磁盘以释放内存空间。持久化和非持久化的消息都会被转储到磁盘中,其中持久化的消息本身就在磁盘中有一个备份,所以这里会将持久化的消息直接从内存中清除掉。

默认情况下,在内存使用达到设置的阈值的 50% 时会进行换页操作。也就是说,在默认的内存阈值 40% 的情况下,当内存超过 40% * 50% = 20% 时会经行换页动作。

内存换页阈值可以通过在配置文件中设置来进行调整:

vm_memory_high_watermark_paging_ratio=0.75

上面的配置将会在 RabbitMQ 内存使用率达到 30%(假设内存阈值是 0.4)时进行换页动作,并在 40% 时阻塞生产者(当 vm_memory_high_watermark_paging_ratio 的值大于 1 时,相当于禁用了换页功能)。

2、磁盘告警与配置

2-1、磁盘告警

当磁盘剩余空间低于设置的阈值时,RabbitMQ 同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务崩溃。

默认情况下,磁盘的阈值是50M,表示当磁盘剩余空间低于50M时,会阻塞生产者并停止内存中消息的换页动作。这个阈值的设置可以减小,但不能完全消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间检测期间内,磁盘空间从大于50M被耗尽到0M。

2-2、修改磁盘告警阈值

可以通过以下命令临时调整磁盘阈值:

#设置具体大小,单位为KB/MB/GB
RabbitMQctl set_disk_free_limit <disk_limit>
#设置相对值,建议取值为1.0~2.0(相对于内存的倍数,如内存大小是8G,若为1.0,则表示磁盘剩余8G时,阻塞)
RabbitMQctl set_disk_free_limit mem_relative <fraction>

如果要永久生效需要对应的配置文件,配置如下(需要重启生效):

disk_free_limit.relative=2.0
#disk_free_limit_absolute=50MB

这里有个建议:一个相对谨慎的做法是将磁盘阈值设置为与操作系统所显示的内存大小一致

3、数据写入磁盘时机

  • 消息的正常写入磁盘流程为:消息数据写入到缓存 Buffer 中(大小为 1 M),Buffer 数据满了之后会写入内存文件中,最后再刷新到磁盘文件中;
  • 存在个固定的刷盘时间:25ms,也就是不管 Buffe r满不满,每隔 25ms,Buffer 里的数据及未刷新到磁盘的文件内容必定会刷到磁盘;
  • 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用 Erlang 的 receive x after 0 来实现。只要进程的信箱里没有消息,则产生一个 timeout 消息,而 timeout 会触发刷盘操作。

4、磁盘消息格式

消息保存于 $MNESIA/msg_store_persistent/x.rdq 文件中,其中 x 为数字编号,从 1 开始,每个文件最大为 16M(16777216),超过这个大小会生成新的文件,文件编号加 1。

文件中的消息格式如下:

<<Size:64, MsgId:16/binary, MsgBody>>
  • MsgId 为 RabbitMQ 通过 rabbit_guid:gen() 每一个消息生成的 GUID;
  • MsgBody 会包含消息对应的 exchange,routing_keys,消息的内容,消息对应的协议版本,消息内容格式(二进制还是其它)等等。

5、磁盘文件删除机制

  • 当所有磁盘文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操作(至少有三个文件存在的情况下),以提高磁盘利用率。
  • publish 消息时写入内容,ack 消息时删除内容(更新该文件的有用数据大小),当一个文件的有用数据等于 0 时,删除该文件。

相关官方文档:文章来源地址https://www.toymoban.com/news/detail-619562.html

  • https://blog.RabbitMQ.com/posts/2011/01/RabbitMQ-backing-stores-databases-and-disks
  • https://www.RabbitMQ.com/persistence-conf.html

到了这里,关于【RabbitMQ】之持久化机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ系列(8)--实现RabbitMQ队列持久化及消息持久化

    概念:在上一章文章中我们演示了消费者宕机的情况下消息没有被消费成功后会重新入队,然后再被消费,但如何保障RabbitMQ服务停掉的情况下,生产者发过来的消息不会丢失,这时候我们为了消息不会丢失就需要将队列和消息都标记为持久化。 1、实现RabbitMQ队列持久化 只需

    2024年02月09日
    浏览(39)
  • RabbitMQ学习(五):RabbitMQ持久化

    在上一章内容中我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉后消 息生产者发送过来的消息不丢失呢?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它将忽视队列 和消息,除非告知它不要这样做。 确保消息不会丢失需要做两件事:我们需

    2024年02月16日
    浏览(41)
  • 消息中间件之八股面试回答篇:一、问题概览+MQ的应用场景+RabbitMQ如何保证消息不丢失(生产者确认机制、持久化、消费者确认机制)+回答模板

    目前主流的消息队列技术(MQ技术)分为RabbitMQ和Kafka,其中深蓝色为只要是MQ,一般都会问到的问题。浅蓝色是针对RabbitMQ的特性的问题。蓝紫色为针对Kafka的特性的问题。 MQ主要提供的功能为:异步 解耦 削峰 。 展开来讲就是 异步发送(验证码、短信、邮件…) MYSQL和Redi

    2024年01月24日
    浏览(61)
  • RabbitMQ-数据持久化

    1、交换机持久化(SpringAMQP默认) 2、队列持久化(SpringAMQP默认) 3、消息持久化         如果采用纯内存操作,那么消息存储达到队列的上限之后,会有一个page out操作,这个操作是将队列中已经有的一部分MQ消息转移到磁盘,给队列腾出空间,使得队列能够继续接收MQ消息

    2024年01月21日
    浏览(44)
  • RabbitMQ---持久化

    • 如何避免消息丢失? 1) 消费者的ACK机制。可以防止消费者丢失消息。 2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。 • 如何将消息进行持久化呢? 要将消息持久化,前提是:队列、Exchange都持久化

    2024年02月11日
    浏览(44)
  • RabbitMQ 持久化

      通过持久化可以 尽量 防止在RabbitMQ异常情况下(重启、关闭、宕机)的数据丢失。持久化技术是解决消息存储到队列后的丢失问题,但是通过持久化并不能完全保证消息不丢失。   持久化技术可以分为交换机持久化、队列持久化以及消息持久化,它们的实现方案和实

    2024年02月12日
    浏览(40)
  • 【初始RabbitMQ】持久化的实现

    如何保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事: 我们需要将队列和消息都标记为持久化 之前我们创建的队列都是非持久化的

    2024年02月19日
    浏览(34)
  • 【RabbitMQ 实战】10 消息持久化和存储原理

    rabbitmq的持久化分为三个部分: 交换器的持久化。 队列的持久化。 消息的持久化。 1.1.1 交换器持久化 交换器的持久化是通过在声明交换器时, 指定Durability参数为durable实现的。 若交换器不设置持久化,在rabbitmq服务重启之后,相关的交换器元数据会丢失,但消息不会丢失,

    2024年02月07日
    浏览(41)
  • RabbitMQ队列持久化的重要性与意义

    持久化队列的一个主要目的是确保数据的安全性。在RabbitMQ中,消息通常存储在内存中,以提高消息传递的速度。然而,如果队列没有持久化,一旦RabbitMQ服务器发生故障或者重启,所有未被处理的消息都会丢失。这可能导致数据丢失,对于关键业务应用程序来说是不可接受的

    2024年02月07日
    浏览(35)
  • RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)

    在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列-RabbitMO.代表使用者保留的消息缓冲区 第一步:导入依赖 第二步:创建生产者 第三步:创建消费者 因为你为了确保同一条消息被其中一个工作线程接收到了之后,其它工作就不能消费的到了 三者

    2023年04月14日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包