2024年1月

[西湖论剑 2022]Node Magical Login

环境!启动!(ノへ ̄、)

这么一看好像弱口令啊,(不过西湖论剑题目怎么会这么简单,当时真的傻),那就bp抓包试一下(这里就不展示了,因为是展示自己思路,这里就写了一下当时的NC思路,其实是不对的┭┮﹏┭┮)

不是BP弱口令?那好吧,我们先看一下源码,比赛的时候是给了源码的NSS复现上是没有的,这里我把源码放在这里,或者可以去我主页GITHUB上下载“
源码

单独建立一个工程看一下

大概扒拉了一下,main.js,controller.js,login.css............(+.+)(-.-)(_ _) ..zzZZ,最终发现!应该是“
controller.js
”这里有关于flag的内容!

我把源码放在下边o.O?

const fs = require("fs");
const SECRET_COOKIE = process.env.SECRET_COOKIE || "this_is_testing_cookie"

const flag1 = fs.readFileSync("/flag1")
const flag2 = fs.readFileSync("/flag2")


function LoginController(req,res) {
    try {
        const username = req.body.username
        const password = req.body.password
        if (username !== "admin" || password !== Math.random().toString()) {
            res.status(401).type("text/html").send("Login Failed")
        } else {
            res.cookie("user",SECRET_COOKIE)
            res.redirect("/flag1")
        }
    } catch (__) {}
}

function CheckInternalController(req,res) {
    res.sendFile("check.html",{root:"static"})

}

function CheckController(req,res) {
    let checkcode = req.body.checkcode?req.body.checkcode:1234;
    console.log(req.body)
    if(checkcode.length === 16){
        try{
            checkcode = checkcode.toLowerCase()
            if(checkcode !== "aGr5AtSp55dRacer"){
                res.status(403).json({"msg":"Invalid Checkcode1:" + checkcode})
            }
        }catch (__) {}
        res.status(200).type("text/html").json({"msg":"You Got Another Part Of Flag: " + flag2.toString().trim()})
    }else{
        res.status(403).type("text/html").json({"msg":"Invalid Checkcode2:" + checkcode})
    }
}

function Flag1Controller(req,res){
    try {
        if(req.cookies.user === SECRET_COOKIE){
            res.setHeader("This_Is_The_Flag1",flag1.toString().trim())
            res.setHeader("This_Is_The_Flag2",flag2.toString().trim())
            res.status(200).type("text/html").send("Login success. Welcome,admin!")
        }
        if(req.cookies.user === "admin") {
            res.setHeader("This_Is_The_Flag1", flag1.toString().trim())
            res.status(200).type("text/html").send("You Got One Part Of Flag! Try To Get Another Part of Flag!")
        }else{
            res.status(401).type("text/html").send("Unauthorized")
        }
    }catch (__) {}
}



module.exports = {
    LoginController,
    CheckInternalController,
    Flag1Controller,
    CheckController
}

很好理解先来第一段:

Flag1:

function LoginController(req,res) {
    try {
        const username = req.body.username
        const password = req.body.password
        if (username !== "admin" || password !== Math.random().toString()) {
            res.status(401).type("text/html").send("Login Failed")
        } else {
            res.cookie("user",SECRET_COOKIE)
            res.redirect("/flag1")
        }
    } catch (__) {}
}

这代码从上向下看就是username !== "admin"也就是让他等于admin,相当于屏蔽了这个,好解决BP启动!

抓到包以后直接改cookie就好了(・-・*),还有因为是访问请求,所以,GET一下flag1

Cookie:user=admin
GET /flag1 HTTP/1.1

得到:

好耶!我们得到了一半的flag!(*^▽^*)

