2024年10月

前言

项目中经常会遇到一些非分布式的调度任务,需要在未来的某个时刻周期性执行。实现这样的功能,我们有多种方式可以选择:

  1. Timer
    类, jdk1.3引入,不推荐。

它所有任务都是串行执行的,同一时间只能有一个任务在执行,而且前一个任务的延迟或异常都将会影响到之后的任务。可能会出现任务执行时间过长而导致任务相互阻塞的情况

  1. Spring的
    @Scheduled
    注解,不是很推荐

这种方式底层虽然是用线程池实现,但是有个最大的问题,所有的任务都使用的同一个线程池,可能会导致长周期的任务运行影响短周期任务运行,造成线程池"饥饿",更加推荐的做法是同种类型的任务使用同一个线程池。

  1. 自定义
    ScheduledThreadPoolExecutor
    实现调度任务

这也是本文重点讲解的方式,通过自定义
ScheduledThreadPoolExecutor
调度线程池,提交调度任务才是最优解。

基本介绍

ScheduledThreadPoolExecutor继承自 ThreadPoolExecutor,为任务提供延迟或周期执行,属于线程池的一种。和 ThreadPoolExecutor 相比,它还具有以下几种特性:

  • 使用专门的任务类型—ScheduledFutureTask 来执行周期任务,也可以接收不需要时间调度的任务(这些任务通过 ExecutorService 来执行)。

  • 使用专门的存储队列—DelayedWorkQueue 来存储任务,DelayedWorkQueue 是无界延迟队列DelayQueue 的一种。相比ThreadPoolExecutor也简化了执行机制(delayedExecute方法,后面单独分析)。

  • 支持可选的run-after-shutdown参数,在池被关闭(shutdown)之后支持可选的逻辑来决定是否继续运行周期或延迟任务。并且当任务(重新)提交操作与 shutdown 操作重叠时,复查逻辑也不相同。

基本使用

ScheduledThreadPoolExecutor 最常见的应用场景就是实现调度任务,

创建方式

创建
ScheduledThreadPoolExecutor
方式一共有两种,第一种是通过自定义参数,第二种通过
Executors
工厂方法创建。 根据
阿里巴巴代码规范中
的建议,更加推荐使用第一种方式创建。

  1. 自定义参数创建
ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,                                       RejectedExecutionHandler handler)
  • corePoolSize
    :核心工作的线程数量
  • threadFactory
    :线程工厂,用来创建线程
  • handler
    : 拒绝策略,饱和策略
  1. Executors
    工厂方法创建
  • static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
    :根据核心线程数创建调度线程池。

  • static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
    :根据核心线程数和线程工厂创建调度线程池。

核心API

  1. schedule(Runnable command, long delay, TimeUnit unit)
    :创建并执行在给定延迟后启用的一次性操作
  • command
    : 执行的任务
  • delay
    :延迟的时间
  • unit
    : 单位
  1. scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
    :定时执行周期任务,任务执行完成后,延迟delay时间执行
  • command
    : 执行的任务
  • initialDelay
    : 初始延迟的时间
  • delay
    : 上次执行结束,延迟多久执行
  • unit
    :单位
@Test
public void testScheduleWithFixedDelay() throws InterruptedException {
    // 创建调度任务线程池
    ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    // 按照上次执行完成后固定延迟时间调度
    scheduledExecutorService.scheduleWithFixedDelay(() -> {
        try {
            log.info("scheduleWithFixedDelay ...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, 1, 2, TimeUnit.SECONDS);
    Thread.sleep(10000);
}
  1. scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
    :按照固定的评率定时执行周期任务,不受任务运行时间影响。
  • command
    : 执行的任务
  • initialDelay
    : 初始延迟的时间
  • period
    : 周期
  • unit
    :单位
@Test
public void testScheduleAtFixedRate() throws InterruptedException {
    // 创建调度任务线程池
    ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    // 按照固定2秒时间执行
    scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            log.info("scheduleWithFixedDelay ...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, 1, 2, TimeUnit.SECONDS);
    Thread.sleep(10000);
}

综合案例

通过
ScheduledThreadPoolExecutor
实现每周四 18:00:00 定时执行任务。

// 通过ScheduledThreadPoolExecutor实现每周四 18:00:00 定时执行任务
@Test
public void test() {
    //  获取当前时间
    LocalDateTime now = LocalDateTime.now();
    System.out.println(now);
    // 获取周四时间
    LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
    // 如果 当前时间 > 本周周四,必须找到下周周四
    if(now.compareTo(time) > 0) {
        time = time.plusWeeks(1);
    }
    System.out.println(time);
    // initailDelay 代表当前时间和周四的时间差
    // period 一周的间隔时间
    long initailDelay = Duration.between(now, time).toMillis();
    long period = 1000 * 60 * 60 * 24 * 7;
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    pool.scheduleAtFixedRate(() -> {
        System.out.println("running...");
    }, initailDelay, period, TimeUnit.MILLISECONDS);
}

底层源码解析

接下来一起看看 ScheduledThreadPool 的底层源码

数据结构

ScheduledThreadPoolExecutor继承自 ThreadPoolExecutor:

ScheduledThreadPoolExecutor 内部构造了两个内部类 ScheduledFutureTask 和 DelayedWorkQueue:

  • ScheduledFutureTask: 继承了FutureTask,说明是一个异步运算任务;最上层分别实现了Runnable、Future、Delayed接口,说明它是一个可以延迟执行的异步运算任务。

  • DelayedWorkQueue: 这是 ScheduledThreadPoolExecutor 为存储周期或延迟任务专门定义的一个延迟队列,继承了 AbstractQueue,为了契合 ThreadPoolExecutor 也实现了 BlockingQueue 接口。它内部只允许存储 RunnableScheduledFuture 类型的任务。与 DelayQueue 的不同之处就是它只允许存放 RunnableScheduledFuture 对象,并且自己实现了二叉堆(DelayQueue 是利用了 PriorityQueue 的二叉堆结构)。

内部类ScheduledFutureTask

属性

//为相同延时任务提供的顺序编号
private final long sequenceNumber;

//任务可以执行的时间,纳秒级
private long time;

//重复任务的执行周期时间,纳秒级。
private final long period;

//重新入队的任务
RunnableScheduledFuture<V> outerTask = this;

//延迟队列的索引,以支持更快的取消操作
int heapIndex;
  • sequenceNumber: 当两个任务有相同的延迟时间时,按照 FIFO 的顺序入队。sequenceNumber 就是为相同延时任务提供的顺序编号。

  • time: 任务可以执行时的时间,纳秒级,通过triggerTime方法计算得出。

  • period: 任务的执行周期时间,纳秒级。正数表示固定速率执行(为scheduleAtFixedRate提供服务),负数表示固定延迟执行(为scheduleWithFixedDelay提供服务),0表示不重复任务。

  • outerTask: 重新入队的任务,通过reExecutePeriodic方法入队重新排序。

核心方法run()

public void run() {
    boolean periodic = isPeriodic();//是否为周期任务
    if (!canRunInCurrentRunState(periodic))//当前状态是否可以执行
        cancel(false);
    else if (!periodic)
        //不是周期任务,直接执行
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();//设置下一次运行时间
        reExecutePeriodic(outerTask);//重排序一个周期任务
    }
}

说明: ScheduledFutureTask 的run方法重写了 FutureTask 的版本,以便执行周期任务时重置/重排序任务。任务的执行通过父类 FutureTask 的run实现。内部有两个针对周期任务的方法:

  • setNextRunTime(): 用来设置下一次运行的时间,源码如下:
//设置下一次执行任务的时间
private void setNextRunTime() {
    long p = period;
    if (p > 0)  //固定速率执行,scheduleAtFixedRate
        time += p;
    else
        time = triggerTime(-p);  //固定延迟执行,scheduleWithFixedDelay
}
//计算固定延迟任务的执行时间
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
  • reExecutePeriodic(): 周期任务重新入队等待下一次执行,源码如下:
//重排序一个周期任务
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {//池关闭后可继续执行
        super.getQueue().add(task);//任务入列
        //重新检查run-after-shutdown参数,如果不能继续运行就移除队列任务,并取消任务的执行
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();//启动一个新的线程等待任务
    }
}

reExecutePeriodic与delayedExecute的执行策略一致,只不过reExecutePeriodic不会执行拒绝策略而是直接丢掉任务。

cancel方法

public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    if (cancelled && removeOnCancel && heapIndex >= 0)
        remove(this);
    return cancelled;
}

ScheduledFutureTask.cancel本质上由其父类 FutureTask.cancel 实现。取消任务成功后会根据removeOnCancel参数决定是否从队列中移除此任务。

核心属性

//关闭后继续执行已经存在的周期任务 
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

//关闭后继续执行已经存在的延时任务 
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

//取消任务后移除 
private volatile boolean removeOnCancel = false;

//为相同延时的任务提供的顺序编号,保证任务之间的FIFO顺序
private static final AtomicLong sequencer = new AtomicLong();
  • continueExistingPeriodicTasksAfterShutdown:和executeExistingDelayedTasksAfterShutdown是 ScheduledThreadPoolExecutor 定义的 run-after-shutdown 参数,用来控制池关闭之后的任务执行逻辑。

  • removeOnCancel:用来控制任务取消后是否从队列中移除。当一个已经提交的周期或延迟任务在运行之前被取消,那么它之后将不会运行。默认配置下,这种已经取消的任务在届期之前不会被移除。 通过这种机制,可以方便检查和监控线程池状态,但也可能导致已经取消的任务无限滞留。为了避免这种情况的发生,我们可以通过setRemoveOnCancelPolicy方法设置移除策略,把参数removeOnCancel设为true可以在任务取消后立即从队列中移除。

  • sequencer:是为相同延时的任务提供的顺序编号,保证任务之间的 FIFO 顺序。与 ScheduledFutureTask 内部的sequenceNumber参数作用一致。

构造函数

首先看下构造函数,ScheduledThreadPoolExecutor 内部有四个构造函数,这里我们只看这个最大构造灵活度的:

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

构造函数都是通过super调用了ThreadPoolExecutor的构造,并且使用特定等待队列DelayedWorkQueue。

Schedule

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));//构造ScheduledFutureTask任务
    delayedExecute(t);//任务执行主方法
    return t;
}

