2024年3月

0. 问题

新版本上线之后,发现内存猛涨,入站流量猛增,不清楚具体原因,部分接口提示 OOM 异常,随后 Pod 直接崩溃无限重启。

1. 准备

Pod 已经接入了 NewRelic 和 Graylog,但是仍然没有办法找到真正的罪魁祸手,此时只能进入 Pod 容器当中抓取内存 Dump 信息。我们容器的基础镜像是基于 Apline-3.18 的,进入容器之后执行了以下命令开始安装相应的工具。

# 我们的镜像是基于 runtime 的,因此需要手动安装一下 SDK,以便后续操作。
# 这里还安装了 bash,后续会使用 bash 进行交互操作,自带的 sh 不好用。
apk add dotnet6-sdk bash
# 安装 Dump 工具
dotnet tool install --global dotnet-dump

因为容器的
ENTRYPOINT
就是直接运行的 dotNET 程序,一般来说其 PID 都是 1,如果你不清楚具体的进程 ID,可以执行

尝试运行
dotnet-dump collect -p 1
收集 Dump 信息,但是得到了以下错误:

/build# dotnet-dump collect -p 1

Writing full to /build/core_20240307_090401
Write dump failed - HRESULT: 0x00000000.

搜索一番之后,得知这是 Pod 没有足够的权限去执行 Dump 操作,因此修改了 Rollouts(或者 Deplotment) 的 YAML 定义,添加对应的
securityContext
应用即可,随后便能够正确地获取 Dump 文件。

securityContext:
  capabilities:
    add:
    - SYS_PTRACE
    - SYS_ADMIN
  seccompProfile:
    type: RuntimeDefault

再次执行
dotnet-dump collect -p 1
获取到了对应的 Dump 文件,将文件拷贝到挂载的 NFS 卷当中,随即下载到本地以便进行调试排查问题。

2. 调查

得到 Dump 文件之后,我们可以使用多种工具来分析 Dump 文件,这里我使用的是
dotnet-dump
命令。因为我是 macOS 的机器,使用
dotnet-dump
我可以直接开始进行分析,你也可以使用 Visual Studio 、dotnetMemory、WinDBG 来打开 Dump 的文件,具体看你的喜好了。

使用
dotnet dump analyze <dump file path>
进入交互式页面:

Loading core dump: D:\dotNET_Dumps\\core_20240307_142201 ...
Ready to process analysis commands. Type 'help' to list available commands or 'help [command]' to get detailed help on a command.
Type 'quit' or 'exit' to exit the session.

首先我们可以看一下目前 GC 堆的信息:

> eeheap -gc

========================================
Number of GC Heaps: 3
----------------------------------------
Heap 0 (00007faa2a73b6b0)
generation 0 starts at 7fa2495932e8
generation 1 starts at 7fa2458279f0
generation 2 starts at 7fa232703000
ephemeral segment allocation context: none
Small object heap
         segment            begin        allocated        committed allocated size         committed size
    7fa232702000     7fa232703000     7fa249be4020     7fa252174000 0x174e1020 (390991904) 0x1fa72000 (531046400)
Large object heap starts at 7fa3b2703000
         segment            begin        allocated        committed allocated size         committed size
    7fa3b2702000     7fa3b2703000     7fa3e3dfc348     7fa3e3dfd000 0x316f9348 (829395784) 0x316fb000 (829403136)
Pinned object heap starts at 7fa6b2703000
         segment            begin        allocated        committed allocated size         committed size
    7fa6b2702000     7fa6b2703000     7fa6b27d4bb8     7fa6b27d5000 0xd1bb8 (859064)       0xd3000 (864256)
------------------------------
Heap 1 (00007faa2a68b6e0)
generation 0 starts at 7fa2c75ae080
generation 1 starts at 7fa2c40eec00
generation 2 starts at 7fa2b2703000
ephemeral segment allocation context: none
Small object heap
         segment            begin        allocated        committed allocated size         committed size
    7fa2b2702000     7fa2b2703000     7fa2c9b1ebb0     7fa2d00b8000 0x1741bbb0 (390183856) 0x1d9b6000 (496721920)
Large object heap starts at 7fa4b2703000
         segment            begin        allocated        committed allocated size         committed size
    7fa4b2702000     7fa4b2703000     7fa4e3f804f0     7fa4e3f81000 0x3187d4f0 (830985456) 0x3187f000 (830992384)
Pinned object heap starts at 7fa7b2703000
         segment            begin        allocated        committed allocated size         committed size
    7fa7b2702000     7fa7b2703000     7fa7b2703018     7fa7b2704000 0x18 (24)              0x2000 (8192)
------------------------------
Heap 2 (00007faa2a5db720)
generation 0 starts at 7fa3466d0298
generation 1 starts at 7fa343173ee0
generation 2 starts at 7fa332703000
ephemeral segment allocation context: none
Small object heap
         segment            begin        allocated        committed allocated size         committed size
    7fa332702000     7fa332703000     7fa348631878     7fa34f736000 0x15f2e878 (368240760) 0x1d034000 (486752256)
Large object heap starts at 7fa5b2703000
         segment            begin        allocated        committed allocated size         committed size
    7fa5b2702000     7fa5b2703000     7fa5e519c3b0     7fa5e519d000 0x32a993b0 (849974192) 0x32a9b000 (849981440)
Pinned object heap starts at 7fa8b2703000
         segment            begin        allocated        committed allocated size         committed size
    7fa8b2702000     7fa8b2703000     7fa8b270c0f0     7fa8b2714000 0x90f0 (37104)         0x12000 (73728)
------------------------------
GC Allocated Heap Size:    Size: 0xda315cf0 (3660668144) bytes.
GC Committed Heap Size:    Size: 0xeff58000 (4025843712) bytes.

可以看到有 3 个 GC 堆,并且大部分内存占用都在 LOH 上,我们使用
dumpheap -stat -min 85000
搜索一下大小大于 85000 字节的对象有多少?

> dumpheap -stat -min 85000
Statistics:
          MT Count     TotalSize Class Name
7fa9b9be29c0     1        85,112 Serilog.Events.LogEventPropertyValue[]
7fa9ba87d710     1       117,464 Microsoft.AspNetCore.Routing.Matching.DfaState[]
7fa9b327b110     2       261,648 System.Object[]
7fa9b3348080     2       849,380 System.Int32[]
7fa9bb1e29f8     5     1,441,912 ***.Core.***.*************[]
7fa9b334d2e0     6     1,939,370 System.String
7fa9bb3589a0     1     2,097,176 ***.Core.***.***.***[]
7fa9b5200528     9     2,228,440 ***.Core.***.***[]
7fa9b5206200    20     3,670,496 ***.Core.***.***[]
7fa9bb3625e8     1     4,506,048 System.Collections.Generic.Dictionary<System.String, ***.***.***.***.***>+Entry[]
7fa9b338edd0    20     9,716,748 System.Char[]
7faa2cb14350    76    13,295,160 Free
7fa9b3d60c98 1,100 2,464,160,840 System.Byte[]
Total 1,244 objects, 2,504,369,794 bytes

可以看到这里面有 1100 个对象的大小都超过了 85000 字节,总共加起来快 2.3GB 了,所以问题出在这里。随后使用
dumpheap -type System.Byte[]
查看这些具体的对象列表,以便得到具体对象的地址:

    7fa5d5175480     7fa9b3d60c98     18,749,311
    7fa5d6356c20     7fa9b3d60c98      6,734,857
    7fa5d69c3050     7fa9b3d60c98        878,704
    7fa5d6a998e0     7fa9b3d60c98        174,565
    7fa5d6ad21c0     7fa9b3d60c98     18,749,311
    7fa5d7cb3960     7fa9b3d60c98      6,734,857
    7fa5d831fd90     7fa9b3d60c98     10,670,254
    7fa5d8d4ce60     7fa9b3d60c98     10,670,254
    7fa5d9779f30     7fa9b3d60c98     18,749,311
    7fa5da95b6d0     7fa9b3d60c98     18,749,311
    7fa5dbb3ce70     7fa9b3d60c98      1,931,776
    7fa5dbd6b8e0     7fa9b3d60c98      6,842,488
    7fa5dc3f2178     7fa9b3d60c98      7,773,830
    7fa5dcb5c020     7fa9b3d60c98      7,773,830
    7fa5dd2c5ec8     7fa9b3d60c98      7,773,830
    7fa5dda2fd70     7fa9b3d60c98     12,585,235
    7fa5de6306a8     7fa9b3d60c98      1,889,260
    7fa5de7fdab8     7fa9b3d60c98      1,172,106
    7fa5de91bd68     7fa9b3d60c98        134,508
    7fa5de94dff8     7fa9b3d60c98      8,857,584
    7fa5df1c0808     7fa9b3d60c98      6,842,488
    7fa5df8470a0     7fa9b3d60c98      6,842,488
    7fa5dfecd938     7fa9b3d60c98      6,842,488
    7fa5e05541d0     7fa9b3d60c98      8,857,584
    7fa5e0dc69e0     7fa9b3d60c98      7,773,449
    7fa5e1530710     7fa9b3d60c98      7,773,449
    7fa5e1c9a440     7fa9b3d60c98        980,321
    7fa5e1d899c8     7fa9b3d60c98      1,052,316
    7fa5e1e8a888     7fa9b3d60c98      1,052,316
    7fa5e1f8b748     7fa9b3d60c98      7,373,509
    7fa5e2693a30     7fa9b3d60c98      7,373,509
    7fa5e2d9bd18     7fa9b3d60c98      2,660,027
    7fa5e30253f8     7fa9b3d60c98      2,660,027
    7fa5e32aead8     7fa9b3d60c98      2,783,326
    7fa5e3556358     7fa9b3d60c98      2,783,326
    7fa5e37fdbd8     7fa9b3d60c98      2,783,326
    7fa5e3aa5458     7fa9b3d60c98      6,840,270
    7fa5e412b448     7fa9b3d60c98     17,239,905
    7fa6b27706c8     7fa9b3d60c98          4,120
    7fa6b27716e0     7fa9b3d60c98          4,120
    7fa6b27726f8     7fa9b3d60c98          4,120
    7fa6b2773710     7fa9b3d60c98          4,120
    7fa6b2774728     7fa9b3d60c98          4,120
    7fa6b2775740     7fa9b3d60c98          4,120
    7fa6b2776758     7fa9b3d60c98          4,120
    7fa6b2777770     7fa9b3d60c98          4,120
    7fa6b2778788     7fa9b3d60c98          4,120
    7fa6b27797a0     7fa9b3d60c98          4,120
    7fa6b277a7b8     7fa9b3d60c98          4,120
    7fa6b277b7d0     7fa9b3d60c98          4,120
    7fa6b277c7e8     7fa9b3d60c98          4,120
    7fa6b277d800     7fa9b3d60c98          4,120
    7fa6b277e818     7fa9b3d60c98          4,120
    7fa6b277f830     7fa9b3d60c98          4,120
    7fa6b2790860     7fa9b3d60c98          4,120
    7fa6b2791878     7fa9b3d60c98          4,120
    7fa6b2792890     7fa9b3d60c98          4,120
    7fa6b27938a8     7fa9b3d60c98          4,120
    7fa6b27948c0     7fa9b3d60c98          4,120
    7fa6b27958d8     7fa9b3d60c98          4,120
    7fa6b27968f0     7fa9b3d60c98          4,120
    7fa6b2797908     7fa9b3d60c98          4,120
    7fa6b2798920     7fa9b3d60c98          4,120
    7fa6b2799938     7fa9b3d60c98          4,120
    7fa6b279a950     7fa9b3d60c98          4,120
    7fa6b279b968     7fa9b3d60c98          4,120
    7fa6b279c980     7fa9b3d60c98          4,120
    7fa6b279d998     7fa9b3d60c98          4,120
    7fa6b279e9b0     7fa9b3d60c98          4,120
    7fa6b279f9c8     7fa9b3d60c98          4,120
    7fa6b27a09e0     7fa9b3d60c98          4,120
    7fa6b27a19f8     7fa9b3d60c98          4,120
    7fa6b27a2a10     7fa9b3d60c98          4,120
    7fa6b27a3a28     7fa9b3d60c98          4,120
    7fa6b27a4a40     7fa9b3d60c98          4,120
    7fa6b27a5a58     7fa9b3d60c98          4,120
    7fa6b27a6a70     7fa9b3d60c98          4,120
    7fa6b27a7a88     7fa9b3d60c98          4,120
    7fa6b27a8aa0     7fa9b3d60c98          4,120
    7fa6b27a9ab8     7fa9b3d60c98          4,120
    7fa6b27aaad0     7fa9b3d60c98          4,120
    7fa6b27abae8     7fa9b3d60c98          4,120
    7fa6b27acb00     7fa9b3d60c98          4,120
    7fa6b27adb18     7fa9b3d60c98          4,120
    7fa6b27aeb30     7fa9b3d60c98          4,120
    7fa6b27afb48     7fa9b3d60c98          4,120
    7fa6b27b0b60     7fa9b3d60c98          4,120
    7fa6b27b1b78     7fa9b3d60c98          4,120
    7fa6b27b2b90     7fa9b3d60c98          4,120
    7fa6b27b3ba8     7fa9b3d60c98          4,120
    7fa8b2703018     7fa9b3d60c98          4,120
    7fa8b2704030     7fa9b3d60c98          4,120
    7fa8b2705048     7fa9b3d60c98          4,120
    7fa8b2706060     7fa9b3d60c98          4,120
    7fa8b2707078     7fa9b3d60c98          4,120
    7fa8b2708090     7fa9b3d60c98          4,120
    7fa8b27090a8     7fa9b3d60c98          4,120
    7fa8b270a0c0     7fa9b3d60c98          4,120
    7fa8b270b0d8     7fa9b3d60c98          4,120

Statistics:
          MT  Count     TotalSize Class Name
7fa9b4c13148      1            24 AutoMapper.Configuration.MemberConfigurationExpression<***.Core.***.***, ***.***.***.***.***, System.Byte[]>+<>c__DisplayClass21_0
7fa9b4c13070      1            32 System.Linq.Expressions.Expression1<System.Func<***.***.***.***, System.Byte[]>>
7fa9bb162720      1            32 System.Collections.Generic.List<System.Byte[]>
7fa9b4c12a38      1            48 AutoMapper.Configuration.MemberConfigurationExpression<***.***.***.***, ***.***.***.***.***, System.Byte[]>
7fa9baa24648      1            64 System.Func<Microsoft.Win32.SafeHandles.SafeX509Handle, System.Byte[]>
7fa9b4199a80      1            64 System.Action<AutoMapper.IMemberConfigurationExpression<***.***.***.***, ***.***.***.***.***, System.Byte[]>>
7fa9ba918cf8      1            64 System.Func<System.String, System.Threading.CancellationToken, System.Byte[]>
7fa9bb7434d0      1            64 System.Buffers.SpanAction<System.Char, System.ValueTuple<System.Byte[], System.Int32, System.Int32>>
7fa9ba21c2f8      1            64 System.Func<System.Byte[], System.Security.Cryptography.HashAlgorithm>
7fa9ba3fb308      1            64 System.Action<System.IntPtr, System.Byte[], System.Int32, System.Net.Sockets.SocketError>
7fa9b98b3e90     67         4,288 System.Action<System.Int32, System.Byte[], System.Int32, System.Net.Sockets.SocketFlags, System.Net.Sockets.SocketError>
7fa9b6ad65e8     55        50,376 System.Byte[][]
7fa9b3d60c98 68,716 3,046,115,321 System.Byte[]
Total 68,848 objects, 3,046,170,505 byte

到这一步,你就需要多看看这些对象是什么东西,你可以使用
dumpobj [address]
查看某个对象的具体信息,这里我优先考虑那些比较大的对象,这里我选择了

7fa5d6ad21c0
看看里面有什么东西。

> dumpobj 7fa5d6ad21c0
Name:        System.Byte[]
MethodTable: 00007fa9b3d60c98
EEClass:     00007fa9b3d60c28
Tracked Type: false
Size:        18749311(0x11e177f) bytes
Array:       Rank 1, Number of elements 18749287, Type Byte
Content:     ... ftypisom....isomiso2avc1mp41....free...}mdat..........E....H..,. .#..x264 - core 155 r2917 0a84d98 - H.264/MPEG-4 AVC codec
Fields:
None

从内容可以看到,这似乎是一个视频文件被加载到了一个字节数组,那么我们需要看看谁在持有它,这里我们可以使用
gcroot [address]
来查看它的引用情况。

> gcroot 7fa5d6ad21c0
Caching GC roots, this may take a while.
Subsequent runs of this command will be faster.

