2024年1月

ZooKeeperServer

实现了单机版zookeeper服务端功能,子类实现了更加丰富的分布式集群功能:

ZooKeeperServer
  |-- QuorumZooKeeperServer
    |-- LeaderZooKeeperServer
    |-- LearnerZooKeeperServer
      |-- FollowerZooKeeperServer
      |-- ObserverZooKeeperServer
  |-- ReadOnlyZooKeeperServer

主要字段

// tickTime参数默认值
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
// 默认tickTime * 2
protected int minSessionTimeout = -1;
// 默认tickTime * 20
protected int maxSessionTimeout = -1;

// 会话跟踪
protected SessionTracker sessionTracker;

// 存储组件
private FileTxnSnapLog txnLogFactory = null;
private ZKDatabase zkDb;

// 缓存数据
private ResponseCache readResponseCache;
private ResponseCache getChildrenResponseCache;

// zxid会在启动阶段设置为最新lastZxid
private final AtomicLong hzxid = new AtomicLong(0);
// 请求处理器链入口
protected RequestProcessor firstProcessor;
// 缓存变化的数据
final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();

protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;

// 大请求判断使用的参数
private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
private volatile int largeRequestThreshold = -1;

主要方法

方法定义

// 通过zkDb从dataTree中删除Watcher监听器
void removeCnxn(ServerCnxn cnxn);

// 创建zkDb(为null时)并loadData加载数据
public void startdata() throws IOException, InterruptedException;
// 加载数据、清理session、生成快照
public void loadData() throws IOException, InterruptedException;
// 保存zkDb当前快照
public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere,
                                      boolean fastForwardFromEdits) throws IOException;

// 从指定的输入流解析数据,生成新的zkDb和SessionTrack
public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException;

// 使用zkDb.truncateLog(zxid)删除快照数据
public void truncateLog(long zxid) throws IOException;
// 通过zkDb获取dataTree.lastProcessedZxid的值
public long getLastProcessedZxid();

// 提交closeSession类型的Request来关闭会话
private void close(long sessionId);
// 使用zkDb杀掉会话
protected void killSession(long sessionId, long zxid);
// 启动组件
private void startupWithServerState(State state);
// 创建RequestProcessor用来处理请求
protected void setupRequestProcessors();
// 创建SessionTracker
protected void createSessionTracker();

// 为指定的session生成一个密码
byte[] generatePasswd(long id);
// 验证session密码
protected boolean checkPasswd(long sessionId, byte[] passwd);
// 使用sessionTracker创建session、生成密码、提交一个createSession请求
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout);
// 为指定的session绑定owner
public void setOwner(long id, Object owner) throws SessionExpiredException;
// 验证session之后使用finishSessionInit方法确定继续通信或者断开连接
protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException;
public void finishSessionInit(ServerCnxn cnxn, boolean valid);
// checkPasswd->revalidateSession->finishSessionInit
public void reopenSession(ServerCnxn cnxn, long sessionId,
                          byte[] passwd, int sessionTimeout) throws IOException;

// 把请求提交给requestThrottler之后再陆续调用submitRequestNow处理
public void enqueueRequest(Request si);
// 使用firstProcessor处理请求
public void submitRequestNow(Request si);

// 处理连接请求,网络IO层调用
public void processConnectRequest(
    ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException;
// 处理业务请求,网络IO层调用
public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException;
// sasl认证
private void processSasl(
    RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException;

// 处理transaction
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn);
public ProcessTxnResult processTxn(Request request);
private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn);
private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest);

// Grant or deny authorization to an operation on a node
public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids,
                     String path, List<ACL> setAcls) throws KeeperException.NoAuthException;
// Check a path whether exceeded the quota
public void checkQuota(String path, byte[] lastData, byte[] data,
                       int type) throws KeeperException.QuotaExceededException;
private void checkQuota(String lastPrefix, long bytesDiff, long countDiff,
                        String namespace) throws KeeperException.QuotaExceededException;

// 获取上级父类path
private String parentPath(String path) throws KeeperException.BadArgumentsException;
// 从Request获取有效的path
private String effectiveACLPath(
    Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException;
// 根据Request获取需要的权限类型
private int effectiveACLPerms(Request request);
// 检查写权限
public boolean authWriteRequest(Request request);

loadData方法

加载数据、清理session、生成快照:

public void loadData() throws IOException, InterruptedException {
    // 初始化zxid
    if (zkDb.isInitialized()) {
        setZxid(zkDb.getDataTreeLastProcessedZxid());
    } else {
        setZxid(zkDb.loadDataBase());
    }

    // 使用killSession方法杀死过期会话
    zkDb.getSessions().stream()
                    .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null)
                    .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid()));

    // 保存快照
    // txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap)
    takeSnapshot();
}

killSession方法

