2025年1月

2024即将过去,回顾这一年应该是人生中的重大转折之年,从服务11年公司退役,从青年进入了尴尬的中年,网上有很多声音抱怨“中年男人不如狗”,上有老下有小的年纪。幸好几年前遇到挫折看了一些哲学,接受一切发生的事务,好好享受低谷期,感谢曾老先生的智慧!

2024回顾

一、6月份从退役后,跟家里人申请了半年的时间,因为有几个意向项目一直跟踪,顺便将未来社区的公众号运营起来;

二、7-8月
物联网浏览器
增加了AI视觉识别相关的功能《
探讨使用智能AI在农业养殖中的风险预警与应用
》,后续几乎停滞了更新,主要有两个原因,一是谈了几个月的电力监测系统在10月收到通知被中止了,原本是打算用项目来继续优化产品的。二是驿站开业的前面2个月,每天累到倒头就睡,完全没有精力。好在博客园上认识的朋友提供了一些插件开发机会,后端是java前端使用html5开发的无人值守过磅称重系统《
物联网浏览器(IoTBrowser)-Java快速对接施耐德网络IO网关
》,这次主要是开发了几款新的地磅秤驱动,感谢园友的支持

三、9月份在小区开了一家菜鸟驿站,解决了一年多来快递丢失、找快递难的问题,主要是给未来社区寻找一个运营载体。经过几个月的摸索,驿站已经正常运转,驿站最大的好处不是赚多少钱,而是治好了多年的颈椎病,体重瘦了10KG;当然未来社区的样板也在同时期推广运营起来了,成功上线了几个小功能,比如快递查询、预约配送、会员打折等等。

2024年最大的收获是看到了未来的方向,通过菜鸟驿站认识了远方好物平台,远方好物让我再次燃起了对农业、食品安全的热情,完成了我多年前想做又没有实现愿望“为解决中国农产品低价滞销,解决小孩子吃的健康,解决有机农人农产品卖不出去”。

2025计划

一、2025年,借助远方好物平台的思路,将私域电商与未来社区进行结合,在每个小区寻找关心粮食与安全的人,共同服务更多城市居民,这是一个长期值得投入的方向,欢迎同频的兄弟们一起交流学习。

二、承接软件系统开发业务,尤其是物联网、工控系统方向的软硬件系统集成与开发。有下列场景欢迎与我联系

物联网浏览器(IoTBrowser)是用于开发人机界面(HMI)或数据采集与监督控制系统(SCADA) 的工具,使用HTML或Vue前端技术开发物联网终端用户界面,支持串口、RFID、电子秤等硬件协议,支持js控制关机、全屏等工控操作。详细介绍:
物联网浏览器(IoTBrowser)-简单介绍 - 木子清 - 博客园 (cnblogs.com)

项目开源地址:

https://gitee.com/yizhuqing/IoTBrowser

三、2025年变局元年,未来的路充满挑战与期待,希望未来的十年做点自己想做又不给社会带来麻烦的事情,将所学之术让生活更美好一点。

CountDownLatch是一个常用的共享锁,其功能相当于一个多线程环境下的倒数门闩。CountDownLatch可以指定一个计数值,在并发环境下由线程进行减一操作,当计数值变为0之后,被await方法阻塞的线程将会唤醒。通过CountDownLatch可以实现线程间的计数同步。

为什么说CountDownLatch是一种共享锁?因为CountDownLatch提供了一种“计数器”机制,允许N个线程在CountDownLatch的协调下同时运行,计数器归零才会触发阻塞的线程继续执行后续代码。

可能还是有人无法明白“共享”两个字,举个例子:如果计数器初始值是10,那开始的时候会有十个线程自动获取到共享锁,由于计数器的值是10,所以业务上如果有更多的线程想要获取锁,就要等待(主线程await);十个线程调用countDown方法之后锁才会释放,这时候阻塞的主线程才能获取到共享锁。

一、基本使用

CountDownLatch的使用步骤如下所示:

(1)创建倒数闩,初始化CountDownLatch时设置倒数的总次数,比如为100。

(2)等待线程调用倒数闩的await()方法阻塞自己,等待倒数闩的计数器数值为0(倒数线程全部执行结束)。

(3)倒数线程执行完,调用CountDownLatch.countDown()方法将计数器数值减一。

现在我们定义10个线程,十个线程都运行任务结束后,打印“Hello,World”。

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author kdyzm
 * @date 2024/12/28
 */
@Slf4j
public class CountdownLatchDemo {

    private static final CountDownLatch countDownLatch = new CountDownLatch(10);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(), "" + i).start();
        }
        countDownLatch.await();
        log.info("Hello,World");

    }

    static class Task implements Runnable {
        @SneakyThrows
        @Override
        public void run() {
            log.info("我是线程【{}】,即将完成任务", Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(1);
            countDownLatch.countDown();
        }
    }
}

代码运行结果如下:

image-20241228145230060

CountDownLatch很神奇,竟然能同步多个线程的执行,让多个线程都执行完才进行下一步行动,那么它实现的原理是什么呢?

搂一眼CountDownLatch类:

image-20241230222815271

这代码可太熟悉了,和ReentrantLock非常相似啊。。。内部也是定义了Sync类继承了AQS类,大体上可以明白了,CountDownLatch类也是基于AQS类实现的。

