zookeeper源码(03)启动流程

这篇具有很好参考价值的文章主要介绍了zookeeper源码(03)启动流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文将从启动类开始详细分析zookeeper的启动流程:

  1. 加载配置的过程
  2. 集群启动过程
  3. 单机版启动过程

启动类

org.apache.zookeeper.server.quorum.QuorumPeerMain类。

用于启动zookeeper服务,第一个参数用来指定配置文件,配置文件properties格式,例如以下配置参数:

  • dataDir - 数据存储目录
  • dataLogDir - txnlog(事务日志)存储目录,默认dataDir
  • clientPort - 接收客户端连接的端口,例如2181
  • tickTime - leader做quorum验证的周期时长,默认3000ms
  • initLimit - leader等待follower连接、数据同步ack的最大tick数量
  • syncLimit - leader发送同步数据等待ack的最大tick数量
  • server.id - 用于quorum协议的host:port[:port]格式的server列表

加载配置

QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
    config.parse(args[0]);
}

args[0]是配置文件名。

  1. 加载普通配置
  2. 加载quorumPeer配置
  3. 从dynamic文件加载quorumVerifier配置,zk会在processReconfig时生成dynamic文件,默认文件名zoo.cfg.dynamic.${version}格式,默认不开启Reconfig功能
  4. 从dynamic文件加载lastQuorumPeer配置,同上,默认zoo.cfg.dynamic.next文件

加载普通配置

QuorumPeerConfig封装以下字段:

// 监听客户端连接的地址,使用clientPort和clientPortAddress配置确定,用来创建cnxnFactory
protected InetSocketAddress clientPortAddress;
// 用来创建secureCnxnFactory,使用secureClientPort和secureClientPortAddress配置确定
protected InetSocketAddress secureClientPortAddress;
// quorum使用ssl通信
protected boolean sslQuorum = false;
// portUnification配置,使用UnifiedServerSocket创建套接字
protected boolean shouldUsePortUnification = false;
// 用来创建ObserverMaster,该组件可以实现链式的数据复制,减小leader的负载
protected int observerMasterPort;
// 自动重新加载ssl文件
protected boolean sslQuorumReloadCertFiles = false;
// 数据目录和事务log目录
protected File dataDir;
protected File dataLogDir;
// dynamicConfig文件名
protected String dynamicConfigFileStr = null;
// 配置文件名,非配置参数
protected String configFileStr = null;
// leader做quorum验证的周期时长,默认300ms
protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
// 单个addr的client最大连接数
protected int maxClientCnxns = 60;
// 超时时长
protected int minSessionTimeout = -1;
protected int maxSessionTimeout = -1;
// metricsProvider.className配置,MetricsProvider实现类全名
protected String metricsProviderClassName = DefaultMetricsProvider.class.getName();
// 封装metricsProvider.*下面的配置
protected Properties metricsProviderConfiguration = new Properties();
// 本地session配置
protected boolean localSessionsEnabled = false;
protected boolean localSessionsUpgradingEnabled = false;
// client连接的backlog数设置
protected int clientPortListenBacklog = -1;
// leader等待follower连接、数据同步ack的最大tick数量
protected int initLimit;
// leader发送同步数据等待ack的最大tick数量
protected int syncLimit;
// 大于0时用来计算连接leader的超时时长
protected int connectToLearnerMasterLimit;
// 必须是3
protected int electionAlg = 3;
// 未使用的配置
protected int electionPort = 2182;
// 监听所有IP地址
protected boolean quorumListenOnAllIPs = false;
// myid配置
protected long serverId = UNSET_SERVERID;

protected QuorumVerifier quorumVerifier = null, lastSeenQuorumVerifier = null;

// autopurge.snapRetainCount配置
protected int snapRetainCount = 3;
// autopurge.purgeInterval配置
protected int purgeInterval = 0;
// 开启同步
protected boolean syncEnabled = true;

protected String initialConfig;

// PARTICIPANT|OBSERVER
protected LearnerType peerType = LearnerType.PARTICIPANT;

/**
 * Configurations for the quorumpeer-to-quorumpeer sasl authentication
 */
