springboot实现rabbitmq动态创建交换机,队列以及交换机、队列绑定

这篇具有很好参考价值的文章主要介绍了springboot实现rabbitmq动态创建交换机,队列以及交换机、队列绑定。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

springboot实现rabbitmq动态创建交换机,队列以及交换机、队列绑定

1. 数据库准备

drop table if exists mq_config;

/*==============================================================*/
/* Table: mq_config                                             */
/*==============================================================*/
create table mq_config
(
   mq_id                varchar(200) not null comment '交换机id',
   exchange_type        char(1) comment '交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)',
   exchange_name        varchar(200) comment '交换机名称',
   queue_name           varchar(200) comment '队列名称',
   binding              varchar(200) comment '绑定关系',
   delay_type           char(1) comment '是否死信队列(0:是;1:否)',
   version              bigint(20) default 0 comment '乐观锁',
   del_flag             char(1) default '0' comment '删除标志(0:存在; 1:删除)',
   status               char(1) default '0' comment '记录状态(0:在用; 1:停用)',
   create_by            varchar(64) comment '创建人',
   create_time          timestamp(0) default CURRENT_TIMESTAMP comment '创建时间',
   update_by            varchar(64) comment '修改人',
   update_time          datetime(0) default CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment '修改时间',
   remark               varchar(500) comment '备注',
   primary key (mq_id)
);

alter table mq_config comment 'mq配置表';

2. 依赖引入

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- 连接池 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <!-- swagger依赖 -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-micro-spring-boot-starter</artifactId>
            <version>3.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-spring-boot-starter</artifactId>
            <version>3.0.3</version>
        </dependency>
        <!-- mybatis-plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3. yml配置

# 应用名称
server:
  port: 8080

spring:
  datasource:
    # 配置druid数据库连接池
    druid:
      #配置当前数据源类型
      type: com.alibaba.druid.pool.DruidDataSource
      # 配置MySQL的驱动程序类
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/schedule?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
      username: root
      password: root
      # 配置监控统计拦截的filters,stat是sql监控,wall是防火墙(如果不添加则监控无效),添加log4j需要引入jar包
      filters: stat,wall
      # 连接池最大活跃连接数
      max-active: 100
      # 连接池初始化连接数量
      initial-size: 1
      # 配置获取连接等待超时的时间
      max-wait: 60000
      # 连接池最小空闲数
      min-idle: 1
      # 指定空闲连接检查、废弃连接清理、空闲连接池大小调整之间的操作时间间隔
      time-between-eviction-runs-millis: 60000
      # 指定一个空闲连接最少空闲多久后可被清除
      min-evictable-idle-time-millis: 300000
      # 连接是否有效的查询语句
      validation-query: select 'x'
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
      #打开 PSCache,并且指定每个连接上 PSCache 的大小
      pool-prepared-statements: true
      max-open-prepared-statements: 50
      max-pool-prepared-statement-per-connection-size: 20
      # 配置 DruidStatFilter
      web-stat-filter:
        enabled: true #\u662F\u5426\u542F\u7528StatFilter\u9ED8\u8BA4\u503Ctrue
        # 排除一些不必要的url,比如.js,/jslib/等
        exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"
        # 过滤规则
        url-pattern: /*
      # 配置 DruidStatViewServlet
      stat-view-servlet:
        #手动重置监控数据
        enabled: true
        url-pattern: /druid/*
        # IP白名单,没有配置或者为空,则允许所有访问
        allow:
        #IP黑名单,若白名单也存在,则优先使用
        deny:
        # 配置druid登录用户名、密码
        login-username: admin
        login-password: admin
        # HTML 中 Reset All 按钮
        reset-enable: true
  rabbitmq:
    host: 10.168.1.200
    port: 5672
    virtual-host: test
    username: admin
    password: admin

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

4. 创建mq操作类

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqVo implements Serializable {

    private static final long serialVersionUID = -3630888028848412302L;

    /**
     * 交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)
     */
    @ApiModelProperty(value = "交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)")
    private String exchangeType;

    /**
     * 交换机名称
     */
    @ApiModelProperty(value = "交换机名称")
    private String exchangeName;

    /**
     * 队列名称
     */
    @ApiModelProperty(value = "队列名称")
    private String queueName;

    /**
     * 绑定关系
     */
    @ApiModelProperty(value = "绑定关系")
    private String binding;

    /**
     * 是否死信队列(0:是;1:否)
     */
    @ApiModelProperty(value = "是否死信队列(0:是;1:否)")
    private String delayType;

    /**
     * 操作类型(0:新增; 1:删除)
     */
    @ApiModelProperty(value = "操作类型(0:新增; 1:删除)")
    private int type;
}

