从零手搓一个【消息队列】创建核心类, 数据库设计与实现

这篇具有很好参考价值的文章主要介绍了从零手搓一个【消息队列】创建核心类, 数据库设计与实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


创建 Spring Boot 项目, Spring Boot 2 系列版本, Java 8 , 引入 MyBatis, Lombok 依赖

提示:是正在努力进步的小菜鸟一只,如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说,直接上干货!

整体目录结构 :
从零手搓一个【消息队列】创建核心类, 数据库设计与实现,消息队列项目,消息队列,SQLite,RabbitMQ,MyBatis

本文主要实现 server 包


一、创建核心类

上篇文章 分析了项目需求, 介绍了项目中重要的核心概念和核心 API, 以及重要板块

一个消息队列中需要的交换机, 队列, 绑定, 消息等核心概念, 以面向对象的思想, 在server.core 包下创建出来对应的类
从零手搓一个【消息队列】创建核心类, 数据库设计与实现,消息队列项目,消息队列,SQLite,RabbitMQ,MyBatis


1, 交换机

@Data
public class Exchange {
    // 身份标识(唯一, RabbitMQ 就是以 name 作为身份标识的)
    private String name;

    // 三种交换机类型
    private ExchangeTypeEnum type = ExchangeTypeEnum.DIRECT;

    // 是否需要持久化存储
    private boolean durable = false;

    // 是否(交换机没人使用时)自动删除 ------------------>先不实现
    private boolean autoDelete = false;

    // 创建交换机时, 指定的参数选项 ------------------>先不实现
    // 数据库中存储 String 类型, 需要序列化
    private Map<String, Object> arguments = new HashMap<>();

