springboot:整合rabbitmq之重试机制

这篇具有很好参考价值的文章主要介绍了springboot:整合rabbitmq之重试机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

当我们消息消费失败的时候,可以进行重试,
什么情况下会重发消息
1、网络抖动
2、程序抛出异常没有try-catch
RabbitMQ自动补偿机制触发:(多用于调用第三方接口)
1.当我们的消费者在处理我们的消息的时候,程序抛出异常情况下(默认无限次数重试),如果这里的异常try-catch后自己配置的重试机制是不生效的
2.应该对我们的消息重试设置间隔重试时间,比如消费失败最多只能重试5次,间隔3秒(防止重复消费,幂等问题)

如果重试5次,也就是15秒内重试还是失败情况下应该如何处理
1.默认情况下,重试多次还是失败的话,会自动删除该消息(消息可能会丢失)
解决思路:
A:如果重试多次还是失败的情况下,最终存放到死信队列.
B:采用表日志记录,消费失败错误的日志记录 后期人工自动对消息实现补偿.

一、项目准备
依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置类

spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.addresses=192.168.23.145
spring.rabbitmq.virtual-host=/rabbit

二、案例重现

# 声明队列
@Configuration
public class HelloWorldConfig {
	@Bean
	public Queue setQueue() {
		return new Queue("helloWorldqueue");
	}
}
# 生产者
@Slf4j
@RestController
public class ProducerController {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	//helloWorld 直连模式
	@ApiOperation(value="helloWorld发送接口",notes="直接发送到队列")
	@GetMapping(value="/helloWorldSend")
	public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {
		//设置部分请求参数
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

		//发消息
		rabbitTemplate.send("helloWorldqueue",new Message(message.getBytes("UTF-8"),messageProperties));
		return "message sended : "+message;
	}
}

# 消费者
@Component
public class ConcumerReceiver {

	private int count = 1;

	//直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式
	//通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos
	@RabbitListener(queues="helloWorldqueue")
	public void helloWorldReceive(String message) {
		System.out.println("当前执行次数:" + count++);
	     System.out.println("异常前,helloWorld模式 received message : " +message);
	     int i = 1/0;
		System.out.println("异常后,helloWorld模式 received message : " +message);
	}
}

启动测试:
无限循环报错
停止后,消息重回Ready状态
springboot:整合rabbitmq之重试机制,rabbitmq,java-rabbitmq,rabbitmq,spring boot
三、实现消息重试
实现重试

spring.rabbitmq.listener.simple.retry.enabled= true
spring.rabbitmq.listener.simple.retry.max-attempts= 5
spring.rabbitmq.listener.simple.retry.max-interval= 10000
spring.rabbitmq.listener.simple.retry.initial-interval= 2000
spring.rabbitmq.listener.simple.retry.multiplier= 2

重启测试
第一次执行时间2s,第二次4s,第三次8s,第四次16s,第五次由于设置了最大间隔为10s,所有变成了10s
最后查看retry_a队列,消息没有了,也就是说重试五次失败之后就会移除该消息
移除操作是由日志中的这个类处理:RejectAndDontRequeueRecoverer(拒绝和不要重新排队)
springboot:整合rabbitmq之重试机制,rabbitmq,java-rabbitmq,rabbitmq,spring boot

对重试失败的消息重新排队
使用下 ImmediateRequeueMessageRecoverer 重新排队在HelloWorldConfig中配置

    @Bean
    public MessageRecoverer messageRecoverer() {
        return new ImmediateRequeueMessageRecoverer();
    }

重启运行:

可以看出:重试5次之后,返回队列,然后再重试5次,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费
springboot:整合rabbitmq之重试机制,rabbitmq,java-rabbitmq,rabbitmq,spring boot
把重试失败消息放入重试失败队列
接着使用 RepublishMessageRecoverer 重新发布在HelloWorldConfig中配置

@Configuration
public class HelloWorldConfig {

	@Bean
	public Queue setQueue() {
		return new Queue("helloWorldqueue");
	}

