// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"k8s.io/apimachinery/pkg/util/rand"

	"istio.io/pkg/log"
)

// Task to be performed.
type Task func() error

// BackoffTask carries a Task together with its own retry backoff state.
type BackoffTask struct {
	task    Task
	backoff *backoff.ExponentialBackOff
}

// Instance of work tickets processed using a rate-limiting loop
type Instance interface {
	// Push a task.
	Push(task Task)
	// Run the loop until a signal is received on the channel.
	Run(<-chan struct{})
	// Closed returns a chan that will be signaled when the Instance has stopped processing tasks.
	Closed() <-chan struct{}
}

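// Example usage (an illustrative sketch, not part of this package): run a
// queue in the background, push a task, then shut it down. "doWork" stands in
// for any function matching the Task signature.
//
//	q := NewQueue(100 * time.Millisecond)
//	stop := make(chan struct{})
//	go q.Run(stop)
//	q.Push(func() error { return doWork() })
//	close(stop)
//	<-q.Closed()
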
// queueImpl is the default Instance implementation: a FIFO of tasks guarded by
// a condition variable, with failed tasks re-enqueued after a delay or backoff.
type queueImpl struct {
	delay        time.Duration
	retryBackoff *backoff.ExponentialBackOff
	tasks        []*BackoffTask
	cond         *sync.Cond
	closing      bool
	closed       chan struct{}
	closeOnce    *sync.Once
	id           string
}

// newExponentialBackOff returns a fresh copy of the template backoff so that
// each task tracks its retry interval independently.
func newExponentialBackOff(eb *backoff.ExponentialBackOff) *backoff.ExponentialBackOff {
	if eb == nil {
		return nil
	}
	teb := backoff.NewExponentialBackOff()
	teb.InitialInterval = eb.InitialInterval
	teb.MaxElapsedTime = eb.MaxElapsedTime
	teb.MaxInterval = eb.MaxInterval
	teb.Multiplier = eb.Multiplier
	teb.RandomizationFactor = eb.RandomizationFactor
	return teb
}

// NewQueue instantiates a queue that retries failed tasks after errorDelay.
func NewQueue(errorDelay time.Duration) Instance {
	return NewQueueWithID(errorDelay, rand.String(10))
}

// NewQueueWithID instantiates a queue identified by name that retries failed
// tasks after errorDelay.
func NewQueueWithID(errorDelay time.Duration, name string) Instance {
	return &queueImpl{
		delay:     errorDelay,
		tasks:     make([]*BackoffTask, 0),
		closing:   false,
		closed:    make(chan struct{}),
		closeOnce: &sync.Once{},
		cond:      sync.NewCond(&sync.Mutex{}),
		id:        name,
	}
}

// NewBackOffQueue instantiates a queue that retries failed tasks using the
// given exponential backoff instead of a fixed delay.
func NewBackOffQueue(backoff *backoff.ExponentialBackOff) Instance {
	return &queueImpl{
		retryBackoff: backoff,
		tasks:        make([]*BackoffTask, 0),
		closing:      false,
		closed:       make(chan struct{}),
		closeOnce:    &sync.Once{},
		cond:         sync.NewCond(&sync.Mutex{}),
	}
}

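// Example usage (an illustrative sketch; the interval values are arbitrary):
//
//	eb := backoff.NewExponentialBackOff()
//	eb.InitialInterval = 10 * time.Millisecond
//	eb.MaxInterval = 5 * time.Second
//	q := NewBackOffQueue(eb)
//	stop := make(chan struct{})
//	go q.Run(stop)
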
// Push enqueues a task unless the queue is already shutting down.
func (q *queueImpl) Push(item Task) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if !q.closing {
		q.tasks = append(q.tasks, &BackoffTask{item, newExponentialBackOff(q.retryBackoff)})
	}
	q.cond.Signal()
}

// pushRetryTask re-enqueues a failed task, preserving its backoff state.
func (q *queueImpl) pushRetryTask(item *BackoffTask) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if !q.closing {
		q.tasks = append(q.tasks, item)
	}
	q.cond.Signal()
}

func (q *queueImpl) Closed() <-chan struct{} {
	return q.closed
}

// Run processes tasks until the stop channel is signaled, retrying failed
// tasks after the configured delay or backoff.
func (q *queueImpl) Run(stop <-chan struct{}) {
	log.Debugf("started queue %s", q.id)
	defer func() {
		q.closeOnce.Do(func() {
			log.Debugf("closed queue %s", q.id)
			close(q.closed)
		})
	}()
	go func() {
		<-stop
		// Set closing under the lock and wake the worker so it observes the flag.
		q.cond.L.Lock()
		q.cond.Signal()
		q.closing = true
		q.cond.L.Unlock()
	}()
	for {
		q.cond.L.Lock()
		// wait for closing to be set, or a task to be pushed
		for !q.closing && len(q.tasks) == 0 {
			q.cond.Wait()
		}
		if q.closing {
			q.cond.L.Unlock()
			// We must be shutting down.
			return
		}
		backoffTask := q.tasks[0]
		// Slicing will not free the underlying elements of the array, so explicitly clear them out here
		q.tasks[0] = nil
		q.tasks = q.tasks[1:]
		q.cond.L.Unlock()
		// Run the task outside the lock; on failure, re-enqueue it after the
		// fixed delay or the task's next backoff interval.
		if err := backoffTask.task(); err != nil {
			delay := q.delay
			if q.retryBackoff != nil {
				delay = backoffTask.backoff.NextBackOff()
			}
			log.Infof("Work item handle failed (%v), retry after delay %v", err, delay)
			time.AfterFunc(delay, func() {
				q.pushRetryTask(backoffTask)
			})
		}
	}
}