当我们消息消费失败的时候,可以进行重试,
什么情况下会重发消息
1、网络抖动
2、程序抛出异常没有try-catch
RabbitMQ自动补偿机制触发:(多用于调用第三方接口)
1.当我们的消费者在处理我们的消息的时候,程序抛出异常情况下(默认无限次数重试),如果这里的异常try-catch后自己配置的重试机制是不生效的
2.应该对我们的消息重试设置间隔重试时间,比如消费失败最多只能重试5次,间隔3秒(防止重复消费,幂等问题)
如果重试5次,也就是15秒内重试还是失败情况下应该如何处理
1.默认情况下,重试多次还是失败的话,会自动删除该消息(消息可能会丢失)
解决思路:
A:如果重试多次还是失败的情况下,最终存放到死信队列.
B:采用表日志记录,消费失败错误的日志记录 后期人工自动对消息实现补偿.
一、项目准备
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置类
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.addresses=192.168.23.145
spring.rabbitmq.virtual-host=/rabbit
二、案例重现
# 声明队列
@Configuration
public class HelloWorldConfig {
@Bean
public Queue setQueue() {
return new Queue("helloWorldqueue");
}
}
# 生产者
@Slf4j
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
//helloWorld 直连模式
@ApiOperation(value="helloWorld发送接口",notes="直接发送到队列")
@GetMapping(value="/helloWorldSend")
public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {
//设置部分请求参数
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//发消息
rabbitTemplate.send("helloWorldqueue",new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : "+message;
}
}
# 消费者
@Component
public class ConcumerReceiver {
private int count = 1;
//直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式
//通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos
@RabbitListener(queues="helloWorldqueue")
public void helloWorldReceive(String message) {
System.out.println("当前执行次数:" + count++);
System.out.println("异常前,helloWorld模式 received message : " +message);
int i = 1/0;
System.out.println("异常后,helloWorld模式 received message : " +message);
}
}
启动测试:
无限循环报错
停止后,消息重回Ready状态
三、实现消息重试
实现重试
spring.rabbitmq.listener.simple.retry.enabled= true
spring.rabbitmq.listener.simple.retry.max-attempts= 5
spring.rabbitmq.listener.simple.retry.max-interval= 10000
spring.rabbitmq.listener.simple.retry.initial-interval= 2000
spring.rabbitmq.listener.simple.retry.multiplier= 2
重启测试
第一次执行时间2s,第二次4s,第三次8s,第四次16s,第五次由于设置了最大间隔为10s,所有变成了10s
最后查看retry_a队列,消息没有了,也就是说重试五次失败之后就会移除该消息
移除操作是由日志中的这个类处理:RejectAndDontRequeueRecoverer(拒绝和不要重新排队)
对重试失败的消息重新排队
使用下 ImmediateRequeueMessageRecoverer 重新排队在HelloWorldConfig中配置
@Bean
public MessageRecoverer messageRecoverer() {
return new ImmediateRequeueMessageRecoverer();
}
重启运行:
可以看出:重试5次之后,返回队列,然后再重试5次,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费
把重试失败消息放入重试失败队列
接着使用 RepublishMessageRecoverer 重新发布在HelloWorldConfig中配置
@Configuration
public class HelloWorldConfig {
@Bean
public Queue setQueue() {
return new Queue("helloWorldqueue");
}
/*@Bean
public MessageRecoverer messageRecoverer() {
return new ImmediateRequeueMessageRecoverer();
}*/
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
// 需要配置交换机和绑定键
return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_FAILURE_KEY);
}
//失败队列
public static final String RETRY_FAILURE_KEY = "retry.failure.key";
//失败交换机
public static final String RETRY_EXCHANGE = "retry_exchange";
@Bean
public Queue setQueueFailure() {
return new Queue(RETRY_FAILURE_KEY);
}
@Bean
public FanoutExchange setFailureExchange() {
return new FanoutExchange(RETRY_EXCHANGE);
}
@Bean
public Binding bindFailureBind() {
return BindingBuilder.bind(setQueueFailure()).to(setFailureExchange());
}
}
在ConcumerReceiver中创建重试失败消息监听
@RabbitListener(queues="retry.failure.key")
public void retryFailure(String message) throws InterruptedException {
Thread.sleep(20000);
System.out.println(" [ 消费者@重试失败号 ] 接收到消息 ==> '" + message);
}
重启,运行结果:文章来源:https://www.toymoban.com/news/detail-702032.html
重试5次之后,将消息 Republishing failed message to exchange ‘retry.exchange’ with routing key retry-key 转发到重试失败队列,由重试失败消费者消费
文章来源地址https://www.toymoban.com/news/detail-702032.html
到了这里,关于springboot:整合rabbitmq之重试机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!