在Java编程中,为了降低开销和优化程序的效率,我们常常使用线程池来管理线程的创建和销毁,并尽量复用已创建的对象。这样做不仅可以提高程序的运行效率,还能减少垃圾回收器对对象的回收次数。

在Golang中,我们知道协程(goroutine)由于其体积小且效率高,在高并发场景中扮演着重要的角色。然而,资源始终是有限的,即使你的内存很大,也会有一个限度。因此,协程池在某些情况下肯定也会有其独特的适用场景。毕竟,世上并不存在所谓的银弹,只有在特定情况下才能找到最优的解决方案。因此,在Golang中,我们仍然需要考虑使用协程池的情况,并根据具体场景来选择最佳的解决方案。

今天,我们将从Java线程池的角度出发,手把手地带你实现一个Golang协程池。如果你觉得有些困难,记住我们的宗旨是使用固定数量的协程来处理所有的任务。这样做的好处是可以避免协程数量过多导致资源浪费,也能确保任务的有序执行。让我们一起开始,一步步地构建这个Golang协程池吧!

协程池

首先,让我们编写一个简单的多协程代码,并逐步进行优化,最终实现一个简化版的协程池。假如我们有10个任务需要处理。我们最简单做法就是直接使用
go
关键字直接去生成相应的协程即可,比如这样:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg  sync.WaitGroup
    wg.Add(5)
    for i := 1; i <= 5; i++ {
        go func(index int) {
            fmt.Printf("Goroutine %d started\n", index)
            time.Sleep(1 * time.Second)
            fmt.Printf("Goroutine %d finished\n", index)
            wg.Done()
        }(i)
    }
    wg.Wait()
    fmt.Println("All goroutines finished")
}

我们当前的用法非常简单,无需专门维护goroutine容量大小。每当有一个任务出现,我们就创建一个goroutine来处理。现在,让我们进一步优化这种用法。

优化版协程池

现在我们需要首先创建一个pool对象和一个专门用于运行协程的worker对象。如果你熟悉Java中线程池的源码,对于worker这个名称应该不会陌生。worker是一个专门用于线程复用的对象,而我们的worker同样负责运行任务。

实体Job没啥好说的,就是我们具体的任务。我们先来定义一个简单的任务实体Job:

type Job struct {
    id  int
    num int
}

我们来看下主角Worker定义:

type Worker struct {
    id         int
    jobChannel chan Job
    wg         *sync.WaitGroup
}

func NewWorker(id int, wga *sync.WaitGroup) *Worker {
    return &Worker{
        id:         id,
        jobChannel: make(chan Job, 1),
        wg:         wga,
    }
}

func (w *Worker) Start() {
    go func() {
        
        for job := range w.jobChannel {
            fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
            // 模拟任务处理
            fmt.Println("jobs 模拟任务处理")
            time.Sleep(2 * time.Second) // 休眠2秒钟
            result := job.num * 2
            fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
            w.wg.Done()
        }
    }()
}

你可以看到,我的实体拥有一个channel。这个channel类似于我们想要获取的任务队列。与Java中使用链表形式并通过独占锁获取共享链表这种临界资源的方式不同,我选择使用了Golang中的channel来进行通信。因为Golang更倾向于使用channel进行通信,而不是共享资源。所以,我选择了使用channel。为了避免在添加任务时直接阻塞,我特意创建了一个带有任务缓冲的channel。

在这里,
Start()
方法是一个真正的协程,我会从channel中持续获取任务,以保持这个协程不被销毁,直到没有任务可获取为止。这个逻辑类似于Java中的线程池。

让我们进一步深入地探讨一下Pool池的定义:

type Pool struct {
    workers []*Worker
    jobChan chan Job
    wg      sync.WaitGroup
}

func NewPool(numWorkers, jobQueueSize int) *Pool {
    pl := &Pool{
        workers: make([]*Worker, numWorkers),
        jobChan: make(chan Job, jobQueueSize),
    }
    for i := 0; i < numWorkers; i++ {
        pl.workers[i] = NewWorker(i+1, &pl.wg)
    }
    return pl
}

func (p *Pool) Start() {
    for _, worker := range p.workers {
        worker.Start()
    }
    go p.dispatchJobs()
}

func (p *Pool) dispatchJobs() {
    for job := range p.jobChan {
        worker := p.getLeastBusyWorker()
        worker.jobChannel <- job
    }
}

func (p *Pool) getLeastBusyWorker() *Worker {
    leastBusy := p.workers[0]
    for _, worker := range p.workers {
        if len(worker.jobChannel) < len(leastBusy.jobChannel) {
            leastBusy = worker
        }
    }
    return leastBusy
}

func (p *Pool) AddJob(job Job) {
    fmt.Println("jobs add")
    p.wg.Add(1)
    p.jobChan <- job
}