说明: schedule主要用于执行一次性(延迟)任务。函数执行逻辑分两步:

  • 封装 Callable/Runnable
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}
  • 执行任务
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);//池已关闭,执行拒绝策略
    else {
        super.getQueue().add(task);//任务入队
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&//判断run-after-shutdown参数
            remove(task))//移除任务
            task.cancel(false);
        else
            ensurePrestart();//启动一个新的线程等待任务
    }
}

说明: delayedExecute是执行任务的主方法,方法执行逻辑如下:

  • 如果池已关闭(ctl >= SHUTDOWN),执行任务拒绝策略;

  • 池正在运行,首先把任务入队排序;然后重新检查池的关闭状态,执行如下逻辑:

A: 如果池正在运行,或者 run-after-shutdown 参数值为true,则调用父类方法ensurePrestart启动一个新的线程等待执行任务。ensurePrestart源码如下:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

ensurePrestart是父类 ThreadPoolExecutor 的方法,用于启动一个新的工作线程等待执行任务,即使corePoolSize为0也会安排一个新线程。

B: 如果池已经关闭,并且 run-after-shutdown 参数值为false,则执行父类(ThreadPoolExecutor)方法remove移除队列中的指定任务,成功移除后调用ScheduledFutureTask.cancel取消任务

scheduleAtFixedRate 和 scheduleWithFixedDelay

/**
 * 创建一个周期执行的任务,第一次执行延期时间为initialDelay,
 * 之后每隔period执行一次,不等待第一次执行完成就开始计时
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    //构建RunnableScheduledFuture任务类型
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),//计算任务的延迟时间
                                      unit.toNanos(period));//计算任务的执行周期
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);//执行用户自定义逻辑
    sft.outerTask = t;//赋值给outerTask,准备重新入队等待下一次执行
    delayedExecute(t);//执行任务
    return t;
}

/**
 * 创建一个周期执行的任务,第一次执行延期时间为initialDelay,
 * 在第一次执行完之后延迟delay后开始下一次执行
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    //构建RunnableScheduledFuture任务类型
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),//计算任务的延迟时间
                                      unit.toNanos(-delay));//计算任务的执行周期
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);//执行用户自定义逻辑
    sft.outerTask = t;//赋值给outerTask,准备重新入队等待下一次执行
    delayedExecute(t);//执行任务
    return t;
}

说明: scheduleAtFixedRate和scheduleWithFixedDelay方法的逻辑与schedule类似。

注意scheduleAtFixedRate和scheduleWithFixedDelay的区别
: 乍一看两个方法一模一样,其实,在unit.toNanos这一行代码中还是有区别的。没错,scheduleAtFixedRate传的是正值,而scheduleWithFixedDelay传的则是负值,这个值就是 ScheduledFutureTask 的period属性。

shutdown()

public void shutdown() {
    super.shutdown();
}
//取消并清除由于关闭策略不应该运行的所有任务
@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    //获取run-after-shutdown参数
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    if (!keepDelayed && !keepPeriodic) {//池关闭后不保留任务
        //依次取消任务
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();//清除等待队列
    }
    else {//池关闭后保留任务
        // Traverse snapshot to avoid iterator exceptions
        //遍历快照以避免迭代器异常
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    //如果任务已经取消,移除队列中的任务
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate(); //终止线程池
}

说明: 池关闭方法调用了父类ThreadPoolExecutor的shutdown,具体分析见 ThreadPoolExecutor 篇。这里主要介绍以下在shutdown方法中调用的关闭钩子onShutdown方法,它的主要作用是在关闭线程池后取消并清除由于关闭策略不应该运行的所有任务,这里主要是根据 run-after-shutdown 参数(continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown)来决定线程池关闭后是否关闭已经存在的任务。

ScheduledThreadPoolExecutor吞掉异常

如果
ScheduledThreadPoolExecutor
中执行的任务出错抛出异常后,不仅不会打印异常堆栈信息,同时还会取消后面的调度, 直接看例子。

@Test
public void testException() throws InterruptedException {
    // 创建1个线程的调度任务线程池
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    // 创建一个任务
    Runnable runnable = new Runnable() {

        volatile int num = 0;

        @Override
        public void run() {
            num ++;
            // 模拟执行报错
            if(num > 5) {
                throw new RuntimeException("执行错误");
            }
            log.info("exec num: [{}].....", num);
        }
    };

    // 每隔1秒钟执行一次任务
    scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
    Thread.sleep(10000);
}

  • 只执行了5次后,就不打印,不执行了,因为报错了
  • 任务报错,也没有打印一次堆栈,更导致调度任务取消,后果十分严重。

解决方法也非常简单,只要通过try catch捕获异常即可。

public void run() {
    try {
        num++;
        // 模拟执行报错
        if (num > 5) {
            throw new RuntimeException("执行错误");
        }
        log.info("exec num: [{}].....", num);
    } catch (Exception e) {
	    e.printStackTrace();
    }
}

原理探究

那大家有没有想过为什么任务出错会导致异常无法打印,甚至调度都取消了呢?从源码出发,一探究竟。

从上面
delayedExecute
方法可以看到,延迟或周期性任务的主要执行方法, 主要是将任务丢到队列中,后续再由工作线程获取执行。

  1. 在任务入队列后,就是执行任务内容了,任务内容其实就是在继承了Runnable类的run方法中。
// ScheduledFutureTask#run方法
public void run() {
    // 是不是周期性任务
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 不是周期性任务的话, 直接调用一次下面的run    
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是周期性任务,则调用runAndReset方法,如果返回true,继续执行
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下次调度时间
        setNextRunTime();
        // 重新执行调度任务
        reExecutePeriodic(outerTask);
    }
}
  • 这里的关键就是看
    ScheduledFutureTask.super.runAndReset()
    方法是否返回true,如果是true的话继续调度。
  1. runAndReset方法也很简单,关键就是看报异常如何处理。
// FutureTask#runAndReset
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    // 是否继续下次调度,默认false
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // 执行任务
                c.call(); 
                // 执行成功的话,设置为true
                ran = true;

                // 异常处理,关键点
            } catch (Throwable ex) {
                // 不会修改ran的值,最终是false,同时也不打印异常堆栈
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 返回结果
    return ran && s == NEW;
}
  • 关键点ran变量,最终返回是不是下次继续调度执行
  • 如果抛出异常的话,可以看到不会修改ran为true。

小结

Java的ScheduledThreadPoolExecutor定时任务线程池所调度的任务中如果抛出了异常,并且异常没有捕获直接抛到框架中,会导致ScheduledThreadPoolExecutor定时任务不调度了。

封装包装类,统一调度

在实际项目使用中,可以在自己的项目中封装一个包装类,要求所有的调度都提交通过统一的包装类,从而规范代码,如下代码:

@Slf4j
public class RunnableWrapper implements Runnable {
    // 实际要执行的线程任务
    private Runnable task;
    // 线程任务被创建出来的时间
    private long createTime;
    // 线程任务被线程池运行的开始时间
    private long startTime;
    // 线程任务被线程池运行的结束时间
    private long endTime;
    // 线程信息
    private String taskInfo;

    private boolean showWaitLog;

    /**
     * 执行间隔时间多久,打印日志
     */
    private long durMs = 1000L;

    // 当这个任务被创建出来的时候,就会设置他的创建时间
    // 但是接下来有可能这个任务提交到线程池后,会进入线程池的队列排队
    public RunnableWrapper(Runnable task, String taskInfo) {
        this.task = task;
        this.taskInfo = taskInfo;
        this.createTime = System.currentTimeMillis();
    }

    public void setShowWaitLog(boolean showWaitLog) {
        this.showWaitLog = showWaitLog;
    }

    public void setDurMs(long durMs) {
        this.durMs = durMs;
    }

    // 当任务在线程池排队的时候,这个run方法是不会被运行的
    // 但是当任务结束了排队,得到线程池运行机会的时候,这个方法会被调用
    // 此时就可以设置线程任务的开始运行时间
    @Override
    public void run() {
        this.startTime = System.currentTimeMillis();

        // 此处可以通过调用监控系统的API,实现监控指标上报
        // 用线程任务的startTime-createTime,其实就是任务排队时间
        // 这边打印日志输出,也可以输出到监控系统中
        if(showWaitLog) {
            log.info("任务信息: [{}], 任务排队时间: [{}]ms", taskInfo, startTime - createTime);
        }

        // 接着可以调用包装的实际任务的run方法
        try {
            task.run();
        } catch (Exception e) {
            log.error("run task error", e);
        }

        // 任务运行完毕以后,会设置任务运行结束的时间
        this.endTime = System.currentTimeMillis();

        // 此处可以通过调用监控系统的API,实现监控指标上报
        // 用线程任务的endTime - startTime,其实就是任务运行时间
        // 这边打印任务执行时间,也可以输出到监控系统中
        if(endTime - startTime > durMs) {
            log.info("任务信息: [{}], 任务执行时间: [{}]ms", taskInfo, endTime - startTime);
        }

    }
}

