2024年10月

介绍一些开发中常用的slice关联的性能优化手段。鉴于golang编译器本身捉鸡的优化能力,优化的成本就得分摊在开发者自己的头上了。

这篇文章会介绍的优化手段是下面这几样:

  1. 创建slice时预分配内存
  2. 操作slice前预分配内存
  3. slice表达式中合理设置cap值
  4. 添加多个零值元素的优化
  5. 循环展开
  6. 避免for-range复制数据带来的损耗
  7. 边界检查消除
  8. 并行处理slice
  9. 复用slice的内存
  10. 高效删除多个元素
  11. 减轻GC扫描压力

这篇文章不会讨论缓存命中率和SIMD,我知道这两样也和slice的性能相关,但前者我认为是合格的开发者必须要了解的,网上优秀的教程也很多不需要我再赘述,后者除非性能瓶颈真的在数据吞吐量上否则一般不应该纳入考虑范围尤其在go语言里,所以这两个主题本文不会介绍。

最后开篇之前我还想提醒一下,性能瓶颈要靠测试和profile来定位,性能优化方案的收益和开销也需要性能测试来衡量,切记不可生搬硬套。

本文比较长,所以我建议可以挑自己感兴趣的内容看,有时间再通读。

本文索引

创建slice时预分配内存

预分配内存是最常见的优化手段,我会分为创建时和使用中两部分来讲解如何进行优化。

提前为要创建的slice分配足够的内存,可以消除后续添加元素时扩容产生的性能损耗。

具体做法如下:

s1 := make([]T, 0, 预分配的元素个数)

// 另一种不太常见的预分配手段,此时元素个数必须是常量
var arr [元素个数]T
s2 := arr[:]

很简单的代码,性能测试我就不做了。

前面说到添加元素时扩容产生的性能损耗,这个损耗分为两方面,一是扩容需要重新计算slice的cap,尤其是1.19之后采用更缓和的分配策略后计算量是有所增加的,另一方面在于重新分配内存,如果没能原地扩容的话还需要重新分配一块内存把数据移动过去,再释放原先的内存,添加的元素越多遇到这种情况的概率越大,这是相当大的开销。

另外slice采用的扩容策略有时候会造成浪费,比如下面这样:

func main() {
    var a []int
    for i := 0; i < 2048; i++ {
            a = append(a, i)
    }
    fmt.Println(cap(a)) // go1.22: 2560
}

可以看到,我们添加了2048个元素,但go最后给我们分配了2560个元素的内存,浪费了将近500个。

不过预分配不是万金油,有限定了的适用场景:

适用场景:

  1. 明确知道slice里会有多少个元素的场景
  2. 元素的个数虽然不确定,但大致在
    [x, y]
    的区间内,这时候可以选择设置预分配大小为
    y+N
    (N取决于误差范围,预分配大量内存之后再触发扩容的代价非常高昂,所以算好误差范围宁可少量浪费也要避免再次扩容),当然x和y之间的差不能太大,像1和1000这种很明显是不应该进行预分配的,主要的判断依据是最坏情况下的内存浪费率。

除了上面两种情况,我不建议使用预分配,因为分配内存本身是要付出性能的代价的,不是上面两种场景时预分配都会不可避免的产生大量浪费,这些浪费带来的性能代价很可能会超过扩容的代价。

预分配内存还有另一个好处:如果分配的大小是常量或者常量表达式,则有机会被逃逸分析认定为大小合适分配在栈上,从而使性能更进一步提升。这也是编译器实现的,具体的代码如下:

// https://github.com/golang/go/blob/master/src/cmd/compile/internal/walk/builtin.go#L412

// walkMakeSlice walks an OMAKESLICE node.
func walkMakeSlice(n *ir.MakeExpr, init *ir.Nodes) ir.Node {
	l := n.Len
	r := n.Cap
	if r == nil {
		r = safeExpr(l, init)
		l = r
	}
	t := n.Type()
	if t.Elem().NotInHeap() {
		base.Errorf("%v can't be allocated in Go; it is incomplete (or unallocatable)", t.Elem())
	}
	if n.Esc() == ir.EscNone {
		if why := escape.HeapAllocReason(n); why != "" {
			base.Fatalf("%v has EscNone, but %v", n, why)
		}
		// 检查i是否是常量
		i := typecheck.IndexConst(r)
		if i < 0 {
			base.Fatalf("walkExpr: invalid index %v", r)
		}

		// 检查通过后创建slice临时变量,分配在栈上
	}

	// 逃逸了,这时候会生成调用runtime.makeslice的代码
    // runtime.makeslice用mallocgc从堆分配内存
}

栈上分配内存速度更快,而且对gc的压力也更小一些,但对象会在哪被分配并不是我们能控制的,我们能做的也只有创造让对象分配在栈上的机会仅此而已。

操作slice前预分配内存

从slices包进入标准库开始,操作现有的slice时也能预分配内存了。

当然之前也可以,不过得绕些弯路,有兴趣可以去看下
slices.Grow
是怎么做的。

通过简单的测试来看看效果:

func BenchmarkAppend(b *testing.B) {
	for i := 0; i < b.N; i++ {
		s := []int{1, 2, 3, 4, 5}
		for j := 0; j < 1024; j++ {
			s = append(s, j)
		}
	}
}

func BenchmarkAppendWithGrow(b *testing.B) {
	for i := 0; i < b.N; i++ {
		s := []int{1, 2, 3, 4, 5}
		s = slices.Grow(s, 1024)
		for j := 0; j < 1024; j++ {
			s = append(s, j)
		}
	}
}

这是结果,用benchstat进行了比较:

goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-10200H CPU @ 2.40GHz
         │   old.txt   │               new.txt               │
         │   sec/op    │   sec/op     vs base                │
Append-8   4.149µ ± 3%   1.922µ ± 5%  -53.69% (p=0.000 n=10)

         │    old.txt    │               new.txt                │
         │     B/op      │     B/op      vs base                │
Append-8   19.547Ki ± 0%   9.250Ki ± 0%  -52.68% (p=0.000 n=10)

         │  old.txt   │              new.txt               │
         │ allocs/op  │ allocs/op   vs base                │
Append-8   8.000 ± 0%   1.000 ± 0%  -87.50% (p=0.000 n=10)

不仅速度快了一倍,内存也节约了50%,而且相比未用Grow的代码,优化过后的代码只需要一次内存分配。

性能提升的原因和上一节的完全一样:避免了多次扩容带来的开销。

同时节约内存的好处也和上一节一样是存在的:

func main() {
	s1 := make([]int, 10, 50) // 注意已经有一定的预分配了
	for i := 0; i < 1024; i++ {
		s1 = append(s1, i)
	}
	fmt.Println(cap(s1))  // 1280

	s2 := make([]int, 10, 50)
	s2 = slices.Grow(s3, 1024)
	for i := 0; i < 1024; i++ {
		s2 = append(s2, i)
	}
	fmt.Println(cap(s2))  // 1184
}

如例子所示,前者的内存利用率是80%,而后者是86.5%,
Grow虽然也是利用append的机制来扩容,但它可以更充分得利用内存,避免了浪费

也和上一节一样,使用前的预分配的适用场景也只有两个:

  1. 明确知道会往slice里追加多少个元素的场景
  2. 追加的元素的个数虽然不确定,但大致在
    [x, y]
    的区间内,这时候可以选择设置预分配大小为
    y+N
    (和上面一样,N取决于误差范围)。

另外如果是拼接多个slice,最好使用
slices.Concat
,因为它内部会用Grow预分配足够的内存,比直接用append快一些。这也算本节所述优化手段的一个活得例子。

slice表达式中合理设置cap值

https://github.com/golang/go/pull/64835

在比较新的go版本里slice表达式是可以有第三个参数的,即cap的值,形式类似:
slice[start:end:capEnd]

注意我用了
capEnd
而不是cap,因为这个参数不是cap的长度,而是指新的slice最大可以访问到原数组或者slice的(索引-1)的元素。举个例子:
slice[1:2:3]
,这个表达式创建了一个新的切片,长度为
2-1
即1,可以访问到原切片的索引
3-1
即2的元素,因此新切片可以访问的元素实际上有
index 1

index 2
两个,cap为2。

为啥要加这个参数呢?因为可以限制切片访问的范围,避免意外地改变数据。

当然那么没有第三个参数的时候cap是怎么处理的呢?当然是相当于
cap(old slice) - start
了。

这和性能优化有什么关系呢?看个例子:

func noop(s []int) int {
	return s[1] + s[2]
}

func BenchmarkSlice(b *testing.B) {
	slice := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	for i := 0; i < b.N; i++ {
		noop(slice[1:5])
	}
}

func BenchmarkSliceWithEqualCap(b *testing.B) {
	slice := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	for i := 0; i < b.N; i++ {
		noop(slice[1:5:5])
	}
}

测试结果:

goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-10200H CPU @ 2.40GHz
BenchmarkSlice-8                1000000000               0.3263 ns/op          0 B/op          0 allocs/op
BenchmarkSliceWithEqualCap-8    1000000000               0.3015 ns/op          0 B/op          0 allocs/op

如果用benchstat进行比较,平均来说使用
slice[1:5:5]
的代码要快3%左右。

事实上这里有一个go的小优化,当切片表达式里第二个参数和第三个参数一样的时候,cap可以不用额外计算,直接取之前算出来的length就行了。这会少几次内存访问和一个减法运算。

