RabbitMQ之动态创建队列与绑定交换机和监听器

这篇具有很好参考价值的文章主要介绍了RabbitMQ之动态创建队列与绑定交换机和监听器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

为什么需要动态创建队列与绑定交换机?我在写项目的时候遇到这么个问题,我数据库中存在一个字段messageType指定为消息类型,消息类型存在三种,一种是通知类,一种是验证码类,一种是活动类。并且对应的,要将消息进行不同渠道的分发,还存在一个channelType,而他又存在QQ邮箱,手机短信、服务号三种不同的渠道。假如说我每增加一个渠道类型,我就必须再一次手动创建一个队列,那可太烦人了,并且还得新增一个新的监听器。而对应的,每个渠道商给的接口调用频率不同,如果我采用一个队列发送所有的渠道消息,这样一来,一旦某个服务被限流阻塞,那我核心服务就使用不了了。因此还需要对不同的渠道创建不同的队列进行消息隔离。这又面临一个问题,我怎么去动态的创建我自己的队列,不需要我每次都去手动新增队列、绑定队列?下面我说说自己的解决方案。当然,RabbitMQ的交换机的类型不同,对应的队列绑定方式也不太一样.

先复习复习,这些声明创建绑定方式

RabbitMQ中,声明队列以及绑定队列的几种方式

使用RabbitManager可视化图形界面创建、绑定队列

在queues中创建好队列

@rabbitlistener动态配置,消息队列,rabbitmq,ruby,分布式

在exchanges中创建好交换机

@rabbitlistener动态配置,消息队列,rabbitmq,ruby,分布式

在queues中,找到自己创建的队列,点进去,绑定上路由和交换机

@rabbitlistener动态配置,消息队列,rabbitmq,ruby,分布式
或者

在exchanges中,找到自己创建的交换机,点进去,绑定上路由和交换机

@rabbitlistener动态配置,消息队列,rabbitmq,ruby,分布式

反正无论你从哪进去,只要能将queue绑到交换机上,那就没问题。注意一些参数的选择。如果不理解可以看看我之前的文章。

写一个Config类,在Config类中声明队列绑定交换机

示例如下

/**  
* 将队列与交换机绑定  
*/  
@Configuration  
public class RabbitmqConfig {  
  
    //声明sms交换机  
    @Bean(RabbitmqConstant.SMS_EXCHANGE_NAME)  
    public Exchange SMS_EXCHANGE(){  
        //durable(true) 持久化,mq重启之后交换机还在  
        return ExchangeBuilder  
        .topicExchange(RabbitmqConstant.SMS_EXCHANGE_NAME)  
        .durable(true)  
        .build();  
    }  

    //声明sms队列  
    @Bean(RabbitmqConstant.SMS_QUEUE_NAME)  
    public Queue SMS_QUEUE(){  
        return new Queue(RabbitmqConstant.SMS_QUEUE_NAME);  
    }  


    //sms队列绑定sms交换机,指定routingKey  
    @Bean  
    public Binding BINDING_ROUTING_KEY_SMS(@Qualifier(RabbitmqConstant.SMS_QUEUE_NAME) Queue queue,  
    @Qualifier(RabbitmqConstant.SMS_EXCHANGE_NAME) Exchange exchange){  
        return BindingBuilder  
        .bind(queue)  
        .to(exchange)  
        .with(RabbitmqConstant.SMS_ROUTING_KET_FOR_REGISTER)  
        .noargs();  
    }  
}

直接在@RabbitListener中声明并且绑定队列

