Rabbitmq入门与应用(五)-延迟队列的设计与实现

这篇具有很好参考价值的文章主要介绍了Rabbitmq入门与应用(五)-延迟队列的设计与实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

延迟队列设计

在开发过程中涉及到延迟队列的应用,例如订单生成后有30分钟的付款时间,注册是有60秒的邮件或者短信的发送读取时间等。
常规使用rabbitmq设计延迟队列有两种方式

  1. 使用创建一个延迟队列阻塞消息
  2. 使用延迟队列插件

Dead Letter Exchanges — RabbitMQ

Rabbitmq入门与应用(五)-延迟队列的设计与实现,rabbitmq,rabbitmq,ruby,分布式,java

Rabbitmq入门与应用(五)-延迟队列的设计与实现,rabbitmq,rabbitmq,ruby,分布式,java

配置

  1. To set the DLX for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host:
  2. You may also specify a routing key to use when the messages are being dead-lettered. If the routing key is not set, the message’s own routing keys are used. args.put("x-dead-letter-routing-key", “some-routing-key”);
package com.wnhz.mq.common.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DlxConfig {

    @Bean
    public Queue dlxQueue(){
        return new Queue("dlx_queue_test");
    }

    @Bean
    public DirectExchange dlxExchange(){
        return new DirectExchange("dlx_exchange_test");
    }

    @Bean
    public Binding dlxBinding(){
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
                .with("dlx_routing_key");
    }

    @Bean
    public Queue normalQueue(){
        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange", "dlx_exchange_test");
        map.put("x-dead-letter-routing-key","dlx_routing_key");
        return new Queue("normal_queue_test",true,false,false,map);
    }


    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange("normal_exchange_test");
    }

    @Bean
    public Binding normalBinding(){
        return BindingBuilder.bind(normalQueue()).to(normalExchange())
                .with("normal_routing_test");
    }

}
server:
  port: 10005

spring:
  application:
    name: book-consumer
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure, org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
  rabbitmq:
    host: 192.168.198.130
    port: 5672
    username: admin
    password: 123
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
logging:
  level:
    com.wnhz.mq.consumer: debug

生产者发送信息

Rabbitmq入门与应用(五)-延迟队列的设计与实现,rabbitmq,rabbitmq,ruby,分布式,java

    @Override
    public void delaySendMessage() {
        String uuid = UUID.randomUUID().toString();
        CorrelationData data = new CorrelationData(uuid);
        String msg = "hello delay";
        int delayTime =5000;

        rabbitTemplate.convertAndSend("normal_exchange_test", "normal_routing_test", msg,
                p -> {
                    p.getMessageProperties().setExpiration(String.valueOf(delayTime ));
                    return p;
                });

        log.debug("发送一条消息{},当前时间:{},延迟{}秒", msg, new Date(), delayTime / 1000);
    }
}

消费者消费

   @RabbitListener(queues = "dlx_queue_test")
    public void delayConsume(Message message){
      log.debug("消费者消费信息:{},当前时间:{}",message.getBody(),new Date());
    }

延迟队列插件安装

访问官网

Community Plugins — RabbitMQ

Rabbitmq入门与应用(五)-延迟队列的设计与实现,rabbitmq,rabbitmq,ruby,分布式,java

Rabbitmq入门与应用(五)-延迟队列的设计与实现,rabbitmq,rabbitmq,ruby,分布式,java

进入rabbitmq docker容器

[root@localhost ~]# docker exec -it rabbitmq bash

查询插件列表是否存在延迟插件

root@6d2342d51b11:/plugins# rabbitmq-plugins list
root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@6d2342d51b11
 |/
[  ] rabbitmq_amqp1_0                  3.9.11
[  ] rabbitmq_auth_backend_cache       3.9.11
[  ] rabbitmq_auth_backend_http        3.9.11
[  ] rabbitmq_auth_backend_ldap        3.9.11
[  ] rabbitmq_auth_backend_oauth2      3.9.11
[  ] rabbitmq_auth_mechanism_ssl       3.9.11
[  ] rabbitmq_consistent_hash_exchange 3.9.11
[  ] rabbitmq_event_exchange           3.9.11
[  ] rabbitmq_federation               3.9.11
[  ] rabbitmq_federation_management    3.9.11
[  ] rabbitmq_jms_topic_exchange       3.9.11
[E*] rabbitmq_management               3.9.11
[e*] rabbitmq_management_agent         3.9.11
[  ] rabbitmq_mqtt                     3.9.11
[  ] rabbitmq_peer_discovery_aws       3.9.11
[  ] rabbitmq_peer_discovery_common    3.9.11
[  ] rabbitmq_peer_discovery_consul    3.9.11
[  ] rabbitmq_peer_discovery_etcd      3.9.11
[  ] rabbitmq_peer_discovery_k8s       3.9.11
[E*] rabbitmq_prometheus               3.9.11
[  ] rabbitmq_random_exchange          3.9.11
[  ] rabbitmq_recent_history_exchange  3.9.11
[  ] rabbitmq_sharding                 3.9.11
[  ] rabbitmq_shovel                   3.9.11
[  ] rabbitmq_shovel_management        3.9.11
[  ] rabbitmq_stomp                    3.9.11
[  ] rabbitmq_stream                   3.9.11
[  ] rabbitmq_stream_management        3.9.11
[  ] rabbitmq_top                      3.9.11
[  ] rabbitmq_tracing                  3.9.11
[  ] rabbitmq_trust_store              3.9.11
[e*] rabbitmq_web_dispatch             3.9.11
[  ] rabbitmq_web_mqtt                 3.9.11
[  ] rabbitmq_web_mqtt_examples        3.9.11
[  ] rabbitmq_web_stomp                3.9.11
[  ] rabbitmq_web_stomp_examples       3.9.11