protected void killSession(long sessionId, long zxid) {
    // 需要清理临时节点
    zkDb.killSession(sessionId, zxid);
    if (sessionTracker != null) {
        // 删除会话跟踪信息
        sessionTracker.removeSession(sessionId);
    }
}

startupWithServerState方法

private void startupWithServerState(State state) {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    // 创建RequestProcessor用于处理请求
    setupRequestProcessors();

    // 这是一个限流的组件,不做分析
    startRequestThrottler();

    registerJMX();
    startJvmPauseMonitor();
    registerMetrics();

    setState(state);

    requestPathMetricsCollector.start();

    localSessionEnabled = sessionTracker.isLocalSessionsEnabled();

    notifyAll();
}

setupRequestProcessors方法(重要)

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}

RequestProcessor接口:以处理器链方式处理事务,请求总是按顺序处理。standaloneServer、follower和leader有不同的处理器链。请求通过processRequest方法传递给其他RequestProcessor对象,通常情况总是由单个线程调用。当调用shutdown时,RequestProcessor还应关闭与其关联的其他RequestProcessor对象。

FinalRequestProcessor类:处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾。

SyncRequestProcessor类:将请求记录到磁盘,对请求进行批处理,以有效地执行IO操作。在日志同步到磁盘之前,请求不会传递给下一个RequestProcessor对象。SyncRequestProcessor用于3种不同的情况:

  • Leader - 将请求同步到磁盘,并将其转发给AckRequestProcessor,后者将ack发送回leader自己
  • Follower - 将请求同步到磁盘,并将其转发给SendAckRequestProcessor,后者将ack发送给leader
  • Observer - 将请求同步到磁盘,作为INFORM数据包接收。不将ack发送回leader,因此nextProcessor将为null

PrepRequestProcessor类:通常位于RequestProcessor链开头,为更新请求关联的事务做设置。

createSessionTracker方法

protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime,
                                            createSessionTrackerServerId, getZooKeeperServerListener());
}

不同的子类使用了不同的SessionTracker实现类:

  • LeaderZooKeeperServer - LeaderSessionTracker
  • LearnerZooKeeperServer- LearnerSessionTracker

createSession方法

long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    if (passwd == null) {
        passwd = new byte[0];
    }
    // 创建一个session
    long sessionId = sessionTracker.createSession(timeout);
    // 生成session密码
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    // 提交createSession请求,该请求会被RequestProcessor处理
    CreateSessionTxn txn = new CreateSessionTxn(timeout);
    cnxn.setSessionId(sessionId);
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
    submitRequest(si);
    return sessionId;
}

submitRequestNow方法

public void submitRequestNow(Request si) {
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            setLocalSessionFlag(si);
            // 使用firstProcessor处理请求
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            // Update request accounting/throttling limits
            requestFinished(si);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        // Update request accounting/throttling limits
        requestFinished(si);
    } catch (RequestProcessorException e) {
        // Update request accounting/throttling limits
        requestFinished(si);
    }
}

processConnectRequest方法

public void processConnectRequest(
        ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {

    long sessionId = request.getSessionId();
    // 略

    if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        // zxid参数有误
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }
    int sessionTimeout = request.getTimeOut();
    byte[] passwd = request.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the session is setup
    cnxn.disableRecv();
    if (sessionId == 0) {
        // 创建session
        long id = createSession(cnxn, passwd, sessionTimeout);
    } else {
        validateSession(cnxn, sessionId); // do nothing
        // 关闭旧的ServerCnxn
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        cnxn.setSessionId(sessionId);
        // 开启新session
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    }
}

processPacket方法

public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
    cnxn.incrOutstandingAndCheckThrottle(h);

    if (h.getType() == OpCode.auth) {
        AuthPacket authPacket = request.readRecord(AuthPacket::new);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        // 认证、继续通信或者关闭连接,略
        return;
    } else if (h.getType() == OpCode.sasl) {
        processSasl(request, cnxn, h);
    } else {
        if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
            return;
        } else {
            Request si = new Request(
                cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
            int length = request.limit();
            if (isLargeRequest(length)) { // 判断large请求
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // 提交请求等待firstProcessor处理
            submitRequest(si);
        }
    }
}

processTxn相关方法

// entry point for quorum/Learner.java
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    processTxnForSessionEvents(null, hdr, txn);
    return processTxnInDB(hdr, txn, null);
}

// entry point for FinalRequestProcessor.java
public ProcessTxnResult processTxn(Request request) {
    TxnHeader hdr = request.getHdr();
    processTxnForSessionEvents(request, hdr, request.getTxn());

    final boolean writeRequest = (hdr != null);
    final boolean quorumRequest = request.isQuorum();

    // return fast w/o synchronization when we get a read
    if (!writeRequest && !quorumRequest) {
        return new ProcessTxnResult();
    }
    synchronized (outstandingChanges) {
        ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());

        // request.hdr is set for write requests, which are the only ones
        // that add to outstandingChanges.
        if (writeRequest) {
            long zxid = hdr.getZxid();
            while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) {
                ChangeRecord cr = outstandingChanges.remove();
                ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                if (outstandingChangesForPath.get(cr.path) == cr) {
                    outstandingChangesForPath.remove(cr.path);
                }
            }
        }

        // do not add non quorum packets to the queue.
        if (quorumRequest) {
            getZKDatabase().addCommittedProposal(request);
        }
        return rc;
    }
}

