rabbitmq:retry重试机制和延迟消息的实现

这篇具有很好参考价值的文章主要介绍了rabbitmq:retry重试机制和延迟消息的实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

rabbitmq:retry重试机制和延迟消息的实现

在消费者消费消息的时候可能会因为网络等外部原因导致消息处理失败,这个时候如果将消息直接丢弃会导致正常的业务丢失,但是如果是一条本身就有问题的消息,那么这个时候又必须丢弃掉,如果选择用channel.basicNackchannel.basicReject方法让消息重回对了,会导致消费者在不停的消费这条消息,这将是一个致命的问题。

所幸,rabbitmq提供了retry机制来控制消息的重试

消息队列的重试机制,java-rabbitmq,rabbitmq,分布式

yml配置文件:

spring:  
  rabbitmq:
    host: IP
    port: 5672
    username: guest
    password: guest
    virtual-host: smallJHost
    # 消费者确认机制相关配置
    # 开启publisher-confirm,
    # 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    publisher-returns: true
    # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
    template:
      mandatory: true
    listener:
      simple:
        # ack机制类型
        acknowledge-mode: manual
        # 设置预取消息数量
        prefetch: 2
        # 失败重试
        retry:
          # 开启消费者失败重试
          enabled: true
          # 初始的失败等待时长为1秒
          initial-interval: 1000
          # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          multiplier: 3
          # 最大重试次数
          max-attempts: 4
          # true无状态;false有状态。如果业务中包含事务,这里改为false
          stateless: true

在RabbitmqConfig中增加如下配置:

