blob: 780af4b2d10397e72ae562badc495272304be411 [file] [log] [blame]
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package cacher
import (
metav1 ""
utilruntime ""
examplev1 ""
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
// the writes to cacheWatcher.result channel is blocked.
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
var lock sync.RWMutex
count := 0
filter := func(string, labels.Set, fields.Set, bool) bool { return true }
forget := func(bool) {
defer lock.Unlock()
initEvents := []*watchCacheEvent{
{Object: &v1.Pod{}},
{Object: &v1.Pod{}},
// set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked.
w := newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
defer lock.RUnlock()
return count == 2, nil
}); err != nil {
t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err)
func TestCacheWatcherHandlesFiltering(t *testing.T) {
filter := func(_ string, _ labels.Set, field fields.Set, _ bool) bool {
return field["spec.nodeName"] == "host"
forget := func(bool) {}
testCases := []struct {
events []*watchCacheEvent
expected []watch.Event
// properly handle starting with the filter, then being deleted, then re-added
events: []*watchCacheEvent{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 1,
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 2,
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 3,
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}},
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
// properly handle ignoring changes prior to the filter, then getting added, then deleted
events: []*watchCacheEvent{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 1,
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 2,
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 3,
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 4,
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 5,
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 6,
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}},
for i, testCase := range testCases {
// set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked.
for j := range {[j].ResourceVersion = uint64(j) + 1
w := newCacheWatcher(0, 0,, filter, forget, testVersioner{})
ch := w.ResultChan()
for j, event := range testCase.expected {
e := <-ch
if !reflect.DeepEqual(event, e) {
t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e))
break TestCase
select {
case obj, ok := <-ch:
t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok)
break TestCase
type testVersioner struct{}
func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error {
listAccessor, err := meta.ListAccessor(obj)
if err != nil || listAccessor == nil {
return err
listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10))
return nil
func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
return fmt.Errorf("unimplemented")
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
return 0, fmt.Errorf("unimplemented")
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
errDummy = fmt.Errorf("dummy error")
func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
prefix := "pods"
config := Config{
CacheCapacity: cap,
Storage: s,
Versioner: testVersioner{},
Type: &example.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { return nil, nil, true, nil },
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
return NewCacherFromConfig(config), testVersioner{}
type dummyStorage struct {
err error
type dummyWatch struct {
ch chan watch.Event
func (w *dummyWatch) ResultChan() <-chan watch.Event {
func (w *dummyWatch) Stop() {
func newDummyWatch() watch.Interface {
return &dummyWatch{
ch: make(chan watch.Event),
func (d *dummyStorage) Versioner() storage.Versioner { return nil }
func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object, _ uint64) error {
return fmt.Errorf("unimplemented")
func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions) error {
return fmt.Errorf("unimplemented")
func (d *dummyStorage) Watch(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
return newDummyWatch(), nil
func (d *dummyStorage) WatchList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
return newDummyWatch(), nil
func (d *dummyStorage) Get(_ context.Context, _ string, _ string, _ runtime.Object, _ bool) error {
return fmt.Errorf("unimplemented")
func (d *dummyStorage) GetToList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, _ runtime.Object) error {
return d.err
func (d *dummyStorage) List(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, listObj runtime.Object) error {
podList := listObj.(*example.PodList)
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
return d.err
func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ ...runtime.Object) error {
return fmt.Errorf("unimplemented")
func (d *dummyStorage) Count(_ string) (int64, error) {
return 0, fmt.Errorf("unimplemented")
func TestListWithLimitAndRV0(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 0)
defer cacher.Stop()
pred := storage.SelectionPredicate{
Limit: 500,
result := &example.PodList{}
// Wait until cacher is initialized.
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err := cacher.List(context.TODO(), "pods/ns", "0", pred, result)
if err != nil {
t.Errorf("List with Limit and RV=0 should be served from cache: %v", err)
err = cacher.List(context.TODO(), "pods/ns", "", pred, result)
if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
func TestGetToListWithLimitAndRV0(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 0)
defer cacher.Stop()
pred := storage.SelectionPredicate{
Limit: 500,
result := &example.PodList{}
// Wait until cacher is initialized.
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err := cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result)
if err != nil {
t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err)
err = cacher.GetToList(context.TODO(), "pods/ns", "", pred, result)
if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)