wenmo8 发布的文章

版本:DataX v202309  DataXWeb 2.1.3-alpha-release

DataX:

Github:https://github.com/alibaba/DataX

功能介绍文档:https://github.com/alibaba/DataX/blob/master/introduction.md

文档上虽然只写了Linux系统,但实际部署Windows也可以

JDK版本使用1.8即可

Python如果环境的版本可以选择的话,可以使用2.6或者2.7,我这边使用的是3.12.5

Maven 3.x是编译时需要的条件

一开始下载的是v202308版本,安装包下载路径:https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz

因为要Python3.x,所以替换DataX /bin目录下py文件(替换的文件在:DataXWeb:doc/datax-web/datax-python3/)

由于DataX对Mysql 只支持5.x,但是我这边的Mysql DB是 8.x的

所以下载了v202309的源码,调整代码使其支持mysql 8.x

(修改代码的步骤:https://blog.csdn.net/weixin_41640312/article/details/132019719)

然后按照github中的步骤打包即可

问题:

打包过程中发现oceanbasev10writer报错,项目的libs下缺少特定jar文件,

解决:

去master分支找到了这个jar,下载后复制,即可打包成功(打包过程非常慢,不知道是不是网络的问题)

问题:

创建了Mysql 数据源之间的迁移Job(文档中并没有说mysql的限制版本,也就没有想到支持的Mysql版本这么低)

配置是正确的,但dataX一直报错

解决:

去搜索了一下,才发现版本限制,所以切换了版本

问题:

打包好最新版后,运行Mysql Job还是报错(在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数)

解决:

在打包后的datax\conf\core.json 中core.transport.speed.byte由-1修改为2000000

DataXWeb:

Github:https://github.com/WeiYe-Jing/datax-web

众所周知,DataX是使用Python命令行运行Job的Json文件配置来同步数据源

所以使用DataXWeb来搭配使用

一开始使用的是DataXWeb v2.1.2版本,但是配置字段映射方面有点不太容易理解,就换了2.1.3-alpha-release

1. 下载源码

2. 运行datax-admin&datax-executor(按需修改配置文件)

配置文件都有说明,按照说明配置DB,以及datax.py的路径等即可。

相对来讲新版的配置比旧版更容易理解,但是,页面的数据不是很即使,操作时,还是需要刷新,不知道以后会不会调整

至于DataX创建Job的步骤我就不提了,使用DataXWeb,可以很容易创建一个Job

其他:

附上DataX支持的数据源(github上都有)

DataX的核心架构

Job通过源端切分策略,切分为多个Task,然后调用Schedule模块,根据配置的并发参数等,将Task划分为TaskGroup(默认一个TaskGroup5个Task)

每一个Task中启用一个线程,完成Reader->Channel->Writer流程

Demosaic,中文直接发翻译为去马赛克, 但是其本质上并不是去除马赛克,这让很多第一次接触这个名词或者概念的人总会想的更多。因此,一般传感器在采集信息时一个位置只采集RGB颜色中的一个通道,这样可以减少采集量,降低成本,由于人的视觉对绿色最为敏感,所以一般情况下,绿色分量会比红色和蓝色分量多一倍的信息,这样,根据RGB不同的位置排布,就产生了RGGB、GBRG、GRBG、BGGR四种常用的模式,比如RGGB模式就如下所示:




RGGB            BGGR            GBRG             GRBG

去马赛克的目的就是从这些确实的信息中尽量的完美的复原原始信息,比如上图第一个点,只有红色分量是准确的,那就要想办法通过其他的信息重构出改点的绿色和蓝色分量。

目前,关于这方面的资料也是很多的,这里我描述下目前我已经优化和处理的四个算法,并分享部分代码。

一、双线性处理

一个最为直接和简单的想法就是利用领域的相关信息通过插值来弥补当前缺失的颜色分量,我们以RGGB格式为例,参考上图(坐标都是从0开始,X轴从左向右递增,Y轴从上向下递增)。

先不考虑边缘。

比如(1,1)坐标这个点,现在已经有了B这个分量,缺少R和G这个分量,但是他四个对角线上都有R分量,因此,可以使用这个4个像素的平均值来计算改点的R分量,而在其上下左右四个方向有恰好有4个点的G分量,同理可以用这个四个的平均值来评估当前点的G值。

在考虑(1,2)这个坐标点,他的当前有效值是G分量,缺少R和B分量,而他则没有(1,1)点那么幸运,他周边的有效R和B分量都只有2个,因此,只能利用这两个有效值的平均值来评估该点的R/B分量。

其他的点类似处理。

在考虑边缘,由于边缘处的像素总有某一个方向或某二个方向缺少像素,因此,可以利用镜像的关系把缺少的那一边取镜像的位置信息来补充。

一个简单的高效的C++代码如下所示:

int IM_DecodeBayerRG8ToBGR_Bilinear_PureC(unsigned char*Src, BitmapData Dest)
{
int Status =IM_STATUS_OK;int Width = Dest.Width, Height = Dest.Height, Stride =Dest.Stride;if (((Width & 1) == 1) || ((Height & 1) == 1)) returnIM_STATUS_INVALIDPARAMETER;if ((Width < 8) || (Height < 8)) returnIM_STATUS_INVALIDPARAMETER;
unsigned
char* RowCopy = (unsigned char*)malloc((Width + 2) * 3 * sizeof(unsigned char));if (RowCopy == NULL) returnIM_STATUS_OUTOFMEMORY;
unsigned
char* First =RowCopy;
unsigned
char* Second = RowCopy + (Width + 2);
unsigned
char* Third = RowCopy + (Width + 2) * 2;
unsigned
char* SrcP = (unsigned char*)Src;
Second[
0] = SrcP[0];
memcpy(Second
+ 1, SrcP, Width * sizeof(unsigned char));
Second[Width
+ 1] = SrcP[Width - 1];
memcpy(First, Second, (Width
+ 2) * sizeof(unsigned char)); //第一行和第二行一样Third[0] =SrcP[Width];
memcpy(Third
+ 1, SrcP + Width, Width * sizeof(unsigned char));
Third[Width
+ 1] = SrcP[Width + Width - 1];for (int Y = 0; Y < Height; Y++)
{
unsigned
char* LinePD = Dest.Scan0 + Y *Stride;if (Y != 0)
{
unsigned
char* Temp = First; First = Second; Second = Third; Third =Temp;
}
if (Y == Height - 1)
{
memcpy(Third, Second, (Width
+ 2) * sizeof(unsigned char));
}
else{
Third[
0] = SrcP[(Y + 1) *Width];
memcpy(Third
+ 1, SrcP + (Y + 1) * Width, Width * sizeof(unsigned char));
Third[Width
+ 1] = SrcP[(Y + 1) * Width + Width - 1];
}
if ((Y & 1) == 0) //偶数列
{for (int X = 0; X < Width; X++, LinePD += 3)
{
int P0 = First[X], P1 = First[X + 1], P2 = First[X + 2];int P3 = Second[X], P4 = Second[X + 1], P5 = Second[X + 2];int P6 = Third[X], P7 = Third[X + 1], P8 = Third[X + 2];if ((X & 1) == 0)
{
LinePD[
0] = (P0 + P2 + P6 + P8 + 2) >> 2;
LinePD[
1] = (P1 + P3 + P5 + P7 + 2) >> 2;
LinePD[
2] =P4;
}
else{
LinePD[
0] = (P1 + P7 + 1) >> 1;
LinePD[
1] =P4;
LinePD[
2] = (P3 + P5 + 1) >> 1;
}
}
}
else{for (int X = 0; X < Width; X++, LinePD += 3)
{
int P0 = First[X], P1 = First[X + 1], P2 = First[X + 2];int P3 = Second[X], P4 = Second[X + 1], P5 = Second[X + 2];int P6 = Third[X], P7 = Third[X + 1], P8 = Third[X + 2];if ((X & 1) == 0)
{
LinePD[
0] = (P3 + P5 + 1) >> 1;
LinePD[
1] =P4;
LinePD[
2] = (P1 + P7 + 1) >> 1;
}
else{
LinePD[
0] =P4;
LinePD[
1] = (P1 + P3 + P5 + P7 + 2) >> 2;
LinePD[
2] = (P0 + P2 + P6 + P8 + 2) >> 2;
}
}
}
}
free(RowCopy);
returnIM_STATUS_OK;
}

这个算法是非常高效的,而且极易使用指令集优化,在一台普通的配置的PC上(12th Gen Intel(R) Core(TM) i7-12700F   2.10 GHz)处理1902*1080的图,大概只需要2ms,SSE优化后可以做到0.6ms,但是这个方法忽略边缘结构和通道间的相关性,从而容易导致颜色伪彩和图像模糊,比如下面这个经典的测试图,解码后的结果有点惨不忍睹。


用彩色显示RGGB格式图(即把其他通道的颜色分量设计为0)                  双线性解码后的结果

在解码后的水平和垂直方向栅栏中都可以看到明显的色彩异常。

但是由于这个算法的极度高效性,在有些要求不高的场合,依旧可以使用该算法。

二、Hamilton-Adams算法

这也是个非常经典的算法,在说明这个算法之前,必须说明下色差恒定理论。

色差恒定准则与色比恒定准则都是基于颜色通道之间的相关性,目的都是把颜色通道之间的相关性信息引入颜色插值算法,提高插值的准确性。色差相比于色比有两点优势:

第一,色差的运算简单,更容易实现。第二, 色比在G通道接近0时误差较大,色差不存在这类问题。因此,绝大多数颜色插值算法中使用了色差。

那么色差恒定理论,其最为核心的思想就是:临近的两个彩色像素,其相同颜色分量之间的差异值应该近似差不多,用公式表示如下:

R(X,Y) - R(X-1,Y)  = G(X,Y) - G(X-1,Y) = B(X,Y) - B(X-1,Y)

那这是时候如果我们已经获取了某个颜色通道的所有位置的值,通过这个公式就很容易推导出其他位置缺失的值了。

我们还是以上面的(1,1)坐标点为例,假定我们已经获取了所有G通道的数据,也就是说这个(1,1)这个点实际只有R通道数据缺失了(B数据本身就有),这个时候根据颜色恒差理论,应该有

R(1,1) - R(0,0) = G(1,1) - G(0,0)   ------> R(1,1) = G(1,1) + R(0,0) - G(0, 0)

实际上满足这个等式还有(0,2)、(2,0)、(2,2)这三个点(这三个点的红色分量是准确值),所以为了得到更好的精度,我们可以通过下式最终确定R(1,1)的值。

R(1,1) =   (G(1,1) + R(0,0) - G(0, 0) +  G(1,1) + R(0,2) - G(0, 2) +  G(1,1) + R(2,0) - G(2, 0) + G(1,1) + R(2,2) - G(2, 2)) /4

整理后即为:

R(1,1) =   G(1,1) + (R(0,0)  + R(0,2) + R(2, 0) + R(2,2) - G(0,0)  -G(0,2) -G(2, 0) -G(2,2)) / 4

对于(1,2)这个点,G通道数据本来就有,缺少R和B,那根据颜色恒差理论,应该有

R(1,2) - G(1,2) = R(0,2) - G(0,2)

B(1,2) - G(1,2) = B(1,1) - G(1,1)

同样的道理,还有

R(1,2) - G(1,2) = R(2,2) - G(2,2)

B(1,2) - G(1,2) = B(1,3) - G(1,3)

类似(1,2)的方法,把他们综合起来可以得到更为精确的结果:

R(1,2) = G(1,2) + ((R(0,2) + R(2,2) - G(0,2) - G(2,2)) / 2

B(1,2) = G(1,2) + ((B(1,1) + B(1,3) - G(0,2) - G(2,2)) / 2

以上利用颜色恒差理论,就把各个通道之间的数据关联了起来。那么前面的计算都有一个前提条件,就是绿色通道的数据都已经知道了。

我们前面说过,绿色通道的数据量本身只缺少了一半,而缺少了那一半中任何一个点的数据都可以用周边四个点的领域数据填充,因此,如果我们绿色通道就这样处理,而红色和蓝色则用颜色恒差理论借助绿色通道的数据后结果如何呢,我们就把这样做的算法叫做CDCT把,一个简单的严重这个算法的代码如下所示:

int IM_DecodeBayerRG8ToBGR_CDCT_PureC(unsigned char*Src, BitmapData Dest)
{
int Status =IM_STATUS_OK;int Width = Dest.Width, Height = Dest.Height, Stride =Dest.Stride;if (((Width & 1) == 1) || ((Height & 1) == 1)) returnIM_STATUS_INVALIDPARAMETER;if ((Width < 8) || (Height < 8)) returnIM_STATUS_INVALIDPARAMETER;

unsigned
char* Blue = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Green = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Red = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));if ((Blue == NULL) || (Green == NULL) || (Red ==NULL))
{
Status
=IM_STATUS_OUTOFMEMORY;gotoFreeMemory;
}
//先直接复制数据,也无需把通道的值单独提取出来填充,因为后续会把无效的点填充的
memcpy(Blue, Src, Height * Width * sizeof(unsigned char));
memcpy(Green, Src, Height
* Width * sizeof(unsigned char));
memcpy(Red, Src, Height
* Width * sizeof(unsigned char));//因为Green分量占了1/2像素,先填充Green像素//因为后续的Green分量涉及到了3*3的领域,对于边缘的部分,直接使用四个点的平均,单独提出来,这部分计算量很小,无需加速
for (int X = 0; X < Width; X++)
{
IM_CalcBorderGreen_CDCT(Green, Width, Height, X,
0);
IM_CalcBorderGreen_CDCT(Green, Width, Height, X, Height
- 1);
}
for (int Y = 1; Y < Height - 1; Y++)
{
IM_CalcBorderGreen_CDCT(Green, Width, Height,
0, Y);
IM_CalcBorderGreen_CDCT(Green, Width, Height, Width
- 1, Y);
}
//填充Green通道的无效位置的数据
for (int Y = 1; Y < Height - 1; Y++)
{
int Index = Y *Width;for (int X = 1; X < Width - 1; X++)
{
//偶数行和偶数列 或者奇数行和奇数列,绿色分量都是无效的
if (((X + Y) & 1) == 0)
{
Green[Index
+ X] = (Green[Index + X - Width] + Green[Index + X + Width] + Green[Index + X - 1] + Green[Index + X + 1] + 2) / 4;
}
}
}
IM_RGGB_CalcRed_CDCT_PureC(Red, Green, Width, Height);
IM_RGGB_CalcBlue_CDCT_PureC(Blue, Green, Width, Height);
Status
=IM_CombineRGB_PureC(Blue, Green, Red, Dest.Scan0, Dest.Width, Dest.Height, Dest.Stride, Width);if (Status != IM_STATUS_OK) gotoFreeMemory;
FreeMemory:
if (Blue !=NULL) free(Blue);if (Green !=NULL) free(Green);if (Red !=NULL) free(Red);returnStatus;
}

其中 IM_RGGB_CalcRed_CDCT_PureC代码如下所示:

void IM_RGGB_CalcRed_CDCT_PureC(unsigned char* Red, unsigned char* Green, int Width, intHeight)
{
//R G R G R G//G B G B G B//R G R G R G//G B G B G B//R G R G R G//G B G B G B//色差恒定原理:色差是色度信号(R和B分量)与亮度信号(G分量)的差,在图像很小的范围内,当前像素的色差与其周围点的色差是差不多的,也就是类似下面的说法//R(I,J)-G(I,J) = R(I,J + 1)-G(I,J + 1),或者写成R(I,J)-R(I,J+1) = G(I,J)-G(I,J + 1)。这样利用已经前面已经完成重构的G分量,可以重构出其他未知的R和B分量。

for (int X = 0; X < Width; X++)
{
IM_RGGB_CalcBorderRed_CDCT(Red, Green, Width, Height, X,
0);
IM_RGGB_CalcBorderRed_CDCT(Red, Green, Width, Height, X, Height
- 1);
}
for (int Y = 1; Y < Height - 1; Y++)
{
IM_RGGB_CalcBorderRed_CDCT(Red, Green, Width, Height,
0, Y);
IM_RGGB_CalcBorderRed_CDCT(Red, Green, Width, Height, Width
- 1, Y);
}
//填充Red通道的无效位置的数据
for (int Y = 1; Y < Height - 1; Y++)
{
int Index = Y *Width;for (int X = 1; X < Width - 1; X++)
{
//偶数行奇数列, 水平方向填充红色分量
if ((Y & 1) == 0 && (X & 1) == 1)
{
Red[Index
+ X] = IM_ClampToByte((Red[Index + X - 1] + Red[Index + X + 1] - Green[Index + X - 1] - Green[Index + X + 1] + 1) / 2 + Green[Index +X]);
}
//奇数行偶数列, 垂直方向填充红色分量
else if ((Y & 1) == 1 && (X & 1) == 0)
{
Red[Index
+ X] = IM_ClampToByte((Red[Index + X - Width] + Red[Index + X + Width] - Green[Index + X - Width] - Green[Index + X + Width] + 1) / 2 + Green[Index +X]);
}
//奇数行奇数列, 水平垂直方向填充红色分量
else if ((Y & 1) == 1 && (X & 1) == 1)
{
Red[Index
+ X] = IM_ClampToByte((Red[Index + X - Width - 1] + Red[Index + X - Width + 1] + Red[Index + X + Width - 1] + Red[Index + X + Width + 1] - Green[Index + X - Width - 1] - Green[Index + X - Width + 1] - Green[Index + X + Width - 1] - Green[Index + X + Width + 1] + 2) / 4 + Green[Index +X]);
}
}
}
}

IM_RGGB_CalcBlue_CDCT_PureC是类似的道理。

经过测试,这样做的结果和直接双线性相比,基本没有什么差异的,所以直接这样还是不行的,

Hamilton-Adams等人在结合绿色通道的一阶导数和周边红色和蓝色的通道的二阶倒数的基础上,对绿色分量的插值提出了如下算法,这个过程考虑到了像素之间的边缘信息:


当水平方向的梯度大于垂直方向的梯度时,使用垂直方向的有关像素计算结果,否则使用水平方形的有关值,如果两者相等,则使用平均值。

实际操作时,都会定义一个阈值,如果水平和垂直的梯度之差的绝对值小于阈值,则使用平均值,如果在阈值之外,再考虑水平和垂直之间的关系。这样能获得更为合理的结果。

实际上,仔细看看,上面每个方向的G5的计算也是利用到了颜色恒差理论的,只是只是单独利用了水平或者垂直方向的像素而已。

我们分享一下更具上述思路编写的C++代码结果:

int IM_DecodeBayerRG8ToBGR_HamiltonAdams_PureC(unsigned char* Src, BitmapData Dest,    intThreshold)
{
int Status =IM_STATUS_OK;int Width = Dest.Width, Height = Dest.Height, Stride =Dest.Stride;//宽度和高度都必须是偶数
if (((Width & 1) == 1) || ((Height & 1) == 1)) returnIM_STATUS_INVALIDPARAMETER;if ((Width < 8) || (Height < 8)) returnIM_STATUS_INVALIDPARAMETER;

unsigned
char* Blue = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Green = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Red = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));if ((Blue == NULL) || (Green == NULL) || (Red ==NULL))
{
Status
=IM_STATUS_OUTOFMEMORY;gotoFreeMemory;
}
//先直接复制数据,也无需把通道的值单独提取出来填充,因为后续会把无效的点填充的
memcpy(Blue, Src, Height * Width * sizeof(unsigned char));
memcpy(Green, Src, Height
* Width * sizeof(unsigned char));
memcpy(Red, Src, Height
* Width * sizeof(unsigned char));//因为Green分量占了1/2像素,先填充Green像素//因为后续的Green分量涉及到了5*5的领域,对于边缘的部分,直接使用四个点的平均,单独提出来,这部分计算量很小,无需加速
for (int X = 0; X < Width; X++)
{
IM_CalcBorderGreen_CDCT(Green, Width, Height, X,
0);
IM_CalcBorderGreen_CDCT(Green, Width, Height, X,
1);
IM_CalcBorderGreen_CDCT(Green, Width, Height, X, Height
- 2);
IM_CalcBorderGreen_CDCT(Green, Width, Height, X, Height
- 1);
}
for (int Y = 2; Y < Height - 2; Y++)
{
IM_CalcBorderGreen_CDCT(Green, Width, Height,
0, Y);
IM_CalcBorderGreen_CDCT(Green, Width, Height,
1, Y);
IM_CalcBorderGreen_CDCT(Green, Width, Height, Width
- 2, Y);
IM_CalcBorderGreen_CDCT(Green, Width, Height, Width
- 1, Y);
}
//处理剩下的能否安全访问领域的算法
for (int Y = 2; Y < Height - 2; Y++)
{
int IndexC = Y *Width;int IndexN1 = (Y + 1) * Width, IndexN2 = (Y + 2) *Width;int IndexP1 = (Y - 1) * Width, IndexP2 = (Y - 2) *Width;
unsigned
char* Sample = (Y & 1) == 0 ?Red : Blue;for (int X = 2; X < Width - 2; X++)
{
//只有当X和Y都是偶数或者都是奇数时才需要处理
if (((X + Y) & 1) == 0)
{
//周边蓝色或者红色分量的二阶导数
int SecDH = 2 * Sample[IndexC + X] - Sample[IndexC + X + 2] - Sample[IndexC + X - 2];int SecDV = 2 * Sample[IndexC + X] - Sample[IndexN2 + X] - Sample[IndexP2 +X];//加上绿色分量的一阶导数得到梯度
int GradH = IM_Abs(Green[IndexC + X - 1] - Green[IndexC + X + 1]) +IM_Abs(SecDH);int GradV = IM_Abs(Green[IndexP1 + X] - Green[IndexN1 + X]) +IM_Abs(SecDV);//如果垂直或者水平的梯度差不多,则计算周边的平均值
if (IM_Abs(GradV - GradH) <Threshold)
Green[IndexC
+ X] = IM_ClampToByte((Green[IndexP1 + X] + Green[IndexN1 + X] + Green[IndexC + X - 1] + Green[IndexC + X + 1] + 2) / 4 + (SecDH + SecDV + 4) / 8);//如果水平差异小一些,则利用水平方向的平均值
else if (GradH <GradV)
Green[IndexC
+ X] = IM_ClampToByte((Green[IndexC + X - 1] + Green[IndexC + X + 1] + 1) / 2 + (SecDH + 2) / 4);//否则利用垂直方向的平均值
elseGreen[IndexC+ X] = IM_ClampToByte((Green[IndexP1 + X] + Green[IndexN1 + X] + 1) / 2 + (SecDV + 2) / 4);
}
}
}
IM_RGGB_CalcRed_CDCT_SSE(Red, Green, Width, Height);
IM_RGGB_CalcBlue_CDCT_SSE(Blue, Green, Width, Height);
Status
=IM_CombineRGB_PureC(Blue, Green, Red, Dest.Scan0, Dest.Width, Dest.Height, Dest.Stride, Width);if (Status != IM_STATUS_OK) gotoFreeMemory;
FreeMemory:
if (Blue !=NULL) free(Blue);if (Green !=NULL) free(Green);if (Red !=NULL) free(Red);returnStatus;
}

