2024年2月

使用Java的stream中的Collectors可以很方便地做容器间的转换,可以少写很多代码。但是其中有暗含的坑需要注意和避免,本文探讨Collectors.toMap(JDK8版本)。

Collectors.toMap可以将一个流转化成Map,常见于需要将List转换成Map以便于进一步操作的场景,比如在通过RPC接口获取一个返回结果、从DB中查询到匹配的多条数据后,对其按某个字段(经常是主键id)做分组。这里先定义一个简单的类:

public class User {
    private Long id;

    private String name;
}

后续所有代码的目的均为将这个User类组成的List转换为id作为key、name作为value的Map。

1. 不使用stream

不使用stream时,需要先new一个map,然后手动把list的每一项放入map

    public void test0() {
        User user1 = new User();
        user1.setId(1L);
        user1.setName("1");

        User user2 = new User();
        user2.setId(2L);
        user2.setName("2");

        List<User> list = Lists.newArrayList();
        list.add(user1);
        list.add(user2);
        Map<Long, String> map = new HashMap<>();
        for(User user : list) {
            map.put(user.getId(), user.getName());
        }
    }

2. 使用stream的Collectors.toMap

构造list的代码不变,转化map的代码可以简化为

Map<Long, String> map = list.stream().collect(Collectors.toMap(User::getId, User::getName));

相较第一种,简洁多了。
但是,如果User1和User2的id是一样的,会发生什么情况?将代码中
user2.setId(2L)
修改为
user2.setId(1L)
,再次执行,发现抛异常
java.lang.IllegalStateException: Duplicate key 1
,说明在merge时报错了:key不允许重复。

但是有些情况下key确实是可以重复的,比如我调用上游的数据,上游没做校验和控制;再或者这个字段本身不是惟一的,多个数据可能重复。那么如何改进呢?

3. Collectors.toMap指定merge函数

可以自定义一个merge函数来确定key重复时,如何取value。比如下面这种写法,是保留第一个value。你也可以保留第二个,或者是做一些更复杂的处理。

Map<Long, String> map = list.stream().collect(Collectors.toMap(User::getId, User::getName, (x1,x2)->x1));

4. value为null的场景

传入merge方法以后,看似万事大吉了,没想到还有坑。将user2.setName(null),会发现抛了NullPointerException异常,异常栈信息为:

可以看到对应的源码里,value是不允许为null的。

虽然说Map的value是支持null值的,但是map自己的merge方法天生不支持,此时仅靠自定义merge方法也已经无能为力了。如果仍然想使用Collectors.toMap,需要手动处理null的值,比如:

Map<Long, String> map = list.stream().collect(Collectors.toMap(User::getId, value -> Optional.ofNullable(value.getName()).orElse("")));

当然,这样处理后的map的value并不是实际的值,并不适用于所有场景。
这样看来,Collectors.toMap的局限性无法避免了,使用的时候要确认不会发生预期的问题。

当然你也可以做一些预处理,比如使用filter过滤掉value=null的数据,来规避这个问题。

5. key=null时会怎么样?

不会怎么样,一切正常。

public void test5() {
    User user1 = new User();
    user1.setId(null);
    user1.setName("1");

    User user2 = new User();
    user2.setId(2L);
    user2.setName("2");

    List<User> list = Lists.newArrayList();
    list.add(user1);
    list.add(user2);
    Map<Long, String> map = list.stream().collect(Collectors.toMap(User::getId, User::getName));
    System.out.print(map);
}

6. 还有什么要注意的?

上述的例子,均是建立在list不为空的前提下进行的。如果list本身为null,在调用stream()时自然也会抛NullPointerException;如果list是空的(内容为空但是容器本身初始化过,如list = new ArrayList<>()),则不会报错。

小结

  • 调用stream()前先确定容器本身是否为null
  • 如果不确定要通过Collectors.toMap转换为map的源容器的数据
    • 对应key是否会重复,可以在toMap()传入merge方法
    • 对应value是否为null,可以做过滤或指定默认值
  • 如果不想思考和求证,还是继续用for循环吧

一、引例

假设有这样一组数据,它们是腰围和体重一一对应的数据对。我们将根据表中的数据对去估计体重。

表1. 腰围体重表

如果现在给出一个新的腰围 62 ,那么体重的估计值是多少呢?

凭经验,我们认为腰围和体重是正相关的,所以我们会自然地『关注』和 62 差距更小的那些腰围,来去估计体重。也就是更加关注表格中腰围是 60 和 64 的『腰围-体重对』(waistline-weight pairs)。即,我们会估计此人的体重在 110 ~ 115 之间。这是一种定性的分析。

下面我们来算一下具体值。我们选取一种简单直观的方法来计算:
由于 62 距离 60 和 64 的距离是相等的,所以我们取 110 和 115 的平均值作为 62 腰围对应的体重。

\[\frac{110 + 115}{2}=112.5
\]

也可以这样认为,由于 62 距离 60 和 64 是最近的,所以我们更加『注意』它们,又由于 62 到它俩的距离相等,所以我们给这两对『腰围-体重对』各分配 0.5 的权重。

\[0.5\times 110+0.5\times 115=112.5
\]

但是,我们到现在还没有用到过 68 --> 126 这个『腰围-体重对』,我们应该再分一些权重给它,让我们的估计结果更准确。

我们上面的讨论可以总结为公式:$$体重估计值=权重1×体重1+权重2×体重2+权重3×体重3$$

这个权重应该如何计算呢?


二、注意力机制

我们把『腰围-体重对』改写成 Python 语法中(字典)的『键-值对』(key-value pairs),把给出的新腰围 62 叫请求(query),简称
\(q\)
.

现在我们给那些值起了新的名字,所以公式可以写为:$$f(q)=\alpha (q, k_1)\cdot v_1 + \alpha (q, k_2)\cdot v_2 + \alpha (q, k_3)\cdot v_3=\Sigma _{i=1}^{3}\alpha (q, k_i)\cdot v_i$$
这个公式描述了『
注意力机制
』。其中,
\(f(q)\)
表示注意力机制的输出。
\(\alpha (q, k_i)\)
表示『
注意力权重
』。它和
\(q\)

\(k_i\)
的相似度有关,相似度越高,注意力权重越高。
它是如何计算的呢?方法有很多,在本例中,我们使用
高斯核
计算:

\[GS(q, k_i)=e^{-\frac{1}{2}(q-k_i)^2}
\]

我们取
\((-\frac{1}{2}(q-k_i)^2)\)
部分进行下一步计算,并把它叫做『
注意力分数
』。显然,现在这个注意力分数是个绝对值很大的数,没法作为权重使用。所以下面我们要对其进行
归一化
,把注意力分数转换为 [0, 1] 间的
注意力权重
(用
\(\alpha (q, k_i)\)
表示)。本例选用
Softmax
进行归一化:

\[\alpha (q, k_i) = \text{Softmax}(-\frac{1}{2}(q-k_i)^2) = \frac{e^{-\frac{1}{2}(q-k_i)^2}}{\Sigma _{i=1}^{3}e^{-\frac{1}{2}(q-k_i)^2}}
\]

我们发现,好巧不巧地,
\(\alpha (q, k_i)\)
最终又变成高斯核的表达式。

本例中的高斯核计算的相似度为:$$GS(62, 68)= 1.52×10^{-8}$$ $$GS(62, 60)= 0.135$$ $$GS(62, 64)= 0.135$$
\(GS(q, k_1)\)
太小了,我们直接近似为 0 .
注意力权重计算结果为:$$\alpha (62, 68) = 0$$ $$\alpha (62, 60) = 0.5$$ $$\alpha (62, 64) = 0.5$$
体重估计值为:$$f(q) = \alpha (62, 68) \times 126 + \alpha (62, 60) \times 110 + \alpha (62, 64) \times 115 = 112.5$$


三、多维情况


\(q\)
,
\(k\)
,
\(v\)
为多维时


注意力分数
\(\alpha (q, k_i)\)
可以用以下方法计算:

模型 公式
加性模型 \(\alpha(q, k_i) = \text{softmax}(W_q q + W_k k_i + b)\)
点积模型 \(\alpha(q, k_i) = \frac{q \cdot k_i}{\sqrt{d}}\)
缩放点积模型 \(\alpha(q, k_i) = \frac{q \cdot k_i}{\sqrt{d_k}}\)

我们以『
点积模型
』为例

\[q_1=[64, 85]
\]

\[k_1^T= \begin{bmatrix} 68 \\ 91 \end{bmatrix}
\]

则有

\[\alpha(q_1, k_1) = \text{Softmax}(q_1 k_1^T) = 64 \times 68 + 85 \times 91 = 4352 + 7735 = 12087
\]

其他注意力分数同理。
那么现在,多维的
\(f(q)\)
公式可以表示为:

\[f(q)=\Sigma _{i=1}^{3}\alpha (q_i, k_i^T)\cdot v_i = \text{Softmax}(q_i k_i^T)\cdot v_i
\]

为了方便计算,我们写成矩阵形式。

\[Q = \begin{bmatrix}
64 & 85 \\
61 & 80 \\
\end{bmatrix}
\]

\[K^T = \begin{bmatrix}
68 & 60 & 64 \\
91 & 87 & 88 \\
\end{bmatrix}
\]

\[V = \begin{bmatrix}
126 & 180 \\
110 & 172 \\
115 & 170 \\
\end{bmatrix}
\]

\[f(Q)=\text{Softmax}(QK^T)V
\]

为了缓解梯度消失的问题,我们还会除以一个特征维度 $ \sqrt{d_k} $ ,即:

\[f(Q)=\text{Softmax}(QK^T/\sqrt{d_k})V
\]

这一系列操作,被称为『
缩放点积注意力模型
』(scaled dot-product attention)

如果
\(Q\)
,
\(K\)
,
\(V\)
是同一个矩阵,会发生什么?


四、自注意力机制

我们用
\(X\)
表示这三个相同的矩阵:

\[X=Q=K=V=\begin{bmatrix}
67 & 91 \\
60 & 87 \\
64 & 84 \\
\end{bmatrix}\]

则上述的注意力机制表达式可以写成:

\[f(X)=\text{Softmax}(XX^T/\sqrt{d_k})X
\]

这个公式描述了『
自注意力机制
』(Self-Attention Mechanism)。在实际应用中,可能会对
\(X\)
做不同的线性变换再输入,比如 Transformer 模型。这可能是因为
\(X\)
转换空间后,能更加专注注意力的学习。
三个可学习的权重矩阵
\(W_Q\)
,
\(W_K\)
,
\(W_V\)
可以将输入
\(X\)
投影到查询、键和值的空间。

\[f(X)=\text{Softmax}(XW_Q(XW_K)^T/\sqrt{d_k})XW_V
\]

