blob: b724e3b447ce58d1e682899487e7c5922513f231 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package status
import (
"context"
"strconv"
"sync"
)
import (
"istio.io/api/meta/v1alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/config"
)
// Task to be performed.
type Task func(entry cacheEntry)
// WorkerQueue implements an expandable goroutine pool which executes at most one concurrent routine per target
// resource. Multiple calls to Push() will not schedule multiple executions per target resource, but will ensure that
// the single execution uses the latest value.
type WorkerQueue interface {
// Push a task.
Push(target Resource, controller *Controller, context interface{})
// Run the loop until a signal on the context
Run(ctx context.Context)
// Delete a task
Delete(target Resource)
}
type cacheEntry struct {
// the cacheVale represents the latest version of the resource, including ResourceVersion
cacheResource Resource
// the perControllerStatus represents the latest version of the ResourceStatus
perControllerStatus map[*Controller]interface{}
}
type lockResource struct {
schema.GroupVersionResource
Namespace string
Name string
}
func convert(i Resource) lockResource {
return lockResource{
GroupVersionResource: i.GroupVersionResource,
Namespace: i.Namespace,
Name: i.Name,
}
}
type WorkQueue struct {
// tasks which are not currently executing but need to run
tasks []lockResource
// a lock to govern access to data in the cache
lock sync.Mutex
// for each task, a cacheEntry which can be updated before the task is run so that execution will have latest values
cache map[lockResource]cacheEntry
OnPush func()
}
func (wq *WorkQueue) Push(target Resource, ctl *Controller, progress interface{}) {
wq.lock.Lock()
key := convert(target)
if item, inqueue := wq.cache[key]; inqueue {
item.perControllerStatus[ctl] = progress
wq.cache[key] = item
} else {
wq.cache[key] = cacheEntry{
cacheResource: target,
perControllerStatus: map[*Controller]interface{}{ctl: progress},
}
wq.tasks = append(wq.tasks, key)
}
wq.lock.Unlock()
if wq.OnPush != nil {
wq.OnPush()
}
}
// Pop returns the first item in the queue not in exclusion, along with it's latest progress
func (wq *WorkQueue) Pop(exclusion map[lockResource]struct{}) (target Resource, progress map[*Controller]interface{}) {
wq.lock.Lock()
defer wq.lock.Unlock()
for i := 0; i < len(wq.tasks); i++ {
if _, ok := exclusion[wq.tasks[i]]; !ok {
// remove from tasks
t, ok := wq.cache[wq.tasks[i]]
wq.tasks = append(wq.tasks[:i], wq.tasks[i+1:]...)
if !ok {
return Resource{}, nil
}
return t.cacheResource, t.perControllerStatus
}
}
return Resource{}, nil
}
func (wq *WorkQueue) Length() int {
wq.lock.Lock()
defer wq.lock.Unlock()
return len(wq.tasks)
}
func (wq *WorkQueue) Delete(target Resource) {
wq.lock.Lock()
defer wq.lock.Unlock()
delete(wq.cache, convert(target))
}
type WorkerPool struct {
q WorkQueue
// indicates the queue is closing
closing bool
// the function which will be run for each task in queue
write func(*config.Config, interface{})
// the function to retrieve the initial status
get func(Resource) *config.Config
// current worker routine count
workerCount uint
// maximum worker routine count
maxWorkers uint
currentlyWorking map[lockResource]struct{}
lock sync.Mutex
}
func NewWorkerPool(write func(*config.Config, interface{}), get func(Resource) *config.Config, maxWorkers uint) WorkerQueue {
return &WorkerPool{
write: write,
get: get,
maxWorkers: maxWorkers,
currentlyWorking: make(map[lockResource]struct{}),
q: WorkQueue{
tasks: make([]lockResource, 0),
cache: make(map[lockResource]cacheEntry),
OnPush: nil,
},
}
}
func (wp *WorkerPool) Delete(target Resource) {
wp.q.Delete(target)
}
func (wp *WorkerPool) Push(target Resource, controller *Controller, context interface{}) {
wp.q.Push(target, controller, context)
wp.maybeAddWorker()
}
func (wp *WorkerPool) Run(ctx context.Context) {
go func() {
<-ctx.Done()
wp.lock.Lock()
wp.closing = true
wp.lock.Unlock()
}()
}
// maybeAddWorker adds a worker unless we are at maxWorkers. Workers exit when there are no more tasks, except for the
// last worker, which stays alive indefinitely.
func (wp *WorkerPool) maybeAddWorker() {
wp.lock.Lock()
if wp.workerCount >= wp.maxWorkers || wp.q.Length() == 0 {
wp.lock.Unlock()
return
}
wp.workerCount++
wp.lock.Unlock()
go func() {
for {
wp.lock.Lock()
if wp.closing || wp.q.Length() == 0 {
wp.workerCount--
wp.lock.Unlock()
return
}
target, perControllerWork := wp.q.Pop(wp.currentlyWorking)
if target == (Resource{}) {
// continue or return?
// could have been deleted, or could be no items in queue not currently worked on. need a way to differentiate.
wp.lock.Unlock()
continue
}
wp.q.Delete(target)
wp.currentlyWorking[convert(target)] = struct{}{}
wp.lock.Unlock()
// work should be done without holding the lock
cfg := wp.get(target)
if cfg != nil {
// Check that generation matches
if strconv.FormatInt(cfg.Generation, 10) == target.Generation {
x, err := GetOGProvider(cfg.Status)
if err == nil {
// Not all controllers user generation, so we can ignore errors
x.SetObservedGeneration(cfg.Generation)
}
for c, i := range perControllerWork {
// TODO: this does not guarantee controller order. perhaps it should?
x = c.fn(x, i)
}
wp.write(cfg, x)
}
}
wp.lock.Lock()
delete(wp.currentlyWorking, convert(target))
wp.lock.Unlock()
}
}()
}
type GenerationProvider interface {
SetObservedGeneration(int64)
Unwrap() interface{}
}
type IstioGenerationProvider struct {
*v1alpha1.IstioStatus
}
func (i *IstioGenerationProvider) SetObservedGeneration(in int64) {
i.ObservedGeneration = in
}
func (i *IstioGenerationProvider) Unwrap() interface{} {
return i.IstioStatus
}