    /**
     * 实现序列化, 修改 getter()和 setter(), 供数据库使用
     */
    public String getArguments(){
        ObjectMapper objectMapper = new ObjectMapper();
        // 序列化 往数据库里写
        try {
            return objectMapper.writeValueAsString(arguments);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return "{}";
    }

    public void setArguments(String arguments) {
        ObjectMapper objectMapper = new ObjectMapper();
        // 反序列化 从数据库里读
        try {
            this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }


    /**
     * 便于 测试/ 代码内部调用 时使用
     */
    public Object getArguments(String key) {
        return arguments.get(key);
    }

    public void setArguments(String key, Object value) {
        arguments.put(key, value);
    }

    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}

本项目未实现 autoDelete 和 arguments


2, 交换机类型

这是一个枚举类, 包含直接交换机, 扇出交换机, 主题交换机

public enum ExchangeTypeEnum {
    DIRECT(0),
    FANOUT(1),
    TOPIC(2);
    private final int type;

    ExchangeTypeEnum(int type) {
        this.type = type;
    }

    public int getType() {
        return type;
    }
}

3, 队列

类名不设为 Queue, 防止和标准库中的 Queue 冲突

@Data
public class MessageQueue {
    // 唯一标识
    private String name;

    // 是否需要持久化存储
    private boolean durable = false;

    // 是否为独有(如果是独有, 只能被一个消费者使用) ------------------>先不实现
    private boolean exclusive = false;

    // 是否(队列没人使用时)自动删除 ------------------>先不实现
    private boolean autoDelete = false;

    // 创建队列时, 指定的参数选项 ------------------>先不实现
    // 数据库中存储 String 类型, 需要序列化
    private Map<String, Object> arguments = new HashMap<>();

    /**
     * 实现序列化, 修改 getter()和 setter(), 供数据库使用
     */
    public String getArguments(){
        ObjectMapper objectMapper = new ObjectMapper();
        // 序列化 往数据库里写
        try {
            return objectMapper.writeValueAsString(arguments);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return "{}";
    }

    public void setArguments(String arguments) {
        ObjectMapper objectMapper = new ObjectMapper();
        // 反序列化 从数据库里读
        try {
            this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 便于 测试/ 代码内部调用 时使用
     */
    public Object getArguments(String key) {
        return arguments.get(key);
    }

    public void setArguments(String key, Object value) {
        arguments.put(key, value);
    }

    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}

暂不实现 exclusive, autoDelete, arguments


4, 绑定

@Data
public class Binding {
    // 绑定的消息队列标识
    private String queueName;

    // 绑定的交换机标识
    private String exchangeName;

    // 绑定的 key
    private String bindingKey;
}

bindingKey 是在创建交换机和队列的绑定时指定的, 生产者发布消息时, 需额外指定一个 routingKey
如果是直接交换机, routingKey 作为队列的唯一标识
如果是扇出交换机, routingKey 为 null, 无需使用
如果是主题交换机, routingKey 需和 bindingKey 匹配


5, 交换机转发 & 绑定规则

在此先不展示, 在后续文章中对应的部分再展示 防止思路混淆


6, 消息

@Data
public class Message implements Serializable {
    // 属性
    private BasicProperties basicProperties = new BasicProperties();

    // 正文
    private byte[] body;

    // 消息存储在文件中的偏移量(字节, 约定 "[,)" 区间 )
    private transient long offsetBegin = 0;
    private transient long offsetEnd = 0;

    // 是否合法(逻辑删除的标记, 0x1 有效, 0x0 无效)
    private byte isValid = 0x1;


    // 提供工厂方法, 封装 Message 类的创建过程
    public static Message createMessage(String routingKey, BasicProperties basicProperties, byte[] body) {
        Message message = new Message();
        if (basicProperties != null) {
            message.setBasicProperties(basicProperties);
        }
        message.setMessageId("M$" + UUID.randomUUID().toString().replace("-", ""));
        message.setRoutingKey(routingKey);
        message.setBody(body);
        return message;
    }

	// 下面这些方法是为了封装 basicProperties 中的 getter()和 setter()
    public String getMessageId() {
        return basicProperties.getMessageId();
    }

    public void setMessageId(String id) {
        basicProperties.setMessageId(id);
    }

    public String getRoutingKey() {
        return basicProperties.getRoutingKey();
    }

    public void setRoutingKey(String key) {
        basicProperties.setRoutingKey(key);
    }

    public int getDeliverMode() {
        return basicProperties.getDeliverMode();
    }

    public void setDeliverMode(int value) {
        basicProperties.setDeliverMode(value);
    }
}
  • Message 需要实现 Serializable 接⼝. 后续需要把 Message 写⼊⽂件以及进⾏⽹络传输.
  • basicProperties 是消息的属性信息. body 是消息体.
  • offsetBeg 和 offsetEnd 表⽰消息在消息⽂件中所在的起始位置和结束位置. 这⼀块具体的设计后⾯再详细介绍. 使⽤ transient 关键字避免属性被序列化.
  • isValid ⽤来表⽰消息在⽂件中是否有效. 这⼀块具体的设计后⾯再详细介绍.
  • createMessage() 相当于⼀个⼯⼚⽅法, ⽤来创建⼀个 Message 实例. messageId 通过UUID 的⽅式⽣成.

文件中的数据不相当于一个顺序表, 如果要真正删除一条消息, 是不是需要把后面的数据整体往前挪动? 这无疑是个低效操作, 因此, 对于 “删除消息” 这种高频操作, 逻辑删除显然是更优解, 但也不能让消息无限制的堆在文件中, 所以后面会参考 JVM 的 GC , 自主实现清理文件的功能


7, 消息属性

这个类作为Message 类的 引用类型的成员属性, 也需要实现 Serializable 接⼝, 否则 message 对象不能被序列化

@Data
public class BasicProperties implements Serializable {
    // 消息的唯一标识(UUID)
    private String messageId;

    // 和 bindingKey 匹配(如果交换机为 DIRECT, 该值就是队列名, 如果交换机为 FANOUT, 该值为 null )
    private String routingKey;

    // 是否要消息持久化( RabbitMQ 就是使用 1 表示不持久化, 2 表示持久化)
    private int deliverMode = 2;

    // ... 其他属性暂不考虑
}

二、数据库设计


1, 使用 SQLite

对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存.

此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库

SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件.

MySQL 是一个客户端服务器结构的程序, SQLite 相当于直接操作本地的硬盘文件

  • 在pom.xml文件中的 “dependencies” 标签中拷贝 :
		<dependency>
			<groupId>org.xerial</groupId>
			<artifactId>sqlite-jdbc</artifactId>
			<version>3.41.0.1</version>
		</dependency>
  • 在 resource 目录下创建 application.yml 文件配置SQLite数据源, 拷贝:
spring:
  datasource:
    url: jdbc:sqlite:./data/meta.db # 注意这个路径
    username:# 不需要
    password:# 不需要
    driver-class-name: org.sqlite.JDBC

mybatis:
  mapper-locations: classpath:mapper/**Mapper.xml

数据库文件的位置就是 ./data/meta.db, 数据库的数据就在这里


2, 使用 MyBatis

实现 mapper 包
从零手搓一个【消息队列】创建核心类, 数据库设计与实现,消息队列项目,消息队列,SQLite,RabbitMQ,MyBatis


2.1, 创建 Interface

在 server.mapper 包下定义一个 MetaMapper 接口, 需要提供 交换机, 队列, 绑定 的建表, 插入, 删除 , 查询的 API (抽象方法)

不需要使用sql 语句建库, 创建出 ./data/meta.db 这个文件就相当于建库了, 后面再写创建文件的操作

@Mapper
public interface MetaMapper {
    /**
     * 建表
     */
    void createExchangeTable();

    void createQueueTable();

    void createBindingTable();

    /**
     * exchange 表
     */
    void insertExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    List<Exchange> selectAllExchanges();

    /**
     * queue 表
     */
    void insertQueue(MessageQueue queue);

    void deleteQueue(String queueName);

    List<MessageQueue> selectAllQueues();

    /**
     * binding 表
     */
    void insertBinding(Binding binding);

    void deleteBinding(Binding binding);

    List<Binding> selectAllBindings();
}

2.2, 创建 xml 文件

在 resource 目录下, 新建一个 mapper 包, 创建 MetaMapper.xml 文件, 在这个文件中编写 sql 语句, 实现上述在 MetaMapper 接口中的抽象方法

在 xml 文件中拷贝:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.demo.server.mapper.MetaMapper">

</mapper>

其中 namespace 这个字段的值要对应刚才定义的 MetaMapper 接口 的路径


  • 建表(使用 update 标签即可)
    <update id="createExchangeTable">
        create table if not exists exchange (
            name varchar(50) primary key,
            type int,
            durable boolean,
            autoDelete boolean,
            arguments varchar(1024)
        );
    </update>

    <update id="createQueueTable">
        create table if not exists queue (
        name varchar(50) primary key,
        durable boolean,
        exclusive boolean,
        autoDelete boolean,
        arguments varchar(1024)
        );
    </update>

    <update id="createBindingTable">
        create table if not exists binding (
        exchangeName varchar(50),
        queueName varchar(50),
        bindingKey varchar(256)
        );
    </update>

  • exchange 表的增删查的 sql
    <insert id="insertExchange" parameterType="com.example.demo.server.core.Exchange">
        insert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});
    </insert>

    <select id="selectAllExchanges" resultType="com.example.demo.server.core.Exchange">
        select * from exchange;
    </select>

    <delete id="deleteExchange" parameterType="java.lang.String">
        delete from exchange where name = #{exchangeName};
    </delete>

  • queue 表的增删查的 sql
<insert id="insertQueue" parameterType="com.example.demo.server.core.MessageQueue">
        insert into queue values(#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});
    </insert>

    <select id="selectAllQueues" resultType="com.example.demo.server.core.MessageQueue">
        select * from queue;
    </select>

    <delete id="deleteQueue" parameterType="java.lang.String">
        delete from queue where name = #{queueName};
    </delete>

  • queue 表的增删查的 sql
	<insert id="insertBinding" parameterType="com.example.demo.server.core.Binding">
        insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});
    </insert>

