2024年1月

大家好,我是三友~~

在之前写的
《万字+20张图探秘Nacos注册中心核心实现原理》
这篇文章中我留了一个彩蛋

当文章点赞量突破28个,就单独写一篇关于Raft协议的文章

既然现在文章点赞量已经超过28个,那我就连夜爆肝,把这个坑给填上

由于Nacos使用的是实现了Raft协议的JRaft框架,所以本文主要是基于JRaft框架来讲解

本文我大致分为了下面三个方面的内容:

第一个方面就是讲解在Raft协议中,一个请求在整个集群中的处理过程

第二个方面就是讲解Leader选举相关的内容

第三个方面就是讲一讲JRaft框架做的一些优化

在这个过程中我也会穿插一些Nacos跟JRaft框架整合相关的内容

好了,话不多说,让我们直接进入主题,去会一会这个Raft协议


一个简单的介绍

Raft协议是用来保证服务中各节点数据强一致性的,也就是CAP定理中的CP

在Raft协议中,集群中的服务(也可以被称为节点,Peer)会有三种状态:

  • Leader:主节点,负责处理所有的请求,一个集群只有一个Leader,处于 唯我独尊 的地位
  • Follower:从节点,主要是负责复制Leader的数据,保证跟Leader的数据的一致性,Follower必须服从Leader
  • Candidate:候选节点,中间状态,最终会变成Leader或者Follower

当一个节点启动的时候,需要将自身节点信息注册到集群中Leader节点

比如Nacos启动的时候,就会通过JRaft提供的API
CliService#addPeer
将自己注册到Leader节点中

代码在Nacos的JRaftServer类


一个请求的一生

Raft协议中规定,对于一个请求,一定需要交由Leader节点来处理

服务在接收到用户请求的时候,必须得判断当前的节点是不是Leader节点

  • 如果是Leader节点,那么就可以处理请求
  • 如果不是Leader节点,必须将请求发送给Leader节点处理

如下是Nacos在处理请求时的代码,遵循这个规定

当Leader节点最终接收到请求时,这个请求才能开始被处理

这个请求的处理过程主要分别两个部分:

  • Raft协议对请求的处理
  • 应用服务对于请求的处理

Raft协议对请求的处理,是因为Raft协议是保证数据强一致性的,所以这个协议本身也需要去处理这个请求才能去保证数据强一致性

应用服务对于请求其实很好理解了,因为对于一个请求来说,肯定最终是要交给应用服务来做这个请求对应的操作

就拿Nacos注册永久服务实例举例,这个请求首先会经过Raft协议,最终有Nacos这个应用服务才会去处理这个注册服务的请求

总的来说,Raft协议其实就是本身对你应用服务的请求进行了一个前置的拦截操作,这个拦截恰恰就是用来保证数据强一致性的

最后这个请求在
能保证数据强一致性的前提下
交给应用服务,之后该怎么处理就怎么处理


1、Raft协议对请求的处理


1.1、存储请求日志

当Leader节点接收到请求,就会将请求交给Raft协议来处理

首先他会将每个请求的数据封装成一个一个LogEntry对象

同时也会给每个LogEntry设置一个index

这个index你可以
暂时
认为是LogEntry在整个集群中的唯一标识,从0开始,随着请求的顺序依次递增

之后Raft协议会将LogEntry给存到磁盘中

这样所有请求就会按照顺序一个一个存储到磁盘中了

说到这里,我不知道你有没有想到Redis八股文中常背的AOF日志

Redis的AOF日志也是按照顺序存储所有的写操作

当Redis重启时,可以通过回放AOF日志中的命令来恢复数据

这里的LogEntry其实跟Redis的AOF日志也有相同的作用

当Raft节点重启的时候,可以通过回放LogEntry中请求来恢复数据

相信你在背八股的时候你还背过这么一句话:Redis为了防止AOF文件过大,会重写AOF日志,减小AOF文件大小。那么问题来了,Raft协议会重写LogEntry日志文件来减小日志的大小么?这个问题我们后面来说


1.2、复制日志到所有Follower节点

前面说了,服务在启动时,需要向Leader注册节点信息

在JRaft中,当Leader节点接收到新的Follower节点信息的时候,会为创建一个Replicator,翻译过来就是复制器

这个Replicator会不停地向Follower节点发送请求,将LogEntry按照顺序批量复制给Follower节点

这个过程也被称为
AppendEntries

Follower节点接收到这些LogEntry之后,也会将LogEntry存到磁盘中

Follower与Follower之间复制LogEntry是互不影响的


1.3、提交超过半数复制成功LogEntry的index

Leader在每次批量复制成功之后都会对复制成功的LogEntry进行判断

去找到目前已经被超过半数节点都复制成功的最大LogEntry对应的index

什么意思呢,画张图你就明白了

如上图,整个集群有5个节点,一个Leader,四个Follower

按照图上复制进度我们可以看出,已经被超过半数节点成功复制的LogEntry就是index=4的那个LogEntry

因为Follower2和Follower4,再加上Leader节点,这三个节点都成功有了index=4的那个LogEntry,已经超过了集群节点数量的半数

所以超过半数节点都复制成功的最大LogEntry就是index=4的那个

此时也就表明这个index之前的LogEntry都可以交由应用服务所处理

因为超过半数都能成功复制这个LogEntry,那么表明这个集群基本上就是处于稳定状态

这个index也被称为
CommitIndex
,代表可以被应用服务所处理的最大的index

接下来Raft协议就会将这个index以及之前对应的LogEntry交给应用服务来处理

同时在下次进行复制日志和心跳(后面会说)的时候,会把这个index告诉Follower节点

这样Follower节点也可以处理这些LogEntry对应的请求了

总的来说,一个请求虽然能被Leader节点所接收,但是如果没有超过半数节点复制成功,应用服务就无法处理这个请求。

如果集群发生故障,导致超过半数节点无法复制成功,那么这个请求永远不会被应用服务处理,整个集群就处于不可用状态,很符合CP的概念。


2、应用服务对于请求的处理

上一节说到,当超过半数节点已经复制了LogEntry,此时Raft协议就会将这些LogEntry,也就是请求交给应用服务处理

对于应用服务来说,它必须得去实现一个Raft协议的
状态机
才能拿到这些请求数据

这个状态机在Raft协议中非常重要,
它是应用服务与Raft协议交互的重要桥梁

当Raft协议有什么状态变动的时候,都会调用状态机来通知我们的应用服务

所以状态机不仅仅可以拿到可以处理的请求数据,还可以拿到Raft集群变动的信息

在JRaft框架中,定义了一个
StateMachine
接口来代表状态机

Nacos在整合JRaft框架的时候,自然而然也会(间接)实现这个状态机

当应用系统拿到这个请求数据之后,就可以做对应的业务处理了。

对于Follower节点来说,最终也是调用状态机来处理请求的。

到这,一个请求就算真正完成了

按照传统,这里画一张图来总结整个请求的一生


3、快照(Snapshot)

前面在说LogEntry留了这么一个问题

Raft协议会重写LogEntry日志文件来减小日志的大小么?

答案其实是不会的。

因为很简单,Redis的业务数据的格式Redis本身是知道的,所以他可以根据最新的数据来重新生成AOF文件

但是Raft协议本身肯定不知道你应用系统的业务数据的格式,所以无法重写LogEntry日志文件减小日志的大小

但是无法重写就意味要忍受LogEntry日志越来越大,占有磁盘的空间变大么?

肯定是不会的,LogEntry日志越来越大不仅仅会占有磁盘空间变大,还会导致重启或者故障恢复越慢,因为重启需要一个一个重新处理LogEntry对应的请求

所以,为了解决上述问题,Raft就提供了优化的手段,也就是本节的标题:
快照(Snapshot)

所谓的快照,就是将某一时刻内存中的业务数据给存到磁盘中

这时,我相信你又又想到了Redis八股文中的RDB文件,RDB文件也是某一时刻内存中的数据,所以Raft快照的意思跟RDB文件是很相似的

由于只有应用服务知道内存中的业务数据,所以执行快照这个具体的操作是交给由应用服务来完成的

但是整个过程是由Raft协议去决定是否需要落磁盘

在JRaft框架中,所有的节点默认是每隔1小时执行一次快照任务

JRaft会调用状态机的
onSnapshotSave
方法,告诉应用服务去生成快照

当然Nacos也实现了这个方法

比如对于永久实例来说,Nacos最终会走到下面这个方法将实例数据存到磁盘

当执行完快照之后,此时应用到这个快照数据之前的LogEntry都可以被删除了

整个过程如下图所示

所以,对于Raft协议来说,他的真正数据是包括
快照 + LogEntry
两部分

重启恢复的时候,只需要读取快照数据,然后再处理生成快照之后所处理LogEntry,就可以快速恢复数据了

Redis在4.0版本之后支持混合RDB+AOF的日志来快速恢复数据,这其实就是跟Raft协议的快照+LogEntry恢复数据几乎是一样的原理,所以你看看,很多框架、中间件他们的一些原理都是相似的

快照除了能减少磁盘,快速恢复之外,还有一个很重要的功能,能够提高Follower复制LogEntry的速度,比如下图的这种情况

某一时刻,Leader生成快照之后,之后最小的日志索引就是index=3了,其余的日志都删了

Follower1由于复制的慢,才复制到了index=1的位置,接下来要复制index=2的位置

Follower2新加入的节点,没有数据,接下来要从index=0开始复制

Leader节点在复制数据给Follower的时候,发现Follower接下来要复制的index都已经小于我本身最小的日志了,已经没有办法复制给Follower

此时他就会向Follower发送一个
安装快照
的请求

Follower接收到请求之后,会向Leader发送一个请求,拉取快照数据,将快照数据存到磁盘文件中

之后Follower会调用状态机的
onSnapshotLoad
方法,让应用服务加载这些快照数据到内存中

之后Follower节点只需要复制这个快照之后的LogEntry就可以了,如下图所示

总的来说,快照主要有两个用途,一是减少LogEntery文件大小,帮助重启快速恢复数据,二是可以加快Follower节点的复制进度


