ants - 目前开源最优的协程池
ants - 目前开源最优的协程池
目前我们的项目重度使用 ants 协程池,在开启一个 go 的时候并不是用 go 关键字,而是用一个封装的 go 函数来开启协程。框架底层,则是使用 ants 项目来实现协程池。
ants 是一个协程池的实现,这个项目短小精悍,非常适合用来做代码研究。ants 的作者是国人panjf2000,该项目目前已经广泛应用在腾讯,字节,百度,新浪等大厂了。
相关资料
github仓库地址:
https://github.com/panjf2000/ants
文档地址:
https://pkg.go.dev/github.com/panjf2000/ants/v2
主体思路研究
研究项目有必要先想通下项目的意义和架构思路:
首先的第一个问题,为什么需要有 Golang 的协程池呢?
Golang 提供的 go 关键字能很方便地将多个协程塞在一个进程中。但是在实际开发过程中,我们容易遇到协程滥用的问题。这点我是深有体会:一个项目越复杂,交接次数越多,后续的接手者越不愿意修改主逻辑。而一旦有一些非主逻辑的业务,我们都倾向于开启独立代码分支逻辑,同时封装为独立的协程来完成。这样不仅美其名曰在性能上能达到一个最优,而且在业务逻辑上也能保持单独的独立性,让代码 bug 的出生率达到最低。
但是这种不断叠加分支逻辑、不断增加独立协程的方式本质上就是一种协程滥用,我们不断增加协程数,忽略了协程的本身开销和上下文切换成本,很容易造成一个进程的 goroutine 数量过多,内存增加。不仅如此,这种做法还必须要保证分支代码质量。一个代码分支写的质量不行(比如没有设置 ctx 超时卡在 io 请求上),那么新启动的 goroutine 长时间无法释放,这就可能导致 goroutine 的泄露。这种泄露的 goroutine 如果没有被及时发现,那就是一个灾难。
所以在这里,我们更希望能将一个程序的并发度进行一定的控制,将进程消耗的资源控制在一定比例,比如我希望我的进程最多只执行 1000 个 goroutine,进程能长期保持在 1G 内存以下。所以我们就有协程池的需求了,ants 也为此应运而生。
顺带说下,ants 的名字非常有意思:蚁群,非常多的蚂蚁组成一个蚁群,烦乱但是又瑾然有序。和这个项目的愿景一样,乱而有序。
理解了ants 项目的意义和目的,再思考下,我们使用协程池来控制了协程数,一旦协程池满了之后,想新创建一个协程,这时候应该有什么表现呢?是直接在新创建协程的地方失败,还是在新创建协程的时候阻塞?是的,其实无非就是这两种方式。但是使用失败 or 阻塞 的选择权,应该是交给业务方的,也就是库的使用者。所以 ants 库需要同时能支持这两种的表现。
再思考一下,我们要如何控制 go 这个关键字的使用呢?根据不知道谁的名言,封装能解决程序世界里的所有问题。是的,封装,我们需要将 goroutine 进行封装,并且将 go 关键字也进行一下封装。goroutine 不就是一个协程来运行我们的函数么,我们就封装一个 goWorker 结构来运行我们的函数,goWorker 结构在 run 的时候,再启动实际的 goroutine 。 go 关键字呢,我们也替换为一个方法 Submit,这个方法就只有一个参数,就是我们要运行的函数。考虑到我们要运行的函数是各式各样的,所以我们还需要用一个闭包 func() 来包住我们的实际运行函数。
想到这些,我们有一些大致思路了,首先基于 OO 思想,我们为这个协程池定义一个结构 Pool,他有一系列的 goWorker,我们定义单个 goWorker 的结构,同时我们也定义一系列 goWorker 的结构 workerQueue(这里的思路是我们一定会对这个批量的 workerQueue 有一些需要封装的方法,比如获取一个可运行的 goWorker 等,所以这里并不是简单的实用 slice[goWorker])。回到 Pool 结构,我们定义好 Submit 方法,能提交一个函数。初始化的方法呢,我们要定义好这个 Pool 的goroutine 容量。
按照上述思考,我们基本能得到如下的协程池的框架设计:
class Pool {
+ NewPool(size int, options ...Option) (*Pool, error)
+ Submit(task func()) error
+ workers workerQueue // 可用的 workers
+ capacity int32 // 协程池容量
+ running int32 // 运行中的协程池
}
class goWorker {
+ run() // 运行 worker
}
class workerQueue {
+ detach() worker // 获取一个 worker
}
goWorker --> workerQueue
workerQueue --> Pool
再继续思考下细节,一个 goWorker,本质上是对 goroutine 的封装,而这个 goroutine 我们一旦 run 起来了,我们就不希望它会停止,而是在一个 for 循环中,不断等待有新的任务进入。而 submit又是在另外一个主业务的 goroutine 中执行,它负责把 task 从当前主业务 goroutine 传递给 goWorker run 所在的 goroutine。这里是不是就涉及到两个 goroutine 之间的任务传递了。goroutine 传递我们用什么方法呢?channel?- 对的。
基于以上分析, worker 的 run 函数和 pool 的 submit 函数的联动我们能想象到伪代码大致是这样的:
type goWorker struct {
task chan func() // task 的 channel
}
func (w *goWorker) run() {
go func() {
for f := range w.tasks { // 一旦有 tasks
f() // 实际运行 func
}
}
}
func (p *Pool) Submit(task func()) error {
w, err := p.workers.detach()
if err != nil {
w.task <- task // 将 task 任务直接传递到 worker 的 tasks channel 中
}
if w 为空,且 pool 的容量大于运行的 worker 数 {
worker = newWorker()
worker.run()
w.task <- task
}
if w 为空,且 pool 的容量小于等于运行的 worker 数 {
if pool 标记为阻塞 {
p.cond.Wait() // 实用 sync.Cond 阻塞住
} else {
return ErrPoolOverload // 返回 pool 已经过载的错误
}
}
}
是的,上面这段代码这就是 ants 库最核心的代码逻辑了。
两个goroutine,一个是 goworker 的 run 的 goruotine,for 循环中不断获取任务执行,另外一个是业务 submit goroutine,不断投递任务。submit 投递的时候,一旦达到了容量,就使用 wait 阻塞住,或者返回已经过载的错误。
细节分析
魔鬼在细节,我们了解了 ants 库最核心的代码逻辑,其实只是了解了皮毛。为什么之前也有很多库都是类似的协程池功能,但是只有 ants 脱颖而出呢?原因就是在于 ants 的细节做的非常优异,我们深入研究一下。
使用 sync.Pool 初始化 goworker
当我们在 pool 中获取不到 空闲的goWorker,且 pool 的容量还未满的时候,我们就需要初始化一个 goWorker(上述伪代码的 newWorker 函数),直接 new 是最简单的办法。
但是对于协程池来说, goWorker 的初始化、回收是一个非常频繁的动作,这种动作消耗非常大。
所以我们考虑,是否可以使用对象池 sync.Pool 来优化初始化呢?这样这种大量的获取回收 worker 的行为就可以直接从 pool 中获取,降低内存的消耗回收了。
关于 sync.Pool 的使用和原理这里就不说了,参考官网:
https://pkg.go.dev/sync#Pool
。ants 这里就是使用了对象池的技术优化了 goWorker 的效率。
在 NewPool 函数中,我们定义了一个 workerCache sync.Pool
func NewPool(size int, options ...Option) (*Pool, error) {
p := &Pool{
capacity: int32(size),
...
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
...
}
goWorker 的存取同时支持队列和栈方式
前面说过,pool 有一个 workers 的字段,它存储的是可用的/当前正在运行的 goWorker。那么这里就有一个问题,这个 workers 是否需要预先分配呢?
如果预先分配,那么在 Submit 函数的时候,就少了很多 new 的操作,提升了程序运行效率,但是同时带来的问题是进程启动的时候就会多占用内存。
反之,如果不预先分配,我们在每次Submit 的时候就要去初始化,这也是一种方法,特别在 goworker 并不需要特别多的时候,这种模式很合适,能很大程度节省内存。
这两种就是一种是地主做法,地主家有余量,先屯粮, 满足你所有的需求,另外一种就是贫农做法,加中无余量,你要多少,我种多少。本质上是
ants 考虑到了使用者的这种需求,为这两种模式都提供了方法,根据参数 PreAlloc 进行区别。
如果设置了 PreAlloc,则使用循环队列(loopQueue)的方式存取这个 workers。在初始化 pool 的时候,就初始化了 capacity 长度的循环队列,取的时候从队头取,插入的时候往队列尾部插入,整体 queue 保持在 capacity 长度。
如果没有设置 PreAlloc,则使用堆栈(stack)的方式存取这个 workers。初始化的时候不初始化任何的 worker,在第一次取的时候在 stack 中取不到,则会从 sync.Pool 中初始化并取到对象,然后使用完成后插入到 当前这个栈中。下次取就直接从 stack 中再获取了。
type workerQueue interface {
len() int
isEmpty() bool
insert(worker) error
detach() worker
refresh(duration time.Duration) []worker // clean up the stale workers and return them
reset()
}
func newWorkerQueue(qType queueType, size int) workerQueue {
switch qType {
case queueTypeStack:
return newWorkerStack(size)
case queueTypeLoopQueue:
return newWorkerLoopQueue(size)
default:
return newWorkerStack(size)
}
}
自定义自旋锁
在存取和回收worker 的时候,是需要使用到锁的,然而 ants 没有使用 sync.mutex 这样的锁,而是自己实现了一个自旋锁(spinlock)。
这个自旋锁和其他锁不同的是,它遵循指数退避(Exponential Backoff)策略。就是说,当我获取不到这个锁的时候,我也会阻塞,但是我的阻塞方案并不是不断 for 循环,在循环中不断获取锁。
指数退避原则认为,我取不到锁的次数越多,说明当前系统越繁忙,即取锁的协程越多,所以从大局出发,我应该再慢一些尝试取锁。即每次重试之后,等待的时间逐渐增加,以避免连续重试造成系统拥塞。
ants 自己实现的自旋锁就是基于这个指数退避原则,让整个系统不至于协程数获取锁的数量太多而导致崩溃。
ants 取锁的过程使用了一个 backoff 变量,当取锁失败之后,backoff 就增加固定倍数(2 倍),然后会等待 backoff 次 goroutine 的调度(runtime.GoSched())再进行下一次取锁。
func (sl *spinLock) Lock() {
backoff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
// Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff.
for i := 0; i < backoff; i++ {
runtime.Gosched()
}
if backoff < maxBackoff {
backoff <<= 1
}
}
}
时间戳使用 ticker 来更新
ants 在使用 worker 的时候,每次任务完成后,会希望记录 worker 上次任务时间,这样后续的回收机制能根据这个任务时间判断是否回收。原本这是很简单的一个需求,使用 time.Now()就行,但是 time.Now() 实际上是有系统消耗的,当 ants 这样底层的协程库频繁使用 time.Now() 是会对底层有一定压力的。那么有什么办法呢?我们能不能自己在内存中保持一个当前时间,这样每次要获取当前时间,就从内存中获取就行了,避免系统消耗?
ants 就是这么做的,在启动 pool 的时候,会启动一个 ticker,每 500ms调度一次,来更新 pool 中的一个 now 字段,now 字段这里还不是简单保存 time.Time 类型,而是使用了 atomic.Value 类型(提供并发安全的存取机制)。
type Pool struct {
now atomic.Value
}
func NewPool(size int, options ...Option) (*Pool, error) {
...
p.goTicktock()
}
func (p *Pool) goTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
}
func (p *Pool) ticktock(ctx context.Context) {
ticker := time.NewTicker(nowTimeUpdateInterval)
...
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
...
p.now.Store(time.Now())
}
}
总结
ants 是一个非常完善的协程库,它不仅仅在主体逻辑上非常完备,而且在细节上也处理的非常好,非常值得学习和使用。