blob: 10b7c1709f3323443c654ea3a19b54e8f24f7096 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import (
"container/heap"
"runtime"
"sync"
"time"
"github.com/apache/dubbo-kubernetes/pkg/core/logger"
)
// delayTask pairs a unit of work with the earliest time it may run and a
// count of how many times it has been retried after failure.
type delayTask struct {
	do      func() error // the work to execute; a non-nil error triggers a retry (see work)
	runAt   time.Time    // earliest instant the task is eligible to execute
	retries int          // number of failed attempts so far; compared against maxTaskRetry
}

// maxTaskRetry is the maximum number of times a failing task is retried
// before it is dropped.
const maxTaskRetry = 3

// Compile-time check that *pq implements heap.Interface.
var _ heap.Interface = &pq{}
// pq implements an internal priority queue so that tasks with the soonest expiry will be run first.
// Methods on pq are not threadsafe, access should be protected.
// much of this is taken from the example at https://golang.org/pkg/container/heap/
type pq []*delayTask

// Len reports the number of tasks currently held in the queue.
func (tasks pq) Len() int {
	return len(tasks)
}
// Less orders tasks so that the one scheduled to run earliest sorts first
// (min-heap on runAt).
func (tasks pq) Less(a, b int) bool {
	return tasks[a].runAt.Before(tasks[b].runAt)
}
// Swap exchanges the tasks at positions a and b; required by heap.Interface.
func (tasks *pq) Swap(a, b int) {
	s := *tasks
	s[a], s[b] = s[b], s[a]
}
// Push appends x (which must be a *delayTask) to the queue; called by
// container/heap, which then restores the heap invariant.
func (tasks *pq) Push(x interface{}) {
	t := x.(*delayTask)
	*tasks = append(*tasks, t)
}
// Pop removes and returns the last element of the slice; container/heap has
// already swapped the root there. Returns nil when the queue is empty (a
// defensive guard for direct callers; heap.Pop never calls this on an empty
// heap). Also shrinks the backing array when it is mostly unused.
func (tasks *pq) Pop() interface{} {
	items := *tasks
	count := len(items)
	capacity := cap(items)
	// Shrink the capacity of task queue once it is less than half full and
	// large enough to be worth reallocating.
	if count < capacity/2 && capacity > 32 {
		shrunk := make(pq, count, capacity/2)
		copy(shrunk, items)
		items = shrunk
	}
	if count == 0 {
		return nil
	}
	last := items[count-1]
	items[count-1] = nil // drop the reference so the task can be GC'd
	*tasks = items[:count-1]
	return last
}
// Peek is not managed by the container/heap package, so we return the 0th element in the list.
// It returns the soonest-expiring task without removing it, or nil when empty.
func (tasks *pq) Peek() interface{} {
	if len(*tasks) == 0 {
		return nil
	}
	return (*tasks)[0]
}
// Delayed implements queue such that tasks are executed after a specified delay.
type Delayed interface {
	Instance
	// PushDelayed schedules t to be executed once delay has elapsed.
	PushDelayed(t Task, delay time.Duration)
}

// Compile-time check that *delayQueue satisfies Delayed.
var _ Delayed = &delayQueue{}
// DelayQueueOption configure the behavior of the queue. Must be applied before Start.
type DelayQueueOption func(*delayQueue)

