Call rebalance API once all pods are updated (#625)

* Call rebalance API once all pods are updated during a rollingUpdate cluster operation
* Make a BalanceReplicas clusterOp, used by scaleUp, scaleDown (on failure) and rollingUpdate
* Only rebalance for updates with data migration

---------

Co-authored-by: Houston Putman <houston@apache.org>
diff --git a/controllers/solr_cluster_ops_util.go b/controllers/solr_cluster_ops_util.go
index 61d6aed..f97011a 100644
--- a/controllers/solr_cluster_ops_util.go
+++ b/controllers/solr_cluster_ops_util.go
@@ -49,11 +49,18 @@
 type SolrClusterOperationType string
 
 const (
-	ScaleDownLock SolrClusterOperationType = "ScalingDown"
-	ScaleUpLock   SolrClusterOperationType = "ScalingUp"
-	UpdateLock    SolrClusterOperationType = "RollingUpdate"
+	ScaleDownLock       SolrClusterOperationType = "ScalingDown"
+	ScaleUpLock         SolrClusterOperationType = "ScalingUp"
+	UpdateLock          SolrClusterOperationType = "RollingUpdate"
+	BalanceReplicasLock SolrClusterOperationType = "BalanceReplicas"
 )
 
+// RollingUpdateMetadata contains metadata for rolling update cluster operations.
+type RollingUpdateMetadata struct {
+	// Whether or not replicas will be migrated during this rolling upgrade
+	RequiresReplicaMigration bool `json:"requiresReplicaMigration"`
+}
+
 func clearClusterOpLock(statefulSet *appsv1.StatefulSet) {
 	delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
 }
@@ -178,10 +185,10 @@
 	} else if scaleDownOpIsQueued {
 		// If the statefulSet and the solrCloud have the same number of pods configured, and the queued operation is a scaleDown,
 		// that means the scaleDown was reverted. So there's no reason to change the number of pods.
-		// However, a Replica Balancing should be done just in case, so do a ScaleUp, but don't change the number of pods.
+		// However, a Replica Balancing should be done just in case, so start it via a new ClusterOperation.
 		clusterOp = &SolrClusterOp{
-			Operation: ScaleUpLock,
-			Metadata:  strconv.Itoa(desiredPods),
+			Operation: BalanceReplicasLock,
+			Metadata:  "UndoFailedScaleDown",
 		}
 	}
 	return
@@ -244,9 +251,32 @@
 	return
 }
 
+// cleanupManagedCloudScaleDown does the logic of cleaning-up an incomplete scale down operation.
+// This will remove any bad readinessConditions that the scaleDown might have set when trying to scaleDown pods.
+func cleanupManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, podList []corev1.Pod, logger logr.Logger) (err error) {
+	// First though, the scaleDown op might have set some pods to be "unready" before deletion. Undo that.
+	// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
+	readinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
+		util.SolrIsNotStoppedReadinessCondition: {
+			reason:  PodStarted,
+			message: "Pod is not being deleted, traffic to the pod must be restarted",
+			status:  true,
+		},
+	}
+	for _, pod := range podList {
+		if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
+			err = e
+			return
+		} else {
+			pod = *updatedPod
+		}
+	}
+	return
+}
+
 // handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
 // This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
-func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
+func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, nextClusterOperation *SolrClusterOp, err error) {
 	desiredPods, err := strconv.Atoi(clusterOp.Metadata)
 	if err != nil {
 		logger.Error(err, "Could not convert ScaleUp metadata to int, as it represents the number of nodes to scale to", "metadata", clusterOp.Metadata)
@@ -262,37 +292,41 @@
 		if err != nil {
 			logger.Error(err, "Error while patching StatefulSet to increase the number of pods for the ScaleUp")
 		}
-		// Return and wait for the pods to be created, which will call another reconcile
-		return false, false, 0, err
-	} else {
-		// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
-		readinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
-			util.SolrIsNotStoppedReadinessCondition: {
-				reason:  PodStarted,
-				message: "Pod is not being deleted, traffic to the pod must be started",
-				status:  true,
-			},
+	} else if len(podList) >= configuredPods {
+		nextClusterOperation = &SolrClusterOp{
+			Operation: BalanceReplicasLock,
+			Metadata:  "ScaleUp",
 		}
-		for _, pod := range podList {
-			if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
-				err = e
-				return
-			} else {
-				pod = *updatedPod
-			}
-		}
-		if operationComplete, requestInProgress, err = util.BalanceReplicasForCluster(ctx, instance, statefulSet, "scaleUp", clusterOp.Metadata, logger); !operationComplete && err == nil {
-			// Retry after five seconds to check if the replica management commands have been completed
-			retryLaterDuration = time.Second * 5
-		}
+		operationComplete = true
 	}
 	return
 }
 
