ZooKeeper 源码分析 (五) - 数据与存储

本贴最后更新于 1820 天前,其中的信息可能已经沧海桑田

这篇文章我们来看下 zk 底层数据存储的技术细节,在 ZooKeeper 中,数据存储分为内存数据存储磁盘数据存储

1. 内存数据

ZooKeeper 的数据模型是一棵树,可以类比为一个内存数据库,存储了整棵树的内容,包括节点路径,节点数据及其 ACL 信息等,ZooKeeper 会定时将这个数据存储到磁盘上。接下来看一下几个关键的数据模型。

1.1. DataTree

ZooKeeper 内存数据存储的核心组件,代表了内存中一份完整的数据 。

DataTree 的数据结构如下:
WX201904010745032x.png

1.2. DataNode

DataNode 是数据存储的最小单元,包含着对父节点的引用。还有其他几个属性

byte [] data : 保存节点的数据
Long acl: acl map长度
StatPersisted stat :节点状态
Set<String> children : 当前节点的子节点列表

还提供了几个操作接口:
添加子节点:往 set 集合中添加子节点信息

public synchronized boolean addChild(String child) {
    if (children == null) {
        // let's be conservative on the typical number of children
        children = new HashSet<String>(8);
    }
    return children.add(child);
}

删除子节点:从 set 集合中移除子节点信息

public synchronized boolean removeChild(String child) {
    if (children == null) {
        return false;
    }
    return children.remove(child);
}

查询子节点:当前 node 的所有子节点

public synchronized Set<String> getChildren() {
    if (children == null) {
        return EMPTY_SET;
    }
    return Collections.unmodifiableSet(children);
}

1.3. nodes

DataTree 存储所有 ZooKeeper 节点的路径、数据内容及其 ACL 信息,底层的数据结构就是一个典型的 ConcurrentHashMap 结构

private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

对于 ZooKeeper 数据的所有操作,底层都是对这个 map 结构的操作,nodes 以数据节点的 path 作为 key,value 则是节点的数据内容 DataNode。

对于所有的临时节点,为了方便实时访问跟清理,DataTree 单独将临时节点保存起来:

private final Map<Long, HashSet<String>> ephemerals =
        new ConcurrentHashMap<Long, HashSet<String>>();

1.4. ZKDatabase

ZooKeeper 的内存数据库,负责管理 ZooKeeper 所有会话、datatree 存储和事务日志,zkdatabase 会定时向磁盘 dump 快照数据,同时在 ZooKeeper 服务器启动时,通过磁盘恢复内存数据。

写入事务的触发入口在 SyncRequestProcessor 中,前面讲过,zk 请求的处理是采用责任链的方式来处理,这就是其中一个链条。主要逻辑在 run()中处理,看一段处理过程的代码:

// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
    logCount++;
    if (logCount > (snapCount / 2 + randRoll)) {
        randRoll = r.nextInt(snapCount/2);
        // roll the log
        zks.getZKDatabase().rollLog();
        // take a snapshot
        if (snapInProcess != null && snapInProcess.isAlive()) {
            LOG.warn("Too busy to snap, skipping");
        } else {
            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                    public void run() {
                        try {
                            zks.takeSnapshot();
                        } catch(Exception e) {
                            LOG.warn("Unexpected exception", e);
                        }
                    }
                };
            snapInProcess.start();
        }
        logCount = 0;
    }
}

通过 getZKDatabase().append(si) 来写入日志。

2. 事务日志

在了解事务写入过程之前,我们看看日志格式,事务日志是在 append(si) 方法里进行日志写入的。

如果 logStream==null 时,创建文件:logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));,这里使用 Util.makeLogName 来生成文件内容。我们看下具体格式

//  LOG_FILE_PREFIX常量为 `log`
public static String makeLogName(long zxid) {
    return FileTxnLog.LOG_FILE_PREFIX + "." + Long.toHexString(zxid);
}

后缀是根据事务 ID (zxid)生成一个 16 进制的数据。这样做的目的是为了方便的根据事务 ID 查询相关日志。另外 zxid 生成本身是有规律的(高 32 位代表当前 Leader 周期 epoch,低 32 位是操作序列好),因此将 zxid 作为文件后缀,我们清楚的看出当前运行时 ZooKeeper 的 leader 周期
生成的日志名称是类似于这样的:

log.2c01631713

2.3. 日志写入

日志写入的过程为:

  • 确定是否有事务日志可写。
    ZooKeeper 会首先判断 FileTxnLog 是否关联上一个可写的事务日志文件,如果没有,则使用与该事务操作关联的 ZXID 作为后置创建一个事务日志文件,同时构建文件头信息(包含魔数 magic,事务格式版本 version 和 dbid),并立即写入到事务日志文件中去。同时将该文件流放入一个集合 streamsToFlush

  • 确定事务日志是否需要扩容
    检测当前事务日志文件剩余空间不足 4KB 时,就会开始文件空间扩容

  • 事务序列化
    包括对事务体(Record)和事务头(TxnHeader)的序列化

  • 生成 CheckSum
    此步骤是为了保证日志文件的完整性数据的准确性。会根据序列化前的字节数组大小来计算 Checksum。

  • 事务日志写入文件流
    将序列化后的事务头,事务体及 Checksum 值写入到文件流中去

  • 事务日志刷入磁盘
    上一步骤已经把事务操作写入了文件流,但是由于缓存的原因,无法实时的写入磁盘文件,需要强制刷入磁盘。

没有事务日志文件可写时,创建一个事务日志文件。

if (logStream==null) {
   if(LOG.isInfoEnabled()){
        LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
   }
   logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
}

生成 File 流

fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);

构建文件头信息

FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);

文件头写入事务日志

logStream.flush();
filePadding.setCurrentSize(fos.getChannel().position());
streamsToFlush.add(fos);

判断是否需要扩容

public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
    // If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size
    if (preAllocSize > 0 && position + 4096 >= fileSize) {
        // If we have written more than we have previously preallocated we need to make sure the new
        // file size is larger than what we already have
        if (position > fileSize) {
            fileSize = position + preAllocSize;
            fileSize -= fileSize % preAllocSize;
        } else {
            fileSize += preAllocSize;
        }
    }
    return fileSize;
}

文件写入

long padFile(FileChannel fileChannel) throws IOException {
    long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
    if (currentSize != newFileSize) {
        fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
        currentSize = newFileSize;
    }
    return currentSize;
}

序列化事务头和事务体

public static byte[] marshallTxnEntry(TxnHeader hdr, Record txn)
        throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputArchive boa = BinaryOutputArchive.getArchive(baos);

    hdr.serialize(boa, "hdr");
    if (txn != null) {
        txn.serialize(boa, "txn");
    }
    return baos.toByteArray();
}

生成 checkSum

Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);

事务日志写入流并且刷入磁盘文件

oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);

2.4. 日志截断

ZooKeeper 运行过程中,可能会出现这样的情况,非 Leader 上记录的事务 ID 大于 Leader 服务器,这是一个非法的运行状态,在 zk 中只要存在 leader,所有机器的数据必须和 leader 保持一致。

如果出现上述情况,Leader 就会发送 TRUNC 命令给这个机器,要求其进行日志截断。
这段逻辑在 LearnerHandler run 方法 syncFollower 中处理:

boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
实现逻辑比较复杂,就不贴代码,有兴趣可以自己看。

3. 数据快照

用来记录 ZooKeeper 服务器上某一个时刻的全量内存数据内容,并将其写入到指定的磁盘文件中。
数据快照也是使用磁盘目录进行存储,可以通过 dataDir 属性进行配置。命名方式跟实物文件命名类似:snapshot.${ZXID},ZXID 被转换为十六进制的数。
下面看一下生成数据快照的过程:

  • 确定是否需要进行数据快照
    每进行一次实物日志记录之后,ZooKeeper 都会检测当前是否需要进行数据快照。理论上进行 snapCount 次事务操作后就会开始数据快照。但是数据快照对机器性能有影响,为了避免集群中所有机器统一时刻都在进行数据快照,ZooKeeper 在具体实现过程中,采用了“过半随机”策略。

  • 切换事务日志文件
    是指当前的事务日志已经写满了,需要重新创建一个新的事务日志。

  • 创建数据快照异步线程
    为了不影响主流程,需要创建一个单独的异步线程来进行数据快照

  • 获取全量数据和会话信息

  • 生成快照文件名

  • 数据序列化