我们分享下前面说的CDCT算法以及HamiltonAdams算法的结果:


简单的CDCT结果                        HamiltonAdams结果

从上面右图的结果可以看到,相比于双线性,水平栅栏处似乎已经看不到色彩异常了,垂直栅栏处的异常点也大为减少,但是还是存在些许瑕疵。

在速度方面,由于直接使用双线性时,可以直接把数据数据有规律的填充到目的图中,而且计算量很简单,而使用色差恒定理论后,由于顺序的要求以及一些编码方面的原因,需要使用一些中间内存,而且计算量相对来说大了很多,因此,速度也慢了不少,上述C++的代码处理1080P的图像,需要大概7ms,经过SSE优化后的代码可以达到4ms左右的速度,这个速度在某些实时性要求比较高的场景下还是具有实际价值的。

为了进一步提高效果,在HA算法的基础上,后续还是有不少人提出了更多的改进算法,在 IPOL上,可以找到一篇
A Mathematical Analysis and Implementation of Residual Interpolation Demosaicking Algorithms
的综述性文章,文章里列举了数种类型的处理方式,那个网站是个也提供了在线的DEMO和代码,可以看到各自的处理后的结果,如下图所示:

不过这些都是比较传统的算法方面的东西了,而且我看了代码和有关效果,感觉这些算法更偏重于理论,实际操作起来,可能效率完全不够,同时IPOL上还提供了一篇基于CNN深度学习方面的算法,效果我看了下,确实还是很不错的,不过就是感觉速度比较堪忧,有兴趣可以在
A Study of Two CNN Demosaicking Algorithms
这里浏览。

