2024年10月

技术背景

2024年诺贝尔物理学奖和化学奖的揭幕,正式宣告了科学界对AI时代的认可,人工智能正在全方位的改变人类社会各种场景的互作模式,而数据拟合以及误差与算力的控制,则是大多数人工智能工作者所关注的重点。与数据拟合的思想不同的是,传统的数值计算中人们更倾向于使用多项式进行精确的参数计算,这种方法叫做插值。当然,插值算法的精确是相对于边界条件而言的,随着点数的变化,不同的插值算法有不同的余项。现在在模型训练中,因为数据点本身就是有误差的,所以强行使用插值算法会导致过拟合的现象。只有在一些传统的对精度要求较高的计算场景中,保留了插值算法的应用。

线性插值

给定两个点:
\((x_1,y_1),(x_2,y_2)\)
,其插值出来的线性函数为:

\[f(x)=\frac{y_2-y_1}{x_2-x_1}x+y_1-\frac{y_2-y_1}{x_2-x_1}x_1=\frac{y_2-y_1}{x_2-x_1}x+y_2-\frac{y_2-y_1}{x_2-x_1}x_2
\]

稍微改写一下形式有:

\[f(x)=\left(\frac{x_2-x}{x_2-x_1}\right)y_1+\left(\frac{x-x_1}{x_2-x_1}\right)y_2
\]

可以得到
\(f(x_1)=y_1,f(x_2)=y_2\)

二次插值

给定三个点:
\((x_1,y_1),(x_2,y_2),(x_3,y_3)\)
,假设其插值函数为:
\(f(x)=ax^2+bx+c\)
,那么可以根据三个点联立方程组,写成矩阵形式就是:

\[\left(
\begin{matrix}
x_1^2&&x_1&&1\\
x_2^2&&x_2&&1\\
x_3^2&&x_3&&1
\end{matrix}
\right)\left(
\begin{matrix}
a\\b\\c
\end{matrix}
\right)=\left(
\begin{matrix}
y_1\\y_2\\y_3
\end{matrix}
\right)
\]

所以求解系数
\(a,b,c\)
变成了一个矩阵求逆问题,可以手动做一个初等变换:

\[\begin{align*}
\left(
\begin{matrix}
x_1^2&&x_1&&1&&1&&0&&0\\
x_2^2&&x_2&&1&&0&&1&&0\\
x_3^2&&x_3&&1&&0&&0&&1
\end{matrix}
\right)&\rightarrow
\left(
\begin{matrix}
1&&\frac{1}{x_1}&&\frac{1}{x_1^2}&&\frac{1}{x_1^2}&&0&&0\\
1&&\frac{1}{x_2}&&\frac{1}{x_2^2}&&0&&\frac{1}{x_2^2}&&0\\
1&&\frac{1}{x_3}&&\frac{1}{x_3^2}&&0&&0&&\frac{1}{x_3^2}
\end{matrix}
\right)\\
&\rightarrow
\left(
\begin{matrix}
1&&\frac{1}{x_1}&&\frac{1}{x_1^2}&&\frac{1}{x_1^2}&&0&&0\\
0&&\frac{x_1-x_2}{x_1x_2}&&\frac{x_1^2-x_2^2}{x_1^2x_2^2}&&-\frac{1}{x_1^2}&&\frac{1}{x_2^2}&&0\\
0&&\frac{x_1-x_3}{x_1x_3}&&\frac{x_1^2-x_3^2}{x_1^2x_3^2}&&-\frac{1}{x_1^2}&&0&&\frac{1}{x_3^2}
\end{matrix}
\right)\\
&\rightarrow
\left(
\begin{matrix}
1&&\frac{1}{x_1}&&\frac{1}{x_1^2}&&\frac{1}{x_1^2}&&0&&0\\
0&&\frac{x_1-x_2}{x_1x_2}&&\frac{x_1^2-x_2^2}{x_1^2x_2^2}&&-\frac{1}{x_1^2}&&\frac{1}{x_2^2}&&0\\
0&&\frac{x_1-x_2}{x_1x_2}&&\frac{(x_1+x_3)(x_1-x_2)}{x_1^2x_2x_3}&&-\frac{x_3(x_1-x_2)}{x_1^2x_2(x_1-x_3)}&&0&&\frac{x_1-x_2}{x_2x_3(x_1-x_3)}
\end{matrix}
\right)\\
&\rightarrow
\left(
\begin{matrix}
1&&\frac{1}{x_1}&&\frac{1}{x_1^2}&&\frac{1}{x_1^2}&&0&&0\\
0&&\frac{x_1-x_2}{x_1x_2}&&\frac{x_1^2-x_2^2}{x_1^2x_2^2}&&-\frac{1}{x_1^2}&&\frac{1}{x_2^2}&&0\\
0&&0&&\frac{(x_1-x_2)(x_2-x_3)}{x_1x_2^2x_3}&&\frac{x_2-x_3}{x_1x_2(x_1-x_3)}&&-\frac{1}{x_2^2}&&\frac{x_1-x_2}{x_2x_3(x_1-x_3)}
\end{matrix}
\right)\\
&\rightarrow
\left(
\begin{matrix}
1&&\frac{1}{x_1}&&\frac{1}{x_1^2}&&\frac{1}{x_1^2}&&0&&0\\
0&&1&&\frac{x_1+x_2}{x_1x_2}&&-\frac{x_2}{x_1(x_1-x_2)}&&\frac{x_1}{x_2(x_1-x_2)}&&0\\
0&&0&&1&&\frac{x_2x_3}{(x_1-x_2)(x_1-x_3)}&&-\frac{x_1x_3}{(x_1-x_2)(x_2-x_3)}&&\frac{x_1x_2}{(x_1-x_3)(x_2-x_3)}
\end{matrix}
\right)\\
&\rightarrow
\left(
\begin{matrix}
1&&\frac{1}{x_1}&&\frac{1}{x_1^2}&&\frac{1}{x_1^2}&&0&&0\\
0&&1&&0&&-\frac{x_2+x_3}{(x_1-x_2)(x_1-x_3)}&&\frac{x_1+x_3}{(x_1-x_2)(x_2-x_3)}&&-\frac{x_1+x_2}{(x_1-x_3)(x_2-x_3)}\\
0&&0&&1&&\frac{x_2x_3}{(x_1-x_2)(x_1-x_3)}&&-\frac{x_1x_3}{(x_1-x_2)(x_2-x_3)}&&\frac{x_1x_2}{(x_1-x_3)(x_2-x_3)}
\end{matrix}
\right)\\
&\rightarrow
\left(
\begin{matrix}
1&&0&&0&&\frac{1}{(x_1-x_2)(x_1-x_3)}&&\frac{-1}{(x_1-x_2)(x_2-x_3)}&&\frac{1}{(x_1-x_3)(x_2-x_3)}\\
0&&1&&0&&-\frac{x_2+x_3}{(x_1-x_2)(x_1-x_3)}&&\frac{x_1+x_3}{(x_1-x_2)(x_2-x_3)}&&-\frac{x_1+x_2}{(x_1-x_3)(x_2-x_3)}\\
0&&0&&1&&\frac{x_2x_3}{(x_1-x_2)(x_1-x_3)}&&-\frac{x_1x_3}{(x_1-x_2)(x_2-x_3)}&&\frac{x_1x_2}{(x_1-x_3)(x_2-x_3)}
\end{matrix}
\right)
\end{align*}
\]