5. mq操作方法

@Slf4j
@Component
public class MqUtils {

    /**
     * 获取工厂配置类
     */
    @Resource
    private ConnectionFactory connectionFactory;

    /**
     * 新增消息对列
     */
    public void mqOperate(MqVo mqVo) {

        //交换机类型
        String exchangeType = mqVo.getExchangeType();
        log.info("exchangeType -> {}", exchangeType);

        //队列名称
        String queueName = mqVo.getQueueName();
        log.info("queueName -> {}", queueName);

        //交换机名称
        String exchangeName = mqVo.getExchangeName();
        log.info("exchangeName -> {}", exchangeName);

        //绑定关系
        String binding = mqVo.getBinding();
        log.info("binding -> {}", binding);

        //是否死信队列(0:是;1:否)
        String delayType = mqVo.getDelayType();
        log.info("delayType -> {}", delayType);

        //操作类型
        int status = mqVo.getType();

        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        if (status == 0) {
            //新增队列
            rabbitAdmin.declareQueue(new Queue(queueName));

            //新增交换机
            rabbitAdmin.declareExchange(getExchange(exchangeType, exchangeName, delayType));

            //新增绑定关系
            rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, binding, null));
        } else {
            //删除队列
            rabbitAdmin.deleteQueue(queueName);
            //删除交换机
            rabbitAdmin.deleteExchange(exchangeName);
            //删除绑定关系
            rabbitAdmin.removeBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, binding, null));
        }
        close();
    }

    /**
     * 交换机生成方法
     * @param exchangeType 交换机类型
     * @param exchangeName 交换机名称
     * @param delayType 是否死信队列(0:是;1:否)
     * @return Exchange
     */
    private Exchange getExchange(String exchangeType, String exchangeName, String delayType) {
        Exchange exchange = new DirectExchange(exchangeName);
        String zero = "0";
        switch (exchangeType) {
            case "0":
                if (zero.equals(delayType)) {
                    Map<String, Object> map = new HashMap<>(1);
                    map.put("x-delayed-type", "direct");
                    exchange = new CustomExchange(exchangeName, "x-delayed-message", true, false, map);
                } else {
                    exchange = new DirectExchange(exchangeName);
                }
                break;
            case "1":
                if (zero.equals(delayType)) {
                    //待补充
                } else {
                    exchange = new TopicExchange(exchangeName);
                }
                break;
            case "2":
                if (zero.equals(delayType)) {
                    //待补充
                } else {
                    exchange = new FanoutExchange(exchangeName);
                }
                break;
            case "3":
                if (zero.equals(delayType)) {
                    //待补充
                } else {
                    exchange = new HeadersExchange(exchangeName);
                }
                break;
            default:
                break;
        }
        return exchange;
    }

    /**
     * 关闭连接
     */
    public void close(){

        try (Connection connection = connectionFactory.createConnection()){
            try (Channel channel = connection.createChannel(true)){
                com.rabbitmq.client.Connection connection1 = channel.getConnection();
                connection1.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

6. service接口

@Slf4j
@Service("mqConfigService")
public class MqConfigServiceImpl extends ServiceImpl<MqConfigDao, MqConfig> implements MqConfigService {

    @Resource
    private MqUtils mqUtils;

    @Override
    public boolean add(MqConfig mqConfig) {
        //写入数据库
        boolean save = save(mqConfig);
        //判断是否写入成功
        if (save) {
            //判断是否启用消息队列
            status(mqConfig);
        }
        return save;
    }

    @Override
    public boolean update(MqConfig mqConfig) {
        //查询数据库现存信息
        MqConfig byId = getById(mqConfig.getMqId());
        //修改数据
        boolean b = updateById(mqConfig);
        //判断是否修改成功
        if (b) {
            //删除消息队列
            addOrDelMq(byId, 1);
            //判断是否启用消息队列
            status(mqConfig);
        }
        return b;
    }

    /**
     * 状态位为开启时,消息队列创建方法封装
     * @param mqConfig
     */
    private void status(MqConfig mqConfig) {
        if ("0".equals(mqConfig.getStatus())) {
            //创建消息队列
            addOrDelMq(mqConfig, 0);
        }
    }

    /**
     * 新增队列方法封装
     * @param mqConfig
     */
    public void addOrDelMq(MqConfig mqConfig, int type){
        mqUtils.mqOperate(
                MqVo
                        .builder()
                        .exchangeType(mqConfig.getExchangeType())
                        .queueName(mqConfig.getExchangeName())
                        .exchangeName(mqConfig.getExchangeName())
                        .binding(mqConfig.getBinding())
                        .delayType(mqConfig.getDelayType())
                        .type(type)
                        .build()
        );
    }

    /**
     * 删除消息队列
     * @param mqIds
     * @return
     */
    @Override
    public boolean delete(String[] mqIds) {
        Boolean b = false;
        for (String mqId : mqIds) {
            //查询消息队列信息
            MqConfig mqConfig = getById(mqId);
            //删除消息队列
            b = removeById(mqId);
            //判断消息队列是否删除成功
            if (b) {
                //删除消息队列
                addOrDelMq(mqConfig, 1);
            }
        }
        return b;
    }
}

7.controller接口

@RestController
@RequestMapping("mqConfig")
public class MqConfigController {

    /**
     * 服务对象
     */
    @Resource
    private MqConfigService mqConfigService;

    /**
     * 新增mq配置
     *
     * @param mqConfig 实体
     * @return 新增是否成功
     */
    @ApiOperation(value = "新增mq配置")
    @PostMapping(value = "add", produces = "application/json;charset=utf-8")
    public ApiResult<Boolean> add(MqConfig mqConfig) {
        return ApiResult.ok("添加成功", mqConfigService.add(mqConfig));
    }

    /**
     * 修改mq配置
     *
     * @param mqConfig 实体
     * @return 修改是否成功
     */
    @ApiOperation(value = "修改mq配置")
    @PutMapping(value = "update", produces = "application/json;charset=utf-8")
    public ApiResult<Boolean> update(MqConfig mqConfig) {
        return ApiResult.ok("修改成功", mqConfigService.update(mqConfig));
    }

    /**
     * 删除mq配置
     *
     * @param mqIds 主键
     * @return 删除是否成功
     */
    @ApiOperation(value = "删除mq配置")
    @DeleteMapping(value = "deleteById", produces = "application/json;charset=utf-8")
    public ApiResult<Boolean> deleteById(String[] mqIds) {
        return ApiResult.ok("删除成功", mqConfigService.delete(mqIds));
    }
}

8.gitee地址

文章来源地址https://www.toymoban.com/news/detail-620897.html

到了这里,关于springboot实现rabbitmq动态创建交换机,队列以及交换机、队列绑定的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot整合RabbitMQ,三种交换机类型示例

    在application.properties或application.yml中配置RabbitMQ服务器的连接参数: 4.1、DirectExchange(直连交换机) 消费者 生产者 测试 一个交换机对多个队列的特点: 一个队列对多个消费者特点: 4.2、FanoutExchange(扇形/广播交换机) 消费者 生产者 4.3、TopicExchange(主题交换机) 消费者 生

    2024年04月12日
    浏览(12)
  • SpringBoot整合RabbitMQ系列--绑定交换机与队列的方法

    原文网址:SpringBoot整合RabbitMQ系列--绑定交换机与队列的方法_IT利刃出鞘的博客-CSDN博客         本文用实例介绍SpringBoot中RabbitMQ如何绑定交换机(交换器)与队列。 交换机 下边两种方式等价。 队列 下边两种方式等价 绑定 下边两种方式等价 注意:第一种的参数并不是字符

    2023年04月09日
    浏览(21)
  • 【学习笔记】RabbitMQ02:交换机,以及结合springboot快速开始

    参考资料 RabbitMQ官方网站 RabbitMQ官方文档 噼咔噼咔-动力节点教程 4.1 交换机类型 Exchange (简称X)翻译为交换机、交换器、路由器… 注意 :交换机并不是所有消息中间件都有,但是是一个很好的概念 交换机分为以下四个类型 扇形交换机:Fanout Exchange 直连 主题 头部 4.2 扇形

    2024年02月07日
    浏览(21)
  • springboot与rabbitmq的整合【演示5种基本交换机】

    前言 : 👏作者简介:我是笑霸final,一名热爱技术的在校学生。 📝个人主页:个人主页1 || 笑霸final的主页2 📕系列专栏:后端专栏 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀 🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏

    2024年02月17日
    浏览(23)
  • RabbitMQ(一) - 基本结构、SpringBoot整合RabbitMQ、工作队列、发布订阅、直接、主题交换机模式

    Publisher : 生产者 Queue: 存储消息的容器队列; Consumer:消费者 Connection:消费者与消息服务的TCP连接 Channel:信道,是TCP里面的虚拟连接。例如:电缆相当于TCP,信道是一条独立光纤束,一条TCP连接上创建多少条信道是没有限制的。TCP一旦打开,就会出AMQP信道。无论是发布消息

    2024年02月14日
    浏览(23)
  • 【初始RabbitMQ】交换机的实现

    RabbitMQ消息传递模型的核心思想就是: 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者不知道这些消息会传递到那些队列中 相反,生产者只能将消息发送到交换机,交换机的工作内容也很简单,一方面是接受来自生产者的消息,另一方面是将它们推入到队列

    2024年02月20日
    浏览(13)
  • RabbitMQ的交换机(原理及代码实现)

    Fanout Exchange(扇形) Direct Exchange(直连) opic Exchange(主题) Headers Exchange(头部) Fanout 扇形的,散开的; 扇形交换机 投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发; 如下图所示 P代表provider提供者,X代表exchange交换机,第三部分代

    2024年02月08日
    浏览(17)
  • RabbitMQ中交换机的应用 ,原理 ,案例的实现

                                    🎉🎉欢迎来到我的CSDN主页!🎉🎉                     🏅我是平顶山大师,一个在CSDN分享笔记的博主。📚📚     🌟推荐给大家我的博客专栏《RabbitMQ中交换机的应用及原理,案例的实现》。🎯🎯                    

    2024年01月24日
    浏览(23)
  • RabbitMQ中交换机的应用及原理,案例的实现

    目录 一、介绍 1. 概述 2. 作用及优势 3. 工作原理 二、交换机Exchange 1. Direct 2. Topic 3. Fanout 三、代码案例 消费者代码   1. 直连direct  生产者代码 测试 2. 主题topic  生产者代码 测试 3. 扇形fanout  生产者代码 测试 每篇一获 RabbitMQ中的交换机(exchange)是消息的分发中心,它

    2024年01月24日
    浏览(19)
  • RabbitMQ系列(14)--Topics交换机的简介与实现

    1、Topics交换机的介绍 Topics交换机能让消息只发送往绑定了指定routingkey的队列中去,不同于Direct交换机的是,Topics能把一个消息往多个不同的队列发送;Topics交换机的routingkey不能随意写,必须是一个单词列表,并以点号分隔开,例如“one.two.three”,除此外还有两个替换符,

    2024年02月13日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包