| /* |
| Copyright 2017 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package watch |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "net/http" |
| "time" |
| |
| "github.com/davecgh/go-spew/spew" |
| |
| apierrors "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/util/net" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apimachinery/pkg/watch" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/klog" |
| ) |
| |
// resourceVersionGetter is an interface used to get resource version from events.
// We can't reuse an interface from meta otherwise it would be a cyclic dependency and we need just this one method.
// Any object delivered by the underlying watcher must satisfy it, or doReceive aborts the watch.
type resourceVersionGetter interface {
	GetResourceVersion() string
}
| |
// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout)
// it will get restarted from the last point without the consumer even knowing about it.
// RetryWatcher does that by inspecting events and keeping track of resourceVersion.
// Especially useful when using watch.UntilWithoutRetry where premature termination is causing issues and flakes.
// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to
// use Informers for that.
type RetryWatcher struct {
	lastResourceVersion string        // RV of the last event successfully delivered; used to resume after a restart
	watcherClient       cache.Watcher // used to (re)establish the underlying watch
	resultChan          chan watch.Event
	stopChan            chan struct{} // closed by Stop() to request shutdown
	doneChan            chan struct{} // closed by receive() when it exits; exposed via Done()
	minRestartDelay     time.Duration // minimum interval between watch restarts (see wait.NonSlidingUntilWithContext)
}
| |
| // NewRetryWatcher creates a new RetryWatcher. |
| // It will make sure that watches gets restarted in case of recoverable errors. |
| // The initialResourceVersion will be given to watch method when first called. |
| func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) { |
| return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second) |
| } |
| |
| func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) { |
| switch initialResourceVersion { |
| case "", "0": |
| // TODO: revisit this if we ever get WATCH v2 where it means start "now" |
| // without doing the synthetic list of objects at the beginning (see #74022) |
| return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion) |
| default: |
| break |
| } |
| |
| rw := &RetryWatcher{ |
| lastResourceVersion: initialResourceVersion, |
| watcherClient: watcherClient, |
| stopChan: make(chan struct{}), |
| doneChan: make(chan struct{}), |
| resultChan: make(chan watch.Event, 0), |
| minRestartDelay: minRestartDelay, |
| } |
| |
| go rw.receive() |
| return rw, nil |
| } |
| |
| func (rw *RetryWatcher) send(event watch.Event) bool { |
| // Writing to an unbuffered channel is blocking operation |
| // and we need to check if stop wasn't requested while doing so. |
| select { |
| case rw.resultChan <- event: |
| return true |
| case <-rw.stopChan: |
| return false |
| } |
| } |
| |
// doReceive returns true when it is done, false otherwise.
// If it is not done the second return value holds the time to wait before calling it again.
// One call corresponds to one underlying WATCH connection: establish it, pump its
// events to the consumer, and classify any failure as retryable (false) or fatal (true).
func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
	watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
		ResourceVersion: rw.lastResourceVersion,
	})
	// We are very unlikely to hit EOF here since we are just establishing the call,
	// but it may happen that the apiserver is just shutting down (e.g. being restarted)
	// This is consistent with how it is handled for informers
	switch err {
	case nil:
		break

	case io.EOF:
		// watch closed normally
		return false, 0

	case io.ErrUnexpectedEOF:
		klog.V(1).Infof("Watch closed with unexpected EOF: %v", err)
		return false, 0

	default:
		msg := "Watch failed: %v"
		if net.IsProbableEOF(err) {
			// Connection-level EOFs are routine; log quietly.
			klog.V(5).Infof(msg, err)
			// Retry
			return false, 0
		}

		klog.Errorf(msg, err)
		// Retry
		return false, 0
	}

	if watcher == nil {
		// Defensive: a nil watcher with nil error violates the Watcher contract.
		klog.Error("Watch returned nil watcher")
		// Retry
		return false, 0
	}

	ch := watcher.ResultChan()
	defer watcher.Stop()

	for {
		select {
		case <-rw.stopChan:
			klog.V(4).Info("Stopping RetryWatcher.")
			return true, 0
		case event, ok := <-ch:
			if !ok {
				// Underlying watch channel closed (e.g. server-side timeout);
				// restart from lastResourceVersion.
				klog.V(4).Infof("Failed to get event! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion)
				return false, 0
			}

			// We need to inspect the event and get ResourceVersion out of it
			switch event.Type {
			case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
				metaObject, ok := event.Object.(resourceVersionGetter)
				if !ok {
					_ = rw.send(watch.Event{
						Type:   watch.Error,
						Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
					})
					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
					return true, 0
				}

				resourceVersion := metaObject.GetResourceVersion()
				if resourceVersion == "" {
					_ = rw.send(watch.Event{
						Type:   watch.Error,
						Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
					})
					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
					return true, 0
				}

				// All is fine; send the event and update lastResourceVersion.
				// RV is advanced only AFTER a successful delivery, so an event
				// dropped by a stop request is never recorded as seen.
				ok = rw.send(event)
				if !ok {
					return true, 0
				}
				rw.lastResourceVersion = resourceVersion

				continue

			case watch.Error:
				// This round trip allows us to handle unstructured status
				errObject := apierrors.FromObject(event.Object)
				statusErr, ok := errObject.(*apierrors.StatusError)
				if !ok {
					klog.Error(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object))
					// Retry unknown errors
					return false, 0
				}

				status := statusErr.ErrStatus

				// Honor the server-suggested backoff, if any, when retrying.
				statusDelay := time.Duration(0)
				if status.Details != nil {
					statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
				}

				switch status.Code {
				case http.StatusGone:
					// Never retry RV too old errors: the etcd cache no longer has
					// lastResourceVersion, so forward the error and stop.
					_ = rw.send(event)
					return true, 0

				case http.StatusGatewayTimeout, http.StatusInternalServerError:
					// Retry
					return false, statusDelay

				default:
					// We retry by default. RetryWatcher is meant to proceed unless it is certain
					// that it can't. If we are not certain, we proceed with retry and leave it
					// up to the user to timeout if needed.

					// Log here so we have a record of hitting the unexpected error
					// and we can whitelist some error codes if we missed any that are expected.
					klog.V(5).Info(spew.Sprintf("Retrying after unexpected error: %#+v", event.Object))

					// Retry
					return false, statusDelay
				}

			default:
				klog.Errorf("Failed to recognize Event type %q", event.Type)
				_ = rw.send(watch.Event{
					Type:   watch.Error,
					Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
				})
				// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
				return true, 0
			}
		}
	}
}
| |
| // receive reads the result from a watcher, restarting it if necessary. |
| func (rw *RetryWatcher) receive() { |
| defer close(rw.doneChan) |
| defer close(rw.resultChan) |
| |
| klog.V(4).Info("Starting RetryWatcher.") |
| defer klog.V(4).Info("Stopping RetryWatcher.") |
| |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| go func() { |
| select { |
| case <-rw.stopChan: |
| cancel() |
| return |
| case <-ctx.Done(): |
| return |
| } |
| }() |
| |
| // We use non sliding until so we don't introduce delays on happy path when WATCH call |
| // timeouts or gets closed and we need to reestablish it while also avoiding hot loops. |
| wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) { |
| done, retryAfter := rw.doReceive() |
| if done { |
| cancel() |
| return |
| } |
| |
| time.Sleep(retryAfter) |
| |
| klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion) |
| }, rw.minRestartDelay) |
| } |
| |
// ResultChan implements Interface.
// The returned channel is closed when the receive goroutine exits
// (after Stop() or a fatal watch error).
func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
	return rw.resultChan
}
| |
// Stop implements Interface.
// It must be called at most once; a second call would close an
// already-closed channel and panic.
func (rw *RetryWatcher) Stop() {
	close(rw.stopChan)
}
| |
// Done allows the caller to be notified when Retry watcher stops.
// The returned channel is closed by the receive goroutine on exit.
func (rw *RetryWatcher) Done() <-chan struct{} {
	return rw.doneChan
}