protected boolean quorumServerRequireSasl = false;
protected boolean quorumLearnerRequireSasl = false;
protected boolean quorumEnableSasl = false;
protected String quorumServicePrincipal = QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE;
protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected int quorumCnxnThreadsSize;

// multi address related configs
// multiAddress.enabled配置
private boolean multiAddressEnabled = Boolean.parseBoolean(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, "false"));
// multiAddress.reachabilityCheckEnabled配置
private boolean multiAddressReachabilityCheckEnabled = Boolean.parseBoolean(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED, "true"));
// multiAddress.reachabilityCheckTimeoutMs配置
private int multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS,
                       String.valueOf(1000)));

// 创建QuorumVerifier时使用,不为null时会创建QuorumOracleMaj
protected String oraclePath;

// Minimum snapshot retain count.
private final int MIN_SNAP_RETAIN_COUNT = 3;
// JVM Pause Monitor feature switch
protected boolean jvmPauseMonitorToRun = false;
// JVM Pause Monitor warn threshold in ms
protected long jvmPauseWarnThresholdMs = JvmPauseMonitor.WARN_THRESHOLD_DEFAULT;
// JVM Pause Monitor info threshold in ms
protected long jvmPauseInfoThresholdMs = JvmPauseMonitor.INFO_THRESHOLD_DEFAULT;
// JVM Pause Monitor sleep time in ms
protected long jvmPauseSleepTimeMs = JvmPauseMonitor.SLEEP_TIME_MS_DEFAULT;

在parse方法的最后一个else分支,会将其他配置前缀zookeeper.之后设置到System环境变量中:

System.setProperty("zookeeper." + key, value);

比如一些SSL相关配置参数:

ssl.quorum.keyStore.location=/path/to/keystore.jks
ssl.quorum.keyStore.password=password
ssl.quorum.trustStore.location=/path/to/truststore.jks
ssl.quorum.trustStore.password=password

解析配置文件之后,会根据ssl相关参数做ssl配置:

if (this.secureClientPortAddress != null) {
    configureSSLAuth();
}

默认会将X509AuthenticationProvider作为ssl认证组件:

// key = "zookeeper.authProvider.x509"
System.setProperty(ProviderRegistry.AUTHPROVIDER_PROPERTY_PREFIX + "x509",
                   "org.apache.zookeeper.server.auth.X509AuthenticationProvider");

其余的都是参数验证代码,不详细说明。

加载quorumPeer配置

// backward compatibility - dynamic configuration in the same file as
// static configuration params see writeDynamicConfig()
if (dynamicConfigFileStr == null) {
    // 解析quorum配置
    setupQuorumPeerConfig(zkProp, true);
    if (isDistributed() && isReconfigEnabled()) { // 默认reconfigEnabled==false分支进不来
        // we don't backup static config for standalone mode.
        // we also don't backup if reconfig feature is disabled.
        // 备份zoo.cfg到zoo.cfg.bak
        backupOldConfig();
    }
}

解析quorum配置:

void setupQuorumPeerConfig(Properties prop,
                           boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(
        prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath);
    // 读取${dataDir}/myid文件,给serverId赋值
    setupMyId();
    // 对比clientPortAddress配置与quorum配置进行重新赋值
    setupClientPort();
    // 对比peerType配置与quorum配置进行重新赋值
    setupPeerType();
    checkValidity(); // 参数验证
}

parseDynamicConfig方法需要看一下:

public static QuorumVerifier parseDynamicConfig(
        Properties dynamicConfigProp, int eAlg, boolean warnings,
        boolean configBackwardCompatibilityMode, String oraclePath) throws IOException, ConfigException {
    boolean isHierarchical = false;
    for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        // group.*和weight.*的参数配置
        if (key.startsWith("group") || key.startsWith("weight")) {
            isHierarchical = true;
        } else if (!configBackwardCompatibilityMode &&
                   !key.startsWith("server.") && !key.equals("version")) {
            throw new ConfigException("Unrecognised parameter: " + key);
        }
    }

    QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical, oraclePath);

    // 验证 略
}

