RabbitMQ异步与重试机制

这篇具有很好参考价值的文章主要介绍了RabbitMQ异步与重试机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        先来回顾一下前文,我们先基于Java原生语言,利用多线程和锁实现了串行/并行任务(Java串行/并行任务实现);之后利用SpringBoot为我们封装好的功能,尝试用SpringBoot自带的API实现了异步调用,并在此基础上,统一管理了多线程的事务(SpringBoot异步任务及并行事务实现)。

        对于多线程的使用,我们已经有了一个全面的认知,系统响应能力也确实有了不小的提升。但随着系统负载持续增大,如果每个用户请求进来都为其分配线程,线程池打满后用户也只能一直等待;计算量过大、线程数过多时,CPU也会承受不了压力。线程是针对单进程的概念,天生不支持分布式,如果某个下游服务需要处理许多上游服务发送的请求,此时多线程就不一定能提升效率了——因为既要兼顾大量计算,又要快速在线程间进行切换,单机负载会影响整个链路的响应速度。

        针对这种情况最好的解决方案,就是引入消息队列中间件。不过要注意,不能说感觉有用就用了,引入新中间件付出的代价也是极大的。需要整体评估系统的复杂性和稳定性,以及功能是否有必要解耦。比如说只是一个响应速度很快的插库,放入消息队列还需要耗费一个网络通信的时间,此时就完全没有必要;或是该操作必须放在主线程中同步,下面的操作都要依据该操作结果来判断,例如我要获取支付系统当前能否正常响应,如能正常响应再进行支付,这时哪怕这个动作再慢,你也得等着。

        使用MQ的最佳场景:流量削峰、异步解耦,本篇我们仅就异步解耦和RabbitMQ的一些特性介绍,其他更系统化的应用以后再详细说。

1 场景介绍

        整体基于前几篇文章介绍的场景,用户下单成功后一直未支付,系统就会做如下几件事:

  1. 超时订单自动取消、更新订单状态
  2. 归还原有库存
  3. 短信通知用户

        超时订单取消是基于DelayQueue做的,这里不做详细介绍。更简单点说,可以理解为下单成功后就发短信通知用户“下单成功了”,发送短信一般依赖第三方服务,是一个较为耗时的操作,但又不严格属于整个下单流程内,因此是可以解耦出来的。

        那么现在下单的整体流程就为:

用户下单 -> 延时队列监测超时订单 -> 超时订单处理逻辑 -> MQ消费者异步发送邮件

        (短信要钱,邮件免费,所以这里用邮件代替一下 )

2 业务逻辑编写

        生产者的逻辑很简单,就是一行convertAndSend()指定交换机、路由键和发送的实体类,还有个小细节,如果MQ接收的Body为实体类,消息转换器要使用“Jackson2JsonMessageConverter”,这样消息的Content-Type就被指定为Json格式了,否则会无法正常序列化。

//发送邮件队列信息
private static final String eMailExchange = "my.order";
private static final String eMailQueue = "order.email";
private static final String eMailRoutingKey = "order.email";

//实际业务逻辑就一行
MailUtils.sendMail(eMail.getAddress(), eMail.getSubject(), eMail.getContent(), true);

        我们按照上面的信息创建好交换机,将队列绑定到指定的交换机上,具体怎么绑就不赘述了,不管是去RabbitMQ Management手动创建,还是用Java Bean形式创建都可以。

        消费者的逻辑很简单,监听消费指定队列、拿到入参的EMail实体类、根据实体类属性发送邮件。但这也太简单了,那就顺便使用一下RabbitMQ自带的功能——手动/自动Ack。

3 手动/自动 Ack/Nack

        Ack为"Acknowledge Character",意思是确认字符,源于网络通信的概念。RabbitMQ中包含"Ack"和"Nack",用于告知MQ该条消息正常消费/消费异常,有几个参数需要注意。

//Ack
channel.basicAck(deliveryTag, false);  //仅确认该条消息
channel.basicAck(deliveryTag, true);  //确认所有已完成消息

