| /* |
| Copyright 2014 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package cache |
| |
| import ( |
| "errors" |
| "fmt" |
| "io" |
| "math/rand" |
| "net" |
| "net/url" |
| "reflect" |
| "strconv" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "syscall" |
| "time" |
| |
| apierrs "k8s.io/apimachinery/pkg/api/errors" |
| "k8s.io/apimachinery/pkg/api/meta" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/util/clock" |
| "k8s.io/apimachinery/pkg/util/naming" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apimachinery/pkg/watch" |
| "k8s.io/klog" |
| "k8s.io/utils/trace" |
| ) |
| |
| // Reflector watches a specified resource and causes all changes to be reflected in the given store. |
| type Reflector struct { |
| // name identifies this reflector. By default it will be a file:line if possible. |
| name string |
| // metrics tracks basic metric information about the reflector |
| metrics *reflectorMetrics |
| |
| // The type of object we expect to place in the store. |
| expectedType reflect.Type |
| // The destination to sync up with the watch source |
| store Store |
| // listerWatcher is used to perform lists and watches. |
| listerWatcher ListerWatcher |
| // period controls timing between one watch ending and |
| // the beginning of the next one. |
| period time.Duration |
	// resyncPeriod is the period at which the store is asked to resync its
	// contents; if zero, resyncs are disabled.
	resyncPeriod time.Duration
	// ShouldResync, if non-nil, is consulted on each resync tick; the store's
	// Resync is invoked only when ShouldResync is nil or returns true.
	ShouldResync func() bool
| // clock allows tests to manipulate time |
| clock clock.Clock |
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store.
	// It is thread safe, but not synchronized with the underlying store.
| lastSyncResourceVersion string |
| // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion |
| lastSyncResourceVersionMutex sync.RWMutex |
| } |
| |
| var ( |
| // We try to spread the load on apiserver by setting timeouts for |
| // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. |
| minWatchTimeout = 5 * time.Minute |
| ) |
| |
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector.
// The Indexer is configured to key on namespace.
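//
// A minimal usage sketch (illustrative only; lw, stopCh and the v1.Pod type
// are assumed to exist and are not defined in this file):
//
//	indexer, r := NewNamespaceKeyedIndexerAndReflector(lw, &v1.Pod{}, 0)
//	go r.Run(stopCh)
//	// indexer can now be read through its "namespace" index while r keeps it current.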
| func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { |
| indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc}) |
| reflector = NewReflector(lw, expectedType, indexer, resyncPeriod) |
| return indexer, reflector |
| } |
| |
| // NewReflector creates a new Reflector object which will keep the given store up to |
| // date with the server's contents for the given resource. Reflector promises to |
| // only put things in the store that have the type of expectedType, unless expectedType |
| // is nil. If resyncPeriod is non-zero, then lists will be executed after every |
| // resyncPeriod, so that you can use reflectors to periodically process everything as |
| // well as incrementally processing the things that change. |
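//
// A minimal usage sketch from a caller's point of view (illustrative only;
// the client, the v1, fields and metav1 packages, and stopCh are assumed to
// be available and are not part of this file):
//
//	lw := NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
//	store := NewStore(MetaNamespaceKeyFunc)
//	r := NewReflector(lw, &v1.Pod{}, store, 30*time.Second)
//	go r.Run(stopCh)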
| func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { |
| return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) |
| } |
| |
// reflectorDisambiguator is used to disambiguate started reflectors.
// It is initialized to an unstable value to ensure that no meaning is attributed to the suffix.
| var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345) |
| |
// NewNamedReflector is the same as NewReflector, but with a specified name for logging.
| func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { |
| reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1) |
| r := &Reflector{ |
| name: name, |
		// The metric name needs to be unique per process (some reflector names are still the same),
		// while remaining obvious about which reflector it belongs to.
| metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))), |
| listerWatcher: lw, |
| store: store, |
| expectedType: reflect.TypeOf(expectedType), |
| period: time.Second, |
| resyncPeriod: resyncPeriod, |
| clock: &clock.RealClock{}, |
| } |
| return r |
| } |
| |
| func makeValidPrometheusMetricLabel(in string) string { |
| // this isn't perfect, but it removes our common characters |
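	// For example (illustrative input): "reflector_k8s.io/client-go/informers_42"
	// becomes "reflector_k8s_io_client_go_informers_42".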
| return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in) |
| } |
| |
// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the
// common call chains to NewReflector, so they would make for low-entropy reflector names.
| var internalPackages = []string{"client-go/tools/cache/"} |
| |
| // Run starts a watch and handles watch events. Will restart the watch if it is closed. |
| // Run will exit when stopCh is closed. |
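//
// A minimal sketch of driving Run from a caller (the stop-channel wiring is
// illustrative):
//
//	stopCh := make(chan struct{})
//	defer close(stopCh)
//	go r.Run(stopCh)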
| func (r *Reflector) Run(stopCh <-chan struct{}) { |
| klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) |
| wait.Until(func() { |
| if err := r.ListAndWatch(stopCh); err != nil { |
| utilruntime.HandleError(err) |
| } |
| }, r.period, stopCh) |
| } |
| |
| var ( |
| // nothing will ever be sent down this channel |
| neverExitWatch <-chan time.Time = make(chan time.Time) |
| |
| // Used to indicate that watching stopped so that a resync could happen. |
| errorResyncRequested = errors.New("resync channel fired") |
| |
| // Used to indicate that watching stopped because of a signal from the stop |
| // channel passed in from a client of the reflector. |
| errorStopRequested = errors.New("Stop requested") |
| ) |
| |
| // resyncChan returns a channel which will receive something when a resync is |
| // required, and a cleanup function. |
| func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { |
| if r.resyncPeriod == 0 { |
| return neverExitWatch, func() bool { return false } |
| } |
| // The cleanup function is required: imagine the scenario where watches |
| // always fail so we end up listing frequently. Then, if we don't |
| // manually stop the timer, we could end up with many timers active |
| // concurrently. |
| t := r.clock.NewTimer(r.resyncPeriod) |
| return t.C(), t.Stop |
| } |
| |
// ListAndWatch first lists all items and gets the resource version at the moment of the call,
// and then uses that resource version to watch.
// It returns an error if ListAndWatch didn't even try to initialize the watch.
| func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { |
| klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name) |
| var resourceVersion string |
| |
| // Explicitly set "0" as resource version - it's fine for the List() |
| // to be served from cache and potentially be delayed relative to |
| // etcd contents. Reflector framework will catch up via Watch() eventually. |
| options := metav1.ListOptions{ResourceVersion: "0"} |
| r.metrics.numberOfLists.Inc() |
| start := r.clock.Now() |
| |
| if err := func() error { |
| initTrace := trace.New("Reflector " + r.name + " ListAndWatch") |
| defer initTrace.LogIfLong(10 * time.Second) |
| var list runtime.Object |
| var err error |
| listCh := make(chan struct{}, 1) |
| panicCh := make(chan interface{}, 1) |
| go func() { |
| defer func() { |
| if r := recover(); r != nil { |
| panicCh <- r |
| } |
| }() |
| list, err = r.listerWatcher.List(options) |
| close(listCh) |
| }() |
| select { |
| case <-stopCh: |
| return nil |
| case r := <-panicCh: |
| panic(r) |
| case <-listCh: |
| } |
| if err != nil { |
| return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) |
| } |
| initTrace.Step("Objects listed") |
| r.metrics.listDuration.Observe(time.Since(start).Seconds()) |
| listMetaInterface, err := meta.ListAccessor(list) |
| if err != nil { |
| return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) |
| } |
| resourceVersion = listMetaInterface.GetResourceVersion() |
| initTrace.Step("Resource version extracted") |
| items, err := meta.ExtractList(list) |
| if err != nil { |
| return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) |
| } |
| initTrace.Step("Objects extracted") |
| r.metrics.numberOfItemsInList.Observe(float64(len(items))) |
| if err := r.syncWith(items, resourceVersion); err != nil { |
| return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) |
| } |
| initTrace.Step("SyncWith done") |
| r.setLastSyncResourceVersion(resourceVersion) |
| initTrace.Step("Resource version updated") |
| return nil |
| }(); err != nil { |
| return err |
| } |
| |
| resyncerrc := make(chan error, 1) |
| cancelCh := make(chan struct{}) |
| defer close(cancelCh) |
| go func() { |
| resyncCh, cleanup := r.resyncChan() |
| defer func() { |
| cleanup() // Call the last one written into cleanup |
| }() |
| for { |
| select { |
| case <-resyncCh: |
| case <-stopCh: |
| return |
| case <-cancelCh: |
| return |
| } |
| if r.ShouldResync == nil || r.ShouldResync() { |
| klog.V(4).Infof("%s: forcing resync", r.name) |
| if err := r.store.Resync(); err != nil { |
| resyncerrc <- err |
| return |
| } |
| } |
| cleanup() |
| resyncCh, cleanup = r.resyncChan() |
| } |
| }() |
| |
| for { |
| // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors |
| select { |
| case <-stopCh: |
| return nil |
| default: |
| } |
| |
| timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) |
| options = metav1.ListOptions{ |
| ResourceVersion: resourceVersion, |
			// We want to avoid situations of hanging watchers. Stop any watchers that do not
			// receive any events within the timeout window.
| TimeoutSeconds: &timeoutSeconds, |
| } |
| |
| r.metrics.numberOfWatches.Inc() |
| w, err := r.listerWatcher.Watch(options) |
| if err != nil { |
| switch err { |
| case io.EOF: |
| // watch closed normally |
| case io.ErrUnexpectedEOF: |
| klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) |
| default: |
| utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) |
| } |
| // If this is "connection refused" error, it means that most likely apiserver is not responsive. |
| // It doesn't make sense to re-list all objects because most likely we will be able to restart |
| // watch where we ended. |
| // If that's the case wait and resend watch request. |
| if urlError, ok := err.(*url.Error); ok { |
| if opError, ok := urlError.Err.(*net.OpError); ok { |
| if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { |
| time.Sleep(time.Second) |
| continue |
| } |
| } |
| } |
| return nil |
| } |
| |
| if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { |
| if err != errorStopRequested { |
| klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) |
| } |
| return nil |
| } |
| } |
| } |
| |
| // syncWith replaces the store's items with the given list. |
| func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { |
| found := make([]interface{}, 0, len(items)) |
| for _, item := range items { |
| found = append(found, item) |
| } |
| return r.store.Replace(found, resourceVersion) |
| } |
| |
| // watchHandler watches w and keeps *resourceVersion up to date. |
| func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { |
| start := r.clock.Now() |
| eventCount := 0 |
| |
| // Stopping the watcher should be idempotent and if we return from this function there's no way |
| // we're coming back in with the same watch interface. |
| defer w.Stop() |
| // update metrics |
| defer func() { |
| r.metrics.numberOfItemsInWatch.Observe(float64(eventCount)) |
| r.metrics.watchDuration.Observe(time.Since(start).Seconds()) |
| }() |
| |
| loop: |
| for { |
| select { |
| case <-stopCh: |
| return errorStopRequested |
| case err := <-errc: |
| return err |
| case event, ok := <-w.ResultChan(): |
| if !ok { |
| break loop |
| } |
| if event.Type == watch.Error { |
| return apierrs.FromObject(event.Object) |
| } |
| if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a { |
| utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) |
| continue |
| } |
| meta, err := meta.Accessor(event.Object) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) |
| continue |
| } |
| newResourceVersion := meta.GetResourceVersion() |
| switch event.Type { |
| case watch.Added: |
| err := r.store.Add(event.Object) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) |
| } |
| case watch.Modified: |
| err := r.store.Update(event.Object) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) |
| } |
| case watch.Deleted: |
| // TODO: Will any consumers need access to the "last known |
| // state", which is passed in event.Object? If so, may need |
| // to change this. |
| err := r.store.Delete(event.Object) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) |
| } |
| default: |
| utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) |
| } |
| *resourceVersion = newResourceVersion |
| r.setLastSyncResourceVersion(newResourceVersion) |
| eventCount++ |
| } |
| } |
| |
| watchDuration := r.clock.Now().Sub(start) |
| if watchDuration < 1*time.Second && eventCount == 0 { |
| r.metrics.numberOfShortWatches.Inc() |
| return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name) |
| } |
| klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount) |
| return nil |
| } |
| |
// LastSyncResourceVersion is the resource version observed when last synced with the underlying store.
// The value returned is guarded by a mutex, but it is not synchronized with access to the underlying store.
| func (r *Reflector) LastSyncResourceVersion() string { |
| r.lastSyncResourceVersionMutex.RLock() |
| defer r.lastSyncResourceVersionMutex.RUnlock() |
| return r.lastSyncResourceVersion |
| } |
| |
| func (r *Reflector) setLastSyncResourceVersion(v string) { |
| r.lastSyncResourceVersionMutex.Lock() |
| defer r.lastSyncResourceVersionMutex.Unlock() |
| r.lastSyncResourceVersion = v |
| |
| rv, err := strconv.Atoi(v) |
| if err == nil { |
| r.metrics.lastResourceVersion.Set(float64(rv)) |
| } |
| } |