RocketMQ源码版本V5.0.0,可兼容之前的版本,因为整理资料的时候,之前的版本,和V5版本有所出入,核心流程基本还是大同小异的。
此前已经总结了NameServer的启动流程源码:现在来了解Broker的启动流程。在RocketMQ启动的时候,首先要启动NameServer,然后再启动Broker。
Broker模块主要复制消息的存储、投递、查询,以及服务高可用的保证。
为了实现这些功能,Broker主要有以下几个比较重要的模块:
1.Remoting Module:整个Broker的实体,复制处理来自Client端的请求。
2.Client Manager:复制管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
3.Store Service:提供方便简单的API接口处理消息存储到物理硬盘的查询功能。
4.HA Service:高可用服务,提供Master Broker 和 Salve Broker之间的数据同步功能。
5.Index Service:根据特定的Message Key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
1. Broker启动入口
Broker的启动入口就是broker模块的BrokerStartup类的main方法。我们先来看BrokerStartup的main方法,其实和NamesrvStartup几乎是一个模子,都是先创建一个Controller,然后再启动它;
2. createBrokerController函数创建BrokerController实例对象
创建BrokerController时,核心就做了2件事情
1.解析各种配置(命令行等)、创建BrokerController需要的各种配置对象:BrokerConfig、NettyServerConig、NettyClientConfig、MessageStoreConfig;
2.BrokerController相当于Broker的一个中央控制类。创建了BrokerController的对象后,再调用BrokerController对象的initialize方法,进行初始化操作。
从创建BrokerController的代码中可以看出,BrokerController的依赖的四个核心配置如下图所示:
这些配置类实际上就是一些普通的POJO类。所以此时Broker的整个组件结构应该是这样的:
Broker整个JVM进行运行期间,都是由BrokerController这个管控组件去管理Broker的请求处理、后台线程以及磁盘数据的。
2.1 创建各种配置类
creatBrokerController方法中,首先就会创建各种配置类,重要的broker相关的配置类有如下几种:
1. BrokerConfig:Broker的配置类,包含Broker的各种配置信息,比如ROCKET_HOME、namesrvAddr、brokerName、brokerId属性等。
2. NettyServiceConfig:NettyServer的配置类,包含Broker作为服务器端时的各种属性,比如客户端进行交互的时候。就设置NettyServer的监听端口为10911。即客户端与Broker通信时使用10911端口。
3. NettyClientConfig:NettyClient的配置类,包含Broker最为客户端时的各种属性,Broker与NameServer进行交互的时候,就会作为客户端存在。
4. MessageStoreConfig:Broker消息存储的配置类,包含了消息存储的相关配置。比如各种文件的目录,大小等信息。
配置完成以上几个核心配置文件后,还会进行外部文件的解析,将-c指令指定的外部配置文件的属性设置给这些配置类。
设置了对应的配置信息后,还会进行一系列的校验,例如:
1. ROCKETMQ_HOME校验:如果在启动参数中没有指定ROCKETMQ_HOME属性,那么打印异常并退出程序,并提示让我们自己进行配置该属性。ROCKETMQ_HOME就是指定RocketMQ的配置文件路径。
2. namesrvAddr校验:我们可以配置多个NameServer地址,以“;”分割,Broker会通过将各个NameServer的字符串地址转换为InetSocketAddress(实现 IP 套接字地址(IP 地址 + 端口号))来校验各个地址的合法性。
3. 设置、校验brokerId,如果broker是同步master或者异步master角色,则设置brokerId为0,如果是slave角色,则校验设置的brokerId,如果不大于0,则打印异常。
根据检查broker的角色配置brokerId:默认角色是ASYNC_MASTER 通过此配置可知brokerId为0表示master,非0表示slave
broker的角色分为:
ASYNC_MASTER(异步主机):异步同步消息到slave
SYNC_MASTER(同步主机):同步同步消息到slave
SLAVE(从机)
2.2 BrokerController类
我们在继续宁BrokerController如何初始化之前,先来看一下BrokerController内部到底是怎么样的。我们说过BrokerController相当于broker的一个中央控制器,各种组件角色的交互都是通过BrokerController来完成的,而不是组件的直接互相调用。
从下面的源码中不难看出,实例化BrokerController的时候,会一块实例化很多的配置类和线程池队列。
2.3 初始化BrokerController控制器
初始化步骤大致如下:
1. 加载配置文件:topic配置文件、topicQueue相关配置、消费者消费偏移量配置文件、订阅分组配置文件、消费者订单信息管理文件。
2. 实例化和初始化消息存储服务相关类DefaultMessageStore。
3. 通过DefaultMessageStore加载消息存储的相关文件。
比如:commitLog日志文件、consumequeue消费消息队列文件的加载、indexFiles索引文件的构建 messsageStore还会将这些文件的内容加载到内存中,并且完成RocketMQ的数据恢复 这是broker启动是核心步骤之一。
4. 初始化Broker通信层,创建netty远程服务 (remotingServer 和 fastRemotingServer)。
创建broker的netty远程服务,端口为10911,可用于处理客户端的所有请求; 创建broker的快速netty远程服务,端口号为普通端口号-2(默认10909),这就是所谓的快速通道,对应可以处理客户端除了拉取消息之外的所有请求,所谓的VIP端口。
5. 创建各种线程池,主要有两类:
第一类:负责处理别人发过来的请求;
第二类:负责处理自己的一些后台任务。
这一步创建了很多线程池,因为RocketMQ为了性能,对过多的请求进行异步优化处理,因此需要许多线程池。
6. RocketMQ底层通信基于netty,这里注册netty请求处理器。
(1) registerProcessor方法将处理器和对应的线程池绑定为一个Pair对象,并且将这个pair对象放入processorTable中, 其值就是pair对象,key就是对应的请求编码RequestCode。
(2) 每个请求,都会根据自己携带的RequestCode在processorTable中查找对应的处理器以及对应的执行器线程池来处理请求。 RocketMQ通过这样的方式来提升处理请求的性能。
7. 启动一系列定时周期任务(定时调度线程池的后台执行)
在注册了netty消息处理器之后,将会启动一系列的定时任务 这些定时任务由BrokerController中的scheduledExecutorService去执行,该线程池只有一个线程。
8. 初始化事务消息相关服务
初始化服务采用Java SPI的方式进行加载 主要初始化三个服务:
(1)transactionalMessageService(事务消息服务):用于处理、检查事务消息
(2)transactionalMessageCheckListener(事务消息监查监听器):监听回查消息
(3)transactionalMessageCheckService(事务消息检查服务):提供了事务消息回查的逻辑,默认情况下 6s以上没commit/rollback的事务消息才会触发事务回查,而如果回查次数超过了15次则丢弃事务
9. 初始化Acl权限相关服务:加载权限相关校验器
同样是基于Java的SPI机制进行查找,并且会将找到校验器注册到RpcHook中,在请求执行之前会执行权限校验。
10. 初始化RPC调用的钩子函数
RpcHook是RocketMQ提供的钩子类,提供一种类似于类似于AOP的功能。 可以在请求被处理之前和响应被返回之前执行对应的方法。
除了以上步骤以外,还有Tls传输相关配置,通信安全的文件监听模块,用来观察网络加密配置文件的更改。
2.3.1 加载配置文件
2.3.2 创建消息存储对象defaultMessageStore
如果上面的配置文件都加载成功,则创建负责消息存储相关的对象defaultMessageStore。注意,这里所谓的加载成功,是指在加载过程中没有抛出异常,即使是没有对应的文件和临时文件,只要没有抛出异常,也会返回true,表示加载成功。
DefaultMessageStore是RocketMQ的核心文件存储控制类,是RocketMQ对于消息存储和获取功能的抽象,DefaultMessageStore类位于store模块,通过该类可以直接控制管理commitLog、consumeQueue、indexFile等文件内容的读、写,非常重要。
在启动Broker的时候,就会创建一个defaultMessagStore对象,随后通过load方法进行磁盘文件的加载和异常数据的恢复。
2.3.2.1 解析延迟级别
2.3.3 注册消息存储钩子
2.3.4 Load加载恢复消息文件
DefaultMesageStore实例化之后,将会调用load方法将磁盘中的commitLog、ConsumeQueue、IndexFile文件的数据加载到内存,还会进行数据恢复操作。
主要步骤为:
1. 调用isTempFileExit方法判断上次broker是否是正常退出。如果是正常退出不会保留absort文件,异常退出则会;broker在启动时会创建abort文件,并且注册钩子函数,在JVM退出时删除abort文件,如果下一次启动时存在abort文件,说明broker是异常退出的,文件数据可能存在不一致的情况,需要进行数据修复。
2. 加载CommitLog日志文件。CommitLog文件才是真正消息存储的地方(即消息主题以及元数据的存储主题,存储Producer端写入的消息主题内容,消息内容是不定长的)。单个大小默认1G。官方描述如下:单个文件大小默认1G,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息顺序写入日志文件,效率很高,当文件满了,写入下一个文件。
3. 加载ConusmeQueue文件。 ConsumeQueue文件可以看作是CommitLog是索引文件,其存储了它所属topic的信息在CommitLog中的偏移量。消费者拉取消息的时候,可以从ConsumeQueue中快速的根据偏移量定位消息在CommitLog中的位置。
4. 加载checkpoint检查点文件。StoreCheckpoint记录这commitLog、ConsumeQueue、Index文件的最后更新时间点。当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复,这决定着文件从哪里开始恢复,甚至是删除文件。
5. 加载index索引文件。index索引文件用以通过时间区间快速查询消息,底层为HashMap结构,实现为hash索引。如果不是正常退出,并且最大更新时间比checkpoint文件中是时间戳大,则删除该index文件。
6. 恢复ConsumeQueue文件和CommitLog文件,将正确的数据恢复至内存,删除错误数据和文件
2.3.4.1 isTempFileExist 判断是否存在临时文件
2.3.4.2 commitLog#load加载消息日志文件
通过内部的CommitLog对象的load方法加载Commit Log日志文件。CommitLog的load方法实际上是委托内部的mappedFileQueue的load方法进行加载。
MappedFileQueue#load方法会就是将commitLog目录路径下的commotlog文件进行全部的加载为MappedFile对象。
在物理上commitlog目录下面是一个个的commitlog文件,但是在java中进行了三层映射。CommitLog-MappedFileQueue-MappedFile。CommitLog中包含MappedFileQueue,以及commitlog相关的其他服务,例如刷盘服务;MappedFileQueue中包含MappedFile集合,以及单个commotlog文件大小等属性,而MappedFile才是真正的一个commotlog文件在Java中的映射,包含文件名、大小、mmap对象mappedByteBuffer等属性。
实际上MappedFileQueue和MappedFile都是通用类,commitlog、comsumequeue、indexfile文件都会使用到。
创建MappedFile映射文件。MappedFile作为一个RocketMQ的物理文件在Java中的映射类。commitLog consumerQueue、indexFile3种文件磁盘的读写都是通过MappedFile操作的。
2.3.4.3 consumeQueueStore#load加载消费队列文件
ConsumeQueue对象建立之后,会对自己管理的队列id目录下面的ConsumeQueue文件进行加载。内部就是调用mappedFileQueue的load方法,该方法我们前面讲过了,会对每个ConsumeQueue文件创建一个MappedFile对象并且进行内存映射mmap操作。
2.3.4.4 创建StoreCheckpoint检查点对象
在commitlog和consumequeue文件都加载成功之后,加载checkpoint 检查点文件,创建storeCheckpoint对象。
StoreCheckpoint记录着commitLog、consumeQueue、index文件的最后更新时间点,当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复,这决定着文件从哪里开始恢复,甚至是删除文件。
StoreCheckpoint记录了四个关键属性(之前版本是前三个,现在这个版本又加了第4个):
1. physicMsgTimestamp:最新commitLog文件的刷盘时间戳,单位毫秒;
2. logicsMsgTimestamp:最新consumeQueue文件的刷盘时间戳,单位毫秒;
3. indexMsgTimestamp:创建最新indexfile文件的时间戳,单位毫秒;
4. masterFlushedOffset:获取master刷新偏移。
2.3.4.5 加载indexFile文件
index 索引文件用于通过时间区间来快速查询消息,底层为HashMap结构,实现为hash索引。
最终一个index文件对应着一个IndexFile实例,并且会加到indexFileList集合中。还会判断如果上次broker不是正常退出,并且并且当前index文件中最后一个消息的落盘时间戳大于StoreCheckpoint中的最后一个index索引文件创建时间,则该索引文件被删除。
2.3.4.6 恢复ConsumeQueue文件和commitLog文件以及TopicQueueTable,将正确的数据恢复至内存,删除错误数据和文件
ConusmeQueue
步骤一:
/**
* 进行ConsumeQueue的恢复
* 默认采用的是正常恢复,不是并发恢复
*/
private void recoverConsumeQueue() {
if (!this.brokerConfig.isRecoverConcurrently()) {
this.consumeQueueStore.recover();
} else {
this.consumeQueueStore.recoverConcurrently();
}
}
步骤二:
/**
* 正常恢复
*/
public void recover() {
//遍历consumeQueueTable的value集合,即queueId到ConsumeQueue的map映射
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
//遍历所有的ConsumeQueueInterface
for (ConsumeQueueInterface logic : maps.values()) {
//恢复ConsumeQueue,删除无效的ConsumeQueue文件
this.recover(logic);
}
}
}
步骤三:
public void recover(ConsumeQueueInterface consumeQueue) {
//获取文件队列生命周期
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.recover();
}
步骤四:
/**
* ConsumeQueue的recover()方法
*/
@Override
public void recover() {
//获取所有的ConsumeQueue文件映射的mappedFiles集合
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
//如果存在commitlog文件集合
if (!mappedFiles.isEmpty()) {
//从倒数第三个文件开始恢复
int index = mappedFiles.size() - 3;
//不足3个文件,则直接从第1个文件开始进行恢复
if (index < 0) {
index = 0;
}
//consumequeue映射文件的大小
int mappedFileSizeLogics = this.mappedFileSize;
//获取文件对应的映射对象
MappedFile mappedFile = mappedFiles.get(index);
//文件映射对应的DriectByteBuffer
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
//获取文件映射的初始物理偏移量,其实和文件名相同
long processOffset = mappedFile.getFileFromOffset();
//consumequeue映射文件的有效offset
long mappedFileOffset = 0;
long maxExtAddr = 1;
while (true) {
//校验每一个索引条目的有效性,CQ_STORE_UNIT_SIZE是每隔条目的大小,默认20
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
//获取该条目对应的消息在commitlog文件中的物理偏移量
long offset = byteBuffer.getLong();
//获取该条目对应的消息在commitlog文件中的总长度
int size = byteBuffer.getInt();
//获取该条目对应的消息的tag哈希值
long tagsCode = byteBuffer.getLong();
//如果offset 和 size 都大于0,则表示当前条目有效
if (offset >= 0 && size > 0) {
//更新当前ConsumeQueue文件中的有效数据偏移量
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
//跟新当前queueId目录下所有的ConsumeQueue文件中的最大有效物理偏移量
this.maxPhysicOffset = offset + size;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
} else {
//否则,表示当前条目无效了,后续的条目不会遍历
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
//如果当前ConsumeQueue文件中的有效数据偏移量和文件大小一样,则表示该ConsumeQueue文件中的所有条目都是有效的
if (mappedFileOffset == mappedFileSizeLogics) {
//遍历下一个文件
index++;
//遍历到了最后一个文件,则结束遍历
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
//获取下一个文件的数据
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
//如果不相等,则表示当前ConsumeQueue有部分无效数据,恢复结束
log.info("recover current consume queue over " + mappedFile.getFileName() + " "
+ (processOffset + mappedFileOffset));
break;
}
}
//该文件映射的已恢复的物理偏移量
processOffset += mappedFileOffset;
//设置当前queueId下面的所有的ConsumeQueue文件的最新数据
//设置刷盘最新位置,提交的最新位置
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
/**
* 删除文件中最大有效数据偏移量processOffset以后的所有数据
* 该方法会校验,如果文件最大数据偏移量大于最大有效数据偏移量
* 最大数据偏移量:文件的最大数据偏移
* 最大有效数据偏移量:最后一个有效条目在自身文件中的偏移量
* 1. 那么将文件起始偏移量大于最大有效数据偏移量的文件进行整个删除
* 2. 否则设置该文件的有效数据位置为最大有效数据偏移量
*/
this.mappedFileQueue.truncateDirtyFiles(processOffset);
//是否启用外部读取,默认不启用
if (isExtReadEnable()) {
this.consumeQueueExt.recover();
log.info("Truncate consume queue extend file by max {}", maxExtAddr);
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
}
步骤五:
/**
* MappedFileQueue的方法
* @param offset 文件的最大有效数据偏移量
*/
public void truncateDirtyFiles(long offset) {
//待移除的文件集合
List<MappedFile> willRemoveFiles = new ArrayList<>();
//遍历内部所有的MappedFile文件
for (MappedFile file : this.mappedFiles) {
//获取当前文件自身的最大数据偏移
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
//如果最大数据偏移量大于最大有效数据偏移量
if (fileTailOffset > offset) {
//如果最大有效数据偏移量大于等于该文件的起始偏移量,那么说明文件当中有一部分数据是有效的,那么设置该文件的的有效属性
if (offset >= file.getFileFromOffset()) {
//设置当前文件的刷盘、提交、写入指针为当前最大有效数据偏移量
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
//如果最大有效数据偏移量小于该文件的起始偏移量,那么删除该文件
file.destroy(1000);
//记录到待删除的文件集合中
willRemoveFiles.add(file);
}
}
}
//将等待移除的文件整体从mappedFiles中移除
this.deleteExpiredFile(willRemoveFiles);
}
commitLog.recoverNormally(正常恢复)
步骤一:
/**
* When the normal exit, data recovery, all memory data have been flush
* 该方法用于broker上次正常关闭的时候恢复commitlog,其逻辑与recoverConsumeQueue恢复ConsumeQueue文件的方法差不多
* 当正常退出、数据恢复时,所有内存数据都已刷新到磁盘
* @param maxPhyOffsetOfConsumeQueue consumequeue文件中记录的最大有效commitlog文件偏移量
*/
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
/**
* 是否检查消耗记录的CRC32。默认为true
* 目的是:
* 这样可以确保消息不会发生在线或磁盘损坏。
* 该检查增加了一些开销,因此在寻求极端性能的情况下可能会禁用该检查。
*/
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
//检查重复信息
boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
//获取commitlog文件集合
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
//如果存在commitlog文件
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
//从倒数第三个文件开始恢复
int index = mappedFiles.size() - 3;
//不足3个文件,则直接从第1个开始恢复
if (index < 0) {
index = 0;
}
//获取文件对应的映射对象
MappedFile mappedFile = mappedFiles.get(index);
//文件映射对应的DiectByteBuffer
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
//获取文件映射的初始物理偏移量,其实和文件名相同
long processOffset = mappedFile.getFileFromOffset();
//当前commitlog映射文件的有效offset
long mappedFileOffset = 0;
long lastValidMsgPhyOffset = this.getConfirmOffset();
// normal recover doesn't require dispatching
//正常恢复不需要调度
boolean doDispatch = false;
while (true) {
//生成DispatchRequest,验证本条消息是否合法
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
//获取消息大小
int size = dispatchRequest.getMsgSize();
// Normal data
//如果数据是正常的
if (dispatchRequest.isSuccess() && size > 0) {
//最后有效的物理地址偏移 = 文件的初始物理偏移量 + commitlog文件有效的偏移量
lastValidMsgPhyOffset = processOffset + mappedFileOffset;
//更新mappedFilOffset的值加上本条消息长度
mappedFileOffset += size;
/**
* 将在新的调度请求发送到消息存储时触发。
*/
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
//如果消息的请求是正常的但是size为0,表示到了文件的末尾,则尝试跳到下一个commitlog文件进行检测
else if (dispatchRequest.isSuccess() && size == 0) {
/**
* 将在新的调度请求发送到消息存储时触发。
*/
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, true);
index++;
//如果最后一个文件查找完毕,结束循环
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
//如果最后一个文件没有查找完毕,那么跳转到下一个文件
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
//如果当前消息异常,那么不继续校验
else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.warn("found a half message at {}, it will be truncated.", processOffset + mappedFileOffset);
}
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
//commitlog文件的最大有效区域的偏移量
processOffset += mappedFileOffset;
// Set a candidate confirm offset.
// In most cases, this value will be overwritten by confirmLog.init.
// It works if some confirmed messages are lost.
this.setConfirmOffset(lastValidMsgPhyOffset);
//设置当前commitlog下面的所有的commitlog文件的最新数据
//设置刷盘最新位置,提交的最新位置
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
/**
* 删除文件最大有效数据偏移量processOffset之后的所有数据
*/
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
//如果最consmequeue文件记录的最大有效commitlog文件偏移量,大于等于commitlog文件本身记录的最大有效区域的偏移量
//那么以consumequeue文件的有效数据为准,再次清楚consumequeue文件中的脏数据
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
//如果不存在commitlog文件
log.warn("The commitlog files are deleted, and delete the consume queue files");
//那么重置刷盘最新位置,提交的最新位置,并且清除所有的consumequeue索引文件
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
步骤二:
/**
* MappedFileQueue的方法
* @param offset 文件的最大有效数据偏移量
*/
public void truncateDirtyFiles(long offset) {
//待移除的文件集合
List<MappedFile> willRemoveFiles = new ArrayList<>();
//遍历内部所有的MappedFile文件
for (MappedFile file : this.mappedFiles) {
//获取当前文件自身的最大数据偏移
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
//如果最大数据偏移量大于最大有效数据偏移量
if (fileTailOffset > offset) {
//如果最大有效数据偏移量大于等于该文件的起始偏移量,那么说明文件当中有一部分数据是有效的,那么设置该文件的的有效属性
if (offset >= file.getFileFromOffset()) {
//设置当前文件的刷盘、提交、写入指针为当前最大有效数据偏移量
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
//如果最大有效数据偏移量小于该文件的起始偏移量,那么删除该文件
file.destroy(1000);
//记录到待删除的文件集合中
willRemoveFiles.add(file);
}
}
}
//将等待移除的文件整体从mappedFiles中移除
this.deleteExpiredFile(willRemoveFiles);
}
commitLog.recoverNormally(异常恢复 - 当前版本已经不采用,简单说一下)
步骤一:
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
//倒叙遍历所有的commitlog文件执行检查恢复
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
//首先校验当前commitlog文件是否是一个正确的文件
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
//从第一个正确的commitlog文件开始遍历恢复
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
long lastValidMsgPhyOffset = this.getConfirmOffset();
// abnormal recover require dispatching
boolean doDispatch = true;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
// Normal data
if (size > 0) {
lastValidMsgPhyOffset = processOffset + mappedFileOffset;
mappedFileOffset += size;
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
}
} else {
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
}
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, true);
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
if (size > 0) {
log.warn("found a half message at {}, it will be truncated.", processOffset + mappedFileOffset);
}
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
processOffset += mappedFileOffset;
// Set a candidate confirm offset.
// In most cases, this value will be overwritten by confirmLog.init.
// It works if some confirmed messages are lost.
this.setConfirmOffset(lastValidMsgPhyOffset);
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// Commitlog case files are deleted
else {
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
步骤二:
/**
* 校验当前commitlog文件是否是一个正确的文件
* @param mappedFile 需要判断的commitlog文件
* @return
*/
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
//获取文件开头的魔数
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
//如果文件的魔数与commitlog文件的正确的魔数不一致,则直接返回false,表示不是正确的commitlog文件
if (magicCode != MESSAGE_MAGIC_CODE) {
return false;
}
int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength;
long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
//如果消息存盘时间为 0,则直接返回false,表示为存储任何消息
if (0 == storeTimestamp) {
return false;
}
//如果messafeIndexEnable为true, 并且使用安全的消息索引功能,即可靠模式,那么Index文件进行校验
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
//如果StoreCheckpoint的最小时间戳索引大于等于当前文件的存储时间,那么返回true,表示当前文件至少有部分是可靠的
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {
//如果StoreCheckpoint的最小时间戳大于等于当前文件的存储时间,那么返回true,表示当前文件至少有部分是可靠的
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
}
return false;
}
恢复TopicQueueTable
步骤一:
public void recoverTopicQueueTable() {
//获取commitlog的最小偏移量
long minPhyOffset = this.commitLog.getMinOffset();
this.consumeQueueStore.recoverOffsetTable(minPhyOffset);
}
步骤二:
/**
* DefaultMessageStore的方法
*
* @param minPhyOffset
*/
public void recoverOffsetTable(long minPhyOffset) {
ConcurrentMap<String, Long> cqOffsetTable = new ConcurrentHashMap<>(1024);
ConcurrentMap<String, Long> bcqOffsetTable = new ConcurrentHashMap<>(1024);
//遍历consumequeueQueueTable。即consumequeue文件的集合
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId();
long maxOffsetInQueue = logic.getMaxOffsetInQueue();
if (Objects.equals(CQType.BatchCQ, logic.getCQType())) {
//将"topicName-queueId"作为key,将当前queueId下面最大的相对偏移量作为value存入table
bcqOffsetTable.put(key, maxOffsetInQueue);
} else {
//将"topicName-queueId"作为key,将当前queueId下面最大的相对偏移量作为value存入table
cqOffsetTable.put(key, maxOffsetInQueue);
}
this.correctMinOffset(logic, minPhyOffset);
}
}
//Correct unSubmit consumeOffset
if (messageStoreConfig.isDuplicationEnable()) {
SelectMappedBufferResult lastBuffer = null;
long startReadOffset = messageStore.getCommitLog().getConfirmOffset() == -1 ? 0 : messageStore.getCommitLog().getConfirmOffset();
while ((lastBuffer = messageStore.selectOneMessageByOffset(startReadOffset)) != null) {
try {
if (lastBuffer.getStartOffset() > startReadOffset) {
startReadOffset = lastBuffer.getStartOffset();
continue;
}
ByteBuffer bb = lastBuffer.getByteBuffer();
int magicCode = bb.getInt(bb.position() + 4);
if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
startReadOffset += bb.getInt(bb.position());
continue;
} else if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE) {
throw new RuntimeException("Unknown magicCode: " + magicCode);
}
lastBuffer.getByteBuffer().mark();
DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(lastBuffer.getByteBuffer(), true, true, true);
if (!dispatchRequest.isSuccess())
break;
lastBuffer.getByteBuffer().reset();
MessageExt msg = MessageDecoder.decode(lastBuffer.getByteBuffer(), true, false, false, false, true);
if (msg == null)
break;
String key = msg.getTopic() + "-" + msg.getQueueId();
cqOffsetTable.put(key, msg.getQueueOffset() + 1);
startReadOffset += msg.getStoreSize();
} finally {
if (lastBuffer != null)
lastBuffer.release();
}
}
}
//设置为topicQueueTable
this.setTopicQueueTable(cqOffsetTable);
this.setBatchTopicQueueTable(bcqOffsetTable);
}
2.3.5 计划消息服务在消息存储服务之后加载 加载RocketMQ延迟消息的服务,包括延时等级,配置文件等等
步骤一:
@Override
public boolean load() {
boolean result = super.load();
//解析延迟级别
result = result && this.parseDelayLevel();
//校正延迟偏移
result = result && this.correctDelayOffset();
return result;
}
步骤二:
public boolean load() {
String fileName = null;
try {
//获取配置文件路径
fileName = this.configFilePath();
//加载配置文件得到内部的json字符串数据
String jsonString = MixAll.file2String(fileName);
//如果加载的json字符串为null 或者长度为0,那么就加载bak备份文件
if (null == jsonString || jsonString.length() == 0) {
return this.loadBak();
} else {
//如果加载的json字符串不为空,那么将json字符串反序列化为对象属性
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load " + fileName + " failed, and try to load backup file", e);
return this.loadBak();
}
}
public abstract String configFilePath();
private boolean loadBak() {
String fileName = null;
try {
//获取配置文件路径
fileName = this.configFilePath();
//加载配置文件得到内部的json字符串数据
String jsonString = MixAll.file2String(fileName + ".bak");
//如果加载的json字符串不为null 并且 长度 > 0,就进行反序列化
if (jsonString != null && jsonString.length() > 0) {
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load " + fileName + " Failed", e);
return false;
}
return true;
}
步骤三(解析延迟级别):
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
/**
* 获取延迟级别的字符串形式
* 默认的延迟级别:
* messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*/
String levelString = this.brokerController.getMessageStoreConfig().getMessageDelayLevel();
try {
//以空格划分为延迟级别字符数组
String[] levelArray = levelString.split(" ");
//遍历延迟级别数组
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
//截取value字符串的最后一个字符,作为字符串ch,ch为延迟级别的单位
String ch = value.substring(value.length() - 1);
//根据延迟级别的单位从timeUnitTable集合中获取对应的value值
Long tu = timeUnitTable.get(ch);
int level = i + 1;
//更新最大延迟级别
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
//获取messageDelayLevel中去掉单位的延迟事件的大小(只包含数字,不包含单位)
long num = Long.parseLong(value.substring(0, value.length() - 1));
/**
* 获取延迟时间,以s为单位:
* 例如:
* tu = 1000L * 60
* num = 3
* delayTimeMillis = tu * num
*/
long delayTimeMillis = tu * num;
/**
* 将延迟级别以及对应的延迟时间以键值对的形式放入delayLevelTable中,
* delayLevelTable为ConcurrentMap集合,线程安全的集合
*/
this.delayLevelTable.put(level, delayTimeMillis);
/**
* 是否启用异步传递,默认不启用
* enableAsyncDeliver = false
*/
if (this.enableAsyncDeliver) {
//deliverPendingTable:将延迟等级,及 阻塞队列以键值对的形式放入传递待定表中
this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
}
}
} catch (Exception e) {
log.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
return false;
}
return true;
}
步骤四(校正延迟偏移):
/**
* 校正延迟偏移量
* @return
*/
public boolean correctDelayOffset() {
try {
/**
* 遍历delayLevelTable:
* delayLevelTable键存储的是:延迟级别
* delayLevelTable值存储的是:延迟级别对应的时间长短(以s为单位)
*/
for (int delayLevel : delayLevelTable.keySet()) {
//获得一个消费队列接口
ConsumeQueueInterface cq =
brokerController.getMessageStore().getQueueStore().findOrCreateConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
/**
* 偏移量表offsetTable是一个ConcurrentMap集合,线程安全的集合
* offsetTable键存储的是:延迟级别
* offsetTable值存储的是:延迟级别对应的偏移量
*/
//根据延迟级别获取当前的延迟偏移量
Long currentDelayOffset = offsetTable.get(delayLevel);
//延迟偏移量 == null 或者 消费队列接口 == null,终止
if (currentDelayOffset == null || cq == null) {
continue;
}
//正确的延迟偏移量 = 当前的延迟偏移量
long correctDelayOffset = currentDelayOffset;
//获取最小偏移量
long cqMinOffset = cq.getMinOffsetInQueue();
//获取最大偏移量
long cqMaxOffset = cq.getMaxOffsetInQueue();
if (currentDelayOffset < cqMinOffset) {
correctDelayOffset = cqMinOffset;
//调度CQ偏移无效。偏移量={},cqMinOffset={},cqMaxOffset=},queueId=}
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}
if (currentDelayOffset > cqMaxOffset) {
correctDelayOffset = cqMaxOffset;
//调度CQ偏移无效。偏移量={},cqMinOffset={},cqMaxOffset=},queueId=}
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}
if (correctDelayOffset != currentDelayOffset) {
//从{}到{}的正确延迟偏移量[delayLevel{}]
log.error("correct delay offset [ delayLevel {} ] from {} to {}", delayLevel, currentDelayOffset, correctDelayOffset);
offsetTable.put(delayLevel, correctDelayOffset);
}
}
} catch (Exception e) {
log.error("correctDelayOffset exception", e);
return false;
}
return true;
}
2.3.6 初始化Broker通信层,创建netty远程服务
步骤一:
protected void initializeRemotingServer() throws CloneNotSupportedException {
//创建broker的netty远程服务,端口号为10911, 可以用来处理客户端的所有请求
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
//创建一个broker的netty快速远程服务的配置
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
//设置快递netty远程服务的配置监听端口号为普通服务端口号 - 2, 默认10909
int listeningPort = nettyServerConfig.getListenPort() - 2;
if (listeningPort < 0) {
listeningPort = 0;
}
//设置broker的netty快速远程服务的端口号
fastConfig.setListenPort(listeningPort);
//创建broker的netty快读远程服务,这就是所谓的快速通道,对应可以处理客户端除了拉取消息之外的所有请求,所谓的VIP端口
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
}
2.3.7 创建各种线程池
主要有两类:
第一类:负责处理别人发过来的请求;
第二类:负责处理自己的一些后台任务 *
这一步创建了很多线程池,因为RocketMQ为了性能,对过多的请求进行异步优化处理,因此需要许多线程池
步骤一:
/**
* Initialize resources including remoting server and thread executors.
* 初始化资源,包括远程处理服务器和线程执行器。
*/
protected void initializeResources() {
//创建Broker控制器计划线程池
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("BrokerControllerScheduledThread", true, getBrokerIdentity()));
//创建处理发送消息请求的线程池
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
//创建处理拉取消息请求的线程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity()));
//创建精简拉取消息执行器
this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getLitePullMessageThreadPoolNums(),
this.brokerConfig.getLitePullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.litePullThreadPoolQueue,
new ThreadFactoryImpl("LitePullMessageThread_", getBrokerIdentity()));
this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.putThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
//创建ack消息执行器
this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getAckMessageThreadPoolNums(),
this.brokerConfig.getAckMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.ackThreadPoolQueue,
new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity()));
//创建查询请求的线程池
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity()));
//broker 管理线程池,作为默认处理器的线程池
this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getAdminBrokerThreadPoolNums(),
this.brokerConfig.getAdminBrokerThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.adminBrokerThreadPoolQueue,
new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity()));
//创建客户端管理的线程池
this.clientManageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity()));
//创建broker心跳处理的线程池
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true, getBrokerIdentity()));
//创建消费者管理的线程池
this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getConsumerManageThreadPoolNums(),
this.brokerConfig.getConsumerManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumerManagerThreadPoolQueue,
new ThreadFactoryImpl("ConsumerManageThread_", true, getBrokerIdentity()));
//创建reply消息请求的线程池
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_", getBrokerIdentity()));
//创建事务消息相关处理的线程池
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_", getBrokerIdentity()));
//创建负载均衡的线程池
this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.loadBalanceThreadPoolQueue,
new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));
//创建同步broker成员组执行器服务
this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
//创建broker心跳执行器服务
this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("rokerControllerHeartbeatScheduledThread", getBrokerIdentity()));
//创建topic队列映射清理服务
this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
}
2.3.8 RocketMQ底层通信基于netty,这里注册netty请求处理器。
1. registerProcessor方法将处理器和对应的线程池绑定为一个Pair对象,并且将这个pair对象放入processorTable中, 其值就是pair对象,key就是对应的请求编码RequestCode。
2. 每个请求,都会根据自己携带的RequestCode在processorTable中查找对应的处理器以及对应的执行器线程池来处理请求。 RocketMQ通过这样的方式来提升处理请求的性能。
/**
* 从这里的源码能够看出来,除了pullMessageProcessor处理器只会被注册到remotingServer之外,
* 其他处理器会被注册到remotingServer和fastRemotingServer这两个netty服务中。
*
* 所以Vip通道服务不能够处理拉取消息的请求
*/
public void registerProcessor() {
/*
* SendMessageProcessor
*/
sendMessageProcessor.registerSendMessageHook(sendMessageHookList);
sendMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, this.pullMessageProcessor, this.litePullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* PeekMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, this.peekMessageProcessor, this.pullMessageExecutor);
/**
* PopMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor);
/**
* AckMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
/**
* ChangeInvisibleTimeProcessor
*/
this.remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
/**
* notificationProcessor
*/
this.remotingServer.registerProcessor(RequestCode.NOTIFICATION, this.notificationProcessor, this.pullMessageExecutor);
/**
* pollingInfoProcessor
*/
this.remotingServer.registerProcessor(RequestCode.POLLING_INFO, this.pollingInfoProcessor, this.pullMessageExecutor);
/**
* ReplyMessageProcessor
*/
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
/**
* QueryMessageProcessor
*/
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
/**
* ClientManageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
/**
* ConsumerManageProcessor
*/
ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
/**
* QueryAssignmentProcessor
*/
this.remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
this.remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);
/**
* EndTransactionProcessor
*/
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
/*
* Default
*/
AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
2.3.9 启动一系列定时周期任务(定时调度线程池的后台执行)
在注册了netty消息处理器之后,将会启动一系列的定时任务 这些定时任务由BrokerController中的scheduledExecutorService去执行,该线程池只有一个线程。
步骤一:
protected void initializeScheduledTasks() {
/**
* 初始化broker计划任务
*/
initializeBrokerScheduledTasks();
//每隔5s进行ScheduledTask元数据的刷新
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.refreshMetadata();
} catch (Exception e) {
LOG.error("ScheduledTask refresh metadata exception", e);
}
}
}, 10, 5, TimeUnit.SECONDS);
//如果namesrvAddr不为null,每隔120s,更新一次nameServer地址列表
if (this.brokerConfig.getNamesrvAddr() != null) {
this.updateNamesrvAddr();
LOG.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
// also auto update namesrv if specify
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.updateNamesrvAddr();
} catch (Throwable e) {
LOG.error("Failed to update nameServer address list", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
//每隔120s,获取nameServer的地址
else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
LOG.error("Failed to fetch nameServer address", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
}
步骤二:
protected void initializeBrokerScheduledTasks() {
//延迟启动的时间
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = TimeUnit.DAYS.toMillis(1);
//每隔24小时执行一次,打印昨天生产和消费的消息数量
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
LOG.error("BrokerController: failed to record broker stats", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
//每隔5s将消费者offset进行持久化操作,存入consumerOffset.json文件中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
LOG.error(
"BrokerController: failed to persist config file of consumerOffset", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//每隔10s将消费过滤信息 和 消费者订单信息 进行持久化,存入consumerFilter.json文件 和 consumerOrderInfo.json中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
BrokerController.this.consumerOrderInfoManager.persist();
} catch (Throwable e) {
LOG.error(
"BrokerController: failed to persist config file of consumerFilter or consumerOrderInfo",
e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
/**
* 每隔3m将检查消费者的消费进度
* 每当消费者 isDisableConsumeIfConsumerReadSlowly (消费者消费缓慢)= true(默认false) 并且 进度落后阈值的时候 ,就停止消费者消费,保护broker,避免消息积压
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
LOG.error("BrokerController: failed to protectBroker", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
/**
* 每隔1s将打印
* 发送消息线程池队列,
* 拉取消息线程池队列,
* 查询消息线程池队列,
* 精简拉取线程池队列、
* 结束事务线程池队列
* 客户端管理器线程池队列、
* 心跳线程池队列、
* ack线程池队列的大小
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
LOG.error("BrokerController: failed to print broker watermark", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
//每隔1m将打印已经存储在commitlog提交日志文件中,但尚未分派到consume queue(消费队列)的字节数
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//调度任务落后于提交日志{}字节
LOG.info("Dispatch task fall behind commit log {}bytes",
BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
LOG.error("Failed to print dispatchBehindBytes", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
//如果没有开启DLeger服务(DLeger开启后表示支持高可用的主从自动切换)并且 不允许重复复制 并且 没有启动控制器模式,不支持自动切换代理的角色。
if (!messageStoreConfig.isEnableDLegerCommitLog() && !messageStoreConfig.isDuplicationEnable() && !brokerConfig.isEnableControllerMode()) {
//如果当前的broker是从节点
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
//根据是否配置了HA地址 并且 HA地址的长度是否大于等于设置的默认最小长度(6),来更新HA地址
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= HA_ADDRESS_MIN_LENGTH) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
//每隔3s同步一次slave从节点信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//如果当前时间 - 上一次的同步时间 > 6s
if (System.currentTimeMillis() - lastSyncTimeMs > 60 * 1000) {
/**
* 更新从机配置信息,包括:
* 同步topic配置信息
* 同步消费者Offset信息
* 同步延迟Offset信息
* 同步订阅组配置信息
* 同步消息请求模式信息
* 如果启用了计时器车轮,还需要同步计时器指标
*/
BrokerController.this.getSlaveSynchronize().syncAll();
//更新上一次同步的时间
lastSyncTimeMs = System.currentTimeMillis();
}
//timer checkpoint, latency-sensitive, so sync it more frequently
BrokerController.this.getSlaveSynchronize().syncTimerCheckPoint();
} catch (Throwable e) {
//未能同步从属设备的所有配置
LOG.error("Failed to sync all config for slave.", e);
}
}
}, 1000 * 10, 3 * 1000, TimeUnit.MILLISECONDS);
} else {
//如果是主节点,每隔60s将打印主从节点的差异
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
LOG.error("Failed to print diff of master and slave.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
//如果启动控制器模式,支持自动切换代理的角色。
if (this.brokerConfig.isEnableControllerMode()) {
//
this.updateMasterHAServerAddrPeriodically = true;
}
}
2.3.10 初始化事务消息相关服务
初始化服务采用Java SPI的方式进行加载 主要初始化三个服务:
1.transactionalMessageService(事务消息服务):用于处理、检查事务消息 2.transactionalMessageCheckListener(事务消息监查监听器):监听回查消息 3.transactionalMessageCheckService(事务消息检查服务):提供了事务消息回查的逻辑,默认情况下 6s以上没commit/rollback的事务消息才会触发事务回查,而如果回查次数超过了15次则丢弃事务
private void initialTransaction() {
/**
* 基于Java的SPI机制,查找"META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService"文件里面的SPI实现
* 事务消息服务:用于处理、检查事务消息。
*/
this.transactionalMessageService = ServiceProvider.loadClass(TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
//如果没有通过SPI指定具体的实现,那么就使用默认实现,TransactionalMessageServiceImpl
this.transactionalMessageService = new TransactionalMessageServiceImpl(
new TransactionalMessageBridge(this, this.getMessageStore()));
//加载默认事务消息挂钩服务:{}
LOG.warn("Load default transaction message hook service: {}",
TransactionalMessageServiceImpl.class.getSimpleName());
}
/**
* 基于Java的SPI机制,查找"META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener"文件里面的SPI实现
* 事务消息回查服务监听器:监听回查消息
*/
this.transactionalMessageCheckListener = ServiceProvider.loadClass(
AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
//如果没有通过SPI制定具体的实现,那么使用默认实现,DefaultTransactionalMessageCheckListener
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
//加载默认丢弃消息挂钩服务:{}
LOG.warn("Load default discard message hook service: {}",
DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
/**
* 事务消息检查服务:提供了事务消息回查的逻辑。
*
* 创建TransactionMessageCheckService服务,该服务内部有一个线程
* 默认情况下,6秒以上没commit/rollback的事务消息才会触发事务回查,而如果回查次数超过15次则丢弃事务。
*/
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
2.3.11 初始化Acl权限相关服务:加载权限相关校验器
同样是基于Java的SPI机制进行查找,并且会将找到校验器注册到RpcHook中,在请求执行之前会执行权限校验。
private void initialAcl() {
//校验是都开启了Acl,默认为false,所以直接返回了
if (!this.brokerConfig.isAclEnable()) {
//代理未启用acl
LOG.info("The broker dose not enable acl");
return;
}
//如果开启了Acl,则首先通过SPI机制获取AccessValidator
List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
if (accessValidators.isEmpty()) {
//ServiceProvider加载了没有AccessValidator,使用default org.apache.rocketmq.acl.plain.plainAccessValidator(默认的访问验证器)
LOG.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
accessValidators.add(new PlainAccessValidator());
}
//将校验器存入accessValidatorMap,并且注册到RpcHook中,在请求之前会执行校验
for (AccessValidator accessValidator : accessValidators) {
final AccessValidator validator = accessValidator;
accessValidatorMap.put(validator.getClass(), validator);
//注册RPC钩子函数
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
//Do not catch the exception
//在执行之前会进行校验
validator.validate(validator.parse(request, remoteAddr));
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
});
}
}
2.3.12 初始化RPC调用的钩子函数
RpcHook是RocketMQ提供的钩子类,提供一种类似于类似于AOP的功能。 可以在请求被处理之前和响应被返回之前执行对应的方法。
private void initialRpcHooks() {
//通过SPI机制获取RPCHook
List<RPCHook> rpcHooks = ServiceProvider.load(RPCHook.class);
//如果没有配置的RpcHook,那么直接返回
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
}
//遍历并且注册所有的RpcHook
for (RPCHook rpcHook : rpcHooks) {
this.registerServerRPCHook(rpcHook);
}
}
2.3.13 Tls传输相关配置,通信安全的文件监听模块,用来观察网络加密配置文件的更改
//Tls传输相关配置,通信安全的文件监听模块,用来观察网络加密配置文件的更改
//默认是PERMISSIVE,因此会进入代码块
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
//实例化文件监听服务,并且初始化事务消息服务
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
//信任证书已更改,请重新加载ssl上下文
LOG.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
//证书和私钥已更改,请重新加载ssl上下文
LOG.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
((NettyRemotingServer) fastRemotingServer).loadSslContext();
}
});
} catch (Exception e) {
result = false;
LOG.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
}
return result;
}
2.4 添加钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作
/**
* 9. 添加钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作
* 关闭定时器,向所有nameServer注销注册信息
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
/**
* 关闭定时器,向nameServer注销注册信息
* 并且还会在messageStore#shutdown方法中将abort临时文件删除
*/
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
2.5 调用controller.start();
启动broker, 发送心跳包的入口
public void start() throws Exception {
this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
//副本数 > 1 && 是否启用从机代理主机 || 是启动控制器模式,支持自动切换代理的角色。
if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {
isIsolated = true;
}
//代理外部API 不为空
if (this.brokerOuterAPI != null) {
//启动代理外部API
this.brokerOuterAPI.start();
}
//启动基础服务
startBasicService();
//如果 isIsolated 为false && 没有开启DLeger的相关配置 && 没有开启重复复制的功能
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
//更改特殊服务的状态 将brokerId更改为0
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
/**
* 在broker首次启动时,强制进行broker的注册
* 强制注册当前broker信息到所有的nameServer中
*/
this.registerBrokerAll(true, false, true);
}
/**
* 设置一个定时任务,默认情况下每隔30s调用registorBrokerAll方法向所有的nameServer进行一次注册broker信息,时间间隔可以配置registorNameServerPeriod属性,允许的值是在1万到6万毫秒之间,也就是(10s到60s的范围)
* 这个定时任务就是broker向nameServer发送的心跳包的定时任务,包括topic名,读、写队列个数,队列权限,是否有序等信息。
*/
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
return;
}
if (isIsolated) {
BrokerController.LOG.info("Skip register for broker is isolated");
return;
}
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
BrokerController.LOG.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
/**
* isEnableSlaveActingMaster(),默认为false。不启用slave代理master
* 故障切换时,从设备将充当主设备,例如,如果这设备关闭,计时器或事务消息在从属设备中过期
*/
if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduleSendHeartbeat();
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
BrokerController.this.syncBrokerMemberGroup();
} catch (Throwable e) {
BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
}
}
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}
/**
* isEnableControllerMode():是否启动控制器模式,支持自动切换代理的角色。
* 默认为false
*/
if (this.brokerConfig.isEnableControllerMode()) {
scheduleSendHeartbeat();
}
/**
* isSkipPreOnline():默认为false
*/
if (brokerConfig.isSkipPreOnline()) {
//无条件启动broker
startServiceWithoutCondition();
}
}
2.5.1 启动基础服务 startBasicService();
protected void startBasicService() throws Exception {
/**
* 启动消息存储服务
* 处理消息存储相关的日志,比如CommitLog、ConsumeQueue等
*/
if (this.messageStore != null) {
this.messageStore.start();
}
//启动计时器消息存储
if (this.timerMessageStore != null) {
this.timerMessageStore.start();
}
//启动复制副本管理器功能
if (this.replicasManager != null) {
this.replicasManager.start();
}
if (remotingServerStartLatch != null) {
remotingServerStartLatch.await();
}
/**
* 启动netty路由服务
* broker的服务端,处理消费者和生产者的请求
*
*/
if (this.remotingServer != null) {
this.remotingServer.start();
// In test scenarios where it is up to OS to pick up an available port, set the listening port back to config
if (null != nettyServerConfig && 0 == nettyServerConfig.getListenPort()) {
nettyServerConfig.setListenPort(remotingServer.localListenPort());
}
}
/**
* 启动快速netty快速路由服务
* 只给消息生产者的服务端
*/
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
this.storeHost = new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort());
for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
if (brokerAttachedPlugin != null) {
brokerAttachedPlugin.start();
}
}
if (this.popMessageProcessor != null) {
this.popMessageProcessor.getPopLongPollingService().start();
this.popMessageProcessor.getPopBufferMergeService().start();
this.popMessageProcessor.getQueueLockManager().start();
}
if (this.ackMessageProcessor != null) {
this.ackMessageProcessor.startPopReviveService();
}
if (this.topicQueueMappingCleanService != null) {
this.topicQueueMappingCleanService.start();
}
/**
* 文件监听器启动,关注文件变更的服务,及时加载最新的ssl证书
* 通过对文件进行hash,判断新的hash和当前hash是否一致
* 如果不一致,表示文件变更了
*/
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
/**
* 长轮询拉取消息挂起服务启动
* 处理push模式消费,或者延迟消费的服务
*/
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
/**
* 客户端连接心跳服务启动
* 启动定时器 每10s清理没用的链接
*/
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
/**
* 过滤服务管理器启动
* 启动定时器,每30s 通过shell脚本启动startfsrv.sh
* 自定义消息过滤服务,如果用系统的tag或者是sql 不需要开启该服务
*/
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
if (this.broadcastOffsetManager != null) {
this.broadcastOffsetManager.start();
}
if (this.escapeBridge != null) {
this.escapeBridge.start();
}
if (this.topicRouteInfoManager != null) {
this.topicRouteInfoManager.start();
}
if (this.brokerPreOnlineService != null) {
this.brokerPreOnlineService.start();
}
//Init state version after messageStore initialized.
//初始化状态版本
this.topicConfigManager.initStateVersion();
}
2.5.2 在broker首次启动时,强制进行broker的注册registerBrokerAll
/**
* 把broker注册到所有的nameServer中,发送心跳包
* @param checkOrderConfig 是否校验 顺序消息配置
* @param oneway 是否是单向发送, 单向发送不接收返回值
* @param forceRegister 是否是强制注册
*/
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
/**
* 根据topicConfigManager中的topic信息构建topic信息的传输协议对象 topicConfigWrapper
* 在此前topicConfigManager.load()方法中已经加载了所有的topic信息
* topicConfigWrapper 中封装了该broker上的topic信息和dataVersion
*/
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
//设置版本号
topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
//设置TopicConfigTable表
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
//设置TopicQueueMappingInfoMap
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
/**
* 这块代码的作用是将topicConfigWrapper中的值取出来重新封装一遍,又再塞回topicConfigWrapper
* 理解:这昂做的目的主要是为了将this.brokerConfig.getBrokerPermission()的属性值set进去
* 不过这并不是重要的细节,我们只需要知道topicConfigWrapper中至少包含了该broker上的topic信息以及dataVersion即可
*/
//如果当前broker权限不支持读或者写
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
//那么重新配置topic权限
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
/**
* 这一块内容是重点
*
* needRegister()方法用于判断当前broker是否需要向nameServer进行注册,当forceRegistor参数为true时,表示强制注册
* 那么该方法的结果是无所谓的,如果forceRegistor为false,那么broker是否需要向nameServer注册就得看这个方法的结果了
*/
//如果forceRegistor为true,表示强制注册, 或者如果当前broker应该注册,那么向nameServer进行注册
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isInBrokerContainer())) {
//执行注册
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
2.5.3 needRegister
/**
* broker是否需要向nameServer中注册
* @param clusterName 集群名
* @param brokerAddr broker地址
* @param brokerName broker名字
* @param brokerId brokerId
* @param timeoutMills 超时时间
* @param isInBrokerContainer 是否在broker容器中
* @return broker是否需要向nameServer中注册
*/
private boolean needRegister(final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final int timeoutMills,
final boolean isInBrokerContainer) {
/**
* 根据topicConfigManager中的topic信息构建topic信息的传输协议对象 topicConfigWrapper
* 在此前topicConfigManager.load()方法中已经加载了所有topic信息
* topicConfigWrapper 中封装了该broker上的topic信息和dataVersion
*/
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
/**
* 获取nameServer的DataVerison数据,一一对比自身数据是否一致
* 如果有一个nameServer的DataVersion数据版本不一致则重新注册
*/
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills, isInBrokerContainer);
boolean needRegister = false;
//如果有一个和nameServer的数据版本不一致,则需要重新注册
for (Boolean changed : changeList) {
if (changed) {
needRegister = true;
break;
}
}
return needRegister;
}
2.5.4 doRegisterBrokerAll
/**
* 1.调用brokerOuterAPI.registerBrokerAll进行注册
* 2.处理注册结果,registerBrokerResultList:进行master地址的更新、顺序消息Topic的配置更新
* @param checkOrderConfig 是否检测顺序topic
* @param oneway 是否是单向
* @param topicConfigWrapper topic信息的传输协议包装对象
*/
protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
if (shutdown) {
//broker已经关闭,无需再进行注册
BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
return;
}
/**
* 调用brokerOuterAPI.registerBrokerAll发送请求到NameServer进行注册,返回注册结果
*
* 执行注册,broker作为客户端向所有nameServer进行注册
*/
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
//包含了携带topic信息的topicConfigTable,以及版本信息的dataVersion
//这两个信息保存在持久化文件topics.json中
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isEnableSlaveActingMaster(),
this.brokerConfig.isCompressedRegister(),
this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
this.getBrokerIdentity());
/**
* 对注册结果进行处理
*/
handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
}
2.5.5 brokerOuterAPI.registerBrokerAll
/**
* Considering compression brings much CPU overhead to name server, stream API will not support compression and
* compression feature is deprecated.
* broker向nameServer注册
* @param clusterName
* @param brokerAddr
* @param brokerName
* @param brokerId
* @param haServerAddr
* @param topicConfigWrapper
* @param filterServerList
* @param oneway
* @param timeoutMills
* @param compressed default false
* @return
*/
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean enableActingMaster,
final boolean compressed,
final Long heartbeatTimeoutMillis,
final BrokerIdentity brokerIdentity) {
//创建一个CopyOnWriteArrayList类型的集合,用来保存请求的返回结果
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
//获得nameServer的地址信息集合
List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
//如果获取到的nameServer地址信息集合 不为 null && 集合长度 > 0
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
/**
* 封装请求头:
* 包含了broker地址 ip:port
* broker的id 也就是角色 id等于0,为master, id > 0 为slave
* brokerName
* broker集群名称
* haServer地址
* 是否开启压缩
* 心跳超时毫秒(如果不为 null)
*/
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setEnableActingMaster(enableActingMaster);
requestHeader.setCompressed(false);
//如果心跳超时毫秒不为 null ,将其也封装在请求头里面
if (heartbeatTimeoutMillis != null) {
requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);
}
/**
* 封装请求体
* 当前broker所有的topic信息,名称,读写队列数以及版本信息dataVersion
* 使用门闩依次向各个nameServer注册
*/
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
/**
* CountDownLatch介绍:
* CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。
* CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。
* 当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
*
* 使用CountDownLatch作为倒数计数器,用于并发控制
* CountDownLatch 使得只有所有nameServer 的响应结果都返回时才会继续执行后续的逻辑
*
*/
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
/**
* 采用线程池的方式,即多线程并发的向所有的nameServer发起注册请求
* 遍历所有的nameServer,并将注册任务registerBroker 丢进brokerOuterExecutor 线程池中执行
*/
for (final String namesrvAddr : nameServerAddressList) {
//并发的执行线程任务
brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
@Override
public void run0() {
try {
/**
* 真正执行注册的地方
*/
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
//保存注册结果
registerBrokerResultList.add(result);
}
//已完成当前broker注册到nameServer。目标主机={}
LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
} catch (Exception e) {
LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
} finally {
//每一个请求执行完毕,无论是正常还是异常,都需要减少一个计数
countDownLatch.countDown();
}
}
});
}
try {
/**
* 主线程在此限时等待6000ms,直到上面的任务全部执行完毕之后,计数变为0,会唤醒主线程继续执行后面的逻辑
*/
if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
}
} catch (InterruptedException ignore) {
}
}
return registerBrokerResultList;
}
2.5.6 registerBroker
/**
* registerBroker方法会通过底层的NettyClient,把这个请求发送到NameServer进行注册:
*
* @param namesrvAddr
* @param oneway
* @param timeoutMills
* @param requestHeader
* @param body
* @return
* @throws RemotingCommandException
* @throws MQBrokerException
* @throws RemotingConnectException
* @throws RemotingSendRequestException
* @throws RemotingTimeoutException
* @throws InterruptedException
*/
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
//构建远程调用请求对象,code为REGISTER_BROKER=103
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
/**
* invokeSync方法:以同步的方式向客户端发送消息
* invokeAsync方法:以异步的方式向客户端发送消息
* invokeOneway方法:只向客户端发送消息,而不处理客户端返回的消息
*/
//如果是单向请求,则broker发起异步请求即可返回,不必关心执行结果,注册请求不是单向请求
if (oneway) {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
//利用RemotingClient发送注册请求,这个RemotingClient其实就是Netty客户端
/**
* 最核心的就是下面这行
* invokeSync()方法在NettyRemotingClient类中
*
* 创建连接最终的核心:NettyRomotingClient底层是 - 基于Netty的Bootstrap类的connnect方法,创建了一个连接
* 发送请求最终的核心:NettyRemotingClient底层是 - 基于Netty的Channel API,把注册的请求给发送到了NameServer就可以了
*/
//通过remotingClient发起同步调用,非单向请求,即需要同步的获取结果
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
//下面是处理返回结果,封装成一个RegisterBrokerResult
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
/**
* 解析响应数据,封装结果
*/
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
2.5.7 invokeSync
/**
* 消息的同步发送,不仅需要将消息发送出去,还要处理消息发送的响应结果,
*
* 同步发送与单向发送很像,都是先根据broker地址查找连接,
* 如果连接正常,在消息发送之前和消息发送之后就执行钩子方法,然后将消息发送出去,将消息响应结果返回
* 如果消息发送出现异常,就关闭连接,抛出异常
* @param addr
* @param request
* @param timeoutMillis
* @return
* @throws InterruptedException
* @throws RemotingConnectException
* @throws RemotingSendRequestException
* @throws RemotingTimeoutException
*/
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
/**
*
* 获取连接Channel,这个Channel可以理解成broker跟nameServer之间建立的一个连接
*/
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
//在执行发送消息之前,执行Rpc钩子
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
}
/**
* 这里才是真正的发送请求
*/
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
//接收到消息之后,执行Rpc钩子
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
this.updateChannelLastResponseTime(addr);
return response;
} catch (RemotingSendRequestException e) {
LOGGER.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
LOGGER.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
2.5.8 invokeSyncImpl
/**
* 同步发送请求:
*
* 同步发送方法首先创建ResponeFuture,ResponeFuture是保存请求响应结果的,opaque是请求id,将请求id与响应结果的对应关系保存在responeTable(map)中,通过请求id就可以找到对应的响应结果了。
* 然后利用netty的Channel连接组件,将消息以同步的方式发送出去,添加一个监听器,监听消息是否成功发送,当监听到消息成功发送,就设置发送成功的标志,否则设置发送失败的标志,并且删除请求与响应的对应关系,以及异常原因。
*
* 消息通过netty的Channel组件连接发送后,就等待消息的响应结果。
* 当响应结果为null,但是发送成功,那么就抛出超时的异常,否则就抛出其他异常。
* 当响应结果不为null,就返回响应结果,最后删除请求与响应的关系。
*
* @param channel
* @param request
* @param timeoutMillis
* @return
* @throws InterruptedException
* @throws RemotingSendRequestException
* @throws RemotingTimeoutException
*/
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
//get the request id
//请求id,通过该id可以找到该请求的响应结果
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
//将请求id与响应结果的对应关系保存在responseTable(map)集合中
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
/**
* 基于Netty的Channel组件,将请求发出去
*/
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
//消息发送成功,设置发送成功的标志
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
//发送失败
responseFuture.setSendRequestOK(false);
//删除请求id与响应的对应关系
responseTable.remove(opaque);
//发送异常
responseFuture.setCause(f.cause());
//发送结果为null
responseFuture.putResponse(null);
log.warn("Failed to write a request command to {}, caused by underlying I/O operation failure", addr);
});
/**
* 这里比较重要 等待请求响应的结果
*/
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
//响应结果为空
if (null == responseCommand) {
//发送成功,但是没有响应,抛出超时的异常
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
//最后删除请求id与响应的对应关系
this.responseTable.remove(opaque);
}
}
2.5.9 handleRegisterBrokerResult
/**
* 对注册结果进行处理
* @param registerBrokerResultList
* @param checkOrderConfig
*/
protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBrokerResultList,
boolean checkOrderConfig) {
for (RegisterBrokerResult registerBrokerResult : registerBrokerResultList) {
if (registerBrokerResult != null) {
//涉及master/slave的一些机制
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
this.messageStore.updateMasterAddress(registerBrokerResult.getMasterAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
break;
}
}
}
broker的整个启动核心过程基本就是以上的内容
参考文献:
(8条消息) RocketMQ源码(3)—Broker启动流程源码解析【一万字】_刘Java的博客-CSDN博客_rocketmq-broker源码
(12条消息) RocketMQ源码(4)—Broker启动加载消息文件以及恢复数据源码【一万字】_刘Java的博客-CSDN博客文章来源:https://www.toymoban.com/news/detail-472844.html
RocketMQ源码分析(三)——Broker启动流程 | 山海 | 专注分布式系统架构与设计 (tpvlog.com)文章来源地址https://www.toymoban.com/news/detail-472844.html
到了这里,关于RocketMQ(三) broker启动的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!