package com.gitee.small.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMQConfig implements ApplicationContextAware {

    // 其他队列、交换机、绑定、回调等代码省略,需要的朋友可看我之前的文章、、、
    
    @Bean
    public Queue errorQueue() {
        return new Queue("error");
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }


    @Bean(name = "binding.error")
    public Binding bindingExchangeMessage3() {
        return BindingBuilder.bind(errorQueue()).to(exchange()).with("error");
    }

    /**
     * 定义 MessageRecoverer 将错误消息发送到指定队列
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "topicExchange", "error");
    }
}

在消费者定义中有一点需要注意,不能直接将异常处理掉,否则是不会将消息发送到error队列的。

package com.gitee.small.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class WorkRabbitReceiver {

    private static Integer index = 0;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "监听队列名称"),
            exchange = @Exchange(value = "binding对象beanname", type = ExchangeTypes.TOPIC))
    )
    public void process(String msg, Channel channel, Message message) throws Exception {
        try {
            System.out.println(1/0);
        } catch (Exception e) {
            log.error("消息重试");
            throw new Exception();
        }
    }
}

小结:文章来源地址https://www.toymoban.com/news/detail-648883.html

  • 消息重试是在本地进行重试,不会回到消息队列中

  • 重试模式下,重试次数耗尽后,如果消息依然失败,为了防止消息被直接丢弃,需要有MessageRecovery 接口来处理,它包含三种不同的实现

    • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式

    • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

    • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

      很显然,RepublishMessageRecoverer方案应用最广最合理,本文中也是以此为例

实现延迟队列

消息超时方案:

  • 给队列设置 ttl 属性,进入队列后超过 ttl 时间的消息变为死信
  • 给消息设置 ttl 属性,队列接收到消息超过 ttl 时间后变为死信

本文讲给消息设置超时,因为这个方案更灵活。

  1. 创建死信队列和死信交换机,并将其绑定

    @Bean
    public DirectExchange dlExchange() {
        // 声明死信交换机 dl.direct
        return new DirectExchange("dl.direct", true, false);
    }
    
    
    @Bean
    public Queue dlQueue() {
        // 声明存储死信的队列 dl.queue
        return new Queue("dl.queue", true);
    }
    
    
    @Bean(name = "binding.dl")
    public Binding dlBinding() {
        // 将死信队列 与 死信交换机绑定
        return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");
    }
    
  2. 指定消息过期时间,向正常消息队列发送消息,一条5秒延时,一条10秒延时

    private void deadLetter() {
        final Message message = MessageBuilder.withBody("延迟消息测试".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setExpiration("5000")
            .build();
    
        final Message message2 = MessageBuilder.withBody("延迟消息测试".getBytes())
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setExpiration("10000")
            .build();
    
        rabbitTemplate.convertAndSend("topicExchange", "topic.dead", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.dead", message2);
    }
    
  3. 监听死信队列,实现延迟消息具体逻辑

    /**
    * 监听死信队列,处理延迟消息
    */
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "dl.queue"),
        exchange = @Exchange(value = "binding.dl", type = ExchangeTypes.TOPIC))
                   )
    public void process(String msg, Channel channel, Message message) throws IOException {
    
        log.info("延迟消息:{}", msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
    }
    

使用场景:

  • 延迟发送短信
  • 用户下单,如果用户在一小时内未支付,自动取消
  • 会议前半小时提醒参会

小结:

  1. 创建一个交换机作为死信交换机并绑定一个队列作为死信队列
  2. 给消息的目标队列设置队列超时时间并指定死信交换机和路由 key
  3. 将消息的目标队列绑定到死信交换机
  4. 消费者监听死信队列获取超时消息

到了这里,关于rabbitmq:retry重试机制和延迟消息的实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(54)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(60)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • 项目实战之RabbitMQ重试机制进行消息补偿通知

    🧑‍💻作者名称:DaenCode 🎤作者简介:啥技术都喜欢捣鼓捣鼓,喜欢分享技术、经验、生活。 😎人生感悟:尝尽人生百味,方知世间冷暖。 业务MQ消费者代码逻辑记得往外抛异常,进行try-catch了也要往外抛。 消息消费重试,达到重试次数进入到异常交换机、队列。消息确

    2024年02月05日
    浏览(41)
  • NLP(六十五)LangChain中的重连(retry)机制

      关于LangChain入门,读者可参考文章NLP(五十六)LangChain入门 。   本文将会介绍LangChain中的重连机制,并尝试给出定制化重连方案。   本文以LangChain中的对话功能( ChatOpenAI )为例。 LangChain中的重连机制   查看LangChain中对话功能( ChatOpenAI )的重连机制(retry),

    2024年02月13日
    浏览(62)
  • 【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

    消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。 本文将介绍

    2024年02月05日
    浏览(79)
  • (五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制

    Lison dreamlison@163.com , v1.0.0 , 2023.06.23 概念 在MQ中,当消息成为死信(Dead message)后,消息中间件可以 将其从当前队列发送到另一个队列中,这个队列就是死信队列。而 在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死 信交换机(Dead Letter Exchange,简称DLX)。死信交

    2024年02月15日
    浏览(54)
  • RabbitMQ的消费者处理消息失败后可以重试,重试4次仍然失败发送到死信队列。

    生产者发送消息时采用雪花算法给消息设置唯一的消息id,消费者接收消息处理失败时,根据消息的唯一id统计失败次数,若没有达到失败次数限制,则让消息重回队列(在开启手动签收的前提),此时队列会再次给消费者发送消息;若达到失败次数限制,则让消息不重回队列,

    2024年02月07日
    浏览(58)
  • 学会RabbitMQ的延迟队列,提高消息处理效率

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉? RabbitMQ 消费模式该如何选择 死信是什么,如何运用RabbitMQ的死信机制? 真的好用吗?鲜有人提的 RabbitMQ-RPC模式 前面

    2024年02月14日
    浏览(68)
  • rabbitmq基础7——队列和消息过期时间设置、死信队列、延迟队列、优先级队列、回调队列、惰性队列

    这里过一个知识点——过期时间,即对消息或队列设置过期时间(TTL)。一旦消息过期,消费就无法接收到这条消息,这种情况是绝不允许存在的,所以官方就出了一个对策——死信队列,死信队列最初出现的意义就是为了应对消息过期丢失情况的手段之一。 那么过期时间具

    2024年02月03日
    浏览(76)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包