消息队列(一)-- RabbitMQ入门(4)

这篇具有很好参考价值的文章主要介绍了消息队列(一)-- RabbitMQ入门(4)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ 其他知识点

幂等性
  • 消息重复消费
    消费者在消费MQ 中的消息时,MQ 已经把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故MQ 未收到确认消息,该消息会重新发给其他消费者,或网络重新连接后再次发给该消费者,但是实际上该消息已被消费过了,造成消费者重复消费同一条消息。

  • 解决思路
    MQ 消费者的幂等性的解决一般使用全局ID 或者写个唯一标识,如时间戳、UUID 或 MQ的ID等,每次消费消息时用ID 判断该消息是否已经消费过。
    业界主流的幂等性有两种操作:

    • 唯一ID + 指纹码机制
      指纹码:一些规则或时间戳加别的服务给的唯一信息码,它并不一定是我们系统生成的,基本都是由业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中。优势就是实现简单就一个拼接,然后查询判断是否重复。劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈,当然也可以采用分库分表提升性能,但是不太推荐。
    • Redis 原子性
      利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。
优先级队列

曾经后端系统是使用 redis 来存储定时轮询,而 redis 只能用List 做一个简单的消息队列,并不能实现优先级的场景。所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是VIP客户的订单,就给一个相对较高的优先级,否则默认优先级。

  • 控制台页面添加
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 队列声明代码中添加优先级
Map<String , Object> params = new HashMap<>();
params.put("x-max-priority", 10);//官方允许是0-255 之间 ,此处设置10 允许优先级范围为0-10 不要设置过大,浪费CPU与内存
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
for (int i = 0; i < 10; i++) {
    String message = "info"+(i+1);
    if((i+1) == 5){
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().priority(5).build();
        channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
    }else{
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    }
}

先运行生产者发消息,然后再运行消费者,消息info5 因为设置了优先级,优先被消费了。
消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式

  • 实现队列优先级注意如下:
    队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才能消费,才有机会对消息进行排序。
惰性队列

RabbitMQ 从3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到响应的消息时才会被加载到内存中,它的一个重要设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机或者由于维护而关闭等)而致使长时间内不能消费消息造成堆积是,惰性队列就很有必要了。
默认情况下,当消费者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会消耗较长的时间,也会阻塞队列的操作,进而无法接受新的消息。

  • 两种模式
    队列具备两种模式:default 和 lazy 。默认的为 default。lazy模式为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候,在参数中设置。也可以通过 Policy 的方式设置。如果一个队列同时使用两种方式设置的话,那么 Policy 方式具备更高的优先级。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode","lazy");
channel.queueDeclare("myqueue",false,false,false,args);

在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB。

RabbitMQ 集群

集群搭建步骤

消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式

  • 把原有装过RabbitMQ 的虚拟机克隆两份,分别修改IP。
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 修改3台机器的主机名称,改完需要重启机器
    vim /etc/hostname
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 配置各个节点的 hosts 文件,让各个节点都能互相识别对方
    vim /etc/hosts
    192.168.1.130 node1
    192.168.1.131 node2
    192.168.1.132 node3
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 以确保各个节点的 cookie 文件使用的是同一个值
    在 node1 上执行远程操作命令
    scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
    scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
  • 启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和RabbitMQ 应用服务器
    rabbitmq-server -detached
  • 在节点2 执行
    rabbitmqctl stop_app
    (rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务)
    rabbitmqctl reset
    rabbitmqctl join_cluster rabbit@node1
    (需要节点1服务器防火墙放开4369和25672端口)
    rabbitmqctl start_app(只启动应用服务)
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 在节点3执行
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster rabbit@node2
    rabbitmqctl start_app
  • 集群状态
    rabbitmqctl cluster_status
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 需要重新设置用户
    创建账号
    rabbitmqctl add_user admin 123456
    设置用户角色
    rabbitmqctl set_user_tags admin administrator
    设置用户权限
    rabbitmqctl set_permissions -p “/” admin “.*” “.*” “.*”
    然后可以访问管理界面查看(可以任意访问其中一个IP)
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 解除集群节点(node2 和 node3 机器分别执行)
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl start_app
    rabbitmqctl cluster_status
    rabbitmqctl forget_cluster_node rabbit@node2(node1 机器上执行)
镜像队列

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上,保证服务的可用性。

  • 启动三台集群节点
  • 随便找一个节点添加policy
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
    其中
    Name:mirrior-two 为名称,可随意起名
    Pattern:^mirrior 规则,以mirrior 前缀的交换机和队列进行备份
    ha-mode:exactly 指定模式
    ha-params:2 备两份
    ha-sync-mode:automatic 同步模式为自动
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式

创建一个队列,并发送一个消息
消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
然后通过命令把节点1停止,发现自动又备份到节点3上。
消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式

消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
通过连接节点2,依然能消费消息。
消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式

  • 就算整个集群只剩一台机器,依然能消费队列里的消息。
