RabbitMQ实现延时消息的两种方法

这篇具有很好参考价值的文章主要介绍了RabbitMQ实现延时消息的两种方法。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ实现延时消息的两种方法

1、死信队列

1.1消息什么时候变为死信(dead-letter)
  1. 消息被否定接收,消费者使用basic.reject 或者 basic.nack并且requeue 重回队列属性设为false。
  2. 消息在队列里得时间超过了该消息设置的过期时间(TTL)。
  3. 消息队列到达了它的最大长度,之后再收到的消息。
1.2死信队列的原理

当一个消息再队列里变为死信时,它会被重新publish到另一个exchange交换机上,这个exchange就为DLX。因此我们只需要在声明正常的业务队列时添加一个可选的"x-dead-letter-exchange"参数,值为死信交换机,死信就会被rabbitmq重新publish到配置的这个交换机上,我们接着监听这个交换机就可以了。

1.3 代码实现
  1. 引入amqp依赖
  2. 声明交换机,队列
package com.lank.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitmqConfig {

    //死信交换机,队列,路由相关配置
    public static final String DLK_EXCHANGE = "dlk.exchange";
    public static final String DLK_ROUTEKEY = "dlk.routeKey";
    public static final String DLK_QUEUE = "dlk.queue";

    //业务交换机,队列,路由相关配置
    public static final String DEMO_EXCHANGE = "demo.exchange";
    public static final String DEMO_QUEUE = "demo.queue";
    public static final String DEMO_ROUTEKEY = "demo.routeKey";

    //延时插件DelayedMessagePlugin的交换机,队列,路由相关配置
    public static final String DMP_EXCHANGE = "dmp.exchange";
    public static final String DMP_ROUTEKEY = "dmp.routeKey";
    public static final String DMP_QUEUE = "dmp.queue";

    @Bean
    public DirectExchange demoExchange(){
        return new DirectExchange(DEMO_EXCHANGE,true,false);
    }

    @Bean
    public Queue demoQueue(){
        //只需要在声明业务队列时添加x-dead-letter-exchange,值为死信交换机
        Map<String,Object> map = new HashMap<>(1);
        map.put("x-dead-letter-exchange",DLK_EXCHANGE);
        //该参数x-dead-letter-routing-key可以修改该死信的路由key,不设置则使用原消息的路由key
        map.put("x-dead-letter-routing-key",DLK_ROUTEKEY);
        return new Queue(DEMO_QUEUE,true,false,false,map);
    }

    @Bean
    public Binding demoBind(){
        return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTEKEY);
    }

    @Bean
    public DirectExchange dlkExchange(){
        return new DirectExchange(DLK_EXCHANGE,true,false);
    }

    @Bean
    public Queue dlkQueue(){
        return new Queue(DLK_QUEUE,true,false,false);
    }

    @Bean
    public Binding dlkBind(){
        return BindingBuilder.bind(dlkQueue()).to(dlkExchange()).with(DLK_ROUTEKEY);
    }


    //延迟插件使用
    //1、声明一个类型为x-delayed-message的交换机
    //2、参数添加一个x-delayed-type值为交换机的类型用于路由key的映射
    @Bean
    public CustomExchange dmpExchange(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DMP_EXCHANGE,"x-delayed-message",true,false,arguments);
    }

    @Bean
    public Queue dmpQueue(){
        return new Queue(DMP_QUEUE,true,false,false);
    }

    @Bean
    public Binding dmpBind(){
        return BindingBuilder.bind(dmpQueue()).to(dmpExchange()).with(DMP_ROUTEKEY).noargs();
    }
}
  1. 声明一个类用于发送带过期时间的消息
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 1. @author lank
 2. @since 2020/12/14 10:33
 */