也就是说,最终的逆矩阵为:

\[\left(
\begin{matrix}
\frac{1}{(x_1-x_2)(x_1-x_3)}&&\frac{-1}{(x_1-x_2)(x_2-x_3)}&&\frac{1}{(x_1-x_3)(x_2-x_3)}\\
-\frac{x_2+x_3}{(x_1-x_2)(x_1-x_3)}&&\frac{x_1+x_3}{(x_1-x_2)(x_2-x_3)}&&-\frac{x_1+x_2}{(x_1-x_3)(x_2-x_3)}\\
\frac{x_2x_3}{(x_1-x_2)(x_1-x_3)}&&-\frac{x_1x_3}{(x_1-x_2)(x_2-x_3)}&&\frac{x_1x_2}{(x_1-x_3)(x_2-x_3)}
\end{matrix}
\right)
\]

可以验证:

\[\left(
\begin{matrix}
x_1^2&&x_1&&1\\
x_2^2&&x_2&&1\\
x_3^2&&x_3&&1
\end{matrix}
\right)
\left(
\begin{matrix}
\frac{1}{(x_1-x_2)(x_1-x_3)}&&\frac{-1}{(x_1-x_2)(x_2-x_3)}&&\frac{1}{(x_1-x_3)(x_2-x_3)}\\
-\frac{x_2+x_3}{(x_1-x_2)(x_1-x_3)}&&\frac{x_1+x_3}{(x_1-x_2)(x_2-x_3)}&&-\frac{x_1+x_2}{(x_1-x_3)(x_2-x_3)}\\
\frac{x_2x_3}{(x_1-x_2)(x_1-x_3)}&&-\frac{x_1x_3}{(x_1-x_2)(x_2-x_3)}&&\frac{x_1x_2}{(x_1-x_3)(x_2-x_3)}
\end{matrix}
\right)
=
\left(
\begin{matrix}
1&&0&&0\\
0&&1&&0\\
0&&0&&1
\end{matrix}
\right)
\]

有了逆矩阵,就可以计算参数数值
\(a,b,c\)
,那么这里我们直接写出函数形式:

\[f(x)=\frac{(x-x_2)(x-x_3)}{(x_1-x_2)(x_1-x_3)}y_1+\frac{(x-x_1)(x-x_3)}{(x_2-x_1)(x_2-x_3)}y_2+\frac{(x-x_1)(x-x_2)}{(x_3-x_1)(x_3-x_2)}y_3
\]

拉格朗日插值法

观察前面线性插值和二次插值的函数规律,可以给出一个推广形式:

\[f(x)=\sum_{i=1}^{N}c_i(x,x_1,x_2,...,x_N)y_N
\]

其中系数函数
\(c_i(x,x_1,x_2,...,x_N)=\prod_{j=1}^{i-1}\frac{x-x_j}{x_i-x_j}\prod_{k=i+1}^{N}\frac{x-x_k}{x_i-x_k}\)
。可以给出
\(N\)
个数据点的
\(N-1\)
次插值函数解析式,这就是拉格朗日插值法,满足
\(f(x_i)=y_i\)
的约束条件。

牛顿插值法

如果把线性插值中的函数表达式再修改一下形式,变成:

\[f(x)=y_1+\frac{y_2-y_1}{x_2-x_1}(x-x_1)
\]

类似的,二阶插值函数可以改成如下形式:

\[f(x)=y_1+\frac{y_2-y_1}{x_2-x_1}(x-x_1)+\frac{\frac{y_3-y_2}{x_3-x_2}-\frac{y_2-y_1}{x_2-x_1}}{x_3-x_1}(x-x_1)(x-x_2)
\]

如果定义一个一阶差商为:

\[g(x_i,x_{i+1})=\frac{y_{i+1}-y_i}{x_{i+1}-x_i}
\]

其含义为
\((x_i,x_{i+1})\)
区间内的平均变化率。有了一阶差商的定义,就可以递归的定义二阶差商:

\[g(x_i,x_{i+1},x_{i+2})=\frac{g(x_{i+1},x_{i+2})-g(x_{i},x_{i+1})}{x_{i+2}-x_i}
\]

以及
\(m\)
阶的差商:

\[g(x_i,x_{i+1},x_{i+2},...,x_{i+m})=\frac{g(x_{i+1},x_{i+2},...,x_{i+m})-g(x_{i},x_{i+1},...,x_{i+m-1})}{x_{i+m}-x_i}
\]

则可以写出牛顿插值的函数形式为:

\[f(x)=y_1+\sum_{i=1}^{N-1}g(x_1,...,x_{i+1})\prod_{j=1}^{i}(x-x_j)
\]

插值形式对比

拉格朗日插值算法和牛顿插值算法,插值的阶数是一致的,同样的点数插值出来的多项式也是唯一的,换句话说两个方法插值出来的函数其实是等价的。那么两个插值算法的优劣势在哪里?我们考虑这么一种情况,原本有
\(N\)
个数据点需要插值,此时如果再引入一个新的数据点,总点数变成了
\(N+1\)
。此时如果使用的是拉格朗日插值法,那么就需要我们把所有的系数全都再算一遍。而如果使用的是牛顿插值法,那么我们发现前面的
\(N\)
个系数是不需要发生变化的,我们只需要再计算一个新的系数即可,极大程度上的减少了点数更新所带来的参数计算量。但也并不是说拉格朗日插值没有用武之地,在现如今的张量计算时代,拉格朗日插值法的每一项系数都是同Shape的张量操作,反而是牛顿插值的递归形式在张量计算中会有一些麻烦。

总结概要

本文通过线性插值和二次插值的形式,介绍了拉格朗日插值算法以及牛顿插值算法的基本形式。两种插值算法的最终函数形式是一致的,但是在不同场景下的参数求解计算量是不一致的,需要根据自己的应用场景选择更加合适的插值算法。

版权声明

本文首发链接为:
https://www.cnblogs.com/dechinphy/p/lg-interp.html

作者ID:DechinPhy

更多原著文章:
https://www.cnblogs.com/dechinphy/

请博主喝咖啡:
https://www.cnblogs.com/dechinphy/gallery/image/379634.html

大家好,我是 V 哥,ArkTS 是 HarmonyOS 优选的主力应用开发语言,它在 TypeScript 的基础上进行了扩展,提供了声明式 UI 描述、自定义组件和动态扩展 UI 元素的能力。这些能力与 ArkUI 开发框架中的系统组件及其相关的事件方法、属性方法等共同构成了 UI 开发的主体。ArkTS 还提供了多维度的状态管理机制,允许数据在组件内使用,也可以在不同组件层级间传递,实现数据和 UI 的联动。此外,ArkTS 还提供了渲染控制的能力,包括条件渲染、循环渲染和数据懒加载,以适应不同的应用开发需求。

在声明式 UI 描述中,ArkTS 允许开发者以声明方式组合和扩展组件来描述应用程序的 UI。这包括基本的属性配置、事件处理和子组件的配置方法。例如,可以通过链式调用的方式配置系统组件的样式和其他属性,如
Text('hello').fontSize(20).fontColor(Color.Red)
。同时,也可以设置组件的事件响应逻辑,如
Button('Click me').onClick(() => { this.myText = 'ArkUI'; })
。此外,如果组件支持子组件配置,可以在闭包中添加子组件的 UI 描述,如
Column() { Text('Hello').fontSize(100) }

