菜鸡学习zookeeper源码(三)NIOServer的启动
前言
上一篇写到了QuorumPeer的start方法,里面主要进行执行了loadDataBase方法(进行加载本地的数据信息,具体是怎么进行加载的,没在文章中进行说明,这块小园子也没看,等分析完整体的启动流程之后在进行分析), 这篇文章的话主要写startServerCnxnFactory方法,在上一篇文章中也进行说明,这个方法主要进行了启动了两个ServerCnxnFactory对象,一个是安全的,一个是不安全的,里面的默认实现都是NIOServerCnxnFactory
NIOServerCnxnFactory
这个还是老的习惯,这个类上有很多注释说明,可以先看下类的注释说明,这种开源的框架一般都会在类的说明上进行说明这个类是干什么的
在类上的说明我们可以看出来这个这个是通过nio非阻塞式socket进行连接的,线程之间的通信是通过队列来进行处理,它里面主要有1个接收线程来进行接收新的连接,并将新的连接给selector线程,selector线程数量是1-N个,通过工厂进行创建多个selector线程来进行支持大量的连接,当连接很多的时候,这块可能会成一个瓶颈,0-M个socket I/O worker 线程来进行I/O线程的读写操作,还有1个过期的线程来进行关闭空闲的连接信息。这个zookeeper的官方在这个类的上面给了一个示例:在32核机器上,1个接受线程,1个连接过期线程、4个selector线程和64个socket I/O worker。原文注释如下:
/** * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using * NIO non-blocking socket calls. Communication between threads is handled via * queues. * * - 1 accept thread, which accepts new connections and assigns to a * selector thread * - 1-N selector threads, each of which selects on 1/N of the connections. * The reason the factory supports more than one selector thread is that * with large numbers of connections, select() itself can become a * performance bottleneck. * - 0-M socket I/O worker threads, which perform basic socket reads and * writes. If configured with 0 worker threads, the selector threads * do the socket I/O directly. * - 1 connection expiration thread, which closes idle connections; this is * necessary to expire connections on which no session is established. * * Typical (default) thread counts are: on a 32 core machine, 1 accept thread, * 1 connection expiration thread, 4 selector threads, and 64 worker threads. */
我们先看下构造函数
/**
* Construct a new server connection factory which will accept an unlimited number
* of concurrent connections from each client (up to the file descriptor
* limits of the operating system). startup(zks) must be called subsequently.
*/
public NIOServerCnxnFactory() {
}
在构造函数中我们可以看到一些注释,上面注释的大致意思是:这个NIOServer的连接工厂可以接收来自每个客户端的无限数量的并发连接(最多为文件描述符操作系统的限制)。我们可以后面看下怎么进行支持无限数量的并发连接。
在 runFromConfig方法解析中,可以看到都进行调用了NIOServerCnxnFactory#configure方法

NIOServerCnxnFactory#configure
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
if (secure) {
throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
}
configureSaslLogin();
//设置最大的连接数
maxClientCnxns = maxcc;
initMaxCnxns();
sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
// We also use the sessionlessCnxnTimeout as expiring interval for
// cnxnExpiryQueue. These don't need to be the same, but the expiring
// interval passed into the ExpiryQueue() constructor below should be
// less than or equal to the timeout.
//设置过期的队列以及对过期线程进行初始化
cnxnExpiryQueue = new ExpiryQueue(sessionlessCnxnTimeout);
expirerThread = new ConnectionExpirerThread();
//获取本地的CPU的核数
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
//selector线程的数量
numSelectorThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
Math.max((int) Math.sqrt((float) numCores / 2), 1));
if (numSelectorThreads 0 ? numWorkerThreads : "no") + " worker threads, and "
+ (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
LOG.info(logMsg);
for (int i = 0; i < numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
}
listenBacklog = backlog;
//从这往下就是常见的nio socket编程的
//绑定端口号
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port {}", addr);
if (listenBacklog == -1) {
ss.socket().bind(addr);
} else {
ss.socket().bind(addr, listenBacklog);
}
if (addr.getPort() == 0) {
// We're likely bound to a different port than was requested, so log that too
LOG.info("bound to port {}", ss.getLocalAddress());
}
ss.configureBlocking(false);
//初始化一个接收线程
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
这个configure方法,大体逻辑上是针对NIOServerCnxnFactory里面的一些属性进行一些赋值操作,会针对maxClientCnxns(最大连接数),expirerThread(过期线程)初始化,numSelectorThreads(selector线程的数量,会进行获取CPU的核数,根据核数进行计算selector线程数),numWorkerThreads(工作的线程数)以及常见的serverScoketChannel的初始化,最后在进行初始化一个接收线程。
NIOServerCnxnFactory#start
public void start() {
stopped = false;
if (workerPool == null) {
workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
}
for (SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
NIOServerCnxnFactory的start方法这个代码行数比较少,一眼看过去,主要进行了三种操作,WorkService(WorkerService是用于运行任务的工作线程池,并且是实现的使用一个或多个ExecutorServices.)的初始化,selectorThread的线程的启动,acceptThread(接收线程)的启动,expirerThread(过期线程)的启动
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/0b977c78e6.html