private static QuorumVerifier createQuorumVerifier(
        Properties dynamicConfigProp, boolean isHierarchical, String oraclePath) throws ConfigException {
    if (oraclePath == null) {
        return createQuorumVerifier(dynamicConfigProp, isHierarchical);
    } else {
        return new QuorumOracleMaj(dynamicConfigProp, oraclePath);
    }
}

private static QuorumVerifier createQuorumVerifier(
        Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException {
    if (isHierarchical) {
        return new QuorumHierarchical(dynamicConfigProp);
    } else {
        return new QuorumMaj(dynamicConfigProp);
    }
}

QuorumMaj

解析集群配置,配置形如:

server.1=server_config;client_config
server.2=server_config;client_config
server.3=server_config;client_config

# 配置两个也能启动,但是只能提供副本能力,无法保证高可用

# 使用分号分隔server_config和client_config

# 1. server_config格式:
#    host:quorumPort:electionPort 或 host:quorumPort:electionPort:type
#    可以配置多个,使用|分隔
#    例如:
#       127.0.0.1:2888:3888:PARTICIPANT

# 2. client_config可以没有,格式: port或host:port

构造方法:

public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();
        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            // 获取serverId
            long sid = Long.parseLong(key.substring(dot + 1));
            // 创建QuorumServer对象,解析value字符串
            // value格式: server_config或server_config;client_config
            // server_config格式是使用|分隔的列表,每个元素是:
            // host:quorumPort:electionPort或host:quorumPort:electionPort:type
            // client_config格式: port或host:port
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT) {
                votingMembers.put(Long.valueOf(sid), qs); // 投票成员
            } else {
                observingMembers.put(Long.valueOf(sid), qs); // observer成员
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    half = votingMembers.size() / 2; // 成员半数,例如5/2=2
}

QuorumHierarchical

比QuorumMaj多了group和weight等特性。

从dynamic文件加载quorumVerifier配置

用于加载quorumVerifier信息:从dynamicConfigFileStr参数指定的文件加载quorumPeer配置,方式与上一小节一样。

从dynamic文件加载lastQuorumPeer配置

用于加载lastSeenQuorumVerifier信息:从zoo.cfg.dynamic.next文件加载lastSeenQuorumVerifier配置,方式与上一小节一样。

创建并启动DatadirCleanupManager

默认配置时不启动。

// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
    config.getDataDir(),
    config.getDataLogDir(),
    config.getSnapRetainCount(), // 默认3
    config.getPurgeInterval());
purgeMgr.start();

启动周期任务:

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // 默认不启动
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

清理逻辑在PurgeTask的run方法:

public void run() {
    try {
        PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
    } catch (Exception e) {}
}

启动集群

if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
}

isDistributed判断:

public boolean isDistributed() {
    return quorumVerifier != null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
}

// standaloneEnabled默认true

创建并启动MetricsProvider

final MetricsProvider metricsProvider;
try {
    metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
        config.getMetricsProviderClassName(), // DefaultMetricsProvider
        config.getMetricsProviderConfiguration());
} catch (MetricsProviderLifeCycleException error) {
    throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
}
// 注册到全局
ServerMetrics.metricsProviderInitialized(metricsProvider);

创建ServerCnxnFactory

ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;

if (config.getClientPortAddress() != null) {
    // 默认使用NIOServerCnxnFactory实现类
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(),
                          config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}

if (config.getSecureClientPortAddress() != null) {
    secureCnxnFactory = ServerCnxnFactory.createFactory();
    secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                                config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}

ServerCnxnFactory有两个主要的子类:

  • NIOServerCnxnFactory
  • NettyServerCnxnFactory

默认使用NIOServerCnxnFactory实现类,可以使用-Dzookeeper.serverCnxnFactory=xx来修改:

-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory

ServerCnxnFactory

用于接收客户端连接、管理客户端session、处理客户端请求。

NIOServerCnxnFactory

基于NIO的非阻塞、多线程的ServerCnxnFactory实现类,多线程之间通过queue通信:

  • 1个accept线程,用来接收客户端连接,交给selector线程处理
  • 1-N个selector线程,每个线程会select 1/N个连接,多个selector线程的原因是,由于有大量连接,select()可能会成为性能瓶颈
  • 0-M个socket IO worker线程,做socket读写,如果配置为0则selector线程来做IO
  • 1个清理线程,用于关闭空闲连接