那么我后面关注到的是IPOL的另外一篇文章:
Zhang-Wu Directional LMMSE Image Demosaicking
,通过测试发现这个文章的效果还是非常不错和稳定的,而且分析其代码,感觉可以做很大的优化工作。

这个文章对R和B通道的处理方式是和HA算法一样的,都是先要获取G通道的全部数据,然后采用色差恒定原理计算R和B,在计算G通道时,从水平和垂直方向计算 (G-R) 和 (G-B) 的颜色差异开始。然后,这些计算被视为对实际色差的噪声估计,并使用线性最小均方误差框架将它们优化组合。

所谓的优化组合可以这样理解,传统的HA算法,在计算G通道时,就是根据水平和垂直方向的梯度,决定最后是使用水平还是垂直方向的计算值,而ZhangWu算法则不完全使用水平或垂直的信息,而且根据某些过程计算出水平和垂直方向各自的权重然后融合。

这里简单的描述了一下色差噪声估计的公式:


这几个公式可以看成是一个简单的去燥过程,其中f表示原始的噪音信息,s呢就是一个对f的简单的低通滤波,比如高斯滤波,然后呢公式9计算s的一定领域大小的平均值, 公式10计算s在这个领域内的方差, 公式11则计算f和s差异平方的均值。

利用以上信息则可以估算出去除噪音后的估算值 u(公式12)以及对一个的估算误差(公式13)。

分别对水平和垂直反向上的颜色差异进行这样的滤波,然后利用上面的结果按照下述公式进行融合:

上面的公式是一个一维的去燥过程,我有空将其改为2维的图形去燥看看效果如何。

这些公式在论文的配套代码里都有有关的实现,有兴趣的朋友可以有去看看代码,我将其过程进行过程化,这个函数大概有如下代码构成:

//Zhang–Wu Directional LMMSE Image Demosaicking
int IM_DecodeBayerRG8ToBGR_ZhangWu_SSE(unsigned char*Src, BitmapData Dest)
{
int Status =IM_STATUS_OK;int Width = Dest.Width, Height = Dest.Height, Stride =Dest.Stride;//宽度和高度都必须是偶数 if (((Width & 1) == 1) || ((Height & 1) == 1)) returnIM_STATUS_INVALIDPARAMETER;//宽度或者高度小于8有些领域会溢出 if ((Width < 8) || (Height < 8)) returnIM_STATUS_INVALIDPARAMETER;
unsigned
char* Blue = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Green = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Red = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));short* DiffH = (short *)malloc(Width * Height * sizeof(short));short* DiffV = (short *)malloc(Width * Height * sizeof(short));short* LowPassH = (short*)malloc(Width * Height * sizeof(short));short* LowPassV = (short *)malloc(Width * Height * sizeof(short));if ((Blue == NULL) || (Green == NULL) || (Red == NULL) || (DiffH == NULL) || (DiffV == NULL) || (LowPassH == NULL) || (LowPassV ==NULL))
{
Status
=IM_STATUS_OUTOFMEMORY;gotoFreeMemory;
}
//先直接复制数据,也无需把通道的值单独提取出来填充,因为后续会把无效的点填充的 memcpy(Blue, Src, Height * Width * sizeof(unsigned char));//memcpy(Green, Src, Height * Width * sizeof(unsigned char));//绿色通道因为其特殊性,后续会在代码里进行填充,不需要单独复制数据的memcpy(Red, Src, Height* Width * sizeof(unsigned char));//获取水平方向上准确信号和插值信号的差异 IM_GetHoriDiffSignal_SSE(Src, DiffH, Width, Height);//获取垂直方向上准确信号和插值信号的差异 IM_GetVertDiffSignal_SSE(Src, DiffV, Width, Height);//对水平差异进行1*9的高斯滤波 IM_HoriLowPass1X9_SSE(DiffH, LowPassH, Width, Height);//对垂直差异进行9*1的高斯滤波 IM_VertLowPass9X1_SSE(DiffV, LowPassV, Width, Height);//通过LMMSE算法计算完整的绿色通道 Status = IM_RGGB_CalcGreen_ZW_SSE(Src, Green, DiffH, DiffV, LowPassH, LowPassV, Width, Height, 4);if (Status != IM_STATUS_OK) gotoFreeMemory;//使用色差恒定原理计算出Red通道的数据 IM_RGGB_CalcRed_CDCT_SSE(Red, Green, Width, Height);//使用色差恒定原理计算出Blue通道的数据 IM_RGGB_CalcBlue_CDCT_SSE(Blue, Green, Width, Height);//把RGB单通道数据组合成RGB彩色图像 Status =IM_CombineRGB_SSE(Blue, Green, Red, Dest.Scan0, Dest.Width, Dest.Height, Dest.Stride, Width);if (Status != IM_STATUS_OK) gotoFreeMemory;
FreeMemory:
if (Blue !=NULL) free(Blue);if (Green !=NULL) free(Green);if (Red !=NULL) free(Red);if (DiffH !=NULL) free(DiffH);if (DiffV !=NULL) free(DiffV);if (LowPassH !=NULL) free(LowPassH);if (LowPassV !=NULL) free(LowPassV);returnStatus;
}

具体的实现就不做过多的探讨,原始作者的C代码效率非常的低下,简单的测试了下1080P的图大概需要1分钟左右,这完全没有实际的意义的,所以需要进行深度的优化,比如水平方向的滤波,原作者采用1*9的滤波器,我稍作改进并用SSE指令优化如下:

//不支持In-Place操作
void IM_HoriLowPass1X9_SSE(short* Src, short* Dest, int Width, intHeight)
{
for (int Y = 0; Y < Height; Y++)
{
int Index = Y *Width;//边缘采用镜像的关系 4 3 2 1 0 1 2 3 4 5 6 7 Dest[Index + 0] = ((Src[Index + 4] + Src[Index + 4]) * 4 + (Src[Index + 3] + Src[Index + 3]) * 8 + (Src[Index + 2] + Src[Index + 2]) * 16 + (Src[Index + 1] + Src[Index + 1]) * 23 + Src[Index + 0] * 26 + 64) / 128;
Dest[Index
+ 1] = ((Src[Index + 3] + Src[Index + 5]) * 4 + (Src[Index + 2] + Src[Index + 4]) * 8 + (Src[Index + 1] + Src[Index + 3]) * 16 + (Src[Index + 0] + Src[Index + 2]) * 23 + Src[Index + 1] * 26 + 64) / 128;
Dest[Index
+ 2] = ((Src[Index + 2] + Src[Index + 6]) * 4 + (Src[Index + 1] + Src[Index + 5]) * 8 + (Src[Index + 0] + Src[Index + 4]) * 16 + (Src[Index + 1] + Src[Index + 3]) * 23 + Src[Index + 2] * 26 + 64) / 128;
Dest[Index
+ 3] = ((Src[Index + 1] + Src[Index + 7]) * 4 + (Src[Index + 0] + Src[Index + 6]) * 8 + (Src[Index + 1] + Src[Index + 5]) * 16 + (Src[Index + 2] + Src[Index + 4]) * 23 + Src[Index + 3] * 26 + 64) / 128;//W-8 W-7 W-6 W-5 W-4 W-3 W-2 W-1 W-2 W-3 W-4 W-5 Dest[Index + Width - 4] = ((Src[Index + Width - 8] + Src[Index + Width - 2]) * 4 + (Src[Index + Width - 7] + Src[Index + Width - 1]) * 8 + (Src[Index + Width - 6] + Src[Index + Width - 2]) * 16 + (Src[Index + Width - 5] + Src[Index + Width - 3]) * 23 + Src[Index + Width - 4] * 26 + 64) / 128;
Dest[Index
+ Width - 3] = ((Src[Index + Width - 7] + Src[Index + Width - 3]) * 4 + (Src[Index + Width - 6] + Src[Index + Width - 2]) * 8 + (Src[Index + Width - 5] + Src[Index + Width - 2]) * 16 + (Src[Index + Width - 4] + Src[Index + Width - 2]) * 23 + Src[Index + Width - 3] * 26 + 64) / 128;
Dest[Index
+ Width - 2] = ((Src[Index + Width - 6] + Src[Index + Width - 4]) * 4 + (Src[Index + Width - 5] + Src[Index + Width - 3]) * 8 + (Src[Index + Width - 4] + Src[Index + Width - 3]) * 16 + (Src[Index + Width - 3] + Src[Index + Width - 1]) * 23 + Src[Index + Width - 2] * 26 + 64) / 128;
Dest[Index
+ Width - 1] = ((Src[Index + Width - 5] + Src[Index + Width - 5]) * 4 + (Src[Index + Width - 4] + Src[Index + Width - 4]) * 8 + (Src[Index + Width - 3] + Src[Index + Width - 4]) * 16 + (Src[Index + Width - 2] + Src[Index + Width - 2]) * 23 + Src[Index + Width - 1] * 26 + 64) / 128;int BlockSize = 8, Block = (Width - 8) /BlockSize;for (int X = 4; X < 4 + Block * BlockSize; X +=BlockSize)
{
//(V0 * 4 + V1 * 8 + V2 * 16 + V3 * 23 + V4 * 26 + V5 * 23 + V6 * 16 + V7 * 8 + V8 * 4 + 64) / 128 __m128i V0 = _mm_loadu_si128((__m128i*)(Src + Index + X - 4));
__m128i V1
= _mm_loadu_si128((__m128i*)(Src + Index + X - 3));
__m128i V2
= _mm_loadu_si128((__m128i*)(Src + Index + X - 2));
__m128i V3
= _mm_loadu_si128((__m128i*)(Src + Index + X - 1));
__m128i V4
= _mm_loadu_si128((__m128i*)(Src + Index + X + 0));
__m128i V5
= _mm_loadu_si128((__m128i*)(Src + Index + X + 1));
__m128i V6
= _mm_loadu_si128((__m128i*)(Src + Index + X + 2));
__m128i V7
= _mm_loadu_si128((__m128i*)(Src + Index + X + 3));
__m128i V8
= _mm_loadu_si128((__m128i*)(Src + Index + X + 4));

__m128i V08
= _mm_slli_epi16(_mm_add_epi16(V0, V8), 2); //(V0 + V8) * 4 __m128i V17 = _mm_slli_epi16(_mm_add_epi16(V1, V7), 3); //(V1 + V7) * 8 __m128i V26 = _mm_slli_epi16(_mm_add_epi16(V2, V6), 4); //(V2 + V6) * 16 __m128i V35 = _mm_mullo_epi16(_mm_add_epi16(V3, V5), _mm_set1_epi16(23)); //(V3 + V5) * 23 __m128i V44 = _mm_mullo_epi16(V4, _mm_set1_epi16(26)); //V4 * 26 __m128i Sum=_mm_add_epi16(_mm_add_epi16(_mm_add_epi16(V08, V17), _mm_add_epi16(V26, V35)), V44);
__m128i Avg
= _mm_srai_epi16(_mm_add_epi16(Sum, _mm_set1_epi16(64)), 7);
_mm_storeu_si128((__m128i
*)(Dest + Index +X), Avg);
}
for (int X = 4 + Block * BlockSize; X < Width - 4; X++)
{
Dest[Index
+ X] = ((Src[Index + X - 4] + Src[Index + X + 4]) * 4 + (Src[Index + X - 3] + Src[Index + X + 3]) * 8 + (Src[Index + X - 2] + Src[Index + X + 2]) * 16 + (Src[Index + X - 1] + Src[Index + X + 1]) * 23 + Src[Index + X] * 26 + 64) / 128;
}
}
}

这极大的提高了运行速度。

在LMMSE的计算过程中,作者的M大小取的是4,即涉及到的领域大小也是9个像素,作者在代码里使用硬循环实现,实际上这个也就是普通一进一出的统计,完全可以做点优化的,特别是垂直方向的循环,每次都要跳一行像素,cachemiss很大,所以通过适当的改动结构,能极大的提高速度。经过优化,我们测试这个算法处理处理一副1080P的图像,SSE版本大概耗时12ms,我自己优化后的C++代码耗时约27ms(使用了编译器自己的向量化方式编译,而且非原始作者的C++代码),这个速度应该说在实时系统中还是可以接受的,而且这个过程还是可以进行多线程并行化的,如果开两个线程,SSE版本有望做到8ms一帧。

同HA算法相比,这个算法得到的结果更加完美,而且有瑕疵的地方更少,如下所示:


Zhang Wu算法结果                              原始图像

我们再分享一组测试及图像:


Mosaic                                          Bilinear


HA                                            Zhang Wu

整体来看,效果最好是的Zhang Wu算法, 其次是HA, 最一般的就是Bilinear了,但是,也不要一板子拍死Bilinear, 在很多不是很复杂的场景的情况下,使用他得到的效果依旧是可以满足需求的,关键是他真的快啊。

当然,如果对算法执行效率和效果都要做一个均衡的话,应该说HA算法就比较适中了。

为了比较方便,我编写了一个简易的DEMO,供大家做算法验证。

https://files.cnblogs.com/files/Imageshop/DeMosaic.rar?t=1725238639&download=true

如果想时刻关注本人的最新文章,也可关注公众号:

参考资料:

// https://blog.csdn.net/OdellSwan/article/details/136887148 ISP算法 | Demosaic(一)
// https://blog.csdn.net/OdellSwan/article/details/137246117 ISP算法 | Demosaic(二)
// https://cloud.tencent.com/developer/article/1934157 ISP图像处理之Demosaic算法及相关
// https://zhuanlan.zhihu.com/p/594341024 Demosaic(二)Hamilton & Adams插值算法
// https://zhuanlan.zhihu.com/p/144651850 Understanding ISP Pipeline - Demosaicking
//
https://blog.csdn.net/feiyanjia/article/details/124366793

2024年9月开始了,
救园
的最后一个月到了,园子的
20年纪念T恤
终于赶在天凉好个秋之前上架了。

用这件设计简约清新给人希望的T恤,纪念过去二十年充满艰难而满怀希望的时光。

园子的二十年,是困难重重的二十年,是走了很多弯路摔了很多跟头的二十年,但也是心中一直燃烧着希望的二十年——服务好开发者,一切随之而来;真心帮助开发者,一定会有未来。在当前生死关头,很多园友出手相救,给燃烧的希望火上浇油,让这燃得更旺的希望,成为接下来二十年飞翔的翅膀。

园子的二十年,是陪伴很多园友美好青春时光的二十年,是见证很多园友不惜宝贵青春时光挥洒技术热情无私分享的二十年,这种分享精神是园子的希望之光,我们要在接下来二十年让它绽放光芒。

博客园20年纪念T恤载着怀念与希望在这个决定园子命运的9月上架啦!

价格

  • 售价:¥79
  • VIP会员价:¥59(仅限3件)
  • PLUS会员价:¥39(仅限3件)
  • 回头客价:¥59(仅限3件)

购买方式

优惠与赠送

  • 会员领优惠券:
    https://vip.cnblogs.com/benefits/t-shirt-20th-anniversary/coupon
    (只能在淘宝店使用)
  • PLUS会员赠送:开通
    PLUS会员
    会赠送100元周边礼,在周边小店购买纪念T恤会自动免费
  • 终身会员赠送:
    终身会员
    额外赠送1件纪念T恤(一共赠送2件),成为终身会员后加
    企业微信
    领取
  • 回头客优惠:如果您买过任一款2024年博客园T恤或者polo衫,可享回头客价,下单后先不付款,联系客服改价

面料参数

  • 面料:100%精梳紧密赛络棉
  • 支数:20支
  • 克重:230克

尺码表

liwen01 2024.09.01

前言

最近十几年,通信技术发展迅猛,通信标准更新频繁,有的设备还在使用 802.11/b/g/n 协议,有的已支持到 WiFi6、WiFi7。

而国内有关无线 WiFi 的书籍或资料却很少,就算能找着的,大多也是比较老旧。本文试图使用最新的数据来介绍 WiFi 相关的一些基础知识。

关于 WiFi 技术的发展,下面几个问题看你了解多少:

  1. 家用路由器一般都兼容支持哪些 WiFi 协议标准?

  2. 802.11 b/g/n/ac/ax 具体是指什么?与 WiFi4/5/6/7 有什么区别?

  3. 为什么不同协议间的最大速率相差巨大?它们实现的原理是什么?

  4. 在 WiFi 发展中,是由哪些关键技术的发展使 WiFi 速率得到显著提升?

  5. 为什么实际速都远低于理论速率?

  6. 有线以太网与无线 WiFi 在 OSI 七层模型中有哪些差异?

  7. 802.11 a/b/g/n 等这些标准是位于 OSI 的哪一层?

  8. WiFi 在 2.4Ghz/5GHz  各有多少个可用信道,有没其它限制?

  9. WiFi 在 2.4Ghz/5GHz  各有多少个不重叠信道?

  10. 为什么我们很少见到使用 WiFi5 的设备?

  11. 在同一个空间,两台使用不同协议的路由器相互之间是否会有干扰或影响?

  12. 最新的 WiFi7 可以工作在哪些频段?

本文主要介绍 WiFi 信道,无线网 OSI 模型,以及802.11b/g/n 标准协议的一些关键技术。由于篇幅的限制,WiFi5、WiFi6、WiFi7 将在下一篇中介绍。

(一) IEEE 协会 与 802.11 标准

  • IEEE
    (Institute of Electrical and Electronics Engineers)电气与电子工程师协会,是一个国际性的专业学会组织,是全球最大的技术专业组织
  • IEEE 802
    是一个标准系列项目,包括以太网、局域网、城域网的多个技术标准。
  • IEEE 802.11
    是 IEEE 802 标准系列中的一个工作组,专注于无线局域网 (WLAN) 技术。
  • 802.11a/b/g/n...
    是由 IEEE 802.11 工作组下的任务组开发的标准

在早些年,我们看到比较多的 WiFi 分类是按 802.11b/g/n 字母来区分,但是随着 WiFi 协议的不断发展,WiFi 联盟对不同 WiFi 标准指定了新的名字,也就是 WiFi4、WiFi5、WiFi6、WiFi7 按数字代号表示;其主要目的是方便大家记忆和区分。

802.11 be 也就是 WiFi7,预计在今年(2024)正式发布,现在网上可以买到的 WiFi7 设备,应该是预认证设备,具备 WiFi7 的部分功能,但可能与正式发布的标准会存在一些差异

