来源:晓飞的算法工程笔记 公众号,转载请注明出处

论文: Vision-Language Model Fine-Tuning via Simple Parameter-Efficient Modification

创新点


  • 提出了一种
    CLIPFit
    方法以高效地微调
    CLIP
    模型,从而揭示经典模型微调在视觉语言模型(
    VLMs
    )上的潜力。
  • 与现有的提示调整或适配器调整方法不同,
    CLIPFit
    不引入任何外部参数,而仅微调
    CLIP
    固有参数中的一个小特定子集。

内容概述


微调视觉语言模型(
VLMs
)方面的进展见证了提示调优和适配器调优的成功,而经典模型在固有参数上的微调似乎被忽视了。有人认为,使用少量样本微调
VLMs
的参数会破坏预训练知识,因为微调
CLIP
模型甚至会降低性能。论文重新审视了这一观点,并提出了一种新视角:微调特定的参数而不是全部参数将揭示经典模型微调在
VLMs
上的潜力。

通过细致研究,论文提出了
ClipFit
,可以在不引入额外参数开销的情况下微调
CLIP
。仅通过微调特定的偏置项和归一化层,
ClipFit
可以将零样本
CLIP
的平均调和均值准确率提升
7.27%

为了理解
CLIPFit
中的微调如何影响预训练模型,论文进行了广泛的实验分析以研究内部参数和表示的变化。在文本编码器中,当层数增加时,偏置的变化减少。在图像编码器中,
LayerNorm
也有同样的结论。进一步的实验表明,变化较大的层对知识适应更为重要。

CLIPFit


在不引入任何外部参数的情况下,
CLIPFit
仅对文本编码器中
FNN
的投影线性层的偏置项进行微调,并更新图像编码器中的
LayerNorm

文本编码器

对于文本编码器,
CLIPFit
并不是对所有偏置项进行微调,而仅对文本编码器中
FFNs
的投影线性层(即第二层)的偏置项进行微调。仅微调部分偏置项将减少训练参数的数量,相较于微调所有偏置项。此外,实验表明,微调部分偏置项可以实现比微调所有偏置项更好的性能。

图像编码器

BitFit
证明了在不引入任何新参数的情况下,仅微调预训练语言模型中的偏置项可以与完全微调的表现相媲美。然而,
BitFit
是为大型语言模型(
LLM
)微调设计的,直接将
BitFit
应用于视觉语言模型(
VLM
)微调可能会损害模型的泛化能力。

为此,
CLIPFit
并没有对图像编码器的偏置项进行微调,而是对
LayerNorm
进行微调。在
LayerNorm
中,两个可学习参数增益
\(\boldsymbol{g}\)
和偏置
\(\boldsymbol{b}\)
用于对标准化输入向量
\(\boldsymbol{x}\)
进行仿射变换,以进行重新中心化和重新缩放,这有助于通过重新塑形分布来增强表达能力。在训练过程中,不同的数据分布应该在
LayerNorm
中产生不同的增益和偏置,以实现分布的重新塑形。

如果在推理过程中应用偏移的增益和偏置,可能会导致次优解。因此,
CLIPFit
对图像编码器中的
LayerNorm
进行微调。

损失函数

在微调阶段,通用的预训练知识很容易被遗忘。因此,论文探索了两种不同的策略来减轻这种遗忘。

第一种策略是使用知识蒸馏损失来指导
CLIPFit
从原始的零样本
CLIP
中学习。设
\(\{\boldsymbol{w}_i^\mathrm{clip}\}_{i=1}^K\)
为原始
CLIP
的文本特征,
\(\{\boldsymbol{w}_{i}\}_{i=1}^K\)

CLIPFit
的文本特征。
CLIPFit
的训练损失和知识蒸馏损失定义为:

\[\begin{equation}
\mathcal{L}=\mathcal{L}_{\mathrm{ce}}+\beta \mathcal{L}_{\mathrm{k g}},
\end{equation}
\]

\[\begin{equation}
\mathcal{L}_\mathrm{k g} = \frac{1}{K}\sum_{i=1}^{K}\cos(\boldsymbol{w}_i^{\mathrm{clip}},\boldsymbol{w}_i),
\end{equation}
\]

第二种策略是使用均方误差(
MSE
)损失来惩罚文本编码器的变化。设
\(\{\boldsymbol{b}_i^\mathrm{clip}\}_{i=1}^L\)
为来自预训练
CLIP
的未固定文本偏置项,
\(\{\boldsymbol{b}_i\}_{i=1}^L\)
为来自
CLIPFit
的未固定文本偏置项,其中
\(L\)
是未固定偏置层的数量。均方误差损失定义为:

\[\begin{equation}
\mathcal{L}_\mathrm{m s e} = \frac{1}{L}\sum_{i=1}^{L}||\boldsymbol{b}_i^\mathrm{clip}-\boldsymbol{b}_i||^2.
\end{equation}
\]

这两种策略都能缓解遗忘问题,而知识蒸馏损失的效果更佳。因此,选择将知识蒸馏损失作为
CLIPFit
的最终解决方案。

主要实验




如果本文对你有帮助,麻烦点个赞或在看呗~
更多内容请关注 微信公众号【晓飞的算法工程笔记】

work-life balance.

