2024年1月

本文详细分析一下zookeeper的数据存储。

ZKDatabase

维护zookeeper服务器内存数据库,包括session、dataTree和committedlog数据,从磁盘读取日志和快照后启动。

关键字段

// 数据节点树
protected DataTree dataTree;
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
protected FileTxnSnapLog snapLog; // 用于操作底层数据文件
// committedLog中第一条和最后一条数据的zxid
protected long minCommittedLog, maxCommittedLog;
// committedLog最大容量,默认500
public int commitLogCount;
// 维护最后提交的请求集,可用于快速follower同步
protected Queue<Proposal> committedLog = new ArrayDeque<>();

protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
private volatile boolean initialized = false;

// txnlog计数
private AtomicInteger txnCount = new AtomicInteger(0);

构造方法

public ZKDatabase(FileTxnSnapLog snapLog) {
    dataTree = createDataTree();
    sessionsWithTimeouts = new ConcurrentHashMap<>();
    this.snapLog = snapLog;

    // 初始化snapshotSizeFactor默认0.33
    // 初始化commitLogCount默认500
}

public DataTree createDataTree() {
    return new DataTree();
}

创建DataTree对象:创建/zookeeper/quota、/zookeeper/config节点,创建dataWatches和childWatches对象(使用WatchManager实现类)。

主要方法

// 返回committedLog集
public synchronized Collection<Proposal> getCommittedLog();
// 返回dataTree.lastProcessedZxid的值
public long getDataTreeLastProcessedZxid();
// 返回dataTree.getSessions()集
public Collection<Long> getSessions();
// 返回sessionsWithTimeouts的size
public long getSessionCount();
// 从磁盘加载dataTree并把txnLog加载到committedLog中
public long loadDataBase() throws IOException;
// 从磁盘加载txnLog到committedLog中
public long fastForwardDataBase() throws IOException;
// 使用addCommittedProposal方法添加committedLog
private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest);
// 添加committedLog
public void addCommittedProposal(Request request);
// 从txnLog加载Proposal
public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit);
// 使用dataTree.removeCnxn(cnxn)
public void removeCnxn(ServerCnxn cnxn);
// 使用dataTree.killSession(sessionId, zxid)
public void killSession(long sessionId, long zxid);
// 使用dataTree.dumpEphemerals(pwriter)
public void dumpEphemerals(PrintWriter pwriter);
// 使用dataTree.getEphemerals()
public Map<Long, Set<String>> getEphemerals();
// 使用dataTree.getNodeCount()
public int getNodeCount();
// 使用dataTree.getEphemerals(sessionId)
public Set<String> getEphemerals(long sessionId);
// 给dataTree.lastProcessedZxid赋值
public void setlastProcessedZxid(long zxid);
// 使用dataTree.processTxn(hdr, txn, digest)
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest);
// 使用dataTree.statNode(path, serverCnxn)
public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException;
// 使用dataTree.getNode(path)
public DataNode getNode(String path);
// 使用dataTree.getData(path, stat, watcher)
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
// 使用dataTree.setWatches方法实现
public void setWatches(long relativeZxid, List<String> dataWatches,
                       List<String> existWatches, List<String> childWatches,
                       List<String> persistentWatches, List<String> persistentRecursiveWatches,
                       Watcher watcher);
// 使用dataTree.addWatch(basePath, watcher, mode)
public void addWatch(String basePath, Watcher watcher, int mode);
// 使用dataTree.getChildren(path, stat, watcher)
public List<String> getChildren(
    String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
// 使用dataTree.getAllChildrenNumber(path)
public int getAllChildrenNumber(String path) throws KeeperException.NoNodeException;
// Truncate the ZKDatabase to the specified zxid
public boolean truncateLog(long zxid) throws IOException;
// Deserialize a snapshot from an input archive
public void deserializeSnapshot(InputArchive ia) throws IOException;
// Deserialize a snapshot that contains FileHeader from an input archive
// It is used by the admin restore command
public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException;
// Serialize the snapshot
public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException;
// 使用snapLog.append(si)保存数据,txnCount++
public boolean append(Request si) throws IOException;
// 使用snapLog.rollLog()滚动底层txnLog
public void rollLog() throws IOException;
// 使用snapLog.commit()提交底层txnLog
public void commit() throws IOException;
// 初始化/zookeeper/config数据,集群启动时已介绍
public synchronized void initConfigInZKDatabase(QuorumVerifier qv);
// 使用dataTree.containsWatcher(path, type, watcher)
public boolean containsWatcher(String path, WatcherType type, Watcher watcher);
// 使用dataTree.removeWatch(path, type, watcher)
public boolean removeWatch(String path, WatcherType type, Watcher watcher);

loadDataBase方法

从磁盘加载dataTree并把txnLog加载到committedLog中:

public long loadDataBase() throws IOException {
    long startTime = Time.currentElapsedTime();
    // 1. 从snapshot加载dataTree
    // 2. 使用fastForwardFromEdits方法从txnLog加载dataTree和committedlog
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    // 略
    return zxid;
}

fastForwardDataBase方法

从txnLog加载dataTree和committedlog集:

public long fastForwardDataBase() throws IOException {
    // 会通过commitProposalPlaybackListener调用addCommittedProposal添加committedlog
    long zxid = snapLog.fastForwardFromEdits(
        dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return zxid;
}

addCommittedProposal方法

private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest) {
    Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
    r.setTxnDigest(digest);
    addCommittedProposal(r);
}

public void addCommittedProposal(Request request) {
    WriteLock wl = logLock.writeLock();
    try {
        wl.lock();
        if (committedLog.size() > commitLogCount) {
            committedLog.remove();
            minCommittedLog = committedLog.peek().packet.getZxid();
        }
        if (committedLog.isEmpty()) {
            minCommittedLog = request.zxid;
            maxCommittedLog = request.zxid;
        }
        byte[] data = request.getSerializeData();
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        committedLog.add(p);
        maxCommittedLog = p.packet.getZxid();
    } finally {
        wl.unlock();
    }
}

getProposalsFromTxnLog方法

从txnlog获取Proposal,只填充packet字段:

public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit) {
    if (sizeLimit < 0) {
        return TxnLogProposalIterator.EMPTY_ITERATOR;
    }

    TxnIterator itr = null;
    try {
        // 从txnLog文件读取数据
        // 底层通过FileTxnIterator类读取文件流实现
        itr = snapLog.readTxnLog(startZxid, false);

        // If we cannot guarantee that this is strictly the starting txn
        // after a given zxid, we should fail.
        if ((itr.getHeader() != null) && (itr.getHeader().getZxid() > startZxid)) {
            itr.close();
            return TxnLogProposalIterator.EMPTY_ITERATOR;
        }

        if (sizeLimit > 0) {
            long txnSize = itr.getStorageSize();
            if (txnSize > sizeLimit) {
                itr.close();
                return TxnLogProposalIterator.EMPTY_ITERATOR;
            }
        }
    } catch (IOException e) {
        itr.close();
        return TxnLogProposalIterator.EMPTY_ITERATOR;
    }
    return new TxnLogProposalIterator(itr);
}

truncateLog方法

把txnlog数据truncate到指定的zxid位置,然后重新加载DataTree数据:

public boolean truncateLog(long zxid) throws IOException {
    clear();

    // truncate the log
    boolean truncated = snapLog.truncateLog(zxid);

    if (!truncated) {
        return false;
    }

    loadDataBase();
    return true;
}

deserializeSnapshot方法

public void deserializeSnapshot(InputArchive ia) throws IOException {
    clear();
    SerializeUtils.deserializeSnapshot(getDataTree(), ia, getSessionWithTimeOuts());
    initialized = true;
}

public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException {
    clear();

    // deserialize data tree
    final DataTree dataTree = getDataTree();
    FileSnap.deserialize(dataTree, getSessionWithTimeOuts(), ia);
    SnapStream.checkSealIntegrity(is, ia);

    // deserialize digest and check integrity
    if (dataTree.deserializeZxidDigest(ia, 0)) {
        SnapStream.checkSealIntegrity(is, ia);
    }

    // deserialize lastProcessedZxid and check integrity
    if (dataTree.deserializeLastProcessedZxid(ia)) {
        SnapStream.checkSealIntegrity(is, ia);
    }

    // compare the digest to find inconsistency
    if (dataTree.getDigestFromLoadedSnapshot() != null) {
        dataTree.compareSnapshotDigests(dataTree.lastProcessedZxid);
    }

    initialized = true;
}

serializeSnapshot方法

public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException {
    SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts());
}

DataTree

维护树状结构,没有任何网络或客户端连接代码,因此可以以独立的方式进行测试。

维护两个并行的数据结构:一个从完整路径映射到DataNodes的哈希表和一个DataNodes树,对路径的所有访问都是通过哈希表进行的,只有在序列化到磁盘时才遍历DataNodes树。

关键字段

// This map provides a fast lookup to the data nodes
private final NodeHashMap nodes;
// Watcher
private IWatchManager dataWatches;
private IWatchManager childWatches;
// cached total size of paths and data for all DataNodes
private final AtomicLong nodeDataSize = new AtomicLong(0);
// This hashtable lists the paths of the ephemeral nodes of a session
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>();
// This set contains the paths of all container nodes
private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<>());
// This set contains the paths of all ttl nodes
private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<>());
// This is a pointer to the root of the DataTree
private DataNode root = new DataNode(new byte[0], -1L, new StatPersisted());
// create a /zookeeper filesystem that is the proc filesystem of zookeeper
private final DataNode procDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
// create a /zookeeper/quota node for maintaining quota properties for zookeeper
private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
// 最新被处理的zxid
public volatile long lastProcessedZxid = 0;
  • NodeHashMap - NodeHashMapImpl实现类使用ConcurrentHashMap保存path -> DataNode数据
  • IWatchManager和Watcher - 监听器管理
  • DataNode - 封装树节点信息,包括data、children、stat等

构造方法