private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
    int opCode = (request == null) ? hdr.getType() : request.type;
    long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;

    if (opCode == OpCode.createSession) {
        if (hdr != null && txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            // Add the session to the local session map or global one in zkDB.
            sessionTracker.commitSession(sessionId, cst.getTimeOut());
        }
    } else if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);
    }
}

private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
    if (hdr == null) {
        return new ProcessTxnResult();
    } else {
        return getZKDatabase().processTxn(hdr, txn, digest);
    }
}

QuorumZooKeeperServer

集群模式下的ZooKeeperServer基类:

  • 封装了QuorumPeer用来获取节点信息
  • 封装了UpgradeableSessionTracker做会话追踪

LeaderZooKeeperServer

Just like the standard ZooKeeperServer. We just replace the request processors: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor

实现类概述

集群模式下leader节点使用的ZooKeeperServer实现类:

  • 继承QuorumZooKeeperServer

  • 使用的RequestProcessor与父类不同:

    // 构建处理器链
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = 
            new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(
            toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    
        setupContainerManager(); // 启动ContainerManager用于删除ttl节点和container节点
    }
    
  • 使用LeaderSessionTracker做会话追踪

  • 与learner节点通信

处理器链

  • FinalRequestProcessor - 处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾

  • ToBeAppliedRequestProcessor - 维护toBeApplied列表

  • CommitProcessor - 等待commit完成之后调用下游RequestProcessor处理器

  • ProposalRequestProcessor - 发起proposal并将Request转发给内部的SyncRequestProcessor和AckRequestProcessor

    public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        // 内部有维护SyncRequestProcessor和AckRequestProcessor
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    
        forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
                FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
    }
    
  • PrepRequestProcessor - 通常位于RequestProcessor链开头,为更新请求关联的事务做设置

  • LeaderRequestProcessor - 负责执行本地会话升级,只有直接提交给leader的Request才能通过这个处理器

LearnerZooKeeperServer

Learner基类:

  • 使用LearnerSessionTracker做会话追踪
  • 使用CommitProcessor、SyncRequestProcessor做处理器链

FollowerZooKeeperServer

实现类概述

与ZooKeeperServer类似,只是处理器链不同:

FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

使用SyncRequestProcessor来记录leader的提案。

处理器链

setupRequestProcessors方法:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
        finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}
  • FinalRequestProcessor
  • CommitProcessor
  • FollowerRequestProcessor - 将数据更新请求转发给Leader
  • SyncRequestProcessor
  • SendAckRequestProcessor - 给leader发ACK

ObserverZooKeeperServer

Observer类型节点的ZooKeeperServer实现。

setupRequestProcessors方法:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
        finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();

    // 默认false
    if (syncRequestProcessorEnabled) {
        syncProcessor = new SyncRequestProcessor(this, null);
        syncProcessor.start();
    }
}

下面介绍三种用于进行排序的专用窗口函数:

1、RANK()

在计算排序时,若存在相同位次,会跳过之后的位次。

例如,有3条排在第1位时,排序为:1,1,1,4······

2、DENSE_RANK()

这就是题目中所用到的函数,在计算排序时,若存在相同位次,不会跳过之后的位次。

例如,有3条排在第1位时,排序为:1,1,1,2······

3、ROW_NUMBER()

这个函数赋予唯一的连续位次。

例如,有3条排在第1位时,排序为:1,2,3,4······

窗口函数用法:

<窗口函数> OVER ( [PARTITION BY <列清单> ]

ORDER BY <排序用列清单> )

*其中[ ]中的内容可以忽略

例如:题目源自牛客网

代码:

selectemp_no, salary,
dense_rank()
over (order by salary desc) ast_rankfromsalarieswhere to_date='9999-01-01' order by t_rank asc,emp_no asc;

本文介绍在
Win10
电脑中,安装
MicroStation软件

Terrasolid插件合集
的详细方法。

首先,我们需要有
MicroStation
软件与
Terrasolid
插件合集的安装包;这些内容在互联网中已经有很多资源,大家自行搜索、下载即可,这里就不再赘述。

第一步,我们需要首先安装
MicroStation
软件;我这里
MicroStation
软件安装包是
V8i
版本的。


MicroStation
软件安装包中,找到
Setup.exe
文件并打开。

弹出如下所示的界面,点击“
Advanced Settings
”选项。

配置好下载路径。

随后,即可开始安装。