logCount > (snapCount / 2 + randRoll) 当满足这个条件时,才进行数据快照。logCount 表示当前已经记录的事务日志数量。randRoll 为 1~snapCount / 2 之前的随机数。

重新写入一个事务日志文件之前,要清空流中的信息,进行覆盖操作
zks.getZKDatabase().rollLog();

数据快照生成过程,snapInProcess 正在运行时采用丢弃策略。否则生成一个异步线程进行数据快照

if (snapInProcess != null && snapInProcess.isAlive()) {
    LOG.warn("Too busy to snap, skipping");
} else {
    snapInProcess = new ZooKeeperThread("Snapshot Thread") {
            public void run() {
                try {
                    zks.takeSnapshot();
                } catch(Exception e) {
                    LOG.warn("Unexpected exception", e);
                }
            }
        };
    snapInProcess.start();
}

保存快照数据的主要逻辑如下。
生成文件名,创建文件、数据写入、序列化等。

public void save(DataTree dataTree,
                 ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
                 boolean syncSnap)
    throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
            snapshotFile);
    try {
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
    } catch (IOException e) {
        throw e;
    }
}

4. 初始化

初始化的过程在之前一篇文章 ZooKeeper 源码分析(二)—服务端启动之集群模式中简单提到过,这篇文章详细看下。
先借用一张图来展示数据初始化的流程。

WX201904040851572x.png

4.1. 初始化 FileTxnSnapLog

项目启动时执行,在 runFromConfig

txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
            final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
                    config.tickTime, config.minSessionTimeout, config.maxSessionTimeout,
                    config.listenBacklog, null);

4.2. 初始化 ZKDatabase

初始化 data 最终是要拿到最新的事务 ID,这里会判断如果已经初始化完成,则直接获取最新的事务 ID,否则走加载数据的过程获取最新事务 ID。

public void loadData() {
	if(zkDb.isInitialized()){
            setZxid(zkDb.getDataTreeLastProcessedZxid());
	}
	else {
	    setZxid(zkDb.loadDataBase());
	}
}

加载数据时,会先初始化一个 DataTree,保存了 zk 上所有的节点信息。还会创建 sessionsWithTimeouts 用于保存所有客户端会话超时时间的记录器。

public long loadDataBase() throws IOException {
    long startTime = Time.currentElapsedTime();
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    long loadTime = Time.currentElapsedTime() - startTime;
    ServerMetrics.DB_INIT_TIME.add(loadTime);
    LOG.info("Snapshot loaded in " + loadTime + " ms");
    return zxid;
}

4.3. 创建 PlayBackListener 监听器

long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); 传入了监听器信息,用来接收事务应用过程中的回调,通过这个监听器来进行数据订正。

4.4. 处理快照文件

先反序列化快照文件中数据,判断是不是存在文件。
long deserializeResult = snapLog.deserialize(dt, sessions);

在文件路径下查找是否已经存在文件,如果存在则删除文件重新创建,然后把 trustEmptyDB 设置为 true。

File initFile = new File(dataDir.getParent(), "initialize");
if (Files.deleteIfExists(initFile.toPath())) {
    LOG.info("Initialize file found, an empty database will not block voting participation");
    trustEmptyDB = true;
} else {
    trustEmptyDB = autoCreateDB;
}

如果序列化结果 deserializeResult = -1 ,表示没有找到任何快照,需要初始化一个空的 database。
否则走 fastForwardFromEdits 方法进行快照数据的解析,快速恢复内存数据。