+// hasAnyEphemeralData returns true if any of the given pods uses ephemeral Data for Solr storage, and false if all pods use persistent storage.
+func hasAnyEphemeralData(solrPods []corev1.Pod) bool {
+	for _, pod := range solrPods {
+		for _, cond := range pod.Status.Conditions {
+			if cond.Type == util.SolrReplicasNotEvictedReadinessCondition {
+				return true
+			}
+		}
+	}
+	return false
+}
+
 func determineRollingUpdateClusterOpLockIfNecessary(instance *solrv1beta1.SolrCloud, outOfDatePods util.OutOfDatePodSegmentation) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
 	if instance.Spec.UpdateStrategy.Method == solrv1beta1.ManagedUpdate && !outOfDatePods.IsEmpty() {
+		includesDataMigration := hasAnyEphemeralData(outOfDatePods.Running) || hasAnyEphemeralData(outOfDatePods.ScheduledForDeletion)
+		metadata := RollingUpdateMetadata{
+			RequiresReplicaMigration: includesDataMigration,
+		}
+		metaBytes, err := json.Marshal(metadata)
+		if err != nil {
+			return nil, 0, err
+		}
 		clusterOp = &SolrClusterOp{
 			Operation: UpdateLock,
+			Metadata:  string(metaBytes),
 		}
 	}
 	return
@@ -300,15 +334,27 @@
 
 // handleManagedCloudRollingUpdate does the logic of a managed and "locked" cloud rolling update operation.
 // This will take many reconcile loops to complete, as it is deleting pods/moving replicas.
-func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, outOfDatePods util.OutOfDatePodSegmentation, hasReadyPod bool, availableUpdatedPodCount int, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
+func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, outOfDatePods util.OutOfDatePodSegmentation, hasReadyPod bool, availableUpdatedPodCount int, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, nextClusterOp *SolrClusterOp, err error) {
 	// Manage the updating of out-of-spec pods, if the Managed UpdateStrategy has been specified.
 	updateLogger := logger.WithName("ManagedUpdateSelector")
 
 	// First check if all pods are up to date and ready. If so the rolling update is complete
 	configuredPods := int(*statefulSet.Spec.Replicas)
 	if configuredPods == availableUpdatedPodCount {
-		// The configured number of pods are all healthy and up to date. The operation is complete
+		updateMetadata := &RollingUpdateMetadata{}
+		if clusterOp.Metadata != "" {
+			if err = json.Unmarshal([]byte(clusterOp.Metadata), &updateMetadata); err != nil {
+				updateLogger.Error(err, "Could not unmarshal metadata for rolling update operation")
+			}
+		}
 		operationComplete = true
+		// Only do a re-balancing for rolling restarts that migrated replicas
+		if updateMetadata.RequiresReplicaMigration {
+			nextClusterOp = &SolrClusterOp{
+				Operation: BalanceReplicasLock,
+				Metadata:  "RollingUpdateComplete",
+			}
+		}
 		return
 	} else if outOfDatePods.IsEmpty() {
 		// Just return and wait for the updated pods to come up healthy, these will call new reconciles, so there is nothing for us to do
@@ -327,7 +373,7 @@
 		// a restart to get a working pod config.
 		state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, hasReadyPod, logger)
 		if apiError != nil {
-			return false, true, 0, apiError
+			return false, true, 0, nil, apiError
 		} else if !retryLater {
 			// If the cluster status has been successfully fetched, then add the pods scheduled for deletion
 			// This requires the clusterState to be fetched successfully to ensure that we know if there
@@ -364,6 +410,38 @@
 	return
 }
 
+// cleanupManagedCloudScaleDown does the logic of cleaning-up an incomplete scale down operation.
+// This will remove any bad readinessConditions that the scaleDown might have set when trying to scaleDown pods.
+func cleanupManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, podList []corev1.Pod, logger logr.Logger) (err error) {
+	// First though, the scaleDown op might have set some pods to be "unready" before deletion. Undo that.
+	// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
+	er := EvictingReplicas
+	readinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
+		util.SolrIsNotStoppedReadinessCondition: {
+			reason:  PodStarted,
+			message: "Pod is not being deleted, traffic to the pod must be restarted",
+			status:  true,
+		},
+		util.SolrReplicasNotEvictedReadinessCondition: {
+			// Only set this condition if the condition hasn't been changed since pod start
+			// We do not want to over-write future states later down the eviction pipeline
+			matchPreviousReason: &er,
+			reason:              PodStarted,
+			message:             "Pod is not being deleted, ephemeral data is no longer being evicted",
+			status:              true,
+		},
+	}
+	for _, pod := range podList {
+		if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
+			err = e
+			return
+		} else {
+			pod = *updatedPod
+		}
+	}
+	return
+}
+
 // clearClusterOpLockWithPatch simply removes any clusterOp for the given statefulSet.
 func clearClusterOpLockWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
 	originalStatefulSet := statefulSet.DeepCopy()