选举算法

好了,说完一个请求是怎么在Raft协议中处理之后,接下来讲一讲Raft协议是如何选举Leader节点的


1、选举周期与选举时机

在说选举之前,有两个东西得说一下

  • 选举周期(term)
  • 选举时机


1.1、选举周期

选举周期,也可以叫任期,这个东西非常重要,那么怎么理解呢

举个例子,比如说,在上学的时候,要选择课代表

总共有3个名候选人,要班里的同学投票,规定得票超过半数的人竞选成功

第一次投票,没人得到超过半数票

接下来进行第二次投票、第三次投票...

这里面每一轮投票在Raft协议中就代表一次选举周期,选举周期也会随着选举次数一直递增

Raft协议规定,
在同一个周期中

  • 每个节点只能投一次票
  • 最多只能有一个节点成为Leader节点,当没选出Leader节点时,此时就会进入下一个周期选举

周期会保存在每个节点的内存中,在一个已经选举稳定运行的集群中,每个节点的周期是一样的

前面在介绍LogEntry的时候,我说过暂时可以认为index全局唯一的

之所以这么说,主要是因为真正的可以判断一个唯一的LogEntry,需要通过周期(term)和index一起判断

当生成LogEntry,其实会将当前的集群的周期和index一起写入到LogEntry中

为什么要通过两个一起判断呢?举个例子,如下图所示

集群中有很多节点,我只画了两个

某一时刻出现了上图,在周期1时,Leader节点的index已经到了5,而Follower值复制到了index=3

假设此时Leader网络出现故障,于是集群开始重新选举

此时选举出图上的Follower作为新的Leader节点,选举周期增加到了2

此时新的Leader(原先Follower)会接收新的请求,写LogEntry,此时他会接着本身的最大的index=3继续按顺序写日志

所以从这可以看出,仅仅只根据同一个index并不能确定是同一个LogEntry

如果此时原Leader恢复了网络,就会成为Follower,接收新的Leader的LogEntry

由于Leader
唯我独尊
的地位,此时这个Follower需要丢弃之前周期1的index=4位置的LogEntry,复制Leader的日志

所以从这可以看出,比较两个日志谁的更新,需要先判断周期,周期越大越新,周期相同再判断index,index越大越新

记住这句话,选举的时候有用


1.2、选举时机

就是当Follower超过一定时间没有接收到Leader节点的心跳,这个Follower节点就会开启选举

在一个集群中,Leader会不停地向每个Follower节点发送心跳保持自己的领导地位

并且这个心跳机制的实现是通过复用
日志复制接口

Leader会构建一个LogEntry,这个LogEntry中并没有携带什么用户的请求数据,但是会携带
CommitIndex
信息,方便Follower及时将可以处理的LogEntry交由应用服务处理

当Follower发现LogEntry没有请求数据,就知道是心跳信息,就不会将LogEntry给落到磁盘了,仅仅只是更新Leader节点的心跳时间

其实,只要通过日志复制接口来的数据,都会更新心跳时间,不仅仅是心跳包,复制真正日志的时候也更新心跳时间

在JRaft框架中,
每个Follower
会开启一个定时任务,每隔1~2s去判断最后一次心跳时间,如果发现超过1s没接收到心跳信息就会开启选举

所以每个Follower都可以发起选举,只要超过一定时间没收到心跳信息


2、StepDown

在说选举之前,再讲一个很重要的
StepDown
操作,可以翻译为
下台
的意思

这个操作规定,集群中所有的节点,
一旦发现集群中有更大的周期

请求和响应数据都需要携带自身节点的周期,所以每个节点都可以知道集群中其它节点的周期

那么此时这个节点必须进行
StepDown
操作

这个
StepDown
操作包含下面三步:

  • 将自身的周期设置成其它节点这个更大的周期
  • 状态重新设置成Follower
  • 重新开启选举倒计时,检查心跳,一旦又超过1~2s没接收到新,就基于这个更大的周期再次递增,再次开始选举

所以这个
StepDown
操作说白了就是让周期更低的节点去等待其它周期更高的节点成为Leader节点(不一定会成)

这一节很重要,如果接下来选举如果有什么想不通的,多读这一节


3、预选举

当Follower可以发起选举的时候,并不会直接选举,而是先进行一波预选举

在原始的Raft协议中是没有预选举机制,预选举是JRaft框架做的优化

所谓预选举就是在正式选举之前去判断一下自己是不是有潜质,很大几率成为Leader

那么为什么JRaft框架要加入预选举机制?

主要是因为如果在选举过程中出现网络问题,很多节点同时发生选举,导致选举冲突,无法正确选出Leader节点,造成多次无效的选举尝试,进而影响集群的稳定性和可用性

所以为了保证稳定性,只有当当前节点在预选举中可以成为Leader节点,当前节点才会开启真正的选举

预选举的过程并不会改变当前节点的各种状态,仅仅只会向所有的其它节点发送预投票请求,携带下面这些数据

  • 当前周期+1,说明准备在下个周期开始选举了
  • 当前复制最后一条LogEntry对应的term和index,为了比较谁的日志更新

当其它节点接收到请求的时候,在投票的时候遵循下面几个原则:

  • 当前节点与Leader节点的心跳正常,说明Leader节点每什么问题,投否定票
  • 发起预投票的节点的周期比自己的小,说明其它节点可能都已经发生好几轮正式选举了,那么你就别凑选举的热闹了,直接投否定票
  • 发起预投票的节点复制的日志没有当前节点复制的日志新,直接投否定票

当前面都判断条件都过了之后,那么就会给这个节点投赞成票

当超过半数节点都投了赞成票,那么当前节点就可以正式发起选举了


4、正式选举

当正式选举开始的时候,当前节点会进行一些准备操作:

  • 选举周期加1,比如原来是2,现在就为3
  • 将当前状态从Follower转换成Candidate,表明这个节点成为候选者
  • 将当前这个周期的票投给自己,自己发起选举的,这个票优先投给自己

当这波操作完成之后,就开始构建请求,携带的信息跟预投票是一样的,包括:当前递增后周期、当前节点复制的最新的LogEntry对应的周期(term)和index,然后向所有的节点发送请求

当其它节点接收到请求之后,也会进行一系列判断

  • 发起预投票的节点的周期比自己的大,投赞成票,并且会StepDown操作
  • 发起预投票的节点的周期比自己的小,则直接投反对票
  • 周期相等,如果已经投票给自己或者别人,则投否定票;否则判断日志谁的更新,如果自己的日志更新则直接投反对票,否则就投赞成票

其实正式投票判断条件跟预投票的判断很像

当收到超过半数的节点的赞成票之后,当前节点就可以成为Leader节点了

之后这个节点就会接收请求,将LogEntry复制给其它从节点,同时维护与从节点之间的心跳


JRaft的一些优化

JRaft的优化很多,比如几乎全链路异步、批量并行复制LogEntry、预投票机制等等

这点我再说几点功能性的优化


1、线性一致读

前面说到请求的一生的时候我说过,所有请求都需要交给Leader节点来处理

但是你可以思考个问题

对于
读请求
来说,真的有必要都交给Leader节点来处理么?

其实是没有必要的

因为虽然Leader节点可以处理,但是Leader节点在处理这个读请求时也会走一遍Raft协议

将读请求的数据封装成LogEntry,然后复制到Follower,最后过半复制成功再处理这个读请求

由于读请求来说并不涉及到数据修改,不会造成数据不一致性

所以走上面这段逻辑并没有任何好处,反而对提高Leader节点处理请求的压力

所以,为了减轻Leader节点的压力,提出了一种叫线性一致读的方式来处理读请求,可以让从节点处理
读请求

线性一致读是Raft协议提出的优化,JRaft框架实现了这个逻辑

如上图,假设
t时刻
去读数据,此时Leader节点已经将index=3的LogEntry应用到状态机

由于Follower节点和Leader节点都是一样的,按照顺序处理LogEntry请求的数据,也就是线性处理

所以只要等到Follower节点也处理到index=3的LogEntry,那么此时Follower节点就和
t时刻
的Leader节点的数据是一样的了

之所以是
t时刻
Leader节点数据,是因为Leader节点可能会继续应用LogEntry,但是由于读发生在
t时刻
,所以只要Follower节点运用到的index是
t时刻
Leader节点的index就可以了

这样就可以处理从Follower节点处理这个读请求了,读到数据了

线性一致读底层实现也比较简单,就是每次读得时候都去从Leader节点去查找此时Leader节点运用到的index大小

然后当Follower节点应用到的index跟Leader节点是一样的,此时就可以从Follower节点读数据了

所以前面说的那句所有的请求都一定要交给Leader处理,其实可以改成所有的写请求一定要交给Leader处理,读请求可以通过线性一致读交给Follower节点处理

Naocs在读数据的时候就使用到了线性一致读

代码在Nacos的JRaftServer类


2、Learner

在JRaft会有一个Learner的概念,翻译过来就是学习者的意思,也被称为
只读成员

所以从名字也可以看出,当一个节点是Learner节点,那么这个节点仅仅只从Leader节点复制数据,并不参与选举、投票和日志复制成功确认

如果当前节点想表明自己是一个Learner的话,在向Leader节点注册的时候,可以通过
CliService#addLearners
这个API就可以了

这个Learner节点的作用可以用来创建一个只读的服务,通过线性一致读实现类似读写分离效果


3、Multi-Raft-Group

Multi-Raft-Group也是JRaft中一个重要的特性

由于所有的写请求都需要交给Leader处理,一个Leader由于只在一台机器,所以就会导致Leader所在的机器压力很大

基于这个问题,就提出了Multi-Raft-Group

所谓的Multi-Raft-Group,你可以理解的是多租户隔离,通过一个分组(Group)进行隔离

每个组都是有自己的Leader,有自己的单独选举,完全相互不影响

就有点像上学时课代表的意思,每个课代表处理不同的学科的任务,互不影响

举个例子,假设现在有三台机器