DataTree(DigestCalculator digestCalculator) {
    this.digestCalculator = digestCalculator;
    nodes = new NodeHashMapImpl(digestCalculator);

    // rather than fight it, let root have an alias
    nodes.put("", root); // "" -> root
    nodes.putWithoutDigest(rootZookeeper, root); // "/" -> root

    // add the proc node and quota node
    root.addChild(procChildZookeeper); // 添加zookeeper子节点
    nodes.put(procZookeeper, procDataNode); // "/zookeeper" -> procDataNode

    procDataNode.addChild(quotaChildZookeeper); // 添加quota子节点
    nodes.put(quotaZookeeper, quotaDataNode); // "/zookeeper/quota" -> quotaDataNode

    addConfigNode(); // 添加/zookeeper/config节点

    nodeDataSize.set(approximateDataSize());
    try {
        // 使用WatchManager实现类
        dataWatches = WatchManagerFactory.createWatchManager();
        childWatches = WatchManagerFactory.createWatchManager();
    } catch (Exception e) {}
}

public void addConfigNode() {
    DataNode zookeeperZnode = nodes.get(procZookeeper); // 找到/zookeeper节点
    if (zookeeperZnode != null) {
        zookeeperZnode.addChild(configChildZookeeper); // 添加config子节点
    }

    nodes.put(configZookeeper, new DataNode(new byte[0], -1L, new StatPersisted()));
    try {
        // Reconfig node is access controlled by default (ZOOKEEPER-2014).
        setACL(configZookeeper, ZooDefs.Ids.READ_ACL_UNSAFE, -1);
    } catch (NoNodeException e) {}
}

主要方法

// Add a new node to the DataTree
public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner,
                       int parentCVersion, long zxid, long time, Stat outputStat);
// Remove path from the DataTree
public void deleteNode(String path, long zxid);
// 为节点设置数据
public Stat setData(String path, byte[] data, int version, long zxid, long time);
// 1. 获取path的data
// 2. 如果watcher不为null则addWatch
public byte[] getData(String path, Stat stat, Watcher watcher);
// 使用node.copyStat(stat)保存stat数据
public Stat statNode(String path, Watcher watcher);

// 1. copyStat到stat中
// 2. addWatch
// 3. getChildren
public List<String> getChildren(String path, Stat stat, Watcher watcher);

// 设置、获取权限
public Stat setACL(String path, List<ACL> acl, int version);
public List<ACL> getACL(String path, Stat stat);
public List<ACL> getACL(DataNode node);
// 添加Watcher
public void addWatch(String basePath, Watcher watcher, int mode);

// 处理事务请求
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn);

// 杀会话,使用deleteNodes删除paths2DeleteLocal和paths2DeleteInTxn集
void killSession(
    long session, long zxid, Set<String> paths2DeleteLocal, List<String> paths2DeleteInTxn);
// 遍历paths2Delete调用deleteNode方法删除节点
void deleteNodes(long session, long zxid, Iterable<String> paths2Delete);
// 递归方式获取path下面的总节点数和总字节数
private void getCounts(String path, Counts counts);

// 序列化
void serializeNode(OutputArchive oa, StringBuilder path);
public void serializeNodeData(OutputArchive oa, String path, DataNode node);
public void serializeAcls(OutputArchive oa);
public void serializeNodes(OutputArchive oa);
public void serialize(OutputArchive oa, String tag);
// 反序列化
public void deserialize(InputArchive ia, String tag);
// 从dataWatches和childWatches移除watcher
public void removeCnxn(Watcher watcher);
// 触发或addWatch
public void setWatches(long relativeZxid, List<String> dataWatches,
                       List<String> existWatches, List<String> childWatches,
                       List<String> persistentWatches, List<String> persistentRecursiveWatches,
                       Watcher watcher);
// 为path设置新的cversion和zxid
public void setCversionPzxid(String path, int newCversion, long zxid);
// Add the digest to the historical list, and update the latest zxid digest
private void logZxidDigest(long zxid, long digest);
// 序列化、反序列化lastProcessedZxidDigest
public boolean serializeZxidDigest(OutputArchive oa);
public boolean deserializeZxidDigest(InputArchive ia, long startZxidOfSnapshot);
// 序列化、反序列化lastProcessedZxid
public boolean serializeLastProcessedZxid(final OutputArchive oa);
public boolean deserializeLastProcessedZxid(final InputArchive ia);
// Compares the actual tree's digest with that in the snapshot.
// Resets digestFromLoadedSnapshot after comparison.
public void compareSnapshotDigests(long zxid);
// Compares the digest of the tree with the digest present in transaction digest.
// If there is any error, logs and alerts the watchers.
public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest);

createNode方法

processTxn中会使用该方法创建节点:

public void createNode(final String path, byte[] data, List<ACL> acl,
                       long ephemeralOwner, int parentCVersion, long zxid,
                       long time, Stat outputStat) throws NoNodeException, NodeExistsException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    StatPersisted stat = createStat(zxid, time, ephemeralOwner); // Create a node stat
    DataNode parent = nodes.get(parentName); // 父节点需要存在
    synchronized (parent) {
        Long acls = aclCache.convertAcls(acl);

        Set<String> children = parent.getChildren(); // path节点不能存在

        nodes.preChange(parentName, parent); // 执行removeDigest
        if (parentCVersion == -1) {
            parentCVersion = parent.stat.getCversion();
            parentCVersion++; // childVersion递增
        }

        if (parentCVersion > parent.stat.getCversion()) {
            parent.stat.setCversion(parentCVersion); // 父节点的childVersion
            parent.stat.setPzxid(zxid); // 父节点processZxid
        }
        DataNode child = new DataNode(data, acls, stat);
        parent.addChild(childName); // 添加节点
        nodes.postChange(parentName, parent);
        nodeDataSize.addAndGet(getNodeSize(path, child.data));
        nodes.put(path, child); // 维护NodeHashMap
        EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);// 通常是VOID|NORMAL
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.add(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.add(path);
        } else if (ephemeralOwner != 0) {
            // 维护临时节点
            HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
            synchronized (list) {
                list.add(path);
            }
        }
        if (outputStat != null) {
            child.copyStat(outputStat); // 把权限保存到outputStat中
        }
    }

    // 略

    // 触发监听器
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
    childWatches.triggerWatch(
        parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}

deleteNode方法

public void deleteNode(String path, long zxid) throws NoNodeException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);

    DataNode parent = nodes.get(parentName); // 父节点要存在
    synchronized (parent) {
        nodes.preChange(parentName, parent);
        parent.removeChild(childName); // 移除子节点
        if (zxid > parent.stat.getPzxid()) {
            parent.stat.setPzxid(zxid);
        }
        nodes.postChange(parentName, parent);
    }

    DataNode node = nodes.get(path); // 节点要存在
    nodes.remove(path); // 从NodeHashMap移除
    synchronized (node) {
        aclCache.removeUsage(node.acl);
        nodeDataSize.addAndGet(-getNodeSize(path, node.data));
    }

    synchronized (parent) {
        long owner = node.stat.getEphemeralOwner();
        EphemeralType ephemeralType = EphemeralType.get(owner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.remove(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.remove(path);
        } else if (owner != 0) { // 移除临时节点
            Set<String> nodes = ephemerals.get(owner);
            if (nodes != null) {
                synchronized (nodes) {
                    nodes.remove(path);
                }
            }
        }
    }

    // 略

    // 触发监听器
    WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
    childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
    childWatches.triggerWatch(
        "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}

setData方法

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path); // 节点要存在
    byte[] lastData;
    synchronized (n) {
        lastData = n.data;
        nodes.preChange(path, n);
        n.data = data; // data赋值
        n.stat.setMtime(time); // 修改时间
        n.stat.setMzxid(zxid); // 修改的zxid
        n.stat.setVersion(version); // 版本
        n.copyStat(s); // 保存stat
        nodes.postChange(path, n);
    }

    // 略
    // 触发监听器
    dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
    return s;
}

setAcl等acl方法

public Stat setACL(String path, List<ACL> acl, int version) throws NoNodeException {
    DataNode n = nodes.get(path);
    synchronized (n) {
        Stat stat = new Stat();
        aclCache.removeUsage(n.acl);
        nodes.preChange(path, n);
        n.stat.setAversion(version); // access时间
        n.acl = aclCache.convertAcls(acl); // 设置权限
        n.copyStat(stat);
        nodes.postChange(path, n);
        return stat;
    }
}

public List<ACL> getACL(String path, Stat stat) throws NoNodeException {
    DataNode n = nodes.get(path);
    synchronized (n) {
        if (stat != null) {
            n.copyStat(stat);
        }
        return new ArrayList<>(aclCache.convertLong(n.acl));
    }
}

public List<ACL> getACL(DataNode node) {
    synchronized (node) {
        return aclCache.convertLong(node.acl);
    }
}

addWatch方法

public void addWatch(String basePath, Watcher watcher, int mode) {
    WatcherMode watcherMode = WatcherMode.fromZooDef(mode); // PERSISTENT_RECURSIVE or PERSISTENT
    dataWatches.addWatch(basePath, watcher, watcherMode); // 只给节点添加Watcher
    if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
        childWatches.addWatch(basePath, watcher, watcherMode); // 递归添加Watcher
    }
}

