RabbitMQ延时队列的详细介绍以及Java代码实现

这篇具有很好参考价值的文章主要介绍了RabbitMQ延时队列的详细介绍以及Java代码实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的延时队列以及其详细代码实现。
如果文章有什么需要改进的地方还请大佬不吝赐教👏👏。
小威在此先感谢各位大佬啦~~🤞🤞
RabbitMQ延时队列的详细介绍以及Java代码实现

🏠个人主页:小威要向诸佬学习呀
🧑个人简介:大家好,我是小威,一个想要与大家共同进步的男人😉😉
目前状况🎉:24届毕业生,在一家满意的公司实习👏👏

💕欢迎大家:这里是CSDN,我总结知识的地方,欢迎来到我的博客,我亲爱的大佬😘

RabbitMQ延时队列的详细介绍以及Java代码实现

以下正文开始

RabbitMQ延时队列的详细介绍以及Java代码实现

🍣RabbitMQ 延时队列介绍

RabbitMQ 延时队列是指消息在发送到队列后,并不立即被消费者消费,而是等待一段时间后再被消费者消费。这种队列通常用于实现定时任务,例如,订单超时未支付系统取消订单释放所占库存等。

RabbitMQ实现延时队列的方法有多种,其中比较常见的是使用插件或者通过DLX(Dead Letter Exchange)机制实现。

  1. 使用插件实现延时队列

RabbitMQ提供了rabbitmq_delayed_message_exchange插件,可以通过该插件实现延时队列。该插件的原理是在消息发送时,将消息发送到一个特定的Exchange中,然后该Exchange会根据消息中的延时时间将消息转发到指定的队列中,从而实现延时队列的功能

使用该插件需要先安装插件,然后创建一个Exchange,并将该Exchange的类型设置为x-delayed-message,然后将该Exchange与队列绑定即可。

  1. 使用DLX机制实现延时队列

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。而对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的 设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队 列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果

DLX机制是RabbitMQ提供的一种消息转发机制,它可以将无法被处理的消息转发到指定的Exchange中,从而实现消息的延时处理。具体实现步骤如下:

  • 创建一个普通的Exchange和Queue,并将它们绑定在一起。
  • 创建一个DLX Exchange,并将普通Exchange绑定到该DLX Exchange上。
  • 将Queue设置为具有TTL(Time To Live)属性,并设置消息过期时间。
  • 将Queue绑定到DLX Exchange上。

当消息过期后,会被发送到DLX Exchange中,然后再由DLX Exchange将消息转发到指定的Exchange中,从而实现延时队列的功能。

使用DLX机制实现延时队列的优点是不需要安装额外的插件,但是需要对消息的过期时间进行精确控制,否则可能会出现消息过期时间不准确的情况。

🥪Java语言设置延时队列

下面是使用 Java 语言通过 RabbitMQ 设置延时队列的步骤:

  1. 安装插件

首先,需要安装 rabbitmq_delayed_message_exchange 插件。可以通过以下命令安装:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 创建延时交换机

延时队列需要使用延时交换机。可以使用 x-delayed-message 类型创建一个延时交换机。以下是创建延时交换机的示例代码:

Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);
  1. 创建延时队列

创建延时队列时,需要将队列绑定到延时交换机上,并设置队列的 TTL(Time To Live)参数。以下是创建延时队列的示例代码:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "delayed-exchange");
args.put("x-dead-letter-routing-key", "delayed-queue");
args.put("x-message-ttl", 5000);
channel.queueDeclare("delayed-queue", true, false, false, args);
channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");

在上述代码中,将队列绑定到延时交换机上,并设置了队列的 TTL 参数为 5000 毫秒,即消息在发送到队列后,如果在 5000 毫秒内没有被消费者消费,则会被转发到 delayed-exchange 交换机上,并发送到 delayed-queue 队列中。

  1. 发送延时消息

发送延时消息时,需要设置消息的 expiration 属性,该属性表示消息的过期时间。以下是发送延时消息的示例代码:

Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .headers(headers)
        .expiration("5000")
        .build();
channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());

在上述代码中,设置了消息的 expiration 属性为 5000 毫秒,并将消息发送到 delayed-exchange 交换机上,路由键为 delayed-queue,消息内容为 “Hello, delayed queue!”。

  1. 消费延时消息

消费延时消息时,需要设置消费者的 QOS(Quality of Service)参数,以控制消费者的并发处理能力。以下是消费延时消息的示例代码:

channel.basicQos(1);
channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("Received message: " + message);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});

在上述代码中,设置了 QOS 参数为 1,即每次只处理一个消息。然后使用 basicConsume 方法消费 delayed-queue 队列中的消息,并在消费完成后,使用 basicAck 方法确认消息已被消费。

通过上述步骤,就可以实现 RabbitMQ 延时队列,用于实现定时任务等功能。

RabbitMQ延时队列是一种常见的消息队列应用场景,它可以在消息发送后指定一定的时间后才能被消费者消费,通常用于实现一些延时任务,例如订单超时未支付自动取消等。

🥓RabbitMQ延时队列具体代码

下面是具体代码(附注释):

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DelayedQueueExample {
    private static final String EXCHANGE_NAME = "delayed_exchange";
    private static final String QUEUE_NAME = "delayed_queue";
    private static final String ROUTING_KEY = "delayed_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /*
         Exchange.DeclareOk exchangeDeclare(String exchange,
                                              String type,
                                              boolean durable,
                                              boolean autoDelete,
                                              boolean internal,
                                              Map<String, Object> arguments) throws IOException;
                                              */
        // 创建一个支持延时队列的Exchange
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);

        // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
        Map<String, Object> queueArguments = new HashMap<>();
        queueArguments.put("x-dead-letter-exchange", "");
        queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
        queueArguments.put("x-message-ttl", 5000);
        channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        // 发送消息到延时队列中,设置expiration参数
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000")
                .build();
        String message = "Hello, delayed queue!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
        System.out.println("Sent message to delayed queue: " + message);
        channel.close();
        connection.close();
    }
}

在上面的代码中,我们创建了一个支持延时队列的Exchange,并创建了一个延时队列,设置了x-dead-letter-exchange和x-dead-letter-routing-key参数。然后,我们发送了一条消息到延时队列中,设置了expiration参数,表示这条消息延时10秒后才能被消费。

注意,如果我们想要消费延时队列中的消息,需要创建一个消费者,并监听这个队列。当消息被消费时,需要发送ack确认消息已经被消费,否则消息会一直留在队列中。

🍿图书推荐

Java诞生28年来,这本享誉全球的 Java 经典著作《Core Java》一路伴随着 Java 的成长,得到了百万 Java 开发者的青睐,成为一本畅销不衰的Java经典图书,影响了几代技术人。
最新版中文版《Java核心技术(原书第12版)经全面修订,以涵盖Java 17的新特性。新版延续之前版本的优良传统,用数百个实际的工程案例,全面系统地讲解了Java语言的核心概念、语法、 重要特性、 开发方法。

着力让读者在充分理解Java语言和Java类库的基础上,灵活应用Java提供的高级特性,具体包括面向对象程序设计、反射与代理、接口与内部类、异常处理、泛型程序设计、集合框架、事件监听器模型、图形用户界面设计和并发。

Core Java最新版卷Ⅱ现已上市

Java 之父先前也说,开发者应尽快弃用 JDK 8,可以选择 JDK 17 长期支持版本。针对 Java 17 新特性全面更新的《Core Java》最新版第12版中文版《Java核心技术·卷Ⅰ开发基础(原书第12版)》自去年5月上市以来,一经发布就引起了轰动,得到数万读者的高度关注 ,大家纷纷留言都在盼望卷Ⅱ的上市!
RabbitMQ延时队列的详细介绍以及Java代码实现

