| /* |
| Copyright 2018 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package source |
| |
| import ( |
| "fmt" |
| "sync" |
| |
| "k8s.io/apimachinery/pkg/api/meta" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/client-go/util/workqueue" |
| "sigs.k8s.io/controller-runtime/pkg/event" |
| "sigs.k8s.io/controller-runtime/pkg/handler" |
| "sigs.k8s.io/controller-runtime/pkg/runtime/inject" |
| logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" |
| "sigs.k8s.io/controller-runtime/pkg/source/internal" |
| |
| toolscache "k8s.io/client-go/tools/cache" |
| "sigs.k8s.io/controller-runtime/pkg/cache" |
| "sigs.k8s.io/controller-runtime/pkg/predicate" |
| ) |
| |
| var log = logf.KBLog.WithName("source") |
| |
| const ( |
| // defaultBufferSize is the default number of event notifications that can be buffered. |
| defaultBufferSize = 1024 |
| ) |
| |
| // Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) |
| // which should be processed by event.EventHandlers to enqueue reconcile.Requests. |
| // |
| // * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update). |
| // |
| // * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls). |
| // |
| // Users may build their own Source implementations. If their implementations implement any of the inject package |
| // interfaces, the dependencies will be injected by the Controller when Watch is called. |
| type Source interface { |
| // Start is internal and should be called only by the Controller to register an EventHandler with the Informer |
| // to enqueue reconcile.Requests. |
| Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error |
| } |
| |
| // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create) |
| type Kind struct { |
| // Type is the type of object to watch. e.g. &v1.Pod{} |
| Type runtime.Object |
| |
| // cache used to watch APIs |
| cache cache.Cache |
| } |
| |
| var _ Source = &Kind{} |
| |
| // Start is internal and should be called only by the Controller to register an EventHandler with the Informer |
| // to enqueue reconcile.Requests. |
| func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface, |
| prct ...predicate.Predicate) error { |
| |
| // Type should have been specified by the user. |
| if ks.Type == nil { |
| return fmt.Errorf("must specify Kind.Type") |
| } |
| |
| // cache should have been injected before Start was called |
| if ks.cache == nil { |
| return fmt.Errorf("must call CacheInto on Kind before calling Start") |
| } |
| |
| // Lookup the Informer from the Cache and add an EventHandler which populates the Queue |
| i, err := ks.cache.GetInformer(ks.Type) |
| if err != nil { |
| if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { |
| log.Error(err, "if %s is a CRD, should install it before calling Start", |
| kindMatchErr.GroupKind) |
| } |
| return err |
| } |
| i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) |
| return nil |
| } |
| |
| func (ks *Kind) String() string { |
| if ks.Type != nil && ks.Type.GetObjectKind() != nil { |
| return fmt.Sprintf("kind source: %v", ks.Type.GetObjectKind().GroupVersionKind().String()) |
| } |
| return fmt.Sprintf("kind source: unknown GVK") |
| } |
| |
| var _ inject.Cache = &Kind{} |
| |
| // InjectCache is internal should be called only by the Controller. InjectCache is used to inject |
| // the Cache dependency initialized by the ControllerManager. |
| func (ks *Kind) InjectCache(c cache.Cache) error { |
| if ks.cache == nil { |
| ks.cache = c |
| } |
| return nil |
| } |
| |
| var _ Source = &Channel{} |
| |
| // Channel is used to provide a source of events originating outside the cluster |
| // (e.g. GitHub Webhook callback). Channel requires the user to wire the external |
| // source (eh.g. http handler) to write GenericEvents to the underlying channel. |
| type Channel struct { |
| // once ensures the event distribution goroutine will be performed only once |
| once sync.Once |
| |
| // Source is the source channel to fetch GenericEvents |
| Source <-chan event.GenericEvent |
| |
| // stop is to end ongoing goroutine, and close the channels |
| stop <-chan struct{} |
| |
| // dest is the destination channels of the added event handlers |
| dest []chan event.GenericEvent |
| |
| // DestBufferSize is the specified buffer size of dest channels. |
| // Default to 1024 if not specified. |
| DestBufferSize int |
| |
| // destLock is to ensure the destination channels are safely added/removed |
| destLock sync.Mutex |
| } |
| |
| func (cs *Channel) String() string { |
| return fmt.Sprintf("channel source: %p", cs) |
| } |
| |
| var _ inject.Stoppable = &Channel{} |
| |
| // InjectStopChannel is internal should be called only by the Controller. |
| // It is used to inject the stop channel initialized by the ControllerManager. |
| func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error { |
| if cs.stop == nil { |
| cs.stop = stop |
| } |
| |
| return nil |
| } |
| |
| // Start implements Source and should only be called by the Controller. |
| func (cs *Channel) Start( |
| handler handler.EventHandler, |
| queue workqueue.RateLimitingInterface, |
| prct ...predicate.Predicate) error { |
| // Source should have been specified by the user. |
| if cs.Source == nil { |
| return fmt.Errorf("must specify Channel.Source") |
| } |
| |
| // stop should have been injected before Start was called |
| if cs.stop == nil { |
| return fmt.Errorf("must call InjectStop on Channel before calling Start") |
| } |
| |
| // use default value if DestBufferSize not specified |
| if cs.DestBufferSize == 0 { |
| cs.DestBufferSize = defaultBufferSize |
| } |
| |
| cs.once.Do(func() { |
| // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source |
| go cs.syncLoop() |
| }) |
| |
| dst := make(chan event.GenericEvent, cs.DestBufferSize) |
| go func() { |
| for evt := range dst { |
| shouldHandle := true |
| for _, p := range prct { |
| if !p.Generic(evt) { |
| shouldHandle = false |
| break |
| } |
| } |
| |
| if shouldHandle { |
| handler.Generic(evt, queue) |
| } |
| } |
| }() |
| |
| cs.destLock.Lock() |
| defer cs.destLock.Unlock() |
| |
| cs.dest = append(cs.dest, dst) |
| |
| return nil |
| } |
| |
| func (cs *Channel) doStop() { |
| cs.destLock.Lock() |
| defer cs.destLock.Unlock() |
| |
| for _, dst := range cs.dest { |
| close(dst) |
| } |
| } |
| |
| func (cs *Channel) distribute(evt event.GenericEvent) { |
| cs.destLock.Lock() |
| defer cs.destLock.Unlock() |
| |
| for _, dst := range cs.dest { |
| // We cannot make it under goroutine here, or we'll meet the |
| // race condition of writing message to closed channels. |
| // To avoid blocking, the dest channels are expected to be of |
| // proper buffer size. If we still see it blocked, then |
| // the controller is thought to be in an abnormal state. |
| dst <- evt |
| } |
| } |
| |
| func (cs *Channel) syncLoop() { |
| for { |
| select { |
| case <-cs.stop: |
| // Close destination channels |
| cs.doStop() |
| return |
| case evt := <-cs.Source: |
| cs.distribute(evt) |
| } |
| } |
| } |
| |
| // Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create) |
| type Informer struct { |
| // Informer is the generated client-go Informer |
| Informer toolscache.SharedIndexInformer |
| } |
| |
| var _ Source = &Informer{} |
| |
| // Start is internal and should be called only by the Controller to register an EventHandler with the Informer |
| // to enqueue reconcile.Requests. |
| func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface, |
| prct ...predicate.Predicate) error { |
| |
| // Informer should have been specified by the user. |
| if is.Informer == nil { |
| return fmt.Errorf("must specify Informer.Informer") |
| } |
| |
| is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) |
| return nil |
| } |
| |
| func (is *Informer) String() string { |
| return fmt.Sprintf("informer source: %p", is.Informer) |
| } |
| |
| // Func is a function that implements Source |
| type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error |
| |
| // Start implements Source |
| func (f Func) Start(evt handler.EventHandler, queue workqueue.RateLimitingInterface, |
| pr ...predicate.Predicate) error { |
| return f(evt, queue, pr...) |
| } |
| |
| func (f Func) String() string { |
| return fmt.Sprintf("func source: %p", f) |
| } |