@@ -376,6 +454,21 @@
 	return
 }
 
+// clearClusterOpLockWithPatch simply removes any clusterOp for the given statefulSet.
+func setNextClusterOpLockWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, nextClusterOp *SolrClusterOp, reason string, logger logr.Logger) (err error) {
+	originalStatefulSet := statefulSet.DeepCopy()
+	clearClusterOpLock(statefulSet)
+	if err = setClusterOpLock(statefulSet, *nextClusterOp); err != nil {
+		logger.Error(err, "Error while patching StatefulSet to set next clusterOpLock annotation after finishing previous clusterOp", "reason", reason)
+	}
+	if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
+		logger.Error(err, "Error while patching StatefulSet to set next clusterOpLock annotation after finishing previous clusterOp", "reason", reason)
+	} else {
+		logger.Info("Set next clusterOpLock annotation on statefulSet after finishing previous clusterOp", "reason", reason)
+	}
+	return
+}
+
 // enqueueCurrentClusterOpForRetryWithPatch adds the current clusterOp to the clusterOpRetryQueue, and clears the current cluster Op.
 // This method will send the StatefulSet patch to the API Server.
 func enqueueCurrentClusterOpForRetryWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
diff --git a/controllers/solr_pod_lifecycle_util.go b/controllers/solr_pod_lifecycle_util.go
index adf289e..84116ad 100644
--- a/controllers/solr_pod_lifecycle_util.go
+++ b/controllers/solr_pod_lifecycle_util.go
@@ -101,7 +101,7 @@
 
 	// Delete the pod
 	if deletePod {
-		logger.Error(err, "Deleting solr pod for update", "pod", pod.Name)
+		logger.Info("Deleting solr pod for update", "pod", pod.Name)
 		err = r.Delete(ctx, pod, client.Preconditions{
 			UID: &pod.UID,
 		})
diff --git a/controllers/solrcloud_controller.go b/controllers/solrcloud_controller.go
index 2b2aea4..706c066 100644
--- a/controllers/solrcloud_controller.go
+++ b/controllers/solrcloud_controller.go
@@ -460,17 +460,20 @@
 	var retryLaterDuration time.Duration
 	if clusterOp, opErr := GetCurrentClusterOp(statefulSet); clusterOp != nil && opErr == nil {
 		var operationComplete, requestInProgress bool
+		var nextClusterOperation *SolrClusterOp
 		operationFound := true
 		shortTimeoutForRequeue := true
 		switch clusterOp.Operation {
 		case UpdateLock:
-			operationComplete, requestInProgress, retryLaterDuration, err = handleManagedCloudRollingUpdate(ctx, r, instance, statefulSet, outOfDatePods, hasReadyPod, availableUpdatedPodCount, logger)
+			operationComplete, requestInProgress, retryLaterDuration, nextClusterOperation, err = handleManagedCloudRollingUpdate(ctx, r, instance, statefulSet, clusterOp, outOfDatePods, hasReadyPod, availableUpdatedPodCount, logger)
 			// Rolling Updates should not be requeued quickly. The operation is expected to take a long time and thus should have a longTimeout if errors are not seen.
 			shortTimeoutForRequeue = false
 		case ScaleDownLock:
 			operationComplete, requestInProgress, retryLaterDuration, err = handleManagedCloudScaleDown(ctx, r, instance, statefulSet, clusterOp, podList, logger)
 		case ScaleUpLock:
-			operationComplete, requestInProgress, retryLaterDuration, err = handleManagedCloudScaleUp(ctx, r, instance, statefulSet, clusterOp, podList, logger)
+			operationComplete, nextClusterOperation, err = handleManagedCloudScaleUp(ctx, r, instance, statefulSet, clusterOp, podList, logger)
+		case BalanceReplicasLock:
+			operationComplete, requestInProgress, retryLaterDuration, err = util.BalanceReplicasForCluster(ctx, instance, statefulSet, clusterOp.Metadata, clusterOp.Metadata, logger)
 		default:
 			operationFound = false
 			// This shouldn't happen, but we don't want to be stuck if it does.
@@ -479,8 +482,13 @@
 		}
 		if operationFound {
 			if operationComplete {
-				// Once the operation is complete, finish the cluster operation by deleting the statefulSet annotations
-				err = clearClusterOpLockWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" complete", logger)
+				if nextClusterOperation == nil {
+					// Once the operation is complete, finish the cluster operation by deleting the statefulSet annotations
+					err = clearClusterOpLockWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" complete", logger)
+				} else {
+					// Once the operation is complete, finish the cluster operation and start the next one by setting the statefulSet annotations
+					err = setNextClusterOpLockWithPatch(ctx, r, statefulSet, nextClusterOperation, string(clusterOp.Operation)+" complete", logger)
+				}
 
 				// TODO: Create event for the CRD.
 			} else if !requestInProgress {
@@ -490,6 +498,7 @@
 				//   - the operation has a long timeout and has taken more than 10 minutes
 				// then continue the operation later.
 				// (it will likely immediately continue, since it is unlikely there is another operation to run)
+
 				clusterOpRuntime := time.Since(clusterOp.LastStartTime.Time)
 				queueForLaterReason := ""
 				if err != nil && clusterOpRuntime > time.Minute {
@@ -500,7 +509,16 @@
 					queueForLaterReason = "timed out during operation (10 minutes)"
 				}
 				if queueForLaterReason != "" {
-					err = enqueueCurrentClusterOpForRetryWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" "+queueForLaterReason, logger)
+					// If the operation is being queued, first have the operation cleanup after itself
+					switch clusterOp.Operation {
+					case UpdateLock:
+						err = cleanupManagedCloudRollingUpdate(ctx, r, outOfDatePods.ScheduledForDeletion, logger)
+					case ScaleDownLock:
+						err = cleanupManagedCloudScaleDown(ctx, r, podList, logger)
+					}
+					if err == nil {
+						err = enqueueCurrentClusterOpForRetryWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" "+queueForLaterReason, logger)
+					}
 
 					// TODO: Create event for the CRD.
 				}
