Add readinessCondition to stop traffic to pods that will be stopped (#530)

diff --git a/api/v1beta1/solrcloud_types.go b/api/v1beta1/solrcloud_types.go
index bbbeac8..53dfa97 100644
--- a/api/v1beta1/solrcloud_types.go
+++ b/api/v1beta1/solrcloud_types.go
@@ -1138,6 +1138,10 @@
 
 	// This Solr Node pod is using the latest version of solrcloud pod spec.
 	SpecUpToDate bool `json:"specUpToDate"`
+
+	// This Solr Node pod is scheduled for deletion
+	// +optional
+	ScheduledForDeletion bool `json:"scheduledForDeletion"`
 }
 
 //+kubebuilder:object:root=true
diff --git a/config/crd/bases/solr.apache.org_solrclouds.yaml b/config/crd/bases/solr.apache.org_solrclouds.yaml
index dd8b12f..1a18705 100644
--- a/config/crd/bases/solr.apache.org_solrclouds.yaml
+++ b/config/crd/bases/solr.apache.org_solrclouds.yaml
@@ -11753,6 +11753,9 @@
                     ready:
                       description: Is the node up and running
                       type: boolean
+                    scheduledForDeletion:
+                      description: This Solr Node pod is scheduled for deletion
+                      type: boolean
                     specUpToDate:
                       description: This Solr Node pod is using the latest version
                         of solrcloud pod spec.
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 2132bc0..23691ec 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -67,6 +67,7 @@
   - pods/status
   verbs:
   - get
+  - patch
 - apiGroups:
   - ""
   resources:
diff --git a/controllers/solr_pod_lifecycle_util.go b/controllers/solr_pod_lifecycle_util.go
new file mode 100644
index 0000000..0d3666f
--- /dev/null
+++ b/controllers/solr_pod_lifecycle_util.go
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controllers
+
+import (
+	"context"
+	solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
+	"github.com/apache/solr-operator/controllers/util"
+	"github.com/go-logr/logr"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"time"
+)
+
+type podReadinessConditionChange struct {
+	// If this is provided, the change will only be made if the condition currently uses this reason
+	matchPreviousReason *PodConditionChangeReason
+	reason              PodConditionChangeReason
+	message             string
+	status              bool
+}
+
+// PodConditionChangeReason describes the reason why a Pod is being stopped.
+type PodConditionChangeReason string
+
+const (
+	PodStarted           PodConditionChangeReason = "PodStarted"
+	PodUpdate            PodConditionChangeReason = "PodUpdate"
+	EvictingReplicas     PodConditionChangeReason = "EvictingReplicas"
+	StatefulSetScaleDown PodConditionChangeReason = "StatefulSetScaleDown"
+)
+
+func DeletePodForUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, pod *corev1.Pod, podHasReplicas bool, logger logr.Logger) (requeueAfterDuration time.Duration, err error) {
+	// Before doing anything to the pod, make sure that users cannot send requests to the pod anymore.
+	ps := PodStarted
+	podStoppedReadinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
+		util.SolrIsNotStoppedReadinessCondition: {
+			reason:  PodUpdate,
+			message: "Pod is being deleted, traffic to the pod must be stopped",
+			status:  false,
+		},
+		util.SolrReplicasNotEvictedReadinessCondition: {
+			// Only set this condition if the condition hasn't been changed since pod start
+			// We do not want to over-write future states later down the eviction pipeline
+			matchPreviousReason: &ps,
+			reason:              EvictingReplicas,
+			message:             "Pod is being deleted, ephemeral data must be evicted",
+			status:              false,
+		},
+	}
+	if updatedPod, e := EnsurePodReadinessConditions(ctx, r, pod, podStoppedReadinessConditions, logger); e != nil {
+		err = e
+		return
+	} else {
+		pod = updatedPod
+	}
+
+	// If the pod needs to be drained of replicas (i.e. upgrading a pod with ephemeral storage), do that before deleting the pod
+	deletePod := false
+	// TODO: After v0.7.0 release, can remove "podHasReplicas ||", as this is no longer needed
+	if podHasReplicas || PodConditionEquals(pod, util.SolrReplicasNotEvictedReadinessCondition, EvictingReplicas) {
+		// Only evict pods that contain replicas in the clusterState
+		if evictError, canDeletePod := util.EvictReplicasForPodIfNecessary(ctx, instance, pod, podHasReplicas, logger); evictError != nil {
+			err = evictError
+			logger.Error(err, "Error while evicting replicas on pod", "pod", pod.Name)
+		} else if canDeletePod {
+			deletePod = true
+		} else {
+			// Try again in 5 seconds if we cannot delete a pod.
+			requeueAfterDuration = time.Second * 5
+		}
+	} else {
+		// If a pod has no replicas, then update it when asked to
+		deletePod = true
+	}
+
+	// Delete the pod
+	if deletePod {
+		err = r.Delete(ctx, pod, client.Preconditions{
+			UID: &pod.UID,
+		})
+		if err != nil {
+			logger.Error(err, "Error while killing solr pod for update", "pod", pod.Name)
+		}
+
+		// TODO: Create event for the CRD.
+	}
+
+	return
+}
+
+func EnsurePodReadinessConditions(ctx context.Context, r *SolrCloudReconciler, pod *corev1.Pod, ensureConditions map[corev1.PodConditionType]podReadinessConditionChange, logger logr.Logger) (updatedPod *corev1.Pod, err error) {
+	updatedPod = pod.DeepCopy()
+
+	needsUpdate := false
+
+	for conditionType, readinessCondition := range ensureConditions {
+		podHasCondition := false
+		for _, gate := range pod.Spec.ReadinessGates {
+			if gate.ConditionType == conditionType {
+				podHasCondition = true
+			}
+		}
+		if podHasCondition {
+			needsUpdate = EnsurePodReadinessCondition(updatedPod, conditionType, readinessCondition) || needsUpdate
+		}
+	}
+
+	if needsUpdate {
+		if err = r.Status().Patch(ctx, updatedPod, client.MergeFrom(pod)); err != nil {
+			logger.Error(err, "Could not patch readiness condition(s) for pod to stop traffic", "pod", pod.Name)
+			updatedPod = pod
+
+			// TODO: Create event for the CRD.
+		}
+	} else {
+		updatedPod = pod
+	}
+
+	return
+}
+
+var (
+	initialSolrPodReadinessConditions = map[corev1.PodConditionType]podReadinessConditionChange{
+		util.SolrIsNotStoppedReadinessCondition: {
+			reason:  PodStarted,
+			message: "Pod has not yet been stopped",
+			status:  true,
+		},
+		util.SolrReplicasNotEvictedReadinessCondition: {
+			reason:  PodStarted,
+			message: "Replicas have not yet been evicted",
+			status:  true,
+		},
+	}
+)
+
+// InitializePodReadinessCondition set the default value for a pod's readiness condition after pod creation.
+func InitializePodReadinessCondition(pod *corev1.Pod, conditionType corev1.PodConditionType) (conditionNeedsInitializing bool) {
+	if foundInitialPodReadinessCondition, found := initialSolrPodReadinessConditions[conditionType]; found {
+		return InitializeCustomPodReadinessCondition(
+			pod,
+			conditionType,
+			foundInitialPodReadinessCondition.reason,
+			foundInitialPodReadinessCondition.message,
+			foundInitialPodReadinessCondition.status)
+	} else {
+		// If there is no default given for this readinessCondition, do nothing
+		return false
+	}
+}
+
+// InitializeCustomPodReadinessCondition set the default value for a pod's readiness condition after pod creation, given all the default values to set
+func InitializeCustomPodReadinessCondition(pod *corev1.Pod, conditionType corev1.PodConditionType, reason PodConditionChangeReason, message string, status bool) (conditionNeedsInitializing bool) {
+	conditionNeedsInitializing = true
+	conditionIndex := -1
+	for i, condition := range pod.Status.Conditions {
+		if condition.Type == conditionType {
+			conditionNeedsInitializing = condition.Reason == ""
+			conditionIndex = i
+			break
+		}
+	}
+
+	if conditionNeedsInitializing {
+		patchTime := metav1.Now()
+		conditionStatus := corev1.ConditionFalse
+		if status {
+			conditionStatus = corev1.ConditionTrue
+		}
+		initializedCondition := corev1.PodCondition{
+			Type:               conditionType,
+			Status:             conditionStatus,
+			Reason:             string(reason),
+			Message:            message,
+			LastProbeTime:      patchTime,
+			LastTransitionTime: patchTime,
+		}
+
+		if conditionIndex < 0 {
+			// The pod status does not contain the readiness condition, so add it
+			pod.Status.Conditions = append(pod.Status.Conditions, initializedCondition)
+		} else {
+			pod.Status.Conditions[conditionIndex] = initializedCondition
+		}
+	}
+
+	return
+}
+
+// EnsurePodReadinessCondition ensure the podCondition is set to the given values
+func EnsurePodReadinessCondition(pod *corev1.Pod, conditionType corev1.PodConditionType, ensureCondition podReadinessConditionChange) (conditionNeedsChange bool) {
+	conditionNeedsChange = false
+	conditionIndex := -1
+	for i, condition := range pod.Status.Conditions {
+		if condition.Type == conditionType {
+			if ensureCondition.matchPreviousReason != nil {
+				conditionNeedsChange = condition.Reason == string(*ensureCondition.matchPreviousReason)
+			} else {
+				conditionNeedsChange = condition.Reason != string(ensureCondition.reason)
+			}
+			if (condition.Status == corev1.ConditionTrue) != ensureCondition.status {
+				conditionNeedsChange = true
+			}
+			conditionIndex = i
+			break
+		}
+	}
+
+	if conditionNeedsChange {
+		patchTime := metav1.Now()
+		conditionStatus := corev1.ConditionFalse
+		if ensureCondition.status {
+			conditionStatus = corev1.ConditionTrue
+		}
+		initializedCondition := corev1.PodCondition{
+			Type:               conditionType,
+			Status:             conditionStatus,
+			Reason:             string(ensureCondition.reason),
+			Message:            ensureCondition.message,
+			LastProbeTime:      patchTime,
+			LastTransitionTime: patchTime,
+		}
+
+		pod.Status.Conditions[conditionIndex] = initializedCondition
+	}
+
+	return
+}
+
+// PodConditionEquals check if a podCondition equals what is expected
+func PodConditionEquals(pod *corev1.Pod, conditionType corev1.PodConditionType, reason PodConditionChangeReason) bool {
+	for _, condition := range pod.Status.Conditions {
+		if condition.Type == conditionType {
+			return string(reason) == condition.Reason
+		}
+	}
+
+	return false
+}
diff --git a/controllers/solrcloud_controller.go b/controllers/solrcloud_controller.go
index 9c9aacc..392923b 100644
--- a/controllers/solrcloud_controller.go
+++ b/controllers/solrcloud_controller.go
@@ -64,7 +64,7 @@
 }
 
 //+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete
-//+kubebuilder:rbac:groups="",resources=pods/status,verbs=get
+//+kubebuilder:rbac:groups="",resources=pods/status,verbs=get;patch
 //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups="",resources=services/status,verbs=get
 //+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
@@ -393,22 +393,30 @@
 		}
 	}
 
-	var outOfDatePods, outOfDatePodsNotStarted []corev1.Pod
+	// Get the SolrCloud's Pods and initialize them if necessary
+	var podList []corev1.Pod
+	var podSelector labels.Selector
+	if podSelector, podList, err = r.initializePods(ctx, instance, logger); err != nil {
+		return requeueOrNot, err
+	}
+
+	// Make sure the SolrCloud status is up-to-date with the state of the cluster
+	var outOfDatePods util.OutOfDatePodSegmentation
 	var availableUpdatedPodCount int
-	outOfDatePods, outOfDatePodsNotStarted, availableUpdatedPodCount, err = r.reconcileCloudStatus(ctx, instance, logger, &newStatus, statefulSetStatus)
+	outOfDatePods, availableUpdatedPodCount, err = createCloudStatus(instance, &newStatus, statefulSetStatus, podSelector, podList)
 	if err != nil {
 		return requeueOrNot, err
 	}
 
 	// Manage the updating of out-of-spec pods, if the Managed UpdateStrategy has been specified.