CountDownLatch有两个重要的方法:await方法和countDown方法。

二、CountDownLatch实例的创建

CountDownLatch构造方法需要传递一个整数

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

在构造方法中初始化了Sync类的实例。

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        //使用传递的count参数初始化state数值
        setState(count);
    }
    ......
}

可以看到,在Sync类的构造方法中,会使用传入的count参数初始化state参数。在ReetrantLock类中,state参数代表着重入锁的重入次数,在CountDownLatch中将其设置成count数值有什么意义呢?

实际上在CountDownLatch中state参数表示可以执行countDown方法的次数。

二、await方法

一般而言,await方法是先开始执行的方法,然后迅速进入阻塞状态,等待其它线程计数到0,就会触发唤醒,然后主线程执行await方法之后的代码。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await方法很简单,它的逻辑都委托给Sync类实例来实现了,但是实际上这里的acquireSharedInterruptibly方法调用是AQS的模板方法。

1、可中断获取共享锁:acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //查看计数器是否归零
    if (tryAcquireShared(arg) < 0)
        //如果没归零,就开始执行可中断获取共享锁流程
        doAcquireSharedInterruptibly(arg);
}

这个方法非常简单,这个方法是一个可中断的方法,可中断意味着它可以抛出InterruptedException异常。所以上来它先检查了当前线程是否发生了中断,如果发生了中断,就立刻抛出InterruptedException异常。

接下来调用了tryAcquireShared方法查看计数器是否归零

2、查看计数器是否归零:tryAcquireShared

tryAcquireShared方法是AQS的钩子方法,被CountDownLatch实现,用于检查计数器是否变成了0。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

可以看到,传过来的acquires参数并没有被用到,实际上该方法就是检查了state参数是否变成了0。

这里其实有个疑问,根据ReentrantLock的经验,tryAcquireShared是尝试获取锁的方法,但是它仅仅只是判断了state的值是否为0,这是为何?

3、执行获取共享锁:doAcquireSharedInterruptibly

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
     //将节点标记为共享类型,并加入AQS同步队列。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //获取节点的前置节点
            final Node p = node.predecessor();
            //判断是否是头结点,是头结点就表示自己可以尝试获取锁了。
            if (p == head) {
                //尝试获取共享锁,实际上是检查了state参数是否为0
                int r = tryAcquireShared(arg);
                //r>=0的条件只有一个,那就是r为1,表示state值已经变成了0
                if (r >= 0) {
                    //执行获取锁成功的流程。
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //检查是否应该在失败后挂起。
            if (shouldParkAfterFailedAcquire(p, node) &&
                //挂起线程
                parkAndCheckInterrupt())
                //如果发生了中断,就抛出IE异常。
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在这里,AQS队列的规则和ReentrantLock中AQS队列一样,头部节点表示已经获取锁的节点,一旦被唤醒,头部节点的后续节点就可以抢锁了。

CountDownLatch的抢锁逻辑比较简单,只是检查下state参数是否已经变成了0,变成了0,就表示其余线程都执行完了countDown方法,计数器已经变成了0,当前线程需要释放了;没变成0,就表示不是所有线程都执行了countDown方法,还不满足获取锁的条件。

不满足获取锁的条件的话,就开始判定是否需要挂起线程,执行
shouldParkAfterFailedAcquire
方法,该方法会被执行两次,然后执行
parkAndCheckInterrupt
方法,将当前主线程挂起。

主线程挂起之后,就开始等待其余线程执行执行countDown方法之后唤醒它。

4、获取锁成功之后:setHeadAndPropagate

执行到
setHeadAndPropagate
方法,就表示当前线程已经获取到了锁,执行到该方法的时候,计数器必定已经变成了0,parkAndCheckInterrupt()方法原本是被阻塞的现在已经被释放。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    //propagate值为1,满足if条件
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //AQS队列中只有node一个等待节点,所以next必定为null,满足if条件
        if (s == null || s.isShared())
            //后续没有等待节点了,执行锁释放流程
            doReleaseShared();
    }
}

该方法中有两处if,有很多判断根本用不到,大概是给其它共享锁使用的。总之CountDownLatch的await方法执行到这里之后会执行doReleaseShared方法执行锁释放,因为后续已经没有节点了。

5、锁释放:doReleaseShared

await方法中刚获取到锁就开始执行锁释放了,这是因为AQS中等待队列中就一个节点,该节点后续没有了等待节点,就需要释放锁。

rivate void doReleaseShared() {
    for (;;) {
        Node h = head;
        //此时AQS队列就一个head节点,head == tail,不满足if条件
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        //满足if条件,结束循环
        if (h == head)                  
            break;
    }
}

CountDownLatch的await方法中执行到doReleaseShared的时候是有附件条件的:AQS队列中只有一个头结点,没有等待节点了,所以第一个if条件没有满足条件,第二个if条件满足了,所以结束了循环。

doReleaseShared方法什么都没做。

这样await方法结束后,主线程继续执行await方法后的代码。

三、countDown方法

通常情况下,await方法会先执行,并在doAcquireSharedInterruptibly方法中的parkAndCheckInterrupt处阻塞,countDown方法在其它线程中被调用,并在计数器归零时唤醒被阻塞在await方法的线程。