ArkTS 的声明式 UI 开发范式提供了一种高效、直观的方式来构建应用程序的用户界面。通过声明式语法,开发者可以更加专注于应用的逻辑和结构,而不是具体的实现细节,从而提高开发效率和代码的可维护性。随着 HarmonyOS 的不断发展,ArkTS 也将持续演进,提供更多的特性和能力,以满足开发者在应用开发和运行中的需求。

在HarmonyOS中,ArkTS语言提供了声明式UI的描述方式,允许开发者以声明的方式来构建和操作用户界面。以下是一些关键点和代码示例,以及对它们的分析:

1. 基本语法和组件创建

ArkTS定义了声明式UI描述、自定义组件和动态扩展UI元素的能力。在创建组件时,可以有无参数两种方式:

  • 无参数组件
    :如果组件的接口定义没有包含必选构造参数,则组件后面的“()”不需要配置任何内容。例如,Divider组件不包含构造参数:
  Column() {
    Text('item 1')
    Divider()
    Text('item 2')
  }
  • 有参数组件
    :如果组件的接口定义包含构造参数,则在组件后面的“()”配置相应参数。例如,Image组件的必选参数src:
  Image('https://weige/my.jpg')

或者Text组件的非必选参数content:

  Text('weige')

2. 配置属性

属性方法以“.”链式调用的方式配置系统组件的样式和其他属性。例如,配置Text组件的字体大小:

Text('weige').fontSize(15)

也可以配置组件的多个属性:

Image('weige.jpg').alt('error.jpg').width(100).height(100)

3. 配置事件

事件方法以“.”链式调用的方式配置系统组件支持的事件。例如,使用lambda表达式配置组件的事件方法:

Button('Click me').onClick(() => {
  this.myText = 'ArkUI';
})

4. 配置子组件

如果组件支持子组件配置,则需在尾随闭包"{…}"中为组件添加子组件的UI描述。例如,Column组件配置子组件的示例:

Column() {
  Text('Hello').fontSize(100)
  Divider()
  Text(this.myText).fontSize(100).fontColor(Color.Red)
}

5. 状态管理

ArkTS提供了多维度的状态管理机制。状态变量变化会触发UI刷新。例如:

@Entry
@Component
struct Index {
  @State message: string = 'Hello World'
  build() {
    Column() {
      Text('Hello').fontSize(30)
      Text(this.message).fontSize(30)
      Button() {
        Text('Click Me').fontSize(30)
      }.onClick(() => {
        this.message='ArkUI'
      }).width(200).height(50)
    }
  }
}

6. 渲染控制

ArkTS提供了渲染控制的能力,包括条件渲染、循环渲染和数据懒加载。这些能力允许开发者根据应用的不同状态,渲染对应状态下的UI内容。

通过这些基本语法和示例,开发者可以构建出功能丰富的HarmonyOS应用界面。ArkTS的声明式UI描述提供了一种高效、直观的方式来构建应用程序的用户界面。

本想
吐槽
一下阿里云,推动阿里云做的更好,为园子的商业化中做一些阿里云相关业务做一些准备,自己明知用着不舒服,还要推广阿里云的产品真的没底气。

没想到的是,吐槽阿里云,倒没得罪阿里云,反而引起阿里云的重视,在积极解决问题,却得罪了园子的很多用户,引来一片批评,这也许就是大家对园子的发展恨铁不成钢的一种表达方式吧。

算了,本来想做的阿里云云大使业务不作为重点了,虽然阿里云给了很诱人的返佣比例,把大部分返佣直接返给用户吧,通过
https://market.cnblogs.com/
链接领券购买,阿里云官网上能买到的产品都可以享受折上6.5折(仅限新用户)。

以后尽量减少在官博上发布情绪化的小学生作文,等园子发展壮大了,有专业人员撰稿,会根本解决这个问题。

最近园子上线了华为的一个广告,在首页侧边栏与博文下方,推广的是开发者调查问卷,本想写一篇博文推一下,现在也有点不敢写了,因为这个问卷调查有个尴尬的地方。

这个调查问卷是由CSDN创建的,之前这个调查问卷都是华为找CSDN合作,根本轮不到园子,今年幸运的是华为分了一些调查指标给园子完成。因为这个原因,所以调查表单用的是CSDN的,广告上线后遇到有园友在会员群里反馈,因为看到是CSDN的调查表单,不愿填写。

还是回到众包平台吧,这个帮开发者挣钱的商业模式应该是人心所向吧,这也很苦恼。

目前已经召集了2000多位合作开发者,但单子很少,好不容易在国庆期间10月2日等来一个50w的app开发大单,但到今天还没找到符合要求的开发团队接单,本想发博文出来公开招募,但客户又不愿意。

国庆假期快结束的时候,有个报价5w的单子,要求支撑峰值50w并发,接单开发者报价8w,客户同意,但启动开发之前最终确认的时候,客户问了一下自己公司的开发人员,开发人员说只需要几k就能搞定,于是客户取消了单子。

目前还有一个在谈的很有想象空间的广告大单,如果接下来做好了,可以解决园子这个季度的收入问题,但这也要写博文在园子里推广,需要大家的支持才能完成。

继续努力吧,二十年没有完成的商业化不可能短时间完成,大家着急的心情可以理解,但因为主观上的着急而操之过急是商业化的大忌,牢牢把握属于自己的市场机会才是关键。

Transformer
在计算机视觉任务中表现出了令人鼓舞的性能,包括图像超分辨率(
SR
)。然而,流行的基于
Transformer

SR
方法通常采用具有二次计算复杂度的窗口自注意力机制,导致固定的小窗口,限制了感受野的范围。论文提出了一种将基于
Transformer

SR
网络转换为分层
Transformer

HiT-SR
)的通用策略,利用多尺度特征提升
SR
性能,同时保持高效设计。具体而言,首先用扩展的分层窗口替代常用的固定小窗口,以聚合不同尺度的特征并建立长距离依赖关系。考虑到大窗口所需的密集计算,进一步设计了一种具有线性复杂度的空间通道相关方法,以窗口大小高效地从分层窗口中收集空间和通道信息。大量实验验证了
HiT-SR
的有效性和效率,改进版本的
SwinIR-Light

SwinIR-NG

SRFormer-Light
在参数、
FLOPs
和速度方面取得了最先进的
SR
结果(约
7
倍)。

来源:晓飞的算法工程笔记 公众号,转载请注明出处

论文: HiT-SR: Hierarchical Transformer for Efficient Image Super-Resolution

Introduction


图像超分辨率(
SR
)是一项经典的低级视觉任务,旨在将低分辨率(
LR
)图像转换为具有更好视觉细节的高分辨率(
HR
)图像。如何解决不适定的
SR
问题引起了数十年的广泛关注。许多流行的方法采用卷积神经网络(
CNN
)来学习
LR
输入与
HR
图像之间的映射。尽管已取得显著进展,但基于
CNN
的方法通常侧重于通过卷积利用局部特征,往往在聚合图像中的长距离信息方面表现不足,从而限制了基于
CNN

SR
性能。

视觉
Transformer
的最新发展为建立长程依赖关系提供了一种有前景的解决方案,这为许多计算机视觉任务(包括图像超分辨率)带来了好处。在流行的基于
Transformer