processTxn方法

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        case OpCode.create: // 创建节点
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(),
                createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(),
                header.getZxid(), header.getTime(), null);
            break;
        case OpCode.create2: // 创建节点并保存stat
            CreateTxn create2Txn = (CreateTxn) txn;
            rc.path = create2Txn.getPath();
            Stat stat = new Stat();
            createNode(create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(),
                create2Txn.getEphemeral() ? header.getClientId() : 0, create2Txn.getParentCVersion(),
                header.getZxid(), header.getTime(), stat);
            rc.stat = stat;
            break;
        case OpCode.createTTL:
            CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
            rc.path = createTtlTxn.getPath();
            stat = new Stat();
            createNode(createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(),
                EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), // ttl
                createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
            rc.stat = stat;
            break;
        case OpCode.createContainer:
            CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
            rc.path = createContainerTxn.getPath();
            stat = new Stat();
            createNode(createContainerTxn.getPath(), createContainerTxn.getData(),
                createContainerTxn.getAcl(), EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                createContainerTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
            rc.stat = stat;
            break;
        case OpCode.delete:
        case OpCode.deleteContainer:
            DeleteTxn deleteTxn = (DeleteTxn) txn;
            rc.path = deleteTxn.getPath();
            deleteNode(deleteTxn.getPath(), header.getZxid()); // 删除节点
            break;
        case OpCode.reconfig:
        case OpCode.setData: // 设置节点数据
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            rc.path = setDataTxn.getPath();
            rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(),
                header.getZxid(), header.getTime());
            break;
        case OpCode.setACL: // 设置ACL
            SetACLTxn setACLTxn = (SetACLTxn) txn;
            rc.path = setACLTxn.getPath();
            rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
            break;
        case OpCode.closeSession: // 关闭session
            long sessionId = header.getClientId();
            if (txn != null) {
                killSession(sessionId, header.getZxid(), ephemerals.remove(sessionId),
                        ((CloseSessionTxn) txn).getPaths2Delete());
            } else {
                killSession(sessionId, header.getZxid());
            }
            break;
        case OpCode.error:
            ErrorTxn errTxn = (ErrorTxn) txn;
            rc.err = errTxn.getErr();
            break;
        case OpCode.check:
            CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
            rc.path = checkTxn.getPath();
            break;
        case OpCode.multi:
            // 遍历处理每一个Txn
            break;
        }
    } catch (KeeperException e) {
        rc.err = e.code().intValue();
    } catch (IOException e) {}

    //
    if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
        int lastSlash = rc.path.lastIndexOf('/');
        String parentName = rc.path.substring(0, lastSlash);
        CreateTxn cTxn = (CreateTxn) txn;
        try {
            setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
        } catch (NoNodeException e) {
            rc.err = e.code().intValue();
        }
    }
    //
    if (!isSubTxn) {
        if (rc.zxid > lastProcessedZxid) {
            lastProcessedZxid = rc.zxid; // 设置最新lastProcessedZxid
        }

        // 略
    }

    return rc;
}

serialize相关方法

void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
    String pathString = path.toString();
    DataNode node = getNode(pathString); // 查找节点
    String[] children;
    DataNode nodeCopy;
    synchronized (node) {
        StatPersisted statCopy = new StatPersisted();
        copyStatPersisted(node.stat, statCopy);
        // we do not need to make a copy of node.data because the contents are never changed
        nodeCopy = new DataNode(node.data, node.acl, statCopy);
        children = node.getChildren().toArray(new String[0]);
    }
    serializeNodeData(oa, pathString, nodeCopy); // 把节点写入到oa中
    path.append('/');
    int off = path.length();
    // 遍历子节点,将子节点写入oa中
    for (String child : children) {
        path.delete(off, Integer.MAX_VALUE);
        path.append(child);
        serializeNode(oa, path);
    }
}

// visible for test
public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
    oa.writeString(path, "path");
    oa.writeRecord(node, "node");
}

public void serializeAcls(OutputArchive oa) throws IOException {
    aclCache.serialize(oa);
}

// 序列化整个NodeHashMap对象
public void serializeNodes(OutputArchive oa) throws IOException {
    serializeNode(oa, new StringBuilder());
    // / marks end of stream
    // we need to check if clear had been called in between the snapshot.
    if (root != null) {
        oa.writeString("/", "path");
    }
}

// 完整序列化
public void serialize(OutputArchive oa, String tag) throws IOException {
    serializeAcls(oa);
    serializeNodes(oa);
}

deserialize相关方法

public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    nodeDataSize.set(0);
    String path = ia.readString("path");
    while (!"/".equals(path)) {
        DataNode node = new DataNode();
        ia.readRecord(node, "node");
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            root = node;
        } else {
            String parentPath = path.substring(0, lastSlash);
            DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException(
                        "Invalid Datatree, unable to find parent " + parentPath + " of path " + path);
            }
            parent.addChild(path.substring(lastSlash + 1));
            long owner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(owner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (owner != 0) {
                HashSet<String> list = ephemerals.computeIfAbsent(owner, k -> new HashSet<>());
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    // have counted digest for root node with "", ignore here to avoid counting twice for root node
    nodes.putWithoutDigest("/", root);

    nodeDataSize.set(approximateDataSize());

    // we are done with deserializing the datatree update the quotas - create path trie
    // and also update the stat nodes
    setupQuota();

    aclCache.purgeUnused();
}

FileTxnSnapLog

操作TxnLog和SnapShot的入口类。

构造方法会创建dataDir和snapDir目录,判断数据目录可写,创建txnLog和snapLog对象访问数据文件。

主要方法

// 从snapshots和transaction logs加载数据库
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);

// fast forward the server database to have the latest transactions in it
// This is the same as restore, but only reads from the transaction logs and not restores from a snapshot
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);

// 使用txnLog.read(zxid, fastForward)方法从指定zxid加载TxnIterator
public TxnIterator readTxnLog(long zxid, boolean fastForward);

// process the transaction on the datatree
public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions, Record txn);

// 使用txnLog.getLastLoggedZxid()方法获取last logged zxid
public long getLastLoggedZxid();

// 把datatree和sessions保存到snapshot中
public File save(
    DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap);

// 把txnLog truncate到指定的zxid
public boolean truncateLog(long zxid);

// 使用snaplog.findMostRecentSnapshot()方法加载最近snapshot文件
public File findMostRecentSnapshot();
// 使用snaplog.findNRecentSnapshots(n)方法加载n个最近snapshot文件
public List<File> findNRecentSnapshots(int n);
// 使用snaplog.findNValidSnapshots(n)方法加载n个合法snapshot文件
public List<File> findNValidSnapshots(int n);

// 获取快照文件,可能包含比给定zxid更新的事务。
// 包括起始zxid大于给定zxid的日志,以及起始zxid小于给定zxid的最新事务日志。
// 后一个日志文件可能包含超出给定zxid的事务。
public File[] getSnapshotLogs(long zxid);

// 使用txnLog.append(si)追加数据
public boolean append(Request si);
// txnLog.commit()提交数据
public void commit();

restore方法

  1. 从snapshot加载dataTree数据
  2. 从txnlog加载dataTree和committedlog数据
  3. 如果没有加载到dataTree数据,将空的dataTree数据保存到snapshot.0文件中

fastForwardFromEdits方法

从txnlog加载dataTree和committedlog数据。

processTransaction方法

public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions,
        Record txn) throws KeeperException.NoNodeException {
    ProcessTxnResult rc;
    switch (hdr.getType()) {
    case OpCode.createSession:
        sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut());
        // give dataTree a chance to sync its lastProcessedZxid
        rc = dt.processTxn(hdr, txn);
        break;
    case OpCode.closeSession:
        sessions.remove(hdr.getClientId());
        rc = dt.processTxn(hdr, txn);
        break;
    default:
        rc = dt.processTxn(hdr, txn);
    }
}

save方法

public File save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
        boolean syncSnap) throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    // 文件名snapshot.${lastZxid}
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    try {
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
        return snapshotFile;
    } catch (IOException e) {
        throw e;
    }
}

truncateLog方法

public boolean truncateLog(long zxid) {
    try {
        // close the existing txnLog and snapLog
        close();

        // truncate it
        try (FileTxnLog truncLog = new FileTxnLog(dataDir)) {
            boolean truncated = truncLog.truncate(zxid);

            // re-open the txnLog and snapLog
            // I'd rather just close/reopen this object itself, however that
            // would have a big impact outside ZKDatabase as there are other
            // objects holding a reference to this object.
            txnLog = new FileTxnLog(dataDir);
            snapLog = new FileSnap(snapDir);

            return truncated;
        }
    } catch (IOException e) {
        return false;
    }
}

TxnLog接口和FileTxnLog实现类

txnlog

使用文件保存所有的事务操作,客户端的写操作会先写入txnlog文件,在follower达到quorum状态后提交到dataTree中,在ZKDatabase启动阶段,如果txnlog的zxid大于snapshot的zxid时,会加载txnlog文件数据回放事务,提交到dataTree中。

TxnLog接口

Interface for reading transaction logs.

public interface TxnLog extends Closeable {

    // Setter for ServerStats to monitor fsync threshold exceed
    void setServerStats(ServerStats serverStats);

    // roll the current log being appended to
    void rollLog() throws IOException;

    // Append a request to the transaction log with a digset
    boolean append(Request request) throws IOException;

    // Start reading the transaction logs from a given zxid
    TxnIterator read(long zxid) throws IOException;

    // the last zxid of the logged transactions
    long getLastLoggedZxid() throws IOException;

    // truncate the log to get in sync with the leader
    boolean truncate(long zxid) throws IOException;

    // the dbid for this transaction log
    long getDbId() throws IOException;

    // commit the transaction and make sure they are persisted
    void commit() throws IOException;

    // return transaction log's elapsed sync time in milliseconds
    long getTxnLogSyncElapsedTime();

    void close() throws IOException;
    void setTotalLogSize(long size);
    long getTotalLogSize();
}

FileTxnLog实现类

This class implements the TxnLog interface. It provides api's to access the txnlogs and add entries to it.
The format of a Transactional log is as follows:

   LogFile:
       FileHeader TxnList ZeroPad

   FileHeader: {
       magic 4bytes (ZKLG)
       version 4bytes
       dbid 8bytes
   }

   TxnList:
       Txn || Txn TxnList

   Txn:
       checksum Txnlen TxnHeader Record 0x42

   checksum: 8bytes Adler32 is currently used
     calculated across payload -- Txnlen, TxnHeader, Record and 0x42

   Txnlen:
       len 4bytes

   TxnHeader: {
       sessionid 8bytes
       cxid 4bytes
       zxid 8bytes
       time 8bytes
       type 4bytes
   }

   Record:
       See Jute definition file for details on the various record types

   ZeroPad:
       0 padded to EOF (filled during preallocation stage)

FileTxnLog主要方法实现

