根据源码,模拟实现 RabbitMQ - 内存数据管理(4)

这篇具有很好参考价值的文章主要介绍了根据源码,模拟实现 RabbitMQ - 内存数据管理(4)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、内存数据管理

1.1、需求分析

1.2、实现 MemoryDataCenter 类

1.2.1、ConcurrentHashMap 数据管理

1.2.2、封装交换机操作

1.2.3、封装队列操作

1.2.4、封装绑定操作

1.2.5、封装消息操作

1.2.6、封装未确认消息操作

1.2.7、封装恢复数据操作


一、内存数据管理


1.1、需求分析

当前已经使用 数据库 管理了 交换机、绑定、队列,又使用 数据文件 管理了 消息.

最后还使用一个类将上述两部分整合在了一起,对上层提供统一的一套接口.

但对于 MQ 来说,是以内存存储数据为主,硬盘存储数据为辅(硬盘数据主要是为了持久化保存,重启之后,数据不丢失).

接下来就需要使用 内存 来管理上述数据~~

这里我们主要使用 ConcurrentHashMap 来进行数据管理(主要是因为线程安全问题).

交换机:使用 ConcurrentHashMap,key 是 name,value 是 Exchange 对象。

队列:使用 ConcurrentHashMap,key 是 name,value 是 MSGQueue 对象。

绑定:使用嵌套的 ConcurrentHashMap,key 是 exchangeName,value 是一个 ConcurrentHashMap(key 是 queueName,value 是 Binding 对象)。

消息:使用 ConcurrentHashMap,key 是 messageId ,value 是 Message 对象。

队列和消息的关联关系:使用嵌套的 ConcurrentHashMap,key 是 queueName,value 是一个 LinkedList(每个元素是一个 Message 对象)。

表示 “未被确认” 的消息:使用嵌套的 ConcurrentHashMap,key 是 queueName,value 是 ConcurrentHashMap(key 是 messageId,value 是 Message 对象,后续实现消息确认的逻辑,需要根据 ack 响应的内容,会提供一个确认的 messageId,根据这个 messageId 来把上述结构中的 Message 对象找到并移除)。

Ps:此处实现的 MQ,支持两种应答模式的 ACK

  1. 自动应答:消费者取了元素,整个消息就算是被应答了,此时整个消息就可以被干掉了。
  2. 手动应答:消费者取了元素,这个消息不算被应答,需要消费者主动再调用一个 basicAck 方法,此时才认为是真正应答了,才能删除这个消息。

1.2、实现 MemoryDataCenter 类

1.2.1、ConcurrentHashMap 数据管理

这里就是用 ConcurrentHashMap 来对上述数据进行统一内存管理.

    //key 是 exchangeName, value 是 Exchange 对象
    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
    //key 是 queueName, value 是 MSGQueue 对象
    private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
    //第一个 key 是 exchangeName,第二个 key 是 queueName
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
    //key 是 messageId ,value 是 Message 对象
    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
    //key 是 queueName , value 是 Message 的链表
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
    // 第一个 key 是 queueName, 第二个 key 是 messageId
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();

1.2.2、封装交换机操作

主要就是对 exchangeMap 插入、获取、删除交换机.


    public void insertExchange(Exchange exchange) {
        exchangeMap.put(exchange.getName(), exchange);
        System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName=" + exchange.getName());
    }

    public Exchange getExchange(String exchangeName) {
        return exchangeMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName) {
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName=" + exchangeName);
    }

1.2.3、封装队列操作

主要就是对 queueMap 插入、获取、删除队列.

    public void insertQueue(MSGQueue queue) {
        queueMap.put(queue.getName(), queue);
        System.out.println("[MemoryDataCenter] 新队列添加成功! queueName=" + queue.getName());
    }

    public MSGQueue getQueue(String queueName) {
        return queueMap.get(queueName);
    }

    public void deleteQueue(String queueName) {
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter] 队列删除成功! queueName=" + queueName);
    }

1.2.4、封装绑定操作