下载支持3.9.x的插件

Rabbitmq入门与应用(五)-延迟队列的设计与实现,rabbitmq,rabbitmq,ruby,分布式,java

退出容器:

root@6d2342d51b11:/plugins# exit
exit

上传到linux服务器

在/usr/local/software/下创建文件夹rabbitmq/plugins

[root@localhost software]# mkdir -p rabbitmq/plugins
Rabbitmq入门与应用(五)-延迟队列的设计与实现,rabbitmq,rabbitmq,ruby,分布式,java

拷贝插件到容器中

[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins

进入容器安装插件

[root@localhost plugins]# docker  exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

打开管理页面

进入Exchange页面,下拉Type看是否已经安装成功。文章来源地址https://www.toymoban.com/news/detail-831926.html

Rabbitmq入门与应用(五)-延迟队列的设计与实现,rabbitmq,rabbitmq,ruby,分布式,java

代码实现

配置类
package com.wnhz.rabbitmq.mq.config;

public interface RabbitmqConstants {

    String DELAYX_QUEUE = "mq_delayx__queue";
    String DELAYX_ROUTING_KEY = "mq_delayx_routing_key";
    String DELAYX_EXCHANGE = "mq_delayx__exchange";
    String DELAYX_EXCHANGE_TYPE = "x-delayed-message";
}
package com.wnhz.rabbitmq.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;

import java.util.HashMap;

@Configuration
@Slf4j
public class RabbitmqConfig {


    @Bean
    public Queue delayxQueue() {
        return new Queue(RabbitmqConstants.DELAYX_QUEUE);
    }

    @Bean
    public CustomExchange delayRoutingExchange() {
        return new CustomExchange(RabbitmqConstants.DELAYX_EXCHANGE,
                RabbitmqConstants.DELAYX_EXCHANGE_TYPE,
                true,
                false,
                new HashMap<String, Object>() {{
                  put("x-delayed-type","direct");
                }});
    }

    @Bean
    public Binding delayxBinding() {
        return BindingBuilder.bind(delayxQueue())
                .to(delayRoutingExchange())
                .with(RabbitmqConstants.DELAYX_ROUTING_KEY).noargs();
    }


    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        log.debug("rabbitmq配置:{}完成", rabbitTemplate);
        return rabbitTemplate;
    }
}

生产者
@Service
@Slf4j
public class ProduceServiceImpl implements IProduceService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendDelayxUser(User user) {
        int delayTime = 10000;
        rabbitTemplate.convertAndSend(
                RabbitmqConstants.DELAYX_EXCHANGE,
                RabbitmqConstants.DELAYX_ROUTING_KEY,
                user, mpp -> {
                    mpp.getMessageProperties().setDelay(delayTime);
                    return mpp;
                });
        log.debug("发送消息:{},发送时间:{},延迟:{}秒", user,new Date(),delayTime/1000);
    }
}
消费者
@Slf4j
@Service
public class ConsumeServiceImpl implements IConsumeService {


    @RabbitListener(queues = RabbitmqConstants.DELAYX_QUEUE)
    @Override
    public void receiveDelayxUser(User user) {
      log.debug("消费者:接收到消息-->{},接收时间:{}",user,new Date());
    }
}

到了这里,关于Rabbitmq入门与应用(五)-延迟队列的设计与实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(60)
  • RabbitMQ之TTL+死信队列实现延迟队列

    RabbitMQ是一个流行的消息队列系统,它提供了许多有用的功能,其中之一是TTL(Time To Live)和死信队列。这些功能可以用来实现延迟队列,让我们来看看如何使用它们。 首先,什么是TTL?TTL是消息的存活时间,它可以设置为一个特定的时间段。如果消息在这个时间段内没有被

    2024年02月13日
    浏览(46)
  • RabbitMQ学习——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月13日
    浏览(77)
  • Springboot集成rabbitmq——实现延迟队列

    目录 1.rabbitmq简介 2.延迟队列 3.Springboot集成rabbitmq 4.以死信队列形式实现 5.以插件形式实现  MQ(message queue),从字面意思上看,本质是个队列,遵从先入先出的规则,只不过队列中存放的内容是 message 而已,是一种跨进程的通信机制,用于上下游传递消息。RabbitMq是开发中常用

    2024年02月05日
    浏览(40)
  • RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月10日
    浏览(58)
  • RabbitMQ系列(17)--延迟队列的简介与实现

    1、延迟队列的概念 延迟队列内部是有序的,重要的特性体现在它的延迟属性上,延迟队列中的元素希望在指定时间到了之后或之前取出处理,简单的说延迟队列就是用来存放需要在指定时间被处理的元素的队列。 2、延迟队列的应用场景 (1)订单指定时间内未支付则自动取消

    2024年02月07日
    浏览(33)
  • rabbitMq实现延迟队列,阿里最爱考的前端面试题

    @Qualifier(“deadLetterExchange”) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 声明死信队列B绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier(“deadLetterQueueB”) Queue queue, @Qualifier(“deadLetterExchange”) DirectExchange exchange) { return BindingBuilder.

    2024年04月14日
    浏览(40)
  • SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

    这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。 在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭

    2024年02月12日
    浏览(43)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • 【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

    在电商平台下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 该如何实现? 定期轮询(数据库等) 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定

    2024年01月17日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包