ZooKeeper 源码分析 (三)—服务端启动之单机模式

本贴最后更新于 1846 天前,其中的信息可能已经时移世异

1. 预启动

1.1. QuorumPeerMain 作为启动入口

跟集群模式一样,启动入口也是在 QuorumPeerMain main 方法中。

1.2. 解析配置文件 zoo.cfg

配置文件包含了 zk 运行时需要的基本参数,比如 tickTime、dataDir 和 clientPort。解析的主要逻辑如下:

try {
    File configFile = (new VerifyingFileFactory.Builder(LOG)
        .warnForRelativePath()
        .failForNonExistingPath()
        .build()).create(path);
        
    Properties cfg = new Properties();
    FileInputStream in = new FileInputStream(configFile);
    try {
    	// 文件加载
        cfg.load(in);
        configFileStr = path;
    } finally {
        in.close();
    }
    // 解析配置文件
    parseProperties(cfg);
} catch (IOException e) {
    throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
    throw new ConfigException("Error processing " + path, e);
}   

1.3. 创建并启动历史文件清理器 DatadirCleanupManager

ZooKeeper 的自动清理文件机制,对快照文件和事务日志定期进行清理。实例创建的代码如下:

DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());

可以看到,创建实例时需要知道 dataDir、dataLogDir、快照保存个数、清理间隔这些参数。内部基于 TimerTask 起了一个定时任务,会根据 purgeInterval 定期去清理文件。

timer = new Timer("PurgeTask", true);
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

1.4. 判断启动模式

根据解析出的服务器地址列表判断走单机模式还是集群模式。

 if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
} else {
    LOG.warn("Either no config or no quorum defined in config, running "
            + " in standalone mode");
    // there is only server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);
}

这里的 args 是启动命令行输入的配置文件名称,当文件名称只有一个,并且配置文件里配置了 distributed=true 时走集群模式,否则走单机模式。单机启动的代码都在 ZooKeeperServerMain.main(args)

1.5. 再次进行配置解析

单机启动时,命令输入的参数会有两种情况

  • 输入了配置文件名称
  • 直接输入了配置参数,比如按顺序输入了 clientPortAddressdataDirdataLogDirtickTimemaxClientCnxns

所以要根据这两种情况进行不同的解析

if (args.length == 1) {
    config.parse(args[0]);
} else {
    config.parse(args);
}

当命令行输入的文件名称时,根据路径构造 File 对象,进行解析,然后把配置信息读入到 ServerConfig 中

public void parse(String path) throws ConfigException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    // 根据文件名称解析配置文件
    config.parse(path);

    // let qpconfig parse the file and then pull the stuff we are
    // interested in
    // 从配置中读取配置信息,给ServerConfig赋值
    readFrom(config);
}

直接输入配置参数时,需要保证输入参数的顺序,代码中是按顺序取的,顺序见上面描述。

public void parse(String[] args) {
    if (args.length < 2 || args.length > 4) {
        throw new IllegalArgumentException("Invalid number of arguments:" + Arrays.toString(args));
    }
    // clientPortAddress是第一顺序
    clientPortAddress = new InetSocketAddress(Integer.parseInt(args[0]));
    // 第二顺序是dataDir
    dataDir = new File(args[1]);
    dataLogDir = dataDir;
    if (args.length >= 3) {
    	// 第三顺序是tickTime
        tickTime = Integer.parseInt(args[2]);
    }
    if (args.length == 4) {
    	// 第四顺序是maxClientCnxns
        maxClientCnxns = Integer.parseInt(args[3]);
    }
}

2. 初始化

接下来会执行 runFromConfig(ServerConfig config) 方法,进行服务器实例化过程,分步骤来看。

2.1. 创建 FileTxnSnapLog

FileTxnSnapLog 是 ZooKeeper 上层服务于底层数据存储之间的对接层,提供了一系列操作数据文件的接口。包括事务日志文件和快照数据文件。根据 dataDir 和 snapDir 来创建 FileTxnSnapLog。