public synchronized boolean append(Request request) throws IOException {
    TxnHeader hdr = request.getHdr();
    if (hdr == null) { // 不是事务请求
        return false;
    }
    if (hdr.getZxid() <= lastZxidSeen) {
        LOG.warn("...");
    } else {
        lastZxidSeen = hdr.getZxid();
    }
    if (logStream == null) {
        // 创建新log.${hdr.zxid}文件
        logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
        fos = new FileOutputStream(logFileWrite);
        logStream = new BufferedOutputStream(fos);
        oa = BinaryOutputArchive.getArchive(logStream);
        FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId); // 文件头
        long dataSize = oa.getDataSize();
        fhdr.serialize(oa, "fileheader"); // 写文件头
        logStream.flush();
        // 文件偏移量
        filePosition += oa.getDataSize() - dataSize;
        filePadding.setCurrentSize(filePosition);
        streamsToFlush.add(fos);
    }
    fileSize = filePadding.padFile(fos.getChannel(), filePosition);
    byte[] buf = request.getSerializeData();
    long dataSize = oa.getDataSize();
    Checksum crc = makeChecksumAlgorithm();
    crc.update(buf, 0, buf.length);
    oa.writeLong(crc.getValue(), "txnEntryCRC"); // checksum
    Util.writeTxnBytes(oa, buf); // 写len, hdr, txn, digest, 0x42
    unFlushedSize += oa.getDataSize() - dataSize; // 计算未flush字节数
    return true;
}

public long getLastLoggedZxid() {
    File[] files = getLogFiles(logDir.listFiles(), 0);
    long maxLog = files.length > 0 ?
        Util.getZxidFromName(files[files.length - 1].getName(), LOG_FILE_PREFIX) : -1;
    // 最新的log文件的后缀作为zxid
    long zxid = maxLog;
    // 从文件解析最新zxid
    try (FileTxnLog txn = new FileTxnLog(logDir); TxnIterator itr = txn.read(maxLog)) {
        while (true) {
            if (!itr.next()) {
                break;
            }
            TxnHeader hdr = itr.getHeader();
            zxid = hdr.getZxid();
        }
    } catch (IOException e) {
    }
    return zxid;
}

public synchronized void rollLog() throws IOException {
    if (logStream != null) {
        this.logStream.flush(); // 把当前文件刷写出去
        prevLogsRunningTotal += getCurrentLogSize();
        this.logStream = null; // 重置相关变量,后续append时会创建新的文件
        oa = null;
        fileSize = 0;
        filePosition = 0;
        unFlushedSize = 0;
    }
}

public synchronized void commit() throws IOException {
    if (logStream != null) {
        logStream.flush(); // 刷写文件
        filePosition += unFlushedSize;
        // If we have written more than we have previously preallocated,
        // we should override the fileSize by filePosition.
        if (filePosition > fileSize) {
            fileSize = filePosition;
        }
        unFlushedSize = 0;
    }
    for (FileOutputStream log : streamsToFlush) {
        log.flush(); // 刷写文件
        if (forceSync) {
            long startSyncNS = System.nanoTime();

            FileChannel channel = log.getChannel();
            channel.force(false);

            // 略
        }
    }
    // 关闭文件流
    while (streamsToFlush.size() > 1) {
        streamsToFlush.poll().close();
    }

    // Roll the log file if we exceed the size limit
    if (txnLogSizeLimit > 0) { // 默认-1分支进不来
        long logSize = getCurrentLogSize();
        if (logSize > txnLogSizeLimit) {
            rollLog();
        }
    }
}

// FileTxnIterator封装logFile和输入流对象,可以按照协议从文件流读取txnLog数据
public TxnIterator read(long zxid) throws IOException {
    return read(zxid, true);
}
public TxnIterator read(long zxid, boolean fastForward) throws IOException {
    return new FileTxnIterator(logDir, zxid, fastForward);
}

// 将log文件truncate到指定zxid位置
public boolean truncate(long zxid) throws IOException {
    try (FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid)) {
        PositionInputStream input = itr.inputStream;
        if (input == null) {
            throw new IOException("No log files found to truncate");
        }
        long pos = input.getPosition();
        // now, truncate at the current position
        RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
        raf.setLength(pos);
        raf.close(); // 把最小的文件truncate到指定zxid位置
        while (itr.goToNextLog()) { // 删除所有>zxid的log文件
            if (!itr.logFile.delete()) {
            }
        }
    }
    return true;
}

private static FileHeader readHeader(File file) throws IOException {
    InputStream is = null;
    try {
        is = new BufferedInputStream(new FileInputStream(file));
        InputArchive ia = BinaryInputArchive.getArchive(is);
        FileHeader hdr = new FileHeader();
        hdr.deserialize(ia, "fileheader"); // 反序列化
        return hdr;
    } finally {
        // is.close();
    }
}

FileTxnIterator类

this class implements the txnlog iterator interface which is used for reading the transaction logs.

内部使用List保存着比指定zxid大或者含有指定zxid数据的log文件,初始化阶段会定位到参数zxid指定的位置,这样在后续访问时就可以从参数指定的zxid开始读取数据了。

public FileTxnIterator(File logDir, long zxid, boolean fastForward) throws IOException {
    this.logDir = logDir;
    this.zxid = zxid;
    init();

    if (fastForward && hdr != null) {
        while (hdr.getZxid() < zxid) { // 这里将数据移动到zxid位置
            if (!next()) {
                break;
            }
        }
    }
}

void init() throws IOException {
    storedFiles = new ArrayList<>();
    // 倒序查找log文件
    List<File> files = Util.sortDataDir(
        FileTxnLog.getLogFiles(logDir.listFiles(), 0),
        LOG_FILE_PREFIX,
        false);
    for (File f : files) {
        if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
            storedFiles.add(f);
        } else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
            // add the last logfile that is less than the zxid
            storedFiles.add(f);
            break;
        }
    }
    goToNextLog(); // 定位到下一个文件
    next(); // 定位到下一个log数据
}

SnapShot接口和FileSnap实现类

SnapShot接口

snapshot interface for the persistence layer. implement this interface for implementing snapshots.

public interface SnapShot {

    // deserialize a data tree from the last valid snapshot and return the last zxid that was deserialized
    long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;

    // persist the datatree and the sessions into a persistence storage
    void serialize(DataTree dt, Map<Long, Integer> sessions, File name, boolean fsync) throws IOException;

    // find the most recent snapshot file
    File findMostRecentSnapshot() throws IOException;

    // get information of the last saved/restored snapshot
    SnapshotInfo getLastSnapshotInfo();

    // free resources from this snapshot immediately
    void close() throws IOException;
}

FileSnap实现类

负责存储、序列化和反序列化正确的快照。并提供对快照的访问:

public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // 在snapDir下查找合法的快照文件,倒序,所以最新的在前面
    List<File> snapList = findNValidSnapshots(100);
    File snap = null;
    long snapZxid = -1;
    boolean foundValid = false;
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
            InputArchive ia = BinaryInputArchive.getArchive(snapIS);
            deserialize(dt, sessions, ia); // 将数据反序列到dt
            SnapStream.checkSealIntegrity(snapIS, ia);

            // Deserializing the zxid digest from the input
            // stream and update the digestFromLoadedSnapshot.
            // 格式: zxid digestVersion digest
            if (dt.deserializeZxidDigest(ia, snapZxid)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }

            // deserialize lastProcessedZxid and check inconsistency
            // 读lastZxid字段得到
            if (dt.deserializeLastProcessedZxid(ia)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }

            foundValid = true;
            break;
        } catch (IOException e) {}
    }
    // 验证foundValid
    // 上次处理到的zxid
    dt.lastProcessedZxid = snapZxid;
    lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);

    // compare the digest if this is not a fuzzy snapshot, we want to compare and find inconsistent asap
    if (dt.getDigestFromLoadedSnapshot() != null) {
        dt.compareSnapshotDigests(dt.lastProcessedZxid);
    }
    return dt.lastProcessedZxid;
}

public static void deserialize(
        DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
    FileHeader header = new FileHeader(); // magic, version, dbid
    header.deserialize(ia, "fileheader"); // 解析文件头并验证magic
    if (header.getMagic() != SNAP_MAGIC) {
        throw new IOException("mismatching magic headers");
    }
    // 反序列化
    // 会话:
    //   Count Session(s)
    //   Session {id, timeout}
    // 节点:
    //   AclCache PathNode(s)
    //   PathNode {path, node}
    //   node {data, acl, stat}
    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}

protected List<File> findNValidSnapshots(int n) {
    // 在snapDir下查找快照文件,倒序,最新的在前面
    List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
    int count = 0;
    List<File> list = new ArrayList<>();
    for (File f : files) {
        try {
            if (SnapStream.isValidSnapshot(f)) { // 验证文件合法
                list.add(f);
                count++;
                if (count == n) {
                    break;
                }
            }
        } catch (IOException e) {}
    }
    return list;
}

public List<File> findNRecentSnapshots(int n) throws IOException {
    // 在snapDir下查找快照文件,倒序,最新的在前面
    List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
    int count = 0;
    List<File> list = new ArrayList<>();
    for (File f : files) {
        if (count == n) {
            break;
        }
        if (Util.getZxidFromName(f.getName(), SNAPSHOT_FILE_PREFIX) != -1) {
            count++;
            list.add(f);
        }
    }
    return list;
}

protected void serialize(
        DataTree dt, Map<Long, Integer> sessions, OutputArchive oa, FileHeader header) throws IOException {
    // 验证header!=null
    header.serialize(oa, "fileheader"); // 序列化文件头
    SerializeUtils.serializeSnapshot(dt, oa, sessions); // 序列化dataTree
}

public synchronized void serialize(
        DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync) throws IOException {
    if (!close) {
        try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) {
            OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
            FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
            serialize(dt, sessions, oa, header);
            SnapStream.sealStream(snapOS, oa);

            // 序列化digest
            if (dt.serializeZxidDigest(oa)) {
                SnapStream.sealStream(snapOS, oa);
            }

            // 序列化lastProcessZxid
            if (dt.serializeLastProcessedZxid(oa)) {
                SnapStream.sealStream(snapOS, oa);
            }

            lastSnapshotInfo = new SnapshotInfo(
                Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
                snapShot.lastModified() / 1000);
        }
    } else {
        throw new IOException("FileSnap has already been closed");
    }
}

