RabbitMQ服务异步通信-高级篇

这篇具有很好参考价值的文章主要介绍了RabbitMQ服务异步通信-高级篇。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

消息可靠性

生产者确认机制

提出问题:消息投递过程中,生产者——> MQ ——> 消费者 中间会出现消息丢失问题,导致信息没有及时同步

RabbitMQ服务异步通信-高级篇,java-rabbitmq,rabbitmq,java

先梳理一下流程

生产者生产个消息 ——> 建立连接——>通道传递进mq交换机——>交换机传给队列——>消费者拉取数据消费

1.生产者生产完消息,相当于写好代码,写错了自己改,然后建立连接投递,连接建立不成功会再建立,这里不用操心,如果在投递过程中消息丢失了,生产者发送了,消费者没收到,这要是情侣铁定闹分手,有可能是网络波动造成的也有可能没有给了一个不存在的交换机,所以mq官方需要处理这些问题,这就交给他们了,但是我们要了解其中的处理方案

发现问题:消息没到交换机或者到了交换机没到队列,这两种情况基本一致,因为交换机没有存储信息能力

理论解决方案:没到mq的队列就丢失了,连给队列持久化保存到硬盘的机会都没得,那就只能重新传一份。

mq的实践解决方案:生产者确认机制,首先需要给每个信息绑定全局唯一的ID,信息可能有上百万条,谁知道哪个丢失了,然后生产者发生了一个带有ID的信息,连接一旦建立,交换机这边是否接收到了这条ID,接收到了给生产者发一个ack,让他先别发了,没接收到也得发一个nack,如果路由到队列后,则发生一个回执ack,说我任务完成了

消息持久化

前面的生产者确认机制已经确保消息可靠的传输到队列,如果RabbitMQ宕机的话,依旧会导致小心丢失,由于MQ是内存存储,所以需要持久化保存到硬盘

这里面涉及到了三个持久化

交换机持久化,队列持久化,消息持久化

1.交换机默认是非持久化的,mq重启就会丢失,SpringAMQP中可以通过代码指定交换机持久化,默认情况下,由SpringAMQP声明的交换机都是持久化的。

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);
}

2.队列持久化

队列默认是非持久化的,mq重启就会丢失,SpringAMQP中可以通过代码指定队列持久化,默认情况下,由SpringAMQP声明的队列都是持久化的。

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

3.消息持久化

消息持久化是把生产者生产的信息需要持久化,保存到硬盘里

默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。

RabbitMQ服务异步通信-高级篇,java-rabbitmq,rabbitmq,java

 消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

这里面涉及一点,消息是存储在MQ当中,消费者拉取完之后,MQ删除消息的时机是哪,如果消费者拉取完之后,没消费呢,自己挂了,又怎么办

结合以上两点,最可靠的就是消费者消费完了,通知一声我完事了,然后MQ把消息删除,防止占用内存

MQ提供了三种机制

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

很显然none模式下,消费者获取完消息之后,服务一旦宕机,消息也就丢失了,除非能确保自己的服务非常稳定才使用

auto模式类似于事务的回滚,消息一旦长时间没被消费会抛出异常,导致回滚,MQ消息不会删除

消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

解决方案:本地重试

可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启消费者服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了

  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试

  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。这个队列就是死信队列

Rabbitmq的可靠性保证

通过以下四点保证

  • 开启生产者确认机制,确保生产者的消息能到达队列

  • 开启持久化功能,确保消息未消费前在队列中不会丢失

  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack

  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理文章来源地址https://www.toymoban.com/news/detail-843005.html

到了这里,关于RabbitMQ服务异步通信-高级篇的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微服务——服务异步通讯RabbitMQ

     前置文章 消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序_北岭山脚鼠鼠的博客-CSDN博客 消息队列——rabbitmq的不同工作模式_北岭山脚鼠鼠的博客-CSDN博客 消息队列——spring和springboot整合rabbitmq_北岭山脚鼠鼠的博客-CSDN博客 目录 Work queues 工作队列模式  案例

    2024年02月15日
    浏览(30)
  • 服务异步通讯——RabbitMQ

    微服务间通讯有同步和异步两种方式: 同步通讯 :就像打电话,需要实时响应。 异步通讯 :就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是一个人却不能跟多个人同时通话。而发送邮件可以同时与多个人收发邮件,但是往往响应会有延

    2024年01月18日
    浏览(30)
  • SpringCloud学习路线(9)——服务异步通讯RabbitMQ

    一、初见MQ (一)什么是MQ? MQ(MessageQueue) ,意思是 消息队列 ,也就是事件驱动架构中的Broker。 (二)同步调用 1、概念: 同步调用是指,某一服务需要多个服务共同参与,但多个服务之间有一定的执行顺序,当每一个服务都需要等待前面一个服务完成才能继续执行。

    2024年02月15日
    浏览(28)
  • 微服务RabbitMQ高级篇

    目录 一.消息可靠性传递概述 二.生产者消息确认机制 三.publisher-comfirm 四.publisher-return 五.消息持久化 六.消费者消息确认机制 七.如何确保RabbitMQ消息的可靠性? 八.死信交换机 九.延迟队列 十.惰性队列 十一.MQ集群 生产者发送消息到交换机,交换机将消息路由到队列,消费者

    2024年02月20日
    浏览(27)
  • Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息队列实现异步处理。下面是一个简单的示例代码,用于说明如何使用 ActiveMQ 实现消息队列异步处理: 添加 ActiveMQ 依赖 在 pom.xml 文件中添加以下依赖: 创建消息队列 创建一个名为 “TestQueue” 的消息队列,并配置 ActiveMQ 连接信息: 创建消息消费者

    2024年02月16日
    浏览(46)
  • 微服务—RabbitMQ高级(业务在各方面的可靠性)

      本博客为个人学习笔记,学习网站:2023黑马程序员RabbitMQ入门到实战教程 高级篇章节 目录 生产者可靠性 生产者重连机制 生产者确认机制 介绍 实现 总结与建议 MQ可靠性 数据持久化  LazyQueue 消费者可靠性 消费者确认机制 失败重试机制 失败重试策略  业务幂等性 何为幂

    2024年02月21日
    浏览(24)
  • ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)

    目录 一、数据同步 1.1、什么是数据同步 1.2、解决数据同步面临的问题 1.3、解决办法 1.3.1、同步调用 1.3.2、异步通知(推荐) 1.3.3、监听 binlog 1.3、基于 RabbitMQ 实现数据同步 1.3.1、需求 1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听 1.3.3、在“酒店

    2024年02月08日
    浏览(39)
  • RabbitMQ(四):RabbitMQ高级特性

    消息队列在使用过程中,面临着很多实际问题需要思考: 消息可靠性问题:如何确保发送的消息至少被消费—次 延迟消息问题:如何实现消息的延迟投递 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题 高可用问题:如何避免单点的MQ故障而导致的不可用问题

    2024年01月19日
    浏览(35)
  • RabbitMQ-同步和异步区别&快速入门

    服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.1.1

    2024年04月26日
    浏览(26)
  • RabbitMQ异步与重试机制

            先来回顾一下前文,我们先基于Java原生语言,利用多线程和锁实现了串行/并行任务(Java串行/并行任务实现);之后利用SpringBoot为我们封装好的功能,尝试用SpringBoot自带的API实现了异步调用,并在此基础上,统一管理了多线程的事务(SpringBoot异步任务及并行事务实

    2024年02月07日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包