
在Go语言中,我们可以轻松地创建和管理协程池,以有效地处理并发任务 。协程池是一种用于控制协程数量的机制,它可以限制同时运行的协程数量,从而避免资源耗尽和系统负载过高的问题 。
本文旨在学习了解协程池的原理,配合实现简易的Demo,欢迎大家补充!实现的基本思想协程池是一种用于管理和复用协程(goroutines)的设计模式,其基本思想是在应用程序中预先创建一组协程,以便在需要时分配任务给这些协程执行 。
这有助于避免频繁地创建和销毁协程,提高程序的性能和资源利用率
初始化协程池: 在初始化阶段,你需要创建一定数量的协程(工作协程),这些协程将一直保持运行状态,并等待分配任务任务队列: 协程池通常包含一个任务队列(任务通道) , 用于接收需要执行的任务任务可以是函数、闭包或其他可执行的代码片段 。
任务分配: 当需要执行一个任务时 , 应用程序将任务提交给协程池协程池会从任务队列中获取一个空闲的协程(工作协程),并将任务分配给它执行任务执行: 协程池中的工作协程将执行分配给它的任务任务执行完毕后,工作协程将返回到池中,等待下一个任务 。
资源管理: 协程池需要管理协程的生命周期,包括协程的创建、复用和销毁通常,协程在启动时会无限循环地等待任务,直到接收到关闭信号才停止容量控制: 协程池通常会设置最大协程数量以控制并发度,避免过多的协程导致系统资源耗尽 。
任务完成通知: 协程池可以提供任务完成的通知机制 , 以便应用程序可以获得任务的执行结果或状态Golag实现协程池Demo属性type Pool struct {minCount int
maxCount intactiveCount int32addTaskCount int32complateTaskCount int32
maxWaitWorkerCount int32isRunning booltask chanfunc()erroraddTime time.Durationerrs
chan error}minCount:控制最少工作的协程数量maxCount:控制最多工作的协程数量activeCount:活跃的协程数量,也就是实际在跑的协程数量maxWaitWorkerCount:最大任务容量(排队等待量)
isRunning:协程池的状态task:存放任务的管道addTime:添加任务时候,允许等待排队的超时时间辅助变量var waitingTime = time.Second * 3var (needReduceErr = errors.
New("减少协程")paramErr = errors.New("参数错误")poolStoped = errors.New("Pool已经停止")addTaskTimeout = errors.
New("加入任务队列超时"))生成一个池子funcGeneratePool(minCount, maxCount, maxWaitTask, timeout int)(*Pool, error) {
if maxCount < minCount {returnnil, paramErr}p := &Pool{minCount: minCount,maxCount: maxCount,task:
make(chanfunc()error, maxWaitTask),isRunning: true,addTime: time.Second * time.Duration(timeout),}
for i := 0; i < p.minCount; i{p.addWorker()}gofunc() {for {if !p.isRunning {break
}p.balance() //进行协程的数量的动态平衡}}()return p, nil}协程的管理这里通过一个匿名函数返回一个错误(needReduceErr),来标识是否需要降低协程的数量
func(p *Pool)addWorker() {atomic.AddInt32(&p.activeCount, 1)go p.worker()}func(p *Pool)reduceWorker
() {p.task <- func()error {return needReduceErr}}func(p *Pool)worker() {deferfunc() {if
err := recover(); err != nil {log.Printf("worker panic:%v\n", err)}}()time.Sleep(time.Second *
4)for {if !ok {log.Println("worker out...........................")break}err := t()
if err != needReduceErr {atomic.AddInt32(&p.complateTaskCount, 1)}if err == needReduceErr && p.activeCount >
int32(p.minCount) {atomic.AddInt32(&p.activeCount, -1)break}}}协程池的管理
func(p *Pool)StopPool() {p.isRunning = falseclose(p.task)time.Sleep(waitingTime) //等待资源回收}func
(p *Pool)AddTask(t func()error) error {if !p.isRunning {return poolStoped}select {case p.task <- t:atomic.AddInt32(&p.addTaskCount,
1)returnnilcase <-time.After(p.addTime):return addTaskTimeout}}func(p *Pool)ComplateTaskCount
()int32 {return p.complateTaskCount}func(p *Pool)AddTaskCount()int32 {return p.addTaskCount}
func(p *Pool)ActiveCount()int32 {return p.activeCount}func(p *Pool)IsRunning()bool {return p.isRunning}
调用这里加入超时器 任务完成标志,双保险,防止某个协程卡死,导致主进程一直处于阻塞状态!
functest() {pool, err := server.GeneratePool(20, 100, 10, 10)if err != nil {log.Printf("GeneratePool err:%v\n"
, err)return}deferfunc() {if pool.IsRunning() {pool.StopPool()}}()tick := time.Tick(time.Second *
30)for i := 0; i < 100; i{err := pool.AddTask(func()error {returnnil})if err != nil
{log.Printf("add task failed, err:%v\n", err)}}L:for {select {case <-tick:log.Println(
"main timeout", pool.AddTaskCount(), pool.ComplateTaskCount(), pool.ActiveCount())break Ldefault
:if pool.AddTaskCount() == pool.ComplateTaskCount() {log.Println("all task is over", pool.AddTaskCount(), pool.ComplateTaskCount(), pool.ActiveCount())
【协程池 协程奔走相告】break L}}}pool.StopPool()log.Println("ok............")}写在最后错误处理、任务重试等,还未完善,欢迎大家补充 。
