SpringBoot连接多个RabbitMQ

这篇具有很好参考价值的文章主要介绍了SpringBoot连接多个RabbitMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 前 言

在 SpringBoot 中整合单个 RabbitMQ 使用,是很简单的,只需要引入依赖,然后在配置里面配置好 MQ 的连接地址、账号、密码等信息,然后使用即可。但如果 MQ 的连接地址是多个,那这种连接方式就不奏效了。

前段时间,我开发的一个项目就遇到了这样的问题。那个项目,好几个关联方,每个关联方用的 MQ 的地址都不相同,也就意味着我这边要连接几个 RabbbitMQ 地址。SpringBoot 连接多个 RabbitMQ,怎么搞?

使用默认的连接方式是行不通的,我已经试过,而要实现 SpringBoot 连接多个 RabbitMQ,只能自定义重写一些东西,分别配置才可以,下面一起来走一下试试。

2. 重 写

首先要明确的是,下面的两个类是需要重写的:

  • RabbitTemplate:往队列里面丢消息时,需要用到
  • RabbitAdmin:声明队列、声明交换机、绑定队列和交换机用到

这里,我定义两个关联方,一个是 one,一个是 two,分别重写与它们的连接工厂。

2.1 重写与关联方one的连接工厂
package com.yuhuofei.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 重写与关联方one的连接工厂
 * @date 2022/10/3 16:57
 */
@Slf4j
@Configuration
public class OneMQConfig {


    @Value("${one.spring.rabbitmq.host}")
    private String host;

    @Value("${one.spring.rabbitmq.port}")
    private int port;

    @Value("${one.spring.rabbitmq.username}")
    private String username;

    @Value("${one.spring.rabbitmq.password}")
    private String password;

    @Value("${one.spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * 定义与one的连接工厂
     */
    @Bean(name = "oneConnectionFactory")
    @Primary
    public ConnectionFactory oneConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = "oneRabbitTemplate")
    @Primary
    public RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory);
        oneRabbitTemplate.setMandatory(true);
        oneRabbitTemplate.setConnectionFactory(connectionFactory);
        oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 确认消息送到交换机(Exchange)回调
             * @param correlationData
             * @param ack
             * @param cause
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("确认消息送到交换机(Exchange)结果:");
                log.info("相关数据:{}", correlationData);
                boolean ret = false;
                if (ack) {
                    log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());
                    //下面可自定义业务逻辑处理,如入库保存信息等
                    
                } else {
                    log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);
                    //下面可自定义业务逻辑处理,如入库保存信息等
                    
                }
            }
        });

        oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * 只要消息没有投递给指定的队列 就触发这个失败回调
             * @param message  投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 当时这个消息发给那个交换机
             * @param routingKey 当时这个消息用那个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //获取消息id
                String messageId = message.getMessageProperties().getMessageId();
                // 内容
                String result = null;
                try {
                    result = new String(message.getBody(), "UTF-8");
                } catch (Exception e) {
                    log.error("消息发送失败{}", e);
                }
                log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);
                //下面可自定义业务逻辑处理,如入库保存信息等
            }
        });
        return oneRabbitTemplate;
    }

    @Bean(name = "oneFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory,
                                                          SimpleRabbitListenerContainerFactoryConfigurer configurer) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "oneRabbitAdmin")
    @Primary
    public RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

}

2.2 重写与关联方two的连接工厂
package com.yuhuofei.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 重写与关联方two的连接工厂
 * @date 2022/10/3 17:52
 */
@Slf4j
@Configuration
public class TwoMQConfig {

    @Value("${two.spring.rabbitmq.host}")
    private String host;

    @Value("${two.spring.rabbitmq.port}")
    private int port;

    @Value("${two.spring.rabbitmq.username}")
    private String username;

    @Value("${two.spring.rabbitmq.password}")
    private String password;

    @Value("${two.spring.rabbitmq.virtualHost}")
    private String virtualHost;

    /**
     * 定义与two的连接工厂
     */
    @Bean(name = "twoConnectionFactory")
    public ConnectionFactory twoConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = "twoRabbitTemplate")
    public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory);
        twoRabbitTemplate.setMandatory(true);
        twoRabbitTemplate.setConnectionFactory(connectionFactory);
        twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 确认消息送到交换机(Exchange)回调
             * @param correlationData
             * @param ack
             * @param cause
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("确认消息送到交换机(Exchange)结果:");
                log.info("相关数据:{}", correlationData);
                boolean ret = false;
                if (ack) {
                    log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());
                    //下面可自定义业务逻辑处理,如入库保存信息等

                } else {
                    log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);
                    //下面可自定义业务逻辑处理,如入库保存信息等

                }
            }
        });

        twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * 只要消息没有投递给指定的队列 就触发这个失败回调
             * @param message  投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 当时这个消息发给那个交换机
             * @param routingKey 当时这个消息用那个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //获取消息id
                String messageId = message.getMessageProperties().getMessageId();
                // 内容
                String result = null;
                try {
                    result = new String(message.getBody(), "UTF-8");
                } catch (Exception e) {
                    log.error("消息发送失败{}", e);
                }
                log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);
                //下面可自定义业务逻辑处理,如入库保存信息等
            }
        });
        return twoRabbitTemplate;
    }

    @Bean(name = "twoFactory")
    public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory,
                                                           SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "twoRabbitAdmin")
    public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