// DelayQueueBuffer sets maximum number of tasks awaiting execution. If this limit is reached, Push and PushDelayed
// will block until there is room.
func DelayQueueBuffer(bufferSize int) DelayQueueOption {
	return func(q *delayQueue) {
		// Replace the default channel; close the old one first so nothing
		// can keep writing to it.
		if prev := q.enqueue; prev != nil {
			close(prev)
		}
		q.enqueue = make(chan *delayTask, bufferSize)
	}
}
// DelayQueueWorkers sets the number of background worker goroutines awaiting tasks to execute. Effectively the
// maximum number of concurrent tasks. Non-positive values are ignored, keeping
// the queue's previous (default: 1) worker count.
func DelayQueueWorkers(workers int) DelayQueueOption {
	return func(queue *delayQueue) {
		if workers <= 0 {
			// Zero workers would leave Run with no consumers for d.execute,
			// deadlocking the dispatcher goroutine; refuse the value.
			return
		}
		queue.workers = workers
	}
}
// workerChanBuf determines whether the channel of a worker should be a buffered channel
// to get the best performance.
var workerChanBuf = func() int {
// Use blocking channel if GOMAXPROCS=1.
// This switches context from sender to receiver immediately,
// which results in higher performance.
var n int
if n = runtime.GOMAXPROCS(0); n == 1 {
return 0
}
// Make channel non-blocking and set up its capacity with GOMAXPROCS if GOMAXPROCS>1,
// otherwise the sender might be dragged down if the receiver is CPU-bound.
//
// GOMAXPROCS determines how many goroutines can run in parallel,
// which makes it the best choice as the channel capacity,
return n
}()
// NewDelayed gives a Delayed queue with maximum concurrency specified by workers.
// Defaults: 1 worker and an enqueue buffer of 100; both can be overridden via
// DelayQueueWorkers / DelayQueueBuffer options.
func NewDelayed(opts ...DelayQueueOption) Delayed {
	dq := &delayQueue{
		workers: 1,
		queue:   &pq{},
		execute: make(chan *delayTask, workerChanBuf),
		enqueue: make(chan *delayTask, 100),
	}
	for _, apply := range opts {
		apply(dq)
	}
	return dq
}
// delayQueue is the Delayed implementation: Run acts as a single dispatcher
// goroutine ordering tasks in a heap and handing ready ones to worker
// goroutines.
type delayQueue struct {
	workers int // number of concurrent worker goroutines started by Run
	// incoming
	enqueue chan *delayTask
	// outgoing
	execute chan *delayTask
	mu sync.Mutex // guards queue; pq methods are not threadsafe
	queue *pq
}
// PushDelayed will execute the task after waiting for the delay
func (d *delayQueue) PushDelayed(t Task, delay time.Duration) {
	item := &delayTask{do: t, runAt: time.Now().Add(delay)}
	// Fast path: hand the task to the dispatcher through the buffered channel.
	select {
	case d.enqueue <- item:
		return
	default:
	}
	// TODO warn and resize buffer
	// if the buffer is full, we take the more expensive route of locking and pushing directly to the heap
	d.mu.Lock()
	defer d.mu.Unlock()
	heap.Push(d.queue, item)
}
// Push will execute the task as soon as possible
func (d *delayQueue) Push(task Task) {
	// A zero delay makes the task eligible to run immediately.
	d.PushDelayed(task, 0)
}
// Run starts the worker goroutines and then acts as the dispatcher: it pops
// the soonest task from the heap, waits until its runAt, and hands it to a
// worker via d.execute. New tasks arriving on d.enqueue while waiting are
// merged into the heap. Run returns when stop is closed.
//
// Fix over the previous version: sends on d.execute now select on stop, so
// the dispatcher can no longer block forever on a full execute channel
// during shutdown.
func (d *delayQueue) Run(stop <-chan struct{}) {
	for i := 0; i < d.workers; i++ {
		go d.work(stop)
	}
	for {
		var task *delayTask
		d.mu.Lock()
		if head := d.queue.Peek(); head != nil {
			task = head.(*delayTask)
			heap.Pop(d.queue)
		}
		d.mu.Unlock()
		if task == nil {
			// no items, wait for Push or stop
			select {
			case t := <-d.enqueue:
				d.mu.Lock()
				heap.Push(d.queue, t)
				d.mu.Unlock()
			case <-stop:
				return
			}
			continue
		}
		delay := time.Until(task.runAt)
		if delay <= 0 {
			// execute now and continue processing incoming enqueues/tasks;
			// select on stop so shutdown cannot deadlock here
			select {
			case d.execute <- task:
			case <-stop:
				return
			}
			continue
		}
		// not ready yet, don't block enqueueing
		await := time.NewTimer(delay)
		select {
		case t := <-d.enqueue:
			d.mu.Lock()
			heap.Push(d.queue, t)
			// put the old "head" back on the queue, it may be scheduled to execute after the one
			// that was just pushed
			heap.Push(d.queue, task)
			d.mu.Unlock()
		case <-await.C:
			select {
			case d.execute <- task:
			case <-stop:
				await.Stop()
				return
			}
		case <-stop:
			await.Stop()
			return
		}
		await.Stop()
	}
}
// work is one worker goroutine: it executes tasks handed off by Run and
// retries failing tasks up to maxTaskRetry times before dropping them.
//
// Fixes over the previous version:
//  1. The "drop it" error was logged on every failure, even when the task was
//     being retried; it is now logged only when the task is actually dropped.
//  2. Retrying used d.Push(t.do), which wrapped the func in a brand-new
//     delayTask with retries reset to 0 — so a persistently failing task was
//     retried forever. We now re-enqueue the same task, preserving its
//     retry counter.
func (d *delayQueue) work(stop <-chan struct{}) {
	for {
		select {
		case t := <-d.execute:
			err := t.do()
			if err == nil {
				continue
			}
			if t.retries < maxTaskRetry {
				t.retries++
				logger.Sugar().Warnf("Work item handle failed: %v %d times, retry it", err, t.retries)
				// Re-enqueue the same task so t.retries is kept.
				t.runAt = time.Now()
				select {
				case d.enqueue <- t:
				default:
					// buffer full: push straight onto the heap
					d.mu.Lock()
					heap.Push(d.queue, t)
					d.mu.Unlock()
				}
				continue
			}
			logger.Sugar().Errorf("Work item handle failed: %v, reaching the maximum retry times: %d, drop it", err, maxTaskRetry)
		case <-stop:
			return
		}
	}
}