DatadirCleanupManager

启动周期任务

清理过期文件,保留最新的snapRetainCount个snapshot文件和对应的txnlog文件,将其余过期的文件删除掉。

purgeInterval参数指定执行周期(小时),默认0不开启清理功能。

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        return;
    }
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        return;
    }

    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

PurgeTask

public void run() {
    try {
        PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
    } catch (Exception e) {}
}

PurgeTxnLog.purge方法:

public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    // 倒序查找最新的num个snapshot文件
    List<File> snaps = txnLog.findNValidSnapshots(num);
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        // 删除掉zxid比snaps小的txnlog和snapshot文件
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}

ContainerManager

负责清理container节点,只能有leader管理。启动后,定期检查cversion>0且没有子级的container节点和ttl节点。尝试删除节点,删除的结果并不重要。如果提议失败或容器节点不为空,则没有任何危害。

前言

在现代软件开发中,微服务架构和CQRS模式都是备受关注的技术趋势。微服务架构通过将应用程序拆分为一系列小型、自治的服务,提供了更好的可伸缩性和灵活性。而CQRS模式则通过将读操作和写操作分离,优化了系统的性能和可维护性。本文小编将为大家介绍如何在ASP.NET Core微服务架构下使用RabbitMQ来实现CQRS模式。

微服务架构的简要概览

微服务架构是一种软件架构模式,它将一个大型的单体应用程序拆分为一组小型、自治的服务,每个服务都可以独立部署、扩展和管理。每个服务都专注于一个特定的业务功能,并通过轻量级的通信机制相互协作,形成一个完整的分布式系统。

RabbitMQ在微服务中的作用

消息代理,以RabbitMQ作为示例,是微服务架构的枢纽,为服务间异步通信提供了一个健壮的机制。它们使得分离组件间的通信变得解耦合、可靠和可扩展。在下面的这段代码里面,RabbitMQ被用于给特定队列发送消息,确保服务间通信可靠。

// Example of using RabbitMQ with RabbitMQ.Client in C#
using RabbitMQ.Client;
class RabbitMQService {
    public void SendMessageToQueue(string queueName, string message) {
        var factory = new ConnectionFactory(){HostName="localhost"};
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel;
        channel.QueueDeclare(queue:queueName,durable:false,exclusive:false,autoDelete:false,arguments:null);
        var body=Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange:"",routingKey:queueName,basicProperties:null,body:body);
        Console.WriteLines($"Message sent to {queueName}:{message}");
    }
}

RabbitMQ提供了很多功能,使得针对微服务架构高度适合:

  • 可靠性:它确保消息可靠传输,支持消息识别机制。
  • 灵活性:支持多种消息模式(发布订阅,点对点)和协议(AMQP,MQTT)。
  • 可扩展:允许通过发布横跨不同节点或集群的消息来横向伸缩。

下面这段代码演示了RabbitMQ如何实现一个发布和订阅的功能。

// Example of using RabbitMQ for Publish-Subscribe
public class Publisher
{
    public void Publish(string exchangeName, string message)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
        Console.WriteLine($"Message published to {exchangeName}: {message}");
    }
}

CQRS 模式

CQRS从根本上来说是把处理命令(改变系统状态)的职责从查询(不更改状态下获取数据)中分离出来。这种分离允许对每种类型操作进行优化和裁剪。如下方的代码所示,Command Handler(命令程序)处理写操作,负责执行更新、创建或删除等改变系统状态的操作。Query Handler(查询程序)处理读操作,负责提供数据查询和展示的功能。

// Example of Command and Query models in C#
public class Command {
    public string Id {get;set;}
    public object Payload{get;set}
}

public class Query {
    public string Id(get;set;)
}
// Command Handler
public class CommandHandler {
    public void HandleCommand(Command command) {
        // Logic to process and update the system state based on the command
    }
}
// Query Handler
public class QueryHandler {
    public object HandleQuery(Query query) {
        // Logic to retrieve and return data without altering the system state
        return null;
    }
}

分离读和写操作的优势

  • 易于优化:不同模型可以为它们特定的任务进行优化。
  • 可扩展:系统可以为读和写独立扩展,优化性能。
  • 灵活性:修改写逻辑不影响读操作,在设计和迭代上提供了更大的灵活性。
// Command Handler
public class CommandHandler {
    public void HandleCommand(Command command){
        // Logic to process and update the system state based on the command
    }
}
// Query handler
public class QueryHandler{
    public object HandlerQuery(Query query) {
        // Logic to retrieve and return data without altering the system state
        return null;
    }
}

RabbitMQ与CQRS集成

在集成CQRS与RabbitMQ时,需要考虑以下因素:

  • 消息结构:以一种清晰一致的格式为命令和事件设计消息。
  • 错误处理:在消息处理中实现针对错误处理和重试的策略。
  • 消息持久性:配置队列来确保消息持久,避免数据丢失。
  • 可伸缩性:通过考虑RabbitMQ集群和负载均衡,为可伸缩提前谋划。

现在,小编以在线订单系统为场景,介绍如何集成RabbitMQ和CQRS来实现订单的异步处理。

场景:

在一个在线订单系统中,放置了新订单后,它就需要被异步处理。小编将会使用RabbitMQ来处理命令(放置订单)和事件(订单处理)。这个系统将会用队列来分离命令和事件,同时遵循CQRS原则。

设计注意事项:

  • OrderCommand:
    表示下订单的命令。
  • OrderEvent:
    表示已处理的订单。
  • Error Handling:
    对失败订单实施重试机制。

命令处理:

public class OrderCommandHandler
{
    private readonly string commandQueueName = "order_commands";

    public void SendOrderCommand(OrderCommand command)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: commandQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(command));
        channel.BasicPublish(exchange: "", routingKey: commandQueueName, basicProperties: null, body: body);
        Console.WriteLine($"Order command sent: {JsonConvert.SerializeObject(command)}");
    }
    
    public void ConsumeOrderCommands()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: commandQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var commandMessage = Encoding.UTF8.GetString(body);
            var orderCommand = JsonConvert.DeserializeObject<OrderCommand>(commandMessage);

            // 处理订单命令
            Task.Run(() => ProcessOrderCommand(orderCommand));

            // 确认消息
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        channel.BasicConsume(queue: commandQueueName, autoAck: false, consumer: consumer);
    }
    
    private void ProcessOrderCommand(OrderCommand orderCommand)
    {
        // 异步处理订单命令的逻辑
        Console.WriteLine($"Processing order command: {JsonConvert.SerializeObject(orderCommand)}");
        
        // 下订单,执行验证
        // 如果成功,发布一个订单处理事件
        var orderEvent = new OrderEvent { OrderId = orderCommand.OrderId, Status = "Processed" };
        SendOrderProcessedEvent(orderEvent);
    }
    
    private void SendOrderProcessedEvent(OrderEvent orderEvent)
    {
        var eventQueueName = "order_events";
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: eventQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderEvent));
        channel.BasicPublish(exchange: "", routingKey: eventQueueName, basicProperties: null, body: body);
        Console.WriteLine($"Order processed event sent: {JsonConvert.SerializeObject(orderEvent)}");
    }
}

为命令和事件实现消息队列

在集成RabbitMQ的基于CQRS系统中,为命令和事件建立的分离的队列能使得组件间异步通信。

public class OrderEventConsumer
{
    private readonly string eventQueueName = "order_events";

    public void ConsumeOrderEvents()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: eventQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var eventMessage = Encoding.UTF8.GetString(body);
            var orderEvent = JsonConvert.DeserializeObject<OrderEvent>(eventMessage);
            Console.WriteLine($"Received order processed event: {JsonConvert.SerializeObject(orderEvent)}");
            // 处理已处理订单事件的逻辑
        };
        channel.BasicConsume(queue: eventQueueName, autoAck: true, consumer: consumer);
    }
}

异步通信和事件驱动架构

事件驱动架构中,RabbitMQ使得异步通信更加便捷,这是因为它允许组件以一种非阻塞方式对事件和消息进行响应。

public class Program
{
    public static void Main(string[] args)
    {
        var orderCommandHandler = new OrderCommandHandler();
        var orderEventConsumer = new OrderEventConsumer();

        // 举例:发送订单命令
        var orderCommand = new OrderCommand { OrderId = Guid.NewGuid(), Product = "Product A", Quantity = 2 };
        orderCommandHandler.SendOrderCommand(orderCommand);

        // 异步使用订单命令和事件
        Task.Run(() => orderCommandHandler.ConsumeOrderCommands());
        Task.Run(() => orderEventConsumer.ConsumeOrderEvents());
        Console.ReadLine(); // 保持应用程序运行
    }
}

在微服务中集成CQRS和RabbitMQ

创建服务

现在小编创建两个服务,一个用于订单消息处理(OrderComandService),一个用于订单查询处理(OrderQueryService)。

OrderComandService(订单命令服务)

// 处理命令(下订单)
public class OrderCommandService
{
    private readonly string commandQueueName = "order_commands";
    public void SendOrderCommand(OrderCommand command)
    {
        // 向RabbitMQ队列发送order命令的代码(具体可以参考前面SendOrderCommand的代码)
    }
    public void ConsumeOrderCommands()
    {
        // 从RabbitMQ队列中消费订单命令的代码(具体可以参考前面的ConsumeOrderCommands代码)
        // 异步处理接收到的命令并相应地触发事件
    }
}

OrderQueryService(订单查询服务)

// 处理查询(获取订单)
public class OrderQueryService
{
    private readonly string queryQueueName = "order_queries";
    public void SendOrderQuery(Query query)
    {
        // 向RabbitMQ队列发送order命令的代码(具体可以参考前面SendOrderCommand的代码)
    }
    public void ConsumeOrderQueries()
    {
        // 从RabbitMQ队列中接受消费订单命令的代码(具体可以参考前面的ConsumeOrderCommands代码)
        // 异步处理接收到的查询并检索订单数据
    }
}