该公式执行以下步骤:

  1. 使用权重矩阵
    \(W_Q\)

    \(W_K\)
    将输入序列
    \(X\)
    投影到查询空间和键空间,得到
    \(XW_Q\)

    \(XW_K\)
  2. 计算自注意力分数:
    \((XW_Q)(XW_K)^T\)
    ,并除以
    \(\sqrt{d_k}\)
    进行缩放。
  3. 对自注意力分数进行 Softmax 操作,得到注意力权重。
  4. 使用权重矩阵
    \(W_V\)
    将输入序列
    \(X\)
    投影到值空间,得到
    \(XW_V\)
  5. 将 Softmax 的结果乘以
    \(XW_V\)
    ,得到最终的输出。

这个带有权重矩阵的自注意力机制允许模型学习不同位置的查询、键和值的映射关系,从而更灵活地捕捉序列中的信息。在Transformer等模型中,这样的自注意力机制广泛用于提高序列建模的效果。


相关概念推荐阅读:
高斯核是什么?

Softmax 函数是什么?
推荐B站视频:注意力机制的本质(BV1dt4y1J7ov),65 注意力分数【动手学深度学习v2】(BV1Tb4y167rb)

webrtc终极版(二)搭建自己的iceserver服务,并用到RTCMultiConnection的demo中



前言

第一篇文章,写了如何再5分钟之内搭建一个webrtc服务,并运行起来,这当然不够,turn-server以及stun-server都掌握在别人手中,实际上,连socket服务,也在别人手中,正常情况下,RTCMultiConnection官方不会停止服务的,但是咱们也不敢保证不是,所以本篇文章,我们就用最详细的方式,来手把手教会大家在centos上搭建自己的stun-server以及turn-server。

看到这里,可能有人要懵了,怎么一会儿是iceserver,一会儿是stunserver,一会儿又是turnserver,下面我们先讲解下三者的定义以及作用,然后再直接进行实操,实际上,你可以这样理stunserver+turnserver构成iceserver。

整个webrtc系列,其实搭建iceserver是最为麻烦的,为什么这么说呢,其实如何搭建,网上一搜一大把,你根据文章去实操,也没有问题,但是实际应用起来,就是不能用,我就是这样的,找了很多问题都找不到,折腾了两天才算搞定,这篇文章除了提醒大家再搭建iceserver的最重要的注意事项外,还有个目的是为了防止我自己时间长了也别忘了,下面我们就开始步入正题


一、stunserver,turnserver,iceserver是什么?

ICE:
(Interactive Connectivity Establishment)是一种框架和协议,用于在网络中建立可靠的实时通信连接。ICE并不是单独的服务器,而是一种方法,它可以利用STUN(Session Traversal Utilities for NAT)和TURN(Traversal Using Relays around NAT)服务器来实现其目标。实际上ice是一个综合解决方案,它整合了STUN和TURN的功能。ICE通过在不同网络条件下选择最佳的连接路径,以确保最可靠和高效的通信。

STUN服务器:
用于解决NAT(网络地址转换)引起的连接问题。STUN协议允许客户端发现其在NAT后的公共地址和端口,从而直接通信成为可能。

TURN服务器:
用于在两个端点无法直接通信时提供中继服务。TURN服务器充当一个中继,将数据传输给两个端点,以解决NAT等问题。

其实很好理解,比方说我们两台电脑需要进行p2p通信,如果两台电脑再同一局域网,那直接就通信了,没有问题。
但是如果两台电脑再不同局域网,那么该怎么通信呢?我怎么能访问到另外一台电脑呢?它又不是服务器,我也不知道它的公网ip!
那么没有问题,STUN服务器来帮忙,他做的事情就是这个,它作用就是找到两台电脑【当然也可以是手机】的公网ip,然后两台电脑就可以通过公网ip地址,相互访问了。
问题又来了,它如果找不到公网ip怎么办? 这就是TURN服务器要干的事情了,没办法,那就只能让TURN服务器中转一下,注意,如果走到这一步,是要消耗服务器流量的。

二、具体搭建步骤

搭建stun-server和搭建turn-server,上来我们认为要搭建两个,实际上,可以用coturn解决问题, Coturn不仅仅是一个TURN服务器,还包括了STUN的功能,允许客户端发现其在NAT后的公共IP地址和端口,简而言之,只需要搭建一个就可以。

1.下载安装coturn

脚本如下:
①更新系统: yum update
②安装Coturn: yum install coturn
③启动Coturn服务: systemctl start coturn
④确保Coturn开机启动: sudo systemctl enable coturn
⑤如果有防火墙,确保打开相应的端口:
firewall-cmd --zone=public --add-port=3478/udp --permanent
firewall-cmd --reload
注意:如果用的是云服务器,还要到后台去开启tcp以及udp的3478端口,如阿里云的是在安全组那里设置【这个很重要,我就是因为没有放开udp的3478端口,导致白白耽误了很长时间】
以上所有脚本,如果有选择的,就输入y,然后enter,没有选择的,直接enter

2、处理证书问题

coturn安装成功后,打开配置文件,我的是默认安装到了/etc/coturn下,配置打开如下【
无用部分已排除
】:

#listening-device=eth0
#listening-port=3478
#tls-listening-port=5349
#alt-listening-port=0
#alt-tls-listening-port=0
#listening-ip=172.17.19.101
#listening-ip=10.207.21.238
#listening-ip=2607:f0d0:1002:51::4
#relay-device=eth1
#relay-ip=172.17.19.105
#relay-ip=2607:f0d0:1002:51::5
#external-ip=60.70.80.91
#external-ip=60.70.80.91/172.17.19.101
#external-ip=60.70.80.92/172.17.19.102
#relay-threads=0
# Lower and upper bounds of the UDP relay endpoints:
# (default values are 49152 and 65535)
#min-port=49152
#max-port=65535
#user=username1:password1
#cert=/etc/pki/coturn/public/turn_server_cert.pem
#pkey=/etc/pki/coturn/private/turn_server_pkey.pem
#realm=mycompany.org

安装coturn后,cert和pkey是注释状态,但是实际上,这个是不能注释的,否则运行时,会报错,报证书找不到。

①安装依赖: yum install -y make gcc gcc-c++ wget openssl-devel libevent libevent-devel openssl git
②生成证书:openssl req -x509 -newkey rsa:2048 -keyout /etc/turn_server_pkey.pem -out /etc/turn_server_cert.pem -days 99999 -nodes
以上脚本,都按enter进行下一步

上面两个脚本执行后,/etc文件夹下,会有两个文件,turn_server_cert.pem以及turn_server_pkey.pem,将cert以及pkey配置为上面两个文件的地址。
替换证书地址

3、处理各个ip以及端口的配置

listening-ip配置为0.0.0.0即可,这样会监听所有的ip请求
relay-ip配置为服务器的外网ip地址
external-ip 配置为服务器的外网ip地址
listening-port=3478 保持原有配置即可

4、配置用户名密码以及域标识

创建用户名密码以及域标识的格式如下:
turnadmin -a -u [用户名] -p [密码] -r [域标识]

我这里执行:
turnadmin -a -u wjc -p 123456 -r turn.zilv.cn
相当于创建了一个wjc的用户名,密码为:123456,域标识为turn.zilv.cn

以上创建好后,再次更新turnserver.conf文件如下:
user=wjc:123456
realm=turn.zilv.cn

三、测试

可以采用谷歌提供的在线测试工具进行:
https://webrtc.github.io/samples/src/content/peerconnection/trickle-ice/
iceserver测试页面

done表示成功

四、将配置放到第一篇文章的demo中

原来的配置都不用变,将自己的iceserver配置进去就可以了,如下图:

增加自己的服务器配置

五、总结

iceserver的确是不好配置,虽然网上的文档,包括chatgpt,都有相应的步骤,但是几乎没有完整的,我自己也走了很多弯路,至少花了两天时间,才彻底搞定,为了大家少走弯路,所以今天再好好总结下,相信大家按照我的步骤一定能够完成自己的iceserver搭建

最后还是提示下容易出问题的点,大家可以对号入座,作为重点排查对象:

1、ip设置的正确吗
2、用户名密码有没有用账号密码创建?我就是没有创建,直接想当然的的配置了,主要,要用脚本创建了用户名密码才可以
3、udp以及tcp端口放开了吗?我当时是udp再阿里云后台没有放开,所以一直不通,找了好久才发现这个隐蔽的问题
4、证书配置了吗?也是需要用脚本配置的哦

同时也欢迎您关注爱自律官方微信公众号,同时体验爱自律小程序的使用,让我们一起爱上自律,拥抱自由吧。

引言

经过三个月的开发,项目通过了所有测试并上线,然而,我们发现项目的首页几乎无法打开,后台一直发生超时错误,导致CPU过度负荷。在这次项目开发过程中,我制定了一份详细的技术优化方案。考虑到客户无法提供机器硬件配置,我们只能从软件方面寻找解决方案,以满足客户的预期。同时,我还准备了一个简单的项目复盘,如果你对此感兴趣,也可以一起查看。

image

初期优化

在进行第一次优化时,我们发现SQL的基本书写存在问题。通过使用pinpoint工具,我们成功抓取了所有的SQL语句。然后,我们请一位对业务非常熟悉的人对所有的SQL进行了审查,主要是优化SQL书写中的基本错误。由于开发人员的疏忽,导致了数据库的全表查询,但是由于测试数据库的数据量不足,测试环境并没有发现潜在的基础SQL问题。经过第一轮SQL优化,现在所有的SQL语句已经得到了正确的修正。

经过压力测试后,我们发现数据库的CPU已经超负荷运行。为了进一步优化性能,我们决定进行SQL的详细优化。在查询表关联的过程中,我们发现有很多字段实际上已经在业务表中冗余存在,因此无需再去关联另外一张表。通过减少表的关联操作,我们可以有效提高SQL的执行效率。

中期优化

由于测试数据量不足且业务关联性较差,我们需要申请将正式数据库中涉及的表数据迁移至测试数据库,以模拟正式环境进行压力测试。

经过初步讨论,我们认为业务表的数据量很大可能导致了SQL查询的缓慢和CPU性能的占用。为了优化这个问题,我们决定根据所有报表涉及的表及其字段,扫描业务增量数据并抽离出一个小表用于报表业务查询。

在进行数据迁移的过程中,我们要注意的是,除非数据量很小的表,不要直接将其抽离到小表中。相反,我们需要引入一个中间表来提前缓冲数据。如果直接去除中间表,可能会导致对业务表的锁定,从而降低业务操作的性能。

注意点
:在上线之前,务必确保准备好初始化表数据和相应的表索引。丢失一个都会导致生产数据库的CPU跑满。

image

后期优化

由于项目需要进行时间范围查询,当选择以月为单位时,首页在压测并发下未能达到预期要求。因此,我们进行了方案升级,引入了结果表的概念。具体而言,我们按照月份的最小统计范围提前生成结果统计表。也有人提出在此阶段涉及到的业务接口中加入一步更新结果表中的某一个指标字段的操作。然而,这样做会导致业务之间的耦合性非常严重,可能随时影响正常业务的操作。

