blob: 323b927d06154015aef7b91311d12221a7cca481 [file] [log] [blame]
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"fmt"
"time"
"github.com/davecgh/go-spew/spew"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
)
type LogfFn func(format string, args ...interface{})
func LogReplicaSetsOfDeployment(deployment *apps.Deployment, allOldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, logf LogfFn) {
if newRS != nil {
logf(spew.Sprintf("New ReplicaSet %q of Deployment %q:\n%+v", newRS.Name, deployment.Name, *newRS))
} else {
logf("New ReplicaSet of Deployment %q is nil.", deployment.Name)
}
if len(allOldRSs) > 0 {
logf("All old ReplicaSets of Deployment %q:", deployment.Name)
}
for i := range allOldRSs {
logf(spew.Sprintf("%+v", *allOldRSs[i]))
}
}
func LogPodsOfDeployment(c clientset.Interface, deployment *apps.Deployment, rsList []*apps.ReplicaSet, logf LogfFn) {
minReadySeconds := deployment.Spec.MinReadySeconds
podListFunc := func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
return c.CoreV1().Pods(namespace).List(options)
}
podList, err := deploymentutil.ListPods(deployment, rsList, podListFunc)
if err != nil {
logf("Failed to list Pods of Deployment %q: %v", deployment.Name, err)
return
}
for _, pod := range podList.Items {
availability := "not available"
if podutil.IsPodAvailable(&pod, minReadySeconds, metav1.Now()) {
availability = "available"
}
logf(spew.Sprintf("Pod %q is %s:\n%+v", pod.Name, availability, pod))
}
}
// Waits for the deployment to complete.
// If during a rolling update (rolling == true), returns an error if the deployment's
// rolling update strategy (max unavailable or max surge) is broken at any times.
// It's not seen as a rolling update if shortly after a scaling event or the deployment is just created.
func waitForDeploymentCompleteMaybeCheckRolling(c clientset.Interface, d *apps.Deployment, rolling bool, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
var (
deployment *apps.Deployment
reason string
)
err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
var err error
deployment, err = c.AppsV1().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// If during a rolling update, make sure rolling update strategy isn't broken at any times.
if rolling {
reason, err = checkRollingUpdateStatus(c, deployment, logf)
if err != nil {
return false, err
}
logf(reason)
}
// When the deployment status and its underlying resources reach the desired state, we're done
if deploymentutil.DeploymentComplete(d, &deployment.Status) {
return true, nil
}
reason = fmt.Sprintf("deployment status: %#v", deployment.Status)
logf(reason)
return false, nil
})
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("%s", reason)
}
if err != nil {
return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", d.Name, err)
}
return nil
}
func checkRollingUpdateStatus(c clientset.Interface, deployment *apps.Deployment, logf LogfFn) (string, error) {
var reason string
oldRSs, allOldRSs, newRS, err := deploymentutil.GetAllReplicaSets(deployment, c.AppsV1())
if err != nil {
return "", err
}
if newRS == nil {
// New RC hasn't been created yet.
reason = "new replica set hasn't been created yet"
return reason, nil
}
allRSs := append(oldRSs, newRS)
// The old/new ReplicaSets need to contain the pod-template-hash label
for i := range allRSs {
if !labelsutil.SelectorHasLabel(allRSs[i].Spec.Selector, apps.DefaultDeploymentUniqueLabelKey) {
reason = "all replica sets need to contain the pod-template-hash label"
return reason, nil
}
}
// Check max surge and min available
totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
maxCreated := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
if totalCreated > maxCreated {
LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
LogPodsOfDeployment(c, deployment, allRSs, logf)
return "", fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
}
minAvailable := deploymentutil.MinAvailable(deployment)
if deployment.Status.AvailableReplicas < minAvailable {
LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
LogPodsOfDeployment(c, deployment, allRSs, logf)
return "", fmt.Errorf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
}
return "", nil
}
// Waits for the deployment to complete, and check rolling update strategy isn't broken at any times.
// Rolling update strategy should not be broken during a rolling update.
func WaitForDeploymentCompleteAndCheckRolling(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
rolling := true
return waitForDeploymentCompleteMaybeCheckRolling(c, d, rolling, logf, pollInterval, pollTimeout)
}
// Waits for the deployment to complete, and don't check if rolling update strategy is broken.
// Rolling update strategy is used only during a rolling update, and can be violated in other situations,
// such as shortly after a scaling event or the deployment is just created.
func WaitForDeploymentComplete(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
rolling := false
return waitForDeploymentCompleteMaybeCheckRolling(c, d, rolling, logf, pollInterval, pollTimeout)
}
// WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
// Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
var deployment *apps.Deployment
var newRS *apps.ReplicaSet
var reason string
err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
var err error
deployment, err = c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
// The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
newRS, err = deploymentutil.GetNewReplicaSet(deployment, c.AppsV1())
if err != nil {
return false, err
}
if err := checkRevisionAndImage(deployment, newRS, revision, image); err != nil {
reason = err.Error()
logf(reason)
return false, nil
}
return true, nil
})
if err == wait.ErrWaitTimeout {
LogReplicaSetsOfDeployment(deployment, nil, newRS, logf)
err = fmt.Errorf(reason)
}
if newRS == nil {
return fmt.Errorf("deployment %q failed to create new replica set", deploymentName)
}
if err != nil {
return fmt.Errorf("error waiting for deployment %q (got %s / %s) and new replica set %q (got %s / %s) revision and image to match expectation (expected %s / %s): %v", deploymentName, deployment.Annotations[deploymentutil.RevisionAnnotation], deployment.Spec.Template.Spec.Containers[0].Image, newRS.Name, newRS.Annotations[deploymentutil.RevisionAnnotation], newRS.Spec.Template.Spec.Containers[0].Image, revision, image, err)
}
return nil
}
// CheckDeploymentRevisionAndImage checks if the input deployment's and its new replica set's revision and image are as expected.
func CheckDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName, revision, image string) error {
deployment, err := c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("unable to get deployment %s during revision check: %v", deploymentName, err)
}
// Check revision of the new replica set of this deployment
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c.AppsV1())
if err != nil {
return fmt.Errorf("unable to get new replicaset of deployment %s during revision check: %v", deploymentName, err)
}
return checkRevisionAndImage(deployment, newRS, revision, image)
}
func checkRevisionAndImage(deployment *apps.Deployment, newRS *apps.ReplicaSet, revision, image string) error {
// The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
if newRS == nil {
return fmt.Errorf("new replicaset for deployment %q is yet to be created", deployment.Name)
}
if !labelsutil.SelectorHasLabel(newRS.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey) {
return fmt.Errorf("new replica set %q doesn't have %q label selector", newRS.Name, apps.DefaultDeploymentUniqueLabelKey)
}
// Check revision of this deployment, and of the new replica set of this deployment
if deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
return fmt.Errorf("deployment %q doesn't have the required revision set", deployment.Name)
}
if newRS.Annotations == nil || newRS.Annotations[deploymentutil.RevisionAnnotation] != revision {
return fmt.Errorf("new replicaset %q doesn't have the required revision set", newRS.Name)
}
// Check the image of this deployment, and of the new replica set of this deployment
if !containsImage(deployment.Spec.Template.Spec.Containers, image) {
return fmt.Errorf("deployment %q doesn't have the required image %s set", deployment.Name, image)
}
if !containsImage(newRS.Spec.Template.Spec.Containers, image) {
return fmt.Errorf("new replica set %q doesn't have the required image %s.", newRS.Name, image)
}
return nil
}
func containsImage(containers []v1.Container, imageName string) bool {
for _, container := range containers {
if container.Image == imageName {
return true
}
}
return false
}
type UpdateDeploymentFunc func(d *apps.Deployment)
func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate UpdateDeploymentFunc, logf LogfFn, pollInterval, pollTimeout time.Duration) (*apps.Deployment, error) {
var deployment *apps.Deployment
var updateErr error
pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
var err error
if deployment, err = c.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{}); err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(deployment)
if deployment, err = c.AppsV1().Deployments(namespace).Update(deployment); err == nil {
logf("Updating deployment %s", name)
return true, nil
}
updateErr = err
return false, nil
})
if pollErr == wait.ErrWaitTimeout {
pollErr = fmt.Errorf("couldn't apply the provided updated to deployment %q: %v", name, updateErr)
}
return deployment, pollErr
}
func WaitForObservedDeployment(c clientset.Interface, ns, deploymentName string, desiredGeneration int64) error {
return deploymentutil.WaitForObservedDeployment(func() (*apps.Deployment, error) {
return c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
}, desiredGeneration, 2*time.Second, 1*time.Minute)
}
// WaitForDeploymentRollbackCleared waits for given deployment either started rolling back or doesn't need to rollback.
func WaitForDeploymentRollbackCleared(c clientset.Interface, ns, deploymentName string, pollInterval, pollTimeout time.Duration) error {
err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
deployment, err := c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
// Rollback not set or is kicked off
if deployment.Annotations[apps.DeprecatedRollbackTo] == "" {
return true, nil
}
return false, nil
})
if err != nil {
return fmt.Errorf("error waiting for deployment %s rollbackTo to be cleared: %v", deploymentName, err)
}
return nil
}
// WaitForDeploymentUpdatedReplicasGTE waits for given deployment to be observed by the controller and has at least a number of updatedReplicas
func WaitForDeploymentUpdatedReplicasGTE(c clientset.Interface, ns, deploymentName string, minUpdatedReplicas int32, desiredGeneration int64, pollInterval, pollTimeout time.Duration) error {
var deployment *apps.Deployment
err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
d, err := c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
deployment = d
return deployment.Status.ObservedGeneration >= desiredGeneration && deployment.Status.UpdatedReplicas >= minUpdatedReplicas, nil
})
if err != nil {
return fmt.Errorf("error waiting for deployment %q to have at least %d updatedReplicas: %v; latest .status.updatedReplicas: %d", deploymentName, minUpdatedReplicas, err, deployment.Status.UpdatedReplicas)
}
return nil
}
func WaitForDeploymentWithCondition(c clientset.Interface, ns, deploymentName, reason string, condType apps.DeploymentConditionType, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
var deployment *apps.Deployment
pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
d, err := c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
deployment = d
cond := deploymentutil.GetDeploymentCondition(deployment.Status, condType)
return cond != nil && cond.Reason == reason, nil
})
if pollErr == wait.ErrWaitTimeout {
pollErr = fmt.Errorf("deployment %q never updated with the desired condition and reason, latest deployment conditions: %+v", deployment.Name, deployment.Status.Conditions)
_, allOldRSs, newRS, err := deploymentutil.GetAllReplicaSets(deployment, c.AppsV1())
if err == nil {
LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
LogPodsOfDeployment(c, deployment, append(allOldRSs, newRS), logf)
}
}
return pollErr
}