RabbitMq 的消息可靠性问题(二)---MQ的消息丢失和consumer消费问题

这篇具有很好参考价值的文章主要介绍了RabbitMq 的消息可靠性问题(二)---MQ的消息丢失和consumer消费问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

rabbitmq消费者宕机怎么办,微服务,java-rabbitmq,rabbitmq,java
RabbitMq 消息可靠性问题(一) — publisher发送时丢失
前面我们从publisher的方向出发解决了发送时丢失的问题,那么我们在发送消息到exchange, 再由exchange转存到queue的过程中。如果MQ宕机了,那么我们的消息是如何确保可靠性的呢?当消息由队列发到对应的消费者处理时,consumer 接受到消息未消费就宕机,这时消息又如何确保可靠性呢?

消息可靠性问题及其对应的解决方案:

场景 publisher发送时丢失 MQ消息丢失 consumer消费问题
解决方案 生产者确认机制 消息持久化 消费者消息确认&&失败重试机制

消息持久化

MQ 默认是内存存储信息, 开启持久化功能可以确保缓存在 MQ 中的消息不丢失

  1. 交换机持久化
@Bean
public DirectExchange simpleDirect(){
	// 三个参数:交换机名称, 是否持久化, 当没有queue 与其绑定时是否自动删除
	return new DirectExchange("simple.direct", true, false);
}
  1. 队列持久化
@Bean
public Queue simpleQueue(){
	// 使用QueueBuilder构建队列,durable就是持久化的
	return QueueBuilder.durable("simple.queue").build();
}
  1. 消息持久化,SpringAMQP 中的消息默认是持久化, 可以通过MessagePropertie中的DeliveryMode 来指定的
@Test
public void testDurableMessage(){
	// 1. 准备消息
	Message message = MessageBuilder.withBody("hello,spring".getBytes(StandardCharsets.UTF_8))
			  .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
			  .build();
	// 2. 发送消息
	rabbitTemplate.convertAndSend("simple.queue", message);
}

其实,默认情况下 SpringAMQP 中的交换机,队列,消息都是持久化的
(所以上诉的持久化参数和代码只需要了解即可)

消费者消息确认

RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向MQ 发送ack回执。MQ收到 ack 回执后才删除该消息。而SpringAMQP 则允许配置三种确认模式:

  • manual:手动 ack, 需要在业务代码结束后,调用 api 发送ack

  • auto: 自动 ack, 由Spring检测 listener 的代码是否出现异常,没有异常则返回 ack; 抛出异常则返回 nack

  • none: 关闭 ack, MQ 假定消费者获取消息后成功处理,因此消息投递后立即删除

在 consumer 服务的 application,yml 中进行如下配置:

spring:
	rabbitmq:
		listener:
		  simple:
			prefetch: 1
			acknowledge-mode: auto

失败重试机制

消费者失败重试
当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue, 无限循环,导致 mq 的消息处理飙升,带来不必要的压力
rabbitmq消费者宕机怎么办,微服务,java-rabbitmq,rabbitmq,java
我们可以利用 Spring 的 retry 机制,在消费者出现异常先利用本地重试,而不是无限的 requeue 到 mq 的队列。

可在 yml 的文件中进行如下配置:

spring:
	rabbitmq:
		listener:
			simple:
				prefetch: 1
				retry: 
					enabled: true # 开启消费者失败重试
					initial-interval: 1000 # 初始失败等待时长为 1s
					multiplier: 3 # 下次失败等待时长的倍数, 下次等待时长= multiplier * last-interval
					max-attempts: 4 # 最大重试次数

消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer 接口来处理,它包含三种不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject, 丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer: 重试耗尽后,返回 nack, 消息重新入队
  • RepublishMessageRecoverer: 重试耗尽后,将失败消息投递到指定交换机

rabbitmq消费者宕机怎么办,微服务,java-rabbitmq,rabbitmq,java
下面简单说明一下 RepublishMessageRecoverer 处理模式

  1. 首先,定义接收失败消息的交换机,队列及其绑定关系:
@Bean
public DirectExchange errorMessageExchange(){
	return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
	return new Queue("error.queue");
}
@Bean
public Binding errorMessageBinding(){
	return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
  1. 定义 RepublishMessageRecoverer
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
{
	return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");
}

进行完上面的配置后,我们在消费者处理消息异常后,会进行一个本地的重试。而不是直接 requeue 到队列中。
当我们重试次数耗尽后,我们会把错误消息投递到 error.direct 的交换机上,然后在 error.queue上进行转存。这样既能减轻 mq 的压力,也能在队列上找到处理异常的消息,进行人工介入处理。

总结

如何确保 RabbitMQ 消息的可靠性?文章来源地址https://www.toymoban.com/news/detail-677932.html

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费在队列中不会丢失
  • 开启消费者确认机制为 auto, 由 spring 确认消息处理成功后完成 ack
  • 开启消费者失败重试机制,并设置 MessageRecoverer, 多次重试失败后将消息投递到异常交换机,交由人工处理

到了这里,关于RabbitMq 的消息可靠性问题(二)---MQ的消息丢失和consumer消费问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ之MQ可靠性

    RabbitMQ实现数据持久化包括3个方面 (1)交换机持久化 (2)队列持久化 (3)消息持久化 注:开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执 从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列 注:从3.12版本后,所有队

    2024年01月21日
    浏览(28)
  • RabbitMQ原理(四):MQ的可靠性

    消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。 为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括: 交换机持久化 队列持久化 消息持久化 我们以控

    2024年02月07日
    浏览(35)
  • 防止消息丢失与消息重复——Kafka可靠性分析及优化实践

    上手第一关,手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析,打破面试难关 在上一章内容中,我们解析了Kafka在读写层面上的原理,介绍了很多Kafka在读出与写入时的

    2024年02月08日
    浏览(32)
  • RabbitMQ --- 消息可靠性

    消息队列在使用过程中,面临着很多实际问题需要思考:      消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 co

    2024年02月14日
    浏览(34)
  • RabbitMQ-保证消息可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月07日
    浏览(37)
  • RabbitMQ消息的可靠性

    面试题: Rabbitmq怎么保证消息的可靠性? 1.消费端消息可靠性保证: 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费

    2024年04月14日
    浏览(60)
  • rabbitmq消息可靠性之消息回调机制

    rabbitmq消息可靠性之消息回调机制 rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制 生产者确认机制 消息持久

    2024年02月08日
    浏览(38)
  • RabbitMQ保证消息的可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月19日
    浏览(39)
  • RabbitMQ如何保证消息可靠性

    目录 1、RabbitMQ消息丢失的可能性 1.1 生产者消息丢失场景 1.2 MQ导致消息丢失 1.3 消费者丢失 2、如何保证生产者消息的可靠性 2.1 生产者重试机制 2.2 生产者确认机制 2.3 实现生产者确认 2.3.1 配置yml开启生产者确认 2.3.2 定义ReturnCallback 2.3.3 定义ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    浏览(41)
  • RabbitMQ高级篇---消息可靠性

    1、消息可靠性: 消息从发送到消费者接受,会经历多个过程,每个消息传递的过程都可能导致消息的丢失: 常见的丢失原因: 发送时消息丢失原因: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 Rab

    2024年01月20日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包