Navigation作为路由容器,其生命周期承载在NavDestination组件上,以组件事件的形式开放。其生命周期大致可分为三类,自定义组件生命周期、通用组件生命周期和自有生命周期。其中,aboutToAppear和aboutToDisappear是自定义组件的生命周期(NavDestination外层包含的自定义组件),OnAppear和OnDisappear是组件的通用生命周期。剩下的六个生命周期为NavDestination独有。生命周期时序如下图所示
img1

  • aboutToAppear:在创建自定义组件后,执行其build()函数之前执行(NavDestination创建之前),允许在该方法中改变状态变量,更改将在后续执行build()函数中生效。
  • onWillAppear:NavDestination创建后,挂载到组件树之前执行,在该方法中更改状态变量会在当前帧显示生效。
  • onAppear:通用生命周期事件,NavDestination组件挂载到组件树时执行。
  • onWillShow:NavDestination组件布局显示之前执行,此时页面不可见(应用切换到前台不会触发)。
  • onShown:NavDestination组件布局显示之后执行,此时页面已完成布局。
  • onWillHide:NavDestination组件触发隐藏之前执行(应用切换到后台不会触发)。
  • onHidden:NavDestination组件触发隐藏后执行(非栈顶页面push进栈,栈顶页面pop出栈或应用切换到后台)。
  • onWillDisappear:NavDestination组件即将销毁之前执行,如果有转场动画,会在动画前触发(栈顶页面pop出栈)。
  • onDisappear:通用生命周期事件,NavDestination组件从组件树上卸载销毁时执行。
  • aboutToDisappear:自定义组件析构销毁之前执行,不允许在该方法中改变状态变量。

一、概述

上篇文章介绍了木舟通过HTTP网络组件接入设备,那么此篇文章将介绍如何利用Tcp或者UDP网络组件接入设备.

木舟 (Kayak) 是什么?

木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。

那么下面就为大家介绍如何从创建组件、协议、设备网关,设备到设备网关接入,再到设备数据上报,把整个流程通过此篇文章进行阐述。

二、网络组件

1.编辑创建Tcp协议的网络组件,可以选择共享配置和独立配置(独立配置是集群模式). 下图是解析方式选择了自定义脚本进行解码操作。

还可以选择其它解析方式:如下图

2. 编辑创建UDP协议的网络组件,可以选择共享配置和独立配置(独立配置是集群模式). 可以选择单播或组播。

三、自定义协议

  • 如何创建自定义协议模块

如果是网络编程开发,必然会涉及到协议报文的编码解码处理,那么对于平台也是做到了灵活处理,首先是协议模块创建,通过以下代码看出协议模块可以添加协议说明md文档, 身份鉴权处理,消息编解码,元数据配置。下面一一介绍如何进行编写

 public classDemo3ProtocolSupportProvider : ProtocolSupportProvider
{
public override IObservable<ProtocolSupport>Create(ProtocolContext context)
{
var support = newComplexProtocolSupport();
support.Id
= "demo_3";
support.Name
= "演示协议3";
support.Description
= "演示协议3";
support.AddAuthenticator(MessageTransport.Tcp,
newDemo5Authenticator());
support.AddDocument(MessageTransport.Tcp,
"Document/document-tcp.md");
support.Script
= "\r\nvar decode=function(buffer)\r\n{\r\n parser.Fixed(5).Handler(\r\n function(buffer){ \r\n var bytes = BytesUtils.GetBytes(buffer,1,4);\r\n var len = BytesUtils.LeStrToInt(bytes,1,4);//2. 获取消息长度.\r\n var buf = BytesUtils.Slice(buffer,0,5); \r\n parser.Fixed(len).Result(buf); \r\n }).Handler(function(buffer){ parser.Result(buffer).Complete(); \r\n }\r\n )\r\n}\r\nvar encode=function(buffer)\r\n{\r\n}";
support.AddMessageCodecSupport(MessageTransport.Tcp, ()
=> Observable.Return(newScriptDeviceMessageCodec(support.Script)));
support.AddConfigMetadata(MessageTransport.Tcp, _tcpConfig);

support.AddAuthenticator(MessageTransport.Udp,
newDemo5Authenticator());
support.Script
= "\r\nvar decode=function(buffer)\r\n{\r\n parser.Fixed(5).Handler(\r\n function(buffer){ \r\n var bytes = BytesUtils.GetBytes(buffer,1,4);\r\n var len = BytesUtils.LeStrToInt(bytes,1,4);//2. 获取消息长度.\r\n var buf = BytesUtils.Slice(buffer,0,5); \r\n parser.Fixed(len).Result(buf); \r\n }).Handler(function(buffer){ parser.Result(buffer).Complete(); \r\n }\r\n )\r\n}\r\nvar encode=function(buffer)\r\n{\r\n}";
support.AddMessageCodecSupport(MessageTransport.Udp, ()
=> Observable.Return(newScriptDeviceMessageCodec(support.Script)));
support.AddConfigMetadata(MessageTransport.Udp, _udpConfig);
returnObservable.Return(support);
}
}

1. 添加协议说明文档如代码:
support.AddDocument(MessageTransport.Tcp,
"
Document/document-tcp.md
"
);,文档仅支持
markdown文件,如下所示

### 认证说明

CONNECT报文:
```text
clientId: 设备ID
password: md5(timestamp
+"|"+secureKey)
```

2. 添加身份鉴权如代码:
support.AddAuthenticator(MessageTransport.Http, new Demo5Authenticator()) ,自定义身份鉴权
Demo5Authenticator 代码如下:

public classDemo5Authenticator : IAuthenticator
{
public IObservable<AuthenticationResult>Authenticate(IAuthenticationRequest request, IDeviceOperator deviceOperator)
{
var result = Observable.Return<AuthenticationResult>(default);if (request isDefaultAuthRequest)
{
var authRequest = request asDefaultAuthRequest;
deviceOperator.GetConfig(authRequest.GetTransport()
==MessageTransport.Http?"token": "key").Subscribe( config =>{var password = config.Convert<string>();if(authRequest.Password.Equals(password))
{
result
=result.Publish(AuthenticationResult.Success(authRequest.DeviceId));
}
else{
result
= result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "验证失败,密码错误"));
}
});
}
elseresult= Observable.Return<AuthenticationResult>(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "不支持请求参数类型"));returnresult;
}
public IObservable<AuthenticationResult>Authenticate(IAuthenticationRequest request, IDeviceRegistry registry)
{
var result = Observable.Return<AuthenticationResult>(default);var authRequest = request asDefaultAuthRequest;
registry
.GetDevice(authRequest.DeviceId)
.Subscribe(
async p =>{var config= await p.GetConfig(authRequest.GetTransport() == MessageTransport.Http ? "token" : "key");var password= config.Convert<string>();if(authRequest.Password.Equals(password))
{
result
=result.Publish(AuthenticationResult.Success(authRequest.DeviceId));
}
else{
result
= result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "验证失败,密码错误"));
}
});
returnresult;
}
}

3.添加消息编解码代码
support.AddMessageCodecSupport(MessageTransport.Tcp, () => Observable.Return(new ScriptDeviceMessageCodec(support.Script)));, 可以自定义编解码,
ScriptDeviceMessageCodec
代码如下:

usingDotNetty.Buffers;usingJint;usingJint.Parser;usingMicrosoft.CodeAnalysis.Scripting;usingMicrosoft.Extensions.Logging;usingRulesEngine.Models;usingSurging.Core.CPlatform.Codecs.Core;usingSurging.Core.CPlatform.Utilities;usingSurging.Core.DeviceGateway.Runtime.Device.Message;usingSurging.Core.DeviceGateway.Runtime.Device.Message.Event;usingSurging.Core.DeviceGateway.Runtime.Device.Message.Property;usingSurging.Core.DeviceGateway.Runtime.Device.MessageCodec;usingSurging.Core.DeviceGateway.Runtime.RuleParser.Implementation;usingSurging.Core.DeviceGateway.Utilities;usingSystem;usingSystem.Collections.Generic;usingSystem.Linq;usingSystem.Reactive.Linq;usingSystem.Reactive.Subjects;usingSystem.Runtime;usingSystem.Text;usingSystem.Text.Json;usingSystem.Text.RegularExpressions;usingSystem.Threading.Tasks;namespaceSurging.Core.DeviceGateway.Runtime.Device.Implementation
{
public classScriptDeviceMessageCodec : DeviceMessageCodec
{
public string GlobalVariable { get; private set; }public string EncoderScript { get; private set; }public string DecoderScript { get; private set; }public IObservable<Task<RulePipePayloadParser>>_rulePipePayload;private readonly ILogger<ScriptDeviceMessageCodec>_logger;public ScriptDeviceMessageCodec(stringscript) {

_logger
= ServiceLocator.GetService<ILogger<ScriptDeviceMessageCodec>>();
RegexOptions options
= RegexOptions.Singleline |RegexOptions.IgnoreCase;string matchStr = Regex.Match(script, @"var\s*[\w$]*\s*\=.*function.*\(.*\)\s*\{[\s\S]*\}.*?v", options).Value;if (!string.IsNullOrEmpty(matchStr))
{
DecoderScript
= matchStr.TrimEnd('v');
DecoderScript
= Regex.Replace(DecoderScript, @"var\s*[\w$]*\s*\=[.\r|\n|\t|\s]*?(function)\s*\([\w$]*\s*\)\s*\{", "", RegexOptions.IgnoreCase);
DecoderScript
= DecoderScript.Slice(0, DecoderScript.LastIndexOf('}'));
EncoderScript
= script.Replace(DecoderScript, "");

}
var matchStr1 = Regex.Matches(script, @"(?<=var).*?(?==)|(?=;)|(?=v)", options).FirstOrDefault(p=>!string.IsNullOrEmpty(p.Value))?.Value;if (!string.IsNullOrEmpty(matchStr1))
{
GlobalVariable
= matchStr1.TrimEnd(';');
}
var ruleWorkflow = newRuleWorkflow(DecoderScript);
_rulePipePayload
=Observable.Return( GetParser( GetRuleEngine(ruleWorkflow), ruleWorkflow));
}
public override IObservable<IDeviceMessage>Decode(MessageDecodeContext context)
{
var result = Observable.Return<IDeviceMessage>(null);
_rulePipePayload.Subscribe(
async p =>{var parser = awaitp;
parser.Build(context.GetMessage().Payload);
parser.HandlePayload().Subscribe(
async p =>{try{var headerBuffer=parser.GetResult().FirstOrDefault();var buffer =parser.GetResult().LastOrDefault();var str =buffer.GetString(buffer.ReaderIndex, buffer.ReadableBytes, Encoding.UTF8);var session = awaitcontext.GetSession();if (session?.GetOperator() == null)
{
var onlineMessage = JsonSerializer.Deserialize<DeviceOnlineMessage>(str);
result
=result.Publish(onlineMessage);
}
else{var messageType = headerBuffer.GetString(0, 1, Encoding.UTF8);if (Enum.Parse<MessageType>(messageType.ToString()) ==MessageType.READ_PROPERTY)
{
var onlineMessage = JsonSerializer.Deserialize<ReadPropertyMessage>(str);
result
=result.Publish(onlineMessage);
}
else if (Enum.Parse<MessageType>(messageType.ToString()) ==MessageType.EVENT)
{
var onlineMessage = JsonSerializer.Deserialize<EventMessage>(str);
result
=result.Publish(onlineMessage);
}
}
}
catch(Exception e)
{

}
finally{
p.Release();
parser.Close();
}
});
});
returnresult;
}
public override IObservable<IEncodedMessage>Encode(MessageEncodeContext context)
{
context.Reply(((RespondDeviceMessage
<IDeviceMessageReply>)context.Message).NewReply().Success(true));return Observable.Empty<IEncodedMessage>();
}
privateRulesEngine.RulesEngine GetRuleEngine(RuleWorkflow ruleWorkflow)
{
var reSettingsWithCustomTypes = new ReSettings { CustomTypes = new Type[] { typeof(RulePipePayloadParser) } };var result = new RulesEngine.RulesEngine(new Workflow[] { ruleWorkflow.GetWorkflow() }, null, reSettingsWithCustomTypes);returnresult;
}
private async Task<RulePipePayloadParser>GetParser(RulesEngine.RulesEngine engine, RuleWorkflow ruleWorkflow)
{
var payloadParser = newRulePipePayloadParser();var ruleResult = await engine.ExecuteActionWorkflowAsync(ruleWorkflow.WorkflowName, ruleWorkflow.RuleName, new RuleParameter[] { new RuleParameter("parser", payloadParser) });if (ruleResult.Exception != null &&_logger.IsEnabled(LogLevel.Error))
_logger.LogError(ruleResult.Exception, ruleResult.Exception.Message);
returnpayloadParser;
}
}
}

