blob: 547ab8dd402560935b192cbb0e1b67bfff608922 [file] [log] [blame]
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/test/integration/framework"
)
// noopNormalization is the identity normalizer: it returns the recorded
// output slice exactly as it was passed in, without copying or filtering.
// It is used for test cases whose watcher output needs no post-processing.
func noopNormalization(output []string) []string {
return output
}
// normalizeInformerOutputFunc builds a normalizer for output recorded from an
// informer-backed (List+Watch) watcher, which may report the same value
// several times in a row.
//
// The returned function collapses every run of identical consecutive values
// into a single value and drops any leading values equal to initialVal. It
// always returns a freshly allocated, non-nil slice and never modifies its
// argument.
func normalizeInformerOutputFunc(initialVal string) func(output []string) []string {
	return func(output []string) []string {
		deduped := make([]string, 0, len(output))

		// Seeding the "previous" value with initialVal makes the leading
		// initial value (and its direct repetitions) disappear through the
		// same comparison that removes later duplicates.
		prev := initialVal
		for i := range output {
			cur := output[i]
			if cur != prev {
				deduped = append(deduped, cur)
				prev = cur
			}
		}

		return deduped
	}
}
// TestWatchRestartsIfTimeoutNotReached starts a test master whose
// MinRequestTimeout is a quarter of the overall test timeout and then, for each
// kind of watcher under test, patches a Secret roughly once per second while
// recording the "count" annotation of every event the watcher delivers.
//
// Two watchers are compared:
//
//   - a plain watch created directly through the clientset, which is expected
//     to end with an error before the test timeout elapses, and
//   - a watcher built by watchtools.NewIndexerInformerWatcher, which is
//     expected to run until the context timeout fires and to deliver exactly
//     the sequence of counts that was written (after normalization).
//
// Failures detected in the event generator or in the list/watch callbacks are
// reported with t.Errorf rather than t.Fatalf, because FailNow may only be
// called from the goroutine running the test.
func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
	// Has to be longer than 5 seconds
	timeout := 2 * time.Minute

	// Set up a master
	masterConfig := framework.NewIntegrationTestMasterConfig()
	// Timeout is set random between MinRequestTimeout and 2x
	masterConfig.GenericConfig.MinRequestTimeout = int(timeout.Seconds()) / 4
	_, s, closeFn := framework.RunAMaster(masterConfig)
	defer closeFn()

	config := &restclient.Config{
		Host:          s.URL,
		ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[corev1.GroupName].GroupVersion()},
	}

	namespaceObject := framework.CreateTestingNamespace("retry-watch", s, t)
	defer framework.DeleteTestingNamespace(namespaceObject, s, t)

	// getListFunc returns a lister restricted to the given secret by name.
	// The informer may invoke it on a goroutine other than the test's, so a
	// failure is recorded with t.Errorf and handed back to the caller instead
	// of calling t.Fatalf.
	getListFunc := func(c *kubernetes.Clientset, secret *v1.Secret) func(options metav1.ListOptions) (*v1.SecretList, error) {
		return func(options metav1.ListOptions) (*v1.SecretList, error) {
			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
			res, err := c.CoreV1().Secrets(secret.Namespace).List(options)
			if err != nil {
				t.Errorf("Failed to list Secrets: %v", err)
				return nil, err
			}
			return res, nil
		}
	}

	// getWatchFunc returns a watch constructor restricted to the given secret
	// by name. As with getListFunc, errors are recorded with t.Errorf and
	// returned so that the caller decides how to proceed.
	getWatchFunc := func(c *kubernetes.Clientset, secret *v1.Secret) func(options metav1.ListOptions) (watch.Interface, error) {
		return func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
			res, err := c.CoreV1().Secrets(secret.Namespace).Watch(options)
			if err != nil {
				t.Errorf("Failed to create a watcher on Secrets: %v", err)
				return nil, err
			}
			return res, nil
		}
	}

	// generateEvents patches the secret's "count" annotation about once per
	// second and appends every successfully written count to referenceOutput.
	// It is started with "go", stops when stopChan is closed or the soft
	// timeout elapses, and always closes stoppedChan on exit so the caller can
	// wait for it before reading referenceOutput.
	generateEvents := func(t *testing.T, c *kubernetes.Clientset, secret *v1.Secret, referenceOutput *[]string, stopChan chan struct{}, stoppedChan chan struct{}) {
		defer close(stoppedChan)
		counter := 0

		// These 5 seconds are here to protect against a race at the end when we could write something there at the same time as watch.Until ends
		softTimeout := timeout - 5*time.Second
		if softTimeout < 0 {
			panic("Timeout has to be grater than 5 seconds!")
		}
		endChannel := time.After(softTimeout)
		for {
			select {
			// TODO: get this lower once we figure out how to extend ETCD cache
			case <-time.After(1000 * time.Millisecond):
				counter = counter + 1

				patch := fmt.Sprintf(`{"metadata": {"annotations": {"count": "%d"}}}`, counter)
				_, err := c.CoreV1().Secrets(secret.Namespace).Patch(secret.Name, types.StrategicMergePatchType, []byte(patch))
				if err != nil {
					// This function does not run on the test goroutine, so
					// FailNow (t.Fatalf) must not be used here.
					t.Errorf("Failed to patch secret: %v", err)
					return
				}

				*referenceOutput = append(*referenceOutput, fmt.Sprintf("%d", counter))
			case <-endChannel:
				return
			case <-stopChan:
				return
			}
		}
	}

	initialCount := "0"
	// newTestSecret builds a secret in the test namespace whose "count"
	// annotation starts at initialCount.
	newTestSecret := func(name string) *v1.Secret {
		return &v1.Secret{
			ObjectMeta: metav1.ObjectMeta{
				Name:      name,
				Namespace: namespaceObject.Name,
				Annotations: map[string]string{
					"count": initialCount,
				},
			},
			Data: map[string][]byte{
				"data": []byte("value1\n"),
			},
		}
	}

	tt := []struct {
		name                string
		succeed             bool
		secret              *v1.Secret
		getWatcher          func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error)
		normalizeOutputFunc func(referenceOutput []string) []string
	}{
		{
			name:    "regular watcher should fail",
			succeed: false,
			secret:  newTestSecret("secret-01"),
			getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) {
				options := metav1.ListOptions{
					ResourceVersion: secret.ResourceVersion,
				}
				return getWatchFunc(c, secret)(options)
			}, // regular watcher; unfortunately destined to fail
			normalizeOutputFunc: noopNormalization,
		},
		{
			name:    "InformerWatcher survives closed watches",
			succeed: true,
			secret:  newTestSecret("secret-03"),
			getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) {
				lw := &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						list, err := getListFunc(c, secret)(options)
						if err != nil {
							// Return an untyped nil so the interface value
							// itself is nil on failure.
							return nil, err
						}
						return list, nil
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						return getWatchFunc(c, secret)(options)
					},
				}
				_, _, w := watchtools.NewIndexerInformerWatcher(lw, &v1.Secret{})
				return w, nil
			},
			normalizeOutputFunc: normalizeInformerOutputFunc(initialCount),
		},
	}

	for _, tmptc := range tt {
		tc := tmptc // we need to copy it for parallel runs
		t.Run(tc.name, func(t *testing.T) {
			c, err := kubernetes.NewForConfig(config)
			if err != nil {
				t.Fatalf("Failed to create clientset: %v", err)
			}

			secret, err := c.CoreV1().Secrets(tc.secret.Namespace).Create(tc.secret)
			if err != nil {
				t.Fatalf("Failed to create testing secret %s/%s: %v", tc.secret.Namespace, tc.secret.Name, err)
			}

			watcher, err := tc.getWatcher(c, secret)
			if err != nil {
				t.Fatalf("Failed to create watcher: %v", err)
			}

			var referenceOutput []string
			var output []string
			stopChan := make(chan struct{})
			stoppedChan := make(chan struct{})
			go generateEvents(t, c, secret, &referenceOutput, stopChan, stoppedChan)

			// Record current time to be able to assess if the timeout has been reached
			startTime := time.Now()
			ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
			defer cancel()
			_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
				s, ok := event.Object.(*v1.Secret)
				if !ok {
					t.Fatalf("Received an object that is not a Secret: %#v", event.Object)
				}
				output = append(output, s.Annotations["count"])
				// Watch will never end voluntarily
				return false, nil
			})
			watchDuration := time.Since(startTime)

			// Stop the generator and wait for it to exit before reading
			// referenceOutput, which it writes to.
			close(stopChan)
			<-stoppedChan

			output = tc.normalizeOutputFunc(output)

			t.Logf("Watch duration: %v; timeout: %v", watchDuration, timeout)

			if err == nil && !tc.succeed {
				t.Fatalf("Watch should have timed out but it exited without an error!")
			}

			if err != wait.ErrWaitTimeout && tc.succeed {
				t.Fatalf("Watch exited with error: %v!", err)
			}

			if watchDuration < timeout && tc.succeed {
				t.Fatalf("Watch should have timed out after %v but it timed out prematurely after %v!", timeout, watchDuration)
			}

			if watchDuration >= timeout && !tc.succeed {
				t.Fatalf("Watch should have timed out but it succeeded!")
			}

			if tc.succeed && !reflect.DeepEqual(referenceOutput, output) {
				t.Fatalf("Reference and real output differ! We must have lost some events or read some multiple times!\nRef:  %#v\nReal: %#v", referenceOutput, output)
			}
		})
	}
}