此时会进入一个新的安装界面。

其中,需要再次配置一下
MicroStation
软件的安装路径与工作空间路径;这里请大家记好自己所设定的安装路径,后期需要用到。

接下来,选择需要安装的软件类型;一般选择推荐类型,即“
Typical
”即可。

随后,即可开始安装过程。

安装完毕后,点击“
Finish
”。

接下来,将
MicroStation
软件安装包中
Crack
文件夹内的两个
.dll
格式文件复制。

粘贴到前期我们设定的安装路径中,并替换其中的文件。

随后,即可在开始菜单中找到我们刚刚安装好的
MicroStation
软件。

点击“
MicroStation V8i (SELECTseries 3)
”选项,即可打开软件;软件第一次打开后如下所示。

接下来,我们开始安装
Terrasolid
插件合集。在这里,我们就以
Terrasolid插件合集
下属的
TerraScan插件
为例来介绍。

首先需要注意,我们在安装
Terrasolid
插件合集时,要选择和我们前期所安装的
MicroStation
软件的版本号一致。在本文中,前期所安装的
MicroStation
软件为
V8i
版本,所以就要选择对应
V8i
版本的
TerraScan
插件。

打开其中的
SETUP.EXE
文件。

在弹出的窗口中,首先选择第一项安装路径;接下来选择第二项,即
MicroStation
软件的安装路径(一般这一项会自动填入);随后点击“
OK
”即可。

插件的安装会很快完成。

接下来,我们打开
MicroStation
软件,并在弹出的窗口中选择右上角的“
New file
”。

选择一个路径,然后新建一个名称为
seed3D.dgn
的文件。

可以看到文件已经新建完成。接下来,打开这一文件。

打开后如下所示。

选择“
Utilities
”→“
MDL Applications
”。

在弹出的窗口中,从下方的“
Available Applications
”中滑动,找到“
TSCAN
”,并点击“
Load
”。

第一次操作时,会弹出一个提示框,让你配置一下
License
信息。

此时,找到
Terrasolid
插件合集的安装包中的注册机。

双击
.exe
文件。

弹出如下所示的窗口。

接下来,将下图左侧窗口中的“
Computer name
”与“
Computer ID
”分别复制到右侧窗口中“
Computer Name
”与“
Computer ID
”栏中。

随后,将右侧窗口中的下拉框进行调整,选择“
Scan
”一项。

接下来,再将右侧窗口中的“
Code
”复制到左侧窗口的“
Code
”栏,同时将右侧窗口中“
Fenerbahce
”一栏的内容复制到左侧窗口中“
Number
”栏,并在左侧窗口中“
User name
”中输入
FENERBAHCE

从而完成
TerraScan
插件的安装。安装完毕后,出现如下所示的
TerraScan
插件操作栏,即说明安装成功。

Terrasolid
插件合集中的其他插件,都用类似上述的方法进行安装即可。

拙著《Tornado(龙卷风)编程实战》结集付梓,自惟庸陋,略為芹献,积年咳唾,不入方家,聊供诸君一哂。

这本书以异步框架Tornado为基底,透过一个完整的项目阐释异步框架结合Vue.js如何实现一个低成本的前后端分离架构。项目内涉及的知识点涵括:Web3.0、数据库、设计模式落地、算法和数据结构落地、跨境支付、全文检索、Websocket、以及当下时新的ChatGPT和深度学习等技术,最后以K8S部署收尾。

Tornado框架在开发者社区中有一定的知名度和使用率,但确实没有专门为Tornado框架编写的专著,但是私以为Tornado值得地球上任何一座开源奖杯,这本书,献给Tornado,也献给广大的异步编程开发者。

赋酸诗一首:

刘某拙著甫问世,奔走相告友朋间。继往开来结硕果,异步编程谱新篇。

关于写法

关于技术类书籍和文章,其实我一直都有一个很偏颇的观点,即,技术文章是可以带上那么一点儿文学样式的。

出版社有一句老话,说书里每增加一个数学公式,读者就会跑掉一半,虽然有些夸张,但事实确实如此,技术类书籍通常被认为枯燥乏味,这可能是因为它们通常包含大量的技术细节和术语,这些内容对于一般读者来说可能比较晦涩难懂。然而,一些技术书籍通过引入生动的例子、真实世界的案例以及轻松幽默的语气来增加趣味性。例如,Kent Beck的《测试驱动开发》、Joshua Bloch的《Effective Java》以及《Head First设计模式》等书籍都以轻松幽默的方式呈现技术内容。

受技术前辈们的启发,既然技术文章可以轻松幽默,那么可以不可以也体现一点儿文学性?哪怕就那么一丁点儿。

随着技术文化传播平台的深刻改观,数字化、网络化与人工智能的飞速发展,普及性文章和研究性著作的界限其实正在渐渐地模糊,泾渭分明的旧况其实也正在改变。