不信可以看看编译器的
代码

// slice computes the slice v[i:j:k] and returns ptr, len, and cap of result.
// i,j,k may be nil, in which case they are set to their default value.
// v may be a slice, string or pointer to an array.
func (s *state) slice(v, i, j, k *ssa.Value, bounded bool) (p, l, c *ssa.Value) {
	t := v.Type
	var ptr, len, cap *ssa.Value
	switch {
	case t.IsSlice():
		ptr = s.newValue1(ssa.OpSlicePtr, types.NewPtr(t.Elem()), v)
        // 计算slice的len和cap
		len = s.newValue1(ssa.OpSliceLen, types.Types[types.TINT], v)
		cap = s.newValue1(ssa.OpSliceCap, types.Types[types.TINT], v)
	case t.IsString():
		// 省略,这里不重要
	case t.IsPtr():
		// 同上省略
	default:
		s.Fatalf("bad type in slice %v\n", t)
	}

	// 如果是s[:j:k],i会默认设置为0
	if i == nil {
		i = s.constInt(types.Types[types.TINT], 0)
	}
    // 如果是s[i:],则j设置为len(s)
	if j == nil {
		j = len
	}
	three := true
    // 如果是s[i:j:], 则k设置为cap(s)
	if k == nil {
		three = false
		k = cap
	}

	// 对i,j和k进行边界检查

	// 先理解成加减乘除的运算符就行
	subOp := s.ssaOp(ir.OSUB, types.Types[types.TINT])
	mulOp := s.ssaOp(ir.OMUL, types.Types[types.TINT])
	andOp := s.ssaOp(ir.OAND, types.Types[types.TINT])

	// Calculate the length (rlen) and capacity (rcap) of the new slice.
	// For strings the capacity of the result is unimportant. However,
	// we use rcap to test if we've generated a zero-length slice.
	// Use length of strings for that.
	rlen := s.newValue2(subOp, types.Types[types.TINT], j, i)
	rcap := rlen
	if j != k && !t.IsString() {
		rcap = s.newValue2(subOp, types.Types[types.TINT], k, i)
	}

	// 计算slice的内存从那里开始的,在这不重要忽略

	return rptr, rlen, rcap
}

整体没什么难的,所有切片表达式最终都会走到这个函数,这个函数会生产相应的opcode,这个opcode会过一次相对简单的优化,然后编译器根据这些的opcode生成真正的可以运行的程序。

重点在于
if j != k && !t.IsString()
这句,分支里那句
rcap = s.newValue2(subOp, types.Types[types.TINT], k, i)
翻译成普通的go代码的话相当于
rcap = k - i
,k的值怎么计算的在前面的注释里有写。这意味着切片表达式的二三两个参数如果值一样且不是string,那么会直接复用length而不需要额外的计算了。题外话,这里虽然我用了“计算”这个词,但实际是rcap和rlen还都只是表达式,真正的结果是要在程序运行的时候才能计算得到的,有兴趣的话可以自己研究一下go的编译器。

正是因为这个小小的优化带来了细微的性能提升。

当然,这些只是代码生成中的细节,只有这个原因的话我通常不会推荐这样的做法。

所以更重要的是在于前面提到的安全性:限制切片访问的范围,避免意外地改变数据。在此基础上不仅不会有性能下降还有小幅的上升,算是锦上添花。

适用场景:当切片的cap和length理论上长度应该相等时,最好都明确地进行设置,比如:
slice[i : j+2 : j+2]
这样。

上面这个场景估计能占到一半左右,当然还有很多不符合上述要求的场景,所以不要生搬硬套,一切以性能测试为准。

向slice添加多个零值元素的优化

往slice里添加“0”也有些小窍门,看看下面的测试:

func BenchmarkAppendZeros1(b *testing.B) {
	for i := 0; i < b.N; i++ {
		slice := []int{}
		slice = append(slice, []int{0, 0, 0, 0, 0}...)
	}
}

// 优化版本
func BenchmarkAppendZeros2(b *testing.B) {
	for i := 0; i < b.N; i++ {
		slice := []int{}
		slice = append(slice, make([]int, 5)...)
	}
}

测试结果:

goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-10200H CPU @ 2.40GHz
              │   old.txt   │              new.txt               │
              │   sec/op    │   sec/op     vs base               │
AppendZeros-8   31.79n ± 2%   30.04n ± 2%  -5.50% (p=0.000 n=10)

              │  old.txt   │            new.txt             │
              │    B/op    │    B/op     vs base            │
AppendZeros-8   48.00 ± 0%   48.00 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal

              │  old.txt   │            new.txt             │
              │ allocs/op  │ allocs/op   vs base            │
AppendZeros-8   1.000 ± 0%   1.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal

一行代码,在内存用量没有变化的情况下性能提升了5%。

秘密依然在编译器里。

不管是
append(s1, s2...)
还是
append(s1, make([]T, length)...)
,编译器都有特殊的处理。

前者的流程是这样的:

  1. 创建s2(如果s2是个slice的字面量的话)
  2. 检查s1的cap,不够的情况下要扩容
  3. 将s2的内容copy到s1里

使用make时的流程是这样的:

  1. 检查s1的cap,不够的情况下要扩容
  2. 对length长度的s1的空闲内存做memclr(将内存中的值全设置为0)

代码在这里:
https://github.com/golang/go/blob/master/src/cmd/compile/internal/walk/assign.go#L647

性能提升的秘密在于:不用创建临时的slice,以及memclr做的事比copy更少也更简单所以更快。

而且显然
append(s1, make([]T, length)...)
的可读性也是更好的,可谓一举两得。

适用场景:需要往slice添加连续的零值的时候。

循环展开

用循环处理slice里的数据也是常见的需求,相比下一节会提到的for-range,普通循环访问数据的形式可以更加灵活,而且也不会受1.22改变range运行时行为的影响。

说到循环相关的优化,循环展开是绕不开的话题。顾名思义,就是把本来要迭代n次的循环,改成每轮迭代里处理比原先多m倍的数据,这样总的迭代次数会降为
n/m + 1
次。

这样为啥会更快呢?其中一点是可以少很多次循环跳转和边界条件的更新及比较。另一点是现代 CPU 都有一个叫做指令流水线的东西,它可以同时运行多条指令,如果它们之间没有数据依赖(后一项数据依赖前一项作为输入)的话,展开循环后意味着有机会让一部分指令并行从而提高吞吐量。

然鹅通常这不是程序员该关心的事,因为怎么展开循环,什么时候应该展开什么时候不应(循环展开后会影响到当前函数能否被内联等)都是一个有着良好的优化过程的编译器该做的。

你问go呢?那是自然没有的。在运行时性能和语言表现力之间,go选择了编译速度。编译得确实快,然而优化上就要眼前一黑了。

所以只能自己写了:

func loop(s []int) int {
	sum := 0
	for i := 0; i < len(s); i++ {
		sum += s[i]
	}
	return sum
}

func unroll4(s []int) int {
	sum := 0
	for i := 0; i < len(s); i += 4 {
		sum += s[i]
		sum += s[i+1]
		sum += s[i+2]
		sum += s[i+3]
	}
	return sum
}

func BenchmarkLoop(b *testing.B) {
	s := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 35, 26, 27, 28, 29, 30, 31}
	for i := 0; i < b.N; i++ {
		loop(s)
	}
}

func BenchmarkUnroll4(b *testing.B) {
	s := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 35, 26, 27, 28, 29, 30, 31}
	for i := 0; i < b.N; i++ {
		unroll4(s)
	}
}

func BenchmarkUnroll8(b *testing.B) {
	s := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 35, 26, 27, 28, 29, 30, 31}
	for i := 0; i < b.N; i++ {
		unroll8(s)
	}
}

测试使用32个int的slice,首先和一个循环里处理四个数据的对比:

goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-10200H CPU @ 2.40GHz
         │   old.txt   │               new.txt               │
         │   sec/op    │   sec/op     vs base                │
Unroll-8   9.718n ± 3%   3.196n ± 2%  -67.11% (p=0.000 n=10)

         │  old.txt   │            new.txt             │
         │    B/op    │    B/op     vs base            │
Unroll-8   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal

         │  old.txt   │            new.txt             │
         │ allocs/op  │ allocs/op   vs base            │
Unroll-8   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal

提升了将近67%,相当之大了。然后我们和一次处理8个数据的比比看:

goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-10200H CPU @ 2.40GHz
         │   old.txt   │               new.txt               │
         │   sec/op    │   sec/op     vs base                │
Unroll-8   9.718n ± 3%   2.104n ± 1%  -78.34% (p=0.000 n=10)

这次提升了78%,相比一次只处理四个,处理8个的方法快了30%。

我这为了方便只处理了总数据量是每轮迭代处理数据数量整数倍的情况,非整数倍的时候需要借助“达夫设备”,在go里实现起来比较麻烦,所以偷个懒。不过鉴于循环展开带来的提升非常之大,如果确定循环处理slice的代码是性能瓶颈,不妨可以实现一下试试效果。

适用场景:slice的长度需要维持在固定值上,且长度需要时每轮迭代处理数据量的整数倍。

需要仔细性能测试的场景:如果单次循环需要处理的内容很多代码很长,那么展开的效果很可能是没有那么好的甚至起反效果,因为过多的代码会影响当前函数和当前代码调用的函数是否被内联以及局部变量的逃逸分析,前者会使函数调用的开销被放大同时干扰分支预测和流水线执行导致性能下降,后者则会导致不必要的逃逸同时降低性能和增加堆内存用量。