在中期之前,我们已经想到了使用中间表->小表的方式来进行数据处理。因此,我们能够准确地了解当前数据的变化情况,并根据变动的字段值进行相应的操作,对相关的指标字段进行增加或减少的操作。

为了解决由于多个小表同时更新统计表字段而导致的行锁问题,我们采取了进一步的优化措施,将原统计表拆分为两张表,但仍保持主键一致。这样做的好处是减少了不同小表同时更新同一行数据的情况,但需要注意的是指标字段是有所不同的。

虽然此时也满足了压测要求,但是仍然需要考虑到此项目报表的风险会影响到业务数据库的正常操作,所以将其切换到另外一个小型数据库来专门提供其查询。就算出现问题也不会影响到业务端的正常操作。

注意点
:在进行当月上线之前,需要提前对统计表的各个指标进行初始化。这样可以确保数据的准确性和完整性。

上线

经过上一期的系统优化,我们成功满足了预期的压力测试结果,所有按钮和报表功能均可以正常点击和查看。值得一提的是,数据库的CPU性能一直保持在10%以下,显示了良好的稳定性。

image

说明
:有毛刺现象是因为这个小型数据库在进行同步大表数据导致的。其实,关于结果集这种方法,在前期已经有过提出,但是我当时选择了放弃,因为存在许多不确定性因素,可能导致指标值和实际值有一定的差异。然而,目前来看,至少可以满足压测和使用的要求。

项目复盘

以上只是我在技术方面的参与和优化过程的简要描述。接下来,我将作为一个非程序员身份来对整个项目进行一个简单复盘,从一开始的项目参与到需求统计指标的制定,我一直都在其中参与,并且对这个模块非常熟悉。当第三方提出无法实现或者实现上存在困难的问题时,我通常会举例说明,因为第三方并不会像程序员一样从技术角度考虑问题。在我看来,到目前为止,一切都没有问题,因为我们已经仔细审查了每个指标,确定了哪些问题需要第三方进一步研究。

下一步就有问题了,整个项目的需求我是最了解的,但是由于我并不参与项目的开发,导致项目经理对于某一个小点根本不了解。因此,其他人经常来找我咨询,并且直到项目接近尾声时,当意识到无法满足客户的要求,才急忙找我参与开发。这是其中一个存在的问题。

让我来详细说明一下我的问题。在整个开发过程中,我的问题也逐渐显露出来,那就是为什么只有我一个人了解这个问题,而其他人却一无所知呢?原因是因为我一直在负责这个模块的开发,但是我并没有将相关的细节文档化,导致所有的细节都只存在于我的脑海中。因此,其他人只能不断地向我请教,才有可能了解这些细节。然而,这种依赖性是需要避免的,尽管我自己也很不喜欢写文档。

在项目中期时,我听到最多的抱怨之一是关于指标统计的问题。有些指标统计了多一个数,而其他指标则统计了少一个数。尽管开发工作基本上已经完成,但在核对各个数字时却出现了问题。然而,直到后期才发现这些问题其实是由于SQL编写不当所导致的。在对数据库进行操作时,我们可能没有考虑到一些边界情况或者特殊情况,导致了数据统计的不准确性。另外,还有可能是测试数据库中的数据基本都是脏数据,也就是不符合我们预期的数据,这也对统计结果造成了一定的影响。

image

在项目的后期阶段,由于排期已经到达客户可接受的最低底线,大家都非常着急地制定优化方案。这时候我才被真正拉过来参与,整个过程耗时将近一个月,没有休息的时间。出方案的人也开始费尽心思地想办法,但前提是他们对业务一无所知。他们也没有花时间静下心来好好思考,只管出方案,而我们这边需要再评估一下。我们花了整整一天(24小时)的时间来进行评估,具体方案和技术优化的细节我就不详细介绍了,总之最终的方案就是我之前提到的各种技术优化。

其实,我在这个过程中发现了一个规律,前期一直在进行指标分析,而后期的重点则是确保系统能够满足压力测试的要求,即客户的预期。如果下次你有类似的需求,一定要确保在系统正常运行的基础上,再去优化指标的准确性,就像这次项目,当初上线时连页面都无法打开,数据库的CPU直接达到了极限,根本没有时间去核对报表指标了。简而言之,我们需要先满足最低要求,然后逐步进行优化。

LiteFlow.png

LiteFlow简介

LiteFlow是什么?

LiteFlow
是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑。通过支持热加载规则配置,开发者能够即时调整流程步骤,将复杂的业务如价格计算、下单流程等拆分为独立且可复用的组件,从而实现系统的高度灵活性与扩展性,避免了牵一发而动全身的问题。旨在优化开发流程,减少冗余工作,让团队能够更聚焦于核心业务逻辑,而将流程控制层面的重任托付给该框架进行自动化处理。

LiteFlow
整合了流程编排与规则引擎的核心特性,提供
XML

JSON

YAML
格式的灵活流程定义,以及本地文件系统、数据库、ZooKeeper、Nacos、Apollo、Redis等多种规则文件存储方案。其内建插件如liteflow-rule-nacos,以及开放的扩展机制,赋予开发人员自定义规则解析器的能力,满足多样化场景下的规则管理需求。

对于基于角色任务流转的场景,
LiteFlow
并非最佳选择,推荐使用
Flowable

Activiti
等专门的工作流引擎。

LiteFlow的架构

LiteFlow
是从获取上下文开始的,这个上下文通常包含了执行流程所需的数据和环境信息。通过解析这些上下文数据,
LiteFlow
能够理解并执行对应的规则文件,驱动业务流程的执行。在
LiteFlow
中,业务流程被组织成一系列的链路(或节点),每个链路代表一个业务步骤或决策点。这些链路上的节点,也就是业务组件,是独立的,可以支持多种脚本语言,如
Groovy

JavaScript

Python

Lua
等,以便根据具体业务需求进行定制。下图为
LiteFlow
的整体架构图。

LiteFlow执行流程.png

LiteFlow的作用

  • LiteFlow
    将瀑布式代码进行组件化、灵活的编排体系,组件可独立调整替换,规则引擎语法简单易学。
    利用
    LiteFlow
    可以把传统的瀑布式代码重构为以组件为中心的概念体系,从而获得灵活的编排能力。在这种结构里,各个组件彼此分离,允许轻松调整和替换。组件本身可通过脚本定制,而且组件间的过渡完全受规则引导。此外,
    LiteFlow
    具备简单易懂的
    DSL
    规则引擎语法,能快速入门掌握。
    image.png

  • LiteFlow
    强大的编排能力
    LiteFlow
    的编排语法强大到可以编排出任何你想要的逻辑流程。如下图复杂的语法,如果使用瀑布式的代码去写,那种开发以及维护难度可想而知,但是使用
    LiteFlow
    你可以轻松完成逻辑流程的编排,易于维护。
    image.png

  • LiteFlow
    支持组件热部署
    通过
    LiteFlow
    ,你可以实现组件的实时热替换,同时也能在已有的逻辑流程中随时插入新的组件,以此动态调整你的业务逻辑。
    image.png

LiteFlow的环境支持

  • JDK
    LiteFlow
    要求的最低的JDK版本为8,支持
    JDK8~JDK17
    所有的版本。当然如果使用
    JDK11
    以上,确保
    LiteFlow
    的版本为
    v2.10.6
    及其以上版本。

如果你使用JDK11及其以上的版本,请确保jvm参数加上以下参数:--add-opens java.base/sun.reflect.annotation=ALL-UNNAMED

  • Spring
    LiteFlow
    要求的Spring的最低版本为
    Spring 5.0
    。支持的范围是
    Spring 5.X ~ Spring 6.X

  • SpringBoot
    LiteFlow
    要求的Springboot的最低的版本是2.0。支持的范围是
    Springboot 2.X ~ Springboot 3.X

LiteFlow的性能

LiteFlow
框架在启动时完成大部分工作,包括解析规则、注册组件和组装元信息,执行链路时对系统资源消耗极低。在设计之初就注重性能表现,对核心代码进行了优化。

实际测试中,
LiteFlow
表现出色,50多个业务组件组成的链路在压测中单点达到1500 TPS,成功应对双11、明星顶流带货等大规模流量挑战。

尽管
LiteFlow
框架自身性能卓越,但实际执行效率取决于业务组件的性能。若组件包含大量循环数据库查询、不良
SQL
或大量
RPC
同步调用,整体
TPS
也会较低。但这归咎于业务组件的性能问题,而非
LiteFlow
框架本身的性能问题。整体系统吞吐量的高低不只依赖于某个框架,而是需要整体优化业务代码才能提升。

数据来源于LiteFlow官方文档说明。

LiteFlow使用

以下我们结合SpringBoot环境使用。

LiteFlow
在使用上可以按照引入依赖,
LiteFlow
相关配置,规则文件,定义组件,节点编排,执行流程进行。

引入依赖

<dependency>  
	<groupId>com.yomahub</groupId>  
	<artifactId>liteflow-spring-boot-starter</artifactId>  
	<version>2.11.1</version>  
</dependency>

目前liteflow的稳定版本已经更新到2.11.4.2。本文依托于2.11.1做讲解演示。好多新的功能均在2.9.0以后的版本中才有。

配置项

LiteFlow
有诸多配置项,大多数配置项有默认值,可以不必配置,同时官方也建议某个配置项不了解它有什么用时,就不要去随意的改它的值。