if (-1L == deserializeResult) {
    /* this means that we couldn't find any snapshot, so we need to
     * initialize an empty database (reported in ZOOKEEPER-2325) */
    if (txnLog.getLastLoggedZxid() != -1) {
        throw new IOException(
                "No snapshot found, but there are log entries. " +
                "Something is broken!");
    }

    if (trustEmptyDB) {
        /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
         *       or use Map on save() */
        save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);

        /* return a zxid of 0, since we know the database is empty */
        return 0L;
    } else {
        /* return a zxid of -1, since we are possibly missing data */
        LOG.warn("Unexpected empty data tree, setting zxid to -1");
        dt.lastProcessedZxid = -1L;
        return -1L;
    }
}
return fastForwardFromEdits(dt, sessions, listener);

4.5. 解析快照文件

先根据 dataTree 从快照文件中找事务 ID 为 lastProcessedZxid + 1 的数据,如果解析出来的 TxnHeader 为空,则返回 lastProcessedZxid 作为最新的事务 ID,数据恢复过程结束。
如果 TxnHeader 不为空且 hdr.getZxid() < highestZxid,这是种异常情况,会打印异常日志
如果 TxnHeader 不为空且 hdr.getZxid() <= highestZxid,则 highestZxid = hdr.getZxid();

然后调用 processTransaction 方法处理事务日志。同时调用 listener.onTxnLoaded(hdr, itr.getTxn()); 进行已提交事务日志的保存。

public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                     PlayBackListener listener) throws IOException {
        TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
        long highestZxid = dt.lastProcessedZxid;
        TxnHeader hdr;
        try {
            while (true) {
                // iterator points to
                // the first valid txn when initialized
                hdr = itr.getHeader();
                if (hdr == null) {
                    //empty logs 已经是最新的事务ID,数据恢复结果,
                    return dt.lastProcessedZxid;
                }
		// 异常情况
                if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                    LOG.error("{}(highestZxid) > {}(next log) for type {}",
                            highestZxid, hdr.getZxid(), hdr.getType());
                } else {
		    // TxnHeader中的zxid作为最新的事务ID
                    highestZxid = hdr.getZxid();
                }
                try {
                    // 事务日志的处理过程
                    processTransaction(hdr,dt,sessions, itr.getTxn());
                } catch(KeeperException.NoNodeException e) {
                   throw new IOException("Failed to process transaction type: " +
                         hdr.getType() + " error: " + e.getMessage(), e);
                }
                // 把当前快照数据当做已提交的事务日志保存
                listener.onTxnLoaded(hdr, itr.getTxn());
                if (!itr.next())
                    break;
            }
        } finally {
            if (itr != null) {
                itr.close();
            }
        }
        return highestZxid;
    }

4.6. 处理事务日志

处理事务的逻辑在 processTransaction()中,先根据 TxnHeader 的 type 来判断是哪种日志类型

  • 如果是 createSession,则在 sessions 的 Map 中保存子节点的信息,然后进行日志处理
  • 如果是 closeSession,则在 sessions 的 Map 中移除子节点的信息,然后进行日志处理
  • 其他类型时,只进行日志处理

processTxn 处理过程是把快照数据解析成一个个 DataNode,然后保存到 dataTree 中。会有异步的流程持久化到磁盘。

public void processTransaction(TxnHeader hdr,DataTree dt,
            Map<Long, Integer> sessions, Record txn)
    throws KeeperException.NoNodeException {
    ProcessTxnResult rc;
    switch (hdr.getType()) {
    case OpCode.createSession:
        sessions.put(hdr.getClientId(),
                ((CreateSessionTxn) txn).getTimeOut());
        // give dataTree a chance to sync its lastProcessedZxid
        rc = dt.processTxn(hdr, txn);
        break;
    case OpCode.closeSession:
        sessions.remove(hdr.getClientId());
        rc = dt.processTxn(hdr, txn);
        break;
    default:
        rc = dt.processTxn(hdr, txn);
    }
}

5. 数据同步

前几篇文章提到过,整个集群完成 leader 选举之后,Learner 会向 Leader 服务器进行注册,当注册完成后,就进入数据同步环节。数据同步就是 Leader 服务器将那些没有在 Learner 服务器上提交过的事务请求同步给 Learner 服务器。大体过程如下:

WX201904040841032x.png

主要逻辑在 LearnerHandler run() 方法中处理。

5.1. 获取 Leader 状态