在介绍各 WiFi 协议标准之前,我们先了解一下 WiFi 信道相关的概念。

(二) WiFi信道

目前在安防IPC设备上,使用比较多的还是 802.11b/g/n 三个标准,但也有不少厂家开始切换到 802.11ax(WiFi6) 协议上来了。

实际上大部分产品是直接 从 802.11n(WiFi4) 直接切换到 802.11ax(WiFi6)。

为什么不使用 WiFi5,而是从 WiFi4 直接跨越到了 WiFi6 呢?

因为 WiFi5 只支持 5GHz 频段,对于以前使用 2.4GHz 的设备就没法兼容了。

(1) 2.4Ghz 频段信道

  • 802.11b使用的信道频宽是 22MHz,目前使用的其它标准都是 20Mhz信道带宽
  • 每个相邻信道的中心频率,相差5MHz(除了14信道)
  • 传统认知上,有 3 个不重叠的信道(1、6、11)

由于 802.11b (使用 DSSS 调制技术频宽22 MHz) 已经淡出 WLAN 网络,不考虑兼容性问题,通常情况下,可以认为1、5、9和13信道也是非重叠信道。

对于 12~14 信道,不同国家有不同的要求规范,实际产品设计需要根据国家码去适配。

(2) 5GHz 频段信号


  • 5 GHz 频段通常被划分为 4 个 UNII (Unlicensed National Information Infrastructure) 子频段。
  • DFS(Dynamic Frequency Selection,动态频率选择)
    信道是为了避免干扰重要的雷达系统而设计的,这些信道需要 WiFi 设备监测雷达信号,并在探测到时自动切换信道。
  • 20 MHz 信道
    :是最常用的信道带宽,适合在设备较多的环境中使用,以避免干扰
  • 40 MHz 信道
    :通过聚合两个相邻的 20 MHz 信道,提供更高的吞吐量,但更容易受到干扰
  • 80 MHz 和 160 MHz 信道
    :适用于对吞吐量要求极高的应用(如 4K 流媒体、高清视频会议),但在实际使用中较少,因为它们占用了更多的频谱资源
  • UNII-2 Extended(5470-5725 MHz) 的所有信道在中国都不能使用
  • 在中国,
    只有 UNII-3 的 5 个信道可以在所有场景使用
  • 在中国,可以使用的非重叠 5GHz 频段有 13 个

上面表格数据是来源于华为的一份开源文档,我们可以看到低频和中频是被限定在室内使用,但是我们查看很多其它的资料,发现与华为的数据对不上,比如下图,它们对我国在 UNII-1 和 UNII-2 的部分信道并没有做限定。

通过查询最新版本上的《中华人民共和国无线电频率划分规定》,我们可以看到,华为的数据是对的,在 2023 年我国有规定,UNII-1 和 UNII-2 的信道只能在室内使用。

所以,
对于中国 5GHz 可以直接使用的信道,2023年之前的资料会包括UNII-1 和 UNII-2 里面的信道,但是在2023年之后,UNII-1 和 UNII-2 会被标注为仅限室内使用。

(3) 6GHz 频段信道

在 WiFi6 和 WiFi7 中会使用到一些 6GHz 的信道,但是目前我国还没有开放 6GHz 信道的使用。

6GHz 频段范围从 5925MHz 扩展到 7125MHz,共计 1200MHz 频谱。它可以通过信道绑定成  3 个 320MHz 信道、7 个 160MHz 信道、14 个 80MHz 信道或者是 29 个40MHz 信道。如果不绑定直接使用,它提供了 59 个 20MHz 信道。

对比 2.4GHz 和 5GHz,6GHz 频段的频谱资源比前两者相加还要多。

随着 WiFi6、WiFi7 逐渐地普及,国内将来应该也会开放一部分 6GHz 的 WiFi 信道

(三) 无线网中的 OSI 模型

计算机课程中常用网络分层参考7层模型:
物理层、数据链路层、网络层、传输层、会话层、表示层、应用层

上面这个模型其实是非常概括性的,实际要复杂很多,从这个图上我们看不出以太网与无线网有什么差别。

以太网与无线网在 OSI 模型上主要的差异在于第一和第二层,也就是物理层和数据链路层

(1) 物理层(Physical Layer)

物理层主要负责在网络设备之间传输原始的比特流(0和1)。它涉及物理连接,如电缆、光纤和无线电波,以及数据传输的电气和机械特性。常见的物理层设备包括网卡、集线器和电缆。

以太网
:以太网使用有线连接,如双绞线电缆或光纤来传输数据。物理层定义了传输的电信号、电压和脉冲等特性。

无线网(Wi-Fi)
:Wi-Fi通过无线电波在空气中传输数据。物理层涉及无线频率的选择、天线的配置,以及信号的调制和解调方式

(2) 数据链路层(
Data Link Layer
)

数据链路层负责在相邻节点之间建立可靠的通信链路。它将数据帧从一个节点发送到下一个节点,并处理帧的传输错误。数据链路层还包括 MAC (介质访问控制)子层和 LLC (逻辑链路控制)子层。常见的设备有交换机和网桥。

以太网
:在数据链路层,以太网通常使用以太网帧(Ethernet Frame)进行数据封装。MAC 地址用于标识网络设备,并控制对介质的访问(
CSMA/CD
,载波侦听多路访问/冲突检测机制)。

无线网(Wi-Fi):
无线网在数据链路层也使用帧进行数据封装,但 Wi-Fi 帧格式与以太网帧有所不同。Wi-Fi使用
CSMA/CA
(载波侦听多路访问/冲突避免机制)来管理介质访问,并增加了加密(如 WPA/WPA2 )和认证(如802.1X)的功能,以增强安全性。

(3) 无线网数据帧封装

对无线网的物理层和数据链路层再进一步划分,我们可以看到物理层有:PLCP 和 PMD 层,数据链路层有:MAC 层和 LLC层

这里我们简单介绍一下各层的一个基本功能,详细的 WiFi 数据帧分析我们将在后面章节来介绍。

  • LLC 子层
    :(
    Logical Link Control
    )逻辑链路控制子层,为上层网络协议提供统一的接口,管理逻辑链路的控制和数据传输。
  • MAC 子层
    :(
    Medium Access Control
    )媒体访问控制子层,管理设备对共享通信介质的访问和数据帧的传输。
  • PLCP 子层
    :(
    Physical Layer Convergence Procedure
    )物理层收敛过程子层,负责在 MAC 层和 PMD 子层之间转换数据帧格式。
  • PMD 子层
    :(
    Physical Medium Dependent
    ) 物理介质相关子层,直接处理物理信号的传输和接收。

我们常说的802.11 b/g/n等协议标准,实际上是位于物理层。

(4) 物理层扩频技术

扩频技术是无线局域网数据传输使用的技术,扩频技术最初是用于军事部门防止窃听或信号干扰。

WiFi(无线局域网) 使用扩频技术来提高通信的可靠性和抗干扰能力,扩频技术在 WiFi 中的应用主要通过以下几种方式实现:

(a) 直接序列扩频 (Direct Sequence Spread Spectrum, DSSS)

DSSS
通过将数据与一个伪随机噪声码 (PN码)进行异或运算,将数据分散到一个更宽的频谱上。这样做的好处是使得信号在频谱中的能量密度降低,从而提高了信号对噪声和干扰的抵抗力。

(b) 跳频扩频 (Frequency Hopping Spread Spectrum, FHSS)

FHSS
通过快速在多个频率之间跳转来避免干扰,这需要提前在发送和接收端约定好跳频的规律,实际在WiFi中使用得比较少。

(c) 正交频分复用 (Orthogonal
Frequency-Division Multiplexing
, OFDM)

OFDM 使用多个正交子载波,每个子载波传输数据的一部分,这样就大大降低了多径效应的影响,并提高了频谱效率。

上面的这三种扩频方式看不懂没关系,下面会有稍微比较详细的介绍。

(四) 802.11b

802.11b 是1999年发布的标准,
为什么它最大的理论数据只有11Mps?

这与802.11b 物理层使用的编码方式和调制方式有关系:

(a)
BPSK
与 QPSK调制方式

BPSK
: (Binary Phase Shift Keying)每个符号代表1个比特,即每次调制一个符号时只能传递1个比特。

QPSK
: (Quadrature Phase Shift Keying)每个符号代表2个比特,因为它可以区分四种相位,所以比BPSK效率更高。

(b) Barker 与 CCK 编码

Barker编码
: Barker 码是一个 11 比特序列 (例如10110111000),在无线传输方面存在优势,可以有效降低干扰,不过降低了效率。

每一个比特编码为一个 11 位 Barker 码,因此而产生的一个数据对象形成一个chip(碎片)。
实际传输的信息量是有效传输的 11 倍

CCK编码:
(Complementary Code Keying)补码键控,采用了复杂的数学转换函数,可以使用若干个 8-bit 序列在每个码字中编码 4 或 8 个位。

补码键控编码方式能有效防止噪声及多径干扰,缺点是补码键控为了对抗多径干扰,技术复杂,实现困难

(c) 802.11b 速率计算

关于802.11b 各速率的计算:

1Mbps (Barker + BPSK)

调制方式
: BPSK,每个符号1比特。

编码方式
: Barker 编码,每个符号被编码为11位。

结果
: 由于符号速率是1 MSym/s,BPSK调制1个符号1比特,所以最大理论速率是1 Mbps。

2Mbps (Barker + QPSK)

调制方式
: QPSK,每个符号2比特。

编码方式
: Barker编码。

结果
: 符号速率1 MSym/s,每个符号传输2比特,所以最大理论速率是2 Mbps。

5.5Mbps (4-bits CCK + QPSK)

调制方式
: QPSK,每个符号 2 比特。

编码方式
: 4-bits CCK编码,利用复杂的编码方式提高了每符号的比特传输效率。

结果
: 虽然每个符号代表 2 个比特,但CCK编码使得每个符号最终可以传递 4 个比特。因此最大理论速率是 5.5 Mbps。

11Mbps (8-bits CCK + QPSK)

调制方式:
QPSK,每个符号 2 比特。

编码方式
: 8-bits CCK 编码,每个符号可传递 8 个比特。

结果
:
在QPSK的基础上,通过CCK编码的优化使得每个符号可以传输8个比特,所以最大理论速率是11 Mbps

注意:
上面 Sym/s 是符号率/码元速率的单位,用于表示通信系统中每秒传输符号数量的单位

  • 在BPSK (Binary Phase Shift Keying)调制中,一个符号代表1个比特。
  • 在QPSK (Quadrature Phase Shift Keying)调制中,一个符号代表2个比特。

(五) 802.11g

802.11g 可以从 802.11b 中的最大速率 11Mbps 提升到 54Mbps, 核心是使用了OFDM 调制载波技术。

(1)正交频分复用技术(OFDM)