4.添加元数据配置代码
support.AddConfigMetadata(MessageTransport.Tcp, _tcpConfig)
;
_tcpConfig
代码如下:

        private readonly DefaultConfigMetadata _tcpConfig = newDefaultConfigMetadata("TCP认证配置","key为tcp认证密钥")
.Add(
"tcp_auth_key", "key", "TCP认证KEY", StringType.Instance);

_udpConfig代码如下:

     private readonly DefaultConfigMetadata _udpConfig = newDefaultConfigMetadata("udp认证配置","key为udp认证密钥")
.Add(
"udp_auth_key", "key", "TCP认证KEY", StringType.Instance);

  • 如何加载协议模块,协议模块包含了协议模块支持自定义脚本、添加引用、上传热部署加载。

自定义脚本,选择了自定义脚本解析,如果本地有设置消息编解码,会进行覆盖

引用加载模块

上传热部署协议模块

首先利用以下命令发布模块:

然后打包上传协议模块

四、设备网关

创建TCP设备网关

创建UDP设备网关

五、产品管理

以下是添加产品。

设备接入

六、设备管理

添加设备

Tcp认证配置

添加告警阈值

事件定义

七、测试

利用测试工具进行Tcp测试,以调用
tcp://127.0.0.1:993
为例,

测试设备上线

字符串: 293\0\0{"MessageType":2,"Headers":{"token":"123456"},"DeviceId":"scro-34","Timestamp":1726540220311}

说明:第一个字符表示类型,第二个表示消息内容长度

16进制:32393300007b224d65737361676554797065223a322c2248656164657273223a7b22746f6b656e223a22313233343536227d2c224465766963654964223a227363726f2d3334222c2254696d657374616d70223a313732363534303232303331317d

结果如下:

测试上报属性

字符串:195\0\0{"MessageType":1,"Properties":{"temp":"38.24"},"DeviceId":"scro-34","Timestamp":1726560007339}

16进制:31393500007b224d65737361676554797065223a312c2250726f70657274696573223a7b2274656d70223a2233382e3234227d2c224465766963654964223a227363726f2d3334222c2254696d657374616d70223a313732363536303030373333397d

结果如下:

测试事件

字符串:8307\0{"MessageType":8,"Data":{"deviceId":"scro-34","level":"alarm","alarmTime":"2024-11-07 19:47:00","from":"device","alarmType":"设备告警","coordinate":"33.345,566.33","createTime":"2024-11-07 19:47:00","desc":"温度超过阈值"},"DeviceId":"scro-34","EventId":"alarm","Timestamp":1726540220311}

16进制:38333037007b224d65737361676554797065223a382c2244617461223a7b226465766963654964223a227363726f2d3334222c226c6576656c223a22616c61726d222c22616c61726d54696d65223a22323032342d31312d30372031393a34373a3030222c2266726f6d223a22646576696365222c22616c61726d54797065223a22e8aebee5a487e5918ae8ada6222c22636f6f7264696e617465223a2233332e3334352c3536362e3333222c2263726561746554696d65223a22323032342d31312d30372031393a34373a3030222c2264657363223a22e6b8a9e5baa6e8b685e8bf87e99888e580bc227d2c224465766963654964223a227363726f2d3334222c224576656e744964223a22616c61726d222c2254696d657374616d70223a313732363534303232303331317d