当然,也还可以在包装类里面封装各种监控行为,如本例打印日志执行时间等。

其它使用注意点

  1. 为什么ThreadPoolExecutor 的调整策略却不适用于 ScheduledThreadPoolExecutor?

由于 ScheduledThreadPoolExecutor 是一个固定核心线程数大小的线程池,并且使用了一个无界队列,所以调整maximumPoolSize对其没有任何影响(所以 ScheduledThreadPoolExecutor 没有提供可以调整最大线程数的构造函数,默认最大线程数固定为Integer.MAX_VALUE)。此外,设置corePoolSize为0或者设置核心线程空闲后清除(allowCoreThreadTimeOut)同样也不是一个好的策略,因为一旦周期任务到达某一次运行周期时,可能导致线程池内没有线程去处理这些任务。

  1. Executors 提供了哪几种方法来构造 ScheduledThreadPoolExecutor?
  • newScheduledThreadPool: 可指定核心线程数的线程池。

  • newSingleThreadScheduledExecutor: 只有一个工作线程的线程池。如果内部工作线程由于执行周期任务异常而被终止,则会新建一个线程替代它的位置。

注意: newScheduledThreadPool(1, threadFactory) 不等价于newSingleThreadScheduledExecutor。newSingleThreadScheduledExecutor创建的线程池保证内部只有一个线程执行任务,并且线程数不可扩展;而通过newScheduledThreadPool(1, threadFactory)创建的线程池可以通过setCorePoolSize方法来修改核心线程数。

面试题专栏

Java面试题专栏
已上线,欢迎访问。

  • 如果你不知道简历怎么写,简历项目不知道怎么包装;
  • 如果简历中有些内容你不知道该不该写上去;
  • 如果有些综合性问题你不知道怎么答;

那么可以私信我,我会尽我所能帮助你。

前言

两个月前尤大在Vue 仓库中引入了
pkg.pr.new
,有了这个后Vue仓库中的每个commit或者PR都会自动触发一个新的发布,我们就可以在项目中体验
最新版本
的Vue啦。

关注公众号:【前端欧阳】,给自己一个进阶vue的机会

如何体验最新版本Vue

我们先来看看如何使用最新版本Vue。很简单,在Vue的GitHub上面去找一个
open状态
的Pull request。如下图:
open-pr

然后在这个PR中可以看到一个名为
pkg-pr-new
的机器人发布的评论,如下图:
bot

因为Vue是模块化设计,项目中的每个模块都会被发布成一个包。这些模块的名字都是以
@vue
开头的,并且支持单独使用。

一般我们都是使用整个Vue中的功能,所以在项目中使用Vue一般都是:

pnpm add vue

这样就是从
npm
中下载Vue的包的方法。

细心的小伙伴已经发现了,在图中
pnpm add
的不是
vue
,而是一个链接:

pnpm add https://pkg.pr.new/vue@12227

因为
pkg.pr.new
生成的Vue包是放到了
pkg.pr.new
自己网站上面的。不会发布到npm中,
pkg.pr.new
和npm是完全隔离的。

还有一点值得注意的是,当PR被关闭或者合并后,这个PR对应的
pkg.pr.new
站点中的包就会被清理。

Vue是如何集成
pkg.pr.new
的?

在Vue源码中可以看到有个
.github/workflows
文件夹,如下图:
ci

这个
.github/workflows
文件夹中包含一堆以
.yml
结尾的文件,这些文件是用来定义 GitHub Actions 工作流程的。

GitHub Actions 是 GitHub 提供的持续集成和持续部署(CI/CD)服务。

比如上面这种图中的
ci.yml
文件就定义了当代码push到Vue仓库中的任何一个分支,或者是发起一个到main 或 minor 分支的 Pull request 时,就会执行
ci.yml
文件中定义的jobs(任务)。

ci.yml
的完整代码如下:

name: 'ci'
on:
  push:
    branches:
      - '**'
    tags:
      - '!**'
  pull_request:
    branches:
      - main
      - minor

jobs:
  test:
    if: ${{ ! startsWith(github.event.head_commit.message, 'release:') && (github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository) }}
    uses: ./.github/workflows/test.yml

  continuous-release:
    if: github.repository == 'vuejs/core'
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Install pnpm
        uses: pnpm/action-setup@v4

      - name: Install Node.js
        uses: actions/setup-node@v4
        with:
          node-version-file: '.node-version'
          registry-url: 'https://registry.npmjs.org'
          cache: 'pnpm'

      - name: Install deps
        run: pnpm install

      - name: Build
        run: pnpm build --withTypes

      - name: Release
        run: pnpx pkg-pr-new publish --compact --pnpm './packages/*'