SR
方法中,一个重要的组件是窗口自注意力(
W-SA
)。通过将局部性引入自注意力,
W-SA
机制不仅更好地利用了输入图像中的空间信息,还减轻了处理高分辨率图像时的计算负担。然而,目前的基于
Transformer

SR
方法通常采用固定的小窗口尺寸,例如
SwinIR
中的
\(8\times8\)
。将感受野限制在单一尺度,阻碍了网络收集多尺度信息,如局部纹理和重复模式。此外,
W-SA
对窗口大小的二次计算复杂度也使得在实际中扩大感受野变得不可承受。

为了减轻计算开销,以往的尝试通常减少通道数以支持大窗口,例如
ELAN
中的
group-wise
多尺度自注意力(
GMSA
)的通道分割和
SRFormer

permuted
自注意力块(
PSA
)的通道压缩。然而,这些方法不仅面临空间信息与通道信息之间的权衡,而且对窗口大小仍然保持二次复杂性,限制了窗口扩展(在
ELAN
中最大为
\(16\times16\)
,在
SRFormer
中为
\(24\times24\)
,而在论文的方法中则可达到
\(64\times64\)
及更大)。因此,如何在保持计算效率的同时有效聚合多尺度特征,仍然是基于
Transformer

SR
方法面临的一个关键问题。

为此,论文开发了一种通用策略,将流行的基于
Transformer

SR
网络转换为高效图像超分辨率的层级
Transformer

HiT-SR
)。受到多尺度特征聚合在超分辨率任务中成功的启发,论文首先提出用扩展的层级窗口替换
Transformer
层中的固定小窗口,使得
HiT-SR
能够利用逐渐增大感受野的信息丰富的多尺度特征。为了应对处理大窗口时
W-SA
计算负担的增加,论文进一步设计了一种空间-通道相关(
spatial-channel correlation

SCC
)方法,以高效聚合层级特征。具体而言,
SCC
由一个双特征提取(
dual feature extraction

DFE
)层组成,通过结合空间和通道信息来改善特征投影,还有空间和通道自相关(
S-SC

C-SC
)方法,以高效利用层级特征。其计算复杂度与窗口大小呈线性关系,更好地支持窗口扩展。此外,与传统的
W-SA
采用硬件效率较低的
softmax
层和耗时的窗口平移操作不同,
SCC
直接使用特征相关矩阵进行变换,并使用层级窗口进行感受野扩展,从而在保持功能性的同时提升了计算效率。

总体而言,论文的主要贡献有三个方面:

  1. 提出了一种简单而有效的策略,即
    HiT-SR
    ,将流行的基于
    Transformer
    的超分辨率方法转换为层级
    Transformer
    ,通过利用多尺度特征和长距离依赖关系来提升超分辨率性能。

  2. 设计了一种空间-通道相关方法,以高效利用空间和通道特征,其计算复杂度与窗口大小呈线性关系,从而实现对大层级窗口的利用,例如
    \(64\times64\)
    窗口。


  3. SwinIR-Light

    SwinIR-NG

    SRFormer-Light
    转换为
    HiT-SR
    版本,即
    HiT-SIR

    HiT-SNG

    HiT-SRF
    ,取得了更好的性能,同时参数和
    FLOPs
    更少,并实现了约
    7
    倍的速度提升。

Method


Hierarchical Transformer

如图
2
所示,流行的基于
Transformer
的超分辨率框架通常包括卷积层,从低分辨率输入图像
\(I_{LR} \in \mathbb{R}^{3\times H\times W}\)
中提取浅层特征
\(F_{S} \in \mathbb{R}^{C\times H\times W}\)
,特征提取模块通过
Transformer
块(
TBs
)聚合深层图像特征
\(F_{D} \in \mathbb{R}^{C\times H\times W}\)
,以及重建模块从浅层和深层特征恢复高分辨率图像
\(I_{HR} \in \mathbb{R}^{3\times sH\times sW}\)

\(s\)
表示放大因子)。在特征提取模块中,
TBs
通常是由级联的
Transformer
层(
TLs
)和后续的卷积层构成,其中每个
TL
包括自注意力(
SA
)、前馈网络(
FFN
)和层归一化(
LN
)。由于自注意力的计算复杂度与输入大小呈二次关系,因此在
TL
中通常采用窗口划分来限制自注意力的作用于局部区域,这被称为窗口自注意力(
W-SA
)。尽管
W-SA
缓解了计算负担,但其感受野被限制在较小的局部区域,从而阻碍了超分辨率网络利用长距离依赖关系和多尺度信息。

为了高效聚合层级特征,论文提出了一种通用策略,将上述超分辨率框架转换为层级
Transformer
。如图
2
所示,主要在两个方面进行了改进:

  1. 在块级别对不同的
    TLs
    应用层级窗口,而不是对所有
    TLs
    使用固定的小窗口大小,从而使
    HiT-SR
    能够建立长距离依赖关系并聚合多尺度信息。
  2. 为了克服大窗口带来的计算负担,用一种新颖的空间-通道关联(
    SCC
    )方法替代
    TLs
    中的
    W-SA
    ,这种方法更好地支持以线性计算复杂度进行窗口缩放。

基于上述策略,
HiT-SR
不仅通过利用层级特征获得了更好的性能,还得益于
SCC
保持了计算效率。

Block-Level Design: Hierarchical Windows

在块级别,为不同的
TLs
分配层级窗口,以收集多尺度特征。给定一个基础窗口大小
\(h_{B}\times w_{B}\)
,为第
\(i\)

TL
设置窗口大小
\(h_i\times w_i\)

\[\begin{equation}
h_i = \alpha_i h_{B},\quad w_i = \alpha_i w_{B},
\end{equation}
\]

其中
\(\alpha_i>0\)
是第
\(i\)

TL
的层级比率。

  • Expanding Windows

为了更好地聚合层级特征,采用一种扩展策略来安排窗口。如图
3
所示,首先在初始层使用小窗口大小,从局部区域收集最相关的特征,然后逐渐扩大窗口大小,以利用从长距离依赖中获得的信息。

之前的方法通常对固定的小窗口应用平移和掩码操作以扩大感受野,但这些操作在实践中耗时且效率低下。与它们相比,论文的方法直接利用级联的
TLs
形成一个层级特征提取器,使得小到大的感受野在保持整体效率的同时得以实现。
HiT-SR
方法相较于原始模型具有约
7
倍的速度提升,并且性能更佳。

Layer-Level Design: Spatial-Channel Correlation

在层级水平上,论文提出了空间-通道相关性(
SCC
),以高效利用来自层级输入的空间和时间信息。如图
4
所示,
SCC
主要由双特征提取(
DFE
)、空间自相关(
S-SC
)和通道自相关(
C-SC
)组成。此外,与常用的多头策略不同,
S-SC

C-SC
应用了不同的相关头策略,以更好地利用图像特征。

  • Dual Feature Extraction

线性层通常用于特征投影,只提取通道信息而忽略了空间关系的建模。相反,论文提出了带有双分支设计的双特征提取(
DFE
),以利用来自两个领域的特征。如图
4
所示,
DFE
由一个卷积分支来利用空间信息和一个线性分支来提取通道特征组成。给定输入特征
\(X \in \mathbb{R}^{C\times H\times W}\)

DFE
的输出计算为

