菜鸡学习zookeeper源码(三)NIOServer的启动

这篇具有很好参考价值的文章主要介绍了菜鸡学习zookeeper源码(三)NIOServer的启动。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

上一篇写到了QuorumPeer的start方法,里面主要进行执行了loadDataBase方法(进行加载本地的数据信息,具体是怎么进行加载的,没在文章中进行说明,这块小园子也没看,等分析完整体的启动流程之后在进行分析), 这篇文章的话主要写startServerCnxnFactory方法,在上一篇文章中也进行说明,这个方法主要进行了启动了两个ServerCnxnFactory对象,一个是安全的,一个是不安全的,里面的默认实现都是NIOServerCnxnFactory

NIOServerCnxnFactory

这个还是老的习惯,这个类上有很多注释说明,可以先看下类的注释说明,这种开源的框架一般都会在类的说明上进行说明这个类是干什么的

在类上的说明我们可以看出来这个这个是通过nio非阻塞式socket进行连接的,线程之间的通信是通过队列来进行处理,它里面主要有1个接收线程来进行接收新的连接,并将新的连接给selector线程,selector线程数量是1-N个,通过工厂进行创建多个selector线程来进行支持大量的连接,当连接很多的时候,这块可能会成一个瓶颈,0-M个socket I/O worker 线程来进行I/O线程的读写操作,还有1个过期的线程来进行关闭空闲的连接信息。这个zookeeper的官方在这个类的上面给了一个示例:在32核机器上,1个接受线程,1个连接过期线程、4个selector线程和64个socket I/O worker。原文注释如下:

/**
 * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
 * NIO non-blocking socket calls. Communication between threads is handled via
 * queues.
 *
 *   - 1   accept thread, which accepts new connections and assigns to a
 *         selector thread
 *   - 1-N selector threads, each of which selects on 1/N of the connections.
 *         The reason the factory supports more than one selector thread is that
 *         with large numbers of connections, select() itself can become a
 *         performance bottleneck.
 *   - 0-M socket I/O worker threads, which perform basic socket reads and
 *         writes. If configured with 0 worker threads, the selector threads
 *         do the socket I/O directly.
 *   - 1   connection expiration thread, which closes idle connections; this is
 *         necessary to expire connections on which no session is established.
 *
 * Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
 * 1 connection expiration thread, 4 selector threads, and 64 worker threads.
 */

我们先看下构造函数

 /**
     * Construct a new server connection factory which will accept an unlimited number
     * of concurrent connections from each client (up to the file descriptor
     * limits of the operating system). startup(zks) must be called subsequently.
     */
    public NIOServerCnxnFactory() {
    }

在构造函数中我们可以看到一些注释,上面注释的大致意思是:这个NIOServer的连接工厂可以接收来自每个客户端的无限数量的并发连接(最多为文件描述符操作系统的限制)。我们可以后面看下怎么进行支持无限数量的并发连接。

在 runFromConfig方法解析中,可以看到都进行调用了NIOServerCnxnFactory#configure方法

菜鸡学习zookeeper源码(三)NIOServer的启动,zookeeper,学习,zookeeper

NIOServerCnxnFactory#configure

 public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
        if (secure) {
            throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
        }
        
        configureSaslLogin();
//设置最大的连接数
        maxClientCnxns = maxcc;
        initMaxCnxns();

        sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
        // We also use the sessionlessCnxnTimeout as expiring interval for
        // cnxnExpiryQueue. These don't need to be the same, but the expiring
        // interval passed into the ExpiryQueue() constructor below should be
        // less than or equal to the timeout.
         //设置过期的队列以及对过期线程进行初始化
        cnxnExpiryQueue = new ExpiryQueue<>(sessionlessCnxnTimeout);
        expirerThread = new ConnectionExpirerThread();
        //获取本地的CPU的核数
        int numCores = Runtime.getRuntime().availableProcessors();
        // 32 cores sweet spot seems to be 4 selector threads
        //selector线程的数量
        numSelectorThreads = Integer.getInteger(
            ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
            Math.max((int) Math.sqrt((float) numCores / 2), 1));
        if (numSelectorThreads < 1) {
            throw new IOException("numSelectorThreads must be at least 1");
        }
        //工作线程的数量
        numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
        workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

        String logMsg = "Configuring NIO connection handler with "
            + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
            + numSelectorThreads + " selector thread(s), "
            + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
            + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
        LOG.info(logMsg);
        for (int i = 0; i < numSelectorThreads; ++i) {
            selectorThreads.add(new SelectorThread(i));
        }

        listenBacklog = backlog;
        
        //从这往下就是常见的nio socket编程的
        //绑定端口号
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port {}", addr);
        if (listenBacklog == -1) {
            ss.socket().bind(addr);
        } else {
            ss.socket().bind(addr, listenBacklog);
        }
        if (addr.getPort() == 0) {
            // We're likely bound to a different port than was requested, so log that too
            LOG.info("bound to port {}", ss.getLocalAddress());
        }
        ss.configureBlocking(false);
        //初始化一个接收线程
        acceptThread = new AcceptThread(ss, addr, selectorThreads);
    }

这个configure方法,大体逻辑上是针对NIOServerCnxnFactory里面的一些属性进行一些赋值操作,会针对maxClientCnxns(最大连接数),expirerThread(过期线程)初始化,numSelectorThreads(selector线程的数量,会进行获取CPU的核数,根据核数进行计算selector线程数),numWorkerThreads(工作的线程数)以及常见的serverScoketChannel的初始化,最后在进行初始化一个接收线程。