diff --git a/controllers/util/solr_scale_util.go b/controllers/util/solr_scale_util.go
index a88531d..550bd15 100644
--- a/controllers/util/solr_scale_util.go
+++ b/controllers/util/solr_scale_util.go
@@ -23,6 +23,7 @@
 	"github.com/apache/solr-operator/controllers/util/solr_api"
 	"github.com/go-logr/logr"
 	appsv1 "k8s.io/api/apps/v1"
+	"time"
 )
 
 // BalanceReplicasForCluster takes a SolrCloud and balances all replicas across the Pods that are currently alive.
@@ -31,7 +32,7 @@
 // a successful status returned from the command. So if we delete the asyncStatus, and then something happens in the operator,
 // and we lose our state, then we will need to retry the balanceReplicas command. This should be ok since calling
 // balanceReplicas multiple times should not be bad when the replicas for the cluster are already balanced.
-func BalanceReplicasForCluster(ctx context.Context, solrCloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, balanceReason string, balanceCmdUniqueId string, logger logr.Logger) (balanceComplete bool, requestInProgress bool, err error) {
+func BalanceReplicasForCluster(ctx context.Context, solrCloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, balanceReason string, balanceCmdUniqueId string, logger logr.Logger) (balanceComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
 	logger = logger.WithValues("balanceReason", balanceReason)
 	// If the Cloud has 1 or zero pods, there is no reason to balance replicas.
 	if statefulSet.Spec.Replicas == nil || *statefulSet.Spec.Replicas < 1 {
@@ -96,5 +97,8 @@
 			}
 		}
 	}