//Nack
channel.basicNack(deliveryTag, false, false);  //消费失败,丢弃消息
channel.basicNack(deliveryTag, false, true);  //消费失败,放入队列重新消费
  1. 第一个参数为每条消息的唯一标识deliveryTag,用于确认指定消息。
  2. 第二个参数为单条确认或批量确认,传入true即为确认所有已消费消息,传入false为仅确认该条消息;一般用false,自己确认自己的就行,批量确认能节省一点网络开销,但没必要。
  3. 第三个参数只有Nack才有,意为是否重新消费,传入true则重新放入队列头部,再次进行消费;传入false则直接丢弃。但是需要注意的是,重入队列会放在队列头部,等于会立即进行重新消费,如果该消息一直报错,就会阻塞该队列。

        RabbitMQ默认Ack模式是"Auto",也就是会自动Ack,这是为了防止用户没有手动Ack导致消费消息一直积压在Unacked队列中,导致MQ服务OOM而死。但实际上手动Ack是比较合理的选择,一是能够提高MQ的响应能力,我消费完了立马告知MQ,可以处理下一条了;二是更加安全,Auto模式下即使消息消费异常,还是会自动Ack,这条消息就无影无踪了。

        最后一点,也是最重要的一点,消息正在消费的时候MQ服务挂了,如果是Auto模式,这条消息会直接丢失,因为消费者在获取到这条消息时就会自动Ack;但如果是Manual模式,一切都改变了,由于Broker没有接收到你的Ack/Nack,消息会处于Unacked状态,在下次服务恢复正常时会重新进行消费,振奋人心!

        因此我们把MQ设置为手动Ack模式"Manual",在消费方法中trycatch,正常消费就Ack,发现异常就Nack并把消息丢弃。响应能力确实提高了,消息确认也更灵活了,但是看起来怪怪的——不论是消费成功还是失败,消息最终都被丢弃了。有人会说异常就Nack消息放回队列重新消费,但如果这条消息一直消费失败,这条消息会被无限消费,这是十分可怕的。可以实验一下,设置"prefetch = 1",消费者每次只能获取一条消息进行消费,有一条消息异常重入队列后,这个消费者就永远卡在这了。

        但我们还是想多给消费者几次机会,起码试几次再让他丢掉嘛,RabbitMQ提供了一种优雅地重试方式“Retry机制”。

4 Retry

        只需要修改一下配置文件,就能开启RabbitMQ的本地重试机制,之所以称他为“本地重试”是由于消费者是将该消息在本机重试,不与MQ服务交互。

        retry:
          enabled: true
          max-attempts: 3  #重试次数
          initial-interval: 3000  #间隔时间, ms
          max-interval: 5000  #重试最大间隔时间, ms
          multiplier: 1  #负载因子, 重试间隔时间倍数, 默认1

        这样就开启了重试功能,只要消费者抛出异常就会以指定间隔时间、重试指定次数,记得不要catch住异常哦,哪怕catch了也要再抛出去。

        但是默认的Retry机制并不是完美的,他有几个很明显的缺陷:

  • 由于无法try catch异常,也就无法使用手动Ack模式。换句话说,Retry和手动Ack是一对互斥的选项。
  • 重试到达上限次数后,也会将消息丢弃,默认不会有特殊的处理机制。

        第二个问题比较好解决,可以给队列绑定一个死信队列,指定死信交换机和路由键,如下这两个属性。 在重试到达上限后,会放入指定的死信队列,可以由监听死信队列的消费者进行后续补偿处理。

x-dead-letter-exchange: email.dead
x-dead-letter-routing-key: email.dead

        或者是自定义"MessageRecoverer"并注入,默认的消息恢复器是"RejectAndDontRequeueRecoverer",意为拒绝且不重入队列,在到达上限后会报错告诉你重试次数耗尽然后丢掉消息。这个显然不是很好用,我们可以用"RepublishMessageRecoverer",将异常消息重发至死信队列。

        但是使用Retry就注定和手动Ack是无缘了,我们需要设计一个两全的方案,既保留手动Ack的安全性,又兼顾Retry机制的稳定性——那就来手动实现一下Retry吧。

5 手动实现Retry

        实现思路是使用Redis标记该消息的重试次数,在未达到重试上限前,使用Nack将消息重入队列;达到重试上限后,将消息Nack自动发送至死信队列。使用该方法的重点,就是要给每条消息携带一个唯一ID,可以使用UUID或是Snowflake。直接上代码。

//利用Redis手动实现重试机制
    private void retryExecute(Channel channel, Message message, EMail eMail, Map<String, Object> headers) throws IOException {
        MessageProperties messageProperties = message.getMessageProperties();
        String redisKey = RETRY_EXECUTE_TIMES_KEY.concat(":").concat(eMail.getMessageId());
        Object value = RedisUtil.get(redisKey);
        if (Objects.isNull(value)){
            //当前为第一次执行,返回重试
            RedisUtil.set(redisKey, 2, 60 * 5);
            channel.basicNack(messageProperties.getDeliveryTag(), false, true);
        } else {
            Integer integer = Integer.parseInt(value.toString());
            if (integer < RETRY_EXECUTE_TIMES_MAX) {
                //当前为第二次执行,返回重试
                RedisUtil.set(redisKey, integer + 1, 60 * 5);
                channel.basicNack(messageProperties.getDeliveryTag(), false, true);
            } else {
                log.error("3次了,不试了,扔死信队列了");
                channel.basicNack(messageProperties.getDeliveryTag(), false, false);
            }
        }

    }

        我们给每条记录5分钟的超时时间,足够消费者进行重试了。 都整好了我们来试验一下,运行一下看看日志。