结果如下:

可以在平台界面看到上报的数据

利用测试工具进行Udp测试,以调用udp
://127.0.0.1:267为例,

测试设备上线

字符串:295\0\0{"MessageType":2,"Headers":{"token":"123456"},"DeviceId":"srco-2555","Timestamp":1726540220311}

说明:第一个字符表示类型,第二个表示消息内容长度

16进制:32393500007b224d65737361676554797065223a322c2248656164657273223a7b22746f6b656e223a22313233343536227d2c224465766963654964223a227372636f2d32353535222c2254696d657374616d70223a313732363534303232303331317d

结果如下:

测试上报属性

字符串:197\0\0{"MessageType":1,"Properties":{"temp":"38.24"},"DeviceId":"srco-2555","Timestamp":1726560007339}

说明:第一个字符表示类型,第二个表示消息内容长度

16进制:31393700007b224d65737361676554797065223a312c2250726f70657274696573223a7b2274656d70223a2233382e3234227d2c224465766963654964223a227372636f2d32353535222c2254696d657374616d70223a313732363536303030373333397d

结果如下:

测试事件

字符串:8301\0{"MessageType":8,"Data":{"deviceId":"srco-2555","level":"alarm","alarmTime":"2024-11-07 19:47:00","from":"device","alarmType":"设备告警","coordinate":"33.345,566.33","createTime":"2024-11-07 19:47:00","desc":"温度超过阈值"},"DeviceId":"srco-2555","EventId":"alarm","Timestamp":1726540220311}

说明:第一个字符表示类型,第二个表示消息内容长度

16进制:38333031007b224d65737361676554797065223a382c2244617461223a7b226465766963654964223a227372636f2d32353535222c226c6576656c223a22616c61726d222c22616c61726d54696d65223a22323032342d31312d30372031393a34373a3030222c2266726f6d223a22646576696365222c22616c61726d54797065223a22e8aebee5a487e5918ae8ada6222c22636f6f7264696e617465223a2233332e3334352c3536362e3333222c2263726561746554696d65223a22323032342d31312d30372031393a34373a3030222c2264657363223a22e6b8a9e5baa6e8b685e8bf87e99888e580bc227d2c224465766963654964223a227372636f2d32353535222c224576656e744964223a22616c61726d222c2254696d657374616d70223a313732363534303232303331317d

结果如下:

可以在平台界面看到上报的数据

七、总结

以上是基于Tcp和UDP网络组件设备接入,现有平台网络组件可以支持TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,tcpclient, 而设备接入支持TCP,UDP,HTTP网络组件,年前尽量完成国标28181和MQTT,也会开始着手开始开发规则引擎,  然后定于11月20日发布1.0测试版平台。也请大家到时候关注捧场。

将Apache Samza作业迁移到Apache Flink作业是一个复杂的任务,因为这两个流处理框架有不同的API和架构。然而,我们可以将Samza作业的核心逻辑迁移到Flink,并尽量保持功能一致。

假设我们有一个简单的Samza作业,它从Kafka读取数据,进行一些处理,然后将结果写回到Kafka。我们将这个逻辑迁移到Flink。

1. Samza 作业示例

首先,让我们假设有一个简单的Samza作业:

// SamzaConfig.java
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.serializers.JsonSerdeFactory;
import org.apache.samza.system.kafka.KafkaSystemFactory;
 
import java.util.HashMap;
import java.util.Map;
 
public class SamzaConfig {
    public static Config getConfig() {
        Map<String, String> configMap = new HashMap<>();
        configMap.put("job.name", "samza-flink-migration-example");
        configMap.put("job.factory.class", "org.apache.samza.job.yarn.YarnJobFactory");
        configMap.put("yarn.package.path", "/path/to/samza-job.tar.gz");
        configMap.put("task.inputs", "kafka.my-input-topic");
        configMap.put("task.output", "kafka.my-output-topic");
        configMap.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
        configMap.put("serializers.registry.json.class", JsonSerdeFactory.class.getName());
        configMap.put("systems.kafka.samza.factory", KafkaSystemFactory.class.getName());
        configMap.put("systems.kafka.broker.list", "localhost:9092");
 
        return new MapConfig(configMap);
    }
}
 
// MySamzaTask.java
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskInit;
import org.apache.samza.task.TaskRun;
import org.apache.samza.serializers.JsonSerde;
 
import java.util.HashMap;
import java.util.Map;
 
public class MySamzaTask implements StreamApplication, TaskInit, TaskRun {
    private JsonSerde<String> jsonSerde = new JsonSerde<>();
 
    @Override
    public void init(Config config, TaskContext context, TaskCoordinator coordinator) throws Exception {
        // Initialization logic if needed
    }
 
    @Override
    public void run() throws Exception {
        MessageCollector collector = getContext().getMessageCollector();
        SystemStream inputStream = getContext().getJobContext().getInputSystemStream("kafka", "my-input-topic");
 
        for (IncomingMessageEnvelope envelope : getContext().getPoll(inputStream, "MySamzaTask")) {
            String input = new String(envelope.getMessage());
            String output = processMessage(input);
            collector.send(new OutgoingMessageEnvelope(getContext().getOutputSystem("kafka"), "my-output-topic", jsonSerde.toBytes(output)));
        }
    }
 