NIOServerCnxnFactory#start

 public void start() {
        stopped = false;
        if (workerPool == null) {
            workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
        }
        for (SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
        // ensure thread is started once and only once
        if (acceptThread.getState() == Thread.State.NEW) {
            acceptThread.start();
        }
        if (expirerThread.getState() == Thread.State.NEW) {
            expirerThread.start();
        }
    }

NIOServerCnxnFactory的start方法这个代码行数比较少,一眼看过去,主要进行了三种操作,WorkService(WorkerService是用于运行任务的工作线程池,并且是实现的使用一个或多个ExecutorServices.)的初始化,selectorThread的线程的启动,acceptThread(接收线程)的启动,expirerThread(过期线程)的启动文章来源地址https://www.toymoban.com/news/detail-819893.html

到了这里,关于菜鸡学习zookeeper源码(三)NIOServer的启动的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Zookeeper源码走读】第二章 服务器的启动过程

    通过运行zk的启动脚本,找到zk服务器端的入口类。脚本如下: 所以,zk的入口类是 QuorumPeerMain ,以下是该类的 main() 方法的完整代码: 跟踪方法中的 initializeAndRun() ,代码如下: 方法中,继续跟踪 ZooKeeperServerMain.main(args) ,代码如下: 上面的代码就是初始化 ZooKeeperServerMai

    2024年02月03日
    浏览(34)
  • ZooKeeper源码解析——学习ApacheZookeeper原理,掌握其核心组件的数据模型、监听通知机制等

    作者:禅与计算机程序设计艺术 随着互联网的飞速发展,各种信息数据越来越多,数据的存储也越来越依赖于分布式文件系统或NoSQL数据库。而传统的单机数据库往往不具备弹性可扩展性和高可用容错能力,在面对海量数据时难免会遇到性能瓶颈。为了解决这一问题,人们又

    2024年02月10日
    浏览(40)
  • android源码学习- APP启动流程(android12源码)

    百度一搜能找到很多讲APP启动流程的,但是往往要么就是太老旧(还是基于android6去分析的),要么就是不全(往往只讲了整个流程的一小部分)。所以我结合网上现有的文章,以及源码的阅读和调试,耗费了3整天的时间,力求写出一篇最完整,最详细,最通俗易懂的文章,

    2024年02月11日
    浏览(46)
  • SpringBoot源码学习4——SpringBoot内嵌Tomcat启动流程源码分析

    系列文章目录和关于我 我在初学spring的时候,很懵逼,因为整个项目中不存在main方法,让我有点摸不着头脑。那时候我知道有个东西叫tomcat是它监听了端口,解析了协议调到了我的servlet。 在我初学SpringBoot的时候,很懵逼,有main方法了,但是tomcat在哪里呢,又是如何启动起

    2024年02月04日
    浏览(47)
  • zookeeper 无法启动

    1、查看服务状态 2、查看启动日志 3、查看服务日志 cat /var/log/zookeeper/zookeeper.log 从提供的日志中,可以看到ZooKeeper在启动过程中遇到了一个 java.io.EOFException 异常。这个异常通常表示在读取文件时意外地到达了文件的末尾。在这种情况下,异常可能是由于ZooKeeper的事务日志文

    2024年02月03日
    浏览(48)
  • Zookeeper配置启动教程

    下载地址:https://zookeeper.apache.org/releases.html 选择 Apache ZooKeeper 3.8.0(asc, sha512) 点击 https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz 完成下载。 修改 apache-zookeeper-3.8.0conf 目录下的 zoo-simple.cfg文件为 zoo-simple.cfg,修改文件内容: 增加系统环境变量 ZOOKEEPER_HOME ,

    2024年02月16日
    浏览(47)
  • window Zookeeper 启动;

    本文对window Zookeeper zk 启动 进行介绍; ZooKeeper 是一个开源的分布式协调服务,它提供了一个简单的接口和可靠的协调机制,可以帮助分布式系统实现高可用性和一致性。ZooKeeper 主要用于管理分布式系统中的元数据、配置信息、命名服务、分布式锁等,以协调和同步分布式系

    2024年04月09日
    浏览(48)
  • linux上启动zookeeper

    在我们日常中,去部署项目难免会用到Dubbo框架,那就不可少我们的zookeeper, 我们先在Linux上解压zookeeper压缩包,在cd进入目录下 进入后 当我们端口连接时,不用着急连,好像Dubbo连接断开后有延时,实际还没有彻底断开,会出现一个连接失败,我们可以稍等一会在连就行了

    2024年02月11日
    浏览(71)
  • zookeeper 启动失败,报错 “ZooKeeper JMX enabled by default”

    zookeeper启动时,出现如下情况: 显示JMX是默认关闭的 通过jps命令查看进程时,没有QuorumPeerMain这个进程 网上有很多方法,有说关闭防火墙的, 也有说把命令改成 ./zkServer.sh start 我都尝试过,和上面效果一样,均无法启动(可能每一个人的解决办法都不一样) 在 zkServer.sh 文

    2024年02月01日
    浏览(53)
  • Linux中怎么启动Zookeeper

    首先进入 Zookeeper安装目录 下的 bin 目录 比如: 然后在此目录下执行命令。 1. 启动Zookeeper Server端 2.启动Zookeeper Client端 启动Zookeeper Client端后如下:

    2024年04月17日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包