这里值得注意的是加锁逻辑,并不是加了锁就一定安全,也不是说不加锁就一定不安全,如果这段代码前后逻辑性很强,需要打包成一个原子性的操作,那就可以进行加锁,如果不是那么强的因果,就没必要,因为加锁也是需要开销的,加锁之后的锁竞争更是一个时间消耗。

    public void insertBinding(Binding binding) throws MqException {
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if(bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(), bindingMap);
//        }
        //上面这段逻辑可以用以下代码来替换
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                k -> new ConcurrentHashMap<>());

        synchronized(bindingMap) {
            //再根据 queueName 查一下,只有不存在的时候才能插入,存在就抛出异常
            if(bindingMap.get(binding.getQueueName()) != null) {
                throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName=" + binding.getExchangeName() +
                        ", queueName=" + binding.getQueueName());
            }
            bindingMap.put(binding.getQueueName(), binding);
        }
        System.out.println("[MemoryDataCenter] 新绑定添加成功!exchangeName=" + binding.getExchangeName() +
                ", queueName=" + binding.getQueueName());

    }

    /**
     * 获取绑定有两个版本
     * 1.根据 exchangeName 和 queueName 确定唯一一个 Binding
     * 2.根据 exchangeName 获取到所有的 Binding
     * @param exchangeName
     * @param queueName
     * @return
     */
    public Binding getBinding(String exchangeName, String queueName) throws MqException {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if(bindingMap == null) {
            throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName=" + exchangeName +
                    ", queueName=" + queueName);
        }
        return bindingMap.get(queueName);
    }

    public ConcurrentHashMap<String, Binding> getBindings(String exchangName) {
        return bindingsMap.get(exchangName);
    }

    public void deleteBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String, Binding>  bindingMap = bindingsMap.get(binding.getExchangeName());
        //这里操作不是很关键,因此可以不用加锁(加锁不一定就安全,也不是说不加锁就一定不安全,要结合实际场景)
        //如果这段代码前后逻辑性很强,需要打包成一个原子性的操作,那就可以进行加锁,如果不是那么强的因果,就没必要,因为加锁也是需要开销的,加锁之后的锁竞争更是一个时间消耗
        if(bindingMap == null) {
            throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName=" + binding.getExchangeName() +
                    ", queueName=" + binding.getQueueName());
        }
        bindingMap.remove(binding.getQueueName());
        System.out.println("[MemoryDataCenter] 绑定删除成功!exchangeName=" + binding.getExchangeName() +
                ", queueName=" + binding.getQueueName());
    }

1.2.5、封装消息操作

这里值得注意的是 LinkedList 是线程不安全的,要特殊处理.

    /**
     * 添加消息
     * @param message
     */
    public void addMessage(Message message) {
        messageMap.put(message.getMessageId(), message);
        System.out.println("[MemoryDataCenter] 新消息添加成功!messageId=" + message.getMessageId());
    }

    /**
     * 根据 id 查询消息
     * @param messageId
     */
    public Message selectMessage(String messageId) {
        return messageMap.get(messageId);
    }

    /**
     * 根据 id 删除消息
     * @param messageId
     */
    public void removeMessage(String messageId) {
        messageMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息被移除!messageId=" + messageId);
    }

    /**
     * 发送消息到指定队列
     * @param message
     */
    public void sendMessage(MSGQueue queue, Message message) {
        //先根据队列名字找到指定的链表
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
        //LinkedList 是线程不安全的
        synchronized (messages) {
            messages.add(message);
        }
        //这里把消息在消息中心也插入一下。即使 message 在消息中心存在也没关系
        //因为相同的 messageId 对应的 message 的内容一定是一样的(服务器不会修改 Message 的内容)
        addMessage(message);
        System.out.println("[MemoryDataCenter] 消息被投递到队列当中!messageId=" + message.getMessageId());
    }

    /**
     * 从队列中取消息
     * @param queueName
     * @return
     */
    public Message pollMessage(String queueName) {
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages == null) {
            return null;
        }
        synchronized (messages) {
            if(messages.size() == 0) {
                return null;
            }
            //链表中有消息就进行头删
            Message currentMessage = messages.remove(0);
            System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId=" + currentMessage.getMessageId());
            return currentMessage;
        }
    }

    /**
     * 获取指定队列的消息个数
     * @param queueName
     * @return
     */
    public int getMessageCount(String queueName) {
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages == null) {
            return 0;
        }
        synchronized (messages) {
            return messages.size();
        }
    }