另外每次迭代处理多少个元素也没必要拘泥于4或者2的倍数什么的,理论上不管一次处理几个都会有显著的性能提升,实际测试也是如此,一次性处理3、5或者7个的效果和4或者8个时差不多,总体来说一次处理的越多提升越明显。但如果展开的太过火就会发展成为上面说的需要严格测试的场景了。所以我建议展开处理的数量最好别超过8个。

避免for-ranges复制数据带来的损耗

普通的循环结构提供了灵活的访问方式,但要是遍历slice的话我想大部分人的首选应该是for-ranges结构吧。

这一节要说的东西与其叫性能优化,到不如说应该是“如何避开for-ranges”的性能陷阱才对。

先说说陷阱在哪。

陷阱其实有两个,一个基本能避开,另一个得看情况才行。我们先从能完全避开的开始。

避免复制

第一个坑在于range遍历slice的时候,会把待遍历的数据复制一份到循环变量里,而且从1.22开始range的循环遍历每次迭代都会创建出一个新的实例,如果没注意到这点的话不仅性能下降还会使内存压力急剧升高。我们要做的就是避免不必要的复制带来的开销。

作为例子,我们用包含8个
int64
和1个string的结构体填充slice然后对比复制和不复制时的性能:

type Data struct {
	a, b, c, d, e, f, g, h int64
	text                   string
}

func generateData(n int) []Data {
	ret := make([]Data, 0, n)
	for i := range int64(n) {
		ret = append(ret, Data{
			a:    i,
			b:    i + 1,
			c:    i + 2,
			d:    i + 3,
			e:    i + 4,
			f:    i + 5,
			g:    i + 6,
			h:    i + 7,
			text: "测试",
		})
	}
	return ret
}

// 会导致额外复制数据的例子
func BenchmarkRanges1(b *testing.B) {
	data := generateData(100)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		tmp := int64(0)
		for _, v := range data { // 数据被复制给循环变量v
			tmp -= v.a - v.h
		}
	}
}

// 避免了复制的例子
func BenchmarkRanges2(b *testing.B) {
	data := generateData(100)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		tmp := int64(0)
		for i := range data { // 注意这两行
			v := &data[i]
			tmp -= v.a - v.h
		}
	}
}

结果:

goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-10200H CPU @ 2.40GHz
         │   old.txt   │              new.txt               │
         │   sec/op    │   sec/op     vs base               │
Ranges-8   33.51n ± 2%   32.63n ± 1%  -2.41% (p=0.000 n=10)

使用指针或者直接通过索引访问可以避免复制,如结果所示,结构体越大性能的差异就越明显。此外新版本的go修改了range的语义,从以前会复用循环变量变成了每轮循环都创建新的循环变量,这会使一部分存在复制开销的for-range循环变得更慢。

适用场景:需要遍历每个元素,遍历的slice里的单项数据比较大且明确不需要遍历的数据被额外复制给循环变量的时候。

遍历字符串的时候避免转换带来的开销

字符串可能有点偏题了,但我们要说的这点也勉强和slice有关。

这个坑在于,range遍历字符串的时候会把字符串的内容转换成一个个rune,这一步会带来开销,尤其是字符串里只有ascii字符的时候。

写个简单例子看看性能损耗有多少:

func checkByte(s string) bool {
	for _, b := range []byte(s) {
		if b == '\n' {
			return true
		}
	}
	return false
}

func checkRune(s string) bool {
	for _, r := range s {
		if r == '\n' {
			return true
		}
	}
	return false
}

func BenchmarkRanges1(b *testing.B) {
	s := "abcdefghijklmnopqrstuvwxyz1234567890."
	for i := 0; i < b.N; i++ {
		checkRune(s)
	}
}

func BenchmarkRanges2(b *testing.B) {
	s := "abcdefghijklmnopqrstuvwxyz1234567890."
	for i := 0; i < b.N; i++ {
		checkByte(s)
	}
}

这是结果:

goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-10200H CPU @ 2.40GHz
         │   old.txt   │               new.txt               │
         │   sec/op    │   sec/op     vs base                │
Ranges-8   36.07n ± 2%   23.95n ± 1%  -33.61% (p=0.000 n=10)

把string转换成
[]byte
再遍历的性能居然提升了1/3

。换句话说如果你没注意到这个坑,那么就要白白丢失这么多性能了。

而且将string转换成
[]byte
是不需要额外分配新的内存的,可以直接复用string内部的数据,当然前提是不会修改转换后的slice,在这里我们把这个slice直接交给了range,它不会修改slice,所以转换的开销被省去了。

这个优化是从1.6开始的,有兴趣可以看看编译器的代码:
https://github.com/golang/go/blob/master/src/cmd/compile/internal/walk/convert.go#L316
(看代码其实还有别的针对这种转换的优化,比如字符串比较短的时候转换出来的
[]byte
会分配在栈上)

当然,如果你要处理ASCII以外的字符,比如中文汉字,那么这个优化就行不通了。

适用场景:需要遍历处理的字符串里的字符都在ASCII编码的范围内,比如只有换行符英文半角数字和半角标点的字符串。

BCE边界检查消除

边界检查是指在访问slice元素、使用slice表达式、make创建slice等场景下检查参数的值是否超过最大限制以及是否会越界访问内存。这些检查是编译器根据编译时获得的信息添加到对应位置上的,检查的代码会在运行时被运行。

这个特性对于程序的安全非常重要。

那么是否只要是有上述表达式的地方就会导致边界检查呢?答案是不,因为边界检查需要取slice的长度或者cap然后进行比较,检查失败的时候会panic,整个造成有些花时间而且对分支预测不是很友好,总体上每个访问slice元素的表达式都添加检查会拖垮性能。

因此边界检查消除就顺理成章出现了——一些场景下明显index不可能有越界问题,那么检查就是完全不必要的。

如何查看编译器在哪里插入了检查呢?可以用下面这个命令:
go build -gcflags='-d=ssa/check_bce' main.go

以上一节的
unroll4
为例子:

$ go build -gcflags='-d=ssa/check_bce' main.go

# command-line-arguments
./main.go:8:11: Found IsInBounds
./main.go:9:11: Found IsInBounds
./main.go:10:11: Found IsInBounds
./main.go:11:11: Found IsInBounds

目前你会看到两种输出
IsInBounds

IsSliceInBounds
。两者都是插入边界检测的证明,检查的内容差不多,只有微小的差别,有兴趣可以看ssa怎么生成两者代码的:
https://github.com/golang/go/blob/master/src/cmd/compile/internal/ssa/rewriteAMD64.go#L25798

那么这些检查怎么消除呢?具体来说可以分为好几种情况,但随着编译器的发展肯定会有不少变化,所以我不准备一一列举。

既然不列举,那肯定有大致通用的规则:如果使用index访问slice前的表达式里可以推算出当前index值不会越界,那么检查就能消除。

举几个例子:

s1 := make([]T, 10)
s1[9] // 常数索引值编译时就能判断是否越界,所以不需要插入运行时的检测。
_ = s1[i&6]   // 索引的值肯定在0-6之间,检查被消除

var s2 []int
_ = s2[:i] // 检查
_ = s2[:i] // 重复访问,消除边界检查
_ = s2[:i+1] // 检查
_ = s2[:i+1] // 重复的表达式,检查过了所以检查被消除

func f(s []int) int {
    if len(s) < 3 {
        panic("error")
    }

    return s[1] + s[2] // 前面的if保证了这两个访问一定不会越界,所以检查可以消除
}

// 一种通过临时变量避免多次边界检测的常用作法
func f2(s []int) int {
    tmp := s[:4:4] // 这里会边界检查。这里还利用了前面说的合理设置slice表达式的cap避免额外开销
    a := tmp[2] // tmp那里的检查保证了这里不会越界,因此不会再检查
    b := tmp[3] // 同上
    return a+b
}

我没列出所有例子,想看的可以去
这里

当然有一些隐藏的不能消除检查的场景:

func f(s []int, i int) {
    if i < len(s) {
        fmt.Println(s[i]) // 消除不了,因为i是有符号整数,可能会小于0
    }
}

func f(s []int, i int) {
    if 0 < i && i < len(s) {
        fmt.Println(s[i+2]) // 消除不了,因为i是有符号整数,i+2万一发生溢出,索引值会因为绕回而变成负数
    }
}

有了这些知识,前面的
unroll4
有四次边界检查,实际上用不着这么多,因此可以改成下面这样:

func unroll4(s []int) int {
	sum := 0
	for i := 0; i < len(s); i += 4 {
		tmp := s[i : i+4 : i+4] // 只有这里会检查一次
		sum += tmp[0]
		sum += tmp[1]
		sum += tmp[2]
		sum += tmp[3]
	}
	return sum
}

这么做实际上还是会检查一次,能不能完全消除呢?

func unroll4(s []int) int {
	sum := 0
	for len(s) >= 4 {
		sum += s[0]
		sum += s[1]
		sum += s[2]
		sum += s[3]
        s = s[4:] // 忽略掉已经处理过的四个元素,而且因为len(s) >= 4,所以这里也不需要检查
	}
	return sum
}

这样检查就完全消除了,但多了一次slice的赋值。

然而我这的例子实在是太简单了,性能测试显示边界检查消除并没有带来性能提升,完全消除了检查的那个例子反而因为额外的slice赋值操作带来了轻微的性能下降(和消除到只剩一次检查的比较)。

