blob: 1adfd2d0471998787c5891e01488ec4b08339834 [file] [log] [blame]
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"fmt"
"strings"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/api/admissionregistration/v1alpha1"
"k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
clientretry "k8s.io/client-go/util/retry"
"k8s.io/kubernetes/test/e2e/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
)
var _ = SIGDescribe("Initializers [Feature:Initializers]", func() {
f := framework.NewDefaultFramework("initializers")
// TODO: Add failure traps once we have JustAfterEach
// See https://github.com/onsi/ginkgo/issues/303
It("should be invisible to controllers by default", func() {
ns := f.Namespace.Name
c := f.ClientSet
podName := "uninitialized-pod"
framework.Logf("Creating pod %s", podName)
ch := make(chan struct{})
go func() {
pod := newUninitializedPod(podName)
_, err := c.CoreV1().Pods(ns).Create(pod)
Expect(err).NotTo(HaveOccurred(), "failed to create pod %s in namespace: %s", podName, ns)
close(ch)
}()
// wait to ensure the scheduler does not act on an uninitialized pod
err := wait.PollImmediate(2*time.Second, 15*time.Second, func() (bool, error) {
p, err := c.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return len(p.Spec.NodeName) > 0, nil
})
Expect(err).To(Equal(wait.ErrWaitTimeout))
// verify that we can update an initializing pod
pod, err := c.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to get pod %s in namespace: %s", podName, ns)
pod.Annotations = map[string]string{"update-1": "test"}
pod, err = c.CoreV1().Pods(ns).Update(pod)
Expect(err).NotTo(HaveOccurred(), "failed to update pod %s in namespace %s to: %+v", pod.Name, ns, pod)
// verify the list call filters out uninitialized pods
listOptions := metav1.ListOptions{IncludeUninitialized: true}
pods, err := c.CoreV1().Pods(ns).List(listOptions)
Expect(err).NotTo(HaveOccurred(), "failed to list pods in namespace: %s, given list options: %+v", ns, listOptions)
Expect(pods.Items).To(HaveLen(1))
pods, err = c.CoreV1().Pods(ns).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to list pods in namespace: %s", ns)
Expect(pods.Items).To(HaveLen(0))
// clear initializers
pod.Initializers = nil
pod, err = c.CoreV1().Pods(ns).Update(pod)
Expect(err).NotTo(HaveOccurred(), "failed to update pod %s in namespace %s to: %+v", pod.Name, ns, pod)
// pod should now start running
err = framework.WaitForPodRunningInNamespace(c, pod)
Expect(err).NotTo(HaveOccurred(), "error while waiting for pod %s to go to Running phase in namespace: %s", pod.Name, pod.Namespace)
// ensure create call returns
<-ch
// verify that we cannot start the pod initializing again
pod, err = c.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to get pod %s in namespace: %s", podName, ns)
pod.Initializers = &metav1.Initializers{
Pending: []metav1.Initializer{{Name: "Other"}},
}
_, err = c.CoreV1().Pods(ns).Update(pod)
if !errors.IsInvalid(err) || !strings.Contains(err.Error(), "immutable") {
Fail(fmt.Sprintf("expected invalid error: %v", err))
}
})
It("should dynamically register and apply initializers to pods [Serial]", func() {
ns := f.Namespace.Name
c := f.ClientSet
podName := "uninitialized-pod"
framework.Logf("Creating pod %s", podName)
// create and register an initializer
initializerName := "pod.test.e2e.kubernetes.io"
initializerConfigName := "e2e-test-initializer"
initializerConfig := &v1alpha1.InitializerConfiguration{
ObjectMeta: metav1.ObjectMeta{Name: initializerConfigName},
Initializers: []v1alpha1.Initializer{
{
Name: initializerName,
Rules: []v1alpha1.Rule{
{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}},
},
},
},
}
_, err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Create(initializerConfig)
if errors.IsNotFound(err) {
framework.Skipf("dynamic configuration of initializers requires the alpha admissionregistration.k8s.io group to be enabled")
}
Expect(err).NotTo(HaveOccurred(), "failed to create and register initializer with config: %+v", initializerConfig)
// we must remove the initializer when the test is complete and ensure no pods are pending for that initializer
defer cleanupInitializer(c, initializerConfigName, initializerName)
// poller configuration is 1 second, wait at least that long
time.Sleep(3 * time.Second)
// run create that blocks
ch := make(chan struct{})
go func() {
defer close(ch)
pod := newInitPod(podName)
_, err := c.CoreV1().Pods(ns).Create(pod)
Expect(err).NotTo(HaveOccurred(), "failed to create pod %s in namespace: %s", podName, ns)
}()
// wait until the pod shows up uninitialized
By("Waiting until the pod is visible to a client")
var pod *v1.Pod
err = wait.PollImmediate(2*time.Second, 15*time.Second, func() (bool, error) {
pod, err = c.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{IncludeUninitialized: true})
if errors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
})
Expect(err).NotTo(HaveOccurred(), "failed to get pod %s from namespace: %s", podName, ns)
Expect(pod.Initializers).NotTo(BeNil())
Expect(pod.Initializers.Pending).To(HaveLen(1))
Expect(pod.Initializers.Pending[0].Name).To(Equal(initializerName))
// pretend we are an initializer
By("Completing initialization")
pod.Initializers = nil
pod, err = c.CoreV1().Pods(ns).Update(pod)
Expect(err).NotTo(HaveOccurred(), "failed to update pod %s in namespace %s to: %+v", pod.Name, ns, pod)
// ensure create call returns
<-ch
// pod should now start running
err = framework.WaitForPodRunningInNamespace(c, pod)
Expect(err).NotTo(HaveOccurred(), "error while waiting for pod %s to go to Running phase in namespace: %s", pod.Name, pod.Namespace)
// bypass initialization by explicitly passing an empty pending list
By("Setting an empty initializer as an admin to bypass initialization")
podName = "preinitialized-pod"
pod = newUninitializedPod(podName)
pod.Initializers.Pending = nil
pod, err = c.CoreV1().Pods(ns).Create(pod)
Expect(err).NotTo(HaveOccurred(), "failed to create pod %s in namespace: %s", podName, ns)
Expect(pod.Initializers).To(BeNil())
// bypass initialization for mirror pods
By("Creating a mirror pod that bypasses initialization")
podName = "mirror-pod"
pod = newInitPod(podName)
pod.Annotations = map[string]string{
v1.MirrorPodAnnotationKey: "true",
}
pod.Spec.NodeName = "node-does-not-yet-exist"
pod, err = c.CoreV1().Pods(ns).Create(pod)
Expect(err).NotTo(HaveOccurred(), "failed to create pod %s in namespace: %s", podName, ns)
Expect(pod.Initializers).To(BeNil())
Expect(pod.Annotations[v1.MirrorPodAnnotationKey]).To(Equal("true"))
})
It("don't cause replicaset controller creating extra pods if the initializer is not handled [Serial]", func() {
ns := f.Namespace.Name
c := f.ClientSet
podName := "uninitialized-pod"
framework.Logf("Creating pod %s", podName)
// create and register an initializer, without setting up a controller to handle it.
initializerName := "pod.test.e2e.kubernetes.io"
initializerConfigName := "e2e-test-initializer"
initializerConfig := &v1alpha1.InitializerConfiguration{
ObjectMeta: metav1.ObjectMeta{Name: initializerConfigName},
Initializers: []v1alpha1.Initializer{
{
Name: initializerName,
Rules: []v1alpha1.Rule{
{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}},
},
},
},
}
_, err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Create(initializerConfig)
if errors.IsNotFound(err) {
framework.Skipf("dynamic configuration of initializers requires the alpha admissionregistration.k8s.io group to be enabled")
}
Expect(err).NotTo(HaveOccurred(), "failed to create and register initializer with config: %+v", initializerConfig)
// we must remove the initializer when the test is complete and ensure no pods are pending for that initializer
defer cleanupInitializer(c, initializerConfigName, initializerName)
// poller configuration is 1 second, wait at least that long
time.Sleep(3 * time.Second)
// create a replicaset
rs := newReplicaset()
persistedRS, err := c.ExtensionsV1beta1().ReplicaSets(ns).Create(rs)
Expect(err).NotTo(HaveOccurred(), "failed to create replicaset %s in namespace: %s", persistedRS.Name, ns)
// wait for replicaset controller to confirm that it has handled the creation
err = waitForRSObservedGeneration(c, persistedRS.Namespace, persistedRS.Name, persistedRS.Generation)
Expect(err).NotTo(HaveOccurred(), "replicaset %s failed to observe generation: %d", persistedRS.Name, persistedRS.Generation)
// update the replicaset spec to trigger a resync
patch := []byte(`{"spec":{"minReadySeconds":5}}`)
persistedRS, err = c.ExtensionsV1beta1().ReplicaSets(ns).Patch(persistedRS.Name, types.StrategicMergePatchType, patch)
Expect(err).NotTo(HaveOccurred(), "failed to apply to replicaset %s in namespace %s a strategic merge patch: %s", persistedRS.Name, ns, patch)
// wait for replicaset controller to confirm that it has handle the spec update
err = waitForRSObservedGeneration(c, persistedRS.Namespace, persistedRS.Name, persistedRS.Generation)
Expect(err).NotTo(HaveOccurred(), "replicaset %s failed to observe generation: %d", persistedRS.Name, persistedRS.Generation)
// verify that the replicaset controller doesn't create extra pod
selector, err := metav1.LabelSelectorAsSelector(persistedRS.Spec.Selector)
Expect(err).NotTo(HaveOccurred(), "failed to convert label selector %+v of LabelSelector api type into a struct that implements labels.Selector", persistedRS.Spec.Selector)
listOptions := metav1.ListOptions{
LabelSelector: selector.String(),
IncludeUninitialized: true,
}
pods, err := c.CoreV1().Pods(ns).List(listOptions)
Expect(err).NotTo(HaveOccurred(), "failed to list pods in namespace: %s, given list options: %+v", ns, listOptions)
Expect(len(pods.Items)).Should(Equal(1))
})
It("will be set to nil if a patch removes the last pending initializer", func() {
ns := f.Namespace.Name
c := f.ClientSet
podName := "to-be-patch-initialized-pod"
framework.Logf("Creating pod %s", podName)
// TODO: lower the timeout so that the server responds faster.
_, err := c.CoreV1().Pods(ns).Create(newUninitializedPod(podName))
if err != nil && !errors.IsTimeout(err) {
framework.Failf("expect err to be timeout error, got %v", err)
}
uninitializedPod, err := c.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to get pod %s in namespace: %s", podName, ns)
Expect(uninitializedPod.Initializers).NotTo(BeNil())
Expect(len(uninitializedPod.Initializers.Pending)).Should(Equal(1))
patch := fmt.Sprintf(`{"metadata":{"initializers":{"pending":[{"$patch":"delete","name":"%s"}]}}}`, uninitializedPod.Initializers.Pending[0].Name)
patchedPod, err := c.CoreV1().Pods(ns).Patch(uninitializedPod.Name, types.StrategicMergePatchType, []byte(patch))
Expect(err).NotTo(HaveOccurred(), "failed to apply to pod %s in namespace %s a strategic merge patch: %s", uninitializedPod.Name, ns, patch)
Expect(patchedPod.Initializers).To(BeNil())
})
})
func newUninitializedPod(podName string) *v1.Pod {
pod := newInitPod(podName)
pod.Initializers = &metav1.Initializers{
Pending: []metav1.Initializer{{Name: "test.k8s.io"}},
}
return pod
}
func newReplicaset() *v1beta1.ReplicaSet {
name := "initializer-test-replicaset"
replicas := int32(1)
labels := map[string]string{"initializer-test": "single-replicaset"}
return &v1beta1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1beta1.ReplicaSetSpec{
Replicas: &replicas,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: &zero,
Containers: []v1.Container{
{
Name: name + "-container",
Image: imageutils.GetE2EImage(imageutils.Porter),
},
},
},
},
},
}
}
func newInitPod(podName string) *v1.Pod {
containerName := fmt.Sprintf("%s-container", podName)
port := 8080
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: containerName,
Image: imageutils.GetE2EImage(imageutils.Porter),
Env: []v1.EnvVar{{Name: fmt.Sprintf("SERVE_PORT_%d", port), Value: "foo"}},
Ports: []v1.ContainerPort{{ContainerPort: int32(port)}},
},
},
RestartPolicy: v1.RestartPolicyNever,
},
}
return pod
}
// removeInitializersFromAllPods walks all pods and ensures they don't have the provided initializer,
// to guarantee completing the test doesn't block the entire cluster.
func removeInitializersFromAllPods(c clientset.Interface, initializerName string) {
pods, err := c.CoreV1().Pods("").List(metav1.ListOptions{IncludeUninitialized: true})
if err != nil {
return
}
for _, p := range pods.Items {
if p.Initializers == nil {
continue
}
err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
pod, err := c.CoreV1().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{IncludeUninitialized: true})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
if pod.Initializers == nil {
return nil
}
var updated []metav1.Initializer
for _, pending := range pod.Initializers.Pending {
if pending.Name != initializerName {
updated = append(updated, pending)
}
}
if len(updated) == len(pod.Initializers.Pending) {
return nil
}
pod.Initializers.Pending = updated
if len(updated) == 0 {
pod.Initializers = nil
}
framework.Logf("Found initializer on pod %s in ns %s", pod.Name, pod.Namespace)
_, err = c.CoreV1().Pods(p.Namespace).Update(pod)
return err
})
if err != nil {
framework.Logf("Unable to remove initializer from pod %s in ns %s: %v", p.Name, p.Namespace, err)
}
}
}
// remove the initializerConfig, and remove the initializer from all pods
func cleanupInitializer(c clientset.Interface, initializerConfigName, initializerName string) {
if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) {
framework.Logf("got error on deleting %s", initializerConfigName)
}
// poller configuration is 1 second, wait at least that long
time.Sleep(3 * time.Second)
// clear our initializer from anyone who got it
removeInitializersFromAllPods(c, initializerName)
}
// waits till the RS status.observedGeneration matches metadata.generation.
func waitForRSObservedGeneration(c clientset.Interface, ns, name string, generation int64) error {
return wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
rs, err := c.ExtensionsV1beta1().ReplicaSets(ns).Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
if generation > rs.Status.ObservedGeneration {
return false, nil
}
return true, nil
})
}