1.2.6、封装未确认消息操作

“未被确认” 的消息:使用嵌套的 ConcurrentHashMap,key 是 queueName,value 是 ConcurrentHashMap(key 是 messageId,value 是 Message 对象,后续实现消息确认的逻辑,需要根据 ack 响应的内容,会提供一个确认的 messageId,根据这个 messageId 来把上述结构中的 Message 对象找到并移除)。

    /**
     * 添加未确认的消息
     * @param queueName
     * @param message
     */
    public void addMessageWaitAck(String queueName, Message message) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,
                k -> new ConcurrentHashMap<>());
        messageHashMap.put(message.getMessageId(), message);
        System.out.println("[MemoryDataCenter] 消息进入待确认队列!messageId=" + message.getMessageId());
    }

    /**
     * 删除未确认的消息
     * @param messageId
     */
    public void removeMessageWaitAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if(messageHashMap == null) {
            return;
        }
        messageHashMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息从待确认队列中删除!messageId=" + messageId);
    }

    public Message getMessageWaitAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if(messageHashMap == null) {
            return null;
        }
        return messageHashMap.get(messageId);
    }

1.2.7、封装恢复数据操作

从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中.


    public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
        //1.先清空之前所有的数据
        exchangeMap.clear();
        queueMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        queueMessageMap.clear();
        //2.恢复所有的交换机数据
        List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
        for(Exchange exchange : exchanges) {
            exchangeMap.put(exchange.getName(), exchange);
        }
        //3.恢复所有的队列数据
        List<MSGQueue> queues = diskDataCenter.selectAllQueue();
        for(MSGQueue queue : queues) {
            queueMap.put(queue.getName(), queue);
        }
        //4.恢复所有绑定数据
        List<Binding> bindings = diskDataCenter.selectAllBindings();
        for(Binding binding : bindings) {
            ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                    k -> new ConcurrentHashMap<>());
            bindingMap.put(binding.getQueueName(), binding);
        }
        //5.恢复所有的消息数据
        for(MSGQueue queue : queues) {
            LinkedList<Message> messages = diskDataCenter.loadAllMessagesFromQueue(queue.getName());
            queueMessageMap.put(queue.getName(), messages);
            //遍历所有的队列,根据每个队列名字。来恢复所有消息
            for(Message message : messages) {
                messageMap.put(message.getMessageId(), message);
            }
        }

    }

Ps;“未确认的消息” 这部分数据不需要从硬盘中恢复,之前硬盘存储也没有考虑过这里~

一旦在等待 ack 的过程中,服务器重启了,这些 “未被确认的消息” 就恢复成了 “未被取走的消息”,这个消息在硬盘上存储的时候,就是当作 “未被取走”。

根据源码,模拟实现 RabbitMQ - 内存数据管理(4),RabbitMQ,rabbitmq,分布式

 文章来源地址https://www.toymoban.com/news/detail-669604.html