public void countDown() {
    sync.releaseShared(1);
}

countDown方法依然调用了sync的方法:releaseShared方法是AQS的模板方法。

1、释放共享锁:releaseShared

public final boolean releaseShared(int arg) {
    //尝试释放锁,计数器减一
    if (tryReleaseShared(arg)) {
        //释放共享锁
        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared是AQS的模板方法,该方法会调用钩子方法tryReleaseShared方法执行计数器减一,如果满足了释放共享锁的条件,就执行doRelease方法释放共享锁。

比如计数器初始值是10,则前九个线程调用countDown方法都不会满足if条件,第十个线程调用countDown方法时,调用tryReleaseShared方法之后会满足if条件,然后执行doReleaseShared方法。

2、计数器减一:tryReleaseShared

tryReleaseShared是AQS的钩子方法,在CountDownLatch中,被Sync实现了该方法。

protected boolean tryReleaseShared(int releases) {
    //循环防止之后的CAS失败
    for (;;) {
        int c = getState();
        //如果计数器的值已经是0,就返回失败,这里是为了防止出现超出计数器初始值的线程数调用countDown方法
        if (c == 0)
            return false;
        //计数器减一
        int nextc = c-1;
        //CAS设置state值
        if (compareAndSetState(c, nextc))
            /**
             * 如果设置成功了,再次判断计数器是否为0:
             *   如果是0表示当前操作使得计数器归零,应当触发释放共享锁的操作
             *	 如果不是0表示计数器还未归零,不到释放共享锁的时机
             */
            return nextc == 0;
    }
}

由于多个线程可能会同时执行countDown方法,可能会并行修改state值,所以该方法使用了for循环+CAS的方式保证了线程安全性。

需要注意的唯一一点就是

 if (c == 0)
	return false;

这个判断语句,该判断语句用于防止出现超出计数器初始值的线程数调用countDown方法,当超出计数器初始值的线程调用countDown方法,会因为计数器已经归零而导致tryReleaseShared方法返回false。

3、释放锁:doReleaseShared

在众多执行countDown方法的线程中,只有将计数器置为零的那个线程有执行doReleaseShared方法的机会。

doReleaseShared方法在之前await方法中已经讲过一部分,在await方法中如果当前节点是最后一个节点,那最后将调用doRelease方法释放锁。为什么这里会释放锁?因为执行countDown方法的线程都是获取到锁的线程,执行完毕之后要调用countDown方法释放锁,当然还是那句话,只有将计数器置为零的那个线程有执行doReleaseShared方法的机会。

和await方法中调用doReleaseShared方法什么都没做不一样,在countDown方法中调用doReleaseShared执行逻辑就变得不一样了。一般来说,由于await方法会先于countDown方法执行,所以在执行countDown方法的时候,AQS队列中会有两个节点,一个是头结点(AQS初始化创建的虚拟节点),一个是等待队列中的主线程节点。

private void doReleaseShared() {
    for (;;) {
        //获取头结点,并暂存到h变量
        Node h = head;
        //当前AQS队列中有两个节点,满足if条件
        if (h != null && h != tail) {
            //获取到头结点状态
            int ws = h.waitStatus;
            //由于await方法已经将头结点状态更改成了SIGNAL,所以这里会满足if条件
            if (ws == Node.SIGNAL) {
                //将头结点状态改为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;   
                //唤醒后继结点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //一般情况下,执行countDown方法的线程执行速度比较快,会满足该if条件并结束循环
        if (h == head)               
            break;
    }
}

可以看到将计数器归零的countDown方法调用的线程,最后会在doReleaseShared方法中调用unparkSuccessor方法唤醒后继线程。后继线程实际上就是await方法被阻塞的主线程,它将会在doAcquireSharedInterruptibly方法中的parkAndCheckInterrupt方法处被唤醒后继续执行后续代码。

四、扩展问题

1、countDown方法先于await方法执行

一般情况下await方法会先执行,执行后线程被挂起阻塞;countDown方法后执行,最后将计数器归零的线程负责将调用await方法被挂起的线程唤醒。想一个问题:如果cuontDown方法先都执行完了,最后再执行await方法,会发生什么呢?

答案是:什么都不会发生,因为这就是正确的逻辑啊。await方法和countDown方法均为此做了防范措施。

countDown方法:

最后将计数器清零的线程会将被阻塞的线程唤醒,但是如果说await方法没有被调用,则AQS队列中就没有需要被唤醒的线程,doReleaseShared方法会如何做呢?

private void doReleaseShared() {
    for (;;) {
        //由于AQS队列为空,所以head为null
        Node h = head;
        //head为null,不满足if条件
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;           
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        //h == head == null,满足if条件,结束循环
        if (h == head)                  
            break;
    }
}

doReleaseShared方法此时什么都没做,由于没有满足触发条件,因此都没有机会执行
unparkSuccessor(h);
唤醒后继节点。

await方法:

await方法中调用了
acquireShardInterruptibly
方法,该方法在执行的过程中会先判定当前计数器的值,如果是0,就不会再继续执行
doAcquireSharedInterruptibly
方法,避免了线程被挂起无人唤醒的尴尬局面。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //由于计数器的值已经归零,所以这里没有满足if条件
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

2、超出计数器初始值的线程数执行countDown方法

如下代码所示

@Slf4j
public class CountdownLatchDemo {

    //计数器初始值是1
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        //10个线程执行了countDown方法
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(), "" + i).start();
        }
        countDownLatch.await();
        log.info("Hello,World");

    }

    static class Task implements Runnable {
        @SneakyThrows
        @Override
        public void run() {
            log.info("我是线程【{}】,即将完成任务", Thread.currentThread().getName());
            //等待一秒钟模拟业务执行,确保await方法先执行
            TimeUnit.SECONDS.sleep(1);
            countDownLatch.countDown();
        }
    }
}

CountDownLatch对多余执行countDown方法的线程的处理策略就是:忽略它。看看countDown方法中调用的releaseShared方法

//该方法为AQS的模板方法
public final boolean releaseShared(int arg) {
    //tryReleaseShared方法是关键
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

//该方法文为CountDownLatch中实现AQS的钩子方法
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        //如果计数器值已经变成了0,就直接返回false,防止超量的线程执行countDown方法。
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

releaseShared方法仅仅是通过判断一次

 if (c == 0)
    return false;

就解决了超量线程调用countDown方法的问题。

3、CountDownLatch使用进阶

本篇文章通篇均是以以下代码示例分析的:

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author kdyzm
 * @date 2024/12/28
 */
@Slf4j
public class CountdownLatchDemo {

    private static final CountDownLatch countDownLatch = new CountDownLatch(10);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(), "" + i).start();
        }
        countDownLatch.await();
        log.info("Hello,World");

    }

    static class Task implements Runnable {
        @SneakyThrows
        @Override
        public void run() {
            log.info("我是线程【{}】,即将完成任务", Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(1);
            countDownLatch.countDown();
        }
    }
}