2.3 创建队列及交换机并绑定
package com.yuhuofei.mq.config;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 创建队列、交换机并绑定
 * @date 2022/10/3 18:15
 */
public class QueueConfig {


    @Resource(name = "oneRabbitAdmin")
    private RabbitAdmin oneRabbitAdmin;

    @Resource(name = "twoRabbitAdmin")
    private RabbitAdmin twoRabbitAdmin;

    @Value("${one.out.queue}")
    private String oneOutQueue;

    @Value("${one.out.queue}")
    private String oneRoutingKey;

    @Value("${two.output.queue}")
    private String twoOutQueue;

    @Value("${two.output.queue}")
    private String twoRoutingKey;

    @Value("${one.topic.exchange.name}")
    private String oneTopicExchange;

    @Value("${two.topic.exchange.name}")
    private String twoTopicExchange;

    @PostConstruct
    public void oneRabbitInit() {
        //声明交换机
        oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false));
        //声明队列
        oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false));
        //绑定队列及交换机
        oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false))
                .to(new TopicExchange(oneTopicExchange, true, false))
                .with(oneRoutingKey));
    }

    @PostConstruct
    public void twoRabbitInit() {
        //声明交换机
        twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false));
        //声明队列
        twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true));
        //绑定队列及交换机
        twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false))
                .to(new TopicExchange(twoTopicExchange, true, false))
                .with(twoRoutingKey));
    }
}

2.4 配置信息

这里的配置信息,需要与各自的关联方约定好再配置

# 与关联方one的MQ配置
one.spring.rabbitmq.host=one.mq.com
one.spring.rabbitmq.port=5672
one.spring.rabbitmq.username=xxxxx
one.spring.rabbitmq.password=xxxxx
one.spring.rabbitmq.virtual-host=/xxxxx
one.out.queue=xxxaa.ssssd.cffs.xxxx
one.topic.exchange.name=oneTopExchange

# 与关联方two的MQ配置
two.spring.rabbitmq.host=two.mq.com
two.spring.rabbitmq.port=5672
two.spring.rabbitmq.username=aaaaaaa
two.spring.rabbitmq.password=aaaaaaa
two.spring.rabbitmq.virtualHost=/aaaaaaa
two.out.queue=ddddd.sssss.hhhhh.eeee
two.topic.exchange.name=twoTopExchange
2.5 注意点

在连接多个 MQ 的情况下,需要在某个连接加上 @Primary 注解(见 2.1 中的代码),表示主连接,默认使用这个连接,如果不加,服务会起不来

3. 使 用

3.1 作为消费者

由于在前面的 2.3 中,声明了队列及交换机,并进行了绑定,那么作为消费者,监听相应的队列,获取关联方发送的消息进行处理即可。这里用监听关联方 one 的出队列做展示,two 的类似。

需要注意的地方是,在监听队列时,需要指定 ContainerFactory。

package com.yuhuofei.mq.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 监听关联方one的消息
 * @date 2022/10/3 18:38
 */
@Slf4j
@Service
public class OneReceive {

    @RabbitListener(queues = "${one.out.queue}", containerFactory = "oneFactory")
    public void listenOne(Message message, Channel channel) {
        //获取MQ返回的数据
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("MQ返回的数据:{}", data);
        //下面进行业务逻辑处理
        
    }
}

3.1 作为生产者

使用之前重写的 RabbitTemplate ,向各个关联方指定的队列发送消息。文章来源地址https://www.toymoban.com/news/detail-597286.html

package com.yuhuofei.mq.service;

import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 向关联方的队列发送消息
 * @date 2022/10/3 18:47
 */
@Slf4j
@Service
public class SendMessage {

    @Resource(name = "oneRabbitTemplate")
    private RabbitTemplate oneRabbitTemplate;

    @Resource(name = "twoRabbitTemplate")
    private RabbitTemplate twoRabbitTemplate;

    public void sendToOneMessage(String messageId, OneMessageConverter message) {
        String exchange = message.getExchange();
        String routingKey = message.getRoutingKey();
        JsonObject data = message.getData();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message info = new Message(data.toString().getBytes(), messageProperties);
        info.getMessageProperties().setMessageId(messageId);
        oneRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));
    }

    public void sendToTwoMessage(String messageId, TwoMessageConverter message) {
        String exchange = message.getExchange();
        String routingKey = message.getRoutingKey();
        JsonObject data = message.getData();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message info = new Message(data.toString().getBytes(), messageProperties);
        info.getMessageProperties().setMessageId(messageId);
        twoRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));
    }
}