这个
ci.yml
定义了一个名为
ci
的工作流,触发条件就是上面的
on
字段定义的内容:当代码push到Vue仓库中的任何一个分支,或者是发起一个到main 或 minor 分支的 Pull request 时。

工作流中的内容主要包含:检出代码、安装 pnpm、设置 Node.js 环境、安装依赖、构建项目、发布包。

pkg.pr.new
是在build命令打包完成后工作的,执行的命令在是
ci.yml
文件的末尾:

pnpx pkg-pr-new publish --compact --pnpm './packages/*'

执行这个命令后会将
packages
文件夹中的所有模块都发布到
pkg.pr.new
站点上面。

我们也可以在GitHub网站的Actions中看到
ci.yml
工作流的执行日志。

先在所有的工作流中找到名为
ci
的工作流,如下图:
actions

在actions页面可以看到有很多工作流,工作流的名字就是ci文件中定义的name字段。
ci.yml
文件中定义的name就是
ci

在右侧通过Pull request的编号,就可以找到这个Pull request所触发的
ci
工作流,点进去就是这样的:
release

从上图中可以看到在执行
pkg-pr-new publish
时将包发布到
pkg.pr.new
站点的日志了。

总结

Vue 引入了
pkg.pr.new
后,每个commit或者PR都会自动触发一个新的发布到
pkg.pr.new
网站上面。并且和npm站点上面发布的包不会冲突,我们也可以不用等待Vue发布就可以体验最新版本的Vue啦。

还有就是欧阳写Vue代码时总是需要在浏览器和vscode之间切来切去,非常不方便,一直想要找一个好用的分区的软件。

最近在研究刚买的明基RD280U显示器发现竟然还有配套的软件:
Display Pilot2
。这个软件不光可以直接控制显示器,比如调节亮度、切换显示模式、旋转显示器等。还可以做一些更牛逼的事情,比如欧阳最喜欢的两个功能:
桌面分区

Flow

1

桌面分区功能也是傻瓜式操作,直接将窗口拖到对应的区域就能直接分区啦。

split4.gif

这样就能利用分区功能一边写代码,一边在浏览器中看到页面效果,还能同时看微信群消息。

flow功能就更牛逼了,可以提前设置到指定时间段控制显示器做某些事情。另外不光可以控制显示器,还能打开应用程序。

5.png

4.png

比如欧阳晚上coding结束后需要刷一下视频换换脑子,不然容易失眠。这时就会使用flow功能,每晚11点自动打开B站APP,并且将coding时偏暗的“编码模式”切换为刷视频较亮的“观影模式”,实现自动化无缝切换。

flow.gif

关注公众号:【前端欧阳】,给自己一个进阶vue的机会

另外欧阳写了一本开源电子书
vue3编译原理揭秘
,看完这本书可以让你对vue编译的认知有质的提升。这本书初、中级前端能看懂,完全免费,只求一个star。

以前看过matlab的bwlookup函数,但是总感觉有点神秘,一直没有去仔细分析,最近在分析计算二值图像的欧拉数时,发现自己写的代码和matlab的总是对不少,于是又去翻了下matlab的源代码,看到了matlab里实现欧拉数的代码非常简单,如下所示:

if n==4
lut = 4*[0 0.25 0.25 0 0.25 0 .5 -0.25 0.25 0.5 0 -0.25 0 ...
-0.25 -0.25 0] + 2;
else
lut = 4*[0 0.25 0.25 0 0.25 0 -.5 -0.25 0.25 -0.5 0 -0.25 0 ...
-0.25 -0.25 0] + 2;
end
% Need to zero-pad the input
b = padarray(a,[1 1],'both');

weights = bwlookup(b,lut);
if coder.isColumnMajor
e = (sum(weights(:),'double') - 2*numel(b)) / 4;
else
e = (sum(sum(weights,2,'double'),1,'double') - 2*numel(b)) / 4;
end

在内部就是调用了bwlookup函数,那没办法了,就仔细看了M的帮助文档,发现原来这个函数真的非常简单,我们贴下M的帮助文档:

A = bwlookup(BW,lut)

The
bwlookup
function performs these steps to determine the value of each pixel in the processed image
A
:

  • Locate the pixel neighborhood in input image
    BW
    based on the coordinates of the target pixel in
    A
    . The function zero-pads border pixels of image
    BW
    when the neighborhood extends past the edge of
    BW
    .

  • Calculate an index,
    idx
    , based on the binary pixel pattern of the neighborhood.

  • Set the target pixel in
    A
    as the value of the lookup table at the index
    idx
    , in other words, the value of
    lut(idx)
    .

2-by-2 Neighborhood Lookup

For 2-by-2 neighborhoods, there are four pixels in each neighborhood. Each binary pixel has two possible states, therefore the total number of permutations is 24 and the length of the lookup table
lut
is 16.

To find the value of the target output pixel at (row, column) coordinate (
r
,
c
),
bwlookup
uses the 2-by-2 pixel neighborhood in the input binary image
BW
whose top left pixel is at coordinate (
r
,
c
). The index
idx
into the lookup table is the weighted sum of the four pixels in the neighborhood, plus 1.

Pixel weights start with a weight of 1 for the top left pixel in the neighborhood, and increase as powers of two along rows then along columns, with a final weight of 8 for the bottom right pixel.

3-by-3 Neighborhood Lookup

For 3-by-3 neighborhoods, there are nine pixels in each neighborhood. Each binary pixel has two possible states, therefore the total number of permutations is 29 and the length of the lookup table
lut
is 256.

To find the value of the target output pixel at (row, column) coordinate (
r
,
c
),
bwlookup
uses the 3-by-3 pixel neighborhood in the input binary image
BW
centered on coordinate (
r
,
c
). The index
idx
into the lookup table is the weighted sum of the nine pixels in the neighborhood, plus 1.

Pixel weights start with a weight of 1 for the top left pixel in the neighborhood, and increase as powers of two along rows then along columns, with a final weight of 256 for the bottom right pixel.

简单的说,这个查找表只有2种可能,一个是表的元素是16个,一种是由512个元素,其中16个元素时,查表的依据是以当前点为起点,向右向下各扩展一个像素范围,得到一个2*2领域4个像素,4个像素,每个像素也就只有2种可能的取值,这样就有16种可能的组合,对应16个元素。

当表的元素是512个时,查表的依据是以当前点为中心点,向左向右,向上向下各扩展一个像素,得到一个3*3的领域9个像素,同样每个像素也只有2种可能取值,所以共有1<<9即512种取值。

因为涉及到领域,所有在原图边缘的地方会有越界的图像,这个时候的原则是填充0,而不是复制最近的像素。

这样,当表中有不同的内容时,就可以实现不同的结果。

我们以16个元素的表为例,假定 lut[16] ={ 6 3 16 11 7 14 8 5 15 1 2 4 13 9 10 12},

对于下面的一个4*4的二值数据:

0   0   1   1
0 0 1 1
1 1 0 0
1 1 0 0

如果当前的点位于第二行第一列,则其2*2的领域范围的像素信息为:

0   0
1 1

  按照位置信息即对应的值计算索引为: 1 + 0 * 1 + 1 * 2 + 0 * 4 + 1 * 8 = 11, 对用的查表结果为 lut[11] = 2。
  使用类似的计算方法可以得到其他位置经过查表后的结果为:
  6    13    12    11
2 8 14 3
12 11 6 6
14 3 6 6


 注意,以上所有的下标起点都是1(matlab内部的约定),而且行列顺序和我们常用的C语言的二维数组也是掉了边的。
除了刚刚说的欧拉数的计算(其实没有搞懂欧拉数里的那个查找表是如何设计的,且8领域的欧拉数也是用的2*2的表,而不是3*3的),我们以matlab的bwarea函数为例,说明这个函数的价值。
matlab中关于bwarea函数的定义为:

bwarea
通过对图像中每个像素的面积求和来估计图像中所有
on
像素的面积。单个像素的面积是通过观察其 2×2 邻域来确定的。有六种不同的情形,每种情形表示一个不同面积:

  • 具有零个
    on
    像素的情形(面积 = 0)

  • 具有一个
    on
    像素的情形(面积 = 1/4)

  • 具有两个相邻
    on
    像素的情形(面积 = 1/2)

  • 具有两个对角
    on
    像素的情形(面积 = 3/4)

  • 具有三个
    on
    像素的情形(面积 = 7/8)

  • 具有所有四个
    on
    像素的情形(面积 = 1)

用图形化的表达方式上述六种情况对应如下(为了显示,使用绿色表示黑色值,红色表示白色值):
















索引值              0     1      2    3      4    5    6       7     8       9       10     11      12     13      14      15

面积值      0    1/4     1/4   1/2    1/4   1/2   3/4     7/8   1/4     3/4     1/2    7/8     1/2      7/8      7/8      1

把面积值放大八倍得到面积值依次为:  0 2 2 4 2 4 6 7 2 6 4 7 4 7 7 8,我们翻看matlab的bwarea函数,可以得到其代码如下:

lut = [0     2     2     4     2     4     6     7     2     6     4     7...4     7     7     8];% Need to zero-pad the input.
bb
= false(size(b)+2);
bb(
2:end-1,2:end-1) =b;

weights
=bwlookup(bb,lut);
a
= sum(weights(:),[],'double')/8;

查找表和这里的一摸一样。

使用查找表的优点是把领域所有的组合提前计算出结果来,而不用到程序里进行一些列复杂的判断,这样在很多情况下是可以提高速度的,比如刚刚这个面积计算的东西,如果在循环内部做,则要做N种判断,那如果是涉及到3*3的领域,那就更为复杂了。

朴素的来说,有了bwlookup的原理,要把这个算法移植到C++中,就是一个比较简单的过程了,假如是处理16元素的表,也就是涉及到了4个像素,假定他们的值分别为P0\P1\P2\P3,则最基本的C代码如下所示:

int Index = 0;if (P0 == 255)    Index += 1;if (P1 == 255)    Index += 2;if (P2 == 255)    Index += 4;if (P3 == 255)    Index += 8;
LinePD[X]
= Lut[Index];

处理512个元素的过程也是类似的,只不过索引值多加了几个(注意,这里用255表示白色,而不是1)。

如果考虑指令集优化,一个自然的翻译过程如下所示:

__m128i Index =_mm_setzero_si128();
Index
= _mm_blendv_epi8(Index, _mm_add_epi8(Index, _mm_set1_epi8(1)), _mm_cmpeq_epi8(P0, _mm_set1_epi8(255)));
Index
= _mm_blendv_epi8(Index, _mm_add_epi8(Index, _mm_set1_epi8(2)), _mm_cmpeq_epi8(P1, _mm_set1_epi8(255)));
Index
= _mm_blendv_epi8(Index, _mm_add_epi8(Index, _mm_set1_epi8(4)), _mm_cmpeq_epi8(P2, _mm_set1_epi8(255)));
Index
= _mm_blendv_epi8(Index, _mm_add_epi8(Index, _mm_set1_epi8(8)), _mm_cmpeq_epi8(P3, _mm_set1_epi8(255)));
_mm_storeu_si128((__m128i
*)(LinePD + X), _mm_shuffle_epi8(Lut128, Index));

一次性处理16个字节,最为关键的一点是,这个查表可以方便的直接使用_mm_shuffle_epi8非常高效的实现,这个在我前期的文章里多次提到(16个字节表的查找,我们假定lut的类型为字节类型,这个在实际的应用中也足够了),

这个代码的性能提升相对于C++来说是非常可观的,大概又4倍多的速度提升。
仔细上述代码其实还有提高的地方,我们使用_mm_blendv_epi8来进行结果的混合, 而混合的标志是值是否等于255,这里用_mm_cmpeq_epi8做判断,而_mm_cmpeq_epi8的结果就是如果P等于255,则返回255,否则返回0,那这个不就是P本省吗,所以根本就不用做这个判断,这样代码就变为:

__m128i Index =_mm_setzero_si128();
Index
= _mm_blendv_epi8(Index, _mm_add_epi8(Index, _mm_set1_epi8(1)), P0, _mm_set1_epi8(255));
Index
= _mm_blendv_epi8(Index, _mm_add_epi8(Index, _mm_set1_epi8(2)), P1, _mm_set1_epi8(255));
Index
= _mm_blendv_epi8(Index, _mm_add_epi8(Index, _mm_set1_epi8(4)), P2, _mm_set1_epi8(255));
Index
= _mm_blendv_epi8(Index, _mm_add_epi8(Index, _mm_set1_epi8(8)), P3, _mm_set1_epi8(255));
_mm_storeu_si128((__m128i
*)(LinePD + X), _mm_shuffle_epi8(Lut128, Index));

另外,再观察这个C语言的特殊性,我们可以把上述C代码修改为:

int Index = 0;
Index
+= (1 &P0);
Index
+= (2 &P1);
Index
+= (4 &P2);
Index
+= (8 &P3);
LinePD[X]
= Lut[Index];

即直接使用位运算替换掉那个判断,这样对应的SSE指令也可以进行修改为如下代码:

__m128i Index =_mm_setzero_si128();
Index
= _mm_add_epi8(Index, _mm_and_si128(_mm_set1_epi8(1), P0));
Index
= _mm_add_epi8(Index, _mm_and_si128(_mm_set1_epi8(2), P1));
Index
= _mm_add_epi8(Index, _mm_and_si128(_mm_set1_epi8(4), P2));
Index
= _mm_add_epi8(Index, _mm_and_si128(_mm_set1_epi8(8), P3));//可以直接接用shuffle实现查找表
_mm_storeu_si128((__m128i*)(LinePD + X), _mm_shuffle_epi8(Lut128, Index));

相比于原始的SSE代码,这个改动也约有30%的性能提升的。

对于512个元素的表,情况会有所不同,因为索引值大于了255,所以已经无法用字节类型来保存累加后得到的索引了,至少的用ushort类型,但是呢,前8个位置的索引相加还是不会超出字节类型的,所以前8个位置还是可以一次性处理16个像素,到最后一个位置时,单独转换为16位,然后再接用2次16数据的处理指令,就可以一次性得到16个字节对应的查找表索引。

但是这个时候,由于表的大小时512,所以已经无法使用SSE去优化这个查表功能了,只能把索引值单个的提取出来,然后用普通的C语言去执行代码。如果CPU能支持AVX2,那么AVX2倒是又有关指令能直接查表,不过也要做很多的改造工程。

我们测试,对于512个元素的表,优化后的SSE指令处理一副 3000*2000的二值图,大概耗时在2ms不到,速度还是相当的快的。

搜索matlab的代码,除了bweuler及bwarea内使用了bwlookup,另外就是bwmorph里也大量的使用了bwlookup,而且都是使用的512个元素的表,也就是说使用3*3的领域,我们看bwmorph的帮助文档,有很多相关内容:

这些对应的查找表在 MATLAB\R2023b\toolbox\images\images\+images\+internal 目录下以lut开头的文件中可以找到,比如说,这个clean的表内容如下:

这些表的内容都是提前设计好的,然后可以多次重复的调用,从而得到某种效果。

一般来说,这种3*3的领域算子,在迭代了一定的次数后效果就不会有变化了。

我们在morph中也能看到一些常用的算子,比如这个remove就可以得到二值图像的边界,这个majority可以平滑噪音(和我博客里专门讲的那个majority效果还是有差异的)。



原图                  remove              majority



fatten                       skeleton                 thin

个人感觉这个方法在处理一些比较复杂的领域信息时还是很有帮助的,特别是不方便用判断条件一个一个的写的,如果能提前把这个表弄出来,那就效率能得到很大的提升的。

