rabbitmq:retry重试机制和延迟消息的实现
在消费者消费消息的时候可能会因为网络等外部原因导致消息处理失败,这个时候如果将消息直接丢弃会导致正常的业务丢失,但是如果是一条本身就有问题的消息,那么这个时候又必须丢弃掉,如果选择用channel.basicNack或
channel.basicReject方法让消息重回对了,会导致消费者在不停的消费这条消息,这将是一个致命的问题。
所幸,rabbitmq提供了retry机制来控制消息的重试
yml配置文件:
spring:
rabbitmq:
host: IP
port: 5672
username: guest
password: guest
virtual-host: smallJHost
# 消费者确认机制相关配置
# 开启publisher-confirm,
# 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-confirm-type: correlated
# publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
publisher-returns: true
# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
template:
mandatory: true
listener:
simple:
# ack机制类型
acknowledge-mode: manual
# 设置预取消息数量
prefetch: 2
# 失败重试
retry:
# 开启消费者失败重试
enabled: true
# 初始的失败等待时长为1秒
initial-interval: 1000
# 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
multiplier: 3
# 最大重试次数
max-attempts: 4
# true无状态;false有状态。如果业务中包含事务,这里改为false
stateless: true
在RabbitmqConfig中增加如下配置:
package com.gitee.small.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitMQConfig implements ApplicationContextAware {
// 其他队列、交换机、绑定、回调等代码省略,需要的朋友可看我之前的文章、、、
@Bean
public Queue errorQueue() {
return new Queue("error");
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
@Bean(name = "binding.error")
public Binding bindingExchangeMessage3() {
return BindingBuilder.bind(errorQueue()).to(exchange()).with("error");
}
/**
* 定义 MessageRecoverer 将错误消息发送到指定队列
*/
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "topicExchange", "error");
}
}
在消费者定义中有一点需要注意,不能直接将异常处理掉,否则是不会将消息发送到error队列的。
package com.gitee.small.rabbitmq;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class WorkRabbitReceiver {
private static Integer index = 0;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "监听队列名称"),
exchange = @Exchange(value = "binding对象beanname", type = ExchangeTypes.TOPIC))
)
public void process(String msg, Channel channel, Message message) throws Exception {
try {
System.out.println(1/0);
} catch (Exception e) {
log.error("消息重试");
throw new Exception();
}
}
}
小结:文章来源地址https://www.toymoban.com/news/detail-648883.html
-
消息重试是在本地进行重试,不会回到消息队列中
-
重试模式下,重试次数耗尽后,如果消息依然失败,为了防止消息被直接丢弃,需要有MessageRecovery 接口来处理,它包含三种不同的实现
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
-
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
-
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
很显然,RepublishMessageRecoverer方案应用最广最合理,本文中也是以此为例
-
实现延迟队列
消息超时方案:
- 给队列设置 ttl 属性,进入队列后超过 ttl 时间的消息变为死信
- 给消息设置 ttl 属性,队列接收到消息超过 ttl 时间后变为死信
本文讲给消息设置超时,因为这个方案更灵活。
-
创建死信队列和死信交换机,并将其绑定
@Bean public DirectExchange dlExchange() { // 声明死信交换机 dl.direct return new DirectExchange("dl.direct", true, false); } @Bean public Queue dlQueue() { // 声明存储死信的队列 dl.queue return new Queue("dl.queue", true); } @Bean(name = "binding.dl") public Binding dlBinding() { // 将死信队列 与 死信交换机绑定 return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl"); }
-
指定消息过期时间,向正常消息队列发送消息,一条5秒延时,一条10秒延时
private void deadLetter() { final Message message = MessageBuilder.withBody("延迟消息测试".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setExpiration("5000") .build(); final Message message2 = MessageBuilder.withBody("延迟消息测试".getBytes()) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setExpiration("10000") .build(); rabbitTemplate.convertAndSend("topicExchange", "topic.dead", message); rabbitTemplate.convertAndSend("topicExchange", "topic.dead", message2); }
-
监听死信队列,实现延迟消息具体逻辑
/** * 监听死信队列,处理延迟消息 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.queue"), exchange = @Exchange(value = "binding.dl", type = ExchangeTypes.TOPIC)) ) public void process(String msg, Channel channel, Message message) throws IOException { log.info("延迟消息:{}", msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
使用场景:文章来源:https://www.toymoban.com/news/detail-648883.html
- 延迟发送短信
- 用户下单,如果用户在一小时内未支付,自动取消
- 会议前半小时提醒参会
小结:
- 创建一个交换机作为死信交换机并绑定一个队列作为死信队列
- 给消息的目标队列设置队列超时时间并指定死信交换机和路由 key
- 将消息的目标队列绑定到死信交换机
- 消费者监听死信队列获取超时消息
到了这里,关于rabbitmq:retry重试机制和延迟消息的实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!