+	if requestInProgress && !balanceComplete {
+		retryLaterDuration = time.Second * 5
+	}
 	return
 }
diff --git a/docs/solr-cloud/cluster-operations.md b/docs/solr-cloud/cluster-operations.md
index 6b39886..aa915de 100644
--- a/docs/solr-cloud/cluster-operations.md
+++ b/docs/solr-cloud/cluster-operations.md
@@ -30,6 +30,8 @@
 - [Managed Rolling Updates](managed-updates.md)
 - [Scaling Down with Replica Migrations](scaling.md#solr-pod-scale-down)
 - [Scaling Up with Replica Migrations](scaling.md#solr-pod-scale-up)
+- Balancing Replicas Across Pods
+  - This is started after a Rolling Update with Ephemeral Data or after a ScaleUp operation.
 
 ### How is the Lock Implemented?
 
diff --git a/helm/solr-operator/Chart.yaml b/helm/solr-operator/Chart.yaml
index 849b501..62955e2 100644
--- a/helm/solr-operator/Chart.yaml
+++ b/helm/solr-operator/Chart.yaml
@@ -135,6 +135,13 @@
           url: https://github.com/apache/solr-operator/issues/640
         - name: Github PR
           url: https://github.com/apache/solr-operator/pull/641
+    - kind: added
+      description: SolrClouds using ephemeral data will now have their replicas rebalanced after a rolling update.
+      links:
+        - name: Github Issue
+          url: https://github.com/apache/solr-operator/issues/615
+        - name: Github PR
+          url: https://github.com/apache/solr-operator/pull/625
   artifacthub.io/images: |
     - name: solr-operator
       image: apache/solr-operator:v0.8.0-prerelease
diff --git a/tests/e2e/resource_utils_test.go b/tests/e2e/resource_utils_test.go
index fc7a998..7dba163 100644
--- a/tests/e2e/resource_utils_test.go
+++ b/tests/e2e/resource_utils_test.go
@@ -95,6 +95,18 @@
 	return foundSolrCloud
 }
 
+func expectSolrCloudWithChecksAndTimeout(ctx context.Context, solrCloud *solrv1beta1.SolrCloud, within time.Duration, checkEvery time.Duration, additionalChecks func(Gomega, *solrv1beta1.SolrCloud), additionalOffset ...int) *solrv1beta1.SolrCloud {
+	foundSolrCloud := &solrv1beta1.SolrCloud{}
+	EventuallyWithOffset(resolveOffset(additionalOffset), func(g Gomega) {
+		g.Expect(k8sClient.Get(ctx, resourceKey(solrCloud, solrCloud.Name), foundSolrCloud)).To(Succeed(), "Expected SolrCloud does not exist")
+		if additionalChecks != nil {
+			additionalChecks(g, foundSolrCloud)
+		}
+	}).Within(within).WithPolling(checkEvery).WithContext(ctx).Should(Succeed())
+
+	return foundSolrCloud
+}
+
 func expectSolrCloudWithConsistentChecks(ctx context.Context, solrCloud *solrv1beta1.SolrCloud, additionalChecks func(Gomega, *solrv1beta1.SolrCloud), additionalOffset ...int) *solrv1beta1.SolrCloud {
 	foundSolrCloud := &solrv1beta1.SolrCloud{}
 	ConsistentlyWithOffset(resolveOffset(additionalOffset), func(g Gomega) {
@@ -286,6 +298,22 @@
 	return statefulSet
 }
 
+func expectStatefulSetWithConsistentChecksAndDuration(ctx context.Context, parentResource client.Object, statefulSetName string, duration time.Duration, additionalChecks func(Gomega, *appsv1.StatefulSet), additionalOffset ...int) *appsv1.StatefulSet {
+	statefulSet := &appsv1.StatefulSet{}
+	ConsistentlyWithOffset(resolveOffset(additionalOffset), func(g Gomega) {
+		g.Expect(k8sClient.Get(ctx, resourceKey(parentResource, statefulSetName), statefulSet)).To(Succeed(), "Expected StatefulSet does not exist")
+
+		testMapContainsOtherWithGomega(g, "StatefulSet pod template selector", statefulSet.Spec.Template.Labels, statefulSet.Spec.Selector.MatchLabels)
+		g.Expect(len(statefulSet.Spec.Selector.MatchLabels)).To(BeNumerically(">=", 1), "StatefulSet pod template selector must have at least 1 label")
+
+		if additionalChecks != nil {
+			additionalChecks(g, statefulSet)
+		}
+	}).Within(duration).Should(Succeed())
+
+	return statefulSet
+}
+
 func expectNoStatefulSet(ctx context.Context, parentResource client.Object, statefulSetName string, additionalOffset ...int) {
 	ConsistentlyWithOffset(resolveOffset(additionalOffset), func() error {
 		return k8sClient.Get(ctx, resourceKey(parentResource, statefulSetName), &appsv1.StatefulSet{})
diff --git a/tests/e2e/solrcloud_rolling_upgrade_test.go b/tests/e2e/solrcloud_rolling_upgrade_test.go
index 914cb66..6143342 100644
--- a/tests/e2e/solrcloud_rolling_upgrade_test.go
+++ b/tests/e2e/solrcloud_rolling_upgrade_test.go
@@ -23,8 +23,10 @@
 	"github.com/apache/solr-operator/controllers"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
+	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"time"
 )
 
 var _ = FDescribe("E2E - SolrCloud - Rolling Upgrades", func() {
@@ -98,7 +100,7 @@
 			// Expect the SolrCloud to be up-to-date, or in a valid restarting state
 			lastCheckNodeStatuses := make(map[string]solrv1beta1.SolrNodeStatus, *solrCloud.Spec.Replicas)
 			lastCheckReplicas := *solrCloud.Spec.Replicas
-			foundSolrCloud := expectSolrCloudWithChecks(ctx, solrCloud, func(g Gomega, cloud *solrv1beta1.SolrCloud) {
+			expectSolrCloudWithChecks(ctx, solrCloud, func(g Gomega, cloud *solrv1beta1.SolrCloud) {
 				// If there are more than 1 pods not ready, then fail because we have set MaxPodsUnavailable to 1
 				if cloud.Status.ReadyReplicas < *solrCloud.Spec.Replicas-int32(1) {
 					StopTrying("More than 1 pod (replica) is not ready, which is not allowed by the managed upgrade options").
@@ -107,9 +109,6 @@
 						Attach("SolrCloud Status", cloud.Status).
 						Now()
 				}
-				// As long as the current restart is in a healthy place, keep checking if the restart is finished
-				g.Expect(cloud.Status.UpToDateNodes).To(Equal(*cloud.Spec.Replicas), "The SolrCloud did not finish the rolling restart, not all nodes are up-to-date")
-				g.Expect(cloud.Status.ReadyReplicas).To(Equal(cloud.Status.UpToDateNodes), "The SolrCloud did not finish the rolling restart, all nodes are up-to-date, but not all are ready")
 
 				// Make sure that if a pod is deleted/recreated, it was first taken offline and "scheduledForDeletion" was set to true
 				// TODO: Try to find a better way to make sure that the deletion readinessCondition works
@@ -138,18 +137,37 @@
 						g.Expect(nodeStatus.ScheduledForDeletion).To(BeTrue(), "SolrNode %s must be scheduledForDeletion while not being 'ready' or 'upToDate', so it was taken down for the update", nodeStatus.Name)
 					}
 				}
+
+				// As long as the current restart is in a healthy place, keep checking if the restart is finished
+				g.Expect(cloud.Status.UpToDateNodes).To(Equal(*cloud.Spec.Replicas), "The SolrCloud did not finish the rolling restart, not all nodes are up-to-date")
 			})
 
+			By("When the rolling update is done, a balanceReplicas operation should be started")
+			// Wait for new pods to come up, and when they do we should be doing a balanceReplicas clusterOp
+			statefulSet = expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*45, time.Millisecond, func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Status.ReadyReplicas).To(BeEquivalentTo(*found.Spec.Replicas), "The SolrCloud did not finish the rolling restart, all nodes are up-to-date, but not all are ready")
+				clusterOp, err = controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a balanceReplicas lock after rolling update is complete.")
+				g.Expect(clusterOp.Operation).To(Equal(controllers.BalanceReplicasLock), "StatefulSet does not have a balanceReplicas lock after rolling update is complete.")
+				g.Expect(clusterOp.Metadata).To(Equal("RollingUpdateComplete"), "StatefulSet balanceReplicas lock operation has the wrong metadata.")
+			})
+
+			// After all pods are ready, make sure that the SolrCloud status is correct
+			solrCloud = expectSolrCloud(ctx, solrCloud)
+			Expect(solrCloud.Status.ReadyReplicas).To(Equal(solrCloud.Status.UpToDateNodes), "The SolrCloud did not finish the rolling restart, all nodes are up-to-date, but not all are ready")
 			// Make sure that the status object is correct for the nodes
-			for _, nodeStatus := range foundSolrCloud.Status.SolrNodes {
+			for _, nodeStatus := range solrCloud.Status.SolrNodes {
 				Expect(nodeStatus.SpecUpToDate).To(BeTrue(), "Node not finishing as up-to-date when rolling restart ends: %s", nodeStatus.Name)
 				Expect(nodeStatus.Ready).To(BeTrue(), "Node not finishing as ready when rolling restart ends: %s", nodeStatus.Name)
 			}
 
-			statefulSet = expectStatefulSet(ctx, solrCloud, solrCloud.StatefulSetName())
-			clusterOp, err = controllers.GetCurrentClusterOp(statefulSet)
-			Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
-			Expect(clusterOp).To(BeNil(), "StatefulSet should not have a RollingUpdate lock after finishing a managed update.")
+			By("waiting for the balanceReplicas to finish")
+			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a balanceReplicas lock after balancing is complete.")
+			})
 
 			By("checking that the collections can be queried after the restart")
 			queryCollection(ctx, solrCloud, solrCollection1, 0)
diff --git a/tests/e2e/solrcloud_scaling_test.go b/tests/e2e/solrcloud_scaling_test.go
index 1fa3890..81ab608 100644
--- a/tests/e2e/solrcloud_scaling_test.go
+++ b/tests/e2e/solrcloud_scaling_test.go
@@ -26,6 +26,7 @@
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/utils/pointer"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"strings"
 	"time"
 )
 
@@ -201,7 +202,7 @@
 			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale up")
 
 			By("waiting for the scaleUp to begin")
-			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+			statefulSet := expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*5, time.Millisecond*5, func(g Gomega, found *appsv1.StatefulSet) {
 				clusterOp, err := controllers.GetCurrentClusterOp(found)
 				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
 				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleUp lock.")
@@ -210,7 +211,8 @@
 			})
 
 			// The first step is to increase the number of pods
-			statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+			// Check very often, as the new pods will be created quickly, which will cause the cluster op to change.
+			statefulSet = expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*5, time.Millisecond*5, func(g Gomega, found *appsv1.StatefulSet) {
 				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should still have 3 pods, because the scale down should first move Solr replicas")
 			})
 			clusterOp, err := controllers.GetCurrentClusterOp(statefulSet)
