2024年3月

一:背景

1. 讲故事

前几天有位朋友找到我,说他们的API服务程序跑着跑着CPU满了降不下去,让我帮忙看下怎么回事,现在貌似民间只有我一个人专注dump分析,还是申明一下我dump分析是免费的,如果想学习.NET高级调试的分析技术,可以来我的训练营看看,话不多说,dump分析走起!

二:WinDbg分析

1. CPU 真的爆高吗

昨天录了一个小视频,大意就是作为分析师,不要过分的相信客户说的话,他们往往会把你带偏,你要有自己的分析思路和前进方向,所以需要以数据说话,首先用
!tp
观察下线程池。


0:030> !tp
CPU utilization: 60%
Worker Thread: Total: 18 Running: 3 Idle: 15 MaxLimit: 32767 MinLimit: 4
Work Request in Queue: 0
--------------------------------------
Number of Timers: 3
--------------------------------------
Completion Port Thread:Total: 3 Free: 1 MaxFree: 8 CurrentLimit: 1 MaxLimit: 1000 MinLimit: 4

从卦中可以看到当前的
CPU=60%
,这个值说高也不高,说低也不低,接下来观察下这台机器的cpu核心数,可以用
!cpuid
观察。


0:030> !cpuid
CP  F/M/S  Manufacturer     MHz
 0  6,63,2  <unavailable>   2295
 1  6,63,2  <unavailable>   2295
 2  6,63,2  <unavailable>   2295
 3  6,63,2  <unavailable>   2295

真的是无语,做游戏的不都是有钱的主,难道都在降本增效吗?既然到了 60%,说明有两个线程估计脱轨了,接下来就需要观察下那2个托轨线程都在做什么?

2. 脱轨线程在干嘛

要想观察每个线程都在做什么,可以使用
~*e !clrstack
命令即可。


0:030> ~*e !clrstack
OS Thread Id: 0x3eec (30)
        Child SP               IP Call Site
0000001f8fbad610 00007ffd958535c3 System.Collections.Generic.HashSet`1[[System.__Canon, mscorlib]].Contains(System.__Canon)
0000001f8fbad680 00007ffd95372933 System.Web.HttpCookieCollection.EnsureKeyValidated(System.String, System.String)
0000001f8fbad6c0 00007ffd9483fa8d System.Web.HttpCookieCollection.Get(System.String)
0000001f8fbad700 00007ffd3d12b3da xxx.CookieHelper.Read(System.String)
...
OS Thread Id: 0x5cf0 (31)
        Child SP               IP Call Site
0000001f8d27d330 00007ffd958535b8 System.Collections.Generic.HashSet`1[[System.__Canon, mscorlib]].Contains(System.__Canon)
0000001f8d27d3a0 00007ffd95372933 System.Web.HttpCookieCollection.EnsureKeyValidated(System.String, System.String)
0000001f8d27d3e0 00007ffd9483fa8d System.Web.HttpCookieCollection.Get(System.String)
0000001f8d27d420 00007ffd3e2ab6da xxx.CookieHelper.Read(System.String)
...

仔细琢磨了一下卦象后,发现有两个线程都停在
HashSet
上,而且这个集合还是来自于底层的
System.Web
,对一般人来说这个是比较奇葩的现象,但对于我这种有300+分析经验的熟手来说,一眼就看出来了什么问题,对,就是多线程操控 HashSet 导致的死循环,接下来的问题是如何去验证呢?毕竟空口无凭。。。

3. 真的死循环了吗

要想了解有没有真的死循环,需要你对 HashSet 的底层有一个了解,比如说 HashSet 挂链的时候在内部是如何组织的,其实这个我在
.NET高级训练营
里面也做过讲解,也做过演示,接下来切到 31 号线程观察下它的 HashSet 结构。


0:031> !mdso
Thread 31:
Location          Object            Type
------------------------------------------------------------
RCX:              000000200054ffc0  System.Collections.Generic.HashSet`1+Slot[[System.String, mscorlib]][]
RSI:              000000200054ff58  System.Collections.Generic.HashSet`1[[System.String, mscorlib]]
...
0:031> !mdt 000000200054ff58
000000200054ff58 (System.Collections.Generic.HashSet`1[[System.String, mscorlib]])
    m_buckets:000000200054ff98 (System.Int32[], Elements: 3)
    m_slots:000000200054ffc0 (System.Collections.Generic.HashSet`1+Slot[[System.String, mscorlib]][], Elements: 3, ElementMT=00007ffd96666665230)
    m_count:0x0 (System.Int32)
    m_lastIndex:0x0 (System.Int32)
    m_freeList:0xffffffff (System.Int32)
    m_comparer:00000022003d3380 (System.OrdinalComparer)
    m_version:0x6 (System.Int32)
    m_siInfo:NULL (System.Runtime.Serialization.SerializationInfo)
...
0:031> !mdt -e:2 000000200054ffc0
000000200054ffc0 (System.Collections.Generic.HashSet`1+Slot[[System.String, mscorlib]][], Elements: 3, ElementMT=00007ffd96666665230)
[0] (System.Collections.Generic.HashSet`1+Slot[[System.String, mscorlib]]) VALTYPE (MT=00007ffd96666665230, ADDR=000000200054ffd0)
    hashCode:0xffffffff (System.Int32)
    next:0x0 (System.Int32)
    value:NULL (System.__Canon)