    private String processMessage(String message) {
        // Simple processing logic: convert to uppercase
        return message.toUpperCase();
    }
 
    @Override
    public StreamApplicationDescriptor getDescriptor() {
        return new StreamApplicationDescriptor("MySamzaTask")
                .withConfig(SamzaConfig.getConfig())
                .withTaskClass(this.getClass());
    }
}

现在,让我们将这个Samza作业迁移到Flink:

// FlinkConfig.java
import org.apache.flink.configuration.Configuration;
 
public class FlinkConfig {
    public static Configuration getConfig() {
        Configuration config = new Configuration();
        config.setString("execution.target", "streaming");
        config.setString("jobmanager.rpc.address", "localhost");
        config.setInteger("taskmanager.numberOfTaskSlots", 1);
        config.setString("pipeline.execution.mode", "STREAMING");
        return config;
    }
}
 
// MyFlinkJob.java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 
import java.util.Properties;
 
public class MyFlinkJob {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // Configure Kafka consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-consumer-group");
 
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-input-topic", new SimpleStringSchema(), properties);
 
        // Add source
        DataStream<String> stream = env.addSource(consumer);
 
        // Process the stream
        DataStream<String> processedStream = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });
 
        // Configure Kafka producer
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("my-output-topic", new SimpleStringSchema(), properties);
 
        // Add sink
        processedStream.addSink(producer);
 
        // Execute the Flink job
        env.execute("Flink Migration Example");
    }
}

3. 运行Flink作业

(1)
设置Flink环境
:确保你已经安装了Apache Flink,并且Kafka集群正在运行。

(2)编译和运行:

  • 使用Maven或Gradle编译Java代码。
  • 提交Flink作业到Flink集群或本地运行。
# 编译(假设使用Maven)
mvn clean package
 
# 提交到Flink集群(假设Flink在本地运行)
./bin/flink run -c com.example.MyFlinkJob target/your-jar-file.jar

4. 注意事项

  • 依赖管理
    :确保在
    pom.xml

    build.gradle
    中添加了Flink和Kafka的依赖。
  • 序列化
    :Flink使用
    SimpleStringSchema
    进行简单的字符串序列化,如果需要更复杂的序列化,可以使用自定义的序列化器。
  • 错误处理
    :Samza和Flink在错误处理方面有所不同,确保在Flink中适当地处理可能的异常。
  • 性能调优
    :根据实际需求对Flink作业进行性能调优,包括并行度、状态后端等配置。

这个示例展示了如何将一个简单的Samza作业迁移到Flink。

【1】引言(完整代码在最后面)

在鸿蒙NEXT系统中,开发一个有趣且实用的转盘应用不仅可以提升用户体验,还能展示鸿蒙系统的强大功能。本文将详细介绍如何使用鸿蒙NEXT系统开发一个转盘应用,涵盖从组件定义到用户交互的完整过程。

【2】环境准备

电脑系统:windows 10

开发工具:DevEco Studio NEXT Beta1 Build Version: 5.0.3.806

工程版本:API 12

真机:mate60 pro

语言:ArkTS、ArkUI

【3】难点分析

1. 扇形路径的计算

难点:创建扇形的路径需要精确计算起始点、结束点和弧线参数。尤其是涉及到三角函数的使用,初学者可能会对如何将角度转换为坐标感到困惑。

解决方案:可以通过绘制简单的示意图来帮助理解扇形的构造,并在代码中添加详细注释,解释每一步的计算过程。

2. 动态角度计算

难点:在转盘旋转时,需要根据单元格的比例动态计算每个单元格的角度和旋转角度。这涉及到累加和比例计算,可能会导致逻辑错误。

解决方案:使用数组的 reduce 方法来计算总比例,并在计算每个单元格的角度时,确保逻辑清晰。可以通过单元测试来验证每个单元格的角度是否正确。

3. 动画效果的实现

难点:实现转盘的旋转动画需要对动画的持续时间、曲线和结束后的状态进行管理。初学者可能会对如何控制动画的流畅性和效果感到困惑。

解决方案:可以参考鸿蒙NEXT的动画文档,了解不同的动画效果和参数设置。通过逐步调试,观察动画效果并进行调整。

4. 用户交互的处理

难点:处理用户点击事件,尤其是在动画进行时,如何禁用按钮以防止重复点击,可能会导致状态管理的复杂性。

解决方案:在按钮的点击事件中,使用状态变量(如 isAnimating)来控制按钮的可用性,并在动画结束后恢复按钮的状态。

5. 组件的状态管理

难点:在多个组件之间传递状态(如当前选中的单元格、转盘的角度等)可能会导致状态管理混乱。

解决方案:使用状态管理工具(如 @State 和 @Trace)来确保状态的统一管理,并在需要的地方进行状态更新,保持组件之间的解耦。

【完整代码】

import { CounterComponent, CounterType } from '@kit.ArkUI'; // 导入计数器组件和计数器类型

// 定义扇形组件
@Component
struct Sector {
  @Prop radius: number; // 扇形的半径
  @Prop angle: number; // 扇形的角度
  @Prop color: string; // 扇形的颜色