liteflow:
  #规则文件路径
  rule-source: config/flow.el.xml
  #-----------------以下非必须-----------------
  #liteflow是否开启,默认为true
  enable: true
  #liteflow的banner打印是否开启,默认为true
  print-banner: true
  #zkNode的节点,只有使用zk作为配置源的时候才起作用,默认为/lite-flow/flow
  zk-node: /lite-flow/flow
  #上下文的最大数量槽,默认值为1024
  slot-size: 1024
  #FlowExecutor的execute2Future的线程数,默认为64
  main-executor-works: 64
  #FlowExecutor的execute2Future的自定义线程池Builder,LiteFlow提供了默认的Builder
  main-executor-class: com.yomahub.liteflow.thread.LiteFlowDefaultMainExecutorBuilder
  #自定义请求ID的生成类,LiteFlow提供了默认的生成类
  request-id-generator-class: com.yomahub.liteflow.flow.id.DefaultRequestIdGenerator
  #并行节点的线程池Builder,LiteFlow提供了默认的Builder
  thread-executor-class: com.yomahub.liteflow.thread.LiteFlowDefaultWhenExecutorBuilder
  #异步线程最长的等待时间(只用于when),默认值为15000
  when-max-wait-time: 15000
  #异步线程最长的等待时间(只用于when),默认值为MILLISECONDS,毫秒
  when-max-wait-time-unit: MILLISECONDS
  #when节点全局异步线程池最大线程数,默认为16
  when-max-workers: 16
  #并行循环子项线程池最大线程数,默认为16
  parallelLoop-max-workers: 16
  #并行循环子项线程池等待队列数,默认为512
  parallelLoop-queue-limit: 512
  #并行循环子项的线程池Builder,LiteFlow提供了默认的Builder
  parallelLoop-executor-class: com.yomahub.liteflow.thread.LiteFlowDefaultParallelLoopExecutorBuilder
  #when节点全局异步线程池等待队列数,默认为512
  when-queue-limit: 512
  #是否在启动的时候就解析规则,默认为true
  parse-on-start: true
  #全局重试次数,默认为0
  retry-count: 0
  #是否支持不同类型的加载方式混用,默认为false
  support-multiple-type: false
  #全局默认节点执行器
  node-executor-class: com.yomahub.liteflow.flow.executor.DefaultNodeExecutor
  #是否打印执行中过程中的日志,默认为true
  print-execution-log: true
  #是否开启本地文件监听,默认为false
  enable-monitor-file: false
  #是否开启快速解析模式,默认为false
  fast-load: false
  #简易监控配置选项
  monitor:
    #监控是否开启,默认不开启
    enable-log: false
    #监控队列存储大小,默认值为200
    queue-limit: 200
    #监控一开始延迟多少执行,默认值为300000毫秒,也就是5分钟
    delay: 300000
    #监控日志打印每过多少时间执行一次,默认值为300000毫秒,也就是5分钟
    period: 300000

只要使用规则,则必须配置
rule-source
配置,但是如果你是用代码动态构建规则,则
rule-source
自动失效。

规则文件

从上面
LiteFlow
的整体架构图中可以看出
LiteFlow
支持多种规则文件源配置:本地文件,数据库,zk,Nacos,Apollo,Etcd,Redis以及自定义配置源。本文将会以本地规则文件为例讲解,其余配置源将在后续文章中讲解实时修改流程中在进行分享,

LiteFlow
支持3种规则文件格式:
XML
,
JSON
,
YML
,3种文件的配置相差无几。
LiteFlow
的组成很轻量,主要由
Node
以及
Chain
元素构成。值得一提的是:如果在非Spring环境下,
Node
节点是必须的,配置配置,否则会导致报错找不到节点。当然在Spring环境下,我们可以不必配置
Node
节点,只需要将相应的节点注册到Spring上下文即可。

<?xml version="1.0" encoding="UTF-8"?>  
<flow>  
	<chain name="chain1">  
		THEN(a, b, c);  
	</chain>  
	  
	<chain name="scChain">  
		SWITCH(s1).to(s2, THEN(a,b).id("d"));  
	</chain>
</flow>	

组件

在介绍具体的组件之前,我们先来了解下
@LiteflowComponent
注解。

@Target({ ElementType.TYPE })  
@Retention(RetentionPolicy.RUNTIME)  
@Documented  
@Inherited  
@Component  
public @interface LiteflowComponent {  
	@AliasFor(annotation = Component.class, attribute = "value")  
	String value() default "";  
	@AliasFor(annotation = Component.class, attribute = "value")  
	String id() default ""; 
	/**
	* 可以给节点起别名
	**/
	String name() default "";  
  
}

@LiteflowComponent
继承自
@Component
注解,在Spring环境中,可以将组件注入到容器中。它的value或者id即对应规则文件中的node的id。例如上述规则文件中的a,b,c等。

普通组件:NodeComponent

普通组件节点需要继承
NodeComponent
,需要实现
process
方法。可用于
THEN

WHEN
编排中。

@LiteflowComponent("a")  
public class AComponent extends NodeComponent {  
  
	@Override  
	public void process() throws Exception {  
		System.out.println("执行A规则");  
	}  
}

当然
NodeComponent
中还有一些其他方法可以重写,以达到自己的业务需求。例如:

  • isAccess():表示是否进入该节点,可以用于业务参数的预先判断。
  • isContinueOnError():表示出错是否继续往下执行下一个组件,默认为false
  • isEnd():是否结束整个流程(不往下继续执行)。
    如果返回true,则表示在这个组件执行完之后立马终止整个流程。此时由于是用户主动结束的流程,属于正常结束,所以流程结果中(
    LiteflowResponse
    )的isSuccess是true。
  • beforeProcess()和afterProcess():流程的前置和后置处理器,其中前置处理器,在
    isAccess
    之后执行。
  • onSuccess()和onError():流程的成功失败事件回调
  • rollback():流程失败后的回滚方法。

在任意组件节点的内部,还可以使用
this
关键字调用对应的方法:

  • 获取流程初始入参参数
    我们在组件节点内部可以通过
    this.getRequestData()
    去获取流程初始的入参。例如:
@LiteflowComponent("a")  
public class AComponent extends NodeComponent {  
  
	@Override  
	public void process() throws Exception {  
	DataRequest dataRequest = this.getRequestData();  
		System.out.println("执行A规则");  
	}  
}
  • 获取上下文
    在组件节点里,随时可以通过方法
    this.getContextBean(clazz)
    获取当前你自己定义的上下文,从而可以获取到上下文的数据。例如:
@LiteflowComponent("a")  
public class AComponent extends NodeComponent {  
  
	@Override  
	public void process() throws Exception {  
		ConditionContext context = this.getContextBean(ConditionContext.class);  
		System.out.println("执行A规则");  
	}  
}
  • setIsEnd
    是否立即结束整个流程 ,用法为
    this.setIsEnd(true)

    还有一些其他的方法,可以参考源码。
选择组件:NodeSwitchComponent

实际业务中,我们针对不同的业务类型,有不同的业务处理逻辑,例如上一篇文章中的订单类型一样,此时就需要节点动态的判断去执行哪些节点或者链路,所以就出现了选择组件。
选择组件需要实现
NodeSwitchComponent
,并且需要实现
processSwitch()
方法。用于
SWITCH
编排中。

processSwitch()
方法返回值是一个String,即下一步流程执行的节点ID或者链路tag。