[1] (System.Collections.Generic.HashSet`1+Slot[[System.String, mscorlib]]) VALTYPE (MT=00007ffd96666665230, ADDR=000000200054ffe0)
    hashCode:0x3eb5808c (System.Int32)
    next:0xffffffff (System.Int32)
    value:000000200054f8f0 (System.String) Length=6, String="xxx"
[2] (System.Collections.Generic.HashSet`1+Slot[[System.String, mscorlib]]) VALTYPE (MT=00007ffd96666665230, ADDR=000000200054fff0)
    hashCode:0x7e225883 (System.Int32)
    next:0x1 (System.Int32)
    value:000000200054fba0 (System.String) Length=12, String="xxx"

熟悉 HashSet底层的朋友,从上面的卦信息一眼就能看出问题,对,就是这个
next:0x0
,在hashset的挂链中,最后一个节点永远是 -1,如果是 0 的话就相当于指向数组的首元素,最后就是无情死循环了,知道了前因后果之后,接下来就要寻找下到底是什么圣神代码。

4. 到底是什么奇葩代码

这个比较简单,观察下线程栈的托管层代码,然后看源码即可,为了保护客户隐私,我就多注释一点,输出如下:


0:031> !clrstack
OS Thread Id: 0x5cf0 (31)
        Child SP               IP Call Site