    <select id="selectAllBindings" resultType="com.example.demo.server.core.Binding">
        select * from binding;
    </select>

    <delete id="deleteBinding" parameterType="com.example.demo.server.core.Binding">
        delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
    </delete>

三、硬盘管理 – 数据库

实现 datacenter 包中的 DataBaseManager 类
从零手搓一个【消息队列】创建核心类, 数据库设计与实现,消息队列项目,消息队列,SQLite,RabbitMQ,MyBatis

datacenter 这个包中整合硬盘上的数据管理 + 内存上的数据管理
硬盘上的数据管理又整合了 数据库中的数据管理 + 文件中的数据管理


1, 创建 DataBaseManager 类

成员属性需要 MetaMapper 的对象, 用来封装刚才编写的数据库的 API

但并不使用 SpringBoot 的依赖注入 (@AutoWired), 而是使用以来查找的方式获取到 metaMapper

public class DataBaseManager {
    private MetaMapper metaMapper;
  
}

在启动类中初始化容器

@SpringBootApplication
public class DemoApplication {
	public static ConfigurableApplicationContext context;
	public static void main(String[] args) throws IOException {
		context = SpringApplication.run(DemoApplication.class, args);
	}
}

2, init() 初始化数据库

对于 DataBaseManager 类的初始化工作, 不仅仅是对成员属性的初始化, 而是需要一些额外的业务逻辑, 这种情况就不使用构造方法了, 而是单独定义一个方法

初始化工作: metaMapper + 建库建表 + 插入默认数据

如果是第一次启动服务器, 没有数据库则建库建表
如果是重启服务器, 已有数据库则不做处理

MyBatis 在第一次创建数据表的时候就会创建出 ./data/meta.db 这个文件, 但前提是要有 ./data 这个目录, 所以要先手动创建