@LiteflowComponent("s)
public class SwitchComponent extends NodeSwitchComponent {  
  
	@Override  
	public String processSwitch() throws Exception {  
		System.out.println("执行switch规则");  
		return "a";  
	}  
}

规则文件中,配置的
SWITCH
编排信息为:

<chain name="scChain">  
	SWITCH(s).to(a, b, c);  
</chain>

此时s节点就会返回要执行的节点id为a,即要执行a流程。通常switch的节点的逻辑我们需要具体结合业务类型,例如订单类型枚举去使用。

除了可以返回id以外,我们还可以返回tag(标签)。例如我们在规则文件中这么写:
在规则表达式中我们可以这样使用:

<chain name="scChain">  
	SWITCH(s).to(a.tag("td"), b.tag("td"), c.tag("td));  
</chain>

然后在
SWITCH
中返回tag:

@LiteflowComponent("s)
public class SwitchComponent extends NodeSwitchComponent {  
  
	@Override  
	public String processSwitch() throws Exception {  
		System.out.println("执行switch规则");  
		return ":td"      // 进入 b 节点,含义:选择第一个标签为td的节点
		return "tag:td"   // 进入 b 节点,含义:选择第一个标签为td的节点
		return "a";       // 进入 b 节点,含义:选择targetId是b的节点
		return "b:";      // 进入 b 节点,含义:选择第一个targetId是b的节点
		return "b:td";    // 进入 b 节点,含义:选择targetId是b且标签是td的节点
		return ":";       // 进入 b 节点,含义:选择第一个节点
		return "d";       // 进入 d 节点,含义:选择targetId是d的节点
		return "d:";      // 进入 d 节点,含义:选择第一个targetId是d的节点
		return "d:td";    // 进入 d 节点,含义:选择targetId是d且标签是td的节点
		return "b:x";     // 报错,原因:没有targetId是b且标签是x的节点
		return "x";       // 报错,原因:没有targetId是x的节点
		return "::";      // 报错,原因:没有找到标签是":"的节点 
	}  
}

NodeSwitchComponent
继承至
NodeComponent
,其节点的内部可以覆盖的方法和this关键字
NodeComponent

条件组件:NodeForComponent

条件组件,也是IF组件,返回值是一个
boolean
。需要继承
NodeForComponent
,实现
processIf()
方法。可用于
IF...ELIF...ELSE
编排。例如:

<chain name = "ifChain">  
	IF(x, a, b);  
</chain>

该例中x就是一个条件组件,如果x返回true,则会执行a节点,否则执行b节点。

@LiteflowComponent("x")  
public class IfXComponent extends NodeIfComponent {  
  
	@Override  
	public boolean processIf() throws Exception {  
		System.out.println("执行X节点");  
		return false;  
	}  
}

NodeIfComponent
继承至
NodeComponent
,其节点内部可以覆盖的方法和this关键字
NodeComponent

次数循环组件:NodeForComponent

次数循环组件。返回的是一个int值的循环次数。继承
NodeForComponent
,实现
processFor()
方法, 主要用于
FOR...DO...
表达式。在紧接着DO编排中的节点中,可以通过
this.getLoopIndex()
获取下标信息,可以从对应数组或者集合中通过下表获取对应的元素信息。

<chain name = "forChain">  
	FOR(f).DO(a);  
</chain>
@LiteflowComponent("f")  
public class ForComponent extends NodeForComponent {  
  
	@Override  
	public int processFor() throws Exception {  
		DataContext dataContext = this.getContextBean(DataContext.class);  
		List<String> dataList = dataContext.getDataList();  
		return dataList.size();  
	}  
}

@LiteflowComponent("a")  
public class AComponent extends NodeComponent {  
  
	@Override  
	public void process() throws Exception {  
		Integer loopIndex = this.getLoopIndex();  
		DataContext dataContext = this.getContextBean(DataContext.class);  
		List<String> dataList = dataContext.getDataList();  
		String str = dataList.get(loopIndex);  
		System.out.println("执行A规则:"+str);  
	}  
}

其中f组件相当于定义一个数组或者集合的元素个数,类似

for(int i=0;i<size;i++){ // size = f 
//逻辑处理  = a
}

NodeForComponent
继承至
NodeComponent
,其节点内部可以覆盖的方法和this关键字
NodeComponent

条件循环组件:NodeWhileComponent

条件循环组件,主要用于
WHILE...DO...
表达式。继承
NodeWhileComponent
,需要实现
processWhile()
方法。
processWhile()
方法返回一个boolean类型的值,即while循环跳出的条件,如果为false则循环结束,同次数循环,可以在DO编排中的节点中,可以通过
this.getLoopIndex()
获取下标信息,可以从对应数组或者集合中通过下表获取对应的元素信息。

<chain name = "whileChain">  
	WHILE(w).DO(a);  
</chain>
@LiteflowComponent("w")  
public class WhileComponent extends NodeWhileComponent {  
  
	@Override  
	public boolean processWhile() throws Exception {  
		DataContext dataContext = this.getContextBean(DataContext.class);  
		Integer count = Optional.ofNullable(dataContext.getCount()).orElse(0);  
		List<String> dataList = dataContext.getDataList();  
		return count < dataList.size();  
	}  
}

NodeWhileComponent
继承至
NodeComponent
,其节点内部可以覆盖的方法和this关键字
NodeComponent

迭代循环组件:NodeIteratorComponent

迭代循环组件,相当于Java语言的
Iterator
关键字,功能上相当于
for
循环,主要用于
ITERATOR...DO...
表达式。需要继承
NodeIteratorComponent
,实现
processIterator()
方法。在DO编排的节点中,可以通过
this.getCurrLoopObj()
获取集合中的信息。这个组件在使用liteflow的循环组件时用的比较多,就像日常开发代码,集合遍历大部分都会使用for循环(特殊情况必须使用下标除外)。

<chain name = "iteratorChain">  
	ITERATOR(iterator).DO(a);  
</chain>
@LiteflowComponent("iterator")  
public class MyIteratorComponent extends NodeIteratorComponent {  
  
	@Override  
	public Iterator<?> processIterator() throws Exception {  
		DataContext dataContext = this.getContextBean(DataContext.class); 
		return Optional.ofNullable(dataContext.getDataList())
			.orElse(Lists.newArrayList()).iterator();  
	}  
}

@LiteflowComponent("a")  
public class AComponent extends NodeComponent {  
  
	@Override  
	public void process() throws Exception {  
		String str = this.getCurrLoopObj();  
		System.out.println("执行A规则:"+str); 
	}  
}

NodeIteratorComponent
继承至
NodeComponent
,循环组件节点的内部可以覆盖的方法和this关键字
NodeComponent

退出循环组件:NodeBreakComponent

退出循环组件,即
BREAK
组件。返回的是一个布尔值的循环退出标志。 需要继承
NodeBreakComponent
,实现
processBreak
方法。主要用于
FOR...DO...BREAK
,
WHILE...DO...BREAK
,
ITERATOR...DO...BREAK
表达式。即Java的
for

while
循环退出。

<chain name = "iteratorChain">  
	ITERATOR(iterator).DO(a).BREAK(break_flag);  
</chain>
@LiteflowComponent("break_flag")  
public class BreakComponent extends NodeBreakComponent {  
  
	@Override  
	public boolean processBreak() throws Exception {  
		String str = this.getCurrLoopObj();  
		return Objects.equals("c", str);  
	}  
}

同理
NodeBreakComponent
也是继承
NodeComponent
,其节点内部可以覆盖的方法和this关键字
NodeComponent

接下来我们聊一下组件的另外一种定义方式:声明式组件。我比较喜欢用。。。

声明式组件

在上述介绍组件时,都是通过定义一个类继承某一个组件,例如
NodeComponent
或者
NodeIteratorComponent
,这样的定义组件会有一些弊端,比如当你的业务庞大时类也会快速的膨胀增加,即使一个跳出循环或者循环组件都要单独去定义一个类(个人认为循环组件其实不会包含太多的复杂业务逻辑),再比如说Java中类是单继承,这样就会造成这个写组件类无法再去继承一些其他的超类供我们使用。基于此,
LiteFlow
推出依靠注解完成组件的声明,即使一个普通类中的方法不需要继承任何组件类,也可以声明为一个组件,一个类可以定义很多个组件。可以分别对类或者方法进行生命组件。目前声明式组件只能在springboot环境中使用。

类级别声明

类级别式声明主要用处就是通过注解形式让普通的java bean变成LiteFlow的组件。无需通过继承类或者实现接口的方式。但是类级别声明有一个缺点就是他和常规组件一样,需要一个类对应一个组件。使用
@LiteflowCmpDefine
注解,通过
NodeTypeEnum
指定当前类是什么类型的组件。
NodeTypeEnum
值如下:

public enum NodeTypeEnum {  
	COMMON("common", "普通", false, NodeComponent.class),  
	SWITCH("switch", "选择", false, NodeSwitchComponent.class),  
	IF("if", "条件", false, NodeIfComponent.class),  
	FOR("for", "循环次数", false, NodeForComponent.class),  
	WHILE("while", "循环条件", false, NodeWhileComponent.class),  
	BREAK("break", "循环跳出", false, NodeBreakComponent.class),  
	ITERATOR("iterator", "循环迭代", false, NodeIteratorComponent.class),  
	SCRIPT("script", "脚本", true, ScriptCommonComponent.class),  
	SWITCH_SCRIPT("switch_script", "选择脚本", true, ScriptSwitchComponent.class),  
	IF_SCRIPT("if_script", "条件脚本", true, ScriptIfComponent.class),  
	FOR_SCRIPT("for_script", "循环次数脚本", true, ScriptForComponent.class),  
	WHILE_SCRIPT("while_script", "循环条件脚本", true, ScriptWhileComponent.class),  
	BREAK_SCRIPT("break_script", "循环跳出脚本", true, ScriptBreakComponent.class);
}

组件类中的再通过
@LiteflowMethod
注解将方法映射为组件方法。通过
@LiteflowMethod

value
值指定方法类型
LiteFlowMethodEnum
,通过
nodeType
指定节点类型
NodeTypeEnum

LiteFlowMethodEnum
对应各组件中的抽象类方法(
isMainMethod=true
)(或者可覆盖的方法)。

public enum LiteFlowMethodEnum {  
  
	PROCESS("process", true),  
	PROCESS_SWITCH("processSwitch", true),  
	PROCESS_IF("processIf", true),  
	PROCESS_FOR("processFor", true),  
	PROCESS_WHILE("processWhile", true),  
	PROCESS_BREAK("processBreak", true),  
	PROCESS_ITERATOR("processIterator", true),  
	IS_ACCESS("isAccess", false),  
	IS_END("isEnd", false),  
	IS_CONTINUE_ON_ERROR("isContinueOnError", false),  
	GET_NODE_EXECUTOR_CLASS("getNodeExecutorClass", false),  
	ON_SUCCESS("onSuccess", false),  
	ON_ERROR("onError", false),  
	BEFORE_PROCESS("beforeProcess", false),  
	AFTER_PROCESS("afterProcess", false),  
	GET_DISPLAY_NAME("getDisplayName", false), 
	ROLLBACK("rollback", false)  
	;
	
	private String methodName;  
	private boolean isMainMethod;
}

对于方法的要求:
组件内的方法的参数必须传入
NodeComponent
类型的参数,而且必须是第一个参数。这个参数值就替代常规组件中的
this
,从这个参数中可以获取流程入参,上线文等信息。然后方法的返回值必须跟常规组件中的抽象方法的返回值保持一致,否则可能吹出现错误。对于方法名称并无限制。

  • 普通组件:
@Component("d")  
@LiteflowCmpDefine(value = NodeTypeEnum.COMMON)  
public class MyDefineCmp {  
  
	@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeType = NodeTypeEnum.COMMON)  
	public void processA(NodeComponent nodeComponent){  
		System.out.println("processA");  
	}  
	  
	@LiteflowMethod(value = LiteFlowMethodEnum.BEFORE_PROCESS, nodeType = NodeTypeEnum.COMMON)  
	public void beforeA(NodeComponent nodeComponent){  
		DataContext dataContext = nodeComponent.getContextBean(DataContext.class);  
		System.out.println("beforeA");  
	}  
	  
	@LiteflowMethod(value = LiteFlowMethodEnum.AFTER_PROCESS, nodeType = NodeTypeEnum.COMMON)  
	public void afterA(NodeComponent nodeComponent){  
		System.out.println("afterA");  
	}  
}
  • 条件组件
    声明选择组件在类和方法上都需要加上
    NodeTypeEnum.IF
    参数。
@Component("define_if")  
@LiteflowCmpDefine(NodeTypeEnum.IF)  
public class MyDefineIfCpm {  
	@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_IF, nodeType = NodeTypeEnum.IF)  
	public boolean processIf(NodeComponent nodeComponent){ 
		DataContext dataContext = nodeComponent.getContextBean(DataContext.class);   
		System.out.println("执行if");  
		return false;  
	}  
}
  • 选择组件
    声明选择组件在类和方法上都需要加上
    NodeTypeEnum.SWITCH
    参数。
@Component("define_w")  
@LiteflowCmpDefine(NodeTypeEnum.SWITCH)  
public class MyDefineSwitchCpm {  
  
	@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_SWITCH, nodeType = NodeTypeEnum.SWITCH)  
	public String processSwitch1(NodeComponent nodeComponent){ 
		DataContext dataContext = nodeComponent.getContextBean(DataContext.class);  
		System.out.println("执行switch");  
		return "b";  
	}  
}
  • 次数循环组件
    声明选择组件在类和方法上都需要加上
    NodeTypeEnum.FOR
    参数。
@Component("define_for")  
@LiteflowCmpDefine(NodeTypeEnum.FOR)  
public class MyDefineForCmp {  
  
	@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_FOR, nodeType = NodeTypeEnum.FOR)  
	public int processFor(NodeComponent nodeComponent){  
		DataContext dataContext = nodeComponent.getContextBean(DataContext.class);  
		System.out.println("执行for");  
		return 10;  
	}  
}
  • 条件循环组件
    声明选择组件在类和方法上都需要加上
    NodeTypeEnum.WHILE
    参数。
@Component("define_while")  
@LiteflowCmpDefine(NodeTypeEnum.WHILE)  
public class MyDefineWhileCmp {  
  
	@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_WHILE, nodeType = NodeTypeEnum.WHILE)  
	public boolean processWhile(NodeComponent nodeComponent){  
		DataContext dataContext = nodeComponent.getContextBean(DataContext.class);  
		System.out.println("执行while");  
		return true;  
	}  
}
  • 迭代循环组件
    声明选择组件在类和方法上都需要加上
    NodeTypeEnum.ITERATOR
    参数。
@Component("define_iterator")  
@LiteflowCmpDefine(NodeTypeEnum.ITERATOR)  
public class MyDefineIteratorCpm {  
  
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_ITERATOR, nodeType = NodeTypeEnum.ITERATOR)  
public Iterator<String> processSwitch1(NodeComponent nodeComponent){  
		DataContext dataContext = nodeComponent.getContextBean(DataContext.class);  
		System.out.println("执行iterator");  
		return dataContext.getDataList().iterator();  
	}  
}
  • 退出循环组件
    声明选择组件在类和方法上都需要加上
    NodeTypeEnum.BREAK
    参数。
@Component("define_break")  
@LiteflowCmpDefine(NodeTypeEnum.BREAK)  
public class MyDefineWhileCmp {  
  
	@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_BREAK, nodeType = NodeTypeEnum.BREAK)  
	public boolean processBreak(NodeComponent nodeComponent){  
		DataContext dataContext = nodeComponent.getContextBean(DataContext.class);  
		System.out.println("执行break");  
		return true;  
	}  
}
方法级别式声明

