前言
上一篇介绍了如何在 《SpringBoot 中集成和使用消息队列》,看过这一篇就基本上可以在SpringBoot中使用消息队列了,但是消息队列他归根结底是一个客户端服务器模式的中间件,面对复杂的网络环境和分布式使用环境,难免会出现各种问题。出现问题不可怕,重点在于如何预防和处理,本章就重点介绍一下如何预防和处理使用SpringAMQP时可能出现的问题。
一、消息堆积
1、什么是消息堆积?
消息堆积指的是消费者这边的处理能力低于生产者这边生产消息的能力,导致大量的消息积压在MQ的一种现象。消息堆积可能导致短时间内队列达到最大容量,导致使新消息无法进入队列;对于时间敏感的消息可能成为死信。
2、使用 work 模式同时开启prefetch
work模式:简单来说就是让多个消息队列绑定到一个队列,共同消费队列中的消息。
默认情况下,消息队列是通过轮询的方式将消息推送给消费者的,完全不考虑消费者的消费能力。举个例子:假设生产者生产了50条消息,消费者1的处理能力是1秒50条,消费者2的消费能力是1秒5条,实际这五十条消息会通过轮询各分配给两个消费者25条,如果消费者还没处理完就会阻塞等待,处理完之后再继续推送。
所以默认情况并没有考虑到消费者是否已经处理完消息,可能也会造成消息堆积。怎么解决呢?可以通过修改配置文件:将prefetch设置为1,即每次给消费者投递一条消息,处理完了再投递下一条,这样可以尽可能发挥每个消费者的最大处理能力。
spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次投递一条消息,消费完在投递下一条
3、对消息进行异步处理
再者就是可以在代码上进行优化,比如在消息处理的时候使用线程池进行异步消费,这样可以缩短每个消息的处理时间,降低消息堆积的可能性。
二、发送者可靠性
1、发送者重连机制
有时候可能因为网络波动,可能会出现客户端连接MQ失败的情况。这里可以通过重试机制来提高消息发送的成功率。
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后等初始待时间
multiplier: 1 # 失败后下次等待时长倍数
max-attempts: 3 # 最大重试次数
2、发送者确认机制
在生产者这边,是可以开启确认机制的,就是MQ他在接收到消息成功后会返回一个ack给生产者,接收失败就返回nack,生产者这边就可以根据返回的结果,如果失败了就可以进行重发。RabbitMQ这边提供了两种确认机制:
- Publisher Confirm:当生产者向消息队列发送消息时,如果有设备或网络故障导致消息丢失或其他错误,AMQP 协议会自动触发 Confirm 机制,将消息发送失败的信息返回给生产者。生产者可以根据返回的信息进行相应的处理,例如重发、记录日志等。
- Publisher Return:消息路由失败时触发,一般不开启,因为路由失败是自己业务的问题
spring:
rabbitmq:
publisher-confirm-type: CORRELATED
# none: 关闭confirm机制
# simple: 同步阻塞等待MQ回执消息
# correlated: MQ异步回调方式返回回执消息
publisher-returns: true
对于ReturnCallback整个项目中配置一次即可:
@Slf4j
@Configuration
public class MqCommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback(路由失败时触发)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.debug("收到消息的 return callback,exchange:{}, key:{}, msg:{}, code:{}, text:{}",
returnedMessage.getExchange(),returnedMessage.getRoutingKey(),
returnedMessage.getReplyCode(),returnedMessage.getReplyText());
}
});
}
}
ConfirmCallback 每次发送消息都需要编写
@Test
void testConfirmCallback() throws InterruptedException {
// 1.创建CorrelationData,并指定消息ID
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 基本不会触发
log.error("消息回调失败",ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 这里执行回调
if (result.isAck()) {
// 消息发送成功
log.debug("消息发送成功,收到 ack");
} else {
// 消息发送失败
log.debug("消息发送失败,收到 nack");
// 重传等业务逻辑...
}
}
});
rabbitTemplate.convertAndSend("forum.direct","red","hello",cd);
}
虽然上述确认机制可以基本保证生产者发送消息的可靠性,但是会增加系统额外的负担和资源开销,因为生产者确认也需要通过MQ来执行回调,如果需要使用,不需要开启publisher return(自己代码写的有问题),对于nack也可以有限次重试,失败多了直接记录异常即可。
三、MQ可靠性
对于MQ本身是提供了持久化的功能的,可以给保证MQ重启数据不丢失。并且在持久化情况下开启生产者确认时,RabbitMQ只有在消息持久化完成之后才会给生产者返回ACK回执。
四、消费者可靠性
1、消息者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理结束消息之后,向RabbitMQ发送一个回执,告知RabbitMQ自己消息的处理状态:
- ack:成功处理消息,RabbitMQ从队列中删除消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝消息,RabbitMQ从队列中删除该消息
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: AUTO
# none:不处理。消息投递给消费者后立即ack,消息立即从MQ中删除,不安全
# manual:手动模式。需要自己在业务代码中调用api,发送ack或reject
# auto:自动模式。SpringAMQP使用AOP对我们的消息处理逻辑进行了环绕增强,当业务正常执行时自动返回ack,异常时,如果是业务异常会返回nack,如果是消息处理或校验异常会返回reject
2、失败重试机制
当消费者处理消息出现异常后,MQ这边会再次将消息投递给消费者,如果无限失败就会无限重试,对于MQ和消费者来讲压力就比较大,可以利用SpringAMQP的retry进制,当消费者出现异常时限制重试次数:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: AUTO
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始的失败等待时间为1s
multiplier: 1 # 下次失败的等待时长的倍数
max-attempts: 3 # 最大重试次数
stateless: true # true 无状态,false 有状态
开启重试机制后,如果重试次数耗尽,消息依然失败,就需要被MessageRecoverer接口来处理:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,默认的方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定交换机(建议)
下面演示第三中策略的接口配置实现:
@Configuration
public class ErrorConfiguration {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.exchange");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(Queue errorQueue,DirectExchange errorExchange) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");
}
}
这样即使重试次数耗尽,消息也不会丢失,而是投递到了 error.queue 的队列里面。
五、保证幂等性
1、什么是幂等性?
幂等性就是重复执行相同的操作,系统的状态不会发送变化。比如查询和删除这些操作本身就是幂等的,它们多次操作不会给系统造成状态不一致的影响。
上述机制可以保证消息“至少”被消费1次,但是由于网络的复杂性,可能生产者收不到ack,导致消息的重发,或者MQ这边没有收到消费者的ack,导致消息的重复投递,这些都可能造成消息的重复消费,所以这个时候就要考虑幂等性问题了。
2、使用唯一 ID
生产者这边在给RabbitMQ投递消息的时候,附带一个唯一消息的ID,RabbitMQ这边它是自带去重功能的,就是相同ID的消息它是只存储一份的.
消费者这里,他就可以消费完一条消息后,先将消息ID存起来,然后后面的消息根据ID进行判断是否是重复消息,如果重复直接丢弃就行了.
给消息设置ID的方法:
@Configuration
public class Config {
@Bean
public MessageConverter messageConverter() {
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
jjmc.setCreateMessageIds(true);
return jjmc;
}
}
3、针对业务进行判断
以支付扣减余额和修改订单状态为例:
- 首先支付服务会在余额扣减成功后利用MQ将消息通知给修改订单状态的服务.
- 修改订单状态之前,先查询订单状态,只有已支付的订单才做修改,这样就可以在业务上保证幂等.
六、实现延迟消息
1、借助死信对列
因为对于那些超时为处理的消息,MQ会投递到死信对列,我们就可以借助这个特性,先将消息投递到到一个普通的对列中,然后如果超时就直接投到了死信对列,然后就让消费者监听死信对列,就可以实现延迟消息了。(PS:对列通过dead-letter-exchange属性绑定的交换机就称为 死信交换机。)
发送延迟消息:
void testSendTTLMessage() {
Message message = MessageBuilder
.withBody("hello".getBytes(StandardCharsets.UTF_8))
.setExpiration(5000).build(); // 5秒钟的延迟消息
rabbitTemplate.convertAndSend("simple.direct1","testKey",message);
}
2、使用RabbitMQ官方提供的插件
在RabbitMQ中,官方是推出了一种原生支持延迟消息的插件的。原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后暂存一段时间,到期后投递到队列。下面讲解插件使用:
消费者声明
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.direct",delayed = "true",type = ExchangeTypes.DIRECT),
key = "delay"
))
public void listenDelayMessage(String msg) {
log.info("接收到delay.queue的延迟消息 {}"+msg);
}
生产者发送文章来源:https://www.toymoban.com/news/detail-834318.html
@Test
void testSendDelayMessage() throws InterruptedException {
rabbitTemplate.convertAndSend("delay.direct", "delay", "hello,delay!", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 延迟10秒
message.getMessageProperties().setDelay(5000);
return message;
}
});
log.info("延迟消息发送成功!");
}
如果感觉这一大串太麻烦,可以将 new MessagePostProcessor() 分离出来:文章来源地址https://www.toymoban.com/news/detail-834318.html
// 封装专门用来发送延迟消息的处理器
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {
private final int delay;
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delay);
return message;
}
}
@Test
void testSendDelayMessage2() throws InterruptedException {
rabbitTemplate.convertAndSend("delay.direct", "delay", "hello,delay!", new DelayMessageProcessor(5000));
log.info("延迟消息发送成功!");
Thread.sleep(1000);
}
到了这里,关于SpringAMQP开启“可靠性”机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!