@RabbitListener(bindings = @QueueBinding(  
value = @Queue(value = "${spring.rabbitmq.queues}", durable = "true"),  
exchange = @Exchange(value = "${rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC),  
key = "${rabbitmq.routing.key}"  
))  
public void onMessage(Message message) {  
   //收到消息后处理...
}

RabbitAdmin的使用

RabbitAdmin 是 Spring AMQP 框架中的一个重要组件,用于简化和管理 RabbitMQ 的配置和操作。它提供了一种方便的方式来声明队列、交换机和绑定等 RabbitMQ 的对象,并使用这些对象进行消息的发送和接收。

RabbitAdmin 可以自动创建和绑定队列、交换机和绑定,这样可以在应用程序启动时自动配置 RabbitMQ 的相关对象,而不需要手动去创建和配置这些对象。

使用 RabbitAdmin,你可以通过编程的方式在应用程序中进行 RabbitMQ 的配置和操作,而不需要手动通过管理界面或命令行工具去创建和管理 RabbitMQ 的对象。

以下是 RabbitAdmin 的一些常见用法:

  1. 声明队列:使用 declareQueue() 方法来声明队列,可以设置队列的名称、是否持久化、是否独占等属性。
  2. 声明交换机:使用 declareExchange() 方法来声明交换机,可以设置交换机的类型、是否持久化等属性。
  3. 声明绑定:使用 declareBinding() 方法来声明绑定,将交换机和队列通过指定的路由键绑定在一起。
  4. 发送消息:使用 convertAndSend() 方法来向指定的交换机发送消息。
  5. 接收消息:使用 receiveAndConvert() 方法来接收消息,并将消息转换为指定的 Java 对象。

一个问题

我的一个项目,对不同的业务的发送队列不一样。按照前文所说,我需要手写9个@RabbitListener,我真的会吐。下面看看我是怎么解决的。

Spring SpEL

Spring SpEL(Spring Expression Language,Spring 表达式语言)是 Spring 框架中提供的一种强大的表达式语言。它在功能上比 EL(Expression Language)更为强大,并且可以在更多的场景中使用。

Spring SpEL 可以在 Spring 应用程序的配置文件中使用,用于定义和处理字符串模板、计算表达式和访问对象属性等。 SpEL 提供了一套丰富的语法和功能,使得在 Spring 配置文件或 Spring Boot 的注解中可以进行更灵活和动态的编程。如果兴趣可以去看看这篇文章的详解使用。

深入掌握Spring SpEL及其实战应用

我这里就说一些我会用的
。SpEl可以使用在注解上,可以动态的写一个表达式,并且解析。表达式就类似于#{1 + 1 }、
#{beanName.get()}方法调用,或者是读取配置文件#{‘${rabbimq.name}’}。表达式可以调用SpringIOC容器
中的Bean的方法,下面我就用到了。

RabbitAdmin 配合 SpringEl表达式 和 @RabbitListener 以及Spring多例Bean,动态创建队列并且声明队列监听Bean

第一步,配合SpringBean的生命周期中,@PostConsturct注解,这个注解会在Bean属性赋值之后调用。因此,我拿他来用RabbitAdmin初始化我的队列
工具类,负责从数据库中获取队列的一些拼接段,并且将其拼接好,放在集合中。

@Component("queueNameMappingUtils")  
public class QueueNameMappingUtils {  
    /**  
    * 下标(用于迭代groupIds位置)  
    */  
    private static Integer index = 0;  

    private static List<String> queueNames = getAllQueueNames();  

    /**  
    * 获取所有队列名称
    */  
    public static List<String> getAllQueueNames(){  
        List<String> queueNames = new ArrayList<>();  
        ChannelType[] channelTypes = ChannelType.values();  
        MessageType[] messageTypes = MessageType.values();  
        //将他们组合起来  
        for (ChannelType channelType : channelTypes) {  
            for (MessageType messageType : messageTypes) {  
                groupIds.add(channelType.getCodeEn() + "." + messageType.getCodeEn());  
            }  
        }  
        return queueNames;  
    }  
    /**  
    * 一个一个获取  
    */  
    public String get(){  
        return queueNames.get(index++);  
    }  
}
@Service  
@Slf4j  
public class RabbitMqReceiverInit{
    /**  
    * 上下文容器,用来获取实例  
    */  
    @Autowired  
    private ApplicationContext applicationContext;  

    /**  
    * 编程的方式绑定队列  
    */  
    @Autowired  
    private RabbitAdmin rabbitAdmin;  

    @Value("${message.push.rabbitmq.exchange.name}")  
    private String exchangeName;  
    @Value("${message.pusg.rabbitmq.queue.name.prefix}")  
    private String queuePrefix;  
    @Value("${message.push.rabbitmq.routing.key.prefix}")  
    private String routingKeyPrefix;  


    /**  
    * 获取得到所有的groupId  
    */  
    private static List<String> queueNames = QueueNameMappingUtils.getAllQueueNames();  

    /**  
    * 创建出对象,这个是在bean属性注入之后搞定的  
    */  
    @PostConstruct  
    public void init() {  
        for (int i = 0; i < queueNames.size(); i++) {  
            applicationContext.getBean(RabbitMqReceiver.class);  
        }  
        //申明交换机  
        rabbitAdmin.declareExchange(new TopicExchange(exchangeName, true, false));  
        //开始生产队列  
        for (String s : queueNames) {  
        //队列名称  
        String queueName = queuePrefix + "." + s;  
        //路由名称  
        String routingKey = routingKeyPrefix + "." + s;  
        //申明队列  
        rabbitAdmin.declareQueue(new Queue(queueName, true));  
        //绑定上交换机  
        rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null));  
        }  
    }  

    @Bean  
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {  
        return new RabbitAdmin(connectionFactory);  
    }  
  
}

