第十五章 RabbitMQ 延迟队列

这篇具有很好参考价值的文章主要介绍了第十五章 RabbitMQ 延迟队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。



第十五章 RabbitMQ 延迟队列,微服务,rabbitmq,ruby,分布式

前言

实际业务中,例如秒杀系统,秒杀商品成功会有截止时间,这时需要用到RabbitMQ延迟服务。

1、RabbitMQ延迟队列

1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能

  • TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间
  • Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信
  • Dead Letter Exchanges(DLX),即死信交换机
  • Dead Letter Routing Key (DLK),死信路由键
/***********************延迟队列*************************/
//创建立即消费队列
@Bean
public Queue immediateQueue(){
    return new Queue("immediateQueue");
}
//创建立即消费交换机
@Bean
public DirectExchange immediateExchange(){
    return new DirectExchange("immediateExchange");
}
@Bean
public Binding bindingImmediate(@Qualifier("immediateQueue") Queue queue,@Qualifier("immediateExchange") DirectExchange directExchange){
    return BindingBuilder.bind(queue).to(directExchange).with("immediateRoutingKey");
}

//创建延迟队列
@Bean
public Queue delayQueue(){
    Map<String,Object> params = new HashMap<>();
    //死信队列转发的死信转发到立即处理信息的交换机
    params.put("x-dead-letter-exchange","immediateExchange");
    //死信转化携带的routing-key
    params.put("x-dead-letter-routing-key","immediateRoutingKey");
    //设置消息过期时间,单位:毫秒
    params.put("x-message-ttl",60 * 1000);
    return new Queue("delayQueue",true,false,false,params);
}

@Bean
public DirectExchange delayExchange(){
    return new DirectExchange("delayExchange");
}

@Bean
public Binding bindingDelay(@Qualifier("delayQueue") Queue queue,@Qualifier("delayExchange") DirectExchange directExchange){
    return BindingBuilder.bind(queue).to(directExchange).with("delayRoutingKey");
}
@Test
public void sendDelay(){
    this.rabbitTemplate.convertAndSend("delayExchange","delayRoutingKey","Hello world topic");
}

1.2、方式二:安装延迟队列插件

1.2.1、安装延迟队列插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

/**************延迟队列一个单一queue*******************/
@Bean
public Queue delayNewQueue(){
    return new Queue("delayNewQueue");
}
@Bean
public CustomExchange delayNewExchange(){
    Map<String, Object> args = new HashMap<>();
    // 设置类型,可以为fanout、direct、topic
    args.put("x-delayed-type", "direct");
    return new CustomExchange("delayNewExchange","x-delayed-message", true,false,args);
}
@Bean
public Binding bindingNewDelay(@Qualifier("delayNewQueue") Queue queue,@Qualifier("delayNewExchange") CustomExchange customExchange){
    return BindingBuilder.bind(queue).to(customExchange).with("delayNewRoutingKey").noargs();
}
@Test
public void sendDelay() {
    //生产端写完了
    UserInfo userInfo = new UserInfo();
    userInfo.setPassword("13432432");
    userInfo.setUserAccount("tiger");
    this.rabbitTemplate.convertAndSend("delayNewExchange", "delayNewRoutingKey", userInfo
            , a -> {
                //单位毫秒
                a.getMessageProperties().setDelay(30000);
                return a;
            });
}

2、消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。
生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。
消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。

2.1、生产确认

@Bean
public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    //消息发送到交换器Exchange后触发回调
    template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            //  可以进行消息入库操作
            log.info("消息唯一标识 correlationData = {}", correlationData);
            log.info("确认结果 ack = {}", ack);
            log.info("失败原因 cause = {}", cause);
        }
    });

    // 配置这个,下面的ReturnCallback 才会起作用
    template.setMandatory(true);
    // 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)
    template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            //  可以进行消息入库操作
            log.info("消息主体 message = {}", returnedMessage.getMessage());
            log.info("回复码 replyCode = {}", returnedMessage.getReplyCode());
            log.info("回复描述 replyText = {}", returnedMessage.getReplyText());
            log.info("交换机名字 exchange = {}", returnedMessage.getExchange());
            log.info("路由键 routingKey = {}", returnedMessage.getRoutingKey());

        }
    });
    return template;
}
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
  application:
    name: user-service  #微服务名称
  datasource:
    username: root
    password: root
    url: jdbc:mysql://127.0.0.1:3306/drp
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: tiger
    password: tiger
    virtual-host: tiger_vh
    # 确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    # 确认消息已发送到队列
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual # 开启消息消费手动确认
        retry:
          enabled: true

