我们的项目有大量的api请求由goroutine完成,所以我们需要引入一个pool来节省频繁创建goroutine所造成的的开销,同时也可以更简易的调度goroutine,在对github上多个协程池的对比后,我们最终选定了ants作为我们的调度管理pool。
func (apiClient *ApiAsyncClient) DoAsync( retry int, ) error { return apiClient.scheduler.Submit(func() error { _, err := apiClient.Do() if err != nil { if retry < apiClient.maxRetry { return apiClient.DoAsync(retry+1) } } return err }) }
在上面的代码块中,可以看到return apiClient.DoAsync(retry+1)这一步递归调用了自己,即在submit中又调用了submit
// retrieveWorker returns a available worker to run the tasks. func (p *Pool) retrieveWorker() (w *goWorker) { spawnWorker := func() { w = p.workerCache.Get().(*goWorker) w.run() } p.lock.Lock() w = p.workers.detach() if w != nil { // first try to fetch the worker from the queue p.lock.Unlock() } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { // if the worker queue is empty and we don't run out of the pool capacity, // then just spawn a new worker goroutine. p.lock.Unlock() spawnWorker() } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. if p.options.Nonblocking { p.lock.Unlock() return } retry: if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { p.lock.Unlock() return } p.blockingNum++ p.cond.Wait() // block and wait for an available worker p.blockingNum-- var nw int if nw = p.Running(); nw == 0 { // awakened by the scavenger p.lock.Unlock() if !p.IsClosed() { spawnWorker() } return } if w = p.workers.detach(); w == nil { if nw < capacity { p.lock.Unlock() spawnWorker() return } goto retry } p.lock.Unlock() } return }
Pool size | CPU(ants)% | CPU(runtime.gcBgMarkWorker)% | CPU(runtime.mcall)% | 内存(runtime.allocm)kB | 内存(runtime.gcBgMarkWorker)kB | 内存(root) |
---|---|---|---|---|---|---|
Two pools(158, 632) | 27.98 | 7.73 | 25.44 | 2050.25 | 512.02 | 8798 |
Pool 158 | 28.11 | 6.61 | 25.08 | 2050 | 6661 | |
Pool 1580 | 27.41 | 12.96 | 23.17 | 3075.38 | 10264 | |
Pool 7900 | 25.89 | 9.82 | 22.52 | 3587.94 | 5725 | |
Pool 790000 | 25.12 | 12.79 | 23.44 | 3075.38 | 9748 |
runtime.gcBgMarkWorker: 用于标记垃圾对象
从上面的表格可以看到,可能存在多核的影响,所以对于我们公司现在需要的并发数量级来讲,pool的size对系统影响并不大。