我们可以根据系统功能或者数据进行分组,分为A、B、C三个组

此时A组的Leader可能是机器1,B组的Leader可能是机器2,C组的Leader可能是机器3

这样对于不同的组,对应的写请求就可以交给不同的机器处理,这样就解决了一个Leader只在一个机器导致压力大的问题

这就是Multi-Raft-Group

由于有分组,这就导致所有的请求都得携带group的信息,表明这个请求属于哪个组,所以前面提到的所有请求参数其实都得携带group名称

既然有这个分组的功能,Nacos也肯定使用了,对于不同的功能有不同的分组

Nacos目前有4个分组,名称如下图所示


总结

到这就讲完了Raft协议大致内容以及它的实现JRaft框架

这篇又又又是洋洋洒洒写了近万字,根本停不下来

由于Raft协议这种技术栈可能没那么受众

所以不知道有多少人能坚持看到这里

如果你坚持看到这里,觉得本篇对你有点帮助,欢迎点赞、在看、收藏、转发分享给其他需要的人

你的支持就是我更新的最大动力,感谢感谢!

好了,本文就讲到这里,让我们下期再见,拜拜。


往期热门文章推荐

如何去阅读源码,我总结了18条心法

如何写出漂亮代码,我总结了45个小技巧

三万字盘点Spring/Boot的那些常用扩展点

三万字盘点Spring 9大核心基础功能

两万字盘点那些被玩烂了的设计模式

万字+20张图探秘Nacos注册中心核心实现原理

万字+20张图剖析Spring启动时12个核心步骤

1.5万字+30张图盘点索引常见的11个知识点

扫码或者搜索关注公众号
三友的java日记
,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习,回复 面试 即可获得一套面试真题。

前言

大家好,这里是白泽。
《Go语言的100个错误以及如何避免》
是最近朋友推荐我阅读的书籍,我初步浏览之后,大为惊喜。就像这书中第一章的标题说到的:“
Go: Simple to learn but hard to master
”,整本书通过分析100个错误使用 Go 语言的场景,带你深入理解 Go 语言。

我的愿景是以这套文章,在保持权威性的基础上,脱离对原文的依赖,对这100个场景进行篇幅合适的中文讲解。所涉内容较多,总计约 8w 字,这是该系列的第二篇文章,对应书中第11-20个错误场景。


我的博客一直没有一个好看的评论区,自己做又不会。。没错,我是个前端渣渣。调研了一下,一开始想套用一些网上的静态模板,但是改造成本还是挺大的,后来接触到了Waline,简单了解了以下,我就知道了,它就是我理想中的评论区功能实现,和我的博客匹配度MAX。

一、Waline简介

Waline官网:
https://waline.js.org/

Waline github地址:
https://github.com/walinejs/waline

Waline是一款评论区实现方案软件,它需要单独的服务端部署,适用于非常适用于静态博客,比如Hexo,对于大多数个人博客也一样适用。

它有以下特点:

  1. 评论支持markdown,同时支持多套表情、数据公式、html嵌入等
  2. 强大的安全性:匿名登录内容校验、防灌水、保护敏感数据等
  3. 登录支持:支持注册登录、github登录等多种登录方式,同时也支持匿名登录
  4. ......

确实,它的功能很强大,UI也很漂亮。

二、Waline安装

1、Wline软件架构

直接看Waline文档,很多人一开始会比较迷糊,LeanCloud是啥,为啥要注册?Vercel又是啥?啥啥啥,为啥步骤这么复杂。。。。其实非常简单,一张架构图表示下

Waline软件架构图

可以清楚的看到,Vercel只是服务端部署的一种方案,LeanCloud只是个数据存储服务,它们均有可替代方案,推荐使用
独立部署
替代Vercel,使用
Mysql
替代LeanCloud。

2、服务端安装

这里使用独立部署的方案,独立部署官方文档:
https://waline.js.org/guide/deploy/vps.html

Waline关于MySQL的说明:
https://waline.js.org/guide/database.html#mysql

首先,需要先建好MySQL数据库waline(注意使用utf8mb4编码),并执行初始化脚本,初始化脚本见github:
https://github.com/walinejs/waline/blob/main/assets/waline.sql

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
SET NAMES utf8mb4;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40666666 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;


# Dump of table wl_Comment
# ------------------------------------------------------------

CREATE TABLE `wl_Comment` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `comment` text,
  `insertedAt` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `ip` varchar(100) DEFAULT '',
  `link` varchar(255) DEFAULT NULL,
  `mail` varchar(255) DEFAULT NULL,
  `nick` varchar(255) DEFAULT NULL,
  `pid` int(11) DEFAULT NULL,
  `rid` int(11) DEFAULT NULL,
  `sticky` boolean DEFAULT NULL,
  `status` varchar(50) NOT NULL DEFAULT '',
  `like` int(11) DEFAULT NULL,
  `ua` text,
  `url` varchar(255) DEFAULT NULL,
  `createdAt` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `updatedAt` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;



# Dump of table wl_Counter
# ------------------------------------------------------------

CREATE TABLE `wl_Counter` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `time` int(11) DEFAULT NULL,
  `reaction0` int(11) DEFAULT NULL,
  `reaction1` int(11) DEFAULT NULL,
  `reaction2` int(11) DEFAULT NULL,
  `reaction3` int(11) DEFAULT NULL,
  `reaction4` int(11) DEFAULT NULL,
  `reaction5` int(11) DEFAULT NULL,
  `reaction6` int(11) DEFAULT NULL,
  `reaction7` int(11) DEFAULT NULL,
  `reaction8` int(11) DEFAULT NULL,
  `url` varchar(255) NOT NULL DEFAULT '',
  `createdAt` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `updatedAt` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;



# Dump of table wl_Users
# ------------------------------------------------------------

CREATE TABLE `wl_Users` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `display_name` varchar(255) NOT NULL DEFAULT '',
  `email` varchar(255) NOT NULL DEFAULT '',
  `password` varchar(255) NOT NULL DEFAULT '',
  `type` varchar(50) NOT NULL DEFAULT '',
  `label` varchar(255) DEFAULT NULL,
  `url` varchar(255) DEFAULT NULL,
  `avatar` varchar(255) DEFAULT NULL,
  `github` varchar(255) DEFAULT NULL,
  `twitter` varchar(255) DEFAULT NULL,
  `facebook` varchar(255) DEFAULT NULL,
  `google` varchar(255) DEFAULT NULL,
  `weibo` varchar(255) DEFAULT NULL,
  `qq` varchar(255) DEFAULT NULL,
  `2fa` varchar(32) DEFAULT NULL,
  `createdAt` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `updatedAt` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;




/*!40666666 SET SQL_NOTES=@OLD_SQL_NOTES */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;

为了方便部署,这里使用了docker-compose部署,docker-compose.yml文件如下

version: '3'

services:
  waline:
    container_name: waline
    image: lizheming/waline:latest
    restart: always
    ports:
      - 8360:8360
    volumes:
      - ${PWD}/data:/app/data
    environment:
      #数据库配置
      MYSQL_HOST: 'MySQL连接地址'
      MYSQL_PORT: '端口号'
      MYSQL_DB: '数据库'
      MYSQL_USER: 'root'
      MYSQL_PASSWORD: '密码'
      MYSQL_PREFIX: 'wl_'
      MYSQL_CHARSET: 'utf8mb4'
      MYSQL_SSL: 'false'
      TZ: Asia/Shanghai
      #邮箱配置,163为例
      SMTP_SERVICE: '163'
      AUTHOR_EMAIL: '邮箱地址'
      SMTP_HOST: 'smtp.163.com'
      SMTP_PORT: '465'
      SMTP_USER: '邮箱地址'
      SMTP_PASS: '第三方授权登陆token'
      SMTP_SECURE: 'true'
      #站点地址,方便发邮件时直接跳转
      SITE_URL: 'https://blog.kdyzm.cn'
      SITE_NAME: '一枝梅的博客'
      #这里这个似乎不生效,不知道什么原因,但是不影响使用
      SENDER_NAME: 'kdyzm'
      #安全域名
      SECURE_DOMAINS: 'blog.kdyzm.cn'
      #配置文章响应
      LEVELS: '0,10,20,50,100,200'

之后使用
docker-compose up
命令启动看看有没有报错,如果没有报错,使用
docker-compose up -d
命令后台启动即可。

服务端启动成功之后访问
http://127.0.0.1:8360/
,会看到默认的效果展示

image-20240131133344552

访问
http://localhost:8360/ui/register,进行注册,第一个注册的用户即为管理员

image-20240131133438447

进去之后可以修改自己的头像、管理评论等

三、前端使用

首先,定义一个div放在要显示评论区的位置:

 <div id="waline"></div>

接下来在下方引入样式和js

<script src="https://unpkg.com/@waline/client@v2/dist/waline.js"></script>
<link href='//unpkg.com/@waline/client@v2/dist/waline.css' rel='stylesheet' />

注意这里应当引入v2版本,而不是官方的v3版本(实测v3版本有bug,它不能正确适配移动端而且登录功能也有问题)看官方的包v3版本刚更新,似乎还不是很稳定。

然后初始化脚本

<script>
    const waline = Waline.init({
        el: '#waline',
        //后端服务器地址
        serverURL: 'http://localhost:8360',
        emoji: [
            'https://unpkg.com/@waline/emojis@1.2.0/alus',
            'https://unpkg.com/@waline/emojis@1.2.0/bilibili',
            ......各种表情包
        ],
        //禁止图片上传按钮
        imageUploader: false,
        //不显示版权标志
        copyright: false,
    });
</script>

1、禁用图片上传按钮

初始化的时候配置
imageUploader: false,
即可

2、不显示版权标志

初始化的时候配置
copyright: false,
即可

3、各种表情包配置

官方文档:
https://waline.js.org/guide/features/emoji.html

常用的表情包如上所示,官方文档里还有其他表情包,部分表情包显示效果可看我的博客留言板模块:
https://blog.kdyzm.cn/messageBoard

四、欢迎来到我的博客留言