在微服务中定义命令和查询模型

命令和查询模型

// 命令模型
public class OrderCommand
{
    public string OrderId { get; set; }
    // 其他与订单相关的字段(省略)
}
// 查询模型
public class OrderQuery
{
    public string QueryId { get; set; }
    // 其他与订单相关的字段(省略)
}

使用RabbitMQ编写订单命令和订单查询:

OrderCommandService(订单命令服务)

// 发送订单命令
OrderCommandService orderCommandService = new OrderCommandService();
OrderCommand orderCommand = new OrderCommand { OrderId = "123", /* 其他订单属性 */ };
orderCommandService.SendOrderCommand(orderCommand);
// 消费订单命令
orderCommandService.ConsumeOrderCommands();

OrderQueryService(订单查询服务)

// 发送订单查询
OrderQueryService orderQueryService = new OrderQueryService();
OrderQuery orderQuery = new OrderQuery { QueryId = "456", /* 其他订单属性 */ };
orderQueryService.SendOrderQuery(orderQuery);
// 消费订单查询
orderQueryService.ConsumeOrderQueries();

总结

在ASP.NET Core微服务架构中,使用RabbitMQ作为消息队列服务,通过实现CQRS模式(Command Query Responsibility Segregation),将写操作和读操作分离,以提高系统的性能和可伸缩性。这种组合能够实现异步通信和事件驱动架构,通过将命令发送到命令处理器执行写操作,同时使用订阅模式将事件发布给查询服务,实现实时的数据查询和更新。这样的架构使系统更具弹性和扩展性,并为开发者提供更好的工具和方法来构建复杂的分布式系统,以满足不同业务需求。

扩展链接:

Redis从入门到实践

一节课带你搞懂数据库事务!

Chrome开发者工具使用教程

如何在Web应用中添加一个JavaScript Excel查看器

高性能渲染——详解HTML Canvas的优势与性能

大家好,我是独孤风。元数据管理平台层出不穷,但目前主流的还是Atlas、Datahub、Openmetadata三家,那么我们该如何选择呢?

本文就带大家对比一下。要了解元数据管理平台,先要从架构说起。

元数据管理的架构与开源方案

下面介绍元数据管理的架构实现,不同的架构都对应了不同的开源实现。

下图描述了第一代元数据架构。它通常是一个经典的单体前端(可能是一个 Flask 应用程序),连接到主要存储进行查询(通常是 MySQL/Postgres),一个用于提供搜索查询的搜索索引(通常是 Elasticsearch),并且对于这种架构的第 1.5 代,也许一旦达到关系数据库的“递归查询”限制,就使用了处理谱系(通常是 Neo4j)图形查询的图形索引。

很快,第二代的架构出现了。单体应用程序已拆分为位于元数据存储数据库前面的服务。该服务提供了一个 API,允许使用推送机制将元数据写入系统。

第三代架构是基于事件的元数据管理架构,客户可以根据他们的需要以不同的方式与元数据数据库交互。

元数据的低延迟查找、对元数据属性进行全文和排名搜索的能力、对元数据关系的图形查询以及全扫描和分析能力。

Datahub 就是采用的这种架构。

下图是当今元数据格局的简单直观表示:

(包含部分非开源方案)

Apache Atlas

Atlas是Hadoop的数据治理和元数据框架。Atlas于2015年7月开始在Hortonworks进行孵化。

官网地址为:
https://atlas.apache.org/

源码地址为:
https://github.com/apache/atlas

目前标星1.7K,最新稳定版本2.3.0。

开发语言后端主要为Java,前端功能主要为JS实现。

特性

  • Atlas支持各种Hadoop和非Hadoop元数据类型
  • 提供了丰富的REST API进行集成
  • 对数据血缘的追溯达到了字段级别,这种技术还没有其实类似框架可以实现
  • 对权限也有很好的控制

Atlas包括以下组件:

  • 采用Hbase存储元数据
  • 采用Solr实现索引
  • Ingest/Export 采集导出组件 Type System类型系统 Graph Engine图形引擎 共同构成Atlas的核心机制
  • 所有功能通过API向用户提供,也可以通过Kafka消息系统进行集成
  • Atlas支持各种源获取元数据:Hive,Sqoop,Storm。。。
  • 还有优秀的UI支持

Atlas是Hadoop生态的嫡系,并且天然的集成在Ambari中(不过版本较低,建议自己安装)。

Atlas对Hive的支持极好,对Spark也有一定的支持。

如果熟悉Atlas的API,也可以很好的扩展。

但由于社群活跃度一般,Atlas后期更新乏力。

页面也还是老样子,新版本的页面并不完善,所以还有有很大的局限性。

DataHub (LinkedIn)

LinkedIn开源出来的,原来叫做WhereHows 。经过一段时间的发展datahub于2020年2月在Github开源。

官网地址为:
https://datahubproject.io/

源码地址为:
https://github.com/linkedin/datahub

目前标星8.8K,最新稳定版本0.12.0。

开发语言为Java和Python。

DataHub是由LinkedIn的数据团队开源的一款提供元数据搜索与发现的工具。

提到LinkedIn,不得不想到大名鼎鼎的Kafka,Kafka就是LinkedIn开源的。LinkedIn开源的Kafka直接影响了整个实时计算领域的发展,而LinkedIn的数据团队也一直在探索数据治理的问题,不断努力扩展其基础架构,以满足不断增长的大数据生态系统的需求。随着数据的数量和丰富性的增长,数据科学家和工程师要发现可用的数据资产,了解其出处并根据见解采取适当的行动变得越来越具有挑战性。为了帮助增长的同时继续扩大生产力和数据创新,创建了通用的元数据搜索和发现工具DataHub。

由于背后有商业化的规划,并且社区活跃,近两年Datahub的更新异常活跃。也将自己的定位为基于现代数据栈的元数据平台。
DataHub实现了端到端的数据发现,数据可观察性和数据治理。并且为开发人员提供了丰富的扩展接口,其目的就是应对不断变化的数据生态。事实证明,元数据管理就应该这样去建设。
DataHub提供了跨数据库、数据仓库、数据湖、数据可视化工具的搜索与发现功能。实现端到端的全流程数据血缘的构建。DataHub是实时的元数据捕捉框架,可以实时感应元数据的变化。同时支持标签,术语表,业务域等元数据的管理。DataHub还提供了丰富的权限支持。在最新的DataHub版本中,可以在页面上去进行元数据的获取操作。
DataHub支持的数据源非常丰富,如Tableai、PowerBI、Superset等数据可视化工具。
也支持Airflow、Spark、ES、Kafka、Hive、Mysql、Oracle等大数据组件的元数据的获取。

Datahub的页面经过最新的改版,规划也较为合理,美观。

Openmatadata

OpenMetadata是一个用于数据治理的一体化平台,可以帮助我们发现,协作,并正确的获取数据。

OpenMetadata提供了数据发现、数据血缘、数据质量、数据探查、数据治理和团队协作的一体化平台。它是发展最快的开源项目之一,拥有充满活力的社区,并被各行业垂直领域的众多公司采用。 OpenMetadata 由基于开放元数据标准和API 的集中式元数据存储提供支持,支持各种数据服务的连接器,可实现端到端元数据管理,让您可以自由地释放数据资产的价值。

官网地址:
https://open-metadata.org/

源码地址:
https://github.com/open-metadata/OpenMetadata

目前标星3.4K,最新版本为1.2.3。

主要开发语言,后端为Java,前端为TS。

其UI非常美观,其操作和使用逻辑,也符合业务人员的习惯。

优缺点对比

Datahub:

优势:

强大的数据发现和搜索功能,方便用户快速定位所需数据。

提供数据质量元数据,帮助用户理解和信任数据。

支持多种数据源,包括传统的关系数据库和现代的数据湖。

社区活跃,不断有新功能和改进加入。

劣势: 初学者可能会觉得界面和配置相对复杂。

在某些情况下,集成新的数据源可能需要额外的开发工作。

Atlas:

优势:

与Apache Hadoop生态系统深度集成,特别适合Hadoop用户。

提供强大的数据血缘和分类功能,有助于数据治理。

支持自定义的元数据类型和模型。

开源,有较大的社区支持和贡献。

劣势:

主要针对Hadoop生态系统,可能不适合非Hadoop环境。

用户界面和用户体验不如一些商业产品。

如何选择?

毫无疑问,从活跃度和发展趋势来看,Datahub都是目前最炙手可热的元数据管理平台。Openmatadata更有数据治理、数据资产管理平台的样子。而Atlas和Hadoop联系紧密,也有自己优势。

那么我们该如何选择呢?首先应该明确需求。

相信读到这篇文章的人,大部分还是想做一个元数据管理平台,以开展企业的数据治理工作。如果学习过DAMA的数据治理体系,我们应该知道做元数据管理要梳理好数据源都在哪,并尽可能的管理公司的全量数据。

而功能方面,是否需要数据血缘功能,术语表、标签等功能都是需要调研的内容。那我们一步步来分析。

1、梳理数据源

数据仓库与BI是大部分企业必备的,也是重要的元数据来源。不同企业的的搭建方式不同,前几年可能更多的是离线数仓,多采用Hive,Spark等大数据技术搭建。近几年数据湖技术,实时数仓技术出现,更多的企业会选择如Hudi,Iceberg等技术,而实时数仓多采用Doris,Paimon等技术,在实时处理中,还要考虑收集Flink实时计算引擎的元数据。

而部分企业也希望将业务系统,如Oracle,Mysql等数据库的元数据进行收集。

除此以外,还有一些业务元数据也是需要梳理的,一般通过接口、页面都可以操作。

原生支持所有组件的元数据管理平台是不存在的
。但是好在元数据管理平台都提供了丰富的API接口,是可以扩展的。

所以在对数据源梳理后,并结合上面元数据管理平台的特性,可以做出基本的选择。