正交频分复用 (Orthogonal Frequency Division Multiplexing ,OFDM) 是一种数字多载波调制方案,它通过在同一单信道内使用多个子载波来扩展单子载波调制的概念。

OFDM 不是使用单个子载波传输高速率数据流,而是使用大量并行传输的紧密间隔的正交子载波。每个子载波均采用传统的数字调制方案。许多子载波的组合可以在等效带宽内实现与传统单载波调制方案类似的数据速率。

从上图我们可以看到,
当某个载波信号振幅最高的时候,也就是信号强度最强的时候,其它载波的振幅都刚好为0。

OFDM 基于频分复用 (FDM) 技术,在 FDM 中,不同的信息流被映射到单独的并行频道上,每个 FDM 信道均通过频率保护带与其他信道分开,以减少相邻信道之间的干扰。

OFDM 方案与传统 FDM 的不同之处在于以下相关方面:

  1. 多个载波 (称为子载波)承载信息流,
  2. 子载波彼此正交,并且为每个符号添加保护间隔,以最小化信道延迟扩展和符号间干扰

上图说明了 OFDM 信号的主要概念以及频域和时域之间的相互关系。

在频域中,多个相邻子载波各自独立地用复数数据进行调制。对频域子载波执行逆 FFT 变换以产生时域中的 OFDM 符号。

在时域中,在每个符号之间插入保护间隔,以防止由于无线电信道中的多径延迟扩展而在接收机处引起的符号间干扰。可以连接多个符号来创建最终的 OFDM 突发信号。

在接收器处,对 OFDM 符号执行 FFT 以恢复原始数据位。
这里的 FFT 就是高数中的傅里叶变换。

在802.11g 中,有48个子载波用来传输数据,4个子载波用来做相位参考

为什么802.11g速率可以达到 54Mbps 呢?

802.11g 除了使用了OFDM调制载波技术,它还使用了64-QAM 的编码方式。

(2) 64-QAM 编码方式

QAM (Quadrature Amplitude Modulation)正交幅度调制,在QAM (正交幅度调制)中,数据信号由相互正交的两个载波的幅度变化表示。模拟信号的相位调制和数字信号的PSK (相移键控)可以被认为是幅度不变、仅有相位变化的特殊的正交幅度调制。

64-QAM 中的每个符号都是一个包含 6 位的星座状态,每个符号是从 000 000 到 111 111 的 64 种不同状态中的一种可能组合。由于该调制方案使用二进制数据,因此可能的组合总数使用6 位为 2的6次方,即 64。

相应的在WiFi中还有使用16-QAM和256-QAM的编码方式,16-QAM 传输4个位,64-QAM 传输6个位,256-QAM传输8个位。

在802.11g 中使用的是64-QAM,并且它的编码率是3/4

(3) 802.11g 速率计算

数据速率是符号速率、每个符号承载的比特数和信道编码率的乘积。

调制方式
: 64-QAM,每个符号代表6个比特。

编码率
: 3/4 (前向纠错编码中使用的编码率)。

符号速率
: 250 ksps。

每个OFDM符号在所有子载波上
传输的总时间为4微秒 (μs),其中包括3.2微秒的数据传输时间和0.8微秒的保护间隔 (Guard Interval)

因此,符号周期 (Symbol Period)为4微秒。

由于每个符号周期为4微秒,符号速率为:

  • 每个符号可以传输的比特数 = 6bit (因为64-QAM)。
  • 载波编码率是3/4,所以实际有效比特数 = 6bit * 3/4 = 4.5 bit。
  • 有48个数据子载波,所以每个OFDM符号可以传输的比特数 = 48 * 4.5bit = 216bit。
  • 符号速率是250 ksps,所以总数据速率 = 216 比特/符号 * 250 ksps = 54 Mbps。

由上面的计算可以知道,802.11g 最大支持的速率是54Mbps。

于此同时,802.11g可以向下兼容,在不同调制方式和编码率下,可以匹配到不同的速率上。

调制方式 编码率 数据速率
BPSK 1/2 6 Mbps
BPSK 3/4 9 Mbps
QPSK 1/2 12 Mbps
QPSK 3/4 18 Mbps
16-QAM 1/2 24 Mbps
16-QAM 3/4 36 Mbps
64-QAM 2/3 48 Mbps
64-QAM 3/4 54 Mbps

(六) 802.11n (WiFi4)

2009 年更新的 802.11n 也就是 WiFi4,可以同时支持 2.4G 和 5G 信道,2.4Ghz 的理论速度达到了 450 Mbps, 5GHz 的理论速度达到了 600Mbps。同时支持两个频段,并且速率得到了跨越式的增长,大大地提升了 WiFi 的使用体验。

就目前而言,很多设备还是使用的 802.11n 协议,特别是在安防 IPC 行业。

那么,从 2003 年的 802.11g 到 2009 年的 802.11n(WiFi4),又有哪些关键技术的实现让 WiFi4 的速率得到质的飞跃呢?

WiFi4 核心的技术是 OFDM、FEC、MIMO、40Mhz、Short Gi。

(1) 802.11n 的 OFDM

这里使用的 OFDM 正交频分复用技术与 802.11g 中使用的是相同的。不同的点是:

  • 802.11g 总共有 52 个子载波,802.11n 有 56 个子载波
  • 802.11g 有 48 个数据子载波,802.11n 有 52 个数据子载波

数据子载波数 x 每个符号传输比特数 x 载波编码率 x 符号速率 = 最大理论速率

52 * 6bit * 3/4 * 250 ksps = 58.5Mbps

数据子载波数量增加了 4 个,所以速率由 802.11g 的 54Mbps 提升到了 58.5Mbps。

(2) 802.11n 的 FEC

前向纠错编码 (Forward Error Correction,FEC) 技术在发送端将原始数据块进行编码,添加冗余信息形成编码数据块。接收端通过解析这些冗余信息来检测和纠正传输过程中出现的错误。

这种方法不需要反馈和重传,因此可以显著提高数据传输的效率,特别是在高噪声或信号衰减严重的无线环境中。


使用 FEC 前向纠错编码之后,载波编码率由 802.11g 的 3/4 提升到了 5/6 。

数据子载波数 x 每个符号传输比特数 x 载波编码率 x 符号速率 = 最大理论速率

52 * 6bit * 5/6 * 250 ksps = 65Mbps

使用 FEC 编码之后,速率提升到了 65Mbps

(3) 802.11n 的 Short Gi

Guard Interval
(GI) 是指在每个 OFDM (正交频分复用) 符号之间插入的一段保护时间,用来防止符号间的干扰 (
ISI,
Inter-Symbol Interference
)。这种干扰通常由多径传播引起,即信号在传播过程中经过多次反射、折射和散射,从而导致信号在不同的时间到达接收端。

在传统的 802.11 系统中,
GI 的标准长度为 800 纳秒 (ns)
,这个时间间隔足够长,以消除大部分的符号间干扰。然而,长时间的 GI 也意味着浪费了一部分可以用于数据传输的时间。

为了提高数据传输效率,802.11n 及后续标准引入了 Short GI 技术,将 GI 的长度从 800 ns 缩短到 400 ns。这一缩短的保护时间段带来了显著的性能提升。

由于保护时间缩短了400ns,所以每个符号周期为4微秒-0.4微秒 = 3.6微秒

符号率为:277.778ksps

数据子载波数 x 每个符号传输比特数 x 载波编码率 x 符号速率 = 最大理论速率

52 * 6bit * 5/6 * 277.778 ksps = 72.2222Mbps

使用 Short GI技术之后,速率提升到了 72.2222Mbps.

(4) 802.11n 信道捆绑

802.11n 允许使用 信道捆绑 技术,将两个相邻的 20 MHz 信道捆绑在一起,形成一个 40 MHz 的信道。这使得数据传输可以在更宽的频谱范围内进行。

通过增加信道宽度,可以承载更多的子载波 (subcarriers),从而提高数据的传输速率。

  • 一个标准信道是 20Mhz 频宽,包含 52 个子载波
  • 两个相邻信道捆绑起来就是 40Mhz 频宽,包含108 (52*2+4=108) 个子载波

为什么上面两个信道捆绑到一起后,子载波数还多了4个呢?

因为信道与信道之间有间隙,当两个信道绑定之后,两个信道中间的频段也可以被使用到。

在 2.4G 模式上最多可以有一个 40M 信道,在5G模式上 40M 信道数目因国家不同而不同,理论上最多有11个 40M 信道。

数据子载波数 x 每个符号传输比特数 x 载波编码率 x 符号速率 = 最大理论速率

108 * 6bit * 5/6 * 277.778 ksps = 150Mbps

2.4Ghz频段信道捆绑注意事项:
在 2.4 GHz 频段,由于可用的信道较少且信道间隔较窄,通常使用的信道捆绑配置包括:

  • 信道 1 和 5:这些信道可以捆绑在一起形成 40 MHz 宽的信道。
  • 信道 6 和 10:这些信道也可以捆绑在一起形成 40 MHz 宽的信道。
  • 信道 11 和 7:这些信道也可以捆绑在一起形成 40 MHz 宽的信道。

由于 2.4 GHz 频段的信道带宽较小,捆绑时的信道间隔可能会导致较高的信道重叠和干扰,因此在这个频段使用信道捆绑时需要特别注意干扰管理。

(5) 802.11n MIMO

MIMO(Multiple Input Multiple Output)概念

多输入多输出
:MIMO 技术利用多个发射天线和接收天线在无线通信中进行数据传输。通过同时传输多个数据流,MIMO 技术可以显著提高无线网络的吞吐量和覆盖范围。

空间复用
:MIMO 技术允许在相同的频谱资源上同时传输多个数据流,增加了频谱的利用效率。这种技术基于空间复用原理,即在同一频段内通过空间分离的数据流来实现更高的数据传输速率。

发射机的多个天线意味着有多个信号输入到无线信道中,接收机的多个天线是指有多个信号从无线信道输出,多天线接收机利用先进的空时编码处理能够分开并解码这些数据子流,从而实现最佳处理,并有效地抵抗空间选择性衰落。

802.11n 使用了 MIMO 技术之后,速率可以提升到 150Mbps*n(n为空间流个数),n 的最大值为4,

数据子载波数 x 每个符号传输比特数 x 载波编码率 x 符号速率 x MIMO = 最大理论速率

108 * 6bit * 5/6 * 277.778 ksps *4 = 600Mbps

所以 802.11n 的最大速率是 600Mbps

我们回到最开始的WiFi标准与WiFi世代图中,
我们可以看到 802.11n (WiFi4) 在2.4GHz 的最大速率是 450Mbps,而在 5Ghz 的最大速率是 600Mbps,这是为什么?

我在网上看的资料是,802.11n 在 2.4GHz 的时候最大是 3 条数据流,而在 5GHz 的时候最大是 4 条数据流。