线程数量分配示例:32核的机器,1accept线程,1个清理线程,4个selector线程,64个worker线程。

configure方法:

  • 不支持ssl

  • 创建ConnectionExpirerThread线程

  • 根据核数确定各个线程的数量

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    
    // 64
    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    
  • 创建SelectorThread线程

  • 创建ServerSocketChannel、启动监听、设置非阻塞

  • 创建AcceptThread线程

start方法启动各种线程:

  • acceptThread
  • selectorThreads
  • workerPool
  • expirerThread

NettyServerCnxnFactory

基于Netty的ServerCnxnFactory实现,使用CnxnChannelHandler作为业务处理器。

后续会有文章详细分析。

创建并启动QuorumPeer

管理quorum协议,服务器可能处于以下三种状态:

  • Leader选举 - 每个服务器将选出一个leader,最初都会选自己
  • Follower节点 - 将与Leader同步并复制所有事务
  • Leader节点 - 处理请求并将其转发给Follower节点,大多数Follower节点必须同步,该请求才能被提交

创建QuorumPeer并使用QuorumPeerConfig为其设置属性:

public QuorumPeer() throws SaslException {
    super("QuorumPeer");
    quorumStats = new QuorumStats(this);
    jmxRemotePeerBean = new HashMap<>();
    adminServer = AdminServerFactory.createAdminServer(); // http管理的服务,使用JettyAdminServer实现类
    x509Util = createX509Util();
    initialize();
    reconfigEnabled = QuorumPeerConfig.isReconfigEnabled(); // 默认false不开启Reconfig功能
}

下面记录一下重要的步骤。

创建FileTxnSnapLog

quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));

FileTxnSnapLog类:操作TxnLog和SnapShot的入口类。

此步骤会创建dataDir和snapDir目录、判断数据目录可写、创建txnLog和snapLog对象访问数据文件。

创建并初始化ZKDatabase

quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.initConfigInZKDatabase();

维护zookeeper服务器内存数据库,包括session、dataTree和committedlog数据,从磁盘读取日志和快照后启动。

内部使用DataTree存储数据,先看一下创建和初始化阶段的代码。

构造方法:创建DataTree对象,创建/zookeeper/quota、/zookeeper/config节点,创建dataWatches和childWatches对象(使用WatchManager实现类)。

initConfigInZKDatabase方法:

public synchronized void initConfigInZKDatabase() {
    if (zkDb != null) {
        zkDb.initConfigInZKDatabase(getQuorumVerifier());
    }
}

public synchronized void initConfigInZKDatabase(QuorumVerifier qv) {
    try {
        if (this.dataTree.getNode(ZooDefs.CONFIG_NODE) == null) {
            // should only happen during upgrade
            this.dataTree.addConfigNode();
        }
        // 把当前QuorumVerifier保存到/zookeeper/config中
        // qv.toString()格式如下:
        // server.1=host1:2888:3888:participant;host1:2181\n
        // server.2=host2:2888:3888:participant;host2:2181\n
        // ...
        // version=2
        this.dataTree.setData(ZooDefs.CONFIG_NODE,
            qv.toString().getBytes(UTF_8), // data
            -1, // version
            qv.getVersion(), // txid
            Time.currentWallTime());
    } catch (NoNodeException e) {}
}

设置QuorumVerifier

quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
    quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}

初始化启动QuorumPeer

// 初始化QuorumAuthServer
quorumPeer.initialize();
// 启动QuorumPeer
quorumPeer.start();
// 线程阻塞
quorumPeer.join();

启动QuorumPeer方法:

public synchronized void start() {
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {}
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}

启动QuorumPeer流程

ZKDatabase加载

从txnlog和snapshot加载dataTree数据:

long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
  1. 倒序查找所有snapshot文件,从文件名解析snapZxid作为dataTree的lastProcessedZxid属性,文件内容解析到dataTree中
  2. 如果从snapshot文件未找到数据,则生成snapshot.0文件,将当前dataTree(空的)保存到里面
  3. 使用fastForwardFromEdits方法从txnlog加载数据

获取currentEpoch和acceptedEpoch的值:

// 当前zxid
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
// 当前epoch = zxid >> 32L
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);

从${dataDir}/currentEpoch文件读取currentEpoch值:

currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
  1. 如果文件不存在,直接使用epochOfZxid作为currentEpoch并保存到文件
  2. 如果currentEpoch比epochOfZxid小,则继续查找${dataDir}/currentEpoch.tmp文件作为currentEpoch保存到文件,如果文件不存在则抛数据异常

从${dataDir}/acceptedEpoch文件读取acceptedEpoch值:

acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
  1. 如果文件不存在,直接使用epochOfZxid作为acceptedEpoch并保存到文件
  2. 如果acceptedEpoch比currentEpoch小则抛数据异常

启动serverCnxnFactory

private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start(); // NIOServerCnxnFactory在启动阶段会启动内部的4类线程
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}

启动AdminServer

默认使用JettyAdminServer实现类,负责提供管理端的http接口。

启动选举

public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // 投自己一票,封装zxid和epoch
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // electionType总是3
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    // TODO: use a factory rather than a switch
    // 可以使用策略模式替换switch语句
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        // 关闭oldQcm
        if (oldQcm != null) {
            oldQcm.halt();
        }
        // 用来启动ServerSocket监听
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        }
        break;
    default:
        assert false;
    }
    return le;
}

创建QuorumCnxManager对象:

public QuorumCnxManager createCnxnManager() {
    // 默认tickTime * syncLimit
    // 按照zoo_sample.cfg文件配置是2000 * 5
    int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
    return new QuorumCnxManager(
        this,
        this.getMyId(),
        this.getView(), // serverId->quorumServer
        this.authServer,
        this.authLearner,
        timeout,
        this.getQuorumListenOnAllIPs(), // 是否监听所有IP默认false
        this.quorumCnxnThreadsSize, // 默认20
        this.isQuorumSaslAuthEnabled());
}

QuorumCnxManager类:

This class implements a connection manager for leader election using TCP.
It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addressed of the two parties.
For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
  1. 维护leader选举时server之间的tcp连接
  2. 确保两个server之间存在一个连接,如果两个server同时建立连接,则始终保留id大的一方建立的连接
  3. 队列缓存待发送的消息

FastLeaderElection类:

  • 使用TCP实现leader选举
  • 使用QuorumCnxManager管理连接
  • 某些参数可以改变选举行为,比如finalizeWait参数决定leader确定之前需要等待的时间

启动线程

QuorumPeer继承了ZooKeeperThread类,最后会使用super.start()启动线程。run方法while循环,根据当前的ServerState执行不同的逻辑。

启动单机版服务

启动入口

在QuorumPeerMain的initializeAndRun阶段:

if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
} else {
    // 启动单机版服务
    ZooKeeperServerMain.main(args);
}

ZooKeeperServerMain.main方法:

ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
    main.initializeAndRun(args);
}
// 略

initializeAndRun方法:文章来源地址https://www.toymoban.com/news/detail-712255.html

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // 略

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]); // args[0]是配置文件
    } else {
        config.parse(args); // args = {clientPortAddress, dataDir, tickTime, maxClientCnxns}
    }

    runFromConfig(config);
}

启动流程

  1. 创建FileTxnSnapLog对象
  2. 创建ZooKeeperServer对象
  3. 创建并启动AdminServer组件
  4. 创建并启动cnxnFactory和secureCnxnFactory用于接受客户端连接、处理客户端请求,会启动ZooKeeperServer、ZXDatabase等核心组件
  5. 创建并启动ContainerManager组件