如果企业需要管理的数据源主要是大数据组件,Hive和Spark为主,可以使用Atlas快速的搭建一个元数据管理平台,由于原生的支持,基本不需要做很多的适配,只要安装配置好就可以。

但是如果企业收集元数据不限于此,建议选择更灵活的Datahub和Openmetadata,反正都要做适配,做二次开发,不如直接选一个更灵活的。

2、明确需求

我们先来看看三个平台的功能。

Altas有搜索,数据血缘,标签,术语表等功能。

Datahub有搜索,数据血缘,数据分析,标签,术语表等功能,也可以集成数据质量框架,如GreatExceptions。

Openmetadata有搜索,数据血缘,数据质量,数据分析,标签,术语表功能,并且有团队协作的功能。

如果这些能满足公司的需要就是可以选择的,如果不能,那么多余的功能就需要另外的开发了。

二开这里简单说一下,如果是元数据管理平台+数据治理工具的组合,建议选择Datahub基本可以覆盖所有的元数据管理功能,也有很好的扩展性。

而如果想选择一个平台大而全,可以考虑在Openmetadata基础上二开,毕竟支持的功能多一些。

3、可行性

虽然完事具备,但是能不能实行,其实并不一定。实
现元数据管理的难度巨大

在项目开始之前,必须要考虑实现的难度,如果不需要二开,可能只需要有经验的技术人员或者运维人员安装好就可以。

但是如果需要二开,则必须考虑开发难度。

Atlas后端主要为Java,需要高级的Java开发人员进行钻研,如需要更改页面,也需要前端人员的配合。

Datahub后端Java和Python都有的,而核心的数据摄取部分,主要是Python为主,熟悉Python框架的同学会更好上手。如需要更改页面,也需要前端人员的配合。

Openmetadata后端为Java,前端为TS。同样都是要有相关经验的人员参与的。

元数据管理并不容易,我在搭建二开环境的过程中也是遇到了很大的困难,但是熟悉开源项目的源码对于自研项目也有着非常大的帮助,没有白走的路,越是困难收获也会更大。

欢迎加入大数据流动,共同学习元数据管理相关知识,未完待续~

wmproxy

wmproxy
已用
Rust
实现
http/https
代理,
socks5
代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现
websocket
代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子

项目地址

国内: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

项目设计目标

利用正则替换的能力,能把指定的字符串替换成想要的字符串。

正则库

因为rust官方团队并未将正则正式的加入到std标准库里面,目前我们引用的是
regex
也是
rust-lang
官方出品的正则库。

匹配的规则

  1. 字符匹配:正则表达式可以匹配单个字符,如字母、数字、标点符号等。常见的字符匹配包括:
  • \d
    :匹配任意数字,等价于
    [0-9]
  • \D
    :匹配任意非数字字符,等价于
    [^0-9]
  • \w
    :匹配任意字母、数字或下划线字符,等价于
    [A-Za-z0-9_]
  • \W
    :匹配任意非字母、数字或下划线字符,等价于
    [^A-Za-z0-9_]
  • .
    :匹配除换行符(\n、\r)之外的任意字符。
  1. 字符类匹配:使用字符类可以匹配指定范围内的字符。常见的字符类匹配包括:
  • [abc]
    :匹配方括号内的任意字符,例如
    a

    b

    c
  • [^abc]
    :匹配除方括号内字符之外的任意字符,例如不是
    a

    b

    c
    的字符。
  • [a-z]
    :匹配任意小写字母。
  • [A-Z]
    :匹配任意大写字母。
  • [0-9]
    :匹配任意数字。
  1. 量词匹配:用于指定字符或字符类出现的次数。常见的量词匹配包括:
  • *
    :匹配前一项0次或多次,等价于
    {0,}
  • +
    :匹配前一项1次或多次,等价于
    {1,}
  • ?
    :匹配前一项0次或1次,也就是说前一项是可选的,等价于
    {0,1}
  • {n}
    :匹配前一项恰好n次。
  • {n,}
    :匹配前一项至少n次。
  • {n,m}
    :匹配前一项至少n次,但不超过m次。
  1. 边界匹配:用于匹配字符串的边界位置。常见的边界匹配包括:
  • ^
    :匹配字符串的开头位置。
  • $
    :匹配字符串的结尾位置。
  • \b
    :匹配单词的边界位置,即字与空白间的位置。
  • \B
    :匹配非单词边界的位置。
  1. 选择、分组和引用:
  • |
    :选择符号,匹配该符号左边或右边的表达式。
  • (...)
    :将几项组合成一个单元,这个单元可通过"*"、"+"、"?" 和"|" 等符号加以修饰,也可以记住与这个组匹配的字符以便后面引用。
  • \n
    :在正则表达式中,n 是一个正整数,引用匹配到的第n个分组。
  1. 预查:预查是一种零宽断言,即匹配的是位置而不是字符。预查包括正向预查和负向预查:
  • (?=...)
    :正向肯定预查,表示要匹配的字符串后面必须紧跟着指定的模式。
  • (?!...)
    :正向否定预查,表示要匹配的字符串后面不能紧跟着指定的模式。
  • (?<=...)
    :反向肯定预查,表示要匹配的字符串前面必须紧跟着指定的模式。
  • (?<!...)
    :反向否定预查,表示要匹配的字符串前面不能紧跟着指定的模式。

需求功能

  • 需要从
    Request
    中获取
    Url
    或者
    Path
    并将其中间的某一部分替换成另一部分,且需兼容部分不需要替换,两种模式需均能正常的。

  • 需要配置中正确的读取分割的信息,如
    "{path}/ '/ro(\\w+)/(.*) {path} /ro$1/Cargo.toml' /root/README.md"
    需要正确的分割成
    {path}/
    ,
    /ro(\\w+)/(.*) {path} /ro$1/Cargo.toml
    ,
    /root/README.md
    三个部分。

需求实现

以下是一段try_paths的配置

[[http.server.location]]
rate_limit = "4m/s"
rule = "/root"
file_server = { browse = true }
proxy_pass = ""
try_paths = "{path}/ '/ro(\\w+)/(.*) {path} /ro$1/Cargo.toml' /root/README.md"

我们需要将try_paths做正确的拆分,我们需要将一个字符串按空格做分割,且如果有单引号
'
或者双引号
"
需要找到其对应的结尾,防止将其中的字符串做切割。

我们将利用以下正则,其中小括号括起来是我们将匹配的内容,用
|
则表示并行的匹配规则,当第一个没有匹配到将匹配第二个选项。

r#"([^\s'"]+)|"([^"]*)"|'([^']*)'"#

其中
([^\s'"]+)
表示非空白字符开头也非单号双引号开头,直到碰到空格或者单引号双引号停止,那我们将获取第一个匹配项
{path}/

其中
"([^"]*)"
则表示以双引号开头,中间不能添加任何双引号的其它任意字符,直到匹配到另一个双引号停止,取中间的数据,不取双引号。
'([^']*)'
则类似双引号,那么
'/ro(\\w+)/(.*) {path} /ro$1/Cargo.toml'
将是单引号开头的匹配项直到另一个单引号截止,那么匹配的结果为
/ro(\\w+)/(.*) {path} /ro$1/Cargo.toml

另一个匹配第一个规则
/root/README.md
,此时我们已经正确将数据进行切割,以下是源码实现,用lazy_static是为了只初始化一次,无需重复耗性能。

pub fn split_by_whitespace<'a>(key: &'a str) -> Vec<&'a str> {
    lazy_static! {
        static ref RE: Regex = Regex::new(r#"([^\s'"]+)|"([^"]*)"|'([^']*)'"#).unwrap();
    };
    
    let mut vals = vec![];
    for (_, [value]) in RE.captures_iter(key).map(|c| c.extract()) {
        vals.push(value);
    }
    vals
}

以下是正确的替换,假设我们收到的是
GET /root/index.html HTTP/1.1\r\n
那么第一个参数
{path}/
将通过
Request
取得为
/root/index.html/

那么第二个参数
/ro(\\w+)/(.*) {path} /ro$1/Cargo.toml
,按空格切割,切割结果有
/ro(\\w+)/(.*)
,
{path}
,
/ro$1/Cargo.toml
有三个参数,且第一个参数为正则,那么我们尝试将第一个参数正则化,与第二个参数相匹配,并替换成第三个参数的内容同时将第二个参数格式化为
/root/index.html
,那么与正则相匹配

匹配项 匹配结果
$0 /root/index.html
正则表达示里0均为整个字符串
$1 ot
(\w+)的内容
$2 index.html
(.*)的内容

那么
/ro$1/Cargo.toml
替换成结果后将为
/root/Cargo.toml
通过这种方法我们可以将任意的字符串按照一定的规则匹配成另一个字符串来达到自定义的目的。

源码实现:

pub fn format_req(req: &Request<Body>, formats: &str) -> String {
    let pw = FORMAT_PATTERN_CACHE.with(|m| {
        if !m.borrow().contains_key(&formats) {
            let p = PatternEncoder::new(formats);
            m.borrow_mut().insert(
                Box::leak(formats.to_string().clone().into_boxed_str()),
                Arc::new(p),
            );
        }
        m.borrow()[&formats].clone()
    });

    // 将其转化成Record然后进行encode
    let record = ProxyRecord::new_req(Record::builder().level(Level::Info).build(), req);
    let mut buf = vec![];
    pw.encode(&mut SimpleWriter(&mut buf), &record).unwrap();
    String::from_utf8_lossy(&buf[..]).to_string()
}

fn inner_oper_regex(req: &Request<Body>, re: &Regex, vals: &[&str]) -> String {
    let mut ret = String::new();
    let key = Self::format_req(req, vals[0]);
    for idx in 1..vals.len() {
        if idx != 1 {
            ret += " ";
        }
        let val = re.replace_all(&key, vals[idx]);
        ret += &val;
    }
    ret
}