802.11n 除了上面介绍的 OFDM、FEC、MIMO、40Mhz、Short Gi 这些关键技术之外,它还有帧聚合、Block Ack 块确认、更加高效的MAC层等技术使 WiFi 的整体性能得到了很大的提升。

结尾

上面介绍了 WiFi 信道,无线 WiFi 的 OSI 模型,以及 802.11b/g/n 标准协议的一些关键技术,下一篇将介绍 WiFi5、WiFi6、WiFi7 相关的一些内容,以及这些标准在使用时需要注意的事项。

上面内容,如有错误,欢迎评论区批评指出,不胜感激。

------------------End------------------
如需获取更多内容
请关注 liwen01 公众号

大家好,我是码农先森。

我们在某宝或某多多上抢购商品时,如果只是下了订单但没有进行实际的支付,那在订单页面会有一个支付倒计时,要是过了这个时间点那么订单便会自动取消。在这样的业务场景中,一般情况下就会使用到延时队列。

通常在客户下单之后,就会将订单数据推送到延时队列中并且会对该消息设置一个延时时长,比如设置五分钟、十分钟、或十五分钟等,具体的时长应该还是要结合当前的业务进行衡量,然后消费端会在指定时间到达后就对该消息进行支付支付状态判断,如果已经支付则不予处理,要还是未支付,则会取消该订单,并且释放商品库存。

我们这次分享的内容,主要是基于 Redis 延时队列的实现方式,当然除了 Redis 还可以用其他的技术,比如 RabbitMQ、Kafka、RocketMQ 等专业的消息队列。但是我用 Redis 的原因是,它的应用场景比较广泛,我们平时接触也比较多,而且相对于专业的消息队列它没有过多复杂的配置,学起来容易上手,出了问题解决起来也快,学东西的路径都是由易到难嘛。

另外,如果你对上面提到的专业消息队列使用很熟练,也可以将 Redis 更换成它们,这里只是存储介质的不同,技术的实现逻辑上没有太大区别,重要的是设计思想,大家各取所需吧。

好了,我先介绍一下这次延时队列的实现逻辑。主要分为三个部分,一是:消息的发送,如果设置了延时时间则会将消息存储到 Redis 的延时队列中,反之会直接将消息推送到 Redis 的就绪队列中等待消费。二是:将到期的消息从 Redis 延时队列中取出,并且推送到 Redis 的就绪队列中等待消费。三是:消费端会从 Redis 的就绪队列中按顺序读取出消息,并且执行对应的业务处理逻辑,如果处理失败则会将该消息,再次推送到 Redis 的延时队列中进行下一次的重试。

这里说到的延时队列是利用 Redis 有序集合来实现的,它每间隔一秒钟就会被轮询一次,如果有到期的消息,则就会将该消息推送到 Redis 就绪队列,并且从该集合中移除过期的消息,至此就可以等待着消费端进行消费了。接下来我们就从实际的代码出发,来看一下如何实现基于 Redis 的延时队列。

话不多说,开整!我们先来看一下整体的项目目录结构,内容主要分为 PHP 和 Go 两部分。

[manongsen@root php_to_go]$ tree -L 2
.
├── go_delay
│   ├── app
│   │   ├── controller
│   │   │   └── notify.go
│   │   ├── config
│   │   │   └── config.go
│   │   ├── extend
│   │   │   └── queue.go
│   │   └── route.go
│   ├── go.mod
│   ├── go.sum
│   └── main.go
└── php_delay
│   ├── app
│   │   ├── controller
│   │   │   └── Notify.php
│   ├── composer.json
│   ├── composer.lock
│   ├── command
│   │   └── Consumer.php
│   ├── route
│   │   └── app.php
│   ├── extend
│   │   └── Queue.php
│   ├── think
│   ├── vendor
│   └── .env

ThinkPHP

使用 composer 创建基于 ThinkPHP 框架的 php_delay 项目。

## 当前目录
[manongsen@root ~]$ pwd
/home/manongsen/workspace/php_to_go/php_delay

## 安装 ThinkPHP 框架
[manongsen@root php_delay]$ composer create-project topthink/think php_delay
[manongsen@root php_delay]$ cp .example.env .env

## 安装 Composer 依赖包
[manongsen@root php_delay]$ composer require predis/predis
## 创建一个消费者脚本
[manongsen@root php_delay]$ php think make:command Consumer
## 创建一个生产者脚本,用于测试
[manongsen@root php_delay]$ php think make:command Producer

这个就是延时队列实现的核心类,定义了就绪、延时、失败三个消息队列。
send()
方法用于发送消息,其中可以指定
$delay
参数设置延时时间单位是秒。
wait()
方法用于消费端监听消息,从下面的代码可以看出这里还利用多进程,父进程的作用是每间隔一秒钟,就从 Redis 有序集合中读取到期的消息,并将该消息推送到 Redis 就绪队列,子进程则阻塞监听就绪队列的消息,并且将接收到的消息回调到用户自定义的业务函数中。

<?php
declare (strict_types = 1);

class Queue
{
    // 就绪消息存放的队列
    const QUEUE_READY = 'redis:queue:ready'; 

    // 延迟消息存放的队列(实际的数据结构是有序集合)
    const QUEUE_DELAY = 'redis:queue:delay'; 

    // 失败消息存放的队列
    const QUEUE_FAILED = 'redis:queue:failed'; 

    protected $_client;
    protected $_options = [
        'retry_seconds' => 5, // 重试延时5秒
        'max_attempts'  => 5, // 最大重试次数
    ];

    public function __construct()
    {
        // 与 Redis 建立连接
        $this->_client = new \think\cache\driver\Redis(config('cache.stores.redis'));
        $this->_client->get("ping");
    }

    // 发送消息
    public function send($data, $delay = 0)
    {
        static $_id = 0;
        $id = \microtime(true) . '.' . (++$_id);
        $now = time();
        $package_str = \json_encode([
            'id'       => $id,    // 消息ID
            'time'     => $now,   // 当前时间
            'delay'    => $delay, // 延迟时长(秒)
            'attempts' => 0,      // 重试次数
            'data'     => $data   // 消息内容
        ]);

        // 如果不是延时消息,则直接将消息推送到就绪队列
        if ($delay == 0) {
            $this->_client->lpush(static::QUEUE_READY, $package_str);
        } else {
            // 否则将消息写入到有序集合中
            $this->_client->zadd(static::QUEUE_DELAY, $now + $delay, $package_str);
        }
    }

    // 从有序集合中取出数据推送到就绪队列中
    public function tryToPullDelayQueue()
    {
        while (true) {
            try {
                $now = time(); // 当前时间
                $options = ['LIMIT', 0, 128]; // 每次取 128 条数据
                $items = $this->_client->zrevrangebyscore(static::QUEUE_DELAY, $now, '-inf', $options);
                foreach ($items as $package_str) {
                    // 从有序集合中移除该数据
                    $result = $this->_client->zrem(static::QUEUE_DELAY, $package_str);
                    if ($result !== 1) {
                        continue;
                    }
                    // 将数据JSON反序列化解析
                    $package = \json_decode($package_str, true);
                    if (!$package) {
                        // 解析失败则推送到失败队列
                        $this->_client->lpush(static::QUEUE_FAILED, $package_str);
                        continue;
                    }
                    // 将数据推送到就绪队列
                    $this->_client->lpush(static::QUEUE_READY, $package_str);
                }
            } catch (\Throwable $e) {
                echo $e->getMessage() . PHP_EOL;
            }

            // 间隔1s之后再次轮询
            sleep(1);
        }
    }

    // 监听消息
    public function wait($success_callback, $failure_callback)
    {
        echo "开始监听消息..." . PHP_EOL;
        // 创建一个进程
        // 父进程用于轮询有序集合消息
        // 子进程监听就绪队列消息
        $pid = pcntl_fork();
        if ($pid < 0) {
            exit('fork error');
        } else if($pid > 0) {
            // 轮询有序集合消息并推送到就绪队列
            (new \Queue())->tryToPullDelayQueue();
            pcntl_wait($status);
            exit();
        }

        while (true) {
            try {            
                // 阻塞监听就绪队列消息
                $data = $this->_client->brpop(static::QUEUE_READY, 0);
                if ($data) {
                    $package_str = $data[1];
                    // 将数据JSON反序列化解析
                    $package = json_decode($package_str, true);
                    if (!$package) {
                        // 解析失败则推送到失败队列
                        $this->_client->lpush(static::QUEUE_FAILED, $package_str);
                    } else {
                        try {
                            // 将消息回调到我们在业务层面写的回调函数中
                            \call_user_func($success_callback, $package['data']);
                        } catch (\Throwable $e) {
                            $package['max_attempts'] = $this->_options['max_attempts'];
                            $package['error'] = $e->getMessage();
                            $package_modified = null;
                            // 如果出现异常并且我们设置了失败回调函数
                            if ($failure_callback) {
                                try {
                                    // 则会回调到我们在业务层面写的回调函数中
                                    $package_modified = \call_user_func($failure_callback, $e, $package);
                                } catch (\Throwable $ta) {
                                }
                            }
                            // 如果修改了消息内容,则重新构造消息
                            if (is_array($package_modified)) {
                                $package['data'] = $package_modified['data'] ?? $package['data'];
                                $package['attempts'] = $package_modified['attempts'] ?? $package['attempts'];
                                $package['max_attempts'] = $package_modified['max_attempts'] ?? $package['max_attempts'];
                                $package['error'] = $package_modified['error'] ?? $package['error'];
                            }
                            // 如果已经超过了最大重试次数,则将消息推送到失败队列
                            if (++$package['attempts'] > $package['max_attempts']) {
                                $this->fail($package);
                            } else {
                                // 否则进入有序集合中,等待下一轮的轮询
                                $this->retry($package);
                            }
                        }
                    }
                }
            } catch (\Throwable $e) {
                echo $e->getMessage() . PHP_EOL;
            }
        }
    }

    // 重新添加到有序集合
    protected function retry($package)
    {
        // 延时时间随着重试的次数成倍增加
        $delay = time() + $this->_options['retry_seconds'] * ($package['attempts']);
        $this->_client->zadd(static::QUEUE_DELAY, $delay, \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));
    }

    // 推送到失败的队列
    protected function fail($package)
    {
        $this->_client->lpush(static::QUEUE_FAILED, \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));
    }
}

这个是消费端脚本,主要是实现在接收到消息之后,进行具体的业务逻辑处理。

<?php
declare (strict_types = 1);

namespace app\command;

use think\facade\Cache;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class Consumer extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('app\command\consumer')
            ->setDescription('the app\command\consumer command');
    }

    protected function execute(Input $input, Output $output)
    {
        (new \Queue())->wait(function($data){
            // 这里是正常接收消息的逻辑
            var_dump($data);
        }, function($e, $package){
            // 这里是消息异常的处理逻辑
            return $package;
        });
    }
}