如果想要看效果更明显的例子,可以参考
这篇博客

适用场景:能有效利用
len(slice)
的结果的地方可以尝试BCE。

其他场合需要通过性能测试来判断是否有提升以及提升的幅度。像这样既不像设置slice表达式cap值那样增强安全性又不像用make批量添加空值那样增加可读性的改动,个人认为除非真的是性能瓶颈而且没有其他优化手段,
否则提升低于5%的话建议不要做这类改动

并行处理slice

前面说到了循环展开,基于这一手段更进一步的优化就是并行处理了。这里的并行不是指SIMD,而是依赖goroutine实现的并行。

能并行的前提是slice元素的处理不会互相依赖,比如
s[1]
的处理依赖于
s[0]
的处理结果这样的。

在能确定slice的处理可以并行后,就可以写一些并行代码了,比如并行求和:

func Sum(s []int64) int64 {
	// 假设s的长度是4000
	var sum atomic.Int64
	var wg sync.WaitGroup
	// 每个goroutine处理800个
	for i := 0; i < len(s); i += 800 {
		wg.Add(1)
		go func(ss []int) {
			defer wg.Done()
			var ret int64
			for j := range ss {
				ret += ss[j]
			}
			sum.Add(ret)
		}(s[i: i+800])
	}
	wg.Wait()
	return sum.Load()
}

很简单的代码。和循环展开一样,需要额外料理数量不够一次处理的剩余的元素。

另外协程的创建销毁以及数据的同步都是比较耗时的,如果slice里元素很少的话并行处理反而得不偿失。

适用场景:slice里元素很多、对元素的处理可以并行互不干扰,还有重要的一点,golang程序可以使用超过一个cpu核心保证代码真正可以“并行”运行。

复用

复用slice是个常见的套路,其中复用
[]byte
是最为常见的。

复用可以利用
sync.Pool
,也可以像下面这样:

buf := make([]byte, 1024)
for {
	read(buf)
	...
	// reuse
	buf = buf[:0]
}

其中
buf = buf[:0]
使得slice的cap不变,length清零,这样就可以复用slice的内存了。使用
sync.Pool
时也需要这样使slice的长度为零。

此外使用
sync.Pool
时还要注意slice的尺寸不能太大,否则同样会增加gc负担。一般来说超过1M大小的slice是不建议存进去的,当然还得结合项目需求和性能测试才能决定尺寸的上限。

适用场景:你的slice内存可以反复被使用(最好是能直接重用连清理都可以不做的那种,清理会让优化效果打点折扣)并且多次创建slice确实成为了性能瓶颈时。

高效删除多个元素

删除元素也是常见需求,这里我们也要分三种情况来讨论。

这三种情况都包含在标准库的
slices.Delete
里了,所以比起自己写我更推荐你用标准库。因此本节没有适用场景这一环境,但每一小节针对一些特殊场景给出了相应的建议。

删除所有元素

如果删除元素后也不打算复用slice了,直接设置为nil就行。

如果还要复用内存,利用我们在
复用
那节里提到的
s := s[:0]
就行,不过光这样还不够,为了防止内存泄漏还得把删除的元素全部清零,在1.21前我们只能这么做:

func deleteSlice[T any, S ~[]T](s S) S {
	var zero T
	for i := range s {
		s[i] = zero
	}
	return s[:0]
}

1.21之后我们有了clear内置函数,代码可以大幅简化:

func deleteSlice[T any, S ~[]T](s S) S {
	clear(s)
	return s[:0]
}

两种写法的性能是一样的,因为go专门对for-range循环写入零值做了优化,效果和直接用clear一样:

func BenchmarkClear(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var a = [...]uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
		clear(a[:])
	}
}

func BenchmarkForRange(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var a = [...]uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
		for j := range a {
			a[j] = 0
		}
	}
}
goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-10200H CPU @ 2.40GHz
BenchmarkClear-8      	1000000000	         0.2588 ns/op	       0 B/op	       0 allocs/op
BenchmarkForRange-8   	1000000000	         0.2608 ns/op	       0 B/op	       0 allocs/op

但是如果循环的形式不是for-range,那么就吃不到这个优化了:

func BenchmarkClear(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var a = [...]uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
		clear(a[:])
	}
}

func BenchmarkForLoop(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var a = [...]uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
		for j := 0; j < 20; j++ {
			a[j] = 0
		}
	}
}
goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-10200H CPU @ 2.40GHz
BenchmarkClear-8     	1000000000	         0.2613 ns/op	       0 B/op	       0 allocs/op
BenchmarkForLoop-8   	173418799	         7.088 ns/op	       0 B/op	       0 allocs/op

速度相差一个数量级。对“循环写零”优化有兴趣的可以在这看到是这么实现的:
arrayclear
。这个优化对map也有效果。

我们可以简单对比下置空为nil和clear的性能:

func BenchmarkDeleteWithClear(b *testing.B) {
	for i := 0; i < b.N; i++ {
		a := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
		clear(a)
		a = a[:0]
	}
}

func BenchmarkDeleteWithSetNil(b *testing.B) {
	for i := 0; i < b.N; i++ {
		a := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
		a = a[:] // 防止编译器认为a没有被使用
		a = nil
	}
}

从结果来看只是删除操作的话没有太大区别:

BenchmarkDeleteWithClear-8      1000000000               0.2592 ns/op          0 B/op          0 allocs/op
BenchmarkDeleteWithSetNil-8     1000000000               0.2631 ns/op          0 B/op          0 allocs/op

所以选用哪种方式主要取决于你后续是否还要复用slice的内存,需要复用就用clear,否则直接设为nil。

删除头部或尾部的元素

删除尾部元素是最简单的,最快的方法只有
s = s[:index]
这一种。注意别忘了要用
clear
清零被删除的部分。

这个方法唯一的缺点是被删除的部分的内存不会释放,通常这没有坏处而且能在新添加元素时复用这些内存,但如果你不会再复用这些内存并且对浪费很敏感,那只能分配一个新slice然后把要留下的元素复制过去了,但要注意这么做的话会慢很多而且在删除的过程中要消费更多内存(因为新旧两个slice得同时存在)。

删除头部元素的选择就比较多了,常见的有两种(我们需要保持元素之间的相对顺序):
s = s[index+1:]
或者
s = append(s[:0], s[index+1:]...)

前者是新建一个slice,底层数组起始为止指向原先slice的
index+1
处,注意虽然底层数组被复用了,但cap实际上是减小的,而且被删除部分的内存没有机会再被复用了。这种方法需要在删除前先把元素清零。

后一种则不会创建新的slice,它把
index+1
开始的元素平移到了slice的头部,这样也是删除了头部的元素(被覆盖掉了)。使用这种方案不需要主动清零元素,你要是不放心移动后尾部剩下的空间也可以选择使用clear但一般不建议。

理论上前者真正地浪费了内存但性能更好,不过性能始终要用benchmark来证明:

func BenchmarkClearWithReSlice(b *testing.B) {
	for i := 0; i < b.N; i++ {
		a := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
		// 删除头部7个元素
		clear(a[:7])
		a = a[7:]
	}
}

func BenchmarkClearWithAppend(b *testing.B) {
	for i := 0; i < b.N; i++ {
		a := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
		a = append(a[:0], a[7:]...)
	}
}

测试结果显示确实第一种方法快:

BenchmarkClearWithReSlice-8     1000000000               0.2636 ns/op          0 B/op          0 allocs/op
BenchmarkClearWithAppend-8      100000000               10.82 ns/op            0 B/op          0 allocs/op

Append慢了一个数量级,即使memmove已经得到了相当多的优化,在内存里移动数据还是很慢的。

在实际应用中应该根据内存利用效率和运行速度综合考虑选择合适的方案。

删除在中间位置的元素

删除中间部分的元素还要保持相对顺序,能用的办法就只有移动删除部分后面的元素到前面进行覆盖这一种办法:

s := append(s[:index], s[index+n:]...)

这个方法也不需要主动clear被删除元素,因为它们都被覆盖掉了。利用append而不是for循环除了前面说的for循环优化差之外还有代码更简洁和能利用memmove这两个优势。

因为方法唯一没啥参照物,所以性能就不测试了。

减轻GC扫描压力

简单的说,尽量不要在slice里存放大量的指针或者包含指针的结构体。指针越多gc在扫描对象时需要做的工作就越多,最后会导致性能下降。

更具体的解释和性能测试可以看
这篇

适用场景:无特殊需求且元素大小不是特别大的,存值优于存指针。

作为代价,如果选择了存值,得小心额外的复制导致的开销。

总结

按个人经验来看,使用频率最高的几个优化手段依次是预分配内存、避免for-ranges踩坑、slice复用、循环展开。从提升来看这几个也是效果最明显的。

编译器的优化不够给力的话就只能自己想办法用这些优化技巧了。

有时候也可以利用逃逸分析规则来做优化,但正如
这篇文章
所说,绝大多数情况下你都不应该考虑逃逸分析。

还有另外一条路:给go编译器共享代码提升编译产物的性能。虽然阻力会很大,但我还是相信有大佬一定能做到的。这也是我为什么会把编译器怎么做优化的代码贴出来,抛砖引玉嘛。

还有最重要的一点:性能问题不管是定位还是优化,
都必须以性能测试为依据
,切记不可光靠“经验”和没有事实依据支撑的“推论”。