pub fn format_req_may_regex(req: &Request<Body>, formats: &str) -> String {
    let formats = formats.trim();
    if formats.contains(char::is_whitespace) {
        // 因为均是从配置中读取的数据, 在这里缓存正则表达示会在总量上受到配置的限制
        lazy_static! {
            static ref RE_CACHES: Mutex<HashMap<&'static str, Regex>> =
                Mutex::new(HashMap::new());
        };

        if formats.len() == 0 {
            return String::new();
        }

        let vals = Self::split_by_whitespace(formats);
        if vals.len() < 2 {
            return String::new();
        }

        if let Ok(mut guard) = RE_CACHES.lock() {
            if let Some(re) = guard.get(&vals[1]) {
                return Self::inner_oper_regex(req, re, &vals[1..]);
            } else {
                if let Ok(re) = Regex::new(vals[0]) {
                    let ret = Self::inner_oper_regex(req, &re, &vals[1..]);
                    guard.insert(Box::leak(vals[0].to_string().into_boxed_str()), re);
                    return ret;
                }
            }
        }
    }
    Self::format_req(req, formats)
}

测试用例

根据Request生成我们任意想要的内容

mod tests {
    use webparse::Request;
    use wenmeng::Body;

    use crate::Helper;

    fn build_request() -> Request<Body> {
        Request::builder()
            .url("http://127.0.0.1/test/root?query=1&a=b")
            .header("Accept", "text/html")
            .body("ok")
            .unwrap()
            .into_type()
    }

    #[test]
    fn do_test_reg() {
        let req = &build_request();
        let format = r" /test/(.*) {path} /formal/$1 ";
        let val = Helper::format_req_may_regex(req, format);
        assert_eq!(val, "/formal/root");
        
        let format = r" /te(\w+)/(.*) {path} /formal/$1/$2 ";
        let val = Helper::format_req_may_regex(req, format);
        assert_eq!(val, "/formal/st/root");

        let format = r" /te(\w+)/(.*) {url} /formal/$1/$2 ";
        let val = Helper::format_req_may_regex(req, format);
        assert_eq!(val, "http://127.0.0.1/formal/st/root?query=1&a=b");
    }
}

小结

正则在计算机的处理中是非常的常用的一种技术,具有许多优点,使得它在文本处理和模式匹配方面非常强大和灵活,有强大的文本匹配和搜索功能,跨平台性跨语言,每种语言都有相应的实现,既简洁又高效便捷,是受欢迎的一种又相处较难的字符串处理技术。

点击
[关注]

[在看]

[点赞]
是对作者最大的支持

一、定义

允许一个对象在其内部状态改变时改变它的行为,对象看起来似乎修改了它的类,状态模式又称为状态对象,它是一种对象行为模式。

二、描述

状态模式是一种较为复杂的设计模式,用于解决系统中复杂对象的状态转换以及不同状态下行为的封装问题,包含以下三个角色:

1、Context(环境类):
环境类又称为上下文类,它是拥有多种状态的对象。由于环境类的状态存在多样性,且在不同状态下对象的行为有所不同,所以将状态独立出去形成单独的状态类。在环境类中维护一个抽象状态类State的实例,这个实例定义当前状态,在具体实现时,它是一个State子类的对象。

2、State(抽象状态类):
它用于定义一个接口以封装与环境类的一个特定状态相关的行为,在抽象状态类中声明了各种不同状态对应的方法,而在其子类中实现了这些方法,由于不同状态下对象的行为可能不同,因此在不同子类中方法的实现可能存在不同,相同的方法可以写在抽象状态类中。

3、ConcreteState(具体状态类):
它是抽象状态类的子类,每一个具体状态类实现一个与环境类的一个状态相关的行为,对应环境类的一个具体状态,不同的具体状态类其行为有所不同。

三、例子

X公司公司要为一银行开发一套信用卡业务系统,银行账户(Account)是该系统的核心类之一,通过分析,系统中账户存在3种状态,根据余额的不同,以上3种状态可发生相互转换,具体说明如下:
(1)如果账户中余额大于等于0,则账户的状态为正常状态(Normal State),此时用户既可以向该账户存款也可以从该账户取款。
(2)如果账户中余额小于0,并且大于一2000,则账户的状态为透支状态(Overdraft State),此时用户既可以向该账户存款也可以从该账户取款,但需要按天计算利息。
(3)如果账户中余额等于-2000,那么账户的状态为受限状态(Restricted State),此时用户只能向该账户存款,不能再从中取款,同时也将按天计算利息。


Account:银行账户,充当环境类

public class Account
{
    private AccountState state;  //维持一个对抽象状态对象的引用
    private string owner;        //开户名
    private double balance = 0;  //账户余额
    public Account(string owner, double init)
    {
        this.owner = owner;
        this.balance = init;
        this.state = new NormalState(this);
        Console.WriteLine("{0}开户,初始金额为{1}", this.owner, init);
        Console.WriteLine("------------------------------------");
    }
    //设置初始状态
    public double Balance
    {
        get { return balance; }
        set { balance = value; }
    }
    public void SetState(AccountState state)
    {
        this.state = state;
    }
    public void Deposit(double amount)
    {
        Console.WriteLine("{0}存款{1}", this.owner, amount);
        state.Deposit(amount);// 调用状态对象的Deposit()方法
        Console.WriteLine("现在余额为{0}", this.Balance);
        Console.WriteLine("现在账户状态为{0}", this.state.GetType().ToString());
        Console.WriteLine("------------------------------------");
    }
    public void Withdraw(double amount)
    {
        Console.WriteLine("{0}取款{1}", this.owner, amount);
        state.Withdraw(amount); //调用状态对象的Withdraw()方法
        Console.WriteLine("现在余额为{0}", this.Balance);
        Console.WriteLine("现在账户状态为{0}", this.state.GetType().ToString());
        Console.WriteLine("------------------------------------");
    }
    public void ComputeInterest()
    {
        state.ComputeInterest();   //调用状态对象的computeInterest方法
    }
}

AccountState:账户状态类,充当抽象状态类

public abstract class AccountState
{
    private Account acc;
    public Account Acc
    {
        get { return acc; }
        set { acc = value; }
    }
    public abstract void Deposit(double amount);
    public abstract void Withdraw(double amount);
    public abstract void ComputeInterest();
    public abstract void StateCheck();
}

NormalState、OverdraftState、RestrictedState:正常状态、透支状态、受限状态,充当具体状态类

public class NormalState : AccountState
{
    public NormalState(Account acc)
    {
        this.Acc = acc;
    }
    public NormalState(AccountState state)
    {
        this.Acc = state.Acc;
    }

    public override void Deposit(double amount)
    {
        Acc.Balance = Acc.Balance + amount;
        StateCheck();
    }
    public override void Withdraw(double amount)
    {
        Acc.Balance = Acc.Balance - amount;
        StateCheck();
    }

    public override void ComputeInterest()
    {
        Console.WriteLine("正常状态,无需支付利息!");
    }

    public override void StateCheck()
    {
        if (Acc.Balance > -2000 && Acc.Balance <= 0)
        {
            Acc.SetState(new OverdraftState(this));
        }
        else if (Acc.Balance == -2000)
        {
            Acc.SetState(new RestrictedState(this));
        }
        else if (Acc.Balance < -2000)
        {
            Console.WriteLine("操作受限!");
        }
    }
}

public class OverdraftState : AccountState
{
    public OverdraftState(AccountState state)
    {
        this.Acc = state.Acc;
    }
    public override void Deposit(double amount)
    {
        Acc.Balance = Acc.Balance + amount;
        StateCheck();
    }
    public override void Withdraw(double amount)
    {
        Acc.Balance = Acc.Balance - amount;
        StateCheck();
    }

    public override void ComputeInterest()
    {
        Console.WriteLine("计算利息!");
    }

    public override void StateCheck()
    {
        if (Acc.Balance > 0)
        {
            Acc.SetState(new NormalState(this));
        }
        else if (Acc.Balance == -2000)
        {
            Acc.SetState(new OverdraftState(this));
        }
        else if (Acc.Balance < -2000)
        {
            Console.WriteLine("操作受限!");
        }
    }
}

public class RestrictedState : AccountState
{
    public RestrictedState(AccountState state)
    {
        this.Acc = state.Acc;
    }
    public override void Deposit(double amount)
    {
        Acc.Balance = Acc.Balance + amount;
        StateCheck();
    }
    public override void Withdraw(double amount)
    {
        Console.WriteLine("账户受限,取款失败!");
    }

    public override void ComputeInterest()
    {
        Console.WriteLine("计算利息!");
    }

    public override void StateCheck()
    {
        if (Acc.Balance > 0)
        {
            Acc.SetState(new NormalState(this));
        }
        else if (Acc.Balance > -2000)
        {
            Acc.SetState(new OverdraftState(this));
        }
    }
}

Program:测试代码

Account acc = new Account("段誉", 0.0);
acc.Deposit(1000);
acc.Withdraw(2000);
acc.Deposit(3000);
acc.Withdraw(4000);
acc.Withdraw(1000);
acc.ComputeInterest();
Console.ReadLine();

四、总结

1、优点

(1)状态模式封装了状态的转换规则,在状态模式中可以将状态的转换代码封装在环境类或者具体状态类中,可以对状态转换代码进行集中管理,而不是分散在一个个业务方法中。
(2)状态模式将所有与某个状态有关的行为放到一个类中,只需注入一个不同的状态对象即可使环境对象拥有不同的行为。
(3)状态模式允许状态转换逻辑与状态对象合成一体,而不是提供一个巨大的条件语句块,状态模式可以避免使用庞大的条件语句将业务方法和状态转换代码交织在一起。
(4)状态模式可以让多个环境对象共享一个状态对象,从而减少系统中对象的个数。

2、缺点

(1)状态模式会增加系统中类和对象的个数,导致系统运行开销增大。
(2)其结构与实现都较为复杂,如果使用不当将导致程序结构和代码混乱,增加系统设计的难度。
(3)状态模式对开闭原则的支持并不太好,增加新的状态类需要修改负责状态转换的源代码,否则无法转换到新增状态,而且修改某个状态类的行为也需要修改对应类的源代码。