  // 创建扇形路径的函数
  createSectorPath(radius: number, angle: number): string {
    const centerX = radius / 2; // 计算扇形中心的X坐标
    const centerY = radius / 2; // 计算扇形中心的Y坐标
    const startX = centerX; // 扇形起始点的X坐标
    const startY = centerY - radius; // 扇形起始点的Y坐标
    const halfAngle = angle / 4; // 计算半个角度

    // 计算扇形结束点1的坐标
    const endX1 = centerX + radius * Math.cos((halfAngle * Math.PI) / 180);
    const endY1 = centerY - radius * Math.sin((halfAngle * Math.PI) / 180);

    // 计算扇形结束点2的坐标
    const endX2 = centerX + radius * Math.cos((-halfAngle * Math.PI) / 180);
    const endY2 = centerY - radius * Math.sin((-halfAngle * Math.PI) / 180);

    // 判断是否为大弧
    const largeArcFlag = angle / 2 > 180 ? 1 : 0;
    const sweepFlag = 1; // 设置弧线方向为顺时针

    // 生成SVG路径命令
    const pathCommands =
      `M${startX} ${startY} A${radius} ${radius} 0 ${largeArcFlag} ${sweepFlag} ${endX1} ${endY1} L${centerX} ${centerY} L${endX2} ${endY2} A${radius} ${radius} 0 ${largeArcFlag} ${1 -
        sweepFlag} ${startX} ${startY} Z`;
    return pathCommands; // 返回路径命令
  }

  // 构建扇形组件
  build() {
    Stack() {
      // 创建第一个扇形路径
      Path()
        .width(`${this.radius}px`) // 设置宽度为半径
        .height(`${this.radius}px`) // 设置高度为半径
        .commands(this.createSectorPath(this.radius, this.angle)) // 设置路径命令
        .fillOpacity(1) // 设置填充透明度
        .fill(this.color) // 设置填充颜色
        .strokeWidth(0) // 设置边框宽度为0
        .rotate({ angle: this.angle / 4 - 90 }); // 旋转扇形

      // 创建第二个扇形路径
      Path()
        .width(`${this.radius}px`) // 设置宽度为半径
        .height(`${this.radius}px`) // 设置高度为半径
        .commands(this.createSectorPath(this.radius, this.angle)) // 设置路径命令
        .fillOpacity(1) // 设置填充透明度
        .fill(this.color) // 设置填充颜色
        .strokeWidth(0) // 设置边框宽度为0
        .rotate({ angle: 180 - (this.angle / 4 - 90) }); // 旋转扇形
    }
  }
}

// 定义单元格类
@ObservedV2
class Cell {
  @Trace angle: number = 0; // 扇形的角度
  @Trace title: string; // 当前格子的标题
  @Trace color: string; // 背景颜色
  @Trace rotate: number = 0; // 在转盘要旋转的角度
  angleStart: number = 0; // 轮盘所在区间的起始
  angleEnd: number = 0; // 轮盘所在区间的结束
  proportion: number = 0; // 所占比例

  // 构造函数
  constructor(proportion: number, title: string, color: string) {
    this.proportion = proportion; // 设置比例
    this.title = title; // 设置标题
    this.color = color; // 设置颜色
  }
}

// 定义转盘组件
@Entry
@Component
struct Wheel {
  @State cells: Cell[] = []; // 存储单元格的数组
  @State wheelWidth: number = 600; // 转盘的宽度
  @State currentAngle: number = 0; // 当前转盘的角度
  @State selectedName: string = ""; // 选中的名称
  isAnimating: boolean = false; // 动画状态
  colorIndex: number = 0; // 颜色索引
  colorPalette: string[] = [ // 颜色调色板
    "#26c2ff",
    "#978efe",
    "#c389fe",
    "#ff85bd",
    "#ff7051",
    "#fea800",
    "#ffcf18",
    "#a9c92a"
  ];

  // 组件即将出现时调用
  aboutToAppear(): void {
    // 初始化单元格
    this.cells.push(new Cell(1, "跑步", this.colorPalette[this.colorIndex++ % this.colorPalette.length]));
    this.cells.push(new Cell(2, "跳绳", this.colorPalette[this.colorIndex++ % this.colorPalette.length]));
    this.cells.push(new Cell(1, "唱歌", this.colorPalette[this.colorIndex++ % this.colorPalette.length]));
    this.cells.push(new Cell(4, "跳舞", this.colorPalette[this.colorIndex++ % this.colorPalette.length]));

    this.calculateAngles(); // 计算角度
  }

  // 计算每个单元格的角度
  private calculateAngles() {
    // 根据比例计算总比例
    const totalProportion = this.cells.reduce((sum, cell) => sum + cell.proportion, 0);
    this.cells.forEach(cell => {
      cell.angle = (cell.proportion * 360) / totalProportion; // 计算每个单元格的角度
    });

    let cumulativeAngle = 0; // 累计角度
    this.cells.forEach(cell => {
      cell.angleStart = cumulativeAngle; // 设置起始角度
      cumulativeAngle += cell.angle; // 更新累计角度
      cell.angleEnd = cumulativeAngle; // 设置结束角度
      cell.rotate = cumulativeAngle - (cell.angle / 2); // 计算旋转角度
    });
  }