\[\begin{equation}
\begin{aligned}
&\operatorname{DFE}(X) = X_{ch} \odot X_{sp},\quad \text{with}
\\
X&_{ch} = \operatorname{Linear}(X),\ X_{sp} = \operatorname{Conv}(X),
\end{aligned}
\end{equation}
\]

其中,
\(\odot\)
表示逐元素相乘。
reshape
的通道特征
\(X_{ch} \in \mathbb{R}^{HW\times C}\)
和空间特征
\(X_{sp} \in \mathbb{R}^{HW\times C}\)
分别通过线性层和卷积层捕获。在空间分支中,使用一个沙漏结构堆叠三层卷积层,并将隐藏维度按比例
\(r\)
减小以提高效率。最后,空间特征和通道特征通过相乘相互作用,生成
DFE
输出。

与通过线性投影预测查询、键和值的标准
SA
方法不同,将键与值等同,因为它们都反映了输入特征的内在属性,仅通过对
DFE
输出进行拆分来生成查询
\(Q\in \mathbb{R}^{HW\times \frac{C}{2}}\)
和值
\(V \in \mathbb{R}^{HW\times \frac{C}{2}}\)
,如图
4
所示。

\[\begin{equation}
[Q, V] = \operatorname{DFE}(X),
\end{equation}
\]

这减少了由于键生成引起的信息冗余。然后,根据分配的窗口大小将查询和键划分为不重叠的窗口,例如,对于第
\(i\)

TL
,有
\(Q_i,\ V_i\in \mathbb{R}^{h_{i}w_{i}\times \frac{C}{2}}\)
(为了简化,省略了窗口的数量),并使用划分后的查询和值进行后续的自相关计算。

  • Spatial Self-Correlation


W-SA
相比,
S-SC
以高效的方式聚合空间信息。考虑到层级化策略中扩展的窗口大小,首先通过在空间维度上应用线性层(称为
S-Linear
)自适应地总结不同
TL
中值
\(V_i\)
的空间信息,即,

\[\begin{equation}
V_{\downarrow,i}^T = \operatorname{S-Linear}_{i}(V_i^T),
\end{equation}
\]

其中
\(V_{\downarrow,i}\in \mathbb{R}^{h_\downarrow w_\downarrow \times \frac{C}{2}}\)
表示投影后的值,具有

\[\begin{equation}
\left[h_\downarrow, w_\downarrow \right]= \left\{
\begin{array}{ll}
\left[h_i, w_i\right], & \text { if } \alpha_i \leq 1, \\
\left[h_B, w_B\right], & \text { if } \alpha_i > 1.
\end{array}\right.
\end{equation}
\]

因此,
HiT-SR
能够从大窗口中总结高层信息,即
\(\alpha_i> 1\)
,同时保留小窗口中的细粒度特征,即
\(\alpha_i\leq 1\)
。随后,基于
\(Q_i\)

\(V_{\downarrow,i}\)
计算
S-SC
,如下所示:

\[\begin{equation}
\operatorname{S-SC}(Q_i, V_{\downarrow,i}) = \left(\frac{Q_i V_{\downarrow,i}^T}{D} + B\right)\cdot V_{\downarrow,i},
\end{equation}
\]

其中
\(B\)
表示相对位置编码,常数分母
\(D=\frac{C}{2}\)
用于归一化。与标准的
W-SA
相比,
S-SC
在效率和复杂性上显示出优势:

  1. 利用相关图而不是注意力图来聚合信息,去掉了在硬件上效率低下的
    softmax
    操作,以提高推理速度。
  2. S-SC
    支持大窗口,具有线性的计算复杂度与窗口大小相关。假设输入包含
    \(N\)
    个窗口,每个窗口在
    \(\mathbb{R}^{hw\times C}\)
    空间中,那么
    W-SA

    S-SC
    所需的
    mult-add
    操作数量分别为:

\[\begin{equation}
\begin{aligned}
&\operatorname{Mult-Add}(\operatorname{W-SA})= 2NC(hw)^2,
\\
&\operatorname{Mult-Add}(\operatorname{S-SC})= 2NCh_\downarrow w_\downarrow hw,
\end{aligned}
\end{equation}
\]

其中前者与窗口大小
\(hw\)
成平方关系。由于
\(h_\downarrow w_\downarrow\)
受到固定基准窗口大小
\(h_B w_B\)
的上限限制,
S-SC
的计算复杂性与窗口大小成线性关系,从而有利于窗口的放大。

  • Channel Self-Correlation

除了空间信息,论文进一步设计了
C-SC
以从通道域中聚合特征,如图
4
所示。给定第
\(i\)

TL
中的分区查询和数值,
C-SC
的输出为:

\[\begin{equation}
\operatorname{C-SC}(Q_i, V_i) = \frac{Q_i^T V_i}{D_i} \cdot V^T_i,
\end{equation}
\]

其中分母
\(D_i = h_i w_i\)
。与当前普遍采用的转置注意力进行通道聚合相比,
C-SC
利用层级化窗口,并利用丰富的多尺度信息来提升超级分辨率 (
SR
) 性能。在计算复杂性方面,在
\(\mathbb{R}^{N\times hw \times C}\)
空间中的输入下,
C-SC
所需的
mult-add
操作数量为:

\[\begin{equation}
\operatorname{Mult-Add}(\operatorname{C-SC}) = 2N C^2 hw
\end{equation}
\]

结合公式
7

9
,空间-通道相关性的复杂度保持与窗口大小成线性关系,如表
1
所示,使得可扩展窗口能够充分利用层级信息。

  • Different Correlation Head

多头策略通常在自注意力 (
SA
) 中被采用,以从不同的表示子空间中聚集信息,并且在处理空间信息时表现出了良好的性能。然而,在处理通道信息时,多头策略反而限制了通道信息聚合的感受野,即每个通道只能与有限的一组其他通道进行交互,这导致了次优的表现。

为了解决这个问题,论文提议在
S-SC
中应用标准的多头策略,但在
C-SC
中使用单头策略,从而实现全面的通道交互。因此,
S-SC
可以通过多头策略利用来自不同通道子空间的信息,而
C-SC
则可以通过层级化窗口利用来自不同空间子空间的信息。

Experiments and Analysis


  • Implementation Details


HiT-SR
策略应用于流行的超级分辨率 (
SR
) 方法
SwinIR-Light
,以及最近的最先进的
SR
方法
SwinIR-NG

SRFormer-Light
,对应于本文中的
HiT-SIR

HiT-SNG

HiT-SRF
。为了公平地验证有效性和适应性,控制每种方法转换为
HiT-SR
版本所需的更改最小,并且对所有
SR Transformer
应用相同的超参数设置。

具体而言,遵循
SwinIR-Light
的原始设置,将所有
HiT-SR
改进模型的
TB
数量、
TL
数量、通道数量和头数量分别设置为
4

6

60

6
。基础窗口大小
\(h_{B}\times w_{B}\)
设置为广泛采用的值,即
\(8\times8\)
,并且将每个
TB
中的
6

TL
的层级化比例设置为
\([0.5, 1, 2, 4, 6, 8]\)


HiT-SIR

HiT-SNG

HiT-SRF
应用相同的训练策略。所有模型均基于
PyTorch
实现,并在
\(64\times64\)
的图像块大小和
\(64\)
的批量大小下训练
500K
次迭代。模型优化采用
\(L_1\)
损失和
Adam
优化器 (
\(\beta_1=0.9\)

\(\beta_2=0.99\)
)。将初始学习率设置为
\(5\times10^{-4}\)
,并在 [
250K
,
400K
,
450K
,
475K
] 次迭代时将其减半。在模型训练过程中,我们还随机利用
90
°、
180
° 和
270
° 的旋转以及水平翻转进行数据增强。

  • Result



如果本文对你有帮助,麻烦点个赞或在看呗~
更多内容请关注 微信公众号【晓飞的算法工程笔记】

work-life balance.

一、前言

本文介绍一下Kafka赫赫有名的组件Purgatory,相信做Kafka的朋友或多或少都对其有一定的了解,至少是听过它的名字。那它的作用是什么呢,用来解决什么问题呢?官网confluent早就有文章对其做了阐述

这里简单总结一下:Purgatory是用来存储那些处于临时或等待状态的请求,这些请求可能某些条件未被满足,而被临时管理了起来。在这些条件满足后,或者请求超时后,这些请求会被Purgatory高效回调,继而继续执行后续逻辑

这里聊个题外话,为什么Kafka要给其取名“炼狱”呢?以下可以看一下百科对其的释义

在教会的传统中,炼狱是指人死后精炼的过程,是将人身上的罪污加以净化,是一种人经过死亡而达到圆满的境界(天堂)过程中被净炼的体验

相信Purgatory在这里更强调的是临时,另外还有诸如Reaper(死神)等的命名,可见Kafka原作者们还是很有文艺范的 :)