Thread da:
    7fa21caae740 7fa9ba5849b9 Serilog.Capturing.PropertyValueConverter.TryConvertEnumerable(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
        r12:
          -> 7fa236bd11a8     System.Collections.Generic.List<***.***.***.CompanyManager>
          -> 7fa5b420b040     ***.***.***.CompanyManager[]
          -> 7fa236bd1c90     ***.***.***.CompanyManager
          -> 7fa33f3b91e0     ***.***.***.CompanyTypeManager
          -> 7fa33fa34ee8     System.Collections.Generic.List<***.***.***.ManualManager>
          -> 7fa33fa375f0     ***.***.***.ManualManager[]
          -> 7fa33fa37058     ***.***.***.ManualManager
          -> 7fa5d6ad21c0     System.Byte[]

... Other Information

    7fa21caaf680 7fa9b93b3c34 Serilog.Core.Logger.ForContext(System.String, System.Object, Boolean)
        r15:
          -> 7fa2b6278930     ***.***.***.Services.BaseResponse
          -> 7fa2b627dc18     ***.***.***.ServiceLogItemManager
          -> 7fa2b627d998     ***.***.***.ServiceLogManager
          -> 7fa236bcac90     ***.***.***.UserManager
          -> 7fa236bccbb0     ***.***.***.UserGroupManager
          -> 7fa236bcd558     ***.***.***.CompanyManager
          -> 7fa236bcdfb8     ***.***.***.CompanyTypeManager
          -> 7fa236bd11a8     System.Collections.Generic.List<***.***.***.CompanyManager>
          -> 7fa5b420b040     ***.***.***.CompanyManager[]
          -> 7fa236bd1c90     ***.***.***.CompanyManager
          -> 7fa33f3b91e0     ***.***.***.CompanyTypeManager
          -> 7fa33fa34ee8     System.Collections.Generic.List<***.***.***.ManualManager>
          -> 7fa33fa375f0     ***.***.***.ManualManager[]
          -> 7fa33fa37058     ***.***.***.ManualManager
          -> 7fa5d6ad21c0     System.Byte[]

    7fa21caaf710 7fa9bb62a3a2 ***.***.***.***.SendNewOrderIntegrationBeforeCommit(***.***.***.***, System.Collections.Generic.List`1<***.***.***.***Manager>)
        rbp-d0: 00007fa21caaf850
          -> 7fa2b6278930     ***.***.***.***.BaseResponse
          -> 7fa2b627dc18     ***.***.***.ServiceLogItemManager
          -> 7fa2b627d998     ***.***.***.ServiceLogManager
          -> 7fa236bcac90     ***.***.***.UserManager
          -> 7fa236bccbb0     ***.***.***.UserGroupManager
          -> 7fa236bcd558     ***.***.***.CompanyManager
          -> 7fa236bcdfb8     ***.***.***.CompanyTypeManager
          -> 7fa236bd11a8     System.Collections.Generic.List<***.***.***.CompanyManager>
          -> 7fa5b420b040     ***.***.***.CompanyManager[]
          -> 7fa236bd1c90     ***.***.***.CompanyManager
          -> 7fa33f3b91e0     ***.***.***.CompanyTypeManager
          -> 7fa33fa34ee8     System.Collections.Generic.List<***.***.***.ManualManager>
          -> 7fa33fa375f0     ***.***.***.ManualManager[]
          -> 7fa33fa37058     ***.***.***.ManualManager
          -> 7fa5d6ad21c0     System.Byte[]

Found 32 unique roots.

看起来这里的
Byte[]
最后是被某个
BaseResponse
所持有的,并且被
7fa21caaf680 7fa9b93b3c34 Serilog.Core.Logger.ForContext(System.String, System.Object, Boolean)
调用,其实到这一步我也猜到大概是什么原因了,如果想要获取更加详细的信息,可以切换到对应线程执行
clrstack
查看调用堆栈。

这里 Thread Number 是
da
,使用
threads
指令查看所有线程:

> threads
*0 0x0001 (1)
 1 0x0007 (7)
 2 0x0008 (8)
 3 0x0009 (9)
 4 0x000A (10)
 5 0x000B (11)
 6 0x000C (12)
 7 0x000D (13)
 8 0x000E (14)
 9 0x000F (15)
 10 0x0010 (16)
 11 0x0011 (17)
 12 0x0013 (19)
 13 0x0017 (23)
 14 0x0018 (24)
 15 0x0019 (25)
 16 0x001B (27)
 17 0x001D (29)
 18 0x001E (30)
 19 0x001F (31)
 20 0x0020 (32)
 21 0x0024 (36)
 22 0x0025 (37)
 23 0x0026 (38)
 24 0x002A (42)
 25 0x004A (74)
 26 0x00BB (187)
 27 0x00BC (188)
 28 0x00BD (189)
 29 0x00CE (206)
 30 0x00D1 (209)
 31 0x00D2 (210)
 32 0x00D4 (212)
 33 0x00D5 (213)
 34 0x00D6 (214)
 35 0x00DA (218)
 36 0x00DB (219)

发现
0x00DA
对应的是 35 号线程,使用命令
setthread 35
切换到对应线程,执行
clrstack
指令查看调用堆栈。

> setthread 35
> clrstack
OS Thread Id: 0xda (35)
        Child SP               IP Call Site
00007FA21CAACE00 00007faa2ce99f63 [InlinedCallFrame: 00007fa21caace00] Interop+Sys.ReceiveMessage(System.Runtime.InteropServices.SafeHandle, MessageHeader*, System.Net.Sockets.SocketFlags, Int64*)
00007FA21CAACE00 00007fa9ba5ddc38 [InlinedCallFrame: 00007fa21caace00] Interop+Sys.ReceiveMessage(System.Runtime.InteropServices.SafeHandle, MessageHeader*, System.Net.Sockets.SocketFlags, Int64*)
00007FA21CAACDF0 00007FA9BA5DDC38 ILStubClass.IL_STUB_PInvoke(System.Runtime.InteropServices.SafeHandle, MessageHeader*, System.Net.Sockets.SocketFlags, Int64*)
00007FA21CAACE90 00007FA9BA5DDAE9 System.Net.Sockets.SocketPal.SysReceive(System.Net.Sockets.SafeSocketHandle, System.Net.Sockets.SocketFlags, System.Span`1<Byte>, Byte[], Int32 ByRef, System.Net.Sockets.SocketFlags ByRef, Error ByRef) [/_/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs @ 158]
00007FA21CAACF30 00007FA9BA5DD671 System.Net.Sockets.SocketPal.TryCompleteReceiveFrom(System.Net.Sockets.SafeSocketHandle, System.Span`1<Byte>, System.Collections.Generic.IList`1<System.ArraySegment`1<Byte>>, System.Net.Sockets.SocketFlags, Byte[], Int32 ByRef, Int32 ByRef, System.Net.Sockets.SocketFlags ByRef, System.Net.Sockets.SocketError ByRef) [/_/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs @ 834]
00007FA21CAACFA0 00007FA9BAD28C59 System.Net.Sockets.SocketAsyncContext.ReceiveFrom(System.Memory`1<Byte>, System.Net.Sockets.SocketFlags ByRef, Byte[], Int32 ByRef, Int32, Int32 ByRef) [/_/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @ 1540]
00007FA21CAAD030 00007FA9BAD28ABB System.Net.Sockets.SocketPal.Receive(System.Net.Sockets.SafeSocketHandle, Byte[], Int32, Int32, System.Net.Sockets.SocketFlags, Int32 ByRef)
00007FA21CAAD0B0 00007FA9BAD28857 System.Net.Sockets.Socket.Receive(Byte[], Int32, Int32, System.Net.Sockets.SocketFlags, System.Net.Sockets.SocketError ByRef)
00007FA21CAAD130 00007FA9BAD28549 System.Net.Sockets.NetworkStream.Read(Byte[], Int32, Int32) [/_/src/libraries/System.Net.Sockets/src/System/Net/Sockets/NetworkStream.cs @ 231]
00007FA21CAAD180 00007FA9BAD2DD75 System.IO.Stream.Read(System.Span`1<Byte>) [/_/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs @ 667]
00007FA21CAAD1D0 00007FA9BA5DC52B System.Net.Sockets.NetworkStream.Read(System.Span`1<Byte>) [/_/src/libraries/System.Net.Sockets/src/System/Net/Sockets/NetworkStream.cs @ 246]
00007FA21CAAD220 00007FA9BAD2DB78 Microsoft.Data.SqlClient.SNI.SslOverTdsStream.Read(System.Span`1<Byte>) [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SslOverTdsStream.NetCoreApp.cs @ 32]
00007FA21CAAD290 00007FA9BAD39691 System.Net.Security.SslStream+<EnsureFullTlsFrameAsync>d__186`1[[System.Net.Security.SyncReadWriteAdapter, System.Net.Security]].MoveNext()
00007FA21CAAD310 00007FA9BAD39477 System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[[System.Net.Security.SslStream+<EnsureFullTlsFrameAsync>d__186`1[[System.Net.Security.SyncReadWriteAdapter, System.Net.Security]], System.Net.Security]](<EnsureFullTlsFrameAsync>d__186`1<System.Net.Security.SyncReadWriteAdapter> ByRef) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncMethodBuilderCore.cs @ 38]
00007FA21CAAD350 00007FA9BAD393D7 System.Net.Security.SslStream.EnsureFullTlsFrameAsync[[System.Net.Security.SyncReadWriteAdapter, System.Net.Security]](System.Net.Security.SyncReadWriteAdapter)
00007FA21CAAD3C0 00007FA9BAD38A39 System.Net.Security.SslStream+<ReadAsyncInternal>d__188`1[[System.Net.Security.SyncReadWriteAdapter, System.Net.Security]].MoveNext() [/_/src/libraries/System.Net.Security/src/System/Net/Security/SslStream.Implementation.cs @ 963]
00007FA21CAAD520 00007FA9BAD385D7 System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[[System.Net.Security.SslStream+<ReadAsyncInternal>d__188`1[[System.Net.Security.SyncReadWriteAdapter, System.Net.Security]], System.Net.Security]](<ReadAsyncInternal>d__188`1<System.Net.Security.SyncReadWriteAdapter> ByRef) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncMethodBuilderCore.cs @ 38]
00007FA21CAAD560 00007FA9BAD3847B System.Net.Security.SslStream.Read(Byte[], Int32, Int32) [/_/src/libraries/System.Net.Security/src/System/Net/Security/SslStream.cs @ 767]
00007FA21CAAD620 00007FA9BAD27C43 Microsoft.Data.SqlClient.SNI.SNITCPHandle.Receive(Microsoft.Data.SqlClient.SNI.SNIPacket ByRef, Int32) [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs @ 799]
00007FA21CAAD780 00007FA9BAD279FA Microsoft.Data.SqlClient.SNI.TdsParserStateObjectManaged.ReadSyncOverAsync(Int32, UInt32 ByRef) [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObjectManaged.cs @ 219]
00007FA21CAAD7D0 00007FA9BAD276EF Microsoft.Data.SqlClient.TdsParserStateObject.ReadSniSyncOverAsync() [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @ 1223]
00007FA21CAAD820 00007FA9BAD2760C Microsoft.Data.SqlClient.TdsParserStateObject.TryReadNetworkPacket() [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @ 1181]
00007FA21CAAD840 00007FA9BAD37E54 Microsoft.Data.SqlClient.TdsParserStateObject.TryPrepareBuffer() [/_/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @ 901]
00007FA21CAAD860 00007FA9BAD3C748 Microsoft.Data.SqlClient.TdsParserStateObject.TryReadUInt32(UInt32 ByRef) [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @ 632]
00007FA21CAAD890 00007FA9BAD7C73C Microsoft.Data.SqlClient.TdsParserStateObject.TryReadPlpLength(Boolean, UInt64 ByRef) [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @ 908]
00007FA21CAAD8D0 00007FA9BAD7CA11 Microsoft.Data.SqlClient.TdsParserStateObject.TryReadPlpBytes(Byte[] ByRef, Int32, Int32, Int32 ByRef) [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @ 1054]
00007FA21CAAD930 00007FA9BAD6D9A8 Microsoft.Data.SqlClient.TdsParser.TryReadSqlValue(Microsoft.Data.SqlClient.SqlBuffer, Microsoft.Data.SqlClient.SqlMetaDataPriv, Int32, Microsoft.Data.SqlClient.TdsParserStateObject, Microsoft.Data.SqlClient.SqlCommandColumnEncryptionSetting, System.String, Microsoft.Data.SqlClient.SqlCommand) [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs @ 6054]
00007FA21CAAD9B0 00007FA9BAD6CF76 Microsoft.Data.SqlClient.SqlDataReader.TryReadColumnInternal(Int32, Boolean, Boolean) [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlDataReader.cs @ 3915]
00007FA21CAADA40 00007FA9BAD6C92A Microsoft.Data.SqlClient.SqlDataReader.GetValueInternal(Int32) [/_/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlDataReader.cs @ 2719]
00007FA21CAADA60 00007FA9BAD6C864 Microsoft.Data.SqlClient.SqlDataReader.GetValue(Int32)
00007FA21CAADAA0 00007FA9BAD6C26A ***.DataAccess.NeoBaseDAL`1[[System.__Canon, System.Private.CoreLib]].LoadObject(System.Data.IDataReader)
00007FA21CAADBE0 00007FA9BAD7E4C9 ***.DataAccess.NeoBaseDAL`1[[System.__Canon, System.Private.CoreLib]].LoadObjects(Microsoft.Data.SqlClient.SqlDataReader)
00007FA21CAADC20 00007FA9BAD7D6BB ***.DataAccess.NeoBaseDAL`1[[System.__Canon, System.Private.CoreLib]].GetData(System.__Canon, ***.Domain.FiltroPaginador)
00007FA21CAADCC0 00007FA9BAD7D12E ***.Domain.NeoBaseManager`1[[System.__Canon, System.Private.CoreLib]].GetData(System.__Canon)
00007FA21CAADD10 00007FA9BB66EAFD ***.***.***.***.get_ListManual()
00007FA21CAADFC0 00007faa2c663c13 [DebuggerU2MCatchHandlerFrame: 00007fa21caadfc0]
00007FA21CAAE0C8 00007faa2c663c13 [HelperMethodFrame_PROTECTOBJ: 00007fa21caae0c8] System.RuntimeMethodHandle.InvokeMethod(System.Object, System.Span`1<System.Object> ByRef, System.Signature, Boolean, Boolean)
00007FA21CAAE240 00007FA9B49E9B3C System.Reflection.RuntimeMethodInfo.Invoke(System.Object, System.Reflection.BindingFlags, System.Reflection.Binder, System.Object[], System.Globalization.CultureInfo) [/_/src/coreclr/System.Private.CoreLib/src/System/Reflection/RuntimeMethodInfo.cs @ 435]
00007FA21CAAE2C0 00007FA9BB644BB8 Serilog.Capturing.PropertyValueConverter+<GetProperties>d__22.MoveNext()
00007FA21CAAE340 00007FA9B49A7C29 System.Collections.Generic.LargeArrayBuilder`1[[System.__Canon, System.Private.CoreLib]].AddRange(System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/LargeArrayBuilder.SpeedOpt.cs @ 116]
00007FA21CAAE3A0 00007FA9B40C0EA5 System.Collections.Generic.EnumerableHelpers.ToArray[[System.__Canon, System.Private.CoreLib]](System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/EnumerableHelpers.Linq.cs @ 85]
00007FA21CAAE410 00007FA9BA584DD7 Serilog.Capturing.PropertyValueConverter.TryConvertCompilerGeneratedType(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAE460 00007FA9B93B4090 Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAE4D0 00007FA9BB644CCD Serilog.Capturing.PropertyValueConverter+<GetProperties>d__22.MoveNext()
00007FA21CAAE550 00007FA9B49A7C29 System.Collections.Generic.LargeArrayBuilder`1[[System.__Canon, System.Private.CoreLib]].AddRange(System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/LargeArrayBuilder.SpeedOpt.cs @ 116]
00007FA21CAAE5B0 00007FA9B40C0EA5 System.Collections.Generic.EnumerableHelpers.ToArray[[System.__Canon, System.Private.CoreLib]](System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/EnumerableHelpers.Linq.cs @ 85]
00007FA21CAAE620 00007FA9BA584DD7 Serilog.Capturing.PropertyValueConverter.TryConvertCompilerGeneratedType(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAE670 00007FA9B93B4090 Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAE6E0 00007FA9BB66EA50 Serilog.Capturing.PropertyValueConverter+DepthLimiter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring)
00007FA21CAAE740 00007FA9BA5849B9 Serilog.Capturing.PropertyValueConverter.TryConvertEnumerable(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAE7A0 00007FA9B93B403D Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAE810 00007FA9BB644CCD Serilog.Capturing.PropertyValueConverter+<GetProperties>d__22.MoveNext()
00007FA21CAAE890 00007FA9B49A7C29 System.Collections.Generic.LargeArrayBuilder`1[[System.__Canon, System.Private.CoreLib]].AddRange(System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/LargeArrayBuilder.SpeedOpt.cs @ 116]
00007FA21CAAE8F0 00007FA9B40C0EA5 System.Collections.Generic.EnumerableHelpers.ToArray[[System.__Canon, System.Private.CoreLib]](System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/EnumerableHelpers.Linq.cs @ 85]
00007FA21CAAE960 00007FA9BA584DD7 Serilog.Capturing.PropertyValueConverter.TryConvertCompilerGeneratedType(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAE9B0 00007FA9B93B4090 Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAEA20 00007FA9BB644CCD Serilog.Capturing.PropertyValueConverter+<GetProperties>d__22.MoveNext()
00007FA21CAAEAA0 00007FA9B49A7C29 System.Collections.Generic.LargeArrayBuilder`1[[System.__Canon, System.Private.CoreLib]].AddRange(System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/LargeArrayBuilder.SpeedOpt.cs @ 116]
00007FA21CAAEB00 00007FA9B40C0EA5 System.Collections.Generic.EnumerableHelpers.ToArray[[System.__Canon, System.Private.CoreLib]](System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/EnumerableHelpers.Linq.cs @ 85]
00007FA21CAAEB70 00007FA9BA584DD7 Serilog.Capturing.PropertyValueConverter.TryConvertCompilerGeneratedType(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAEBC0 00007FA9B93B4090 Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAEC30 00007FA9BB644CCD Serilog.Capturing.PropertyValueConverter+<GetProperties>d__22.MoveNext()
00007FA21CAAECB0 00007FA9B49A7C29 System.Collections.Generic.LargeArrayBuilder`1[[System.__Canon, System.Private.CoreLib]].AddRange(System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/LargeArrayBuilder.SpeedOpt.cs @ 116]
00007FA21CAAED10 00007FA9B40C0EA5 System.Collections.Generic.EnumerableHelpers.ToArray[[System.__Canon, System.Private.CoreLib]](System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/EnumerableHelpers.Linq.cs @ 85]
00007FA21CAAED80 00007FA9BA584DD7 Serilog.Capturing.PropertyValueConverter.TryConvertCompilerGeneratedType(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAEDD0 00007FA9B93B4090 Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAEE40 00007FA9BB644CCD Serilog.Capturing.PropertyValueConverter+<GetProperties>d__22.MoveNext()
00007FA21CAAEEC0 00007FA9B49A7C29 System.Collections.Generic.LargeArrayBuilder`1[[System.__Canon, System.Private.CoreLib]].AddRange(System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/LargeArrayBuilder.SpeedOpt.cs @ 116]
00007FA21CAAEF20 00007FA9B40C0EA5 System.Collections.Generic.EnumerableHelpers.ToArray[[System.__Canon, System.Private.CoreLib]](System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/EnumerableHelpers.Linq.cs @ 85]
00007FA21CAAEF90 00007FA9BA584DD7 Serilog.Capturing.PropertyValueConverter.TryConvertCompilerGeneratedType(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAEFE0 00007FA9B93B4090 Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAF050 00007FA9BB644CCD Serilog.Capturing.PropertyValueConverter+<GetProperties>d__22.MoveNext()
00007FA21CAAF0D0 00007FA9B49A7C29 System.Collections.Generic.LargeArrayBuilder`1[[System.__Canon, System.Private.CoreLib]].AddRange(System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/LargeArrayBuilder.SpeedOpt.cs @ 116]
00007FA21CAAF130 00007FA9B40C0EA5 System.Collections.Generic.EnumerableHelpers.ToArray[[System.__Canon, System.Private.CoreLib]](System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/EnumerableHelpers.Linq.cs @ 85]
00007FA21CAAF1A0 00007FA9BA584DD7 Serilog.Capturing.PropertyValueConverter.TryConvertCompilerGeneratedType(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAF1F0 00007FA9B93B4090 Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAF260 00007FA9BB644CCD Serilog.Capturing.PropertyValueConverter+<GetProperties>d__22.MoveNext()
00007FA21CAAF2E0 00007FA9B49A7C29 System.Collections.Generic.LargeArrayBuilder`1[[System.__Canon, System.Private.CoreLib]].AddRange(System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/LargeArrayBuilder.SpeedOpt.cs @ 116]
00007FA21CAAF340 00007FA9B40C0EA5 System.Collections.Generic.EnumerableHelpers.ToArray[[System.__Canon, System.Private.CoreLib]](System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/EnumerableHelpers.Linq.cs @ 85]
00007FA21CAAF3B0 00007FA9BA584DD7 Serilog.Capturing.PropertyValueConverter.TryConvertCompilerGeneratedType(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAF400 00007FA9B93B4090 Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAF470 00007FA9BB644CCD Serilog.Capturing.PropertyValueConverter+<GetProperties>d__22.MoveNext()
00007FA21CAAF4F0 00007FA9B49A7C29 System.Collections.Generic.LargeArrayBuilder`1[[System.__Canon, System.Private.CoreLib]].AddRange(System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/LargeArrayBuilder.SpeedOpt.cs @ 116]
00007FA21CAAF550 00007FA9B40C0EA5 System.Collections.Generic.EnumerableHelpers.ToArray[[System.__Canon, System.Private.CoreLib]](System.Collections.Generic.IEnumerable`1<System.__Canon>) [/_/src/libraries/Common/src/System/Collections/Generic/EnumerableHelpers.Linq.cs @ 85]
00007FA21CAAF5C0 00007FA9BA584DD7 Serilog.Capturing.PropertyValueConverter.TryConvertCompilerGeneratedType(System.Object, System.Type, Serilog.Parsing.Destructuring, Serilog.Events.LogEventPropertyValue ByRef)
00007FA21CAAF610 00007FA9B93B4090 Serilog.Capturing.PropertyValueConverter.CreatePropertyValue(System.Object, Serilog.Parsing.Destructuring, Int32)
00007FA21CAAF680 00007FA9B93B3C34 Serilog.Core.Logger.ForContext(System.String, System.Object, Boolean)
00007FA21CAAF710 00007FA9BB62A3A2 ***.***.***.***.SendNewOrderIntegrationBeforeCommit(***.***.***.***, System.Collections.Generic.List`1<***.***.***.***>)
00007FA21CAAF930 00007FA9BB60EDAD ***.***.***.***.GenerateOrder(***.***.***.***, Boolean)
00007FA21CAAFD90 00007FA9BB60910F ***.***.***.***.GenerateOrderManuallyWithSelectedItems(***.***.***.***, ***.***.***.***)
00007FA21CAAFE20 00007FA9BB608F44 ***.***.***.***.GenerateOrder(***.***.***.***, Int64)
00007FA21CAAFE60 00007FA9BB605B29 ***.***.***.Controllers.OrderController.GenerateOrder(***.***.***.***.*********)
... 省略
00007FA21CAB11F0 00007FA9BA585082 NewRelic.Providers.Wrapper.AspNetCore.WrapPipelineMiddleware.Invoke(Microsoft.AspNetCore.Http.HttpContext)
00007FA21CAB1240 00007FA9BA56EDB1 Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol+<ProcessRequests>d__226`1[[System.__Canon, System.Private.CoreLib]].MoveNext()
00007FA21CAB13E0 00007FA9B70753AD System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(System.Threading.Thread, System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 268]
00007FA21CAB1420 00007FA9BADE97EA System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol+<ProcessRequests>d__226`1[[System.__Canon, System.Private.CoreLib]], Microsoft.AspNetCore.Server.Kestrel.Core]].MoveNext(System.Threading.Thread) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 332]
00007FA21CAB1470 00007FA9B6FD2073 System.Threading.ThreadPoolWorkQueue.Dispatch() [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @ 724]
00007FA21CAB14E0 00007FA9B6FD0230 System.Threading.PortableThreadPool+WorkerThread.WorkerThreadStart() [/_/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs @ 107]
00007FA21CAB16F0 00007faa2c663c13 [DebuggerU2MCatchHandlerFrame: 00007fa21cab16f0]

找到对应方法
SendNewOrderIntegrationBeforeCommit
里面调用
Serilog.Core.Logger.ForContext
的地方,发现了以下代码:

result objBaseResponse = ***.***(***, ***.IdOrder);
Log..ForContext("{@Result}", objBaseResponse, true)
    .Information("SendNewOrderIntegration end");

3. 结论

调查 objBaseResponse 得知,里面手搓了类似于导航属性的玩意儿,而且还有循环依赖,这样一旦走到这个打日志的地方,就会遍历对象的属性,与此同时还会触发从数据库获取数据的操作,也就是会导致入站流量飙升。

Create a logger that enriches log events with the specified property.
Params:
propertyName – The name of the property. Must be non-empty.
value – The property value.
destructureObjects – If true, the value will be serialized as a structured object if possible; if false, the object will be recorded as a scalar or simple array.

本来不指定
true
值可能都不会有问题,一旦指定了
true
值,序列化对象的时候就会完全遍历里面的子对象。

4. 资料

FastGPT 是一个基于 LLM 大语言模型的知识库问答系统,提供开箱即用的数据处理、模型调用等能力,它背后依赖OneApi开源项目来访问各种大语言模型提供的能力。各大语言模型提供的访问接口规范不尽相同,为此OneApi项目提供了统一的API接口去对接各种大语言模型。FastGPT的部署架构如图所示:

本文章将介绍如何部署OneApi和FastGPT,以及两种在线大语言模型(
AzureOpenAI

讯飞星火3.5
)的配置方法。

我将在Windows系统的WSL子系统上进行部署,WSL子系统安装的是Ubuntu22 Linux系统,WSL的部署方式完全适用于真实的Linux系统。

一、部署OneApi

OneApi项目开源地址:
https://github.com/songquanpeng/one-api

1.在/opt目录下创建oneapi目录

cd opt
mkdir oneapi
cd oneapi

2.编辑docker-compose.yml文件

在/opt/oneapi目录下创建docker-compose.yml文件,将下面的内容复制进去并保存

version: '3.8'

services:
  oneapi:
    container_name: oneapi
    image: justsong/one-api:latest
    restart: unless-stopped
    ports:
      - 3001:3000
    networks:
      - llm_net
    volumes:
      - ./data:/data
    environment:
      - TZ=Asia/Shanghai

networks:
  llm_net:
    name: llm_net
    external: true  

3.创建llm_net docker网络

docker network create llm_net

4.运行oneapi

docker compose up -d

5.配置AzureOpenAI渠道

登录http://localhost:3001,用户名:root,密码:123456。

如果你没有申请AzureOpenAI,可以直接查看讯飞星火的配置方式。其实申请AzureOpenAI并不难,网上教程很多,只要真实填写相关信息,一般24小时内就可以通过。

接下来我们添加AzureOpenAI渠道,按照图中的方式填写就好了。这里有一个需要注意的地方就是名称那一项填的是Azure上面的部署名称,而这个部署名称必须要和模型名称一致(很奇怪的做法,GitHub上已经有人提了issue,正在解决)

6.测试

渠道添加成功后,可以在渠道列表页面点击“测试”按钮,如果没有问题,会返回测试成功。

7.创建令牌

令牌的名称随便填,由于是测试,可以把额度设置为无限额度。

提交后,可以在令牌列表页面复制刚刚创建的令牌,这个令牌将在部署FastGPT时用到。

二、部署fastgpt

1.在/opt目录下创建fastgpt目录

cd opt
mkdir fastgpt
cd fastgpt

2.编辑docker-compose.yml文件

请先阅读FastGPT官方部署文档:
https://doc.fastgpt.in/docs/development/docker/

下载docker-compose.yml文件和config.json文件

curl -O https://raw.githubusercontent.com/labring/FastGPT/main/files/deploy/fastgpt/docker-compose.yml
curl -O https://raw.githubusercontent.com/labring/FastGPT/main/projects/app/data/config.json

编辑docker-compose.yml文件,主要是更改了容器网络,数据库用户名密码之类的

version: '3.8'
services:
  pg:
    image: ankane/pgvector:v0.5.0 # git
    # image: registry.cn-hangzhou.aliyuncs.com/fastgpt/pgvector:v0.5.0 # 阿里云
    container_name: pg
    restart: always
    ports: 
      - 5432:5432
    networks:
      - llm_net
    environment:
      - POSTGRES_USER=fastgpt
      - POSTGRES_PASSWORD=123456
      - POSTGRES_DB=fastgpt
    volumes:
      - ./pg/data:/var/lib/postgresql/data

  mongo:
    image: mongo:5.0.18
    # image: registry.cn-hangzhou.aliyuncs.com/fastgpt/mongo:5.0.18 # 阿里云
    container_name: mongo
    ports:
      - 27017:27017
    networks:
      - llm_net
    command: mongod --keyFile /data/mongodb.key --replSet rs0
    environment:
      - MONGO_INITDB_ROOT_USERNAME=fastgpt
      - MONGO_INITDB_ROOT_PASSWORD=123456
    volumes:
      - ./mongo/data:/data/db
      - ./mongodb.key:/data/mongodb.key

  fastgpt:
    container_name: fastgpt
    image: ghcr.io/labring/fastgpt:latest # git
    # image: registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt:latest # 阿里云
    ports:
      - 3002:3000
    networks:
      - llm_net
    depends_on:
      - mongo
      - pg
    restart: always
    environment:
      - DEFAULT_ROOT_PSW=123456
      - OPENAI_BASE_URL=http://192.168.2.117:3001/v1
      - CHAT_API_KEY=sk-XXXXX # 在OneApi中创建的令牌
      - DB_MAX_LINK=5 # database max link
      - TOKEN_KEY=any
      - ROOT_KEY=root_key
      - FILE_TOKEN_KEY=filetoken
      # mongo 配置,不需要改. 用户名myname,密码mypassword。
      - MONGODB_URI=mongodb://fastgpt:123456@mongo:27017/fastgpt?authSource=admin
      # pg配置. 不需要改
      - PG_URL=postgresql://fastgpt:123456@pg:5432/fastgpt
    volumes:
      - ./config.json:/app/data/config.json

networks:
  llm_net:
    name: llm_net
    external: true  

这里要注意的 OPENAI_BASE_URL=http://192.168.2.117:3001/v1,我本来想设置成http://oneapi:3000/v1,因为fastgpt与oneapi在同一个docker网络,但fastgpt访问不了这个地址,可能是哪里没有设置对,只好先用本机ip来访问。

3.编辑config.json文件

改动如下,name改成在oneapi配置中的一样

4.运行fastgpt

docker compose up -d

运行成功之后,不要忘了对MongoDb进行配置,直接按官网的步骤进行操作:

# 查看 mongo 容器是否正常运行
docker ps
# 进入容器
docker exec -it mongo bash
# 连接数据库
mongo -u myname -p mypassword --authenticationDatabase admin
# 初始化副本集。如果需要外网访问,mongo:27017 可以改成 ip:27017。但是需要同时修改 FastGPT 连接的参数(MONGODB_URI=mongodb://myname:mypassword@mongo:27017/fastgpt?authSource=admin => MONGODB_URI=mongodb://myname:mypassword@ip:27017/fastgpt?authSource=admin)
rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "mongo:27017" }
  ]
})
# 检查状态。如果提示 rs0 状态,则代表运行成功
rs.status()

5.测试

官网说OPENAI_BASE_URL地址后面要加v1,当我加了的时候,测试结果如下,报404:

于是去oneapi容器查看日志,可以看到fastgpt请求已经转到了oneapi,oneapi又去请求AzureOpenAI, AzureOpenAI返回404。于是去Azure上测试部署后的聊天功能,按F12查看网络请求,发现路由里面没有v1

于是更改docker-compose.yml文件,把OPENAI_BASE_URL值中的v1去掉了,重新执行docker-compose up -d ,重启之后继续测试,这次的报错就不一样了,如下图所示:

查看oneapi日志,请求结果是200,但没有响应内容,找了很久的原因,无法得知是OneApi还是Azure OpenAI的问题,于是转而去测试讯飞星火大模型。

三、配置讯飞星火认知大模型

1.创建讯飞模型应用

先去官方领取讯飞星火认知大模型的个人免费试用套餐(我选的是V3.5版本):
https://xinghuo.xfyun.cn/sparkapi?scr=price

然后去到讯飞开放平台去创建基于v3.5版本的应用,得到APPID、APISecret、APIKey三个值(在OneApi中需要用到)

2.在oneapi页面添加星火模型渠道

3.编辑fastgpt的config.json文件

增加星火模型的配置

4.重启fastgpt容器

注意:如果你之前的操作把OPENAI_BASE_URL的v1去掉了,请把它补上,然后执行命令:docker-compose up -d

5.测试

AI模型选择上面配置的spark3.5,测试成功

前言

上一篇文章说到我还开发了一个独立的自动测试工具,可以根据 OpenAPI 的文档来测试,并且在测试完成后输出测试报告,报告内容包括每个接口是否测试通过和响应时间等。

这个工具我使用了 go 语言开发,主要是考虑到了 go 语言可以傻瓜式的实现交叉编译,生成的可执行文件直接上传到服务器就可以执行,非常方便。



PS: go 语言写起来是真的折磨!感觉语法有很多别扭的地方,不过 build 的时候实在太爽了,根本无法拒绝

作者:vivo IT 平台团队 - Xiong Huanxin

Sharding-JDBC是在JDBC层提供服务的数据库中间件,在分库分表场景具有广泛应用。本文对Sharding-JDBC的解析、路由、改写、执行、归并五大核心引擎进行了源码解析,并结合业务实践经验,总结了使用Sharding-JDBC的一些痛点问题并分享了对应的定制开发与改造方案。

本文源码基于Sharding-JDBC 4.1.1版本。

一、业务背景

随着业务并发请求和数据规模的不断扩大,单节点库表压力往往会成为系统的性能瓶颈。公司IT内部
营销库存、交易订单、财经台账、考勤记录
等多领域的业务场景的日增数据量巨大,存在着
数据库节点压力过大、连接过多、查询速度变慢
等情况,根据数据来源、时间、工号等信息来将
没有联系的数据
尽量均分到不同的库表中,从而在不影响业务需求的前提下,减轻数据库节点压力,提升查询效率和系统稳定性

图片

二、技术选型

我们对比了几款比较常见的支持分库分表和读写分离的中间件。

图片

Sharding-JDBC作为轻量化的增强版的JDBC框架,相较其他中间件性能更好,接入难度更低,其数据分片、读写分离功能也覆盖了我们的业务诉求,因此我们在业务中广泛使用了Sharding-JDBC。但在使用Sharding-JDBC的过程中,我们也发现了诸多问题,为了业务更便捷的使用Sharding-JDBC,我们对源码做了针对性的定制开发和组件封装来满足业务需求。

图片

三、源码解析

3.1 引言

Sharding-JDBC作为基于JDBC的数据库中间件,实现了JDBC的标准api,Sharding-JDBC与原生JDBC的执行对比流程如下图所示:

图片

相关执行流程的代码样例如下:

  • JDBC执行样例

//获取数据库连接
try (Connection conn = DriverManager.getConnection("mysqlUrl", "userName", "password")) {
    String sql = "SELECT * FROM  t_user WHERE name = ?";
    //预编译SQL
    try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
        //参数设置与执行
        preparedStatement.setString(1, "vivo");
        preparedStatement.execute(sql);
        //获取结果集
        try (ResultSet resultSet = preparedStatement.getResultSet()) {
            while (resultSet.next()) {
                //处理结果
            }
        }
    }
}
  • Sharding-JDBC 源码

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement#execute
    public boolean execute() throws SQLException {
        try {

从对比的执行流程图可见:

  • 【JDBC】:执行的主要流程是通过Datasource获取Connection,再注入SQL语句生成PreparedStatement对象,PreparedStatement设置占位符参数执行后得到结果集ResultSet。

  • 【Sharding-JDBC】:主要流程基本一致,但Sharding基于PreparedStatement进行了实现与扩展,具体实现类
    ShardingPreparedStatement中会抽象出解析、路由、重写、归并等引擎,从而实现分库分表、读写分离
    等能力,每个引擎的作用说明如下表所示:

图片

//*相关引擎的源码解析在下文会作更深入的阐述

3.2 解析引擎

3.2.1 引擎解析

解析引擎是Sharding-JDBC进行分库分表逻辑的基础,其作用是将SQL拆解为不可再分的原子符号(称为token),再根据数据库类型将这些token分类成关键字、表达式、操作符、字面量等不同类型,进而生成抽象语法树,而语法树是后续进行路由、改写操作的前提(这也正是语法树的存在使得Sharding-JDBC存在各式各样的语法限制的原因之一)。

图片

▲图片来源:
ShardingSphere 官方文档

4.x的版本采用ANTLR(ANother Tool for Language Recognition)作为解析引擎,在ShardingSphere-sql-parser-dialect模块中定义了适用于不同数据库语法的解析规则(.g4文件),idea中也可以下载ANTLR v4的插件,输入SQL查看解析后的语法树结果。

图片

解析方法的入口在DataNodeRouter的createRouteContext方法中,解析引擎根据数据库类型和SQL创建SQLParserExecutor执行得到解析树,再通过ParseTreeVisitor()的visit方法,对解析树进行处理得到SQLStatement。ANTLR支持listener和visitor两种模式的接口,visitor方式可以更灵活的控制解析树的遍历过程,更适用于SQL解析的场景。

  • 解析引擎核心代码

org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext#96
    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
        //解析引擎解析SQL
        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);
        try {
            SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement);
            return new RouteContext(sqlStatementContext, parameters, new RouteResult());
            // TODO should pass parameters for master-slave
        } catch (final IndexOutOfBoundsException ex) {
            return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult());
        }
    }
 
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0#72
    private SQLStatement parse0(final String sql, final boolean useCache) {
        //缓存
        if (useCache) {
            Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
            if (cachedSQLStatement.isPresent()) {
                return cachedSQLStatement.get();
            }
        }
        //根据数据库类型和sql生成解析树
        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
        //ParseTreeVisitor的visit方法对解析树进行处理得到SQLStatement
      SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
        if (useCache) {
            cache.put(sql, result);
        }
        return result;
    }

SQLStatement实际上是一个接口,其实现对应着不同的SQL类型,如SelectStatement 类中就包括查询的字段、表名、where条件、分组、排序、分页、lock等变量,可以看到这里并没有对having这种字段做定义,相当于Sharding-JDBC无法识别到SQL中的having,这使得Sharding-JDBC对having语法有一定的限制。

  • SelectStatement

public final class SelectStatement extends DMLStatement {
    // 字段
    private ProjectionsSegment projections;
    // 表
    private final Collection<TableReferenceSegment> tableReferences = new LinkedList<>();
    // where
    private WhereSegment where;
    // groupBy
    private GroupBySegment groupBy;
    // orderBy
    private OrderBySegment orderBy;
    // limit
    private LimitSegment limit;
    // 父statement
    private SelectStatement parentStatement;
    // lock
    private LockSegment lock;
}

SQLStatement还会被进一步转换成SQLStatementContext,如SelectStatement 会被转换成SelectStatementContext ,其结构与SelectStatement 类似不再多说,值得注意的是虽然这里定义了containsSubquery来判断是否包含子查询,但4.1.1源码永远是返回的false,与having类似,这意味着Sharding-JDBC不会对子查询语句做特殊处理。

  • SelectStatementContext

public final class SelectStatementContext extends CommonSQLStatementContext<SelectStatement> implements TableAvailable, WhereAvailable {
     
    private final TablesContext tablesContext;
     
    private final ProjectionsContext projectionsContext;
     
    private final GroupByContext groupByContext;
     
    private final OrderByContext orderByContext;
     
    private final PaginationContext paginationContext;
     
    private final boolean containsSubquery;
}
 
    private boolean containsSubquery() {
        // FIXME process subquery
//        Collection<SubqueryPredicateSegment> subqueryPredicateSegments = getSqlStatement().findSQLSegments(SubqueryPredicateSegment.class);
//        for (SubqueryPredicateSegment each : subqueryPredicateSegments) {
//            if (!each.getAndPredicates().isEmpty()) {
//                return true;
//            }
//        }
        return false;
    }

3.2.2 引擎总结

解析引擎是进行路由改写的前提基础,其作用就是将SQL按照定义的语法规则拆分成原子符号(token),生成语法树,根据不同的SQL类型生成对应的SQLStatement,SQLStatement由各自的Segment组成,所有的Segment都包含startIndex和endIndex来定位token在SQL中所属的位置,但解析语法难以涵盖所有的SQL场景,使得部分SQL无法按照预期的结果路由执行。

3.3 路由引擎

3.3.1 引擎解析

路由引擎是Sharding-JDBC的核心步骤,作用是根据定义的分库分表规则将解析引擎生成的SQL上下文生成对应的路由结果,RouteResult 包括DataNode和RouteUnit,DataNode是实际的数据源节点,包括数据源名称和实际的物理表名,RouteUnit则记录了逻辑表/库与物理表/库的映射关系,后面的改写引擎也是根据这个映射关系来决定如何替换SQL中的逻辑表(
实际上RouteResult 就是维护了一条SQL需要往哪些库哪些表执行的关系
)。

  • RouteResult

public final class RouteResult {
     
    private final Collection<Collection<DataNode>> originalDataNodes = new LinkedList<>();
     
    private final Collection<RouteUnit> routeUnits = new LinkedHashSet<>();
}
 
public final class DataNode {
     
    private static final String DELIMITER = ".";
     
    private final String dataSourceName;
     
    private final String tableName;
}
 
public final class RouteUnit {
     
    private final RouteMapper dataSourceMapper;
     
    private final Collection<RouteMapper> tableMappers;
}
 
public final class RouteMapper {
     
    private final String logicName;
     
    private final String actualName;
}

其中,路由有分为
分片路由

主从路由
,两者可以单独使用,也可以组合使用。

  • 分片路由

ShardingRouteDecorator的decorate方法是路由引擎的核心逻辑,经过SQL校验->生成分片条件->合并分片值后得到路由结果。

  • 分片路由decorate方法

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate#57
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
        SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();
        List<Object> parameters = routeContext.getParameters();
        //SQL校验  校验INSERT INTO .... ON DUPLICATE KEY UPDATE 和UPDATE语句中是否存在分片键
      ShardingStatementValidatorFactory.newInstance(
                sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));
        //生成分片条件
        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
        //合并分片值
        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
            checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);
            mergeShardingConditions(shardingConditions);
        }
        ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
        //得到路由结果
        RouteResult routeResult = shardingRouteEngine.route(shardingRule);
        if (needMergeShardingValues) {
            Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");
        }
        return new RouteContext(sqlStatementContext, parameters, routeResult);
    }

ShardingStatementValidator有ShardingInsertStatementValidator和ShardingUpdateStatementValidator两种实现,INSERT INTO .... ON DUPLICATE KEY UPDATE和UPDATE语法都会涉及到字段值的更新,Sharding-JDBC是不允许更新分片值的,毕竟修改分片值还需要将数据迁移至新分片值对应的库表中,才能保证数据分片规则一致。两者的校验细节也有所不同:

  • INSERT INTO .... ON DUPLICATE KEY UPDATE仅仅是对UPDATE字段的校验, ON DUPLICATE KEY UPDATE中包含分片键就会报错;

  • 而UPDATE语句则会额外校验WHERE条件中分片键的原始值和SET的值是否一样,不一样则会抛出异常。

图片

ShardingCondition中只有一个变量routeValues,RouteValue是一个接口,有ListRouteValue和RangeRouteValue两种实现,前者记录了分片键的in或=条件的分片值,后者则记录了范围查询的分片值,两者被封装为ShardingValue对象后,将会透传至分片算法中计算得到分片结果集。

  • ShardingCondition

public final class ShardingConditions {
     
    private final List<ShardingCondition> conditions;
}
 
public class ShardingCondition {
     
    private final List<RouteValue> routeValues = new LinkedList<>();
}
 
 
public final class ListRouteValue<T extends Comparable<?>> implements RouteValue {
     
    private final String columnName;
     
    private final String tableName;
    //in或=条件对应的值
    private final Collection<T> values;
     
    @Override
    public String toString() {
        return tableName + "." + columnName + (1 == values.size() ? " = " + new ArrayList<>(values).get(0) : " in (" + Joiner.on(",").join(values) + ")");
    }
}
 
public final class RangeRouteValue<T extends Comparable<?>> implements RouteValue {
     
    private final String columnName;
     
    private final String tableName;
    //between and 大于小于等范围值的上下限
    private final Range<T> valueRange;
}

生成分片条件后还会合并分片条件,但是前文提过在SelectStatementContext中的containsSubquery永远是false,所以这段逻辑永远返回false,即不会合并分片条件。

  • 判断是否需要合并分片条件

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#isNeedMergeShardingValues#87
private boolean isNeedMergeShardingValues(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {
        return sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsSubquery()
                && !shardingRule.getShardingLogicTableNames(sqlStatementContext.getTablesContext().getTableNames()).isEmpty();
    }

然后就是通过分片路由引擎调用分片算法计算路由结果了,ShardingRouteEngine实现较多,介绍起来篇幅较多,这里就不展开说明了,可以
参考官方文档来了解路由引擎的选择规则

图片

▲图片来源:
ShardingSphere 官方文档

Sharding-JDBC定义了多种分片策略和算法接口,主要的分配策略与算法说明如下表所示:

图片

补充两个细节:

(1)当ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING配置设置true时,InlineShardingStrategy支持范围查询,但是并不是根据分片值计算范围,而是
直接全路由至配置的数据节点,会存在性能隐患。

  • InlineShardingStrategy.doSharding

org.apache.shardingsphere.core.strategy.route.inline.InlineShardingStrategy#doSharding
    public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {
        RouteValue shardingValue = shardingValues.iterator().next();
        //ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING设置为true,直接返回availableTargetNames,而不是根据RangeRouteValue计算
        if (properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING) && shardingValue instanceof RangeRouteValue) {
            return availableTargetNames;
        }
        Preconditions.checkState(shardingValue instanceof ListRouteValue, "Inline strategy cannot support this type sharding:" + shardingValue.toString());
        Collection<String> shardingResult = doSharding((ListRouteValue) shardingValue);
        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        for (String each : shardingResult) {
            if (availableTargetNames.contains(each)) {
                result.add(each);
            }
        }
        return result;
    }

(2)4.1.1的
官方文档虽然说Hint可以跳过解析和改写
,但在我们上面解析引擎的源码解析中,我们并没有看到有对Hint策略的额外跳过。事实上,即使使用了Hint分片SQL也同样需要解析重写,也同样受Sharding-JDBC的语法限制,这在
官方的issue
中也曾经被提及。

图片

▲图片来源:
ShardingSphere 官方文档

  • 主从路由

主从路由的核心逻辑就是通过MasterSlaveDataSourceRouter的route方法进行判定SQL走主库还是从库。主从情况下,配置的数据源实际是一组主从,而不是单个的实例,所以需要通过masterSlaveRule获取到具体的主库或者从库名字。

  • 主从路由decorate

org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate    
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
        //为空证明没有经过分片路由
        if (routeContext.getRouteResult().getRouteUnits().isEmpty()) {
            //根据SQL判断选择走主库还是从库
            String dataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
            RouteResult routeResult = new RouteResult();
           //根据具体的主库/从库名创建路由单元
            routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName, dataSourceName), Collections.emptyList()));
            return new RouteContext(routeContext.getSqlStatementContext(), Collections.emptyList(), routeResult);
        }
        Collection<RouteUnit> toBeRemoved = new LinkedList<>();
        Collection<RouteUnit> toBeAdded = new LinkedList<>();
        //不为空证明已经被分片路由处理了
        for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
            if (masterSlaveRule.getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {
                //先标记移除 因为这里是一组主从的名字而不是实际的库
                toBeRemoved.add(each);
                //根据SQL判断选择走主库还是从库
                String actualDataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
                //根据具体的主库/从库名创建路由单元
                toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));
            }
        }
        routeContext.getRouteResult().getRouteUnits().removeAll(toBeRemoved);
        routeContext.getRouteResult().getRouteUnits().addAll(toBeAdded);
        return routeContext;
    }

MasterSlaveDataSourceRouter中isMasterRoute方法会判断SQL是否需要走主库,当出现以下情况时走主库:

  • select语句包含锁,如for update语句

  • 不是select语句

  • MasterVisitedManager.isMasterVisited()设置为true

  • HintManager.isMasterRouteOnly()设置为true

不走主库则通过负载算法选择从库,Sharding-JDBC提供了
轮询和随机
两种算法。

  • MasterSlaveDataSourceRouter

public final class MasterSlaveDataSourceRouter {
     
    private final MasterSlaveRule masterSlaveRule;
     
    /**
     * Route.
     *
     * @param sqlStatement SQL statement
     * @return data source name
     */
    public String route(final SQLStatement sqlStatement) {
        if (isMasterRoute(sqlStatement)) {
            MasterVisitedManager.setMasterVisited();
            return masterSlaveRule.getMasterDataSourceName();
        }
        return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));
    }
     
    private boolean isMasterRoute(final SQLStatement sqlStatement) {
        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
    }
     
    private boolean containsLockSegment(final SQLStatement sqlStatement) {
        return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent();
    }
}

是否走主库的信息存在MasterVisitedManager中,MasterVisitedManager是通过ThreadLocal实现的,但这种实现会有一个问题,
当我们使用事务先查询再更新/插入时,第一条查询SQL并不会走主库,而是走从库
,如果业务需要事务的第一条查询也走主库,事务查询前需要手动调用一次MasterVisitedManager.setMasterVisited()。

  • MasterVisitedManager

public final class MasterVisitedManager {
     
    private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);
     
    /**
     * Judge master data source visited in current thread.
     *
     * @return master data source visited or not in current thread
     */
    public static boolean isMasterVisited() {
        return MASTER_VISITED.get();
    }
     
    /**
     * Set master data source visited in current thread.
     */
    public static void setMasterVisited() {
        MASTER_VISITED.set(true);
    }
     
    /**
     * Clear master data source visited.
     */
    public static void clear() {
        MASTER_VISITED.remove();
    }
}

3.3.2 引擎总结

路由引擎的作用是将SQL根据参数通过实现的策略算法计算出实际该在哪些库的哪些表执行,也就是路由结果。路由引擎有两种实现,分别是分片路由和主从路由,两者都提供了标准化的策略接口来让业务实现自己的路由策略,分片路由需要注意自身SQL场景和策略算法相匹配,主从路由中同一线程且同一数据库连接内,有写入操作后,之后的读操作会从主库读取,写入操作前的读操作不会走主库。

3.4 改写引擎

3.4.1 引擎解析

经过解析路由后虽然确定了执行的实际库表,但SQL中表名依旧是逻辑表,不能执行,改写引擎可以将逻辑表替换为物理表。同时,路由至多库表的SQL也需要拆分为多条SQL执行。

改写的入口仍旧在BasePrepareEngine中,创建重写上下文createSQLRewriteContext,再根据上下文进行改写rewrite,最终返回执行单元ExecutionUnit。

  • 改写逻辑入口

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite
    private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
        //注册重写装饰器
        registerRewriteDecorator();
        //创建 SQLRewriteContext
        SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
        //重写
        return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
    }

执行单元包含了数据源名称,改写后的SQL,以及对应的参数,SQL一样的两个SQLUnit会被视为相等。

  • ExecutionUnit

@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
     
    private final String dataSourceName;
     
    private final SQLUnit sqlUnit;
}
 
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
//根据sql判断是否相等
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
 
    private String sql;
 
    private final List<Object> parameters;
 
}

createSQLRewriteContext完成了两件事,一个是对SQL参数进行了重写,一个是生成了SQLToken。

  • createSQLRewriteContext

org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext
    public SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {
        SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);
        //sql参数重写
        decorate(decorators, result, routeContext);
        //生成SQLToken
        result.generateSQLTokens();
        return result;
    }
 
org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator#decorate
    public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {
        for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {
            if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {
                //参数重写
                each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
            }
        }
        //sqlTokenGenerators
        sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());
    }
 
org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContext#generateSQLTokens
    public void generateSQLTokens() {
        sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));
    }

ParameterRewriter中与分片相关的实现有两种。

图片

//*详细的例子可以参考 官方文档中分页修正和补列部分

SQLToken记录了SQL中每个token(解析引擎中提过的不可再分的原子符号)的起始位置,从而方便改写引擎知道哪些位置需要改写。

  • SQLToken

@RequiredArgsConstructor
@Getter
public abstract class SQLToken implements Comparable<SQLToken> {
     
    private final int startIndex;
     
    @Override
    public final int compareTo(final SQLToken sqlToken) {
        return startIndex - sqlToken.getStartIndex();
    }
}

创建完SQLRewriteContext后就对整条SQL进行重写和组装参数,可以看出每个RouteUnit都会重写SQL并获取自己对应的参数。

  • SQLRouteRewriteEngine.rewrite

org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#rewrite
    public Map<RouteUnit, SQLRewriteResult> rewrite(final SQLRewriteContext sqlRewriteContext, final RouteResult routeResult) {
        Map<RouteUnit, SQLRewriteResult> result = new LinkedHashMap<>(routeResult.getRouteUnits().size(), 1);
        for (RouteUnit each : routeResult.getRouteUnits()) {
            //重写SQL+组装参数
            result.put(each, new SQLRewriteResult(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeResult, each)));
        }
        return result;
    }

toSQL核心就是根据SQLToken将SQL拆分改写再拼装,比如select * from t_order where created_by = '123' 就会被拆分为select * from | t_order | where created_by = '123'三部分进行改写拼装。

  • toSQL

org.apache.shardingsphere.underlying.rewrite.sql.impl.AbstractSQLBuilder#toSQL
    public final String toSQL() {
        if (context.getSqlTokens().isEmpty()) {
            return context.getSql();
        }
        Collections.sort(context.getSqlTokens());
        StringBuilder result = new StringBuilder();
        //截取第一个SQLToken之前的内容  select * from
        result.append(context.getSql().substring(0, context.getSqlTokens().get(0).getStartIndex()));
        for (SQLToken each : context.getSqlTokens()) {
            //重写拼接每个SQLToken对应的内容  t_order ->t_order_0
            result.append(getSQLTokenText(each));
            //拼接SQLToken中间不变的内容 where created_by = '123'
            result.append(getConjunctionText(each));
        }
        return result.toString();
    }

ParameterBuilder有StandardParameterBuilder和GroupedParameterBuilder两个实现。

  • StandardParameterBuilder:
    适用于非insert语句,getParameters无需分组处理直接返回即可

  • GroupedParameterBuilder:
    适用于insert语句,需要根据路由情况对参数进行分组。

原因和样例可以参考
官方文档批量拆分部分

  • getParameters

org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#getParameters
    private List<Object> getParameters(final ParameterBuilder parameterBuilder, final RouteResult routeResult, final RouteUnit routeUnit) {
        if (parameterBuilder instanceof StandardParameterBuilder || routeResult.getOriginalDataNodes().isEmpty() || parameterBuilder.getParameters().isEmpty()) {
            //非插入语句直接返回
            return parameterBuilder.getParameters();
        }
        List<Object> result = new LinkedList<>();
        int count = 0;
        for (Collection<DataNode> each : routeResult.getOriginalDataNodes()) {
            if (isInSameDataNode(each, routeUnit)) {
                //插入语句参数分组构造
                result.addAll(((GroupedParameterBuilder) parameterBuilder).getParameters(count));
            }
            count++;
        }
        return result;
    }

3.4.2 引擎总结

改写引擎的作用是将逻辑SQL转换为实际可执行的SQL,这其中既有逻辑表名的替换,也有多路由的SQL拆分,还有为了后续归并操作而进行的分页、分组、排序等改写,select语句不会对参数进行重组,而insert语句为了避免插入多余数据,会通过路由单元对参数进行重组。

3.5 执行引擎

3.5.1 引擎解析

改写完成后的SQL就可以执行了,执行引擎需要平衡好资源和效率,如果为每条真实SQL都创建一个数据库连接显然会造成资源的滥用,但如果单线程串行也必然会影响执行效率。

执行引擎会先将执行单元中需要执行的SQLUnit根据数据源分组,同一个数据源下的SQLUnit会放入一个list,然后会根据maxConnectionsSizePerQuery对同一个数据源的SQLUnit继续分组,创建连接并绑定SQLUnit 。

  • 执行组创建

org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSynchronizedExecuteUnitGroups
    private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
            final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        //根据数据源将SQLUnit分组 key=dataSourceName
        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);
        Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
        //创建sql执行组
        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
        }
        return result;
    }
 
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSQLExecuteGroups
    private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
                                                                       final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
        //每个连接需要执行的最大sql数量
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
        //分组,每组对应一条数据库连接
        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
        //选择连接模式 连接限制/内存限制
        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        //创建连接
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
        int count = 0;
        for (List<SQLUnit> each : sqlUnitPartitions) {
            //绑定连接和SQLUnit 创建StatementExecuteUnit
            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
        }
        return result;
    }

SQLUnit分组和连接模式选择没有任何关系,
连接模式的选择只取决于maxConnectionsSizePerQuery和SQLUnit数量的大小关系
,maxConnectionsSizePerQuery代表了一个数据源一次查询允许的最大连接数。

  • 当maxConnectionsSizePerQuery<sqlunit数量时,意味着无法做到每个sqlunit独享一个连接,需要直接查询出结果集至内存中;< li="">

  • 当maxConnectionsSizePerQuery>=SQLUnit数量时,意味着可以支持每个SQLUnit独享一个连接,可以通过ResultSet游标下移的方式查询结果集。

不过maxConnectionsSizePerQuery默认值为1,所以当一条SQL需要路由至多张表时(即有多个SQLUnit)会采用连接限制,当路由至单表时是内存限制模式。

图片

为了避免产生数据库连接死锁问题,
在内存限制模式时,Sharding-JDBC通过锁住数据源对象一次性创建出本条SQL需要的所有数据库连接
。连接限制模式下,各连接一次性查出各自的结果,不会出现多连接相互等待的情况,因此不会发生死锁,而内存限制模式通过游标读取结果集,需要多条连接去查询不同的表做合并,如果不一次性拿到所有需要的连接,则可能存在连接相互等待的情况造成死锁。可以参照
官方文档中执行引擎相关例子

  • 不同连接模式创建连接

private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
    if (1 == connectionSize) {
        Connection connection = createConnection(dataSourceName, dataSource);
        replayMethodsInvocation(connection);
        return Collections.singletonList(connection);
    }
    if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
        return createConnections(dataSourceName, dataSource, connectionSize);
    }
    //内存限制模式加锁 一次性获取所有的连接
    synchronized (dataSource) {
        return createConnections(dataSourceName, dataSource, connectionSize);
    }
}

此外,结果集的内存合并和流式合并只在调用JDBC的executeQuery的情况下生效,
如果使用execute方式进行查询,都是统一使用流式方式的查询。

  • 查询结果归并对比

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#executeQuery#101 
  org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#getQueryResult
    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
        PreparedStatement preparedStatement = (PreparedStatement) statement;
        ResultSet resultSet = preparedStatement.executeQuery();
        getResultSets().add(resultSet);
        //executeQuery 中根据连接模式选择流式/内存
        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
    }
 
//execute 单独调用getResultSet中只会使用流式合并
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet#158
  org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getQueryResults 
    private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
        List<QueryResult> result = new ArrayList<>(resultSets.size());
        for (ResultSet each : resultSets) {
            if (null != each) {
                result.add(new StreamQueryResult(each));
            }
        }
        return result;
    }

多条连接的执行方式分为串行和并行,在本地事务和XA事务中是串行的方式,其余情况是并行,具体的执行逻辑这里就不再展开了。

  • isHoldTransaction

public boolean isHoldTransaction() {
        return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());
    }

3.5.2 引擎总结

执行引擎通过maxConnectionsSizePerQuery和同数据源的SQLUnit的数量大小确定连接模式,maxConnectionsSizePerQuery=SQLUnit数量使用内存限制模式,当使用内存限制模式时会通过对数据源对象加锁来保证一次性获取本条SQL需要的连接而避免死锁。在使用executeQuery查询时,处理结果集时会根据连接模式选择流式或者内存合并,但使用execute方法查询,处理结果集只会使用流式合并。

3.6 归并引擎

3.6.1 引擎解析

查询出的结果集需要经过归并引擎归并后才是最终的结果,归并的核心入口在MergeEntry的process方法中,优先处理分片场景的合并,再进行脱敏,只有读写分离的情况下则直接返回TransparentMergedResult,TransparentMergedResult实际上没做合并的额外处理,其内部实现都是完全调用queryResult的实现。

图片

  • 归并逻辑入口
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#mergeQuery#190
 org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine#merge#61
    org.apache.shardingsphere.underlying.merge.MergeEntry#process
    public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
        //分片合并
        Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext);
        //脱敏处理
        Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
        //只有读写分离的情况下,orElseGet会不存在,TransparentMergedResult
        return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
    }
  • TransparentMergedResult

@RequiredArgsConstructor
public final class TransparentMergedResult implements MergedResult {
     
    private final QueryResult queryResult;
     
    @Override
    public boolean next() throws SQLException {
        return queryResult.next();
    }
     
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        return queryResult.getValue(columnIndex, type);
    }
     
    @Override
    public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {
        return queryResult.getCalendarValue(columnIndex, type, calendar);
    }
     
    @Override
    public InputStream getInputStream(final int columnIndex, final String type) throws SQLException {
        return queryResult.getInputStream(columnIndex, type);
    }
     
    @Override
    public boolean wasNull() throws SQLException {
        return queryResult.wasNull();
    }
}

我们只看分片相关的操作,ResultMergerEngine只有一个实现类ShardingResultMergerEngine,所以只有存在分片情况的时候,上文的第一个merge才会有结果。根据SQL类型的不同选择ResultMerger实现,查询类的合并是最常用也是最复杂的合并。

  • MergeEntry.merge

org.apache.shardingsphere.underlying.merge.MergeEntry#merge
    private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultMergerEngine) {
                //选择不同类型的 resultMerger
                ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);
                //归并
                return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));
            }
        }
        return Optional.empty();
    }
 
org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine#newInstance
    public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {
        if (sqlStatementContext instanceof SelectStatementContext) {
            return new ShardingDQLResultMerger(databaseType);
        }
        if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
            return new ShardingDALResultMerger(shardingRule);
        }
        return new TransparentResultMerger();
    }

ShardingDQLResultMerger的merge方法就是根据SQL解析结果中包含的token选择合适的归并方式(分组聚合、排序、遍历),归并后的mergedResult统一经过decorate方法进行判断是否需要分页归并,整体处理流程图可以概括如下。

  • 归并方式选择

org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#merge
    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        if (1 == queryResults.size()) {
            return new IteratorStreamMergedResult(queryResults);
        }
        Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
        SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
        selectStatementContext.setIndexes(columnLabelIndexMap);
        //分组聚合,排序,遍历
        MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        //分页归并
        return decorate(queryResults, selectStatementContext, mergedResult);
    }
 
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#build
    private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                               final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
        if (isNeedProcessGroupBy(selectStatementContext)) {
            //分组聚合归并
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessDistinctRow(selectStatementContext)) {
            setGroupByForDistinctRow(selectStatementContext);
            //分组聚合归并
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessOrderBy(selectStatementContext)) {
            //排序归并
            return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);
        }
        //遍历归并
        return new IteratorStreamMergedResult(queryResults);
    }
 
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#decorate
    private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult) throws SQLException {
        PaginationContext paginationContext = selectStatementContext.getPaginationContext();
        if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {
            return mergedResult;
        }
        String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();
        //根据数据库类型分页归并
        if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {
            return new LimitDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("Oracle".equals(trunkDatabaseName)) {
            return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("SQLServer".equals(trunkDatabaseName)) {
            return new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        return mergedResult;
    }

每种归并方式的作用在
官方文档有比较详细的案例
,这里就不再重复介绍了。

3.6.2 引擎总结

归并引擎是Sharding-JDBC执行SQL的最后一步,其作用是将多个数节点的结果集组合为一个正确的结果集返回,查询类的归并有
分组归并、聚合归并、排序归并、遍历归并、分页归并
五种,这五种归并方式并不是互斥的,而是相互组合的。

四、定制开发

在使用Sharding-JDBC过程中,我们发现了一些问题可以改进,比如存量系统数据量到达一定规模而需要分库分表引入Sharding-JDBC时,就会
存在两大问题

一个是存量数据的迁移
,这个问题我们可以通过分片算法兼容,前文已经提过分片键的值是不允许更改的,而且SQL如果不包含分片键,如果这个分片键对应的值是递增的(如id,时间等),我们可以设置一个阈值,在分片算法的doSharding中判断分片值与阈值的大小决定将数据路由至旧表或新表,避免数据迁移的麻烦。如果是根据用户id取模分表,而新增的数据无法只通过用户id判断,这时可以考虑采用复合分片算法,将用户id与订单id或者时间等递增的字段同时设置为分片键,根据订单id或时间判断是否是新数据,再根据用户id取模得到路由结果即可。

另一个是
Sharding-JDBC
语法限制会使得存量SQL面对巨大的改造压力,而实际上业务更关心的是需要分片的表,非分片的表不应该发生改动和影响。实际上,非分片表理论上无需通过解析、路由、重写、合并,为此我们在源码层面对这段逻辑进行了优化,支持跳过部分解析,完全跳过分片路由、重写和合并,尽可能减少Sharding-JDBC对非分片表的语法限制,来减少业务系统的改造压力与风险。

图片

4.1 跳过Sharding语法限制

Sharding-JDBC执行解析路由重写的逻辑都是在BasePrepareEngine中,最终构造ExecutionContext交由执行引擎执行,ExecutionContext中包含sqlStatementContext和executionUnits,非分片表不涉及路由改写,所以其ExecutionUnit我们非常容易手动构造,而查看SQLStatementContext的使用情况,我们发现SQLStatementContext只会影响结果集的合并而不会影响实际的执行,而不分片表也无需进行结果集的合并,整体实现思路如图。

图片

  • ExecutionContext相关对象

public class ExecutionContext {
 
    private final SQLStatementContext sqlStatementContext;
 
    private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();
}
 
public final class ExecutionUnit {
     
    private final String dataSourceName;
     
    private final SQLUnit sqlUnit;
}
 
public final class SQLUnit {
 
    private String sql;
 
    private final List<Object> parameters;
 
}

(1)校验SQL中是否包含分片表:
我们是通过正则将SQL中的各个单词分隔成Set,然后再遍历BaseRule判断是否存在分片表。大家可能会奇怪明明解析引擎可以帮我们解析出SQL中的表名,为什么还要自己来解析。因为我们测试的过程中发现,
存量业务上的SQL很多在解析阶段就会报错,只能提前判断
,当然这种判断方式并不严谨,比如 SELECT order_id FROM t_order_record WHERE order_id=1 AND remarks=' t_order xxx';,配置的分片表t_order时就会存在误判,但这种场景在我们的业务中没有,所以暂时并没有处理。由于这个信息需要在多个对象方法中使用,为了避免修改大量的对象变量和方法入参,而又能方便的透传这个信息,判断的结果我们选择放在ThreadLocal里。

  • RuleContextManager

public final class RuleContextManager {
 
    private static final ThreadLocal<RuleContextManager> SKIP_CONTEXT_HOLDER = ThreadLocal.withInitial(RuleContextManager::new);
 
    /**
     * 是否跳过sharding
     */
    private boolean skipSharding;
 
    /**
     * 是否路由至主库
     */
    private boolean masterRoute;
 
    public static boolean isSkipSharding() {
        return SKIP_CONTEXT_HOLDER.get().skipSharding;
    }
 
    public static void setSkipSharding(boolean skipSharding) {
        SKIP_CONTEXT_HOLDER.get().skipSharding = skipSharding;
    }
 
    public static boolean isMasterRoute() {
 
        return SKIP_CONTEXT_HOLDER.get().masterRoute;
    }
 
    public static void setMasterRoute(boolean masterRoute) {
        SKIP_CONTEXT_HOLDER.get().masterRoute = masterRoute;
    }
 
    public static void clear(){
        SKIP_CONTEXT_HOLDER.remove();
    }
 
}
  • 判断SQL是否包含分片表

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#buildSkipContext
// 判断是否可以跳过sharding,构造RuleContextManager的值
private void buildSkipContext(final String sql){
    Set<String> sqlTokenSet = new HashSet<>(Arrays.asList(sql.split("[\\s]")));
        if (CollectionUtils.isNotEmpty(rules)) {
            for (BaseRule baseRule : rules) {
                //定制方法,ShardingRule实现,判断sqlTokenSet是否包含逻辑表即可
                if(baseRule.hasContainShardingTable(sqlTokenSet)){
                    RuleContextManager.setSkipSharding(false);
                    break;
                }else {
                    RuleContextManager.setSkipSharding(true);
                }
            }
        }
}
 
org.apache.shardingsphere.core.rule.ShardingRule#hasContainShardingTable
public Boolean hasContainShardingTable(Set<String> sqlTokenSet) {
      //logicTableNameList通过遍历TableRule可以得到
       for (String logicTable : logicTableNameList) {
            if (sqlTokenSet.contains(logicTable)) {
                return true;
            }
        }
        return false;
    }

(2)跳过解析路由:
通过RuleContextManager中的skipSharding判断是否需要跳过Sharding解析路由,但为了兼容读写分离的场景,我们还需要知道这条SQL应该走主库还是从库,走主库的场景在后面强制路由主库部分有说明,SQL走主库实际上只有两种情况,一种是非SELECT语句,另一种就是SELECT语句带锁,如SELECT...FOR UPDATE,因此整体实现的步骤如下:

  • 如果标记了跳过Sharding且不为select语句,直接返回SkipShardingStatement,单独构造一个SkipShardingStatement的目的是为了能利用解析引擎中的缓存,缓存中不能放入null值。

  • 如果是select语句需要继续解析,判断是否有锁后直接返回,避免后续解析造成语法不兼容,这里也曾尝试用反射获取lockClause来判断是否包含锁,但最终没有成功。

  • ShardingRouteDecorator根据RuleContextManager.isSkipSharding判断是否跳过路由。

  • 跳过解析路由

public class SkipShardingStatement implements SQLStatement{
    @Override
    public int getParameterCount() {
        return 0;
    }
}
 
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0
    private SQLStatement parse0(final String sql, final boolean useCache) {
        if (useCache) {
            Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
            if (cachedSQLStatement.isPresent()) {
                return cachedSQLStatement.get();
            }
        }
        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
        /**
         * 跳过sharding 需要判断是否需要路由至主库 如果不是select语句直接跳过
         * 是select语句则需要通过继续解析判断是否有锁
         */
        SQLStatement result ;
        if(RuleContextManager.isSkipSharding()&&!VisitorRule.SELECT.equals(VisitorRule.valueOf(parseTree.getClass()))){
            RuleContextManager.setMasterRoute(true);
            result = new SkipShardingStatement();
        }else {
            result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
        }
        if (useCache) {
            cache.put(sql, result);
        }
        return result;
    }
 
org.apache.shardingsphere.sql.parser.mysql.visitor.impl.MySQLDMLVisitor#visitSelectClause
    public ASTNode visitSelectClause(final SelectClauseContext ctx) {
        SelectStatement result = new SelectStatement();
        // 跳过sharding 只需要判断是否有锁来决定是否路由至主库即可
        if(RuleContextManager.isSkipSharding()){
            if (null != ctx.lockClause()) {
                result.setLock((LockSegment) visit(ctx.lockClause()));
                RuleContextManager.setMasterRoute(true);
            }
            return result;
        }
        //...后续解析
    }
 
org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext
    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);
        //如果需要跳过sharding 不进行后续的解析直接返回
        if (RuleContextManager.isSkipSharding()) {
            return new RouteContext(sqlStatement, parameters, new RouteResult());
        }
        //...解析
    }
 
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
        // 跳过sharding路由
        if(RuleContextManager.isSkipSharding()){
            return routeContext;
        }
        //...路由
    }

(3)手动构造ExecutionUnit:
ExecutionUnit中我们需要确定的内容就是datasourceName,这里我们认为跳过Sharding的SQL最终执行的库一定只有一个。如果只是跳过Sharding的情况,直接从元数据中获取数据源名称即可,如果存在读写分离的情况,主从路由的结果也一定是唯一的。创建完ExecutionUnit直接放入ExecutionContext返回即可,从而跳过后续的改写逻辑。

  • 手动构造ExecutionUnit

public ExecutionContext prepare(final String sql, final List<Object> parameters) {
    List<Object> clonedParameters = cloneParameters(parameters);
    // 判断是否可以跳过sharding,构造RuleContextManager的值
    buildSkipContext(sql);  
    RouteContext routeContext = executeRoute(sql, clonedParameters);
    ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
    // 跳过sharding的sql最后的路由结果一定只有一个库
    if(RuleContextManager.isSkipSharding()){
        log.debug("可以跳过sharding的场景 {}", sql);
        if(!Objects.isNull(routeContext.getRouteResult())){
            Collection<String> allInstanceDataSourceNames = this.metaData.getDataSources().getAllInstanceDataSourceNames();
            int routeUnitsSize = routeContext.getRouteResult().getRouteUnits().size();
            /*
             * 1. 没有读写分离的情况下  跳过sharding路由会导致routeUnitsSize为0 此时需要判断数据源数量是否为1
             * 2. 读写分离情况下 只会路由至具体的主库或从库 routeUnitsSize数量应该为1
             */
            if(!(routeUnitsSize == 0 && allInstanceDataSourceNames.size()==1)|| routeUnitsSize>1){
                throw new ShardingSphereException("可以跳过sharding,但是路由结果不唯一,SQL= %s ,routeUnits= %s ",sql, routeContext.getRouteResult().getRouteUnits());
            }
            Collection<String> actualDataSourceNames = routeContext.getRouteResult().getActualDataSourceNames();
            // 手动创建执行单元
            String datasourceName = CollectionUtils.isEmpty(actualDataSourceNames)? allInstanceDataSourceNames.iterator().next():actualDataSourceNames.iterator().next();
            ExecutionUnit executionUnit = new ExecutionUnit(datasourceName, new SQLUnit(sql, clonedParameters));
            result.getExecutionUnits().add(executionUnit);
            //标记该结果需要跳过
            result.setSkipShardingScenarioFlag(true);
        }
    }else {
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
    }
    if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
        SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
    }
    return result;
}

(4)跳过合并:
跳过查询结果的合并和影响行数计算的合并,注意ShardingPreparedStatement和ShardingStatement都需要跳过

  • 跳过合并

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#executeQuery
    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            prepare();
            initPreparedStatementExecutor();
            List<QueryResult> queryResults = preparedStatementExecutor.executeQuery();
            List<ResultSet> resultSets = preparedStatementExecutor.getResultSets();
        // 定制开发,不分片跳过合并
            if(executionContext.isSkipShardingScenarioFlag()){
                return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
            }
            MergedResult mergedResult = mergeQuery(queryResults);
            result = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet
    public ResultSet getResultSet() throws SQLException {
        if (null != currentResultSet) {
            return currentResultSet;
        }
        List<ResultSet> resultSets = getResultSets();
        // 定制开发,不分片跳过合并
        if(executionContext.isSkipShardingScenarioFlag()){
            return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
        }
 
        if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
            MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
            currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
        }
        return currentResultSet;
    }
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#isAccumulate
    public boolean isAccumulate() {
        //定制开发,不分片跳过计算
        if(executionContext.isSkipShardingScenarioFlag()){
            return false;
        }
        return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());
    }

(5)清空RuleContextManager:
查看一下Sharding-JDBC其他ThreadLocal的清空位置,对应的清空RuleContextManager就好。

  • 清空ThreadLocal

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter#close
public final void close() throws SQLException {
        closed = true;
        MasterVisitedManager.clear();
        TransactionTypeHolder.clear();
        RuleContextManager.clear();
        int connectionSize = cachedConnections.size();
        try {
            forceExecuteTemplateForClose.execute(cachedConnections.entries(), cachedConnections -> cachedConnections.getValue().close());
        } finally {
            cachedConnections.clear();
            rootInvokeHook.finish(connectionSize);
        }
    }

举个例子,比如Sharding-JDBC本身是不支持INSERT INTO tbl_name (col1, col2, …) SELECT col1, col2, … FROM tbl_name WHERE col3 = ?    这种语法的,会报空指针异常。

图片

经过我们上述改造验证后,非分片表是可以跳过语法限制执行如下的SQL的。

图片

通过该功能的实现,业务可以更关注与分片表的SQL改造,而无需担心引入Sharding-JDBC造成所有SQL的验证改造,大幅减少改造成本和风险。

4.2 强制路由主库

Sharding-JDBC可以通过配置主从库数据源方便的实现读写分离的功能,但使用读写分离就必须面对主从延迟和从库失联的痛点,针对这一问题,我们实现了强制路由主库的动态配置,当主从延迟过大或从库失联时,通过修改配置来实现SQL语句强制走主库的不停机路由切换。

后面会说明了配置的动态生效的实现方式,这里只说明强制路由主库的实现,我们直接使用前文的RuleContextManager即可,在主从路由引擎里判断下是否开启了强制主库路由。

  • MasterSlaveRouteDecorator.decorate改造

org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
        /**
         * 如果配置了强制主库 MasterVisitedManager设置为true
         * 后续isMasterRoute中会保证路由至主库
         */
        if(properties.<Boolean>getValue(ConfigurationPropertyKey.MASTER_ROUTE_ONLY)){
            MasterVisitedManager.setMasterVisited();
        }
        //...路由逻辑
        return routeContext;
    }

为了兼容之前跳过Sharding的功能,我们需要同步修改下isMasterRoute方法,如果是跳过了Sharding路由需要通过RuleContextManager来判断是否走主库。

  • isMasterRoute改造

org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter#isMasterRoute
    private boolean isMasterRoute(final SQLStatement sqlStatement) {
        if(sqlStatement instanceof SkipShardingStatement){
            // 优先以MasterVisitedManager中的值为准
            return MasterVisitedManager.isMasterVisited()|| RuleContextManager.isMasterRoute();
        }
        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
    }

当然,更理想的状况是通过监控主从同步延迟和数据库拨测,当超过阈值时或从库失联时直接自动修改配置中心的库,实现自动切换主库,减少业务故障时间和运维压力。

4.3 配置动态生效

Sharding-JDBC中的ConfigurationPropertyKey中提供了许多配置属性,而Sharding-JDBCB并没有为这些配置提供在线修改的方法,而在实际的应用场景中,像SQL_SHOW这样控制SQL打印的开关配置,我们更希望能够在线修改配置值来控制SQL日志的打印,而不是修改完配置再重启服务。

以SQL打印为例,BasePrepareEngine中存在ConfigurationProperties对象,通过调用getValue方法来获取SQL_SHOW的值。

  • SQL 打印

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
    /**
     * Prepare to execute.
     *
     * @param sql SQL
     * @param parameters SQL parameters
     * @return execution context
     */
    public ExecutionContext prepare(final String sql, final List<Object> parameters) {
        List<Object> clonedParameters = cloneParameters(parameters);
        RouteContext routeContext = executeRoute(sql, clonedParameters);
        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
        //sql打印
        if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
            SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
        }
        return result;
    }

ConfigurationProperties继承了抽象类TypedProperties,其getValue方法就是根据key获取对应的配置值,因此我们直接在TypedProperties中实现刷新缓存中的配置值的方法。

  • TypedProperties刷新配置

public abstract class TypedProperties<E extends Enum & TypedPropertyKey> {
     
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
     
    @Getter
    private final Properties props;
     
    private final Map<E, TypedPropertyValue> cache;
     
    public TypedProperties(final Class<E> keyClass, final Properties props) {
        this.props = props;
        cache = preload(keyClass);
    }
     
    private Map<E, TypedPropertyValue> preload(final Class<E> keyClass) {
        E[] enumConstants = keyClass.getEnumConstants();
        Map<E, TypedPropertyValue> result = new HashMap<>(enumConstants.length, 1);
        Collection<String> errorMessages = new LinkedList<>();
        for (E each : enumConstants) {
            TypedPropertyValue value = null;
            try {
                value = new TypedPropertyValue(each, props.getOrDefault(each.getKey(), each.getDefaultValue()).toString());
            } catch (final TypedPropertyValueException ex) {
                errorMessages.add(ex.getMessage());
            }
            result.put(each, value);
        }
        if (!errorMessages.isEmpty()) {
            throw new ShardingSphereConfigurationException(Joiner.on(LINE_SEPARATOR).join(errorMessages));
        }
        return result;
    }
     
    /**
     * Get property value.
     *
     * @param key property key
     * @param <T> class type of return value
     * @return property value
     */
    @SuppressWarnings("unchecked")
    public <T> T getValue(final E key) {
        return (T) cache.get(key).getValue();
    }
 
    /**
     * vivo定制改造方法 refresh property value.
     * @param key property key
     * @param value property value
     * @return 更新配置是否成功
     */
    public boolean refreshValue(String key, String value){
        //获取配置类支持的配置项
        E[] enumConstants = targetKeyClass.getEnumConstants();
        for (E each : enumConstants) {
            //遍历新的值
            if(each.getKey().equals(key)){
                try {
                    //空白value认为无效,取默认值
                    if(!StringUtils.isBlank(value)){
                        value = each.getDefaultValue();
                    }
                    //构造新属性
                    TypedPropertyValue typedPropertyValue = new TypedPropertyValue(each, value);
                    //替换缓存
                    cache.put(each, typedPropertyValue);
                    //原始属性也替换下,有可能会通过RuntimeContext直接获取Properties
                    props.put(key,value);
                    return true;
                } catch (final TypedPropertyValueException ex) {
                    log.error("refreshValue error. key={} , value={}", key, value, ex);
                }
            }
        }
        return false;
    }
}

实现了刷新方法后,我们还需要将该方法一步步暴露至一个外部可以调用的类中,以便在服务监听配置的方法中,能够调用这个刷新方法。ConfigurationProperties直接在BasePrepareEngine的构造函数中传入,我们通过构造函数逐步反推最外层的这一对象调用来源,最终可以定位到在AbstractDataSourceAdapter中的getRuntimeContext()方法中可以获取到这个配置,而这个就是Sharding-JDBC实现的JDBC中Datasource接口的抽象类,我们直接在这个类中调用刚刚实现的refreshValue方法,剩下的就是监听配置,通过自己实现的AbstractDataSourceAdapter来调用这个方法就好了。

图片

通过这一功能,我们可以方便的控制一些开关属性的在线修改,如SQL打印、强制路由主库等,业务无需重启服务即可做到配置的动态生效。

4.4 批量update语法支持

业务中存在使用foreach标签来批量update的语句,这种SQL在Sharding-JDBC中无法被正确路由,只会路由第一组参数,后面的无法被路由改写,原因是解析引擎无法将语句拆分解析。

  • 批量update样例

<update id="batchUpdate">
        <foreach collection="orderList" item="item">
               update t_order set
               status = 1,
               updated_by = #{item.updatedBy}
               WHERE created_by = #{item.createdBy};
        </foreach>
    </update>

图片

图片

我们通过将批量update按照;拆分为多个语句,然后分别路由,最后手动汇总路有结果生成执行单元。

为了能正确重写SQL,批量update拆分后的语句需要完全一样,这样就不能使用动态拼接set条件,而是使用ifnull语法或者字段值不发生变化时也将原来的值放入set中,只不过set前后的值保持一致,整体思路与实现如下。

图片

  • prepareBatch实现

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepareBatch
   private ExecutionContext prepareBatch(List<String> splitSqlList, final List<Object> allParameters) {
       //SQL去重
       List<String> sqlList = splitSqlList.stream().distinct().collect(Collectors.toList());
       if (sqlList.size() > 1) {
           throw new ShardingSphereException("不支持多条SQL,请检查SQL," + sqlList.toString());
       }
       //以第一条SQL为标准
       String sql = sqlList.get(0);
       //所有的执行单元
       Collection<ExecutionUnit> globalExecutionUnitList = new ArrayList<>();
       //初始化最后的执行结果
       ExecutionContext executionContextResult = null;
       //根据所有参数数量和SQL语句数量 计算每组参数的数量
       int eachSqlParameterCount = allParameters.size() / splitSqlList.size();
       //平均分配每条SQL的参数
       List<List<Object>> eachSqlParameterListList = Lists.partition(allParameters, eachSqlParameterCount);
       for (List<Object> eachSqlParameterList : eachSqlParameterListList) {
           //每条SQL参数不同 需要根据参数路由不同的结果  实际的SqlStatementContext 是一致的
           RouteContext routeContext = executeRoute(sql, eachSqlParameterList);
           //由于SQL一样  实际的SqlStatementContext 是一致的 只需初始化一次
           if (executionContextResult == null) {
               executionContextResult = new ExecutionContext(routeContext.getSqlStatementContext());
           }
           globalExecutionUnitList.addAll(executeRewrite(sql, eachSqlParameterList, routeContext));
       }
       //排序打印日志
       executionContextResult.getExtendMap().put(EXECUTION_UNIT_LIST, globalExecutionUnitList.stream().sorted(Comparator.comparing(ExecutionUnit::getDataSourceName)).collect(Collectors.toList()));
       if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
           SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE),
                   executionContextResult.getSqlStatementContext(), (Collection<ExecutionUnit>) executionContextResult.getExtendMap().get(EXECUTION_UNIT_LIST));
       }
       return executionContextResult;
   }

这里我们在ExecutionContext单独构造了一个了ExtendMap来存放ExecutionUnit,原因是ExecutionContext中的executionUnits是HashSet,而判断ExecutionUnit中的SqlUnit只会根据SQL去重,批量update的SQL是一致的,但parameters不同,为了不影响原有的逻辑,单独使用了另外的变量来存放。

  • ExecutionContext改造

@RequiredArgsConstructor
@Getter
public class ExecutionContext {
 
    private final SQLStatementContext sqlStatementContext;
 
    private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();
 
    /**
     * 自定义扩展变量
     */
    private final Map<ExtendEnum,Object> extendMap = new HashMap<>();
 
    /**
     * 定制扩展,是否可以跳过分片逻辑
     */
    @Setter
    private boolean skipShardingScenarioFlag = false;
}
 
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
     
    private final String dataSourceName;
     
    private final SQLUnit sqlUnit;
}
 
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
//根据SQL判断是否相等
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
 
    private String sql;
 
    private final List<Object> parameters;
 
}

我们还需要改造下执行方法,在初始化执行器的时候,判断下ExtendMap中存在我们自定义的EXECUTION_UNIT_LIST是否存在,存在则使用生成InputGroup,同一个数据源下的ExecutionUnit会被放入同一个InputGroup中。

  • InputGroup改造

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#init
    public void init(final ExecutionContext executionContext) throws SQLException {
        setSqlStatementContext(executionContext.getSqlStatementContext());
        //兼容批量update 分库分表后同一张表的情况 判断是否存在EXECUTION_UNIT_LIST 存在则使用未去重的List进行后续的操作
        if (MapUtils.isNotEmpty(executionContext.getExtendMap())){
            Collection<ExecutionUnit> executionUnitCollection = (Collection<ExecutionUnit>) executionContext.getExtendMap().get(EXECUTION_UNIT_LIST);
            if(CollectionUtils.isNotEmpty(executionUnitCollection)){
                getInputGroups().addAll(obtainExecuteGroups(executionUnitCollection));
            }
        }else {
            getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
        }
        cacheStatements();
    }

改造完成后,批量update中的每条SQL都可以被正确路由执行。

图片

4.5 ShardingCondition去重

当where语句包括多个or条件时,而or条件不包含分片键时,会造成createShardingConditions方法生成重复的分片条件,导致重复调用doSharding方法。

如SELECT * FROM t_order  WHERE created_by = ? and (   (status = ?) or  (status = ?) or  (status = ?) )这种SQL,存在三个or条件,分片键是created_by ,实际产生的shardingCondition会是三个一样的值,并会调用三次doSharding的方法。虽然实际执行还是只有一次(批量update那里说明过执行单元会去重),但为了减少方法的重复调用,我们还是对这里做了一次去重。

图片

图片

去重的方法也比较简单粗暴,我们对ListRouteValue和RangeRouteValue添加了@EqualsAndHashCode注解,然后在WhereClauseShardingConditionEngine的createShardingConditions方法返回最终结果前加一次去重,从而避免生成重复的shardingCondition造成doSharding方法的重复调用。

  • createShardingConditions去重

org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine#createShardingConditions
    private Collection<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection<AndPredicate> andPredicates, final List<Object> parameters) {
        Collection<ShardingCondition> result = new LinkedList<>();
        for (AndPredicate each : andPredicates) {
            Map<Column, Collection<RouteValue>> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters);
            if (routeValueMap.isEmpty()) {
                return Collections.emptyList();
            }
            result.add(createShardingCondition(routeValueMap));
        }
        //去重
        Collection<ShardingCondition> distinctResult = result.stream().distinct().collect(Collectors.toCollection(LinkedList::new));
        return distinctResult;
    }

4.6  全路由校验

分片表的SQL中如果没有携带分片键(或者带上了分片键结果没有被正确解析)将会导致全路由,产生性能问题,而这种SQL并不会报错,这就导致在实际的业务改造中,开发和测试很难保证百分百改造彻底。为此,我们在源码层面对这种情况做了额外的校验,当产生全路由,也就是ShardingConditions为空时,主动抛出异常,从而方便开发和测试能够快速发现全路由SQL。

实现方式也比较简单,校验下ShardingConditions是否为空即可,只不过需要额外兼容下Hint策略ShardingConditions始终为空的特殊情况。

  • 全路由校验

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
        //省略...
        //获取 ShardingConditions
        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
        boolean hintAlgorithm = isHintAlgorithm(sqlStatementContext, shardingRule);
        //判断是否允许全路由
        if (!properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_EMPTY_SHARDING_CONDITIONS)) {
            //如果不是Hint算法
            if(!isHintAlgorithm(sqlStatementContext, shardingRule)){
                /** 如果是DML语句  则可能有两种情况 这两种情况是根据getShardingConditions方法的内部逻辑而来的
                 *  一种是非插入语句  shardingConditions.getConditions()为空即可
                 *  一种是插入语句 插入语句shardingConditions.getConditions()不会为空  但是ShardingCondition的routeValues是空的
                 */
                if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) {
                    if(shardingConditions.getConditions().isEmpty()) {
                        throw new ShardingSphereException("SQL不包含分库分表键,请检查SQL");
                    }else {
                        if (sqlStatementContext instanceof InsertStatementContext) {
                            List<ShardingCondition> routeValuesNotEmpty = shardingConditions.getConditions().stream().filter(r -> CollectionUtils.isNotEmpty(r.getRouteValues())).collect(Collectors.toList());
                            if(CollectionUtils.isEmpty(routeValuesNotEmpty)){
                                throw new ShardingSphereException("SQL不包含分库分表键,请检查SQL");
                            }
                        }
                    }
                }
            }
        }
        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
        //省略...
        return new RouteContext(sqlStatementContext, parameters, routeResult);
    }
 
private boolean isHintAlgorithm(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {
        // 场景a 全局默认策略是否使用强制路由策略
        if(shardingRule.getDefaultDatabaseShardingStrategy() instanceof HintShardingStrategy
                || shardingRule.getDefaultTableShardingStrategy() instanceof HintShardingStrategy){
            return true;
        }
        for (String each : sqlStatementContext.getTablesContext().getTableNames()) {
            Optional<TableRule> tableRule = shardingRule.findTableRule(each);
            //场景b 指定表是否使用强制路由策略
            if (tableRule.isPresent() && (shardingRule.getDatabaseShardingStrategy(tableRule.get()) instanceof HintShardingStrategy
                    || shardingRule.getTableShardingStrategy(tableRule.get()) instanceof HintShardingStrategy)) {
                return true;
            }
        }
        return false;
    }

当然这块功能也可以在完善些,比如对分片路由结果中的数据源数量进行校验,从而避免跨库操作,我们这边没有实现也就不再赘述了。

4.7 组件封装

业务接入Sharding-JDBC的步骤是一样的,都需要通过Java创建数据源和配置对象或者使用SpringBoot进行配置,存在一定的熟悉成本和重复开发的问题,为此我们也对定制开发版本的Sharding-JDBC封装了一个公共组件,从而简化业务配置,减少重复开发,提升业务的开发效率,具体功能可见下。这块没有涉及源码的改造,只是在定制版本上包装的一个公共组件。

  • 提供了默认的数据源与连接池配置

  • 简化分库分表配置,业务配置逻辑表名和后缀,组件拼装行表达式和actual-data-nodes

  • 封装常用的分片算法(时间、业务字段值等),

  • 统一的配置监听与动态修改(SQL打印、强制主从切换等)

开源Sharding-JDBC配置

//数据源名称
spring.shardingsphere.datasource.names=ds0,ds1
//ds0配置
spring.shardingsphere.datasource.ds0.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.url=jdbc:mysql://localhost:3306/ds0
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=
//ds1配置
spring.shardingsphere.datasource.ds1.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.url=jdbc:mysql://localhost:3306/ds1
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=
//分表规则
spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds$->{0..1}.t_order$->{0..1}
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expression=t_order$->{order_id % 2}
spring.shardingsphere.sharding.tables.t_order_item.actual-data-nodes=ds$->{0..1}.t_order_item$->{0..1}
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.algorithm-expression=t_order_item$->{order_id % 2}
//默认分库规则
spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id
spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=ds$->{user_id % 2}

组件简化配置

//数据源名称
vivo.it.sharding.datasource.names = ds0,ds1
//ds0配置
vivo.it.sharding.datasource.ds0.url = jdbc:mysql://localhost:3306/ds1
vivo.it.sharding.datasource.ds0.username = root
vivo.it.sharding.datasource.ds0.password =
//ds1配置
vivo.it.sharding.datasource.ds1.url = jdbc:mysql://localhost:3306/ds1
vivo.it.sharding.datasource.ds1.username = root
vivo.it.sharding.datasource.ds1.password =
//分表规则
vivo.it.sharding.table.rule.config = [{"logicTable":"t_order,t_order_item","tableRange":"0..1","shardingColumn":"order_id ","algorithmExpression":"order_id %2"}]
//默认分库规则
vivo.it.sharding.default.db.rule.config = {"shardingColumn":"user_id","algorithmExpression":"user_id %2"}

五、使用建议

结合官方文档和业务实践经验,我们也梳理了部分使用Sharding-JDBC的建议供大家参考,实际具体如何优化SQL写法(比如子查询、分页、分组排序等)还需要结合业务的实际场景来进行测试和调优。

(1)强制等级

  • 建议①:涉及分片表的SQL必须携带分片键

  • 原因:无分片键会导致全路由,存在严重的性能隐患

  • 建议②:禁止一条SQL中的分片值路由至不同的库

  • 原因:跨库操作存在严重的性能隐患,事务操作会升级为分布式事务,增加业务复杂度

  • 建议③:禁止对分片键使用运算表达式或函数操作

  • 原因:无法提前计算表达式和函数获取分片值,导致全路由

  • 说明:
    详见官方文档

图片

图片

  • 建议⑤:包含CASE WHEN、HAVING、UNION (ALL)语法的分片SQL,不支持路由至多数据节点
  • 说明:
    详见官方文档

(2)建议等级

  • ① 建议使用分布式id来保证分片表主键的全局唯一性

  • 原因:方便判断数据的唯一性和后续的迁移扩容

  • 说明:
    详见文章《vivo 自研鲁班分布式 ID 服务实践》

  • ② 建议跨多表的分组SQL的分组字段与排序字段保证一致

  • 原因:分组和排序字段不一致只能通过内存合并,大数据量时存在性能隐患

  • 说明:
    详见官方文档

  • ③ 建议通过全局递增的分布式id来优化分页查询

  • 原因:Sharding-JDBC的分页优化侧重于结果集的流式合并来避免内存爆涨,但深度分页自身的性能问题并不能解决

  • 说明:
    详见官方文档

六、总结

本文结合个人理解梳理了各个引擎的源码入口和关键逻辑,读者可以结合本文和官方文档更好的定位理解Sharding-JDBC的源码实现。
定制开发的目的是为了降低业务接入成本
,尽可能减少业务存量SQL的改造,部分改造思想其实与官方社区也存在差异,比如跳过语法解析,官方社区致力于通过优化解析引擎来适配各种语法,而不是跳过解析阶段,可参考官方
issue
。源码分析和定制改造只涉及了Sharding-JDBC的数据分片和读写分离功能,定制开发的功能也在生产环境经过了考验,如有不足和优化建议,也欢迎大家批评指正。

代理模式

介绍

代理模式作为设计模式的一种,在各种框架体系中均有应用,代理代理,顾名思义,代替某个对象处理事情.代理模式:为对象提供一个替身,以控制对这个对象的访问,从而通过代理对象访问目标对象,好处是在目标对象基础上增加额外的功能操作,扩展目标对象的功能
Java体系中代理模式有三种形式

  • 静态代理
  • 动态代理
    • JDK 动态代理
    • CGLIB 动态代理

静态代理

1.定义一个接口
2.定义一个实现类,实现该接口
3.定义一个代理类,
同样实现该接口
4.代理类持有实现类的引用
5.通过代理类屏蔽对目标对象的访问,并且可以在目标方法执行前后做一些自己想做的事情

//1.定义一个接口
public interface SmsService {

    String send(String message);

}
//2.定义一个实现类,实现该接口
public class SmsServiceImpl implements SmsService {
  @Override
  public String send(String message) {
    System.out.println("message = " + message);
    return message;
  }
}
//3.定义一个代理类,同样实现该接口
public class SmsProxy implements SmsService {

  //4.代理类持有实现类的引用
  private final SmsService smsService;

  //多态,使得代理类可以代理所有实现了SmsService接口的实现类
  public SmsProxy(SmsService smsService) {
    this.smsService = smsService;
  }

  //5.通过代理类屏蔽对目标对象的访问,并且可以在目标方法执行前后做一些自己想做的事情
  @Override
  public String send(String message) {
    //调用方法前可以添加自己的操作
    System.out.println("before method send()");
    smsService.send(message);
    System.out.println("after method send()");
    return message;
  }
}

为什么代理类一定要实现接口?

事实上,代理类不实现接口,上述代码也能正常运行,但是这样就无法保证类型一致性和多态性,违背了面向接口编程的原则,同时也失去了代理的意义

  • 类型一致性:如果
    SmsProxy
    不实现
    SmsService
    接口,那么在需要
    SmsService
    的地方就不能直接使用
    SmsProxy
    ,这会破坏程序的设计与结构
  • 多态性丧失面向接口编程的一个重要优势是多态性,即同一类型的对象可以有不同的具体行为。如果不实现
    SmsService
    接口,就无法利用Java的多态特性,客户端也无法以统一的方式处理
    SmsService
    及其代理对象
  • 代理意义:代理模式的核心思想是为其他对象提供一种代理以控制对这个对象的访问。为了做到这一点,代理对象必须能够“冒充”被代理的对象,即它们对外提供的接口应该相同,这样才能在客户端无感知的情况下完成代理功能

JDK 动态代理

JDK动态代理是Java SE自带的一种代理实现方式,主要通过
java.lang.reflect.Proxy
类以及
java.lang.reflect.InvocationHandler
接口来实现代理功能。
它能够在运行时动态地生成一个实现了指定接口的新类,这个新类就是代理类,其内部含有指向实际目标对象的引用,并在调用接口方法时执行特定的处理逻辑

1.定义一个接口
2.创建一个实现类实现该接口
3.创建一个实现类实现
InvocationHandler
接口,这个类将在代理对象的方法被调用时处理具体的逻辑
4.利用
Proxy.newInstance()
方法生成代理对象,并将自定义的
InvocationHandler
与实际对象关联起来


//1.定义一个接口
public interface SmsService {

  String send(String message);

}

//2.创建一个实现类实现该接口
public class SmsServiceImpl implements SmsService {
  @Override
  public String send(String message) {
    //具体的业务逻辑  
    System.out.println("message = " + message);
    return message;
  }
}

//3.创建一个实现类实现`InvocationHandler`接口,这个类将在代理对象的方法被调用时处理具体的逻辑
public class MyInvocationHandler implements InvocationHandler {

  /**
   * 持有代理类中的真实对象
   */
  private final Object target;

  public MyInvocationHandler(Object target) {
    this.target = target;
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    //调用方法之前,可以添加自己的操作
    System.out.println("before method: " + method.getName());
    Object result = method.invoke(target, args);
    //调用方法之后 添加自己的操作
    System.out.println("after method: " + method.getName());
    return result;
  }
}

//4.利用`Proxy.newInstance()`方法生成代理对象,并将自定义的`InvocationHandler`与实际对象关联起来
public class JdkProxyFactory {

  public static Object getProxy(Object target) {
    return Proxy
            .newProxyInstance(
                    target.getClass().getClassLoader(),//目标类的类加载器
                    target.getClass().getInterfaces(),//代理需要实现的接口,可指定多个
                    new MyInvocationHandler(target));//代理对象自定义InvocationHandler
  }

}

@Test
public void test() {
  SmsService real = new SmsServiceImpl();
  SmsService proxy = (SmsService) JdkProxyFactory.getProxy(real);
  //当我们通过代理对象调用接口方法时,实际会触发自定义InvocationHandler中的invoke方法,从而实现在调用前后加入额外的逻辑处理,比如日志记录、事务管理、权限验证等功能
  proxy.send("hello");
}

InvocationHandler
中的
invoke
方法为什么会被自动执行?

接上述代码,使用如下代码生成代理类字节码

public static void main(String[] args) {
      /**
       * 这个用法是用来指示Java动态代理机制在运行时生成并保存代理类的.class文件到磁盘上。
       * 当Java使用java.lang.reflect.Proxy类生成动态代理时,默认是不会把生成的代理类持久化到文件系统的。
       * 设置这个系统属性可以让JDK的内部类sun.misc.ProxyGenerator在生成代理类字节码时将其写入到特定目录下
       */
      System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles","true");
      SmsService smsService = (SmsService) JdkProxyFactory.getProxy(new SmsServiceImpl());
      smsService.send("java");
}

public final class $Proxy0 extends Proxy implements SmsService {
  private static Method m1;
  private static Method m2;
  private static Method m3;
  private static Method m0;

  public $Proxy0(InvocationHandler var1) throws  {
    super(var1);
  }

  public final boolean equals(Object var1) throws  {
    try {
      return (Boolean)super.h.invoke(this, m1, new Object[]{var1});
    } catch (RuntimeException | Error var3) {
      throw var3;
    } catch (Throwable var4) {
      throw new UndeclaredThrowableException(var4);
    }
  }

  public final String toString() throws  {
    try {
      return (String)super.h.invoke(this, m2, (Object[])null);
    } catch (RuntimeException | Error var2) {
      throw var2;
    } catch (Throwable var3) {
      throw new UndeclaredThrowableException(var3);
    }
  }

  public final String send(String var1) throws  {
    try {
      return (String)super.h.invoke(this, m3, new Object[]{var1});
    } catch (RuntimeException | Error var3) {
      throw var3;
    } catch (Throwable var4) {
      throw new UndeclaredThrowableException(var4);
    }
  }

  public final int hashCode() throws  {
    try {
      return (Integer)super.h.invoke(this, m0, (Object[])null);
    } catch (RuntimeException | Error var2) {
      throw var2;
    } catch (Throwable var3) {
      throw new UndeclaredThrowableException(var3);
    }
  }

  static {
    try {
      m1 = Class.forName("java.lang.Object").getMethod("equals", Class.forName("java.lang.Object"));
      m2 = Class.forName("java.lang.Object").getMethod("toString");
      m3 = Class.forName("com.cmy.interview.day5.theInterface.SmsService").getMethod("send", Class.forName("java.lang.String"));
      m0 = Class.forName("java.lang.Object").getMethod("hashCode");
    } catch (NoSuchMethodException var2) {
      throw new NoSuchMethodError(var2.getMessage());
    } catch (ClassNotFoundException var3) {
      throw new NoClassDefFoundError(var3.getMessage());
    }
  }
}

上面的代码是代理类的字节码

调用代理类的
send()
本质上都是在调用
super.h.invoke

这里的
h
是自定义的
InvocationHandler
实现类

同时传入了代理类本体(this),通过反射获取到的方法m3(
send()
),以及参数(
new Object[]{var1}
)

可以看到除了重写了本例中
SmsService

send
方法,Object的基础方法
hashCode()
,
equals()
,
toString()
都被覆写以及全部重定向给
InvocationHandler

当我们通过代理对象调用接口方法时,实际会触发自定义
InvocationHandler.invoke()
方法,从而实现在调用前后加入额外的逻辑处理,比如日志记录、事务管理、权限验证等功能

CGLIB 动态代理

CGLIB是一个功能强大,高性能的代码生成包,它可以在
运行期扩展Java类与实现Java接口
,解决了JDK动态代理的一个致命问题:
只能代理实现了接口的类

通常可以使用 Java 的动态代理创建代理,但当要代理的类没有实现接口或者为了更好的性能,CGLIB 是一个好的选择

CGLIB通过
字节码技术
为一个类创建子类,并在子类中方法调用前后加入拦截逻辑,从而实现动态代理。这种方式也被称为继承式代理或者子类代理


net.sf.cglib.proxy.Enhance
和接口
net.sf.cglib.proxy.MethodInterceptor
是使用核心

1.引入依赖
2.定义被代理类
3.创建拦截器并实现
MethodInterceptor
接口,重写
intercept()
方法
4.创建并使用代理对象

<!--1.引入依赖-->
<dependency>
  <groupId>cglib</groupId>
  <artifactId>cglib</artifactId>
  <!-- 注意版本号可能会更新 -->
  <version>3.3.0</version>
</dependency>

//2.定义被代理类
public class AliSmsService{
    public void send(){
        //业务逻辑
    }
}
//3.创建拦截器并实现`MethodInterceptor`接口,重写`intercept()`方法
public class MyMethodInterceptor implements MethodInterceptor {
  /**
   *
   * @param o 被代理的类(需要增强的对象)
   * @param method 被拦截的方法(需要增强的方法)
   * @param objects 方法入参
   * @param methodProxy 用于调用原始方法
   * @return
   * @throws Throwable
   */
  @Override
  public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
    //调用方法之前,添加自己的操作
    System.out.println("before method " + method.getName());
    Object object = methodProxy.invokeSuper(o, objects);
    //调用方法之后,添加自己的操作
    System.out.println("after method " + method.getName());
    return object;
  }
}
//4.创建并使用代理对象
public class CglibProxyFactory {

  public static Object getProxy(Class<?> clazz){
    //创建动态代理类创建的入口
    Enhancer enhancer = new Enhancer();
    //设置类加载器
    enhancer.setClassLoader(clazz.getClassLoader());
    //设置父类为被代理类
    enhancer.setSuperclass(clazz);
    //设置回调方法(拦截器)
    enhancer.setCallback(new MyMethodInterceptor());
    //创建代理类(被代理类的子类)
    return enhancer.create();
  }
  
}

@Test
public void testCglibProxy(){
  AliSmsService aliSmsService = (AliSmsService) CglibProxyFactory.getProxy(AliSmsService.class);
  alismsService.send("java");
}

CGLIB 动态代理原理

接上述代码,通过以下代码生成代理子类的字节码

public static void main(String[] args) {
    System.setProperty(DebuggingClassWriter.DEBUG_LOCATION_PROPERTY, "F:\\cglib");
    AliSmsService aliSmsService = (AliSmsService) CglibProxyFactory.getProxy(AliSmsService.class);
    aliSmsService.send("java");
}

代理子类的字节码

public class AliSmsService$$EnhancerByCGLIB$$9d97d15c extends AliSmsService implements Factory {
    private boolean CGLIB$BOUND;
    public static Object CGLIB$FACTORY_DATA;
    private static final ThreadLocal CGLIB$THREAD_CALLBACKS;
    private static final Callback[] CGLIB$STATIC_CALLBACKS;
    private MethodInterceptor CGLIB$CALLBACK_0;
    private static Object CGLIB$CALLBACK_FILTER;
    private static final Method CGLIB$send$0$Method;
    private static final MethodProxy CGLIB$send$0$Proxy;
    private static final Object[] CGLIB$emptyArgs;
    private static final Method CGLIB$equals$1$Method;
    private static final MethodProxy CGLIB$equals$1$Proxy;
    private static final Method CGLIB$toString$2$Method;
    private static final MethodProxy CGLIB$toString$2$Proxy;
    private static final Method CGLIB$hashCode$3$Method;
    private static final MethodProxy CGLIB$hashCode$3$Proxy;
    private static final Method CGLIB$clone$4$Method;
    private static final MethodProxy CGLIB$clone$4$Proxy;

    static void CGLIB$STATICHOOK1() {
        CGLIB$THREAD_CALLBACKS = new ThreadLocal();
        CGLIB$emptyArgs = new Object[0];
        Class var0 = Class.forName("com.cmy.interview.day5.impl.AliSmsService$$EnhancerByCGLIB$$9d97d15c");
        Class var1;
        Method[] var10000 = ReflectUtils.findMethods(new String[]{"equals", "(Ljava/lang/Object;)Z", "toString", "()Ljava/lang/String;", "hashCode", "()I", "clone", "()Ljava/lang/Object;"}, (var1 = Class.forName("java.lang.Object")).getDeclaredMethods());
        CGLIB$equals$1$Method = var10000[0];
        CGLIB$equals$1$Proxy = MethodProxy.create(var1, var0, "(Ljava/lang/Object;)Z", "equals", "CGLIB$equals$1");
        CGLIB$toString$2$Method = var10000[1];
        CGLIB$toString$2$Proxy = MethodProxy.create(var1, var0, "()Ljava/lang/String;", "toString", "CGLIB$toString$2");
        CGLIB$hashCode$3$Method = var10000[2];
        CGLIB$hashCode$3$Proxy = MethodProxy.create(var1, var0, "()I", "hashCode", "CGLIB$hashCode$3");
        CGLIB$clone$4$Method = var10000[3];
        CGLIB$clone$4$Proxy = MethodProxy.create(var1, var0, "()Ljava/lang/Object;", "clone", "CGLIB$clone$4");
        CGLIB$send$0$Method = ReflectUtils.findMethods(new String[]{"send", "(Ljava/lang/String;)Ljava/lang/String;"}, (var1 = Class.forName("com.cmy.interview.day5.impl.AliSmsService")).getDeclaredMethods())[0];
        CGLIB$send$0$Proxy = MethodProxy.create(var1, var0, "(Ljava/lang/String;)Ljava/lang/String;", "send", "CGLIB$send$0");
    }

    final String CGLIB$send$0(String var1) {
        return super.send(var1);
    }

    public final String send(String var1) {
        MethodInterceptor var10000 = this.CGLIB$CALLBACK_0;
        if (var10000 == null) {
            CGLIB$BIND_CALLBACKS(this);
            var10000 = this.CGLIB$CALLBACK_0;
        }

        return var10000 != null ? (String)var10000.intercept(this, CGLIB$send$0$Method, new Object[]{var1}, CGLIB$send$0$Proxy) : super.send(var1);
    }

    final boolean CGLIB$equals$1(Object var1) {
        return super.equals(var1);
    }

    public final boolean equals(Object var1) {
        MethodInterceptor var10000 = this.CGLIB$CALLBACK_0;
        if (var10000 == null) {
            CGLIB$BIND_CALLBACKS(this);
            var10000 = this.CGLIB$CALLBACK_0;
        }

        if (var10000 != null) {
            Object var2 = var10000.intercept(this, CGLIB$equals$1$Method, new Object[]{var1}, CGLIB$equals$1$Proxy);
            return var2 == null ? false : (Boolean)var2;
        } else {
            return super.equals(var1);
        }
    }

    final String CGLIB$toString$2() {
        return super.toString();
    }

    public final String toString() {
        MethodInterceptor var10000 = this.CGLIB$CALLBACK_0;
        if (var10000 == null) {
            CGLIB$BIND_CALLBACKS(this);
            var10000 = this.CGLIB$CALLBACK_0;
        }

        return var10000 != null ? (String)var10000.intercept(this, CGLIB$toString$2$Method, CGLIB$emptyArgs, CGLIB$toString$2$Proxy) : super.toString();
    }

    final int CGLIB$hashCode$3() {
        return super.hashCode();
    }

    public final int hashCode() {
        MethodInterceptor var10000 = this.CGLIB$CALLBACK_0;
        if (var10000 == null) {
            CGLIB$BIND_CALLBACKS(this);
            var10000 = this.CGLIB$CALLBACK_0;
        }

        if (var10000 != null) {
            Object var1 = var10000.intercept(this, CGLIB$hashCode$3$Method, CGLIB$emptyArgs, CGLIB$hashCode$3$Proxy);
            return var1 == null ? 0 : ((Number)var1).intValue();
        } else {
            return super.hashCode();
        }
    }

    final Object CGLIB$clone$4() throws CloneNotSupportedException {
        return super.clone();
    }

    protected final Object clone() throws CloneNotSupportedException {
        MethodInterceptor var10000 = this.CGLIB$CALLBACK_0;
        if (var10000 == null) {
            CGLIB$BIND_CALLBACKS(this);
            var10000 = this.CGLIB$CALLBACK_0;
        }

        return var10000 != null ? var10000.intercept(this, CGLIB$clone$4$Method, CGLIB$emptyArgs, CGLIB$clone$4$Proxy) : super.clone();
    }

    public static MethodProxy CGLIB$findMethodProxy(Signature var0) {
        String var10000 = var0.toString();
        switch (var10000.hashCode()) {
            case -1437300215:
                if (var10000.equals("send(Ljava/lang/String;)Ljava/lang/String;")) {
                    return CGLIB$send$0$Proxy;
                }
                break;
            case -508378822:
                if (var10000.equals("clone()Ljava/lang/Object;")) {
                    return CGLIB$clone$4$Proxy;
                }
                break;
            case 1826985398:
                if (var10000.equals("equals(Ljava/lang/Object;)Z")) {
                    return CGLIB$equals$1$Proxy;
                }
                break;
            case 1913648695:
                if (var10000.equals("toString()Ljava/lang/String;")) {
                    return CGLIB$toString$2$Proxy;
                }
                break;
            case 1984935277:
                if (var10000.equals("hashCode()I")) {
                    return CGLIB$hashCode$3$Proxy;
                }
        }

        return null;
    }

    public AliSmsService$$EnhancerByCGLIB$$9d97d15c() {
        CGLIB$BIND_CALLBACKS(this);
    }

    public static void CGLIB$SET_THREAD_CALLBACKS(Callback[] var0) {
        CGLIB$THREAD_CALLBACKS.set(var0);
    }

    public static void CGLIB$SET_STATIC_CALLBACKS(Callback[] var0) {
        CGLIB$STATIC_CALLBACKS = var0;
    }

    private static final void CGLIB$BIND_CALLBACKS(Object var0) {
        AliSmsService$$EnhancerByCGLIB$$9d97d15c var1 = (AliSmsService$$EnhancerByCGLIB$$9d97d15c)var0;
        if (!var1.CGLIB$BOUND) {
            var1.CGLIB$BOUND = true;
            Object var10000 = CGLIB$THREAD_CALLBACKS.get();
            if (var10000 == null) {
                var10000 = CGLIB$STATIC_CALLBACKS;
                if (var10000 == null) {
                    return;
                }
            }

            var1.CGLIB$CALLBACK_0 = (MethodInterceptor)((Callback[])var10000)[0];
        }

    }

    public Object newInstance(Callback[] var1) {
        CGLIB$SET_THREAD_CALLBACKS(var1);
        AliSmsService$$EnhancerByCGLIB$$9d97d15c var10000 = new AliSmsService$$EnhancerByCGLIB$$9d97d15c();
        CGLIB$SET_THREAD_CALLBACKS((Callback[])null);
        return var10000;
    }

    public Object newInstance(Callback var1) {
        CGLIB$SET_THREAD_CALLBACKS(new Callback[]{var1});
        AliSmsService$$EnhancerByCGLIB$$9d97d15c var10000 = new AliSmsService$$EnhancerByCGLIB$$9d97d15c();
        CGLIB$SET_THREAD_CALLBACKS((Callback[])null);
        return var10000;
    }

    public Object newInstance(Class[] var1, Object[] var2, Callback[] var3) {
        CGLIB$SET_THREAD_CALLBACKS(var3);
        AliSmsService$$EnhancerByCGLIB$$9d97d15c var10000 = new AliSmsService$$EnhancerByCGLIB$$9d97d15c;
        switch (var1.length) {
            case 0:
                var10000.<init>();
                CGLIB$SET_THREAD_CALLBACKS((Callback[])null);
                return var10000;
            default:
                throw new IllegalArgumentException("Constructor not found");
        }
    }

    public Callback getCallback(int var1) {
        CGLIB$BIND_CALLBACKS(this);
        MethodInterceptor var10000;
        switch (var1) {
            case 0:
                var10000 = this.CGLIB$CALLBACK_0;
                break;
            default:
                var10000 = null;
        }

        return var10000;
    }

    public void setCallback(int var1, Callback var2) {
        switch (var1) {
            case 0:
                this.CGLIB$CALLBACK_0 = (MethodInterceptor)var2;
            default:
        }
    }

    public Callback[] getCallbacks() {
        CGLIB$BIND_CALLBACKS(this);
        return new Callback[]{this.CGLIB$CALLBACK_0};
    }

    public void setCallbacks(Callback[] var1) {
        this.CGLIB$CALLBACK_0 = (MethodInterceptor)var1[0];
    }

    static {
        CGLIB$STATICHOOK1();
    }
}

CGLIB 动态代理的原理基于Java字节码操作技术,具体步骤如下
1.字节码生成
使用CGLIB进行动态代理时,在运行时分析目标类,并利用ASM库生成一个新的类,这个类是目标类的一个子类(
public class AliSmsService$$EnhancerByCGLIB$$9d97d15c extends AliSmsService...
),
目标类是final类无法代理
2.方法拦截
新生成的子类会重写目标类中所有非final且非private的方法,在这些重写的方法内部,CGLIB实现了方法拦截机制,即当调用代理类的方法时,实际上会调用自定义拦截器中的intercept方法

//...
public final String send(String var1) {
    MethodInterceptor var10000 = this.CGLIB$CALLBACK_0;
    if (var10000 == null) {
        CGLIB$BIND_CALLBACKS(this);
        var10000 = this.CGLIB$CALLBACK_0;
    }
    //调用自定义拦截器中的intercept方法,并传入代理类,方法,参数
    return var10000 != null ? (String)var10000.intercept(this, CGLIB$send$0$Method, new Object[]{var1}, CGLIB$send$0$Proxy) : super.send(var1);
}
//...

3.织入横切逻辑
在intercept()方法中,可以添加额外的业务逻辑(比如事务管理、日志记录、权限检查等),这就是所谓的“横切逻辑”。在这个方法内,可以通过回调方法参数获取到方法调用的信息,如方法对象(MethodProxy)、被代理的对象以及方法参数数组,然后决定是否执行原方法或者在前后执行其他操作。

CGLIB 动态代理快在哪里?

在上面生成的字节码文件中,除了代理类字节码之外,还有带FastClass字样的其他两个字节码文件

image-20240308102020578

image-20240308102204442

CGLIB通过创建
FastClass
对象来加速方法调用,
FastClass
对象包含了一个方法索引表,可以
直接调用方法对应的索引而不是每次都通过反射,从而提高性能

总结

静态代理、动态代理(包括JDK动态代理和CGLIB动态代理)是Java编程中实现代理设计模式的不同方式,它们各有特点及适用场景。

  1. 静态代理
    • 定义
      :在编译阶段就确定了代理类的形式,代理类和被代理类通常都需要实现同一个接口或继承相同的父类。
    • 优点
      :代码结构清晰,易于理解。
    • 缺点
      :每次新增业务类时都需要编写对应的代理类,代码量较大且不易于维护;扩展性差,如果接口增加方法,所有相关的代理类都需要同步更新。
  2. JDK动态代理
    • 定义
      :基于Java反射机制实现,在运行时动态生成代理类的字节码文件,代理类实现了与被代理类所实现的所有接口。
    • 优点
      • 动态生成,不需要预先写好代理类代码,更灵活。
      • 只要接口不变,代理逻辑可以统一处理,减少重复代码。
    • 限制
      • 被代理的目标类必须实现至少一个接口,对于未实现接口的类无法进行代理。
      • 基于反射,性能有所损耗
  3. CGLIB动态代理
    • 定义
      :CGLIB通过字节码技术在运行时生成被代理类的子类,从而实现代理功能。
    • 优点
      • 不依赖于接口,能够代理没有实现接口的普通类。
      • 拦截范围较广,除了接口方法,还可以代理类的非final方法。
      • CGLIB通过创建
        FastClass
        对象来加速方法调用,方法调用密集情况下,性能较好
    • 缺点
      • 由于是生成子类的方式,所以无法代理final类和final方法。