0000001f8d27d330 00007ffd958535b8 System.Collections.Generic.HashSet`1[[System.__Canon, mscorlib]].Contains(System.__Canon)
0000001f8d27d3a0 00007ffd95372933 System.Web.HttpCookieCollection.EnsureKeyValidated(System.String, System.String)
0000001f8d27d3e0 00007ffd9483fa8d System.Web.HttpCookieCollection.Get(System.String)
0000001f8d27d420 00007ffd3e2ab6da xxx.CookieHelper.Read(System.String)
...

虽然信息比较少,但卦还是很明朗的,客户写了一个 CookieHelper 封装了 Request.Cookies 操作,那到底怎么封装的呢?仔细阅读代码之后终于发现了,截图如下:

我去,这代码还是挺奇葩的,居然将 Cookies 给了静态变量,静态变量可是一个进程小缓存呢,水落石出之后改发也比较简单,把 static 去掉即可。

三:总结

这种将
Request.Cookies
赋给静态变量的奇葩操作其实蕴含着巨大的安全隐患,会导致多个用户之间串cookie,但以服务器自爆的方式来避免客户端串cookie,真的是不幸中的万幸,哈哈,同时用bug去抑制另一个bug的神操作也真的是大自然的鬼斧神工!
图片名称

补充

之前的文章简单介绍了工作流和Elsa工作流库,这里再补充说明两点

  1. 工作流的使用场景非常广泛,几乎涵盖了所有需要进行业务流程自动化管理的领域。

  2. 学习一个开源库,最简单的方法就是看源码,Elsa的工作流引擎源码非常简单易懂,并且提供了非常丰富的示例代码,举一个例子:审批工作流示例
    .\src\samples\aspnet\Elsa.Samples.AspNet.DocumentApproval

在这里插入图片描述
这个审批流是这样的:
作者发来一个文章,有两个审批人需要全部审批通过,文章才算通过,否则退回。

我们尝试阅读工作流源代码
DocumentApprovalWorkflow.cs
,并运行此项目,用postman发送请求

第一步:

假设这名叫Amanda的作者要发布文章,请求发送后,作者浏览器显示发送成功稍安勿躁之类的提示

同时后台打印作者信息和4个链接,分别是Jack和Lucy两位审批人“通过”和“退回”的url链接

Activities =
{
    new HttpEndpoint
    {
        Path = new("/documents"),
        SupportedMethods = new(new[] { HttpMethods.Post }),
        ParsedContent = new(documentVariable),
        CanStartWorkflow = true
    },
    new WriteLine(context => $"Document received from {documentVariable.Get<dynamic>(context)!.Author.Name}."),
    new WriteHttpResponse
    {
        Content = new("<h1>Request for Approval Sent</h1><p>Your document has been received and will be reviewed shortly.</p>"),
        ContentType = new(MediaTypeNames.Text.Html),
        StatusCode = new(HttpStatusCode.OK),
        ResponseHeaders = new(new HttpHeaders { ["X-Powered-By"] = new[] { "Elsa 3.0" } })
    },

第二步:

Jack觉得文章不错,通过浏览器请求了“通过”链接,而Lucy觉得文章还不够好,需改进,她在浏览器中请求了“退回”链接。

两位审批人的审批结果存储于
approvedVariable
变量中

同时他们的浏览器返回的响应内容:Thanks for the approval 或 Sorry to hear that

    new Fork
    {
        JoinMode = ForkJoinMode.WaitAll,
        Branches =
        {
            // Jack
            new Sequence
            {
                Activities =
                {
                    new WriteLine(context => $"Jack approve url: \n {GenerateSignalUrl(context, "Approve:Jack")}"),
                    new WriteLine(context => $"Jack reject url: \n {GenerateSignalUrl(context, "Reject:Jack")}"),
                    new Fork
                    {
                        JoinMode = ForkJoinMode.WaitAny,
                        Branches =
                        {
                            // Approve
                            new Sequence
                            {
                                Activities =
                                {
                                    new Event("Approve:Jack"),
                                    new SetVariable
                                    {
                                        Variable = approvedVariable,
                                        Value = new(true)
                                    },
                                    new WriteHttpResponse
                                    {
                                        Content = new("Thanks for the approval, Jack!"),
                                    }
                                }
                            },

                            // Reject
                            new Sequence
                            {
                                Activities =
                                {
                                    new Event("Reject:Jack"),
                                    new SetVariable
                                    {
                                        Variable = approvedVariable,
                                        Value = new(false)
                                    },
                                    new WriteHttpResponse
                                    {
                                        Content = new("Sorry to hear that, Jack!"),
                                    }
                                }
                            }
                        }
                    }
                }
            },

            // Lucy
            new Sequence
            {
                Activities =
                {
                    new WriteLine(context => $"Lucy approve url: \n {GenerateSignalUrl(context, "Approve:Lucy")}"),
                    new WriteLine(context => $"Lucy reject url: \n {GenerateSignalUrl(context, "Reject:Lucy")}"),
                    new Fork
                    {
                        JoinMode = ForkJoinMode.WaitAny,
                        Branches =
                        {
                            // Approve
                            new Sequence
                            {
                                Activities =
                                {
                                    new Event("Approve:Lucy"),
                                    new SetVariable
                                    {
                                        Variable = approvedVariable,
                                        Value = new(true)
                                    },
                                    new WriteHttpResponse
                                    {
                                        Content = new("Thanks for the approval, Lucy!"),
                                    }
                                }
                            },

                            // Reject
                            new Sequence
                            {
                                Activities =
                                {
                                    new Event("Reject:Lucy"),
                                    new SetVariable
                                    {
                                        Variable = approvedVariable,
                                        Value = new(false)
                                    },
                                    new WriteHttpResponse
                                    {
                                        Content = new("Sorry to hear that, Lucy!"),
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    },

第三步:

根据
approvedVariable
变量判定文章是否被审核通过。

如果通过则在控制台打印Document document-1 approved!, 否则打印Document document-1 rejected!

    new WriteLine(context => $"Approved: {approvedVariable.Get<bool>(context)}"),
    new If(context => approvedVariable.Get<bool>(context))
    {
        Then = new WriteLine(context => $"Document ${documentVariable.Get<dynamic>(context)!.Id} approved!"),
        Else = new WriteLine(context => $"Document ${documentVariable.Get<dynamic>(context)!.Id} rejected!")
    }
}

Elsa工作流源码还提供了大量的Sample,这里就不一一列举了,

需求描述

根据不同的时间规则,发送下发问卷给客户填写。

下发问卷给用户填写,且填写有超时时间,期间要提醒用户答题,

如果问卷未在规定的时间内作答则,则作废,并提醒用户。

需求分析

我们将需求尽可能分解成为单一职责的功能单元,并定义这些功能单元的输入输出。

下发问卷任务 PublishQuestionnaireActivity

下发问卷是将问卷(Questionnaire)实例化成问卷实例(Survey),问卷实例绑定用户Id,用户在问卷实例上作答。明确输入和输出:

  • 输入:问卷ID
  • 输出:问卷实例对象SurveyDto

通知任务 NotificationActivity

通知在这个需求中需要发送问卷状态,时间等内容给对应的用户,同通至少包含标题和内容。

  • 输入:标题和内容
  • 输出:无

问卷状态跟踪任务 WaitFillInSurveyActivity

这个任务要追踪问卷实例的状态,当问卷实例状态为已完成时,可以继续执行后续任务。

  • 输入:问卷实例ID
  • 输出:无

定时和延时任务

用于延时执行每个下发问卷的时间,等待问卷超时,以及延时发送通知等。

  • 输入:开始日期,延时日期,间隔时间或cron表达式
  • 输出:无

根任务

根任务包含所有的子任务,完成这个任务后,整个流程结束。在这个需求中根任务只需要知道将什么问卷,发送给哪位用户,以及在何时发送这三个问题。

  • 输入:问卷ID,用户ID,发送时间
  • 输出:无

各子任务参数对于他们的根任务是透明的(Invisible),根任务只需要关心是否完成,而不需要知道任务参数。

代码实现

下发问卷活动 PublishQuestionnaireActivity

下发问卷任务可以抽象成为下发问卷活动 PublishQuestionnaireActivity
创建PublishQuestionnaireActivity类并设置输入QuestionnaireId,输出SurveyDto

public class PublishQuestionnaireActivity : Activity<SurveyDto>
{
    public PublishQuestionnaireActivity()
    {

    }
    public PublishQuestionnaireActivity(long questionnaireId)
    {
        QuestionnaireId = new Input<long>(questionnaireId);
    }


    
    public Input<long> QuestionnaireId { get; set; } = default!;
}


重写ExecuteAsync方法,完成问卷下发逻辑

protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
    var _surveyAppService = context.GetRequiredService<ISurveyAppService>();
    if (_surveyAppService != null)
    {
        var currentUserId = await context.GetInputValueAsync<Guid>("UserId");
        var survey = await _surveyAppService.PublishAsync(new PublishInput()
        {
            QuestionnaireId = this.QuestionnaireId.Get<long>(context),
            UserId = currentUserId

        }) ?? throw new Exception("创建问卷失败");
        context.SetResult(survey);
    }


    await context.CompleteActivityAsync();

}

如此,其他的任务分别抽象成为相应的活动,这里展示完整代码

通知活动:NotificationActivity

public class NotificationActivity : Activity
{


    public NotificationActivity()
    {

    }
    public NotificationActivity(string title, string content)
    {
        Content = new Input<string>(content);
        Title = new Input<string>(title);
    }

    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
    {
        var notificationManager = context.GetRequiredService<NotificationManager>();

        if (notificationManager != null)
        {
            var title = this.Title.Get(context);
            var content = this.Content.Get(context);
            var currentUserId = await context.GetInputValueAsync<Guid>("UserId");
            var data = new CreatePrivateMessageNotificationEto(currentUserId, title, content);
            await notificationManager.Send(data);
        }


    await context.CompleteActivityAsync();

}

    public Input<string> Title { get; set; } = default!;
    public Input<string> Content { get; set; } = default!;
}

等待问卷完成活动:WaitFillInSurveyActivity

public class WaitFillInSurveyActivity : Activity
{
    public WaitFillInSurveyActivity()
    {

    }
    public WaitFillInSurveyActivity(Func<ExpressionExecutionContext, long?> surveyId)
: this(Expression.DelegateExpression(surveyId))
    {
    }

    public WaitFillInSurveyActivity(long surveyId) => SurveyId = new Input<long>(surveyId);

    public WaitFillInSurveyActivity(Expression expression) => SurveyId = new Input<long>(expression, new MemoryBlockReference());


    /// <inheritdoc />
    protected override ValueTask ExecuteAsync(ActivityExecutionContext context)
    {
        var surveyId = SurveyId.Get(context);
        if (surveyId == default)
        {
            var survey = context.ExpressionExecutionContext.GetLastResult<SurveyDto>();
            surveyId = survey.Id;
        }
        var payload = new WaitFillInSurveyBookmarkPayload(surveyId);
        context.CreateBookmark(new CreateBookmarkArgs
        {
            Payload = payload,
            Callback = Resume,
            BookmarkName = Type,
            IncludeActivityInstanceId = false
        });
        return ValueTask.CompletedTask;
    }

    private async ValueTask Resume(ActivityExecutionContext context)
    {
        await context.CompleteActivityAsync();
    }

    public Input<long> SurveyId { get; set; } = default!;
}

此任务需要等待,我们创建一个Bookmark,注意创建Bookmark时,我们根据问卷实例SurveyId判断是否完成问卷的回答,因此指定
IncludeActivityInstanceId

false
,创建携带SurveyId的Payload类型:

public record WaitFillInSurveyBookmarkPayload(long SurveyId);

在回调
OnResumeAsync
中,我们使用
context.CompleteActivityAsync
来完成任务。

定时和延时活动:

Elsa.Scheduling库提供了用于定时和延时任务的触发器(触发器属于工作流的一种)

在这里插入图片描述


[.NET项目实战] Elsa开源工作流组件应用(二):内核解读
一文 "构建 - 构建活动 "章节 列出了Elsa所有内建的活动。

这里使用Elsa内建的三个触发器:

StartAt 在未来特定的时间戳触发工作流触发器
Delay 延迟执行工作流触发器。
Timer 定期触发工作流触发器。

问卷活动:QuestionnaireActivity

问卷活动是下发问卷,通知,等待填写问卷等活动的父级。

Elsa定义了容器类型的活动Container类型,其中的Activities可以包含其他活动。

在这里插入图片描述

Sequence和Parallel都是容器类型,是Activity的子类,它们分别表示并行和顺序执行。

除此之外我们还需要两个内建活动:

Fork:分支,用于分支并行执行,与Parallel类似,但比它多了一个等待完成功能。

通过ForkJoinMode属性,可以指定分支任务的执行方式,
ForkJoinMode.WaitAny
:等待任意一个任务完成,
ForkJoinMode.WaitAll
:等待所有任务完成。

Fault:故障,用于在工作流执行过程中,遇到异常时,触发故障。并结束工作流。

创建问卷活动类型QuestionnaireActivity,继承自Sequence类型,并设置一些属性,如问卷Id,问卷填写超时时间等。

[可选]Elsa在注册工作流时,Activity对象是会被序列化并存储到WorflowDefinition表中的, 因此这些属性可以被持久化到数据库中。

public class QuestionnaireActivity : Sequence
{
    //可选,用于持久化一些属性
    public TimeSpan Delay { get; set; }
    public DateTime StartAt { get; set; }
    public TimeSpan Interval { get; set; }
    public string Cron { get; set; }
    public TimeSpan Duration { get; set; }
    public long QuestionnaireId { get; set; }
    public TimeSpan FillInTimeout { get; set; } = TimeSpan.FromHours(2);

    public QuestionnaireActivity()
    {

    }   
}

重写构造函数,并设置Activities属性

public QuestionnaireActivity(long questionnaireId, TimeSpan fillInTimeout)
{
    this.QuestionnaireId = questionnaireId;
    this.FillInTimeout = fillInTimeout;
    var currentSurvey = new Variable<SurveyDto>();
    Variables.Add(currentSurvey);
    Activities = new List<IActivity>()
    {
        //流程开始打印
        new WriteLine("问卷流程开始"),

        //下发问卷任务
        new PublishQuestionnaireActivity(QuestionnaireId)
        {
            Name="PublishQuestionnaire",
            Result=new Output<Questionnaire.Survey.Dto.SurveyDto> (currentSurvey)
        },
        //问卷到达提醒             
        new NotificationActivity("新问卷提醒", "您有新的问卷,请查收"),

        //问卷处理分支 
        new Fork
        {
            JoinMode = ForkJoinMode.WaitAny,
            Branches =
            {
                //问卷即将过期提醒 
                new Sequence
                {
                    Activities =
                    {
                        //等待
                        new Delay
                        {
                            Name = "RemindDelay",
                            TimeSpan = new(RemindDelay)
                        },
                        //通知
                        new NotificationActivity("问卷即将超时", "问卷即将超时,请尽快回答")
                    }
                },
                //问卷过期处理以及提醒 
                new Sequence
                {
                    Activities =
                    {
                        //等待
                        new Delay
                        {
                            Name = "TimeoutDelay",
                            TimeSpan = new(FillInTimeout)
                        },
                        //通知
                        new NotificationActivity("问卷已过期", "问卷已过期,请等待工作人员处理"),

                        //处理
                        new Fault()
                        {
                            Message=new ("问卷回答超时")
                        }
                    }
                },
                //问卷状态跟踪 
                new Sequence
                {
                    Activities =
                    {
                        new WriteLine("开始等待问卷提交信号"),
                        new WaitFillInSurveyActivity(context => currentSurvey.Get<SurveyDto>(context)?.Id)

                    }
                }
            }
        },
        //流程结束打印 
        new WriteLine("完成流程结束"),
        new Finish(),
    };
}

创建工作流

现在我们来创建测试工作流,

  1. 添加一个工作流参数UserId,用于各活动中对用户的查询依赖。
  2. 分别实现4个并行任务:延时发送问卷,定时发送问卷,定期间隔发送问卷,根据Cron表达式执行。和一个串行任务
public class Test1Workflow : WorkflowBase
{
    public Guid UserId { get; set; }
    protected override void Build(IWorkflowBuilder workflow)
    {
        var startTime = new Variable<DateTimeOffset>();
        workflow.Inputs.Add(
                    new InputDefinition() { Name = "UserId", Type = typeof(Guid), StorageDriverType = typeof(WorkflowStorageDriver) }
            );
        workflow.WithVariable(startTime);

        workflow.Root = new Sequence
        {
            Activities =
                {
                    new WriteLine("Start"),
                    new SetVariable<DateTimeOffset>
                    {
                        Variable = startTime,
                        Value = new (DateTime.Now )
                    },
                    
                    new Parallel()
                    {
                        Activities =
                        {
                            //并行任务1:延时发送问卷
                            new Sequence()
                            {
                                Activities =
                                {
                                    //问卷1 将在工作流启动后1小时执行

                                    new Delay(TimeSpan.FromHours(1)),
                                    new QuestionnaireActivity(1),
                                }
                            },
                            //并行任务2:定时发送问卷
                            new Sequence()
                            {
                                Activities =
                                {
                                    //问卷2 将在 2024-4-1 08:30:00 执行
                                    new StartAt(new DateTime(2024,4,1,8,30,0)),
                                    new Delay(TimeSpan.FromHours(2)),
                                    new QuestionnaireActivity(2),
                                }
                            },
                            //并行任务3:定期间隔发送问卷
                            new Sequence()
                            {
                                Activities =
                                {
                                    //问卷3 每隔两个小时执行
                                    new Timer(new TimeSpan(2,0,0)),

                                    new Delay(TimeSpan.FromHours(2)),
                                    new QuestionnaireActivity(3),
                                }
                            },
                            //并行任务4:根据Cron表达式执行
                            new Sequence()
                            {
                                Activities =
                                {
                                    //问卷4 每个月的最后一天上午10点执行任务
                                    new Cron(cronExpression:"0 0 10 L * ?"),

                                    new Delay(TimeSpan.FromHours(2)),
                                    new QuestionnaireActivity(4),
                                }
                            },
                            //并行任务5:根据某时间发送问卷
                            new Sequence()
                            {
                                Activities =
                                {
                                    new StartAt(context=> startTime.Get(context).AddMinutes(90)),

                                    new Delay(TimeSpan.FromHours(2)),
                                    new QuestionnaireActivity(5),
                                }
                            },



                            //串行任务
                            new Sequence()
                            {
                                Activities =
                                {
                                    //问卷3 将在工作流启动后2小时执行

                                    new Delay(TimeSpan.FromHours(2)),
                                    new QuestionnaireActivity(3),

                                        //问卷4 将在问卷3完成1天后执行

                                    new Delay(TimeSpan.FromDays(1)),
                                    new QuestionnaireActivity(4),

                                    //问卷5 将在问卷4完成3天后执行

                                    new Delay(TimeSpan.FromDays(3)),
                                    new QuestionnaireActivity(5),
                                }
                            }
                        }
                    },
                    new Finish(),
            },
        };
    }
}

开始工作流

工作流启动参数需设置Input对象


var input = new Dictionary<string, object>
{

    {"UserId", "D1522DBC-5BFC-6173-EB60-3A114454350C"},

};

var startWorkflowOptions = new StartWorkflowRuntimeOptions
{
    Input = input,
    VersionOptions = versionOptions,
    InstanceId = instanceId,
};


// Start the workflow.
var result = await _workflowRuntime.StartWorkflowAsync(workflowDefinition.DefinitionId, startWorkflowOptions);

下面进入喜闻乐见的踩坑填坑环节

TroubleShooting

  1. 在活动中执行异步操作时,会导致报错:

    如下面的代码,执行Excute方法中的 context.CompleteActivityAsync()方法,时报错

在这里插入图片描述

在这里插入图片描述

原因分析:scope资源被提前释放

代码先执行到了112行,scope释放

在这里插入图片描述

解决:带有异步的操作一定要使用ExecuteAsync方法

在这里插入图片描述

  1. delay之后,Workflow的Input无法访问

原因分析:

Delay或其他Schedule类型的Activity,通过创建Bookmark挂起任务,当任务被唤醒时,input被workflowState.Output替换掉,和原先的input不一样了。

在这里插入图片描述

解决:

虽然input被替换了,但数据库的input还在,可以通过workflowInstanceId先取回workflowInstance对象,再通过
instance.WorkflowState.Input.TryGetValue
方法获取原始input值。

可以创建一个一个扩展方法GetInputValueAsync,Delay之后的活动中调用即可。

public static async Task<TValue> GetInputValueAsync<TValue>(this ActivityExecutionContext context, string name)
{
    TValue value;
    if (!context.TryGetWorkflowInput(name, out value))
    {
        var workflowInstanceStore = context.GetRequiredService<IWorkflowInstanceStore>();

        var instance = await workflowInstanceStore.FindAsync(new WorkflowInstanceFilter()
        {
            Id = context.WorkflowExecutionContext.Id

        });
        if (instance != null)
        {
            instance.WorkflowState.Input.TryGetValue(name, out value);
        }
    }

    return value;


}

在Activity中调用:

await context.GetInputValueAsync<Guid>("UserId");

持续更新中...

--完结--

简单使用某个组件很容易,但是一旦要搬到生产上就要考虑各种各样的异常,保证你方案的可靠性,可恢复性就是我们需要思考的问题。今天来聊聊我们部门在 MYSQL 同步到ES的方案设计。

在面对复杂条件查询时,MYSQL往往显得力不从心,一般公司的做法会通过将mysql中的数据同步到ES,之后的查询就通过ES进行查询,ES在面对多条件复杂查询时,能较快的查询出结果集。

在MYSQL数据 到ES中的数据同步 方案设计上,就有多种选择,

1,
最简单的便是直接在业务代码中对数据库进行修改
,插入,删除时,同步修改ES中的数据。 但
这种方案也是最不可靠的一种设计
。在写入MYSQL后,业务服务宕机了,ES数据就会丢失。如果写入ES失败,重试逻辑将会嵌套在业务代码中,业务代码复杂性增加了,并且如果一直失败,要一直重试吗?

所以,对于这种方案,直接pass掉了。

2,
第二种同步方案则是业界用的比较多的同步方案,通过binlog进行同步
,目前业界已经有比较成熟的模拟mysql从库,拉取binlog的组件,例如阿里开源的
canal
。整个同步架构如下所示,canal组件充当mysql从库的角色,将mysql的binlog拉取下来,由客户端从canal拉取消息进行消费,再由客户端主动插入或者更新ES中的数据。


Java 实现压缩图片,视频,音频案例

在 Java 中,要实现视频压缩通常需要使用外部的库或工具,因为 Java 标准库本身并不提供直接的视频处理功能。以下是一些常用的方法和工具来压缩视频:

FFmpeg:

FFmpeg 是一个开源跨平台的多媒体处理工具,可以用来对音频、视频等多媒体数据进行编解码、转换和流处理。你可以通过 Java 调用 FFmpeg 的命令行来实现视频压缩。

Xuggler:

Xuggler 是一个 Java 语言的多媒体处理库,基于 FFmpeg 和 X264 构建,提供了 Java API 来进行视频和音频的处理。你可以使用 Xuggler 来实现视频的压缩和转换。

JCodec:

JCodec 是一个专门用于视频编解码的 Java 库,支持常见的视频编解码格式,包括 H.264、MPEG-4 等。你可以使用 JCodec 来实现视频的压缩和编解码操作。


接下来我们使用FFmpeg实现音频及视频的压缩

导入Maven依赖

<dependency>
    <groupId>com.vaadin.external.google</groupId>
    <artifactId>android-json</artifactId>
    <version>0.0.20131108.vaadin1</version>
    <scope>compile</scope>
</dependency>

参数说明

1. ffmpegPath:FFmpeg可执行文件的路径。
2. "-i", inputFile.getAbsolutePath():指定输入文件的路径。
3. "-c:v", "libx264":指定视频编解码器为libx264。
4. "-crf", "28":设置视频的质量,数值越小,视频质量越高,推荐范围是18-28。
5. "-preset", "fast":设置编码速度和质量的平衡,"fast"为快速编码。
6. "-c:a", "aac":指定音频编解码器为AAC。
7. "-b:a", "64k":设置音频比特率为64kbps。
8. "-movflags", "+faststart":对生成的MP4文件进行优化,使其能够逐步播放。
9. outputFile.getAbsolutePath() + ".mp4":指定输出文件的路径和文件名,同时指定了输出文件的格式为MP4。

压缩视频

//压缩视频
public static void compressVideo(File inputFile, File outputFile) {
	Long startTime = System.currentTimeMillis();
	try {
		String ffmpegPath = "D:\\develop\\ffmpeg-master-latest-win64-gpl\\bin\\ffmpeg.exe"; 
		// FFmpeg可执行文件路径
		// 构建FFmpeg命令
		String[] command = {
			ffmpegPath,
			"-i", inputFile.getAbsolutePath(),
			"-c:v", "libx264",
			"-crf", "28",
			"-preset", "fast",
			"-c:a", "aac",
			"-b:a", "64k",
			"-movflags", "+faststart",
			outputFile.getAbsolutePath() + ".mp4"
		};
		// 创建进程生成器
		ProcessBuilder processBuilder = new ProcessBuilder(command);
		// 重定向进程的输入、输出和错误流
		processBuilder.inheritIO();
		// 启动进程
		Process process = processBuilder.start();

		// 等待进程完成
		process.waitFor();
		Long endTime = System.currentTimeMillis();
		System.out.println("视频压缩完成!用时: " + (endTime - startTime));
	} catch (Exception e) {
		e.printStackTrace();
	}
}

压缩音频

//压缩音频
public static byte[] compressAudio(InputStream inputStream) {
	Long startTime = System.currentTimeMillis();
	try {
		// FFmpeg可执行文件路径
		String[] command = {
				"ffmpeg",
				"-i", "pipe:0",
				"-b:a", "64k",
				"-f", "mp3",
				"pipe:1"
		};
		ProcessBuilder processBuilder = new ProcessBuilder(command);
		// 重定向进程的输入、输出和错误流
		processBuilder.redirectInput(ProcessBuilder.Redirect.PIPE);
		processBuilder.redirectOutput(ProcessBuilder.Redirect.PIPE);
		processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
		// 启动进程
		Process process = processBuilder.start();

		// 将输入流拷贝到进程的输入流中
		Thread copyThread = new Thread(() -> {
			try {
				byte[] buffer = new byte[1024];
				int len;
				while ((len = inputStream.read(buffer)) > 0) {
					process.getOutputStream().write(buffer, 0, len);
				}
				process.getOutputStream().close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		});
		copyThread.start();

		// 将进程的输出流缓存到字节数组输出流中
		ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
		byte[] buffer = new byte[1024];
		int len;
		while ((len = process.getInputStream().read(buffer)) > 0) {
			byteArrayOutputStream.write(buffer, 0, len);
		}

		// 等待输入流拷贝线程完成
		copyThread.join();
		// 等待进程完成
		process.waitFor();
		Long endTime = System.currentTimeMillis();
		System.out.println("音频压缩完成!用时: " + (endTime - startTime));

		// 返回压缩后的音频文件数据
		return byteArrayOutputStream.toByteArray();
	} catch (Exception e) {
		e.printStackTrace();
		return null;
	}
}
实现压缩图片
//压缩图片
public static InputStream compressImage(InputStream inputStream, String outputFormat) {
    BufferedImage image = null;
    try {
        image = ImageIO.read(inputStream);
    } catch (IOException e) {
        e.printStackTrace();
    }

    int newWidth = (int) (image.getWidth() * compressionRatio);
    int newHeight = (int) (image.getHeight() * compressionRatio);

    BufferedImage compressedImage = new BufferedImage(newWidth, newHeight, BufferedImage.TYPE_INT_RGB);

    compressedImage.createGraphics().drawImage(image.getScaledInstance(newWidth, newHeight, Image.SCALE_SMOOTH), 0, 0, null);

    ByteArrayOutputStream out = new ByteArrayOutputStream();

    try {
        ImageIO.write(compressedImage, outputFormat, out);
        out.flush();
    } catch (Exception e) {
        e.printStackTrace();
    }

    ByteArrayInputStream compressedInputStream = new ByteArrayInputStream(out.toByteArray());

    return compressedInputStream;
}


去听了 hzxu 老师的 DRL 课,感觉终于听懂了,记录一下…

相关链接:


0 我们想做什么

我们想最大化的东西:
\(J(\theta) = \mathbb E_\tau[R(\tau)]\)
,其中 R 是轨迹的 reward 求和(或 discount 求和)。

我们希望,期望下的轨迹的 reward 求和(reward discounted 求和)最大。

1 三个数学 trick

①:
\(\nabla_\theta\log z = \frac1z\nabla_\theta z\)

②:
\(\mathbb E_{x\sim p(x)}[f(x)] = \int p(x)f(x)dx\)

③:
\(a/b = [a\cdot p(x)] / [b\cdot p(x)]\)

2 对单个 transition 的 policy gradient

\[\begin{aligned}
\nabla_\theta\mathbb{E}_{a\sim p(a|s;\theta)}[r(a)]& =\nabla_\theta\sum_ap(a\mid s;\theta)r(a) \\
&=\sum_ar(a)\nabla_\theta p(a\mid s;\theta) \\
&=\sum_ar(a)p(a\mid s;\theta)\frac{\nabla_\theta p(a\mid s;\theta)}{p(a\mid s;\theta)} \\
&=\sum_a^ar(a)p(a\mid s;\theta)\nabla_\theta\log p(a\mid s;\theta) \\
&=\mathbb{E}_{a\sim p(a|s;\theta)}[r(a)\nabla_\theta\log p(a\mid s;\theta)]
\end{aligned}
\]

其中,
第一行 把单个 (s,a) 的 reward 期望写为 Σπ(a|s)r(s,a) 的形式;
第二行 认为 r(a) 是不可微分的,去微分 π(a|s);
第三行 在分数线上下 同时塞了一个 π(a|s) (即 p(a|s;θ) );
第四行 因为 d log z = dz/z,原式变成 p(a|s)
\(\nabla\)
p(a|s) 了;
第五行 把 p(a|s) 塞回去,变成了 期望下的 r(s,a)
\(\nabla\)
log π(a|s)。

结论:如果想最大化期望下的 r(s,a),可以把 r(s,a) 放
\(\nabla\)
外面,去对 log π(a|s) 求梯度。

3 对整个 trajectory 的 policy gradient

先计算 trajectory 的概率:

\[p(\tau\mid\theta)=\underbrace{\mu(s_0)}_{\text{initial state distribution}} \cdot \prod_{t=0}^{T-1}[\underbrace{\pi(a_t\mid s_t,\theta)}_{\text{policy}}\cdot\underbrace{p(s_{t+1},r_t\mid s_t,a_t)}_{\text{transition fn.}}]
\\

\]

然后,对单个 transition,我们有

\[\nabla_\theta\mathbb{E}_{x\sim p(x|s;\theta)}[r(x)]=\mathbb{E}_{x\sim p(x|s;\theta)}[r(x)\nabla_\theta\log p(x\mid s;\theta)]
\]

对于整个 trajectory 的 total reward 的梯度,应用跟 2 相同的方法(分数线上下同乘 p(τ|theta) ),可以得到

\[\nabla_\theta\mathbb{E}_\tau[R(\tau)]=\mathbb{E}_\tau[\underbrace{\nabla_\theta\log p(\tau\mid\theta)}_{\text{What is this?}}\underbrace{R(\tau)}_{\text{Reward of a trajectory}}]
\]

现在,让我们来看
\(\nabla_\theta\log p(\tau\mid\theta)\)

\[\begin{aligned}
\log p(\tau\mid\theta)& =\log\mu(s_0)+\log\prod_{t=0}^{T-1}[\pi(a_t\mid s_t,\theta)\cdot p(s_{t+1},r_t\mid s_t,a_t)] \\
&=\log\mu(s_0)+\sum_{t=0}^{T-1}\log[\pi(a_t\mid s_t,\theta)\cdot p(s_{t+1},r_t\mid s_t,a_t)] \\
&=\log\mu(s_0)+\sum_{t=0}^{T-1}[\log\pi(a_t\mid s_t,\theta)+\log p(s_{t+1},r_t\mid s_t,a_t)] \\
\end{aligned}
\]

其中,
第一行 是把 trajectory 的概率展开;
第二行 第三行 都是把 log(A×B) 变成 logA + logB;
然后发现,只有中间这一项
\(\sum_{t=0}^{T-1}\log\pi(a_t\mid s_t,\theta)\)
带 θ,因此,前后两项都不用跟 θ 求梯度了。

由此,我们得到:

\[\nabla_\theta\mathbb{E}_\tau[R(\tau)]=\mathbb{E}_\tau\left[R(\tau)\nabla_\theta\sum_{t=0}^{T-1}\log\pi(a_t\mid s_t,\theta)\right]
\]

结论:如果想最大化期望下的 R(τ),可以把 R(τ) 放
\(\nabla\)
外面,去求 Σ
\(\nabla\)
log π(a|s) ,即 log [action 概率] 的梯度。

4 REINFORCE 算法

  • 使用策略 π(a|s;θ),生成一个 trajectory:
    \((s_0, a_0, r_1, ..., s_{T-1}, a_{T-1}, r_T)\)
  • 对每个时间步 t,计算回报:
    \(R_t = \sum_{k=t+1}^{T} γ^{k-t-1} r_k\)
  • 更新策略参数:
    \(θ = θ + α γ^t R_t ∇_θ log π(a_t|s_t;θ)\)

(算法是 GPT 生成的,看起来好像没问题)