在个人的SSE Demo里,也集成了这个算法,其内容在Binary(二值处理)--》Processing(后处理)-->> Morph(形态学)中,有兴趣的朋友可以看看。

SSE Demo下载地址:
https://files.cnblogs.com/files/Imageshop/SSE_Optimization_Demo.rar

PolarPlane
,顾名思义,是用于创建极坐标平面的类。

与笛卡尔坐标系不同,极坐标系是基于角度和半径来定位点的,这里的每个点由一个
角度
和距离原点的
距离
表示。


Manim
中,
PolarPlane
通过极径($ r
\()和极角(\)
\theta $)来展示坐标系,这种表示方式便于处理与角度和半径相关的数学概念。

无论是坐标系网格,还是坐标的标记,
PolarPlane
都提供了直观的展示方式。

PolarPlane
一般用于展示极函数、幅角等极坐标相关的数学概念。

1. 主要参数

极坐标系
的参数和之前介绍的
直角坐标系
差别很大,主要参数如下:

参数名称 类型 说明
azimuth_step float 方位角(极角)标记之间的角度步长
azimuth_units str 方位角的单位
azimuth_compact_fraction bool 是否以紧凑的分数形式显示方位角标签
azimuth_offset float 方位角的偏移量,影响角度的起始位置
azimuth_direction str 方位角的递增方向
azimuth_label_buff float 方位角标签与极坐标图的距离
azimuth_label_font_size float 方位角标签的字体大小
size float 极坐标平面的大小,若未指定,则根据radius_max自动计算
radius_step float 半径标记之间的间隔
radius_max float 极坐标平面上半径的最大值
radius_config dict 自定义半径标记的样式
background_line_style dict 背景线的样式
faded_line_style dict 淡化线条的样式,用于控制辅助线的风格
faded_line_ratio int 控制淡化线条的比例

上面的参数有几个需要补充说明一下,

一个是
azimuth_units
参数,表示方位角的单位,它的值固定为以下5种:

  1. "PI radians"
    :$ \pi
    \(弧度,范围\)
    [0, 2\pi] $
  2. "TAU radians"
    :$ \tau
    \(弧度,范围\)
    [0, \tau]
    \(,其中\)
    \tau = 2\pi $
  3. "degrees"
    :度数,范围$ [0, 360] $
  4. "gradians"
    :梯度,范围$ [0, 400] $
  5. None
    :数值,范围$ [0, 1] $

还有
azimuth_direction
参数,它的值有2种:

  1. CW:顺时针
  2. CCW:逆时针

2. 主要方法

PolarPlane
也继承了坐标系统
CoordinateSystem
类的方法,

其中,常用的是以下2个方法:

名称 说明
add_coordinates 在极坐标平面上添加坐标轴和刻度标签
plot_polar_graph 在极坐标平面上绘制极坐标函数$ r=f(\theta) $的图像

3. 使用示例

下面通过示例展示如何使用
PolarPlane
的参数和方法来创建和自定义极坐标平面。

3.1. 基本极坐标平面

这个示例创建了一个基本的极坐标平面,没有过多的自定义设置。

只是启用了
add_coordinates
方法来显示坐标轴和刻度标签。

plane = PolarPlane()
plane.add_coordinates()

3.2. 自定义角度单位和范围

这个示例先创建一个极坐标平面,然后对其角度单位和范围进行了自定义。

我们设置不同的
azimuth_units

azimuth_step
的值来更改角度刻度的单位和间隔,使其更加密集或稀疏,以适应不同的展示需求。

# 角度作为刻度
plane1 = PolarPlane(
    azimuth_units="degrees",
    azimuth_step=12,
)
plane1.add_coordinates()

# 弧度作为刻度
plane2 = PolarPlane(
    azimuth_units="PI radians",
    azimuth_step=10,
)
plane2.add_coordinates()

# 梯度作为梯度
plane3 = PolarPlane(
    azimuth_units="gradians",
    azimuth_step=20,
)
plane3.add_coordinates()

上图分别用不同的刻度(
角度

弧度

梯度
)和间隔(
12

10

20
)展示了极坐标系。

3.3. 自定义极坐标样式

这个示例演示如何通过
PolarPlane

background_line_style
参数和
faded_line_style
参数来控制极坐标系的
背景线

淡化线
的显示效果。

线的颜色,粗细都可以根据显示需要灵活调整。

plane1 = PolarPlane(
    background_line_style={
        "stroke_color": RED,
        "stroke_width": 2,
        "stroke_opacity": 0.5,
    },
)

plane2 = PolarPlane(
    background_line_style={
        "stroke_color": YELLOW,
        "stroke_width": 4,
        "stroke_opacity": 0.5,
    },
    faded_line_style={
        "stroke_color": GREY,
        "stroke_width": 2,
        "stroke_opacity": 0.3,
    },
    faded_line_ratio=2,
)

plane3 = PolarPlane(
    background_line_style={
        "stroke_color": GREEN,
        "stroke_width": 2,
    },
    faded_line_style={
        "stroke_color": TEAL,
        "stroke_width": 1,
        "stroke_opacity": 0.6,
    },
    faded_line_ratio=2,
)

3.4. 极坐标函数图像

在这个示例中,我们利用
PolarPlane

plot_polar_graph
方法来在极坐标系中绘制函数图像。

通过函数:$ y=f(\theta)=3\times \sin(6\theta) $绘制一个花瓣的图案;

通过函数:$ y=f(\theta) =2.5\times (1-\sin(\theta)) $绘制一个爱心的图案。

plane = PolarPlane(size=4)

# 花瓣
r = lambda theta: 3 * np.sin(theta * 6)
graph1 = plane.plot_polar_graph(r, [0, 2 * PI], color=YELLOW)

# 爱心
r = lambda theta: 2.5 * (1 - np.sin(theta))
graph2 = plane.plot_polar_graph(r, [0, 2 * PI], color=RED)

4. 附件

文中的代码只是关键部分的截取,完整的代码共享在网盘中(
polar_plane.py
),

下载地址:
完整代码
(访问密码: 6872)

良好的权重初始化可以有效降低深度神经网络(
DNN
)模型的训练成本。如何初始化参数的选择是一个具有挑战性的任务,可能需要手动调整,这可能既耗时又容易出错。为了解决这些限制,论文迈出了建立权重生成器以合成神经网络初始化权重的创新一步。采用图像到图像的转换任务,使用生成对抗网络(
GAN
)作为示例,因为这方面的模型权重收集相对简单。

具体而言,首先收集了一个包含各种图像编辑概念及其对应训练权重的数据集,这些数据集随后用于权重生成器的训练。为了应对层之间的不同特性及需要预测的权重数量庞大,将权重划分为相等大小的块,并为每个块分配一个索引。随后,使用文本条件(即概念)和块索引的这种数据集来训练扩散模型。通过用扩散模型预测的去噪权重初始化图像转换模型,训练只需
43.3
秒。与从头开始训练(即
Pix2pix
)相比,实现了新概念训练时间加速
15\times
的效果,同时获得了更好的图像生成质量。

来源:晓飞的算法工程笔记 公众号,转载请注明出处

论文: Efficient Training with Denoised Neural Weights

Introduction


高效训练深度神经网络(
DNN
)不仅加快了模型开发过程,还降低了对计算资源和成本的要求。许多之前的研究探讨了高效训练策略,如稀疏训练和低比特训练。然而,实现高效训练往往受到有效初始化模型权重的挑战所阻碍。虽然在权重初始化领域已经采取了一些措施,但在不同任务中确定合适的方案仍然具有挑战性。调节权重初始化的参数可能会耗时且容易出错,导致性能不佳和训练时间增加。

为了解决这些挑战,受到最近
HyperNetworks
设计进展的启发,论文首次研究了构建一个权重生成器的可行性,以在不同任务中提供更好的权重初始化,减少获得经过良好训练的
DNN
模型所需的训练时间和资源消耗。以使用
GAN
模型训练的图像到图像翻译任务为例,展开在预测神经权重方面的设计。需要注意,论文的框架是一个通用设计,并不局限于生成
GAN
权重,选择这个例子的原因在于可以轻松获取在不同数据集上训练的海量不同权重。