所以,在撰写本书时,我带了一些文学样式,企图让非技术背景的读者也可以开卷有益,也囿于此,导致技术深度的研究不够,这许是创新风格的一种代价吧。但不能否认的是,人们需要技术类文学的普及,所谓:白雪阳春,雅派难寻同世知音;下里巴人,俗韵易与当代同频。人们的智慧也是与时俱进的,完全能够理解维特根斯坦:人只有通过对语言的使用,才能赋予技术以实在的意义。

也许这是是一次勇气非凡地探索,又或许是一次并不成功的赌博,但也无所谓了,总要有人去试一试。

表达欲和创作

有的时候,我觉得自己写的东西像臭大粪,比大多数人清晨在马桶上憋出来的东西还臭,臭不可闻。但我依然把这样的东西发布了出来,为什么?因为冯唐老师最近分享的九字真言启发了我:不着急,不害怕,不要脸。

术业有专攻,闻道有先后,虽然有人会发现作者的能力或格调和标准路径有差距,并且予以批评,但那又怎样呢,不要害怕失败,因为根本不会有人记得你失败的作品,甚至那些平庸的作品也不会有人在意,人们最多只会记得那些最出彩的华章。所以尽管尝试,大胆发布,这波血赚,怎么样都不会亏的。

但东西发布了,又会面临另外一种情况:没人看。

无人阅读的作品,有意义吗?直到我翻开了毛姆的一本书:《月亮与六便士》。

书里的主人公查尔斯·斯特里·克兰德,一个中年英国股票经纪人,四十岁时,他突然抛妻弃子,去法国巴黎学习绘画。

自此之后,他潦倒、颓废、背叛朋友、始乱终弃、四处流浪、离群索居。

最后死在了一个荒岛上的小黑屋子里。

他在那座小屋子里画下一幅壁画:

库特拉斯医生的眼睛已经完全适应屋里的朦胧光线,他凝视墙上的画,心里有一种无法抑制的感情。他对绘画不在行,但墙上的这些画让他激动。四面墙壁、从地板到天花板上,展开一幅奇特的、精心绘制的巨画,非常奇妙,也非常神秘。库特拉斯医生几乎停止呼吸,心里浮起一种无法理解、无法分析的感情。如果能比较,也许一个人看到开天辟地之初就是怀着这种欣喜又畏服的感觉的。这幅画具有压人的气势,它既是肉欲的,又充满无限热情。与此同时它还有一种令人恐惧的成分,叫人看着心惊肉跳。绘制这幅巨作的人已经深入到大自然的隐秘中,探索到某种既美丽、又可怕的秘密。这个人知道了一般人所不该知道的事。他画出来的是某种原始的、令人震骇的东西,是不属于人世尘寰的。库特拉斯医生模模糊糊地联想到黑色魔法,既美得惊人,又污秽邪恶。 “上帝啊,这是天才。” 这句话脱口而出,说出来以后他才意识到自己下了一个评语。

如此的神迹,不在敦煌莫高窟,不在山西芮城的永乐宫,而在荒岛之上、人迹罕至的小屋中。

在恢宏壮丽的壁画下,查尔斯·斯特里·克兰德已经死了。

他的遗言是把房子烧掉。房子连壁画,一起烧掉。

表达欲就像残忍的魔鬼,有的时候你会被它抓住。

你别无选择,身不由己,必须把这样的欲望喷薄在键盘上。

能不能发表、有没有人看,还重要吗?

归根结底,没什么是不朽的,我们终将化为粉尘,归彼大荒,但有些事情必须得去做,什么也不图,却非这么做不可。王小波说,双目失明的汉弥尔顿为什么还坐在黑灯瞎火里头写十四行诗?那就叫“表达”。

结语

一个人能坚持做一件事,要么是兴趣使然,要么是身不由己。和诸君共勉。

你好呀,我是歪歪。

这篇文章来盘一下我最近遇到的两个有意思的代码案例,有意思的点在于,拿到代码后,你一眼望去,没有任何毛病。然后一顿分析,会发现破绽藏的还比较的深。

几个基础招式的一套组合拳下来,直接把我打懵逼了。

你也来看看,是不是你跺你也麻。


第一个场景

首先第一个是这样的:

一个读者给我发来的一个关于线程池使用的疑问,同时附上了一个可以复现问题的 Demo。

我打开 Demo 一看,一共就这几行代码,结合问题描述来看想着应该不是啥复杂的问题:

我拿过来 Demo,根本就没看代码,直接扔到 IDEA 里面跑了两次,想着是先看看具体报错是什么,然后再去分析代码。

但是两次程序都正常结束了。

好吧,既然没有异常,我也大概的瞅了一眼 Demo,重点关注在了 CountDownLatch 的用法上。

我是横看竖看也没看出问题,因为我一直都是这样用的,这就是正确的用法啊。