NSSCTF{0a8c2d78-ee0e

那就接着看代码,找Flag2:

Flag2:

看了可以知道,访问 / 路由时,要满足密码为 Math.random().toString()的 随机数,因此cookie设为SECRET_COOKIE。

那么我们再看一下flag2 的相关代码:

function CheckController(req,res) {
    let checkcode = req.body.checkcode?req.body.checkcode:1234;
    console.log(req.body)
    if(checkcode.length === 16){
        try{
            checkcode = checkcode.toLowerCase()
            if(checkcode !== "aGr5AtSp55dRacer"){
                res.status(403).json({"msg":"Invalid Checkcode1:" + checkcode})
            }
        }catch (__) {}
        res.status(200).type("text/html").json({"msg":"You Got Another Part Of Flag: " + flag2.toString().trim()})
    }else{
        res.status(403).type("text/html").json({"msg":"Invalid Checkcode2:" + checkcode})
    }
}

这段代码其实很好理解,要上传一段长度为16位的码,同时要让checkcode不等于“aGr5AtSp55dRacer”还有就是因为有”toLowerCase“会把字符小写所以考虑用node的相关漏洞,具体讲解有大神讲过了,意思是node有个遗传性,比如:

oi1=1
oi2//不赋值
这个时候我们调用oi1那就会把io2也是1

这个不仅是赋值,同样还有Function也继承,类似于php继承性,这是讲解视频:
Node.js原型链污染

那么我们继续开始赋值,因为他让不等于而且会被默认小写,可以考虑数组:

两种写法:

{"checkcode":["aGr5AtSp55dRacer",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}
{"checkcode":["a","G","r","5","A","t","S","p","5","5","d","R","a","c","e","r"]}

其实是一样的都是凑够"16"位带入进去,还有就是因为是json形式的文件所以改一下json上传(好烦的要求┏┛墓┗┓...(((m -__-)m),还要记得访问flag2:

POST /getflag2 HTTP/1.1//这个是头
Content-Type:application/json//这个是文件形式

两种写的格式截图放这里:

效果一致得到Flag2(lll¬ω¬)

-43a7-9d6e-fba3cc3240ab}

最终为:

NSSCTF{0a8c2d78-ee0e-43a7-9d6e-fba3cc3240ab}

结束<(^-^)>

背景

前段时间无意间看到一篇公众号
招贤令:一起来搞一个新开源项目
,作者介绍他想要做一个开源项目:
cprobe
用于整合目前市面上散落在各地的
Exporter
,统一进行管理。

比如我们常用的
blackbox_exporter/mysqld_exporter
等。

以往的每一个 Exporter 都需要单独部署运维。

同时又完全兼容
Prometheus
生态,也可以复用现有的监控面板。

恰好这段时间我也在公司从事可观测性相关的业务,发现这确实是一个痛点。

于是便一直在关注这个项目,同时也做了些贡献;因为该项目的核心是用于整合 exporter,所以为其编写插件也是非常重要的贡献了。

编写插件

整个项目执行流程图如下:

可以看到编写插件最核心的便是自定义插件解析自定义的配置文件、抓取指标的逻辑。

比如我们需要在配置中指定抓取目标的域名、抓取规则等。

这里
cprobe
已经抽象出了两个接口,我们只需要做对应的实现即可。

type Plugin interface {  
    // ParseConfig is used to parse config  
    ParseConfig(baseDir string, bs []byte) (any, error)  
    // Scrape is used to scrape metrics, cfg need to be cast specific cfg  
    Scrape(ctx context.Context, target string, cfg any, ss *types.Samples) error  
}

下面就以我之前编写的
Consul
为例。

# Allows any Consul server (non-leader) to service a read.  
allow_stale = true  
  
# === CA  
# File path to a PEM-encoded certificate authority used to validate the authenticity of a server certificate.  
ca_file = "/etc/consul.d/consul-agent-ca.pem"  
  
# File path to a PEM-encoded certificate used with the private key to verify the exporter's authenticity.  
cert_file = "/etc/consul.d/consul-agent.pem"  
  
# Generate a health summary for each service instance. Needs n+1 queries to collect all information.  
health_summary = true  
  
# File path to a PEM-encoded private key used with the certificate to verify the exporter's authenticity  
key_file = "/etc/consul.d/consul-agent-key.pem"  
  
# Disable TLS host verification.  
insecure = false

这里每个插件的配置都不相同,所以我们需要将配置解析到具体的结构体中。

func (*Consul) ParseConfig(baseDir string, bs []byte) (any, error) {  
    var c Config  
    err := toml.Unmarshal(bs, &c)  
    if err != nil {  
       return nil, err  
    }  
  
    if c.Timeout == 0 {  
       c.Timeout = time.Millisecond * 500  
    }  
    return &c, nil  
}

解析配置文件没啥好说的,根据自己的逻辑实现即可,可能会配置一些默认值而已。


下面是核心的抓取逻辑,本质上就是使用对应插件的
Client
获取一些核心指标封装为
Prometheus

Metric
,然后由
cprobe
写入到远端的
Prometheus
中(或者是兼容
Prometheus
的数据库中)。


// Create client
config.HttpClient.Timeout = opts.Timeout  
config.HttpClient.Transport = transport  
  
client, err := consul_api.NewClient(config)  
if err != nil {  
    return nil, err  
}  
  
var requestLimitChan chan struct{}  
if opts.RequestLimit > 0 {  
    requestLimitChan = make(chan struct{}, opts.RequestLimit)  
}


所有的指标数据都是通过对应的客户端获取。

如果是迁移一个存在的 export 到 cprobe 中时,这些抓取代码我们都可以直接复制对应
repo
中的代码。

比如我就是参考的:
https://github.com/prometheus/consul_exporter

除非我们是重新写一个插件,不然对于一些流行的库或者是中间件都已经有对应的
exporter
了。

具体的列表可以参考这里:
https://prometheus.io/docs/instrumenting/exporters/

之后便需要在对应的插件目录(
./conf.d
)创建我们的配置文件:

为了方便测试,可以在启动 cprobe 时添加
-no-writer
让指标打印在控制台,从而方便调试。

总结

之前就有人问我有没有毕竟好上手的开源项目,这不就来了吗?

正好目前项目创建时间不长,代码和功能也比较简单,同时还有可观察系统大佬带队,确实是一个非常适合新手参与的开源项目。

项目地址:

https://github.com/cprobe/cprobe

私货


最后夹带一点私货:前两天帮一个读者朋友做了一次付费的技术咨询(主要是关于 Pulsar 相关的),也是我第一次做付费内容,这种拿人钱财替人消灾难道就是知识付费的味道吗

1、准备材料

开发板(
正点原子stm32f407探索者开发板V2.4

STM32CubeMX软件(
Version 6.10.0

野火DAP仿真器

keil µVision5 IDE(
MDK-Arm

ST-LINK/V2驱动

XCOM V2.6串口助手

2、实验目标

使用STM32CubeMX软件配置STM32F407开发板的
PWR电源管理,并了解STM32的睡眠、停止和待机模式

3、实验流程

3.0、前提知识

3.0.1、睡眠模式

睡眠模式可以立即进入,也可以在退出优先级最低的中断时再进入,在进入睡眠模式前可以通过HAL_PWR_EnableSleepOnExit() / HAL_PWR_DisableSleepOnExit()设置

通过调用HAL库的HAL_PWR_EnterSLEEPMode()函数可以进入睡眠模式,以WFI进入的睡眠模式任何中断均可将MCU唤醒,以WFE进入的睡眠模式任何唤醒事件均可将MCU唤醒

如下图所示为立即和退出休眠两种情况的进入/退出说明
(注释1)

睡眠模式下系统状态如下

  1. CPU时钟关闭,CPU停止运行,程序暂停
  2. 外设时钟正常,所有外设正常工作,I/O引脚状态与进入睡眠模式时一致
  3. 调压器正常工作

任何中断或唤醒事件导致退出睡眠模式时,CPU重新运行,程序从暂停处继续运行

3.0.2、停止模式

HAL库中通过HAL_PWR_EnterSTOPMode()可以进入停止模式

由于进入停止模式所有外部中断线均需退出,可以使用EXTI->PR = 0;强制复位所有外部中断线,以保证实验可以正常进入停止模式

如下图所示为停止模式的进入/退出说明
(注释1)

停止模式下系统状态如下

  1. CPU时钟关闭,CPU停止运行,程序暂停
  2. 1.2V域外设时钟停止,外设停止工作
  3. 调压器开启/处于低功耗模式,寄存器/SRAM内容保留
  4. FLASH处于正常/掉电模式(通过HAL_PWREx_EnableFlashPowerDown()/HAL_PWREx_DisableFlashPowerDown()函数设置)
  5. HSI和HSE振荡器关闭

所有配置为外部中断线EXTI上的中断/事件触发都将导致退出停止模式,退出停止模式时,系统重新启动HSI时钟,然后CPU重新运行,程序从暂停处继续运行

3.0.3、待机模式

HAL库中通过HAL_PWR_EnterSTANDBYMode()可以进入停止模式

通过HAL_PWR_EnableWakeUpPin()可以使能唤醒引脚PA0,当处于待机模式时,PA0引脚出现上升沿则从待机模式退出

如下图所示为待机模式的进入/退出说明
(注释1)

待机模式下系统状态如下

  1. 所有外设停止工作,除能退出待机模式的一些引脚,其他引脚均为高阻态
  2. 1.2V调压器关闭,寄存器/SRAM内容全部丢失
  3. PLL、HSI和HSE振荡器均关闭
  4. VBAT供电的RTC寄存器,备份域SRAM内容保留,RTC正常工作

WKUP引脚上升沿、RTC闹钟(闹钟A和闹钟B)、RTC唤醒事件、RTC入侵事件、RTC时间戳事件、NRST引脚外部复位和IWDG复位 其中任何一个事件发生时退出待机模式,CPU复位,程序从头开始运行(退出待机模式相当于复位)

另外从待机模式中唤醒后需要注意以下两件事情

  1. 当MCU从待机模式中唤醒后需要使用__HAL_PWR_CLEAR_FLAG(PWR_FLAG_WU)软件手动清除唤醒标志,否则下次再次进入待机模式将直接唤醒
  2. 另外可以顺便使用HAL_PWR_DisableWakeUpPin(PWR_WAKEUP_PIN1)将PA0上升沿唤醒关闭,只在即将进入待机模式前开启

3.1、CubeMX相关配置

3.1.0、工程基本配置

打开STM32CubeMX软件,单击ACCESS TO MCU SELECTOR选择开发板MCU(选择你使用开发板的主控MCU型号),选中MCU型号后单击页面右上角Start Project开始工程,具体如下图所示

开始工程之后在配置主页面System Core/RCC中配置HSE/LSE晶振,在System Core/SYS中配置Debug模式,具体如下图所示

详细工程建立内容读者可以阅读“
STM32CubeMX教程1 工程建立

3.1.1、时钟树配置

系统时钟使用8MHz外部高速时钟HSE,HCLK、PCLK1和PCLK2均设置为STM32F407能达到的最高时钟频率,具体如下图所示

3.1.2、外设参数配置

本实验需要初始化开发板上KEY2、KEY1和KEY0用户按键做普通输入,具体配置步骤请阅读“
STM32CubeMX教程3 GPIO输入 - 按键响应

本实验需要初始化开发板上WK_UP按键为外部中断,具体配置请阅读“
STM32CubeMX教程4 EXTI 按键外部中断

本实验需要初始化TIM6外设实现500ms定时,具体配置步骤请阅读“
STM32CubeMX教程5 TIM 定时器概述及基本定时器

本实验需要初始化USART1作为输出信息渠道,具体配置步骤请阅读“
STM32CubeMX教程9 USART/UART 异步通信

3.1.3、外设中断配置

本实验无需配置

3.2、生成代码

3.2.0、配置Project Manager页面

单击进入Project Manager页面,在左边Project分栏中修改工程名称、工程目录和工具链,然后在Code Generator中勾选“Gnerate peripheral initialization as a pair of 'c/h' files per peripheral”,最后单击页面右上角GENERATE CODE生成工程,具体如下图所示

详细Project Manager配置内容读者可以阅读“
STM32CubeMX教程1 工程建立
”实验3.4.3小节

3.2.1、外设初始化调用流程

3.2.2、外设中断调用流程

本实验没有启动电源管理相关中断

3.2.3、添加其他必要代码

在主函数中添加按键控制逻辑,按下KEY2按键进入睡眠模式,按下KEY1按键进入停止模式,按下KEY0按键进入待机模式

源代码如下

/*main.c标志位定义*/
uint8_t mode_flag = 0;
 
/*main.h标志位声明*/
extern uint8_t mode_flag;
extern void SystemClock_Config(void);
 
/*main.c主函数内初始化程序*/
printf("\r\nReset\r\n");
HAL_TIM_Base_Start_IT(&htim6);
 
/*main.c主循环内控制程序*/
while(1)
{
    /*从待机模式唤醒后手动清除唤醒标志,否则下次进入待机模式将直接唤醒*/
    if(__HAL_PWR_GET_FLAG(PWR_FLAG_WU)==SET)
    {
        __HAL_PWR_CLEAR_FLAG(PWR_FLAG_WU);
    }
    /*从待机模式唤醒后失能唤醒引脚*/
    if(__HAL_PWR_GET_FLAG(PWR_FLAG_SB)==SET)
    {
        HAL_PWR_DisableWakeUpPin(PWR_WAKEUP_PIN1);
        __HAL_PWR_CLEAR_FLAG(PWR_FLAG_SB);
    }
 
    /*按键KEY2被按下*/
    if(HAL_GPIO_ReadPin(KEY2_GPIO_Port,KEY2_Pin) == GPIO_PIN_RESET)
    {
        HAL_Delay(50);
        if(HAL_GPIO_ReadPin(KEY2_GPIO_Port,KEY2_Pin) == GPIO_PIN_RESET)
        {
            /*进入睡眠模式*/
            mode_flag = 3;
            printf("\r\nKEY2 Pressed,Into Sleep Mode\r\n");
            HAL_SuspendTick();
            HAL_PWR_EnterSLEEPMode(PWR_LOWPOWERREGULATOR_ON,PWR_SLEEPENTRY_WFI);
            while(!HAL_GPIO_ReadPin(KEY2_GPIO_Port,KEY2_Pin));
        }
    }
    /*按键KEY1被按下*/
    if(HAL_GPIO_ReadPin(KEY1_GPIO_Port,KEY1_Pin) == GPIO_PIN_RESET)
    {
        HAL_Delay(50);
        if(HAL_GPIO_ReadPin(KEY1_GPIO_Port,KEY1_Pin) == GPIO_PIN_RESET)
        {
            /*进入停止模式*/
            mode_flag = 2;
            printf("\r\nKEY1 Pressed,Into Stop Mode\r\n");
            HAL_PWREx_EnableFlashPowerDown();
            EXTI->PR = 0;
            HAL_PWR_EnterSTOPMode(PWR_LOWPOWERREGULATOR_ON,PWR_SLEEPENTRY_WFI);
            while(!HAL_GPIO_ReadPin(KEY1_GPIO_Port,KEY1_Pin));
        }
    }
    /*按键KEY0被按下*/
    if(HAL_GPIO_ReadPin(KEY0_GPIO_Port,KEY0_Pin) == GPIO_PIN_RESET)
    {
        HAL_Delay(50);
        if(HAL_GPIO_ReadPin(KEY0_GPIO_Port,KEY0_Pin) == GPIO_PIN_RESET)
        {
            /*进入待机模式*/
            mode_flag = 1;
            printf("\r\nKEY0 Pressed,Into StandBy Mode\r\n");
            HAL_PWR_EnableWakeUpPin(PWR_WAKEUP_PIN1);
            HAL_PWR_EnterSTANDBYMode();
            while(!HAL_GPIO_ReadPin(KEY0_GPIO_Port,KEY0_Pin));
        }
    }
    HAL_Delay(100);
    HAL_GPIO_TogglePin(GREEN_LED_GPIO_Port,GREEN_LED_Pin);
}

在gpio.c中重新实现WK_UP按键外部中断回调函数HAL_GPIO_EXTI_Callback()

源代码如下

void HAL_GPIO_EXTI_Callback(uint16_t GPIO_Pin)
{
    if(GPIO_Pin == WK_UP_Pin)
    {
        /*睡眠模式唤醒*/
        if(mode_flag == 3)
        {
            HAL_ResumeTick();
        }
        /*停止模式唤醒*/
        else if(mode_flag == 2)
        {
            HAL_Init();
            SystemClock_Config();
        }
        printf("\r\nWK_UP Pressed\r\n");
    }
}

4、常用函数

/*进入睡眠模式*/
void HAL_PWR_EnterSLEEPMode(uint32_t Regulator, uint8_t SLEEPEntry)
 
/*进入停止模式*/
void HAL_PWR_EnterSTOPMode(uint32_t Regulator, uint8_t STOPEntry)
 
/*进入待机模式*/
void HAL_PWR_EnterSTANDBYMode(void)
 
/*挂起滴答定时器*/
void HAL_SuspendTick(void)
 
/*恢复滴答定时器*/
void HAL_ResumeTick(void)
 
/*使能停止模式时的FLASH掉电模式*/
void HAL_PWREx_EnableFlashPowerDown(void)
 
/*停止模式时的FLASH处于正常模式*/
void HAL_PWREx_DisableFlashPowerDown(void)
 
/*使能待机唤醒引脚*/
void HAL_PWR_EnableWakeUpPin(uint32_t WakeUpPinx)
 
/*立即进入睡眠模式*/
void HAL_PWR_EnableSleepOnExit(void)
 
/*退出后进入睡眠模式*/
void HAL_PWR_DisableSleepOnExit(void)

5、烧录验证

烧录程序,开发板上电后,由外设TIM控制的红色LED每隔500ms状态翻转一次,由程序控制的绿色LED大约每隔100ms状态翻转一次

当按下KEY2按键时单片机会进入睡眠模式,此时程序暂停运行,所有外设正常运行,因此绿色LED保持进入睡眠模式的状态不再改变,但是红色LED仍然正常每隔500ms状态翻转一次,在睡眠模式时如果按下WK_UP按键,单片机会被唤醒,程序从停止处正常运行

当按下KEY1按键时单片机会进入停止模式,此时程序暂停运行,所有外设也停止工作,调压器处于开启/低功耗状态,因此绿色LED和红色LED的状态均保持进入停止模式时的状态不再改变,在停止模式时如果按下WK_UP按键,单片机会被唤醒,程序从停止处正常运行
(注释2)

当按下KEY0按键时单片机会进入待机模式,此时程序暂停运行,所有外设也停止工作,调压器也关闭,因此绿色LED和红色LED均会熄灭,在待机模式下如果按下WK_UP按键,单片机会退出待机模式,但单片机会复位,程序会重新开始运行

如下图所示为上述整个过程串口输出的信息和开发板绿色/红色LED状态

6、注释详解

注释1
:图片来源于STM32F4xx 中文参考手册 RM009

注释2
:根据手册我们知道进入停止模式时内核暂停,程序此时不应该继续执行,外设也都停止,正常情况下我们设置的红色LED和绿色LED灯都将保持进入停止模式时的状态不改变;但是笔者遇到一个奇怪的现象,不知道是个例还是程序存在BUG(大概率程序BUG),当使用DAP/STLINK烧录到开发板程序后,按下KEY1按键进入停止模式后会被自动唤醒一次,本应该不闪烁的LED灯,则因为意外唤醒而再次闪烁,只不过由于从停止模式唤醒后使用的是内部高速时钟HSI,因此闪烁会较慢,而当烧录到开发板程序后将开发板断电一次,上电后重新按下KEY1按键进入停止模式则一切正常

参考资料

STM32Cube高效开发教程(基础篇)

提到池化技术,很多同学可能都不会感到陌生,因为无论是在我们的项目中,还是在学习的过程的过程,都会接触到池化技术。池化技术旨在提高资源的重复使用和系统性能,在.NET中包含以下几种常用的池化技术。
(1)、连接池(Connection Pool):用于管理数据库连接的池化技术。连接池允许应用程序重复使用已经建立的数据库连接,而不是在每次需要连接时都创建新的连接。
(2)、线程池(Thread Pool):用于管理线程的池化技术。线程池可以重复使用已有的线程,避免频繁创建和销毁线程,从而提高系统的性能和资源利用率。
(3)、对象池(Object Pool):用于管理对象的池化技术。对象池允许应用程序重复使用已经创建的对象,而不是在每次需要对象时都创建新的对象。这有助于减少垃圾回收的压力,提高内存使用效率。
(4)、连接池(Socket Pool):用于管理网络套接字的池化技术。类似于连接池,网络套接字池允许应用程序重复使用已经建立的套接字,提高网络通信的效率。
(5)、资源池(Resource Pool):泛指用于管理各种类型资源的池化技术。这可以包括文件句柄、图形资源等。
以上的这些池化技术,在.NET中使用的是以下的这些对象:

(1)、MemoryPool:用于内存池化,允许你更有效地分配和管理内存,特别是对于大量小对象的情况。
(2)、ArrayPool:用于管理数组类型的内存块。它允许你重用数组,减少频繁创建和销毁数组的开销。
(3)、System.Buffers.MemoryManager:是一个抽象基类,允许你创建自定义的内存管理器。这可以用于创建适应特定场景的内存池。
(4)、连接池 (Connection Pool):与数据库相关的类(如SqlConnection、MySqlConnection等)通常具有连接池的内置实现。这允许应用程序重用数据库连接,提高性能。
(5)、线程池 (ThreadPool):ThreadPool 类提供了对线程池的访问,允许应用程序将工作项提交给池中的线程执行,以减少线程的创建和销毁开销。

这次我们先来介绍一下.NET的对象池化技术,对于C#中提供了的ArrayPool<T>,可能很多同学并不是特别的熟悉,尤其是其内部的实现原理和机制。在.NET以前的版本中是直接提供ObjectPool
类型进行对象的复用。对象池技术的产生背景主要是在编程中,由于需要频繁地分配和释放内存,可能导致性能下降,特别是在高负载和大规模数据处理的情况
下。

ArrayPool<T>主要是用于管理和重复使用数组(或其他内存块)的机制,目的是为了减少垃圾回收的压力,提高内存使用效率,并降低因为频繁分配和释放内
存而导致的性能开销。有以下几个具体的应用场景:
(1)、性能优化:在某些应用中,特别是需要处理大量数据的高性能应用,频繁地分配和释放内存可能会导致垃圾回收的开销。
(2)、数组重用:当某个数组不再被使用时,它并不会立即被销毁,而是放回到 ArrayPool 中,以备将来再次使用。
(3)、减少内存碎片:频繁地分配和释放大块内存可能导致内存碎片化,ArrayPool可以在一定程度上减少这种内存碎片化。
(4)、多线程环境下的内存管理:多个线程同时尝试分配和释放内存,可能会导致竞争条件和性能问题。ArrayPool 通过使用线程安全的机制来管理内存。
具体的应用场景也比较多,例如:网络编程、图形处理、数据库操作、并行计算、流式处理、缓存管理等等实际的开发场景中都存在。
接下来我们就来具体看看ArrayPool的内部实现机制和原理是怎么样的,是如何高效的进行对象的管理和内存分配。在C#中ArrayPool的底层默认实现是
由ConfigurableArrayPool类型完成。

一、ArrayPool应用样例

1 usingSystem;2 usingSystem.Buffers;3 
4 classArrayPoolExample5 {6 static voidMain()7 {8 //创建数组池实例
9 ArrayPool<int> arrayPool = ArrayPool<int>.Shared;10
11 //请求租借一个大小为 5 的数组
12 int[] rentedArray = arrayPool.Rent(5);13
14 try
15 {16 //使用租借的数组进行操作
17 for (int i = 0; i < rentedArray.Length; i++)18 {19 rentedArray[i] = i * 2;20 }21 }22 finally
23 {24 //使用完毕后归还数组到数组池
25 arrayPool.Return(rentedArray);26 }27 }28 }

以上的样例比较简单,主要包含:创建数组池实例、租借一个大小为 5 的数组、使用租借的数组进行操作、使用完毕后归还数组到数组池。在实际的项目中,
我们可以对ArrayPool进行包装,创建我们需要的不同对象池的管理,这可以根据我们实际的项目需求进行开发。

对于以上的几步操作,我们可能会问,ArrayPool的初始化、数组对象的租借、数组对象归还是如何实现的呢,并且为什么能够做到对象的复用,以及如何
实现内存使用较低的呢,那么我们就带着这几个问题往下看看。

二、ArrayPool的初始化

首先我们来看看ArrayPool的初始化,这是对应的实现代码:
1         private static readonly SharedArrayPool<T> s_shared = new SharedArrayPool<T>();2  
3 public static ArrayPool<T> Shared =>s_shared;4
5 public static ArrayPool<T> Create() => new ConfigurableArrayPool<T>();

从以上ArrayPool的初始化代码可以发现,其数组对象池的创建是由ConfigurableArrayPool类完成的,那么我们继续看一下对应的初始化逻辑。部分代码已经
做过删减,我们只关注核心的实现逻辑,需要看全部的实现代码的同学,可以自行前往GitHub上查看。

1         private const int DefaultMaxArrayLength = 1024 * 1024;2         private const int DefaultMaxNumberOfArraysPerBucket = 50;3         private readonlyBucket[] _buckets;4         internal ConfigurableArrayPool() : this(DefaultMaxArrayLength, DefaultMaxNumberOfArraysPerBucket){ }5         internal ConfigurableArrayPool(int maxArrayLength, intmaxArraysPerBucket)6 {7 ...8             
9 int maxBuckets =Utilities.SelectBucketIndex(maxArrayLength);10 var buckets = new Bucket[maxBuckets + 1];11 for (int i = 0; i < buckets.Length; i++)12 {13 buckets[i] = newBucket(Utilities.GetMaxSizeForBucket(i), maxArraysPerBucket, poolId);14 }15 _buckets =buckets;16 }

我们从源码中可以看出几个比较重要的实现逻辑,ConfigurableArrayPool在初始化时,设置了默认的两个参数DefaultMaxArrayLength和
DefaultMaxNumberOfArraysPerBucket,分别用于设置默认的池中每个数组的默认最大长度(2^20)和设置每个桶默认可出租的最大数组数。根据传入的参数,对其调用
Utilities.SelectBucketIndex(maxArrayLength)进行计算,根据最大数组长度计算出桶的数量 maxBuckets,然后创建一个数组 buckets。

1         internal static int SelectBucketIndex(intbufferSize)2 {3             return BitOperations.Log2((uint)bufferSize - 1 | 15) - 3;4         }

SelectBucketIndex使用位操作和数学运算来确定给定缓冲区大小应分配到哪个桶。该方法的目的是为了根据缓冲区的大小,有效地将缓冲区分配到适当大小的桶
中。在bufferSize大小介于 2^(n-1) + 1 和 2^n 之间时,分配大小为 2^n 的缓冲区。使用了BitOperations.Log2 方法,计算 (bufferSize - 1) | 15 的二进制对数(以 2 为
底)。由于要处理1到16字节之间的缓冲区,使用了|15 来确保范围内的所有值都会变成15。最后,通过-3进行调整,以满足桶索引的需求。针对零大小的缓冲区,将
其分配给最高的桶索引,以确保零长度的缓冲区不会由池保留。对于这些情况,池将返回 Array.Empty 单例。

如果我们没有调整默认值,那么创建的maxBuckets=16,说明在默认情况下会创建17个桶。对于Utilities.GetMaxSizeForBucket(i)方法根据给定的桶索引,计算该桶
所能容纳的缓冲区的最大大小。通过左移操作符,可以快速计算出适应桶索引的缓冲区大小。
1         internal static int GetMaxSizeForBucket(intbinIndex)2 {3             int maxSize = 16 <<binIndex;4             returnmaxSize;5         }

GetMaxSizeForBucket将数字 16 左移 binIndex 位。因为左移是指数增长的,所以这样的计算方式确保了每个桶的大小是前一个桶大小的两倍。初始桶的索引
(binIndex 为 0)对应的最大大小为 16。这种是比较通用的内存管理的策略,按照一系列固定的大小划分内存空间,这样可以减少分配的次数。接下来我们看一下Bucket对象的初始化代码。

1             internal readonly int_bufferLength;2             private readonly T[]?[] _buffers;3             private readonly int_poolId;4             privateSpinLock _lock;5             internal Bucket(int bufferLength, int numberOfBuffers, intpoolId)6 {7                 _lock = newSpinLock(Debugger.IsAttached);8                 _buffers = newT[numberOfBuffers][];9                 _bufferLength =bufferLength;10                 _poolId =poolId;11             }

SpinLock只有在附加调试器时才启用线程跟踪;它为Enter/Exit增加了不小的开销;numberOfBuffers表示可以租借的次数,只初始化定义个二维的泛型数组,未分
配内存空间;bufferLength每个缓冲区的大小。以上的逻辑大家可能不是很直观,我们用一个简单的图给大家展示一下。

1 ArrayPool2   |
3 +-- Bucket[0] (Buffer Size: 16)4 | +-- Buffer 1 (Size: 16)5 | +-- Buffer 2 (Size: 16)6 | +--...7 |
8 +-- Bucket[1] (Buffer Size: 32)9 | +-- Buffer 1 (Size: 32)10 | +-- Buffer 2 (Size: 32)11 | +--...12 |
13 ...14 默认会创建50个Buffer

如果对C#的字典的结构比较了解的同学,可能很好理解,ArrayPool是由一个一维数组和一个二维泛型数组进行构建。无论是.NET 还是JAVA中,很多的复杂的
数据结构都是由多种简单结构进行组合,这样不仅一定程度上保证数据的取的效率,又可以考虑插入、删除的性能,也兼顾内存的占用情况。这里用一个简单的图
来说明一下二维数组的初始化时占用的内存的结构。(_buffers = new T[numberOfBuffers][])

1 +-----------+
2 | arrayInt |
3 +-----------+
4 | [0] | --> [ ] (Possibly nullor an actual array)5 +-----------+
6 | [1] | --> null
7 +-----------+
8 | [2] | --> null
9 +-----------+
10
11 +----------+
12 | arrInt1 |
13 +----------+
14 | | --> [ ] (Possibly nullor an actual array)15 +----------+

三、ArrayPool的对象租借

上面简单的介绍了数组对象池的初始化,其实很多同学可以发现,在对象没有进行租借时,整个对象池 并没有占用多少空间,因为用于存储对象的二维数组都
只是进行了申明和设定了对应的大小。接下来我们来看看具体的租借实现逻辑。(部分代码已做删减,只关注核心逻辑)
1         public override T[] Rent(intminimumLength)2 {3             if (minimumLength == 0){ return Array.Empty<T>(); }4             T[]?buffer;5             int index =Utilities.SelectBucketIndex(minimumLength);6             if (index <_buckets.Length)7 {8                 const int MaxBucketsToTry = 2;9                 int i =index;10                 do
11 {12 buffer =_buckets[i].Rent();13 if (buffer != null) { returnbuffer; }14 }15 while (++i < _buckets.Length && i != index +MaxBucketsToTry);16 buffer = newT[_buckets[index]._bufferLength];17 }18 else
19 {20 buffer = newT[minimumLength];21 }22 returnbuffer;23 }

从源码中我们可以看到,如果请求的数组长度为零,直接返回一个空数组。允许请求零长度数组,因为它是一个有效的长度数组。因为在这种情况下,池的大小
没有限制,不需要进行事件记录,并且不会对池的状态产生影响。根据传入的minimumLength确定数组长度对应的池的桶的索引,在选定的桶中尝试租用数组,如果
找到可用的数组,记录相应的事件并返回该数组。如果未找到可用的数组,会尝试在相邻的几个桶中查找(MaxBucketsToTry=2)。buffer = new
T[_buckets[index]._bufferLength]表示如果池已耗尽,则分配一个具有相应大小的新缓冲区到合适的桶。buffer = new T[minimumLength]请求的大小对于池来说太大
了,分配一个完全符合所请求长度的数组。 当它返回到池中时,我们将直接扔掉它。

接下来我们来具体看一下具体完成租借的操作方法_buckets[i].Rent()的实现逻辑。该方法从桶中租用一个缓冲区。它在桶中找到下一个可用的缓冲区,如果没有
可用的,则分配一个新的缓冲区。租用的缓冲区将被从桶中移除。如果启用了事件记录,将记录缓冲区的租用事件。
1             internal T[]?Rent()2 {3                 T[]?[] buffers =_buffers;4                 T[]? buffer = null;5                 bool lockTaken = false, allocateBuffer = false;6                 try
7 {8 _lock.Enter(reflockTaken);9 if (_index <buffers.Length)10 {11 buffer =buffers[_index];12 buffers[_index++] = null;13 allocateBuffer = buffer == null;14 }15 }16 finally
17 {18 if (lockTaken) _lock.Exit(false);19 }20 if(allocateBuffer)21 {22 buffer = newT[_bufferLength];23 }24 returnbuffer;25 }

我们来具体看一下这个方法的核心逻辑。T[]?[] buffers = _buffers通过获取 _buffers 字段的引用,获取桶中缓冲区数组的引用,并初始化一个用于保存租用的缓冲
区的变量 buffer。使用 SpinLock 进入临界区,在临界区中,检查 _index 是否小于缓冲区数组的长度buffers.Length。来判断桶是否还有缓冲区可以使用。我们从if
(allocateBuffer)可以看出,如果allocateBuffer==null时,则需要生成一个对应大小的缓冲区。可以明显的看到,具体的缓冲区对象都是在第一次使用的时候生成的,
未使用时并不初始化,不占据内存空间。

四、ArrayPool的对象归还

上面我们介绍了对象的初始化和租借的实现逻辑,接下来我们来看一下对象的归还是如何实现的。对于ArrayPool是怎么实现对象的高效复用,重点也在对象的
归还策略上,正是因为对象创建完毕之后,没有直接销毁掉,而是缓存在数组对象池中,所以下次才可以进行复用。
首先来看一下归还的策略,Return该方法的目标是将数组返回到池中,并在必要时清空数组内容。在此过程中,记录相应的事件,以便监测池的使用情况。
1         public override void Return(T[] array, bool clearArray = false)2 {3             if (array.Length == 0) { return; }4             int bucket =Utilities.SelectBucketIndex(array.Length);5             bool haveBucket = bucket <_buckets.Length;6             if(haveBucket)7 {8                 if(clearArray) { Array.Clear(array); }9 _buckets[bucket].Return(array);10 }11         }

首先是对归还的数组对象进行长度的判断,如果传入的数组长度为零,表示是一个空数组,直接返回,不进行任何处理。在池中,对于长度为零的数组,通常
不会真正从池中取出,而是返回一个单例,以提高效率。然后根据数组的长度计算确定传入数组的长度对应的桶的索引。bucket < _buckets.Lengt判断是否存在与
传入数组长度对应的桶,如果存在,表示该数组的长度在池的有效范围内。如果存在对应的桶,根据用户传入的 clearArray 参数,选择是否清空数组内容,然后
将数组返回给对应的桶。_buckets[bucket].Return(array)将缓冲区返回到它的bucket。将来,我们可能会考虑让Return返回false不掉一个桶,在这种情况下,我们可以
尝试返回到一个较小大小的桶,就像在Rent中,我们允许从更大的桶中租用。

接下来我们来具体看一下_buckets[bucket].Return(array)的实现逻辑。
1             internal voidReturn(T[] array)2 {3                 if (array.Length !=_bufferLength)4 {5                     throw newArgumentException(SR.ArgumentException_BufferNotFromPool, nameof(array));6 }7                 boolreturned;8                 bool lockTaken = false;9                 try
10 {11 _lock.Enter(reflockTaken);12 returned = _index != 0;13 if (returned) { _buffers[--_index] =array; }14 }15 finally
16 {17 if (lockTaken) _lock.Exit(false);18 }19 }

这一部分的实现逻辑相对较简单,首先判断归还的数组对象长度是否符合要求,在将缓冲区返回到桶之前,首先检查传入的缓冲区的长度是否与桶的期望长度
相匹配。 如果长度不匹配,抛出 ArgumentException,表示传入的缓冲区不是从该池中租用的。使用 SpinLock 进入临界区。在临界区中,检查是否有可用的空槽,
如果有,则将传入的缓冲区放入下一个可用槽,并将 _index 减小。如果没有可用槽,则不存储缓冲区。使用 try/finally 语句确保在退出临界区时正确释放锁,以处
理可能的线程中止。

五、ArrayPool的应用建议

上面介绍了ArrayPool的产生的背景和用途,也重点介绍了ArrayPool的实现原理和机制,哪些我们在具体的项目中应用是,需要注意的点有哪些呢,这里简单的总结了几点:
1、适当选择数组大小:在请求数组时,尽量选择适当大小的数组。不要过度请求超过实际需求的大数组,因为这可能会浪费内存。在选择数组大小时,可以考
虑实际数据量以及性能方面的需求。
2、及时释放数组:当你不再需要数组时,记得及时释放它。虽然 ArrayPool 会负责管理这些数组,但在不再使用时显式地调用 Return 方法可以更快地将数组返
回到池中,以便其他部分的代码可以重用它。
3、小心数组的生命周期:当你将数组返回到池中后,不应该再尝试使用它。ArrayPool 可能已经将其分配给其他部分的代码。尝试使用已经返回到池中的数组
可能导致不可预测的行为。
4、考虑线程安全性:如果你的应用程序是多线程的,确保在多个线程之间正确使用 ArrayPool。ArrayPool 提供了线程安全的方法,但在多线程环境中,仍然需
要小心协调数组的分配和释放。
5、调整默认值(如果有必要):ArrayPool 提供了默认值,但这些值可能不适用于所有情况。根据应用程序的特定需求,可能需要调整默认值,例如,通过调
整 DefaultMaxArrayLength 和 DefaultMaxNumberOfArraysPerBucket。
6、测量和分析:在使用 ArrayPool 之后,测量和分析应用程序的性能。检查内存使用情况、垃圾回收频率等方面,确保 ArrayPool 的使用对性能有积极的影响。
7、合理权衡:在使用 ArrayPool 时,要平衡性能和内存利用效率。不要过度优化,而导致代码变得复杂难以维护,同时也不要牺牲性能。
以上是在实际应用中的几点小建议,一种技术的产生有气特定的意义,但也不是能够解决所有的问题,往往是在解决一个问题时,会造成其他问题的产生,我
们在实际的解决过程中,需要分析当前问题中最需要解决的点是什么,这就要分析问题中的背景和原因,最后再选择合适的方法进行处理。

背景

部门内有一些亿级别核心业务表增速非常快,增量日均100W,但线上业务只依赖近一周的数据。随着数据量的迅速增长,慢SQL频发,数据库性能下降,系统稳定性受到严重影响。本篇文章,将分享如何使用MyBatis拦截器低成本的提升数据库稳定性。

业界常见方案

针对冷数据多的大表,常用的策略有以2种:

1. 删除/归档旧数据。

2. 分表。

归档/删除旧数据

定期将冷数据移动到归档表或者冷存储中,或定期对表进行删除,以减少表的大小。此策略逻辑简单,只需要编写一个JOB定期执行SQL删除数据。我们开始也是用这种方案,但此方案也有一些副作用:

1.数据删除会影响数据库性能,引发慢sql,多张表并行删除,数据库压力会更大。
2.频繁删除数据,会产生数据库碎片,影响数据库性能,引发慢SQL。

综上,此方案有一定风险,为了规避这种风险,我们决定采用另一种方案:分表。

分表

我们决定按日期对表进行横向拆分,实现让系统每周生成一张周期表,表内只存近一周的数据,规避单表过大带来的风险。

分表方案选型

经调研,考虑2种分表方案:Sharding-JDBC、利用Mybatis自带的拦截器特性。

经过对比后,决定采用Mybatis拦截器来实现分表,原因如下:

1.JAVA生态中很常用的分表框架是Sharding-JDBC,虽然功能强大,但需要一定的接入成本,并且很多功能暂时用不上。
2.系统本身已经在使用Mybatis了,只需要添加一个mybaits拦截器,把SQL表名替换为新的周期表就可以了,没有接入新框架的成本,开发成本也不高。

简易架构图

分表具体实现代码

分表配置对象

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ShardingProperty {
    // 分表周期天数,配置7,就是一周一分
    private Integer days;
    // 分表开始日期,需要用这个日期计算周期表名
    private Date beginDate;
    // 需要分表的表名
    private String tableName;
}


分表配置类

import java.util.concurrent.ConcurrentHashMap;

public class ShardingPropertyConfig {

    public static final ConcurrentHashMap<String, ShardingProperty> SHARDING_TABLE ();

static {
ShardingProperty orderInfoShardingConfig = new ShardingProperty(15, DateUtils.string2Date("20236666667"), "order_info");
ShardingProperty userInfoShardingConfig = new ShardingProperty(7, DateUtils.string2Date("20236666667"), "user_info");

SHARDING_TABLE.put(orderInfoShardingConfig.getTableName(), orderInfoShardingConfig);
SHARDING_TABLE.put(userInfoShardingConfig.getTableName(), userInfoShardingConfig);
}
}

拦截器

import lombok.extern.slf4j.Slf4j;
import o2o.aspect.platform.function.template.service.TemplateMatchService;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.DefaultReflectorFactory;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.ReflectorFactory;
import org.apache.ibatis.reflection.factory.DefaultObjectFactory;
import org.apache.ibatis.reflection.factory.ObjectFactory;
import org.apache.ibatis.reflection.wrapper.DefaultObjectWrapperFactory;
import org.apache.ibatis.reflection.wrapper.ObjectWrapperFactory;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Properties;

@Slf4j
@Component
@Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})
public class ShardingTableInterceptor implements Interceptor {
    private static final ObjectFactory DEFAULT_OBJECT_FACTORY = new DefaultObjectFactory();
    private static final ObjectWrapperFactory DEFAULT_OBJECT_WRAPPER_FACTORY = new DefaultObjectWrapperFactory();
    private static final ReflectorFactory DEFAULT_REFLECTOR_FACTORY = new DefaultReflectorFactory();
    private static final String MAPPED_STATEMENT = "delegate.mappedStatement";
    private static final String BOUND_SQL = "delegate.boundSql";
    private static final String ORIGIN_BOUND_SQL = "delegate.boundSql.sql";
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final String SHARDING_MAPPER = "com.jd.o2o.inviter.promote.mapper.ShardingMapper";

    private ConfigUtils configUtils = SpringContextHolder.getBean(ConfigUtils.class);

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        boolean shardingSwitch = configUtils.getBool("sharding_switch", false);
        // 没开启分表 直接返回老数据
        if (!shardingSwitch) {
            return invocation.proceed();
        }

        StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
        MetaObject metaStatementHandler = MetaObject.forObject(statementHandler, DEFAULT_OBJECT_FACTORY, DEFAULT_OBJECT_WRAPPER_FACTORY, DEFAULT_REFLECTOR_FACTORY);
        MappedStatement mappedStatement = (MappedStatement) metaStatementHandler.getValue(MAPPED_STATEMENT);
        BoundSql boundSql = (BoundSql) metaStatementHandler.getValue(BOUND_SQL);
        String originSql = (String) metaStatementHandler.getValue(ORIGIN_BOUND_SQL);
        if (StringUtils.isBlank(originSql)) {
            return invocation.proceed();
        }

        // 获取表名
        String tableName = TemplateMatchService.matchTableName(boundSql.getSql().trim());
        ShardingProperty shardingProperty = ShardingPropertyConfig.SHARDING_TABLE.get(tableName);
        if (shardingProperty == null) {
            return invocation.proceed();
        }

        // 新表
        String shardingTable = getCurrentShardingTable(shardingProperty, new Date());
        String rebuildSql = boundSql.getSql().replace(shardingProperty.getTableName(), shardingTable);
        metaStatementHandler.setValue(ORIGIN_BOUND_SQL, rebuildSql);
        if (log.isDebugEnabled()) {
            log.info("rebuildSQL -> {}", rebuildSql);
        }

        return invocation.proceed();
    }

    @Override
    public Object plugin(Object target) {
        if (target instanceof StatementHandler) {
            return Plugin.wrap(target, this);
        }
        return target;
    }

    @Override
    public void setProperties(Properties properties) {}

    public static String getCurrentShardingTable(ShardingProperty shardingProperty, Date createTime) {
        String tableName = shardingProperty.getTableName();
        Integer days = shardingProperty.getDays();
        Date beginDate = shardingProperty.getBeginDate();

        Date date;
        if (createTime == null) {
            date = new Date();
        } else {
            date = createTime;
        }
        if (date.before(beginDate)) {
            return null;
        }
        LocalDateTime targetDate = SimpleDateFormatUtils.convertDateToLocalDateTime(date);
        LocalDateTime startDate = SimpleDateFormatUtils.convertDateToLocalDateTime(beginDate);
        LocalDateTime intervalStartDate = DateIntervalChecker.getIntervalStartDate(targetDate, startDate, days);
        LocalDateTime intervalEndDate = intervalStartDate.plusDays(days - 1);
        return tableName + "_" + intervalStartDate.format(FORMATTER) + "_" + intervalEndDate.format(FORMATTER);
    }
}

临界点数据不连续问题

分表方案有1个难点需要解决:
周期临界点数据不连续
。举例:假设要对operate_log(操作日志表)大表进行横向分表,每周一张表,分表明细可看下面表格。

第一周(operate_log_20240107_20240108) 第二周(operate_log_20240108_20240114) 第三周(operate_log_20240115_20240121)
1月1号 ~ 1月7号的数据 1月8号 ~ 1月14号的数据 1月15号 ~ 1月21号的数据

1月8号就是分表临界点,8号需要切换到第二周的表,但8号0点刚切换的时候,表内没有任何数据,这时如果业务需要查近一周的操作日志是查不到的,这样就会引发线上问题。

我决定采用数据冗余的方式来解决这个痛点。每个周期表都冗余一份上个周期的数据,用双倍数据量实现数据滑动的效果,效果见下面表格。

第一周(operate_log_20240107_20240108) 第二周(operate_log_20240108_20240114) 第三周(operate_log_20240115_20240121)
12月25号 ~ 12月31号的数据 1月1号 ~ 1月7号的数据 1月8号 ~ 1月14号的数据
1月1号 ~ 1月7号的数据 1月8号 ~ 1月14号的数据 1月15号 ~ 1月21号的数据

注:表格内第一行数据就是冗余的上个周期表的数据。

思路有了,接下来就要考虑怎么实现
双写(数据冗余到下个周期表)
,有2种方案:

1.在SQL执行完成返回结果前添加逻辑(可以用AspectJ 或 mybatis拦截器),如果SQL内的表名是当前周期表,就把表名替换为下个周期表,然后再次执行SQL。
此方案对业务影响大,相当于串行执行了2次SQL,有性能损耗。
2.监听增量binlog,京东内部有现成的数据订阅中间件DRC,读者也可以使用cannal等开源中间件来代替DRC,原理大同小异,此方案对业务无影响。

方案对比后,选择了对业务性能损耗小的方案二。

监听binlog并双写流程图

监听binlog数据双写注意点

1.提前上线监听程序,提前把老表数据同步到新的周期表。分表前只监听老表binlog就可以,分表前只需要把老表数据同步到新表。
2.切换到新表的临界点,为了避免丢失积压的老表binlog,需要同时处理新表binlog和老表binlog,这样会出现死循环同步的问题,因为老表需要同步新表,新表又需要双写老表。为了打破循环,需要先把双写老表消费堵上让消息暂时积压,切换新表成功后,再打开双写消费。

监听binlog数据双写代码

注:下面代码不能直接用,只提供基本思路

/**
 * 监听binlog ,分表双写,解决数据临界问题
*/
@Slf4j
@Component
public class BinLogConsumer implements MessageListener {
    
    private MessageDeserialize deserialize = new JMQMessageDeserialize();

    private static final String TABLE_PLACEHOLDER = "%TABLE%";

    @Value("${mq.doubleWriteTopic.topic}")
    private String doubleWriteTopic;

    @Autowired
    private JmqProducerService jmqProducerService;


    @Override
    public void onMessage(List<Message> messages) throws Exception {
        if (messages == null || messages.isEmpty()) {
            return;
        }
        List<EntryMessage> entryMessages = deserialize.deserialize(messages);
        for (EntryMessage entryMessage : entryMessages) {
            try {
                syncData(entryMessage);
            } catch (Exception e) {
                log.error("sharding sync data error", e);
                throw e;
            }
        }
    }

    private void syncData(EntryMessage entryMessage) throws JMQException {
        // 根据binlog内的表名,获取需要同步的表
        // 3种情况:
        // 1、老表:需要同步当前周期表,和下个周期表。
        // 2、当前周期表:需要同步下个周期表,和老表。
        // 3、下个周期表:不需要同步。
        List<String> syncTables = getSyncTables(entryMessage.tableName, entryMessage.createTime);
        
        if (CollectionUtils.isEmpty(syncTables)) {
            log.info("table {} is not need sync", tableName);
            return;
        }

        if (entryMessage.getHeader().getEventType() == WaveEntry.EventType.INSERT) {
            String insertTableSqlTemplate = parseSqlForInsert(rowData);
            for (String syncTable : syncTables) {
                String insertSql = insertTableSqlTemplate.replaceAll(TABLE_PLACEHOLDER, syncTable);
                // 双写老表发Q,为了避免出现同步死循环问题
                if (ShardingPropertyConfig.SHARDING_TABLE.containsKey(syncTable)) {
                    Long primaryKey = getPrimaryKey(rowData.getAfterColumnsList());
                    sendDoubleWriteMsg(insertSql, primaryKey);
                    continue;
                }
                mysqlConnection.executeSql(insertSql);
            }
            continue;
        }
    }


数据对比

为了保证新表和老表数据一致,需要编写对比程序,在上线前进行数据对比,保证binlog同步无问题。

具体实现代码不做展示,思路:新表查询一定量级数据,老表查询相同量级数据,都转换成JSON,equals对比。

作者:京东零售 张均杰

来源:京东云开发者社区 转载请注明来源