更具体地说,权重生成器可以为未见的新概念和风格预测初始化权重。为了减少需要预测的权重数量,将低秩适配(
LoRA
)应用于图像生成模型,从而在保持高质量图像生成的同时,大幅减少模型参数的数量。由于
GAN
模型由不同类型的层组成,且具有不同的权重大小和数量,对权重进行分组,并将其划分为大小相等的块。利用扩散过程来建模
GAN
模型的训练好的权重空间,通过训练扩散模型进行权重估计,即权重生成器。为了提高权重生成器的性能,进一步将块索引作为权重生成器中的一个条件机制,采用正弦位置编码方案,并计算块索引的嵌入。该嵌入为权重生成器提供关于每个权重块在所有模型权重中的位置的信息。在获得权重生成器后,为了训练一个基于
GAN
的图像翻译模型,通过单步去噪过程快速推断权重生成器,并使用预测的权重来初始化
GAN
模型。
GAN
模型只需随后进行高效的微调过程即可获得高质量的图像生成结果,显著减少获得新颖未见概念模型的时间消耗。

论文贡献总结如下:

  1. 提出了一个框架,用于生成不同概念/风格的权重初始化,以高效训练用于图像翻译的
    GAN
    模型。
  2. 在扩散模型的帮助下(即准备成对的图像数据集),收集了大量不同概念/风格的
    LoRA
    权重的真实数据集,这为权重生成器的训练奠定了基础。
  3. 通过利用扩散过程,引入了一种高效的权重生成器设计,该设计将文本概念信息和块索引作为输入。为了处理不同的层类型和权重形状,将权重组织为大小相等的一维块,显著减少了计算开销。通过将块索引与时间步(
    time step
    )嵌入相结合,这些块索引被无缝集成到权重生成器设计中。因此,权重生成器掌握了每个权重块在所有模型权重中的位置信息。
  4. 提出的框架可以通过单次去噪步骤预测
    GAN
    模型的初始化神经权重,仅需
    \(1.19\)
    秒。通过使用预测的权重进行初始化,快速微调过程可以在
    \(42.1\)
    秒内传达目标风格。与从头训练(即
    Pix2pix
    )相比,将总训练时间减少了
    \(15\times\)
    ,同时保持了更好的图像生成质量。与其他高效训练方法相比,可以节省
    \(4.6\times\)
    的训练时间。

Motivations and Challenges


有效的权重初始化对稳定训练至关重要,能够促进更快的学习速率,加速收敛,并增强泛化能力。然而,在不同任务中确定良好的权重初始化仍然具有挑战性。受到最近超网络(
HyperNetwork
)进展的启发,论文希望调查是否可以构建一个权重生成器来获取良好的权重初始化,从而减少训练时间和资源消耗。与流行的图像/视频生成不同,探索权重生成的研究工作相对较少。构建这样的权重生成器前景广阔但也面临挑战。

第一个重大挑战来自深度神经网络(
DNN
)架构中的不同层类型。每一层的权重具有不同的大小和形状,这就需要一种能够适应这种异质性的权重生成方法。其次,权重生成器必须具备高效生成大量参数的能力,以确保网络的全面覆盖。第三,权重生成器的推理过程应快速有效,以节省为新任务获取权重的时间。

解决这些挑战有望构建出效率更高且有效性更强的深度学习系统的
DNN
训练模式。因此,在本研究中,论文研究了权重生成器的构建,以实现更好的权重初始化。论文旨在展示权重生成能力不仅限于在特定数据集上对单一模型架构的权重初始化,如基于 在
CIFAR-10
上的
ResNet-18
,而是适用于不同任务的多种模型。为了实现这一目标,以
GANs
在图像到图像转换任务中的初始化权重生成为例,因为收集多样化数据集用于
GAN
模型相对容易,但论文的方法并不局限于
GAN
架构或图像到图像转换任务。

Method


目标是训练一个权重生成器,以预测不同任务的权重初始化。以
GANs
在图像到图像转换任务中的应用为例,当出现新概念/风格时,可以查询权重生成器以提供初始化所需的权重值。权重生成器采用扩散过程建模,如图
1
所示。

与从纯噪声反转出干净图像的图像扩散模型不同,该框架旨在将噪声转化为用于初始化的权重值。通过插入预测的权重值,快速微调过程得以进行,以实现目标风格的
GAN
模型的高效训练。框架的核心是权重生成器的设计。

Dataset Collection

为了有效地训练一个权重生成器,用于生成不同概念的
GAN
模型的权重初始化,需要收集一个大规模的真实权重值数据集。为了获得真实权重值数据集,大规模的提示数据集显得尤为重要。通过使用提示数据集中的概念/风格,可以利用扩散模型进行图像收集,从而获得每个目标概念的代表性图像的丰富集合。每个概念/风格的图像进一步被利用来训练
GAN
,以获得真实的
GAN
权重。

作为权重生成器训练的数据准备基础,提示数据集应包括多样化的视觉概念/风格,以使权重生成器能够学习全面的表示,用于初始化针对特定任务的
GAN
。然而,收集这样一个数据集的过程面临巨大的挑战。确保不同概念/风格之间的多样性和代表性需要大量的数据。此外,收集到的提示还会进一步用于利用扩散模型生成目标概念/风格的图像。

为了构建提示数据集以训练一个可靠的权重生成器用于
GAN
权重初始化,采用了一种系统的方法,结合大语言模型 (
LLMs
) 进行风格生成和增强,以确保概念表现的丰富性和多样性。首先概述三个广泛的类别:
1
)艺术概念,
2
)特征概念,以及
3
)面部修改概念。在每个类别中,利用一个大语言模型(
ChatGPT-3.5
)来请求生成一系列包含各种概念的文本描述。通过过滤冗余的概念/风格,进一步通过查询另一个大语言模型(
Vicuna
)来实施增强方法,以提供具有相似含义但不同表现的概念/风格。为了进一步丰富提示数据集,还在不同类别之间排列和组合概念/风格。通过这个过程,能够构造一个大规模的提示数据集,不仅涵盖了多样化的概念领域,且捕捉了复杂的风格差异,为权重生成器的训练提供了更好的权重初始化基础。

在提示数据集收集之后,使用扩散模型编辑真实图像,以获得提示数据集中每个概念/风格的编辑图像,形成用于
GAN
训练的数据对。在这里,采用了一种混合了
ResNet
块和
Transformer
块的生成器(
E2gan
)作为训练模型,这种模型的有效性和混合架构设计能够展示不同类型层上的生成能力。在
GAN
训练过程后,为不同的概念/风格建立一个来自
GAN
检查点的权重数据集。为了进一步增强权重值数据集,在
FID
指标收敛后,为每个概念/风格保存
\(K\)
个检查点。

Data Format Design for Weight Generator

为了训练一个能够高效生成适用于不同概念的
GAN
模型权重初始化的权重生成器,设计训练和推断的权重格式是非常重要的。目标是每当新的概念作为输入提供给权重生成器时,它能够为该概念生成所有层的权重初始化。考虑到模型中存在多种不同类型的层,例如全连接层(
FC
)、卷积层(
CONV
)和批量归一化层(
BN
),以及不同层之间的大小和维度差异,设计合适的数据格式变得至关重要且具有挑战性。此外,
GAN
模型中的权重规模通常在百万级别,这为数据格式设计带来了更多挑战。

