| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package cacher |
| |
| import ( |
| "context" |
| "fmt" |
| "reflect" |
| "strconv" |
| "sync" |
| "testing" |
| "time" |
| |
| "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/api/meta" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/fields" |
| "k8s.io/apimachinery/pkg/labels" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/runtime/serializer" |
| "k8s.io/apimachinery/pkg/util/diff" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apimachinery/pkg/watch" |
| "k8s.io/apiserver/pkg/apis/example" |
| examplev1 "k8s.io/apiserver/pkg/apis/example/v1" |
| "k8s.io/apiserver/pkg/storage" |
| ) |
| |
| // TestCacheWatcherCleanupNotBlockedByResult verifies that the |
| // cacheWatcher.process goroutine is properly cleaned up even if writes to |
| // the cacheWatcher.result channel are blocked. |
| func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { |
| var lock sync.RWMutex |
| count := 0 |
| filter := func(string, labels.Set, fields.Set, bool) bool { return true } |
| forget := func(bool) { |
| lock.Lock() |
| defer lock.Unlock() |
| count++ |
| } |
| initEvents := []*watchCacheEvent{ |
| {Object: &v1.Pod{}}, |
| {Object: &v1.Pod{}}, |
| } |
| // Set the buffer size of w.result to 0 so that writes to w.result block. |
| w := newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{}) |
| w.Stop() |
| if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { |
| lock.RLock() |
| defer lock.RUnlock() |
| return count == 2, nil |
| }); err != nil { |
| t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err) |
| } |
| } |
| |
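| // TestCacheWatcherHandlesFiltering verifies that transitions in and out of |
| // the field filter are reported as synthetic Added and Deleted events, and |
| // that updates to non-matching objects are dropped entirely. |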
| func TestCacheWatcherHandlesFiltering(t *testing.T) { |
| filter := func(_ string, _ labels.Set, field fields.Set, _ bool) bool { |
| return field["spec.nodeName"] == "host" |
| } |
| forget := func(bool) {} |
| |
| testCases := []struct { |
| events []*watchCacheEvent |
| expected []watch.Event |
| }{ |
| // An object that starts out matching the filter, stops matching (reported |
| // as Deleted), and then matches again (reported as Added). |
| { |
| events: []*watchCacheEvent{ |
| { |
| Type: watch.Added, |
| Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, |
| ObjFields: fields.Set{"spec.nodeName": "host"}, |
| ResourceVersion: 1, |
| }, |
| { |
| Type: watch.Modified, |
| PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, |
| PrevObjFields: fields.Set{"spec.nodeName": "host"}, |
| Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, |
| ObjFields: fields.Set{"spec.nodeName": ""}, |
| ResourceVersion: 2, |
| }, |
| { |
| Type: watch.Modified, |
| PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, |
| PrevObjFields: fields.Set{"spec.nodeName": ""}, |
| Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, |
| ObjFields: fields.Set{"spec.nodeName": "host"}, |
| ResourceVersion: 3, |
| }, |
| }, |
| expected: []watch.Event{ |
| {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, |
| {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}}, |
| {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, |
| }, |
| }, |
| // Changes made before the object matches the filter are ignored; the |
| // object is then reported as Added, Modified while it keeps matching, and |
| // Deleted once it stops matching. |
| { |
| events: []*watchCacheEvent{ |
| { |
| Type: watch.Added, |
| Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, |
| ObjFields: fields.Set{"spec.nodeName": ""}, |
| ResourceVersion: 1, |
| }, |
| { |
| Type: watch.Modified, |
| PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, |
| PrevObjFields: fields.Set{"spec.nodeName": ""}, |
| Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, |
| ObjFields: fields.Set{"spec.nodeName": ""}, |
| ResourceVersion: 2, |
| }, |
| { |
| Type: watch.Modified, |
| PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, |
| PrevObjFields: fields.Set{"spec.nodeName": ""}, |
| Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, |
| ObjFields: fields.Set{"spec.nodeName": "host"}, |
| ResourceVersion: 3, |
| }, |
| { |
| Type: watch.Modified, |
| PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, |
| PrevObjFields: fields.Set{"spec.nodeName": "host"}, |
| Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, |
| ObjFields: fields.Set{"spec.nodeName": "host"}, |
| ResourceVersion: 4, |
| }, |
| { |
| Type: watch.Modified, |
| PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, |
| PrevObjFields: fields.Set{"spec.nodeName": "host"}, |
| Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, |
| ObjFields: fields.Set{"spec.nodeName": ""}, |
| ResourceVersion: 5, |
| }, |
| { |
| Type: watch.Modified, |
| PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, |
| PrevObjFields: fields.Set{"spec.nodeName": ""}, |
| Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}, |
| ObjFields: fields.Set{"spec.nodeName": ""}, |
| ResourceVersion: 6, |
| }, |
| }, |
| expected: []watch.Event{ |
| {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, |
| {Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, |
| {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}}, |
| }, |
| }, |
| } |
| |
| TestCase: |
| for i, testCase := range testCases { |
| // Ensure the events carry sequential resource versions. |
| for j := range testCase.events { |
| testCase.events[j].ResourceVersion = uint64(j) + 1 |
| } |
| // Set the buffer size of w.result to 0 so that writes to w.result block. |
| w := newCacheWatcher(0, 0, testCase.events, filter, forget, testVersioner{}) |
| ch := w.ResultChan() |
| for j, event := range testCase.expected { |
| e := <-ch |
| if !reflect.DeepEqual(event, e) { |
| t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e)) |
| break TestCase |
| } |
| } |
| select { |
| case obj, ok := <-ch: |
| t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok) |
| break TestCase |
| default: |
| } |
| w.Stop() |
| } |
| } |
| |
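| // testVersioner is a minimal storage.Versioner for tests: resource versions |
| // are encoded as decimal strings, and the methods the tests do not need are |
| // left unimplemented. |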
| type testVersioner struct{} |
| |
| func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error { |
| return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10)) |
| } |
| func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error { |
| listAccessor, err := meta.ListAccessor(obj) |
| if err != nil || listAccessor == nil { |
| return err |
| } |
| listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10)) |
| listAccessor.SetContinue(continueValue) |
| return nil |
| } |
| func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error { |
| return fmt.Errorf("unimplemented") |
| } |
| func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { |
| return 0, fmt.Errorf("unimplemented") |
| } |
| func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) { |
| return strconv.ParseUint(resourceVersion, 10, 64) |
| } |
| |
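| // errDummy is the sentinel error injected into dummyStorage so tests can |
| // detect when the cacher falls through to the underlying storage. |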
| var ( |
| scheme = runtime.NewScheme() |
| codecs = serializer.NewCodecFactory(scheme) |
| errDummy = fmt.Errorf("dummy error") |
| ) |
| |
| func init() { |
| metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) |
| utilruntime.Must(example.AddToScheme(scheme)) |
| utilruntime.Must(examplev1.AddToScheme(scheme)) |
| } |
| |
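| // newTestCacher builds a Cacher for example.Pod objects on top of the given |
| // storage.Interface, with the given watch cache capacity. |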
| func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) { |
| prefix := "pods" |
| config := Config{ |
| CacheCapacity: cap, |
| Storage: s, |
| Versioner: testVersioner{}, |
| Type: &example.Pod{}, |
| ResourcePrefix: prefix, |
| KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, |
| GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { return nil, nil, true, nil }, |
| NewListFunc: func() runtime.Object { return &example.PodList{} }, |
| Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), |
| } |
| return NewCacherFromConfig(config), testVersioner{} |
| } |
| |
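| // dummyStorage is a stub storage.Interface whose List and GetToList return |
| // the configurable err; the remaining methods are unimplemented or return |
| // inert values. |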
| type dummyStorage struct { |
| err error |
| } |
| |
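| // dummyWatch is a trivial watch.Interface backed by a single channel. |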
| type dummyWatch struct { |
| ch chan watch.Event |
| } |
| |
| func (w *dummyWatch) ResultChan() <-chan watch.Event { |
| return w.ch |
| } |
| |
| func (w *dummyWatch) Stop() { |
| close(w.ch) |
| } |
| |
| func newDummyWatch() watch.Interface { |
| return &dummyWatch{ |
| ch: make(chan watch.Event), |
| } |
| } |
| |
| func (d *dummyStorage) Versioner() storage.Versioner { return nil } |
| func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object, _ uint64) error { |
| return fmt.Errorf("unimplemented") |
| } |
| func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions) error { |
| return fmt.Errorf("unimplemented") |
| } |
| func (d *dummyStorage) Watch(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) { |
| return newDummyWatch(), nil |
| } |
| func (d *dummyStorage) WatchList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) { |
| return newDummyWatch(), nil |
| } |
| func (d *dummyStorage) Get(_ context.Context, _ string, _ string, _ runtime.Object, _ bool) error { |
| return fmt.Errorf("unimplemented") |
| } |
| func (d *dummyStorage) GetToList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, _ runtime.Object) error { |
| return d.err |
| } |
| func (d *dummyStorage) List(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, listObj runtime.Object) error { |
| podList := listObj.(*example.PodList) |
| podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"} |
| return d.err |
| } |
| func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ ...runtime.Object) error { |
| return fmt.Errorf("unimplemented") |
| } |
| func (d *dummyStorage) Count(_ string) (int64, error) { |
| return 0, fmt.Errorf("unimplemented") |
| } |
| |
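| // TestListWithLimitAndRV0 verifies that a paginated List is served from the |
| // watch cache when resourceVersion="0" and delegated to the underlying |
| // storage otherwise. |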
| func TestListWithLimitAndRV0(t *testing.T) { |
| backingStorage := &dummyStorage{} |
| cacher, _ := newTestCacher(backingStorage, 0) |
| defer cacher.Stop() |
| |
| pred := storage.SelectionPredicate{ |
| Limit: 500, |
| } |
| result := &example.PodList{} |
| |
| // Wait until cacher is initialized. |
| cacher.ready.wait() |
| |
| // Inject an error into the underlying storage layer so we can tell whether |
| // the cacher delegates to it. |
| backingStorage.err = errDummy |
| err := cacher.List(context.TODO(), "pods/ns", "0", pred, result) |
| if err != nil { |
| t.Errorf("List with Limit and RV=0 should be served from cache: %v", err) |
| } |
| |
| err = cacher.List(context.TODO(), "pods/ns", "", pred, result) |
| if err != errDummy { |
| t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err) |
| } |
| } |
| |
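| // TestGetToListWithLimitAndRV0 verifies the same dispatch behavior for |
| // GetToList: served from the watch cache with resourceVersion="0", |
| // delegated to the underlying storage otherwise. |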
| func TestGetToListWithLimitAndRV0(t *testing.T) { |
| backingStorage := &dummyStorage{} |
| cacher, _ := newTestCacher(backingStorage, 0) |
| defer cacher.Stop() |
| |
| pred := storage.SelectionPredicate{ |
| Limit: 500, |
| } |
| result := &example.PodList{} |
| |
| // Wait until cacher is initialized. |
| cacher.ready.wait() |
| |
| // Inject an error into the underlying storage layer so we can tell whether |
| // the cacher delegates to it. |
| backingStorage.err = errDummy |
| err := cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result) |
| if err != nil { |
| t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err) |
| } |
| |
| err = cacher.GetToList(context.TODO(), "pods/ns", "", pred, result) |
| if err != errDummy { |
| t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err) |
| } |
| } |