因为类级别式声明还是会造成类定义过多的问题,
LiteFlow
又提供了方法级别式声明。方法级别式声明可以让在一个类中通过注解定义多个组件。在类上使用
@LiteflowComponent
进行声明这是一个组件类,然后在方法使用
@LiteflowMethod
声明方法是一个组件节点。如下:

@Slf4j  
@LiteflowComponent  
public class OrderHandlerCmp {  
  
  
	/**  
	* 普通组件 等价于 继承`NodeComponent` 实现process()方法  
	* @param nodeComponent  
	*/  
	@LiteflowMethod(nodeType = NodeTypeEnum.COMMON, value = LiteFlowMethodEnum.PROCESS, nodeId = "common", nodeName = "普通组件")  
	public void processCommon(NodeComponent nodeComponent){  
		// 业务逻辑  
	}  
	  
	/**  
	* IF组件 等价于 继承 `NodeIfComponent` 实现processIf()方法  
	* @param nodeComponent  
	* @return  
	*/  
	@LiteflowMethod(nodeType = NodeTypeEnum.IF, value = LiteFlowMethodEnum.PROCESS_IF, nodeId = "if", nodeName = "IF组件")  
	public boolean processIf(NodeComponent nodeComponent){  
		// 业务逻辑  
		return false;  
	}  
	  
	/**  
	* SWITCH组件 等价于 继承 `NodeSwitchComponent` 实现processSwitch()方法  
	* @param nodeComponent  
	* @return  
	*/  
	@LiteflowMethod(nodeType = NodeTypeEnum.SWITCH, value = LiteFlowMethodEnum.PROCESS_SWITCH, nodeId = "switch", nodeName = "SWITCH组件")  
	public String processSwitch(NodeComponent nodeComponent){  
		// 业务逻辑  
		return "nodeId";  
	}  
	  
	/**  
	* 次数循环组件 等价于 继承 `NodeForComponent` 实现processFor()方法  
	* @param nodeComponent  
	* @return  
	*/  
	@LiteflowMethod(nodeType = NodeTypeEnum.FOR, value = LiteFlowMethodEnum.PROCESS_FOR, nodeId = "for", nodeName = "FOR组件")  
	public int processFor(NodeComponent nodeComponent){  
	// 业务逻辑  
	return 10;  
	}  
	  
	/**  
	* 条件循环组件 等价于 继承 `NodeWhileComponent` 实现processWhile()方法  
	* @param nodeComponent  
	* @return  
	*/  
	@LiteflowMethod(nodeType = NodeTypeEnum.WHILE, value = LiteFlowMethodEnum.PROCESS_WHILE, nodeId = "while", nodeName = "WHILE组件")  
	public boolean processWhile(NodeComponent nodeComponent){  
		// 业务逻辑  
		return false;  
	}  
	  
	/**  
	* 迭代循环组件 等价于 继承 `NodeIteratorComponent` processIterator()方法  
	* @param nodeComponent  
	* @return  
	*/  
	@LiteflowMethod(nodeType = NodeTypeEnum.ITERATOR, value = LiteFlowMethodEnum.PROCESS_ITERATOR, nodeId = "iterator", nodeName = "ITERATOR组件")  
	public Iterator<Object> processIterator(NodeComponent nodeComponent){  
	// 业务逻辑  
	List<Object> list = Lists.newArrayList();  
	return list.iterator();  
	}  
	  
	/**  
	* 跳出循环组件 等价于 继承 `NodeBreakComponent` processBreak()方法  
	* @param nodeComponent  
	* @return  
	*/  
	@LiteflowMethod(nodeType = NodeTypeEnum.BREAK, value = LiteFlowMethodEnum.PROCESS_BREAK, nodeId = "break", nodeName = "BREAK组件")  
	public boolean processBreak(NodeComponent nodeComponent){  
		// 业务逻辑  
		return false;  
	}  
}

对于方法级别声明特性来说,
@LiteflowMethod
注解上的
nodeId
一定要写。
nodeName
的属性。方便对声明式的组件进行命名。定义方法时,返回值要和常规组件里的对应方法返回值一致。例如普通组件的
process
方法是不返回的,比如IF组件的
processIf
方法是返回布尔值的。如果写错误,会造成一些异常。。。。

我个人是比较喜欢用这种方式进行组件的定义,可以按照业务逻辑将代码拆分成一个个的模块,在各自的模块中进行业务逻辑的实现,也会非常清晰。

EL规则

LiteFlow
2.8.x以后版本提供了一款强大的规则表达式。一切复杂的流程在
LiteFlow
表达式的加持下,都变得非常简便明了。配合一些流程图简直是通熟易懂。让整个业务流程在处理上看起来并没有那么黑盒。它可以设置各种编排规则,包括:

  • 串行编排
    串行编排,即组件要顺序执行,使用
    THEN
    关键字,
    THEN
    必须大写
<chain name="thenchain">  
	THEN(a, b, c);  
	THEN(a, THEN(b, c));  
</chain>
  • 并行编排
    并行编排即并行执行若干个个组件,使用用
    WHEN
    关键字,
    WHEN
    必须大写。
<chain name="whenchain">  
	WTHEN(a, b, c);  
</chain>

当然,
WHEN

THEN
还可以结合使用:

<chain name="testChain">  
	THEN(a, WHEN(b, c, d), e);  
</chain>

当a节点执行完成之后,并行执行b,c,d节点,完成之后在执行e节点。
image.png

image.png

我们在看到并行执行的时候,就会联想到多线程处理,那么
LiteFlow
是怎么创建多线程的呢?答案是
LiteFlow
内部默认维护了一个
when
线程池,这个线程池是供给所有
WHEN
流程使用的。当然你可以在
LiteFlow
执行器执行之前给你的流程通过
LiteflowConfig
传入一些线程池参数或者实现
ExecutorBuilder
接口,自定义线程池。比如:

public class LiteFlowThreadPool implements ExecutorBuilder {  
  
	@Override  
	public ExecutorService buildExecutor() {  
		ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("mythread-pool-%s").build();  
		return new ThreadPoolExecutor(  
					// 核心线程数,即2个常开窗口  
					2,  
					// 最大的线程数,银行所有的窗口  
					5,  
					// 空闲时间  
					5,  
					TimeUnit.SECONDS,  
					// 工作队列  
					new LinkedBlockingQueue<>(5),  
					// 线程工厂  
					threadFactory,  
					// 拒绝策略  
					new ThreadPoolExecutor.AbortPolicy()  
		);  
	  
	}  
}

然后我们在
LiteflowConfig
设置并行线程执行器class路径
threadExecutorClass
:

LiteflowConfig liteflowConfig = new LiteflowConfig();  
liteflowConfig.setThreadExecutorClass("LiteFlowThreadPool的类路径");  
flowExecutor.setLiteflowConfig(liteflowConfig);  
LiteflowResponse response = flowExecutor.execute2Resp("testChain", null);

执行结果,可以看见线程池使用的是自定义的:
image.png

LiteFlow从2.11.1开始,提供一个
liteflow.when-thread-pool-isolate
参数,默认为
false
,如果设为
true
,则会开启WHEN的线程池隔离机制,这意味着每一个when都会有单独的线程池。

在多线程执行下,我们还有一个疑问,如果其中某个或者某几个并行分支发生异常执行失败那么后面的节点会不会收到影响?假如我们把C节点抛出一个异常,发现流程直接就结束了,并没有执行最后的E节点:
image.png
对于这种情况,
LiteFlow

WHEN
关键字提供了
ignoreError
(默认为false)来提供忽略错误的特性。我们修改流程如下:

<chain name="testChain">  
	THEN(test_a, WHEN(test_b, test_c, test_d).ignoreError(true), test_e);  
</chain>

再次执行发现流程执行到了E节点:
image.png

LiteFlow
还提供了对
WHEN
并行流程中,使用子关键字
any
(默认为false)可以设置任一条分支先执行完即忽略其他分支,继续执行的特性。

<chain name="testChain">  
	THEN(test_a, WHEN(test_b, test_c, test_d).ignoreError(true).any(true), test_e);  
</chain>

我们将C节点Sleep 10秒,可以发现C节点并没有执行,就执行到了E节点:
image.png

除此之外,
LiteFlow
还支持了并行编排中指定节点的执行则忽略其他,
WHEN
关键字子关键字
must
(不可为空),可用于指定需等待执行的任意节点,可以为 1 个或者多个,若指定的所有节点率先完成,则继续往下执行,忽略同级别的其他任务。我们将流程调节如下:

<chain name="testChain">  
	THEN(test_a, WHEN(test_b, test_c, test_d).ignoreError(true).must(test_c), test_e);  
</chain>

我们还是将C节点Sleep 10秒,发现流程一直等到C节点执行结束才会执行后面的节点:
image.png

must子关键字在LiteFlow从v2.11.1版本之后才有。

  • 选择编排
    在实现业务逻辑过程中,我们常见的就是根据某种标识去进行不同的业务流程,通常我们也可以使用策略模式进行实现。在
    LiteFlow
    中可以通过
    SWITCH..TO()
    选择编排,即
    SWITCH
    中的流程返回后面TO中那个节点就会执行那个节点,我们只需要处理好
    SWITCH
    中条件于TO中分支的关系即可。增加一个
    Switch
    组件:
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_SWITCH, nodeType = NodeTypeEnum.SWITCH, nodeId = "test_w", nodeName = "测试组件W")  
public String processSwitch(NodeComponent nodeComponent){  
	log.info("执行W节点");  
	return "test_a";  
}

然后我们规则编排:

<chain name="testSwitchChain">  
	SWITCH(test_w).TO(test_a, test_b, test_c, test_d, test_e);  
</chain>

执行流程:
image.png

SWITCH
还提供了子关键字
DEFAULT
,如果
SWITCH
返回的节点不是TO中的节点,则就走DEFAULT中指定的节点。

<chain name="testSwitchChain">  
	SWITCH(test_w).TO(test_a, test_b, test_c, test_d, test_e).DEFAULT(test_y);  
</chain>

image.png

由选择组件章节中我们知道,
SWITCH
可以返回ID或者链路Tag,上述例子中返回的
test_a
就是一个节点ID(对应
@LiteflowMethod
中指定的nodeId中的值)。当让在规则中我们也可以给表达式设置一个id。
LiteFlow
中规定,每个表达式都可以有一个id值,你可以设置id值来设置一个表达式的id值。然后在选择组件里返回这个id即可。

<chain name="testSwitchChain">  
	SWITCH(test_w).TO(test_a, THEN(test_b, test_c, test_d).id("test_bcd"), test_e).DEFAULT(test_y);  
</chain>

假如此时
test_w
表达式返回的是
test_bcd
,则流程就会执行
test_b
,
test_c
,
test_d
节点:
image.png