最后我希望这篇文章能成为大家优化性能时的趁手工具,而不是面试时背的八股文。

最近在学习CodeQL,对于CodeQL就不介绍了,目前网上一搜一大把。本系列是学习CodeQL的个人学习笔记,根据个人知识库笔记修改整理而来的,分享出来共同学习。个人觉得QL的语法比较反人类,至少与目前主流的这些OOP语言相比,还是有一定难度的。与现在网上的大多数所谓CodeQL教程不同,本系列基于
官方文档

情景实例
,包含大量的个人理解、思考和延伸,直入主题,只切要害,几乎没有废话,并且坚持用从每一个实例中学习总结归纳,再到实例中验证。希望能给各位一点不一样的见解和思路。当然,也正是如此必定会包含一定的错误,希望各位大佬能在评论区留言指正。


先来看一下一些基本的概念和结构

// 基本结构
from /* variable declarations */
where /* logical formulas */
select /* expressions */

from int a, int b
where x = 3, y = 4
select x, y
// 找出1-10内的勾股数
from int x, int y, int z
where x in [1..10], y in [1..10], z in [1..10] and
    x * x + y * y = z * z
select x, y, z

// 或下面这种类写法,封装和方法复用
class SmallInt extends int {
    SmallInt(){
        this in [1..10]
    }
    int square(){
        result = this * this
    }
}
from SmallInt x, SmallInt y, SmallInt z
where x.sqrt() + y.square() = z.sqrt()
select x, y, z

逻辑连接词、量词、聚合词

from Person p
where p.getAge() = max(int i | exists(Person t | t.getAge() = i ) | i)        // 通用聚合语法,比较冗长
select p

// 或用以下有序聚合方式
select max(Person p | | p order by p.getAge())

exists(<变量声明> | <条件表达式>)

<aggregates> ( <变量声明> | <逻辑表达式(限制符合条件的数据范围)> | <表达式(返回经过筛选的)> )

e.g.
exists( Person p | p.getName() = "test" )
,判断是否存在一个人的名字为test

max(int i | exists(Person p | p.getAge() = i) | i)
,第二部分的意思是拿到所有人的年龄放入i,第三部分是作用范围为i,目前i为int数组,存放所有人的年龄,即最终计算的是
max(i)

select max(Person p | | p order by p.getAge())
,考虑每个人,取出年龄最大的人。过程是按照年龄来取最大值,换句话说,
order by p.getAge()
是告诉 max() 函数要基于 getAge() 来找最大值,并不涉及对所有对象的排序操作。

// 其他有序聚合练习
select min(Person p | p.getLocation() = "east" | p order by p.getHeight())  // 村东最矮的人
select count(Person p | p.getLocation() = "south" | p)  // 村南的人数
select avg(Person p |  | p.getHeight()) // 村民平均身高
select sum(Person p | p.getHairColor() = "brown" | p.getAge())  // 所有棕色头发的村民年龄总和
// 综合练习,https://codeql.github.com/docs/writing-codeql-queries/find-the-thief/#the-real-investigation
import tutorial
from Person p 
where 
    p.getHeight() > 150 and // 身高超过150
    not p.getHairColor() = "blond" and  // 头发颜色不是金发
    exists(string c | p.getHairColor() = c) and // 不是秃头。这里表示这个人存在某种发色,但不用具象化
    not p.getAge() < 30 and // 年龄满30岁。也可以是p.getAge() >= 30
    p.getLocation() = "east" and    // 住在东边
    ( p.getHairColor() = "black" or p.getHairColor() = "brown" ) and    // 头发是黑色或棕色
    not (p.getHeight() > 180 and p.getHeight() < 190) and   // 没有(超过180且矮于190)
    exists(Person t | t.getAge() > p.getAge()) and   // 不是最年长的人。这里用存在语法是,存在一个人比他的年龄大
    exists(Person t | t.getHeight() > p.getHeight()) and    // 不是最高的人
    p.getHeight() < avg(Person t | | t.getHeight()) and // 比平均身高要矮。所有人,没有限制范围
    p = max(Person t | t.getLocation() = "east" | t order by t.getAge())    // 东部年纪最大的人。这一行是官方给的参考,但是官方文档中说 "Note that if there are several people with the same maximum age, the query lists all of them.",如果存在最大年龄相同的两个人会同时列出,可能会造成不可控的后果。
    // p.getAge() = max(Person t | t.getLocation() = "east" | t.getAge())   // 按照个人理解和chatgpt的解答,应该使用这种方式
select p


谓词和类别

CodeQL中的谓词(Predicates)的说法大概可以理解为其他高级编程语言中的函数,同样拥有可传参、返回值、可复用等特性

先来看一个简单的例子

import tutorial
predicate isSouthern(Person p) {
    p.getLocation() = "south"
}

from Person p
where isSouthern(p)
select p

这里的predicate为一个逻辑条件判断,返回true or false,有些类似于boolean,当然ql中有单独的boolean类型,还是有一定区别的,只是理解上可以联系起来理解,这里先不展开

谓词的定义方式和函数类似,其中的predicate可以替换为返回结果类型,例如
int getAge() { result = xxx }
,谓词名称只能以小写字母开头

此外,还可以定义一个新类,直接包含isSouthern的人

class Southerner extends Person {
  Southerner() { isSouthern(this) }
}
from Southerner s
select s

这里类似于面向对象语言(OOL)中的类定义,同样拥有继承、封装、方法等;这里的
Southerner()
类似于构造函数,但是不同于类中的构造函数,这里是一个逻辑属性,并不会创建对象。ool类中的方法在ql中被称为类成员谓词

表达式
isSouthern(this)
定义了这个类的逻辑属性,称作
特征谓词
,他用一个变量
this
(这里的this理解同ool)表示:如果属性
isSouthern(this)
成立,则一个
Person
--
this
是一个
Southerner
。简单理解就是ql中每个继承子类的特征谓词表示的是
什么样的父类是我这种子类

我这种子类在父类的基础上还具有什么特征/特性

引用官方文档:QL 中的类表示一种逻辑属性:当某个值满足该属性时,它就是该类的成员。这意味着一个值可以属于许多类 — 属于某个特定类并不妨碍它属于其他类。

来看下面这个例子

class Child extends Person {
    Child(){
        this.getAge() < 10
    }
    override predicate isAllowedIn(string region) {
        region = this.getLocation()
    }
}
// Person父类中的isAllowedIn实现如下:
predicate isAllowedIn(string region) { region = ["north", "south", "east", "west"] }
// 父类isAllowedIn(region)方法永远返回的是true,子类返回的是当前所在区域才为true(getLocation()方法)

看一个完整的例子

import tutorial
predicate isSoutherner(Person p) {
    p.getLocation() = "south"
}
class Southerner extends Person {
    Southerner(){isSoutherner(this)}
}
class Child extends Person {
    Child(){this.getAge() < 10}
    override predicate isAllowedIn(string region) {
        region = this.getLocation()
    }
}

from Southerner s 
where s.isAllowedIn("north")
select s, s.getAge()

这里有个概念非常重要,要与ool的类完全区别开来,在ool的类中,继承的子类中重构的方法是不会影响其他继承子类的,每个子类不需要考虑是否交错。但是在QL中,引用官方文档的一句话
QL 中的类表示一种逻辑属性:当某个值满足该属性时,它就是该类的成员。这意味着一个值可以属于许多类 — 属于某个特定类并不妨碍它属于其他类
,在ql的每个子类中,只要满足其特征谓词,就是这个子类的成员。

针对如上代码中这个具体的例子,如果Person中有人同时满足Southerner和Child的特征关系,则同时属于这两个类,自然也会继承其中的成员谓词。

个人理解,其实QL中的子类就是把父类全部拿出来,然后根据特征谓词来匹配父类中的某些元素,然后去复写/重构其中的这些元素的成员谓词,事实上是对父类中的元素进行了修改。下面用三个实例来对比理解

// 从所有Person中取出当前在South的,然后取出其中能去north的。因为把child限定了只能呆在当地,因此取出的这部分Southerner中的Child全都没法去north,因此就把这部分(原本在South的)Child过滤了
from Southerner s
where s.isAllowedIn("north")
select s

// 取出所有Child,因此他们都只能呆在原地,因此找谁能去north就是找谁原本呆在north
from Child c
where c.isAllowedIn("north")
select c

// 取出所有Person,要找谁能去north的,即找所有成年人(默认所有人都可以前往所有地区)和找本来就呆在north的Child
from Person p
where p.isAllowedIn("north")
select p

延伸一下,如果多个子类别同时重构override了同一个成员谓词,那么遵循如下规则(先假定有三个类A、B、C)(后面有总结):

  1. 假定A是父类,即其中的某个成员谓词
    test()
    没有override,B和C同时继承A,并且都override了A的
    test()
    成员谓词。
    1. 如果from的谓词类型是A,则其中的
      test()
      方法会被B和C全部改写。碰到B与C重叠的部分,不冲突,保持并存
    2. 如果from的谓词类型是B或C,则以B/C为基础,在满足B/C的条件下加上与另一个重叠的部分,不冲突,保持并存
  2. 如果A是父类,B继承A,C继承B,则C会把B中的相同成员谓词override掉,而不是共存
  3. 对于多重继承,C同时继承A和B,如果A和B的成员谓词有重合,则C必须override这个谓词

例如:

class OneTwoThree extends int {
  OneTwoThree() { // 特征谓词
    this = 1 or this = 2 or this = 3
  }
  string getAString() { // 成员谓词
    result = "One, two or three: " + this.toString()
  }
}

class OneTwo extends OneTwoThree {
  OneTwo() {
    this = 1 or this = 2
  }
  override string getAString() {
    result = "One or two: " + this.toString()
  }
}

from OneTwoThree o
select o, o.getAString()

/* result:
o	getAString() result
1	One or two: 1
2	One or two: 2
3	One, two or three: 3

// 理解:onetwothree类定义了1 2 3,onetwo重构了onetwothree中1和2的成员谓词。因此onetwothree o中有3个,其中的1和2使用onetwo的成员谓词,3使用onetwothree的成员谓词
*/

情况1: 在这个基础上加上另一个类别(重要),A->B, A->C

image-20241025105910532

class TwoThree extends OneTwoThree{
  TwoThree() {
    this = 2 or this = 3
  }
  override string getAString() {
    result = "Two or three: " + this.toString()
  }
}

/* 
command:
from OneTwoThree o
select o, o.getAString()

result:
o	getAString() result
1	One or two: 1
2	One or two: 2
2	Two or three: 2
3	Two or three: 3
// 理解:twothree和onetwo重合了two,但是不像其他ool,ql并不会冲突,而是并存。

---
command:
from OneTwo o
select o, o.getAString()

result:
1	One or two: 1
2	One or two: 2
2	Two or three: 2
// 理解:twothree和onetwo都重构了其中的2,由于ql不会冲突,所以并存。由于o的类型是onetwo,因此"地基"是1和2,然后再加上twothree重构的2

---
command:
from TwoThree o
select o, o.getAString()

result:
2	One or two: 2
2	Two or three: 2
3	Two or three: 3
// 理解: twothree和onetwo都重构了2,由于ql不会冲突,会并存。由于o的类型是twothree,所以“地基”是2和3,然后再加上onetwo重构的2
*/

情况2: A->B->C(继承链)

image-20241025105937484

class Two extends TwoThree {
    Two() {
        this = 2
    }
    override string getAString() {
        result = "Two: " + this.toString()
    }
  }

from TwoThree o
select o, o.getAString()

/* result:
o	getAString() result
1	One or two: 2
2	Two: 2
3	Two or three: 3

// 理解:在上面的例子的基础上,Two重构了twothree中的成员谓词,因此与twothree不是共存关系
*/

from OneTwo o
select o, o.getAString()
/* result:
o	getAString() result
1	One or two: 1
2	One or two: 2
3	Two: 2

// 理解:在之前例子的基础上,OneTwo和TwoThree共存,但是Two把TwoThree中的一部分给override了(即Two和TwoThree并不是共存关系)
*/

阶段总结:根据上面这么多例子的学习,总结归纳起来其实很简单,核心要义就是搞清楚“继承链关系”。如果两个类别是继承的同一个父类,那么他们两个的结果共存;如果两个类是从属关系(父与子),那么子类覆盖父类对应的部分。

例如上面的例子中,OneTwo和TwoThree是并存关系,同时继承OneTwoThree,所以他们的结果共存,不冲突;TwoThree和Two是从属关系,所以根据最子类优先原则,覆盖对应TwoThree中的内容(Two也间接继承OneTwoThree,所以对所有父类包括OneTwoThree造成影响)。

情况3: 多重继承

image-20241025105948674

class Two extends OneTwo, TwoThree {
    Two() {
        this = 2
    }
    override string getAString() {
        result = "Two: " + this.toString()
    }
  }
// 解释1:Two同时继承TwoThree和OneTwo,如果不写条件谓词,则默认为同时满足两个父类条件,如果写,则范围也要小于等于这个交集范围。
// 解释2:如果多重继承的父类中同一名称的成员谓词有多重定义,则必须覆盖这些定义避免歧义。在这里的Two的getAString()是不能省略的

from OneTwoThree o
select o, o.getAString()
/* result:
o	getAString() result
1	One or two: 1
2	Two: 2
3	Two or three: 3

// 理解:由于two与onetwo和twothree是父子关系,因此直接把共有的two全部覆盖,不是并存关系
*/

在这个基础上再去创建一个谓词,用于判断是否是秃头isBald

predicate isBald(Person p) {
    not exists(string c | p.getHairColor() = c)    // 不加not表示某人有头发
}

// 获得最终结果,允许进入北方的南方秃头
from Southerner s 
where s.isAllowedIn("north") and isBald(s)
select s, s.getAge()

Streamlit
不仅让创建单页应用变得易如反掌,更通过一系列创新特性,支持构建
多页面应用
,极大地丰富了用户体验和数据探索的可能性。

随着我们
Streamlit App
的功能逐渐增多之后,单个页面展示过多信息,使用不便,

通过多页面可以将功能相关的部分组织在一起,形成逻辑清晰的多个页面,使用户能够轻松地与不同的功能模块进行交互。

从代码方面来看,多页面应用将不同的功能模块拆分成独立的页面,每个页面可以有自己的代码逻辑和数据流。

这有助于实现代码的模块化,使代码结构更加清晰、易于管理。

从运行性能上来看,多页面应用可以加快页面的加载速度,因为用户只需加载当前所需页面的内容,而无需加载整个应用的全部内容。

此外,对于复杂的应用来说,多页面应用更容易实现功能的迭代和扩展。

随着应用的发展,可以逐步添加新的页面和功能模块,而无需对现有页面进行大规模修改。

本篇主要介绍构建一个
Streamlit
的多页面应用需要掌握的基本知识。

1. 多页应用的文件结构


Streamlit
多页面应用中,文件和文件夹的布局对于项目的组织、管理和维护至关重要。

下面是一个推荐的布局方式:

my_app/  
├── app.py  # 主应用文件,负责启动应用和配置路由
├── pages/  
│   ├── __init__.py  # 可选,用于将pages文件夹作为Python包处理  
│   ├── page1.py  
│   ├── page2.py  
│   └── ...  # 其他页面文件  
├── session_state.py  # Session State管理类文件
└── common.py  # 共通函数

扩展功能时,在
pages
文件夹下添加新的
py文件
即可。

其中
session_state.py

common.py
不是必需的,当应用的
session
管理变得复杂,或者共通函数比较多时才需要单独用文件管理。

对于简单的多页面应用,一般只需要上面的
app.py

page1.py

page2.py
就够了。

2. 多页应用的导航


Streamlit
中,使用
st.navigation
,可以帮助我们轻松地创建动态导航菜单。

比如,以
app.py

page1.py

page2.py
为例,创建一个多页面应用。

# app.py

import streamlit as st

page1 = st.Page("pages/page1.py", title="页面1")
page2 = st.Page("pages/page2.py", title="页面2")

pg = st.navigation([page1, page2])
pg.run()
# page1.py

import streamlit as st

st.header("这是页面 1")
# page2.py

import streamlit as st

st.header("这是页面 2")

通过
streamlit run app.py
启动之后,一个带有导航的简单多页面应用就完成了。

通过侧边栏中的菜单,可以自由切换页面。

除了通过
app.py
生成的菜单来切换页面,
Streamlit
中还提供了
st.switch_page
方法,

可以在一个页面中导航到其他页面。

比如,可以在
page1.py

page2.py
中添加一个互相导航的按钮。

# page1.py

import streamlit as st

st.header("这是页面 1")

if st.button("GoTo Page 2"):
    st.switch_page("pages/page2.py")
# page2.py

import streamlit as st

st.header("这是页面 2")

if st.button("GoTo Page 1"):
    st.switch_page("pages/page1.py")

3. 多页之间共享数据

最后,介绍下如何在不同的页面直接共享数据,这样就可以让不同页面的功能联动起来。

Streamlit多页面之间共享数据有几个方案可以实现,

第一个方案是使用
全局变量

但是这种方法存在一些问题,比如如并发访问时的数据不一致性和难以调试等。

因此,一般
不推荐
使用全局变量来共享数据。

第二个方案是
使用外部存储
,比如将共享的数据保存在文件或者数据库中,这种方案适用于需要比较大型的应用,或者需要持久化存储的应用场景。

如果你的应用规模不大,并且不需要持久化存储,那么用这个方案显得有些笨重。

最后一个方案就是
Session State
,这是
Streamlit
提供的一种机制,特别适合在不同页面之间传递和保存状态数据。

下面构造一个模拟的示例,演示如何在不同的页面间共享数据。

首先在
page1.py
中,我们可以选择数据集,

然后在
page2.py
中,会自动根据我们选择的数据集开始分析。

# page1.py

import streamlit as st

st.header("这是页面 1")

if st.button("GoTo Page 2"):
    st.switch_page("pages/page2.py")

datalist = ("", "人口数据", "环境数据", "交易数据")

if "dataset" not in st.session_state:
    option = st.selectbox(
        "请选择数据集",
        datalist,
    )
else:
    option = st.session_state.dataset
    option = st.selectbox(
        "请选择数据集",
        datalist,
        index=datalist.index(option),
    )


if option == "":
    st.write("当前尚未选择数据集")
else:
    st.write("你当前选择的是: 【", option, "】")

st.session_state.dataset = option

page1.py
中将选择数据集名称保存到
Session State
中。

# page2.py

import streamlit as st

st.header("这是页面 2")

if st.button("GoTo Page 1"):
    st.switch_page("pages/page1.py")