@@ -219,11 +221,21 @@
 			Expect(clusterOp.Operation).To(Equal(controllers.ScaleUpLock), "StatefulSet does not have a scaleUp lock.")
 			Expect(clusterOp.Metadata).To(Equal("3"), "StatefulSet scaling lock operation has the wrong metadata.")
 
+			// Wait for new pods to come up, and when they do we should be doing a balanceReplicas clusterOp
+			statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Status.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should still have 3 pods, because the scale down should first move Solr replicas")
+			})
+			clusterOp, err = controllers.GetCurrentClusterOp(statefulSet)
+			Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+			Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a balanceReplicas lock after new pods are created.")
+			Expect(clusterOp.Operation).To(Equal(controllers.BalanceReplicasLock), "StatefulSet does not have a balanceReplicas lock after new pods are created.")
+			Expect(clusterOp.Metadata).To(Equal("ScaleUp"), "StatefulSet balanceReplicas lock operation has the wrong metadata.")
+
 			By("waiting for the scaleUp to finish")
 			statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
 				clusterOp, err := controllers.GetCurrentClusterOp(found)
 				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
-				g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a scaling lock after scaling is complete.")
+				g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a balanceReplicas lock after balancing is complete.")
 			})
 
 			queryCollection(ctx, solrCloud, solrCollection1, 0)