  // 构建转盘组件
  build() {
    Column() {
      Row() {
        Text('转盘').fontSize(20).fontColor("#0b0e15"); // 显示转盘标题
      }.width('100%').height(44).justifyContent(FlexAlign.Center); // 设置行的宽度和高度

      // 显示当前状态
      Text(this.isAnimating ? '旋转中' : `${this.selectedName}`).fontSize(20).fontColor("#0b0e15").height(40);

      Stack() {
        Stack() {
          // 遍历每个单元格并绘制扇形
          ForEach(this.cells, (cell: Cell) => {
            Stack() {
              Sector({ radius: lpx2px(this.wheelWidth) / 2, angle: cell.angle, color: cell.color }); // 创建扇形
              Text(cell.title).fontColor(Color.White).margin({ bottom: `${this.wheelWidth / 1.4}lpx` }); // 显示单元格标题
            }.width('100%').height('100%').rotate({ angle: cell.rotate }); // 设置宽度和高度,并旋转
          });
        }
        .borderRadius('50%') // 设置圆角
        .backgroundColor(Color.Gray) // 设置背景颜色
        .width(`${this.wheelWidth}lpx`) // 设置转盘宽度
        .height(`${this.wheelWidth}lpx`) // 设置转盘高度
        .rotate({ angle: this.currentAngle }); // 旋转转盘

        // 创建指针
        Polygon({ width: 20, height: 10 })
          .points([[0, 0], [10, -20], [20, 0]]) // 设置指针的点
          .fill("#d72b0b") // 设置指针颜色
          .height(20) // 设置指针高度
          .margin({ bottom: '140lpx' }); // 设置指针底部边距

        // 创建开始按钮
        Button('开始')
          .fontColor("#c53a2c") // 设置按钮字体颜色
          .borderWidth(10) // 设置按钮边框宽度
          .borderColor("#dd2218") // 设置按钮边框颜色
          .backgroundColor("#fde427") // 设置按钮背景颜色
          .width('200lpx') // 设置按钮宽度
          .height('200lpx') // 设置按钮高度
          .borderRadius('50%') // 设置按钮为圆形
          .clickEffect({ level: ClickEffectLevel.LIGHT }) // 设置点击效果
          .onClick(() => { // 点击按钮时的回调函数
            if (this.isAnimating) { // 如果正在动画中,返回
              return;
            }
            this.selectedName = ""; // 清空选中的名称
            this.isAnimating = true; // 设置动画状态为正在动画
            animateTo({ // 开始动画
              duration: 5000, // 动画持续时间为5000毫秒
              curve: Curve.EaseInOut, // 动画曲线为缓入缓出
              onFinish: () => { // 动画完成后的回调
                this.currentAngle %= 360; // 保持当前角度在0到360之间
                for (const cell of this.cells) { // 遍历每个单元格
                  // 检查当前角度是否在单元格的角度范围内
                  if (360 - this.currentAngle >= cell.angleStart && 360 - this.currentAngle <= cell.angleEnd) {
                    this.selectedName = cell.title; // 设置选中的名称为当前单元格的标题
                    break; // 找到后退出循环
                  }
                }
                this.isAnimating = false; // 设置动画状态为未动画
              },
            }, () => { // 动画进行中的回调
              this.currentAngle += (360 * 5 + Math.floor(Math.random() * 360)); // 更新当前角度,增加随机旋转
            });
          });
      }

      // 创建滚动区域
      Scroll() {
        Column() {
          // 遍历每个单元格,创建输入框和计数器
          ForEach(this.cells, (item: Cell, index: number) => {
            Row() {
              // 创建文本输入框,显示单元格标题
              TextInput({ text: item.title })
                .layoutWeight(1) // 设置输入框占据剩余空间
                .onChange((value) => { // 输入框内容变化时的回调
                  item.title = value; // 更新单元格标题
                });
              // 创建计数器组件
              CounterComponent({
                options: {
                  type: CounterType.COMPACT, // 设置计数器类型为紧凑型
                  numberOptions: {
                    label: `当前占比`, // 设置计数器标签
                    value: item.proportion, // 设置计数器初始值
                    min: 1, // 设置最小值
                    max: 100, // 设置最大值
                    step: 1, // 设置步长
                    onChange: (value: number) => { // 计数器值变化时的回调
                      item.proportion = value; // 更新单元格的比例
                      this.calculateAngles(); // 重新计算角度
                    }
                  }
                }
              });
              // 创建删除按钮
              Button('删除').onClick(() => {
                this.cells.splice(index, 1); // 从单元格数组中删除当前单元格
                this.calculateAngles(); // 重新计算角度
              });
            }.width('100%').justifyContent(FlexAlign.SpaceBetween) // 设置行的宽度和内容对齐方式
            .padding({ left: 40, right: 40 }); // 设置左右内边距
          });
        }.layoutWeight(1); // 设置滚动区域占据剩余空间
      }.layoutWeight(1) // 设置滚动区域占据剩余空间
      .margin({ top: 20, bottom: 20 }) // 设置上下外边距
      .align(Alignment.Top); // 设置对齐方式为顶部对齐

      // 创建添加新内容按钮
      Button('添加新内容').onClick(() => {
        // 向单元格数组中添加新单元格
        this.cells.push(new Cell(1, "新内容", this.colorPalette[this.colorIndex++ % this.colorPalette.length]));
        this.calculateAngles(); // 重新计算角度
      }).margin({ top: 20, bottom: 20 }); // 设置按钮的上下外边距
    }
    .height('100%') // 设置组件高度为100%
    .width('100%') // 设置组件宽度为100%
    .backgroundColor("#f5f8ff"); // 设置组件背景颜色
  }
}