根据源码,模拟实现 RabbitMQ - 转发规则实现(6)

这篇具有很好参考价值的文章主要介绍了根据源码,模拟实现 RabbitMQ - 转发规则实现(6)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、转发规则实现

1.1、需求分析

1.2、实现 Router 转发规则

1.2.1、bindingKey 和 routingKey 参数校验

1.2.2、消息匹配规则

1.2.3、主题交换机匹配规则


一、转发规则实现


1.1、需求分析

这里主要实现 routingKey 和 bindingKey 参数的校验,以及 TopicExchange 类型绑定规则的实现.

这里重点来看一下 Topic 交换机的转发规则

bindingkey:创建绑定的时候,给绑定指定特殊字符串,相当于出题;

routingKey:发布消息的时候,给消息上指定的特殊字符串,相当于做答案;

当 routingKey 和 bindingKey 匹配(答案对的上),就可以将消息转发给对应的队列。

什么叫做能匹配的上?

routingKey 的组成:

  1. 数字、字母、下划线
  2. 使用 "." 把整个routingKey 分成多个部分

例如:

  • aaa.bbb.ccc        合法
  • aaa.564.bbb       合法
  • aaa                     合法

bindingKey 的组成:

  1. 数字、字母、下划线
  2. 使用 "." 把整个 bindingKey 分成多个部分
  3. 支持两种特殊的符号作为通配符(* 和 # 必须是作为被 . 分割出来的独立部分)
    1. * 可以匹配任何一个独立的部分
    2. # 可以匹配任何 0 个或者多个独立的部分

例如:

  • aaa.*.bbb    合法
  • aaa.*.bbb    非法
  • aaa.#b.ccc  非法

是否能匹配上,有如下几个栗子:


情况一:bindingKey 中没有 * 和 # ,此时,必须要求 routingKey 和 bindingKey 一模一样,才能匹配成功

假设 bindingKey:aaa.bbb.ccc

此时 routingKey 如下:

aaa.bbb.ccc (匹配成功)

aaa.cbb.ccc (匹配失败)


情况二: bindingKey 中有 * 

假设 bindingKey:aaa.*.ccc

此时 routingKey 如下:

aaa.bbb.ccc  (匹配成功)

aaa.gafdga.ccc  (匹配成功)

aaa.bbb.eee.ccc (匹配失败)


情况三:bindingKey 中有 #

假设 bindingKey:aaa.#.ccc

此时 routingKey 如下:

aaa.bbb.ccc   (匹配成功)

aaa.bbb.ddd.ccc   (匹配成功)

aaa.ccc(匹配成功)

aaa.b.ccc.d(匹配失败)


特殊情况:如果把 bindingKey 设置成 #,就可以匹配到所有 routingKey,如下

aaa

aaa.bbb

aaa.bbb.ccc

.......

此时,topic 交换机就相当用户 fanout 交换机了 


Ps:上述规则是 AMQP 协议约定的

1.2、实现 Router 转发规则

1.2.1、bindingKey 和 routingKey 参数校验

bindingKey:

    * 1.数字、字母、下划线
    * 2.使用 . 作为分隔符
    * 3.允许存在 * 和 # 作为通配符,但是通配符只能作为独立的分段

routingKey:

     * 1.字母、数组、下划线
     * 2.使用 . 进行分割

    /**
     * bindingKey 的构造规则
     * @param bindingKey
     * @return
     */
    public boolean checkBindingKey(String bindingKey) {
        if(bindingKey.length() == 0) {
            //空字符串,也是一种合法情况,比如使用 direct(routingKey 直接当作队列名字去匹配) / fanout 交换机的时候,bindingKey 是用不上的
            return true;
        }
        //检查字符串中不能存在的非法字符
        for(int i = 0; i < bindingKey.length(); i++) {
            char ch = bindingKey.charAt(i);
            if(ch >= 'A' && ch <= 'Z') {
                continue;
            }
            if(ch >= 'a' && ch <= 'z') {
                continue;
            }
            if(ch >= '0' && ch <= '9') {
                continue;
            }
            if(ch == '.' || ch == '_' || ch == '*' || ch == '#') {
                continue;
            }
            return false;
        }
        //检查 * 或者 # 是否是独立的部分
        //aaa.*.bbb 是合法的;aaa.a*.bbb 是非法的
        String[] words = bindingKey.split("\\.");//这里是正则表达式
        for(String word : words) {
            //检查 word 长度 > 1,并且包含了 * 或者 # ,就是非法格式了
            if(word.length() > 1 && (word.contains("*") || word.contains("#"))) {
                return false;
            }
        }
        //约定一下,通配符之间的相邻关系(个人约定,不这样约定太难实现)
        //为什么这么约定。因为前三种相邻的时候,实现匹配的逻辑是非常繁琐,同时功能性提升不大
        //1. aaa.#.#.bbb  => 非法
        //2. aaa.#.*.bbb  => 非法
        //3. aaa.*.#.bbb  => 非法
        //4. aaa.*.*.bbb  => 合法
        for(int i = 0; i < words.length - 1; i++) {
            if(words[i].equals("#") && words[i + 1].equals("#")) {
                return false;
            }
            if(words[i].equals("#") && words[i + 1].equals("*")) {
                return false;
            }
            if(words[i].equals("*") && words[i + 1].equals("#")) {
                return false;
            }
        }
        return true;
    }

    /**
     * routingKey 的构造规则
     * @param routingKey
     * @return
     */
    public boolean checkRoutingKey(String routingKey) {
        if(routingKey.length() == 0) {
            //空字符串是合法情况,比如使用 faout 交换机的时候,routingKey 用不上,就可以设置为 “”
            return true;
        }
        for(int i = 0; i < routingKey.length(); i++) {
            char ch = routingKey.charAt(i);
            // 检查大写字母
            if(ch >= 'A' && ch <= 'Z') {
                continue;
            }
            // 检查小写字母
            if(ch >= 'a' && ch <= 'z') {
                continue;
            }
            //检查数字
            if(ch >= '0' && ch <= '9') {
                continue;
            }
            //检查下划线和.
            if(ch == '_' || ch == '.') {
                continue;
            }
            //不是上述规则,就是错误
            return false;
        }
        return true;
    }

Ps:split 方法中的参数是一个正则表达式~  首先 "."  在正则表达式中是一个特殊的符号,"\\." 是把 . 当作原始文本进行匹配,想要使用 . 作为为原始文本,就需要在正则表达式中使用 \. 来表示,又因为,在 Java 的字符串中,"\" 是一个特殊字符,需要使用 "\\" 来转义,此时才能通过 "\\." 的方式真正录入 "." 这个文本

(这个不懂没关系,记住就行了)

1.2.2、消息匹配规则

之前已经具体实现了 直接交换机和扇出交换机的逻辑了,这里主要关注主题交换机.

    /**
     * 判定消息是否可以转发给这个绑定的队列
     * @param exchangeType
     * @param binding
     * @param message
     * @return
     */
    public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {
        //1.判断当前交换机类型
        if(exchangeType == ExchangeType.FANOUT) {
            // 扇出交换机,要给每一个绑定的队列都发送消息
            return true;
        } else if(exchangeType == ExchangeType.TOPIC) {
            // 主题交换机
            return routeTopic(binding, message);
        } else {
            // 其他情况是不存在的
            throw new MqException("[Router] 交换机类型非法!exchangeType=" + exchangeType);
        }
    }

1.2.3、主题交换机匹配规则

假设 bindingKey:aaa.*.bbb.#.ccc

假设 routingKey:aaa.11.bbb.22.33.ccc

这里我们使用双指针算法进行匹配:

  1. 先将这两个字符串使用 split 通过 "." 进行分割.
  2. 如果指向的是一个普通的字符串,此时要求和 routingKey 的对应下标执行的内容完全一致.、
  3. 如果指向的是 *,此时无论 routingKey 指向的是什么,双方都同时下标前进.
  4. 遇到 # 了,并且如果 # 后面没有其他内容了,直接返回 true,匹配成功.
  5. 遇到 # 了,并且 # 后面还有内容,拿着 # 后面的一段内容,去 routingKey 中查找,找到后面的部分,在 routingKey 中出现的位置(如果后面部分在 routingKey 中不存在,直接认为匹配失败).
  6.  两个指针同时到达末尾,则匹配成功,反之匹配失败
    public boolean routeTopic(Binding binding, Message message) {
        //先进行切分
        String[] bindingTokens = binding.getBindingKey().split("\\.");
        String[] routingTokens = message.getRoutingKey().split("\\.");

        //使用双指针
        int bindingIndex = 0;
        int routingIndex = 0;
        while(bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {
            if(bindingTokens[bindingIndex].equals("*")) {
                //遇到 * ,继续向后走
                bindingIndex++;
                routingIndex++;
            } else if(bindingTokens[bindingIndex].equals("#")) {
                //如果遇到 #,先看看还有没有下一个位置
                bindingIndex++;
                if(bindingIndex == bindingTokens.length) {
                    //后面没有东西,一定匹配成功
                    return true;
                }
                //如果后面还有东西,就去 routingKey 后面的位置去找
                //findNextMatch 这个方法用来查找该部分再 routingKey 的位置,返回该下标,没找到,就返回 -1
                routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);
                if(routingIndex == -1) {
                    //后面没有匹配结果,失败
                    return false;
                }
                //找到了,就继续往后匹配
                routingIndex++;
                bindingIndex++;
            } else {
                //如果遇到普通字符串,要求两边的内容是一样的
                if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {
                    return false;
                }
                bindingIndex++;
                routingIndex++;
            }
        }
        //判定是否双方同时到达末尾
        //比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的
        return (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length);
    }

    private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {
        for(int i = routingIndex; i < routingTokens.length; i++) {
            if(routingTokens[i].equals(bindingToken)) {
                return i;
            }
        }
        return -1;
    }

根据源码,模拟实现 RabbitMQ - 转发规则实现(6),RabbitMQ,rabbitmq,分布式

 文章来源地址https://www.toymoban.com/news/detail-664463.html

到了这里,关于根据源码,模拟实现 RabbitMQ - 转发规则实现(6)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 根据源码,模拟实现 RabbitMQ - 实现消息持久化,统一硬盘操作(3)

    目录 一、实现消息持久化 1.1、消息的存储设定 1.1.1、存储方式 1.1.2、存储格式约定 1.1.3、queue_data.txt 文件内容  1.1.4、queue_stat.txt 文件内容 1.2、实现 MessageFileManager 类 1.2.1、设计目录结构和文件格式 1.2.2、实现消息的写入 1.2.3、实现消息的删除(随机访问文件) 1.2.4、获取队

    2024年02月12日
    浏览(35)
  • 根据源码,模拟实现 RabbitMQ - 通过 SQLite + MyBatis 设计数据库(2)

    目录 一、数据库设计 1.1、数据库选择 1.2、环境配置 1.3、建库建表接口实现 1.4、封装数据库操作 1.5、针对 DataBaseManager 进行单元测试 1.6、心得 MySQL 是我们最熟悉的数据库,但是这里我们选择使用 SQLite,原因如下: SQLite 比 MySQL 更轻量:一个完整的 SQLite 数据库,只有一个单

    2024年02月13日
    浏览(29)
  • 根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结)

    目录 一、客户端代码实现 1.1、需求分析 1.2、具体实现 1)实现 ConnectionFactory 2)实现 Connection 3)实现 Channel 二、编写 Demo  2.1、实例  2.1、实例演示 RabbitMQ 的客户端设定:一个客户端可以有多个模块,每个模块都可以和 broker server 之间建立 “逻辑上的连接” (channel),这

    2024年02月11日
    浏览(33)
  • 根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8)

    目录 一、网络通讯协议设计 1.1、交互模型 1.2、自定义应用层协议 1.2.1、请求和响应格式约定 ​编辑 1.2.2、参数说明 1.2.3、具体例子 1.2.4、特殊栗子 1.3、实现 BrokerServer 1.3.1、属性和构造 1.3.2、启动 BrokerServer 1.3.3、停止 BrokerServer 1.3.4、处理每一个客户端连接 1.3.5、读取请求

    2024年02月10日
    浏览(34)
  • 模拟实现消息队列(以 RabbitMQ 为蓝本)

    核心概念1 生产者(Producer):生产者负责生成数据并将其放入缓冲区(队列)中。生产者可以是一个线程或多个线程,它们可以并行地生成数据。当缓冲区(队列)已满时,生产者需要等待,直到有空间可用。 消费者(Consumer):消费者负责从缓冲区(队列)中取出数据并进行处

    2024年02月13日
    浏览(53)
  • 本地模拟发送、接收RabbitMQ数据

    日常开发中,当线上RabbitMQ坏境还没准备好时,可在本地模拟发送、接收消息 Docker安装RabbitMQ 【SpringCloud】整合RabbitMQ六大模式应用(入门到精通) Spring RabbitMQ 配置多个虚拟主机(vhost)

    2024年02月21日
    浏览(32)
  • 基于RabbitMQ的模拟消息队列需求文档

    什么是消息队列? 消息队列就是,基于阻塞队列,封装成一个独立的服务器程序,实现跨主机使用生产者-消费者模型。生产者生产消息到消息队列,消费者从消息队列消费数据。 1.核心概念 生产者(Producer):生产消息的客户端 消费者 (Consumer) :消费消息的客户端 中间人

    2024年02月10日
    浏览(25)
  • 模拟高并发下RabbitMQ的削峰作用

            在并发量很高的时候,服务端处理不过来客户端发的请求,这个时候可以使用消息队列,实现削峰。原理就是请求先打到队列上,服务端从队列里取出消息进行处理,处理不过来的消息就堆积在消息队列里等待。 可以模拟一下这个过程:         发送方把10万条

    2024年02月11日
    浏览(32)
  • 基于RabbitMQ的模拟消息队列之四——内存管理

    针对交换机、队列、绑定、消息、待确认消息设计数据结构。 交换机集合 exchangeMap 数据结构:ConcurrentHashMap key:交换机name value:交换机对象 队列集合 queueMap 数据结构: ConcurrentHashMap key:队列name value:队列对象 绑定集合 bindingsMap 数据结构: ConcurrentHashMap 嵌套 ConcurrentHashMap key

    2024年02月10日
    浏览(32)
  • 【rabbitMQ】-延迟队列-模拟控制智能家居的操作指令

    这个需求为控制智能家居工作,把控制智能家居的操作指令发到队列中,比如:扫地机、洗衣机到指定时间工作   一.什么是延迟队列? 延迟队列存储的对象是对应的延迟消息,所谓“延迟消息” 是指当消息被发送以后, 并不想让消费者立刻拿到消息 ,而是等待特定时间后

    2024年02月06日
    浏览(26)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包