这种CountDownLatch的使用方式比较简单,实际上CountDownLatch还有另外一种经典的使用方式,其源码示例如下所示

import java.util.concurrent.CountDownLatch;

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();//线程刚开始执行全部进入阻塞状态
        doSomethingElse();            // 先执行一些动作
        startSignal.countDown();      // 让所有阻塞的线程开始执行
        doSomethingElse();
        doneSignal.await();           // 等待所有线程执行完毕
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

这种模式巧妙的使用了双CountDownLatch实例精准控制线程的开始运行时间,并在开始执行前、执行中间、执行之后插入一些自定义的方法执行。实际上使用了CountDownLatch的两种使用模式:

模式一:N次countDown方法调用,一次await方法调用
:这种模式用于等待所有线程执行完毕之后再执行await方法之后的代码

模式二:N次await方法调用,一次countDown方法调用
:这种模式用于需要在所有线程开始执行前插入一些自定义操作。

本篇文章一直围绕模式一分析,模式二一直没有提及,因为模式二用的实在太少,在实际开发中没见到过。。。

现在将模式二的核心代码提炼出来,看看到底它做了什么事情:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;

@Slf4j
public class Driver {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        for (int i = 0; i < 10; ++i) {
            new Thread(new Worker(startSignal)).start();
        }
        log.info("在所有线程执行前先执行一些任务");
        startSignal.countDown();
    }
}

@Slf4j
class Worker implements Runnable {
    private final CountDownLatch startSignal;

    Worker(CountDownLatch startSignal) {
        this.startSignal = startSignal;
    }

    @Override
    public void run() {
        try {
            startSignal.await();
            doWork();
        } catch (InterruptedException ex) {
            log.error("", ex);
        }
    }

    void doWork() {
        log.info("正在执行任务。。。");
    }
}

我们分析下这个过程,由于10个线程运行时上来就调用了await方法,所以它们会在AQS队列中排队,其数据结构如下所示

image-20250101223924205

此时AQS队列中有11个节点,第一个头部节点是队列初始化的时候创建的虚拟节点,剩余十个节点,前9个节点状态都是SIGNAL(-1),因为它们在抢锁失败后会调用shouldParkAfterFailedAcquire方法,该方法会将当前节点的前置节点修改成SIGNAL(-1),最后一个节点入队后默认状态是0,由于没有后续节点入队了,所以其状态没有变成-1,而是0。

接下来主线程调用了countDown方法,由于计数器的初始值是1,所以CountDownLatch方法一次调用就会触发唤醒后继结点。接着,head节点后的节点将会在
parkAndCheckInterrupt
方法处继续执行后续代码:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
     //将节点标记为共享类型,并加入AQS同步队列。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //获取节点的前置节点
            final Node p = node.predecessor();
            //判断是否是头结点,是头结点就表示自己可以尝试获取锁了。
            if (p == head) {
                //尝试获取共享锁,实际上是检查了state参数是否为0
                int r = tryAcquireShared(arg);
                //r>=0的条件只有一个,那就是r为1,表示state值已经变成了0
                if (r >= 0) {
                    //执行获取锁成功的流程。
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //检查是否应该在失败后挂起。
            if (shouldParkAfterFailedAcquire(p, node) &&
                //挂起线程
                parkAndCheckInterrupt())
                //如果发生了中断,就抛出IE异常。
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在该方法中,会执行
setHeadAndPropagate
方法:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    //propagate值为1,满足if条件
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //此时AQS队列中有很多节点,节点类型都是SHARED类型,所以依然满足if条件
        if (s == null || s.isShared())
            //执行锁释放,并唤醒后续节点
            doReleaseShared();
    }
}

