RabbitMQ系列(25)--RabbitMQ搭建镜像队列

这篇具有很好参考价值的文章主要介绍了RabbitMQ系列(25)--RabbitMQ搭建镜像队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 前言:如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失,虽然可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,这样可以保证消息不丢失,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisherconfirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用,而通过引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。


验证过程如下(没开启消息持久化,有兴趣的同学可以看看):

我这里准备了3台虚拟机来跑RabbitMQ服务分别为

node1:192.168.194.128

node2:192.168.194.129

node3:192.168.194.130

(1)执行以下代码,在node1节点里生成队列

package com.ken;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * 生产者
 */
public class Producer {

    //队列名称(用于指定往哪个队列发送消息)
    public static final String QUEUE_NAME = "my_queue";

    //进行发送操作
    public static void main(String[] args) throws Exception{

        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂IP,用于连接RabbitMQ的队列
        factory.setHost("192.168.194.128");
        //设置连接RabbitMQ的用户名
        factory.setUsername("admin");
        //设置连接RabbitMQ的密码
        factory.setPassword("123456");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false ,false,false,null);
        //发消息
        String message = "Hello World";
        /**
         * 用信道对消息进行发布(消息持久化)
         * 第一个参数:发送到哪个交换机
         * 第二个参数:路由的Key值是哪个,本次是队列名
         * 第三个参数:其他参数信息
         * 第四个参数:发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        System.out.println("消息发送成功!");
    }

}

效果图:

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(2)进页面查看效果,可以得知my_queue这个队列只在node1节点创建了,没在其他节点同步创建RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(3) 在node1节点执行关闭RabbitMQ服务的命令来模拟节点宕机

rabbitmqctl stop_app

效果图:

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(4)用其他节点的可视化页面来查看集群信息,由图可知node1节点没在运行

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(5)查看队列,可以看到my_queue队列的状态为停止

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(6)执行以下代码,在node2节点里生成消费者,尝试消费消息

package com.ken;

import com.rabbitmq.client.*;

/**
 * 消费者
 */
public class Consumer {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //进行接收操作
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂IP,用于连接RabbitMQ的队列
        factory.setHost("192.168.194.129");
        //设置连接RabbitMQ的用户名
        factory.setUsername("admin");
        //设置连接RabbitMQ的密码
        factory.setPassword("123456");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };

        /**
         * 声明消费者取消接收消息后的回调方法(由于回调方法CancelCallback是函数式接口,所以需要给CancelCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数cancelCallback的类型CancelCallback用 @FunctionalInterface注解规定CancelCallback是一个函数式接口,所以要往cancelCallback参数传的值要是一个函数
         *
         *  @FunctionalInterface
         *  public interface CancelCallback {
         *      void handle (String consumerTag) throws IOException;
         *  }
         *
         */
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费消息");
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

}

效果图:(报错信息提示node1节点上的my_queue队列已经关闭了)RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(7)把node1节点上的RabbitMQ服务重新启动起来

rabbitctl start_app

效果图:

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(8)查看消费者的日志,可以发现消费者并没有消费消息 

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(9)查看队列里消息的情况,可以看到消息丢失了(在未设置消息持久化的情况下)

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq


搭建镜像队列

1、启动node1、node2、node3三台集群节点

2、随便找一个节点添加policy(策略)

(1)进入node1节点的可视化界面

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(2)进入添加策略的界面 

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(3)给策略取一个名字 RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

 (4)给策略加上匹配规则,通过正则表达式匹配队列,若交换机或者队列的名字满足以mirror开头这个条件,则那条队列使用该策略

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

例:

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(5)为策略选择模式为ha-mode(ha-mode表示是备机模式)点击HA mode即可

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(6)为ha-mode指定获取参数方式为exactly(exactly表示指定参数)

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(7)点击HA params,就会往自定义参数里填入ha-params,这里用于指定策略作用的节点的数量

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

 (8)为ha-params指定策略作用的节点的数量为2(包含被镜像的队列,镜像和被镜像的队列数总共为2)RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(9)点击HA sync mode,就会往自定义参数里填入ha-sync-mode,这里用于指定同步的模式RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(10)为ha-sync-mode指定同步模式为自动同步模式

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(11)最后点击Add/update policy添加策略即可RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(12)往上滑动查看策略添加情况

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(13)执行代码在node1节点上创建名字以mirror开头的队列

代码如下

package com.ken;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * 生产者
 */
public class Producer {

    //队列名称(用于指定往哪个队列发送消息)
    public static final String QUEUE_NAME = "mirror_queue";

    //进行发送操作
    public static void main(String[] args) throws Exception{

        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂IP,用于连接RabbitMQ的队列
        factory.setHost("192.168.194.128");
        //设置连接RabbitMQ的用户名
        factory.setUsername("admin");
        //设置连接RabbitMQ的密码
        factory.setPassword("123456");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false ,false,false,null);
        //发消息
        String message = "Hello World";
        /**
         * 用信道对消息进行发布(消息持久化)
         * 第一个参数:发送到哪个交换机
         * 第二个参数:路由的Key值是哪个,本次是队列名
         * 第三个参数:其他参数信息
         * 第四个参数:发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        System.out.println("消息发送成功!");
    }

}

效果图:

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(14)进入Queues查看队列的情况,可以发现刚刚创建的mirror_queue队列上有+1,这证明镜像队列创建成功

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

(15)进入mirror_queue队列查看详情,可以发现镜像队列在node2节点上

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

3、测试镜像队列是否正常运行

(1)关闭node1节点,模拟node1节点宕机

rabbitmqctl stop_app

效果图: 

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

从node2的可视化页面可以看到node1节点停机了

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq


注意:

从Queues进入mirror_queue队列查看详情,可以发现当node1节点停掉后node2自动替代了node1节点的位置,node3作为镜像队列的节点,由此可见我们策略里写的ha-params:2这一参数是生效的,使得节点的个数总是保持2个,这样就算我们整个集群只剩下一台机器,在节点不断替代的情况下,消费者始终能消费队列里面的消息RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq


(2)执行以下代码,在node2节点里生成消费者,尝试消费mirror_queue队列的消息,发现node2可以消费mirror_queue队列的消息并且消费成功,这证明mirror_queue队列成功镜像到node2节点上

package com.ken;

import com.rabbitmq.client.*;

/**
 * 消费者
 */
public class Consumer {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "mirror_queue";

