blob: c9b82b192ce5ffab6828b78eb7ad25c84f2cacd6 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controllers
import (
"go.uber.org/atomic"
istiolog "istio.io/pkg/log"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
)
// Queue defines an abstraction around Kubernetes' workqueue.
// Items enqueued are deduplicated; this generally means relying on ordering of events in the queue is not feasible.
type Queue struct {
queue workqueue.RateLimitingInterface
initialSync *atomic.Bool
name string
maxAttempts int
workFn func(key interface{}) error
log *istiolog.Scope
}
// WithName sets a name for the queue. This is used for logging
func WithName(name string) func(q *Queue) {
return func(q *Queue) {
q.name = name
}
}
// WithRateLimiter allows defining a custom rate limitter for the queue
func WithRateLimiter(r workqueue.RateLimiter) func(q *Queue) {
return func(q *Queue) {
q.queue = workqueue.NewRateLimitingQueue(r)
}
}
// WithMaxAttempts allows defining a custom max attempts for the queue. If not set, items will not be retried
func WithMaxAttempts(n int) func(q *Queue) {
return func(q *Queue) {
q.maxAttempts = n
}
}
// WithReconciler defines the handler function to handle items in the queue.
func WithReconciler(f func(key types.NamespacedName) error) func(q *Queue) {
return func(q *Queue) {
q.workFn = func(key interface{}) error {
return f(key.(types.NamespacedName))
}
}
}
// WithGenericReconciler defines the handler function to handle items in the queue that can handle any type
func WithGenericReconciler(f func(key interface{}) error) func(q *Queue) {
return func(q *Queue) {
q.workFn = func(key interface{}) error {
return f(key)
}
}
}
// NewQueue creates a new queue
func NewQueue(name string, options ...func(*Queue)) Queue {
q := Queue{
name: name,
initialSync: atomic.NewBool(false),
}
for _, o := range options {
o(&q)
}
if q.queue == nil {
q.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
}
q.log = log.WithLabels("controller", q.name)
return q
}
// Add an item to the queue.
func (q Queue) Add(item interface{}) {
q.queue.Add(item)
}
// AddObject takes an Object and adds the types.NamespacedName associated.
func (q Queue) AddObject(obj Object) {
q.queue.Add(types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
})
}
// Run the queue. This is synchronous, so should typically be called in a goroutine.
func (q Queue) Run(stop <-chan struct{}) {
defer q.queue.ShutDown()
q.log.Infof("starting")
q.queue.Add(defaultSyncSignal)
done := make(chan struct{})
go func() {
// Process updates until we return false, which indicates the queue is terminated
for q.processNextItem() {
}
close(done)
}()
select {
case <-stop:
case <-done:
}
q.log.Infof("stopped")
}
// syncSignal defines a dummy signal that is enqueued when .Run() is called. This allows us to detect
// when we have processed all items added to the queue prior to Run().
type syncSignal struct{}
// defaultSyncSignal is a singleton instanceof syncSignal.
var defaultSyncSignal = syncSignal{}
// HasSynced returns true if the queue has 'synced'. A synced queue has started running and has
// processed all events that were added prior to Run() being called Warning: these items will be
// processed at least once, but may have failed.
func (q Queue) HasSynced() bool {
return q.initialSync.Load()
}
// processNextItem is the main workFn loop for the queue
func (q Queue) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := q.queue.Get()
if quit {
// We are done, signal to exit the queue
return false
}
// We got the sync signal. This is not a real event, so we exit early after signaling we are synced
if key == defaultSyncSignal {
q.log.Debugf("synced")
q.initialSync.Store(true)
return true
}
q.log.Debugf("handling update: %v", key)
// 'Done marks item as done processing' - should be called at the end of all processing
defer q.queue.Done(key)
err := q.workFn(key)
if err != nil {
if q.queue.NumRequeues(key) < q.maxAttempts {
q.log.Errorf("error handling %v, retrying: %v", key, err)
q.queue.AddRateLimited(key)
// Return early, so we do not call Forget(), allowing the rate limiting to backoff
return true
}
q.log.Errorf("error handling %v, and retry budget exceeded: %v", key, err)
}
// 'Forget indicates that an item is finished being retried.' - should be called whenever we do not want to backoff on this key.
q.queue.Forget(key)
return true
}