rabbitmq
1、为什么要使用rabbitmq
核心场景有三个:解耦、异步、削峰
异步:提升用户的响应
解耦:
削峰:
每夹0点到16点,A系统风平浪静,每秒并发请求数量就100个。结果每次一到16点~23点,每秒并发请求数量突然会暴增到1万条。但是系统最大的处理能力就只能是每秒钟处理1000个请求啊。怎么办?需要我们进行流量的削峰,让系统可以平缓的处理突增的请求。
2、rabbitmq如何确保消息发送?消息接收?
- 确保消息到MQ
- 确保消息路由到正确的队列
- 确保消息在队列正确的存储公
- 确保消息从队列中正确的投递至消费者
第一、确保消息到MQ:发送方的确认模式
在 RabbitMQConfiguration(rabbitmq的配置类)中,在连接工厂中,把消息发送器确认的开关打开
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// 消息发送器确认的开关打开
connectoryFactory.setPublisherConfirms(true);
}
然后再发送的RabbitTemplate中,进行发送发确认
@Bean
public RabbitTemplate newRabbitTemplate() {
Rab
}
发送方确认机制:
信道需要设置为confirm模式,则所有在信道上发布的信息有一个唯一的ID。
一旦消息内推送到queue(可持久化的消息需要写入磁盘),信道会发送一个确认给生产者(包含消息唯一ID),ConfirmCallback接口:只确认是否正确到达 Exchange 中,成功到达则回调
如果 RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(未确认)消息给生产者,这一步是通过实现Returncallback接口:消息失败返回时回调
所有被发送的消息都将被confirm(即 ack)或者被nack一次。但是没有对消息被 confirm的快慢做任何保证,并且同一条消息不会既被confirm又被nack
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者,生产者的回调方法会被触发。
接收方确认机制:
消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitNQ会等待消费者显式发回ack信号后才从内存(或者磁盘,持久化消息)中移去消息。否则,消息被消费后会被立即删除。
消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。
RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitNQ允许消费者消费一条消息的时间可以很长。保证数据的最终一致性:
如果消费者返回ack之前断开了链接,RabbitMQ会重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
3、RabbitMQ的构造
RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
(1)生产者Publisher:生产消息,就是投递消息的一方。消息一般包含两个部分:消息体(payload)和标签(Label)
(2)消费者Consumer:消费消息,也就是接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
(3)Broker服务节点:表示消息队列服务器实体。一般情况下一个Broker可以看做一个RabbitMQ服务器。
(4)Queue:消息队列,用来存放消息。一个消息可投入一个或多个队列,多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
(5)Exchange:交换器,接受生产者发送的消息,根据路由键将消息路由到绑定的队列上。
(6)Routing Key: 路由关键字,用于指定这个消息的路由规则,需要与交换器类型和绑定键(Binding Key)联合使用才能最终生效。
(7)Binding:绑定,通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,通过BindingKey,交换器就知道将消息路由给哪个队列了。
4、Exchange交换器的类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers
(1)direct(直连交换机):消息中的 路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
(2)fanout(扇形交换机):把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
(3)topic(主题交换机):通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
匹配规则:
① RoutingKey 和 BindingKey 为一个 点号 ‘.’ 分隔的字符串。 比如: java.xiaoka.show
② BindingKey可使用 * 和 # 用于做模糊匹配:*匹配一个单词,#匹配多个或者0个单词
(4)headers(头部交换机):不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
5、RabbitMQ的持久化
RabbitMQ的持久化主要体现在三个方面,即交换机持久化,队列持久化及消息持久化
1、交换机持久化
交换器的持久化是通过声明队列时,将durable
参数设置为true实现的。如果交换器不设置持久化,那么rabbitmq服务重启之后,相关的交换器元数据将会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了,建议将交换器设置为持久化
2、队列持久化
队列的持久化是通过声明队列时,将durable
参数设置为true实现的。如果队列不设置持久化,那么rabbitmq服务重启之后,相关的队列元数据将会丢失,而消息是存储在队列中的,所以队列中的消息也会被丢失
3、消息持久化
队列的持久化只能保证其队列本身的元数据不会被丢失,但是不能保证消息不会被丢失。所以消息本身也需要被持久化,可以在投递消息前设置 AMQP.BasicProperties
的属性 deliveryMode
为 2 即可:
6、RabbitMQ消息发送和接收过程
生产者消息的过程:
- Producer 先连接到 Broker,建立连接 Connection,开启一个信道 channel
- Producer 声明一个交换器并设置好相关属性
- Producer 声明一个队列并设置好相关属性
- Producer 通过绑定键将交换器和队列绑定起来
- Producer 发送消息到 Broker,其中包含路由键、交换器等信息
- 交换器根据接收到的路由键查找匹配的队列
- 如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。
- 关闭信道
消费者接收消息过程:
- Producer 先连接到 Broker,建立连接 Connection,开启一个信道 channel
- 向 Broker 请求消费相应队列中消息,可能会设置响应的回调函数。
- 等待 Broker 回应并投递相应队列中的消息,接收消息。
- 消费者确认收到的消息,ack。
- RabbitMQ从队列中删除已经确定的消息。
- 关闭信道
7、如何保证消息队列的高可用
RabbitMQ 是基于主从(非分布式)做高可用性的,RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式
单机模式: 一般没人生产用单机模式
普通集群模式:
普通集群模式用于提高系统的吞吐量,通过添加节点来线性扩展消息队列的吞吐量。也就是在多台机器上启动多个 RabbitMQ 实例,而队列 queue 的消息只会存放在其中一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费的时候,如果连接到了另外的实例,那么该实例就会从数据实际所在的实例上的queue拉取消息过来,就是说让集群中多个节点来服务某个 queue 的读写操作
但普通集群模式的缺点在于:无高可用性,queue所在的节点宕机了,其他实例就无法从那个实例拉取数据;RabbitMQ 内部也会产生大量的数据传输。
镜像队列集群模式:
镜像队列集群是RabbitMQ 真正的高可用模式,集群中一般会包含一个主节点master和若干个从节点slave,如果master由于某种原因失效,那么按照slave加入的时间排序,"资历最老"的slave会被提升为新的master。
镜像队列下,所有的消息只会向master发送,再由master将命令的执行结果广播给slave,所以master与slave节点的状态是相同的。比如,每次写消息到 queue 时,master会自动将消息同步到各个slave实例的queue;如果消费者与slave建立连接并进行订阅消费,其实质上也是从master上获取消息,只不过看似是从slave上消费而已,比如消费者与slave建立了TCP连接并执行Basic.Get的操作,那么也是由slave将Basic.Get请求发往master,再由master准备好数据返回给slave,最后由slave投递给消费者。
从上面可以看出,队列的元数据和消息会存在于多个实例上,也就是说每个 RabbitMQ 节点都有这个 queue 的完整镜像,任何一个机器宕机了,其它机器节点还包含了这个 queue 的完整数据,其他消费者都可以到其它节点上去消费数据。
(1)缺点:
① 性能开销大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重
② 非分布式,没有扩展性,如果 queue 的数据量大到这个机器上的容量无法容纳了,此时该方案就会出现问题了
(2)如何开启镜像集群模式呢?
在RabbitMQ 的管理控制台Admin页面下,新增一个镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
8、如何处理消息丢失的情况
对于消息的可靠性传输,每种MQ都要从三个角度来分析:生产者丢数据、消息队列丢数据、消费者丢数据。以RabbitMQ为例:
生产者丢数据
解决方法:确认机制
- 生产环境常用的是confirm模式。生产者将信道 channel 设置成 confirm 模式,一旦 channel 进入 confirm 模式,所有在该信道上发布的消息都将会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确到达目的队列了。
- 如果rabbitMQ没能处理该消息,也会发送一个Nack消息给你,这时就可以进行重试操作。
- Confirm模式最大的好处在于它是异步的,一旦发布消息,生产者就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息。
消息队列丢数据:
解决方法:开启持久化磁盘
处理消息队列丢数据的情况,一般是开启持久化磁盘。持久化配置可以和生产者的 confirm 机制配合使用,在消息持久化磁盘后,再给生产者发送一个Ack信号。这样的话,如果消息持久化磁盘之前,即使 RabbitMQ 挂掉了,生产者也会因为收不到Ack信号而再次重发消息。
持久化设置如下(必须同时设置以下 2 个配置):
(1)创建queue的时候,将queue的持久化标志durable在设置为true,代表是一个持久的队列,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化queue里的数据;
(2)发送消息的时候将 deliveryMode 设置为 2,将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
这样设置以后,RabbitMQ 就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
消费者丢数据:
解决方法:自动确认消息模式(处理异常有缺陷)+手动确认消息
消费者丢数据一般是因为采用了自动确认消息模式。该模式下,虽然消息还在处理中,但是消费中者会自动发送一个确认,通知 RabbitMQ 已经收到消息了,这时 RabbitMQ 就会立即将消息删除。这种情况下,如果消费者出现异常而未能处理消息,那就会丢失该消息。
解决方案就是采用手动确认消息,设置 autoAck = False,等到消息被真正消费之后,再手动发送一个确认信号,即使中途消息没处理完,但是服务器宕机了,那 RabbitMQ 就收不到发的ack,然后 RabbitMQ 就会将这条消息重新分配给其他的消费者去处理
但是 RabbitMQ 并没有使用超时机制,RabbitMQ 仅通过与消费者的连接来确认是否需要重新发送消息,也就是说,只要连接不中断,RabbitMQ 会给消费者足够长的时间来处理消息。另外,采用手动确认消息的方式,我们也需要考虑一下几种特殊情况:
- 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被消费,然后重新分发给下一个订阅的消费者,所以存在消息重复消费的隐患
- 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息
需要注意的点:
1、消息可靠性增强了,性能就下降了,因为写磁盘比写 RAM 慢的多,两者的吞吐量可能有 10 倍的差距。所以,是否要对消息进行持久化,需要综合考虑业务场景、性能需要,以及可能遇到的问题。若想达到单RabbitMQ服务器 10W 条/秒以上的消息吞吐量,则要么使用其他的方式来确保消息的可靠传输,要么使用非常快速的存储系统以支持全持久化,例如使用 SSD。或者仅对关键消息作持久化处理,且应该保证关键消息的量不会导致性能瓶颈。
2、当设置 autoAck = False 时,如果忘记手动 ack,那么将会导致大量任务都处于 Unacked 状态,造成队列堆积,直至消费者断开才会重新回到队列。解决方法是及时 ack,确保异常时 ack 或者拒绝消息。
3、启用消息拒绝或者发送 nack 后导致死循环的问题:如果在消息处理异常时,直接拒绝消息,消息会重新进入队列。这时候如果消息再次被处理时又被拒绝 。这样就会形成死循环。
总结:
1,生产者发送消息至MQ的数据丢失
解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,
然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了
2,MQ 收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失
解决方式:MQ设置为持久化。将内存数据持久化到磁盘中
3,消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完
解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话**,如果你还没处理完,不就没有 ack 了**?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
9、如何保证消息没有重复消费
正常情况下,消费者在消费消息后,会给消息队列发送一个确认,消息队列接收后就知道消息已经被成功消费了,然后就从队列中删除该消息,也就不会将该消息再发送给其他消费者了。不同消息队列发出的确认消息形式不同,RabbitMQ是通过发送一个ACK确认消息。但是因为网络故障,消费者发出的确认并没有传到消息队列,导致消息队列不知道该消息已经被消费,然后就再次消息发送给了其他消费者,从而造成重复消费的情况。
重复消费问题的解决思路是:保证消息的唯一性,即使多次传输,也不让消息的多次消费带来影响,也就是保证消息等幂性;幂等性指一个操作执行任意多次所产生的影响均与一次执行的影响相同。具体解决方案如下:
(1)改造业务逻辑,使得在重复消费时也不影响最终的结果。例如对SQL语句: update t1 set money = 150 where id = 1 and money = 100; 做了个前置条件判断,即 money = 100 的情况下才会做更新,更通用的是做个 version 即版本号控制,对比消息中的版本号和数据库中的版本号。
(2)基于数据库的的唯一主键进行约束。消费完消息之后,到数据库中做一个 insert 操作,如果出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
(3)通过记录关键的key,当重复消息过来时,先判断下这个key是否已经被处理过了,如果没处理再进行下一步。
① 通过数据库:比如处理订单时,记录订单ID,在消费前,去数据库中进行查询该记录是否存在,如果存在则直接返回。
② 使用全局唯一ID,再配合第三组主键做消费记录,比如使用 redis 的 set 结构,生产者发送消息时给消息分配一个全局ID,在每次消费者开始消费前,先去redis中查询有没有消费记录,如果消费过则不进行处理,如果没消费过,则进行处理,消费完之后,就将这个ID以k-v的形式存入redis中(过期时间根据具体情况设置)。
10、如何保证消息传递的有序性
针对保证消息有序性的问题,解决方法就是保证生产者入队的顺序是有序的,出队后的顺序消费则交给消费者去保证。
(1)方法一:拆分queue,使得一个queue只对应一个消费者。由于MQ一般都能保证内部队列是先进先出的,所以把需要保持先后顺序的一组消息使用某种算法都分配到同一个消息队列中。然后只用一个消费者单线程去消费该队列,这样就能保证消费者是按照顺序进行消费的了。但是消费者的吞吐量会出现瓶颈。如果多个消费者同时消费一个队列,还是可能会出现顺序错乱的情况,这就相当于是多线程消费了。
(2)方法二:对于多线程的消费同一个队列的情况,可以使用重试机制:比如有一个微博业务场景的操作,发微博、写评论、删除微博,这三个异步操作。如果一个消费者先执行了写评论的操作,但是这时微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行发微博的操作后,再执行,就可以成功。
(3)方法三:标号,对消息进行编号,消费者处理时根据编号判断顺序。
11、如何保证数据一致性的问题
一、采用confirm消息确认机制及return返回机制 确保消息发送成功
二、将队列以及消息设置持久化 保证rabbitmq突然宕机消息仍然存在文章来源:https://www.toymoban.com/news/detail-451217.html
三、手动确认接收消息方式 消息处理失败拒收重回队列文章来源地址https://www.toymoban.com/news/detail-451217.html
到了这里,关于Java 面试 | RabbitMQ(2023版)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!