于是从拿到 Demo 到定位问题,不到两分钟,我直接得出了一个大胆的结论,那就是:常规用法,没有问题:

然后我们就结束了这次对话。

过了一会,我准备关闭 IDEA 了。鬼使神差的,我又点了一次运行。

你猜怎么着?

居然真的报错了,抛出了 rejectedExecution 异常,意思是线程池满了。

哦哟,这就有点意思了。

带大家一起盘一盘。

首先我们还是过一下代码,为了减少干扰项,便于理解,我把他给我的 Demo 稍微简化了一点,但是整体逻辑没有发生任何变化。

简化后的完整代码是这样的,你直接粘过去,引入一个 guava 的包就能跑:

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Test {

    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(64, 64, 0, TimeUnit.MINUTES, new ArrayBlockingQueue<>(32));

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 400; i++) {
            list.add(i);
        }
        for (int i = 0; i < 100; i++) {
            List<List<Integer>> sublist = Lists.partition(list, 400 / 32);
            int n = sublist.size();
            CountDownLatch countDownLatch = new CountDownLatch(n);
            for (int j = 0; j < n; j++) {
                threadPoolExecutor.execute(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("===============>  详情任务 - 任务处理完成");
        }
        System.out.println("都执行完成了");
    }
}
/**
 * <dependency>
 *     <groupId>com.google.guava</groupId>
 *     <artifactId>guava</artifactId>
 *     <version>31.1-jre</version>
 * </dependency>
 */

一起分析一波代码啊。

首先定义了一个线程池:

private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(64, 64, 0, TimeUnit.MINUTES, new ArrayBlockingQueue<>(32));

该线程池核心大小数和最大线程数都是 64,队列长度为 32,也就是说这个线程池同时能容纳的任务数是 64+32=96。

main 方法里面是这样的:

在实际代码中,肯定是有具体的业务含义的,这里为了脱敏,就用 List 来表示一下,这个点你知道就行。

编号为 ① 的地方,是在给往 list 里面放 400 个数据,你可以认为是 400 个任务。

编号为 ② 的地方,这个 List 是 guava 的 List,含义是把 400 个任务拆分开,每一组有 400/32=12.5 个任务,向下取整,就是 12 个。

具体是什么个意思呢,我给你看一下 debug 的截图你就知道了:

400 个任务分组,每一组 12 个任务,那就可以拆出来 34 组,最后一组只有 4 个任务:

但是这都不重要,一点都不重要好吧。

因为后续他根本就没有用这个 list ,只是用到了 size 的大小,即 34 。

所以你甚至还能拿到一个更加简洁的代码:

为什么我最开始的时候不直接给你这个最简化的代码,甚至还让你多引入一个包呢?

因为歪师傅就是想体现这个简化代码的过程。

按照我写文章的经验,在定位问题的时候,一定要尽量多的减少干扰项。排除干扰项的过程,也是梳理问题的过程,很多问题在排除干扰项的时候,就逐渐的能摸清楚大概是怎么回事儿。

如果你遇到一个让你摸不着头脑的问题,那就先从排除干扰项做起。

好了,说回我们的代码。现在我们的代码就只有这几行了,核心逻辑就是我圈起来的这个方法:

而圈起来这个部分,主要是线程池结合 CountDownLatch 的使用。

对于 CountDownLatch 我一般只关注两个地方。

第一个是 new 的时候传入的“令牌数”和调用 countDown 方法的次数能不能匹配上。只有保持一致,程序才能正常运行。

第二个地方就是 countDown 方法的调用是不是在 finally 方法里面。

这两个点,在 Demo 中都是正确的。

所以现在从程序分析不出来问题,我们怎么办?

那就从异常信息往回推算。

我们的异常信息是什么?

触发了线程池拒绝策略:

什么时候会出现线程池拒绝策略呢?

核心线程数用完了,队列满了,最大线程数也用完了的时候。

但是按理来说,由于有 countDownLatch.await() 的存在,在执行完 for 循环中的 34 次 countDownLatch.countDown() 方法之前,主线程一定是阻塞等待的。

而 countDownLatch.countDown() 方法在 finally 方法中调用,如果主线程继续运行,执行外层的 for 循环,放新的任务进来,那说明线程池里面的任务也一定执行完成了。

线程池里面的任务执行完成了,那么核心线程就一定会释放出来等着接受下一波循环的任务。

这样捋下来,感觉还是没毛病啊?

除非线程池里面的任务执行完成了,核心线程就一定会释放出来等着接受下一波循环的任务,
但是不会立马释放出来。

什么意思呢?

就是当一个核心线程执行完成任务之后,到它进入下一次可以开始处理任务的状态之间,有时间差。

而由于这个时间差的存在,导致第一波的核心线程虽然全部执行完成了 countDownLatch.countDown(),让主线程继续运行下去。 但是,在线程池中还有少量线程未再次进入“可以处理任务”的状态,还在进行一些收尾的工作。