第二,利用Bean的方法,以及下标偏移,不断获取下一个组装好的队列名称,并且通过SpringSpEl不断访问,最终成功创建多个监听器,并且监听的是不同的队列文章来源地址https://www.toymoban.com/news/detail-773212.html

@Component  
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
public class RabbitMqReceiver {  
    @RabbitListener(queues = "#{'${message.push.rabbitmq.queue.name.prefix}'+'.'+ queueNameMappingUtils.get()}")  
    public void onMessage(Message message) {  
        //处理消息...
    }  
}

到了这里,关于RabbitMQ之动态创建队列与绑定交换机和监听器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息队列-RabbitMQ:Exchanges、绑定 bindings以及3大常用交换机(Fanout exchange、Direct exchange、Topics exchange)

    RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上, 通常生产者甚至都不知道这些消息传递传递到了哪些队列中 。 相反, 生产者只能将消息发送到交换机 (exchange) , 交换机工作 的内容非常简单, 一方面它接收来自生产者的消息 , 另一

    2024年04月08日
    浏览(50)
  • RabbitMQ交换机与队列

    RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反, 生产者只能将消息发送到交换机(exchange) ,交换机工作的内容非常简单, 一方面它接收来自生产者的消息,另一方面

    2024年01月24日
    浏览(33)
  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(56)
  • rabbitmq默认交换机锁绑定的routingkey-待研究

    例如这个是我的一个消息队列,它默认绑定的交换机是 什么类型呢? 看到这个图,感觉应该是一个默认的交换机,因为是default exchange 于是来到交换机来看看其他默认的交换机: 这里可以看到默认的交换机是direct(应该没看错吧) 但是默认的交换机不应该是有routingKey吗? 那

    2024年02月05日
    浏览(31)
  • RabbitMQ队列及交换机的使用

    目录 一、简单模型 1、首先控制台创建一个队列 2、父工程导入依赖  3、生产者配置文件  4、写测试类 5、消费者配置文件 6、消费者接收消息 二、WorkQueues模型 1、在控制台创建一个新的队列 2、生产者生产消息 3、创建两个消费者接收消息 4、能者多劳充分利用每一个消费者

    2024年02月04日
    浏览(42)
  • RabbitMQ-死信交换机和死信队列

    DLX: Dead-Letter-Exchange 死信交换器,死信邮箱 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 如下图所示: 其实死信队列就是一个普通的交换机,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理

    2024年02月08日
    浏览(44)
  • 思科交换机端口动态、静态安全绑定案例

    1、该端口不是聚合端口 2、该端口是access模式 3、配置前要关闭该端口(shutdown) 违规操作:(没有加入端口安全的PC机做一下操作) 1、给没有加入到安全端口的PC进行IP地址配置(增加1次) 2、进行ping操作第一次增加5次,下一次增加4次 查看当前访问过得IP地址信息 Switch(

    2023年04月08日
    浏览(47)
  • 消息队列RabbitMQ.02.交换机的讲解与使用

    目录 RabbitMQ中交换机的基本概念与作用解析 交换机的作用: 交换机的类型: 直连交换机(Direct Exchange): 将消息路由到与消息中的路由键(Routing Key)完全匹配的队列。 主题交换机(Topic Exchange): 使用通配符匹配路由键,允许更灵活的消息路由。 扇形交换机(Fanout E

    2024年01月24日
    浏览(56)
  • .NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

    目录 一、安装mq 二、实操 1、简单模式 2、工作模式 3、fanout扇形模式(发布订阅) 4、direct路由模式也叫定向模式 5、topic主题模式也叫通配符模式(路由模式的一种) 6、header 参数匹配模式 7、延时队列(插件方式实现) 参考资料: 1、我的环境是使用VMware安装的Centos7系统。MQ部署

    2023年04月09日
    浏览(99)
  • RabbitMQ(一) - 基本结构、SpringBoot整合RabbitMQ、工作队列、发布订阅、直接、主题交换机模式

    Publisher : 生产者 Queue: 存储消息的容器队列; Consumer:消费者 Connection:消费者与消息服务的TCP连接 Channel:信道,是TCP里面的虚拟连接。例如:电缆相当于TCP,信道是一条独立光纤束,一条TCP连接上创建多少条信道是没有限制的。TCP一旦打开,就会出AMQP信道。无论是发布消息

    2024年02月14日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包