@Component
@Slf4j
public class MessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //使用死信队列发送消息方法封装
    public void send(String message,Integer time){
        String ttl = String.valueOf(time*1000);
        //exchange和routingKey都为业务的就可以,只需要设置消息的过期时间
        rabbitTemplate.convertAndSend(RabbitmqConfig.DEMO_EXCHANGE, RabbitmqConfig.DEMO_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置消息的过期时间,是以毫秒为单位的
                message.getMessageProperties().setExpiration(ttl);
                return message;
            }
        });
        log.info("使用死信队列消息:{}发送成功,过期时间:{}秒。",message,time);
    }

    //使用延迟插件发送消息方法封装
    public void send2(String message,Integer time){
        rabbitTemplate.convertAndSend(RabbitmqConfig.DMP_EXCHANGE, RabbitmqConfig.DMP_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
            //使用延迟插件只需要在消息的header中添加x-delay属性,值为过期时间,单位毫秒
                message.getMessageProperties().setHeader("x-delay",time*1000);
                return message;
            }
        });
        log.info("使用延迟插件发送消息:{}发送成功,过期时间:{}秒。",message,time);
    }
}
  1. 编写一个类用于消费消息
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageReceiver {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DLK_QUEUE)
    public void onMessage(Message message){
        log.info("使用死信队列,收到消息:{}",new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DMP_QUEUE)
    public void onMessage2(Message message){
        log.info("使用延迟插件,收到消息:{}",new String(message.getBody()));
    }
}
  1. 编写Controller调用发送消息方法测试结果
package com.lank.demo.controller;
import com.lank.demo.rabbitmq.MessageSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    public MessageSender messageSender;

    //死信队列controller
    @GetMapping("/send")
    public String send(@RequestParam String msg,Integer time){
        messageSender.send(msg,time);
        return "ok";
    }

    //延迟插件controller
    @GetMapping("/send2")
    public String sendByPlugin(@RequestParam String msg,Integer time){
        messageSender.send2(msg,time);
        return "ok";
    }

}
  1. 配置文件application.properties
server.port=4399
#virtual-host使用默认的/就好,如果需要/demo需自己在控制台添加
spring.rabbitmq.virtual-host=/demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  1. 启动项目,打开rabbitmq控制台,可以看到交换机和队列已经创建好。

RabbitMQ实现延时消息的两种方法

RabbitMQ实现延时消息的两种方法

  1. 在浏览器中请求http://localhost:4399/send?msg=hello&time=5,从控制台的输出来看,刚好5s后接收到消息。
2020-12-16 22:47:28.071  INFO 13304 --- [nio-4399-exec-1] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信队列消息:hello发送成功,过期时间:5秒。
2020-12-16 22:47:33.145  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信队列,收到消息:hello
1.4死信队列的一个小注意点

当我往死信队列中发送两条不同过期时间的消息时,如果先发送的消息A的过期时间大于后发送的消息B的过期时间时,由于消息的顺序消费,消息B过期后并不会立即重新publish到死信交换机,而是会等到消息A过期后一起被消费。

依次发送两个请求http://localhost:4399/send?msg=消息A&time=30和http://localhost:4399/send?msg=消息B&time=10,消息A先发送,过期时间30S,消息B后发送,过期时间10S,我们想要的结果应该是10S收到消息B,30S后收到消息A,但结果并不是,控制台输出如下:

2020-12-16 22:54:47.339  INFO 13304 --- [nio-4399-exec-5] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信队列消息:消息A发送成功,过期时间:30秒。
2020-12-16 22:54:54.278  INFO 13304 --- [nio-4399-exec-6] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信队列消息:消息B发送成功,过期时间:10秒。
2020-12-16 22:55:17.356  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信队列,收到消息:消息A
2020-12-16 22:55:17.357  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信队列,收到消息:消息B

消息A30S后被成功消费,紧接着消息B被消费。因此当我们使用死信队列时应该注意是否消息的过期时间都是一样的,比如订单超过10分钟未支付修改其状态。如果当一个队列各个消息的过期时间不一致时,使用死信队列就可能达不到延时的作用。这时候我们可以使用延时插件来实现这需求。

2 、延时插件

RabbitMQ Delayed Message Plugin是一个rabbitmq的插件,所以使用前需要安装它,可以参考的GitHub地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

2.1如何实现

  1. 安装好插件后只需要声明一个类型type为"x-delayed-message"的exchange,并且在其可选参数下配置一个key为"x-delayed-typ",值为交换机类型(topic/direct/fanout)的属性。
  2. 声明一个队列绑定到该交换机
  3. 在发送消息的时候消息的header里添加一个key为"x-delay",值为过期时间的属性,单位毫秒。
  4. 代码就在上面,配置类为DMP开头的,发送消息的方法为send2()。
  5. 启动后在rabbitmq控制台可以看到一个类型为x-delayed-message的交换机。

RabbitMQ实现延时消息的两种方法