在注册最后阶段,Learner 服务器会发送给 Leader 服务器一个 ACKEPOCH 数据包,leader 会从这个数据包中解析出该 Learner 的 currentEpoch 和 lastZxid。

long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
peerLastZxid = ss.getLastZxid();

5.2. 数据同步初始化

在开始数据同步之前,Leader 服务器会进行数据同步初始化,首先从 ZooKeeper 的内存数据库中提取事务请求的对应的提议缓存队列:Proposals,同时完成对以下三个 ZXID 值的初始化。

  • peerLastZxid:该 Learner 服务器最后处理的 ZXID
  • minCommittedLog:Leader 服务器提议缓存队列 committedLog 的最小 ZXID
  • maxCommittedLog:Leader 服务器提议缓存队列 committedLog 的最大 ZXID

ZooKeeper 集群数据同步通常分为四类,分别是直接差异化同步(DIFF 同步)、先回滚再差异化同步(TRUNC+DIFF 同步)、仅回滚同步(TRUNC 同步)和全量同步(SNAP 同步),在初始化阶段,服务器优先以全量同步方式来同步数据。

5.3. 全量同步(SNAP)

全量同步的场景:

  • peerLastZxid < minCommittedLog 时进行全量同步。
  • Leader 服务器上没有提议缓存队列,peerLastZxid 不等于 lastProcessZxid。

全量同步就是 leader 服务器将本机上全量的内存数据都同步给 learner,Leader 服务器首先向 Learner 发送一个 SNAP 指令,通知 Learner 即将进行全量数据同步。随后,Leader 从内存数据库中获取到全量的数据节点和会话超时时间记录器,序列化后传输给 Learner。Learner 收到全量数据后,反序列化后载入内存数据库。

else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
    // Use txnlog and committedLog to sync

    // Calculate sizeLimit that we allow to retrieve txnlog from disk
    long sizeLimit = db.calculateTxnLogSizeLimit();
    // This method can return empty iterator if the requested zxid
    // is older than on-disk txnlog
    Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
            peerLastZxid, sizeLimit);
    if (txnLogItr.hasNext()) {
        LOG.info("Use txnlog and committedLog for peer sid: " +  getSid());
        currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                             minCommittedLog, maxCommittedLog);

        LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
        Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
        currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                             null, maxCommittedLog);
        needSnap = false;
    }
    // closing the resources
    if (txnLogItr instanceof TxnLogProposalIterator) {
        TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
        txnProposalItr.close();
    }
}

5.4. 仅同步回滚(TRUNC)

如果(peerLastZxid > maxCommittedLog 时仅同步回滚。
是先回滚再差异化同步的简化模式,Leader 会要求 Learner 回滚到 ZXID 值为 maxCommittedLog 对应的事务操作。

else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
    // Newer than committedLog, send trunc and done
    LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
              Long.toHexString(maxCommittedLog) +
              " for peer sid:" +  getSid());
    queueOpPacket(Leader.TRUNC, maxCommittedLog);
    currentZxid = maxCommittedLog;
    needOpPacket = false;
    needSnap = false;
} 

5.5. 先回滚再差异化同步(TRUNC+DIFF)

使用场景是:Leader 服务器已经将事务记录到了本地事务日志中,但是没有成功发起 Proposal 流程的时候挂掉。这时候 peerLastZxid 介于 minCommittedLog 和 maxCommittedLog 之间。这个特殊场景就使用先回滚再差异化同步。

5.6. 直接差异化同步(DIFF)

peerLastZxid 在 maxCommittedLog 和 minCommittedLog 之间时进行差异化同步。
Leader 会首先向这个 Learner 发送一个 DIFF 指令,通知 Learner 进入差异化数据同步阶段,Leader 服务器即将把一些 Proposal 同步给自己。后续... 省略

 else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
    // Follower is within commitLog range
    LOG.info("Using committedLog for peer sid: " +  getSid());
    Iterator<Proposal> itr = db.getCommittedLog().iterator();
    currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                         null, maxCommittedLog);
    needSnap = false;
}

终于完了,一些细节还需要仔细推敲。。。

  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 17 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...