| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package etcd3 |
| |
| import ( |
| "context" |
| "fmt" |
| "reflect" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/coreos/etcd/clientv3" |
| "github.com/coreos/etcd/integration" |
| |
| apitesting "k8s.io/apimachinery/pkg/api/apitesting" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/fields" |
| "k8s.io/apimachinery/pkg/labels" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/runtime/schema" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apimachinery/pkg/watch" |
| "k8s.io/apiserver/pkg/apis/example" |
| examplev1 "k8s.io/apiserver/pkg/apis/example/v1" |
| "k8s.io/apiserver/pkg/storage" |
| ) |
| |
| func TestWatch(t *testing.T) { |
| testWatch(t, false) |
| } |
| |
| func TestWatchList(t *testing.T) { |
| testWatch(t, true) |
| } |
| |
| // It tests that |
| // - first occurrence of objects should notify Add event |
| // - update should trigger Modified event |
| // - update that gets filtered should trigger Deleted event |
| func testWatch(t *testing.T, recursive bool) { |
| ctx, store, cluster := testSetup(t) |
| defer cluster.Terminate(t) |
| podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} |
| podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} |
| |
| tests := []struct { |
| key string |
| pred storage.SelectionPredicate |
| watchTests []*testWatchStruct |
| }{{ // create a key |
| key: "/somekey-1", |
| watchTests: []*testWatchStruct{{podFoo, true, watch.Added}}, |
| pred: storage.Everything, |
| }, { // create a key but obj gets filtered. Then update it with unfiltered obj |
| key: "/somekey-3", |
| watchTests: []*testWatchStruct{{podFoo, false, ""}, {podBar, true, watch.Added}}, |
| pred: storage.SelectionPredicate{ |
| Label: labels.Everything(), |
| Field: fields.ParseSelectorOrDie("metadata.name=bar"), |
| GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { |
| pod := obj.(*example.Pod) |
| return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil |
| }, |
| }, |
| }, { // update |
| key: "/somekey-4", |
| watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}}, |
| pred: storage.Everything, |
| }, { // delete because of being filtered |
| key: "/somekey-5", |
| watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}}, |
| pred: storage.SelectionPredicate{ |
| Label: labels.Everything(), |
| Field: fields.ParseSelectorOrDie("metadata.name!=bar"), |
| GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { |
| pod := obj.(*example.Pod) |
| return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil |
| }, |
| }, |
| }} |
| for i, tt := range tests { |
| w, err := store.watch(ctx, tt.key, "0", tt.pred, recursive) |
| if err != nil { |
| t.Fatalf("Watch failed: %v", err) |
| } |
| var prevObj *example.Pod |
| for _, watchTest := range tt.watchTests { |
| out := &example.Pod{} |
| key := tt.key |
| if recursive { |
| key = key + "/item" |
| } |
| err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( |
| func(runtime.Object) (runtime.Object, error) { |
| return watchTest.obj, nil |
| })) |
| if err != nil { |
| t.Fatalf("GuaranteedUpdate failed: %v", err) |
| } |
| if watchTest.expectEvent { |
| expectObj := out |
| if watchTest.watchType == watch.Deleted { |
| expectObj = prevObj |
| expectObj.ResourceVersion = out.ResourceVersion |
| } |
| testCheckResult(t, i, watchTest.watchType, w, expectObj) |
| } |
| prevObj = out |
| } |
| w.Stop() |
| testCheckStop(t, i, w) |
| } |
| } |
| |
| func TestDeleteTriggerWatch(t *testing.T) { |
| ctx, store, cluster := testSetup(t) |
| defer cluster.Terminate(t) |
| key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) |
| w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) |
| if err != nil { |
| t.Fatalf("Watch failed: %v", err) |
| } |
| if err := store.Delete(ctx, key, &example.Pod{}, nil); err != nil { |
| t.Fatalf("Delete failed: %v", err) |
| } |
| testCheckEventType(t, watch.Deleted, w) |
| } |
| |
| // TestWatchFromZero tests that |
| // - watch from 0 should sync up and grab the object added before |
| // - watch from 0 is able to return events for objects whose previous version has been compacted |
| func TestWatchFromZero(t *testing.T) { |
| ctx, store, cluster := testSetup(t) |
| defer cluster.Terminate(t) |
| key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) |
| |
| w, err := store.Watch(ctx, key, "0", storage.Everything) |
| if err != nil { |
| t.Fatalf("Watch failed: %v", err) |
| } |
| testCheckResult(t, 0, watch.Added, w, storedObj) |
| w.Stop() |
| |
| // Update |
| out := &example.Pod{} |
| err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( |
| func(runtime.Object) (runtime.Object, error) { |
| return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil |
| })) |
| if err != nil { |
| t.Fatalf("GuaranteedUpdate failed: %v", err) |
| } |
| |
| // Make sure when we watch from 0 we receive an ADDED event |
| w, err = store.Watch(ctx, key, "0", storage.Everything) |
| if err != nil { |
| t.Fatalf("Watch failed: %v", err) |
| } |
| testCheckResult(t, 1, watch.Added, w, out) |
| w.Stop() |
| |
| // Update again |
| out = &example.Pod{} |
| err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( |
| func(runtime.Object) (runtime.Object, error) { |
| return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil |
| })) |
| if err != nil { |
| t.Fatalf("GuaranteedUpdate failed: %v", err) |
| } |
| |
| // Compact previous versions |
| revToCompact, err := store.versioner.ParseResourceVersion(out.ResourceVersion) |
| if err != nil { |
| t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err) |
| } |
| _, err = cluster.RandClient().Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical()) |
| if err != nil { |
| t.Fatalf("Error compacting: %v", err) |
| } |
| |
| // Make sure we can still watch from 0 and receive an ADDED event |
| w, err = store.Watch(ctx, key, "0", storage.Everything) |
| if err != nil { |
| t.Fatalf("Watch failed: %v", err) |
| } |
| testCheckResult(t, 2, watch.Added, w, out) |
| } |
| |
| // TestWatchFromNoneZero tests that |
| // - watch from non-0 should just watch changes after given version |
| func TestWatchFromNoneZero(t *testing.T) { |
| ctx, store, cluster := testSetup(t) |
| defer cluster.Terminate(t) |
| key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) |
| |
| w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) |
| if err != nil { |
| t.Fatalf("Watch failed: %v", err) |
| } |
| out := &example.Pod{} |
| store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( |
| func(runtime.Object) (runtime.Object, error) { |
| return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err |
| })) |
| testCheckResult(t, 0, watch.Modified, w, out) |
| } |
| |
| func TestWatchError(t *testing.T) { |
| codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} |
| cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) |
| defer cluster.Terminate(t) |
| invalidStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")}) |
| ctx := context.Background() |
| w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) |
| if err != nil { |
| t.Fatalf("Watch failed: %v", err) |
| } |
| validStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")}) |
| validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( |
| func(runtime.Object) (runtime.Object, error) { |
| return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil |
| })) |
| testCheckEventType(t, watch.Error, w) |
| } |
| |
| func TestWatchContextCancel(t *testing.T) { |
| ctx, store, cluster := testSetup(t) |
| defer cluster.Terminate(t) |
| canceledCtx, cancel := context.WithCancel(ctx) |
| cancel() |
| // When we watch with a canceled context, we should detect that it's context canceled. |
| // We won't take it as error and also close the watcher. |
| w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| select { |
| case _, ok := <-w.ResultChan(): |
| if ok { |
| t.Error("ResultChan() should be closed") |
| } |
| case <-time.After(wait.ForeverTestTimeout): |
| t.Errorf("timeout after %v", wait.ForeverTestTimeout) |
| } |
| } |
| |
| func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { |
| origCtx, store, cluster := testSetup(t) |
| defer cluster.Terminate(t) |
| ctx, cancel := context.WithCancel(origCtx) |
| w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything) |
| // make resutlChan and errChan blocking to ensure ordering. |
| w.resultChan = make(chan watch.Event) |
| w.errChan = make(chan error) |
| // The event flow goes like: |
| // - first we send an error, it should block on resultChan. |
| // - Then we cancel ctx. The blocking on resultChan should be freed up |
| // and run() goroutine should return. |
| var wg sync.WaitGroup |
| wg.Add(1) |
| go func() { |
| w.run() |
| wg.Done() |
| }() |
| w.errChan <- fmt.Errorf("some error") |
| cancel() |
| wg.Wait() |
| } |
| |
| func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { |
| ctx, store, cluster := testSetup(t) |
| defer cluster.Terminate(t) |
| key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) |
| |
| w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) |
| if err != nil { |
| t.Fatalf("Watch failed: %v", err) |
| } |
| etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix()) |
| |
| if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}); err != nil { |
| t.Fatalf("Delete failed: %v", err) |
| } |
| |
| e := <-w.ResultChan() |
| watchedDeleteObj := e.Object.(*example.Pod) |
| var wres clientv3.WatchResponse |
| wres = <-etcdW |
| |
| watchedDeleteRev, err := store.versioner.ParseResourceVersion(watchedDeleteObj.ResourceVersion) |
| if err != nil { |
| t.Fatalf("ParseWatchResourceVersion failed: %v", err) |
| } |
| if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision { |
| t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d", |
| watchedDeleteRev, wres.Events[0].Kv.ModRevision) |
| } |
| } |
| |
| type testWatchStruct struct { |
| obj *example.Pod |
| expectEvent bool |
| watchType watch.EventType |
| } |
| |
| type testCodec struct { |
| runtime.Codec |
| } |
| |
| func (c *testCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { |
| return nil, nil, errTestingDecode |
| } |
| |
| func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) { |
| select { |
| case res := <-w.ResultChan(): |
| if res.Type != expectEventType { |
| t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) |
| } |
| case <-time.After(wait.ForeverTestTimeout): |
| t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout) |
| } |
| } |
| |
| func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) { |
| select { |
| case res := <-w.ResultChan(): |
| if res.Type != expectEventType { |
| t.Errorf("#%d: event type want=%v, get=%v", i, expectEventType, res.Type) |
| return |
| } |
| if !reflect.DeepEqual(expectObj, res.Object) { |
| t.Errorf("#%d: obj want=\n%#v\nget=\n%#v", i, expectObj, res.Object) |
| } |
| case <-time.After(wait.ForeverTestTimeout): |
| t.Errorf("#%d: time out after waiting %v on ResultChan", i, wait.ForeverTestTimeout) |
| } |
| } |
| |
| func testCheckStop(t *testing.T, i int, w watch.Interface) { |
| select { |
| case e, ok := <-w.ResultChan(): |
| if ok { |
| var obj string |
| switch e.Object.(type) { |
| case *example.Pod: |
| obj = e.Object.(*example.Pod).Name |
| case *metav1.Status: |
| obj = e.Object.(*metav1.Status).Message |
| } |
| t.Errorf("#%d: ResultChan should have been closed. Event: %s. Object: %s", i, e.Type, obj) |
| } |
| case <-time.After(wait.ForeverTestTimeout): |
| t.Errorf("#%d: time out after waiting 1s on ResultChan", i) |
| } |
| } |