2024年1月

1、准备材料

开发板(
STM32F407G-DISC1

ST-LINK/V2驱动
STM32CubeMX软件(
Version 6.10.0

keil µVision5 IDE(
MDK-Arm

逻辑分析仪
nanoDLA

2、实验目标

使用STM32CubeMX软件配置STM32F407
通用定时器生成可变占空比PWM波形,并将其输出到LED灯引脚实现呼吸灯效果

3、实验流程

3.0、前提知识

STM32F407有10个通用定时器,其中TIM2、TIM3、TIM4和TIM5有4个捕获/比较通道,TIM9、TIM12两个定时器有2个捕获/比较通道,剩下的TIM10、TIM11、TIM13和TIM14只有一个捕获/比较通道,只有一个捕获/比较通道的通用定时器在CubeMX配置页面无“Slave Mode”和“Trigger Source”的选项,也不能联合通道,相较于拥有多个捕获/比较通道的定时器较为简单,这10个通用定时器具体特性如下表所示
(注释1)

对于通用定时器来说,其每个通道均可以实现①输入捕获、②输出比较、③PWM波生成三种功能,接下来我将分三个实验来分别介绍通用定时器的这三个功能,本小节只介绍通用定时器如何生成PWM波

在“GPIO输出-点亮LED灯”小节我们介绍了如何使用GPIO输出点亮开发板上的LED灯,从而知道了开发板上控制LED灯的四个引脚分别为PD12、PD13、PD14和PD15,这四个引脚刚好可以配置为通用定时器TIM4的四个输出通道,因此接下来我们将配置这四个引脚为通用定时器TIM4的4个通道的PWM输出引脚

3.1、CubeMX相关配置

请先阅读“STM32CubeMX STM32F4 HAL库 工程建立”实验3.4.1小节配置RCC和SYS

3.1.1、时钟树配置

系统时钟树配置与上一实验一致,均设置为STM32F407总线能达到的最高时钟频率,具体如下图所示

3.1.2、外设参数配置

在Pinout & Configuration页面右边芯片引脚预览Pinout view中找到LED灯的四个控制引脚PD12、PD13、PD14和PD15,依次左键单击并配置其功能为TIM4_CHx

然后在页面左侧功能分类栏目中点开Timers栏目,单击栏目下的TIM4,并将其Channel1~4全部配置为PWM Generation CHx

具体配置如下图所示

然后对启用的TIM4定时器的四个通道参数进行设置,下面对重要参数简单介绍

①PSC:该参数为时钟源预分频系数,由于TIM4时钟来源为APB1 Timer clocks (MHz),笔者这里为84MHz,因此经过8399+1=8400分频后的频率为10KHz;

②计数模式:可以选择向上、向下、中心对齐等计数方式;

③ARR:该参数决定了生成PWM的周期,这里设置为199,表示周期为20ms,注意这里周期不能太长
(注释2)

④预装载自动重装:设置为Enable后,当修改ARR的值时会在下一个UEV事件生效,否则表示不适用预装载,修改其值会立即生效

⑤PWM模式选择:可以选择模式1/模式2,这两种模式区别为生成的PWM波形不一样,选择PWM模式1且向上计数时,当Pulse值<计数值ARR时此时通道输出有效状态,否则为无效状态,当选择PWM模式2时刚好与模式1相反。如下面两个PWM波形中,上图为采用PWM模式1,通道极性为高电平时产生的PWM波,下图为采用PWM模式2,通道极性为高时产生的PWM波;

⑥Pulse值:即捕获/比较寄存器CRR的值,通过设置该参数可以决定PWM的脉冲宽度,这里设置为0,因为程序中可以动态修改该参数

⑦输出比较预装载:设置为Enable后,当修改Pulse的值时会在下一个UEV事件生效,否则会立即生效

⑧通道极性:设置通道有效状态

如下图所示为具体参数设置

3.1.3、外设中断配置

在Pinout & Configuration页面左边System Core/NVIC中勾选TIM4全局中断,然后选择合适的中断优先级即可

3.2、生成代码

请先阅读“STM32CubeMX STM32F4 HAL库 工程建立”实验3.4.3小节配置Project Manager

单击页面右上角GENERATE CODE生成工程

3.2.1、外设初始化调用流程

在工程代码主函数main()中调用MX_TIM4_Init()函数对定时器TIM4计数器参数及四个 PWM通道参数进行了配置

在该MX_TIM4_Init()函数中调用了HAL_TIM_PWM_Init()对定时器PWM输出进行了初始化

然后在HAL_TIM_PWM_Init()函数中调用了HAL_TIM_PWM_MspInit()函数对TIM4时钟和中断设置/使能

如下图所示为具体的TIM4四通道PWM输出初始化调用流程

3.2.2、外设中断调用流程

勾选了TIM4的全局中断之后,在工程文件stm32f4xx_it.c中生成了TIM4全局中断服务函数TIM4_IRQHandler()

该函数调用了HAL库的定时器中断统一处理函数HAL_TIM_IRQHandler(),最终调用PWM脉宽调制完成回调函数 HAL_TIM_PWM_PulseFinishedCallback(),该函数为虚函数

如下图所示为TIM4四通道PWM输出中断调用流程

3.2.3、添加其他必要代码

在tim.c中重新实现PWM脉宽调制完成回调函数HAL_TIM_PWM_PulseFinishedCallback(),在该回调函数中实现了对四个通道PWM的占空比重新调节的目的,即重新配置参数里的Pulse,实现了从最低占空比逐渐到最大占空比然后再逐渐减少至最低占空比的无限循环,具体代码如下所示

源代码如下

uint16_t pulseWidth=0;
uint8_t dirInc=1;
void HAL_TIM_PWM_PulseFinishedCallback(TIM_HandleTypeDef *htim)
{
    if(htim->Instance != TIM4)
        return;
 
    if(dirInc == 1)	
    {
        pulseWidth ++;
        if(pulseWidth >= 195)
        {
            pulseWidth = 195;
            dirInc = 0;	
        }
    }
    else
    {
        pulseWidth --;
        if(pulseWidth <= 5)
        {
            pulseWidth = 5;
            dirInc = 1;	
        }
    }
    __HAL_TIM_SET_COMPARE(&htim4, TIM_CHANNEL_1, pulseWidth);
    __HAL_TIM_SET_COMPARE(&htim4, TIM_CHANNEL_2, pulseWidth);
    __HAL_TIM_SET_COMPARE(&htim4, TIM_CHANNEL_3, pulseWidth);
    __HAL_TIM_SET_COMPARE(&htim4, TIM_CHANNEL_4, pulseWidth);
}

最后在主函数中以中断方式启动生成PWM即可,如下所示为启动代码

4、常用函数

/*启动定时器*/
HAL_StatusTypeDef HAL_TIM_Base_Start(TIM_HandleTypeDef *htim)
/*停止定时器*/
HAL_StatusTypeDef HAL_TIM_Base_Stop(TIM_HandleTypeDef *htim)
/*启动PWM输出*/
HAL_StatusTypeDef HAL_TIM_PWM_Start(TIM_HandleTypeDef *htim, uint32_t Channel)
/*停止PWM输出*/
HAL_StatusTypeDef HAL_TIM_PWM_Stop(TIM_HandleTypeDef *htim, uint32_t Channel)
/*以中断方式启动定时器*/
HAL_StatusTypeDef HAL_TIM_Base_Start_IT(TIM_HandleTypeDef *htim)
/*以中断方式停止定时器*/
HAL_StatusTypeDef HAL_TIM_Base_Stop_IT(TIM_HandleTypeDef *htim)
/*以中断方式启动PWM输出*/
HAL_StatusTypeDef HAL_TIM_PWM_Start_IT(TIM_HandleTypeDef *htim, uint32_t Channel)
/*以中断方式停止PWM输出*/
HAL_StatusTypeDef HAL_TIM_PWM_Stop_IT(TIM_HandleTypeDef *htim, uint32_t Channel)
/*PWM输出完毕回调函数*/
void HAL_TIM_PWM_PulseFinishedCallback(TIM_HandleTypeDef *htim)
/*设置PWM占空比函数*/
__HAL_TIM_SET_COMPARE(__HANDLE__, __CHANNEL__, __COMPARE__)

5、烧录验证

5.1、具体步骤

“设置TIM4的4个通道为PWM输出 -> 配置TIM4基本参数及4个PWM通道参数 -> NVIC中勾选TIM4全局中断并设置合适中断优先级 -> 在生成的工程tim.c文件中重新实现PWM脉宽调制完成回调函数void HAL_TIM_PWM_PulseFinishedCallback(TIM_HandleTypeDef *htim) -> 在回调函数中利用__HAL_TIM_SET_COMPARE(
HANDLE
,
CHANNEL
,
COMPARE
)宏定义设置4个通道PWM的占空比 -> 在主函数中使用HAL_TIM_Base_Start_IT(&htim4)启动定时器TIM4 -> 然后使用HAL_TIM_PWM_Start_IT(&htim4, TIM_CHANNEL_x)函数开启四个通道PWM波的输出”

5.2、实验现象

烧录程序,会发现开发板上电后四个LED灯亮度由最暗到最亮然后从最亮再到最暗循环往复,实现呼吸灯的效果

使用逻辑分析仪监测TIM4的四个通道输出引脚状态,可以看出TIM4的四个通道输出的PWM波型周期均为20ms,并且占空比不断地在发生变化

6、注释详解

注释1
:图片来源
STM32Cube高效开发教程(基础篇)
9.1小节
注释2
:这里的周期设置为20ms,也即频率为50Hz,人眼睛对于80Hz以上刷新频率完全没有闪烁感,因此如果你实现LED呼吸灯效果有闪烁情况的话,频率不能设置的太低

更多内容请浏览
OSnotes的CSDN博客

eBPF 中实现内核态代码与用户态代码是可以实时通信的,这主要靠
BPF 映射
来实现。

BPF 映射
是内核空间的一段内存,以
键值对
的方式存储。内核态程序可以直接访问 BPF 映射,用户态需要通过系统调用才能访问这段地址。

BPF 映射有很多种类型,如下表所示。

类型 说明
BPF_HASH 哈希表
BPF_ARRAY 数组
BPF_HISTOGRAM 直方图
BPF_STACK_TRACE 跟踪栈
BPF_PERF_ARRAY 硬件性能数组
BPF_PERCPU_HASH 单CPU哈希表
BPF_PERCPU_ARRAY 单CPU数组
BPF_LPM_TRIE 最长前缀匹配映射
BPF_PROG_ARRAY 尾调用程序数组
... ...

本文列举了使用 eBPF + BCC 实现的多个工具源码,索引如下表。

工具名称 工具用途 工具使用的 MAP 涉及的具体 MAP 用法
killsnoop 检测进程被 kill 时的状态 BPF_HASH 内核态传递数据
filetop 检测指定时间周期内读写文件的 top 列表 BPF_HASH 内核态向用户态传递数据
usercheck 检测当前进程执行的用户 BPF_HASH 用户态向内核态传递数据
pidpersec 检测周期内通过 fork 创建的进程总数 BPF_ARRAY 保存全局数据
vfsreadlat 周期性打印 vfs 文件读取操作耗时分布情况 BPF_HISTOGRAM 直方图统计
stacksnoop 打印内核某个函数执行时的调用栈信息 BPF_STACK_TRACE 内核函数跟踪栈

1 哈希表

哈希表与我们熟悉的 hash_map 实现和用法相似,都是由 key/value 组成,在 eBPF 程序中按需分配和释放。

我们来看几个应用了
BPF_HASH
的例子。

工具1 killsnoop

(改编自 Brendan Gregg 大神给出的源码)—— 用来检测进程被 kill 时的状态。

点击查看代码
from bcc import BPF
from bcc.utils import printb
from time import strftime

# define BPF program
bpf_text = """
#include <uapi/linux/ptrace.h>
#include <linux/sched.h>

struct val_t {
	u32 pid;
	int sig;
	int tpid;
	char comm[TASK_COMM_LEN];
};

struct data_t {
	u32 pid;
	int tpid;
	int sig;
	int ret;
	char comm[TASK_COMM_LEN];
};
// 定义 BPF_HASH 名称为 infotmp,key 类型为 u32,val 类型为 struct val_t
BPF_HASH(infotmp, u32, struct val_t);
BPF_PERF_OUTPUT(events);

int syscall__kill(struct pt_regs *ctx, int tpid, int sig) {
	u64 pid_tgid = bpf_get_current_pid_tgid();
	u32 pid = pid_tgid >> 32;
	u32 tid = (u32)pid_tgid;

	struct val_t val = {.pid = pid};
	if (bpf_get_current_comm(&val.comm, sizeof(val.comm)) == 0) {
		val.tpid = tpid;
		val.sig = sig;
		infotmp.update(&tid, &val);		// 根据 (key, val) 更新 BPF_HASH
	}

	return 0;
};

int do_ret_sys_kill(struct pt_regs *ctx) {
	struct data_t data = {};
	struct val_t *valp;
	u64 pid_tgid = bpf_get_current_pid_tgid();
	u32 pid = pid_tgid >> 32;
	u32 tid = (u32)pid_tgid;

	valp = infotmp.lookup(&tid);		// 根据 key 查找 BPF_HASH
	if (valp == 0) {
		// missed entry
		return 0;
	}

	bpf_probe_read_kernel(&data.comm, sizeof(data.comm), valp->comm);
	data.pid = pid;
	data.tpid = valp->tpid;
	data.ret = PT_REGS_RC(ctx);
	data.sig = valp->sig;

	events.perf_submit(ctx, &data, sizeof(data));
	infotmp.delete(&tid);		// 根据 key 删除 BPF_HASH 记录

	return 0;
}
"""
# initialize BPF
b = BPF(text=bpf_text)
kill_fnname = b.get_syscall_fnname("kill")
b.attach_kprobe(event=kill_fnname, fn_name="syscall__kill")
b.attach_kretprobe(event=kill_fnname, fn_name="do_ret_sys_kill")
# header
print("%-9s %-16s %-16s %-4s %-16s %s" % ("TIME", "PID", "COMM", "SIG",  "TPID", "RESULT"))

# process event
def print_event(cpu, data, size):
    event = b["events"].event(data)
    printb(b"%-9s %-16d %-16s %-4d %-16d %d" % (strftime("%H:%M:%S").encode('ascii'), event.pid, event.comm, event.sig, event.tpid, event.ret))

# loop with callback to print_event
b["events"].open_perf_buffer(print_event)
while 1:
    try:
        b.perf_buffer_poll()
    except KeyboardInterrupt:
        exit()

这个例子给出了
BPF_HASH
在内核态不同函数事件阶段之间传递消息的基本使用方式。主要有几个关键点:

  • BPF_HASH(infotmp, u32, struct val_t)
    :定义一个 BPF 哈希表,前三个参数分别为:哈希表名称,key 的类型,val 的类型;

  • infotmp.update(&tid, &val)
    :更新 (key, val);

  • infotmp.lookup(&tid)
    :查询 key 对应的 val;

  • infotmp.delete(&tid)
    :删除 (key, val);

这段程序最终的运行结果如下。

image

工具2 filetop

(同样改编自 Brendan Gregg 的源码)—— 用来检测指定时间周期内读写文件的 top 列表。

点击查看代码
#!/usr/bin/python3
from bcc import BPF
from time import sleep, strftime

# define BPF program
bpf_text = """
#include <uapi/linux/ptrace.h>
#include <linux/blkdev.h>
// the key for the output summary
struct info_t {
	unsigned long inode;
	dev_t dev;
	dev_t rdev;
	u32 pid;
	u32 name_len;
	char comm[TASK_COMM_LEN];		// 进程名
	// de->d_name.name may point to de->d_iname so limit len accordingly
	char name[DNAME_INLINE_LEN];		// 文件名
	char type;
};
// the value of the output summary
struct val_t {
	u64 reads;
	u64 writes;
	u64 rbytes;
	u64 wbytes;
};
BPF_HASH(counts, struct info_t, struct val_t);		// 定义 HASH 表,key 和 val 均为一个结构体

static int do_entry(struct pt_regs *ctx, struct file *file, char __user *buf, size_t count, int is_read) {
	u32 tgid = bpf_get_current_pid_tgid() >> 32;
	u32 pid = bpf_get_current_pid_tgid();

	// skip I/O lacking a filename
	struct dentry *de = file->f_path.dentry;
	int mode = file->f_inode->i_mode;
	struct qstr d_name = de->d_name;
	if (d_name.len == 0)
		return 0;

	// store counts and sizes by pid & file
	struct info_t info = {
		.pid = pid,
		.inode = file->f_inode->i_ino,
		.dev = file->f_inode->i_sb->s_dev,
		.rdev = file->f_inode->i_rdev,
	};
	bpf_get_current_comm(&info.comm, sizeof(info.comm));
	info.name_len = d_name.len;
	bpf_probe_read_kernel(&info.name, sizeof(info.name), d_name.name);
	// 区分操作的类型
	if (S_ISREG(mode)) {
		info.type = 'R';
	} else if (S_ISSOCK(mode)) {
		info.type = 'S';
	} else {
		info.type = 'O';
	}

	struct val_t *valp, zero = {};
	valp = counts.lookup_or_try_init(&info, &zero);		// 内核态尝试获取指定 key 的 val,若 val == NULL,则赋予一个默认值
	if (valp) {
		if (is_read) {
			valp->reads++;
			valp->rbytes += count;
		} else {
			valp->writes++;
			valp->wbytes += count;
		}
	}
	return 0;
}

int trace_read_entry(struct pt_regs *ctx, struct file *file, char __user *buf, size_t count) {
	return do_entry(ctx, file, buf, count, 1);
}

int trace_write_entry(struct pt_regs *ctx, struct file *file, char __user *buf, size_t count) {
	return do_entry(ctx, file, buf, count, 0);
}
"""

# initialize BPF
b = BPF(text=bpf_text)
b.attach_kprobe(event="vfs_read", fn_name="trace_read_entry")
b.attach_kprobe(event="vfs_write", fn_name="trace_write_entry")

# check whether hash table batch ops is supported
htab_batch_ops = True if BPF.kernel_struct_has_field(b'bpf_map_ops', b'map_lookup_and_delete_batch') == 1 else False
DNAME_INLINE_LEN = 32  # linux/dcache.h
interval = 1
exiting = 0

def sort_fn(counts):
	return (counts[1].rbytes + counts[1].wbytes + counts[1].reads + counts[1].writes)

while 1:
	try:
		sleep(interval)
	except KeyboardInterrupt:
		exit()

	print('Tracing... Output every %d secs. Hit Ctrl-C to end' % interval)
	print("%-7s %-16s %-6s %-6s %-7s %-7s %1s %s" % ("TID", "COMM", "READS", "WRITES", "R_Kb", "W_Kb", "T", "FILE"))

	# 用户态获取 BPF_HASH
	counts = b.get_table("counts")
	# 这里遍历整个 BPF_HASH
	for k, v in reversed(sorted(counts.items_lookup_and_delete_batch()
                                if htab_batch_ops else counts.items(),
                                key=sort_fn)):
		name = k.name.decode('utf-8', 'replace')
		if k.name_len > DNAME_INLINE_LEN:
			name = name[:-3] + "..."

        # print line
		print("%-7d %-16s %-6d %-6d %-7d %-7d %1s %s" % (k.pid,
			k.comm.decode('utf-8', 'replace'), v.reads, v.writes,
			v.rbytes / 1024, v.wbytes / 1024,
			k.type.decode('utf-8', 'replace'), name))
	# 用户态清空 BPF_HASH
	if not htab_batch_ops:
		counts.clear()

这个例子给出了
BPF_HASH
用于内核态向用户态传递数据的场景。主要有以下几个关键点:

  • BPF_HASH(counts, struct info_t, struct val_t)
    :本次声明的哈希表,key 和 val 均为一个结构体,这在实操上是常见的。不过要注意 eBPF 运行栈大小限制。

  • valp = counts.lookup_or_try_init(&info, &zero)
    :内核态的查找辅助函数,和
    lookup()
    用法相同,不过此函数安全性更高。若获取的 val 为空,则为其赋予一个初始值
    zero
    。(注意,获取的 val 是一个指针,可以直接操作器结构体数据)

  • counts = b.get_table("counts")
    :用于用户态获取定义的
    eBPF_HASH

  • counts.items()
    :返回所有的 (key, val),用于用户态遍历哈希表。

  • counts.clear()
    :清空整张哈希表。

  • htab_batch_ops
    :这段代码定义了一个特殊的标志位,用来判断当前版本的
    eBPF
    是否支持
    items_lookup_and_delete_batch()
    辅助函数。

  • items_lookup_and_delete_batch()
    :内核 5.6 版本才引入该函数。作用同
    items() + clear()
    ,即,获取所有的 (key, val),并清空整个哈希表。

这段代码通过
interval
变量控制检测周期(当前为 1s),并按照这个周期,检测打印进程访问文件的一个热度表,按照字节降序排列。如下图所示:

image

工具3 usercheck

(改编自《Learning eBPF》一书第二章给给出的部分代码)——用来检测当前进程执行的用户。

点击查看代码
#!/usr/bin/python3
from bcc import BPF
from ctypes import *

bpf_text = '''
struct user_msg_t {
	char message[12];
};

BPF_HASH(config, u32, struct user_msg_t);
BPF_PERF_OUTPUT(events);

struct data_t {
	int pid;
	int uid;
	char command[16];
	char message[12];
};

int check_user(void *ctx) {
	struct data_t data = {};
	struct user_msg_t *p;
	char message[12] = "Hello World";

	data.pid = bpf_get_current_pid_tgid() >> 32;
	data.uid = bpf_get_current_uid_gid() & 0xFFFFFFFF;
	bpf_get_current_comm(&data.command, sizeof(data.command));

	p = config.lookup(&data.uid);
	if (p != 0) {
		bpf_probe_read_kernel(&data.message, sizeof(data.message), p->message);
	} else {
		bpf_probe_read_kernel(&data.message, sizeof(data.message), message);
	}

	events.perf_submit(ctx, &data, sizeof(data));
	return 0;
}
'''

# initialize BPF
b = BPF(text=bpf_text)
execve_fnname = b.get_syscall_fnname("execve")
b.attach_kprobe(event=execve_fnname, fn_name="check_user")
# 用户态获取 HASH
config = b.get_table("config")
# 用户态修改 HASH
config[c_int(0)] = create_string_buffer(b"Hello, Root!")
config[c_int(1000)] = create_string_buffer(b"Hello, User 501!")

print("%-10s %-10s %-16s %s" % ("PID", "UID", "COMM", "MSG"))
def print_event(cpu, data, size):
	event = b["events"].event(data)
	print("%-10d %-10d %-16s %s" % (event.pid, event.uid, event.command.decode('utf-8'), event.message.decode('utf-8')))

b["events"].open_perf_buffer(print_event)
while 1:
	try:
		b.perf_buffer_poll()
	except KeyboardInterrupt:
		exit()

这个例子给出了一个用户态主动修改
BPF_HASH
的情况。关键点:

  • config[c_int(0)] = create_string_buffer(b"Hello, Root!")
    :修改的方式与常规的 hash_map 类似,但是,key 和 val 的类型转换是必不可少的步骤。

python 到 C 的类型转换可以通过 ctypes 库来实现。可直接通过
pip3 install stypes
安装。

注意:

用户态可以通过这种方式向内核态传入数据,但千万要慎之又慎用这种方式去控制内核 BPF 程序的执行流程。
内核态无法阻塞等待用户态处理复杂逻辑后的响应
(如创建另一个进程)。
[引用-1]

举例来说,当这个程序不是在最初就设定了
BPF_HASH
的值,而是通过内核传出的用户
uid
动态地去打开系统文件检索用户
username
,那么这个工具将无法实现预期功能了。这是因为,在当前进程执行的
execve
挂载点,用户态并没有来得及执行下一个
open
进程,因此,其通过
lookup()
获得的
username
将始终为空。

运行结果:

image

2 数组

工具4 pidpersec

(改编自 Brendan Gregg 给出的源码)—— 用于检测周期内通过
fork
创建的进程总数。

点击查看代码
#!/usr/bin/python3
from bcc import BPF
from ctypes import c_int
from time import sleep, strftime

# load BPF program
b = BPF(text="""
#include <uapi/linux/ptrace.h>
enum stat_types {
	S_COUNT = 1,
	S_MAXSTAT
};
BPF_ARRAY(stats, u64, S_MAXSTAT);		// 创建 ARRAY,名称为 stats,val 的类型为 u64,val 的最大数量为 S_MAXSTAT = 2

static void stats_increment(int key) {
	stats.atomic_increment(key);		// 索引为 key 的 val 原子自增操作
}

void do_count(struct pt_regs *ctx) { stats_increment(S_COUNT); }
""")
b.attach_kprobe(event="sched_fork", fn_name="do_count")

# stat indexes
S_COUNT = c_int(1)
interval = 1		# 打印周期

# header
print("Tracing... Ctrl-C to end.")
# output
while (1):
	try:
		sleep(interval)
	except KeyboardInterrupt:
		exit()

	print("%s: PIDs/sec: %d" % (strftime("%H:%M:%S"),
		b["stats"][S_COUNT].value))
	b["stats"].clear()

同为 BPF 映射类型,
BPF_ARRAY
可以被看作为一类特殊的
BPF_HASH
( ARRAY 的 key 从 0 开始,为非零整数),但有一下几点区别。

  • BPF_ARRAY
    在初始化时会预先分配空间,并设置为零。
  • BPF_ARRAY
    的大小是固定的,其元素不能被删除。
  • BPF_ARRAY
    通常用于保存 val 可能会更新的信息,由于 key 默认为非负整数索引,因此,其固定索引的 val 通常代表一个意义。
  • BPF_ARRAY

    BPF_HASH
    一样,在执行更新操作时,不能保证原子性

    。需要进行额外的手段来保证原子操作。

实际上, HASH 和 ARRAY 在初始化时,都有一个默认的最大
size
(10240)。只不过在使用 ARRAY 时,通常会指定其最大
size
,以免预分配资源空间的浪费。

在此代码中,给出了一个
BPF_ARRAY
的常见用法,即,作为一个全局的计数器(跨用户态和内核态)。当然,若你问用
BPF_HASH
可不可以实现呢?答案自然是可以。数据结构并没有好坏之分,只有适合不适合之别。在此代码中:

  • BPF_ARRAY(stats, u64, S_MAXSTAT)
    :定义一个数组,接受三个参数,分别为数组名,数组元素类型,数组大小。

  • stats.atomic_increment(key)
    :由于修改数组元素时,不能保证原子性,因此这里需要手动调用辅助函数
    atomic_increment()
    为指定 key 的 val 做原子自增。

此工具运行截图。周期性打印
fork
出来的进程数量。

image

3 直方图

工具5 vfsreadlat

(改编自 Brendan Gregg 给出的源码)—— 用于周期性打印 vfs 文件读取操作耗时分布情况。

点击查看代码
from bcc import BPF
from time import sleep

bpf_src = '''
#include <uapi/linux/ptrace.h>
BPF_HASH(start, u32);
BPF_HISTOGRAM(dist);		// 创建一个直方图映射,名称为 dist

int do_entry(struct pt_regs *ctx) {
	u32 pid;
	u64 ts;
	pid = bpf_get_current_pid_tgid();
	ts = bpf_ktime_get_ns();
	start.update(&pid, &ts);
	return 0;
}
int do_return(struct pt_regs *ctx) {
	u32 pid;
	u64 *tsp, delta;
	pid = bpf_get_current_pid_tgid();
	tsp = start.lookup(&pid);
	if (tsp != 0) {
		delta = bpf_ktime_get_ns() - *tsp;
		dist.increment(bpf_log2l(delta / 1000));	// 修改直方图数据,key 为 bpf_log2l(delta / 1000),即 千分之差值的 2 的对数
		start.delete(&pid);
	}
	return 0;
}
'''
# load BPF program
b = BPF(text = bpf_src)
b.attach_kprobe(event="vfs_read", fn_name="do_entry")
b.attach_kretprobe(event="vfs_read", fn_name="do_return")
# header
print("Tracing... Hit Ctrl-C to end.")

interval = 5
count = -1
loop = 0
while (1):
	if count > 0:
		loop += 1
		if loop > count:
			exit()
	try:
		sleep(interval)
	except KeyboardInterrupt:
		pass; exit()

	print()
	b["dist"].print_log2_hist("usecs")		# 打印直方图
	b["dist"].clear()

这个例子给出了一个新的
BPF_MAP
类型:直方图
BPF_HISTOGRAM
。有以下几个关键:

  • BPF_HISTOGRAM(dist)
    :创建了一个名为 dist 的直方图,默认值
    BPF_HISTOGRAM(name, key_type=int, size=64)

  • dist.increment()
    :直方图调用
    increment()
    将值自增,来进行统计。

  • bpf_log2l(delta / 1000)
    :该函数返回
    log_2(delta/1000)
    的值。这样做是为了压缩直方图统计范围。

  • b["dist"].print_log2_hist("usecs")
    :指定统计列名称为
    usecs
    ,打印直方图。

输出结果:

image

4 跟踪栈

工具6 stacksnoop

(改编自 Brendan Gregg 给出的源码)—— 用于打印内核某个函数执行时的调用栈信息。

点击查看代码
from __future__ import print_function
from bcc import BPF
import argparse
import time

parser = argparse.ArgumentParser(
	description="Trace and print kernel stack traces for a kernel function",
	formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("function", help="kernel function name")

function = parser.parse_args().function
offset = False

# define BPF program
bpf_text = """
#include <uapi/linux/ptrace.h>
#include <linux/sched.h>

struct data_t {
	u64 stack_id;
	u32 pid;
	char comm[TASK_COMM_LEN];
};
BPF_STACK_TRACE(stack_traces, 128);		// 定义跟踪栈
BPF_PERF_OUTPUT(events);

void trace_stack(struct pt_regs *ctx) {
	u32 pid = bpf_get_current_pid_tgid() >> 32;
	struct data_t data = {};
	data.stack_id = stack_traces.get_stackid(ctx, 0);		遍历通过 ctx 找到的堆栈,返回它的唯一 ID
	data.pid = pid;
	bpf_get_current_comm(&data.comm, sizeof(data.comm));
	events.perf_submit(ctx, &data, sizeof(data));
}
"""
# initialize BPF
b = BPF(text=bpf_text)
b.attach_kprobe(event=function, fn_name="trace_stack")

TASK_COMM_LEN = 16  # linux/sched.h

matched = b.num_open_kprobes()		# 判断输入的 function 是否合法
if matched == 0:
	print("Function \"%s\" not found. Exiting." % function)
	exit()

stack_traces = b.get_table("stack_traces")		# 获取跟踪栈
start_ts = time.time()

# header
print("%-18s %-12s %-6s %-3s %s" % ("TIME(s)", "COMM", "PID", "CPU", "FUNCTION"))

def print_event(cpu, data, size):
	event = b["events"].event(data)
	ts = time.time() - start_ts
	print("%-18.9f %-12.12s %-6d %-3d %s" % (ts, event.comm.decode('utf-8', 'replace'), event.pid, cpu, function))

	for addr in stack_traces.walk(event.stack_id):		# 根据 stack.id 遍历堆栈
		sym = b.ksym(addr, show_offset=offset).decode('utf-8', 'replace')		# 将一个内核地址翻译成内核函数名
		print("\t%s" % sym)

	print()

b["events"].open_perf_buffer(print_event)
while 1:
	try:
		b.perf_buffer_poll()
	except KeyboardInterrupt:
		exit()

这个例子给出了
BPF_STACK_TRACE
跟踪栈的用法。关键在于:

  • BPF_STACK_TRACE(stack_traces, 128)
    :定义一个跟踪栈,深度为 128。

  • stack_traces.get_stackid(ctx, 0)
    :遍历通过 ctx 找到的堆栈,返回它的唯一 ID。

  • stack_traces = b.get_table("stack_traces")
    :用户态获取跟踪栈。

  • for addr in stack_traces.walk(event.stack_id)
    :根据跟踪栈的唯一 id 遍历栈内元素,函数调用地址信息。拿到地址信息后,通过
    b.ksym()
    函数将其翻译为内核函数名。注意,
    b.ksym()
    函数 接收一个
    show_offset
    参数,用于控制是否显示偏移地址。

  • matched = b.num_open_kprobes()
    :另外,这段程序最终接收一个参数,作为跟踪的内核函数名。因此需要判断其是否合法。
    num_open_kprobes()
    将返回能够匹配上的内核探针数量,这里被应用于检测输入的内核函数是否合法。

跟踪一个函数
do_execve
,stacksnoop 运行效果如下:

image

总结

篇幅有限,本文先介绍这六个工具,主要涵盖了
BPF_HASH / BPF_ARRAY / BPF_HISTOGRAM / BPF_STACK_TRACE
这四种最常见的
BPF 映射
的使用方法。后面有精力的话,再补充
BPF 映射
的其他类型在 BCC 框架中的用法。

BCC 框架相关的中文材料目前不是很多,参考书也比较有限,本文涉及的源码大多改编自 Brendan Gregg 大神的开源项目,项目地址(
https://github.com/iovisor/bcc
)。感兴趣的朋友可以一起交流学习!

时间过得真快,想想十年前还在dudu建的qq群里和大家谈笑风生,如何跟着大佬学习技术。

10 年过去了,我依然没有成为技术大佬,博客园也没有上市敲钟,大家都喜欢低调且平淡的生活,不得不感慨一下,世道艰难,苟着才能活得久。

这么多年,有很多博客类网站出现过,又都消失了,博客园还在,不得不说博客园还是很强大的

在过去的工作中,有段时间深度参与产品的运营,所以分析一下博客园的运营思路

运营核心思路;坚持做自己,用好当代各类商业化平台,降低成体运营成本和运营风险,实现用户规模增长和收入增长,直到上市

博客园一直以来都是坚持做纯净的博客,没有商业化相关的内容,这点有点像bi站,bi站也是因为没有广告,在二次元站稳脚跟上市,博客园也在开发者群体中站稳了脚跟

bi站已经上市了,博客园还没有,这就是博客园的投资机会

博客园和bi站的区别的是

bi站属于消费累产品,看视频打发时间,以及倒流到游戏业务,

博客园属于生产工具类产品,记录学习过程,分享学习心得,目前没有什么商业业务。

两者的特点用户忠诚度都很高,

用户忠诚度很高的产品就像九牧王,和博客园一样,产品也单一,九牧王只有裤子业务

因为是生成工具类产品,学习是反人性,这和反人性的健身 app keep 很像,运动和学习都是反人性的,运动和学习出色,都可以作为炫耀和打卡的产品发朋友圈,而且一个人学习运动比较库枯燥无聊,所以才需要社群进行交流,将运动和学习坚持到底

keep 也上市了,博客园还没有,这就是博客园的投资机会

博客园和keep 虽然有点相似,但在产品性质上还有些区别

运动一次就可以拍照发朋友圈,尤其是拍照比较漂亮的的,能引来朋友圈无数点赞,然后会有大量的人下载keep

但很难说,学习1个小时就写篇博客,然后发朋友圈,发了也没几个人能看懂,点赞估计也比较少,来注册博客园写文章的就更少了

而且写一篇博客要比一次运动困难多了,所以用户增长的也很慢,所以博客园要学习keep 记录用户成长,增加成长分享,keep里面无处不在的分享按钮

因为是学习、记录学习过程,和分享学习心得,不得不说这就和樊登读书会、当当李国庆的读书会很像,

这些读书会 都没有上市,但钱赚了不少。

这里需要画重点,钱可以赚,但没有上市,所以这是一个流量变现的业务

结合博客园目前迫切需要赚钱,而不是上市,这个就是运营中的重点,

运营就像做产品,最忌讳发明创造,和天马行空,抄作业是最稳妥的

结合之前的分析,以bibili、keep、樊登读书会、李国庆读书会,整体为参考,博客园的运营思

路应该是这样的

1 第一步

博客园已经做了,而且自己做的很好,没有自己开发vpi充值业务,而是是用淘宝、京东、小红书之类的平台做充值业务,这就很好。符合我刚才说的做自己,和用好当前的各类商业化平台

2 第二步

以会员为基础,开展线上读书会业务,这个可以作为试点是几次看看参与人数

可以99 的会员允许带1名好友参加

399的会员允许带3名好友参加

腾讯会议一次也就那么多人,搞线上读书会还是比较容易的

博客园有很多主题、计算机考研,前端、后端等等,依次排开能搞好多

3 第三步

第二部做的不错,在做第三步,做这种读书会业务其实没有门槛,技术简单,操作也不复杂,很多人也在做,就是用户忠诚度太低,用户流失太严重,所以才会出现樊登读书会,这是品牌类型的读书会。

而博客园在品牌和用户忠诚度上都很能打,这也是做这个的一个优势。

自己招个运营做,或者找第三方合作,博客园提供品牌和嘉宾,参与收入分成,还有线上曝光和支持

读书会业务开展会增加用户注册和发文章的数量,实现用户规模和收入的增长

别的读书会只能卖书,我们厉害了,还能卖各种云服务

4 第四步

读书会用户数的增加,就需要促进用户分享,像keep 一样,学习给用户颁奖,各种各样的组织学习活动和办法各种各样的奖状,像keep学习颁奖,keep的奖是需要花钱买的,也是一个收入来源

例如打卡学习业务,30天学会git,学会了用报名费给搬个奖或者返还报名费用

5第五步

哔哩哔哩 是把用户倒流到游戏厂商赚钱,博客园作为一个生产力工具,不适合倒流到消费类产品,可以倒流到生活和工作类产品,满足日常生活和工作类的

1 低价产品:结合一些制造商,制作博客园周边,或者开发者群体常用的服务

2 中等价位产品:九牧王、海澜之家、其他户外运动品牌、云计算品牌

3 高价产品:金融理财,买车等

一:背景

1. 讲故事

在我的分析之旅中,遇到过很多程序的故障和杀毒软件扯上了关系,有杀毒软件导致的程序卡死,有杀毒软件导致的程序崩溃,这一篇又出现了一个杀毒软件导致的程序非托管内存泄露,真的是分析多了什么鬼都能撞上。

前几天有位朋友找到过,我他们的程序内存在慢慢的泄露,最后程序会出现崩溃,不知道是什么导致的,让我帮忙看一下怎么回事,简单分析后发现是非托管泄露,让朋友开启了ust并在内存超出预期时抓了一个dump下来,接下来就是分析了。

二:WinDbg 分析

1. 到底是哪里的泄露

相信一直追这个系统的朋友应该知道怎么判断,很简单, 看下
MEM_COMMIT

HEAP
指标即可,使用
!address -summary
命令输出如下:


0:000> !address -summary

--- Usage Summary ---------------- RgnCount ----------- Total Size -------- %ofBusy %ofTotal
Heap                                    678          93bd0000 (   2.308 GB)  65.39%   57.71%
<unknown>                              2610          3005d000 ( 768.363 MB)  21.26%   18.76%
Free                                    515          1e133000 ( 481.199 MB)           11.75%
Image                                  1526          118f8000 ( 280.969 MB)   7.77%    6.86%
Other                                    19           804e000 ( 128.305 MB)   3.55%    3.13%
Stack                                   390           4900000 (  73.000 MB)   2.02%    1.78%
TEB                                      73             49000 ( 292.000 kB)   0.01%    0.01%
PEB                                       1              1000 (   4.000 kB)   0.00%    0.00%

--- State Summary ---------------- RgnCount ----------- Total Size -------- %ofBusy %ofTotal
MEM_COMMIT                             4477          c51f9000 (   3.080 GB)  87.25%   77.00%
MEM_FREE                                515          1e133000 ( 481.199 MB)           11.75%
MEM_RESERVE                             820          1ccc4000 ( 460.766 MB)  12.75%   11.25%


--- Largest Region by Usage ----------- Base Address -------- Region Size ----------
Heap                                        38be0000            fd0000 (  15.812 MB)
<unknown>                                     cc6000           7fd9000 ( 127.848 MB)
Free                                        f7590000           88bf000 ( 136.746 MB)
Image                                       5ab2c000            e41000 (  14.254 MB)
Other                                        8cee000           7fb0000 ( 127.688 MB)
Stack                                       14610000             fd000 (1012.000 kB)
TEB                                         ffe51000              1000 (   4.000 kB)
PEB                                         fffde000              1000 (   4.000 kB)

从卦中看,3G的提交内存,Heap 吃了 2.3G,也就表明是 NTHEAP 的泄露,这是一块非托管内存区域,一般都是 C/C++ 语言用 malloc 或者 new 分配的内存,接下来深挖下 NTHEAP 即可,使用
!heap -s
命令。


0:000> !heap -s
SEGMENT HEAP ERROR: failed to initialize the extention
NtGlobalFlag enables following debugging aids for new heaps:
    stack back traces
LFH Key                   : 0x7c31b93c
Termination on corruption : DISABLED
  Heap     Flags   Reserv  Commit  Virt   Free  List   UCR  Virt  Lock  Fast 
                    (k)     (k)    (k)     (k) length      blocks cont. heap 
-----------------------------------------------------------------------------
00200000 08000002  178304 138172 178304  42165  1747    56    0     34   LFH
    External fragmentation  30 % (1747 free blocks)
006c0000 08001002    1088    224   1088     18     8     2    0      0   LFH
00590000 08041002     256      4    256      2     1     1    0      0      
006a0000 08001002    3136   1184   3136    153    82     3    0      0   LFH
    External fragmentation  12 % (82 free blocks)
00570000 08001002    1088    224   1088     18     8     2    0      0   LFH
...   
15710000 08001002 2185152 2179432 2185152    442  1323   139    0      0   LFH
...

从卦中信息看,
15710000
吃了2.18G,也就表明它是吃内存的主力,这里简单说一下,
00200000
是默认的进程堆,除了这个之外都是用非托管代码调用 Win32API 的
HeapCreate
方法创建出来的,接下来就得看下是什么代码创建的。

2. 到底是谁创建的

要想知道是谁创建的,一定要在注册表中开启 ust 选项,大家可以了解下
gflags.exe
工具,参考如下:


PS C:\Users\Administrator\Desktop> gflags /i Example_17_1_7.exe +ust
Current Registry Settings for Example_17_1_7.exe executable are: 00001000
    ust - Create user mode stack trace database

开启之后 win32api 的 HeapAlloc 方法的内部中会到注册表中看一下是否有 ust 值,如果有就会记录分配的调用栈,这样就知道是谁创建的,抓取dump后可以用windbg的
!gflag
命令看下是否开启成功,参考输出如下:


0:000> !gflag
Current NtGlobalFlag contents: 0x00001000
    ust - Create user mode stack trace database

接下来对
Heap=15710000
进行一个 block 分组,看下是否有一些有价值的信息。


0:000> !heap -stat -h 15710000
 heap @ 15710000
group-by: TOTSIZE max-display: 20
    size     #blocks     total     ( %) (percent of total busy bytes)
    2cb dea4 - 26dd40c  (9.58)
    2d7 c778 - 23675c8  (8.72)
    d0 26d64 - 1f8e140  (7.78)
    7c5 2c50 - 1584990  (5.30)
    cb 14449 - 10125e3  (3.96)
    83c 16c2 - bb6578  (2.89)
    cf9 bc4 - 98a1a4  (2.35)
    1f51 3da - 789dfa  (1.86)
    ...

从卦中数据看没有哪个
size
占用的特别高,接下来就依次从高往低看,发现都是和
prthook
有关,参考输出如下:


0:000> !heap -flt s 2cb
    _HEAP @ 15710000
      HEAP_ENTRY Size Prev Flags    UserPtr UserSize - state
        1571f948 005d 0000  [00]   1571f960    002cb - (busy)
        15649d70 005d 005d  [00]   15649d88    002cb - (busy)
        ...
        3ec4b900 005d 005d  [00]   3ec4b918    002cb - (busy)
        3ec4bbe8 005d 005d  [00]   3ec4bc00    002cb - (busy)
        3ec4bed0 005d 005d  [00]   3ec4bee8    002cb - (busy)
        3ec4c1b8 005d 005d  [00]   3ec4c1d0    002cb - (busy)
        ...

0:000> !heap -flt s 2d7
      HEAP_ENTRY Size Prev Flags    UserPtr UserSize - state
        15666666660 005e 0000  [00]   15665568    002d7 - (busy)
        1566b930 005e 005e  [00]   1566b948    002d7 - (busy)
        1566df98 005e 005e  [00]   1566dfb0    002d7 - (busy)
        1566e288 005e 005e  [00]   1566e2a0    002d7 - (busy)
        ...
        39e3acc8 0061 0061  [00]   39e3ace0    002d7 - (busy)
        39e3c508 0061 0061  [00]   39e3c520    002d7 - (busy)
        39e3c810 0061 0061  [00]   39e3c828    002d7 - (busy)
        39e3cb18 0061 0061  [00]   39e3cb30    002d7 - (busy)
        39e3ce20 0061 0061  [00]   39e3ce38    002d7 - (busy)

0:000> !heap -p -a 3ec4c1b8
    address 3ec4c1b8 found in
    _HEAP @ 15710000
      HEAP_ENTRY Size Prev Flags    UserPtr UserSize - state
        3ec4c1b8 005d 0000  [00]   3ec4c1d0    002cb - (busy)
        771dd969 ntdll!RtlAllocateHeap+0x00000274
        153e7439 prthook!MyShowWindow+0x0001d1f9
        153e543c prthook!MyShowWindow+0x0001b1fc
        153476ab prthook+0x000276ab

0:000> !heap -p -a 39e3ce20
    address 39e3ce20 found in
    _HEAP @ 15710000
      HEAP_ENTRY Size Prev Flags    UserPtr UserSize - state
        39e3ce20 0061 0000  [00]   39e3ce38    002d7 - (busy)
        771dd969 ntdll!RtlAllocateHeap+0x00000274
        153e7439 prthook!MyShowWindow+0x0001d1f9
        153e543c prthook!MyShowWindow+0x0001b1fc
        153476ab prthook+0x000276ab

3. prthook 到底为何方神圣

从前一节的卦中数据看,貌似
prthook
在不断的弹框,在弹框中用
ntdll!RtlAllocateHeap
分配了非托管内存,那
prthook
到底是个啥呢?可以用
lmvm
看下。


0:000> lmvm prthook
Browse full module list
start    end        module name
15320000 155dc000   prthook    (export symbols)       prthook.dll
    Loaded symbol image file: prthook.dll
    Image path: C:\Windows\SysWOW64\prthook.dll
    Image name: prthook.dll
    Browse all global symbols  functions  data
    Timestamp:        Thu Jun 22 17:16:53 2017 (594B8B05)
    CheckSum:         001F4972
    ImageSize:        002BC000
    File version:     16.17.6.22
    Product version:  16.17.6.22
    File flags:       0 (Mask 3F)
    File OS:          40004 NT Win32
    File type:        2.0 Dll
    File date:        00000000.00000000
    Translations:     0804.04b0
    Information from resource tables:
        CompanyName:      Beijing VRV Software Co.,Ltd
        ProductName:      edp
        InternalName:     prthook
        OriginalFilename: prthook.dll_DB
        ProductVersion:   16, 17, 6, 22
        FileVersion:      16, 17, 6, 22
        FileDescription:  prthook_DB
        LegalCopyright:   Copyright (C) 2016 Beijing VRV Software Co.,Ltd
        Comments:         中英文版

从卦中数据看,
prthook.dll
所属公司为
Beijing VRV Software Co.,Ltd
,无语的是把这个第三方的dll放在Windows的系统目录
C:\Windows\SysWOW64
下,容易让人觉得有点
鸠占鹊巢
,接下来查一下百度,发现是
北信源
的,截图如下:

有了这些信息,告诉朋友让客户把这个安全软件卸载掉就可以了。

三:总结

程序的故障如果不是我们的代码造成的,你想通过排查代码找出问题是不可能的事情,追过这个系列的朋友应该深有体会,常见的外在因素有:

  • 杀毒软件
  • 电磁辐射
  • 显卡问题

图片名称

8000字讲清楚程序性能优化。

本文聊一个程序员都会关注的问题:性能。

当大家谈到“性能”时,你首先想到的会是什么?

  • 是每次请求需要多长时间才能返回?
  • 是每秒钟能够处理多少次请求?
  • 还是程序的CPU和内存使用率高不高?

这些问题基本上反应了性能关注的几个主要方面:响应时间、吞吐量和资源利用率。在这三个方面中,如果能够实现更低的响应时间和更高的吞吐量,那么资源利用率也必然得到优化。这是因为我们的工作总是在有限的硬件、软件、时间和预算等的约束下进行的,而优化前两个方面将有助于更有效地利用这些资源。

因此,本文将主要围绕响应时间和吞吐量的优化展开介绍,包括相关领域的定义和软硬件方面的优化方法。

响应时间

想象一下,你在餐厅点了一道菜,响应时间就是从你下单到菜品送到你面前的这段时间。

在计算机里,它指的是单次请求或指令处理的时间。

1.1 软件层面的优化

软件层面的优化主要是通过减少非必要的处理来降低响应时间,包括减少IO请求和优化代码逻辑。

1.1.1 减少IO请求

减少IO请求的意义

IO就是输入输出,减少IO处理就是减少对输入输出设备的访问。在计算机中,除了CPU和内存,其它的键盘、鼠标、显示器、音响、硬盘、网卡等等都属于输入输出设备。减少对这些设备的访问为什么有用呢?

首先让我们了解下程序的运行过程,大概是这样的:

操作系统首先将程序的二进制指令从硬盘加载到内存,然后再从内存加载到CPU,然后CPU就按照二进制指令的逻辑进行处理,指令是加减乘除的就做加减乘除,指令是跳转的就做跳转,指令是访问远程网络的就通过操作系统+网卡发起网络请求,指令是访问文件的就通过操作系统+硬盘进行文件读写。

在CPU的这些处理中,逻辑判断、跳转、加减乘除都是很快的,因为它们只在CPU内部进行处理,CPU中每条指令的运行时间极短;但是如果要进行网络请求、文件读写,速度就会大幅下降,这里边有很多的损耗,包括系统调用的时间消耗、总线的传输时间消耗、IO设备的处理时间消耗、远程过程的处理时间消耗等。

我们可以通过一组数字直观感受下,假设CPU的主频是1GHZ,执行1条指令需要1个时钟周期,那么执行1条指令的时间就是1纳秒。而从硬盘读取数据的时间消耗要远大于此,如果是机械硬盘,大概在5-20毫秒,百万倍的差距;如果换成固态硬盘,情况会好点,普遍都在0.1毫秒以下,部分能达到微秒级,但也是万倍、十万倍以上的差距。

所以减少IO请求能极大的降低响应时间。那么我们可以采取什么方法呢?

硬盘IO的优化

包括降低硬盘读写频次和采用顺序读写。

对于频繁使用的数据,根据业务情况,我们可以把它们放到内存中,后续都从内存读取,速度会快上不少;对于需要写入硬盘的数据,根据业务情况,我们可以先在内存中攒几条,达到一定的数据量之后再写入硬盘,其实操作系统本身就会缓存写入,很多语言写数据到硬盘之后需要做一个flush的操作,就是用来实现最终写入硬盘的。我们使用Memcached、Redis等都是这个方案的延伸,只不过它们被封装成了独立的远程服务。

采用顺序读写主要针对的是机械硬盘,因为机械硬盘挪动悬臂和磁头比较耗时,顺序读写可以尽量减少机械移动情况的发生,进而提升读写速度。

网络IO的优化

网络IO的延迟一般都要在毫秒级以上,对网络IO的优化,除了类似硬盘IO优化中的的降低IO频次,另外还可以通过使用更高效的传输协议、降低数据传输量等方式进行优化。

对于需要频繁通过网络获取的数据,比如访问远程服务、数据库等获取的数据,我们可以在本地内存进行缓存,访问内存比访问网络要快很多,只需要选择合适的缓存存活时间就好了。

对于短时间内大量需要通过网络获取的数据,我们可以采用批量获取的方式,比如有一个列表,列表中的每一条数据都要调用接口去获取某个相同的字段,列表有100条数据就要请求网络100次,如果批量获取,则只需要一次网络请求就把100条数据全部拿到,这可以避免大量的网络IO时间消耗,显著降低业务处理的响应时间。

我们一般认为http是无状态的,但是它的底层是基于TCP协议的,这样每次发起http请求时,网络底层还是要先建立一个TCP连接,然后本次http请求结束后再释放这个连接。你可能听说过TCP的三次握手、四次挥手、TCP包的顺序保证等,这些都需要在客户端和服务器端来回多次通信才能完成,而且导致http的网络请求效率不高。很多大佬也看到了这个问题,所以搞出来了http2、http3,让http使用长连接、跑在UDP协议上。我们在编程时选择基于http2或者http3的通信库就可以降低网络延迟,如果有必要我们也可以直接使用TCP或者UDP编写网络程序。

另外如果我们只需要网络接口返回的部分数据,就没必要传输完整的数据,数据在网络底层通过分包、分帧的方式进行传输,数据越大,包、帧的数量越多,传输消耗的时间也越长。对于减少数据传输量,除了业务上的约定,我们也可以通过一些序列化方式进行优化,比如采用Protobuf替代JSON通常可以生成更短的消息内容。

谈到Protobuf,不得不提一下gRPC,它使用了Protobuf进行序列化处理,还使用了更新的http协议,根据我的不严格测试,同样的服务,相比HTTP+JSON的方式,gRPC的网络延迟可以降低1个数量级。

内存IO的优化

我们在上边的分析中是把CPU和内存看做一个整体的,其实它们内部的通信延迟也不可忽视,我们也有一些方法来优化对内存的访问,包括下边一些技术:

零拷贝
:这种技术要解决的问题是数据在内核态和用户态之间的重复存放问题。

什么是内核态和用户态?操作系统有两个主要的功能,一是管理计算机上的所有软件程序,主要是CPU和内存资源的分配,二是为一些基础计算能力提供统一的使用接口,比如网络、硬盘这种;为了实现这两大能力,操作系统就需要有一些管理程序来处理这些事,这些基础管理程序就运行在内核态,而操作系统上的其它程序则运行在用户态。同时基于安全考虑,内核态的程序以及它们使用的资源必须要保护好,不能随便访问,所以内核态的数据就不能让用户态直接访问,用户程序访问相关数据时得先复制到用户态。

举个例子,当程序从硬盘读取数据时,程序先要调用内核程序,内核程序再去访问硬盘,此时数据先读到内核内存空间中,然后再拷贝到用户程序定义的内存空间中。如果我们能把用户态和内核态之间的拷贝去掉,就是零拷贝技术了。

很多语言和框架中都提供了这种零拷贝的能力。比如Java中的Netty框架在发送文件时,可以直接在内核态将文件数据发送到网络端口,而不需要先一点点读到用户态,再一点点写到内核态的网络处理程序。

其实Netty还使用了一些非传统的零拷贝技术,这包括直接内存和复合缓冲。直接内存是Java程序向操作系统直接申请一块内存,这块内存的数据可以直接与底层网络传输进行交互,不需要在内核态和用户态之间进行拷贝。复合缓冲是Netty定义了一个逻辑上的大缓冲区,把网络传输中的多段小数据组合在一起,外部读取数据时只需要和它打交道,这样比较优雅,实际也没有产生内存拷贝。

CPU缓存行
:这种技术主要是充分利用CPU缓存,减少CPU对内存的访问。

什么是CPU缓存?在上边谈到IO设备的访问时间时,我们说到硬盘的访问时间是CPU执行单条指令消耗时间的万倍以上,其实内存的访问时间相较CPU内部也有百倍左右的差距,所以CPU中搞了一个缓存,将最近需要的数据或指令先加载到缓存中,后续执行的时候都通过缓存进行读写。缓存与内存相比,速度更快,访问一次可能只需要若干纳秒,但是成本也更高,数量比较少,所以没有完全代替内存。

我们要减少内存的访问,就需要数据在CPU缓存中维持的时间更久。CPU缓存是以行为基本单位的,如果一行中保存了多个变量的数据,它们可能就会相互影响,比如其中一个变量更新的频率很高,这个缓存行可能就会频繁失效,导致和内存的频繁同步,而这个缓存行中的另一个变量就受到了连带影响。解决这个问题可以让变量独占一个缓存行,比如前后使用一些空位进行填充,Java中的Disruptor库就是采用了这种方案。

绑定CPU与内存
:这种技术主要解决CPU或者内核访问不同内存时的速度差异问题。

我们知道计算机中可以安装多块CPU、多条内存,在同一块CPU中也可能存在多个核心,也就是俗称的多核处理器,这些CPU、核心到每条内存的距离可能是不相同的,CPU访问距离近的内存,速度就会快些,访问距离远的内存,速度就会慢些。对于计算机的使用者来说,我们肯定是希望程序的运行速度越快越好,即使做不到最快,也不希望程序时快时慢,这样容易导致拥堵。

为了解决这个问题,计算机发展出了一种称为NUMA的技术,NUMA的全称是非一致性内存访问。在这种技术中,CPU和内存划分了不同的区域,CPU访问本区域的内存时可以直接访问,速度十分快;CPU访问其它区域的内存时需要通过内部的通道,速度会相对慢一些。然后操作系统可以感知到NUMA的区域分布情况,并提供了相应的API,让应用程序可以将自己使用的内存和CPU尽量保持在同一个区域,或者尽量平均分布在不同的区域,从而保证了CPU和内存之间访问速度。

1.1.2 优化代码逻辑

在我的编程生涯早期,优化程序性能时考虑最多的是:这里是不是可以少些几行代码,那里是不是可以不使用循环。这些固然可以优化程序的性能,但是远没有上边提到的优化IO带来的收益大。因为少执行几条代码只是若干纳秒的节省,而少一次IO则是百倍、千倍、万倍的节省。不过当IO没得优化的时候,我们也不得不考虑在代码逻辑上下下功夫,特别是一些计算密集型的程序。

可以从以下几个方面着手优化:

  • 算法优化:选择更有效率的算法来减少计算时间。例如,使用快速排序代替冒泡排序,数据越多,快速排序的算法效率越高,节省的时间也越多。
  • 数据结构选择:选择更合理的数据结构。例如,使用哈希表进行快速查找,而不是数组。数组需要遍历查找,而Hash表则可以根据下标快速定位。
  • 循环展开:将循环操作改为同样的逻辑多次执行。这个好处有很多,首先可以减少循环条件判断和跳转的处理,然后有利于提高CPU内部的指令预测准确度、指令并行度,以及提高CPU缓存的命中率。
  • 延迟计算:只有在需要结果的时候才进行计算,避免不必要的计算。很多函数式编程的方法中都大量使用了这一技术,比如C#中针对列表的LINQ查询,只有在真的需要处理列表中某条数据的时候才执行相应的查询算法,而不需要提前对列表中的所有数据进行处理。在Web前端也有很多的延迟处理方案,比如图片的懒加载,只在需要显示图片的时候才去加载,对于图片比较多的页面,可以大幅提升页面的加载速度。
  • 异步计算:程序只处理事务中的关键部分,然后就给调用方返回一个响应,边缘事务或者慢速部分的处理通过发送消息的方式,由后台其它程序慢慢处理。比如很多的秒杀、抢购程序都采用这种方案,先把用户的请求收下来,只是发到一个待处理的队列,然后就给用户反馈已经收到你的请求、正在处理中;同时后台再有若干程序按照顺序处理队列中的消息,处理完毕后再给用户反馈最终的结果。
  • 避免重复计算:通过缓存计算结果来避免重复执行需要花费大量时间的计算。
  • 并行计算:利用多线程或多进程来同时执行任务,特别是有多核CPU的时候。

在进行优化的时候,我们也要区分重点,可以先通过代码分析工具找到执行最频繁的部分,然后再进行优化。

1.1.3 使用更好的编译器

好的编译器,就像是一流的厨师,能用更少的步骤做出美味的菜。

上边提到优化代码逻辑时可以采用“循环展开”的方法,其实这件事完全可以交给编译器去做,好的编译器可以代替人工来完成这件事,程序员就有更多时间来思考业务逻辑。

除了“循环展开”,编译器还可以做“循环合并”,针对同一组数据,如果代码中编写了多次循环迭代,并且迭代的方法都是一样的,编译器可以将多次迭代合并成一次。

编译器还可以做很多优化,比如移除无用的代码、内联函数减少压栈、计算常量表达式的值、使用常量替换变量、使用更优的算法和数据结构、重排程序指令以利于CPU并行执行,等等。

不过大多数情况下,比如使用Java、.NET时,我们使用官方推荐或者IDE集成的编译器就足够了,只有在针对一些特定计算平台或者特定的领域时,我们才需要进行选择。

1.2 硬件层面的升级

在硬件层面要缩减程序的运行时间,也就是更换运行速度更快的硬件,比如使用主频更高的CPU,1GHZ的CPU每条指令的执行时间是1纳秒,如果更换为3GHZ的CPU,每条指令的执行时间可以降低到0.3纳秒。

1.2.1 提升CPU性能

提升CPU就像是给餐厅请一个更快的厨师,他做菜的速度更快。

提升主频:上边我们已经说过,提升主频可以降低每条指令的执行时间。但是主频的提升不是无限的,因为主频越高,电子元器件的散热、稳定性、成本等都会成为新的问题,所以必须在这其中进行平衡。

指令并行技术:现代CPU中已经产生了很多并行执行指令的技术,包括乱序执行、分支预测技术、超标量技术、多发射技术等,这些都像是厨师在同时处理多个菜品,而不是一个个来,这自然能降低整体的响应时间,当然CPU肯定也要兼顾逻辑顺序。

CPU缓存:现代CPU中很大一块面积都是用来放置CPU缓存的,上文在【内存的IO优化】中我们提到过使用CPU缓存可以大幅降低CPU读取指令的时间消耗,同时我们还需要注意到CPU存在多个核心时,数据可能会被加载到不同的核心缓存中,数据在不同缓存中的同步也是一项很有挑战的工作,因此足够大的CPU缓存和足够好的CPU缓存更新机制,对于降低响应时间也很关键。

增加核心:主频无法大幅提升时,可以在CPU中多增加几个核心,每个核心就是一个独立的处理器,不同的线程、进程可以并行运行在不同的核心上,这样也可以降低争抢,并行度越高,程序的执行时间相对也越低。

我们在选择CPU时,应该结合业务特点,综合考虑以上这些方面。

1.2.2 跳过CPU的技术

理论上,计算机中所有的计算都是CPU来处理的,不过我们也可以让它只处理最重要的事,一些不太重要的事就授权给其它部件处理,就像厨房中洗菜、切菜这些事都交给小工去处理,大厨专注于炒菜,可以让出菜的速度更快。

在计算中有一种DMA技术就是干这个事的,比如使用支持DMA的网卡时,网卡可以将数据直接写入到一块内存区域,等写满了再通知CPU来读取,而不是让CPU一开始就从网卡一点点读取,这会大幅提高网络数据处理的效率。因为网卡的速度相对CPU要慢的多,没必要在这里耗着,等数据接收到内存之后,CPU再和内存打交道,速度就会快很多了。

对于需要集成IO设备进行处理的程序,我们可以尽量选择支持DMA技术的硬件。

1.2.3 使用专用硬件

这个就像做饭时使用不同的厨具,虽然我们也可以在汤锅里炒菜,但是总不如炒锅用的顺手,用的顺手就可以做到更快的速度。

在计算机中CPU是一种通用计算器,它可以进行各种运算,但是通用也有通用的坏处,那就是干一些事的时候效率不高,比如图像处理、深度学习算法的运算,这些运算的特点就是包含大量的向量计算,CPU执行向量运算的效率比较低。

为了加速图像处理,科技工作者们搞出了GPU,效率有了很大的提升,计算速度飞起。GPU一开始是专门用于图形计算的,图形计算的主要工作就是向量运算,而深度学习也主要是向量计算,所以GPU后来也被大量用于深度学习。再到后来,科技工作者又搞出来了TPU,这种设备更加有利于深度学习的计算。

另外针对一些需要频繁读写硬盘的程序,比如数据库程序,我们也推荐使用固态硬盘代替机械硬盘,因为固态硬盘的访问延迟相比机械硬盘会低1个或多个数量级,这会大幅降低数据读写的延迟。

所以针对不同的计算特点,我们可以选择更专用的硬件来加速程序的处理,这是个不错的方案。

---

当然使用性能更高的硬件,需要付出更多的成本,需要仔细评估。以前技术开发领域流传过有一句话:不要对程序做过多的优化,升级下硬件就行了。这是因为当时升级硬件的成本要远低于优化程序的时间成本,不过随着互联网的发展,人们对性能的追求越来越高,升级硬件的难度和成本也越来越高,这句话变得不是那么可靠。从Go、Rust等语言的流行,Java、.NET等对原生编译的支持,我们也可以感受到这个趋势,硬件资源开始变得稀缺了。

吞吐量

吞吐量就像餐厅一天能服务多少客人。

在计算机里,它指的是单位时间内处理的请求数、数据量或执行的指令数。

2.1 缩短响应时间

缩短响应时间自然能提高吞吐量,就像提高厨师做菜速度能让更多客人吃到菜一样。

上文已经介绍了响应时间的很多优化方法,这里及不重复了。

但是响应时间对吞吐量的影响不一定总是正面的。降低响应时间有可能会增加资源的使用,比如我们把原本放在硬盘中的数据都放到了内存中,然后就没有足够的内存用来创建新的线程,服务器就无法接收更多的请求,吞吐量也就无法提升。

2.2 增加并发能力

2.2.3 增加资源

这是最直接的方法,就像买更多的炉子和锅,就能同时做更多的菜。

在计算机领域,我们可以购买更多的服务器、更强劲的CPU或GPU、更大的内存、读写能力更强的IO设备、更大的网络带宽,等等。这些可以让程序在单位时间内接收更多的请求、以及更大的并发处理能力,也就增加了系统的吞吐量。当然在具体的增加某种资源之前,我们需要先找到系统的瓶颈,比如内存使用经常达到90%,我们就增加内存空间。

需要注意,增加资源虽然可以提升系统的吞吐量,但是这也会有一个临界点,越过这个临界点之后,获得的收益将会低于为此增加的资源成本,所以我们应该仔细评估收益和成本,再决定增加多少资源。

同时本着节约的精神,我们应该追求使用更少的资源来完成更多的工作,这样也能产生更多的收益。

2.2.2 利用CPU的先进技术

上文我们在【1.2.1 提升CPU性能】中已经提到过CPU的指令并行技术,包括流水线、分支预测、乱序执行、多发射、超标量等,它们可以让CPU同时执行多条指令,提升CPU的指令吞吐量,自然也就提升了程序的处理吞吐量。

这些都是CPU内部的技术优化,只要购买性能优良的CPU,我们就可以拥有这些能力;同时我们在编程时也可以尽量触发指令的并行执行,比如上文【1.1.2 优化代码逻辑】中提到的循环展开、内联函数等,当然我更推荐选择一个更好的编译器来完成这些优化工作,程序员应该多思考下怎么运用技术满足业务需求。

除了CPU微观层面的优化,我们还可以利用下CPU的多核并行能力,在程序中使用多线程、多进程,让程序并行处理,从而提升程序的业务吞吐量。

CPU的这些能力就像是多个厨师协作,同时准备不同的菜品,就能在单位时间内把更多的菜品端上桌。

2.2.1 提高资源利用率

大家可能听说过Go的并发能力特别强,简单手撸一个服务就能轻松应对百万并发,原因就是因为Go搞出了协程。那么协程为什么这么优秀呢?

这是因为程序使用传统的
线程模型
时,线程消耗的时间和空间成本比较高,时间成本就是CPU的使用时间,空间成本就是内存占用,要提升吞吐量只能增加更多的CPU和内存资源,资源利用率高不起来,具体是怎么回事呢?

首先看我们编写的业务程序,其实大部分工作都是很多IO操作,比如访问数据库、请求网络接口、读写文件等,这些IO操作相比CPU指令操作慢了4、5个数量级。使用线程模型时,发起IO请求后,当前线程使用的CPU要么等着要么切换给其它线程使用,等着CPU就是空转,切换给其它线程时的成本也比较高,总之就是浪费了CPU时间;另外在等待IO返回的这段时间内,线程不会消失,会一直存在,而线程占用的内存比较大(Windows默认1M,Linux默认8M),妥妥的站着什么不拉什么。这就是线程的时间成本和空间成本问题。

解决这个问题的第一步是使用异步编程:IO操作提交后,就把线程释放掉,程序也不用在这里等着,等IO返回结果时,操作系统再分配一个新的线程进行处理。线程少了,CPU等待、切换和内存占用也就少了,计算资源自然就可以支持更多的请求了,吞吐量也就上来了。这就像是厨师在等待一个菜烤制的同时,可以去炒另一道菜。


程则是在异步的基础上更进一步,把程序执行的最小单位由线程变成了协程,协程分配的内存更小,初始时仅为2KB,不过它可以随着任务执行按需增长,最大可达1GB。同样的8M内存,Linux中只能创建1个线程,而协程最多则可以创建4096个。我们也就能够理解Go的并发能力为什么这么强了。


总结

性能优化是一个复杂且多面的话题,涉及到代码的编写、系统的架构以及硬件的选择与配置。在追求性能的旅途中,我们需要掌握的知识有很多,既有软件方面的,也有硬件方面的,很多东西我也没有展开详细讲,只是给大家提供了一个引子,遇到问题的时候可以顺着它去寻找。

在优化过程中,还需要注意性能的提升并非总是线性的,我们应当找到系统的瓶颈点,有针对性地优化,并在资源成本和性能收益之间做出平衡。优化的最终目的是在有限的资源下,尽可能地提升程序的响应速度和处理能力。

关注萤火架构,加速技术提升!