需要预测的权重数量越大,权重生成器面临的困难就越多。为了减轻这一问题,对不同层应用低秩适配(
LoRA
),以大幅减少需要预测的权重数量。例如,对于一个权重为
\(\mathbf{w}_i \in \mathbb{R}^{c\times f \times k_h\times k_w}\)
的卷积层
\(i\)
,应用两个秩为
\(r_i\)
的低秩矩阵,即
\(\mathbf{w}_{i}^A \in \mathbb{R}^{c\times r_i \times k_h \times k_w}\)
作为
LoRA
下层,
\(\mathbf{w}_{i}^B \in \mathbb{R}^{r_i \times f \times 1\times1}\)
作为
LoRA
上层,以近似权重变化。通过这样做,待预测的权重总量从
7.06M
减少到
0.22M
。微调
LoRA
权重足以转移
GAN
模型的生成领域,尽管大大减少了权重数量,但通过权重生成器一次性直接预测所有
0.22M
权重仍然是具有挑战性的。这需要一个大的权重生成器,并且会带来巨大的计算和内存负担。

为了解决这个问题,将权重划分为多个组,以减轻计算复杂性,并增强在训练和推断期间将权重生成器适配到内存中的可行性。由于不同层具有不同的统计特性,将每个层
\(i\)

LoRA
下层和上层(如果适用,还包括相关的
BN
层)分为一个组。尽管如此,每个组的权重数量和形状仍然不同。因此,进一步将权重展平为一维向量,并将权重划分为
\(N\)
个等大小的块,每个块包含
\(b\)
个权重。

于是,数据格式表示为
\(<n, \mathbf{w}_n, T>\)
,其中
\(n\)
是块索引,
\(\mathbf{w}_n \in \mathbb{R}^b\)
是第
\(n\)
个权重块的展平一维权重向量,
\(T\)
表示当前概念/风格的文本提示。使用这种数据格式的优点包括:
1
)适用于不同类型和形状的层;
2
)降低了计算复杂性和预测难度;
3
)使权重生成器更容易适配到内存中。

Weight Generator Training

使用论文的权重值数据集训练一个生成模型,该模型学习为其他概念/风格提供权重初始化。通过扩散过程对
GAN
的权重初始化空间进行建模。生成器是一个
UNet
权重信息生成器
\(\hat{\mathbf{\epsilon}}_\theta\)
,其参数为
\(\theta\)
,用于一维向量,如图
2
所示。将权重块
\(\mathbf{w}_n\)
从真实权重分布
\(p(\mathbf{w}_n)\)
扩散(迭代)为一个噪声版本,并训练去噪
UNet
逐渐逆转这个过程,从高斯噪声中生成权重。训练可以形式化为以下噪声预测问题:

\[\begin{equation}
\min_\theta \mathbb{E}[\| \hat{\epsilon}_\theta(\mathbf{w}_n^t,t,n,\tau(T)) - \mathbf{\epsilon} \|_2^2],
\end{equation}
\]

其中
\(t\)
表示时间步;
\(\epsilon\)
是真实噪声;
\(\mathbf{w}_n^t = \alpha_t \mathbf{w}_n + \sigma_t \epsilon\)
是块
\(n\)
的噪声权重;
\(\alpha_t\)

\(\sigma_t\)
分别是信号和噪声的强度,由噪声调度器决定;
\(\tau\)
是一个冻结的文本编码器,如
CLIP

为了将块索引作为权重生成器中的进一步条件机制,采用来自于序列到序列模型中常用的正弦位置编码。计算正弦块索引编码,该编码用于向权重生成器提供有关每个权重块在所有模型权重中的位置的信息。具体而言,令
\(N\)
表示权重块的总数,
\(d\)
表示编码的维度。块索引
\(n\)
的正弦块索引编码
\(\text{SinEnc}(n, d)\)
计算如下:

\[\begin{equation}
\text{SinEnc}(n, 2i) = \sin\left(\frac{n}{10000^{2i/d}}\right), \text{SinEnc}(n, 2i + 1) = \cos\left(\frac{n}{10000^{2i/d}}\right),
\end{equation}
\]

其中
\(i\)

0

\(\left\lfloor\frac{d-1}{2}\right\rfloor\)
。将正弦编码输入到嵌入层中,以获得块索引嵌入
\(emb\_n\)
,将块索引嵌入
\(emb\_n\)
与时间步嵌入
\(emb\_t\)
结合,表示为
\(emb = emb\_n + emb\_t\)
,以便在生成器的每个残差块中使用。因此,权重生成器在整个去噪过程中都可以访问块索引。根据结果,论文观察到块索引
\(n\)
能够有效地建模来自不同块的权重,而不必依赖于先前预测的权重,从而大大减少了计算量。

Fast Fine-Tuning with Generated Weight Initializations

当出现一个新概念/风格
\(T\)
时,可以通过对每个权重块
\(n\)
进行已训练权重生成器
\(\hat{\epsilon}_\theta\)
的推理来获得权重初始化。为了快速获取权重初始化,采用直接重建方法以避免迭代去噪过程。更具体地说,选定的偏向噪声的时间步
\(t\)
,推理去噪扩散模型来预测噪声
\(\hat{\epsilon}_\theta(\mathbf{w}_n^t, t, n, \tau(T))\)
,并进行直接恢复以获得真实权重
\(\mathbf{w}_{n}=\mathbf{w}_{n}^0\)

\[\begin{equation}
\mathbf{w}_{n}^0 = \frac{1}{\alpha_t} \mathbf{w}_{n}^t - \sigma_t \hat{\epsilon}_\theta(\mathbf{w}_n^t,t,n,\tau(T)).
\end{equation}
\]

在对所有
\(N\)
个权重块进行推理之后,可以获得概念/风格
\(T\)
的权重初始化
\(\{\mathbf{w}_{n} \}_{n=1}^N\)

为了更好地捕捉新概念/风格的细节,利用条件
GAN
损失对
GAN
权重进行进一步的微调,具体如下:

\[\begin{equation}
\begin{aligned}
&\min_{\mathbf{w}_{lora}} \max_{\mathbf{w}_d} \lambda \underbrace{ \mathbb{E}_{\mathbf{x},\tilde{\mathbf{x}}^T,\mathbf{z}, T} \left[ \| \tilde{\mathbf{x}}^T - \mathcal{G}(\mathbf{x}, \mathbf{z}, T;\mathbf{w}_g,\mathbf{w}_{lora}) \|_1 \right]}_{\textrm{$\ell_1$ loss}} + \\ &\underbrace{\mathbb{E}_{\mathbf{x}, \tilde{\mathbf{x}}^T} \left[\log \mathcal{D} (\mathbf{x},\tilde{\mathbf{x}}^T; \mathbf{w}_d) \right] + \mathbb{E}_{\mathbf{x},{\mathbf{z}}, T} \left[\log (1- \mathcal{D} (\mathbf{x}, \mathcal{G}(\mathbf{x},\mathbf{z}, {T}; \mathbf{w}_g); \mathbf{w}_d)) \right]}_{\textrm{conditional GAN loss}},
\end{aligned}
\end{equation}
\]

其中
\(\tilde{\mathbf{x}}^T\)
表示由扩散模型生成的基于目标风格的概念
\(T\)
的图像,
\(\mathcal{G}\)
是具有原始权重
\(\mathbf{w}_g\)

LoRA
权重
\(\mathbf{w}_{lora}\)
的生成器,
\(\mathcal{D}\)
表示由
\(\mathbf{w}_d\)
参数化的判别器函数,
\(\mathbf{z}\)
是引入的随机噪声,以增加输出的随机性,
\(\lambda\)
可用于调整两个损失项之间的相对重要性。

在微调过程中,生成器仅优化使用预测
\(\{\mathbf{w}_{n} \}_{n=1}^N\)
初始化的
LoRA
权重
\(\mathbf{w}_{lora}\)
。通过从预测中初始化
GAN
权重,能够使用更少的训练周期达到相同或更好的
FID
性能。除了在预测后进行微调外,还考虑将公式
4
中的
GAN
训练损失纳入公式
1
中的权重预测损失。然而,通过实验,论文发现将这两个损失项结合并不能提供更好的性能,反而增加了训练权重生成器的计算成本。

Experiments




如果本文对你有帮助,麻烦点个赞或在看呗~
更多内容请关注 微信公众号【晓飞的算法工程笔记】

work-life balance.