留言板使用了Waline,查看Waline效果,请看:
https://blog.kdyzm.cn/messageBoard

image-20240131133950677

前言

现在是分布式微服务开发的时代,除了小工具和游戏之类刚需本地运行的程序已经很少见到纯单机应用。现在流行的Web应用由于物理隔离天然形成了分布式架构,核心业务由服务器运行,边缘业务由客户端运行。对于消费终端应用,为了应付庞大的流量,服务端本身也要进行再切分以满足多实例和不同业务独立运行的需要。

在单机应用中,架构设计的必要性则弱很多,精心设计架构的应用基本是为适应团队开发的需要。单机程序因为没有物理隔离很容易写成耦合的代码,给未来的发展埋下隐患。如果能利用Web应用的思路设计应用,可以轻松做到最基本的模块化,把界面和数据传输同核心业务逻辑分离。Web服务的分布式架构等设计也能用最简单的方式复用到单机程序。

ASP.NET Core为这个设想提供了原生支持。基本思路是利用
TestServer
承载服务,然后用
TestServer
提供的用内存流直接和服务通信的特殊
HttpClient
完成交互。这样就摆脱了网络和进程间通信的基本开销以最低的成本实现虚拟的C/S架构。

新书宣传

有关新书的更多介绍欢迎查看
《C#与.NET6 开发从入门到实践》上市,作者亲自来打广告了!
image

正文

TestServer
本是为ASP.NET Core集成测试而开发的特殊
IServer
实现,这个服务器并不使用任何网络资源,因此也无法从网络访问。访问
TestServer
的唯一途径是使用由
TestServer
的成员方法创建的特殊
HttpClient
,这个Client的底层不使用
SocketsHttpMessageHandler
而是使用专用Handler由内存流传输数据。

TestServer

Microsoft.AspNetCore.TestHost
包中定义,可以用于集成测试,但是官方建议使用
Microsoft.AspNetCore.Mvc.Testing
包来进行测试。这个包在基础包之上进行了一些封装,简化了单元测试类的定义,并为Client增加了自动重定向和Cookie处理以兼容带重定向和Cookie的测试。笔者之前也一直在研究如何用这个包实现目标,但是无奈这个包的一些强制规则不适用测试之外的情况。最终只能用基础包来开发。

为了实现集成测试包的额外Client功能,从源代码中复制这些类的代码来用。开源项目就是好啊!

特殊Client在本地使用时有非常大的优势,但是如果其中的某些情况需要和真实网络交互就做不到了。为此笔者开发了一个使用网络通信的HttpMessageHandler来处理这种情况。

RedirectHandler

/// <summary>
/// A <see cref="DelegatingHandler"/> that follows redirect responses.
/// </summary>
public class RedirectHandler : DelegatingHandler
{
    internal const int DefaultMaxRedirects = 7;

    /// <summary>
    /// Creates a new instance of <see cref="RedirectHandler"/>.
    /// </summary>
    public RedirectHandler()
        : this(maxRedirects: DefaultMaxRedirects)
    {
    }

    /// <summary>
    /// Creates a new instance of <see cref="RedirectHandler"/>.
    /// </summary>
    /// <param name="maxRedirects">The maximum number of redirect responses to follow. It must be
    /// equal or greater than 0.</param>
    public RedirectHandler(int maxRedirects)
    {
        ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxRedirects);

        MaxRedirects = maxRedirects;
    }

    /// <summary>
    /// Gets the maximum number of redirects this handler will follow.
    /// </summary>
    public int MaxRedirects { get; }

    /// <inheritdoc />
    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var remainingRedirects = MaxRedirects;
        var redirectRequest = new HttpRequestMessage();
        var originalRequestContent = HasBody(request) ? await DuplicateRequestContentAsync(request) : null;
        CopyRequestHeaders(request.Headers, redirectRequest.Headers);
        var response = await base.SendAsync(request, cancellationToken);
        while (IsRedirect(response) && remainingRedirects > 0)
        {
            remainingRedirects--;
            UpdateRedirectRequest(response, redirectRequest, originalRequestContent);
            originalRequestContent = HasBody(redirectRequest) ? await DuplicateRequestContentAsync(redirectRequest) : null;
            response = await base.SendAsync(redirectRequest, cancellationToken);
        }

        return response;
    }

    protected internal static bool HasBody(HttpRequestMessage request) =>
        request.Method == HttpMethod.Post || request.Method == HttpMethod.Put;

    protected internal static async Task<HttpContent?> DuplicateRequestContentAsync(HttpRequestMessage request)
    {
        if (request.Content == null)
        {
            return null;
        }
        var originalRequestContent = request.Content;
        var (originalBody, copy) = await CopyBody(request);

        var contentCopy = new StreamContent(copy);
        request.Content = new StreamContent(originalBody);

        CopyContentHeaders(originalRequestContent, request.Content, contentCopy);

        return contentCopy;
    }

    protected internal static void CopyContentHeaders(
        HttpContent originalRequestContent,
        HttpContent newRequestContent,
        HttpContent contentCopy)
    {
        foreach (var header in originalRequestContent.Headers)
        {
            contentCopy.Headers.TryAddWithoutValidation(header.Key, header.Value);
            newRequestContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
        }
    }

    protected internal static void CopyRequestHeaders(
        HttpRequestHeaders originalRequestHeaders,
        HttpRequestHeaders redirectRequestHeaders)
    {
        foreach (var header in originalRequestHeaders)
        {
            // Avoid copying the Authorization header to match the behavior
            // in the HTTP client when processing redirects
            // https://github.com/dotnet/runtime/blob/69b5d67d9418d672609aa6e2c418a3d4ae00ad18/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs#L509-L517
            if (!header.Key.Equals(HeaderNames.Authorization, StringComparison.OrdinalIgnoreCase))
            {
                redirectRequestHeaders.TryAddWithoutValidation(header.Key, header.Value);
            }
        }
    }

    protected internal static async Task<(Stream originalBody, Stream copy)> CopyBody(HttpRequestMessage request)
    {
        var originalBody = await request.Content!.ReadAsStreamAsync();
        var bodyCopy = new MemoryStream();
        await originalBody.CopyToAsync(bodyCopy);
        bodyCopy.Seek(0, SeekOrigin.Begin);
        if (originalBody.CanSeek)
        {
            originalBody.Seek(0, SeekOrigin.Begin);
        }
        else
        {
            originalBody = new MemoryStream();
            await bodyCopy.CopyToAsync(originalBody);
            originalBody.Seek(0, SeekOrigin.Begin);
            bodyCopy.Seek(0, SeekOrigin.Begin);
        }

        return (originalBody, bodyCopy);
    }

    protected internal static void UpdateRedirectRequest(
        HttpResponseMessage response,
        HttpRequestMessage redirect,
        HttpContent? originalContent)
    {
        Debug.Assert(response.RequestMessage is not null);

        var location = response.Headers.Location;
        if (location != null)
        {
            if (!location.IsAbsoluteUri && response.RequestMessage.RequestUri is Uri requestUri)
            {
                location = new Uri(requestUri, location);
            }

            redirect.RequestUri = location;
        }

        if (!ShouldKeepVerb(response))
        {
            redirect.Method = HttpMethod.Get;
        }
        else
        {
            redirect.Method = response.RequestMessage.Method;
            redirect.Content = originalContent;
        }

        foreach (var property in response.RequestMessage.Options)
        {
            var key = new HttpRequestOptionsKey<object?>(property.Key);
            redirect.Options.Set(key, property.Value);
        }
    }

    protected internal static bool ShouldKeepVerb(HttpResponseMessage response) =>
        response.StatusCode == HttpStatusCode.RedirectKeepVerb ||
            response.StatusCode == HttpStatusCode.PermanentRedirect;

    protected internal static bool IsRedirect(HttpResponseMessage response) =>
        response.StatusCode == HttpStatusCode.MovedPermanently ||
            response.StatusCode == HttpStatusCode.Redirect ||
            response.StatusCode == HttpStatusCode.RedirectMethod ||
            response.StatusCode == HttpStatusCode.RedirectKeepVerb ||
            response.StatusCode == HttpStatusCode.PermanentRedirect;
}

这是从原项目复制后修改的重定向处理器,主要是把部分方法的访问级别稍微放宽。从代码可以看出这个处理器使用内存流复制来实现消息体复制和重定向,如果请求包含大文件上传可能出现复制操作把文件内容缓冲到内存导致内存溢出。不过这种情况应该非常少见,这里不考虑处理这种情况。

RemoteLocalAutoSwitchWithRedirectHandler

public class RemoteLocalAutoSwitchWithRedirectHandler : DelegatingHandler
{
    private readonly Uri _localAddress;
    private readonly RedirectHandler? _localRedirectHandler;
    private readonly string _nameOfNamedClient;
    private readonly IServiceScope _scope;
    private volatile bool _disposed;

    private HttpClient _remoteHttpClient;
    private HttpClient? _localHttpClient;

    public RemoteLocalAutoSwitchWithRedirectHandler(
        Uri localAddress,
        RedirectHandler? localRedirectHandler,
        IServiceScope scope,
        string nameOfNamedClient)
    {
        ArgumentNullException.ThrowIfNull(localAddress);
        ArgumentNullException.ThrowIfNull(scope);

        _localAddress = localAddress;
        _localRedirectHandler = localRedirectHandler;
        _scope = scope;
        _nameOfNamedClient = nameOfNamedClient;

        _remoteHttpClient = _scope.ServiceProvider
            .GetRequiredService<IHttpClientFactory>()
            .CreateClient(_nameOfNamedClient);
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);