@@ -269,6 +281,10 @@
 
 	BeforeEach(func() {
 		solrCloud = generateBaseSolrCloudWithPlacementPolicy(2, "minimizecores")
+
+		if strings.Contains(solrImage, ":8") || strings.Contains(solrImage, "8.") {
+			Skip("Cannot run the Scale Down Abandon test with Solr 8, as a working placementPolicy for the test cannot be defaulted")
+		}
 	})
 
 	JustBeforeEach(func(ctx context.Context) {
@@ -308,20 +324,31 @@
 			solrCloud.Spec.Replicas = pointer.Int32(int32(2))
 			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to cancel scale down")
 
-			By("Make sure that the operation is changed to a fake 'scaleUp' to redistribute replicas")
-			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+			By("Make sure the scaleDown attempts for a minute until it times out")
+			// The scaleDown will timeout after a minute, so we have to wait a bit over a minute
+			expectStatefulSetWithConsistentChecksAndDuration(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*50, func(g Gomega, found *appsv1.StatefulSet) {
 				clusterOp, err := controllers.GetCurrentClusterOp(found)
 				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
-				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleUp lock.")
-				g.Expect(clusterOp.Operation).To(Equal(controllers.ScaleUpLock), "StatefulSet does not have a scaleUp lock.")
-				g.Expect(clusterOp.Metadata).To(Equal("2"), "StatefulSet scaling lock operation has the wrong metadata.")
+				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Operation).To(Equal(controllers.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Metadata).To(Equal("1"), "StatefulSet scaleDown lock operation has the wrong metadata.")
 			})
 
-			By("waiting for the fake scaleUp to finish")
+			By("Make sure that the operation is changed to a balanceReplicas to redistribute replicas")
+			// The scaleDown will timeout after a minute, so we have to wait a bit over a minute
+			expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*30, time.Millisecond*10, func(g Gomega, found *appsv1.StatefulSet) {
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a balanceReplicas lock.")
+				g.Expect(clusterOp.Operation).To(Equal(controllers.BalanceReplicasLock), "StatefulSet does not have a balanceReplicas lock.")
+				g.Expect(clusterOp.Metadata).To(Equal("UndoFailedScaleDown"), "StatefulSet balanceReplicas lock operation has the wrong metadata.")
+			})
+
+			By("waiting for the balanceReplicas to finish")
 			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
 				clusterOp, err := controllers.GetCurrentClusterOp(found)
 				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
-				g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a scaling lock after scaling is complete.")
+				g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a balanceReplicas lock after balancing is complete.")
 			})
 
 			Expect(statefulSet.Spec.Replicas).To(HaveValue(BeEquivalentTo(2)), "After everything, the statefulset should be configured to have 2 pods")