二、演化

关于Purgatory组件的形成,并不是一蹴而就的,它至少经历了2个大版本的迭代。

  • 版本一:在Kafka 0.8版本及以前,使用的是第一版,这个版本的核心是严重依赖了JUC的延迟队列(
    java.util.concurrent.DelayQueue
    )。然而放入Purgatory中的这些延迟任务,大多数的时候,并不会真正等到时间超时。例如acks=all的这种case,假设默认的超时时间为1秒,即需要在1秒钟之内将数据同步给所有的follower,leader将数据放入Purgatory后便开始了回调等待,但大多数情况,可能几十ms数据便同步完并执行回调结束本次异步操作,然而存在于延迟队列
    DelayQueue
    中的请求并不能真正被删除,它只能在真正超时的时候(1秒后),才能被发现并删除。因此白白占用昂贵的内存资源是一个弊端,而且存在一些性能上的问题,对于entry的增加、修改时间复杂度也达到了log(N)
  • 版本二:而在之后的版本中,Kafka对其做了优化,引入了优秀的设计 Hierarchical Timing Wheel(多层时间轮)的概念,不仅能够立即将已完成的任务删除,而且使其性能飙升,几乎达到了常量O(1)的程度,关于具体的版本演练及性能测试可以参看官网文章

不过文章中对很多细节并没有展开,也没有源码级流程的讲解,这也是本文诞生的初衷,接下来我们会对Purgatory有一个全方位的介绍,包括其设计理念及源码分析。另:源码均来自于当前社区最新的trunk分支,也就是不久后的4.0.0的release分支,考虑到Purgatory已经相当稳定成熟,因此在当前trunk分支至4.0.0并不会有大的变动

三、整体业务流程

为了对Purgatroy扮演的角色有一个全局的概念,我们以Consumer的
Join Group
来举例说明。Join Group要做的事情也非常简单,它需要协调多个Consumer在某个时间窗口内,尽量快速调用Join Group接口,在此,后续的结果分两种情况考虑

  • 所有的Consumer在时间窗口内均调用了Join Group后,Coordinator开始制定分区分配策略
  • 在时间窗口结束的时候,只有一部分Consumer调用了Join Group,Coordinator便将那些没有调用Join Group剔除,只对这些调用了Join Group的Consumer开始制定分配策略

假设现在没有Purgatory组件,我们实现起来的话,流程可能如下:

假定4个线程在时间窗口内都到达了,但是到达的前后顺序不一致。线程2已到达,线程1、线程3、线程4都还在路上,因此线程2一直处于挂起状态,即便是有新的任务到达,线程2也无法进行响应处理,且对应线程2的占用一直要等到ack response后,才能释放出来,
效率低下

而图中“Wait All Threads Ready”组件如何实现呢?其实可以使用JDK的CyclicBarrier或Semaphore或CountDownLatch均可实现,但不是本文要讨论的重点,不再展开

Purgatory如何实现呢?它在整个流程中扮演了一个什么角色呢

首先流程上还是4个线程先后过来调用接口,但区别在于,已经到达的线程只需要将自己已经receive的消息(包含回调、超时等基础数据)给到Purgatory即可,而后这个线程将会被释放,它可以去处理其他任务。而后续的操作则会由Purgatory来接管,包括判断条件是否满足、窗口是否超时,一旦其中一个条件满足,Purgatory将执行回调,挨个对这些请求进行ack response

知道了Purgatory在整体流程中扮演的角色,接下来我们就要对这个组件内部实现的细节进行展开了

四、Purgatory组成

我们还是先提供一张Purgatory运作的流程图

4.1、业务线程

业务线程,对应上图左侧部分的流程。所谓业务线程,即使用Purgatory组件作为暂存请求的线程,例如Join Group、Producer ACKS=all等,虽然业务线程调用Purgatory的代码非常简单,只有一行,拿Join Group举例:

rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))

但在Purgatory内部却做了很多事儿:

  • 首先尝试去完成调用,如果所有条件均已满足,那么当前任务直接成功,也就不需要与时间轮交互了
  • 如果条件不满足,则会将任务存储在时间轮
  • 如果用户设置了key,同时还会对同key的TimeTask进行监听
    • 其实就是对同key的任务做批量操作,比如一起取消,后文还会提及

可见业务线程只负责向时间轮中写入数据,那时间轮中的数据什么时候被清除呢?这就要涉及另外一个核心线程ExpiredOperationReaper

4.2、收割线程

在Purgatory的内部还存在一个独立的线程ExpiredOperationReaper,我们可以将其翻译为收割线程或清理线程,它的作用是实时扫描那些已经过期的任务,并将其从时间轮中移除。它的定义如下

/**
 * A background reaper to expire delayed operations that have timed out
 */
private class ExpiredOperationReaper extends ShutdownableThread(
  "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
  false) {

  override def doWork(): Unit = {
    advanceClock(200L)
  }
}

这里需要注意的是,收割线程只是一个独立的单线程,它的作用只是去实时找出那些已经过期的任务,并将后续的回调逻辑扔给线程池,继而继续扫描,由此可见其任务并不繁重

这里简单提一下“回调线程池”,由上文我们知道线程将任务交给Purgatory后便结束使命了,后续的触发均有这个“回调线程池”中的线程来执行的,这个线程池定义如下

this.taskExecutor = Executors.newFixedThreadPool(1,
            runnable -> KafkaThread.nonDaemon(SYSTEM_TIMER_THREAD_PREFIX + executorName, runnable));

可见它是一个单线程的,且固定线程数的线程池。为什么要设置为单线程呢?如果某个应用的回调阻塞了,那岂不是所有线程池中的回调均会阻塞吗?

的确是这样,不过考虑到这个线程池做的工作只是回调,一般是网络发送模块,数据其实都是已经准备好的,TPS响应是非常快的,因此通常也不会成为瓶颈