        if (IsLocalAddress(request.RequestUri, _localAddress))
        {
            return await base.SendAsync(request, cancellationToken);
        }
        else
        {
            var response = await _remoteHttpClient.SendAsync(request, cancellationToken);

            if (_localRedirectHandler is null) return response;

            var remainingRedirects = _localRedirectHandler.MaxRedirects;
            var redirectRequest = new HttpRequestMessage();
            var originalRequestContent = RedirectHandler.HasBody(request) ? await RedirectHandler.DuplicateRequestContentAsync(request) : null;
            RedirectHandler.CopyRequestHeaders(request.Headers, redirectRequest.Headers);
            while (RedirectHandler.IsRedirect(response) && remainingRedirects > 0)
            {
                remainingRedirects--;
                RedirectHandler.UpdateRedirectRequest(response, redirectRequest, originalRequestContent);
                originalRequestContent = RedirectHandler.HasBody(request) ? await RedirectHandler.DuplicateRequestContentAsync(request) : null;
                RedirectHandler.CopyRequestHeaders(request.Headers, redirectRequest.Headers);

                if (IsLocalAddress(response.Headers.Location, _localAddress))
                {
                    _localHttpClient ??= new HttpClient(_localRedirectHandler);
                    response = await _localHttpClient.SendAsync(redirectRequest, cancellationToken);
                }
                else
                {
                    response = await _remoteHttpClient.SendAsync(redirectRequest, cancellationToken);
                }
            }

            return response;
        }
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !_disposed)
        {
            _disposed = true;

            _scope.Dispose();
        }

        base.Dispose(disposing);
    }

    private static bool IsLocalAddress(Uri? uri, Uri? localAddress) =>
        uri is not null && localAddress is not null
            && uri.Scheme == localAddress.Scheme
            && uri.Host == localAddress.Host
            && uri.Port == localAddress.Port;
}

这是笔者为处理网络请求编写的处理器,并且这个处理器自带重定向功能,逻辑基本是抄的官方代码。然后做了一些本地请求和外部网络请求的区分处理。

网络请求处理器从主机的依赖注入服务获取客户端,因此要提前在主机服务中注册客户端,并且要关闭网络客户端自带的重定向。

TestServerClientHandlerOptions

/// <summary>
/// The default options to use to when creating
/// <see cref="HttpMessageHandler"/> instances by calling
/// <see cref="TestServerExtensions.CreateHandlers(TestServer, TestServerClientHandlerOptions)"/>.
/// </summary>
public class TestServerClientHandlerOptions
{
    public const string DefaultTestServerRemoteRequestClientName = "DefaultTestServerRemoteRequestClient";

    /// <summary>
    /// Initializes a new instance of <see cref="TestServerClientHandlerOptions"/>.
    /// </summary>
    public TestServerClientHandlerOptions()
    {
    }

    // Copy constructor
    internal TestServerClientHandlerOptions(TestServerClientHandlerOptions clientOptions)
    {
        AllowAutoRedirect = clientOptions.AllowAutoRedirect;
        MaxAutomaticRedirections = clientOptions.MaxAutomaticRedirections;
        HandleCookies = clientOptions.HandleCookies;
        ProcessRemoteRequest = clientOptions.ProcessRemoteRequest;
        RemoteRequestClientName = clientOptions.RemoteRequestClientName;
    }

    /// <summary>
    /// Gets or sets whether or not <see cref="HttpMessageHandler"/> instances created by calling
    /// <see cref="TestServerExtensions.CreateHandlers(TestServer, TestServerClientHandlerOptions)"/>
    /// should automatically follow redirect responses.
    /// The default is <c>true</c>.
    /// </summary>
    public bool AllowAutoRedirect { get; set; } = true;

    /// <summary>
    /// Gets or sets the maximum number of redirect responses that <see cref="HttpMessageHandler"/> instances
    /// created by calling <see cref="TestServerExtensions.CreateHandlers(TestServer, TestServerClientHandlerOptions)"/>
    /// should follow.
    /// The default is <c>7</c>.
    /// </summary>
    public int MaxAutomaticRedirections { get; set; } = RedirectHandler.DefaultMaxRedirects;

    /// <summary>
    /// Gets or sets whether <see cref="HttpMessageHandler"/> instances created by calling
    /// <see cref="TestServerExtensions.CreateHandlers(TestServer, TestServerClientHandlerOptions)"/>
    /// should handle cookies.
    /// The default is <c>true</c>.
    /// </summary>
    public bool HandleCookies { get; set; } = true;

    public bool ProcessRemoteRequest { get; set; } = false;

    public string? RemoteRequestClientName { get; set; } = DefaultTestServerRemoteRequestClientName;
}

这是从集成测试包中复制后改造的处理器选项类,用于控制客户端实例化时要启用的功能。
ProcessRemoteRequest
控制是否启用网络请求处理。
RemoteRequestClientName
用于指定在主机中注册的命名客户端的名字。

TestServerClientOptions

/// <summary>
/// The default options to use to when creating
/// <see cref="HttpClient"/> instances by calling
/// <see cref="TestServerExtensions.GetTestClient(IHost, TestServerClientOptions)"/>.
/// </summary>
public class TestServerClientOptions : TestServerClientHandlerOptions
{
    /// <summary>
    /// Initializes a new instance of <see cref="TestServerClientOptions"/>.
    /// </summary>
    public TestServerClientOptions() { }

    // Copy constructor
    internal TestServerClientOptions(TestServerClientOptions clientOptions)
        : base(clientOptions)
    {
        BaseAddress = clientOptions.BaseAddress;
        DefaultRequestVersion = clientOptions.DefaultRequestVersion;
    }

    /// <summary>
    /// Gets or sets the base address of <see cref="HttpClient"/> instances created by calling
    /// <see cref="TestServerExtensions.GetTestClient(IHost, TestServerClientOptions)"/>.
    /// The default is <c>http://localhost</c>.
    /// </summary>
    public Uri BaseAddress { get; set; } = new Uri("http://localhost");

    public Version DefaultRequestVersion { get; set; } = new Version(2, 0);
}

这是对应的客户端选项类,继承处理器选项并增加
HttpClient
相关的内容。

TestServerExtensions

public static class TestServerExtensions
{
    public static Action<IWebHostBuilder> ConfigureTestServer(
        Action<IWebHostBuilder>? configureTestWebBuilder = null,
        RemoteRequestClientOptions? options = null
    ) =>
        webBuilder =>
        {
            configureTestWebBuilder?.Invoke(webBuilder);

            webBuilder.ConfigureAppConfiguration(configurationBuilder =>
            {
                List<KeyValuePair<string, string?>> memoryAppConfiguration = [new("HostInTestServer", "true")];
                configurationBuilder.AddInMemoryCollection(memoryAppConfiguration);
            });

            webBuilder.UseTestServer();
            webBuilder.ConfigureServices(services =>
            {
                var testServerRemoteRequestClientBuilder = services.AddHttpClient(options?.RemoteRequestClientName ?? TestServerClientHandlerOptions.DefaultTestServerRemoteRequestClientName)
                    .SetHandlerLifetime(TimeSpan.FromMinutes(5))
                    .ConfigurePrimaryHttpMessageHandler(provider =>
                    {
                        return new SocketsHttpHandler()
                        {
                            // 禁用内置的自动重定向,由 RemoteLocalAutoSwitchWithRedirectHandler 处理重定向实现本地请求和远程请求之间的相互重定向
                            AllowAutoRedirect = false,
                            PooledConnectionLifetime = TimeSpan.FromMinutes(2),
                        };
                    });

                foreach (var func in options?.AppendHttpMessageHandlers ?? Enumerable.Empty<Func<IServiceProvider, DelegatingHandler>>())
                {
                    testServerRemoteRequestClientBuilder.AddHttpMessageHandler(func);
                }

                if(options?.ConfigureAdditionalHttpMessageHandlers is not null)
                    testServerRemoteRequestClientBuilder.ConfigureAdditionalHttpMessageHandlers(options.ConfigureAdditionalHttpMessageHandlers);
            });
        };

    public static HttpClient CreateTestClient(this TestServer server, TestServerClientOptions options)
    {
        HttpClient client;
        var handlers = server.CreateHandlers(options);
        if (handlers == null || handlers.Length == 0)
        {
            client = server.CreateClient();
        }
        else
        {
            for (var i = handlers.Length - 1; i > 0; i--)
            {
                handlers[i - 1].InnerHandler = handlers[i];
            }

            var testServerHandler = server.CreateHandler(options);

            client = new HttpClient(testServerHandler)
            {
                BaseAddress = options.BaseAddress,
                DefaultRequestVersion = options.DefaultRequestVersion
            };
        }

        return client;
    }

    public static HttpClient GetTestClient(this IHost host, TestServerClientOptions options)
    {
        return host.GetTestServer().CreateTestClient(options);
    }

    public static HttpMessageHandler CreateHandler(
        this TestServer server,
        TestServerClientHandlerOptions options,
        Action<HttpContext>? additionalContextConfiguration = null)
    {
        HttpMessageHandler handler;
        var handlers = server.CreateHandlers(options);
        if (handlers == null || handlers.Length == 0)
        {
            handler = additionalContextConfiguration is null
                ? server.CreateHandler()
                : server.CreateHandler(additionalContextConfiguration);
        }
        else
        {
            for (var i = handlers.Length - 1; i > 0; i--)
            {
                handlers[i - 1].InnerHandler = handlers[i];
            }

            var testServerHandler = additionalContextConfiguration is null
                ? server.CreateHandler()
                : server.CreateHandler(additionalContextConfiguration);

            handlers[^1].InnerHandler = testServerHandler;
            handler = handlers[0];
        }

        return handler;
    }

    internal static DelegatingHandler[] CreateHandlers(this TestServer server,TestServerClientHandlerOptions options)
    {
        return CreateHandlersCore(server, options).ToArray();

        static IEnumerable<DelegatingHandler> CreateHandlersCore(TestServer server, TestServerClientHandlerOptions options)
        {
            RedirectHandler? redirectHandler = null;
            if (options.AllowAutoRedirect)
            {
                redirectHandler = new RedirectHandler(options.MaxAutomaticRedirections);
                yield return redirectHandler;
            }

            if (options.ProcessRemoteRequest)
            {
                if (string.IsNullOrEmpty(options.RemoteRequestClientName))
                    throw new ArgumentException($"{nameof(options.RemoteRequestClientName)} must have content when {nameof(options.ProcessRemoteRequest)} is true.", nameof(options));

                yield return new RemoteLocalAutoSwitchWithRedirectHandler(
                    server.BaseAddress,
                    redirectHandler,
                    server.Services.CreateScope(),
                    options.RemoteRequestClientName);
            }

            if (options.HandleCookies)
            {
                yield return new CookieContainerHandler();
            }
        }
    }
}

