Server启动的流程
通过运行zk的启动脚本,找到zk服务器端的入口类。脚本如下:
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
所以,zk的入口类是 QuorumPeerMain
,以下是该类的 main()
方法的完整代码:
public static void main(String[] args){
QuorumPeerMain main=new QuorumPeerMain();
try{
main.initializeAndRun(args);
}catch(IllegalArgumentException e){
......
}
LOG.info("Exiting normally");
}
跟踪方法中的initializeAndRun()
,代码如下:
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
config.getDataDir(),
config.getDataLogDir(),
config.getSnapRetainCount(),
config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
方法中,继续跟踪 ZooKeeperServerMain.main(args)
,代码如下:
public static void main(String[] args) {
System.out.println(System.getProperty("log4j.configuration"));
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
......
}
LOG.info("Exiting normally");
ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}
上面的代码就是初始化ZooKeeperServerMain
的main
对象,然后调用initializeAndRun()
方法:
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
ServerConfig config = new ServerConfig();
if (args.length == 1) {
config.parse(args[0]);
} else {
config.parse(args);
}
runFromConfig(config);
}
总结一下就是: QuorumPeerMain.initializeAndRun()->ZooKeeperServerMain.initializeAndRun(),然后initializeAndRun()调用runFromConfig()方法,如下:
public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
LOG.info("Starting server");
......
// 不影响流程的代码删掉了
final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null, config.initialConfig);
txnLog.setServerStats(zkServer.serverStats());
// Registers shutdown handler which will be used to know the
// server error or shutdown state changes.
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));
// Start Admin server
adminServer = AdminServerFactory.createAdminServer();
adminServer.setZooKeeperServer(zkServer);
adminServer.start();
boolean needStartZKServer = true;
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
cnxnFactory.startup(zkServer);
// zkServer has been started. So we don't need to start it again in secureCnxnFactory.
needStartZKServer = false;
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
secureCnxnFactory.startup(zkServer, needStartZKServer);
}
containerManager = new ContainerManager(
zkServer.getZKDatabase(),
zkServer.firstProcessor,
Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
Integer.getInteger("znode.container.maxPerMinute", 10000),
Long.getLong("znode.container.maxNeverUsedIntervalMs", 0)
);
containerManager.start();
ZKAuditProvider.addZKStartStopAuditLog();
......
// shutdown 相关的代码也被删掉了
}
上面那么多的代码,只需要关注这一段即可:
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
cnxnFactory.startup(zkServer);
// zkServer has been started. So we don't need to start it again in secureCnxnFactory.
needStartZKServer = false;
}
查看 cnxnFactory 的创建方法:
public static ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor()
.newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
throw ioe;
}
}
由上面的代码可知,默认是使用的NIOServerCnxnFactory类,接下来跟踪NIOServerCnxnFactory.startup()方法:
public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException {
start();
setZooKeeperServer(zks);
if (startServer) {
zks.startdata();
zks.startup();
}
}
查看第一行代码:
public void start() {
stopped = false;
if (workerPool == null) {
workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
}
for (SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
注意这里,方法内部是启动多个线程,比如:acceptThread是接收socket的线程(AcceptThread),acceptThread的初始化是在NIOServerCnxnFactory.configure()中实现的:
@Override
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
......
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
NIOServerCnxnFactory.configure() 先于 NIOServerCnxnFactory.startup() 执行,所以,在NIOServerCnxnFactory.startup() 中可直接启动acceptThread的start
然后跟踪最后一行代码,zks.startup():
public synchronized void startup() {
startupWithServerState(State.RUNNING);
}
private void startupWithServerState(State state) {
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
setupRequestProcessors();
startRequestThrottler();
registerJMX();
startJvmPauseMonitor();
registerMetrics();
setState(state);
requestPathMetricsCollector.start();
localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
notifyAll();
}
startupWithServerState() 方法里面就是启动各种线程。目前未详细了解每个线程的用途,暂时不关注。文章来源:https://www.toymoban.com/news/detail-435563.html
至此,服务器端的启动流程大致完成了,一些细节的东西,待后面深入后再细究。文章来源地址https://www.toymoban.com/news/detail-435563.html
到了这里,关于【Zookeeper源码走读】第二章 服务器的启动过程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!