由上文可知,不论是业务线程还是收割线程,其与时间轮均有密不可分的关系

五、时间轮(Timing Wheel)

不论是延迟任务的管理、存储、移除等核心操作,均是由时间轮来完成,因此时间轮是整个Purgatory中的最核心组件。此处我们再明确一下Purgatory与时间轮的关系,时间轮只是Purgatory中的一个子概念,是为了让Purgatory更高效、性能更快而提炼出的一个内部组件

5.1、数据结构

时间轮的数据结构也相对简单,由一个轮子+双向链表组成

轮子:

双向列表:

轮子+双向列表的结构便为:

之所以设计双向链表的方式,主要是Task的新增跟删除是一件非常频繁的事儿,我们的数据结构要能确保高效地处理这些请求,而双向循环链表则能够保证任意一个Task节点的新增与删除都能维持O(1)的时间复杂度,因此可谓是不二之选

5.2、Task添加与移除

具体的Task添加与删除的过程是什么样呢?我们举个例子来说明:

假定我现在时间轮的粒度为10秒钟,即每10秒一个格子

现在来了一个任务,这个任务Task1要在第35秒被触发,此时我们找到第4个时间格,将这个任务放在这里

接着又来了2个任务,分别在36、38秒被触发,那么同样它们将会被放在第4个时间格中

同理,不难想象,如果又来了几个任务,它们的触发时间分别为12、18、69、62、65、53、54,那么时间轮将会变为如下

如果来个延迟时间是100秒的任务呢?其实这块就涉及到多级时间轮了

至于任务的移除,参照双向链表删除节点即可

5.3、多级时间轮

5.3.1、基础定义

多级时间轮,顾名思义,即有很多个层级的时间轮,越往上,粒度越粗,理论上只要内存够大,时间轮可以存储无限大小的延迟任务。下图展示了一个2级时间轮:

  • 内层时间轮:时间粒度是10秒钟,每个时间轮有8个格子,因此内层的时间轮可以存储0-80秒的任务
  • 外层时间轮:时间粒度是80秒钟,同样也是有8个格子,外层的时间轮则可以存储0-640秒的任务

其实每个外层时间轮的一个格子,均对应一个内层时间轮,只不过上图没有呈现出这一点,因此,接上文,如果我们要存储一个100秒的任务时,当前时间轮发现越界了,它会无脑向上抛,直到找到能接住这个超时时间的时间轮,上层时间轮的跨度更大,因此100秒的任务会落在“81-160”这个格子上。虽然更上层的时间轮承接了这个任务,但其实处理这个任务的最终还将会是最细粒度的时间轮,也就是将来在“81-160”这个格子对应的内层时间轮会最终接受这个任务并触发回调,这点我们在“时钟模拟”章节再展开

因此其实我们不用在意到底Purgatory会有多少个层级的时间轮,理论上它可能是无限大的,我们只需要知道最细粒度的时间轮的步长+个数,后面的轮子构成都可以推导出来。那Kafka设定的最细粒度的轮子步长跟个数分别是多少呢?这个答案藏在org.apache.kafka.server.util.timer.SystemTimer类的构造方法中

public SystemTimer(String executorName) {
    this(executorName, 1, 20, Time.SYSTEM.hiResClockMs());
}

可见,最细粒度的时间步长为1ms,个数为20,由此可推导出一个表格

层级

步长

个数

最长时间

1

1ms

20

20ms

2

20ms

20

400ms

3

400ms

20

8s

4

8s

20

160s (2分40秒)

5

160s

20

3200s (53分20秒)

6

3200s

20

64000s (17时46分40秒)

可见到了第6层,延时时长已经到达了17个小时,而Kafka一般的case,可能到第4层就足矣;而从整体看,时间轮最细粒度精确到了1ms,且可以接收理论上无限长的定时任务,真可谓是神器了。不过这里也存有一点疑问,那就是粒度做的这么细,性能方面是不是存在问题?这点我们在后文也会涉及

5.3.2、Task添加

这节梳理一下在多级时间轮下,Task的添加与移除操作,关于Task的添加,一言以蔽之就是如果目标Task超过当前时间轮的最大时间范围,那么直接抛给上级时间轮;还是那上文举个例子,假如时间轮收到一个700秒后执行的延迟任务

一级时间轮,也就是最细粒度的时间轮,范围是0-80秒,无法存放,那么向上抛送;

二级时间轮收到这个任务后,发现超时时间是700秒,而自己的范围则是0-640,依旧无法存放,继续向上抛送

三级时间轮收到任务后依然检查自己能接收的时间范围,发现是0-5120秒,700秒在自己的范围内,继而计算700秒任务应该落在哪个格子,最终其被存放在641-1280这个格子中

总结:任务的添加总是先交给最细粒度的时间轮,而后层层上报,直到找到能承接这个Task的轮子后便将其存放在对应的格子中

5.3.2、Task移除

常规情况下,一个处于高Level的Task,在还没有真正过期时,它的移除动作就是将其放入更细粒度的时间轮中,还是以上图中的例子来说明

  1. 现在700秒的Task被放在三级时间轮的"641-1280"这个格子(TaskList)中,这个格子将在640秒过期
  2. 现在时钟刚过640秒,"641-1280"这个格子被推出,发现其中有1个700秒超时的任务,但是其还没有真正超时,因为当前的时间是640秒
  3. 而后这个任务将会被重新加入时间轮,因为时钟已经过了640秒,因此此时的一级、二级时间轮均发生了变化,二级时间轮被替换为如下,因此当前任务会被放入641-720格子中
    1. 641-720
    2. 721-800
    3. 801-880
    4. 881-960
    5. 961-1040
    6. 1041-1120
    7. 1121-1200
    8. 1201-1280
  1. 而641-720格子对应的一级时间轮如下,700秒任务对应的格子为691-700,因此在后续的时钟模拟中,真正要等到691-700这个格子被唤醒才能调用
    1. 641-650
    2. 651-660
    3. 661-670
    4. 671-680
    5. 681-690
    6. 691-700
    7. 701-710
    8. 711-720

5.4、时钟模拟

接下来就是非常重要的一步,Purgatory要模拟时钟往前推进时间,从而触发相关任务被唤醒

5.4.1、java.util.concurrent.DelayQueue

在真正开始介绍时钟模拟前,我们需要先铺垫一个关键的JUC包下的类java.util.concurrent.DelayQueue,整个时钟模拟在很大程度上依赖这个延迟队列的能力。DelayQueue有如下几个核心方法:

  • put (java.util.concurrent.Delayed delayed) 将一个延迟对象放入延迟队列中
  • offer (java.util.concurrent.Delayed delayed) 同 put
  • poll() 将会一直阻塞, 直到返回一个已经过期的延迟对象,不过如果当前的延迟队列中没有数据,将会直接返回null
  • poll(long timeout, TimeUnit unit) 功能与poll() 相似,只不过当前方法加入了超时限定,且如果延迟队列为空的话,也不会立即返回null,而是等待超时

因为DelayQueue只接受java.util.concurrent.Delayed对象,此对象的定义如下

