| /* |
| Copyright 2014 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package status |
| |
| import ( |
| "fmt" |
| "sort" |
| "sync" |
| "time" |
| |
| clientset "k8s.io/client-go/kubernetes" |
| |
| "k8s.io/api/core/v1" |
| apiequality "k8s.io/apimachinery/pkg/api/equality" |
| "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/diff" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/klog" |
| podutil "k8s.io/kubernetes/pkg/api/v1/pod" |
| kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" |
| kubepod "k8s.io/kubernetes/pkg/kubelet/pod" |
| kubetypes "k8s.io/kubernetes/pkg/kubelet/types" |
| "k8s.io/kubernetes/pkg/kubelet/util/format" |
| statusutil "k8s.io/kubernetes/pkg/util/pod" |
| ) |
| |
| // A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are |
| // not sent to the API server. |
| type versionedPodStatus struct { |
| status v1.PodStatus |
| // Monotonically increasing version number (per pod). |
| version uint64 |
| // Pod name & namespace, for sending updates to API server. |
| podName string |
| podNamespace string |
| } |
| |
| type podStatusSyncRequest struct { |
| podUID types.UID |
| status versionedPodStatus |
| } |
| |
| // manager updates pod statuses in the apiserver. It writes only when the new status |
| // has changed. All methods are thread-safe. |
| type manager struct { |
| kubeClient clientset.Interface |
| podManager kubepod.Manager |
| // Map from pod UID to sync status of the corresponding pod. |
| podStatuses map[types.UID]versionedPodStatus |
| podStatusesLock sync.RWMutex |
| podStatusChannel chan podStatusSyncRequest |
| // Map from (mirror) pod UID to latest status version successfully sent to the API server. |
| // apiStatusVersions must only be accessed from the sync thread. |
| apiStatusVersions map[kubetypes.MirrorPodUID]uint64 |
| podDeletionSafety PodDeletionSafetyProvider |
| } |
| |
| // PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components |
| // that need to introspect status. |
| type PodStatusProvider interface { |
| // GetPodStatus returns the cached status for the provided pod UID, as well as whether it |
| // was a cache hit. |
| GetPodStatus(uid types.UID) (v1.PodStatus, bool) |
| } |
| |
| // PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted. |
| type PodDeletionSafetyProvider interface { |
| // PodResourcesAreReclaimed returns true if the pod can safely be deleted. |
| PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool |
| } |
| |
| // Manager is the source of truth for kubelet pod status, and should be kept up-to-date with |
| // the latest v1.PodStatus. It also syncs updates back to the API server. |
| type Manager interface { |
| PodStatusProvider |
| |
| // Start the API server status sync loop. |
| Start() |
| |
| // SetPodStatus caches the status for the given pod, and triggers a status update. |
| SetPodStatus(pod *v1.Pod, status v1.PodStatus) |
| |
| // SetContainerReadiness updates the cached container status with the given readiness, and |
| // triggers a status update. |
| SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) |
| |
| // TerminatePod resets the container status for the provided pod to terminated and triggers |
| // a status update. |
| TerminatePod(pod *v1.Pod) |
| |
| // RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in |
| // the provided podUIDs. |
| RemoveOrphanedStatuses(podUIDs map[types.UID]bool) |
| } |
| |
| const syncPeriod = 10 * time.Second |
| |
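| // NewManager returns a functional Manager. |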
| func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager { |
| return &manager{ |
| kubeClient: kubeClient, |
| podManager: podManager, |
| podStatuses: make(map[types.UID]versionedPodStatus), |
| podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses |
| apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64), |
| podDeletionSafety: podDeletionSafety, |
| } |
| } |
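| |
| // A minimal wiring sketch (illustrative only; kubeClient, podManager, and |
| // podDeletionSafety are assumed to come from the kubelet's bootstrap code, |
| // this is not the actual kubelet startup path): |
| // |
| //	statusManager := NewManager(kubeClient, podManager, podDeletionSafety) |
| //	statusManager.Start() |
| //	statusManager.SetPodStatus(pod, pod.Status) |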
| |
| // isPodStatusByKubeletEqual returns true if the given pod statuses are equal when non-kubelet-owned |
| // pod conditions are excluded. |
| // Callers are expected to pass statuses that have already been normalized (see |
| // normalizeStatus), so that meaningless changes such as timestamp precision are ignored. |
| func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool { |
| oldCopy := oldStatus.DeepCopy() |
| for _, c := range status.Conditions { |
| if kubetypes.PodConditionByKubelet(c.Type) { |
| _, oc := podutil.GetPodCondition(oldCopy, c.Type) |
| if oc == nil || oc.Status != c.Status { |
| return false |
| } |
| } |
| } |
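| // Kubelet-owned condition statuses were compared above; overwrite the copy's |
| // conditions so that the DeepEqual below ignores any remaining |
| // condition-only differences, e.g. conditions owned by other components. |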
| oldCopy.Conditions = status.Conditions |
| return apiequality.Semantic.DeepEqual(oldCopy, status) |
| } |
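| |
| // For example, two statuses that differ only in a readiness-gate condition |
| // set by an external controller compare as equal here, so the kubelet does |
| // not send a redundant status update for that change. |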
| |
| func (m *manager) Start() { |
| // Don't start the status manager if we don't have a client. This will happen |
| // on the master, where the kubelet is responsible for bootstrapping the pods |
| // of the master components. |
| if m.kubeClient == nil { |
| klog.Infof("Kubernetes client is nil, not starting status manager.") |
| return |
| } |
| |
| klog.Info("Starting to sync pod status with apiserver") |
| syncTicker := time.Tick(syncPeriod) |
| // syncPod and syncBatch share the same go routine to avoid sync races. |
| go wait.Forever(func() { |
| select { |
| case syncRequest := <-m.podStatusChannel: |
| klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel", |
| syncRequest.podUID, syncRequest.status.version, syncRequest.status.status) |
| m.syncPod(syncRequest.podUID, syncRequest.status) |
| case <-syncTicker: |
| m.syncBatch() |
| } |
| }, 0) |
| } |
| |
| func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { |
| m.podStatusesLock.RLock() |
| defer m.podStatusesLock.RUnlock() |
| status, ok := m.podStatuses[types.UID(m.podManager.TranslatePodUID(uid))] |
| return status.status, ok |
| } |
| |
| func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) { |
| m.podStatusesLock.Lock() |
| defer m.podStatusesLock.Unlock() |
| |
| for _, c := range pod.Status.Conditions { |
| if !kubetypes.PodConditionByKubelet(c.Type) { |
| klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+ |
| "But it is not owned by kubelet.", string(c.Type), format.Pod(pod)) |
| } |
| } |
| // Make sure we're caching a deep copy. |
| status = *status.DeepCopy() |
| |
| // Force a status update if deletion timestamp is set. This is necessary |
| // because if the pod is in the non-running state, the pod worker still |
| // needs to be able to trigger an update and/or deletion. |
| m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil) |
| } |
| |
| func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) { |
| m.podStatusesLock.Lock() |
| defer m.podStatusesLock.Unlock() |
| |
| pod, ok := m.podManager.GetPodByUID(podUID) |
| if !ok { |
| klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID)) |
| return |
| } |
| |
| oldStatus, found := m.podStatuses[pod.UID] |
| if !found { |
| klog.Warningf("Container readiness changed before pod has synced: %q - %q", |
| format.Pod(pod), containerID.String()) |
| return |
| } |
| |
| // Find the container to update. |
| containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String()) |
| if !ok { |
| klog.Warningf("Container readiness changed for unknown container: %q - %q", |
| format.Pod(pod), containerID.String()) |
| return |
| } |
| |
| if containerStatus.Ready == ready { |
| klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready, |
| format.Pod(pod), containerID.String()) |
| return |
| } |
| |
| // Make sure we're not updating the cached version. |
| status := *oldStatus.status.DeepCopy() |
| containerStatus, _, _ = findContainerStatus(&status, containerID.String()) |
| containerStatus.Ready = ready |
| |
| // updateConditionFunc updates the corresponding type of condition |
| updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) { |
| conditionIndex := -1 |
| for i, c := range status.Conditions { |
| if c.Type == conditionType { |
| conditionIndex = i |
| break |
| } |
| } |
| if conditionIndex != -1 { |
| status.Conditions[conditionIndex] = condition |
| } else { |
| klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status) |
| status.Conditions = append(status.Conditions, condition) |
| } |
| } |
| updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase)) |
| updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase)) |
| m.updateStatusInternal(pod, status, false) |
| } |
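| |
| // Illustrative call (a sketch; the container ID format is runtime-specific |
| // and "abc123" is a made-up value): |
| //	m.SetContainerReadiness(pod.UID, kubecontainer.ContainerID{Type: "docker", ID: "abc123"}, true) |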
| |
| func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) { |
| // Find the container to update. |
| for i, c := range status.ContainerStatuses { |
| if c.ContainerID == containerID { |
| return &status.ContainerStatuses[i], false, true |
| } |
| } |
| |
| for i, c := range status.InitContainerStatuses { |
| if c.ContainerID == containerID { |
| return &status.InitContainerStatuses[i], true, true |
| } |
| } |
| |
| return nil, false, false |
| } |
| |
| func (m *manager) TerminatePod(pod *v1.Pod) { |
| m.podStatusesLock.Lock() |
| defer m.podStatusesLock.Unlock() |
| oldStatus := &pod.Status |
| if cachedStatus, ok := m.podStatuses[pod.UID]; ok { |
| oldStatus = &cachedStatus.status |
| } |
| status := *oldStatus.DeepCopy() |
| for i := range status.ContainerStatuses { |
| status.ContainerStatuses[i].State = v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{}, |
| } |
| } |
| for i := range status.InitContainerStatuses { |
| status.InitContainerStatuses[i].State = v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{}, |
| } |
| } |
| m.updateStatusInternal(pod, status, true) |
| } |
| |
| // checkContainerStateTransition ensures that no container is trying to transition |
| // from a terminated to non-terminated state, which is illegal and indicates a |
| // logical error in the kubelet. |
| func checkContainerStateTransition(oldStatuses, newStatuses []v1.ContainerStatus, restartPolicy v1.RestartPolicy) error { |
| // If we should always restart, containers are allowed to leave the terminated state |
| if restartPolicy == v1.RestartPolicyAlways { |
| return nil |
| } |
| for _, oldStatus := range oldStatuses { |
| // Skip any container that wasn't terminated |
| if oldStatus.State.Terminated == nil { |
| continue |
| } |
| // Skip any container that failed but is allowed to restart |
| if oldStatus.State.Terminated.ExitCode != 0 && restartPolicy == v1.RestartPolicyOnFailure { |
| continue |
| } |
| for _, newStatus := range newStatuses { |
| if oldStatus.Name == newStatus.Name && newStatus.State.Terminated == nil { |
| return fmt.Errorf("terminated container %v attempted illegal transition to non-terminated state", newStatus.Name) |
| } |
| } |
| } |
| return nil |
| } |
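| |
| // For example, with restart policy Never, a container whose old status is |
| // Terminated must not reappear as Waiting or Running in the new status; such |
| // a transition indicates a kubelet bug, and the whole status update is aborted. |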
| |
| // updateStatusInternal updates the internal status cache, and queues an update to the api server if |
| // necessary. Returns whether an update was triggered. |
| // This method IS NOT THREAD SAFE and must be called from a locked function. |
| func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool { |
| var oldStatus v1.PodStatus |
| cachedStatus, isCached := m.podStatuses[pod.UID] |
| if isCached { |
| oldStatus = cachedStatus.status |
| } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok { |
| oldStatus = mirrorPod.Status |
| } else { |
| oldStatus = pod.Status |
| } |
| |
| // Check for illegal state transition in containers |
| if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil { |
| klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err) |
| return false |
| } |
| if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil { |
| klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err) |
| return false |
| } |
| |
| // Set ContainersReadyCondition.LastTransitionTime. |
| updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady) |
| |
| // Set ReadyCondition.LastTransitionTime. |
| updateLastTransitionTime(&status, &oldStatus, v1.PodReady) |
| |
| // Set InitializedCondition.LastTransitionTime. |
| updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized) |
| |
| // Set PodScheduledCondition.LastTransitionTime. |
| updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled) |
| |
| // ensure that the start time does not change across updates. |
| if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() { |
| status.StartTime = oldStatus.StartTime |
| } else if status.StartTime.IsZero() { |
| // if the status has no start time, we need to set an initial time |
| now := metav1.Now() |
| status.StartTime = &now |
| } |
| |
| normalizeStatus(pod, &status) |
| // The intent here is to prevent concurrent updates to a pod's status from |
| // clobbering each other so the phase of a pod progresses monotonically. |
| if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate { |
| klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status) |
| return false // No new status. |
| } |
| |
| newStatus := versionedPodStatus{ |
| status: status, |
| version: cachedStatus.version + 1, |
| podName: pod.Name, |
| podNamespace: pod.Namespace, |
| } |
| m.podStatuses[pod.UID] = newStatus |
| |
| select { |
| case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}: |
| klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%q, %v) to podStatusChannel", |
| pod.UID, newStatus.version, newStatus.status) |
| return true |
| default: |
| // Let the periodic syncBatch handle the update if the channel is full. |
| // We can't block, since we hold the mutex lock. |
| klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v", |
| format.Pod(pod), status) |
| return false |
| } |
| } |
| |
| // updateLastTransitionTime updates the LastTransitionTime of a pod condition. |
| func updateLastTransitionTime(status, oldStatus *v1.PodStatus, conditionType v1.PodConditionType) { |
| _, condition := podutil.GetPodCondition(status, conditionType) |
| if condition == nil { |
| return |
| } |
| // Need to set LastTransitionTime. |
| lastTransitionTime := metav1.Now() |
| _, oldCondition := podutil.GetPodCondition(oldStatus, conditionType) |
| if oldCondition != nil && condition.Status == oldCondition.Status { |
| lastTransitionTime = oldCondition.LastTransitionTime |
| } |
| condition.LastTransitionTime = lastTransitionTime |
| } |
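| |
| // For example, if the Ready condition flips from False to True, its |
| // LastTransitionTime is set to now; if the condition status is unchanged, the |
| // previous LastTransitionTime is carried over so the timestamp stays stable. |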
| |
| // deletePodStatus simply removes the given pod from the status cache. |
| func (m *manager) deletePodStatus(uid types.UID) { |
| m.podStatusesLock.Lock() |
| defer m.podStatusesLock.Unlock() |
| delete(m.podStatuses, uid) |
| } |
| |
| // TODO(filipg): It'd be cleaner if we could do this without a signal from the user. |
| func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) { |
| m.podStatusesLock.Lock() |
| defer m.podStatusesLock.Unlock() |
| for key := range m.podStatuses { |
| if _, ok := podUIDs[key]; !ok { |
| klog.V(5).Infof("Removing %q from status map.", key) |
| delete(m.podStatuses, key) |
| } |
| } |
| } |
| |
| // syncBatch syncs pods statuses with the apiserver. |
| func (m *manager) syncBatch() { |
| var updatedStatuses []podStatusSyncRequest |
| podToMirror, mirrorToPod := m.podManager.GetUIDTranslations() |
| func() { // Critical section |
| m.podStatusesLock.RLock() |
| defer m.podStatusesLock.RUnlock() |
| |
| // Clean up orphaned versions. |
| for uid := range m.apiStatusVersions { |
| _, hasPod := m.podStatuses[types.UID(uid)] |
| _, hasMirror := mirrorToPod[uid] |
| if !hasPod && !hasMirror { |
| delete(m.apiStatusVersions, uid) |
| } |
| } |
| |
| for uid, status := range m.podStatuses { |
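| // apiStatusVersions is keyed by mirror pod UID, so for a static pod the |
| // resolved UID must be translated to its mirror pod's UID before checking |
| // whether the version is stale. |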
| syncedUID := kubetypes.MirrorPodUID(uid) |
| if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok { |
| if mirrorUID == "" { |
| klog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace) |
| continue |
| } |
| syncedUID = mirrorUID |
| } |
| if m.needsUpdate(types.UID(syncedUID), status) { |
| updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status}) |
| } else if m.needsReconcile(uid, status.status) { |
| // Delete the apiStatusVersions here to force an update on the pod status. |
| // In most cases the deleted entry is re-filled shortly afterwards, when the |
| // following syncPod() successfully syncs an update. |
| delete(m.apiStatusVersions, syncedUID) |
| updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status}) |
| } |
| } |
| }() |
| |
| for _, update := range updatedStatuses { |
| klog.V(5).Infof("Status Manager: syncPod in syncbatch. pod UID: %q", update.podUID) |
| m.syncPod(update.podUID, update.status) |
| } |
| } |
| |
| // syncPod syncs the given status with the API server. The caller must not hold the lock. |
| func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { |
| if !m.needsUpdate(uid, status) { |
| klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid) |
| return |
| } |
| |
| // TODO: make me easier to express from client code |
| pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{}) |
| if errors.IsNotFound(err) { |
| klog.V(3).Infof("Pod %q (%s) does not exist on the server", status.podName, uid) |
| // If the Pod is deleted the status will be cleared in |
| // RemoveOrphanedStatuses, so we just ignore the update here. |
| return |
| } |
| if err != nil { |
| klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err) |
| return |
| } |
| |
| translatedUID := m.podManager.TranslatePodUID(pod.UID) |
| // Type convert original uid just for the purpose of comparison. |
| if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) { |
| klog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID) |
| m.deletePodStatus(uid) |
| return |
| } |
| |
| oldStatus := pod.Status.DeepCopy() |
| newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status)) |
| klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes) |
| if err != nil { |
| klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) |
| return |
| } |
| pod = newPod |
| |
| klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status) |
| m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version |
| |
| // We don't handle graceful deletion of mirror pods. |
| if m.canBeDeleted(pod, status.status) { |
| deleteOptions := metav1.NewDeleteOptions(0) |
| // Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace. |
| deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID)) |
| err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions) |
| if err != nil { |
| klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err) |
| return |
| } |
| klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod)) |
| m.deletePodStatus(uid) |
| } |
| } |
| |
| // needsUpdate returns whether the status is stale for the given pod UID. |
| // This method is not thread safe, and must only be accessed by the sync thread. |
| func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool { |
| latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)] |
| if !ok || latest < status.version { |
| return true |
| } |
| pod, ok := m.podManager.GetPodByUID(uid) |
| if !ok { |
| return false |
| } |
| return m.canBeDeleted(pod, status.status) |
| } |
| |
| func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus) bool { |
| if pod.DeletionTimestamp == nil || kubepod.IsMirrorPod(pod) { |
| return false |
| } |
| return m.podDeletionSafety.PodResourcesAreReclaimed(pod, status) |
| } |
| |
| // needsReconcile compares the given status with the status in the pod manager (which |
| // in fact comes from the apiserver), and returns whether the status needs to be |
| // reconciled with the apiserver. When the pod status differs between the apiserver |
| // and the kubelet, the kubelet should forcibly send an update to reconcile the |
| // inconsistency, because the kubelet should be the source of truth for pod status. |
| // NOTE(random-liu): It's simpler to pass in mirror pod uid and get mirror pod by uid, but |
| // now the pod manager only supports getting mirror pod by static pod, so we have to pass |
| // static pod uid here. |
| // TODO(random-liu): Simplify the logic when mirror pod manager is added. |
| func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool { |
| // The pod could be a static pod, so we should translate first. |
| pod, ok := m.podManager.GetPodByUID(uid) |
| if !ok { |
| klog.V(4).Infof("Pod %q has been deleted, no need to reconcile", string(uid)) |
| return false |
| } |
| // If the pod is a static pod, we should check its mirror pod, because only status in mirror pod is meaningful to us. |
| if kubepod.IsStaticPod(pod) { |
| mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod) |
| if !ok { |
| klog.V(4).Infof("Static pod %q has no corresponding mirror pod, no need to reconcile", format.Pod(pod)) |
| return false |
| } |
| pod = mirrorPod |
| } |
| |
| podStatus := pod.Status.DeepCopy() |
| normalizeStatus(pod, podStatus) |
| |
| if isPodStatusByKubeletEqual(podStatus, &status) { |
| // If the status from the source is the same with the cached status, |
| // reconcile is not needed. Just return. |
| return false |
| } |
| klog.V(3).Infof("Pod status is inconsistent with cached status for pod %q, a reconciliation should be triggered:\n %+v", format.Pod(pod), |
| diff.ObjectDiff(podStatus, status)) |
| |
| return true |
| } |
| |
| // normalizeStatus exists because the apiserver only supports *RFC3339* timestamps, so the |
| // timestamps it returns have no nanosecond information. However, the timestamp returned by |
| // metav1.Now() contains nanoseconds, so comparing a status from the apiserver against a cached |
| // status would always make isPodStatusByKubeletEqual() return false. See issue #15262 and |
| // PR #15263 for details. The best fix would be on the API side; for now, the kubelet |
| // normalizes the status locally. |
| // TODO(random-liu): Remove timestamp related logic after apiserver supports nanosecond or makes it consistent. |
| func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { |
| bytesPerStatus := kubecontainer.MaxPodTerminationMessageLogLength |
| if containers := len(pod.Spec.Containers) + len(pod.Spec.InitContainers); containers > 0 { |
| bytesPerStatus = bytesPerStatus / containers |
| } |
| normalizeTimeStamp := func(t *metav1.Time) { |
| *t = t.Rfc3339Copy() |
| } |
| normalizeContainerState := func(c *v1.ContainerState) { |
| if c.Running != nil { |
| normalizeTimeStamp(&c.Running.StartedAt) |
| } |
| if c.Terminated != nil { |
| normalizeTimeStamp(&c.Terminated.StartedAt) |
| normalizeTimeStamp(&c.Terminated.FinishedAt) |
| if len(c.Terminated.Message) > bytesPerStatus { |
| c.Terminated.Message = c.Terminated.Message[:bytesPerStatus] |
| } |
| } |
| } |
| |
| if status.StartTime != nil { |
| normalizeTimeStamp(status.StartTime) |
| } |
| for i := range status.Conditions { |
| condition := &status.Conditions[i] |
| normalizeTimeStamp(&condition.LastProbeTime) |
| normalizeTimeStamp(&condition.LastTransitionTime) |
| } |
| |
| // update container statuses |
| for i := range status.ContainerStatuses { |
| cstatus := &status.ContainerStatuses[i] |
| normalizeContainerState(&cstatus.State) |
| normalizeContainerState(&cstatus.LastTerminationState) |
| } |
| // Sort the container statuses, so that the order won't affect the result of comparison |
| sort.Sort(kubetypes.SortedContainerStatuses(status.ContainerStatuses)) |
| |
| // update init container statuses |
| for i := range status.InitContainerStatuses { |
| cstatus := &status.InitContainerStatuses[i] |
| normalizeContainerState(&cstatus.State) |
| normalizeContainerState(&cstatus.LastTerminationState) |
| } |
| // Sort the container statuses, so that the order won't affect the result of comparison |
| kubetypes.SortInitContainerStatuses(pod, status.InitContainerStatuses) |
| return status |
| } |
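| |
| // For example, the RFC3339 normalization above truncates a timestamp such as |
| // 2015-10-09T08:59:41.123456789Z to 2015-10-09T08:59:41Z before comparison. |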
| |
| // mergePodStatus merges oldPodStatus and newPodStatus; pod conditions |
| // not owned by the kubelet are preserved from oldPodStatus. |
| func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus { |
| podConditions := []v1.PodCondition{} |
| for _, c := range oldPodStatus.Conditions { |
| if !kubetypes.PodConditionByKubelet(c.Type) { |
| podConditions = append(podConditions, c) |
| } |
| } |
| |
| for _, c := range newPodStatus.Conditions { |
| if kubetypes.PodConditionByKubelet(c.Type) { |
| podConditions = append(podConditions, c) |
| } |
| } |
| newPodStatus.Conditions = podConditions |
| return newPodStatus |
| } |
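| |
| // For example (with a hypothetical condition type): if oldPodStatus carries a |
| // readiness-gate condition "example.com/feature-ready" set by an external |
| // controller, and newPodStatus carries the kubelet's fresh Ready condition, |
| // the merged status keeps both; for kubelet-owned condition types the new |
| // values win. |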
| |
| // NeedToReconcilePodReadiness returns true if the pod "Ready" condition needs to be reconciled. |
| func NeedToReconcilePodReadiness(pod *v1.Pod) bool { |
| if len(pod.Spec.ReadinessGates) == 0 { |
| return false |
| } |
| podReadyCondition := GeneratePodReadyCondition(&pod.Spec, pod.Status.Conditions, pod.Status.ContainerStatuses, pod.Status.Phase) |
| i, curCondition := podutil.GetPodConditionFromList(pod.Status.Conditions, v1.PodReady) |
| // Only reconcile if "Ready" condition is present |
| if i >= 0 && curCondition.Status != podReadyCondition.Status { |
| return true |
| } |
| return false |
| } |