if "dataset" not in st.session_state or st.session_state.dataset == "":
    st.write("当前尚未选择数据集")
else:
    st.write("开始分析数据集: 【", st.session_state.dataset, "】")

page2.py
直接从
Session State
中读取数据集的名称。

连接Elasticsearch(ES)服务器是进行数据搜索和分析的常用操作。Elasticsearch是一个基于Lucene的搜索引擎,提供了RESTful API来进行索引、搜索和管理数据。

以下是一个详细的Python代码示例,展示如何连接到Elasticsearch服务器并执行一些基本操作。这个示例使用了官方的
elasticsearch-py
客户端库。

1. 安装Elasticsearch客户端库

首先,你需要安装
elasticsearch
库。如果你还没有安装,可以使用pip进行安装:

bash复制代码

pip install elasticsearch

2. 连接到Elasticsearch服务器

以下是一个完整的Python脚本,展示了如何连接到Elasticsearch服务器,创建索引,添加文档,并进行搜索。

from elasticsearch import Elasticsearch, helpers  
  
# 配置Elasticsearch连接  
es = Elasticsearch(  
    ['http://localhost:9200'],  # Elasticsearch服务器地址和端口  
    http_auth=('username', 'password'),  # 如果需要认证,填写用户名和密码  
    use_ssl=False,  # 如果使用HTTPS,设置为True  
    verify_certs=False  # 如果使用HTTPS且自签名证书,设置为False  
)  
  
# 检查连接是否成功  
if es.ping():  
    print("Successfully connected to Elasticsearch!")  
else:  
    print("Could not connect to Elasticsearch")  
    exit()  
  
# 创建索引  
index_name = 'my_index'  
if not es.indices.exists(index=index_name):  
    # 定义索引的映射(Schema)  
    mappings = {  
        'properties': {  
            'title': {'type': 'text'},  
            'content': {'type': 'text'},  
            'author': {'type': 'keyword'}  
        }  
    }  
    # 创建索引  
    es.indices.create(index=index_name, body={'mappings': mappings})  
    print(f"Index '{index_name}' created successfully.")  
else:  
    print(f"Index '{index_name}' already exists.")  
  
# 添加文档  
documents = [  
    {"_id": 1, "title": "Elasticsearch Basics", "content": "Learn the basics of Elasticsearch.", "author": "John Doe"},  
    {"_id": 2, "title": "Advanced Elasticsearch", "content": "Go deeper into Elasticsearch features.", "author": "Jane Smith"},  
    {"_id": 3, "title": "Elasticsearch Performance", "content": "Optimize Elasticsearch for performance.", "author": "Alice Johnson"}  
]  
  
# 使用bulk API批量添加文档  
actions = [  
    {  
        "_index": index_name,  
        "_id": doc['_id'],  
        "_source": doc  
    }  
    for doc in documents  
]  
  
helpers.bulk(es, actions)  
print("Documents added successfully.")  
  
# 搜索文档  
search_body = {  
    "query": {  
        "match": {  
            "content": "Elasticsearch"  
        }  
    }  
}  
  
response = es.search(index=index_name, body=search_body)  
print("Search results:")  
for hit in response['hits']['hits']:  
    print(hit['_source'])  
  
# 清理(可选):删除索引  
# es.indices.delete(index=index_name)  
# print(f"Index '{index_name}' deleted successfully.")

3.代码解释

  1. 连接配置:
    • Elasticsearch(['http://localhost:9200'])
      :连接到运行在本地主机上的Elasticsearch服务器,默认端口为9200。
    • http_auth=('username', 'password')
      :如果Elasticsearch服务器需要认证,填写用户名和密码。
    • use_ssl

      verify_certs
      :如果连接使用HTTPS,可以启用这些选项。
  2. 检查连接:
    • 使用
      es.ping()
      方法检查连接是否成功。
  3. 创建索引:
    • 使用
      es.indices.exists(index=index_name)
      检查索引是否存在。
    • 使用
      es.indices.create(index=index_name, body={'mappings': mappings})
      创建索引,并定义文档的映射。
  4. 添加文档:
    • 使用
      helpers.bulk(es, actions)
      批量添加文档到索引中。
  5. 搜索文档:
    • 使用
      es.search(index=index_name, body=search_body)
      进行搜索,并打印搜索结果。
  6. 清理(可选):
    • 使用
      es.indices.delete(index=index_name)
      删除索引。

4.注意事项

  • 服务器地址
    :确保Elasticsearch服务器正在运行,并且地址和端口配置正确。
  • 认证
    :如果Elasticsearch服务器需要认证,确保提供正确的用户名和密码。
  • SSL
    :如果连接使用HTTPS,请正确配置
    use_ssl

    verify_certs
    选项。

时隔很久,终于忙里偷闲可以搞一搞rCore,上帝啊,保佑我日更吧,我真的很想学会.

导读部分

首先还是要看
官方文档
.

我决定看一遍然后自己表述一遍(智将).

这里反复提到MMU,就是因为之前学MCU的时候有一个疑问,就是为什么MCU上不选择跑一个Linux,当时找到的答案是因为没有MMU.

MMU的全称是Memory Management Unit,中文译为内存管理单元.那时候不知道为什么硬件上需要这样一个内存管理单元,现在我们就可以一探究竟.

反复提到他是提醒我们
虚拟地址和物理地址的转换
不是由我们编写的OS实现的,而是OS写了一个接口,通过这个接口
调用
了MMU实现的.

那么最重要的就是
地址空间
这个概念,有了地址空间就可以在
内核
中构建虚实地址空间的
映射
机制.

这里谈谈我对上边这张图的理解:

P1~P3是每个应用自己的地址空间,内核则是操作系统内核的地址空间.对于每段地址空间,地址都是从
0
开始的到
2^32-1
.而我们利用硬件机制(例如MMU+页表查询)完成这些地址空间对物理地址的映射.

地址虚拟化出现之前

这部分的内容是非常有意义的.尤其是提到了:

后来,为了降低等待 I/O 带来的无意义的 CPU 资源损耗,多道程序出现了。而为了提升用户的交互式体验,提高生产力,分时多任务系统诞生了。它们的特点在于:应用开始多出了一种“暂停”状态,这可能来源于它主动 yield 交出 CPU 资源,或是在执行了足够长时间之后被内核强制性放弃处理器。

这部分不就是我们上一部分学到的
分时多道程序
吗?

那么那时候我们是怎么做的呢?

实际上我们是为所有的APP都
单独
开了一个栈,这样就有比较高的内存损耗.

这时候就可以读懂这段文字里边:

经有一种省内存的做法是每个应用仍然和在批处理系统中一样独占内核之外的整块内存,当暂停的时候,内核负责将它的代码、数据保存在外存(如硬盘)中,然后把即将换入的应用在外存上的代码、数据恢复到内存,这些都做完之后才能开始执行新的应用。

那么这种方法就是对每个APP单独占据一段内存的
解决方法之一
,即使用外存保存.

那么众所周知,硬盘的读取速度肯定是不如内存的,那么就很巧妙地引入本节的新内容了.

最后文档总结了一下上一章的弊端:

  1. 从应用开发的角度看,需要应用程序决定自己会被加载到哪个物理地址运行,需要直接访问真实的物理内存。这就要求应用开发者对于硬件的特性和使用方法有更多了解,产生额外的学习成本,也会为应用的开发和调试带来不便。
  2. 从内核的角度来看,将直接访问物理内存的权力下放到应用会使得它难以对应用程序的访存行为进行有效管理,已有的特权级机制亦无法阻止很多来自应用程序的恶意行为。

加一层抽象加强内存管理

Any problem in computer science can be solved with another layer of indirection.

这句话出自麻省理工学院的计算机科学系教授
巴特勒·兰普森(Butler Lampson)
.

这里对我上边的半猜测半理解进行了补充,尤其是这个
不一定真实存在
非常重要.

最终,到目前为止仍被操作系统内核广泛使用的抽象被称为
地址空间
(Address Space) 。某种程度上讲,可以将它看成一块巨大但
并不一定真实存在
的内存。
在每个应用程序的视角里,操作系统分配给应用程序一个地址范围受限(
容量很大
),
独占
的连续地址空间(其中有些地方被操作系统
限制
不能访问,如内核本身占用的虚地址空间等),因此应用程序可以在划分给它的地址空间中随意规划内存布局,它的各个段也就可以分别放置在地址空间中它希望的位置(当然是操作系统允许应用访问的地址)。
这种地址被称为
虚拟地址
(Virtual Address) 。

#TODO
学习
MMU

TLB
的硬件机制.

事实上,特权级机制被拓展,使得应用不再具有直接访问物理内存的能力。

这个还和特权级机制有关了,这里也去查一下
RISC-V参考书
,翻到10.6:

  1. S 模式提供了一种传统的虚拟内存系统,它将内存划分为固定大小的页来进行地址转 换和对内存内容的保护。
  2. 启用分页的时候,大多数地址(包括 load 和 store 的有效地址和 PC 中的地址)
    都是虚拟地址

  3. 访问物理内存
    ,它们必须被转换为真正的物理地址,这通过遍历一种称为
    页表

    高基数树
    实现。
  4. 页表中的
    叶节点
    指示虚地址
    是否已经被映射到了真正的物理页面
  5. 如果是,则指示了哪些权限模式和通过哪种类型的访问可以操作这个页。
  6. 访问未被映射的页或访问权限不足会导致页错误例外(page fault exception)。

