【初始RabbitMQ】持久化的实现

这篇具有很好参考价值的文章主要介绍了【初始RabbitMQ】持久化的实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ持久化

如何保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

队列如何实现持久化

之前我们创建的队列都是非持久化的,RabbitMQ如果重启的话,该队列就会被删除,如果要实现队列持久化,需要在声明队列的时候把durable参数设置持久化

【初始RabbitMQ】持久化的实现,消息队列,消息队列,java,分布式

 但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新 创建一个持久化的队列,不然就会出现错误

【初始RabbitMQ】持久化的实现,消息队列,消息队列,java,分布式

以下是控制台中持久化与非持久化队列的UI显示区,当Features列显示为D代表是持久化队列

 【初始RabbitMQ】持久化的实现,消息队列,消息队列,java,分布式

消息实现持久化

要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性【初始RabbitMQ】持久化的实现,消息队列,消息队列,java,分布式

生产者代码:

/*
* 消息再手动应答不丢失、放回消息队列重新消费
 */
public class Task2 {
    //队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /*
         *生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
         * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
         * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数 延迟消息等
         */
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //从控制台中输入信息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了

不公平分发

在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是 很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间 处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。为了避免这种情况,我们可以设置参数 channel.basicQos(1),将想要不公平分发的消费者设置该参数

【初始RabbitMQ】持久化的实现,消息队列,消息队列,java,分布式

消费者01代码如下:

public class Work01 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息短");
        //消息消费的时候如何处置消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1.消息标记tag
             * 2.是否批量应答未应答的消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        boolean autoAck = false;
        //设置不公平分发
        channel.basicQos(1);
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

消费者02代码如下:

public class Work02 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息长");
        //消息消费的时候如何处置消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            /**
             * 1.消息标记tag
             * 2.是否批量应答未应答的消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        boolean autoAck = false;
        //设置不公平分发
        channel.basicQos(1);
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

 

效果展示:

【初始RabbitMQ】持久化的实现,消息队列,消息队列,java,分布式

预取值

本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此 缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有 未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何 消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知 这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高 向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理 的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理 的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的 内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范 围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这 将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境 中。对于大多数应用来说,稍微高一点的值将是最佳的【初始RabbitMQ】持久化的实现,消息队列,消息队列,java,分布式文章来源地址https://www.toymoban.com/news/detail-827597.html

到了这里,关于【初始RabbitMQ】持久化的实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

    MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。 功能1:流量消峰 功能2:应用解耦 功

    2024年02月07日
    浏览(34)
  • 根据源码,模拟实现 RabbitMQ - 实现消息持久化,统一硬盘操作(3)

    目录 一、实现消息持久化 1.1、消息的存储设定 1.1.1、存储方式 1.1.2、存储格式约定 1.1.3、queue_data.txt 文件内容  1.1.4、queue_stat.txt 文件内容 1.2、实现 MessageFileManager 类 1.2.1、设计目录结构和文件格式 1.2.2、实现消息的写入 1.2.3、实现消息的删除(随机访问文件) 1.2.4、获取队

    2024年02月12日
    浏览(33)
  • 项目实战 — 消息队列(4){消息持久化}

    目录  一、消息存储格式设计        🍅 1、queue_data.txt:保存消息的内容         🍅 2、queue_stat.txt:保存消息的统计信息 二、消息序列化 三、自定义异常类 四、创建MessageFileManger类 🍅 1、约定消息文件所在的目录和文件名字  🍅 2、队列的统计信息 🍅 3、创建队列对应

    2024年02月14日
    浏览(31)
  • RabbitMQ队列持久化的重要性与意义

    持久化队列的一个主要目的是确保数据的安全性。在RabbitMQ中,消息通常存储在内存中,以提高消息传递的速度。然而,如果队列没有持久化,一旦RabbitMQ服务器发生故障或者重启,所有未被处理的消息都会丢失。这可能导致数据丢失,对于关键业务应用程序来说是不可接受的

    2024年02月07日
    浏览(26)
  • 【RabbitMQ 实战】10 消息持久化和存储原理

    rabbitmq的持久化分为三个部分: 交换器的持久化。 队列的持久化。 消息的持久化。 1.1.1 交换器持久化 交换器的持久化是通过在声明交换器时, 指定Durability参数为durable实现的。 若交换器不设置持久化,在rabbitmq服务重启之后,相关的交换器元数据会丢失,但消息不会丢失,

    2024年02月07日
    浏览(30)
  • RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)

    在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列-RabbitMO.代表使用者保留的消息缓冲区 第一步:导入依赖 第二步:创建生产者 第三步:创建消费者 因为你为了确保同一条消息被其中一个工作线程接收到了之后,其它工作就不能消费的到了 三者

    2023年04月14日
    浏览(33)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

    在现代分布式应用程序中,消息队列扮演了至关重要的角色,允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性,但也引入了一系列的挑战,其中最重要的之一是消息的可靠性。 首先让我们来了解一下,在消息队列中,消息从生产者发送

    2024年02月05日
    浏览(40)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(74)
  • 消息中间件之八股面试回答篇:一、问题概览+MQ的应用场景+RabbitMQ如何保证消息不丢失(生产者确认机制、持久化、消费者确认机制)+回答模板

    目前主流的消息队列技术(MQ技术)分为RabbitMQ和Kafka,其中深蓝色为只要是MQ,一般都会问到的问题。浅蓝色是针对RabbitMQ的特性的问题。蓝紫色为针对Kafka的特性的问题。 MQ主要提供的功能为:异步 解耦 削峰 。 展开来讲就是 异步发送(验证码、短信、邮件…) MYSQL和Redi

    2024年01月24日
    浏览(44)
  • RabbitMQ学习(五):RabbitMQ持久化

    在上一章内容中我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉后消 息生产者发送过来的消息不丢失呢?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它将忽视队列 和消息,除非告知它不要这样做。 确保消息不会丢失需要做两件事:我们需

    2024年02月16日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包