他的定义可能有点复杂,但是我可以用简单的语言向你解释,就是那些东西,只是用Golang的写法来实现,与Java的实现方式类似。

首先,我定义了一个名为worker的数组,用于存储当前存在的worker数量。这里并没有实现核心和非核心worker的区分。另外,我还创建了一个独立的channel,用于保存可缓冲任务的大小。这些参数在初始化时是必须提供的。

Start
方法的主要目的是启动worker开始执行任务。首先,它会启动一个核心协程,等待任务被派发。然后,dispatchJobs方法会开始监听channel是否有任务,如果有任务,则会将任务分派给worker进行处理。在整个过程中,通过channel进行通信,没有使用链表或其他共享资源实体。需要注意的是,dispatchJobs方法在另一个协程中被调用,这是为了避免阻塞主线程。

getLeastBusyWorker
方法是用来获取阻塞任务最少的worker的。这个方法的主要目标是保持任务平均分配。而
AddJob
方法则是用来直接向channel中添加job任务的,这个方法比较简单,不需要过多解释。

协程池最终实现

image

经过一系列的反复修改和优化,我们终于成功实现了一个功能完善且高效的Golang协程池。下面是最终版本的代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    id  int
    num int
}

type Worker struct {
    id         int
    jobChannel chan Job
    wg         *sync.WaitGroup
}

func NewWorker(id int, wga *sync.WaitGroup) *Worker {
    return &Worker{
        id:         id,
        jobChannel: make(chan Job, 1),
        wg:         wga,
    }
}

func (w *Worker) Start() {
    go func() {
        
        for job := range w.jobChannel {
            fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
            // 模拟任务处理
            fmt.Println("jobs 模拟任务处理")
            time.Sleep(2 * time.Second) // 休眠2秒钟
            result := job.num * 2
            fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
            w.wg.Done()
        }
    }()
}

type Pool struct {
    workers []*Worker
    jobChan chan Job
    wg      sync.WaitGroup
}

func NewPool(numWorkers, jobQueueSize int) *Pool {
    pl := &Pool{
        workers: make([]*Worker, numWorkers),
        jobChan: make(chan Job, jobQueueSize),
    }
    for i := 0; i < numWorkers; i++ {
        pl.workers[i] = NewWorker(i+1, &pl.wg)
    }
    return pl
}

func (p *Pool) Start() {
    for _, worker := range p.workers {
        worker.Start()
    }
    go p.dispatchJobs()
}

func (p *Pool) dispatchJobs() {
    for job := range p.jobChan {
        worker := p.getLeastBusyWorker()
        worker.jobChannel <- job
    }
}

func (p *Pool) getLeastBusyWorker() *Worker {
    leastBusy := p.workers[0]
    for _, worker := range p.workers {
        if len(worker.jobChannel) < len(leastBusy.jobChannel) {
            leastBusy = worker
        }
    }
    return leastBusy
}

func (p *Pool) AddJob(job Job) {
    fmt.Println("jobs add")
    p.wg.Add(1)
    p.jobChan <- job
}

func main() {
    pool := NewPool(3, 10)
    pool.Start()
    // 添加任务到协程池
    for i := 1; i <= 15; i++ {
        pool.AddJob(Job{
            id:  i,
            num: i,
        })
    }

    // 等待所有任务完成
    pool.wg.Wait()
    close(pool.jobChan)
    fmt.Println("All jobs finished")
}

三方协程池

在这种场景下,既然已经有一个存在的场景,那么显然轮子是肯定有的。不论使用哪种编程语言,我们都可以探索一下,以下是Golang语言中关于三方协程池的实现工具。

ants
: ants是一个高性能的协程池实现,支持动态调整协程池的大小,可以通过简单的API调用来将任务提交给协程池进行执行。官方地址:
https://github.com/panjf2000/ants

gopool
:gopool 是一个高性能的 goroutine 池,旨在重用 goroutine 并限制 goroutine 的数量。官方地址:
https://github.com/bytedance/gopkg/tree/develop/util/gopool

这些库都提供了简单易用的API,可以方便地创建和管理协程池,同时也支持动态调整协程池的大小,以满足不同场景下的需求。

总结

当然,我写的简易版协程池还有很多可以优化的地方,比如可以实现动态扩容等功能。今天我们要简单总结一下协程池的优势,主要是为了降低资源开销。协程池的好处在于可以重复利用协程,避免频繁创建和销毁协程,从而减少系统开销,提高系统性能。此外,协程池还可以提高响应速度,因为一旦接收到任务,可以立即执行,不需要等待协程创建的时间。另外,协程池还具有增强可管理性的优点,可以对协程进行集中调度和统一管理,方便进行性能调优。

标签: none

添加新评论