2024年8月

在光照条件不佳下捕获的图像可能同时包含过曝和欠曝。目前的方法主要集中在调整图像亮度上,这可能会加剧欠曝区域的色调失真,并且无法恢复过曝区域的准确颜色。论文提出通过学习估计和校正这种色调偏移,来增强既有过曝又有欠曝的图像。先通过基于
UNet
的网络推导输入图像的增亮和变暗版本的色彩特征图,然后使用伪正常特征生成器生成伪正常色彩特征图。接着,通过论文提出的
COlor Shift Estimation
(
COSE
) 模块来估计推导的增亮(或变暗)色彩特征图与伪正常色彩特征图之间的色调偏移,分别校正过曝和欠曝区域的估计色调偏移。最后,使用提出的
COlor MOdulation
(
COMO
) 模块来调制过曝和欠曝区域中分别校正后的颜色,以生成增强图像。

来源:晓飞的算法工程笔记 公众号

论文: Color Shift Estimation-and-Correction for Image Enhancement

Introduction


现实世界的场景通常涉及广泛的照明条件,这对摄影构成了重大挑战。尽管相机具有自动曝光模式来根据场景亮度确定“理想”的曝光设置,但是在整个图像范围内均匀调整曝光仍可能导致区域过度明亮和过度昏暗,这种欠曝和过曝的区域可能表现出明显的色调失真。欠曝区域相对较高的噪音水平会改变数据分布,导致色调偏移,而过曝区域则会失去原始的色彩。因此,增强这类图像通常涉及到亮度调整和色调偏移校正。

近年来,已经进行了许多努力来增强不正确曝光的图像。这些方法可以大致分为两类。

  1. 第一类专注于增强过曝或欠曝的图像。一些方法提出学习曝光不变的表示空间,其中不同的曝光水平可以映射到一个标准化和不变的表示中。其他方法则提出将频率信息与空间信息整合,这有助于模拟图像固有的结构特征,从而增强图像的亮度和结构失真。然而,上述方法通常假设过度或欠曝发生在整个图像上,对于同时存在过度曝光和欠曝光的图像(例如,图
    1
    (b)),它们效果不佳。
  2. 第二类工作旨在增强同时存在过度曝光和欠曝光的图像,利用局部颜色分布作为先验来引导增强过程。然而,尽管设计了金字塔式的局部颜色分布先验,仍然倾向于产生在大面积均质区域中出现显著色彩偏移的结果(例如,图
    1
    (c))。

本文旨在校正同时存在过度曝光和欠曝光的图像的亮度和色彩失真问题。为了解决这个问题,首先在图
1
(f)和
1
(g)中展示了从两个相关数据集(
MSEC

LCDP
)中随机抽样的像素的
PCA
结果。
MSEC
数据集中每个场景包含五张不同曝光值(
EV
)的输入图像,而
LCDP
数据集中每个场景只有一张同时包含过度曝光和欠曝光的输入图像。从这个初步研究中,可以得出了两个观察结果。

  1. 在这两个数据集中,欠曝光像素(绿点)倾向于与过度曝光像素(红点)有相反的分布偏移。

  2. MSEC
    数据集包含了
    0 EV
    输入图像作为曝光标准化过程的参考图像不同,
    LCDP
    的图像没有这样的“正常曝光”像素。

第一个观察结果启发我们考虑估计和校正这样的色彩偏移,而第二个观察结果则启发我们创建伪正常曝光特征图,作为色彩偏移估计和矫正的参考。

为此,论文提出了一种新方法,联合调整图像亮度并校正色调失真。首先使用基于
UNet
的网络,从输入图像的增亮和变暗版本中提取过度曝光和欠曝光区域的色彩特征图。接着,伪正常特征生成器基于这些派生的色彩特征图创建伪正常色彩特征图。随后,论文提出了一种新的颜色偏移估计(
COSE
)模块,分别估计和校正派生的增亮(或变暗)色彩特征图与创建的伪正常色彩特征图之间的色彩偏移,通过在颜色特征域中扩展可变形卷积来实现
COSE
模块。进一步,论文提出了一种新的颜色调制(
COMO
)模块,通过定制的交叉注意力机制,在过度曝光和欠曝光区域的分别校正的色彩上进行调制,以生成增强图像。通过在输入图像和估计的变暗/增亮色彩偏移上执行定制的交叉注意力机制来实现
COMO
模块,图
1
(d)显示了我们的方法能够生成视觉上令人愉悦的图像。

论文的主要贡献可以总结如下:

  1. 提出了一种新颖的神经网络方法,通过建模色彩分布的变化来增强同时存在过度曝光和欠曝光的图像。

  2. 提出了一种新颖的神经网络,包括两个新模块:一是用于分别估计和校正过度曝光和欠曝光区域中色彩的新颖颜色偏移估计(
    COSE
    )模块,二是用于调制校正后的颜色以生成增强图像的新颖颜色调制(
    COMO
    )模块。

  3. 广泛的实验证明,论文的网络具有轻量化的特点,并且在流行的基准测试中表现优于现有的图像增强方法。

Proposed Method


论文的方法受到两点观察的启发。首先,与欠曝光像素相比,过曝光像素倾向于具有反向分布偏移,这表明有必要分别捕捉和修正这样的色彩偏移。其次,由于绝大多数(如果不是全部)像素都受到过曝光或欠曝光的影响,因此有必要创建伪正常曝光信息,以指导过曝光或欠曝光像素色彩偏移的估计。基于这两点观察,我们提出了一种新的网络,其中包括两个新模块:新的色彩偏移估计(
COSE
)模块和新的色彩调制(
COMO
)模块,用于增强具有过曝光或欠曝光的图像。

Network Overview

给定一个具有过曝光和欠曝光的输入图像
\(I_x\in \mathcal{R}^{3\times H\times W}\)
,旨在生成一个增强后的图像
\(I_y\in \mathcal{R}^{3\times H\times W}\)
,具有校正的图像亮度以及恢复的图像细节和颜色,模型结构如图
2
所示。给定输入图像
\(I_x\)
,首先通过计算其反向版本
\(\hat{I}_x=1-I_x\)
,然后将两者输入基于
UNet
的网络,以提取两个光照图
\(F_L^U\in \mathcal{R}^{1\times H\times W}\)

\(F_L^O\in \mathcal{R}^{1\times H\times W}\)
,这两个光照图(即
\(F_L^U\)

\(F_L^O\)
)分别表示受欠曝光和过曝光影响的区域。接下来,计算暗化特征图
\(F_D\)
和增亮特征图
\(F_B\)
,具体如下:

\[\begin{align}
F_B = \frac{I_x}{F_L^U} &= \frac{I_x}{f(I_x)}, \\
F_D = 1-\frac{1-I_x}{F_L^O} &= 1 - \frac{1-I_x}{f(1 - I_x)},
\end{align}
\]

其中,
\(f(\cdot)\)
表示基于
UNet
的特征提取器。根据增亮和暗化的特征图
\(F_B, F_D \in \mathbb{R}^{3 \times H \times W}\)
来建模色彩偏移。

给定
\(F_B\)

\(F_D\)
,首先使用伪正常特征生成器将它们与输入图像
\(I_x\)
融合,生成伪正常特征图
\(F_N\)
,具体如下:

\[\begin{align}
F_N = g(F_B, F_D, I_x),
\end{align}
\]

其中,
\(g(\cdot)\)
表示伪正常曝光生成器。然后,将
\(F_N\)
可以作为参考,分别通过两个
COSE
模块引导估计
\(F_B\)

\(F_N\)
以及
\(F_D\)

\(F_N\)
之间的色彩偏移。这两个
COSE
模块产生的暗化偏移
\(O_D\)
和增亮偏移
\(O_B\)
,针对输入图像
\(I_x\)
模拟了亮度和色彩的变化。因此,
\(O_D\)

