| package fasthttp |
| |
| import ( |
| "net" |
| "runtime" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| // workerPool serves incoming connections via a pool of workers |
| // in FILO order, i.e. the most recently stopped worker will serve the next |
| // incoming connection. |
| // |
| // Such a scheme keeps CPU caches hot (in theory). |
| type workerPool struct { |
| // Function for serving server connections. |
| // It must leave c unclosed. |
| WorkerFunc func(c net.Conn) error |
| |
| MaxWorkersCount int |
| |
| LogAllErrors bool |
| |
| MaxIdleWorkerDuration time.Duration |
| |
| Logger Logger |
| |
| lock sync.Mutex |
| workersCount int |
| mustStop bool |
| |
| ready []*workerChan |
| |
| stopCh chan struct{} |
| |
| workerChanPool sync.Pool |
| } |
| |
| type workerChan struct { |
| lastUseTime time.Time |
| ch chan net.Conn |
| } |
| |
| func (wp *workerPool) Start() { |
| if wp.stopCh != nil { |
| panic("BUG: workerPool already started") |
| } |
| wp.stopCh = make(chan struct{}) |
| stopCh := wp.stopCh |
| go func() { |
| var scratch []*workerChan |
| for { |
| wp.clean(&scratch) |
| select { |
| case <-stopCh: |
| return |
| default: |
| time.Sleep(wp.getMaxIdleWorkerDuration()) |
| } |
| } |
| }() |
| } |
| |
| func (wp *workerPool) Stop() { |
| if wp.stopCh == nil { |
| panic("BUG: workerPool wasn't started") |
| } |
| close(wp.stopCh) |
| wp.stopCh = nil |
| |
| // Stop all the workers waiting for incoming connections. |
| // Do not wait for busy workers - they will stop after |
| // serving the connection and noticing wp.mustStop = true. |
| wp.lock.Lock() |
| ready := wp.ready |
| for i, ch := range ready { |
| ch.ch <- nil |
| ready[i] = nil |
| } |
| wp.ready = ready[:0] |
| wp.mustStop = true |
| wp.lock.Unlock() |
| } |
| |
| func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration { |
| if wp.MaxIdleWorkerDuration <= 0 { |
| return 10 * time.Second |
| } |
| return wp.MaxIdleWorkerDuration |
| } |
| |
| func (wp *workerPool) clean(scratch *[]*workerChan) { |
| maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() |
| |
| // Clean least recently used workers if they didn't serve connections |
| // for more than maxIdleWorkerDuration. |
| currentTime := time.Now() |
| |
| wp.lock.Lock() |
| ready := wp.ready |
| n := len(ready) |
| i := 0 |
| for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration { |
| i++ |
| } |
| *scratch = append((*scratch)[:0], ready[:i]...) |
| if i > 0 { |
| m := copy(ready, ready[i:]) |
| for i = m; i < n; i++ { |
| ready[i] = nil |
| } |
| wp.ready = ready[:m] |
| } |
| wp.lock.Unlock() |
| |
| // Notify obsolete workers to stop. |
| // This notification must be outside the wp.lock, since ch.ch |
| // may be blocking and may consume a lot of time if many workers |
| // are located on non-local CPUs. |
| tmp := *scratch |
| for i, ch := range tmp { |
| ch.ch <- nil |
| tmp[i] = nil |
| } |
| } |
| |
| func (wp *workerPool) Serve(c net.Conn) bool { |
| ch := wp.getCh() |
| if ch == nil { |
| return false |
| } |
| ch.ch <- c |
| return true |
| } |
| |
| var workerChanCap = func() int { |
| // Use blocking workerChan if GOMAXPROCS=1. |
| // This immediately switches Serve to WorkerFunc, which results |
| // in higher performance (under go1.5 at least). |
| if runtime.GOMAXPROCS(0) == 1 { |
| return 0 |
| } |
| |
| // Use non-blocking workerChan if GOMAXPROCS>1, |
| // since otherwise the Serve caller (Acceptor) may lag accepting |
| // new connections if WorkerFunc is CPU-bound. |
| return 1 |
| }() |
| |
| func (wp *workerPool) getCh() *workerChan { |
| var ch *workerChan |
| createWorker := false |
| |
| wp.lock.Lock() |
| ready := wp.ready |
| n := len(ready) - 1 |
| if n < 0 { |
| if wp.workersCount < wp.MaxWorkersCount { |
| createWorker = true |
| wp.workersCount++ |
| } |
| } else { |
| ch = ready[n] |
| ready[n] = nil |
| wp.ready = ready[:n] |
| } |
| wp.lock.Unlock() |
| |
| if ch == nil { |
| if !createWorker { |
| return nil |
| } |
| vch := wp.workerChanPool.Get() |
| if vch == nil { |
| vch = &workerChan{ |
| ch: make(chan net.Conn, workerChanCap), |
| } |
| } |
| ch = vch.(*workerChan) |
| go func() { |
| wp.workerFunc(ch) |
| wp.workerChanPool.Put(vch) |
| }() |
| } |
| return ch |
| } |
| |
| func (wp *workerPool) release(ch *workerChan) bool { |
| ch.lastUseTime = CoarseTimeNow() |
| wp.lock.Lock() |
| if wp.mustStop { |
| wp.lock.Unlock() |
| return false |
| } |
| wp.ready = append(wp.ready, ch) |
| wp.lock.Unlock() |
| return true |
| } |
| |
| func (wp *workerPool) workerFunc(ch *workerChan) { |
| var c net.Conn |
| |
| var err error |
| for c = range ch.ch { |
| if c == nil { |
| break |
| } |
| |
| if err = wp.WorkerFunc(c); err != nil && err != errHijacked { |
| errStr := err.Error() |
| if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") || |
| strings.Contains(errStr, "reset by peer") || |
| strings.Contains(errStr, "i/o timeout")) { |
| wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) |
| } |
| } |
| if err != errHijacked { |
| c.Close() |
| } |
| c = nil |
| |
| if !wp.release(ch) { |
| break |
| } |
| } |
| |
| wp.lock.Lock() |
| wp.workersCount-- |
| wp.lock.Unlock() |
| } |