public class RemoteRequestClientOptions
{
    public string? RemoteRequestClientName { get; set; }
    public IEnumerable<Func<IServiceProvider, DelegatingHandler>>? AppendHttpMessageHandlers { get; set; }
    public Action<IList<DelegatingHandler>, IServiceProvider>? ConfigureAdditionalHttpMessageHandlers { get; set; }
}

这是用于配置TestServer主机的扩展。其中定义的几个委托用于追加自定义配置提高灵活性。

MyHub

public class MyHub : Hub
{
    public async Task SendMessage(string user, string message)
    {
        await Clients.All.SendAsync("ReceiveMessage", user, message);
    }

    public async Task SendBinary(string user, byte[] bytes)
    {
        await Clients.All.SendAsync("ReceiveBinary", user, bytes);
    }
}

为了测试单机模式下是否能使用SignalR功能,写了一个简单的集线器。

Startup(节选)

// 服务注册部分
services.AddSignalR(options => options.StatefulReconnectBufferSize = 100_000);

// 管道配置部分
var hostInTestServer = configuration.GetValue("HostInTestServer", false);
if (!hostInTestServer)
{
    app.UseHsts();
    app.UseHttpsRedirection();
}

// 端点配置部分
endpoints.MapHub<MyHub>("MyHub", options =>
{
    options.AllowStatefulReconnects = true;
});

var redirectToHome = static (HttpContext context) => Task.FromResult(Results.Redirect("/"));
endpoints.Map("/re", redirectToHome);

var redirectToBaidu = static (HttpContext context) => Task.FromResult(Results.Redirect("https://www.baidu.com/"));
endpoints.Map("/reBaidu", redirectToBaidu);

var redirectToOutRe = static (HttpContext context) => Task.FromResult(Results.Redirect("https://localhost:7215/inRe", preserveMethod: true));
endpoints.Map("/outRe", redirectToOutRe);

var redirectToInRe = static (HttpContext context, TestParam? param) => Task.FromResult(Results.Redirect($"http://localhost/{param?.Path?.TrimStart('/')}", preserveMethod: true));
endpoints.Map("/inRe", redirectToInRe);

Startup
只是在RazorPages模版的基础上追加了以上内容,为了方便使用没有使用新模版的写法。新模版完全是对老模版的包装,还导致了少量功能无法使用,笔者这边的用法刚好是新模版不好用的情况。

为了避免不必要的HTTPS重定向,在单机模式下不注册跳转中间件和严格传输模式中间件。

Program

public class Program
{
    public static async Task Main(string[] args)
    {
        using var kestrelServerHost = CreateHostBuilder(args).Build();
        await kestrelServerHost.StartAsync();

        using var testServerHost = CreateHostBuilder(args, ConfigureTestServer()).Build();
        await testServerHost.StartAsync();

        var testServer = testServerHost.GetTestServer();
        var testServerClient = testServerHost.GetTestClient(new()
        {
            ProcessRemoteRequest = true,
            DefaultRequestVersion = new(3, 0)
        });

        var multiRedirectResponse = await testServerClient.PostAsJsonAsync("/outRe", new TestParam { Path = "/reBaidu" });
        var multiRedirectContent = await multiRedirectResponse.Content.ReadAsStringAsync();
        Console.WriteLine(multiRedirectContent);

        var connection = new HubConnectionBuilder()
            .WithUrl(
                new Uri(testServer.BaseAddress, "/MyHub"),
                HttpTransportType.WebSockets,
                options =>
                {
                    options.HttpMessageHandlerFactory = handler =>
                    {
                        var newHandler = testServer.CreateHandler(options: new());
                        return newHandler;
                    };
                    options.WebSocketFactory = (context, cancellationToken) =>
                    {
                        var webSocketClient = testServer.CreateWebSocketClient();
                        var webSocket = webSocketClient.ConnectAsync(context.Uri, cancellationToken);
                        return new(webSocket);
                    };
                }
            )
            .WithStatefulReconnect()
            .WithAutomaticReconnect()
            .Build();

        connection.On<string, string>("ReceiveMessage", (user, message) =>
        {
            var newMessage = $"{user}: {message}";
            Console.WriteLine(newMessage);
        });

        var times = 0;
        connection.On<string, byte[]>("ReceiveBinary", (user, bytes) =>
        {
            Interlocked.Increment(ref times);
            var newMessage = $"{user}: No.{times,10}: {bytes.Length} bytes";
            Console.WriteLine(newMessage);
        });

        await connection.StartAsync();
        await connection.InvokeAsync("SendMessage", "ConsoleClient", "ConsoleClientMessage");

        Console.WriteLine("内存压力测试开始");
        Stopwatch sw = Stopwatch.StartNew();
        var tenMinutes = TimeSpan.FromMinutes(10);
        while (sw.Elapsed < tenMinutes)
        {
            await connection.InvokeAsync("SendBinary", "ConsoleClient", new byte[1024 * 10]);
            await Task.Delay(10);
        }
        Console.WriteLine("内存压力测试结束");

        Console.Write("按任意键继续...");
        Console.ReadKey();

        await connection.StopAsync();
        await testServerHost.StopAsync();
        await kestrelServerHost.StopAsync();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) => CreateHostBuilder(args, null);

    public static IHostBuilder CreateHostBuilder(string[] args, Action<IWebHostBuilder>? configureWebBuilder) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(webBuilder =>
            {
                webBuilder
                    .UseStartup<Startup>();
                configureWebBuilder?.Invoke(webBuilder);
            });
}

public class TestParam
{
    public string? Path { get; set; }
}

这里使用Post一个Json到
/outRe
的请求测试连续相互跳转。其中的Json用于测试是否能正常处理多次请求流的数据发送。outRe会返回一个到网络主机的地址的重定向,网络主机又会返回到单机主机的
/inRe
地址的重定向,这里会读取Json的内容决定最后一次跳转的地址,两个跳转地址分别用来测试本地跳转和网络跳转。

然后连接SignalR测试是否能连接成功以及内存泄漏测试,其中内存泄漏测试用VS的诊断面板来看比较方便。

效果测试

全部准备完成后就可以测试效果了。经过实测,本地SignalR客户端在连接单机WebSocket时无法处理HTTPS跳转,TestServer创建的WebSocketClient没有配置途径,内置Handler没有处理重定向请求。每秒100次每次10K的二进制数据传输的10分钟测试也没有出现内存泄漏,内存会在一定增长后保持稳定。根据SignalR的测试结果和官网文档,gRPC理论上应该也能完整支持。最后是刻意构造的带数据Post的多次本地、网络交叉重定向测试,结果验证成功。

测试本地、网络相互跳转是打开一个监听本地端口的普通主机来提供从网络跳转回本地的服务。而这个普通主机只是个没有调用过TestServer配置的原始版本。从这里也可以看出单机主机和网络主机的切换非常方便。

image
image

结语

使用这个方法可以在单机程序中虚构出一个C/S架构,利用特制的
HttpClient
强制隔离业务逻辑和界面数据。这样还能获得一个免费的好处,如果将来要把程序做成真的网络应用,几乎可以0成本完成迁移改造。同样的,熟悉网络程序的开发者也可以在最大程度上利用已有经验开发单机应用。

又是很久没有写文章了,一直没有找到什么好选题,难得找到一个,经过将近1周的研究开发终于搞定了。

代码包:
InProcessAspNetCoreApp.rar

代码包调整了直接运行exe的一些设置,主要和HTTPS有关,制作证书还是比较麻烦的,所以直接关闭了HTTPS。当然方法很简单粗暴,理论上应该通过主机设置来调整,演示就用偷懒方法处理了。

QQ群

读者交流QQ群:540719365
image

欢迎读者和广大朋友一起交流,如发现本书错误也欢迎通过博客园、QQ群等方式告知笔者。

本文地址:
https://www.cnblogs.com/coredx/p/17998563.html

代码

先贴代码:
核心就是:Spring给我们提供的一个类
AbstractRoutingDataSource
,然后我们再写一个切面来切换数据源,肯定要有一个地方存储key还要保证上下文都可用,所以我们使用 ThreadLocal 来存储数据源的key

pom.xml


   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-aop</artifactId>
   </dependency>

   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   
   <dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>druid-spring-boot-starter</artifactId>
       <version>1.2.6</version>
   </dependency>

   <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
   </dependency>

注解:

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface DS {
    String value() default DataSourceCons.DS1;
}

常量类:

public interface DataSourceCons {
    String DS1 ="ds1";
    
    String DS2 ="ds2";
}

上下文存储对象:

@Slf4j
public class DynamicDataSourceContextHolder {
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();

    public static void setContextHolder(String dsName) {
        log.info("切换到 ===> {} 数据源", dsName);
        CONTEXT_HOLDER.set(dsName);
    }

    public static String get() {
        return CONTEXT_HOLDER.get();
    }

    public static void clear() {
        CONTEXT_HOLDER.remove();
    }
}

切面:
后续这个Order注解有大用

@Aspect
@Component
// @Order(-1)
public class DynamicDataSourceAspect {

    @Pointcut("@annotation(com.lizhi.dds.anno.DS)")
    public void point1() {

    }

    @Around("point1()")
    public Object switchDS(ProceedingJoinPoint pjp) throws Throwable {
        try {
            DS ds = getDataSource(pjp);

            if (ds != null){
                DynamicDataSourceContextHolder.setContextHolder(ds.value());
            }

            return pjp.proceed();
        } finally {
            DynamicDataSourceContextHolder.clear();
        }
    }

