RabbitMQ系列(17)--延迟队列的简介与实现

这篇具有很好参考价值的文章主要介绍了RabbitMQ系列(17)--延迟队列的简介与实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、延迟队列的概念

延迟队列内部是有序的,重要的特性体现在它的延迟属性上,延迟队列中的元素希望在指定时间到了之后或之前取出处理,简单的说延迟队列就是用来存放需要在指定时间被处理的元素的队列。

2、延迟队列的应用场景

(1)订单指定时间内未支付则自动取消

(2)用户发起退款,指定时间内未处理则通知相关运营人员

3、定时任务和延迟队列的取舍

以上场景都有一个特点,那就是都需要在某个事件发生前或发生后执行一项任务,如生成订单后,在十分钟后检查订单状态,未支付的订单将关闭,这种场景也可以用定时任务来处理,但数据量比价少的话确实可以用定时任务来处理,但在活动期间,订单的数据量可能会变得很庞大,对于庞大的数据,定时任务很难在1秒内检查完订单,从而不能及时的关闭未支付的订单,而且用定时任务来检查订单会给数据库带来很大的压力,所以在数据量大的情况下,定时任务无法满足业务需求且性能低下

4、延迟队列架构图 (后面我们就根据这个架构图进行代码的设计与实现)

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

  

5、延迟队列的实现

(1)新建一个名为config的包,用于装实现特定配置的代码mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

效果图:

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

(2)在config包里新建一个名为TtlQueueConfig的类用于编写配置队列延迟的代码

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.springbootrqbbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 用于配置TTL队列的延迟时间
 */
@Configuration
public class TtlQueueConfig {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";

    //普通队列的名称
    public static final String NORMAL_QUEUE01 = "normal_queue01";
    public static final String NORMAL_QUEUE02 = "normal_queue02";

    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    //声明普通交换机
    @Bean("normalExchange")
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    //声明交换机交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }

    //声明普通队列,TTL为10S
    @Bean("normalQueue01")
    public Queue normalQueue01() {
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(NORMAL_QUEUE01).withArguments(arguments).build();
    }

    //声明普通队列,TTL为40S
    @Bean("normalQueue02")
    public Queue normalQueue02() {
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(NORMAL_QUEUE02).withArguments(arguments).build();
    }

    //声明死信队列
    @Bean("deadQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    //绑定队列1和普通交换机
    @Bean
    public Binding queue01BindNormalExchange(@Qualifier("normalQueue01") Queue normalQueue01,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue01).to(normalExchange).with("normal01");
    }

    //绑定队列2和普通交换机
    @Bean
    public Binding queue02BindNormalExchange(@Qualifier("normalQueue02") Queue normalQueue02,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue02).to(normalExchange).with("normal02");
    }

    //绑定队列2和普通交换机
    @Bean
    public Binding deadQueueBindDeadExchange(@Qualifier("deadQueue") Queue deadQueue,
                                             @Qualifier("deadExchange") DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead");
    }

}

(3)新建一个名为controller的包,用于装控制层的代码mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

效果图:

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

(4)新建一个名为SendMsgController的类用于充当生产者用于发送消息

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

 代码如下:

package com.ken.springbootrqbbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 发送延迟消息
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("normal_exchange","normal01","消息来着ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("normal_exchange","normal02","消息来着ttl为40s的队列:" + message);
    }

}

(5)新建一个名为consumer的包,用于装消费者的代码

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

效果图:

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

(6)新建一个名为DeadQueueConsumer的类用于消费死信队列里的消息

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.springbootrqbbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 死信队列消费者
 */
@Slf4j
@Component
public class DeadQueueConsumer {

    //接收消息
    @RabbitListener(queues = "dead_queue")
    public void receiveMsg(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
    }

}

(7)进入项目的启动类启动项目

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

(8)启动完毕后在浏览器地址栏输入http://localhost:8080/ttl/sendMsg/参数往队列发送消息

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

 (9)查看控制台的输出,发现分别在10s和40s后进行输出,这证明我们的延迟队列成功运行

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

6、延迟队列的优化

虽然上述能实现延迟队列,但上述的实现过程是一个队列只能延迟固定的已经设置好的时间,若想增加一个新的时间需要,用上述的实现方法就只能新增一个队列,这样很麻烦,所以我们需要优化延迟队列

(1)延迟队列优化架构图 (后面我们就根据这个架构图对延迟队列进行优化)

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

(2)修改config包里TtlQueueConfig类的代码,多加一些关于NormalQueue03队列的配置

代码如下:

package com.ken.springbootrqbbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 用于配置TTL队列的延迟时间
 */
@Configuration
public class TtlQueueConfig {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";

    //普通队列的名称
    public static final String NORMAL_QUEUE01 = "normal_queue01";
    public static final String NORMAL_QUEUE02 = "normal_queue02";
    //自定义延迟时间队列的名称
    public static final String NORMAL_QUEUE03 = "normal_queue03";

    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    //声明普通交换机
    @Bean("normalExchange")
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    //声明交换机交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }

    //声明普通队列,TTL为10S
    @Bean("normalQueue01")
    public Queue normalQueue01() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(NORMAL_QUEUE01).withArguments(arguments).build();
    }

    //声明普通队列,TTL为40S
    @Bean("normalQueue02")
    public Queue normalQueue02() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(NORMAL_QUEUE02).withArguments(arguments).build();
    }

    //声明普通队列,TTL为40S
    @Bean("normalQueue03")
    public Queue normalQueue03() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        return QueueBuilder.durable(NORMAL_QUEUE03).withArguments(arguments).build();
    }

    //声明死信队列
    @Bean("deadQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    //绑定队列1和普通交换机
    @Bean
    public Binding queue01BindNormalExchange(@Qualifier("normalQueue01") Queue normalQueue01,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue01).to(normalExchange).with("normal01");
    }

    //绑定队列2和普通交换机
    @Bean
    public Binding queue02BindNormalExchange(@Qualifier("normalQueue02") Queue normalQueue02,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue02).to(normalExchange).with("normal02");
    }

    //绑定队列3和普通交换机
    @Bean
    public Binding queue03BindNormalExchange(@Qualifier("normalQueue03") Queue normalQueue03,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue03).to(normalExchange).with("normal03");
    }

    //绑定死信队列和死信交换机
    @Bean
    public Binding deadQueueBindDeadExchange(@Qualifier("deadQueue") Queue deadQueue,
                                             @Qualifier("deadExchange") DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead");
    }

}

(3)修改controller包里SendMsgController类的代码,多加一个调用自定义延迟时间NormalQueue03队列的接口

代码如下:

package com.ken.springbootrqbbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 发送延迟消息
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("normal_exchange","normal01","消息来着ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("normal_exchange","normal02","消息来着ttl为40s的队列:" + message);
    }

    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的TTL消息给normal03队列:{}", new Date(),ttlTime,message);
        rabbitTemplate.convertAndSend("normal_exchange","normal03",message,msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

}

 (4)进入项目的启动类重新启动项目

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

(5)启动完毕后分别在浏览器地址栏输http://localhost:8080/ttl/sendExpirationMsg/第一个参数/20000和http://localhost:8080/ttl/sendExpirationMsg/第二个参数/2000队列发送消息

例:

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq

 (6)查看控制台的输出,发现第一条消息在20s后进行了输出,这证明我们优化后的延迟队列成功运行​,但当我们发送多条消息时,消息可能不会按时"死亡"从而不能按时把消息发送到死信队列,如图里的第二条消息,在第一条消息被消费后紧跟着被消费,而不是隔2秒后被消费,这是因为RabbitMQ只会检查第一条消息是否过期,过期则会被扔进死信队列,如果第一条消息延迟时间很长,第二条消息延迟时间很短,第二条消息也并不会被优先消费,而是等到第一条消息被消费后第二条消息再被消费,这时需要我们用另一种方法去实现延迟队列(另一种方法放在下一篇文章介绍)

mq实现延迟队列,rabbitmq,rabbitmq,java-rabbitmq文章来源地址https://www.toymoban.com/news/detail-726587.html

到了这里,关于RabbitMQ系列(17)--延迟队列的简介与实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(60)
  • RabbitMQ之TTL+死信队列实现延迟队列

    RabbitMQ是一个流行的消息队列系统,它提供了许多有用的功能,其中之一是TTL(Time To Live)和死信队列。这些功能可以用来实现延迟队列,让我们来看看如何使用它们。 首先,什么是TTL?TTL是消息的存活时间,它可以设置为一个特定的时间段。如果消息在这个时间段内没有被

    2024年02月13日
    浏览(46)
  • Springboot集成rabbitmq——实现延迟队列

    目录 1.rabbitmq简介 2.延迟队列 3.Springboot集成rabbitmq 4.以死信队列形式实现 5.以插件形式实现  MQ(message queue),从字面意思上看,本质是个队列,遵从先入先出的规则,只不过队列中存放的内容是 message 而已,是一种跨进程的通信机制,用于上下游传递消息。RabbitMq是开发中常用

    2024年02月05日
    浏览(41)
  • Rabbitmq入门与应用(五)-延迟队列的设计与实现

    在开发过程中涉及到延迟队列的应用,例如订单生成后有30分钟的付款时间,注册是有60秒的邮件或者短信的发送读取时间等。 常规使用rabbitmq设计延迟队列有两种方式 使用创建一个延迟队列阻塞消息 使用延迟队列插件 Dead Letter Exchanges — RabbitMQ 配置 To set the DLX for a queue, s

    2024年02月21日
    浏览(42)
  • rabbitMq实现延迟队列,阿里最爱考的前端面试题

    @Qualifier(“deadLetterExchange”) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 声明死信队列B绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier(“deadLetterQueueB”) Queue queue, @Qualifier(“deadLetterExchange”) DirectExchange exchange) { return BindingBuilder.

    2024年04月14日
    浏览(40)
  • SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

    这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。 在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭

    2024年02月12日
    浏览(43)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • 【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

    在电商平台下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 该如何实现? 定期轮询(数据库等) 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定

    2024年01月17日
    浏览(59)
  • MQ-消息队列-RabbitMQ

    MQ(Message Queue) 消息队列 ,是基础数据结构中“ 先进先出 ”的一种 数据结构 。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由

    2024年02月09日
    浏览(52)
  • 【mq】RabbitMq批量删除队列

    ​由于部分公司同事使用RabbitMq时,没有将Client设置为autodelete,导致大量冗余队列。其中这些队列又是无routekey队列,收到了批量的订阅消息,占用服务器内存。 ​如何将这些无用的队列删除成为一个问题?经过多次摸索,在rabbitmq management api里面找到了方案:

    2024年01月25日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包