| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package util |
| |
| import ( |
| "fmt" |
| "math" |
| "sort" |
| "strconv" |
| "strings" |
| "time" |
| |
| "k8s.io/klog" |
| |
| apps "k8s.io/api/apps/v1" |
| "k8s.io/api/core/v1" |
| apiequality "k8s.io/apimachinery/pkg/api/equality" |
| "k8s.io/apimachinery/pkg/api/meta" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/types" |
| intstrutil "k8s.io/apimachinery/pkg/util/intstr" |
| "k8s.io/apimachinery/pkg/util/wait" |
| appsclient "k8s.io/client-go/kubernetes/typed/apps/v1" |
| "k8s.io/client-go/util/integer" |
| "k8s.io/kubernetes/pkg/controller" |
| labelsutil "k8s.io/kubernetes/pkg/util/labels" |
| ) |
| |
| const ( |
| // RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence |
| RevisionAnnotation = "deployment.kubernetes.io/revision" |
| // RevisionHistoryAnnotation maintains the history of all old revisions that a replica set has served for a deployment. |
| RevisionHistoryAnnotation = "deployment.kubernetes.io/revision-history" |
| // DesiredReplicasAnnotation is the desired replicas for a deployment recorded as an annotation |
| // in its replica sets. Helps in separating scaling events from the rollout process and for |
| // determining if the new replica set for a deployment is really saturated. |
| DesiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas" |
| // MaxReplicasAnnotation is the maximum replicas a deployment can have at a given point, which |
| // is deployment.spec.replicas + maxSurge. Used by the underlying replica sets to estimate their |
| // proportions in case the deployment has surge replicas. |
| MaxReplicasAnnotation = "deployment.kubernetes.io/max-replicas" |
| |
	// RollbackRevisionNotFound is the rollback event reason used when the requested rollback revision is not found
	RollbackRevisionNotFound = "DeploymentRollbackRevisionNotFound"
	// RollbackTemplateUnchanged is the rollback event reason used when the rollback template is identical to the current template
	RollbackTemplateUnchanged = "DeploymentRollbackTemplateUnchanged"
	// RollbackDone is the event reason used when a rollback is done
	RollbackDone = "DeploymentRollback"
| // Reasons for deployment conditions |
| // |
| // Progressing: |
| // |
| // ReplicaSetUpdatedReason is added in a deployment when one of its replica sets is updated as part |
| // of the rollout process. |
| ReplicaSetUpdatedReason = "ReplicaSetUpdated" |
| // FailedRSCreateReason is added in a deployment when it cannot create a new replica set. |
| FailedRSCreateReason = "ReplicaSetCreateError" |
| // NewReplicaSetReason is added in a deployment when it creates a new replica set. |
| NewReplicaSetReason = "NewReplicaSetCreated" |
| // FoundNewRSReason is added in a deployment when it adopts an existing replica set. |
| FoundNewRSReason = "FoundNewReplicaSet" |
	// NewRSAvailableReason is added in a deployment when its newest replica set is made available,
	// i.e. the number of new pods that have passed readiness checks and run for at least minReadySeconds
	// is at least the minimum number of available pods the deployment needs.
| NewRSAvailableReason = "NewReplicaSetAvailable" |
| // TimedOutReason is added in a deployment when its newest replica set fails to show any progress |
| // within the given deadline (progressDeadlineSeconds). |
| TimedOutReason = "ProgressDeadlineExceeded" |
	// PausedDeployReason is added in a deployment when it is paused. Progress shouldn't be
	// estimated while a deployment is paused.
| PausedDeployReason = "DeploymentPaused" |
	// ResumedDeployReason is added in a deployment when it is resumed. Useful for avoiding
	// accidental failures of deployments that were paused amidst a rollout and are bounded by a deadline.
| ResumedDeployReason = "DeploymentResumed" |
| // |
| // Available: |
| // |
	// MinimumReplicasAvailable is added in a deployment when its minimum required replicas are available.
| MinimumReplicasAvailable = "MinimumReplicasAvailable" |
| // MinimumReplicasUnavailable is added in a deployment when it doesn't have the minimum required replicas |
| // available. |
| MinimumReplicasUnavailable = "MinimumReplicasUnavailable" |
| ) |
| |
| // NewDeploymentCondition creates a new deployment condition. |
| func NewDeploymentCondition(condType apps.DeploymentConditionType, status v1.ConditionStatus, reason, message string) *apps.DeploymentCondition { |
| return &apps.DeploymentCondition{ |
| Type: condType, |
| Status: status, |
| LastUpdateTime: metav1.Now(), |
| LastTransitionTime: metav1.Now(), |
| Reason: reason, |
| Message: message, |
| } |
| } |
| |
| // GetDeploymentCondition returns the condition with the provided type. |
| func GetDeploymentCondition(status apps.DeploymentStatus, condType apps.DeploymentConditionType) *apps.DeploymentCondition { |
| for i := range status.Conditions { |
| c := status.Conditions[i] |
| if c.Type == condType { |
| return &c |
| } |
| } |
| return nil |
| } |
| |
// SetDeploymentCondition updates the deployment to include the provided condition. If the condition
// already exists with the same status and reason, the deployment status is left unchanged.
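//
// A minimal usage sketch (values illustrative, not taken from any caller):
//
//	cond := NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue,
//		ReplicaSetUpdatedReason, "replica set \"foo\" is progressing")
//	SetDeploymentCondition(&deployment.Status, *cond)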
| func SetDeploymentCondition(status *apps.DeploymentStatus, condition apps.DeploymentCondition) { |
| currentCond := GetDeploymentCondition(*status, condition.Type) |
| if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason { |
| return |
| } |
| // Do not update lastTransitionTime if the status of the condition doesn't change. |
| if currentCond != nil && currentCond.Status == condition.Status { |
| condition.LastTransitionTime = currentCond.LastTransitionTime |
| } |
| newConditions := filterOutCondition(status.Conditions, condition.Type) |
| status.Conditions = append(newConditions, condition) |
| } |
| |
| // RemoveDeploymentCondition removes the deployment condition with the provided type. |
| func RemoveDeploymentCondition(status *apps.DeploymentStatus, condType apps.DeploymentConditionType) { |
| status.Conditions = filterOutCondition(status.Conditions, condType) |
| } |
| |
| // filterOutCondition returns a new slice of deployment conditions without conditions with the provided type. |
| func filterOutCondition(conditions []apps.DeploymentCondition, condType apps.DeploymentConditionType) []apps.DeploymentCondition { |
| var newConditions []apps.DeploymentCondition |
| for _, c := range conditions { |
| if c.Type == condType { |
| continue |
| } |
| newConditions = append(newConditions, c) |
| } |
| return newConditions |
| } |
| |
| // ReplicaSetToDeploymentCondition converts a replica set condition into a deployment condition. |
| // Useful for promoting replica set failure conditions into deployments. |
| func ReplicaSetToDeploymentCondition(cond apps.ReplicaSetCondition) apps.DeploymentCondition { |
| return apps.DeploymentCondition{ |
| Type: apps.DeploymentConditionType(cond.Type), |
| Status: cond.Status, |
| LastTransitionTime: cond.LastTransitionTime, |
| LastUpdateTime: cond.LastTransitionTime, |
| Reason: cond.Reason, |
| Message: cond.Message, |
| } |
| } |
| |
| // SetDeploymentRevision updates the revision for a deployment. |
| func SetDeploymentRevision(deployment *apps.Deployment, revision string) bool { |
| updated := false |
| |
| if deployment.Annotations == nil { |
| deployment.Annotations = make(map[string]string) |
| } |
| if deployment.Annotations[RevisionAnnotation] != revision { |
| deployment.Annotations[RevisionAnnotation] = revision |
| updated = true |
| } |
| |
| return updated |
| } |
| |
| // MaxRevision finds the highest revision in the replica sets |
| func MaxRevision(allRSs []*apps.ReplicaSet) int64 { |
| max := int64(0) |
| for _, rs := range allRSs { |
| if v, err := Revision(rs); err != nil { |
			// Skip replica sets whose revision information cannot be parsed
| klog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) |
| } else if v > max { |
| max = v |
| } |
| } |
| return max |
| } |
| |
// LastRevision finds the second-highest revision number among all replica sets (the last revision)
| func LastRevision(allRSs []*apps.ReplicaSet) int64 { |
| max, secMax := int64(0), int64(0) |
| for _, rs := range allRSs { |
| if v, err := Revision(rs); err != nil { |
			// Skip replica sets whose revision information cannot be parsed
| klog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) |
| } else if v >= max { |
| secMax = max |
| max = v |
| } else if v > secMax { |
| secMax = v |
| } |
| } |
| return secMax |
| } |
| |
| // Revision returns the revision number of the input object. |
| func Revision(obj runtime.Object) (int64, error) { |
| acc, err := meta.Accessor(obj) |
| if err != nil { |
| return 0, err |
| } |
| v, ok := acc.GetAnnotations()[RevisionAnnotation] |
| if !ok { |
| return 0, nil |
| } |
| return strconv.ParseInt(v, 10, 64) |
| } |
| |
// SetNewReplicaSetAnnotations sets the new replica set's annotations appropriately by updating its revision and
// copying required deployment annotations to it; it returns true if the replica set's annotations changed.
| func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.ReplicaSet, newRevision string, exists bool) bool { |
| // First, copy deployment's annotations (except for apply and revision annotations) |
| annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS) |
| // Then, update replica set's revision annotation |
| if newRS.Annotations == nil { |
| newRS.Annotations = make(map[string]string) |
| } |
| oldRevision, ok := newRS.Annotations[RevisionAnnotation] |
	// The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number
	// of all old RSes + 1). However, it's possible that some old RSes are deleted after the newRS revision has been updated, so that
	// newRevision becomes smaller than newRS's revision. We should only update newRS's revision when it's smaller than newRevision.
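	// For example (numbers illustrative): if newRS already carries revision "3" but
	// newRevision was computed as "2" because some old RSes were deleted, we keep "3";
	// the annotation is only bumped when the existing revision is strictly smaller.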
| |
| oldRevisionInt, err := strconv.ParseInt(oldRevision, 10, 64) |
| if err != nil { |
| if oldRevision != "" { |
| klog.Warningf("Updating replica set revision OldRevision not int %s", err) |
| return false |
| } |
		// If the RS annotation is empty then initialize it to 0
| oldRevisionInt = 0 |
| } |
| newRevisionInt, err := strconv.ParseInt(newRevision, 10, 64) |
| if err != nil { |
| klog.Warningf("Updating replica set revision NewRevision not int %s", err) |
| return false |
| } |
| if oldRevisionInt < newRevisionInt { |
| newRS.Annotations[RevisionAnnotation] = newRevision |
| annotationChanged = true |
| klog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision) |
| } |
| // If a revision annotation already existed and this replica set was updated with a new revision |
| // then that means we are rolling back to this replica set. We need to preserve the old revisions |
| // for historical information. |
| if ok && annotationChanged { |
| revisionHistoryAnnotation := newRS.Annotations[RevisionHistoryAnnotation] |
| oldRevisions := strings.Split(revisionHistoryAnnotation, ",") |
| if len(oldRevisions[0]) == 0 { |
| newRS.Annotations[RevisionHistoryAnnotation] = oldRevision |
| } else { |
| oldRevisions = append(oldRevisions, oldRevision) |
| newRS.Annotations[RevisionHistoryAnnotation] = strings.Join(oldRevisions, ",") |
| } |
| } |
| // If the new replica set is about to be created, we need to add replica annotations to it. |
| if !exists && SetReplicasAnnotations(newRS, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+MaxSurge(*deployment)) { |
| annotationChanged = true |
| } |
| return annotationChanged |
| } |
| |
| var annotationsToSkip = map[string]bool{ |
| v1.LastAppliedConfigAnnotation: true, |
| RevisionAnnotation: true, |
| RevisionHistoryAnnotation: true, |
| DesiredReplicasAnnotation: true, |
| MaxReplicasAnnotation: true, |
| apps.DeprecatedRollbackTo: true, |
| } |
| |
| // skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key |
| // TODO: How to decide which annotations should / should not be copied? |
| // See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615 |
| func skipCopyAnnotation(key string) bool { |
| return annotationsToSkip[key] |
| } |
| |
// copyDeploymentAnnotationsToReplicaSet copies the deployment's annotations to the replica set's annotations,
// and returns true if the replica set's annotations changed.
| // Note that apply and revision annotations are not copied. |
| func copyDeploymentAnnotationsToReplicaSet(deployment *apps.Deployment, rs *apps.ReplicaSet) bool { |
| rsAnnotationsChanged := false |
| if rs.Annotations == nil { |
| rs.Annotations = make(map[string]string) |
| } |
| for k, v := range deployment.Annotations { |
| // newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated |
| // by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of |
| // deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable. |
| if skipCopyAnnotation(k) || rs.Annotations[k] == v { |
| continue |
| } |
| rs.Annotations[k] = v |
| rsAnnotationsChanged = true |
| } |
| return rsAnnotationsChanged |
| } |
| |
// SetDeploymentAnnotationsTo sets the deployment's annotations to the given RS's annotations.
| // This action should be done if and only if the deployment is rolling back to this rs. |
| // Note that apply and revision annotations are not changed. |
| func SetDeploymentAnnotationsTo(deployment *apps.Deployment, rollbackToRS *apps.ReplicaSet) { |
| deployment.Annotations = getSkippedAnnotations(deployment.Annotations) |
| for k, v := range rollbackToRS.Annotations { |
| if !skipCopyAnnotation(k) { |
| deployment.Annotations[k] = v |
| } |
| } |
| } |
| |
| func getSkippedAnnotations(annotations map[string]string) map[string]string { |
| skippedAnnotations := make(map[string]string) |
| for k, v := range annotations { |
| if skipCopyAnnotation(k) { |
| skippedAnnotations[k] = v |
| } |
| } |
| return skippedAnnotations |
| } |
| |
| // FindActiveOrLatest returns the only active or the latest replica set in case there is at most one active |
| // replica set. If there are more active replica sets, then we should proportionally scale them. |
| func FindActiveOrLatest(newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) *apps.ReplicaSet { |
| if newRS == nil && len(oldRSs) == 0 { |
| return nil |
| } |
| |
| sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs))) |
| allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS)) |
| |
| switch len(allRSs) { |
| case 0: |
| // If there is no active replica set then we should return the newest. |
| if newRS != nil { |
| return newRS |
| } |
| return oldRSs[0] |
| case 1: |
| return allRSs[0] |
| default: |
| return nil |
| } |
| } |
| |
| // GetDesiredReplicasAnnotation returns the number of desired replicas |
| func GetDesiredReplicasAnnotation(rs *apps.ReplicaSet) (int32, bool) { |
| return getIntFromAnnotation(rs, DesiredReplicasAnnotation) |
| } |
| |
| func getMaxReplicasAnnotation(rs *apps.ReplicaSet) (int32, bool) { |
| return getIntFromAnnotation(rs, MaxReplicasAnnotation) |
| } |
| |
| func getIntFromAnnotation(rs *apps.ReplicaSet, annotationKey string) (int32, bool) { |
| annotationValue, ok := rs.Annotations[annotationKey] |
| if !ok { |
| return int32(0), false |
| } |
| intValue, err := strconv.Atoi(annotationValue) |
| if err != nil { |
| klog.V(2).Infof("Cannot convert the value %q with annotation key %q for the replica set %q", annotationValue, annotationKey, rs.Name) |
| return int32(0), false |
| } |
| return int32(intValue), true |
| } |
| |
| // SetReplicasAnnotations sets the desiredReplicas and maxReplicas into the annotations |
| func SetReplicasAnnotations(rs *apps.ReplicaSet, desiredReplicas, maxReplicas int32) bool { |
| updated := false |
| if rs.Annotations == nil { |
| rs.Annotations = make(map[string]string) |
| } |
| desiredString := fmt.Sprintf("%d", desiredReplicas) |
| if hasString := rs.Annotations[DesiredReplicasAnnotation]; hasString != desiredString { |
| rs.Annotations[DesiredReplicasAnnotation] = desiredString |
| updated = true |
| } |
| maxString := fmt.Sprintf("%d", maxReplicas) |
| if hasString := rs.Annotations[MaxReplicasAnnotation]; hasString != maxString { |
| rs.Annotations[MaxReplicasAnnotation] = maxString |
| updated = true |
| } |
| return updated |
| } |
| |
// ReplicasAnnotationsNeedUpdate returns true if the desired-replicas or max-replicas annotations need to be updated
| func ReplicasAnnotationsNeedUpdate(rs *apps.ReplicaSet, desiredReplicas, maxReplicas int32) bool { |
| if rs.Annotations == nil { |
| return true |
| } |
| desiredString := fmt.Sprintf("%d", desiredReplicas) |
| if hasString := rs.Annotations[DesiredReplicasAnnotation]; hasString != desiredString { |
| return true |
| } |
| maxString := fmt.Sprintf("%d", maxReplicas) |
| if hasString := rs.Annotations[MaxReplicasAnnotation]; hasString != maxString { |
| return true |
| } |
| return false |
| } |
| |
// MaxUnavailable returns the maximum number of unavailable pods a rolling deployment can take.
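//
// For example (illustrative): with 10 desired replicas and maxUnavailable set to 25%,
// the percentage is rounded down and this returns 2.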
| func MaxUnavailable(deployment apps.Deployment) int32 { |
| if !IsRollingUpdate(&deployment) || *(deployment.Spec.Replicas) == 0 { |
| return int32(0) |
| } |
| // Error caught by validation |
| _, maxUnavailable, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas)) |
| if maxUnavailable > *deployment.Spec.Replicas { |
| return *deployment.Spec.Replicas |
| } |
| return maxUnavailable |
| } |
| |
// MinAvailable returns the minimum number of available pods required for a given deployment
| func MinAvailable(deployment *apps.Deployment) int32 { |
| if !IsRollingUpdate(deployment) { |
| return int32(0) |
| } |
| return *(deployment.Spec.Replicas) - MaxUnavailable(*deployment) |
| } |
| |
// MaxSurge returns the maximum number of surge pods a rolling deployment can take.
| func MaxSurge(deployment apps.Deployment) int32 { |
| if !IsRollingUpdate(&deployment) { |
| return int32(0) |
| } |
| // Error caught by validation |
| maxSurge, _, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas)) |
| return maxSurge |
| } |
| |
// GetProportion will estimate the proportion for the provided replica set using 1. the current size
// of the parent deployment, 2. the replica count that needs to be added on the replica sets of the
// deployment, and 3. the total replicas added in the replica sets of the deployment so far.
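//
// For example (illustrative): when scaling up with deploymentReplicasToAdd=5,
// deploymentReplicasAdded=3, and a replica set fraction of 4, the result is
// min(4, 5-3) = 2.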
| func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 { |
| if rs == nil || *(rs.Spec.Replicas) == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded { |
| return int32(0) |
| } |
| |
| rsFraction := getReplicaSetFraction(*rs, d) |
| allowed := deploymentReplicasToAdd - deploymentReplicasAdded |
| |
| if deploymentReplicasToAdd > 0 { |
| // Use the minimum between the replica set fraction and the maximum allowed replicas |
| // when scaling up. This way we ensure we will not scale up more than the allowed |
| // replicas we can add. |
| return integer.Int32Min(rsFraction, allowed) |
| } |
| // Use the maximum between the replica set fraction and the maximum allowed replicas |
| // when scaling down. This way we ensure we will not scale down more than the allowed |
| // replicas we can remove. |
| return integer.Int32Max(rsFraction, allowed) |
| } |
| |
| // getReplicaSetFraction estimates the fraction of replicas a replica set can have in |
| // 1. a scaling event during a rollout or 2. when scaling a paused deployment. |
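//
// For example (illustrative): scaling a deployment from 10 to 15 replicas with no surge,
// a replica set of 4 replicas annotated with max-replicas=10 gets
// round(4*15/10) - 4 = 2 additional replicas.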
| func getReplicaSetFraction(rs apps.ReplicaSet, d apps.Deployment) int32 { |
| // If we are scaling down to zero then the fraction of this replica set is its whole size (negative) |
| if *(d.Spec.Replicas) == int32(0) { |
| return -*(rs.Spec.Replicas) |
| } |
| |
| deploymentReplicas := *(d.Spec.Replicas) + MaxSurge(d) |
| annotatedReplicas, ok := getMaxReplicasAnnotation(&rs) |
| if !ok { |
| // If we cannot find the annotation then fallback to the current deployment size. Note that this |
| // will not be an accurate proportion estimation in case other replica sets have different values |
| // which means that the deployment was scaled at some point but we at least will stay in limits |
| // due to the min-max comparisons in getProportion. |
| annotatedReplicas = d.Status.Replicas |
| } |
| |
| // We should never proportionally scale up from zero which means rs.spec.replicas and annotatedReplicas |
| // will never be zero here. |
| newRSsize := (float64(*(rs.Spec.Replicas) * deploymentReplicas)) / float64(annotatedReplicas) |
| return integer.RoundToInt32(newRSsize) - *(rs.Spec.Replicas) |
| } |
| |
// GetAllReplicaSets returns the old and new replica sets targeted by the given Deployment. It lists ReplicaSets using the client interface.
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set includes all old replica sets.
| // The third returned value is the new replica set, and it may be nil if it doesn't exist yet. |
| func GetAllReplicaSets(deployment *apps.Deployment, c appsclient.AppsV1Interface) ([]*apps.ReplicaSet, []*apps.ReplicaSet, *apps.ReplicaSet, error) { |
| rsList, err := ListReplicaSets(deployment, RsListFromClient(c)) |
| if err != nil { |
| return nil, nil, nil, err |
| } |
| oldRSes, allOldRSes := FindOldReplicaSets(deployment, rsList) |
| newRS := FindNewReplicaSet(deployment, rsList) |
| return oldRSes, allOldRSes, newRS, nil |
| } |
| |
// GetOldReplicaSets returns the old replica sets targeted by the given Deployment; it lists ReplicaSets using the client interface.
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set includes all old replica sets.
| func GetOldReplicaSets(deployment *apps.Deployment, c appsclient.AppsV1Interface) ([]*apps.ReplicaSet, []*apps.ReplicaSet, error) { |
| rsList, err := ListReplicaSets(deployment, RsListFromClient(c)) |
| if err != nil { |
| return nil, nil, err |
| } |
| oldRSes, allOldRSes := FindOldReplicaSets(deployment, rsList) |
| return oldRSes, allOldRSes, nil |
| } |
| |
// GetNewReplicaSet returns a replica set that matches the intent of the given deployment; it lists ReplicaSets using the client interface.
| // Returns nil if the new replica set doesn't exist yet. |
| func GetNewReplicaSet(deployment *apps.Deployment, c appsclient.AppsV1Interface) (*apps.ReplicaSet, error) { |
| rsList, err := ListReplicaSets(deployment, RsListFromClient(c)) |
| if err != nil { |
| return nil, err |
| } |
| return FindNewReplicaSet(deployment, rsList), nil |
| } |
| |
| // RsListFromClient returns an rsListFunc that wraps the given client. |
| func RsListFromClient(c appsclient.AppsV1Interface) RsListFunc { |
| return func(namespace string, options metav1.ListOptions) ([]*apps.ReplicaSet, error) { |
| rsList, err := c.ReplicaSets(namespace).List(options) |
| if err != nil { |
| return nil, err |
| } |
| var ret []*apps.ReplicaSet |
| for i := range rsList.Items { |
| ret = append(ret, &rsList.Items[i]) |
| } |
| return ret, err |
| } |
| } |
| |
| // TODO: switch this to full namespacers |
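// RsListFunc returns a slice of ReplicaSets in the given namespace that match the given options.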
| type RsListFunc func(string, metav1.ListOptions) ([]*apps.ReplicaSet, error) |
| type podListFunc func(string, metav1.ListOptions) (*v1.PodList, error) |
| |
| // ListReplicaSets returns a slice of RSes the given deployment targets. |
| // Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan), |
| // because only the controller itself should do that. |
| // However, it does filter out anything whose ControllerRef doesn't match. |
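//
// A minimal usage sketch (the typed client c and the deployment are assumed):
//
//	rsList, err := ListReplicaSets(deployment, RsListFromClient(c))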
| func ListReplicaSets(deployment *apps.Deployment, getRSList RsListFunc) ([]*apps.ReplicaSet, error) { |
| // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector |
| // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830. |
| namespace := deployment.Namespace |
| selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) |
| if err != nil { |
| return nil, err |
| } |
| options := metav1.ListOptions{LabelSelector: selector.String()} |
| all, err := getRSList(namespace, options) |
| if err != nil { |
| return nil, err |
| } |
| // Only include those whose ControllerRef matches the Deployment. |
| owned := make([]*apps.ReplicaSet, 0, len(all)) |
| for _, rs := range all { |
| if metav1.IsControlledBy(rs, deployment) { |
| owned = append(owned, rs) |
| } |
| } |
| return owned, nil |
| } |
| |
| // ListPods returns a list of pods the given deployment targets. |
| // This needs a list of ReplicaSets for the Deployment, |
| // which can be found with ListReplicaSets(). |
| // Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan), |
| // because only the controller itself should do that. |
| // However, it does filter out anything whose ControllerRef doesn't match. |
| func ListPods(deployment *apps.Deployment, rsList []*apps.ReplicaSet, getPodList podListFunc) (*v1.PodList, error) { |
| namespace := deployment.Namespace |
| selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) |
| if err != nil { |
| return nil, err |
| } |
| options := metav1.ListOptions{LabelSelector: selector.String()} |
| all, err := getPodList(namespace, options) |
| if err != nil { |
| return all, err |
| } |
| // Only include those whose ControllerRef points to a ReplicaSet that is in |
| // turn owned by this Deployment. |
| rsMap := make(map[types.UID]bool, len(rsList)) |
| for _, rs := range rsList { |
| rsMap[rs.UID] = true |
| } |
| owned := &v1.PodList{Items: make([]v1.Pod, 0, len(all.Items))} |
| for i := range all.Items { |
| pod := &all.Items[i] |
| controllerRef := metav1.GetControllerOf(pod) |
| if controllerRef != nil && rsMap[controllerRef.UID] { |
| owned.Items = append(owned.Items, *pod) |
| } |
| } |
| return owned, nil |
| } |
| |
// EqualIgnoreHash returns true if two given PodTemplateSpecs are equal, ignoring the diff in value of Labels[pod-template-hash]
| // We ignore pod-template-hash because: |
| // 1. The hash result would be different upon podTemplateSpec API changes |
| // (e.g. the addition of a new field will cause the hash code to change) |
| // 2. The deployment template won't have hash labels |
| func EqualIgnoreHash(template1, template2 *v1.PodTemplateSpec) bool { |
| t1Copy := template1.DeepCopy() |
| t2Copy := template2.DeepCopy() |
| // Remove hash labels from template.Labels before comparing |
| delete(t1Copy.Labels, apps.DefaultDeploymentUniqueLabelKey) |
| delete(t2Copy.Labels, apps.DefaultDeploymentUniqueLabelKey) |
| return apiequality.Semantic.DeepEqual(t1Copy, t2Copy) |
| } |
| |
| // FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template). |
| func FindNewReplicaSet(deployment *apps.Deployment, rsList []*apps.ReplicaSet) *apps.ReplicaSet { |
| sort.Sort(controller.ReplicaSetsByCreationTimestamp(rsList)) |
| for i := range rsList { |
| if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) { |
			// In rare cases, such as after cluster upgrades, a Deployment may end up with
			// more than one new ReplicaSet that has the same template as the Deployment's,
			// see https://github.com/kubernetes/kubernetes/issues/40415
			// We deterministically choose the oldest new ReplicaSet.
| return rsList[i] |
| } |
| } |
| // new ReplicaSet does not exist. |
| return nil |
| } |
| |
// FindOldReplicaSets returns the old replica sets targeted by the given Deployment, from the given slice of RSes.
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set includes all old replica sets.
| func FindOldReplicaSets(deployment *apps.Deployment, rsList []*apps.ReplicaSet) ([]*apps.ReplicaSet, []*apps.ReplicaSet) { |
| var requiredRSs []*apps.ReplicaSet |
| var allRSs []*apps.ReplicaSet |
| newRS := FindNewReplicaSet(deployment, rsList) |
| for _, rs := range rsList { |
| // Filter out new replica set |
| if newRS != nil && rs.UID == newRS.UID { |
| continue |
| } |
| allRSs = append(allRSs, rs) |
| if *(rs.Spec.Replicas) != 0 { |
| requiredRSs = append(requiredRSs, rs) |
| } |
| } |
| return requiredRSs, allRSs |
| } |
| |
| // SetFromReplicaSetTemplate sets the desired PodTemplateSpec from a replica set template to the given deployment. |
| func SetFromReplicaSetTemplate(deployment *apps.Deployment, template v1.PodTemplateSpec) *apps.Deployment { |
| deployment.Spec.Template.ObjectMeta = template.ObjectMeta |
| deployment.Spec.Template.Spec = template.Spec |
| deployment.Spec.Template.ObjectMeta.Labels = labelsutil.CloneAndRemoveLabel( |
| deployment.Spec.Template.ObjectMeta.Labels, |
| apps.DefaultDeploymentUniqueLabelKey) |
| return deployment |
| } |
| |
| // GetReplicaCountForReplicaSets returns the sum of Replicas of the given replica sets. |
| func GetReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int32 { |
| totalReplicas := int32(0) |
| for _, rs := range replicaSets { |
| if rs != nil { |
| totalReplicas += *(rs.Spec.Replicas) |
| } |
| } |
| return totalReplicas |
| } |
| |
| // GetActualReplicaCountForReplicaSets returns the sum of actual replicas of the given replica sets. |
| func GetActualReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int32 { |
| totalActualReplicas := int32(0) |
| for _, rs := range replicaSets { |
| if rs != nil { |
| totalActualReplicas += rs.Status.Replicas |
| } |
| } |
| return totalActualReplicas |
| } |
| |
| // GetReadyReplicaCountForReplicaSets returns the number of ready pods corresponding to the given replica sets. |
| func GetReadyReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int32 { |
| totalReadyReplicas := int32(0) |
| for _, rs := range replicaSets { |
| if rs != nil { |
| totalReadyReplicas += rs.Status.ReadyReplicas |
| } |
| } |
| return totalReadyReplicas |
| } |
| |
| // GetAvailableReplicaCountForReplicaSets returns the number of available pods corresponding to the given replica sets. |
| func GetAvailableReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int32 { |
| totalAvailableReplicas := int32(0) |
| for _, rs := range replicaSets { |
| if rs != nil { |
| totalAvailableReplicas += rs.Status.AvailableReplicas |
| } |
| } |
| return totalAvailableReplicas |
| } |
| |
| // IsRollingUpdate returns true if the strategy type is a rolling update. |
| func IsRollingUpdate(deployment *apps.Deployment) bool { |
| return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType |
| } |
| |
| // DeploymentComplete considers a deployment to be complete once all of its desired replicas |
| // are updated and available, and no old pods are running. |
| func DeploymentComplete(deployment *apps.Deployment, newStatus *apps.DeploymentStatus) bool { |
| return newStatus.UpdatedReplicas == *(deployment.Spec.Replicas) && |
| newStatus.Replicas == *(deployment.Spec.Replicas) && |
| newStatus.AvailableReplicas == *(deployment.Spec.Replicas) && |
| newStatus.ObservedGeneration >= deployment.Generation |
| } |
| |
// DeploymentProgressing reports progress for a deployment. Progress is estimated by comparing the
// current with the new status of the deployment that the controller is observing. More specifically,
// when new pods are scaled up or become ready or available, or old pods are scaled down, then we
// consider the deployment to be progressing.
| func DeploymentProgressing(deployment *apps.Deployment, newStatus *apps.DeploymentStatus) bool { |
| oldStatus := deployment.Status |
| |
| // Old replicas that need to be scaled down |
| oldStatusOldReplicas := oldStatus.Replicas - oldStatus.UpdatedReplicas |
| newStatusOldReplicas := newStatus.Replicas - newStatus.UpdatedReplicas |
| |
| return (newStatus.UpdatedReplicas > oldStatus.UpdatedReplicas) || |
| (newStatusOldReplicas < oldStatusOldReplicas) || |
| newStatus.ReadyReplicas > deployment.Status.ReadyReplicas || |
| newStatus.AvailableReplicas > deployment.Status.AvailableReplicas |
| } |
| |
| // used for unit testing |
| var nowFn = func() time.Time { return time.Now() } |
| |
// DeploymentTimedOut considers a deployment to have timed out once the condition that reports its
// progress is older than progressDeadlineSeconds, or once a Progressing condition with the
// TimedOutReason reason already exists.
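//
// For example (illustrative): with progressDeadlineSeconds=600 and a Progressing
// condition last updated 700 seconds ago that does not report NewReplicaSetAvailable,
// this returns true.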
| func DeploymentTimedOut(deployment *apps.Deployment, newStatus *apps.DeploymentStatus) bool { |
| if !HasProgressDeadline(deployment) { |
| return false |
| } |
| |
| // Look for the Progressing condition. If it doesn't exist, we have no base to estimate progress. |
| // If it's already set with a TimedOutReason reason, we have already timed out, no need to check |
| // again. |
| condition := GetDeploymentCondition(*newStatus, apps.DeploymentProgressing) |
| if condition == nil { |
| return false |
| } |
| // If the previous condition has been a successful rollout then we shouldn't try to |
| // estimate any progress. Scenario: |
| // |
| // * progressDeadlineSeconds is smaller than the difference between now and the time |
| // the last rollout finished in the past. |
| // * the creation of a new ReplicaSet triggers a resync of the Deployment prior to the |
| // cached copy of the Deployment getting updated with the status.condition that indicates |
| // the creation of the new ReplicaSet. |
| // |
| // The Deployment will be resynced and eventually its Progressing condition will catch |
| // up with the state of the world. |
| if condition.Reason == NewRSAvailableReason { |
| return false |
| } |
| if condition.Reason == TimedOutReason { |
| return true |
| } |
| |
| // Look at the difference in seconds between now and the last time we reported any |
| // progress or tried to create a replica set, or resumed a paused deployment and |
| // compare against progressDeadlineSeconds. |
| from := condition.LastUpdateTime |
| now := nowFn() |
| delta := time.Duration(*deployment.Spec.ProgressDeadlineSeconds) * time.Second |
| timedOut := from.Add(delta).Before(now) |
| |
| klog.V(4).Infof("Deployment %q timed out (%t) [last progress check: %v - now: %v]", deployment.Name, timedOut, from, now) |
| return timedOut |
| } |
| |
// NewRSNewReplicas calculates the number of replicas a deployment's new RS should have.
// When one of the following is true, we're rolling out the deployment; otherwise, we're scaling it.
| // 1) The new RS is saturated: newRS's replicas == deployment's replicas |
| // 2) Max number of pods allowed is reached: deployment's replicas + maxSurge == all RSs' replicas |
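//
// For example (illustrative): with 10 desired replicas, a resolved maxSurge of 2,
// 11 pods across all replica sets, and a new RS at 5 replicas, the new RS may grow
// by min(12-11, 10-5) = 1 replica, to 6.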
| func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) { |
| switch deployment.Spec.Strategy.Type { |
| case apps.RollingUpdateDeploymentStrategyType: |
| // Check if we can scale up. |
| maxSurge, err := intstrutil.GetValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true) |
| if err != nil { |
| return 0, err |
| } |
| // Find the total number of pods |
| currentPodCount := GetReplicaCountForReplicaSets(allRSs) |
| maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge) |
| if currentPodCount >= maxTotalPods { |
| // Cannot scale up. |
| return *(newRS.Spec.Replicas), nil |
| } |
| // Scale up. |
| scaleUpCount := maxTotalPods - currentPodCount |
| // Do not exceed the number of desired replicas. |
| scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas)))) |
| return *(newRS.Spec.Replicas) + scaleUpCount, nil |
| case apps.RecreateDeploymentStrategyType: |
| return *(deployment.Spec.Replicas), nil |
| default: |
| return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type) |
| } |
| } |
| |
| // IsSaturated checks if the new replica set is saturated by comparing its size with its deployment size. |
| // Both the deployment and the replica set have to believe this replica set can own all of the desired |
| // replicas in the deployment and the annotation helps in achieving that. All pods of the ReplicaSet |
| // need to be available. |
| func IsSaturated(deployment *apps.Deployment, rs *apps.ReplicaSet) bool { |
| if rs == nil { |
| return false |
| } |
| desiredString := rs.Annotations[DesiredReplicasAnnotation] |
| desired, err := strconv.Atoi(desiredString) |
| if err != nil { |
| return false |
| } |
| return *(rs.Spec.Replicas) == *(deployment.Spec.Replicas) && |
| int32(desired) == *(deployment.Spec.Replicas) && |
| rs.Status.AvailableReplicas == *(deployment.Spec.Replicas) |
| } |
| |
| // WaitForObservedDeployment polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration. |
// Returns an error if polling times out.
| func WaitForObservedDeployment(getDeploymentFunc func() (*apps.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error { |
| // TODO: This should take clientset.Interface when all code is updated to use clientset. Keeping it this way allows the function to be used by callers who have client.Interface. |
| return wait.PollImmediate(interval, timeout, func() (bool, error) { |
| deployment, err := getDeploymentFunc() |
| if err != nil { |
| return false, err |
| } |
| return deployment.Status.ObservedGeneration >= desiredGeneration, nil |
| }) |
| } |
| |
| // ResolveFenceposts resolves both maxSurge and maxUnavailable. This needs to happen in one |
| // step. For example: |
| // |
| // 2 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1), then old(-1), then new(+1) |
| // 1 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1) |
| // 2 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1) |
| // 1 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1) |
| // 2 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1) |
| // 1 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1) |
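//
// Percentages are resolved with GetValueFromIntOrPercent: maxSurge rounds up while
// maxUnavailable rounds down, e.g. (illustrative) desired=10, maxSurge=25%, and
// maxUnavailable=25% resolve to (3, 2).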
| func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired int32) (int32, int32, error) { |
| surge, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxSurge, intstrutil.FromInt(0)), int(desired), true) |
| if err != nil { |
| return 0, 0, err |
| } |
| unavailable, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxUnavailable, intstrutil.FromInt(0)), int(desired), false) |
| if err != nil { |
| return 0, 0, err |
| } |
| |
| if surge == 0 && unavailable == 0 { |
		// Validation should never allow the user to explicitly use zero values for both maxSurge
		// and maxUnavailable. Due to rounding down maxUnavailable though, it may resolve to zero.
| // If both fenceposts resolve to zero, then we should set maxUnavailable to 1 on the |
| // theory that surge might not work due to quota. |
| unavailable = 1 |
| } |
| |
| return int32(surge), int32(unavailable), nil |
| } |
| |
// HasProgressDeadline checks if the deployment sets a real progress deadline,
// i.e. progressDeadlineSeconds is set and is not math.MaxInt32 (treated as "no deadline").
func HasProgressDeadline(d *apps.Deployment) bool {
	return d.Spec.ProgressDeadlineSeconds != nil && *d.Spec.ProgressDeadlineSeconds != math.MaxInt32
}

// HasRevisionHistoryLimit checks if the deployment sets a real revision history limit,
// i.e. revisionHistoryLimit is set and is not math.MaxInt32 (treated as "unlimited").
func HasRevisionHistoryLimit(d *apps.Deployment) bool {
	return d.Spec.RevisionHistoryLimit != nil && *d.Spec.RevisionHistoryLimit != math.MaxInt32
}