    private DS getDataSource(ProceedingJoinPoint pjp) {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        DS methodAnno = signature.getMethod().getAnnotation(DS.class);

        if (methodAnno != null) {
            return methodAnno;
        } else {
            return pjp.getTarget().getClass().getAnnotation(DS.class);
        }
    }
}

存储数据源的类:我们这里就写一个map来存储数据源,大家也可以加其它的属性

@Data
@Configuration
@ConfigurationProperties(prefix = "spring.datasource.druid")
public class DruidProperties {
    private Map<String, Map<String, String>> ds;
}

核心类:我们写一个类来继承这个核心类来自定义我们自己的东西

@Component
public class DynamicDataSource extends AbstractRoutingDataSource {

    @Autowired
    private DruidProperties druidProperties;

    /**
     * 初始化数据源
     *
     * @throws Exception
     */
    @PostConstruct
    public void init() throws Exception {
        Map<String, Map<String, String>> ds = druidProperties.getDs();
        Map<Object, Object> dataSources = new HashMap<>();

        for (Map.Entry<String, Map<String, String>> entry : ds.entrySet()) {
            DataSource dataSource = DruidDataSourceFactory.createDataSource(entry.getValue());
            dataSources.put(entry.getKey(), dataSource);
        }

        setTargetDataSources(dataSources);
        setDefaultTargetDataSource(dataSources.get(DataSourceCons.DS1));
        afterPropertiesSet();
    }

	
    /**
     * 拿数据源的时候会调用此方法来找key
     *
     * @return
     */
    @Override
    protected Object determineCurrentLookupKey() {
        return DynamicDataSourceContextHolder.get();
    }
}

直接来看一下这个类的源码:就知道动态数据源是怎么来的了

public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {

	// 我们所有的数据源都会存储在这里面
	@Nullable
	private Map<Object, Object> targetDataSources;

	// 默认用哪个数据源
	@Nullable
	private Object defaultTargetDataSource;

	private boolean lenientFallback = true;

	private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();

	// 解析过后的所有数据源
	@Nullable
	private Map<Object, DataSource> resolvedDataSources;

	// 解析过后的默认数据源
	@Nullable
	private DataSource resolvedDefaultDataSource;


	/**
	 * Specify the map of target DataSources, with the lookup key as key.
	 * The mapped value can either be a corresponding {@link javax.sql.DataSource}
	 * instance or a data source name String (to be resolved via a
	 * {@link #setDataSourceLookup DataSourceLookup}).
	 * <p>The key can be of arbitrary type; this class implements the
	 * generic lookup process only. The concrete key representation will
	 * be handled by {@link #resolveSpecifiedLookupKey(Object)} and
	 * {@link #determineCurrentLookupKey()}.
	 */
	public void setTargetDataSources(Map<Object, Object> targetDataSources) {
		this.targetDataSources = targetDataSources;
	}

	/**
	 * Specify the default target DataSource, if any.
	 * <p>The mapped value can either be a corresponding {@link javax.sql.DataSource}
	 * instance or a data source name String (to be resolved via a
	 * {@link #setDataSourceLookup DataSourceLookup}).
	 * <p>This DataSource will be used as target if none of the keyed
	 * {@link #setTargetDataSources targetDataSources} match the
	 * {@link #determineCurrentLookupKey()} current lookup key.
	 */
	public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
		this.defaultTargetDataSource = defaultTargetDataSource;
	}


	@Override
	public void afterPropertiesSet() {
		if (this.targetDataSources == null) {
			throw new IllegalArgumentException("Property 'targetDataSources' is required");
		}
		this.resolvedDataSources = CollectionUtils.newHashMap(this.targetDataSources.size());
		this.targetDataSources.forEach((key, value) -> {
			Object lookupKey = resolveSpecifiedLookupKey(key);
			DataSource dataSource = resolveSpecifiedDataSource(value);
			this.resolvedDataSources.put(lookupKey, dataSource);
		});
		if (this.defaultTargetDataSource != null) {
			this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
		}
	}

	/**
	 * Resolve the given lookup key object, as specified in the
	 * {@link #setTargetDataSources targetDataSources} map, into
	 * the actual lookup key to be used for matching with the
	 * {@link #determineCurrentLookupKey() current lookup key}.
	 * <p>The default implementation simply returns the given key as-is.
	 * @param lookupKey the lookup key object as specified by the user
	 * @return the lookup key as needed for matching
	 */
	protected Object resolveSpecifiedLookupKey(Object lookupKey) {
		return lookupKey;
	}

	/**
	 * Resolve the specified data source object into a DataSource instance.
	 * <p>The default implementation handles DataSource instances and data source
	 * names (to be resolved via a {@link #setDataSourceLookup DataSourceLookup}).
	 * @param dataSource the data source value object as specified in the
	 * {@link #setTargetDataSources targetDataSources} map
	 * @return the resolved DataSource (never {@code null})
	 * @throws IllegalArgumentException in case of an unsupported value type
	 */
	protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
		if (dataSource instanceof DataSource) {
			return (DataSource) dataSource;
		}
		else if (dataSource instanceof String) {
			return this.dataSourceLookup.getDataSource((String) dataSource);
		}
		else {
			throw new IllegalArgumentException(
					"Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
		}
	}

	/**
	 * Return the resolved default target DataSource, if any.
	 * @return the default DataSource, or {@code null} if none or not resolved yet
	 * @since 5.2.9
	 * @see #setDefaultTargetDataSource
	 */
	@Nullable
	public DataSource getResolvedDefaultDataSource() {
		return this.resolvedDefaultDataSource;
	}

	/**
	 * 决定要使用哪个数据源的方法(核心)
	 */
	protected DataSource determineTargetDataSource() {
		Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
		// 先拿到数据源的key
		Object lookupKey = determineCurrentLookupKey();
		// 从我们初始化好的map里根据key拿到一个数据源
		DataSource dataSource = this.resolvedDataSources.get(lookupKey);
		// 如果没拿到就用默认的数据源
		if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
			dataSource = this.resolvedDefaultDataSource;
		}
		if (dataSource == null) {
			throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
		}
		return dataSource;
	}

	/**
	 * 找一下数据源的key(我们重写了此方法,所以拿到的就是我们上下文里存储的key)
	 */
	@Nullable
	protected abstract Object determineCurrentLookupKey();

}

application.yml:

server:
  port: 9999

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      ds:
        ds1:
          url: jdbc:mysql://localhost:3306/ry-vue?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
          username: root
          password: root
          driverClassName: com.mysql.cj.jdbc.Driver
        ds2:
          url: jdbc:mysql://localhost:3306/atguigudb?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
          username: root
          password: root
          driverClassName: com.mysql.cj.jdbc.Driver

在org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource#determineTargetDataSource 方法上打一个断点,然后发一个请求过去就能看到了,肯定一看就懂。


然后我们来说遇到的问题。

发现问题

问题1:两个库用同一个事务的问题

当我们使用如下代码时就会出现事务问题:

代码:

    @Transactional
    @Override
    public List<SysUser> listAll() {
        System.out.println("employeeService.listAll() = " + employeeService.listAll());
        return sysUserMapper.selectList(null);
    }

    @DS(DataSourceCons.DS2)
    @Override
    public List<Employees> listAll() {
        return employeeMapper.selectList(null);
    }

问题截图:

我们发现的确出现了问题,发现第二张表的数据源使用的还是上一个的数据源,那我们就会想,是不是数据源切换没成功啊?别想了,我们看一下控制台就可以发现的确有打印(
蓝色框里
),说明走切面了。
那是哪的问题呢。

我们又会想到两个库使用同一个事务肯定会有问题。那我们就直接用Spring的事务的
传播特性
来解决不就好了,于是我们就直接上手改传播特性为
REQUIRES_NEW
(默认的
传播特性

REQUIRED
也就是使用同一个事务,那我们就直接在第二个方法上加上事务注解并设置
传播特性

REQUIRES_NEW
即可,就是创建一个新事务,使用不同的事务)。我们改完之后发现还是没什么卵用。那我们就会疑惑了那是哪的问题呢?

解决方案:事务传播特性设置为
REQUIRES_NEW

问题2:优先级问题

这个问题的答案我就直接说了:
事务的原理是代理,我们切换数据源的切面的原理也是代理,它俩的执行的前后顺序是有问题的。我们需要把切换数据源的切面让它在事务前执行即可。

解决方案:
也就是需要加个Order注解来提升优先级。

源码分析

第一次Debug

先在第二个事务的方法设置传播特性为
REQUIRES_NEW
,然后来分析。最后会分析
REQUIRED
为啥不行

我们直接在事务的方法上打一个断点:

那我们就会想了,都不知道从哪看,那该怎么办,没关系:
不知道从哪看很简单,我们直接看调用栈来找。

我们通过调用栈可以发现:

  • 红框里是我们自己的方法,所以不用管
  • 紫框里是代理,所以也不用管
  • 蓝框里我们一看有
    Transaction
    关键字,那不就是事务相关的吗,好家伙,这不就找到了

那我们直接就在上面打个断点org.springframework.transaction.interceptor.TransactionInterceptor#
invoke
发现它调用了 org.springframework.transaction.interceptor.TransactionAspectSupport#
invokeWithinTransaction
方法

	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {

		// If the transaction attribute is null, the method is non-transactional.
		// 如果transaction属性为null,则该方法为非事务性方法。
		TransactionAttributeSource tas = getTransactionAttributeSource();
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// 如果需要的话就创建一个事务(**核心**)
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

			Object retVal;
			try {
				// 执行目标方法
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
				// target invocation exception
				// 出现异常就处理异常
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
				cleanupTransactionInfo(txInfo);
			}

			if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
				// Set rollback-only in case of Vavr failure matching our rollback rules...
				TransactionStatus status = txInfo.getTransactionStatus();
				if (status != null && txAttr != null) {
					retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
				}
			}
			
			// 提交事务
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}
	}

前面的都不需要看,直接看 createTransactionIfNecessary() 方法:TransactionAspectSupport类

	protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
			@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
		TransactionStatus status = null;
		if (txAttr != null) {
			if (tm != null) {
				// 获取一个事务(核心)
				status = tm.getTransaction(txAttr);
			}
			else {
				if (logger.isDebugEnabled()) {
					logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
							"] because no transaction manager has been configured");
				}
			}
		}
		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
	}