    public void init() {
        this.metaMapper = DemoApplication.context.getBean(MetaMapper.class);

        if(!isDBExists()) {
            // 创建目录
            File file = new File("./data");
            file.mkdirs();
            // 创建数据表
            createTable();
            // 插入数据
            insertDefaultData();
        }
    }

3, insertDefaultData() 插入默认数据

创建一个默认的交换机(直接交换机)

    public void insertDefaultData() {
        Exchange exchange = new Exchange();
        exchange.setName("");
        exchange.setType(ExchangeTypeEnum.DIRECT);
        exchange.setDurable(false);
        exchange.setAutoDelete(false);
        metaMapper.insertExchange(exchange);
    }

4, createTable() 创建数据表

    public void createTable() {
        metaMapper.createExchangeTable();
        metaMapper.createQueueTable();
        metaMapper.createBindingTable();
    }

5, isDBExists() 数据库是否存在

SQLite 是一个轻量级数据库, 操作 SQLite 相当于操作本地的硬盘文件, 所以检查数据库是否存在就是检查数据库文件是否存在

    public boolean isDBExists() {
        File file = new File("./data/meta.db");
        return file.exists();
    }

6, deleteTables() 删除数据表

同上, 删除数据库就是删除文件, 先删文件再删目录

    public void deleteTables(){
        File file = new File("./data/meta.db");
        file.delete();

        File dir = new File("./data");
        dir.delete();
    }

7, 封装数据库的增删查操作

public void insertExchange(Exchange exchange) {
        metaMapper.insertExchange(exchange);
    }

    public List<Exchange> selectAllExchanges() {
        return metaMapper.selectAllExchanges();
    }

    public void deleteExchange(String exchangeName) {
        metaMapper.deleteExchange(exchangeName);
    }

    public void insertQueue(MessageQueue queue) {
        metaMapper.insertQueue(queue);
    }

    public List<MessageQueue> selectAllQueues() {
        return metaMapper.selectAllQueues();
    }

    public void deleteQueue(String queueName) {
        metaMapper.deleteQueue(queueName);
    }

    public void insertBinding(Binding binding) {
        metaMapper.insertBinding(binding);
    }

    public List<Binding> selectAllBindings() {
        return metaMapper.selectAllBindings();
    }

    public void deleteBinding(Binding binding) {
        metaMapper.deleteBinding(binding);
    }

四、小结

本文主要实现了两点 :