到了这里,关于zookeeper源码(03)启动流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • zookeeper1==zookeeper源码阅读,源码启动ZK集群

    下载源码 Tags · apache/zookeeper · GitHub https://codeload.github.com/apache/zookeeper/zip/refs/tags/release-3.9.1 JDK8 MAVEN3.8.6 mvn -DskipTests=true package 配置ZK1 zkServer.cmd中指出了启动类是 QuorumPeerMain QuorumPeer翻译成集群成员比较合理,Quorum集群Peer成员 在代码目录下新建data文件夹和log文件夹 并在dat

    2024年02月05日
    浏览(32)
  • zookeeper选举流程源码分析

    zookeeper选举流程源码分析 选举的代码主要是在 QuorumPeer.java 这个类中。 它有一个内部枚举类,用来表示当前节点的状态。 LOOKING: 当前节点在选举过程中 FOLLOWING:当前节点是从节点 LEADING: 当前节点是主节点 OBSERVING: 当前节点是观察者状态,这种状态的节点不参与选举的投

    2024年02月11日
    浏览(28)
  • zookeeper源码(01)集群启动

    本文介绍一下zookeeper-3.5.7集群安装。 创建数据、日志目录: 编辑conf/zoo.cfg文件: 默认连接localhost:2181的zookeeper服务,可以使用-server选项指定服务器地址。

    2024年02月07日
    浏览(26)
  • zookeeper源码(04)leader选举流程

    在\\\"zookeeper源码(03)集群启动流程\\\"中介绍了leader选举的入口,本文将详细分析leader选举组件和流程。 quorumPeer的start阶段使用startLeaderElection()方法启动选举 LOOKING状态,投自己一票 createElectionAlgorithm - 创建选举核心组件:QuorumCnxManager(管理连接)、FastLeaderElection(选举)等 quorumPeer的

    2024年02月05日
    浏览(29)
  • zookeeper源码(02)源码编译启动及idea导入

    本文介绍一下zookeeper-3.9.0源码下载、编译及本地启动。 该文件介绍了编译zookeeper需要的环境和命令。 java-1.8.0_102 maven-3.3.9 zookeeper-assembly/target/apache-zookeeper-3.9.0-bin.tar.gz 包目录结构: /bin - 可执行文件及脚本 /conf - 配置文件 /lib - zookeeper包及依赖的包 /docs - 文档 To also build the

    2024年02月08日
    浏览(46)
  • zookeeper源码(08)请求处理及数据读写流程

    用于接收客户端连接、管理客户端session、处理客户端请求。 代表一个客户端连接对象: 从网络读写数据 数据编解码 将请求转发给上层组件或者从上层组件接收响应 管理连接状态,比如:enableRecv、sessionTimeout、stale、invalid等 保存当前的packetsReceived、packetsSent、lastCxid、last

    2024年02月19日
    浏览(27)
  • 菜鸡学习zookeeper源码(三)NIOServer的启动

    上一篇写到了QuorumPeer的start方法,里面主要进行执行了loadDataBase方法(进行加载本地的数据信息,具体是怎么进行加载的,没在文章中进行说明,这块小园子也没看,等分析完整体的启动流程之后在进行分析), 这篇文章的话主要写startServerCnxnFactory方法,在上一篇文章中也进

    2024年01月24日
    浏览(30)
  • 【Zookeeper源码走读】第二章 服务器的启动过程

    通过运行zk的启动脚本,找到zk服务器端的入口类。脚本如下: 所以,zk的入口类是 QuorumPeerMain ,以下是该类的 main() 方法的完整代码: 跟踪方法中的 initializeAndRun() ,代码如下: 方法中,继续跟踪 ZooKeeperServerMain.main(args) ,代码如下: 上面的代码就是初始化 ZooKeeperServerMai

    2024年02月03日
    浏览(27)
  • 【Zookeeper源码走读】第三章 服务器处理客户端请求的流程

    前一篇文章,已经大致介绍了Server的启动流程,在NIOServerCnxnFactory.start()方法中,启动了多个线程,其中就有接收socket报文的线程,代码如下: 注意这里,acceptThread是接收socket的线程(AcceptThread),acceptThread的初始化是在NIOServerCnxnFactory.configure()中实现的: NIOServerCnxnFactory.confi

    2024年02月02日
    浏览(38)
  • 【spring源码系列-03】xml配置文件启动spring时refresh的前置工作

    Spring源码系列整体栏目 内容 链接地址 【一】spring源码整体概述 https://blog.csdn.net/zhenghuishengq/article/details/130940885 【二】通过refresh方法剖析IOC的整体流程 https://blog.csdn.net/zhenghuishengq/article/details/131003428 【三】xml配置文件启动spring时refresh的前置工作 https://blog.csdn.net/zhenghuishen

    2024年02月08日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包