从而导致,第二波任务进来的时候,需要开启新的核心线程数来执行。

放进来的任务速度,快于核心线程的“收尾工作”的时间,最终导致线程池满了,触发拒绝策略。

需要说明的是,这个原因都是基于我个人的猜想和推测。这个结论不一定真的正确,但是伟人曾经说过:大胆假设,小心求证。

所以,为了证明这个猜想,我需要找到实锤证据。

从哪里找实锤呢?

源码之下,无秘密。

当我有了这个猜想之后,我立马就想到了线程池的这个方法:

java.util.concurrent.ThreadPoolExecutor#runWorker

标号为 ① 的地方是执行线程 run 方法,也就是这一行代码执行完成之后,一个任务就算是执行完成了。对应到我们的 Demo 也就是这部分执行完成了:

这部分执行完成了,countDownLatch.countDown() 方法也执行完成了。

但是这个核心线程还没跑完呢,它还要继续往下走,执行标号为 ② 和 ③ 处的收尾工作。

在核心线程执行“收尾工作”时,主线程又咔咔就跑起来了,下一波任务就扔进来了。

这不就是时间差吗?

另外,我再问一个问题:线程池里面的一个线程是什么时候处于“来吧,哥们,我可以处理任务了”的状态的?

是不是要执行到红框框着的这个地方 WAITING 着:

java.util.concurrent.ThreadPoolExecutor#getTask

那在执行到这个红框框之前,还有一大坨代码呢,它们不是收尾工作,属于“就绪准备工作”。

现在我们再捋一捋啊。

线程池里面的一个线程在执行完成任务之后,到下一次可以执行任务的状态之间,有一个“收尾工作”和“就绪准备工作”,这两个工作都是非常快就可以执行完成的。

但是这“两个工作”和“主线程继续往线程池里面扔任务的动作”之间,没有先后逻辑控制。

从程序上讲,这是两个独立的线程逻辑,谁先谁后,都有可能。

如果“两个工作”先完成,那么后面扔进来的任务一定是可以复用线程的,不会触发新开线程的逻辑,也就不会触发拒绝策略。

如果“主线程继续往线程池里面扔任务的动作”先完成,那么就会先开启新线程,从而有可能触发拒绝策略。

所以最终的执行结果可能是不报错,也可能是抛出异常。

同时也回答了这个问题:为什么提高线程池的队列长度,就不抛出异常了?

因为队列长度越长,核心线程数不够的时候,任务大不了在队列里面堆着。而且只会堆一小会儿,但是这一小会,给了核心线程足够的时间去完成“两个工作”,然后就能开始消耗队列里面的任务。

另外,提出问题的小伙伴说换成 tomcat 的线程池就不会被拒绝了:

也是同理,因为 tomcat 的线程池重写了拒绝策略,一个任务被拒绝之后会进行重试,尝试把任务仍回到队列中去,重试是有可能会成功的。

对应的源码是这个部分:

org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable, long, java.util.concurrent.TimeUnit)

这就是我从源码中找到的实锤。

但是我觉得锤的还不够死,我得想办法让这个问题必现一下。

怎么弄呢?

如果要让问题必现,那么就是延长“核心线程完成两个工作”的时间,让主线程扔任务的动作”的动作先于它完成。

很简单,看这里,afterExecute 方法:

线程池给你留了一个统计数据的口子,我们就可以基于这个口子搞事情嘛,比如睡一下下:

private static final ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(64, 64, 0, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(32)) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };

由于收尾任务的时间过长,这样“主线程扔任务的动作”有极大概率的是先执行的,导致触发拒绝策略:

到这里,这个问题其实就算是分析完成了。

但是我还想分享一个我在验证过程中的一个验证思路,虽然这个思路最终并没有得到我想要的结论,但是技多不压身,你抽空学学,以后万一用得上呢。

前面说了,在我重写了 afterExecute 方法之后,一定会触发拒绝策略。

那么我在触发拒绝策略的时候,dump 一把线程,通过 dump 文件观察线程状态,是不是就可以看到线程池里面的线程,可能还在 RUNNING 状态,但是是在执行“两个工作”呢?

于是就有了这样的代码:

我自定义了一个拒绝策略,在触发拒绝策略的时候,dump 一把线程池:

但是很不幸,最终 dump 出来的结果并不是我期望的,线程池里面的线程,不是在 TIMED_WAITING 状态就是在 WAITING 状态,没有一个是 RUNNING 的。

为什么?

很简单,因为在触发拒绝策略之后,dump 完成之前,这之间代码执行的时间,完全够线程池里面的线程完成“两个工作”。

虽然你 dump 了,但是还是晚了一点。

这一点,可以通过在 dump 前面输出一点日志进行观察验证:

虽然我没有通过 dump 文件验证到我的观点,但是你可以学习一下这个手段。

在正常的业务逻辑中触发拒绝策略的时候,可以 dump 一把,方便你分析。

