zookeeper source code (04): the leader election process

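The previous article, "zookeeper source code (03): cluster startup process", introduced the entry point of leader election. This article analyzes the leader election components and process in detail.

Leader election process (key points)

  1. In its start phase, quorumPeer starts the election with startLeaderElection()
  2. In the LOOKING state, the server votes for itself
  3. createElectionAlgorithm - creates the core election components: QuorumCnxManager (manages connections), FastLeaderElection (runs the election), etc.
  4. The quorumPeer main loop runs a different flow depending on the current state

States and flows:

  • LOOKING - elect a leader with fastLeaderElection.lookForLeader

    1. Increment the election epoch to start a new round
    2. Initialize the proposal with its own serverId, zxid and currentEpoch
    3. Send the vote out
    4. Loop receiving votes from the other servers:
      • LOOKING vote: compare election epoch, currentEpoch, zxid and serverId to decide which server to vote for; once more than half of the nodes agree on the proposal, that server becomes the leader
      • FOLLOWING vote: compare the election epoch, then adopt the current leader once a quorum and checkLeader confirm it
      • LEADING vote: compare the election epoch, then adopt the current leader once a quorum and checkLeader confirm it
  • LEADING - create a Leader object and run the lead logic

    1. zkServer loads its data
    2. Start the quorum listener
    3. Determine the new epoch and zxid from the current epochs of the followers
    4. Synchronize data to the followers
    5. Start zkServer
    6. Every tick, verify that a majority of followers are in sync
  • FOLLOWING - create a Follower object and run the followLeader logic

    1. connectToLeader - connect to the leader server
    2. registerWithLeader - send the current epoch to the leader and wait for the leader to send the new epoch
    3. syncWithLeader - receive the data synchronized by the leader: txnlog, committedlog, snapshot
    4. Keep communicating and process packets from the leader
  • OBSERVING - create an Observer object and run the observeLeader logic, which is largely the same as FOLLOWING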

Starting leader election

QuorumPeer's startLeaderElection method is the entry point for starting the election:

public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // vote for self, carrying our zxid and epoch
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // electionType is always 3
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    // TODO: use a factory rather than a switch
    // the switch could be replaced with the strategy pattern
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        // shut down the old QuorumCnxManager
        if (oldQcm != null) {
            oldQcm.halt();
        }
        // the Listener starts the serverSocket listeners
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        }
        break;
    default:
        assert false;
    }
    return le;
}

public QuorumCnxManager createCnxnManager() {
    // socket timeout; defaults to tickTime * syncLimit
    // with the zoo_sample.cfg settings this is 2000 * 5 = 10000 ms
    int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
    return new QuorumCnxManager(
        this,
        this.getMyId(),
        this.getView(), // serverId->quorumServer
        this.authServer,
        this.authLearner,
        timeout,
        this.getQuorumListenOnAllIPs(), // whether to listen on all IPs, default false
        this.quorumCnxnThreadsSize, // default 20
        this.isQuorumSaslAuthEnabled());
}

QuorumCnxManager class

Overview:

This class implements a connection manager for leader election using TCP.
It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addressed of the two parties.
For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
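  1. Maintains the TCP connections between servers during leader election
  2. Ensures there is exactly one connection between two servers; if two servers connect to each other at the same time, the connection initiated by the server with the larger id is always the one kept
  3. Uses queues to buffer messages waiting to be sent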
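The id comparison in item 2 can be written as two small predicates, one for each end of a connection. The sketch below is hypothetical (class and method names are invented). It only restates the comparisons made in startConnection (initiator side) and handleConnection (receiver side), which are shown later in this article.

```java
// Hypothetical sketch of QuorumCnxManager's tie-breaking rule; names are invented for illustration.
final class ConnectionTieBreak {
    // Initiator side (startConnection): the outgoing socket is closed when the remote sid
    // is larger than our own id, so it survives only if remoteSid <= myId.
    static boolean initiatorKeeps(long myId, long remoteSid) {
        return remoteSid <= myId;
    }

    // Receiver side (handleConnection): an incoming socket from a smaller sid is closed
    // (and the receiver dials back); one from a larger sid is kept.
    static boolean receiverKeeps(long myId, long remoteSid) {
        return remoteSid > myId;
    }

    public static void main(String[] args) {
        // Server 1 dials server 3: server 1 drops its socket because 3 > 1.
        System.out.println(initiatorKeeps(1, 3)); // false
        // Server 3 dials server 1: both ends keep this socket.
        System.out.println(initiatorKeeps(3, 1) && receiverKeeps(1, 3)); // true
    }
}
```

Whichever server dials first, the only socket that survives between servers 1 and 3 is the one dialed by server 3, the larger id.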

Main fields

// runs QuorumConnectionReqThread and QuorumConnectionReceiverThread
private ThreadPoolExecutor connectionExecutor;

// per-sid SendWorker / send queue / last sent message
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

// receive queue
public final BlockingQueue<Message> recvQueue;

Main methods

public void initiateConnection(final MultipleAddresses electionAddr, final Long sid);
// wraps initiateConnection in a QuorumConnectionReqThread and submits it to connectionExecutor for asynchronous execution
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid);

private boolean startConnection(Socket sock, Long sid) throws IOException;

public void receiveConnection(final Socket sock);
// wraps receiveConnection in a QuorumConnectionReceiverThread and submits it to connectionExecutor for asynchronous execution
public void receiveConnectionAsync(final Socket sock);

public void toSend(Long sid, ByteBuffer b);

boolean connectOne(long sid, MultipleAddresses electionAddr);
void connectOne(long sid);
public void connectAll();

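The remaining utility methods are not analyzed here.

initiateConnection method

Creates the Socket, performs the SSL handshake and authentication when needed, and sends the initial packet. If its own id is smaller than the peer's, it closes the connection, so that only one connection exists between the two servers.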

public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
    Socket sock = null;
    try {
        // create the Socket
        if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
        } else {
            sock = SOCKET_FACTORY.get();
        }
        setSockOpts(sock); // socket options such as the timeout
        // connect to the target peer
        sock.connect(electionAddr.getReachableOrOne(), cnxTO);
        // SSL handshake
        if (sock instanceof SSLSocket) {
            SSLSocket sslSock = (SSLSocket) sock;
            sslSock.startHandshake();
        }
    } catch (X509Exception e) {
        closeSocket(sock);
        return;
    } catch (UnresolvedAddressException | IOException e) {
        closeSocket(sock);
        return;
    }

    try {
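        // send the initial packet and perform SASL authentication
        // close the connection if selfId is smaller than the peer's
        // create and start SendWorker and RecvWorker
        // create the send queue for this sid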
        startConnection(sock, sid);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

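startConnection method

  1. Sends the initial connection packet and performs SASL authentication
  2. Closes the connection if selfId is smaller than the peer's
  3. Creates and starts SendWorker and RecvWorker
  4. Creates the send queue for this sid
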
private boolean startConnection(Socket sock, Long sid) throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // output stream
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        // send the initial packet: protocol version, myid and election address
        long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
        dout.writeLong(protocolVersion);
        dout.writeLong(self.getMyId());

        // now we send our election address. For the new protocol version, we can send multiple addresses.
        Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
                ? self.getElectionAddress().getAllAddresses()
                : Arrays.asList(self.getElectionAddress().getOne());

        String addr = addressesToSend.stream()
                .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();

        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        authLearner.authenticate(sock, qps.hostname);
    }

    if (sid > self.getMyId()) { // If lost the challenge, then drop the new connection
        closeSocket(sock);
    } else {
        // create SendWorker and RecvWorker
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }

        senderWorkerMap.put(sid, sw);

        // create the send queue
        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();

        return true;
    }
    return false;
}
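The initial packet written by startConnection has a simple binary layout: protocol version (long), sender id (long), address length (int), then the address bytes, joined with `|` when there are several. Below is a minimal round-trip sketch. `PROTOCOL_VERSION` is an assumed placeholder, and only its negative sign matters: handleConnection treats a non-negative first long as a bare server id from an older peer. The class is hypothetical, not ZooKeeper's `InitialMessage`, which performs additional validation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the initial connection message; not ZooKeeper's InitialMessage class.
final class InitialMessageSketch {
    // Assumed negative protocol version; only its sign matters in this sketch.
    static final long PROTOCOL_VERSION = -65536L;

    static final class Decoded {
        final long sid;
        final String addr; // null for an old-style peer that sent only its sid
        Decoded(long sid, String addr) { this.sid = sid; this.addr = addr; }
    }

    // Same field order as startConnection: version, myId, address length, address bytes.
    static byte[] encode(long protocolVersion, long myId, String addr) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(bytes);
            dout.writeLong(protocolVersion);
            dout.writeLong(myId);
            byte[] addrBytes = addr.getBytes(StandardCharsets.UTF_8);
            dout.writeInt(addrBytes.length);
            dout.write(addrBytes);
            dout.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors handleConnection: a non-negative first long is a bare server id.
    static Decoded decode(byte[] data) {
        try (DataInputStream din = new DataInputStream(new ByteArrayInputStream(data))) {
            long first = din.readLong();
            if (first >= 0) {
                return new Decoded(first, null);
            }
            long sid = din.readLong();
            int len = din.readInt();
            byte[] addrBytes = new byte[len];
            din.readFully(addrBytes);
            return new Decoded(sid, new String(addrBytes, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Decoded d = decode(encode(PROTOCOL_VERSION, 3L, "10.0.0.3:3888"));
        System.out.println(d.sid + " " + d.addr); // 3 10.0.0.3:3888
    }
}
```

The sketch uses UTF-8 explicitly. The original code calls `addr.getBytes()`, which uses the platform default charset.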

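receiveConnection method

When a server receives a connection request and wins the challenge (its own id is larger than the peer's), it closes that connection and connects to the peer itself.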

public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        // input stream
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        handleConnection(sock, din);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null, protocolVersion = null;
    MultipleAddresses electionAddr = null;

    try {
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) { // this is a server id and not a protocol version
            sid = protocolVersion;
        } else {
            try {
                InitialMessage init = InitialMessage.parse(protocolVersion, din);
                sid = init.sid;
                if (!init.electionAddr.isEmpty()) {
                    electionAddr = new MultipleAddresses(init.electionAddr,
                            Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
                }
            } catch (InitialMessage.InitialMessageException ex) {
                closeSocket(sock);
                return;
            }
        }

        if (sid == QuorumPeer.OBSERVER_ID) {
            // Choose identifier at random. We need a value to identify the connection.
            sid = observerCounter.getAndDecrement();
        }
    } catch (IOException e) {
        closeSocket(sock);
        return;
    }

    // do authenticating learner
    authServer.authenticate(sock, din);
    // If wins the challenge, then close the new connection.
    if (sid < self.getMyId()) { // the peer's id is smaller: close this connection and connect to the peer ourselves
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        // close the connection
        closeSocket(sock);

        if (electionAddr != null) {
            connectOne(sid, electionAddr); // connect to the peer
        } else {
            connectOne(sid);
        }
    } else if (sid == self.getMyId()) {
        // request carrying our own id: no worker is created
    } else { // create SendWorker, RecvWorker and the send queue
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }

        senderWorkerMap.put(sid, sw);

        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();
    }
}

toSend method

Sends a message to server sid.

public void toSend(Long sid, ByteBuffer b) {
    // a message to ourselves goes directly into recvQueue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // put the message into the send queue for sid
        BlockingQueue<ByteBuffer> bq =
            queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
        addToSendQueue(bq, b);
        // make sure a connection to sid exists
        connectOne(sid);
    }
}
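toSend never blocks on the network. It appends to a per-sid queue created as `CircularBlockingQueue<>(SEND_CAPACITY)` and then makes sure a connection exists. The sketch below assumes that such a queue evicts its oldest element when full instead of blocking the producer; check the ZooKeeper source for the exact semantics. `DropOldestQueue` is a hypothetical stand-in, not the real class. Dropping stale entries is presumably acceptable here because a newer notification from the same server supersedes an older one.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for the per-sid send queue; assumes drop-oldest-when-full semantics.
final class DropOldestQueue<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;

    DropOldestQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Never blocks the producer: when full, the oldest element is evicted and returned.
    synchronized E offer(E e) {
        E evicted = null;
        if (items.size() == capacity) {
            evicted = items.pollFirst();
        }
        items.addLast(e);
        notifyAll(); // wake a consumer waiting in poll
        return evicted;
    }

    // Consumer side (the SendWorker role): wait up to timeoutMs, return null on timeout.
    synchronized E poll(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (items.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null;
            }
            wait(remaining);
        }
        return items.pollFirst();
    }

    synchronized int size() {
        return items.size();
    }

    public static void main(String[] args) {
        DropOldestQueue<String> q = new DropOldestQueue<>(1);
        q.offer("vote-round-1");
        System.out.println(q.offer("vote-round-2")); // vote-round-1 (evicted)
    }
}
```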

connectOne method

synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
    // a connection already exists
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && electionAddr.size() > 1 &&
            self.isMultiAddressReachabilityCheckEnabled()) {
            // check whether the socket is still reachable
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return true;
    }
    // establish a new connection asynchronously
    return initiateConnectionAsync(electionAddr, sid);
}

synchronized void connectOne(long sid) {
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return;
    }
    // resolve the address of sid from lastCommittedView / lastProposedView, then connect
    synchronized (self.QV_LOCK) {
        boolean knownId = false;
        // Resolve hostname for the remote server before attempting to
        // connect in case the underlying ip address has changed.
        self.recreateSocketAddresses(sid);
        Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
        if (lastCommittedView.containsKey(sid)) {
            knownId = true;
            if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
                return;
            }
        }
        if (lastSeenQV != null
            && lastProposedView.containsKey(sid)
            && (!knownId ||
                !lastProposedView.get(sid).electionAddr.equals(lastCommittedView.get(sid).electionAddr))) {
            knownId = true;
            if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
                return;
            }
        }
    }
}

connectAll method

Try to establish a connection with each server if one doesn't exist.

public void connectAll() {
    long sid;
    for (Enumeration<Long> en = queueSendMap.keys(); en.hasMoreElements(); ) {
        sid = en.nextElement();
        connectOne(sid);
    }
}

Listener class

Starts the serverSocket listeners. It is a thread class; listening starts in its run method:

public void run() {
    if (!shutdown) {
        Set<InetSocketAddress> addresses;

        // determine the addresses to listen on
        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getElectionAddress().getWildcardAddresses();
        } else {
            addresses = self.getElectionAddress().getAllAddresses();
        }
        // used to block until all handlers finish
        CountDownLatch latch = new CountDownLatch(addresses.size());
        // create a ListenerHandler for every listen address
        listenerHandlers = addresses.stream().map(address ->
                        new ListenerHandler(address,self.shouldUsePortUnification(),
                                            self.isSslQuorum(), latch))
                .collect(Collectors.toList());

        final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            // start the ListenerHandlers
            listenerHandlers.forEach(executor::submit);
        } finally {
            executor.shutdown();
        }

        try {
            // block; each ListenerHandler counts down when it finishes
            latch.await();
        } catch (InterruptedException ie) {
        } finally {
            // Clean up for shutdown (omitted)
        }
    }
    // omitted
}

ListenerHandler run method:

public void run() {
    try {
        // accept connections
        acceptConnections();
        try {
            close();
        } catch (IOException e) {}
    } catch (Exception e) {
    } finally {
        latch.countDown();
    }
}

private void acceptConnections() {
    int numRetries = 0;
    Socket client = null;

    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            // create the ServerSocket and bind the port
            serverSocket = createNewServerSocket();
            while (!shutdown) {
                try {
                    // accept a client Socket
                    client = serverSocket.accept();
                    setSockOpts(client); // socket options such as the timeout
                    // handle the new connection with receiveConnection
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        receiveConnection(client);
                    }
                    numRetries = 0;
                } catch (SocketTimeoutException e) {}
            }
        } catch (IOException e) {
            // omitted
        }
    }
    // omitted
}

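QuorumConnectionReqThread class

Connects to another peer asynchronously; its run method calls initiateConnection to establish the connection.

QuorumConnectionReceiverThread class

Accepts connections asynchronously; its run method calls receiveConnection to handle the newly established connection.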

SendWorker class

Thread to send messages. Instance waits on a queue, and send a message as soon as there is one available. If connection breaks, then opens a new one.

A thread that sends messages:

  • Holds the sid, the socket and the connection's output stream
  • Takes messages from the send queue and writes them to the output stream

RecvWorker class

Thread to receive messages. Instance waits on a socket read. If the channel breaks, then removes itself from the pool of receivers.

A thread that reads messages:

public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            // read the message length
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException("Received packet with invalid packet: " + length);
            }
            // read the payload
            final byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            // put it into the receive queue
            addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
        }
    } catch (Exception e) {
    } finally {
        sw.finish();
        closeSocket(sock);
    }
}
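SendWorker writes each message as a 4-byte length followed by the payload. RecvWorker checks for a non-positive or oversized length before allocating the buffer, so a corrupted length cannot trigger a huge allocation. Below is a hypothetical in-memory sketch of both sides. `MAX_FRAME` is an assumed stand-in for `PACKETMAXSIZE`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical length-prefixed framing sketch in the style of SendWorker/RecvWorker.
final class FrameCodec {
    // Assumed upper bound standing in for PACKETMAXSIZE.
    static final int MAX_FRAME = 512 * 1024;

    // Writer side: 4-byte big-endian length, then the payload.
    static byte[] encode(byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(bytes);
            dout.writeInt(payload.length);
            dout.write(payload);
            dout.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reader side: same length check as RecvWorker.run, done before allocating the buffer.
    static byte[] decode(byte[] wire) {
        try (DataInputStream din = new DataInputStream(new ByteArrayInputStream(wire))) {
            int length = din.readInt();
            if (length <= 0 || length > MAX_FRAME) {
                throw new IOException("invalid frame length: " + length);
            }
            byte[] msg = new byte[length];
            din.readFully(msg, 0, length);
            return msg;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = encode("vote".getBytes(StandardCharsets.UTF_8));
        System.out.println(wire.length); // 8
        System.out.println(new String(decode(wire), StandardCharsets.UTF_8)); // vote
    }
}
```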

FastLeaderElection class

Javadoc:

Implementation of leader election using TCP. It uses an object of the class QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based as with the other UDP implementations. There are a few parameters that can be tuned to change its behavior. First, finalizeWait determines the amount of time to wait until deciding upon a leader. This is part of the leader election algorithm.
  1. Implements leader election over TCP, push-based
  2. Uses a QuorumCnxManager object to manage connections

Constructor

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<>();
    recvqueue = new LinkedBlockingQueue<>();
    // starts WorkerSender and WorkerReceiver
    this.messenger = new Messenger(manager);
}

Main fields

// how long (ms) to wait for votes that may still change before finalizing the leader
static final int finalizeWait = 200;

// vote tracker: holds one election round's result and counts the votes
private SyncedLearnerTracker leadingVoteSet;

// send queue
LinkedBlockingQueue<ToSend> sendqueue;
// receive queue
LinkedBlockingQueue<Notification> recvqueue;

// starts WorkerSender and WorkerReceiver
Messenger messenger;

// proposed leader id
long proposedLeader;
// proposed zxid
long proposedZxid;
// proposed epoch
long proposedEpoch;
long proposedEpoch;

The start method starts the messaging threads used by the election

public void start() {
    this.messenger.start(); // starts the WorkerSender and WorkerReceiver threads
}

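Messenger class

WorkerSender thread

  1. Takes ToSend messages from sendqueue
  2. Sends them through QuorumCnxManager's toSend method

WorkerReceiver thread

  1. Takes received messages with QuorumCnxManager's pollRecvQueue
  2. Wraps each one in a Notification object and pushes it onto recvqueue

Main methods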

// build an outgoing message
static ByteBuffer buildMsg(
    int state, long leader, long zxid, long electionEpoch, long epoch, byte[] configData);

// send a Notification vote to every node
private void sendNotifications();

// compare serverId, zxid and currentEpoch to decide which server to vote for
protected boolean totalOrderPredicate(
    long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch);

// given a set of Votes, return a SyncedLearnerTracker used to decide whether there are enough votes to end the election
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote);

// if a leader has been elected with enough votes, check that the leader has voted and confirmed that it is leading
// this check prevents peers from repeatedly electing a peer that has crashed and is no longer leading
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch);

// update proposedLeader, proposedZxid and proposedEpoch,
// either to settle on the leader or to prepare for the next round of voting
synchronized void updateProposal(long leader, long zxid, long epoch);

// create a Vote from the current proposedLeader, proposedZxid and proposedEpoch
public synchronized Vote getVote();

// get lastLoggedZxid from zkDb
private long getInitLastLoggedZxid();

// get currentEpoch
private long getPeerEpoch();

// update the peer state according to proposedLeader
// if this peer becomes the leader, voteSet is used to update leadingVoteSet
private void setPeerState(long proposedLeader, SyncedLearnerTracker voteSet);

// start a round of leader election
// called whenever the state changes to LOOKING; sends vote notifications to the other peers
public Vote lookForLeader() throws InterruptedException;

// handle a notification from a peer in FOLLOWING state
private Vote receivedFollowingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
    SyncedLearnerTracker voteSet, Notification n);

// handle a notification from a peer in LEADING state
private Vote receivedLeadingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
    SyncedLearnerTracker voteSet, Notification n);

buildMsg method

static ByteBuffer buildMsg(int state, long leader, long zxid,
                           long electionEpoch, long epoch, byte[] configData) {
    byte[] requestBytes = new byte[44 + configData.length];
    ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);

    requestBuffer.clear();
    requestBuffer.putInt(state); // current state
    requestBuffer.putLong(leader); // proposed leader id
    requestBuffer.putLong(zxid); // zxid
    requestBuffer.putLong(electionEpoch); // election epoch
    requestBuffer.putLong(epoch); // data epoch (peerEpoch)
    requestBuffer.putInt(Notification.CURRENTVERSION); // 0x2
    requestBuffer.putInt(configData.length); // length of configData
    requestBuffer.put(configData); // quorumVerifier data

    return requestBuffer;
}
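The fixed part of the message is 44 bytes: one int for the state, four longs (leader, zxid, electionEpoch, epoch) and two ints (version and config length). That gives 4 + 32 + 4 + 4 = 44, followed by configData. The hypothetical sketch below writes the same field order and reads it back. The real WorkerReceiver does more validation, which is omitted here.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the buildMsg layout; the field order is copied from buildMsg.
final class NotificationLayout {
    // int state + 4 longs + int version + int config length = 4 + 32 + 4 + 4 = 44 bytes
    static final int HEADER_BYTES = Integer.BYTES + 4 * Long.BYTES + 2 * Integer.BYTES;

    static final class Fields {
        int state;
        long leader, zxid, electionEpoch, peerEpoch;
        int version;
        byte[] config;
    }

    static byte[] build(int state, long leader, long zxid, long electionEpoch,
                        long peerEpoch, int version, byte[] configData) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + configData.length);
        buf.putInt(state).putLong(leader).putLong(zxid).putLong(electionEpoch)
           .putLong(peerEpoch).putInt(version).putInt(configData.length).put(configData);
        return buf.array();
    }

    // Reads the fields back in the order they were written.
    static Fields parse(byte[] msg) {
        ByteBuffer buf = ByteBuffer.wrap(msg);
        Fields f = new Fields();
        f.state = buf.getInt();
        f.leader = buf.getLong();
        f.zxid = buf.getLong();
        f.electionEpoch = buf.getLong();
        f.peerEpoch = buf.getLong();
        f.version = buf.getInt();
        f.config = new byte[buf.getInt()];
        buf.get(f.config);
        return f;
    }

    public static void main(String[] args) {
        byte[] msg = build(0, 3L, 0x100000005L, 2L, 1L, 0x2, new byte[0]);
        System.out.println(msg.length);        // 44
        System.out.println(parse(msg).leader); // 3
    }
}
```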

totalOrderPredicate method

Compares serverId, zxid and currentEpoch to decide which server to vote for:

protected boolean totalOrderPredicate(
    long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {

    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
     * Return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                        && (newId > curId)))));
}
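The three-level comparison is easier to read with early returns. The sketch below is a hypothetical restatement of the return expression above. It leaves out the quorum-weight check, under which a server with weight 0 can never win. Epoch is compared first, then zxid, then server id.

```java
// Hypothetical restatement of totalOrderPredicate's comparison; the weight check is omitted.
final class VoteOrder {
    // true if the new vote (newId, newZxid, newEpoch) beats the current one
    static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch; // case 1: higher epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;   // case 2: same epoch, higher zxid wins
        }
        return newId > curId;           // case 3: same epoch and zxid, higher server id wins
    }

    public static void main(String[] args) {
        System.out.println(newVoteWins(1, 100, 1, 2, 100, 1)); // false: tie on epoch and zxid, 1 < 2
        System.out.println(newVoteWins(1, 101, 1, 2, 100, 1)); // true: higher zxid beats higher sid
    }
}
```

Server ids are unique, so two different proposals are never tied. Every LOOKING server that exchanges votes therefore moves toward the same candidate.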

getVoteTracker method

Given a set of Votes, returns a SyncedLearnerTracker used to decide whether there are enough votes to declare the election over:

protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
        && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // compare each vote received from the other servers with the local vote; matching sids go into the ack set
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey()); // the key is the sid
        }
    }

    return voteSet;
}
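getVoteTracker only collects acks. The quorum decision happens in SyncedLearnerTracker.hasAllQuorums, which requires every tracked QuorumVerifier to see a quorum: the current one and, during reconfiguration, a newer last-seen one. The sketch below assumes the default majority rule (more than half of the voting members) and reduces a vote to its proposed leader id. The class and method names are invented.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of vote counting; a vote is reduced to the proposed leader id.
final class VoteCounting {
    // Like the addAck loop in getVoteTracker: collect the sids whose vote equals the candidate.
    static Set<Long> acks(Map<Long, Long> votesBySid, long candidateLeader) {
        Set<Long> acks = new HashSet<>();
        for (Map.Entry<Long, Long> e : votesBySid.entrySet()) {
            if (e.getValue() == candidateLeader) {
                acks.add(e.getKey());
            }
        }
        return acks;
    }

    // Assumed majority rule for one voter set: more than half of its members acked.
    static boolean containsMajority(Set<Long> voters, Set<Long> acks) {
        long count = acks.stream().filter(voters::contains).count();
        return count > voters.size() / 2;
    }

    // Like hasAllQuorums: every tracked configuration must reach its own majority.
    static boolean hasAllQuorums(List<Set<Long>> configs, Set<Long> acks) {
        return configs.stream().allMatch(voters -> containsMajority(voters, acks));
    }

    public static void main(String[] args) {
        Map<Long, Long> votes = new HashMap<>();
        votes.put(1L, 3L);
        votes.put(2L, 3L);
        votes.put(3L, 3L);
        votes.put(4L, 4L);
        Set<Long> five = new HashSet<>(Arrays.asList(1L, 2L, 3L, 4L, 5L));
        System.out.println(hasAllQuorums(Collections.singletonList(five), acks(votes, 3L))); // true: 3 > 5 / 2
    }
}
```

With a second, larger configuration of seven voters, the same three acks would no longer be enough, because 3 is not greater than 7 / 2 = 3.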

checkLeader method

protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {

    boolean predicate = true;

    if (leader != self.getMyId()) {
        if (votes.get(leader) == null) { // the leader must have voted, otherwise this round's votes are invalid
            predicate = false;
        } else if (votes.get(leader).getState() != ServerState.LEADING) {
            // the leader's state must be LEADING, otherwise this round's votes are invalid
            predicate = false;
        }
    } else if (logicalclock.get() != electionEpoch) { // the election epoch must match
        predicate = false;
    }

    return predicate;
}

lookForLeader method

Starts a round of leader election. It is called whenever the state changes to LOOKING and sends vote notifications to the other peers:

public Vote lookForLeader() throws InterruptedException {
    // omitted
    try {
        // sid -> vote for the current election round
        Map<Long, Vote> recvset = new HashMap<>();

        // sid -> vote from previous election rounds
        Map<Long, Vote> outofelection = new HashMap<>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet(); // increment the election epoch to start a new round
            // initialize the election proposal; every server starts by voting for itself
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        // send notifications to all nodes
        sendNotifications();
        // vote tracker
        SyncedLearnerTracker voteSet = null;

        // normally the loop only exits once a leader has been elected
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            if (n == null) {
                // resend, or reconnect if nothing could be delivered
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);

                // omitted

            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                switch (n.state) {
                case LOOKING:
                    // omitted
                    // the peer's election epoch is larger than ours
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch); // adopt the newer epoch
                        recvset.clear(); // clear the collected votes
                        // compare votes; if the peer's vote wins, adopt it locally
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        // send out the latest vote
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        // the peer's election epoch is smaller than ours: ignore it
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                                   proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    // record the vote
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // build the vote tracker
                    voteSet = getVoteTracker(
                        recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                    // acks > half means a leader has been elected
                    if (voteSet.hasAllQuorums()) {

                        // wait for votes that may still change the outcome
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, 
                                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // set the peer state
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(
                                proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    break;
                case FOLLOWING:
                    // received a FOLLOWING notification
                    Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                    if (resultFN == null) {
                        break;
                    } else {
                        return resultFN;
                    }
                case LEADING:
                    // received a LEADING notification
                    Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                    if (resultLN == null) {
                        break;
                    } else {
                        return resultLN;
                    }
                default:
                    break;
                }
            } else {
                // omitted
            }
        }
        return null;
    } finally {
        // omitted
    }
}
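The LOOKING branch can be reduced to its core by ignoring timeouts, epoch changes, finalizeWait and non-LOOKING notifications. Every server starts by proposing itself, adopts any proposal that wins totalOrderPredicate, rebroadcasts it, and stops once a quorum shares one proposal. Below is a heavily simplified, hypothetical simulation under those assumptions: synchronous rounds, no failures, and every server hears every proposal. It is not how FastLeaderElection is structured. It only shows why, in this idealized setting, the server with the highest (epoch, zxid, sid) is elected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified simulation of the LOOKING case (one election epoch,
// no network, no failures, every server hears every proposal in each round).
final class ElectionSimulation {
    static final class Server {
        final long sid;
        long propLeader, propZxid, propEpoch; // current proposal, initially a vote for itself

        Server(long sid, long lastZxid, long currentEpoch) {
            this.sid = sid;
            this.propLeader = sid;
            this.propZxid = lastZxid;
            this.propEpoch = currentEpoch;
        }
    }

    // Same ordering as totalOrderPredicate: epoch, then zxid, then server id.
    static boolean beats(Server a, Server b) {
        if (a.propEpoch != b.propEpoch) return a.propEpoch > b.propEpoch;
        if (a.propZxid != b.propZxid) return a.propZxid > b.propZxid;
        return a.propLeader > b.propLeader;
    }

    static long elect(List<Server> servers) {
        boolean changed = true;
        while (changed) { // rounds of "sendNotifications" until no proposal changes
            changed = false;
            for (Server receiver : servers) {
                for (Server sender : servers) {
                    if (beats(sender, receiver)) { // adopt the better proposal (updateProposal)
                        receiver.propLeader = sender.propLeader;
                        receiver.propZxid = sender.propZxid;
                        receiver.propEpoch = sender.propEpoch;
                        changed = true;
                    }
                }
            }
        }
        // Tally the final proposals and require a strict majority.
        Map<Long, Integer> tally = new HashMap<>();
        for (Server s : servers) {
            tally.merge(s.propLeader, 1, Integer::sum);
        }
        for (Map.Entry<Long, Integer> e : tally.entrySet()) {
            if (e.getValue() > servers.size() / 2) {
                return e.getKey();
            }
        }
        throw new IllegalStateException("no majority");
    }

    public static void main(String[] args) {
        List<Server> servers = new ArrayList<>();
        servers.add(new Server(1, 100, 1));
        servers.add(new Server(2, 105, 1));
        servers.add(new Server(3, 105, 1));
        System.out.println(elect(servers)); // 3: ties with server 2 on zxid, wins on sid
    }
}
```

Each adoption strictly increases a receiver's proposal in the total order, so the loop terminates. In the real algorithm a server may stop as soon as a quorum agrees, using finalizeWait to catch late, better votes.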

receivedFollowingNotification method

Handles a notification from a peer in FOLLOWING state.

private Vote receivedFollowingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {
    // the vote also goes to the current leader,
    // followed by the quorum check and the checkLeader check
    if (n.electionEpoch == logicalclock.get()) {
        // build the vote tracker
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        voteSet = getVoteTracker(
            recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        // acks > half and checkLeader
        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
            // update the node state
            setPeerState(n.leader, voteSet);
            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }

    // reached when this node joins the cluster late and a leader already exists;
    // largely the same as the code above
    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
    voteSet = getVoteTracker(
        outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
        synchronized (this) {
            logicalclock.set(n.electionEpoch);
            setPeerState(n.leader, voteSet);
        }
        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
        leaveInstance(endVote);
        return endVote;
    }

    return null;
}

receivedLeadingNotification method

Handles a notification from a peer in LEADING state.

private Vote receivedLeadingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
                                         SyncedLearnerTracker voteSet, Notification n) {
    Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n);
    if (result == null) {
        if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) {
            // omitted
        } else {
            return null;
        }
    } else {
        return result;
    }
}

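QuorumPeer class

Manages the quorum protocol. A server can be in one of three states:

  • Leader election - each server elects a leader, initially proposing itself
  • Follower - synchronizes with the leader and replicates all transactions
  • Leader - processes requests and forwards them to the followers; a majority of followers must sync before a request can be committed

run method main loop

The main loop in run checks the current peer state and runs the election, lead or follow logic accordingly: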

public void run() {
    // omitted

    try {
        // Main loop
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    // omitted
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    updateServerState();

                    // Add delay jitter before we switch to LOOKING
                    // state to reduce the load of ObserverMaster
                    if (isRunning()) {
                        Observer.waitForObserverElectionDelay();
                    }
                }
                break;
            case FOLLOWING:
                try {
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
        }
    } finally {
        // omitted
    }
}
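Stripped of ZooKeeper's classes, the main loop is a state machine: each pass reads the current state, runs the matching role until it returns, and loops again. The sketch below is a minimal model under the assumption that each role handler returns the next state; the enum mirrors the four states in the `switch`, but `MainLoopSketch`, `on`, and `run` are hypothetical, and the loop is bounded so it can be exercised.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the QuorumPeer main-loop dispatch (not ZooKeeper code).
class MainLoopSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    private final Map<ServerState, Supplier<ServerState>> handlers = new EnumMap<>(ServerState.class);
    private ServerState state = ServerState.LOOKING;

    // Register the role logic for a state; the handler returns the next state.
    void on(ServerState s, Supplier<ServerState> handler) {
        handlers.put(s, handler);
    }

    // Run a bounded number of passes (the real loop runs while `running`)
    // and record the state seen at the start of each pass.
    List<ServerState> run(int passes) {
        List<ServerState> visited = new ArrayList<>();
        for (int i = 0; i < passes; i++) {
            visited.add(state);
            try {
                state = handlers.get(state).get();
            } catch (RuntimeException e) {
                // simplification: any failed role goes back to LOOKING
                state = ServerState.LOOKING;
            }
        }
        return visited;
    }
}
```

For example, if LOOKING always elects a leader to follow and FOLLOWING always fails, four passes visit LOOKING, FOLLOWING, LOOKING, FOLLOWING.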

LOOKING branch

try {
    reconfigFlagClear();
    if (shuttingDownLE) {
        shuttingDownLE = false;
        startLeaderElection();
    }
    // Run the election with FastLeaderElection; the returned vote becomes currentVote
    setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
    setPeerState(ServerState.LOOKING); // on failure, reset to LOOKING
}
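`lookForLeader()` returns the vote the election settled on. Per the election flow summarized at the start of this article, a LOOKING server compares an incoming vote with its own by election epoch, then epoch, zxid, and server id. The sketch below covers the last three comparisons and assumes both votes are already from the same election round. `VoteSketch` is a hypothetical, simplified stand-in for ZooKeeper's `Vote` class.

```java
// Hypothetical, simplified vote (ZooKeeper's Vote class carries more fields).
final class VoteSketch {
    final long sid;
    final long zxid;
    final long peerEpoch;

    VoteSketch(long sid, long zxid, long peerEpoch) {
        this.sid = sid;
        this.zxid = zxid;
        this.peerEpoch = peerEpoch;
    }

    // True if `other` should replace this vote, assuming both votes belong to
    // the same election round: higher peerEpoch wins, then higher zxid,
    // then higher server id as the final tie-breaker.
    boolean isBeatenBy(VoteSketch other) {
        if (other.peerEpoch != peerEpoch) {
            return other.peerEpoch > peerEpoch;
        }
        if (other.zxid != zxid) {
            return other.zxid > zxid;
        }
        return other.sid > sid;
    }
}
```

The id tie-breaker is what makes the order total: two servers with identical data still agree on a single winner.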

FOLLOWING branch

try {
    setFollower(makeFollower(logFactory));
    follower.followLeader(); // start following the leader
} catch (Exception e) {
} finally {
    follower.shutdown();
    setFollower(null);
    updateServerState(); // update the server state
}
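The `finally` block guarantees that a follower which stops following, whether `followLeader()` returns or throws, is shut down and its state updated. A minimal sketch of that lifecycle follows. The `Role` interface is a hypothetical stand-in for `Follower`, and the sketch simplifies `updateServerState()` to always returning to LOOKING.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the run-then-always-clean-up pattern in the FOLLOWING branch.
class RoleLifecycleSketch {
    interface Role {
        void run() throws Exception; // stands in for followLeader()
        void shutdown();
    }

    final List<String> events = new ArrayList<>();
    String state = "FOLLOWING";

    void runRole(Role role) {
        try {
            role.run();
        } catch (Exception e) {
            // record the failure (the excerpt above shows an empty catch)
            events.add("caught: " + e.getMessage());
        } finally {
            // runs on both normal return and exception
            role.shutdown();
            events.add("shutdown");
            state = "LOOKING"; // simplified stand-in for updateServerState()
        }
    }
}
```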

Creating the Follower object:

protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}

LEADING branch

try {
    setLeader(makeLeader(logFactory));
    leader.lead(); // start leading
    setLeader(null);
} catch (Exception e) {
} finally {
    if (leader != null) {
        leader.shutdown("Forcing shutdown");
        setLeader(null);
    }
    updateServerState(); // update the server state
}
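Unlike the FOLLOWING branch, the LEADING branch calls `setLeader(null)` as soon as `lead()` returns normally, so the `leader != null` check in `finally` forces a shutdown only when `lead()` exited with an exception (presumably because the normal exit path already shuts the leader down). A minimal sketch of that pattern, counting shutdown calls on a hypothetical `FakeLeader`:

```java
// Hypothetical sketch of the LEADING branch's cleanup logic.
class LeadingBranchSketch {
    static class FakeLeader {
        int shutdownCalls = 0;
        private final boolean fail;

        FakeLeader(boolean fail) {
            this.fail = fail;
        }

        void lead() throws Exception {
            if (fail) {
                throw new Exception("lost quorum");
            }
        }

        void shutdown(String reason) {
            shutdownCalls++;
        }
    }

    private FakeLeader leader;

    // Mirrors the branch: clear the field on normal return, force shutdown otherwise.
    void runLeading(FakeLeader l) {
        leader = l;
        try {
            leader.lead();
            leader = null; // normal exit: skip the forced shutdown
        } catch (Exception e) {
            // error handling omitted, as in the excerpt
        } finally {
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                leader = null;
            }
        }
    }
}
```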

Creating the Leader object:

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}

OBSERVING branch

try {
    setObserver(makeObserver(logFactory));
    observer.observeLeader();
} catch (Exception e) {
} finally {
    observer.shutdown();
    setObserver(null);
    updateServerState();

    // Add delay jitter before we switch to LOOKING
    // state to reduce the load of ObserverMaster
    if (isRunning()) {
        Observer.waitForObserverElectionDelay();
    }
}
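As the code comment says, the OBSERVING branch waits a random delay before switching back to LOOKING, so that observers do not all rejoin at once and overload the ObserverMaster. A minimal sketch of bounded random jitter follows. The `maxDelayMs` parameter and the uniform distribution are assumptions, not necessarily what `Observer.waitForObserverElectionDelay()` does.

```java
import java.util.Random;

// Hypothetical sketch: pick a random delay in [0, maxDelayMs) before rejoining the election.
class ElectionJitterSketch {
    static long jitterMs(long maxDelayMs, Random random) {
        if (maxDelayMs <= 0) {
            return 0; // jitter disabled
        }
        // nextDouble() is in [0, 1), so the result is in [0, maxDelayMs)
        return (long) (random.nextDouble() * maxDelayMs);
    }

    static void waitWithJitter(long maxDelayMs, Random random) throws InterruptedException {
        Thread.sleep(jitterMs(maxDelayMs, random));
    }
}
```

Randomizing rather than using a fixed delay is what spreads the reconnections out: a fixed delay would only shift the burst in time.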

Creating the Observer object:

protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}
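`makeFollower`, `makeLeader`, and `makeObserver` share one shape: a new role object around a role-specific `ZooKeeperServer` subclass, each constructed with the peer's own `this.zkDb`. So the same in-memory database object is handed to whichever role the peer takes next, although syncing with the leader may still modify it. A minimal sketch of that idea, using hypothetical stand-in classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a fresh server object per role, all sharing one database.
class RoleFactorySketch {
    enum Role { FOLLOWING, LEADING, OBSERVING }

    // Stand-in for ZKDatabase: a single instance owned by the peer.
    static final class Database {
        final Map<String, String> nodes = new HashMap<>();
    }

    // Stand-in for FollowerZooKeeperServer / LeaderZooKeeperServer / ObserverZooKeeperServer.
    static final class RoleServer {
        final Role role;
        final Database db;

        RoleServer(Role role, Database db) {
            this.role = role;
            this.db = db;
        }
    }

    private final Database zkDb = new Database();

    // Like the makeXxx factories: new server object each time, same database instance.
    RoleServer make(Role role) {
        return new RoleServer(role, zkDb);
    }
}
```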
