[ZooKeeper Source Code Walkthrough] Chapter 3: How the Server Handles Client Requests


The server's flow for handling a client request

The previous article covered the server startup flow. In NIOServerCnxnFactory.start(), several threads are started, including the thread that accepts client sockets. The code is as follows:

    public void start() {
        stopped = false;
        if (workerPool == null) {
            workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
        }
        for (SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
        // ensure thread is started once and only once
        if (acceptThread.getState() == Thread.State.NEW) {
            acceptThread.start();
        }
        if (expirerThread.getState() == Thread.State.NEW) {
            expirerThread.start();
        }
    }

Note that acceptThread here is the thread that accepts sockets (an AcceptThread). It is created in NIOServerCnxnFactory.configure():

    @Override
    public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
        ......
        acceptThread = new AcceptThread(ss, addr, selectorThreads);
    }

NIOServerCnxnFactory.configure() runs before NIOServerCnxnFactory.startup(), so by the time startup() executes, acceptThread already exists and its start() method can be called directly.

Next, trace the last line of that startup code, zks.startup():

    public synchronized void startup() {
        startupWithServerState(State.RUNNING);
    }

    // source of startupWithServerState()
    private void startupWithServerState(State state) {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();

        startRequestThrottler();

        registerJMX();

        startJvmPauseMonitor();

        registerMetrics();

        setState(state);

        requestPathMetricsCollector.start();

        localSessionEnabled = sessionTracker.isLocalSessionsEnabled();

        notifyAll();
    }

startupWithServerState() simply starts a number of internal threads and services. None of this is related to receiving sockets yet, so we will not dwell on it here.

Back to the AcceptThread that accepts sockets: its start() method is invoked inside NIOServerCnxnFactory.start(), so the next step is to see what AcceptThread.run() does.
Note that AcceptThread is a private inner class of NIOServerCnxnFactory:

    public void run() {
        try {
            while (!stopped && !acceptSocket.socket().isClosed()) {
                try {
                    select();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
        } finally {
            closeSelector();
            // This will wake up the selector threads, and tell the
            // worker thread pool to begin shutdown.
            if (!reconfiguring) {
                NIOServerCnxnFactory.this.stop();
            }
            LOG.info("accept thread exitted run method");
        }
    }

run() simply calls select():

    private void select() {
        try {
            selector.select();

            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    if (!doAccept()) {
                        // If unable to pull a new connection off the accept
                        // queue, pause accepting to give us time to free
                        // up file descriptors and so the accept thread
                        // doesn't spin in a tight loop.
                        pauseAccept(10);
                    }
                } else {
                    LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

The core of select() is the call to doAccept():

   private boolean doAccept() {
        boolean accepted = false;
        SocketChannel sc = null;
        try {
            sc = acceptSocket.accept();
            accepted = true;
            if (limitTotalNumberOfCnxns()) {
                throw new IOException("Too many connections max allowed is " + maxCnxns);
            }
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);

            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }

            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

            sc.configureBlocking(false);

            // Round-robin assign this connection to a selector thread
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();
            }
            SelectorThread selectorThread = selectorIterator.next();
            if (!selectorThread.addAcceptedConnection(sc)) {
                throw new IOException("Unable to add connection to selector queue"
                                      + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }


The code above mainly performs sanity checks on the newly accepted connection (total connection limit, per-client connection limit, and so on). The key part is:

    SelectorThread selectorThread = selectorIterator.next();

    if (!selectorThread.addAcceptedConnection(sc)) {
        throw new IOException("Unable to add connection to selector queue"
        + (stopped ? " (shutdown in progress)" : ""));
    }

The source of selectorThread.addAcceptedConnection(sc) is:

    public boolean addAcceptedConnection(SocketChannel accepted) {
        if (stopped || !acceptedQueue.offer(accepted)) {
            return false;
        }
        wakeupSelector();
        return true;
    }

    public void wakeupSelector() {
        selector.wakeup();
    }
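
The hand-off pattern here is worth noting: the accept thread never registers the channel with the selector itself. It offers the SocketChannel to the selector thread's acceptedQueue and then calls selector.wakeup(), which makes the selector thread's blocking select() call return so that processAcceptedConnections() can register the new channel promptly. Below is a compact sketch of that hand-off, using hypothetical class and method names rather than ZooKeeper's:

    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch: an acceptor thread hands accepted channels to a selector loop.
    public class SelectorHandoff {
        private final Queue<SocketChannel> acceptedQueue = new LinkedBlockingQueue<>();
        private final Selector selector;

        public SelectorHandoff() throws IOException {
            this.selector = Selector.open();
        }

        // Called by the acceptor thread.
        public boolean addAccepted(SocketChannel sc) {
            if (!acceptedQueue.offer(sc)) {
                return false;
            }
            selector.wakeup(); // unblock select() so the channel gets registered without delay
            return true;
        }

        // One iteration of the selector thread's loop.
        public void loopOnce() throws IOException {
            selector.select();                 // may return early because of wakeup()
            SocketChannel sc;
            while ((sc = acceptedQueue.poll()) != null) {
                sc.configureBlocking(false);
                sc.register(selector, SelectionKey.OP_READ);
            }
            // ... iterate selector.selectedKeys() and handle I/O here ...
        }
    }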

So the accepted connection is now handled by a SelectorThread. When was that thread created? Go back to NIOServerCnxnFactory.configure():

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));

    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

numSelectorThreads can be overridden by configuration (the ZOOKEEPER_NIO_NUM_SELECTOR_THREADS system property read above); by default it is derived from the number of CPU cores.
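
To get a feel for the default, the following throwaway snippet (not ZooKeeper code) evaluates the same formula for a few core counts; for example, 32 cores yields 4 selector threads, matching the comment in the source above:

    public class SelectorThreadDefaults {
        public static void main(String[] args) {
            int[] coreCounts = {1, 2, 4, 8, 16, 32, 64};
            for (int numCores : coreCounts) {
                // same expression as NIOServerCnxnFactory.configure()
                int selectors = Math.max((int) Math.sqrt((float) numCores / 2), 1);
                // prints: 1, 2, 4 cores -> 1; 8, 16 -> 2; 32 -> 4; 64 -> 5
                System.out.println(numCores + " cores -> " + selectors + " selector thread(s)");
            }
        }
    }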

These SelectorThread instances are started by the loop we already saw in NIOServerCnxnFactory.start() (invoked from startup()):

    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }

Next, look at the run() method of SelectorThread:

    public void run() {
        while (!stopped) {
            try {
                select();
                processAcceptedConnections();
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        // The code that follows handles shutdown/cleanup and is omitted here
        ......
    }

The source of SelectorThread.select():

    private void select() {
        try {
            selector.select();

            Set<SelectionKey> selected = selector.selectedKeys();
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
            Collections.shuffle(selectedList);
            Iterator<SelectionKey> selectedKeys = selectedList.iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selected.remove(key);

                if (!key.isValid()) {
                    cleanupSelectionKey(key);
                    continue;
                }
                if (key.isReadable() || key.isWritable()) {
                    handleIO(key);
                } else {
                    LOG.warn("Unexpected ops in select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

In the method above, the part to focus on is handleIO(key):

    private void handleIO(SelectionKey key) {
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

        // Stop selecting this key while processing on its
        // connection
        cnxn.disableSelectable();
        key.interestOps(0);
        touchCnxn(cnxn);
        workerPool.schedule(workRequest);
    }

Pay attention to these two lines:

    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    workerPool.schedule(workRequest);

The workerPool object is initialized in NIOServerCnxnFactory.start():

    if (workerPool == null) {
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }

The implementation of workerPool.schedule(workRequest) is:

    public void schedule(WorkRequest workRequest) {
        schedule(workRequest, 0);
    }

    // the one-argument overload above delegates to this method
    public void schedule(WorkRequest workRequest, long id) {
        if (stopped) {
            workRequest.cleanup();
            return;
        }

        ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);

        // If we have a worker thread pool, use that; otherwise, do the work
        // directly.
        int size = workers.size();
        if (size > 0) {
            try {
                // make sure to map negative ids as well to [0, size-1]
                int workerNum = ((int) (id % size) + size) % size;
                ExecutorService worker = workers.get(workerNum);
                worker.execute(scheduledWorkRequest);
            } catch (RejectedExecutionException e) {
                LOG.warn("ExecutorService rejected execution", e);
                workRequest.cleanup();
            }
        } else {
            // When there is no worker thread pool, do the work directly
            // and wait for its completion
            scheduledWorkRequest.run();
        }
    }
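
One small detail in schedule() above: the worker index is computed as ((int) (id % size) + size) % size, which folds any long id, including negative ones, into the range [0, size - 1] (in the NIO path shown here the one-argument schedule() always passes id 0). A quick standalone illustration of why the extra + size is needed:

    public class WorkerIndexDemo {
        public static void main(String[] args) {
            int size = 4; // number of workers
            long[] ids = {0, 1, 5, -1, -7, Long.MIN_VALUE};
            for (long id : ids) {
                int naive = (int) (id % size);                     // can be negative, e.g. -7 % 4 == -3
                int workerNum = ((int) (id % size) + size) % size; // always in [0, size - 1]
                System.out.println("id=" + id + "  naive=" + naive + "  workerNum=" + workerNum);
            }
        }
    }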

All of this boils down to invoking IOWorkRequest.doWork():

    public void doWork() throws InterruptedException {
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }

        if (key.isReadable() || key.isWritable()) {
            cnxn.doIO(key);

            // Check if we shutdown or doIO() closed this connection
            if (stopped) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                return;
            }
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            touchCnxn(cnxn);
        }

        // Mark this connection as once again ready for selection
        cnxn.enableSelectable();
        // Push an update request on the queue to resume selecting
        // on the current set of interest ops, which may have changed
        // as a result of the I/O operations we just performed.
        if (!selectorThread.addInterestOpsUpdateRequest(key)) {
            cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
        }
    }
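
Note how handleIO() and doWork() cooperate on the selection key: the selector thread clears the key's interest ops (key.interestOps(0)) before handing the connection to a worker, and doWork() re-enables them afterwards through addInterestOpsUpdateRequest(key), which is drained by processInterestOpsUpdateRequests() back on the selector thread. Without this, the selector would keep reporting the same readable key while a worker is still processing it. A minimal sketch of the idea, with hypothetical helper methods:

    import java.nio.channels.SelectionKey;

    // Sketch: suspend selection on a key while a worker owns the connection, then resume.
    public final class InterestOpsHelper {

        // Called on the selector thread just before dispatching to a worker.
        public static void suspend(SelectionKey key) {
            key.interestOps(0); // the selector stops reporting readiness for this channel
        }

        // Called once the worker is done (ideally executed back on the selector thread).
        public static void resume(SelectionKey key, boolean wantWrite) {
            int ops = SelectionKey.OP_READ;
            if (wantWrite) {
                ops |= SelectionKey.OP_WRITE;
            }
            key.interestOps(ops);
        }

        private InterestOpsHelper() {
        }
    }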

Follow cnxn.doIO(key); cnxn is an instance of NIOServerCnxn:

    void doIO(SelectionKey k) throws InterruptedException {
        try {
            if (!isSocketOpen()) {
                LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));

                return;
            }
            if (k.isReadable()) {
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    handleFailedRead();
                }
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    if (incomingBuffer == lenBuffer) { // start of next request
                        incomingBuffer.flip();
                        isPayload = readLength(k);
                        incomingBuffer.clear();
                    } else {
                        // continuation
                        isPayload = true;
                    }
                    if (isPayload) { // not the case for 4letterword
                        readPayload();
                    } else {
                        // four letter words take care
                        // need not do anything else
                        return;
                    }
                }
            }
            if (k.isWritable()) {
                handleWrite(k);

                if (!initialized && !getReadInterest() && !getWriteInterest()) {
                    throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
                }
            }
        } catch (CancelledKeyException e) {
            LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));

            LOG.debug("CancelledKeyException stack trace", e);

            close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
        } catch (CloseRequestException e) {
            // expecting close to log session closure
            close();
        } catch (EndOfStreamException e) {
            LOG.warn("Unexpected exception", e);
            // expecting close to log session closure
            close(e.getReason());
        } catch (ClientCnxnLimitException e) {
            // Common case exception, print at debug level
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
            close(DisconnectReason.CLIENT_CNX_LIMIT);
        } catch (IOException e) {
            LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
            close(DisconnectReason.IO_EXCEPTION);
        }
    }

The code above is the entry point for processing incoming socket data. If the packet is a payload that needs to be read (rather than a four-letter command), readPayload() is called:

    private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) {
                handleFailedRead();
            }
        }

        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            incomingBuffer.flip();
            packetReceived(4 + incomingBuffer.remaining());
            if (!initialized) {
                readConnectRequest();
            } else {
                readRequest();
            }
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }
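
The lenBuffer / incomingBuffer pair above implements ZooKeeper's simple wire framing: each packet is a 4-byte length prefix followed by that many payload bytes. lenBuffer is a fixed 4-byte buffer; once it fills up, readLength() reads the announced size (it also detects the four-letter admin commands, which is why isPayload can be false) and incomingBuffer is sized accordingly; once the payload buffer fills up, it is handed to readConnectRequest() or readRequest(). A self-contained illustration of decoding such a frame (a plain ByteBuffer demo, not the server's code):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Decode one length-prefixed frame: a 4-byte length, then that many payload bytes.
    public class FrameDecodeDemo {
        public static void main(String[] args) {
            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

            // Encode: length prefix + payload (what a client would put on the wire).
            ByteBuffer wire = ByteBuffer.allocate(4 + payload.length);
            wire.putInt(payload.length).put(payload).flip();

            // Decode: first read exactly 4 bytes for the length ...
            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
            while (lenBuffer.hasRemaining()) {
                lenBuffer.put(wire.get());
            }
            lenBuffer.flip();
            int len = lenBuffer.getInt();

            // ... then read exactly 'len' bytes of payload.
            ByteBuffer incoming = ByteBuffer.allocate(len);
            while (incoming.hasRemaining()) {
                incoming.put(wire.get());
            }
            incoming.flip();

            System.out.println("frame length = " + len + ", payload = "
                    + StandardCharsets.UTF_8.decode(incoming));
        }
    }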

readPayload() then checks whether this connection has been initialized. For a brand-new connection, readConnectRequest() is called:

    private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
        if (!isZKServerRunning()) {
            throw new IOException("ZooKeeperServer not running");
        }
        zkServer.processConnectRequest(this, incomingBuffer);
        initialized = true;
    }

For an already-established connection, readRequest() is called instead:

    private void readRequest() throws IOException {
        zkServer.processPacket(this, incomingBuffer);
    }

As the code shows, both paths end up in methods of zkServer, an instance of ZooKeeperServer.
This article follows a client's very first connection to the server, so we continue down the new-connection path and look at the implementation of zkServer.processConnectRequest(this, incomingBuffer):

    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
        throws IOException, ClientCnxnLimitException {

        ......
        
        // the code above parses the connect request from the socket and is omitted here
        
        if (sessionId == 0) {
            long id = createSession(cnxn, passwd, sessionTimeout);
            LOG.debug(
                "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                Long.toHexString(id),
                Long.toHexString(connReq.getLastZxidSeen()),
                connReq.getTimeOut(),
                cnxn.getRemoteSocketAddress());
        } else {
            long clientSessionId = connReq.getSessionId();
            LOG.debug(
                "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                Long.toHexString(clientSessionId),
                Long.toHexString(connReq.getLastZxidSeen()),
                connReq.getTimeOut(),
                cnxn.getRemoteSocketAddress());
            if (serverCnxnFactory != null) {
                serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            if (secureServerCnxnFactory != null) {
                secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            cnxn.setSessionId(sessionId);
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
            ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);

        }
    }

Step into createSession(cnxn, passwd, sessionTimeout):

    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
        if (passwd == null) {
            // Possible since it's just deserialized from a packet on the wire.
            passwd = new byte[0];
        }
        long sessionId = sessionTracker.createSession(timeout);
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        cnxn.setSessionId(sessionId);
        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
        submitRequest(si);
        return sessionId;
    }
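
Note how the session password is produced: the 16-byte passwd array is overwritten by a Random seeded with sessionId ^ superSecret. Because the seed is deterministic, the server does not have to store passwords anywhere; it can regenerate the expected bytes from the session id whenever it needs them, which is how generatePasswd() in finishSessionInit() (shown later) can return the matching bytes. A tiny standalone illustration of the idea, with a made-up secret:

    import java.util.Arrays;
    import java.util.Random;

    // Deterministic "password": the same (sessionId ^ secret) seed always yields the same bytes.
    public class SessionPasswdDemo {
        // made-up constant for illustration only; ZooKeeper has its own superSecret value
        private static final long SUPER_SECRET = 0x5EC4E7L;

        static byte[] generatePasswd(long sessionId) {
            byte[] passwd = new byte[16];
            new Random(sessionId ^ SUPER_SECRET).nextBytes(passwd);
            return passwd;
        }

        public static void main(String[] args) {
            long sessionId = 0x10001234ABCDL;
            byte[] issued = generatePasswd(sessionId);    // what the server hands out at session creation
            byte[] presented = generatePasswd(sessionId); // regenerated later, e.g. to validate a reconnect
            System.out.println("passwords match: " + Arrays.equals(issued, presented));
        }
    }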

Now look at submitRequest(si):

    public void submitRequest(Request si) {
        enqueueRequest(si);
    }

    public void enqueueRequest(Request si) {
        if (requestThrottler == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (requestThrottler == null) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        requestThrottler.submitRequest(si);
    }

Only the last line matters: requestThrottler.submitRequest(si). requestThrottler is an instance of RequestThrottler, created while NIOServerCnxnFactory.startup() runs (via zks.startup() and startupWithServerState(), shown earlier; the intermediate calls are omitted). The actual initialization happens in ZooKeeperServer.startRequestThrottler():

    protected void startRequestThrottler() {
        requestThrottler = new RequestThrottler(this);
        requestThrottler.start();
    }

requestThrottler.submitRequest(si) simply puts the request into the queue owned by RequestThrottler.

RequestThrottler is a thread class, and as the code above shows, its start() method is called right after it is constructed.

The run() method of the requestThrottler thread looks like this:

    public void run() {
        try {
            while (true) {
                if (killed) {
                    break;
                }

                Request request = submittedRequests.take();
                if (Request.requestOfDeath == request) {
                    break;
                }

                if (request.mustDrop()) {
                    continue;
                }

                // Throttling is disabled when maxRequests = 0
                if (maxRequests > 0) {
                    while (!killed) {
                        if (dropStaleRequests && request.isStale()) {
                            // Note: this will close the connection
                            dropRequest(request);
                            ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
                            request = null;
                            break;
                        }
                        if (zks.getInProcess() < maxRequests) {
                            break;
                        }
                        throttleSleep(stallTime);
                    }
                }

                if (killed) {
                    break;
                }

                // A dropped stale request will be null
                if (request != null) {
                    if (request.isStale()) {
                        ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                    }
                    zks.submitRequestNow(request);
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected interruption", e);
        }
        int dropped = drainQueue();
        LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
    }
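
Two shutdown details in this loop are worth spelling out: the killed flag covers the throttle-sleep path, while Request.requestOfDeath is a poison pill, a sentinel request enqueued during shutdown so that a consumer blocked in submittedRequests.take() wakes up and exits cleanly. A generic illustration of the poison-pill idea (not the RequestThrottler source):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Poison-pill shutdown for a consumer blocked on BlockingQueue.take().
    public class PoisonPillDemo {
        private static final String POISON = "requestOfDeath"; // sentinel, compared by identity

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();

            Thread consumer = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();     // blocks until something arrives
                        if (item == POISON) {           // sentinel -> exit the loop
                            break;
                        }
                        System.out.println("processing " + item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("consumer exited");
            });
            consumer.start();

            queue.put("request-1");
            queue.put("request-2");
            queue.put(POISON);                          // shut the consumer down
            consumer.join();
        }
    }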

This loop keeps taking requests from the submittedRequests queue (throttling them when maxRequests is set) and passes each one to zks.submitRequestNow(request):

    public void submitRequestNow(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                setLocalSessionFlag(si);
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type {}", si.type);
                // Update request accounting/throttling limits
                requestFinished(si);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            LOG.debug("Dropping request.", e);
            // Update request accounting/throttling limits
            requestFinished(si);
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request", e);
            // Update request accounting/throttling limits
            requestFinished(si);
        }
    }

In the code above, the line that matters is firstProcessor.processRequest(si). firstProcessor is an instance of PrepRequestProcessor, initialized in ZooKeeperServer.setupRequestProcessors(), which is itself invoked from startupWithServerState() while NIOServerCnxnFactory.startup() executes.

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
        ((SyncRequestProcessor) syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor) firstProcessor).start();
    }

Both SyncRequestProcessor and PrepRequestProcessor are thread classes, and as the code above shows, each is started with start() right after it is constructed.

Back to the implementation of firstProcessor.processRequest(si):

    public void processRequest(Request request) {
        request.prepQueueStartTime = Time.currentElapsedTime();
        submittedRequests.add(request);
        ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
    }

submittedRequests is PrepRequestProcessor's queue, so the code above just enqueues the request. Since PrepRequestProcessor is a thread that has already been started, look at its run() method:

    public void run() {
        LOG.info(String.format("PrepRequestProcessor (sid:%d) started, reconfigEnabled=%s", zks.getServerId(), zks.reconfigEnabled));
        try {
            while (true) {
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
                Request request = submittedRequests.take();
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                    .add(Time.currentElapsedTime() - request.prepQueueStartTime);
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                if (Request.requestOfDeath == request) {
                    break;
                }

                request.prepStartTime = Time.currentElapsedTime();
                pRequest(request);
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("PrepRequestProcessor exited loop!");
    }

run() keeps taking requests off the submittedRequests queue and hands each one to pRequest(request):

    protected void pRequest(Request request) throws RequestProcessorException {
        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
        request.setHdr(null);
        request.setTxn(null);

        try {
            switch (request.type) {
            case OpCode.createContainer:
            case OpCode.create:
            case OpCode.create2:
                CreateRequest create2Request = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break;
            case OpCode.createTTL:
                CreateTTLRequest createTtlRequest = new CreateTTLRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
                break;
            case OpCode.deleteContainer:
            case OpCode.delete:
                DeleteRequest deleteRequest = new DeleteRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                break;
            case OpCode.setData:
                SetDataRequest setDataRequest = new SetDataRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                break;
            case OpCode.reconfig:
                ReconfigRequest reconfigRequest = new ReconfigRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
                pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
                break;
            case OpCode.setACL:
                SetACLRequest setAclRequest = new SetACLRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
                break;
            case OpCode.check:
                CheckVersionRequest checkRequest = new CheckVersionRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
                break;
            case OpCode.multi:
                MultiOperationRecord multiRequest = new MultiOperationRecord();
                try {
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch (IOException e) {
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
                    throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                        Time.currentWallTime(), request.type));

                for (Op op : multiRequest) {
                    Record subrequest = op.toRequestRecord();
                    int type;
                    Record txn;

                    /* If we've already failed one of the ops, don't bother
                     * trying the rest as we know it's going to fail and it
                     * would be confusing in the logfiles.
                     */
                    if (ke != null) {
                        type = OpCode.error;
                        txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                    } else {
                        /* Prep the request and convert to a Txn */
                        try {
                            pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                            type = op.getType();
                            txn = request.getTxn();
                        } catch (KeeperException e) {
                            ke = e;
                            type = OpCode.error;
                            txn = new ErrorTxn(e.code().intValue());

                            if (e.code().intValue() > Code.APIERROR.intValue()) {
                                LOG.info("Got user-level KeeperException when processing {} aborting"
                                         + " remaining multi ops. Error Path:{} Error:{}",
                                         request.toString(),
                                         e.getPath(),
                                         e.getMessage());
                            }

                            request.setException(e);

                            /* Rollback change records from failed multi-op */
                            rollbackPendingChanges(zxid, pendingChanges);
                        }
                    }

                    // TODO: I don't want to have to serialize it here and then
                    //       immediately deserialize in next processor. But I'm
                    //       not sure how else to get the txn stored into our list.
                    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                        txn.serialize(boa, "request");
                        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
                        txns.add(new Txn(type, bb.array()));
                    }
                }

                request.setTxn(new MultiTxn(txns));
                if (digestEnabled) {
                    setTxnDigest(request);
                }

                break;

            //create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                }
                break;

            //All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
            case OpCode.exists:
            case OpCode.getData:
            case OpCode.getACL:
            case OpCode.getChildren:
            case OpCode.getAllChildrenNumber:
            case OpCode.getChildren2:
            case OpCode.ping:
            case OpCode.setWatches:
            case OpCode.setWatches2:
            case OpCode.checkWatches:
            case OpCode.removeWatches:
            case OpCode.getEphemerals:
            case OpCode.multiRead:
            case OpCode.addWatch:
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                break;
            default:
                LOG.warn("unknown type {}", request.type);
                break;
            }
        } catch (KeeperException e) {
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(e.code().intValue()));
            }

            if (e.code().intValue() > Code.APIERROR.intValue()) {
                LOG.info(
                    "Got user-level KeeperException when processing {} Error Path:{} Error:{}",
                    request.toString(),
                    e.getPath(),
                    e.getMessage());
            }
            request.setException(e);
        } catch (Exception e) {
            // log at error level as we are returning a marshalling
            // error to the user
            LOG.error("Failed to process {}", request, e);

            StringBuilder sb = new StringBuilder();
            ByteBuffer bb = request.request;
            if (bb != null) {
                bb.rewind();
                while (bb.hasRemaining()) {
                    sb.append(Integer.toHexString(bb.get() & 0xff));
                }
            } else {
                sb.append("request buffer is null");
            }

            LOG.error("Dumping request buffer: 0x{}", sb.toString());
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
            }
        }
        request.zxid = zks.getZxid();
        ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
        nextProcessor.processRequest(request);
    }

Depending on the request type, execution enters the corresponding case branch and the request is turned into a transaction (its TxnHeader/Txn get set). That code is long, so we skip the details for now; what matters is the last line, nextProcessor.processRequest(request).
As described earlier, nextProcessor here is the SyncRequestProcessor instance, and SyncRequestProcessor.processRequest(request) looks like this:

    public void processRequest(final Request request) {
        Objects.requireNonNull(request, "Request cannot be null");

        request.syncQueueStartTime = Time.currentElapsedTime();
        queuedRequests.add(request);
        ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
    }

queuedRequests is SyncRequestProcessor's internal queue, so SyncRequestProcessor.processRequest(request) simply places the request on that queue.
We already know SyncRequestProcessor is a thread class that has been started, so look at its run() method:

    public void run() {
        try {
            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            resetSnapshotStats();
            lastFlushTime = Time.currentElapsedTime();
            while (true) {
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());

                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                if (si == null) {
                    /* We timed out looking for more writes to batch, go ahead and flush immediately */
                    flush();
                    si = queuedRequests.take();
                }

                if (si == REQUEST_OF_DEATH) {
                    break;
                }

                long startProcessTime = Time.currentElapsedTime();
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    if (shouldSnapshot()) {
                        resetSnapshotStats();
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (!snapThreadMutex.tryAcquire()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    } finally {
                                        snapThreadMutex.release();
                                    }
                                }
                            }.start();
                        }
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (shouldFlush()) {
                    flush();
                }
                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        }
        LOG.info("SyncRequestProcessor exited!");
    }
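
SyncRequestProcessor is essentially a group-commit loop: each request is appended to the transaction log, collected in toFlush, and synced to disk in batches, either when enough requests have accumulated or when the queue goes idle; reads that produce no log entry and have no pending flush are passed straight through. The following stripped-down sketch shows just the batching shape (illustrative only; the real append/flush logic lives in ZKDatabase and the transaction log):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Batching loop: append requests as they arrive, flush when the batch is large enough
    // or when the queue goes idle for a while (a real loop would also honour a shutdown sentinel).
    public class GroupCommitSketch {
        private static final int MAX_BATCH = 1000;
        private static final long POLL_MS = 200;

        private final BlockingQueue<String> queuedRequests = new LinkedBlockingQueue<>();
        private final List<String> toFlush = new ArrayList<>();

        public void loop() throws InterruptedException {
            while (true) {
                String req = queuedRequests.poll(POLL_MS, TimeUnit.MILLISECONDS);
                if (req == null) {
                    flush();                  // idle: push out whatever is pending
                    req = queuedRequests.take();
                }
                append(req);                  // stand-in for zks.getZKDatabase().append(si)
                toFlush.add(req);
                if (toFlush.size() >= MAX_BATCH) {
                    flush();                  // stand-in for syncing the txn log to disk
                }
            }
        }

        private void append(String req) {
            // write to an in-memory "log" in this sketch
        }

        private void flush() {
            if (!toFlush.isEmpty()) {
                // an fsync of the batched log entries would happen here
                toFlush.clear();
            }
        }
    }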

In short, run() appends each request to the transaction log and, once the batch is flushed, passes it on via nextProcessor.processRequest(si); nextProcessor here is the FinalRequestProcessor instance.
Having read this far, it is worth looking again at setupRequestProcessors(), where these processors are wired together:

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
        ((SyncRequestProcessor) syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor) firstProcessor).start();
    }

Now it is clear why the processors are constructed in that order: each one takes its successor as a constructor argument, so the chain has to be built from the tail (FinalRequestProcessor) back to the head (PrepRequestProcessor).
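
This is the classic chain-of-responsibility wiring. A minimal standalone sketch of the same pattern follows (hypothetical classes, not the real processors); in quorum mode the leader and followers install longer chains (CommitProcessor, ProposalRequestProcessor, and so on), but the wiring idea is identical:

    // Chain-of-responsibility wiring: each processor only knows its successor,
    // so the chain is constructed from the last element backwards.
    interface Processor {
        void process(String request);
    }

    class FinalProcessor implements Processor {
        @Override
        public void process(String request) {
            System.out.println("reply to client for: " + request);
        }
    }

    class SyncProcessor implements Processor {
        private final Processor next;

        SyncProcessor(Processor next) {
            this.next = next;
        }

        @Override
        public void process(String request) {
            System.out.println("log to disk: " + request);
            next.process(request);
        }
    }

    class PrepProcessor implements Processor {
        private final Processor next;

        PrepProcessor(Processor next) {
            this.next = next;
        }

        @Override
        public void process(String request) {
            System.out.println("validate and build txn: " + request);
            next.process(request);
        }
    }

    public class ChainDemo {
        public static void main(String[] args) {
            // tail first, head last -- same order as setupRequestProcessors()
            Processor chain = new PrepProcessor(new SyncProcessor(new FinalProcessor()));
            chain.process("createSession");
        }
    }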

Next, look at FinalRequestProcessor.processRequest(si):

    public void processRequest(Request request) {
        LOG.debug("Processing request:: {}", request);

        ......
        // the method is long; only the ping and createSession branches are shown here

        switch (request.type) {
        case OpCode.ping: {
            lastOp = "PING";
            updateStats(request, lastOp, lastZxid);

            cnxn.sendResponse(new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
            return;
        }
        case OpCode.createSession: {
            lastOp = "SESS";
            updateStats(request, lastOp, lastZxid);

            zks.finishSessionInit(request.cnxn, true);
            return;
        }
        }

        ......
    }

For a createSession request, the branch above ends with a call to zks.finishSessionInit(request.cnxn, true):

    public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
        // register with JMX
        try {
            if (valid) {
                if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
                    serverCnxnFactory.registerConnection(cnxn);
                } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
                    secureServerCnxnFactory.registerConnection(cnxn);
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
        }

        try {
            ConnectResponse rsp = new ConnectResponse(
                0,
                valid ? cnxn.getSessionTimeout() : 0,
                valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                // longer valid
                valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
            bos.writeInt(-1, "len");
            rsp.serialize(bos, "connect");
            if (!cnxn.isOldClient) {
                bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
            }
            baos.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.remaining() - 4).rewind();
            cnxn.sendBuffer(bb);

            if (valid) {
                LOG.debug(
                    "Established session 0x{} with negotiated timeout {} for client {}",
                    Long.toHexString(cnxn.getSessionId()),
                    cnxn.getSessionTimeout(),
                    cnxn.getRemoteSocketAddress());
                cnxn.enableRecv();
            } else {

                LOG.info(
                    "Invalid session 0x{} for client {}, probably expired",
                    Long.toHexString(cnxn.getSessionId()),
                    cnxn.getRemoteSocketAddress());
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            }

        } catch (Exception e) {
            LOG.warn("Exception while establishing session, closing", e);
            cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
        }
    }
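
One serialization detail worth noting above is how the length prefix is produced: writeInt(-1, "len") writes a 4-byte placeholder first, the response is serialized after it, and bb.putInt(bb.remaining() - 4).rewind() then overwrites the placeholder at position 0 with the real payload length before the buffer is queued for sending. A small standalone demonstration of the same back-fill trick:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Write a placeholder length, then back-fill it once the payload size is known.
    public class LengthBackfillDemo {
        public static void main(String[] args) {
            byte[] body = "connect-response".getBytes(StandardCharsets.UTF_8);

            ByteBuffer bb = ByteBuffer.allocate(4 + body.length);
            bb.putInt(-1);        // placeholder for the length field
            bb.put(body);         // serialized response body
            bb.flip();            // position = 0, limit = 4 + body.length

            // remaining() now covers the whole frame; subtract the 4-byte length field itself
            bb.putInt(bb.remaining() - 4).rewind();

            System.out.println("frame length field = " + bb.getInt()); // prints body.length
        }
    }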

This method builds the ConnectResponse, serializes it with the length prefix described above, and sends it back to the client. With that, the server-side handling of the client's initial connection request is complete. The next article will analyze the heartbeat (ping) mechanism.