this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);

主要的方法有如下一些,后面会作为专题来讲

// 读取快照文件和事务日志之后恢复服务端数据
restore(DataTree, Map, Integer>, PlayBackListener):long

// 把最新的事务日志快速更新到server database,与restore不同的是只处理事务日志
fastForwardFromEdits(DataTree, Map, Integer>,PlayBackListener):long

// 把dataTree和sessions放入快照文件中
save(DataTree,ConcurrentHashMap, Integer>, boolean):void

// 在dataTree上处理事务
processTransaction(TxnHeader,DataTree ,Map, Integer> , Record ):void

2.2. 创建 MetricsProvider

MetricsProvider 是一个可以收集指标、将当前值发送到外部设备的一个组件。数据在 server 端和 client 端可以共享。在《ZooKeeper 源码分析二-服务端启动之集群模式》讲过,这里不再细说。

2.3. 创建服务器实例 ZooKeeperServer

依赖 txnLog、tickTime、minSessionTimeout、maxSessionTimeout、listenBacklog 等参数实例化 ZooKeeperServer:

ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,  config.tickTime, config.minSessionTimeout, config.maxSessionTimeout,  config.listenBacklog, null);

2.4. 创建 ServerStats

ServerStats 是一个服务端统计器,包含了最基本的运行时信息:

  • packetsSent 从 zk 启动开始,或是最近一次重置服务端统计信息之后,服务端向客户端发送的响应包次数
  • packetsReceived 从 zk 启动开始,或是最近一次重置服务端统计信息之后,服务端接收到来自客户端的请求包次数
  • requestLatency 从 zk 启动开始,或是最近一次重置服务端统计信息之后服务端请求处理的最大延时、最小延时、平均延时以及总延时
  • clientResponseStats 提供 jute 序列化缓冲区使用情况的实时统计信息
  • fsyncThresholdExceedCount 同步内存数据到存储设备超过阈值的次数
  • startTime zk 启动后开始统计的时间

2.5. Registers shutdown handler

ZooKeeperServerShutdownHandler 是 zk 用于处理异常的组件。当系统发生错误时,会使用 CountDownLatch 通知其他线程停止工作

class ZooKeeperServerShutdownHandler {
	
    private final CountDownLatch shutdownLatch;

    ZooKeeperServerShutdownHandler(CountDownLatch shutdownLatch) {
        this.shutdownLatch = shutdownLatch;
    }

    /**
     * This will be invoked when the server transition to a new server state.
     *
     * @param state new server state
     */
    void handle(State state) {
        if (state == State.ERROR || state == State.SHUTDOWN) {
            shutdownLatch.countDown();
        }
    }
}

2.6. 启动 AdminServer

AdminServer 用来管理 ZooKeeperServer。有两种实现方式 JettyAdminServer 和 DummyAdminServer。

zookeeper.admin.enableServer 为 true 时才启动 AdminServer,通过反射的方式创建实例

public static AdminServer createAdminServer() {
    if (!"false".equals(System.getProperty("zookeeper.admin.enableServer"))) {
        try {
            Class<?> jettyAdminServerC = Class.forName("org.apache.zookeeper.server.admin.JettyAdminServer");
            Object adminServer = jettyAdminServerC.getConstructor().newInstance();
            return (AdminServer) adminServer;

        } catch (ClassNotFoundException e) {
            LOG.warn("Unable to start JettyAdminServer", e);
        }
    }
    return new DummyAdminServer();
}

创建完 AdminServer 后设置 ZooKeeperServer,启动 jetty 容器

adminServer = AdminServerFactory.createAdminServer();
adminServer.setZooKeeperServer(zkServer);
adminServer.start();

Jetty 容器启动代码如下:

@Override
public void start() throws AdminServerException {
    try {
        server.start();
    } catch (Exception e) {
        // Server.start() only throws Exception, so let's at least wrap it
        // in an identifiable subclass
        throw new AdminServerException(String.format(
                "Problem starting AdminServer on address %s,"
                        + " port %d and command URL %s", address, port,
                commandUrl), e);
    }
    LOG.info(String.format("Started AdminServer on address %s, port %d"
            + " and command URL %s", address, port, commandUrl));
}