那么问题就来了?

怎么去 dump 呢?

关键代码就这一行:

JVMUtil.jstack(jStackStream);

这个方法其实是 Dubbo 里面的一个工具,我只是引用了一下 Dubbo 的包:

但是你完全可以把这个工具类粘出去,粘到你的项目中去。

你的代码很好,现在它是我的了。

最后,我还是必须要再补充一句:

以上从问题的定位到问题的复现,都是基于我个人的分析,从猜测出发,最终进行验证的。有可能我猜错了,那么整个论证过程可能都是错的。你可以把 Demo 粘过去跑一跑,带着怀疑一切的眼光去审视它,如果你有不同的看法,可以告诉我,我学习一下。

最后,你想想整个过程。

拆开了看,无非是线程池和 CountDownLatch 的八股文的考察,这两个玩意都是面试热点考察部分,大家应该都背的滚瓜烂熟。

在实际工作中,这两个东西碰撞在一起也是经常有的写法,但是没想到的是,在套上一层简单的 for 循环之后,完全就变成了一个复杂的问题了。

这玩意着实是把我打懵逼了。以后把 CountDownLatch 放在 for 循环里面的场景,都需要多多注意一下了。


第二个场景

这个场景就简单很多了。

当时有个小伙伴在群里扔了一个截图:

需要注意的是, if(!lock) 他截图的时候是给错了,真实的写法是 if(lock),lock 为 true 的时候就是加锁成功,进入 if。

同时这个代码这一行是有事务的:

写一个对应的伪代码是这样的:

if(加锁成功){
    try{
        //save有事务注解,并且确认调用的service对象是被代理的对象,即事务的写法一定是正确的
        return service.save();
    } catch(Exception e){
        //异常打印   
    } finally {
        //释放锁
        unlock(lockKey);
    }
}

就上面这个写法,先加锁,再开启事务,执行事务方法,接着提交事务,最后解锁,反正歪师傅横看竖看是没有发现有任何毛病的。

但是提供截图的小伙伴是这样描述的。

当他是这样写的时候,从结果来看,程序是先加锁,再开启事务,执行事务方法,然后解锁,最后才提交事务:

当时我就觉得:这现象完全超出了我的认知,绝不可能。

紧接着他提供了第二张截图:

他说这样拆开写的时候,事务就能正常生效了:

这两个写法的唯一区别就是一个是直接 return,一个是先返回了一个 resultModel 然后在 return。

在实际效果上,我认为是没有任何差异的。

但是他说这样写会导致锁释放的时机不一样。

我还是觉得:

然而突然有人冒出来说了一句: try 带着 finally 的时候,在执行 return 语句之前会先执行 finally 里面的逻辑。会不会是这个原因导致的呢?

按照这个逻辑推,先执行了 finally 里面的释放锁逻辑,再执行了 return 语句对应的表达式,也就是事务的方法。那么确实是会导致锁释放在事务执行之前。

就是这句话直接给我干懵逼了,CPU 都快烧了,感觉哪里不对,又说不上来为什么。

虽然很反直觉,但是我也记得八股文就是这样写的啊,于是我开始觉得有点意思了。

所以我搞了一个 Demo,准备本地复现一下。

当时想着,如果能复现,这可是一个违背直觉的巨坑啊,是一个很好的写作素材。

可惜,没有复现:

最后这个哥们也重新去定位了原因,发现是其他的 BUG 导致的。

另外,关于前面“try 带着 finally”的说法其实说的并不严谨,应该是当 try 中带有 return 时,会先执行 return 前的代码,然后把需要 return 的信息暂存起来,接着再执行 finally 中的代码,最后再通过 return 返回之前保存的信息。

这才是写在八股文里面的正确答案。

要永远牢记另一位伟人说过:实践是检验真理的唯一标准。

遇事不决,就搞个 Demo 跑跑。

关于这个场景,其实也很简单,拆开来看就是关于事务和锁碰撞在一起时的注意事项以及 try-return-finally 的执行顺序这两个基础八股而已。

但是当着两个糅在一起的时候,确实有那么几个瞬间让我眼前一黑,又打得我一脸懵逼。

最后,事务和锁碰撞在一起的情况,上个伪代码:

@Service
public class ServiceOne{
    // 设置一把可重入的公平锁
    private Lock lock = new ReentrantLock(true);
    
    @Transactional(rollbackFor = Exception.class)
    public Result  func(long seckillId, long userId) {
        lock.lock();
        // 执行数据库操作——查询商品库存数量
        // 如果 库存数量 满足要求 执行数据库操作——减少库存数量——模拟卖出货物操作
        lock.unlock();
    }
}

如果你五秒钟没看出这个代码的问题,秒杀这个问题的话,那歪师傅推荐你个假粉丝看看这篇文章::
《几行烂代码,我赔了16万。》

好了,就酱,打完收工~