RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

这篇具有很好参考价值的文章主要介绍了RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件

RabbitMQ官网下载插件的网址:https://www.rabbitmq.com/community-plugins.html

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明) 

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmqrabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

 3、把这个插件传输到服务器上

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

4、根据官网的指示把插件放到RabbitMQ指定的文件夹下

RabbitMQ官网指示安装插件步骤的网址:https://www.rabbitmq.com/installing-plugins.html

我这里安装RabbitMQ的系统是CentOS,所以放在

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

5、拷贝插件到指定的目录下

例:

cp rabbitmq_delayed_message_exchange-3.10.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins/

效果图:

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

 6、安装延迟队列插件

输入以下命令安装延迟队列插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

效果图:

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

7、重启RabbitMQ

输入以下命令重启RabbitMQ

systemctl restart rabbitmq-server.service

效果图:

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

8、查看插件是否安装成功

 进入RabbitMQ的管理页面,进入Exchange的管理页面,新增Exchange,在Type里面可以看到x-delayed-message的选项,证明延迟队列插件安装成功rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

9、基于插件实现延迟队列的原理示意图

原先我们没下插件之前实现延迟队列是基于图下这种方式实现的

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

但我们下载插件后就能通过交换机延迟消息的方式来实现消息的延迟了(由步骤8可见,我们验证插件是否安装成功是从Exchange进去的,而不是从Queues进去的)

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

10、基于插件延迟队列的代码实现

(1)在config包里新建一个名为DelayedQueueConfig的类用于编写配置队列延迟的代码

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.springbootrqbbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedQueueConfig {

    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed_queue";

    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";

    //交换机
    public static final String DELAYED_ROUTING_KEY = "delayed";

    //声明延迟队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //声明延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置延迟类型
        arguments.put("x-delayed-type","direct");
        /**
         * 声明自定义交换机
         * 第一个参数:交换机的名称
         * 第二个参数:交换机的类型
         * 第三个参数:是否需要持久化
         * 第四个参数:是否自动删除
         * 第五个参数:其他参数
         */
        return new CustomExchange(DELAYED_QUEUE_NAME,"x-delayed-message",true,false,arguments);
    }

    //绑定队列和延迟交换机
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") Exchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}

 (2)在SendMsgController类里写一个接口,让其能往延迟队列里发送消息

代码如下:

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 发送延迟消息
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date(),message);
        rabbitTemplate.convertAndSend("normal_exchange","normal01","消息来着ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("normal_exchange","normal02","消息来着ttl为40s的队列:" + message);
    }

    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的TTL消息给normal03队列:{}", new Date(),ttlTime,message);
        rabbitTemplate.convertAndSend("normal_exchange","normal03",message,msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

    /**
     * 给延迟队列发送消息
     * @param message
     * @param delayTime
     */
    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的消息给延迟队列:{}", new Date(),delayTime,message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_QUEUE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }

}

(3)在consumer包里新建一个名为DelayQueueConsumer的类用于编写消费延迟队列的消费者代码

效果图:

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.springbootrqbbitmq.consumer;

import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 延迟队列消费者
 */
@Slf4j
@Component
public class DelayQueueConsumer {

    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    private void receiveDelayQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间{},收到延迟队列的消息",new Date(),msg);
    }

}

(4)启动项目,往浏览器输入接口地址和参数,从而调用接口

[1]第一条消息

http://localhost:8080/ttl/sendDelayMsg/我是第一条消息/20000

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

[2]第二条消息

http://localhost:8080/ttl/sendDelayMsg/我是第二条消息/2000

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

效果图:

rabbitmq 安装延时插件,rabbitmq,rabbitmq,java-rabbitmq

结论:基于测试发现在使用延迟插件的情况下,延迟时间短的消息会被先消费,这证明基于插件的延迟消息达到预期效果文章来源地址https://www.toymoban.com/news/detail-597116.html

到了这里,关于RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Docker版RabbitMQ安装延迟队列插件及延迟队列项目应用实战

    在项目中经常有延迟业务处理的背景,此时可以借助于Rabbitmq的延迟队列进行实现,但Rabbitmq本身并不支持延迟队列,但可以通过安装插件的方式实现延迟队列 首先确认目前项目使用的Rabbitmq的版本,这里博主的版本是3.9.15的。 访问 Rabbitmq的github网址,检索 delay 找到插件 rabb

    2024年02月02日
    浏览(41)
  • liunx+docker+rabbitmq安装延迟队列插件

    前言 在这篇文章中,我们将讨论如何在 Linux 系统上安装 Docker 和 RabbitMQ,并设置延迟队列。 Docker 是一个开放源代码的软件,它可以使应用程序的部署更加简单,而 RabbitMQ 是一个开放源代码的消息代理软件,它接受和转发消息。 延迟队列是一种在特定的延迟之后才开始处理

    2024年02月11日
    浏览(46)
  • Docker中为RabbitMQ安装rabbitmq_delayed_message_exchange延迟队列插件

    1、前言 rabbitmq_delayed_message_exchange是一款向RabbitMQ添加延迟消息传递(或计划消息传递)的插件。 插件下载地址:https://www.rabbitmq.com/community-plugins.html 1、下载插件 首先需要确定我们当前使用的RabbitMQ的版本,我们可以直接登录Web端的管理界面查看版本   也可以在RabbitMQ容器中

    2024年02月12日
    浏览(46)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(55)
  • RabbitMQ之TTL+死信队列实现延迟队列

    RabbitMQ是一个流行的消息队列系统,它提供了许多有用的功能,其中之一是TTL(Time To Live)和死信队列。这些功能可以用来实现延迟队列,让我们来看看如何使用它们。 首先,什么是TTL?TTL是消息的存活时间,它可以设置为一个特定的时间段。如果消息在这个时间段内没有被

    2024年02月13日
    浏览(45)
  • Springboot集成rabbitmq——实现延迟队列

    目录 1.rabbitmq简介 2.延迟队列 3.Springboot集成rabbitmq 4.以死信队列形式实现 5.以插件形式实现  MQ(message queue),从字面意思上看,本质是个队列,遵从先入先出的规则,只不过队列中存放的内容是 message 而已,是一种跨进程的通信机制,用于上下游传递消息。RabbitMq是开发中常用

    2024年02月05日
    浏览(39)
  • RabbitMQ+springboot用延迟插件实现延迟消息的发送

    延迟队列:其实就是死信队列中消息过期的特殊情况 延迟队列应用场景: 可以用死信队列来实现,不过死信队列要等上一个消息消费成功,才会进行下一个消息的消费,这时候就需要用到延迟插件了,不过要线在docker上装一个插件 前置条件是在Docker中部署过RabbitMq。 1、打开

    2024年02月10日
    浏览(48)
  • Rabbitmq入门与应用(五)-延迟队列的设计与实现

    在开发过程中涉及到延迟队列的应用,例如订单生成后有30分钟的付款时间,注册是有60秒的邮件或者短信的发送读取时间等。 常规使用rabbitmq设计延迟队列有两种方式 使用创建一个延迟队列阻塞消息 使用延迟队列插件 Dead Letter Exchanges — RabbitMQ 配置 To set the DLX for a queue, s

    2024年02月21日
    浏览(40)
  • rabbitMq实现延迟队列,阿里最爱考的前端面试题

    @Qualifier(“deadLetterExchange”) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 声明死信队列B绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier(“deadLetterQueueB”) Queue queue, @Qualifier(“deadLetterExchange”) DirectExchange exchange) { return BindingBuilder.

    2024年04月14日
    浏览(39)
  • SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

    这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。 在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭

    2024年02月12日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包