-	if instance.Spec.UpdateStrategy.Method == solrv1beta1.ManagedUpdate && len(outOfDatePods)+len(outOfDatePodsNotStarted) > 0 {
+	if instance.Spec.UpdateStrategy.Method == solrv1beta1.ManagedUpdate && len(outOfDatePods.NotStarted)+len(outOfDatePods.ScheduledForDeletion)+len(outOfDatePods.Running) > 0 {
 		updateLogger := logger.WithName("ManagedUpdateSelector")
 
 		// The out of date pods that have not been started, should all be updated immediately.
 		// There is no use "safely" updating pods which have not been started yet.
-		podsToUpdate := outOfDatePodsNotStarted
-		for _, pod := range outOfDatePodsNotStarted {
-			logger.Info("Pod killed for update.", "pod", pod.Name, "reason", "The solr container in the pod has not yet started, thus it is safe to update.")
+		podsToUpdate := append([]corev1.Pod{}, outOfDatePods.NotStarted...)
+		for _, pod := range outOfDatePods.NotStarted {
+			updateLogger.Info("Pod killed for update.", "pod", pod.Name, "reason", "The solr container in the pod has not yet started, thus it is safe to update.")
 		}
 
 		// If authn enabled on Solr, we need to pass the auth header
@@ -422,37 +430,34 @@
 
 		// Pick which pods should be deleted for an update.
 		// Don't exit on an error, which would only occur because of an HTTP Exception. Requeue later instead.
-		additionalPodsToUpdate, podsHaveReplicas, retryLater := util.DeterminePodsSafeToUpdate(ctx, instance, outOfDatePods, int(newStatus.ReadyReplicas), availableUpdatedPodCount, len(outOfDatePodsNotStarted), updateLogger)
+		additionalPodsToUpdate, podsHaveReplicas, retryLater, clusterStateError := util.DeterminePodsSafeToUpdate(ctx, instance, outOfDatePods, int(newStatus.ReadyReplicas), availableUpdatedPodCount, updateLogger)
+		// If we do not have the clusterState, it's not safe to update pods that are running
+		if clusterStateError != nil {
+			retryLater = true
+		} else {
+			podsToUpdate = append(podsToUpdate, outOfDatePods.ScheduledForDeletion...)
+			podsToUpdate = append(podsToUpdate, additionalPodsToUpdate...)
+		}
+
+		var retryLaterDuration time.Duration
 		// Only actually delete a running pod if it has been evicted, or doesn't need eviction (persistent storage)
-		for _, pod := range additionalPodsToUpdate {
-			if podsHaveReplicas[pod.Name] {
-				// Only evict pods that contain replicas in the clusterState
-				if evictError, canDeletePod := util.EvictReplicasForPodIfNecessary(ctx, instance, &pod, updateLogger); evictError != nil {
-					err = evictError
-					updateLogger.Error(err, "Error while evicting replicas on pod", "pod", pod.Name)
-				} else if canDeletePod {
-					podsToUpdate = append(podsToUpdate, pod)
-				} else {
-					// Try again in 5 seconds if cannot delete a pod.
-					updateRequeueAfter(&requeueOrNot, time.Second*5)
-				}
-			} else {
-				// If a pod has no replicas, then update it when asked to
-				podsToUpdate = append(podsToUpdate, pod)
+		for _, pod := range podsToUpdate {
+			retryLaterDurationTemp, errTemp := DeletePodForUpdate(ctx, r, instance, &pod, podsHaveReplicas[pod.Name], updateLogger)
+
+			// Use the retryLaterDuration of the pod that requires a retry the soonest (smallest duration > 0)
+			if retryLaterDurationTemp > 0 && (retryLaterDurationTemp < retryLaterDuration || retryLaterDuration == 0) {
+				retryLaterDuration = retryLaterDurationTemp
+			}
+			if errTemp != nil {
+				err = errTemp
 			}
 		}
 
-		for _, pod := range podsToUpdate {
-			err = r.Delete(ctx, &pod, client.Preconditions{
-				UID: &pod.UID,
-			})
-			if err != nil {
-				updateLogger.Error(err, "Error while killing solr pod for update", "pod", pod.Name)
+		if err != nil || retryLaterDuration > 0 || retryLater {
+			if retryLaterDuration == 0 {
+				retryLaterDuration = time.Second * 10
 			}
-			// TODO: Create event for the CRD.
-		}
-		if err != nil || retryLater {
-			updateRequeueAfter(&requeueOrNot, time.Second*15)
+			updateRequeueAfter(&requeueOrNot, retryLaterDuration)
 		}
 		if err != nil {
 			return requeueOrNot, err
@@ -528,43 +533,78 @@
 	return requeueOrNot, err
 }
 
-func (r *SolrCloudReconciler) reconcileCloudStatus(ctx context.Context, solrCloud *solrv1beta1.SolrCloud, logger logr.Logger,
-	newStatus *solrv1beta1.SolrCloudStatus, statefulSetStatus appsv1.StatefulSetStatus) (outOfDatePods []corev1.Pod, outOfDatePodsNotStarted []corev1.Pod, availableUpdatedPodCount int, err error) {
+// InitializePods Ensure that all SolrCloud Pods are initialized
+func (r *SolrCloudReconciler) initializePods(ctx context.Context, solrCloud *solrv1beta1.SolrCloud, logger logr.Logger) (podSelector labels.Selector, podList []corev1.Pod, err error) {
 	foundPods := &corev1.PodList{}
 	selectorLabels := solrCloud.SharedLabels()
 	selectorLabels["technology"] = solrv1beta1.SolrTechnologyLabel
 
-	labelSelector := labels.SelectorFromSet(selectorLabels)
+	podSelector = labels.SelectorFromSet(selectorLabels)
 	listOps := &client.ListOptions{
 		Namespace:     solrCloud.Namespace,
-		LabelSelector: labelSelector,
+		LabelSelector: podSelector,
 	}
 
-	err = r.List(ctx, foundPods, listOps)
-	if err != nil {
-		return outOfDatePods, outOfDatePodsNotStarted, availableUpdatedPodCount, err
+	if err = r.List(ctx, foundPods, listOps); err != nil {
+		logger.Error(err, "Error listing pods for SolrCloud")
+		return
+	}
+	podList = foundPods.Items
+
+	// Initialize the pod's notStopped readinessCondition so that they can receive traffic until they are stopped
+	for i, pod := range podList {
+		if updatedPod, podError := r.initializePod(ctx, &pod, logger); podError != nil {
+			err = podError
+		} else if updatedPod != nil {
+			podList[i] = *updatedPod
+		}
+	}
+	return
+}
+
+// InitializePod Initialize Status Conditions for a SolrCloud Pod
+func (r *SolrCloudReconciler) initializePod(ctx context.Context, pod *corev1.Pod, logger logr.Logger) (updatedPod *corev1.Pod, err error) {
+	shouldPatchPod := false
+
+	updatedPod = pod.DeepCopy()
+
+	// Initialize all the readiness gates found for the pod
+	for _, readinessGate := range pod.Spec.ReadinessGates {
+		if InitializePodReadinessCondition(updatedPod, readinessGate.ConditionType) {
+			shouldPatchPod = true
+		}
 	}
 
+	if shouldPatchPod {
+		if err = r.Status().Patch(ctx, updatedPod, client.StrategicMergeFrom(pod)); err != nil {
+			logger.Error(err, "Could not patch pod-stopped readiness condition for pod to start traffic", "pod", pod.Name)
+			// set the pod back to its original state since the patch failed
+			updatedPod = pod
+
+			// TODO: Create event for the CRD.
+		}
+	}
+	return
+}
+
+// Initialize the SolrCloud.Status object
+func createCloudStatus(solrCloud *solrv1beta1.SolrCloud,
+	newStatus *solrv1beta1.SolrCloudStatus, statefulSetStatus appsv1.StatefulSetStatus, podSelector labels.Selector,
+	podList []corev1.Pod) (outOfDatePods util.OutOfDatePodSegmentation, availableUpdatedPodCount int, err error) {
 	var otherVersions []string
-	nodeNames := make([]string, len(foundPods.Items))
+	nodeNames := make([]string, len(podList))
 	nodeStatusMap := map[string]solrv1beta1.SolrNodeStatus{}
 
 	newStatus.Replicas = statefulSetStatus.Replicas
 	newStatus.UpToDateNodes = int32(0)
 	newStatus.ReadyReplicas = int32(0)
-	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
-		MatchLabels: selectorLabels,
-	})
-	if err != nil {
-		logger.Error(err, "Error getting SolrCloud PodSelector labels")
-		return outOfDatePods, outOfDatePodsNotStarted, availableUpdatedPodCount, err
-	}
-	newStatus.PodSelector = selector.String()
+
+	newStatus.PodSelector = podSelector.String()
 	backupReposAvailable := make(map[string]bool, len(solrCloud.Spec.BackupRepositories))
 	for _, repo := range solrCloud.Spec.BackupRepositories {
 		backupReposAvailable[repo.Name] = false
 	}
-	for podIdx, p := range foundPods.Items {
+	for podIdx, p := range podList {
 		nodeNames[podIdx] = p.Name
 		nodeStatus := solrv1beta1.SolrNodeStatus{}
 		nodeStatus.Name = p.Name
@@ -583,9 +623,14 @@
 
 		// Check whether the node is considered "ready" by kubernetes
 		nodeStatus.Ready = false
+		nodeStatus.ScheduledForDeletion = false
+		podIsScheduledForUpdate := false
 		for _, condition := range p.Status.Conditions {
 			if condition.Type == corev1.PodReady {
 				nodeStatus.Ready = condition.Status == corev1.ConditionTrue
+			} else if condition.Type == util.SolrIsNotStoppedReadinessCondition {
+				nodeStatus.ScheduledForDeletion = condition.Status == corev1.ConditionFalse
+				podIsScheduledForUpdate = nodeStatus.ScheduledForDeletion && condition.Reason == string(PodUpdate)
 			}
 		}
 		if nodeStatus.Ready {
@@ -624,10 +669,12 @@
 					}
 				}
 			}
-			if containerNotStarted {
-				outOfDatePodsNotStarted = append(outOfDatePodsNotStarted, p)
+			if podIsScheduledForUpdate {
+				outOfDatePods.ScheduledForDeletion = append(outOfDatePods.ScheduledForDeletion, p)
+			} else if containerNotStarted {
+				outOfDatePods.NotStarted = append(outOfDatePods.NotStarted, p)
 			} else {
-				outOfDatePods = append(outOfDatePods, p)
+				outOfDatePods.Running = append(outOfDatePods.Running, p)
 			}
 		}
 
@@ -666,7 +713,7 @@
 		newStatus.ExternalCommonAddress = &extAddress
 	}
 
-	return outOfDatePods, outOfDatePodsNotStarted, availableUpdatedPodCount, nil
+	return outOfDatePods, availableUpdatedPodCount, nil
 }
 
 func (r *SolrCloudReconciler) reconcileNodeService(ctx context.Context, logger logr.Logger, instance *solrv1beta1.SolrCloud, nodeName string) (err error, ip string) {
diff --git a/controllers/util/solr_update_util.go b/controllers/util/solr_update_util.go
index 422bf20..715fe38 100644
--- a/controllers/util/solr_update_util.go
+++ b/controllers/util/solr_update_util.go
@@ -86,6 +86,12 @@
 	return
 }
 
+type OutOfDatePodSegmentation struct {
+	NotStarted           []corev1.Pod
+	ScheduledForDeletion []corev1.Pod
+	Running              []corev1.Pod
+}
+
 // DeterminePodsSafeToUpdate takes a list of solr Pods and returns a list of pods that are safe to upgrade now.
 // This function MUST be idempotent and return the same list of pods given the same kubernetes/solr state.
 //
@@ -95,12 +101,12 @@
 // TODO:
 //   - Think about caching this for ~250 ms? Not a huge need to send these requests milliseconds apart.
 //   - Might be too much complexity for very little gain.
-func DeterminePodsSafeToUpdate(ctx context.Context, cloud *solr.SolrCloud, outOfDatePods []corev1.Pod, readyPods int, availableUpdatedPodCount int, outOfDatePodsNotStartedCount int, logger logr.Logger) (podsToUpdate []corev1.Pod, podsHaveReplicas map[string]bool, retryLater bool) {
+func DeterminePodsSafeToUpdate(ctx context.Context, cloud *solr.SolrCloud, outOfDatePods OutOfDatePodSegmentation, readyPods int, availableUpdatedPodCount int, logger logr.Logger) (podsToUpdate []corev1.Pod, podsHaveReplicas map[string]bool, retryLater bool, err error) {
 	// Before fetching the cluster state, be sure that there is room to update at least 1 pod
-	maxPodsUnavailable, unavailableUpdatedPodCount, maxPodsToUpdate := calculateMaxPodsToUpdate(cloud, len(outOfDatePods), outOfDatePodsNotStartedCount, availableUpdatedPodCount)
+	maxPodsUnavailable, unavailableUpdatedPodCount, maxPodsToUpdate := calculateMaxPodsToUpdate(cloud, len(outOfDatePods.Running), len(outOfDatePods.NotStarted)+len(outOfDatePods.ScheduledForDeletion), availableUpdatedPodCount)
 	if maxPodsToUpdate <= 0 {
 		logger.Info("Pod update selection canceled. The number of updated pods unavailable equals or exceeds the calculated maxPodsUnavailable.",
-			"unavailableUpdatedPods", unavailableUpdatedPodCount, "outOfDatePodsNotStarted", outOfDatePodsNotStartedCount, "maxPodsUnavailable", maxPodsUnavailable)
+			"unavailableUpdatedPods", unavailableUpdatedPodCount, "outOfDatePodsNotStarted", len(outOfDatePods.NotStarted), "alreadyScheduledForDeletion", len(outOfDatePods.ScheduledForDeletion), "maxPodsUnavailable", maxPodsUnavailable)
 	} else {
 		clusterResp := &solr_api.SolrClusterStatusResponse{}
 		overseerResp := &solr_api.SolrOverseerStatusResponse{}
@@ -108,7 +114,7 @@
 		if readyPods > 0 {
 			queryParams := url.Values{}
 			queryParams.Add("action", "CLUSTERSTATUS")
-			err := solr_api.CallCollectionsApi(ctx, cloud, queryParams, clusterResp)
+			err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, clusterResp)
 			if err == nil {
 				if hasError, apiErr := solr_api.CheckForCollectionsApiError("CLUSTERSTATUS", clusterResp.ResponseHeader); hasError {
 					err = apiErr
@@ -119,25 +125,30 @@
 						err = apiErr
 					}
 				}
-			} else {
+			}
+			if err != nil {
 				logger.Error(err, "Error retrieving cluster status, delaying pod update selection")
-				// If there is an error fetching the clusterState, retry later.
-				retryLater = true
 			}
 		}
 		// If the update logic already wants to retry later, then do not pick any pods
 		if !retryLater {
-			logger.Info("Pod update selection started.", "outOfDatePods", len(outOfDatePods), "maxPodsUnavailable", maxPodsUnavailable, "unavailableUpdatedPods", unavailableUpdatedPodCount, "outOfDatePodsNotStarted", outOfDatePodsNotStartedCount, "maxPodsToUpdate", maxPodsToUpdate)
+			logger.Info("Pod update selection started.",
+				"outOfDatePods", len(outOfDatePods.Running),
+				"maxPodsUnavailable", maxPodsUnavailable,
+				"unavailableUpdatedPods", unavailableUpdatedPodCount,
+				"outOfDatePodsNotStarted", len(outOfDatePods.NotStarted),
+				"alreadyScheduledForDeletion", len(outOfDatePods.ScheduledForDeletion),
+				"maxPodsToUpdate", maxPodsToUpdate)
 			podsToUpdate, podsHaveReplicas = pickPodsToUpdate(cloud, outOfDatePods, clusterResp.ClusterStatus, overseerResp.Leader, maxPodsToUpdate, logger)
 
 			// If there are no pods to upgrade, even though the maxPodsToUpdate is >0, then retry later because the issue stems from cluster state
 			// and clusterState changes will not call the reconciler.
-			if len(podsToUpdate) == 0 && len(outOfDatePods) > 0 {
+			if len(podsToUpdate) == 0 && len(outOfDatePods.Running) > 0 {
 				retryLater = true
 			}
 		}
 	}
-	return podsToUpdate, podsHaveReplicas, retryLater
+	return podsToUpdate, podsHaveReplicas, retryLater, err
 }
 
 // calculateMaxPodsToUpdate determines the maximum number of additional pods that can be updated.
@@ -154,11 +165,11 @@
 	return maxPodsUnavailable, unavailableUpdatedPodCount, maxPodsToUpdate
 }
 
-func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods []corev1.Pod, clusterStatus solr_api.SolrClusterStatus,
+func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods OutOfDatePodSegmentation, clusterStatus solr_api.SolrClusterStatus,
 	overseer string, maxPodsToUpdate int, logger logr.Logger) (podsToUpdate []corev1.Pod, podsHaveReplicas map[string]bool) {
 	podsHaveReplicas = make(map[string]bool, maxPodsToUpdate)
 	nodeContents, totalShardReplicas, shardReplicasNotActive, allManagedPodsLive := findSolrNodeContents(clusterStatus, overseer, GetAllManagedSolrNodeNames(cloud))
-	sortNodePodsBySafety(outOfDatePods, nodeContents, cloud)
+	sortNodePodsBySafety(outOfDatePods.Running, nodeContents, cloud)
 
 	updateOptions := cloud.Spec.UpdateStrategy.ManagedUpdateOptions
 	var maxShardReplicasUnavailableCache map[string]int
@@ -169,7 +180,20 @@
 		maxShardReplicasUnavailableCache = make(map[string]int, len(totalShardReplicas))
 	}
 
-	for _, pod := range outOfDatePods {
+	for _, pod := range outOfDatePods.ScheduledForDeletion {
+		nodeName := SolrNodeName(cloud, pod.Name)
+		nodeContent, isInClusterState := nodeContents[nodeName]
+
+		// This pod will be deleted, add its information to future down shards
+		podsHaveReplicas[pod.Name] = isInClusterState && nodeContent.replicas > 0
+		if isInClusterState && nodeContent.live {
+			for shard, additionalReplicaCount := range nodeContent.activeReplicasPerShard {
+				shardReplicasNotActive[shard] += additionalReplicaCount
+			}
+		}
+	}
+
+	for _, pod := range outOfDatePods.Running {
 		isSafeToUpdate := true
 		nodeName := SolrNodeName(cloud, pod.Name)
 		nodeContent, isInClusterState := nodeContents[nodeName]
@@ -187,7 +211,7 @@
 				// But we want to make sure it still follows the same replicasDown rules as the other nodes, so still use that logic
 				// This works if there are other solr nodes not managed by this SolrCloud resource, because we just check that this is the last
 				// pod managed for this SolrCloud that has not been updated.
-				if len(outOfDatePods) == 1 && allManagedPodsLive {
+				if len(outOfDatePods.Running) == 1 && allManagedPodsLive {
 					isSafeToUpdate = true
 					reason = "Pod is overseer and all other nodes have been updated."
 				} else {
@@ -237,7 +261,7 @@
 					shardReplicasNotActive[shard] += additionalReplicaCount
 				}
 			}
-			logger.Info("Pod killed for update.", "pod", pod.Name, "reason", reason)
+			logger.Info("Pod selected to be deleted for update.", "pod", pod.Name, "reason", reason)
 			podsToUpdate = append(podsToUpdate, pod)
 			podsHaveReplicas[pod.Name] = isInClusterState && nodeContent.replicas > 0
 
@@ -481,8 +505,10 @@
 // EvictReplicasForPodIfNecessary takes a solr Pod and migrates all replicas off of that Pod, if the Pod is using ephemeral storage.
 // If the pod is using persistent storage, this function is a no-op.
 // This function MUST be idempotent and return the same list of pods given the same kubernetes/solr state.
-func EvictReplicasForPodIfNecessary(ctx context.Context, solrCloud *solr.SolrCloud, pod *corev1.Pod, logger logr.Logger) (err error, canDeletePod bool) {
+func EvictReplicasForPodIfNecessary(ctx context.Context, solrCloud *solr.SolrCloud, pod *corev1.Pod, podHasReplicas bool, logger logr.Logger) (err error, canDeletePod bool) {
 	var solrDataVolume *corev1.Volume
+
+	// TODO: Remove these checks after v0.7.0, since it will be taken care by the evictReplicas podReadinessCondition
 	dataVolumeName := solrCloud.DataVolumeName()
 	for _, volume := range pod.Spec.Volumes {
 		if volume.Name == dataVolumeName {
@@ -508,22 +534,27 @@
 			if asyncState, message, asyncErr := solr_api.CheckAsyncRequest(ctx, solrCloud, requestId); asyncErr != nil {
 				err = asyncErr
 			} else if asyncState == "notfound" {
-				// Submit new Replace Node request
-				replaceResponse := &solr_api.SolrAsyncResponse{}
-				queryParams := url.Values{}
-				queryParams.Add("action", "REPLACENODE")
-				queryParams.Add("parallel", "true")
-				queryParams.Add("sourceNode", SolrNodeName(solrCloud, pod.Name))
-				queryParams.Add("async", requestId)
-				err = solr_api.CallCollectionsApi(ctx, solrCloud, queryParams, replaceResponse)
-				if hasError, apiErr := solr_api.CheckForCollectionsApiError("REPLACENODE", replaceResponse.ResponseHeader); hasError {
-					err = apiErr
-				}
-				if err == nil {
-					logger.Info("Migrating all replicas off of pod before deletion.", "requestId", requestId, "pod", pod.Name)
+				if podHasReplicas {
+					// Submit new Replace Node request
+					replaceResponse := &solr_api.SolrAsyncResponse{}
+					queryParams := url.Values{}
+					queryParams.Add("action", "REPLACENODE")
+					queryParams.Add("parallel", "true")
+					queryParams.Add("sourceNode", SolrNodeName(solrCloud, pod.Name))
+					queryParams.Add("async", requestId)
+					err = solr_api.CallCollectionsApi(ctx, solrCloud, queryParams, replaceResponse)
+					if hasError, apiErr := solr_api.CheckForCollectionsApiError("REPLACENODE", replaceResponse.ResponseHeader); hasError {
+						err = apiErr
+					}
+					if err == nil {
+						logger.Info("Migrating all replicas off of pod before deletion.", "requestId", requestId, "pod", pod.Name)
+					} else {
+						logger.Error(err, "Could not migrate all replicas off of pod before deletion. Will try again later.", "requestId", requestId, "message", message)
+					}
 				} else {
-					logger.Error(err, "Could not migrate all replicas off of pod before deletion. Will try again later.", "requestId", requestId, "message", message)
+					canDeletePod = true
 				}
+
 			} else {
 				logger.Info("Found async status", "requestId", requestId, "state", asyncState)
 				// Only continue to delete the pod if the ReplaceNode request is complete and successful
diff --git a/controllers/util/solr_update_util_test.go b/controllers/util/solr_update_util_test.go
index d235d19..1ff982d 100644
--- a/controllers/util/solr_update_util_test.go
+++ b/controllers/util/solr_update_util_test.go
@@ -53,25 +53,50 @@
 		},
 	}
 
-	allPods := []corev1.Pod{
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-0"}, Spec: corev1.PodSpec{}},
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-1"}, Spec: corev1.PodSpec{}},
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-2"}, Spec: corev1.PodSpec{}},
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-3"}, Spec: corev1.PodSpec{}},
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-4"}, Spec: corev1.PodSpec{}},
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-5"}, Spec: corev1.PodSpec{}},
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-6"}, Spec: corev1.PodSpec{}},
+	allPods := OutOfDatePodSegmentation{
+		NotStarted:           []corev1.Pod{},
+		ScheduledForDeletion: []corev1.Pod{},
+		Running: []corev1.Pod{
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-0"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-1"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-2"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-3"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-4"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-5"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-6"}, Spec: corev1.PodSpec{}},
+		},
 	}
 
-	halfPods := []corev1.Pod{
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-0"}, Spec: corev1.PodSpec{}},
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-1"}, Spec: corev1.PodSpec{}},
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-3"}, Spec: corev1.PodSpec{}},
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-5"}, Spec: corev1.PodSpec{}},
+	halfPods := OutOfDatePodSegmentation{
+		NotStarted:           []corev1.Pod{},
+		ScheduledForDeletion: []corev1.Pod{},
+		Running: []corev1.Pod{
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-0"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-1"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-3"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-5"}, Spec: corev1.PodSpec{}},
+		},
 	}
 
-	lastPod := []corev1.Pod{
-		{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-0"}, Spec: corev1.PodSpec{}},
+	lastPod := OutOfDatePodSegmentation{
+		NotStarted:           []corev1.Pod{},
+		ScheduledForDeletion: []corev1.Pod{},
+		Running: []corev1.Pod{
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-0"}, Spec: corev1.PodSpec{}},
+		},
+	}
+
+	someScheduledForDeletionPods := OutOfDatePodSegmentation{
+		NotStarted: []corev1.Pod{},
+		ScheduledForDeletion: []corev1.Pod{
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-2"}, Spec: corev1.PodSpec{}},
+		},
+		Running: []corev1.Pod{
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-0"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-1"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-3"}, Spec: corev1.PodSpec{}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "foo-solrcloud-5"}, Spec: corev1.PodSpec{}},
+		},
 	}
 
 	/*
@@ -180,6 +205,26 @@
 	solrCloud.Spec.Replicas = Replicas(4)
 	podsToUpgrade = getPodNames(pickPodsToUpdate(solrCloud, lastPod, testHealthyClusterStatus, overseerLeader, 6, log))
 	assert.ElementsMatch(t, []string{"foo-solrcloud-0"}, podsToUpgrade, "Incorrect set of next pods to upgrade. The overseer should be upgraded when everything is healthy and it is the last node, even though this SolrCloud resource doesn't manage all Nodes")
+
+	/*
+		Test when some pods have already been selected for deletion
+	*/
+
+	// Normal inputs
+	maxshardReplicasUnavailable = intstr.FromInt(1)
+	podsToUpgrade = getPodNames(pickPodsToUpdate(solrCloud, someScheduledForDeletionPods, testHealthyClusterStatus, overseerLeader, 6, log))
+	assert.ElementsMatch(t, []string{}, podsToUpgrade, "Incorrect set of next pods to upgrade. Due to replica placement, only the node with the least leaders can be upgraded and replicas.")
+
+	// Test the maxShardReplicasDownSpec
+	maxshardReplicasUnavailable = intstr.FromInt(2)
+	podsToUpgrade = getPodNames(pickPodsToUpdate(solrCloud, someScheduledForDeletionPods, testHealthyClusterStatus, overseerLeader, 6, log))
+	assert.ElementsMatch(t, []string{"foo-solrcloud-1", "foo-solrcloud-3"}, podsToUpgrade, "Incorrect set of next pods to upgrade. More nodes should be upgraded when maxShardReplicasDown=2")
+
+	// Test the maxNodes
+	maxshardReplicasUnavailable = intstr.FromInt(2)
+	podsToUpgrade = getPodNames(pickPodsToUpdate(solrCloud, someScheduledForDeletionPods, testHealthyClusterStatus, overseerLeader, 2, log))
+	assert.ElementsMatch(t, []string{"foo-solrcloud-1", "foo-solrcloud-3"}, podsToUpgrade, "Incorrect set of next pods to upgrade. More nodes should be upgraded when maxShardReplicasDown=2")
+
 }
 
 func TestPodUpgradeOrdering(t *testing.T) {
diff --git a/controllers/util/solr_util.go b/controllers/util/solr_util.go
index 4ae2ba5..a46ba62 100644
--- a/controllers/util/solr_util.go
+++ b/controllers/util/solr_util.go
@@ -51,6 +51,9 @@
 	LogXmlMd5Annotation              = "solr.apache.org/logXmlMd5"
 	LogXmlFile                       = "log4j2.xml"
 
+	SolrIsNotStoppedReadinessCondition       = "solr.apache.org/isNotStopped"
+	SolrReplicasNotEvictedReadinessCondition = "solr.apache.org/replicasNotEvicted"
+
 	DefaultStatefulSetPodManagementPolicy = appsv1.ParallelPodManagement
 
 	DistLibs    = "/opt/solr/dist"
@@ -117,6 +120,13 @@
 		}
 	}
 
+	// The isNotStopped readiness gate will always be used for managedUpdates
+	podReadinessGates := []corev1.PodReadinessGate{
+		{
+			ConditionType: SolrIsNotStoppedReadinessCondition,
+		},
+	}
+
 	// Keep track of the SolrOpts that the Solr Operator needs to set
 	// These will be added to the SolrOpts given by the user.
 	allSolrOpts := []string{"-DhostPort=$(SOLR_NODE_PORT)"}
@@ -197,6 +207,11 @@
 			ephemeralVolume.VolumeSource.EmptyDir = &corev1.EmptyDirVolumeSource{}
 		}
 		solrVolumes = append(solrVolumes, ephemeralVolume)
+
+		// Add an evictPodReadinessCondition for when deleting pods with ephemeral storage
+		podReadinessGates = append(podReadinessGates, corev1.PodReadinessGate{
+			ConditionType: SolrReplicasNotEvictedReadinessCondition,
+		})
 	}
 
 	volumeMounts := []corev1.VolumeMount{{Name: solrDataVolumeName, MountPath: "/var/solr/data"}}
@@ -491,6 +506,7 @@
 					InitContainers: initContainers,
 					HostAliases:    hostAliases,
 					Containers:     containers,
+					ReadinessGates: podReadinessGates,
 				},
 			},
 			VolumeClaimTemplates: pvcs,
diff --git a/docs/solr-cloud/managed-updates.md b/docs/solr-cloud/managed-updates.md
index 86ec236..8eda96f 100644
--- a/docs/solr-cloud/managed-updates.md
+++ b/docs/solr-cloud/managed-updates.md
@@ -89,3 +89,26 @@
       annotations:
         manualrestart: "2021-10-20T08:37:00Z"
 ```
+
+## Pod Readiness during updates
+
+The Solr Operator sets up at least two Services for every SolrCloud.
+- Always
+  - A clusterIP service for all solr nodes (What we call the "common service")
+- Either
+  - A [Headless Service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) for individual Solr Node endpoints that are not exposed via an Ingress.
+  - Individual clusterIP services for Solr Nodes that are exposed via an Ingress
+
+Only the common service uses the `publishNotReadyAddresses: false` option, since the common service should load balance between all available nodes.
+The other services have individual endpoints for each node, so there is no reason to de-list pods that are not available.
+
+When doing a rolling upgrade, or taking down a pod for any reason, we want to first stop all requests to this pod.
+Solr will do this while stopping by first taking itself out of the cluster's set of `liveNodes` , so that other Solr nodes and clients think it is not running.
+However, for ephemeral clusters we are also evicting data before the pod is deleted. So we want to stop requests to this node since the data will soon no-longer live there.
+
+Kubernetes allows the Solr Operator to control whether a pod is considered `ready`, or available for requests, via [readinessConditions/readinessGates](https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-readiness-gate).
+When the Solr Operator begins the shut-down procedure for a pod, it will first set a `readinessCondition` to `false`, so that no loadBalanced requests (through the common service) go to the pod.
+This readinessCondition will stay set to `false` until the pod is deleted and a new pod is created in its place.
+
+**For this reason, it's a good idea to avoid very aggressive [Update Strategies](solr-cloud-crd.md#update-strategy).
+During a rolling restart with a high `maxPodsUnavailable`, requests that go through the common service might be routed to a very small number of pods.**
diff --git a/docs/upgrade-notes.md b/docs/upgrade-notes.md
index 68a4a7c..dc9a0be 100644
--- a/docs/upgrade-notes.md
+++ b/docs/upgrade-notes.md
@@ -120,6 +120,12 @@
 
 - Provided Zookeeper pods use the `IfNotPresent` pullPolicy by default. Users that specify this field manually will not see a change.
 
+- The Solr Operator now tries to limit connectivity to pods before they are deleted, for rolling updates or other reasons.
+  Before the pod is killed, and evicted of replicas if ephemeral storage is used, a readinessCondition will be set to `false`.
+  The Headless Service does not use readiness, so internode traffic will not be affected, however the ClusterIP (common) service will no longer include these nodes until they have been restarted.
+  This change will improve request success rates during a rolling restart.
+  Refer to the [Managed Updates documentation](solr-cloud/managed-updates.md#pod-readiness-during-updates).
+
 ### v0.6.0
 - The default Solr version for the `SolrCloud` and `SolrPrometheusExporter` resources has been upgraded from `8.9` to `8.11`.
   This will not affect any existing resources, as default versions are hard-written to the resources immediately.
diff --git a/helm/solr-operator/Chart.yaml b/helm/solr-operator/Chart.yaml
index 3f4a2a7..84cadd2 100644
--- a/helm/solr-operator/Chart.yaml
+++ b/helm/solr-operator/Chart.yaml
@@ -105,17 +105,26 @@
     - kind: fixed
       description: Fix SolrBackup not taking backups when the collections field is omitted 
       links:
-        - name: GitHub PR
-          url: https://github.com/apache/solr-operator/pull/516
         - name: GitHub Issue
           url: https://github.com/apache/solr-operator/issues/515
+        - name: GitHub PR
+          url: https://github.com/apache/solr-operator/pull/516
     - kind: changed
       description: Use better default startup, liveness and readiness probes for SolrCloud and SolrPrometheusExporter
       links:
-        - name: GitHub PR
-          url: https://github.com/apache/solr-operator/pull/511
         - name: GitHub Issue
           url: https://github.com/apache/solr-operator/issues/510
+        - name: GitHub PR
+          url: https://github.com/apache/solr-operator/pull/511
+    - kind: added
+      description: The SolrCloud common service now removes nodes that are about to be upgraded
+      links:
+        - name: GitHub Issue
+          url: https://github.com/apache/solr-operator/issues/529
+        - name: GitHub PR
+          url: https://github.com/apache/solr-operator/pull/530
+        - name: Feature Documentation
+          url: https://apache.github.io/solr-operator/docs/solr-cloud/managed-updates.html#pod-readiness-during-updates
   artifacthub.io/images: |
     - name: solr-operator
       image: apache/solr-operator:v0.7.0-prerelease
diff --git a/helm/solr-operator/crds/crds.yaml b/helm/solr-operator/crds/crds.yaml
index 2f61fe8..29c8485 100644
--- a/helm/solr-operator/crds/crds.yaml
+++ b/helm/solr-operator/crds/crds.yaml
@@ -12002,6 +12002,9 @@
                     ready:
                       description: Is the node up and running
                       type: boolean
+                    scheduledForDeletion:
+                      description: This Solr Node pod is scheduled for deletion
+                      type: boolean
                     specUpToDate:
                       description: This Solr Node pod is using the latest version
                         of solrcloud pod spec.
diff --git a/helm/solr-operator/templates/role.yaml b/helm/solr-operator/templates/role.yaml
index 4b0d25e..19a704a 100644
--- a/helm/solr-operator/templates/role.yaml
+++ b/helm/solr-operator/templates/role.yaml
@@ -71,6 +71,7 @@
   - pods/status
   verbs:
   - get
+  - patch
 - apiGroups:
   - ""
   resources:
diff --git a/tests/e2e/solrcloud_rolling_upgrade_test.go b/tests/e2e/solrcloud_rolling_upgrade_test.go
index 5ab3465..f261b44 100644
--- a/tests/e2e/solrcloud_rolling_upgrade_test.go
+++ b/tests/e2e/solrcloud_rolling_upgrade_test.go
@@ -126,6 +126,8 @@
 
 			By("waiting for the rolling restart to complete")
 			// Expect the SolrCloud to be up-to-date, or in a valid restarting state
+			lastCheckNodeStatuses := make(map[string]solrv1beta1.SolrNodeStatus, *solrCloud.Spec.Replicas)
+			lastCheckReplicas := *solrCloud.Spec.Replicas
 			foundSolrCloud := expectSolrCloudWithChecks(ctx, solrCloud, func(g Gomega, cloud *solrv1beta1.SolrCloud) {
 				// If there are more than 1 pods not ready, then fail because we have set MaxPodsUnavailable to 1
 				if cloud.Status.ReadyReplicas < *solrCloud.Spec.Replicas-int32(1) {
@@ -138,6 +140,34 @@
 				// As long as the current restart is in a healthy place, keep checking if the restart is finished
 				g.Expect(cloud.Status.UpToDateNodes).To(Equal(*cloud.Spec.Replicas), "The SolrCloud did not finish the rolling restart, not all nodes are up-to-date")
 				g.Expect(cloud.Status.ReadyReplicas).To(Equal(cloud.Status.UpToDateNodes), "The SolrCloud did not finish the rolling restart, all nodes are up-to-date, but not all are ready")
+
+				// Make sure that if a pod is deleted/recreated, it was first taken offline and "scheduledForDeletion" was set to true
+				// TODO: Try to find a better way to make sure that the deletion readinessCondition works
+				if cloud.Status.Replicas < lastCheckReplicas {
+					// We only want to check the statuses of nodes that the pods have been deleted, or they have been re-created since our last check
+					for _, nodeStatus := range cloud.Status.SolrNodes {
+						if !nodeStatus.SpecUpToDate || lastCheckNodeStatuses[nodeStatus.Name].SpecUpToDate {
+							delete(lastCheckNodeStatuses, nodeStatus.Name)
+						}
+					}
+					for _, nodeStatus := range cloud.Status.SolrNodes {
+						oldNodeStatus := lastCheckNodeStatuses[nodeStatus.Name]
+						g.Expect(oldNodeStatus.ScheduledForDeletion).To(BeTrue(), "Before SolrNode %s is taken down, scheduledForDeletion should be true", nodeStatus.Name)
+						g.Expect(oldNodeStatus.Ready).To(BeFalse(), "Before SolrNode %s is taken down, it should not be ready", nodeStatus.Name)
+					}
+				}
+
+				// Update the nodeStatuses for the next iteration's readinessCondition check
+				lastCheckReplicas = cloud.Status.Replicas
+				for _, nodeStatus := range cloud.Status.SolrNodes {
+					lastCheckNodeStatuses[nodeStatus.Name] = nodeStatus
+
+					if nodeStatus.Ready || nodeStatus.SpecUpToDate {
+						g.Expect(nodeStatus.ScheduledForDeletion).To(BeFalse(), "SolrNode %s cannot be scheduledForDeletion while being 'ready' or 'upToDate'", nodeStatus.Name)
+					} else {
+						g.Expect(nodeStatus.ScheduledForDeletion).To(BeTrue(), "SolrNode %s must be scheduledForDeletion while not being 'ready' or 'upToDate', so it was taken down for the update", nodeStatus.Name)
+					}
+				}
 			})
 
 			// Make sure that the status object is correct for the nodes