2.7. 创建 ServerCnxnFactory

早期版本,都是自己实现 NIO 框架,从 3.4.0 版本引入了 Netty,可以通过 zookeeper.serverCnxnFactory 来指定使用 NIO 还是 Netty 作为 Zookeeper 服务端网络连接工厂。

2.8. 初始化 ServerCnxnFactory

Zookeeper 首先会初始化一个 Thread,作为整个 ServerCnxnFactory 的主线程,然后再初始化 NIO 服务器。

// 初始化selectorThreads
for (int i = 0; i < numSelectorThreads; ++i) {
    selectorThreads.add(new SelectorThread(i));
}
// 初始化expirerThread
expirerThread = new ConnectionExpirerThread();

// 初始化acceptThread
acceptThread = new AcceptThread(ss, addr, selectorThreads);

2.9. 启动 ServerCnxnFactory 主线程

启动步骤 2.7 中已经初始化的主线程的主逻辑(run 方法)。这时候 ZooKeeper 的 NIO 服务器已经对外开放端口,客户端能够访问 2181 端口,但是此时 ZooKeeper 服务器是无法正常处理客户端请求的

@Override
public void start() {
    stopped = false;
    if (workerPool == null) {
        workerPool = new WorkerService(
                "NIOWorker", numWorkerThreads, false);
    }
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}

2.10. 恢复本地数据

从本地快照和事务日志文件中进行数据恢复。详细的数据恢复过程后面专题会讲

public void startdata() throws IOException, InterruptedException {
    //check to see if zkDb is not null
    if (zkDb == null) {
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    if (!zkDb.isInitialized()) {
        loadData();
    }
}

2.11. 创建并启动会话管理器

创建会话管理器 SessionTracker,SessionTracker 主要负责 ZooKeeper 服务端的会话管理,创建 SessionTracker 时,会设置 expireInterval、NextExpirationTime 和 SessionWithTimeout,还会计算出一个初始化的 SessionID

if (sessionTracker == null) {
    createSessionTracker();
}
startSessionTracker();

2.12. 初始化 ZooKeeper 的请求处理链

典型的责任链方式实现,在 ZooKeeper 服务器上,会有多个请求处理器一次来处理一个客户端请求,在服务器启动的时候,会将这些请求处理器串联起来形成一个请求处理链。单机版服务器的请求处理链主要包括 PrepRequestProcessorSyncRequestProcessorFinalRequestProcessor

PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
单机版 ZooKeeper 服务器的请求处理链

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

2.13. 注册 JMX 服务

ZooKeeper 会将服务器运行时的一些信息以 JMS 的方式暴露给外部,具体实现如下:

protected void registerJMX() {
    // register with JMX
    try {
        jmxServerBean = new ZooKeeperServerBean(this);
        MBeanRegistry.getInstance().register(jmxServerBean, null);

        try {
            jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
            MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxDataTreeBean = null;
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxServerBean = null;
    }
}

2.14. 注册 ZooKeeper 服务器实例

前面的步骤过后,ZooKeeper 已经将 ServerCnxnFactory 主线程启动,但是同时 ZooKeeper 依旧无法处理客户端请求,原因是网络层不能够访问 ZooKeeper 实例,经过后续步骤的初始化,ZooKeeper 服务器实例已经初始化完毕,只需要注册给 ServerCnxnFactory 即可,之后 ZooKeeper 就可以对外提供正常的服务了。

final public void setZooKeeperServer(ZooKeeperServer zks) {
    this.zkServer = zks;
    if (zks != null) {
        if (secure) {
            zks.setSecureServerCnxnFactory(this);
        } else {
            zks.setServerCnxnFactory(this);
        }
    }
}

至此,单机版的 ZooKeeper 服务器启动完毕。

3. 流程总结

1

  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 15 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...