	/*@Bean
	public MessageRecoverer messageRecoverer() {
		return new ImmediateRequeueMessageRecoverer();
	}*/
	@Bean
	public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
		// 需要配置交换机和绑定键
		return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_FAILURE_KEY);
	}
	//失败队列
	public static final String RETRY_FAILURE_KEY = "retry.failure.key";
	//失败交换机
	public static final String RETRY_EXCHANGE = "retry_exchange";

	@Bean
	public Queue setQueueFailure() {
		return new Queue(RETRY_FAILURE_KEY);
	}

	@Bean
	public FanoutExchange setFailureExchange() {
		return new FanoutExchange(RETRY_EXCHANGE);
	}
	@Bean
	public Binding bindFailureBind() {
		return BindingBuilder.bind(setQueueFailure()).to(setFailureExchange());
	}

}

在ConcumerReceiver中创建重试失败消息监听

@RabbitListener(queues="retry.failure.key")
	public void retryFailure(String message) throws InterruptedException {
		Thread.sleep(20000);
		System.out.println(" [ 消费者@重试失败号 ] 接收到消息 ==> '" + message);
	}

重启,运行结果:

重试5次之后,将消息 Republishing failed message to exchange ‘retry.exchange’ with routing key retry-key 转发到重试失败队列,由重试失败消费者消费
springboot:整合rabbitmq之重试机制,rabbitmq,java-rabbitmq,rabbitmq,spring boot文章来源地址https://www.toymoban.com/news/detail-702032.html

到了这里,关于springboot:整合rabbitmq之重试机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rabbitmq:retry重试机制和延迟消息的实现

    rabbitmq:retry重试机制和延迟消息的实现 在消费者消费消息的时候可能会因为网络等外部原因导致消息处理失败,这个时候如果将消息直接丢弃会导致正常的业务丢失,但是如果是一条本身就有问题的消息,那么这个时候又必须丢弃掉,如果选择用channel.basicNack 或 channel.basi

    2024年02月13日
    浏览(41)
  • Java操作RabbitMq并整合SpringBoot

    秋风阁-北溪入江流 RabbitMq自带有专门的管理界面,可以在其管理界面对RabbitMq进行管理查看等操作。 RabbitMq的管理界面的对外端口为 15672 ,当我们启动RabbitMq后,需要启动管理界面插件后才能访问界面。 通过参数配置连接RabbitMq 通过amqp协议连接RabbitMq queueDeclarePassive: 创建或

    2024年02月16日
    浏览(58)
  • Springboot 中接口服务重试机制

    在平时开发中可能在调用服务时会遇到调用失败的情况,在springboot 中retery 机制可以很好的满足我们的开发场景,下面举个简单的例子模拟第三方调用。 使用起来很简单,只需要在引入相关jar,并且在启动的时候进行开启,这是springboot 的老套路,在我们服务层进行 @Retryabl

    2024年01月22日
    浏览(33)
  • 【重试】Java 中的 7 种重试机制

    随着互联网的发展项目中的业务功能越来越复杂,有一些基础服务我们不可避免的会去调用一些第三方的接口或者公司内其他项目中提供的服务,但是远程服务的健壮性和网络稳定性都是不可控因素。在测试阶段可能没有什么异常情况,但上线后可能会出现调用的接口因为内

    2024年02月16日
    浏览(40)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

    在现代分布式应用程序中,消息队列扮演了至关重要的角色,允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性,但也引入了一系列的挑战,其中最重要的之一是消息的可靠性。 首先让我们来了解一下,在消息队列中,消息从生产者发送

    2024年02月05日
    浏览(49)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(84)
  • Java 中的 7 种重试机制,还有谁不会?!

    随着互联网的发展项目中的业务功能越来越复杂,有一些基础服务我们不可避免的会去调用一些第三方的接口或者公司内其他项目中提供的服务,但是远程服务的健壮性和网络稳定性都是不可控因素。 在测试阶段可能没有什么异常情况,但上线后可能会出现调用的接口因为内

    2024年02月14日
    浏览(34)
  • RabbitMQ: SpringBoot 整合 RabbitMQ

    重点是这个依赖 通过              和上一个一样  

    2024年02月09日
    浏览(45)
  • TCP详解之重传机制

    TCP 实现可靠传输的方式之一,是通过序列号与确认应答。 在 TCP 中,当发送端的数据到达接收主机时,接收端主机会返回一个确认应答消息,表示已收到消息。 但在错综复杂的网络,并不一定能如上图那么顺利能正常的数据传输,万一数据在传输过程中丢失了呢?所以TCP针

    2024年02月04日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包