这个是通过 API 接口将消息,推送到延时队列中。

<?php

namespace app\controller;

use app\BaseController;

class Notify extends BaseController
{
    public function sendMsg()
    {
        // 接收 GET 参数
        $params = $this->request->param();
        if (empty($params["content"])) {
            return json(["code" => -1, "msg" => "内容不能为空"]);
        }
        $content = $params["content"];

        // 推送到延时队列 15 秒之后会执行
        (new \Queue())->send($content, 15);

        return json(["code" => 0, "msg" => "success"]);
    }
}

我们来实际测试一下,先执行
php think consumer
启动消费者,然后再执行
php think run
启动服务,最后使用 Postman 工具进行调用。

Gin

通过 go mod 初始化 go_delay 项目。

## 当前目录
[manongsen@root ~]$ pwd
/home/manongsen/workspace/php_to_go/go_delay

## 初始化项目
[manongsen@root go_delay]$ go mod init go_delay

## 安装第三方依赖库
[manongsen@root go_delay]$ go get github.com/gin-gonic/gin
[manongsen@root go_delay]$ github.com/go-redis/redis

这里和上面 PHP 中的实现逻辑都差不多,有一点值得注意的是在 Go 中是利用协程来异步从 Redis 有序集合中轮询到期的消息,而 PHP 是利用的多进程。

package extend

import (
	"encoding/json"
	"fmt"
	"go_delay/app/config"
	"time"

	"github.com/go-redis/redis"
)

var comId int

const (
	// 就绪消息存放的队列
	QUEUE_READY = "redis:queue:ready"

	// 延迟消息存放的队列(实际的数据结构是有序集合)
	QUEUE_DELAY = "redis:queue:delay"

	// 失败消息存放的队列
	QUEUE_FAILED = "redis:queue:failed"
)

type PackageData struct {
	Id          string `json:"id"`           // 消息ID
	Time        int64  `json:"time"`         // 当前时间
	Delay       int    `json:"delay"`        // 延迟时长(秒)
	Attempts    int    `json:"attempts"`     // 重试次数
	MaxAttempts int    `json:"max_attempts"` // 最大重试次数
	Data        string `json:"data"`         // 消息内容
	Error       string `json:"error"`        // 错误信息
}

type Queue struct {
	RetrySeconds int
	MaxAttempts  int
}

func NewQueue() *Queue {
	return &Queue{
		RetrySeconds: 5, // 重试延时5秒
		MaxAttempts:  5, // 最大重试次数
	}
}

// 发送消息
func (q *Queue) Send(data string, delay int) {
	comId += 1
	now := time.Now().UnixMilli() / 1000
	msgId := fmt.Sprintf("%d.%d", now, comId)
	packageData := &PackageData{
		Id:       msgId,      // 消息ID
		Time:     int64(now), // 当前时间
		Delay:    delay,      // 延迟时长(秒)
		Attempts: 0,          // 重试次数
		Data:     data,       // 消息内容
	}
	packageStr, err := json.Marshal(packageData)
	if err != nil {
		fmt.Printf("json.Marshal fail, err: %v\n", err)
		return
	}

	// 如果不是延时消息,则直接将消息推送到就绪队列
	if delay == 0 {
		config.RedisConn.LPush(QUEUE_READY, packageStr)
	} else {
		// 否则将消息写入到有序集合中
		z := redis.Z{
			Score:  float64(int(now) + delay),
			Member: packageStr,
		}
		config.RedisConn.ZAdd(QUEUE_DELAY, z)
	}
}

// 从有序集合中取出数据推送到就绪队列中
func (q *Queue) tryToPullDelayQueue() {
	for {
		// 当前时间
		now := time.Now().UnixMilli() / 1000
		// 每次取 128 条数据
		z := redis.ZRangeBy{
			Max:    fmt.Sprintf("%d", now),
			Min:    "-inf",
			Offset: 0,
			Count:  128,
		}
		cmd := config.RedisConn.ZRevRangeByScore(QUEUE_DELAY, z)
		items, err := cmd.Result()
		if err != nil {
			fmt.Printf("ZRevRangeByScore cmd.Result fail, err: %v\n", err)
			continue
		}
		for _, item := range items {
			// 从有序集合中移除该数据
			intCmd := config.RedisConn.ZRem(QUEUE_DELAY, item)
			if intCmd.Err() != nil {
				continue
			}
			var packageData *PackageData
			// 将数据JSON反序列化解析
			err = json.Unmarshal([]byte(item), &packageData)
			if err != nil {
				// 解析失败则推送到失败队列
				fmt.Printf("json.Unmarshal fail, err: %v\n", err)
				config.RedisConn.LPush(QUEUE_FAILED, item)
				continue
			}
			// 将数据推送到就绪队列
			config.RedisConn.LPush(QUEUE_READY, item)
		}

		// 间隔1s之后再次轮询
		time.Sleep(time.Second)
	}
}

func (q *Queue) Wait(successCallback func(string) error, failureCallback func(error, *PackageData) *PackageData) {
	// 启动一个协程用于轮询有序集合消息并推送到就绪队列
	go q.tryToPullDelayQueue()

	for {
		// 阻塞监听就绪队列消息
		stringSliceCmd := config.RedisConn.BRPop(0, QUEUE_READY)
		if stringSliceCmd.Err() != nil {
			fmt.Printf("RedisConn.BRPop stringSliceCmd.Err fail, err: %v\n", stringSliceCmd.Err().Error())
			continue
		}
		data, err := stringSliceCmd.Result()
		if err != nil {
			fmt.Printf("RedisConn.BRPop stringSliceCmd.Result fail, err: %v\n", err)
			continue
		}
		// 将数据JSON反序列化解析
		var packageData *PackageData
		packageStr := data[1]
		err = json.Unmarshal([]byte(packageStr), &packageData)
		if err != nil {
			fmt.Printf("json.Unmarshal fail, err: %v\n", err)
			// 解析失败则推送到失败队列
			config.RedisConn.LPush(QUEUE_FAILED, packageStr)
			continue
		}

		// 将消息回调到我们在业务层面写的回调函数中
		err = successCallback(packageData.Data)
		if err != nil {
			fmt.Printf("successCallback fail, err: %v\n", err)

			// 如果出现异常并且我们设置了失败回调函数
			packageData.MaxAttempts = q.MaxAttempts
			packageData.Error = err.Error()
			if failureCallback != nil {
				// 则会回调到我们在业务层面写的回调函数中
				packageModified := failureCallback(err, packageData)
				// 重新构造消息
				packageData.Data = packageModified.Data
				packageData.Attempts = packageModified.Attempts
				packageData.MaxAttempts = packageModified.MaxAttempts
				packageData.Error = packageModified.Error
			}
			continue
		}

		// 如果已经超过了最大重试次数,则将消息推送到失败队列
		packageData.Attempts += 1
		if packageData.Attempts > packageData.MaxAttempts {
			q.fail(packageData)
		} else {
			// 否则进入有序集合中,等待下一轮的轮询
			q.retry(packageData)
		}
	}
}

// 重新添加到有序集合
func (q *Queue) retry(packageData *PackageData) {
	// 延时时间随着重试的次数成倍增加
	delay := time.Now().Second() + q.RetrySeconds*packageData.Attempts
	packageStr, err := json.Marshal(packageData)
	if err != nil {
		fmt.Printf("json.Marshal fail, err: %v\n", err)
		return
	}
	z := redis.Z{
		Score:  float64(delay),
		Member: packageStr,
	}
	config.RedisConn.ZAdd(QUEUE_DELAY, z)
}

// 推送到失败的队列
func (q *Queue) fail(packageData *PackageData) {
	packageStr, err := json.Marshal(packageData)
	if err != nil {
		fmt.Printf("json.Marshal fail, err: %v\n", err)
		return
	}
	config.RedisConn.LPush(QUEUE_FAILED, packageStr)
}

func InitQueue() {
	queue := NewQueue()
	queue.Wait(func(data string) error {
		// 正常接收到消息
		fmt.Printf("接收到消息: %s\n", data)
		return nil
	}, func(err error, packageData *PackageData) *PackageData {
		// 消息异常了在这里增加处理逻辑
		return packageData
	})
}

使用
go extend.InitQueue()
启动了一个消费者。从这里可以看出在 Go 中不需要单独启动一个消费者脚本进程,只需启动一个异步的协程即可监听消息,因此在 Go 中实现 Redis 延时队列相较于 PHP 要方便很多。

package main

import (
	"go_delay/app"
	"go_delay/app/config"
	"go_delay/app/extend"

	"github.com/gin-gonic/gin"
)

func main() {
	r := gin.Default()
	app.InitRoutes(r)
	config.InitRedis()
	go extend.InitQueue()
	r.Run(":8001")
}

这个是通过 API 接口将消息,推送到延时队列中。

package controller

import (
	"go_delay/app/extend"
	"net/http"

	"github.com/gin-gonic/gin"
)

func SendMsg(c *gin.Context) {
	// 接收 GET 参数
	content := c.Query("content")
	if len(content) == 0 {
		c.JSON(http.StatusOK, gin.H{
			"msg":  "内容不能为空",
			"code": -1,
		})
		return
	}

	// 推送到延时队列 15 秒之后会执行
	queue := extend.NewQueue()
	queue.Send(content, 15)

	// 直接返回
	c.JSON(http.StatusOK, gin.H{
		"code": 0,
		"msg":  "success",
	})
}

我们直接执行
go run main.go
启动服务,然后使用 Postman 工具进行调用。

结语

看到这里我相信大家已经对基于 Redis 延时队列的实现方式,有所了解了。从上面的例子中可以看出来,这次延时队列用到的核心数据结构是 Redis 的列表和有序集合。有序集合主要用于存放设置了延时时长的消息,而列表存放的是就绪的消息,即等着被消费者消费的消息。

从 PHP 和 Go 两者语言的区别来看,在 PHP 中需要单独启动消费者脚本,还有在轮询有序集合中到期的消息,也需要在额外的进程中进行,不然就会阻塞消息的消费逻辑。而在 Go 中只需要异步开启一个协程就可以等待消息的到来,轮询到期的消息也再另外开启一个协程便可以完成对应的操作,单从这一点就可以看出 Go 的优势比 PHP 的要大。

此外,在 Go 语言中还可以利用通道 Channel 来替代 Redis,同样也可以实现延时队列,不过 Channel 不能持久化到磁盘,一旦服务挂了消息就丢失了,所以还是老老实实用 Redis 的好。再好的技术知识,也需要亲自来实践才能吸收,所以建议大家手动实践一下,如果有想要获取完整案例代码的朋友,可以在公众号内回复「8392」即可,本次分享的内容就到这里结束了,希望对大家能有所帮助。

感谢大家阅读,个人观点仅供参考,欢迎在评论区发表不同观点。

欢迎关注、分享、点赞、收藏、在看,我是微信公众号「码农先森」作者。