除了给表达式赋值
id
属性之外,还可以给表达式赋值
tag
属性。在
SWITCH
中返回
tag

<chain name="testSwitchChain">  
	SWITCH(test_w).TO(test_a, THEN(test_b, test_c, test_d).tag("test_tag"), test_e).DEFAULT(test_y);  
</chain>

我们
SWITCH
组件中返回tag标签:

@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_SWITCH, nodeType = NodeTypeEnum.SWITCH, nodeId = "test_w", nodeName = "测试组件W")  
public String processSwitch(NodeComponent nodeComponent){  
	log.info("执行W节点");  
	return "tag:test_tag";  
}

image.png

  • 条件编排
    条件编排类似Java中的if...else,它有
    IF

    IF...ELIF

    ELSE
    几种写法。其中IF以及ELIF中的表达式对应IF组件中返回的
    boolean
    结果。对与IF有二元表达式:
<chain name="testIfChain1">  
	IF(test_f, test_a);  
</chain>

IF
后面还可以跟
ELSE
。类似Java中的
else



<chain name="testIfChain">  
	IF(test_f, test_a).ELSE(test_b);  
</chain>

IF还支持三元表达式,上面的二元表达式等价于如下三元表达式写法:

<chain name="testIfChain">  
	IF(test_f, test_a, test_b);  
</chain>

上面两种表达式都可以解读为:如果
test_f
中返回
true
则执行
test_a
节点,否则执行
test_b
节点。

@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_IF, nodeType = NodeTypeEnum.IF, nodeId = "test_f", nodeName = "测试组件F")  
public boolean processF(NodeComponent nodeComponent){  
	log.info("执行F节点");  
	return true;  
}

image.png

我们再看一下
ELIF
的写法,
ELIF
类似Java中的
else if
的写法,它的后面也可以跟
ELSE

<chain name="testIfChain">   
	IF(test_f, test_a).ELIF(test_x, test_b);  
</chain>

我们在订一个
test_x
的IF组件:

@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_IF, nodeType = NodeTypeEnum.IF, nodeId = "test_f", nodeName = "测试组件F")  
public boolean processF(NodeComponent nodeComponent){  
	log.info("执行F节点");  
	return false;  
}  
  
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_IF, nodeType = NodeTypeEnum.IF, nodeId = "test_x", nodeName = "测试组件X")  
public boolean processX(NodeComponent nodeComponent){  
	log.info("执行X节点");  
	return true;  
}

image.png

test_f
节点返回false,所以不会执行
test_a
,继续执行
test_x
节点,返回true,则会执行
test_b
节点。
当然
ELIF
后面也可以使用
ELSE

<chain name="testIfChain">   
	IF(test_f, test_a).ELIF(test_x, test_b).ELSE(test_c);
</chain>

此时如果
test_f
以及
test_x
都返回false,就会走
test_c

image.png

值得注意的是,当我们使用
IF
的二元表达式时才会去跟上
ELIF
以及
ELSE
。如果是三元表达式我们不可以使用
ELIF
以及
ELSE
,会报错。