到了这里,关于根据源码,模拟实现 RabbitMQ - 内存数据管理(4)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7)

    目录 一、虚拟主机 + Consume设计 1.1、承接问题 1.2、具体实现 1.2.1、消费者订阅消息实现思路 1.2.2、消费者描述自己执行任务方式实现思路 1.2.3、消息推送给消费者实现思路 1.2.4、消息确认 前面已经实现了虚拟主机大部分功能以及转发规则的判定,也就是说,现在消息已经可

    2024年02月11日
    浏览(33)
  • 根据源码,模拟实现 RabbitMQ - 实现消息持久化,统一硬盘操作(3)

    目录 一、实现消息持久化 1.1、消息的存储设定 1.1.1、存储方式 1.1.2、存储格式约定 1.1.3、queue_data.txt 文件内容  1.1.4、queue_stat.txt 文件内容 1.2、实现 MessageFileManager 类 1.2.1、设计目录结构和文件格式 1.2.2、实现消息的写入 1.2.3、实现消息的删除(随机访问文件) 1.2.4、获取队

    2024年02月12日
    浏览(48)
  • 根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结)

    目录 一、客户端代码实现 1.1、需求分析 1.2、具体实现 1)实现 ConnectionFactory 2)实现 Connection 3)实现 Channel 二、编写 Demo  2.1、实例  2.1、实例演示 RabbitMQ 的客户端设定:一个客户端可以有多个模块,每个模块都可以和 broker server 之间建立 “逻辑上的连接” (channel),这

    2024年02月11日
    浏览(44)
  • 根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8)

    目录 一、网络通讯协议设计 1.1、交互模型 1.2、自定义应用层协议 1.2.1、请求和响应格式约定 ​编辑 1.2.2、参数说明 1.2.3、具体例子 1.2.4、特殊栗子 1.3、实现 BrokerServer 1.3.1、属性和构造 1.3.2、启动 BrokerServer 1.3.3、停止 BrokerServer 1.3.4、处理每一个客户端连接 1.3.5、读取请求

    2024年02月10日
    浏览(48)
  • 基于RabbitMQ的模拟消息队列之三——硬盘数据管理

    1.设计数据库 交换机、队列、绑定是交给数据库来管理的,所以,设计这三个表结构就够了,表的字段和核心类同名。 2.添加sqlite依赖 3.配置application.properties文件 4.创建接口MetaMapper 在mqserver包下新创建一个包,名字为mapper,在此包下,创建一个接口MetaMapper。添加注解@Mappe

    2024年02月10日
    浏览(73)
  • Vue3+Vue-Router+Element-Plus根据后端数据实现前端动态路由——权限管理模块

    提示:文章内容仔细看一些,或者直接粘贴复制,效果满满 提示:文章大概 1、项目:前后端分离 2、前端:基于Vite创建的Vue3项目 3、后端:没有,模拟的后端数据 4、关于路径“@”符号——vite.config.js 文件里修改 提示:以下是本篇文章正文内容,下面案例可供复制粘贴使用

    2024年02月02日
    浏览(58)
  • SpringBoot使用flywaydb实现数据库版本管理【附源码】

    本文主要是配合SpringBoot使用用户输入的自定义数据源启动一文附带产出。前文主要介绍了SpringBoot无数据源启动,然后通过用户录入自定义数据库配置后,连接数据库的操作。但是当整个项目交给用户使用时,谁使用都不知道情况下,数据源都自己定义的情况下,我们项目升

    2024年02月07日
    浏览(38)
  • 基于RabbitMQ的模拟消息队列之四——内存管理

    针对交换机、队列、绑定、消息、待确认消息设计数据结构。 交换机集合 exchangeMap 数据结构:ConcurrentHashMap key:交换机name value:交换机对象 队列集合 queueMap 数据结构: ConcurrentHashMap key:队列name value:队列对象 绑定集合 bindingsMap 数据结构: ConcurrentHashMap 嵌套 ConcurrentHashMap key

    2024年02月10日
    浏览(40)
  • python 实现学生信息管理系统+MySql 数据库,包含源码及相关实现说明~

    1、系统说明 python 编写的学生信息管理系统+MySQL数据库,实现了增删改查的基本功能。 2、数据库说明 本人使用的是 MySQL8.0 版本 数据库端口号为:3306 数据库用户名是:root 数据库名称是:practice 建立的表是:students 3、系统功能 增加学生信息 删除学生信息 修改学生信息 查

    2024年02月11日
    浏览(51)
  • 图书管理系统|基于Springboot的图书管理系统设计与实现(源码+数据库+文档)

    图书管理系统目录 目录 基于Springboot的图书管理系统设计与实现 一、前言 二、系统功能设计 三、系统实现 1、个人中心 2、管理员管理 3、用户管理 4、图书出版社管理 四、数据库设计 1、实体ER图 五、核心代码  六、论文参考 七、最新计算机毕设选题推荐 八、源码获取:

    2024年03月26日
    浏览(85)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包