2023-02-24 16:32:29.122 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer  : 该报错了哈
2023-02-24 16:32:29.122 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer  : 已经报错了哈
2023-02-24 16:32:29.931 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer  : 该报错了哈
2023-02-24 16:32:29.931 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer  : 已经报错了哈
2023-02-24 16:32:29.935 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer  : 该报错了哈
2023-02-24 16:32:29.935 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer  : 已经报错了哈
2023-02-24 16:32:29.936 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer  : 3次了,不试了,扔死信队列了

        结果符合我们的预期,消息也确实从 order.email 移入了 email.dead 死信队列中。这里有这么多条是因为我之前光放没消费,不用太在意。

rabbitmq重试机制,java,微服务,spring cloud,rabbitmq,分布式

 文章来源地址https://www.toymoban.com/news/detail-726597.html

到了这里,关于RabbitMQ异步与重试机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rabbitmq:retry重试机制和延迟消息的实现

    rabbitmq:retry重试机制和延迟消息的实现 在消费者消费消息的时候可能会因为网络等外部原因导致消息处理失败,这个时候如果将消息直接丢弃会导致正常的业务丢失,但是如果是一条本身就有问题的消息,那么这个时候又必须丢弃掉,如果选择用channel.basicNack 或 channel.basi

    2024年02月13日
    浏览(28)
  • 项目实战之RabbitMQ重试机制进行消息补偿通知

    🧑‍💻作者名称:DaenCode 🎤作者简介:啥技术都喜欢捣鼓捣鼓,喜欢分享技术、经验、生活。 😎人生感悟:尝尽人生百味,方知世间冷暖。 业务MQ消费者代码逻辑记得往外抛异常,进行try-catch了也要往外抛。 消息消费重试,达到重试次数进入到异常交换机、队列。消息确

    2024年02月05日
    浏览(32)
  • Java分布式微服务4——异步服务通讯(RabbitMQ)中间件

    为什么需要异步调用? 故障隔离 :支付服务不负责调用其他三个服务,只负责通知Broker支付成功这个事件,然后就返回结果,后面的服务故障了和前面发布事件的服务无关,前面的服务发布完事件就结束了 吞吐量提升 :Broker将支付成功的事件广播给订阅了这个事件的那些服

    2024年02月13日
    浏览(34)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

    在现代分布式应用程序中,消息队列扮演了至关重要的角色,允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性,但也引入了一系列的挑战,其中最重要的之一是消息的可靠性。 首先让我们来了解一下,在消息队列中,消息从生产者发送

    2024年02月05日
    浏览(40)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(74)
  • 微服务: 05-rabbitmq设置重试次数并设置死信队列

    目录 1. 上文传送门:  2. 前言简介:  2.1 问: 消费端重复循环异常如何解决? 2.2 为什么要使用死信队列 2.3 案例思路 - ps: 以下案例经过测试(思路一/二实现原理一样) - 2.3.1 思路一  - 2.3.2 思路二 3. 案例代码 3.1 简单介绍案例 3.2 声明交换机 队列 以及绑定路由键 3.3 修改配置文件

    2024年02月17日
    浏览(31)
  • 微服务——服务异步通讯RabbitMQ

     前置文章 消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序_北岭山脚鼠鼠的博客-CSDN博客 消息队列——rabbitmq的不同工作模式_北岭山脚鼠鼠的博客-CSDN博客 消息队列——spring和springboot整合rabbitmq_北岭山脚鼠鼠的博客-CSDN博客 目录 Work queues 工作队列模式  案例

    2024年02月15日
    浏览(30)
  • 服务异步通讯——RabbitMQ

    微服务间通讯有同步和异步两种方式: 同步通讯 :就像打电话,需要实时响应。 异步通讯 :就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是一个人却不能跟多个人同时通话。而发送邮件可以同时与多个人收发邮件,但是往往响应会有延

    2024年01月18日
    浏览(30)
  • 服务异步通信-高级篇(RabbitMQ)

    每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 RabbitMQ分别给出了解决方案: 生产者发送确认机制 mq持久化 消费者消费确认机制 失败重

    2024年02月02日
    浏览(31)
  • RabbitMQ服务异步通信-高级篇

    提出问题:消息投递过程中,生产者—— MQ —— 消费者 中间会出现消息丢失问题,导致信息没有及时同步 先梳理一下流程 生产者生产个消息 —— 建立连接——通道传递进mq交换机——交换机传给队列——消费者拉取数据消费 1.生产者生产完消息,相当于写好代码,写错了

    2024年03月24日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包