2.2、消费确认

@RabbitHandler
public void process(UserInfo data, Message message, Channel channel){
    log.info("收到directQueue队列信息:" + data);
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        //成功消费确认
        channel.basicAck(deliveryTag,true);
        log.info("消费成功确认完毕。。。。。");
    } catch (IOException e) {
        log.error("确认消息时抛出异常 ", e);        
        // 重新确认,成功确认消息
        try {
            Thread.sleep(50);
            channel.basicAck(deliveryTag, true);
        } catch (IOException | InterruptedException e1) {
            log.error("确认消息时抛出异常 ", e);
            // 可以考虑入库
        }
    }catch (Exception e){
        log.error("业务处理失败", e);
        try {
            // 失败确认
            channel.basicNack(deliveryTag, false, false);
        } catch (IOException e1) {
            log.error("消息失败确认失败", e1);
        }
    }
}


第十五章 RabbitMQ 延迟队列,微服务,rabbitmq,ruby,分布式文章来源地址https://www.toymoban.com/news/detail-823607.html



到了这里,关于第十五章 RabbitMQ 延迟队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ教程】第六章 —— RabbitMQ - 延迟队列

                                                                       💧 【 R a b b i t M Q 教 程 】 第 六 章 — — R a b b i t M Q − 延 迟 队 列 color{#FF1493}{【RabbitMQ教程】第六章 —— RabbitMQ - 延迟队列} 【 R a b b i t M Q 教 程 】 第 六 章 — — R a

    2024年02月09日
    浏览(38)
  • RabbitMQ延迟队列,死信队列配置

    延迟和死信队列的配置 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列 异常队列配置类

    2024年02月14日
    浏览(52)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(59)
  • RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

    1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件 RabbitMQ官网下载插件的网址:https://www.rabbitmq.com/community-plugins.html 2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明)

    2024年02月16日
    浏览(40)
  • 第十五 rabbitmq及数据同步

    了解常见的MQ产品 了解RabbitMQ的5种消息模型 会使用Spring AMQP 利用MQ实现搜索和静态页的数据同步 目前我们已经完成了商品详情和搜索系统的开发。我们思考一下,是否存在问题? 商品的原始数据保存在数据库中,增删改查都在数据库中完成。 搜索服务数据来源是索引库,如

    2024年02月17日
    浏览(38)
  • 【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

    消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。 本文将介绍

    2024年02月05日
    浏览(79)
  • 【RabbitMQ学习日记】——死信队列与延迟队列

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说, producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息进行消费,但某些时候 由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就变成了死

    2024年02月06日
    浏览(56)
  • RabbitMQ学习-延迟队列

    延迟队列 背:也就是给队列设置个过期时间,然后到时间消息变成死信,消费死信队列中的消息就行,再没什么玩意,演示队列优化就是不给队列这只TTL,再生产者代码中消息里面设置消息TTL, 因为 RabbitMQ 只会检查第一个消息是否过期 ,如果过期则丢到死信队列, 如果第

    2024年02月07日
    浏览(30)
  • RabbitMQ延迟队列

    目录 一、概念 二、使用场景 三、RabbitMQ 中的 TTL (一)队列设置 TTL (二)消息设置 TTL (三)两者的区别 四、整合SpringBoot实现延迟队列 (一)创建项目 (二)添加依赖 (三)修改配置文件 (四)添加Swagger配置类 五、队列TTL (一)代码架构图 (二)配置文件类 (三)

    2024年02月03日
    浏览(37)
  • Rabbitmq 延迟队列---插件

            解决没法优先发送延时时间短的消息。 插件安装 配置类 生产者 消费者

    2024年02月12日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包