  • 1, 根据面向对象思想, 创建出了交换机, 队列, 绑定, 消息, 等核心概念的类
  • 2, 持久化存储 --> 硬盘管理 --> 数据库
    • 2.1, 数据库设计, 使用 SQLite, 并结合 MyBatis 编写了交换机, 队列, 绑定的建表, 增, 删, 查的 sql
    • 2.2, 使用 DataBaseManager 这个类管理数据库中的数据, 因为仅仅有sql语句不足以支撑所有的业务逻辑, 还需要对数据库的初始化, 判断存在, 删除等做进一步的封装

篇幅有限, 目前为止, 持久化存储 --> 硬盘管理 --> 文件 这个板块还没实现

下篇会实现消息在文件上的存储( 文件管理 : MessageFileManager 类)文章来源地址https://www.toymoban.com/news/detail-729933.html


到了这里,关于从零手搓一个【消息队列】创建核心类, 数据库设计与实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 从零开始手搓一个STM32与机智云的小项目——GPIO的输入输出

    上一篇中,对整个板子的硬件组成做了一个简单的介绍,本文开始进入程序编写的环节,首先来搞定最简单的GPIO输入输出控制。 GPIO全称叫做通用输入输出接口,它是单片机内核、片上外设与外部电路连接的桥梁,是单片机与外界进行数据交换的通道。 GPIO的端口号是从PA、

    2024年02月08日
    浏览(35)
  • 030-从零搭建微服务-消息队列(二)

    如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。 源码地址(后端):mingyue-springcloud-learning: 🎉 基于 Spring Boot、Spring Cloud Alibaba 的分布式微服务架构基础服务中心🎉 给出微服务的一些搭建建议 源码地址(前端):mingyue-springcloud-ui: 🎉 基于

    2024年02月07日
    浏览(30)
  • 029-从零搭建微服务-消息队列(一)

    如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。 源码地址(后端):mingyue-springcloud-learning: 🎉 基于 Spring Boot、Spring Cloud Alibaba 的分布式微服务架构基础服务中心🎉 给出微服务的一些搭建建议 源码地址(前端):mingyue-springcloud-ui: 🎉 基于

    2024年02月07日
    浏览(25)
  • Kafka消息队列核心概念以及部署

    2023年06月29日
    浏览(39)
  • 架构核心技术之分布式消息队列

    Java全能学习+面试指南:https://javaxiaobear.cn 今天我们来学习分布式消息队列,分布式消息队列的知识结构如下图。 主要介绍以下内容: 同步架构和异步架构的区别。异步架构的主要组成部分:消息生产者、消息消费者、分布式消息队列。异步架构的两种主要模型:点对点模型

    2024年02月07日
    浏览(32)
  • 消息队列-RabbitMQ:MQ作用分类、RabbitMQ核心概念及消息生产消费调试

    1)什么是 MQ MQ (message queue),从字面意思上看, 本质是个队列,FIFO 先入先出 ,只不过队列中存放的内容是 message 而已,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “ 逻辑解耦 + 物理解耦” 的消息通信服务 。 使用了

    2024年02月20日
    浏览(33)
  • 【图解RabbitMQ-3】消息队列RabbitMQ介绍及核心流程

    🧑‍💻作者名称:DaenCode 🎤作者简介:CSDN实力新星,后端开发两年经验,曾担任甲方技术代表,业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅,闭关学习中······ 😎人生感悟:尝尽人生百味,方知世间冷暖。

    2024年02月09日
    浏览(30)
  • 架构师的36项修炼-03架构核心技术之分布式消息队列

    本课时的主题是分布式消息队列,分布式消息队列的知识结构如下图。 本课时主要介绍以下内容。 同步架构和异步架构的区别。异步架构的主要组成部分:消息生产者、消息消费者、分布式消息队列。异步架构的两种主要模型:点对点模型和发布订阅模型。 分布式消息队列

    2024年01月24日
    浏览(30)
  • Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)

    消息队列: 分布式系统 必备的一个 基础软件 ,能支持 组件通信消息 的 快速读写 Redis本身 支持数据的快速访问 ,满足 消息队列的读写性能需求 消息队列存取消息的过程 在分布式系统中,两个组件要基于消息队列进行通信,一个组件就会把要处理的数据以消息的形式传递

    2023年04月09日
    浏览(35)
  • 参考RabbitMQ实现一个消息队列

    消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现 解耦合 以及 削峰填谷 。 在分布式系统中不再是单个服务器而是服务器“集群”,如果我们我们直接A服务器给B服务器发送请求,B服务器给A服务器返回响应,这样的话我们AB的耦合较大

    2024年02月14日
    浏览(75)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包