\(O_B\)

\(I_x\)
被送入提出的
COMO
模块中,用于调整图像的亮度并纠正色彩偏移,生成最终的图像
\(I_y\)

Color Shift Estimation (COSE) Module

与亮度调整不同,色彩偏移校正更具挑战性,因为它本质上要求网络在
RGB
色彩空间中建模像素方向,而不是像素强度的幅度。尽管有一些工作使用余弦相似性正则化来帮助在训练过程中保持图像的颜色,但这样的策略通常在大面积低曝光或过曝区域失败,因为这些区域中的小值或高值像素预期具有不同的颜色。

论文提出基于可变形卷积技术的
COSE
模块来解决这一问题。可变形卷积(
DConv
)通过引入空间偏移
\(\Delta p_n\)
扩展了普通卷积,能够自适应地在任何
\(N\times N\)
像素的任意位置执行卷积,其中
\(N\times N\)
表示卷积核的大小。调制项
\(\Delta m_n\)
被提出来为不同的卷积核位置分配不同的权重,使卷积运算符聚焦于重要的像素。虽然可变形卷积可以预测相对于基础的偏移量,从而捕捉颜色分布的变化,但由于之前的方法只在像素空间域应用了可变形卷积,论文提出将可变形卷积扩展到空间域和色彩空间中,以联合建模亮度变化和色彩偏移。

如图
3
所示,
COSE
模块首先沿通道维度连接伪正常特征图
\(F_N\)
和增亮/暗化特征图
\(F_B\)
/
\(F_D\)
,然后使用三个独立的
\(3\times 3\)
卷积来提取位置偏移
\(\Delta p_n\in \mathcal{R}^{B\times 2N\times H\times W}\)
,颜色偏移
\(\Delta c_n\in \mathcal{R}^{B\times 3N\times H\times W}\)
和调制项
\(\Delta m_n\in \mathcal{R}^{B\times N\times H\times W}\)
。位置偏移
\(\Delta p_n\)
和调制项
\(\Delta m_n\)
在空间域内执行,以聚合卷积操作中变形不规则感受野的空间上下文信息。此外,引入了颜色偏移
\(\Delta c_n\)
,用于表示每个通道在每个卷积核位置上的颜色偏移。学习到的颜色偏移
\(\Delta c_n\)
被设计为具有
\(3N\)
个通道,用于模拟具有
3
个通道的输入
sRGB
图像的颜色偏移。

可变形卷积在空间域和色彩空间中的计算可以写成:

\[\begin{align}
y = \sum_{p_n\in \mathcal{R}} (w_n\cdot x(p_0 + p_n + \Delta p_n) + \Delta c_n) \cdot \Delta m_n,\label{eq:cdc}
\end{align}
\]

其中,
\(x\)
表示卷积操作的输入特征,而
\(p_0\)

\(p_n\)

\(\Delta p_n\)
是表示空间位置的二维变量。
\(y\)
(或
\(y(p_0)\)
)表示输入图像中每个像素
\(p_0\)
的色彩空间可变形卷积的输出。集合
\(\mathcal{R} = \{(-1, -1), (-1, 0), \dots, (1, 1)\}\)
表示常规
\(3\times 3\)
卷积核的网格。
\(n\)

\(\mathcal{R}\)
中元素的枚举器,指示第
\(n\)
个位置,
\(N\)

\(\mathcal{R}\)
的长度(对于常规
\(3\times 3\)
卷积核,
\(N=9\)
)。由于位移
\(\Delta p_n\)
在实践中可能具有小数,采用双线性插值进行计算,这与空间可变形卷积相一致。

Color Modulation (COMO) Module

COMO
模块用于调节输入图像的亮度和颜色,生成最终的输出图像
\(I_y\)
,基于学习到的亮化特征
\(F_B\)
和变暗特征
\(F_D\)
之间的偏移量
\(O_B\)
/
\(O_D\)
,以及伪正常特征
\(F_N\)
。由于在生成具有和谐颜色的校正图像时聚合全局信息至关重要,论文从非局部上下文建模中汲取灵感,并通过将
self-affinity
计算扩展为
cross-affinity
计算来制定
COMO
模块,以便
COMO
能够通过查询
\(O_B\)

\(O_D\)
来增强输入图像。

如图
4
所示,为处理输入图像
\(I_x\)
、变暗偏移量
\(O_D\)
和亮化偏移量
\(O_B\)
分别分配了三个分支,每个分支包含三个
\(1\times 1\)
卷积层(分别表示为
\(Conv\psi\)

\(Conv\phi\)

\(ConvZ\)
)。然后,在每个分支中计算
self-affinity
矩阵
\(A_i\)
,如下所示:

\[\begin{align}
A_i = \psi_i \otimes \phi_i,\ for\ i\in \{I, B, D\},
\end{align}
\]

其中,
\(\otimes\)
表示矩阵乘法,
\(\psi_i\)

\(\phi_i\)
分别是由
\(Conv\psi\)

\(Conv\phi\)
得到的特征图。然后,
\(A_i\)
被对称化并归一化,以确保存在实特征值并稳定反向传播。
\(A_i\)
的每一行用作空间注意力图,而
\(Z_i\)
(通过
\(ConvZ\)
获得)用作注意力图的权重。接下来,通过矩阵乘法建模
\(I_x\)

\(O_B\)
/
\(O_D\)
之间的相关性,并将它们与
self-affinity
特征相加,如下所示:

\[\begin{align}
f_j = w_1 A_j \otimes Z_j + w_2 A_j \otimes Z_I,
\end{align}
\]

其中,
\(j \in \{B, D\}\)
是亮化或变暗分支中的亲和矩阵
\(A_j\)
和特征图
\(Z_j\)
的索引。
\(w_1\)

\(w_2\)
是由
\(1\times 1\)
卷积生成的权重矩阵。在公式
6
中,第一项是为了发现由
COSE
学习到的
\(O_B\)

\(O_D\)
中显著的颜色偏移区域,而第二项旨在利用输入
\(Z_I\)
的学习权重来关注
\(O_B\)

\(O_D\)
的注意力图,以了解输入的显著区域中的偏移情况。

最后,将
\(f_B\)

\(f_D\)
和输入图像
\(I_x\)
结合起来,作为指导输入图像的探索的颜色偏移,生成最终的结果
\(I_y\)
,如下所示:

\[\begin{align}
I_y = w_4(BN(f_B) + BN(f_D) + w_3A_I\otimes Z_I) + I_x,
\end{align}
\]

其中,
\(BN(\cdot)\)
表示批量归一化,
\(w_3\)

\(w_4\)
是由
\(1\times 1\)
卷积生成的权重矩阵。

Loss Function

使用两个损失函数
\(\mathcal{L}_{pesudo}\)

\(\mathcal{L}_{output}\)
来训练。由于需要生成一个伪正常的特征图来帮助识别颜色偏移,使用
\(\mathcal{L}_{pesudo}\)
来为生成过程提供中间监督。

\[\begin{align}
\mathcal{L}_{pesudo} = ||F_N - GT||_1.
\end{align}
\]

\(\mathcal{L}_{output}\)
包含四个项,用于监督网络生成增强图像,即
\(L1\)
损失,余弦相似度
\(\mathcal{L}_{cos}\)
,结构相似性(
SSIM
)损失
\(\mathcal{L}_{ssim}\)

VGG
损失
\(\mathcal{L}_{vgg}\)

\(\mathcal{L}_{output}\)
可以表达为:

\[\begin{align}
\mathcal{L}_{output} = \lambda_1 \mathcal{L}_{L1} + \lambda_2 \mathcal{L}_{cos} + \lambda_3 \mathcal{L}_{ssim} + \lambda_4 \mathcal{L}_{vgg},
\end{align}
\]

其中,
\(\lambda_1\)

\(\lambda_2\)

\(\lambda_3\)

\(\lambda_4\)
是四个平衡超参数。整体损失函数为:

\[\begin{align}
\mathcal{L} = \lambda_p \mathcal{L}_{pesudo} + \lambda_o \mathcal{L}_{output},
\end{align}
\]

其中,
\(\lambda_p\)

\(\lambda_o\)
是两个平衡超参数。

Experiments




如果本文对你有帮助,麻烦点个赞或在看呗~
更多内容请关注 微信公众号【晓飞的算法工程笔记】

work-life balance.

前言

在上一篇文章:
OpenTelemetry 实战:从零实现分布式链路追踪
讲解了链路相关的实战,本次我们继续跟进如何使用 OpenTelemetry 集成 metrics 监控。

建议对指标监控不太熟的朋友可以先查看这篇前菜文章:
从 Prometheus 到 OpenTelemetry:指标监控的演进与实践

名称 作用 语言 版本
java-demo 发送 gRPC 请求的客户端 Java opentelemetry-agent: 2.4.0/SpringBoot: 2.7.14
k8s-combat 提供 gRPC 服务的服务端 Golang go.opentelemetry.io/otel: 1.28/ Go: 1.22
Jaeger trace 存储的服务端以及 TraceUI 展示 Golang jaegertracing/all-in-one:1.56
opentelemetry-collector-contrib OpenTelemetry 的 collector 服务端,用于收集 trace/metrics/logs 然后写入到远端存储 Golang otel/opentelemetry-collector-contrib:0.98.0
Prometheus 作为 metrics 的存储和展示组件,也可以用
VictoriaMetrics
等兼容 Prometheus 的存储替代。
Golang quay.io/prometheus/prometheus:v2.49.1
image.png

快速开始

以上是加入 metrics 之后的流程图,在原有的基础上会新增一个
Prometheus
组件,collector 会将 metrics 指标数据通过远程的 remote write 的方式写入到 Prometheus 中。

Prometheus 为了能兼容 OpenTelemetry 写入过来的数据,需要开启相关
特性
才可以。

如果是 docker 启动的话需要传入相关参数:

docker run  -d -p 9292:9090 --name prometheus \
-v /prometheus/prometheus.yml:/etc/prometheus/prometheus.yml \
quay.io/prometheus/prometheus:v2.49.1 \
--config.file=/etc/prometheus/prometheus.yml \
--storage.tsdb.path=/prometheus \
--web.console.libraries=/etc/prometheus/console_libraries \
--web.console.templates=/etc/prometheus/consoles \
--enable-feature=exemplar-storage \
--enable-feature=otlp-write-receiver

--enable-feature=otlp-write-receiver
最主要的就是这个参数,用于开启接收 OTLP 格式的数据。

但使用这个 Push 特性就会丧失掉 Prometheus 的许多 Pull 特性,比如服务发现,定时抓取等,不过也还好,Push 和 Pull 可以同时使用,原本使用 Pull 抓取的组件依然不受影响。

修改 OpenTelemetry-Collector

接着我们需要修改下 Collector 的配置:

exporters:
  debug:
  otlp:
    endpoint: "jaeger:4317"
    tls:
      insecure: true
  otlphttp/prometheus:
    endpoint: http://prometheus:9292/api/v1/otlp
    tls:
      insecure: true      

processors:
  batch:

service:
  pipelines:
    traces:
      receivers:
      - otlp
      processors: [batch]
      exporters:
      - otlp
      - debug        
    metrics:
      exporters:
      - otlphttp/prometheus
      - debug
      processors:
      - batch
      receivers:
      - otlp

这里我们在
exporter
中新增了一个
otlphttp/prometheus
的节点,用于指定导出
prometheus

endpoint
地址。

同时我们还需要在
server.metrics.exporters
中配置相同的 key:
otlphttp/prometheus

需要注意的是这里我们一定得是配置在
metrics.exporters
这个节点下,如果配置在
traces.exporters
下时,相当于是告诉 collector 讲 trace 的数据导出到
otlphttp/prometheus.endpoint
这个 endpoint 里了。

所以重点是需要理解这里的配对关系。

运行效果

这样我们只需要将应用启动之后就可以在 Prometheus 中查询到应用上报的指标了。

java -javaagent:opentelemetry-javaagent-2.4.0-SNAPSHOT.jar \
-Dotel.traces.exporter=otlp \
-Dotel.metrics.exporter=otlp \
-Dotel.logs.exporter=none \
-Dotel.service.name=java-demo \
-Dotel.exporter.otlp.protocol=grpc \
-Dotel.propagators=tracecontext,baggage \
-Dotel.exporter.otlp.endpoint=http://127.0.0.1:5317 -jar target/demo-0.0.1-SNAPSHOT.jar

# Run go app
export OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:5317 OTEL_RESOURCE_ATTRIBUTES=service.name=k8s-combat
./k8s-combat

因为我们在 collector 中开启了 Debug 的 exporter,所以可以看到以下日志:

2024-07-22T06:34:08.060Z	info	MetricsExporter	{"kind": "exporter", "data_type": "metrics", "name": "debug", "resource metrics": 1, "metrics": 18, "data points": 44}

此时是可以说明指标上传成功的。

然后我们打开
Prometheus
的地址:
http://127.0.0.1:9292/graph
便可以查询到 Java 应用和 Go 应用上报的指标。

OpenTelemetry 的 javaagent 会自动上报 JVM 相关的指标。


而在 Go 程序中我们还是需要显式的配置一些埋点:

func initMeterProvider() *sdkmetric.MeterProvider {  
    ctx := context.Background()  
  
    exporter, err := otlpmetricgrpc.New(ctx)  
    if err != nil {  
       log.Printf("new otlp metric grpc exporter failed: %v", err)  
    }  
    mp := sdkmetric.NewMeterProvider(  
       sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)),  
       sdkmetric.WithResource(initResource()),  
    )    otel.SetMeterProvider(mp)  
    return mp  
}

mp := initMeterProvider()
defer func() {
	if err := mp.Shutdown(context.Background()); err != nil {
		log.Printf("Error shutting down meter provider: %v", err)
	}
}()

和 Tracer 类似,我们首先也得在 main 函数中调用
initMeterProvider()
函数来初始化 Meter,此时它会返回一个
sdkmetric.MeterProvider
对象。

OpenTelemetry Go 的 SDK 中已经提供了对 go runtime 的自动埋点,我们只需要调用相关函数即可:

err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second))
if err != nil {
    log.Fatal(err)
}

之后我们启动应用,在 Prometheus 中就可以看到 Go 应用上报的相关指标了。
image.png

runtime_uptime_milliseconds_total Go 的运行时指标

Prometheus
中展示指标的 UI 能力有限,通常我们都是配合
grafana
进行展示的。
image.png

手动上报指标

当然除了 SDK 自动上报的指标之外,我们也可以类似于 trace 那样手动上报一些指标;

比如我就想记录某个函数调用的次数。

var meter =  otel.Meter("test.io/k8s/combat")  
apiCounter, err = meter.Int64Counter(  
    "api.counter",  
    metric.WithDescription("Number of API calls."),  
    metric.WithUnit("{call}"),  
)  
if err != nil {  
    log.Err(err)  
}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {  
    defer apiCounter.Add(ctx, 1)  
    return &pb.HelloReply{Message: fmt.Sprintf("hostname:%s, in:%s, md:%v", name, in.Name, md)}, nil  
}

只需要创建一个
Int64Counter
类型的指标,然后在需要埋点处调用它的函数
apiCounter.Add(ctx, 1)
即可。