到了这里,关于SpringBoot连接多个RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ESP32连接MQ Sensor实现气味反应

    ESP32+MQTT+MySQL实现发布订阅【气味数据收集】 🔗 https://blog.csdn.net/ws15168689087/article/details/131627595 ESP32连接云服务器【WebSocket】 🔗 https://blog.csdn.net/ws15168689087/article/details/131406163 个人云服务器搭建MQTT服务器 🔗 https://blog.csdn.net/ws15168689087/article/details/131571433 ESP32开发板引脚介绍

    2024年02月16日
    浏览(36)
  • PHP聚合支付网站源码/对接十多个支付接口 第三方/第四方支付/系统源码

    PHP聚合支付网站源码/对接十多个支付接口 第三方/第四方支付/系统源码 内附数十个支付接口代码文件。 下载地址:https://bbs.csdn.net/topics/616764485  

    2024年02月11日
    浏览(48)
  • RabbitMQ详解(四):SpringBoot整合MQ

    需要创建两个springboot项目,一个springboot_rabbitmq_producer生产者,一个springboot_rabbitmq_consumer消费者 定义生产者 创建生产者工程 springboot_rabbitmq_producer pom.xml文件中添加依赖 application.yml文件配置 生产者代码 绑定关系,基于配置文件的形式 测试代码 启动测试,查看图形化管理界面

    2024年02月05日
    浏览(32)
  • springboot整合阿里大于并结合mq发送短信

    在 pom.xml 文件中添加以下依赖: 在 application.properties 文件中添加以下配置: 其中, accessKeyId 和 accessKeySecret 是阿里云控制台上的AccessKey, signName 是短信签名, templateCode 是短信模板ID。 在Spring Boot中,我们可以使用MQ来异步发送短信,提高系统的响应速度。这里以ActiveMQ为例

    2024年02月08日
    浏览(44)
  • 计算两个或多个向量之间的相关性(Matlab 实现)

    本文首次在公众号【零妖阁】上发表,为了方便阅读和分享,我们将在其他平台进行自动同步。由于不同平台的排版格式可能存在差异,为了避免影响阅读体验,建议如有排版问题,可前往公众号查看原文。感谢您的阅读和支持! 两个随机变量 x x x 、 y y y 的 Pearson 线性相关

    2024年02月04日
    浏览(37)
  • 小程序中实现两个或者多个小程序之间互相跳转

    前言:         小程序中实现两个或者多个小程序之间互相跳转,a小程序带参跳转到b小程序中。 点我 https://developers.weixin.qq.com/miniprogram/dev/api/navigate/wx.navigateToMiniProgram.html a小程序:app.json: //数组是其他小程序的appid b小程序中: app.json: //数组是其他小程序的appid 1、页面

    2023年04月09日
    浏览(39)
  • Matlab怎样合并两个矩阵(怎样合并多个矩阵)为一个矩阵

       Hi,科研大神,厌倦了某宝的Chat账号总是封号失联吗?需要稳定的单独账号吗?联系下方企鹅号走起来,都是科研人为大家做点有意义的事情,为您的科研助力~ 如果您需要稳定的-Chat哥婆特账号or图书文献资料- 请加企鹅号-都是科研人为大家科研助力~  

    2024年02月11日
    浏览(37)
  • Nginx分端口部署两个或多个项目(包含反向代理配置)

    Author:think 一、部署Nginx 若读者没有部署安装Nginx,则可以参考下面这篇文章进行安装。 CentOS 7非编译安装Nginx_think_mzs的博客-CSDN博客 二、分析Nginx配置文件 通过上面的方法安装的Nginx,其配置文件在 /etc/nginx/ 目录下,如下图所示。 其中 nginx.conf 为Nginx的主要配置文件,在 co

    2024年02月05日
    浏览(46)
  • git一套代码关联多个远程仓库(一行提交到两个仓库)

    因公司开发了自己的软件代码管理仓库项目,所以需要把之前在git上的项目代码同步到\\\"软件工厂\\\"的仓库中,但是也还没有完全弃用原来的git仓库,而且git 也用习惯了,所以我就考虑将代码提交的时候一次提交到两个远程仓库中。 通过该命令查看现有代码关联的仓库情况 通

    2024年02月05日
    浏览(66)
  • 如何安装多个版本的python,python可以装两个版本吗

    这篇文章主要介绍了可不可以在同一台计算机上安装多个python版本,具有一定借鉴价值,需要的朋友可以参考下。希望大家阅读完这篇文章后大有收获,下面让小编带着大家一起了解一下。 可以的,比如macOS和Linux系统都自带了Python2,你可以保留Python2(因为很多工具的执行还

    2024年02月07日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包