RabbitMQ实现延时消息的两种方法文章来源地址https://www.toymoban.com/news/detail-494836.html

  1. 继续在浏览器中发送两个请求http://localhost:4399/send2?msg=消息A&time=30和http://localhost:4399/send2?msg=消息B&time=10,控制台输出如下,不会出现死信队列出现的问题:
2020-12-16 23:31:19.819  INFO 13304 --- [nio-4399-exec-9] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延迟插件发送消息:消息A发送成功,过期时间:30秒。
2020-12-16 23:31:27.673  INFO 13304 --- [io-4399-exec-10] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延迟插件发送消息:消息B发送成功,过期时间:10秒。
2020-12-16 23:31:37.833  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延迟插件,收到消息:消息B
2020-12-16 23:31:49.917  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延迟插件,收到消息:消息A

到了这里,关于RabbitMQ实现延时消息的两种方法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMq应用延时消息

    一.建立绑定关系 二.建立生产者 1.消息实体 三.建立消费者 四.测试类测试 五.效果如图所示

    2024年02月12日
    浏览(37)
  • Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息

    erlang 和 rabbitmq 版本说明 https://www.rabbitmq.com/which-erlang.html 确认需要安装的mq版本以及对应的erlang版本。 RabbitMQ下载地址: https://packagecloud.io/rabbitmq/rabbitmq-server Erlang下载地址: https://packagecloud.io/rabbitmq/erlang RabbitMQ延迟消息插件下载 https://github.com/rabbitmq/rabbitmq-delayed-message-exc

    2024年02月08日
    浏览(51)
  • Moqui REST API的两种实现方法

    实现Restful API的方法  实现REST API有两种方法。 第一种: The main tool for building a REST API based on internal services and entity operations is to define resource paths in a  Service REST API  XML file such as the  moqui.rest.xml  file in  moqui-framework  and the  mantle.rest.xml  file in  mantle-usl . With your own Service RES

    2024年02月02日
    浏览(35)
  • 折半查找(二分查找)的两种方法及实现 Python

    概念: 在计算机科学中,折半查找,也称二分查找,是一种在有序数组中查找某一特定元素的搜索算法。 搜索过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜索过程结束;如果某一特定元素大于或者小于中间元素,则在数组大于或小于中间元素的那一

    2024年02月09日
    浏览(41)
  • css实现垂直上下布局的两种常用方法

    例子:将两个 span 元素在 div 内垂直居中放置. 方法一:使用 Flexbox 来实现。 在下面的示例中,我将为 div 元素添加样式,使其成为一个 Flex 容器,并使用 Flexbox 属性将其中的两个 span 元素垂直居中: 在上述代码中,我们为 元素添加了名为 .container 的样式类,并为两个 元素分

    2024年02月10日
    浏览(35)
  • js 实现弹幕效果的两种方法

    首先,在HTML文件中创建画布元素,并在CSS中设置其位置和大小: 接着,在JavaScript中获取画布元素并创建画布上下文对象: 然后,定义一个弹幕类,包含文本、颜色、字体大小、位置和速度等属性: 在弹幕类中添加绘制弹幕的方法: 然后,在页面加载完毕后,创建一个弹幕

    2024年02月16日
    浏览(34)
  • vue项目实现回到顶部的两种超简单方法

    vue 中实现回到顶部的两种方式: (1)锚点方式 通过点击锚点回到指定位置: 样式: 实现效果: (2)scrollTop 通过点击事件将scrollTop重置为0,从而达到返回顶部的效果。 代码资源链接 代码地址

    2024年02月11日
    浏览(41)
  • Kafka - 消息队列的两种模式

    消息队列确实可以根据消息传递的模式分为 点对点模式 发布/订阅模式 这两种模式有不同的特点和应用场景: 点对点模式(Point-to-Point,P2P): 在点对点模式中,有一个生产者(Producer)将消息发送到一个特定的队列(Queue)。 只有一个消费者(Consumer)可以接收和处理队列

    2024年02月08日
    浏览(38)
  • Inno Setup实现软件开机自动运行的两种方法

    Inno Setup实现软件开机自动启动的两种方法 在许多情况下,我们希望我们的软件能够在操作系统启动时自动启动。对于 Windows 操作系统,可以通过将程序添加到启动组或在注册表的 Run 项中创建值来实现此目的。 有两种不同的方法可以做到这一点: 方法一:在启动组中创建快

    2024年02月06日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包