blob: 6ec8ec995e73215b32dea86a6f5672dd56f09896 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import (
"sync"
"time"
"github.com/apache/dubbo-kubernetes/pkg/core/logger"
)
// Task is a unit of work executed by the queue. Returning a non-nil error
// marks the attempt as failed; the Run loop in this file then logs the error
// and pushes the same task back onto the queue after the configured delay.
type Task func() error
// Instance is a queue of tasks: producers add work with Push and a worker
// executes it by calling Run. The implementation in this file runs tasks one
// at a time, in the order they were pushed, on the goroutine that calls Run.
type Instance interface {
// Push adds a task to the queue.
Push(task Task)
// Run processes tasks until the given channel is closed or receives a value.
Run(<-chan struct{})
}
// queueImpl is the Instance implementation returned by NewQueue. Pending
// tasks live in a slice that is protected by the lock held inside cond.
type queueImpl struct {
// delay is how long Run waits before re-pushing a task that returned an error.
delay time.Duration
// tasks holds the pending work; Push appends to the end and Run takes from the front.
tasks []Task
// cond's lock guards tasks and closing; it is signalled by Push and when Run's stop channel fires.
cond *sync.Cond
// closing is set once Run's stop channel fires; from then on Push discards new tasks.
closing bool
}
// NewQueue returns an empty, open queue. errorDelay is the interval after
// which a task that returned an error is pushed back onto the queue for
// another attempt.
func NewQueue(errorDelay time.Duration) Instance {
	q := new(queueImpl)
	q.delay = errorDelay
	q.tasks = []Task{}
	// The condition variable owns the mutex that protects tasks and closing.
	q.cond = sync.NewCond(new(sync.Mutex))
	return q
}
// Push enqueues item at the back of the queue and signals the condition
// variable so a waiting Run loop wakes up. Once the queue is closing the item
// is silently discarded; the signal is still sent.
func (q *queueImpl) Push(item Task) {
	q.cond.L.Lock()
	if accepting := !q.closing; accepting {
		q.tasks = append(q.tasks, item)
	}
	q.cond.Signal()
	q.cond.L.Unlock()
}
// Run executes queued tasks one at a time on the calling goroutine. It blocks
// while the queue is empty and returns only after stop has fired AND every
// task still in the queue has been executed.
//
// A task that returns an error is logged and pushed back onto the queue after
// q.delay. Retries that come due after shutdown are discarded, because Push
// ignores tasks once the queue is closing.
func (q *queueImpl) Run(stop <-chan struct{}) {
	// Turn the stop signal into the closing flag and wake the loop below in
	// case it is parked in Wait on an empty queue. This goroutine ends as soon
	// as stop fires, and Run cannot return before that happens.
	go func() {
		<-stop
		q.cond.L.Lock()
		q.cond.Signal()
		q.closing = true
		q.cond.L.Unlock()
	}()

	for {
		q.cond.L.Lock()
		// Sleep until there is work to do or shutdown has been requested.
		for !q.closing && len(q.tasks) == 0 {
			q.cond.Wait()
		}

		if len(q.tasks) == 0 {
			q.cond.L.Unlock()
			// We must be shutting down.
			return
		}

		task := q.tasks[0]
		// Re-slicing alone leaves the dequeued closure referenced from the
		// backing array, keeping it (and whatever it captures) reachable until
		// a later append reallocates. Clear the slot so it can be collected.
		q.tasks[0] = nil
		q.tasks = q.tasks[1:]
		q.cond.L.Unlock()

		// Run the task outside the lock so producers can keep pushing.
		if err := task(); err != nil {
			logger.Sugar().Infof("Work item handle failed (%v), retry after delay %v", err, q.delay)
			time.AfterFunc(q.delay, func() {
				q.Push(task)
			})
		}
	}
}