HAProxy 高可用负载均衡

HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案。HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。
消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式

Federation Exchange

因为减少网络延迟,两个地方(比如北京和上海)都有MQ服务,但是需要数据同步,就需要Federation 插件解决这个问题。

  • 需要保证每台节点单独运行

  • 在每台机器上开启 Federation 相关插件
    rabbitmq-plugins enable rabbitmq_federation
    rabbitmq-plugins enable rabbitmq_federation_management

  • 原理图
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
    先运行consumer 在 node2 创建 fed_exchange

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.131");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("fed_exchange", BuiltinExchangeType.DIRECT);
        channel.queueDeclare("node2_queue", true, false, false, null);
        channel.queueBind("node2_queue", "fed_exchange", "routeKey");
        //声明 接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  • 在 downstream(node2)配置 upstream(node1)
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 添加policy
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 查看状态
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
Federation Queue

联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供负载的功能。一个联邦队列可以连接一个或多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。

  • 原理图
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 添加 upstream(同上)
  • 添加policy
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
Shovel

跟 Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即 source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker上。Shovel 可以翻译为“铲子”,是一种比较形象的比喻,这个“铲子”可以将消息从一方“铲到”另一方。Shovel 行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。
消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式文章来源地址https://www.toymoban.com/news/detail-604035.html

  • 开启插件(需要的机器都开启)
    rabbitmq-plugins enable rabbitmq_shovel
    rabbitmq-plugins enable rabbitmq_shovel_management
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
  • 添加 shovel 源和目的地
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式
    消息队列(一)-- RabbitMQ入门(4),消息队列,rabbitmq,分布式

到了这里,关于消息队列(一)-- RabbitMQ入门(4)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ:高效传递消息的魔法棒,一篇带你助力构建可靠的分布式系统(上篇)

    MQ是消息队列( Message Queue )的缩写,是一种在应用程序之间传递消息的技术。通常用于 分布式系统 或 异步通信 中,其中 发送者 将消息放入队列,而 接收者 从队列中获取消息。 这种异步通信模式允许发送者和接收者在不需要实时连接的情况下进行通信,从而提高了应用

    2024年02月15日
    浏览(48)
  • 【分布式技术】消息队列Kafka

    目录 一、Kafka概述 二、消息队列Kafka的好处 三、消息队列Kafka的两种模式 四、Kafka 1、Kafka 定义 2、Kafka 简介 3、Kafka 的特性 五、Kafka的系统架构 六、实操部署Kafka集群  步骤一:在每一个zookeeper节点上完成kafka部署 ​编辑 步骤二:传给其他节点 步骤三:启动3个节点 kafka管理

    2024年01月23日
    浏览(55)
  • 分布式消息队列RocketMQ概念详解

    目录 1.MQ概述 1.1 RocketMQ简介 1.2 MQ用途 1.3 常见MQ产品 2.RocketMQ 基本概念 2.1 消息 2.2 主题 2.3 标签 2.4 队列  2.5 Producer 2.6 Consumer 2.7 NameServer 2.8 Broker 2.9 RocketMQ 工作流程   RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息

    2024年02月03日
    浏览(65)
  • 架构核心技术之分布式消息队列

    Java全能学习+面试指南:https://javaxiaobear.cn 今天我们来学习分布式消息队列,分布式消息队列的知识结构如下图。 主要介绍以下内容: 同步架构和异步架构的区别。异步架构的主要组成部分:消息生产者、消息消费者、分布式消息队列。异步架构的两种主要模型:点对点模型

    2024年02月07日
    浏览(46)
  • Spring Boot如何实现分布式消息队列

    在分布式系统中,消息队列是非常重要的一部分,可以帮助开发人员实现异步处理、解耦系统、提高系统可靠性等。本文将介绍如何使用 Spring Boot 实现分布式消息队列。 消息队列是一种存储消息的容器,可以缓存消息并在需要的时候按照一定的规则将消息发送给消费者。常

    2024年02月14日
    浏览(43)
  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(49)
  • Rabbitmq----分布式场景下的应用

    如果单机模式忘记也可以看看这个快速回顾rabbitmq,在做学习 消息队列在使用过程中,面临着很多实际问题需要思考: 消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消

    2024年02月08日
    浏览(50)
  • 分布式应用之zookeeper集群+消息队列Kafka

           ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。为分布式框架提供协调服务的

    2024年02月06日
    浏览(66)
  • zookeeper+kafka分布式消息队列集群的部署

    目录 一、zookeeper 1.Zookeeper 定义 2.Zookeeper 工作机制 3.Zookeeper 特点 4.Zookeeper 数据结构 5.Zookeeper 应用场景 (1)统一命名服务 (2)统一配置管理 (3)统一集群管理 (4)服务器动态上下线 6.Zookeeper 选举机制 (1)第一次启动选举机制 (2)非第一次启动选举机制 7.部署zookeepe

    2024年02月14日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包