接着执行doReleaseShared方法:在N次countDown,一次await模式中,该方法在await方法中的调用什么都没做就结束了循环;在N次await调用,一次countDown调用的模式中,await方法调用将唤醒后继节点

private void doReleaseShared() {
    for (;;) {
        //head节点已经变成当前执行线程节点
        Node h = head;
        //满足if条件
        if (h != null && h != tail) {
            //当前节点状态是SIGNAL
            int ws = h.waitStatus;
            //满足if条件
            if (ws == Node.SIGNAL) {
                //唤醒后继结点前,先将头部节点状态更改为初始状态0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //唤醒后继结点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

可以看到,AQS队列中的节点一个一个唤醒后继结点,然后去做自己的事情,直到最后一个节点;最后一个节点由于不满足
h != tail
的条件,会什么都不做就结束循环。

最后考虑一个问题,如果countDown方法先被执行了,那await方法执行的时候会怎样?

答案是什么事情都不会发生,因为await方法执行前需要先判断计数器的值,如果等于0,就不会有后续的获取锁、释放锁流程了。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //countDown计数器为0的时候不满足该if条件。
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

总结一下,ReentrantLock中lock方法和unlock方法中间是临界区代码,临界区代码是抢锁线程依次轮流进入执行的,所以ReetrantLock是“独占锁”;而在CountDownLatchd的“N次await方法调用,一次countDown方法调用”的模式中,中,所有线程排队进入AQS队列之后,一旦被唤醒,被唤醒的节点还会自动唤醒后边节点,然后各自做各自的事情,所有线程都是并行运行的,所以CountDownLatch是“共享锁”。



最后的最后,欢迎关注我的个人博客呀:

https://blog.kdyzm.cn


END.

如题,有客户咨询这个问题:Exadata X6支持的最新image和19c数据库版本?
直观感觉,看到X6这个型号就觉得是很老的机器了,毕竟现在最新都X10M了。

首先,去查MOS文档:

  • Exadata Database Machine and Exadata Storage Server Supported Versions (Doc ID 888828.1)


Exadata System Software Requirements for Exadata Database Machine Hardware
章节可以看到X6 支持的 image版本:

  • Exadata 12.1.2.3.1 through 24.1
    这也就是说支持最新的image到了24.1这个版本,而这个版本的image,连最新的Oracle 23ai数据库都能装!

23ai Long Term Release => Supported: Exadata 24.1 or higher
Database 23ai requires storage server and database server Exadata version to be 24.1 or higher, and fabric switch version to be that which is supplied with Exadata 24.1 or higher. Full Exadata offload functionality for all Database 23ai features is available starting in Exadata 24.1.

客户没有23ai需求,只关注19c的情况,而19c需要的image版本,只要在19.1.2或以上就OK,推荐23或24版本。

19c Long Term Release (Recommended) => Supported: Exadata 19.1.2.0.0 or higher
Exadata 24.1 requires ACFS Patch 36114443 in Grid Infrastructure 19c home, which is included in 19.23.
Database 19c requires storage server and database server Exadata version to be 19.1.2.0.0 or higher, and InfiniBand switch version to be 2.2.11-2 or higher (that which is supplied with Exadata 19.1.2.0.0). It is not supported to use lower versions on storage servers, database servers, or InfiniBand switches.

而数据库19c具体最新的版本,是取决于当前19c最新的RU的,目前还是19.25,是去年10月份发布的,按照惯例,预计这个月会推出更新的19.26:

参考MOS文档:

  • Assistant: Download Reference for Oracle Database/GI Update, Revision, PSU, SPU(CPU), Bundle Patches, Patchsets and Base Releases (Doc ID 2118136.2)

总结下,X6支持的image版本:Exadata 12.1.2.3.1 through 24.1,安装新版本的image,不但能支持19c数据库最新的RU版本,还支持最新的23ai数据库。
如果你还想知道其他Exadata型号的image版本和数据库版本支持情况,都可以结合查询MOS文档888828.1、2118136.2获取到官方说明。

本篇介绍
Manim
中两个和动画轨迹相关的类,
AnimatedBoundary

TracedPath

AnimatedBoundary
聚焦于图形边界的动态呈现,能精准控制边界绘制的每一帧,助力我们清晰展示几何图形的搭建流程。

TracedPath
则擅长实时追踪物体或点的运动轨迹,以直观且动态的方式呈现各类运动路径,为我们分析和展示复杂运动提供了强大支持 。

1. 动画概述

1.1. AnimatedBoundary

在讲解几何图形(如多边形、圆形等)的构造过程时,
AnimatedBoundary
可以逐帧展示图形边界的绘制,帮助我们理解图形是如何一步步形成的。

此外,当图形的边界随着某个参数或条件动态变化时,使用
AnimatedBoundary
也可以生动地呈现这种变化。

AnimatedBoundary
动画的主要特点在于图形边界的绘制,它能够精确控制边界的出现顺序和方式。

这使得在展示几何图形的构建过程时,能够突出边界这一关键元素,更清楚地展示图形的轮廓是如何形成的。

它的参数主要有:

参数名称 类型 说明
vmobject VMobject 要应用动画边界的
VMobject
colors [Color] 颜色列表,用于指定边界颜色变化的序列
max_stroke_width int 最大描边宽度
cycle_rate float 颜色循环速率
back_and_forth bool 是否来回循环颜色变化
draw_rate_func func 用于控制绘制速率的函数
fade_rate_func func 用于控制淡出速率的函数

1.2. TracedPath

在物理学或数学中,当需要展示物体的运动轨迹时,
TracedPath
是一个非常合适的工具。

例如,展示抛体运动、圆周运动等物体的运动路径时,能让我们直观地看到物体在空间中的运动轨迹。

此外,对于函数图像的绘制,也可使用
TracedPath
来模拟绘图过程,展示函数曲线是如何随着自变量的变化而逐步生成的。

这在演示函数的性质和图像绘制方法时非常有用,能够帮助学生更好地理解函数的变化规律。

TracedPath
动画的主要特点是能够实时跟踪物体或点的运动轨迹,并将其以动画的形式呈现出来。

这种实时跟踪的特性使得动画更加真实、生动,能够准确地反映物体的运动状态。

它的参数主要有:

参数名称 类型 说明
traced_point_func func 要跟踪的函数,该函数应返回一个点的坐标
stroke_width float 轨迹的线条宽度
stroke_color Color 轨迹的颜色
dissipating_time float 路径消散所需的时间


dissipating_time
参数为
None
时,表示路径轨迹不消散。

TracedPath
还有一个方法:

名称 说明
update_path 用于更新轨迹路径的方法,通常在动画过程中被调用,以实时跟踪点的移动并更新轨迹

2. 使用示例

下面通过几个根据实际应用场景简化而来的示例来演示两个动画类的使用。

2.1. 多边形绘制

这个示例中,首先创建了一个矩形多边形,然后使用
AnimatedBoundary
为其添加边界动画,

颜色在
蓝色

绿色

黄色
之间循环变化,循环速率为
3
,突出展示多边形边界的绘制动画。

polygon = Polygon(
    [-2, -1, 0],
    [2, -1, 0],
    [2, 1, 0],
    [-2, 1, 0],
)
boundary = AnimatedBoundary(
    polygon,
    colors=[BLUE, GREEN, YELLOW],
    cycle_rate=3,
)
self.add(polygon, boundary)

2.2. 动态更新圆形边界

先创建了一个圆形,其边界的颜色在
红色

黄色

绿色
之间循环,循环速率为 2。

然后通过动画将圆形的半径放大
2
倍,展示了圆形边界在动态变化过程中的动画效果。

circle = Circle(radius=1)
boundary = AnimatedBoundary(
    circle,
    colors=[RED, YELLOW, GREEN],
    cycle_rate=2,
)
self.add(circle, boundary)
self.play(circle.animate.scale(2), run_time=3)

2.3. 跟踪抛体运动轨迹

首先定义一个抛体运动的函数
move_path
,再创建了一个点
Dot
和一个
TracedPath
对象来跟踪点的运动轨迹。

轨迹颜色为
绿色
,宽度为
3
,展示了抛体运动的轨迹跟踪效果。

d = Dot().shift(LEFT * 2)
trace = TracedPath(
    d.get_center,
    stroke_color=GREEN,
    stroke_width=3,
)
self.add(d, trace)

def move_path(t):
    x = t
    y = 2 - 0.5 * t**2
    return np.array([x, y, 0])

f = ParametricFunction(
    move_path,
    t_range=(-3, 3),
)
self.play(MoveAlongPath(d, f), run_time=3)

2.4. 函数图像绘制过程

这个示例中,定义了一个正弦函数
move_path
,再创建一个点沿着这个正弦函数图像运动,同时创建了一个
TracedPath
对象来跟踪函数图像的绘制过程。

轨迹颜色为
紫色
,宽度为
2
,且设置轨迹在
1
秒后消失。

d = Dot(color=BLUE).shift([-PI, 0, 0])
trace = TracedPath(
    d.get_center,
    stroke_color=PURPLE,
    stroke_width=2,
    dissipating_time=1,
)
self.add(d, trace)

def move_path(x):
    return np.array([x, np.sin(x), 0])

f = ParametricFunction(move_path, t_range=(-PI, PI))
self.play(MoveAlongPath(d, f), run_time=3)

3. 附件

文中的代码只是关键部分的截取,完整的代码共享在网盘中(
trace.py
),

下载地址:
完整代码
(访问密码: 6872)

思想:非比较而是划分值域

基数排序(Radix Sort)是一种非比较排序算法,它通过逐位对数据进行处理,依次按位从
最低有效位(Least Significant Digit, LSD)

最高有效位(Most Significant Digit, MSD)
或者反过来,对数据进行排序。

与常见的比较排序算法(如快速排序、归并排序)存在本质不同,基数排序不基于元素间的直接比较,而是依赖于元素的位权信息来排序。这使得基数排序能够在某些情况下实现线性时间复杂度,理论上达到
\(O(kn)\)
,其中
\(k\)
是位数,
\(n\)
是元素个数。即复杂度取决于值域规模。

基数排序的核心思想是分桶和合并:通过多次分桶操作,将元素按照某个位的值放入对应的桶中,然后再按照桶的顺序合并,逐步将数组排序。

简单的十进制基数排序

以下是 LSD 基数排序的简单流程,基于十进制:

  1. 找到数组中最大的数,确定需要处理的最大位数
    \(d\)
  2. 从最低有效位开始,依次对每一位执行以下步骤:
    • 使用计数排序(Counting Sort)等稳定排序算法,根据当前位的值对数据进行分桶。
    • 按分桶的顺序重新排列数组。

这种方式之所以有效,是因为每次分桶时,数据局部是有序的,每一位的排序是稳定的,因此较高位的排序不会改变低位已排序数字的相对顺序。通过逐位排序,逐渐将数字按整体大小排列。想象整理一叠卡片,先按卡片右侧的颜色分组,再按中间的图案分组,最后按左侧的形状分组。由于每一步都保留了之前分组的顺序,最终整理好的卡片是完全有序的。

#include <iostream>
#include <vector>
#include <cmath>
using namespace std;

void countingSort(vector<int>& arr, int exp) {
    int n = arr.size();
    vector<int> output(n);
    vector<int> count(10, 0); // count[i]: 第 exp 位为 i 的数有多少?

    for (int i = 0; i < n; i++)
        count[(arr[i] / exp) % 10]++;

    for (int i = 1; i < 10; i++)
        count[i] += count[i - 1];

    for (int i = n - 1; i >= 0; i--) {
        // 根据第 exp 位决定应该占用的位置,
        output[count[(arr[i] / exp) % 10] - 1] = arr[i];
        count[(arr[i] / exp) % 10]--;
    }

    for (int i = 0; i < n; i++)
        arr[i] = output[i];
}

void radixSort(vector<int>& arr) {
    int maxVal = *max_element(arr.begin(), arr.end());

    for (int exp = 1; maxVal / exp > 0; exp *= 10)
        countingSort(arr, exp);
}

int main() {
    vector<int> arr = {170, 45, 75, 90, 802, 24, 2, 66};
    radixSort(arr);

    for (int num : arr)
        cout << num << " ";
    return 0;
}

可以把百,十,个位依次认为是第一,第二,第三关键字,按关键字权重低到高多趟排序,而排到某一位时,其低位已经排序好,只要按顺序取出就可以保持顺序。

从中可以看出,基数排序通常需要
\(O(n+k)\)
的辅助空间。且在对每一位(如个位、十位)排序时,基数排序是稳定的,相同键值的元素在排序后相对顺序保持不变。而高位的排序不会改变低位已排好元素的顺序。因此,基数排序天然稳定。

高位优先(MSD)和低位优先(LSD)

上述是一个 LSD 的例子,其实从高位到低位也是可行的,且理解起来更加简单。MSD 基数排序从数字的最高位开始排序,将数字分组到不同的桶中(如按千位分组)。对每个桶分别递归排序,逐步向低位处理。每轮排序后,桶的内容会被按顺序合并。像按照城市、省份、街道逐级分类邮件。先按城市分,再在每个城市中按省份分,最后在每个省份中按街道分。高层次分类先决定大范围,递归细分确保每个细节都正确。

void msdRadixSortUtil(vector<int>& arr, int left, int right, int exp) {
    if (left >= right || exp == 0) return;

    vector<vector<int>> buckets(10);

    // Place elements into corresponding buckets based on the current significant digit
    for (int i = left; i <= right; i++) {
        int digit = (arr[i] / exp) % 10;
        buckets[digit].push_back(arr[i]);
    }

    // Merge buckets back into the array
    int index = left;
    for (int i = 0; i < 10; i++) {
        for (int num : buckets[i]) {
            arr[index++] = num;
        }
    }

    // Recursively sort each non-empty bucket
    index = left;
    for (int i = 0; i < 10; i++) {
        if (!buckets[i].empty()) {
            int bucketSize = buckets[i].size();
            msdRadixSortUtil(arr, index, index + bucketSize - 1, exp / 10);
            index += bucketSize;
        }
    }
}

void msdRadixSort(vector<int>& arr) {
    if (arr.empty()) return;

    // Find the maximum value to determine the number of digits
    int maxVal = *max_element(arr.begin(), arr.end());
    int maxExp = pow(10, static_cast<int>(log10(maxVal)));

    // Start MSD radix sort from the highest significant digit
    msdRadixSortUtil(arr, 0, arr.size() - 1, maxExp);
}

高位优先(MSD)排序
从最高有效位开始排序,递归地对子数组进行分桶,逐步细化到最后的排序结果,通常需要递归。MSD方法常用于字符串排序,因为它可以提前确定不同类别。

低位优先(LSD)排序
从最低有效位开始排序,逐步提升到最高有效位。它每次操作的排序范围是全数组,且每次排序不破坏之前的顺序(稳定性)。因此,对于整数排序,LSD方法更为常用。

两种方法都可行,但低位优先排序实现简单,且可以直接应用于数字,故在实践中更受欢迎。

二进制的基数排序

计算机中的数据都使用二进制(或者说十六进制)存储,十进制会导致每位信息利用不充分,且需要低效的模 10 运算,非常低效。

假设仅有正数,对于无符号32位整数,可以按二进制位分为多组。例如,每次处理8位(共分4组)。这样的处理方式仍然保持基数排序的思想,但使用更接近硬件位操作的方式,比十进制效率高得多,处理效率高。

void radixSortBinary(vector<uint32_t>& arr) {
    const int BITS = 32;
    const int RADIX = 256; // 每次处理8位
    const int MASK = RADIX - 1;

    vector<uint32_t> buffer(arr.size());

    // 四轮循环,分别处理0 - 7, 8 - 15, 16 - 23, 24 - 32 位。count 大小也增加到 256
    for (int shift = 0; shift < BITS; shift += 8) {
        array<int, RADIX> count = {0};

        for (uint32_t num : arr)
            count[(num >> shift) & MASK]++;

        for (int i = 1; i < RADIX; i++)
            count[i] += count[i - 1];

        for (int i = arr.size() - 1; i >= 0; i--) {
            uint32_t bucket = (arr[i] >> shift) & MASK;
            buffer[--count[bucket]] = arr[i];
        }

        arr.swap(buffer);
    }
}

int main() {
    vector<uint32_t> arr = {170, 45, 75, 90, 802, 24, 2, 66};
    radixSortBinary(arr);

    for (uint32_t num : arr)
        cout << num << " ";
    return 0;
}

每次处理位数和

上例处理 32 位整数,分四次排序,一次八位。称呼其位宽 8。实际上也可以选择一次排序 16 位,排序两次,可以减少一半的轮次,但创建 65536 个桶可能会导致内存压力,且桶分布不均时效率下降:如果数据的分布高度集中,某些桶可能会很大,导致操作不均衡。若位宽仅 4,则分桶的范围较小,分桶和合并过程相对快速,但排序的趟数太多,适合小规模数组或内存受限的场景。

拓展知识

基数排序与快速排序

基数排序和快速排序是两种经典的排序算法,适用于不同场景。
基数排序
是一种非比较排序算法,依赖数字的位数特性,通过按位分组排序实现有序,适合处理数字或固定长度的字符串,具有线性时间复杂度
\(O(n \cdot d)\)
(其中
\(d\)
是位数)。它对数据规模较大且值域较小的数据表现出色,但需要额外的空间来存储桶。相比之下,
快速排序
是最经典的基于比较的分治算法,通过选择一个基准值(pivot)将数组划分为两部分递归排序,平均时间复杂度为
\(O(n \log n)\)
。快速排序在大多数情况下效率极高,适用于通用数据类型,且原地排序所需额外空间较少,但其性能可能因基准选择不当而退化。简而言之,基数排序适合特定结构数据(如整数或字符串),而快速排序更通用,适合各种类型和规模的输入数据。

基数排序和桶排序

基数排序和桶排序虽然都是基于分组的非比较排序算法,但它们的目标和实现方式有所不同,且可以认为
基数排序是桶排序的延伸
。桶排序通过将数据分布到有限数量的桶中,每个桶内部再进行排序(通常使用插入排序或其他算法),最终将桶内容按顺序合并以获得排序结果;它主要依赖数据的分布特性,适用于数据均匀分布的场景,时间复杂度接近
\(O(n)\)
。而基数排序本质上可以看作是多轮的桶排序:在值域很大时,它通过按位(如个位、十位等)多次分桶并排序,逐步实现最终的全局有序性。基数排序的核心思想是通过多次分桶来解决单次分桶无法处理多位数据的问题。因此,可以理解为基数排序在设计上对桶排序的扩展,用于处理数字、固定长度字符串等多位特征的数据。

基数排序应用于非整数

某些场合下,
基数排序可以扩展应用于非整数(如浮点数)和结构体
,但需要对数据进行适当的预处理,使其特性适合基数排序的机制。以下是实现这些扩展的关键思路:


1. 处理浮点数

浮点数位码有一个特殊性质:IEEE 754 格式保证了从小到大的正数,其位模式从小到大单调递增。因此,可以直接将浮点数的位模式解释为无符号整数,然后按整数排序。也就是说,若不考虑符号可以直接视为整数排序。

void radixSortFloat(vector<float>& arr) {
    vector<uint32_t> bitPattern(arr.size());

    // 将浮点数解释为无符号整数,假设浮点数均是正数。
    for (size_t i = 0; i < arr.size(); ++i) {
        memcpy(&bitPattern[i], &arr[i], sizeof(float));
    }

    // 对无符号整数排序
    radixSort(bitPattern.begin(), bitPattern.end());

    // 排序完成后还原为浮点数
    for (size_t i = 0; i < arr.size(); ++i) {
        memcpy(&arr[i], &bitPattern[i], sizeof(float));
    }
}


2. 处理结构体

基数排序天然可以划分关键字,对于结构体,可以通过选择一个或多个键值(字段)作为排序依据,将结构体排序问题转化为对这些键的排序。

比如对于包含字段
age

salary
的结构体数组:

struct Employee {
    int age;
    double salary;
};

可先按
salary
使用浮点数处理方法进行基数排序。再按
age
使用整数直接排序。