我在第一章提到过:关于持久化那一块,在内存存储主要是获取比较方便,在硬盘上存储主要是为了长时间存储。
关于内存管理又分两部分:数据库管理和文件管理(主要是和消息有关)。
硬盘管理
硬盘中的数据库管理其实在上一章的建库建表说过了,就这么几步,SQLite + MyBatis 几十行代码就搞定了除此之外就是对其进行一个封装即可。
这里的文件管理还是比较复杂的。
文件管理(MessageFileManager )
我们先来解决一个问题:Message 如何在硬盘上存储?
原因主要有以下两点:
-
消息操作并不涉及到复杂的增删改查
-
消息数量可能会⾮常多,数据库的访问效率并不⾼
所以要把消息直接存储在文件中,以下设定消息具体如何在文件中存储:
消息是依托于队列的,因此存储的时候,就需要把消息按照 队列 维度进行展开
此处已经有了一个 data 目录(meta.db 就在这个目录中)
在 data 中创建一些子目录,每个队列对应一个子目录,子目录就是队列名
关于 queue_data.txt :
这个文件中存储的是二进制的数据,我们约定转发到这个队列的所有消息都是以二进制的方式进行存储的。具体如何存储,按照我们自定义的约定;
具体约定如下图:
首先约定每条消息 前四个字节 为消息长度,这里给定多少字节就创建多少字节的空间留给消息进行存储。
对于 BrokerServer 来说,消息是需要进行新增和删除的。
生产者生产一个消息,就是新增一个消息
消费者消费一个消息,就是删除一个消息
因为是存在文件中,所以会存在一个问题:
每次删除一个消息会很麻烦;
- 如果是在内存中,删了也就删了,利用一些集合类,新增消息就添进集合类中,删除就将集合类中删掉就好;
- 但是访问硬盘的速度要比访问内存慢上几千倍,消费一个消息就删除一次,这样极大的影响了效率。
- 故此我们采用的是逻辑删除,isValid 这个参数是 0 就是无效,是 1 就是有效数据,只需要每次获取到这个消息之前,判断一下这个参数就可以了。
对于文件新增:
-
我们采⽤追加⽅式,直接在当前⽂件末尾新增就⾏
-
所以我们采⽤逻辑删除的⽅式。根据消息中的⼀个变量 isValid 判断该消息是否有效,1 为有效消息;
那么应该如何找到每个消息在文件中的具体位置呢?
还记得我们在 Message 这个核心类上的参数:offsetBeg 和 offsetBeg 这两个参数吗?
那么如何找到每个消息对应在⽂件中的位置呢? 我们之前在 Message 中设置了两个变量,⼀个是 offsetBeg,⼀个是 offsetEnd。我们存储消息的时候,是同时在内存中存⼀份和硬盘中存⼀份。⽽内存中存到那⼀份消息,记录了当前 的消息的 offsetBeg 和 offsetEnd。通过先找到内存中的消息,再根据该消息的两个变量值,就能找到硬盘中的消息数据了。
垃圾回收
上述提到了逻辑删除,那我们啥时候才是真正的删除呢?
在以前的篇章中,介绍过垃圾回收的算法,我们这里采用了 JVM 那一章中的 《复制算法》
选择这个算法主要有两个理由:
- 复制算法比较简单
- 这个项目比较适合使用复制算法,我们文件中有效的消息其实不算多,大多数都是无效的数据
大致流程就是:
直接遍历原有的消息数据⽂件,把所有的有效数据数据重新拷⻉⼀份到新的⽂件中,新⽂件名字和 原来⽂件名字相同,再把旧的⽂件直接删除掉。
整个GC 的流程图:
现在我们 垃圾是可以回收了,但是怎么样来触发我们的垃圾回收呢?
触发回收
统计文件
我在这里对每个队列目录设置了另一个文件,这个文件用于存储垃圾有多少条、无效数据是多少,每次在新增和删除消息的时候,都会影响上述两个参数;
一旦消息超过 2k 并且 无效消息大于 50% ; 此时就会触发垃圾回收
但是这又会触发另一个问题:
如果当⼀个⽂件消息数⽬⾮常的多,⽽且都是有效信息,此时会导致整个消息的数据⽂件⾮常庞⼤,后 续针对这个⽂件操作就会⾮常耗时。假设当前⽂件已经达到10个G了,那么此时如果触发⼀次GC,整 个耗时就会⾮常⾼。
对于 RabbitMQ 来说它是做了如下处理:
⽂件拆分:当某个⽂件⻓度达到⼀定的阈值的时候,就会拆分成两个⽂件(拆着拆着就成了很多⽂件)⽂件合并:每个单独的⽂件都会进⾏GC,如果GC之后,发现⽂件变⼩了,就会和相邻的其他⽂件 合并这样做,可以保证在消息特别多的时候,也能保证性能上的及时响应
本项目暂时没有这么处理,等以后有时间可以对此进行扩展。
大致思路:
-
⽤⼀个专⻔的数据结构,来存储当前队列中有多少个数据⽂件,每个⽂件⼤⼩是多少,消息的数⽬ 是多少,⽆效消息是多少
-
设计策略:什么时候触发⽂件拆分,什么时候触发⽂件合并
统计文件相关代码
static public class Stat {
// 此处直接定义成 public
public int totalCount; // 总的消息数
public int validCount; // 有效消息数
}
1. 统计文件的读
private Stat readStat(String queueName) {
Stat stat = new Stat();
try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueN
ame))) {
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
2. 统计文件的写
private void writeStat(String queueName, Stat stat) {
// 使用 PrintWrite 来写文件.
// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "\t" + stat.validCount);
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
重点内容已经全部介绍了,除了这些重点内容,还有一些其他操作。
其他文件操作
创建消息目录和文件
一个新的项目,不一定存在我们想要的消息目录和文件,我们肯定是需要自己去手动创建的。为此这里提供了一系列的相关方法去调用:
- 先创建队列对应的⽬录(以队列名字为名的⽬录)
- 创建队列⾥⾯的消息数据⽂件
- 创建队列⾥⾯的消息统计数据⽂件
- 给消息统计⽂件设置初始值
代码这里就不一一实现了,具体的我放在末尾处。
删除消息目录和文件
- 先删除消息的统计⽂件和消息数据⽂件
- 再删除队列⽬录
消息序列化
啥叫序列化?
把⼀个对象(结构化数据)转换成⼀个 字符串/字节数组序列化之后⽅便 存储和传输
-
存储:⼀般存储在⽂件中,⽂件只能存字符串/⼆进制数据。不能直接存对象
-
传输:在⽹络中传输,socket
还记得我们在之前完成的一个小项目 《基于Servlet 实现的个人博客系统》中用的是 json 进行序列化吗?
这里为啥不也用 json 呢?
- 由于 Message 里面存储的是二进制文件,而 json 序列化得到的结果是 文本数据,里面无法存储二进制的 body。
- JSON 格式中有很多特殊字符 例如: , : “ {} 等,这些特殊字符会影响 json 格式的解析,
- 如果存文本,这些键值对不会包含上述特殊字符
- 如果是二进制,那就不好说了,万一某个二进制的字节就正好对上某个特殊字符的 ascii 值就会影响解析
- 也可以解决,使用 base64 编码,base64 的作用就是利用四个字节来表示三个字节的信息。但是这种效率太低了,有额外的转码开销,还会使空间变大
针对序列化,有很多解决的办法,我这里采用的是 Java标准库提供的 类:
ObjectInputSteam 、 ObjectOutputSteam
我把序列化放在了 通用包下;
具体的代码如下:
import java.io.*;
// 下列的逻辑, 并不仅仅是 Message, 其他的 Java 中的对象, 也是可以通过这样的逻辑进行序列化和反序列化的.
// 如果要想让这个对象能够序列化或者反序列化, 需要让这个类能够实现 Serializable 接口.
public class BinaryTool {
// 把一个对象序列化成一个字节数组
public static byte[] toBytes(Object object) throws IOException {
// 这个流对象相当于一个变长的字节数组.
// 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到
// ObjectOutputStream 中.
// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了
objectOutputStream.writeObject(object);
}
// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来, 转成 byte[]
return byteArrayOutputStream.toByteArray();
}
}
// 把一个字节数组, 反序列化成一个对象
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
// 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.
object = objectInputStream.readObject();
}
}
return object;
}
}
将消息写入到文件中(sendMessage)
这个方法用来把一个新的消息, 放到队列对应的文件中
需要注意的是:此处 写入消息 需要两个参数,一个是 队列 MSGQueue,queue 表示要把消息写入的队列 ;一个是 Message , message 则是要写的消息
大致流程:
-
先判断当前写⼊队列的⽂件在不在
-
把 Message 对象进⾏序列化,转换成⼆进制的字节数组
-
进⾏写⼊操作的操作时候要进⾏加锁(锁对象就是当前 MSGQueue),此处如果不加锁。当多个客 户端进⾏发送消息的时候,可能会造成数据不对。
-
先获取当前队列消息数据⽂件的⻓度,⽤这个⻓度来计算 offsetBeg 和 offsetEnd
-
设置该消息 offsetBeg = 当前⽂件⻓度 + 4
-
设置该消息 offsetEnd = 当前⽂件⻓度 + 4 + 当前⼆进制数组⻓度
-
-
把新的 message数据,写⼊到⽂件的末尾处,采⽤追加⽅式
-
先写⼊4个字节的消息⻓度
-
再写⼊消息本体
-
- 更新统计⽂件,并重新写⼊
具体代码不过多演示【可以根据这些方法名去码云中看具体的代码】;
从文件中删除消息(逻辑删除: deleteMessage)
-
先从硬盘中读取出来
-
此处采⽤ RamdomAccessFile 来读取(可以在⽂中指定位置,进⾏读写,随机访问)
-
先定义⼀个 以消息⻓度为length【offsetEnd - offsetBeg】的⼀个字节数组 bufferSrc
-
再根据要删除的 Message 对象中的 offsetBeg 和 offsetEnd 将光标定位那个位置
-
然后将结果读取到 bufferSrc中
-
-
然后将读到的bufferSrc数据反序列化成 Message对象,修改变量 isValid=0x2
-
再将 Message对象 序列化成 bufferDes
-
重新定位光标到消息的 offserBeg
-
将 bufferDes 写回去
-
-
更新统计⽂件信息,写⼊
同样代码不演示;
从硬盘中恢复数据到内存(loadAllMessageFromQueue)
使用这个方法将硬盘中的有效数据加载到内存中(具体来说是一个链表中)这个方法是在程序启动时调用的。
这里使用的是一个 LinkedList 来存储,方便后续进行头删操作
因为消息很可能不止一条,所以需要循环读取,手动记录光标位置
-
先读取4个字节,表示当前消息⻓度
-
然后根据当前消息⻓度,读取对应的⻓度到 buffer 字节数组中
-
把读取到 buffer 字节数据 反序列化成 Message 对象
-
判断这个 Message 对象⾥⾯的 isValid 是否为 0x1
-
如果不是,就 continue,是的话执⾏第六步,不是就从第⼀步开始
-
加⼊消息之前先设置 offsetBeg, offserEnd,然后将消息加⼊到 LinkedList中
-
如果读到末尾会有异常 EOF,会⾃动结束
同样代码不演示;
消息文件垃圾回收(checkGC、GC)
关于 checkGC:
这个是执行垃圾回收的前置方法,这个方法内部进行进行判断,消息是否大于 2000 条,并且无效消息是否大于 50%。
关于 GC :
-
就把所有的有效消息提取出来,单独的在写到⼀个⽂件中,
-
删除旧⽂件,使⽤新⽂件代替
-
注意:还要更新统计⽂件信息
小结:
上述 除了序列化和反序列化,其余的都是存在 MessageFileManager 中进行管理。
- 设计⽬录结构和⽂件格式
- 实现了⽬录创建和删除
- 实现统计⽂件的读写
- 实现了消息的写⼊(按照之前的⽂件格式)
- 实现了消息的删除 (随机访问⽂件)
- 实现了所有消息的加载
- 垃圾回收(复制算法)
数据库管理(DataBaseManager)
通过这个类, 来整合上述的数据库操作。
这个类中只有一个参数就是 MetaMapper ,这个就是数据库操作提供的接口类。
在这个数据库管理类下,有几个比较重要的方法;
初始化(init)
这里需要手动获取到 MetaMapper
metaMapper = MqApplication.context.getBean(MetaMapper.class);
其实这里也可以使用注入的方式,都行。
使用我写的这种方法,必须要在 Application 这个类下添加注解:@SpringBootApplication
这个 初始化方法只在第一次加载的时候调用。
- 这个方法内部只是判断是否存在库和表了
- 不存在就需要先创建处 data 目录
- 在创建表(所谓建表就是创建处交换机、队列、绑定)【createTable】,再插入默认数据【createDefaultData】
- 如果已经存在,那么啥也不干,直接抛出一个日志即可
createTable
这个方法就如上所说,调用 metaMapper 下的接口即可。
添加默认数据(createDefaultData)
给数据库表添加默认数据主要就是添加一个默认的交换机;RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机, 类型是 DIRECT
所以这个方法就是创建处一个 DIRECT 交换机:
删除数据库(deleteDB)
这里需要先删目录下的 meta.db 文件,因为在删除目录的时候,必须要保证目录是空的。
- 先删文件将这个文件找到,再调用delete()方法,这个可能存在删除失败的情况,所以要用 布尔类型的值接收一下,再根据结果输出不同日志
- 删目录,和上述删文件操作是一样的。
此外就是封装 MetaMapper 这个接口的方法,都是一些很简单的操作,这里就直接看一看就好:
统一硬盘存储管理
关于文件上述已经大致交代清楚了。
关于数据库管理,这个比较简单,就是拿一个类对数据库操作进行统一的封装;这里就类似于 Service 层,进行一个解耦合的作用,这一段就大概看看代码即可,没啥太难的思想和操作。
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
/*
* 使用这个类来管理所有硬盘上的数据.
* 1. 数据库: 交换机, 绑定, 队列
* 2. 数据文件: 消息
* 上层逻辑如果需要操作硬盘, 统一都通过这个类来使用. (上层代码不关心当前数据是存储在数据库还是文件中的)
*/
public class DiskDataCenter {
// 这个实例用来管理数据库中的数据
private DataBaseManager dataBaseManager = new DataBaseManager();
// 这个实例用来管理数据文件中的数据
private MessageFileManager messageFileManager = new MessageFileManager();
public void init() {
// 针对上述两个实例进行初始化.
dataBaseManager.init();
// 当前 messageFileManager.init 是空的方法, 只是先列在这里, 一旦后续需要扩展, 就在这里进行初始化即可.
messageFileManager.init();
}
// 封装交换机操作
public void insertExchange(Exchange exchange) {
dataBaseManager.insertExchange(exchange);
}
public void deleteExchange(String exchangeName) {
dataBaseManager.deleteExchange(exchangeName);
}
public List<Exchange> selectAllExchanges() {
return dataBaseManager.selectAllExchanges();
}
// 封装队列操作
public void insertQueue(MSGQueue queue) throws IOException {
dataBaseManager.insertQueue(queue);
// 创建队列的同时, 不仅仅是把队列对象写到数据库中, 还需要创建出对应的目录和文件
messageFileManager.createQueueFiles(queue.getName());
}
public void deleteQueue(String queueName) throws IOException {
dataBaseManager.deleteQueue(queueName);
// 删除队列的同时, 不仅仅是把队列从数据库中删除, 还需要删除对应的目录和文件
messageFileManager.destroyQueueFiles(queueName);
}
public List<MSGQueue> selectAllQueues() {
return dataBaseManager.selectAllQueues();
}
// 封装绑定操作
public void insertBinding(Binding binding) {
dataBaseManager.insertBinding(binding);
}
public void deleteBinding(Binding binding) {
dataBaseManager.deleteBinding(binding);
}
public List<Binding> selectAllBindings() {
return dataBaseManager.selectAllBindings();
}
// 封装消息操作
public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
messageFileManager.sendMessage(queue, message);
}
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {
messageFileManager.deleteMessage(queue, message);
if (messageFileManager.checkGC(queue.getName())) {
messageFileManager.gc(queue);
}
}
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
return messageFileManager.loadAllMessageFromQueue(queueName);
}
}
文件管理相关代码文章来源:https://www.toymoban.com/news/detail-709525.html
数据库封装代码文章来源地址https://www.toymoban.com/news/detail-709525.html
到了这里,关于消息队列(三):硬盘管理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!