对经验丰富的程序员来说,如果希望为实际应用编写出健壮的代码,那么《Java核心技术》绝对是一本业内领先的、言简意赅的宝典。如今,它终于来啦!《Java核心技术·卷Ⅱ 高级特性(原书第12版》现已上市,各大渠道均已现货。

卷Ⅱ针对Java 17的新特性和改进进行了修订。与以往一样,所有的章节都做了全面更新,移除了过时的内容,并且详细讨论了各种新API。

卷Ⅰ、卷Ⅱ有何不同?

RabbitMQ延时队列的详细介绍以及Java代码实现

如何阅读《Java核心技术》

学完本书,你将成为一个真正的 Java 程序员。本书不仅让你深入了解设计和实现 Java 应用涉及的所有基础知识和 Java 特性,还会帮助你掌握开发 Java 程序所需的全部基本技能。相信在学习Java的道路上有了本书的辅助,你的学习一定可以做到事半功倍。
从未远离工业界的Java大神带你学

50位行业专家、技术媒体赞誉推荐

RabbitMQ延时队列的详细介绍以及Java代码实现

如何选择版本

RabbitMQ延时队列的详细介绍以及Java代码实现

京东购买链接:点击跳转界面

粉丝福利:评论区任意留言可参与活动抽奖(可评论最多五条,抽取四名欧皇)

注:为粉丝更精准地获取更多利益,中奖的欧皇可从书单中任选一本适合自己的书籍哦~

好了,本篇文章就先分享到这里了,后续会继续分享其他方面的知识,感谢大佬认真读完支持咯~
RabbitMQ延时队列的详细介绍以及Java代码实现

文章到这里就结束了,如果有什么疑问的地方请指出,诸佬们一起讨论😁
希望能和诸佬们一起努力,今后我们顶峰相见🍻
再次感谢各位小伙伴儿们的支持🤞

RabbitMQ延时队列的详细介绍以及Java代码实现文章来源地址https://www.toymoban.com/news/detail-427773.html

到了这里,关于RabbitMQ延时队列的详细介绍以及Java代码实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot + RabbitMQ从延时队列中删除指定的值【RabbitMQ中的basicAck和basicNack的区别以及basicReject又是什么?】

    业务需求是,就是我本来是有一个order-queue队列绑定到了死信队列交换机order-dead-direct-exchange上,然后我的业务是,现在有一个用户下单但是没有付款,order-queue队列写入该条信息并计时24小时后如果用户还是未付款状态则移除到死信队列order-dead-queue中。问题来了,如果在这个

    2024年02月16日
    浏览(42)
  • RabbitMQ延时队列的实现原理和应用实例

    TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。 目前有两种方法可以设置消息的 TTL: 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间; 第二种方法是对消息本身进行单独设置,每条消息

    2024年02月05日
    浏览(58)
  • 一种多策略下RabbitMQ的延时队列实现

    场景: 最近在开发一款系统中遇到这样一个场景,A系统开通套餐需要把套餐信息以邮件的形式发送给相关工作人员,经过人工审核通过后,在B系统里面开通,A系统会调B系统套餐列表接口查询套餐是否开通成功,开通成功则从A系统去完成订单,假如超过设定时间未开通成功,则关闭订

    2024年02月12日
    浏览(42)
  • RabbitMQ - 死信队列,延时队列

    死信队列: DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

    2024年02月09日
    浏览(48)
  • Java实现Redis延时队列

    “如何实现Redis延时队列”这个面试题应该也是比较常见的,解答如下: 使用sortedset(有序集合) ,拿时间戳作为 score ,消息内容作为key 调用 zadd 来生产消息,消费者用zrangebyscore 指令获取 N 秒之前的数据轮询进行处理。 Java实现Redis延时队列,首先要了解何为延时队列,即

    2024年02月20日
    浏览(43)
  • MQ消息队列,以及RabbitMQ详细(中1)五种rabbitMQ实用模型

    书接上文,展示一下五种模型我使用的是spring could 微服务的框架 文章说明:         本文章我会分享总结5种实用的rabbitMQ的实用模型 1、hello world简单模型 2、work queues工作队列 3、Publish/Subscribe发布订阅模型 4、Routing路由模型 5、Topics 主题模型 (赠送) 6、消息转换器 Rabbi

    2024年02月05日
    浏览(56)
  • 【技术分享】四、RabbitMQ “延时队列”

    延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。 ex: 定时任务:十分钟后执行某种操作。 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。

    2024年02月08日
    浏览(53)
  • .NET中使用RabbitMQ延时队列和死信队列

    延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。 延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果

    2024年02月15日
    浏览(44)
  • 深入浅出RabbitMQ:顺序消费、死信队列和延时队列

    大家好,我是小❤,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。 上篇文章(应对流量高峰的利器——消息中间件)中,我们已经介绍了消息中间件的用途,主要用作:解耦、削峰、异步通信、应用解耦,并介绍了业界常

    2024年02月03日
    浏览(37)
  • Spring Boot进阶(63):「超详细」利用 Redis 实现高效延时队列:踩坑、优化、实践

            提到延时队列,相信各位同学并不会陌生,JDK原生提供了延时队列的使用,当然我们这里介绍的不是这种;在实际的项目中,如果我们有延时队列的场景,可以怎样去实现呢?举一个常见的例子,比如淘宝下单30分钟内,若没有支付,则自动取消订单,这该如何实现

    2024年02月07日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包