blob: 99412bdbdfcfa18740ac3f820d44a44eb3ea9beb [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package timer
import (
"container/heap"
"sync"
"time"
)
type Queue struct {
heap timerHeap
mutex sync.Mutex
stopCh chan struct{}
resetTimerCh chan struct{}
stopping bool
timer *time.Timer
currentDeadline time.Time
}
func NewQueue() *Queue {
q := &Queue{
heap: make(timerHeap, 0),
timer: time.NewTimer(1 * time.Minute),
stopCh: make(chan struct{}),
resetTimerCh: make(chan struct{}),
}
// Start the worker thread.
go func() {
for {
select {
case <-q.stopCh:
q.stopTimer()
return
case <-q.resetTimerCh:
q.resetTimer()
case <-q.timer.C:
q.onTimerExpired()
}
}
}()
return q
}
func (q *Queue) Len() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return q.heap.Len()
}
func (q *Queue) Schedule(handler func(), deadline time.Time) {
// Add the timer to the heap.
q.mutex.Lock()
heap.Push(&q.heap, &entry{
handler: handler,
deadline: deadline,
index: 0,
})
q.mutex.Unlock()
// Request that the timer be reset.
q.resetTimerCh <- struct{}{}
}
func (q *Queue) ShutDown() {
close(q.stopCh)
}
func (q *Queue) stopTimer() {
q.mutex.Lock()
q.stopping = true
q.mutex.Unlock()
q.timer.Stop()
}
func (q *Queue) resetTimer() {
// Below is a separate function to limit the scope of the lock.
// We don't want to lock when we modify the timer in case it causes
// an immediate callback, which would reaquire the lock.
needReset, resetDuration := func() (bool, time.Duration) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.stopping {
// Ignore the event, since we're already shutting down.
return false, 0
}
e := q.heap.peek()
if e == nil || e.deadline.Equal(q.currentDeadline) {
// nothing to do.
return false, 0
}
q.currentDeadline = e.deadline
return true, time.Until(e.deadline)
}()
// Reset the timer.
if needReset {
q.timer.Reset(resetDuration)
}
}
func (q *Queue) onTimerExpired() {
// Collect all expired timers.
q.mutex.Lock()
handlers := q.heap.advanceTo(time.Now())
q.mutex.Unlock()
// Call the expired timer handlers.
for _, h := range handlers {
h()
}
// Reset the timer based on the earliest deadline.
q.resetTimer()
}
type entry struct {
deadline time.Time
handler func()
index int
}
type timerHeap []*entry
func (h timerHeap) peek() *entry {
if h.Len() > 0 {
return h[0]
}
return nil
}
func (h *timerHeap) advanceTo(tnow time.Time) (out []func()) {
for {
if top := h.peek(); top != nil && !top.deadline.After(tnow) {
heap.Remove(h, top.index)
out = append(out, top.handler)
} else {
// There are no further expired timers.
return
}
}
}
func (h timerHeap) Len() int {
return len(h)
}
func (h timerHeap) Less(i, j int) bool {
return h[i].deadline.Before(h[j].deadline)
}
func (h timerHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *timerHeap) Push(x interface{}) {
e := x.(*entry)
*h = append(*h, e)
e.index = len(*h) - 1
}
func (h *timerHeap) Pop() interface{} {
n := h.Len()
e := (*h)[n-1]
*h = (*h)[:n-1]
e.index = -1
return e
}