在2.10.0以前可以使用,但是在
IF
中的最后一个表达式会被
ELIF
或者`ELSE中的表达式
覆盖掉。本人只调研到了2.10.0,在这个版本中还是会报错。

  • 循环编排

循环编排类似Java中的循环,分为次数循环(
FOR...DO()
),条件循环(
WHILE...DO()
)以及迭代循环(
ITERATOR...DO()
),同时还涉及跳出循环编排(
BREAK
)。
我们定义一个固定次数的FOR循环:

<chain name="testForChain">  
	FOR(3).DO(THEN(test_a, test_b));  
</chain>

此时会将
test_a

test_b
循环执行3次。
image.png

当然实际开发中,我们需要搭配
FOR
循环组件使用,即在
FOR
循环中返回需要执行的次数:

<chain name="testForChain">  
	FOR(test_w).DO(THEN(test_a, test_b));  
</chain>

假如
test_w
组件中返回次数是3,则执行效果如上固定次数。

接下来我们看一下
WHILE
条件循环,
WHILE
的表达式需要结合
WHILE
组件使用,返回一个
boolean
类型的值,去控制循环的流程,如果为true则继续循环,否则结束循环。

<chain name="testWhileChain">  
	WHILE(test_h).DO(THEN(test_a, test_b));  
</chain>

接下来我们继续看一下迭代循环
ITERATOR
,类似于Java中的for循环。这里我们要配合
ITERATOR
组件使用,返回一个集合的迭代器。

我们定义一个迭代循环编排:

<chain name = "testIteratorChain">  
	ITERATOR(test_i).DO(test_p);  
</chain>

然后我们在定义一个迭代组件以及一个普通组件用于打印集合中的元素:

@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_ITERATOR, nodeType = NodeTypeEnum.ITERATOR, nodeId = "test_i", nodeName = "测试组件I")  
public Iterator<String> processI(NodeComponent nodeComponent){  
	List<String> list = Lists.newArrayList("A", "B", "C","D");  
	return list.iterator();  
}  
  
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeType = NodeTypeEnum.COMMON, nodeId = "test_p", nodeName = "测试打印组件")  
public void printData(NodeComponent nodeComponent){  
	String str = nodeComponent.getCurrLoopObj();  
	System.out.println(str);  
}

image.png

有循环编排,就相应的要有跳出循环编排,我们可以使用
BREAK
编排,配合
BREAK
组件使用。

<chain name="testForChain">  
	FOR(test_j).DO(THEN(test_a, test_b)).BREAK(test_break);  
</chain>  
  
<chain name="testWhileChain">  
	WHILE(test_h).DO(THEN(test_a, test_b)).BREAK(test_break);  
</chain>  
  
<chain name = "testIteratorChain">  
	ITERATOR(test_i).DO(test_p).BREAK(test_break);  
</chain>

我们以迭代循环跳出为例:

@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_BREAK, nodeType = NodeTypeEnum.BREAK, nodeId = "test_break", nodeName = "测Break组件")  
public boolean processBreak(NodeComponent nodeComponent){  
	String str = nodeComponent.getCurrLoopObj();  
	return Objects.equals("C", str);  
}

image.png

即执行到C元素时返回true,跳出循环。

LiteFlow从v2.9.0开始,提供了循环编排表达式组合。

  • 异常捕获
    EL表达式中还增加了异常捕获表达式,用于捕获节点中的异常。类似Java中的
    try...catch
    。用法为
    CATCH
    ...
    DO
    (
    DO
    关键字非必须),如果在
    CATCH
    的表达式中捕获到了异常,可以在
    DO
    表达式中的节点进行处理,可以使用
    nodeComponent.getSlot().getException()
    获取异常信息。但是有一点我们需要注意,假如我们使用了
    CATCH
    ,如果其中的节点中发生了异常,那么我们在流程执行的结果中也会看到流程执行成功的标识(
    isSuccess
    =true),可以理解,发生的异常被你捕获处理了。
<chain name="testCatchChain">  
	CATCH(  
	THEN(test_a, test_b)  
	).DO(test_catch);  
</chain>

我们在
test_b
节点中手动抛出一个
RuntimeException
,在
test_catch
中使用
nodeComponent.getSlot().getException()
打印捕获到的异常,同时我们在流程执行结果中打印
isSuccess
看流程是否执行成功:

@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeType = NodeTypeEnum.COMMON, nodeId = "test_catch", nodeName = "测试Catch组件")  
public void processCatch(NodeComponent nodeComponent){  
	log.error("执行Catch节点,捕获到了异常\n", nodeComponent.getSlot().getException());  
}

@Test  
public void testCatch(){  
	LiteflowResponse response = flowExecutor.execute2Resp("testCatchChain", null);  
	System.out.println(response.isSuccess() ? "执行成功" : "执行失败");  
	System.out.println("结果中的异常信息:" + response.getCause());  
}

image.png
image.png

可以看到
test_catch
打印了异常信息,同时我们可以看到流程执行结果中返回执行成功,没有异常信息。
image.png

同时
CATCH
配合迭代循环还可以达到Java
forEach
循环的
continue
的效果。

<chain name = "testIteratorCatchChain">  
	ITERATOR(test_i).DO(CATCH(THEN(test_pa, test_pb, test_pc)));  
</chain>

我们在
test_pb
在打印C时抛出异常

@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeType = NodeTypeEnum.COMMON, nodeId = "test_pb", nodeName = "测试打印组件B")  
public void printPB(NodeComponent nodeComponent){  
	String str = nodeComponent.getCurrLoopObj();  
	System.out.println("B组件打印:"+ str);  
	if (Objects.equals("B", str)){  
		throw new RuntimeException("B组件发生异常了。。。。");  
	}  
}

image.png
image.png

此时没有执行C组件打印B,直接跳过了
test_pc
节点。

LiteFlow从2.10.0开始提供CATCH表达式

  • 与或非表达式
    与或非表达式即
    AND

    OR

    NOT
    表达式。可以用于返回
    boolean
    值的组件的编排。可以将若干个这种组件编排在一起返回应该boolean值进行后续流程的判断。
<chain name = "testAndOrNotChain">  
	IF(AND(test_f, test_x), test_a, test_c);  
</chain>

此时只有
test_f

test_x
节点都返回true,就会走
test_a
,否则走
test_c

<chain name = "testAndOrNotChain">  
	IF(OR(test_f, test_x), test_a, test_c);  
</chain>

如果是OR,
test_f

test_x
节点都返回false,就会走
test_c
,否则走
test_c

<chain name = "testAndOrNotChain">  
	IF(NOT(test_f), test_a, test_c);  
</chain>

NOT
即非的意思,如果
test_f
返回true,则就会走
test_c
节点,否则走
test_a
节点。

AND

OR

NOT
三种表达式可以相互组合使用。但是只能用于返回
boolean
值的组件上。

  • 子流程
    在日常处理复杂业务时,流程编排的规则会嵌套很多层,可以想象一下那样的流程读起来也比较头疼,而且事实上我们在开发中是需要将复杂的流程业务去拆分成一个个独立的子流程去实现。
    image.png

如上图它的规则如下:

<chain name = "order_handle">  
THEN(  
	SWITCH(order_x).TO(  
	THEN(  
		order_a,  
		order_c,  
		IF(  
			order_k,  
				THEN(  
					order_d,  
					order_f  
				),  
			order_e)  
		).id("to_c"),  
	THEN(  
		order_b,  
		order_c,  
			IF(  
				order_k,  
				THEN(  
					order_d,  
					order_f  
			),  
		order_e)  
	).id("to_b")  
	)  
);  
</chain>

这样写其实也可以,但是读起来理解起来不号。这时我们就可以子流程进行改造。我们按照
to_c
,
to_b
流程进行拆分。

<chain name="order_handle">
	// 主流程  
	THEN(SWITCH(order_x).TO(THEN(to_c).id("to_c"), THEN(to_b).id("to_b")), order_h);  
</chain>

<chain name="order_if">  
	IF(order_k, THEN(order_d, order_f), order_e);  
</chain>  
  
<chain name = "to_c">  
	THEN(order_a, order_c, order_if);  
</chain>  
  
<chain name = "to_b">  
	THEN(order_b, order_c, order_if);  
</chain>

这样流程上就清晰了很多。

  • 子变量
    在复杂流程的编排上,我们不仅可以使用子流程,还可以使用子变量的方式。我们可以直接在流程中定义变量。如上述例子使用子变量可以改造为:
<chain name="order_handle">  
	// 定义一个if节点处理df/e  
	order_if = IF(order_k, THEN(order_d, order_f), order_e);  
	  
	// 定义to_c的订单流程 用id标识流程为to_c  
	to_c = THEN(order_a, order_c, order_if).id("to_c");  
	  
	// 定义to_b的订单流程 用id标识为流程to_b  
	to_b = THEN(order_b, order_c, order_if).id("to_b");  
	  
	// 主流程  
	THEN(SWITCH(order_x).TO(to_c, to_b), order_h);  
</chain>

这样也可以清晰。

  • 其他
    我们在上述一些示例中,每个语句后都加了分号:
    ;
    ,关于规则中的分号,我们链路中只有一条规则的时候(没有自变量)可以不加也可以运行,但是如果存在自变量,一定要在自变量中加上分号,否则汇报错。同时官方也建议不管是否存在子变量,都要加上分号。

另外,EL中我们使用
//
定义注释。

执行器

在上述的一些示例中,我们使用了
flowExecutor
去执行规则。
FlowExecutor
就是流程的执行器,是一个流程执行的触发点。在
Spring
或者
SprigBoot
环境下我们可以直接注入
FlowExecutor
进行使用。
FlowExecutor
中提供同步以及异步两种类型的方法,同步方法直接返回
LiteflowResponse
,而异步返回的是
Future<LiteflowResponse>
。同步方法如下:

//参数为流程ID,无初始流程入参,上下文类型为默认的DefaultContext
public LiteflowResponse execute2Resp(String chainId)
//第一个参数为流程ID,第二个参数为流程入参。上下文类型为默认的DefaultContext
public LiteflowResponse execute2Resp(String chainId, Object param);
//第一个参数为流程ID,第二个参数为流程入参,后面可以传入多个上下文class
public LiteflowResponse execute2Resp(String chainId, Object param, Class<?>... contextBeanClazzArray)
//第一个参数为流程ID,第二个参数为流程入参,后面可以传入多个上下文的Bean
public LiteflowResponse execute2Resp(String chainId, Object param, Object... contextBeanArray)
// 第一个参数为流程ID,第二个参数为流程入参,第三个参数是用户的RequestId,后面可以传入多个上下文的Bean
public LiteflowResponse execute2RespWithRid(String chainId, Object param, String requestId, Class<?>... contextBeanClazzArray)

这里我们一定要使用自定义上下文传入,不要使用默认上下文。

而异步方法跟同步方法是一样的,只是他是无阻塞。

public Future<LiteflowResponse> execute2Future(String chainId, Object param, Object... contextBeanArray)

同时,执行器可以针对异步执行提供了可配置的线程池参数,

## FlowExecutor的execute2Future的线程数
liteflow.main-executor-works=64

还可以使用自定义线程池,如果使用自定义线程池必须实现
ExecutorBuilder
接口,实现
ExecutorService buildExecutor()
接口。

public class LiteFlowThreadPool implements ExecutorBuilder {  
  
@Override  
public ExecutorService buildExecutor() {  
	ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("mythread-pool-%s").build();  
	return new ThreadPoolExecutor(  
			// 核心线程数,即2个常开窗口  
			2,  
			// 最大的线程数,银行所有的窗口  
			5,  
			// 空闲时间  
			5,  
			TimeUnit.SECONDS,  
			// 工作队列  
			new LinkedBlockingQueue<>(5),  
			// 线程工厂  
			threadFactory,  
			// 拒绝策略  
			new ThreadPoolExecutor.AbortPolicy()  
	);  
	  
	}  
}

关于Java线程池的配置详解,请参考这篇文章:
重温Java基础(二)之Java线程池最全详解

我们可以通过
LiteFlow
的配置信息去设置:

## FlowExecutor的execute2Future的自定义线程池的路径
liteflow.main-executor-class= com.springboot.litefolw.config.LiteFlowThreadPool

通过上述配置文件配置的信息,对全局的
FlowExecutor
都会生效,假如我们相对某一个执行器定义线程池内容,可以使用
LiteFlowConfig
类去的定义(通过配置文件中配置信息也会进入到这个类里)。

LiteflowConfig liteflowConfig = flowExecutor.getLiteflowConfig();  
// FlowExecutor的execute2Future的自定义线程池的路径
liteflowConfig.setMainExecutorClass("com.springboot.litefolw.config.LiteFlowThreadPool");  
// FlowExecutor的execute2Future的自定义线程池的路径
liteflowConfig.setMainExecutorWorks(64);

这里不建议new一个LiteflowConfig去设置配置信息,这样可能会导致配置文件中的一些默认配置信息丢失。

在一个流程执行时,我们需要传入一些参数例如订单号,账户信息等,这些信息会做初始参数传入到流程中。在执行器中我们可以使用上述
FlowExecutor
的方法中的第二个参数(Object param)传入流程入参参数。流程入参可以是任何对象,实际开发中,我们会将自己封装初始化好的Bean传入,然后可以在流程中使用
this.getRequestData()
或者
nodeCompoent.getRequestData()

DataRequest dataRequest = DataRequest.builder().iteratorRequestList(Lists.newArrayList()).build();
LiteflowResponse response = 
// 流程传入参数
flowExecutor.execute2Resp("testIteratorCatchChain", dataRequest);

在流程中获取入参参数:

@LiteflowComponent("a")  
public class AComponent extends NodeComponent {  
  
	@Override  
	public void process() throws Exception {  
		DataRequest dataRequest = this.getRequestData();  
	}  
}


@Slf4j  
@LiteflowComponent  
public class TestComponent {  
  
	@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeType = NodeTypeEnum.COMMON, nodeId = "test_a", nodeName = "测试组件A")  
	public void processA(NodeComponent nodeComponent){  
		log.info("执行A节点");  
		DataRequest dataRequest = nodeComponent.getRequestData();  
		}
	}

理论上来说,流程入参可以是任何对象,但是我们不应该把数据上下文的实例当做参数传入。流程参数跟数据上下文是两个实例对象,流程入参只能通过
this.getRequestData()
去拿。

最后我们来说一下流程执行的结果
LiteflowResponse
。异步执行的流程可以通过
future.get()
获取。我们简单介绍一下其中常用的一些方法:

public class LiteflowResponse {
	// 判断流程是否执行成功
	public boolean isSuccess();
	
	// 如果流程执行不成功,可以获取流程的异常信息,这个跟isSuccess()使用,很有用
	public Exception getCause();

	// 获取流程的执行步骤
	public Map<String, List<CmpStep>> getExecuteSteps();

	// 获取流程的执行的队列信息
	public Queue<CmpStep> getRollbackStepQueue();

	// 获取流程的执行步骤的字符串信息。这个值在流程执行结束后,liteflow日志也会自动打印
	public String getExecuteStepStr();	

	// 获取数据上下文信息
	public <T> T getContextBean(Class<T> contextBeanClazz);
}

数据上下文

数据上下文对与整个
LiteFlow
来说是非常重要的,从LiteFlow的简介中我们知道LiteFlow的主要功能是业务解耦,那么解耦中很重要的一步就是数据解耦。要做编排,就要消除各个组件中的差异性,组件不接收业务参数,也不会返回业务数据,每个组件只需要从数据上下文中获取自己关心的数据,不用关心此数据是由谁提供的,同样的,每个组件也只需要把自己执行所产生的结果数据放到数据上下文中,也不用关心此数据到底是提供给谁用的。这样就在一定程度上做到了数据解耦。数据上下文进入流程中后,整个链路中的任一节点都可以取到。不同的流程,数据上下文实例是完全隔离开的。

LiteFlow虽然也提供了默认的数据上下文
DefaultContext
,但是实际开发中不建议使用。我们要传入自己自定义的数据上下文对象,同流程入参,我们可以使用任意的Bean作为数据上下文传入到流程中。我们可以定义好若干个数据上下文对象的class传入到流程中,LiteFlow会在调用时进行初始化,给这个上下文分配唯一的实例。

// 传入一个
LiteflowResponse response = flowExecutor.execute2Resp("chain1", new DataRequest(), DataContext.class);

// 传入多个
LiteflowResponse response = flowExecutor.execute2Resp("chain1", new DataRequest(), DataContext.class, OrderContext.class);

我们还可以将已经初始化好的Bean作为数据上下文传入到流程当中:

DataContext dataContext = new DataContext(); 
// 传入一个
LiteflowResponse response = flowExecutor.execute2Resp("chain1", new DataRequest(), dataContext);

// 传入多个
OrderContext orderContext = new OrderContext();  
LiteflowResponse response = flowExecutor.execute2Resp("chain1", new DataRequest(), dataContext, orderContext);

但是有一点要非常注意:
框架并不支持上下文bean和class混传,你要么都传bean,要么都传class。

然后我们就可以在链路的任意节点中通过以下方式获取数据上下文:

@Slf4j  
@LiteflowComponent  
public class TestComponent {  
  
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeType = NodeTypeEnum.COMMON, nodeId = "test_a", nodeName = "测试组件A")  
public void processA(NodeComponent nodeComponent){  
	log.info("执行A节点");  
	DataContext dataContext = nodeComponent.getContextBean(DataContext.class);  
	OrderContext orderContext = nodeComponent.getContextBean(OrderContext.class);  
	}
}


@LiteflowComponent("a")  
public class AComponent extends NodeComponent {  
  
	@Override  
	public void process() throws Exception {   
		DataContext dataContext = this.getContextBean(DataContext.class);  
		OrderContext orderContext = this.getContextBean(OrderContext.class);  
	}  
}

到此,SpringBoot环境下LiteFlow的一些基本概念就介绍完了,大家可以按照这些概念实现一个demo去体验LiteFlow那解耦,以及流程编排那种特爽的柑橘。当然大家也可以参考这篇文章去实现demo:
SpringBoot+LiteFlow优雅解耦复杂核心业务

后面我们在介绍LiteFlow的一些高级特性,例如:组件降级,组件继承,组建回滚,热刷新,以及使用代码构建规则,使用脚本构建组件,还有使用Nacos,Redis作为规则文件源等。

本文已收录于我的个人博客:
码农Academy的博客,专注分享Java技术干货,包括Java基础、Spring Boot、Spring Cloud、Mysql、Redis、Elasticsearch、中间件、架构设计、面试题、程序员攻略等