地址空间抽象的好处

  1. 由于每个应用独占一个地址空间,里面只含有自己的各个段,于是它可以随意规划属于它自己的各个段的分布而无需考虑和其他应用冲突;
  2. 同时鉴于应用只能通过虚拟地址读写它自己的地址空间,它完全无法窃取或者破坏其他应用的数据,毕竟那些段在其他应用的地址空间内,这是它没有能力去访问的。

这是地址空间抽象和具体硬件机制对应用程序执行的安全性和稳定性的一种保障。

增加硬件加速虚实地址转换

这一部分讲的内容非常好,要去看看内容和那张图:
地址空间 - rCore-Tutorial-Book-v3 3.6.0-alpha.1 文档

这张图就非常合理,

只看左半边,在APP的视角,
App0
通过访问一个
虚拟地址
Virtual Address
,访问到了它的地址空间
Address Space 0
,并且对应找到了数据
App0 Data
:

再看右半边,实际上这个访问操作由
OS
和硬件代理了,通过
MMU
转化
Virtual Address
为物理地址
Physical Address
,再通过这个物理地址访问到储存在
物理内存
Physical Memory
中的
App0 Data
:

这里注意一下这张图中的
Physical Memory
部分,这里有一些有趣的点,趁着这些点来学会一些东西.

  1. Kernel Reserved
    部分是操作系统的内核占用的内存
  2. 这边不仅储存了
    App 0 Data
    同时还储存了
    App 1 Data

    App 2 Data
    .实际上说明了,在
    App
    的视角,它通过虚拟内存能够看到的只有自己的
    Data
    ,而物理内存中也存着其它的
    Data
    .

那么回到这一节的标题
增加硬件加速
,手册中是这么描述的:

操作系统可以设计巧妙的数据结构来表示地址空间。但如果完全由操作系统来完成转换每次处理器地址访问所需的虚实地址转换,那
开销就太大
了。这就需要扩展硬件功能来加速地址转换过程(回忆
计算机组成原理
课上讲的
MMU

TLB
)。

这说明虚拟地址的实现依赖于:

  1. 操作系统设计
    巧妙的数据结构
  2. MMU

    TLB
    的硬件加速

这段话增强了我们对操作系统设计理念的理解,对
硬件资源抽象化
:

回过头来,在介绍内核对于 CPU 资源的抽象——时分复用的时候,我们曾经提到它为应用制造了一种每个应用独占整个 CPU 的幻象,而隐藏了多个应用分时共享 CPU 的实质。而地址空间也是如此,应用只需、也只能看到它独占整个地址空间的幻象,而藏在背后的实质仍然是多个应用共享物理内存,它们的数据分别存放在内存的不同位置。

分段内存管理

分段内存管理的方法曾经是一种流行的内存管理方法,通过学习它也可以学到很多关于内存管理的知识.
#TODO

常数大小内存+线性映射

如图所示的是曾经的一种分段内存管理的方案.

可以看到这里非常暴力地将
每个APP的内存大小
(不是单个APP设置对应大小)都强制分配为
bound
.这时候我们往往想到之前在16和17节学到的,通过
build.rs

link_app.S
将编译好的
.bin
文件链接进内核的方式.

似乎给每个APP分配一个
插槽(Slot)
的方式并不怎么讨喜,因为我们直接编译进内核,似乎内存大小都是固定的.

这里我们不得不打破心中的幻觉:

  1. 16,17节学到的内容,只有
    stack
    而没有可以动态分配内存的
    heap
  2. 那个阶段的APP没有动态内存分配器
  3. 没有动态内存分配器时怎么实现动态数组等灵活的数据结构呢?
    直接创建一个非常大的数组
    .
  4. 16,17节学到的内容中,包括22节学到的多道程序的内容,在编译的时候需要设置好初始的内存,而没有自己的
    内存空间
    ,
  5. 这决定的编译好的
    .bin
    文件只能被加载到固定的位置
  6. 这使APP既
    不安全
    (可以访问到物理内存)又不
    灵活
    (无法使用动态内存).

那么继续看图说话,我们可以看到
Address Space of App0
:

  1. Code
    段和在
    物理内存
    中的表现
    相同
    ,直接被安置在了一个固定位置,只不过这个位置的地址已经不是物理地址,而是直接从虚拟地址
    0x0
    开始.
  2. 而代表全局变量的
    Data
    段.
  3. 可以向下增长的
    stack
    ,但是注意一个APP的
    stack
    的最大大小是在编译完之后就知道的.
  4. 可以向上增长的
    heap
    .运行时才知道的变量要申请到
    heap
    里边.

这时候
MMU
只需要扮演一个
位图
的作用.

位图是一个二进制数组,其中的每个位(bit)代表内存中的一个特定区域(如一个内存页或内存块)。位图中的每个位可以是0或1,0通常表示对应的内存区域是空闲的,而1表示已被分配。
当程序需要分配内存时,操作系统会检查位图,找到足够数量的连续0(表示空闲区域),并将这些位设置为1,表示内存已被分配。
当程序释放内存时,操作系统会将位图中相应的位重新设置为0,表示这些内存区域再次变为空闲。

那么这个问题就非常简单了,其实我们这一部分的思路就分解为三步:

  • 现在的内存分配方式是什么样的
  • MMU在其中做了什么样的工作
  • 内存碎片是怎么产生的

这时候内存碎片就很好说.

这里简单地把
已经分配的内存中没有被利用到的内存
叫做
内碎片
.

那么这是最典型的内碎片的问题,每次分配的内存都恒为
bound
,那么有多少APP能够刚好利用完呢,这样的利用率就很低了.

分段管理策略

那么为了解决上述的
内碎片
的问题,找到了一种分段管理的策略.

仍然是看图说话,解决我们上述讲的
三个问题
.

可以看到
MMU
分别对APP中的

进行一个线性映射,每一个部分是一个

.

注意,
实验指导书
忽略了一些细节:

简单起见,我们这里忽略一些不必要的细节。比如应用在以虚拟地址为索引访问地址空间的时候,它如何知道该地址属于哪个段,从而硬件可以使用正确的一对 base/bound 寄存器进行合法性检查和完成实际的地址转换。

这里其实就很容易再次想到关于

的问题,
实验指导书
中提到的:

堆的情况可能比较特殊,它的大小可能会在运行时增长,但是那需要应用通过系统调用向内核请求。

这时候就没有
内碎片
的问题了.

思考之前学到
buddy-system
的内存分配问题,实际上每段之间的对齐是需要和
最小可访问内存大小
相关的,使用分段的方法做内存管理就会造成各种
不可利用(不能分配出去)
的内存碎片,一般叫
外碎片
.当然,对于外碎片还是可以使用各种方法拼接起来.

如果这时再想分配一个比较大的块,就需要将这些不连续的外碎片“拼起来”,形成一个大的连续块。然而这是一件开销很大的事情,涉及到极大的内存读写开销。具体而言,这需要移动和调整一些已分配内存块在物理内存上的位置,才能让那些小的外碎片能够合在一起,形成一个大的空闲块。如果连续内存分配算法选取得当,可以尽可能减少这种操作。操作系统课上所讲到的那些算法,包括 first-fit/worst-fit/best-fit 或是 buddy system,其具体表现取决于实际的应用需求,各有优劣。

分页内存管理

看完这一节之后,我想到的内容就是有关于
视角不同
的思考,可以看到段内存分配的时候,我们总是
惯着
APP,总是希望我们的内存分配系统去
配合
APP的一些内存特性.

但是看了这一节之后,我们可以理解到,实际上对APP的内存进行分割管理也许是一种未曾想过的道路.

同样是进行看图说话.

可以看到,对每个应用的
地址空间
进行了
分页
.

那么每个地址空间的
虚拟页号
都可以通过MMU转换为一个
物理页号
.

这样的化其实相当于还是用了
分配一个固定大小的内存
的方法,只不过
粒度更小
.

每个应用都有一个表示地址映射关系的
页表
(Page Table) ,里面记录了该应用地址空间中的每个虚拟页面映射到物理内存中的哪个物理页帧,即数据实际被内核放在哪里。
我们可以用页号来代表二者,因此如果将页表看成一个键值对,其键的类型为虚拟页号,值的类型则为物理页号。当 MMU 进行地址转换的时候,虚拟地址会分为两部分(虚拟页号,页内偏移),MMU首先找到虚拟地址所在虚拟页面的页号,然后查当前应用的页表,根据虚拟页号找到物理页号;最后按照虚拟地址的页内偏移,给物理页号对应的物理页帧的起始地址加上一个偏移量,这就得到了实际访问的物理地址。

这里更加的准确了,不仅要对应到对应的
页号
,那么对于一个准确的内存地址,也有一个
偏移值
.

那么图中还有一个
Protevtion
的段,很好理解是加入了读写的保护,但是我们知道所谓的保护其实是
写在OS里的逻辑
,那么具体是怎么实现( 通过
触发异常
)的也很重要:

在页表中,还针对虚拟页号设置了一组保护位,它限制了应用对转换得到的物理地址对应的内存的使用方式。最典型的如
rwx

r
表示当前应用可以读该内存;
w
表示当前应用可以写该内存;
x
则表示当前应用可以从该内存取指令用来执行。一旦违反了这种限制则会触发异常,并被内核捕获到。通过适当的设置,可以检查一些应用在运行时的明显错误:比如应用修改只读的代码段,或者从数据段取指令来执行。