RabbitMQ 其他知识点
幂等性
-
消息重复消费
消费者在消费MQ 中的消息时,MQ 已经把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故MQ 未收到确认消息,该消息会重新发给其他消费者,或网络重新连接后再次发给该消费者,但是实际上该消息已被消费过了,造成消费者重复消费同一条消息。 -
解决思路
MQ 消费者的幂等性的解决一般使用全局ID 或者写个唯一标识,如时间戳、UUID 或 MQ的ID等,每次消费消息时用ID 判断该消息是否已经消费过。
业界主流的幂等性有两种操作:- 唯一ID + 指纹码机制
指纹码:一些规则或时间戳加别的服务给的唯一信息码,它并不一定是我们系统生成的,基本都是由业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中。优势就是实现简单就一个拼接,然后查询判断是否重复。劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈,当然也可以采用分库分表提升性能,但是不太推荐。 - Redis 原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。
- 唯一ID + 指纹码机制
优先级队列
曾经后端系统是使用 redis 来存储定时轮询,而 redis 只能用List 做一个简单的消息队列,并不能实现优先级的场景。所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是VIP客户的订单,就给一个相对较高的优先级,否则默认优先级。
- 控制台页面添加
- 队列声明代码中添加优先级
Map<String , Object> params = new HashMap<>();
params.put("x-max-priority", 10);//官方允许是0-255 之间 ,此处设置10 允许优先级范围为0-10 不要设置过大,浪费CPU与内存
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
for (int i = 0; i < 10; i++) {
String message = "info"+(i+1);
if((i+1) == 5){
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().priority(5).build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
先运行生产者发消息,然后再运行消费者,消息info5 因为设置了优先级,优先被消费了。
- 实现队列优先级注意如下:
队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才能消费,才有机会对消息进行排序。
惰性队列
RabbitMQ 从3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到响应的消息时才会被加载到内存中,它的一个重要设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机或者由于维护而关闭等)而致使长时间内不能消费消息造成堆积是,惰性队列就很有必要了。
默认情况下,当消费者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会消耗较长的时间,也会阻塞队列的操作,进而无法接受新的消息。
- 两种模式
队列具备两种模式:default 和 lazy 。默认的为 default。lazy模式为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候,在参数中设置。也可以通过 Policy 的方式设置。如果一个队列同时使用两种方式设置的话,那么 Policy 方式具备更高的优先级。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode","lazy");
channel.queueDeclare("myqueue",false,false,false,args);
在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB。
RabbitMQ 集群
集群搭建步骤
- 把原有装过RabbitMQ 的虚拟机克隆两份,分别修改IP。
- 修改3台机器的主机名称,改完需要重启机器
vim /etc/hostname
- 配置各个节点的 hosts 文件,让各个节点都能互相识别对方
vim /etc/hosts
192.168.1.130 node1
192.168.1.131 node2
192.168.1.132 node3
- 以确保各个节点的 cookie 文件使用的是同一个值
在 node1 上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie - 启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和RabbitMQ 应用服务器
rabbitmq-server -detached - 在节点2 执行
rabbitmqctl stop_app
(rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
(需要节点1服务器防火墙放开4369和25672端口)
rabbitmqctl start_app(只启动应用服务)
- 在节点3执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app - 集群状态
rabbitmqctl cluster_status
- 需要重新设置用户
创建账号
rabbitmqctl add_user admin 123456
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
rabbitmqctl set_permissions -p “/” admin “.*” “.*” “.*”
然后可以访问管理界面查看(可以任意访问其中一个IP)
- 解除集群节点(node2 和 node3 机器分别执行)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2(node1 机器上执行)
镜像队列
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上,保证服务的可用性。
- 启动三台集群节点
- 随便找一个节点添加policy
其中
Name:mirrior-two 为名称,可随意起名
Pattern:^mirrior 规则,以mirrior 前缀的交换机和队列进行备份
ha-mode:exactly 指定模式
ha-params:2 备两份
ha-sync-mode:automatic 同步模式为自动
创建一个队列,并发送一个消息
然后通过命令把节点1停止,发现自动又备份到节点3上。
通过连接节点2,依然能消费消息。
- 就算整个集群只剩一台机器,依然能消费队列里的消息。
HAProxy 高可用负载均衡
HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案。HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。
Federation Exchange
因为减少网络延迟,两个地方(比如北京和上海)都有MQ服务,但是需要数据同步,就需要Federation 插件解决这个问题。
-
需要保证每台节点单独运行
-
在每台机器上开启 Federation 相关插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management -
原理图
先运行consumer 在 node2 创建 fed_exchange
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.131");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("fed_exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("node2_queue", true, false, false, null);
channel.queueBind("node2_queue", "fed_exchange", "routeKey");
//声明 接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
- 在 downstream(node2)配置 upstream(node1)
- 添加policy
- 查看状态
Federation Queue
联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供负载的功能。一个联邦队列可以连接一个或多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。文章来源:https://www.toymoban.com/news/detail-604035.html
- 原理图
- 添加 upstream(同上)
- 添加policy
Shovel
跟 Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即 source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker上。Shovel 可以翻译为“铲子”,是一种比较形象的比喻,这个“铲子”可以将消息从一方“铲到”另一方。Shovel 行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。
文章来源地址https://www.toymoban.com/news/detail-604035.html
- 开启插件(需要的机器都开启)
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
- 添加 shovel 源和目的地
到了这里,关于消息队列(一)-- RabbitMQ入门(4)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!