image.png
之后便可以在
Prometheus
中查到这个指标了。

除此之外 OpenTelemetry 中的 metrics 定义和 Prometheus 也是类似的,还有以下几种类型:

  • Counter
    :单调递增计数器,比如可以用来记录订单数、总的请求数。
  • UpDownCounter
    :与 Counter 类似,只不过它可以递减。
  • Gauge
    :用于记录随时在变化的值,比如内存使用量、CPU 使用量等。
  • Histogram
    :通常用于记录请求延迟、响应时间等。

在 Java 中也提供有类似的 API 可以完成自定义指标:

messageInCounter = meter    
        .counterBuilder(MESSAGE_IN_COUNTER)    
        .setUnit("{message}")    
        .setDescription("The total number of messages received for this topic.")    
        .buildObserver();

对于 Gauge 类型的数据用法如下,使用
buildWithCallback
回调函数上报数据,
OpenTelemetry
会在框架层面每 30s 回调一次。

public static void registerObservers() {      
    Meter meter = MetricsRegistration.getMeter();      
      
    meter.gaugeBuilder("pulsar_producer_num_msg_send")      
            .setDescription("The number of messages published in the last interval")      
            .ofLongs()      
            .buildWithCallback(      
                    r -> recordProducerMetrics(r, ProducerStats::getNumMsgsSent));  
  
private static void recordProducerMetrics(ObservableLongMeasurement observableLongMeasurement, Function<ProducerStats, Long> getter) {      
    for (Producer producer : CollectionHelper.PRODUCER_COLLECTION.list()) {      
        ProducerStats stats = producer.getStats();      
        String topic = producer.getTopic();      
        if (topic.endsWith(RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX)) {      
            continue;      
        }        observableLongMeasurement.record(getter.apply(stats),      
                Attributes.of(PRODUCER_NAME, producer.getProducerName(), TOPIC, topic));      
    }}

更多具体用法可以参考官方文档链接:
https://opentelemetry.io/docs/languages/java/instrumentation/#metrics

如果我们不想将数据通过 collector 而是直接上报到 Prometheus 中,使用 OpenTelemetry 框架也是可以实现的。

我们只需要配置下环境变量:

export OTEL_METRICS_EXPORTER=prometheus

这样我们就可以访问
http://127.0.0.1:9464/metrics
获取到当前应用暴露出来的指标,此时就可以在
Prometheus
里配置好采集 job 来获取数据。

scrape_configs:
  - job_name: "k8s-combat"
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
      - targets: ["k8s-combat:9464"]   

这就是典型的 Pull 模型,而 OpenTelemetry 推荐使用的是 Push 模型,数据由 OpenTelemetry 进行采集然后推送到 Prometheus。

这两种模式各有好处:

Pull模型 Push 模型
优点 可以在一个集中的配置里管理所有的抓取端点,也可以为每一个应用单独配置抓取频次等数据。 在 OpenTelemetry 的 collector中可以集中对指标做预处理之后再将过滤后的数据写入 Prometheus,更加的灵活。
缺点 1. 预处理指标比较麻烦,所有的数据是到了 Prometheus 后再经过relabel处理后再写入存储。
2. 需要配置服务发现
1. 额外需要维护一个类似于 collector 这样的指标网关的组件

比如我们是用和 Prometheus 兼容的 VictoriaMetrics 采集了 istio 的相关指标,但里面的指标太多了,我们需要删除掉一部分。

就需要在采集任务里编写规则:

apiVersion: operator.victoriametrics.com/v1beta1  
kind: VMPodScrape  
metadata:  
  name: isito-pod-scrape  
spec:  
  podMetricsEndpoints:  
    - scheme: http  
      scrape_interval: "30s"  
      scrapeTimeout: "30s"  
      path: /stats/prometheus  
      metricRelabelConfigs:  
        - regex: ^envoy_.*|^url\_\_\_\_.*|istio_request_bytes_sum|istio_request_bytes_count|istio_response_bytes_sum|istio_request_bytes_sum|istio_request_duration_milliseconds_sum|istio_response_bytes_count|istio_request_duration_milliseconds_count|^ostrich_apigateway.*|istio_request_messages_total|istio_response_messages_total  
          action: drop_metrics  
  namespaceSelector:  
    any: true

换成在 collector 中处理后,这些逻辑都可以全部移动到 collector 中集中处理。

总结

metrics 的使用相对于 trace 更简单一些,不需要理解复杂的 context、span 等概念,只需要搞清楚有哪几种 metrics 类型,分别应用在哪些不同的场景即可。

参考链接:

前言

微服务架构已经成为搭建高效、可扩展系统的关键技术之一,然而,现有许多微服务框架往往过于复杂,使得我们普通开发者难以快速上手并体验到微服务带了的便利。为了解决这一问题,于是作者精心打造了一款最接地气的 .NET 微服务框架,帮助我们轻松构建和管理微服务应用。

本框架不仅支持 Consul 服务注册与发现,还自带了一系列高级特性,包括配置中心、链路跟踪(APM)、服务网关等,极大地简化了微服务的开发和运维过程。

此外框架还实现了 Saga 分布式事务、RabbitMQ 事件总线等功能,确保系统能够高效处理复杂的业务逻辑。更重要的是提供了一个人性化的 Dashboard 管理面板,使得监控和管理微服务集群变得方便。

通过本文的介绍大家可以学习到如何快速上手并充分利用这些特性,从而构建出既高效又稳定的微服务应用。

项目介绍

Wing 致力于打造一个功能强大且易于使用的 .NET 微服务框架,支持 .NET 6+ 运行平台。

该框架具备以下特点:

1、服务注册与发现
:支持 Consul 服务注册与发现机制,确保服务间的自动发现和动态管理。

2、服务间通讯
:支持 HTTP 和 gRPC 两种调用方式,内置负载均衡器,实现高效的服务间通信。

3、服务策略与异常处理
:提供服务策略配置,支持服务异常降级处理,确保系统的稳定性和可靠性。

4、Saga 分布式事务
:支持三种恢复策略(向前恢复、向后恢复、先前再后),确保事务的一致性和完整性。

5、配置中心
:内置配置中心,实现服务配置的在线集中统一管理。

6、链路追踪与性能监控
:支持 HTTP/gRPC/SQL 的链路追踪(APM)及耗时分析统计,帮助开发者快速定位性能瓶颈。

7、服务网关
:内置服务网关,支持全局服务策略和个性化服务策略配置,简化服务入口管理。

8、事件总线
:支持 RabbitMQ 事件总线,实现服务间的异步通信和事件传递。

9、管理界面
:提供人性化的 Dashboard 管理界面,便于监控和管理整个微服务集群。

Wing 框架为开发者提供一个强大而直观的开发平台,帮助快速构建和管理高效、可扩展的微服务应用。

快速入门

1、服务注册

什么是服务注册?

服务注册是指服务启动后将该服务的IP、端口等信息注册到
Consul

创建一个Web API 项目

提前准备:安装并启动Consul

打开 Visual Studio 2022 并创建Web API项目

安装依赖包

dotnet add package Wing.Consul

Program代码

builder.Services.AddWing();

添加配置

{//是否启用配置中心,默认启用
  "ConfigCenterEnabled": false,"Consul": {"Url": "http://localhost:8500","Service": {//Http  Grpc
      "Option": "Http","HealthCheck": {"Url": "http://localhost:1210/health",//单位:秒
        "Timeout": 10,//单位:秒
        "Interval": 10},"Name": "Wing.Demo_1.2.1","Host": "localhost","Port": 1210,"Tag": "","LoadBalancer": {//RoundRobin  WeightRoundRobin LeastConnection
        "Option": "WeightRoundRobin",//权重
        "Weight": 60},"Scheme": "http","Developer": "linguicheng"},//定时同步数据时间间隔,单位:秒 小于等于0表示立即响应
    "Interval": 10,//数据中心
    "DataCenter": "dc1",//等待时间,单位:分钟
    "WaitTime": 3}
}

查看运行效果

程序运行后,打开consul UI管理界面,可以看到注册服务
Wing.Demo_1.2
,具体如下图所示:

2、启动UI

Wing.UI

Wing
微服务框架中的一个可视化操作管理系统,主要功能有服务治理、配置中心、APM管理、Saga分布式事务查询。

安装依赖包

安装服务注册nuget包
Wing.Consul
,UI可视化界面管理nuget包
Wing.UI
,选择对应的数据库驱动(
参考FreeSql官网
),以SqlServer为例,安装
FreeSql.Provider.SqlServer

dotnet add package Wing.UI
dotnet add package FreeSql.Provider.SqlServer

Program代码

using Wing;var builder =WebApplication.CreateBuilder(args);
builder.Host.AddWing(builder
=>builder.AddConsul());
builder.Services.AddWing().AddWingUI(FreeSql.DataType.SqlServer);

查看运行效果

程序运行后,浏览器访问 ,运行效果如下图:

可以看到示例
1.2
注入的服务`Wing.Demo_1.2

3、服务发现与调用

什么是服务发现?

服务发现是指服务启动后将服务注册信息定时同步刷新到本地或实时获取
Consul
的服务信息。

安装依赖包

dotnet add package Wing.Consul

Grpc健康检查

protobuf文件

syntax = "proto3";
package grpc.health.v1;
message HealthCheckRequest {
string service
= 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN
= 0;
SERVING
= 1;
NOT_SERVING
= 2;
}
ServingStatus status
= 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

HealthCheck代码

public classHealthCheck : Health.HealthBase
{
public override Task<HealthCheckResponse>Check(HealthCheckRequest request, ServerCallContext context)
{
return Task.FromResult(new HealthCheckResponse() { Status =HealthCheckResponse.Types.ServingStatus.Serving });
}
public override async Task Watch(HealthCheckRequest request, IServerStreamWriter<HealthCheckResponse>responseStream, ServerCallContext context)
{
await responseStream.WriteAsync(newHealthCheckResponse()
{ Status
=HealthCheckResponse.Types.ServingStatus.Serving });
}
}

Program代码

usingWing;var builder =WebApplication.CreateBuilder(args);
builder.Host.AddWing(builder
=>builder.AddConsul());//Add services to the container. builder.Services.AddGrpc();
builder.Services.AddWing();
var app =builder.Build();//Configure the HTTP request pipeline. app.MapGrpcService<GreeterService>();
app.MapGet(
"/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
app.Run();

运行效果

运行当前程序并启动示例
1.3
,浏览器访问 ,可以看到注入的Grpc服务
Wing.Demo_1.4
,运行效果如下图:

在示例
1.2
中调用当前Grpc服务中
SayHello
方法,代码如下:

[HttpGet("hello")]public Task<string>SayHello()
{
return _serviceFactory.InvokeAsync("Wing.Demo_1.4", async serviceAddr =>{var channel =GrpcChannel.ForAddress(serviceAddr.ToString());var greeterClient = newGreeter.GreeterClient(channel);var result = await greeterClient.SayHelloAsync(new HelloRequest { Name = "Wing"});returnresult.Message;
});
}

运行示例
1.2
,浏览器访问
http://localhost:1210/weatherforecast/hello
,运行效果如下图:

4、启动服务网关

服务网关是系统对外的唯一入口,它封装了系统内部架构,为每个客户端提供了定制的API,所有的客户端和消费端都通过统一的网关接入微服务,在网关层处理所有非业务功能。

安装依赖包

安装服务注册nuget包
Wing.Consul
,服务网关nuget包
Wing.Gateway
,选择对应的数据库驱动(
参考FreeSql官网open in new window
),以SqlServer为例,安装
FreeSql.Provider.SqlServer
,请求日志支持本地消息队列和分布式消息队列进行异步持久化,基本上不影响网关性能。如果不想记录请求日志,可以不安装该包。

如果想启用
EventBus
记录请求日志,需要安装RabbitMQ nuget包
Wing.RabbitMQ

dotnet add package Wing.Consul
dotnet add package Wing.Gateway
dotnet add package Wing.RabbitMQ
dotnet add package FreeSql.Provider.SqlServer

Program代码

usingWing;
var builder =WebApplication.CreateBuilder(args);

builder.Host.AddWing(builder
=>builder.AddConsul());
//Add services to the container.
builder.Services.AddControllers();

builder.Services.AddWing()
.AddPersistence(FreeSql.DataType.SqlServer)
.AddGateWay()
.AddEventBus();
//如果不想使用EventBus记录请求日志,可以删除此行代码 var app =builder.Build();
//Configure the HTTP request pipeline.
app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

添加配置

{//是否启用配置中心,默认启用
  "ConfigCenterEnabled": false,"Consul": {"Url": "http://localhost:8500","Service": {//Http  Grpc
      "Option": "Http","HealthCheck": {"Url": "http://localhost:1510/health",//单位:秒
        "Timeout": 10,//单位:秒
        "Interval": 10},"Name": "Wing.Demo_1.5","Host": "localhost","Port": 1510,"LoadBalancer": {//RoundRobin  WeightRoundRobin LeastConnection
        "Option": "WeightRoundRobin",//权重
        "Weight": 50},"Scheme": "http","Developer": "linguicheng"},//定时同步数据时间间隔,单位:秒 小于等于0表示立即响应
    "Interval": 10,//数据中心
    "DataCenter": "dc1",//等待时间,单位:分钟
    "WaitTime": 3},"ConnectionStrings": {"Wing": "Data Source=192.168.56.96;User Id=sa;Password=wing123.;Initial Catalog=Wing;TrustServerCertificate=true;Pooling=true;Min Pool Size=1"},//自动同步实体结构到数据库
  "UseAutoSyncStructure": true,//如果不启用EventBus,可以删除RabbitMQ配置
  "RabbitMQ": {"HostName": "192.168.56.99","UserName": "admin","Password": "admin","VirtualHost": "/","Port": 5672,//消息过期时间,单位:毫秒,过期会自动路由到死信队列,小于或等于0则永久有效
    "MessageTTL": 0,"ExchangeName": "Sample.GateWay",//每次投递消息数量
    "PrefetchCount": 1},"Gateway": {//请求日志
    "Log": {//是否启用网关日志记录
      "IsEnabled": true,//是否启用事件总线(RabbitMQ)存储日志,生产环境推荐启用,可以提升程序的性能
      "UseEventBus": false}
}
}

查看运行效果

运行示例
1.2
并启动当前示例程序,浏览器访问,运行效果如下图:

运行示例
1.3
,浏览器访问,可以看到网关请求日志,运行效果如下图:

服务地址组成

请求服务地址默认是{网关IP或域名}/{服务名}/{服务路由},例如:
http://localhost:1510/Wing.Demo_1.2/weatherforecast

注意:
更多具体内容可以访问Wing 官方文档,具体内容如下所示

项目地址

Github:
https://linguicheng.github.io/Wing

Gitee:
https://gitee.com/linguicheng/Wing

文档地址:
https://linguicheng.github.io/Wing

示例地址:
https://gitee.com/linguicheng/wing-demo

开源协议:
基于MIT协议永久开源免费使用

最后

如果你觉得这篇文章对你有帮助,不妨点个赞支持一下!你的支持是我继续分享知识的动力。如果有任何疑问或需要进一步的帮助,欢迎随时留言。

也可以加入微信公众号
[DotNet技术匠]
社区,与其他热爱技术的同行一起交流心得,共同成长!

写在前面:本文所述未必符合当前最新情形(包括蓝牙技术发展、微信小程序接口迭代等)。

微信小程序为蓝牙操作提供了很多接口,但在实际开发过程中,会发现隐藏了不少坑。目前主流蓝牙应用都是基于
低功耗蓝牙(BLE)
的,本文介绍相关的几个基础接口,并对其进行封装,便于业务层调用。

蓝牙发展

在开发蓝牙应用程序之前,有必要对蓝牙这项技术做大致了解。

经典蓝牙

一种短距离无线通信标准,运行在 2.4GHz 频段,主要用于两个设备之间的数据传输。

一般将蓝牙 4.0 之前的版本称为
经典蓝牙
,其传输速率在 1-3Mbps 之间。虽然有着不错的传输速率,但由于功耗较大,难以满足移动终端和物联网的需求,逐渐被更先进的版本所取代。‌

低功耗蓝牙(BLE)

蓝牙 4.0‌ 引入了
低功耗蓝牙(BLE)
技术,其最大数据吞吐量仅为1Mbps,但相对经典蓝牙,BLE 拥有超低的运行功耗和待机功耗。

BLE 的低功耗是如何做到的呢?主要是缩减广播通道数量(由经典蓝牙的 16-32个,缩减为 3 个)、缩短广播射频开启时间(由经典蓝牙的 22.5ms,减少到 0.6-1.2ms)、深度睡眠模式及针对低功耗场景优化了协议栈等,此处不赘述。

当前最新版本

‌当前大版本是蓝牙 5.0,传输速度达到了 24Mbps,是 4.2 版本的两倍,有效工作距离可达 300 米,是 4.2 版本的四倍。低功耗模式下的传输速度上限为 2Mbps,适合于影音级应用,如高清晰度音频解码协议的应用。

蓝牙特征值

GATT(Generic Attribute Profile)协议
定义了蓝牙设备之间的通信方式,其中单个
服务(Service)
可以包含多个
特征值(Characteristic)
,每个服务和特征值都有特定的‌ UUID 来唯一标识。特征值是蓝牙设备中用于存储和传输数据的基本单元,每个特征值都有其特定的
属性和值

属性协议(ATT)
定义数据的检索,允许设备暴露数据给其他设备,这些数据被称为
属性(attribute)

通过属性可以设置特征值操作类型,如
读取、写入、通知
等,操作对象即为特征值的
值(value)
。一个特征值可以同时拥有多种操作类型。

为了实现数据的传输,服务需要暴露两个主要的特征值:
write
和‌
notify 或 indication
。write 特征值用于接收数据,而 notify 特征值用于发送数据。这些特征值类型为 bytes,并且一次传输的数据长度可以根据不同的特征值类型有所不同。

小程序接口封装

需要知道的是,虽然蓝牙是开放协议,但由于苹果 IOS 系统的封闭设计,目前苹果设备无法与 Android 及其它平台设备通过蓝牙相连。

本文描述皆基于 Android 平台。

关键接口

使用蓝牙传输数据都会涉及以下步骤及接口:

  1. 激活设备蓝牙(如在手机上点按蓝牙图标);
  2. wx.openBluetoothAdapter
    :初始化小程序蓝牙模块;
  3. 搜索外围设备
    1. wx.onBluetoothDeviceFound
      :监听搜索到新设备的事件;
    2. wx.startBluetoothDevicesDiscovery
      :开始搜索附近设备;
    3. wx.stopBluetoothDevicesDiscovery
      :找到待连的对手设备后停止搜索;
  4. wx.createBLEConnection
    :连接 BLE 设备;
  5. 接收数据
    1. wx.notifyBLECharacteristicValueChange
      :为下一步骤做铺垫(注意:必须对手设备的特征支持 notify 或者 indicate 才可以成功调用);
    2. wx.onBLECharacteristicValueChange
      :监听对手设备特征值变化事件,可以获得变化后的特征 value,如此数据就从对手设备传递过来了;
  6. wx.writeBLECharacteristicValue
    :向对手设备特征值中写入二进制数据(注意:必须对手设备的特征支持 write 才可以成功调用);
  7. wx.closeBLEConnection
    :断开连接;
  8. wx.closeBluetoothAdapter
    :关闭小程序蓝牙模块;
  9. 关闭设备蓝牙。

坑及注意点(仅限于笔者基于开发过程使用到的机型观察记录,未必有普遍性):

  • wx.onBluetoothDeviceFound 这个方法只能找到新的蓝牙设备,之前搜索过的在部分安卓机型上,不算做新的蓝牙设备,因此重新搜索不到。这种情况,要么重启小程序蓝牙模块或者重启小程序,或者使用
    wx.getBluetoothDevices
    获取在蓝牙模块生效期间所有搜索到的蓝牙设备。
  • 连接未必能一次成功,需要多连几次。
  • 每次连接最好能重启 BluetoothAdapter,否则在后续 wx.notifyBLECharacteristicValueChange 时容易报 10005-没有找到指定特征 错误。
  • 若小程序在之前已有搜索过某个蓝牙设备,并成功建立连接,可直接传入之前搜索获取的 deviceId 直接尝试连接该设备,无需再次进行搜索操作。
  • 系统与蓝牙设备会限制蓝牙 4.0 单次传输的数据大小,超过最大字节数后会发生写入错误,建议每次写入不超过 20 字节。
  • 一旦过程中出现任何异常,就必须断开连接重连,否则后续会一直报 notifyblecharacteristicValuechange:fail: no characteristic 错误

主要代码

注:本文代码块为笔者临时盲敲,仅作参考。

定义一个工具对象

const ble = {}

由于可能会遇到的各类问题,我们先全局定义运行时异常枚举和 throw/handle 方法,免得后面遇到异常处理各写各的。

const ble = {
  errors: {
    OPEN_ADAPTER: '开启蓝牙模块异常',
    CLOSE_ADAPTER: '关闭蓝牙模块异常',
    CONNECT: '蓝牙连接异常',
    NOTIFY_CHARACTERISTIC_VALUE_CHANGE: '注册特征值变化异常',
    WRITE: '发送数据异常',
    DISCONNECT: '断开蓝牙连接异常',
    //...
  },

  _throwError(title, err) {
    //... 可以考虑在这里调用 wx.closeBLEConnection

    if (err) {
      err.title = title
      throw err
    }
    throw new Error(title)
  },  

蓝牙连接。注意到这是个有限递归方法,且每次连接都先重启 BluetoothAdapter,原因请看上节。

/** 
   * @param {string} deviceId 设备号
   * @param {int} tryCount 已尝试次数
   */
  async connectBLE(deviceId, tryCount = 5) {
    await wx.closeBluetoothAdapter().catch(err => { ble._throwError(this.errors.CLOSE_ADAPTER, err) })
    await wx.openBluetoothAdapter().catch(err => { ble._throwError(this.errors.OPEN_ADAPTER, err) })
    await wx.createBLEConnection({
      deviceId: deviceId,
      timeout: 5000
    })
      .catch(async err => {
        if (err.errCode === -1) { //蓝牙已是连接状态
          // continue work
        } else {
          console.log(`第${6 - tryCount}次蓝牙连接出错`, err.errCode, err.errMsg)
          tryCount--
          if (tryCount === 0) {
            ble._throwError(this.errors.CONNECT, err)
          } else {
            await ble.connectBLE(deviceId, tryCount)
          }
        }
      })
      //蓝牙连接成功
  },

连接成功后,可能需要监听对手设备,用于接收其传过来的数据。

  async onDataReceive(deviceId, serviceId, characteristicId, callback) {
    await wx.notifyBLECharacteristicValueChange({
      deviceId: deviceId,
      serviceId: serviceId,
      characteristicId: characteristicId,
      state: true
    }).catch(err => { ble._throwError(this.errors.NOTIFY_CHARACTERISTIC_VALUE_CHANGE, err) })

    wx.onBLECharacteristicValueChange(res => {
      let data = new Uint8Array(res.value)
      callback(data)
    })
  },

发送数据,须切片,每次发送不多于 20字节。此处增加了在固定时长内的重试机制。

  /** 
   * @param {Uint8ClampedArray} data 待发送数据
   * @param {boolean} holdConnWhenDone 发送完毕后是否保持连接
   */
  async send(deviceId, serviceId, characteristicId, data, holdConnWhenDone = false) {
    let idx = 0 //已传输字节数
    let startTime = Date.now(),
      duration = 800 //发送失败重试持续时间  
    while (idx < data.byteLength) {
      await wx.writeBLECharacteristicValue({
        deviceId: deviceId,
        serviceId: serviceId,
        characteristicId: characteristicId,
        value: data.slice(idx, idx += 20).buffer
      })
        .then(_ => startTime = Date.now()) //成功则now重置
        .catch(err => {
          if (Date.now() - startTime >= duration) {
            ble._throwError(this.errors.WRITE, err)
          } else {
            //重试
            idx -= 20
          }
        })
    }
    if (!holdConnWhenDone)
      await wx.closeBLEConnection({ deviceId: deviceId }).catch(err => { ble._throwError(this.errors.DISCONNECT, err) })
  }

在实际项目中,可能需要在每次发送数据片之后得到对手设备响应后,根据响应决定重发(校验错误或响应超时等)、中止(设备繁忙)、还是接着发送下一个数据片。这种情况则需配合 onDataReceive 方法协同工作,向其传入合适的 callback 参数,此处不赘述。

技术背景

假设我们在一个局域网内有多台工作站(不是服务器),那么有没有一个简单的方案可以实现一个小集群,提交分布式的任务呢?
Ray
为我们提供了一个很好的解决方案,允许你通过conda和Python灵活的构建集群环境,并提交分布式的任务。其基本架构为:

那么本文简单的介绍一下Ray的安装与基本使用。

安装

由于是一个Python的框架,Ray可以直接使用pip进行安装和管理:

$ python3 -m pip install ray[default]

但是需要注意的是,在所有需要构建集群的设备上,需要统一Python和Ray的版本,因此建议先使用conda创建同样的虚拟环境之后,再安装统一版本的ray。否则在添加集群节点的时候就有可能出现如下问题:

RuntimeError: Version mismatch: The cluster was started with:
    Ray: 2.7.2
    Python: 3.7.13
This process on node 172.17.0.2 was started with:
    Ray: 2.7.2
    Python: 3.7.5

启动和连接服务

一般在配置集群的时候可以先配置下密钥登陆:

$ ssh-keygen -t rsa
$ ssh-copy-id user_name@ip_address

就这么两步,就可以配置远程服务器ssh免密登陆(配置的过程中有可能需要输入一次密码)。然后在主节点(配置一个master节点)启动ray服务:

$ ray start --head --dashboard-host='0.0.0.0' --dashboard-port=8265
Usage stats collection is enabled. To disable this, add `--disable-usage-stats` to the command that starts the cluster, or run the following command: `ray disable-usage-stats` before starting the cluster. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details.

Local node IP: xxx.xxx.xxx.xxx

--------------------
Ray runtime started.
--------------------

Next steps
  To add another node to this Ray cluster, run
    ray start --address='xxx.xxx.xxx.xxx:6379'

  To connect to this Ray cluster:
    import ray
    ray.init()

  To submit a Ray job using the Ray Jobs CLI:
    RAY_ADDRESS='http://xxx.xxx.xxx.xxx:8265' ray job submit --working-dir . -- python my_script.py

  See https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html
  for more information on submitting Ray jobs to the Ray cluster.

  To terminate the Ray runtime, run
    ray stop

  To view the status of the cluster, use
    ray status

  To monitor and debug Ray, view the dashboard at
    xxx.xxx.xxx.xxx:8265

  If connection to the dashboard fails, check your firewall settings and network configuration.

这就启动完成了,并给你指示了下一步的操作,例如在另一个节点上配置添加到集群中,可以使用指令:

$ ray start --address='xxx.xxx.xxx.xxx:6379'

但是前面提到了,这里要求Python和Ray版本要一致,如果版本不一致就会出现这样的报错:

RuntimeError: Version mismatch: The cluster was started with:
    Ray: 2.7.2
    Python: 3.7.13
This process on node 172.17.0.2 was started with:
    Ray: 2.7.2
    Python: 3.7.5

到这里其实Ray集群就已经部署完成了,非常的简单方便。

基础使用

我们先用一个最简单的案例来测试一下:

# test_ray.py 
import os
import ray

ray.init()

print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
'''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))

这个Python脚本打印了远程节点的计算资源,那么我们可以用这样的方式去提交一个本地的job:

$ RAY_ADDRESS='http://xxx.xxx.xxx.xxx:8265' ray job submit --working-dir . -- python test_ray.py 
Job submission server address: http://xxx.xxx.xxx.xxx:8265
2024-08-27 07:35:10,751 INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_4b79155b5de665ce.zip.
2024-08-27 07:35:10,751 INFO packaging.py:518 -- Creating a file package for local directory '.'.

-------------------------------------------------------
Job 'raysubmit_7Uqy8LjP4dxjZxGa' submitted successfully
-------------------------------------------------------

Next steps
  Query the logs of the job:
    ray job logs raysubmit_7Uqy8LjP4dxjZxGa
  Query the status of the job:
    ray job status raysubmit_7Uqy8LjP4dxjZxGa
  Request the job to be stopped:
    ray job stop raysubmit_7Uqy8LjP4dxjZxGa

Tailing logs until the job exits (disable with --no-wait):
2024-08-27 15:35:14,079 INFO worker.py:1330 -- Using address xxx.xxx.xxx.xxx:6379 set in the environment variable RAY_ADDRESS
2024-08-27 15:35:14,079 INFO worker.py:1458 -- Connecting to existing Ray cluster at address: xxx.xxx.xxx.xxx:6379...
2024-08-27 15:35:14,103 INFO worker.py:1639 -- Connected to Ray cluster. View the dashboard at http://xxx.xxx.xxx.xxx:8265 
This cluster consists of
    1 nodes in total
    48.0 CPU resources in total


------------------------------------------
Job 'raysubmit_7Uqy8LjP4dxjZxGa' succeeded
------------------------------------------

这里的信息说明,远程的集群只有一个节点,该节点上有48个可用的CPU核资源。这些输出信息不仅可以在终端窗口上看到,还可以从这里给出的dashboard链接里面看到更加详细的任务管理情况:

这里也顺便提交一个输出软件位置信息的指令,确认下任务是在远程执行而不是在本地执行:

import ray

ray.init()

import numpy as np
print (np.__file__)

返回的日志为:

$ RAY_ADDRESS='http://xxx.xxx.xxx.xxx:8265' ray job submit --working-dir . -- python test_ray.py 
Job submission server address: http://xxx.xxx.xxx.xxx:8265
2024-08-27 07:46:10,645 INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_5bba1a7144beb522.zip.
2024-08-27 07:46:10,658 INFO packaging.py:518 -- Creating a file package for local directory '.'.

-------------------------------------------------------
Job 'raysubmit_kQ3XgE4Hxp3dkmuU' submitted successfully
-------------------------------------------------------

Next steps
  Query the logs of the job:
    ray job logs raysubmit_kQ3XgE4Hxp3dkmuU
  Query the status of the job:
    ray job status raysubmit_kQ3XgE4Hxp3dkmuU
  Request the job to be stopped:
    ray job stop raysubmit_kQ3XgE4Hxp3dkmuU

Tailing logs until the job exits (disable with --no-wait):
2024-08-27 15:46:12,456 INFO worker.py:1330 -- Using address xxx.xxx.xxx.xxx:6379 set in the environment variable RAY_ADDRESS
2024-08-27 15:46:12,457 INFO worker.py:1458 -- Connecting to existing Ray cluster at address: xxx.xxx.xxx.xxx:6379...
2024-08-27 15:46:12,470 INFO worker.py:1639 -- Connected to Ray cluster. View the dashboard at http://xxx.xxx.xxx.xxx:8265 
/home/dechin/anaconda3/envs/mindspore-latest/lib/python3.7/site-packages/numpy/__init__.py

------------------------------------------
Job 'raysubmit_kQ3XgE4Hxp3dkmuU' succeeded
------------------------------------------

$ python3 -m pip show numpy
Name: numpy
Version: 1.21.6
Summary: NumPy is the fundamental package for array computing with Python.
Home-page: https://www.numpy.org
Author: Travis E. Oliphant et al.
Author-email: 
License: BSD
Location: /usr/local/python-3.7.5/lib/python3.7/site-packages
Requires: 
Required-by: CyFES, h5py, hadder, matplotlib, mindinsight, mindspore, mindspore-serving, pandas, ray, scikit-learn, scipy

这里可以看到,提交的任务中numpy是保存在mindspore-latest虚拟环境中的,而本地的numpy不在虚拟环境中,说明任务确实是在远程执行的。类似的可以在dashboard上面看到提交日志:

接下来测试一下分布式框架ray的并发特性:

import ray

ray.init()

@ray.remote(num_returns=1)
def cpu_task():
    import time
    time.sleep(2)
    import numpy as np
    nums = 100000
    arr = np.random.random((2, nums))
    arr2 = arr[1]**2 + arr[0]**2
    pi = np.where(arr2<=1, 1, 0).sum() * 4 / nums
    return pi

num_conc = 10
res = ray.get([cpu_task.remote() for _ in range(num_conc)])
print (sum(res) / num_conc)

这个案例的内容是用蒙特卡洛算法计算圆周率的值,一次提交10个任务,每个任务中撒点100000个,并休眠2s。那么如果是顺序执行的话,理论上需要休眠20s。而这里提交任务之后,输出如下:

$ time RAY_ADDRESS='http://xxx.xxx.xxx.xxx:8265' ray job submit --working-dir . --entrypoint-num-cpus 10 -- python te
st_ray.py 
Job submission server address: http://xxx.xxx.xxx.xxx:8265
2024-08-27 08:30:13,315 INFO dashboard_sdk.py:385 -- Package gcs://_ray_pkg_d66b052eb6944465.zip already exists, skipping upload.

-------------------------------------------------------
Job 'raysubmit_Ur6MAvP7DYiCT6Uz' submitted successfully
-------------------------------------------------------

Next steps
  Query the logs of the job:
    ray job logs raysubmit_Ur6MAvP7DYiCT6Uz
  Query the status of the job:
    ray job status raysubmit_Ur6MAvP7DYiCT6Uz
  Request the job to be stopped:
    ray job stop raysubmit_Ur6MAvP7DYiCT6Uz

Tailing logs until the job exits (disable with --no-wait):
2024-08-27 16:30:15,032 INFO worker.py:1330 -- Using address xxx.xxx.xxx.xxx:6379 set in the environment variable RAY_ADDRESS
2024-08-27 16:30:15,033 INFO worker.py:1458 -- Connecting to existing Ray cluster at address: xxx.xxx.xxx.xxx:6379...
2024-08-27 16:30:15,058 INFO worker.py:1639 -- Connected to Ray cluster. View the dashboard at http://xxx.xxx.xxx.xxx:8265 
3.141656

------------------------------------------
Job 'raysubmit_Ur6MAvP7DYiCT6Uz' succeeded
------------------------------------------


real    0m7.656s
user    0m0.414s
sys     0m0.010s

总的运行时间在7.656秒,其中5s左右的时间是来自网络delay。所以实际上并发之后的总运行时间就在2s左右,跟单任务休眠的时间差不多。也就是说,远程提交的任务确实是并发执行的。最终返回的结果进行加和处理,得到的圆周率估计为:
3.141656
。而且除了普通的CPU任务之外,还可以上传GPU任务:

import ray

ray.init()

@ray.remote(num_returns=1, num_gpus=1)
def test_ms():
    import os
    os.environ['GLOG_v']='4'
    os.environ['CUDA_VISIBLE_DEVICE']='0'
    import mindspore as ms
    ms.set_context(device_target="GPU", device_id=0)
    a = ms.Tensor([1, 2, 3], ms.float32)
    return a.asnumpy().sum()

res = ray.get(test_ms.remote())
ray.shutdown()
print (res)

这个任务是用mindspore简单创建了一个Tensor,并计算了Tensor的总和返回给本地,输出内容为:

$ RAY_ADDRESS='http://xxx.xxx.xxx.xxx:8265' ray job submit --working-dir . --entrypoint-num-gpus 1 -- python test_ray.py 
Job submission server address: http://xxx.xxx.xxx.xxx:8265
2024-08-28 01:16:38,712 INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_10019cd9fa9bdc38.zip.
2024-08-28 01:16:38,712 INFO packaging.py:518 -- Creating a file package for local directory '.'.

-------------------------------------------------------
Job 'raysubmit_RUvkEqnkjNitKmnJ' submitted successfully
-------------------------------------------------------

Next steps
  Query the logs of the job:
    ray job logs raysubmit_RUvkEqnkjNitKmnJ
  Query the status of the job:
    ray job status raysubmit_RUvkEqnkjNitKmnJ
  Request the job to be stopped:
    ray job stop raysubmit_RUvkEqnkjNitKmnJ

Tailing logs until the job exits (disable with --no-wait):
2024-08-28 09:16:41,960 INFO worker.py:1330 -- Using address xxx.xxx.xxx.xxx:6379 set in the environment variable RAY_ADDRESS
2024-08-28 09:16:41,960 INFO worker.py:1458 -- Connecting to existing Ray cluster at address: xxx.xxx.xxx.xxx:6379...
2024-08-28 09:16:41,974 INFO worker.py:1639 -- Connected to Ray cluster. View the dashboard at http://xxx.xxx.xxx.xxx:8265 
6.0

------------------------------------------
Job 'raysubmit_RUvkEqnkjNitKmnJ' succeeded
------------------------------------------

返回的计算结果是6.0,那么也是正确的。

查看和管理任务

前面的任务输出信息中,都有相应的job_id,我们可以根据这个job_id在主节点上面查看相关任务的执行情况:

$ ray job status raysubmit_RUvkEqnkjNitKmnJ

可以查看该任务的输出内容:

$ ray job logs raysubmit_RUvkEqnkjNitKmnJ

还可以终止该任务的运行:

$ ray job stop raysubmit_RUvkEqnkjNitKmnJ

总结概要

本文介绍了基于Python的分布式框架Ray的基本安装与使用。Ray框架下不仅可以通过conda和Python十分方便的构建一个集群,还可以自动的对分布式任务进行并发处理,且支持GPU分布式任务的提交,极大的简化了手动分布式开发的工作量。

版权声明

本文首发链接为:
https://www.cnblogs.com/dechinphy/p/ray.html

作者ID:DechinPhy

更多原著文章:
https://www.cnblogs.com/dechinphy/

请博主喝咖啡:
https://www.cnblogs.com/dechinphy/gallery/image/379634.html