    //进行接收操作
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂IP,用于连接RabbitMQ的队列
        factory.setHost("192.168.194.129");
        //设置连接RabbitMQ的用户名
        factory.setUsername("admin");
        //设置连接RabbitMQ的密码
        factory.setPassword("123456");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };

        /**
         * 声明消费者取消接收消息后的回调方法(由于回调方法CancelCallback是函数式接口,所以需要给CancelCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数cancelCallback的类型CancelCallback用 @FunctionalInterface注解规定CancelCallback是一个函数式接口,所以要往cancelCallback参数传的值要是一个函数
         *
         *  @FunctionalInterface
         *  public interface CancelCallback {
         *      void handle (String consumerTag) throws IOException;
         *  }
         *
         */
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费消息");
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

}

效果图:

RabbitMQ系列(25)--RabbitMQ搭建镜像队列,rabbitmq,rabbitmq文章来源地址https://www.toymoban.com/news/detail-548038.html

到了这里,关于RabbitMQ系列(25)--RabbitMQ搭建镜像队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ 镜像队列 使用和原理详解

    单节点的 RabbitMQ 存在性能上限,可以通过垂直或者水平扩容的方式增加 RabbitMQ 的吞吐量。垂直扩容指的是提高 CPU 和内存的规格;水平扩容指部署 RabbitMQ 集群。 通过将单个节点的队列相对平均地分配到集群的不同节点,单节点的压力被分散,RabbitMQ 可以充分利用多个节点的

    2023年04月08日
    浏览(37)
  • RabbitMQ系列(23)--RabbitMQ惰性队列

    1、概念:RabbitMQ从 3.6.0版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中(持久化队列若想持久化消息还需要看消息设置了持久化没),而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多

    2024年02月16日
    浏览(43)
  • RabbitMQ-消息队列:镜像队列、Haproxy+Keepalive 实现高可用负载均衡

    如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并 且也可能会导致消息的丢失 。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true, 但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写

    2024年03月18日
    浏览(40)
  • RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

    1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件 RabbitMQ官网下载插件的网址:https://www.rabbitmq.com/community-plugins.html 2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明)

    2024年02月16日
    浏览(41)
  • RabbitMQ系列(8)--实现RabbitMQ队列持久化及消息持久化

    概念:在上一章文章中我们演示了消费者宕机的情况下消息没有被消费成功后会重新入队,然后再被消费,但如何保障RabbitMQ服务停掉的情况下,生产者发过来的消息不会丢失,这时候我们为了消息不会丢失就需要将队列和消息都标记为持久化。 1、实现RabbitMQ队列持久化 只需

    2024年02月09日
    浏览(39)
  • RabbitMQ系列(17)--延迟队列的简介与实现

    1、延迟队列的概念 延迟队列内部是有序的,重要的特性体现在它的延迟属性上,延迟队列中的元素希望在指定时间到了之后或之前取出处理,简单的说延迟队列就是用来存放需要在指定时间被处理的元素的队列。 2、延迟队列的应用场景 (1)订单指定时间内未支付则自动取消

    2024年02月07日
    浏览(34)
  • RabbitMQ集群环境搭建-镜像模式

    集群模式非常经典的就是Mirror镜像模式,保证100%数据不丢失,在实际工作中也是用的最多的。并且实现集群非常的简单,一般互联网大厂都会构建这种镜像集群模式。 Mirror镜像队列,目的是为了保证rabbitmq数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是2-

    2024年02月14日
    浏览(54)
  • SpringBoot整合RabbitMQ系列--绑定交换机与队列的方法

    原文网址:SpringBoot整合RabbitMQ系列--绑定交换机与队列的方法_IT利刃出鞘的博客-CSDN博客         本文用实例介绍SpringBoot中RabbitMQ如何绑定交换机(交换器)与队列。 交换机 下边两种方式等价。 队列 下边两种方式等价 绑定 下边两种方式等价 注意:第一种的参数并不是字符

    2023年04月09日
    浏览(58)
  • RabbitMQ系列(29)--RabbitMQ搭建Shovel

    前言: Federation具备的数据转发功能类似,Shovel能够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个Broker,也可以位于不同的Broker 上。Shovel可

    2024年02月13日
    浏览(25)
  • 【外行也能看懂的RabbitMQ系列(三)】—— RabbitMQ进阶篇之死信队列(内含视频演示业务和业务代码)

    准备篇 RabbitMQ安装文档 第一章 RabbitMQ快速入门篇 第二章 RabbitMQ的Web管理界面详解 第三章 RabbitMQ进阶篇之死信队列 第四章 RabbitMQ进阶篇之通过插件实现延迟队列 恭喜所有看到本篇文章的小伙伴,成功解锁了RabbitMQ系列之高级特性 死信队列 的内容🎁通过本文,你将清楚的了解

    2024年02月07日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包