2024年4月
C#开发的绑定类型默认应用例子 - 开源研究系列文章
这次在用C#编写一个看图软件小工具,然后其它的基本完成了,就是绑定看图软件到那些个图片扩展名的时候碰到了问题,就是如何将看图软件绑定图片文件的默认应用,以及解绑默认应用。这个涉及到注册表操作,但是找度娘和AI回答,都没得到良好的回复。于是就根据AI的提示,自己研究了下注册表中的操作,终于,在CSDN的一个帖子里找到了这个函数,于是就有了此博文。
此博文主要介绍绑定默认应用及解绑操作,提供了例子,请其他需要的读者自己复用此例子的代码。
1、
项目目录;
2、
源码介绍;
1) 判断当前应用是否为该扩展名默认应用;
2) 绑定当前应用为某扩展名默认应用;
3) 解除绑定当前应用;
3、
运行界面;
点击选中扩展名,然后点设置绑定;勾掉点设置解除绑定;
4、
使用介绍;
1) 将操作类库复制到其它项目中;
2) 参考此例子中的用法,将代码复用过去;
5、
源码下载;
提供源码下载:
https://download.csdn.net/download/lzhdim/89092832
6、
其它建议;
这个例子已经能够解决绝大部分的问题了,具体请读者自己进行扩展应用。
上面对C#的默认应用的操作记录了一个例子,虽然简单,但是过程还是具有一定的难度的,特别是当前Windows 11操作系统了,以前的那些个注册表操作不定能够使用,希望此博文能够帮助到那些需要的读者。
聊聊ChatGLM3多用户并发API调用的问题
背景
目前在公司内部4张A10的GPU服务器上部署了ChatGLM3开源模型;然后部署了官方默认的
web_demo
、
api_demo
两种模式;重新设计了前端,支持H5和安卓两个客户端调用。但却发现了不能并发访问的问题。
问题现象
在安卓与H5同时调用ChatGLM的API接口(流式接口)时,其中有一个客户端的返回是正常的,而另一个客户端返回却是乱码(解码后是空数据),同时模型报错。报错内容与问题请看
issue
。
官方回复如下:
后来我测试用多卡部署模型,比如3卡,此时可以支持3个以下的用户调用,但再多就不行了。
问题分析
由于非AI相关科班出身也不是专门做这个的,因此一下子还有点棘手;后来在智谱AI开放平台的
使用指南-速率限制指南
一文中,发现其支持并发调用,只是说有并发数限制。因此我分析来说,应该是放出来的模型与开放平台上的模型有一定的区别,而这个区别就在于模型的并发能力。毕竟外部API调用时,最终还是调用模型内部的流式接口/非流式接口。也就是说,这个模型内部的接口并不支持并行计算。
从模型的内部来说,其是transformer神经网络结构,但其并发能力却没有这么简单,毕竟模型涉及到的计算量是巨大的。归根来说,还是transformer的并行计算能力。
后来找到个遇到同样情况的博文,不过和我们的部署方式还是有区别的。
mosec部署chatglm2-6B
一文中分析了下其遇到的问题与解决方案,至此我大概也清楚了并发调用模型API时为什么会返回乱码(空数据)。
原因与解决策略
当并发调用时,其中模型已经处理完了一个request后,返回的tensor识别了eos_token,模型会认为已经处理完了所有的request,因此返回空数据。
那么从这里来说的话,我暂时想到的解决策略:模型内部按batch来处理request。
这个代码不好改,应该有开源的实现和解决策略。后来我又想到了LLaMA-Factory这个微调框架,他们也是有api_demo的,应该也会遇到这样的问题,因此提了个Issue,还好最终有另外的解,见
issue
。
LLaMA-Factory官方通过vllm实现了并发流式,暂时还没验证,简单看了下代码,理论上是冒得问题的:
首发于个人公众号
实战:如何优雅的从 Skywalking 切换到 OpenTelemetry
背景
最近公司将我们之前使用的链路工具切换为了
OpenTelemetry
.
我们的技术栈是:
OTLP
Client──────────►Collect────────►StartRocks
(Agent) ▲
│
│
Jaeger
其中客户端使用 OpenTelemetry 提供的 Java Agent 进行埋点收集数据,再由 Agent 通过 OTLP(OpenTelemetry Protocol) 协议将数据发往 Collector,在
Collector
中我们可以自行任意处理数据,并决定将这些数据如何存储(这点在以往的 SkyWalking 体系中是很难自定义的)
这里我们将数据写入 StartRocks 中,供之后的 UI 层进行查看。
OpenTelemetry
是可观测系统的新标准,基于它可以兼容以前使用的 Prometheus、 victoriametrics、skywalking 等系统,同时还可以灵活扩展,不用与任何但一生态或技术栈进行绑定。
更多关于 OTel 的内容会在今后介绍。
难点
其中有一个关键问题就是:如何在线上进行
无缝切换
。
虽然我们内部的发布系统已经支持重新发布后就会切换到新的链路,也可以让业务自行发布然后逐步的切换到新的系统,这样也是最保险的方式。
但这样会有几个问题:
- 当存在调用依赖的系统没有全部切换为新链路时,再查询的时候就会出现断层,整个链路无法全部串联起来。
- 业务团队没有足够的动力去推动发布,可能切换的周期较长。
所以最好的方式还是由我们在后台统一发布,对外没有任何感知就可以一键全部切换为 OpenTelemetry。
仔细一看貌似也没什么难的,无非就是模拟用户点击发布按钮而已。
但这事由我们自动来做就不一样了,用户点击发布的时候会选择他们认为可以发布的分支进行发布,我们不能自作主张的比如选择 main 分支,有可能只是合并了但还不具备发布条件。
所以保险的方式还是得用当前项目上一次发布时所使用的 git hash 值重新打包发布。
但这也有几个问题:
- 重复打包发布太慢了,线上几十上百个项目,每打包发布一次就得几分钟,虽然可以并发,但考虑到 kubernetes 的压力也不能调的太高。
- 保不准业务镜像中有单独加入一些环境变量,这样打包可能会漏。
切换方案
所以思来想去最保险的方法还是将业务镜像拉取下来,然后手动删除镜像中的 skywalking 包以及 JVM 参数,全部替换为 OpenTelemetry 的包和 JVM 参数。
整体的方案如下:
- 遍历 namespace 的
pod >0
的 deployment - 遍历 deployment 中的所有 container,获得业务镜像
- 跳过 istio 和日志采集 container,获取到业务容器
- 判断该容器是否需要替换,其实就是判断环境变量中是否有 skywalking ,如果有就需要替换。
- 获取业务容器的镜像
- 基于该 Image 重新构建一个 OpenTelemetry 的镜像
3.1 新的镜像包含新的启动脚本.
3.1.1 新的启动脚本中会删除原有的 skywalking agent
3.2 新镜像会包含 OpenTelemetry 的 jar 包以及我们自定义的 OTel 扩展包
3.3 替换启动命令为新的启动脚本 - 修改 deployment 中的 JVM 启动参数
- 修改 deployment 的镜像后滚动更新
- 开启一个 goroutine 定时检测更新之后是否启动成功
- 如果长时间 (比如五分钟) 都没有启动成功,则执行回滚流程
具体代码
因为需要涉及到操作 kubernetes,所以整体就使用 Golang 实现了。
遍历 deployment 得到需要替换的容器镜像
func ProcessDeployment(ctx context.Context, finish []string, deployment v1.Deployment, clientSet kubernetes.Interface) error {
deploymentName := deployment.Name
for _, s := range finish {
if s == deploymentName {
klog.Infof("Skip finish deployment:%s", deploymentName)
return nil
}
}
// Write finish deployment name to a file
defer writeDeploymentName2File(deploymentName, fmt.Sprintf("finish-%s.log", deployment.Namespace))
appName := deployment.GetObjectMeta().GetLabels()["appName"]
klog.Infof("Begin to process deployment:%s, appName:%s", deploymentName, appName)
upgrade, err := checkContainIstio(ctx, deployment, clientSet)
if err != nil {
return err
}
if upgrade == false {
klog.Infof("Don't have istio, No need to upgrade deployment:%s appName:%s", deploymentName, appName)
return nil
}
for i, container := range deployment.Spec.Template.Spec.Containers {
if strings.HasPrefix(deploymentName, container.Name) {
// Check if container has sw jvm
for _, envVar := range container.Env {
if envVar.Name == "CATALINA_OPTS" {
if !strings.Contains(envVar.Value, "skywalking") {
klog.Infof("Skip upgrade don't have sw jvm deployment:%s container:%s", deploymentName, container.Name)
return nil
}
}
}
upgrade(container)
// Check newDeployment status
go checkNewDeploymentStatus(ctx, clientSet, newDeployment)
// delete from image
deleteImage(container.Image)
}
}
return nil
}
这个函数需要传入一个 deployment ,同时还有一个已经完成了的列表进来。
已完成列表用于多次运行的时候可以快速跳过已经执行的 deployment。
checkContainIstio()
函数很简单,判断是否包含了 Istio 容器,如果没有包含说明不是后端应用(可能是前端、大数据之类的任务),就可以直接跳过了。
而判断是否需要替换的前提这事判断环境变量
CATALINA_OPTS
中是否包含了 skywalking 的内容,如果包含则说明需要进行替换。
Upgrade 核心函数
func upgrade(container Container){
klog.Infof("Begin to upgrade deployment:%s container:%s", deploymentName, container.Name)
newImageName := fmt.Sprintf("%s-otel-%s", container.Image, generateRandomString(4))
err := BuildNewOtelImage(container.Image, newImageName)
if err != nil {
return err
}
// Update deployment jvm ENV
for e, envVar := range container.Env {
if envVar.Name == "CATALINA_OPTS" {
otelJVM := replaceSWAgent2OTel(envVar.Value, appName)
deployment.Spec.Template.Spec.Containers[i].Env[e].Value = otelJVM
}
}
// Update deployment image
deployment.Spec.Template.Spec.Containers[i].Image = newImageName
newDeployment, err := clientSet.AppsV1().Deployments(deployment.Namespace).Update(ctx, &deployment, metav1.UpdateOptions{})
if err != nil {
return err
}
klog.Infof("Finish upgrade deployment:%s container:%s", deploymentName, container.Name)
}
这里一共分为以下几部:
- 基于老镜像构建新镜像
- 更新原有的
CATALINA_OPTS
环境变量,也就是替换 skywalking 的参数 - 更新 deployment 镜像,触发滚动更新
构建新镜像
dockerfile = fmt.Sprintf(`FROM %s
COPY %s /home/admin/%s
COPY otel.tar.gz /home/admin/otel.tar.gz
RUN tar -zxvf /home/admin/otel.tar.gz -C /home/admin
RUN rm -rf /home/admin/skywalking-agent
ENTRYPOINT ["/bin/sh", "/home/admin/start.sh"]
`, fromImage, script, script)
idx := strings.LastIndex(newImageName, "/") + 1
dockerFileName := newImageName[idx:]
create, err := os.Create(fmt.Sprintf("Dockerfile-%s", dockerFileName))
if err != nil {
return err
}
defer func() {
create.Close()
os.Remove(create.Name())
}()
_, err = create.WriteString(dockerfile)
if err != nil {
return err
}
cmd := exec.Command("docker", "build", ".", "-f", create.Name(), "-t", newImageName)
cmd.Stdin = strings.NewReader(dockerfile)
if err := cmd.Run(); err != nil {
return err
}
其实这里的重点就是构建这个新镜像,从这个 dockerfile 中也能看出具体的逻辑,也就是上文提到的删除原有的 skywalking 资源同时将新的 OpenTelemetry 资源打包进去。
最后再将这个镜像上传到私服。
其中的替换 JVM 参数也比较简单,直接删除 skywalking 的内容,然后再追加上 OpenTelemetry 需要的参数即可。
定时检测替换是否成功
func checkNewDeploymentStatus(ctx context.Context, clientSet kubernetes.Interface, newDeployment *v1.Deployment) error {
ready := true
tick := time.Tick(10 * time.Second)
for i := 0; i < 30; i++ {
<-tick
originPodList, err := clientSet.CoreV1().Pods(newDeployment.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{
MatchLabels: newDeployment.Spec.Selector.MatchLabels,
}),
})
if err != nil {
return err
}
// Check if there are any Pods
if len(originPodList.Items) == 0 {
klog.Infof("No Pod in deployment:%s, Skip", newDeployment.Name)
}
for _, item := range originPodList.Items {
// Check Pod running
for _, status := range item.Status.ContainerStatuses {
if status.RestartCount > 0 {
ready = false
break
}
}
}
klog.Infof("Check deployment:%s namespace:%s status:%t", newDeployment.Name, newDeployment.Namespace, ready)
if ready == false {
break
}
}
if ready == false {
// rollback
klog.Infof("=======Rollback deployment:%s namespace:%s", newDeployment.Name, newDeployment.Namespace)
writeDeploymentName2File(newDeployment.Name, fmt.Sprintf("rollback-%s.log", newDeployment.Namespace))
}
return nil
}
这里会启动一个 10s 执行一次的定时任务,每次都会检测是否有容器发生了重启(正常情况下是不会出现重启的)
如果检测了 30 次都没有重启的容器,那就说明本次替换成功了,不然就记录一个日志文件,然后人工处理。
这种通常是原有的镜像与 OpenTelemetry 不兼容,比如里面写死了一些 skywalking 的 API,导致启动失败。
所以替换任务跑完之后我还会检测这个
rollback-$namespace
的日志文件,人工处理这些失败的应用。
分批处理 deployment
最后讲讲如何单个调用刚才的
ProcessDeployment()
函数。
考虑到不能对 kubernetes 产生影响,所以我们需要限制并发处理 deployment 的数量(我这里的限制是 10 个)。
所以就得分批进行替换,每次替换 10 个,而且其中有一个执行失败就得暂停后续任务,由人工检测失败原因再决定是否继续处理。
毕竟处理的是线上应用,需要小心谨慎。
所以触发的代码如下:
func ProcessDeploymentList(ctx context.Context, data []v1.Deployment, clientSet kubernetes.Interface) error {
file, err := os.ReadFile(fmt.Sprintf("finish-%s.log", data[0].Namespace))
if err != nil {
return err
}
split := strings.Split(string(file), "\n")
batchSize := 10
start := 0
for start < len(data) {
end := start + batchSize
if end > len(data) {
end = len(data)
}
batch := data[start:end]
//等待goroutine结束
var wg sync.WaitGroup
klog.Infof("Start process batch size %d", len(batch))
errs := make(chan error, len(batch))
wg.Add(len(batch))
for _, item := range batch {
d := item
go func() {
defer wg.Done()
if err := ProcessDeployment(ctx, split, d, clientSet); err != nil {
klog.Errorf("!!!Process deployment name:%s error: %v", d.Name, err)
errs <- err
return
}
}()
}
go func() {
wg.Wait()
close(errs)
}()
//任何一个失败就返回
for err := range errs {
if err != nil {
return err
}
}
start = end
klog.Infof("Deal next batch")
}
return nil
}
使用
WaitGroup
来控制一组任务,使用一个 chan 来传递异常;这类分批处理的代码在一些批处理框架中还蛮常见的。
总结
最后只需要查询某个 namespace 下的所有 deployment 列表传入这个批处理函数即可。
不过整个过程中还是有几个点需要注意:
- 因为需要替换镜像的前提是要把现有的镜像拉取到本地,所以跑这个任务的客户端需要有充足的磁盘,同时和镜像服务器的网络条件较好。
- 不然执行的过程会比较慢,同时磁盘占用满了也会影响任务。
其实这个功能依然有提升空间,考虑到后续会升级 OpenTelemetry agent 的版本,甚至也需要增减一些 JVM 参数。
所以最后有一个统一的工具,可以直接升级 Agent,而不是每次我都需要修改这里的代码。
后来在网上看到了得物的相关分享,他们可以远程加载配置来解决这个问题。
这也是一种解决方案,直到我们看到了 OpenTelemetry 社区提供了
Operator
,其中也包含了注入 agent 的功能。
apiVersion: opentelemetry.io/v1alpha1
kind: Instrumentation
metadata:
name: my-instrumentation
spec:
exporter:
endpoint: http://otel-collector:4317
propagators:
- tracecontext
- baggage
- b3
sampler:
type: parentbased_traceidratio
argument: "0.25"
java:
image: private/autoinstrumentation-java:1.32.0-1
我们可以使用他提供的 CRD 来配置我们 agent,只要维护好自己的镜像就好了。
使用起来也很简单,只要安装好了 OpenTelemetry-operator ,然后再需要注入 Java Agent 的 Pod 中使用注解:
instrumentation.opentelemetry.io/inject-java: "true"
operator 就会自动从刚才我们配置的镜像中读取 agent,然后复制到我们的业务容器。
再配置上环境变量
$JAVA_TOOL_OPTIONS=/otel/javaagent.java
, 这是一个 Java 内置的环境变量,应用启动的时候会自动识别,这样就可以自动注入 agent 了。
envJavaToolsOptions = "JAVA_TOOL_OPTIONS"
// set env value
idx := getIndexOfEnv(container.Env, envJavaToolsOptions)
if idx == -1 {
container.Env = append(container.Env, corev1.EnvVar{
Name: envJavaToolsOptions,
Value: javaJVMArgument,
})} else {
container.Env[idx].Value = container.Env[idx].Value + javaJVMArgument
}
// copy javaagent.jar
pod.Spec.InitContainers = append(pod.Spec.InitContainers, corev1.Container{
Name: javaInitContainerName,
Image: javaSpec.Image,
Command: []string{"cp", "/javaagent.jar", javaInstrMountPath + "/javaagent.jar"},
Resources: javaSpec.Resources,
VolumeMounts: []corev1.VolumeMount{{
Name: javaVolumeName,
MountPath: javaInstrMountPath,
}},})
大致的运行原理是当有 Pod 的事件发生了变化(重启、重新部署等),operator 就会检测到变化,此时会判断是否开启了刚才的注解:
instrumentation.opentelemetry.io/inject-java: "true"
接着会写入环境变量
JAVA_TOOL_OPTIONS
,同时将 jar 包从 InitContainers 中复制到业务容器中。
这里使用到了 kubernetes 的初始化容器,该容器是用于做一些准备工作的,比如依赖安装、配置检测或者是等待其他一些组件启动成功后再启动业务容器。
目前这个 operator 还处于使用阶段,同时部分功能还不满足(比如支持自定义扩展),今后有时间也可以分析下它的运行原理。
参考链接:
面试官:Redis如何实现延迟任务?
延迟任务(Delayed Task)是指在未来的某个时间点,执行相应的任务。也就是说,延迟任务是一种计划任务,它被安排在特定的时间后执行,而不是立即执行。
延迟任务的常见使用场景有以下几个:
- 定时发送通知或消息
:
- 发送定时短信、邮件或应用内消息,如注册确认、订单状态更新、促销活动通知等。
- 定时推送新闻、天气预报、股票价格等实时信息。
- 异步处理和后台任务
:
- 将耗时的操作安排为延迟任务,避免阻塞主线程或用户界面,提高系统的响应性能。
- 执行批量数据处理,如日志分析、数据报表生成等。
- 缓存管理和过期处理
:
- 定时清理过期的缓存数据,释放存储空间。
- 更新缓存中的数据,保持数据的时效性和准确性。
- 计划任务和定时调度
:
- 在特定时间执行系统维护任务,如数据库备份、系统更新等。
- 定时启动或关闭服务,以节约资源或满足业务需求。
- 订单和支付处理
:
- 在用户下单后的一段时间内,如果用户未支付,则自动取消订单。
- 定时检查订单的支付状态,并更新相应的订单信息。
- 重试和失败恢复机制
:
- 当某个操作失败时,可以在延迟一段时间后自动重试,以提高成功率。
- 实现分布式锁的超时释放,避免死锁情况。
- 提醒和日程管理
:
- 设置日程提醒,如会议、生日、纪念日等。
- 定时提醒用户完成任务或进行某项活动。
- 定时数据采集和上报
:
- 定期从传感器、设备或外部系统中采集数据。
- 定时上报应用的使用情况、统计数据或用户行为分析。
Redis如何实现延迟任务?
Redis 本身并没有直接提供延迟任务的功能
,但可以通过一些策略和手段,
在 Redis 中手动实现延迟任务
。
使用 Redis 实现延迟任务的主要手段有以下几个:
- 使用过期键的事件通知执行延时任务
:开启过期键通知,当 Redis 中键值过期时触发时间,在事件中实现延迟代码,但因为 Redis 的 Key 过期时不会被及时删除,所以这个过期事件也不保证可以立即触发,所以此方式很少用来实现延迟任务(因为极其不稳定)。 - 使用 ZSet 执行延时任务
:在 ZSet 中插入延迟执行时间和任务 ID,如下命令所示:
ZADD delay_tasks <timestamp> <task_id>
然后,启动一个后台线程或者定时任务,定期通过 ZRANGEBYSCORE 命令从有序集合中获取已到达执行时间的任务,即分数小于或等于当前时间的任务,进行执行即可实现延时任务。
- 使用 Redisson 执行延迟任务
:在 Redisson 框架中,提供了一个 RDelayedQueue 用于实现延迟队列,使用简单方便,推荐使用。
具体实现如下。
1.过期键通知事件实现
Redis 提供了键空间通知功能,当某个键发生变化(过期)时,可以发送通知。你可以结合 EXPIRE 过期命令和键空间通知来实现延迟任务。
当为某个键设置过期时间时,一旦该键过期,Redis 会发送一个通知。你可以订阅这个通知,并在接收到通知时执行任务。但这种方法可能不够精确,且依赖于 Redis 的内部机制。
它的实现步骤是:
- 设置开启 Redis 过期键通知事件,可以通过执行“CONFIG SET notify-keyspace-events KEA”命令来动态开启键空间通知功能,而无需重启 Redis 服务器。
- 设置过期键,可以通过命令“SET mykey "myvalue" EX 3”设置某个 key 3 秒后过期(3s 后执行)。
- 编写一个监听程序来订阅 Redis 的键空间通知。这可以通过使用 Redis 的发布/订阅功能来实现,具体实现代码如下(以 Jedis 框架使用为例):
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class RedisKeyspaceNotifier {
public static void main(String[] args) {
// 创建Jedis实例
Jedis jedis = new Jedis("localhost", 6379);
// 配置键空间通知(通常这一步在Redis配置文件中完成,但也可以在运行时配置)
jedis.configSet("notify-keyspace-events", "KEA");
// 订阅键空间通知
jedis.subscribe(new KeyspaceNotificationSubscriber(), "__keyevent@0__:expired");
}
static class KeyspaceNotificationSubscriber extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println("Received message from channel: " + channel + ", message: " + message);
// 在这里处理接收到的键空间通知
// 例如,如果message是一个需要处理的任务ID,你可以在这里触发相应的任务处理逻辑
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("Subscribed to channel: " + channel);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("Unsubscribed from channel: " + channel);
}
}
}
但因为 Redis 的 Key 过期时不会被及时删除,Redis 采用的是惰性删除和定期删除,所以这个过期事件也不保证可以立即触发,所以此方式很少用来实现延迟任务(因为极其不稳定)。
2.使用ZSet实现延迟任务
可以将任务及其执行时间作为成员和分数存储在 ZSET 中,然后,使用一个后台任务(如定时任务或守护进程)定期检查 ZSET,查找分数(即执行时间)小于或等于当前时间的成员,并执行相应的任务。执行后,从 ZSET 中删除该成员,具体实现代码如下:
import redis.clients.jedis.Jedis;
import java.util.Set;
public class RedisDelayedTaskDemo {
private static final String ZSET_KEY = "delayed_tasks";
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
public static void main(String[] args) {
Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
// 添加延迟任务
addDelayedTask(jedis, "task1", System.currentTimeMillis() / 1000 + 5); // 5秒后执行
addDelayedTask(jedis, "task2", System.currentTimeMillis() / 1000 + 10); // 10秒后执行
// 模拟定时任务检查器
new Thread(() -> {
while (true) {
try {
// 检查并执行到期的任务
checkAndExecuteTasks(jedis);
Thread.sleep(1000); // 每秒检查一次
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
private static void addDelayedTask(Jedis jedis, String task, long executeTime) {
jedis.zadd(ZSET_KEY, executeTime, task);
System.out.println("Added task: " + task + " with execution time: " + executeTime);
}
private static void checkAndExecuteTasks(Jedis jedis) {
long currentTime = System.currentTimeMillis() / 1000;
Set<String> tasks = jedis.zrangeByScore(ZSET_KEY, 0, currentTime);
for (String task : tasks) {
jedis.zrem(ZSET_KEY, task); // 从有序集合中移除任务
executeTask(task); // 执行任务
}
}
private static void executeTask(String task) {
System.out.println("Executing task: " + task);
// 在这里添加实际的任务执行逻辑
}
}
在这个示例中,我们首先使用 addDelayedTask 方法向 Redis 的有序集合中添加任务,并设置它们的执行时间。然后,我们启动一个线程来模拟定时任务检查器,它会每秒检查一次是否有任务到期,并执行到期的任务。
3.使用Redisson执行定时任务
在 Redisson 框架中,提供了一个 RDelayedQueue 用于实现延迟队列,使用简单方便,推荐使用,它的具体实现如下:
import org.redisson.Redisson;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;
public class RDelayedQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 创建 Redisson 客户端
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
// 获取延迟队列
RDelayedQueue<String> delayedQueue =
redisson.getDelayedQueue("delayedQueue");
// 添加延迟任务
delayedQueue.offer("task1", 5, TimeUnit.SECONDS);
// 监听并处理延迟任务
Thread listenerThread = new Thread(() -> {
while (true) {
try {
// 通过 take 方法等待并获取到期的任务
String task = delayedQueue.take();
System.out.println("Handle task: " + task);
} catch (InterruptedException e) {
break;
}
}
});
listenerThread.start();
}
}
在上述示例中,我们首先创建了一个 Redisson 客户端,通过配置文件指定了使用单节点 Redis 服务器。然后,我们获取一个延迟队列 RDelayedQueue,并添加一个延迟任务,延迟时间为 5 秒,接着,我们通过线程监听并处理延迟队列中的任务。
4.Redis实现延迟任务优缺点分析
优点:
- 轻量级与高性能
:Redis 是一个内存中的数据结构存储系统,因此读写速度非常快。将任务信息存储在 Redis 中可以迅速地进行存取操作。 - 简单易用
:Redis 的 API 简洁明了,易于集成到现有的应用系统中。
缺点:
- 精度有限
:Redis 的延迟任务依赖于系统的定时检查机制,而不是精确的定时器。这意味着任务的执行可能会有一定的延迟,特别是在系统负载较高或检查间隔较长的情况下。 - 功能有限
:与专业的任务调度系统相比,Redis 提供的延迟任务功能可能相对简单。对于复杂的任务调度需求,如任务依赖、任务优先级等,可能需要额外的逻辑来实现。 - 稳定性较差
:使用 Redis 实现延迟任务没有重试机制和 ACK 确认机制,所以稳定性比较差。 - 单点故障风险
:如果没有正确配置 Redis 集群或主从复制,那么单个 Redis 实例的故障可能导致整个延迟任务系统的瘫痪。
课后思考
Redisson 底层是如何实现延迟任务的?
本文已收录到我的面试小站
www.javacn.site
,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。