RocketMQ是一个分布式消息中间件,它的核心组件之一是namesrv,负责管理broker的路由信息和kv配置。本文将介绍RocketMQ5.1版本中namesrv的启动过程,包括如何解析命令行参数、加载配置文件、初始化和启动namesrv控制器等。
首先,我们需要在环境变量中设置ROCKETMQ_HOME,指向RocketMQ的安装目录。然后,我们可以使用如下命令启动namesrv:
nohup sh mqnamesrv &
这条命令执行运行的是 namesrv.NamesrvStartup#main
方法。
public static void main(String[] args) {
main0(args);
controllerManagerMain();
}
启动过程分为两部分
-
main0(args)
:启动 NamesrvController -
controllerManagerMain()
:启动 ControllerManager
本文只分析 NamesrvController 的启动
即 namesrv.NamesrvStartup#main0
方法,代码如下:
public static NamesrvController main0(String[] args) {
try {
// 解析命令行参数和配置文件
parseCommandlineAndConfigFile(args);
// 创建并启动 NamesrvController
NamesrvController controller = createAndStartNamesrvController();
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
namesrv的启动过程可以分为以下几个步骤:
- 解析命令行参数和配置文件,初始化namesrvConfig、nettyServerConfig、nettyClientConfig等配置对象
- 创建并启动 NamesrvController 对象,该对象是namesrv的核心控制器,它持有各种配置对象、网络通信对象、路由管理对象等
1 解析命令行参数和配置文件
解析命令行参数和配置文件的方法是 namesrv.NamesrvStartup#parseCommandlineAndConfigFile
,代码如下:
public static void parseCommandlineAndConfigFile(String[] args) throws Exception {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());
if (null == commandLine) {
System.exit(-1);
return;
}
// 创建配置类对象
namesrvConfig = new NamesrvConfig();
nettyServerConfig = new NettyServerConfig();
nettyClientConfig = new NettyClientConfig();
// 设置namesrv的监听端口
nettyServerConfig.setListenPort(9876);
// 如果命令行中有-c参数,则从配置文件中加载namesrvConfig、nettyServerConfig、nettyClientConfig、controllerConfig的属性
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
if (namesrvConfig.isEnableControllerInNamesrv()) {
controllerConfig = new ControllerConfig();
MixAll.properties2Object(properties, controllerConfig);
}
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 如果命令行中有-p参数,则打印namesrvConfig、nettyServerConfig、nettyClientConfig、controllerConfig的属性
if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(logConsole, namesrvConfig);
MixAll.printObjectProperties(logConsole, nettyServerConfig);
MixAll.printObjectProperties(logConsole, nettyClientConfig);
if (namesrvConfig.isEnableControllerInNamesrv()) {
MixAll.printObjectProperties(logConsole, controllerConfig);
}
System.exit(0);
}
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
}
上述代码的执行流程如下:
-
创建一个NamesrvConfig对象,用于存储namesrv的配置信息,如rocketmqHome、kvConfigPath等。NamesrvConfig类的具体属性如下表所示
属性名 含义 默认值 rocketmqHome RocketMQ的根目录 环境变量ROCKETMQ_HOME kvConfigPath KV配置文件的路径 ${user.home}/namesrv/kvConfig.json configStorePath 配置存储文件的路径 ${user.home}/namesrv/namesrv.properties productEnvName 环境名称 center clusterTest 是否开启集群测试 false orderMessageEnable 是否支持顺序消息 false -
创建一个NettyServerConfig对象,用于存储netty服务端的配置信息,如listenPort、serverWorkerThreads等。NettyServerConfig类的具体属性如下表所示
属性名 含义 默认值 listenPort 监听端口,用于接收客户端的连接请求 9876 serverWorkerThreads 服务端工作线程数量,用于处理网络IO事件或执行业务任务 8 serverCallbackExecutorThreads 服务端回调执行线程数量,用于执行异步回调任务,如果为0,则使用公共线程池 0 serverSelectorThreads 服务端选择器线程数量,用于接收和分发网络IO事件,建议不要超过3个 3 serverOnewaySemaphoreValue 服务端单向请求信号量值,用于限制单向请求的并发度,防止资源耗尽 256 serverAsyncSemaphoreValue 服务端异步请求信号量值,用于限制异步请求的并发度,防止资源耗尽 64 serverChannelMaxIdleTimeSeconds 服务端通道最大空闲时间(秒),超过该时间则关闭连接,释放资源 120 serverSocketSndBufSize 服务端Socket发送缓冲区大小(字节),如果为-1,则使用操作系统默认值 -1 serverSocketRcvBufSize 服务端Socket接收缓冲区大小(字节),如果为-1,则使用操作系统默认值 -1 serverPooledByteBufAllocatorEnable 是否启用服务端池化ByteBuf分配器,可以提高内存利用率和性能 true useEpollNativeSelector 是否使用Epoll本地选择器,可以提高Linux平台下的网络IO效率,需要操作系统支持 false -
创建一个NettyClientConfig对象,用于存储netty客户端的配置信息,如clientWorkerThreads、clientCallbackExecutorThreads等。NettyClientConfig类的具体属性如下表所示
属性名 含义 默认值 clientWorkerThreads 客户端工作线程数量,用于处理网络IO事件或执行业务任务 4 clientCallbackExecutorThreads 客户端回调执行线程数量,用于执行异步回调任务,如果为0,则使用公共线程池 可用处理器的数量 connectTimeoutMillis 连接超时时间(毫秒),超过该时间则放弃连接 3000 channelNotActiveInterval 通道不活跃的间隔时间(毫秒),超过该时间则关闭通道,释放资源 1000 * 60 clientChannelMaxIdleTimeSeconds 客户端通道最大空闲时间(秒),超过该时间则关闭连接,释放资源 120 clientSocketSndBufSize 客户端Socket发送缓冲区大小(字节),如果为-1,则使用操作系统默认值 -1 clientSocketRcvBufSize 客户端Socket接收缓冲区大小(字节),如果为-1,则使用操作系统默认值 -1 clientPooledByteBufAllocatorEnable 是否启用客户端池化ByteBuf分配器,可以提高内存利用率和性能 false clientCloseSocketIfTimeout 是否在超时时关闭客户端Socket,可以避免资源泄漏 false useTLS 是否使用TLS协议进行加密通信,需要操作系统支持 false clientOnewaySemaphoreValue 客户端单向请求信号量值,用于限制单向请求的并发度,防止资源耗尽 65535 clientAsyncSemaphoreValue 客户端异步请求信号量值,用于限制异步请求的并发度,防止资源耗尽 65535 -
解析命令行参数,支持以下选项:
-
-c
指定配置文件路径,如果指定了配置文件,会从文件中加载namesrvConfig、nettyServerConfig和nettyClientConfig的属性。 -
-p
打印所有配置信息,并退出程序 - 其他选项会被转换为properties对象,并覆盖namesrvConfig、nettyServerConfig和nettyClientConfig的属性
-
2 创建并启动 NamesrvController
创建并启动 NamesrvController 的方法是 namesrv.NamesrvStartup#createAndStartNamesrvController
:
public static NamesrvController createAndStartNamesrvController() throws Exception {
// 创建 NamesrvController
NamesrvController controller = createNamesrvController();
// 启动 NamesrvController
start(controller);
// 输出启动成功的日志
NettyServerConfig serverConfig = controller.getNettyServerConfig();
String tip = String.format("The Name Server boot success. serializeType=%s, address %s:%d", RemotingCommand.getSerializeTypeConfigInThisServer(), serverConfig.getBindAddress(), serverConfig.getListenPort());
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
}
代码执行流程如下:
- 调用createNamesrvController方法,创建NamesrvController对象
- 调用start方法,传入NamesrvController对象,启动 NamesrvController 对象
- 格式化一个提示信息,包含序列化类型、绑定地址和监听端口信息
- 使用log对象记录提示信息到日志文件中
- 把提示信息打印到控制台中
- 返回NamesrvController对象
如果在执行中出现异常,则抛出异常并退出程序
2.1 创建 NamesrvController 对象
在 namesrv.NamesrvStartup#createAndStartNamesrvController
方法中调用了 namesrv.NamesrvStartup#createNamesrvController
方法,用于创建 NamesrvController 对象,代码为:
public static NamesrvController createNamesrvController() {
// 创建 NamesrvController,传入namesrvConfig、nettyServerConfig、nettyClientConfig配置
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
创建 NamesrvController 对象的步骤如下:
- 创建一个NamesrvController对象,传入namesrvConfig、nettyServerConfig和nettyClientConfig对象,这些对象存储了namesrv的业务配置和网络配置
- 调用NamesrvController对象的getConfiguration方法,获取一个Configuration对象,该对象负责管理namesrv的配置信息
- 调用Configuration对象的registerConfig方法,传入properties对象,将properties对象中的配置信息注册到Configuration对象中
- 返回NamesrvController对象
这个函数的返回值是NamesrvController对象
2.2 启动 NamesrvController 对象
在 namesrv.NamesrvStartup#createAndStartNamesrvController
方法中调用了 namesrv.NamesrvStartup#start
方法,用于启动创建好的 NamesrvController 对象,代码为:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化controller
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。
// 所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等操作。
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
controller.shutdown();
return null;
}));
// 启动controller
controller.start();
return controller;
}
启动 NamesrvController 对象的步骤如下:
第一步:初始化 controller
调用 namesrv.NamesrvController#initialize
方法,初始化controller。如果初始化失败,就调用controller的shutdown方法,关闭controller,并退出程序
public boolean initialize() {
loadConfig();
initiateNetworkComponents();
initiateThreadExecutors();
registerProcessor();
startScheduleService();
initiateSslContext();
initiateRpcHooks();
return true;
}
初始化操作包括:
-
调用
namesrv.NamesrvController#loadConfig
方法从 NamesrvConfig 对象中保存的 kvConfigPath 指定的文件中加载kv配置,并创建一个KVConfigManager对象,用于管理和打印kv配置private void loadConfig() { this.kvConfigManager.load(); }
-
调用
namesrv.NamesrvController#initiateNetworkComponents
方法初始化网络组件,包括remotingClient和remotingServerremotingClient是一个NettyRemotingClient对象,它用于向其他服务发送请求或响应。remotingServer是一个NettyRemotingServer对象,它用于接收和处理来自其他服务的请求或响应。BrokerHousekeepingService对象用于处理broker的连接和断开事件。
private void initiateNetworkComponents() { this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingClient = new NettyRemotingClient(this.nettyClientConfig); }
-
调用
namesrv.NamesrvController#initiateThreadExecutors
方法初始化两个线程池,一个是defaultExecutor,用于处理默认的远程请求;另一个是clientRequestExecutor,用于处理客户端的路由信息请求。这两个线程池都使用了LinkedBlockingQueue作为任务队列,并且重写了newTaskFor方法,使用FutureTaskExt包装了Runnable任务。private void initiateThreadExecutors() { this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity()); this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) { @Override protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) { return new FutureTaskExt<>(runnable, value); } }; this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity()); this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) { @Override protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) { return new FutureTaskExt<>(runnable, value); } }; }
-
调用
namesrv.NamesrvController#registerProcessor
方法根据namesrvConfig.isClusterTest()
的值,选择使用ClusterTestRequestProcessor或者DefaultRequestProcessor作为默认处理器- ClusterTestRequestProcessor是一个用于集群测试的处理器,它会在请求前后添加一些环境信息,比如产品环境名称、请求时间等
- DefaultRequestProcessor是一个用于正常运行的处理器,它会根据请求的类型,调用不同的方法来处理,比如注册Broker、获取路由信息、更新配置等。
- 在
namesrvConfig.isClusterTest() = false
时如果收到请求的requestCode
等于RequestCode.GET_ROUTEINFO_BY_TOPIC
则会使用ClientRequestProcessor来处理;当收到其他请求时,会使用DefaultRequestProcessor来处理。
private void registerProcessor() { if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor); } else { // Support get route info only temporarily ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this); this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor); this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor); } }
此部分为 RIP 29 Optimize RocketMQ NameServer 中
Thread pool separation
改进。在改进之前nameserver使用同一个线程池和队列来处理所有的客户端路由请求、broker注册请求等,如果其中一种类型的请求爆发,会影响所有的请求。为了解决这个问题,RIP-29 将最重要的客户端路由请求单独隔离出来,使用不同的线程池和队列,队列大小和线程数都可以配置,这样可以保证不同类型的请求之间不会相互影响,如下图所示 -
调用
namesrv.NamesrvController#startScheduleService
方法启动定时服务,执行以下三个任务:- 每隔一段时间扫描不活跃的broker,并清理路由信息
- 每隔 10 分钟打印所有的KV配置信息
- 每隔 1 秒打印线程池的水位日志,即客户端请求线程池和默认线程池的队列大小和头部任务的慢时间(从创建到执行的时间)
private void startScheduleService() { // 周期性扫描不活跃的Broker,并从路由信息中移除 this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS); // 每隔 10 分钟打印KVConfig 信息 this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES); // 每隔 1 秒打印线程池的水位日志, // 即客户端请求线程池和默认线程池的队列大小和头部任务的慢时间(从创建到执行的时间) this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { NamesrvController.this.printWaterMark(); } catch (Throwable e) { LOGGER.error("printWaterMark error.", e); } }, 10, 1, TimeUnit.SECONDS); }
-
调用
namesrv.NamesrvController#initiateSslContext
方法初始化SSL上下文,即配置remotingServer使用TLS协议进行安全通信 -
调用
namesrv.NamesrvController#initiateRpcHooks
方法注册RPC钩子,即在remotingServer处理请求之前或之后执行一些自定义的逻辑private void initiateRpcHooks() { this.remotingServer.registerRPCHook(new ZoneRouteRPCHook()); }
在
namesrv.route.ZoneRouteRPCHook
类中重写了doAfterResponse
方法,它会在处理GET_ROUTEINFO_BY_TOPIC请求(即请求的requestCode = RequestCode.GET_ROUTEINFO_BY_TOPIC
)时,根据请求中的zoneName参数,过滤掉不属于该区域的broker和queue数据,从而实现区域隔离的功能。具体来说,
doAfterResponse
会设置 response 的 body 为namesrv.route.ZoneRouteRPCHook#filterByZoneName
方法返回值的字节数组格式,filterByZoneName
方法作用是返回过滤掉不属于该区域的broker和queue数据后的TopicRouteData数据对象@Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { if (RequestCode.GET_ROUTEINFO_BY_TOPIC != request.getCode()) { return; } // 省略部分代码 TopicRouteData topicRouteData = RemotingSerializable.decode(response.getBody(), TopicRouteData.class); response.setBody(filterByZoneName(topicRouteData, zoneName).encode()); }
第二步:注册 JVM 钩子
通过addShutdownHook方法注册一个ShutdownHookThread对象,即 JVM 钩子,用来在程序终止时调用controller的shutdown方法,释放资源
public void shutdown() {
this.remotingClient.shutdown();
this.remotingServer.shutdown();
this.defaultExecutor.shutdown();
this.clientRequestExecutor.shutdown();
this.scheduledExecutorService.shutdown();
this.scanExecutorService.shutdown();
this.routeInfoManager.shutdown();
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
第二步:启动 controller
调用 namesrv.NamesrvController#start
方法,启动 controller
public void start() throws Exception {
this.remotingServer.start();
// In test scenarios where it is up to OS to pick up an available port, set the listening port back to config
if (0 == nettyServerConfig.getListenPort()) {
nettyServerConfig.setListenPort(this.remotingServer.localListenPort());
}
this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()
+ ":" + nettyServerConfig.getListenPort()));
this.remotingClient.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
this.routeInfoManager.start();
}
启动操作包括:文章来源:https://www.toymoban.com/news/detail-404429.html
- 调用remotingServer对象的start方法,启动一个NettyRemotingServer,用于接收和处理客户端的请求。
- 如果nettyServerConfig对象的listenPort属性为0,说明是由操作系统自动分配一个可用端口,那么将remotingServer对象的localListenPort属性赋值给nettyServerConfig对象的listenPort属性,保持一致。
- 调用remotingClient对象的updateNameServerAddressList方法,更新本地地址列表,只包含当前机器的IP地址和端口号。
- 调用remotingClient对象的start方法,启动一个NettyRemotingClient,用于向其他服务发送请求。
- 如果fileWatchService对象不为空,调用它的start方法,启动一个文件监视服务,用于动态加载证书文件。
- 调用routeInfoManager对象的start方法,启动一个路由信息管理器,用于维护Broker和Topic的路由关系。
至此,NamesrvController 的启动流程结束文章来源地址https://www.toymoban.com/news/detail-404429.html
到了这里,关于RocketMQ 5.1 NameServer 启动流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!