/**
 * A mix-in style interface for marking objects that should be
 * acted upon after a given delay.
 *
 * <p>An implementation of this interface must define a
 * {@code compareTo} method that provides an ordering consistent with
 * its {@code getDelay} method.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

可见是一个接口,如果使用的话,我们需要定义一个延迟类,并实现这个接口。我们可以写一个延迟队列的小例子来个直观感受

public class DelayedQueueExample {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedItem> delayQueue = new DelayQueue<>();

        delayQueue.put(new DelayedItem(2000));
        delayQueue.put(new DelayedItem(5000));
        delayQueue.offer(new DelayedItem(6000));

        while (!delayQueue.isEmpty()) {
            DelayedItem delayedItem = delayQueue.poll(200, TimeUnit.MILLISECONDS);
            if (delayedItem != null) {
                System.out.println("delayedItem content : " + delayedItem);
            } else {
                System.out.println("DelayedItem is null");
            }
        }
    }

    private static class DelayedItem implements Delayed {
        private final long expirationTime;

        public DelayedItem(long delayTime) {
            this.expirationTime = System.currentTimeMillis() + delayTime;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long diff = expirationTime - System.currentTimeMillis();
            return unit.convert(diff, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (this.getDelay(TimeUnit.MILLISECONDS) < other.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            }
            if (this.getDelay(TimeUnit.MILLISECONDS) > other.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            }
            return 0;
        }
    }
}

上述例子中,我们往延迟队列中放入了3条数据,它们需要处理延迟请求的时间分别是2秒、5秒、6秒,当调用poll()方法时,可以精确在对应的时间收到该请求的回调,当然这块的高效得益于Doug Lea大神的JUC包

有同学可能会说,既然JUC的延迟队列都能把这些事儿做了,还要时间轮做什么用呢?自然延迟队列是有它自己问题的,参看“演化”模块

5.4.2、延迟对象

那放入延迟队列java.util.concurrent.DelayQueue的元素是这些延迟Task吗?答案是否定的,因为这些任务一旦放入延迟队列,那它的删除就会成为负担,而且带来大量内存的占用(其实Purgatory的第一版就是这样设计的),其实这里延迟队列的元素是时间轮中的双向循环列表,如下图

这里关于任务的添加与删除,站在延迟队列的角度再讨论一下

  • 当Task加入到时间轮中的一个空格子中时,此时会创建一个TaskList对象,当然这个TaskList的双向链表中只有一个元素,而后这个TaskList被加入延迟队列
  • 当Task加入一个有数据的格子中时,直接将这个Task加入TaskList的链表中,因为这个TaskList已经托付给延迟队列管理,因此此时不涉及延迟队列操作
  • 当某个Task需要删除时,直接找到对应的TaskList,将其从链表中移除
  • 当TaskList超时,被延迟队列唤起,此时这些Task将会被依次处理,而如果TaskList中的链表为空,则直接跳过

这样不仅完美避开了对延迟队列中元素删除的操作,而且完美解决了OOM的问题,且元素的新增、删除时间复杂度均为O(1)

5.4.3、Tick

模拟时钟推进使用的线程即为上文提到的收割线程,方法的入口为
kafka.server.DelayedOperationPurgatory#advanceClock
,当然这里处理的均是已经超时的请求,因此如果所有的操作均没有超时,那收割线程实际没有需要处理的业务

其实关于Tick操作的核心就是调用延迟队列的poll操作,用来获取那些已经超时的TimerTaskList

TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);

不过这个Tick的粒度是时间轮的每一个格子,因此它与Task的频率是不一致的,通常一个格子中可能包含了多个Task,这些Task如果在时间上确实超时了,那么会真正业务回调,如果没有超时,将重新加入时间轮

5.5、Watch Key

所谓Watch Key,通常是将一组生命周期相关的数据设置同一个key,这样在条件达成后,可将这组任务统一回调,不论是成功还是取消

例如当执行Group的Join操作时,预期会有10个consumer调用Join接口,然后每个consumer调用接口时,均带上watch key参数(这里的watch key可以设置为group name),只要发现调用这个key的数量满10个后,便可以将这10个延迟请求统一回调,同时将其从时间轮中删除

六、源码分析

我们将上图关键部分标记出相关类

  • 首先整个时间轮对应的类为
    org.apache.kafka.server.util.timer.TimingWheel
  • 时间轮上每个格子对应的类为
    org.apache.kafka.server.util.timer.TimerTaskList
  • 每个格子中链表中的元素对应的类为
    org.apache.kafka.server.util.timer.TimerTaskEntry
  • 每个元素中需要存储TimerTask,这个类为抽象类,也是需要用户自己去实现的
    org.apache.kafka.server.util.timer.TimerTask

其实通过这张图,我们对Purgatory涉及的类便有了全貌的了解,这里主要了解的是TimerTask,因为其他类均被包装在Purgatory组件内部,不需要继承,也不涉及改动。TimerTask的定义如下

public abstract class TimerTask implements Runnable {
    private volatile TimerTaskEntry timerTaskEntry;
    public final long delayMs;
}

这两个属性也较好理解

  • timerTaskEntry:它与TimerTask其实就是1:1的关系,同样在TimerTask中也有TimerTaskEntry的引用
  • delayMs:延迟时间,也就该任务将来被触发调用的时间

然而仅仅有这个类还是不够的,还需要在一些关键操作时,对相关接口进行回调,例如onComplete、onExpiration等。因此Kafka涉及了TimerTask的子类DelayedOperation

abstract class DelayedOperation(delayMs: Long,
                                lockOpt: Option[Lock] = None)
  extends TimerTask(delayMs) with Logging {

  private val completed = new AtomicBoolean(false)
  // Visible for testing
  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)

  /*
   * Force completing the delayed operation, if not already completed.
   * This function can be triggered when
   *
   * 1. The operation has been verified to be completable inside tryComplete()
   * 2. The operation has expired and hence needs to be completed right now
   *
   * Return true iff the operation is completed by the caller: note that
   * concurrent threads can try to complete the same operation, but only
   * the first thread will succeed in completing the operation and return
   * true, others will still return false
   */
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }
  }

  /**
   * Check if the delayed operation is already completed
   */
  def isCompleted: Boolean = completed.get()

  /**
   * Call-back to execute when a delayed operation gets expired and hence forced to complete.
   */
  def onExpiration(): Unit

  /**
   * Process for completing an operation; This function needs to be defined
   * in subclasses and will be called exactly once in forceComplete()
   */
  def onComplete(): Unit

  /**
   * Try to complete the delayed operation by first checking if the operation
   * can be completed by now. If yes execute the completion logic by calling
   * forceComplete() and return true iff forceComplete returns true; otherwise return false
   *
   * This function needs to be defined in subclasses
   */
  def tryComplete(): Boolean

  /**
   * Thread-safe variant of tryComplete() and call extra function if first tryComplete returns false
   * @param f else function to be executed after first tryComplete returns false
   * @return result of tryComplete
   */
  private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
    if (tryComplete()) true
    else {
      f
      // last completion check
      tryComplete()
    }
  }

  /**
   * Thread-safe variant of tryComplete()
   */
  private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete())

  /*
   * run() method defines a task that is executed on timeout
   */
  override def run(): Unit = {
    if (forceComplete())
      onExpiration()
  }
}

所有的业务类均需要继承DelayedOperation并重写相关方法,相关逻辑不再赘述

总结:以上只是分析了Purgatory的设计思路及大致流程,还有很多多线程并发相关的性能操作,Kafka均处理的非常漂亮,本文不能枚举,读者有兴趣可以参照文章过一遍源码,相信大有裨益