来到真正的核心
getTransaction
() 方法:AbstractPlatformTransactionManager类

	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {

		// 事务的定义信息
		TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
		// 获取一个事务
		Object transaction = doGetTransaction();
		boolean debugEnabled = logger.isDebugEnabled();

		// 已经有事务的处理
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			return handleExistingTransaction(def, transaction, debugEnabled);
		}

		// 没有事务的处理
		if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			SuspendedResourcesHolder suspendedResources = suspend(null);
			try {
				// 开启一个事务
				return startTransaction(def, transaction, debugEnabled, suspendedResources);
			}
		}
	}

三个方法:doGetTransaction() 方法、handleExistingTransaction() 方法、startTransaction() 方法。我们一个一个来看。

第一个方法:

org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction

	protected Object doGetTransaction() {
		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
		txObject.setSavepointAllowed(isNestedTransactionAllowed());
		ConnectionHolder conHolder =
				(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
		txObject.setConnectionHolder(conHolder, false);
		return txObject;
	}

这是第一次这三个的值。

核心呢其实就是从缓存里获取了ConnectionHolder对象,连接缓存

来看一下这个方法:

org.springframework.transaction.support.TransactionSynchronizationManager#getResource

	private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<>("Transactional resources");

	public static Object getResource(Object key) {
		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
		return doGetResource(actualKey);
	}

	/**
	 * Actually check the value of the resource that is bound for the given key.
	 */
	@Nullable
	private static Object doGetResource(Object actualKey) {
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
		Object value = map.get(actualKey);
		// Transparently remove ResourceHolder that was marked as void...
		if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
			map.remove(actualKey);
			// Remove entire ThreadLocal if empty...
			if (map.isEmpty()) {
				resources.remove();
			}
			value = null;
		}
		return value;
	}

我们不是继承了AbstractRoutingDataSource这个类吗,这个key呢就是我们的这个类。

第一次肯定是拿不到连接缓存的,所以一路返回。又到了 getTransaction() 这个大方法

第一次肯定也是不存在事务的,所以也不会走我们的
第二个核心方法
handleExistingTransaction()

所以就来到了我们的
第三个核心方法

org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction

	private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
			boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		DefaultTransactionStatus status = newTransactionStatus(
				definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
		// 开始
		doBegin(transaction, definition);
		prepareSynchronization(status, definition);
		return status;
	}

直接看 doBegin() 方法
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin

	protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
			// 如果事务没有连接缓存,那就给它一个
			if (!txObject.hasConnectionHolder() ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
				// 根据当前数据源来获取一个连接 这个也是导致我们的问题所在处一
				Connection newCon = obtainDataSource().getConnection();
				if (logger.isDebugEnabled()) {
					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
				}
				// 给当前事务设置一个连接缓存
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}

			// 设置一堆事务的特性:隔离级别啊、自动提交啊什么的
			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();

			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			txObject.setPreviousIsolationLevel(previousIsolationLevel);
			txObject.setReadOnly(definition.isReadOnly());

			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
			// so we don't want to do it unnecessarily (for example if we've explicitly
			// configured the connection pool to set it already).
			if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				if (logger.isDebugEnabled()) {
					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
				}
				con.setAutoCommit(false);
			}

			prepareTransactionalConnection(con, definition);
			txObject.getConnectionHolder().setTransactionActive(true);

			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}

			if (txObject.isNewConnectionHolder()) {
				// 给当前线程绑定一下连接缓存,后续再来方便获取
				// 也就是往刚才的那个map里放一下
				// key还是我们的那个数据源的类,value就是连接缓存对象
				// 这个也是导致我们的问题所在处二
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
		}
	}

总结:
就是根据我们的数据源来获取一个连接,并缓存起来,以供后续使用。并会设置一些事务的相关信息。

我们当前获取到的连接:

绑定的缓存:key还是我们的那个类,value就是我们根据当前数据源获取到的连接,当前数据源是第一个数据源。我们
接下来看一下第二个数据源的连接是什么。

我们来看一下 obtainDataSource().getConnection() 这个方法是怎么做的:AbstractRoutingDataSource类

	@Override
	public Connection getConnection() throws SQLException {
		return determineTargetDataSource().getConnection();
	}
	
	protected DataSource determineTargetDataSource() {
		Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
		Object lookupKey = determineCurrentLookupKey();
		DataSource dataSource = this.resolvedDataSources.get(lookupKey);
		if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
			dataSource = this.resolvedDefaultDataSource;
		}
		if (dataSource == null) {
			throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
		}
		return dataSource;
	}
	
	// 我们写的实现类
    @Override
    protected Object determineCurrentLookupKey() {
        return DynamicDataSourceContextHolder.get();
    }

是不是很熟悉?这不就是Spring给我们提供的来做动态数据源那个类的源码吗!前面已经分析过了

由于我们上下文存储的key是null,所以就给了一个默认数据源。所以获取到的是第一个数据源的连接。我们看一下第二个方法是否还是这个数据源,让我们进入第二次 Debug吧。

第二次Debug

前面都是一样的套路,但是走到这里就会发生变化了。还记得我们的
三个核心方法
吗?第一次 Debug的时候并没有走我们的
第二个核心方法
,这一次就要走了

进来的条件:判断当前事务对象是否有连接缓存并且当前连接缓存的事务是活跃的

org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction:

	private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction");
			}
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
		}

		// 我们设置的传播特性会走这里
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction, creating new transaction with name [" +
						definition.getName() + "]");
			}
			// 清除一下上一次事务的信息
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {
				// 开启一个新的事务(核心)
				return startTransaction(definition, transaction, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error beginEx) {
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
		}

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException(
						"Transaction manager does not allow nested transactions by default - " +
						"specify 'nestedTransactionAllowed' property with value 'true'");
			}
			if (debugEnabled) {
				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
			}
			if (useSavepointForNestedTransaction()) {
				// Create savepoint within existing Spring-managed transaction,
				// through the SavepointManager API implemented by TransactionStatus.
				// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
				DefaultTransactionStatus status =
						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
				status.createAndHoldSavepoint();
				return status;
			}
			else {
				// Nested transaction through nested begin and commit/rollback calls.
				// Usually only for JTA: Spring synchronization might get activated here
				// in case of a pre-existing JTA transaction.
				return startTransaction(definition, transaction, debugEnabled, null);
			}
		}

		// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
		if (debugEnabled) {
			logger.debug("Participating in existing transaction");
		}
		if (isValidateExistingTransaction()) {
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
					Constants isoConstants = DefaultTransactionDefinition.constants;
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] specifies isolation level which is incompatible with existing transaction: " +
							(currentIsolationLevel != null ?
									isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
									"(unknown)"));
				}
			}
			if (!definition.isReadOnly()) {
				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] is not marked as read-only but existing transaction is");
				}
			}
		}
		// 默认的传播特性走这里
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}

总结:
就是根据不同的传播特性来做不同的事情。

先来看一下一个重要的事情,我们发现进来这个方法之后 Transaction 对象里还是有之前的连接缓存的,这是万万不行的,所以需要清掉。

当我们经过这个方法 suspend() 后,我们神奇的发现里面的信息没有了:

直接来看一下这个方法:

	protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
				Object suspendedResources = null;
				if (transaction != null) {
					// 清除连接信息
					suspendedResources = doSuspend(transaction);
				}
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
				TransactionSynchronizationManager.setActualTransactionActive(false);
				return new SuspendedResourcesHolder(
						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
			}
			catch (RuntimeException | Error ex) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
		}
		else if (transaction != null) {
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		}
		else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}

它里面又调用了这个方法来做:

org.springframework.jdbc.datasource.DataSourceTransactionManager#doSuspend

	@Override
	protected Object doSuspend(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		// 设置为null
		txObject.setConnectionHolder(null);
		return TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}

然后我们直接来看我们的传播特性做的事情。其实还是之前 startTransaction() 做的事情。就是开
启一个新的事务来做。我们可以看一下这一次的连接是谁的

我们发现这次的连接还是第一个数据源的连接,这就出大问题了。我们想要的是第二个数据源的连接。这是怎么回事呢?不要急,我们直接来看。

这是咋回事啊,我靠,切面没成功吗,key还是null,所以获取到的数据源就是默认的数据源了。来看一下控制台是否打印切换数据源的信息:确实没有信息

当我们全部放行之后,再次查看控制台就会发现,我们的数据源切换成功了!

what?这是什么情况。这个时候我们就要运用大脑里的知识来想一想了,想到原理。Spring事务是aop,我们的这个切换数据源的也是aop,那会不会是切面之间的执行顺序还有先后啊。于是我们直接实践,直接在切换数据源的切面上加上了 Order 注解,来提高优先级。加了之后,我们神奇的发现成功了!Amazing!

解决

优先级:

再次一路 Debug 到doBegin() 方法来验证猜想:

打开控制台查看,也会发现有信息输出:

直接成功!

同一个事务问题:

我们把第二个方法的事务的传播特性还设置回原来的
REQUIRED
或者不加事务注解。

还是来到第二个核心方法:handleExistingTransaction()

还记得我之前在此处代码上加的注释吗?如果是默认的传播特性会走这里来处理

	private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {
                 ......
		// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
		// 处理 PROPAGATION_SUPPORTS 和 PROPAGATION_REQUIRED 的传播特性
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}

prepareTransactionStatus():

	protected final DefaultTransactionStatus prepareTransactionStatus(
			TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
			boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

		DefaultTransactionStatus status = newTransactionStatus(
				definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
		prepareSynchronization(status, definition);
		return status;
	}

就是把当前的事务状态给返回了,所以后续拿到的连接还是上一个的

总结

  1. 切面的优先级问题(Order注解)

  2. 事务的传播特性(Propagation类)

文章到这里就结束了,应该还是有点长的。希望有点帮助哈。