10.1 模块人口代码的功能
10.1.1 入口函数
首先看一下 NameServer 的源码目录(见图 10-1 ) 。NamesrvStartup 是模块的启动入 口, NamesrvController 是用来协块各个调模功能的代码。
我们从启动代码开始分析,找到 NamesrvStartup.java 里的 main 函数 public static void main(String[] args) {mainO(args);},发现它又把逻辑转到 main。这个函数里。
10.1.2 解析命令行参数
main0 函数主要完成两个功能,第一个功能是解析命令行参数,我们通过源码来看一看,重点是解析 - c 和 - p 参数。
public static void parseCommandlineAndConfigFile(String[] args) throws Exception {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());
if (null == commandLine) {
System.exit(-1);
return;
}
namesrvConfig = new NamesrvConfig();
nettyServerConfig = new NettyServerConfig();
nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
if (namesrvConfig.isEnableControllerInNamesrv()) {
controllerConfig = new ControllerConfig();
MixAll.properties2Object(properties, controllerConfig);
}
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(logConsole, namesrvConfig);
MixAll.printObjectProperties(logConsole, nettyServerConfig);
MixAll.printObjectProperties(logConsole, nettyClientConfig);
if (namesrvConfig.isEnableControllerInNamesrv()) {
MixAll.printObjectProperties(logConsole, controllerConfig);
}
System.exit(0);
}
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
}
-c 命令行参数用来指定配置文件的位置; - p 命令行参数用来打印所有配置项的值。注意 ,用 - p 参数打印配置项的值之后程序就退出了,这是一个帮助调试的选项 。
10.1.3 初始化 NameServer 的 Controller
main0 函数的另外一个功能是初始化 Controller
根据解析出的配置参数, 调用 controller.initialize()来初始化,然后调用controIler.start() 让 NameServer 开始服务。还有 一个逻辑是注册 ShutdownHookThread ,当程序退出的时候会调用controller.shutdown 来做退出前的清理工作 。
10.2 NameServer 的总控逻辑
NameServer 的总控逻辑在 NamesrvController.java 代码中 。 NameServer 是集群 的协调者 ,它 只是简单地接收其他角色报上来的状态,然后根据请求返回相应的状态 。 首先, NameserverController 把执行线程池初始化好。
org.apache.rocketmq.namesrv.NamesrvController#initialize L103
org.apache.rocketmq.namesrv.NamesrvController#initiateThreadExecutors
private void startScheduleService() {
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
1, 10, TimeUnit.MINUTES);
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
NamesrvController.this.printWaterMark();
} catch (Throwable e) {
LOGGER.error("printWaterMark error.", e);
}
}, 10, 1, TimeUnit.SECONDS);
}
启动 了一个默认是 8 个线程的线程池 ( private int serverWorkerThreads = 8),还有两个定时执行的线程,一个用来扫描失效的 Broker (scanNotActiveBroker ) , 另一个用来打印配置信息( printAllPeriodically)。
然后启动负责通信的服务 remotingServer, remotingServer 监听一些端口 ,收到 Broker 、 Client 等发过来的请求后,根据请求的命令,调用不同的 Processor 来处理。 这些不同的处理逻辑被放到上面初始化的线程池中执行。
org.apache.rocketmq.namesrv.NamesrvController#initiateNetworkComponents
org.apache.rocketmq.namesrv.NamesrvController#registerProcessor
remotingServer 是基于Netty 封装的一个网络通信服务,要了解 remotingServer 需要先对 Netty 有个基本的认知。
10.3 核心业务逻辑处理
NameServer 的核心业务逻辑 , 在 DefaultRequestProcessor.java 中可以 一目了然地看出 。 网络通信服务模块收到请求后,就调用这个 Processor 来处理,
逻辑主体是个 switch 语句 ,根据 RequestCode 调用不同的函数来处理 ,从 RequestCode 可以了解到 NameServer 的主要功能,比如 : REGISTER_BROKER 是在集群中新加入一个 Broker 机器; GET_ROUTEINTO_BY_TOPIC 是请求获取一个 Topic 的路由信息 ; WIPE_WRITE_PERM OF BROKER 是删除一个 Broker 的写权限。
10.4 集群状态存储
NameServer 作为集群的协调者,需要保存和维护集群的各种元数据 , 这是通过 RoutelnfoManager 类来实现的。
private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
5 张图告诉你 RocketMQ 为什么不使用 Zookeeper 做注册中心 - 腾讯云开发者社区-腾讯云RocketMQ 选择了自己写 NameServer 做注册中心而没有选择 Zookeeper,这是为什么呢?https://cloud.tencent.com/developer/article/2118883?shareByChannel=link#3.1每个结构存储着一类集群信息,具体含义在第 5 章有介绍。了解 RocketMQ各个角色的功能后,对每个结构的处理逻辑就好理解了 。 下面重点看一下控制访问这些结构的锁机制 。
锁分为互斥锁、读写锁; 也可分为可重入锁、不可重入锁。 在 NameServer的场景中 ,读取操作多,更改操作少, 所以选择读写锁能大大提高效率。 RoutelnfoManager 中使用的是可重人的读写锁 ( private final ReadWriteLock lock = new ReentrantReadWriteLock()),我们以 deleteTopic 函数为例,看一下锁的使用方式。
public void deleteTopic(final String topic) {
try {
this.lock.writeLock().lockInterruptibly();
this.topicQueueTable.remove(topic);
} catch (Exception e) {
log.error("deleteTopic Exception", e);
} finally {
this.lock.writeLock().unlock();
}
}
首先锁的获取和执行逻辑要放到一个 try {}里,然后在 finally {}中释放 。这是一种典型的使用方式,我们可以参考这种方式实现自己的代码。
10.5 路由注册
RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳信息,每隔30s向集群中所有NameServer发送心跳包,NameServer收到心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdataTimeStamp信息,然后NameServer每隔10s扫描brokerLiveTable,如果连续120S没有收到心跳包,NameServer将移除Broker的路由信息同时关闭Socket连接。
1)Broker主动上报
// org.apache.rocketmq.broker.BrokerController#start L1572/L1587
this.registerBrokerAll(true, false, true);//BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
// org.apache.rocketmq.broker.BrokerController#registerBrokerAll L1708
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
// org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll L1734
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
// org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll(java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper, java.util.List<java.lang.String>, boolean, int, boolean, boolean, java.lang.Long, org.apache.rocketmq.common.BrokerIdentity) L507
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
2)NameServ 处理Broker的请求
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor
网路处理类解析请求类型,如果请求类型是为REGISTER_BROKER,则将请求转发到RouteInfoManager#regiesterBroker
代码:DefaultRequestProcessor#processRequest
switch (request.getCode()) {
case RequestCode.REGISTER_BROKER:
return this.registerBroker(ctx, request);
default:
String error = " request type " + request.getCode() + " not supported";
return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}
代码:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker
// ...
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
request.getExtFields().get(MixAll.ZONE_NAME),
requestHeader.getHeartbeatTimeoutMillis(),
requestHeader.getEnableActingMaster(),
topicConfigWrapper,
filterServerList,
ctx.channel()
);
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#brokerAddrTable.put
//维护brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
//第一次注册,则创建brokerData
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
//非第一次注册,更新Broker
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
//维护topicQueueTable
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) ||
registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topicConfig.getTopicName());
// 如果是不存在,首次添加的
if (null == queueDataMap) {
queueDataMap = new HashMap<>();
queueDataMap.put(brokerName, queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataMap);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
// 如果Topic 存在但没有 queueData
final QueueData existedQD = queueDataMap.get(brokerName);
if (existedQD == null) {
queueDataMap.put(brokerName, queueData);
} else if (!existedQD.equals(queueData)) {
// 否则提示 topic更改了
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), existedQD,
queueData);
queueDataMap.put(brokerName, queueData);
}
}
}
//维护brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
//维护filterServerList
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
2.2.3.3 路由删除
Broker
每隔30s向NameServer
发送一个心跳包,心跳包包含BrokerId
,Broker
地址,Broker
名称,Broker
所属集群名称、Broker
关联的FilterServer
列表。但是如果Broker
宕机,NameServer
无法收到心跳包,此时NameServer
如何来剔除这些失效的Broker
呢?NameServer
会每隔10s扫描brokerLiveTable
状态表,如果BrokerLive
的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker
失效,移除该Broker
,关闭与Broker
连接,同时更新topicQueueTable
、brokerAddrTable
、brokerLiveTable
、filterServerTable
。
RocketMQ有两个触发点来删除路由信息:
-
NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。
-
Broker在正常关闭的情况下,会执行unregisterBroker指令
这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该broker相关的信息。
代码:NamesrvController#initialize
public boolean initialize() {
loadConfig();
initiateNetworkComponents();
initiateThreadExecutors();
registerProcessor();
startScheduleService();
initiateSslContext();
initiateRpcHooks();
return true;
}
private void startScheduleService() {
// //每隔10s扫描一次为活跃Broker
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
1, 10, TimeUnit.MINUTES);
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
NamesrvController.this.printWaterMark();
} catch (Throwable e) {
LOGGER.error("printWaterMark error.", e);
}
}, 10, 1, TimeUnit.SECONDS);
}
代码:RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {
try {
log.info("start scanNotActiveBroker");
for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
long last = next.getValue().getLastUpdateTimestamp();
long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
if ((last + timeoutMillis) < System.currentTimeMillis()) {
RemotingHelper.closeChannel(next.getValue().getChannel());
log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
this.onChannelDestroy(next.getKey());
}
}
} catch (Exception e) {
log.error("scanNotActiveBroker exception", e);
}
}
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#onChannelDestroy(org.apache.rocketmq.namesrv.routeinfo.BrokerAddrInfo)
public void onChannelDestroy(BrokerAddrInfo brokerAddrInfo) {
UnRegisterBrokerRequestHeader unRegisterRequest = new UnRegisterBrokerRequestHeader();
boolean needUnRegister = false;
if (brokerAddrInfo != null) {
try {
try {
this.lock.readLock().lockInterruptibly();
needUnRegister = setupUnRegisterRequest(unRegisterRequest, brokerAddrInfo);
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
if (needUnRegister) {
boolean result = this.submitUnRegisterBrokerRequest(unRegisterRequest);
log.info("the broker's channel destroyed, submit the unregister request at once, " +
"broker info: {}, submit result: {}", unRegisterRequest, result);
}
}
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#submitUnRegisterBrokerRequest
public boolean submitUnRegisterBrokerRequest(UnRegisterBrokerRequestHeader unRegisterRequest) {
return this.unRegisterService.submit(unRegisterRequest);
}
public boolean submit(UnRegisterBrokerRequestHeader unRegisterRequest) {
return unregistrationQueue.offer(unRegisterRequest);
}
@Override
public void run() {
while (!this.isStopped()) {
try {
final UnRegisterBrokerRequestHeader request = unregistrationQueue.take();
Set<UnRegisterBrokerRequestHeader> unregistrationRequests = new HashSet<>();
unregistrationQueue.drainTo(unregistrationRequests);
// Add polled request
unregistrationRequests.add(request);
this.routeInfoManager.unRegisterBroker(unregistrationRequests);
} catch (Throwable e) {
log.error("Handle unregister broker request failed", e);
}
}
}
// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unRegisterBroker
// RouteInfoManager 会移除相关信息
2.2.3.4 路由发现
RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。
代码:DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
//调用RouteInfoManager的方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable中分别填充TopicRouteData的List<QueueData>、List<BrokerData>、filterServer
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
//如果找到主题对应你的路由信息并且该主题为顺序消息,则从NameServer KVConfig中获取关于顺序消息相关的配置填充路由信息
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
文章来源:https://www.toymoban.com/news/detail-421359.html
文章来源地址https://www.toymoban.com/news/detail-421359.html
到了这里,关于Chapter10-NameServer 源码解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!