Add in a retry queue for clusterOps (#596)

diff --git a/controllers/solr_cluster_ops_util.go b/controllers/solr_cluster_ops_util.go
index c642dc3..96859a0 100644
--- a/controllers/solr_cluster_ops_util.go
+++ b/controllers/solr_cluster_ops_util.go
@@ -19,6 +19,7 @@
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
 	"github.com/apache/solr-operator/controllers/util"
@@ -26,6 +27,7 @@
 	"github.com/go-logr/logr"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/utils/pointer"
 	"net/url"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -33,7 +35,116 @@
 	"time"
 )
 
-func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, podList []corev1.Pod, logger logr.Logger) (clusterLockAcquired bool, retryLaterDuration time.Duration, err error) {
+// SolrClusterOp contains metadata for cluster operations performed on SolrClouds.
+type SolrClusterOp struct {
+	// The type of Cluster Operation
+	Operation SolrClusterOperationType `json:"operation"`
+
+	// Time that the Cluster Operation was started or re-started
+	LastStartTime metav1.Time `json:"lastStartTime"`
+
+	// Metadata for the operation, e.g. the number of pods to scale to
+	Metadata string `json:"metadata"`
+}
+type SolrClusterOperationType string
+
+const (
+	ScaleDownLock SolrClusterOperationType = "ScalingDown"
+	ScaleUpLock   SolrClusterOperationType = "ScalingUp"
+	UpdateLock    SolrClusterOperationType = "RollingUpdate"
+)
+
+func clearClusterOpLock(statefulSet *appsv1.StatefulSet) {
+	delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
+}
+
+func setClusterOpLock(statefulSet *appsv1.StatefulSet, op SolrClusterOp) error {
+	bytes, err := json.Marshal(op)
+	if err != nil {
+		return err
+	}
+	statefulSet.Annotations[util.ClusterOpsLockAnnotation] = string(bytes)
+	return nil
+}
+
+func setClusterOpRetryQueue(statefulSet *appsv1.StatefulSet, queue []SolrClusterOp) error {
+	if len(queue) > 0 {
+		bytes, err := json.Marshal(queue)
+		if err != nil {
+			return err
+		}
+		statefulSet.Annotations[util.ClusterOpsRetryQueueAnnotation] = string(bytes)
+	} else {
+		delete(statefulSet.Annotations, util.ClusterOpsRetryQueueAnnotation)
+	}
+	return nil
+}
+
+func GetCurrentClusterOp(statefulSet *appsv1.StatefulSet) (clusterOp *SolrClusterOp, err error) {
+	if op, hasOp := statefulSet.Annotations[util.ClusterOpsLockAnnotation]; hasOp {
+		clusterOp = &SolrClusterOp{}
+		err = json.Unmarshal([]byte(op), clusterOp)
+	}
+	return
+}
+
+func GetClusterOpRetryQueue(statefulSet *appsv1.StatefulSet) (clusterOpQueue []SolrClusterOp, err error) {
+	if op, hasOp := statefulSet.Annotations[util.ClusterOpsRetryQueueAnnotation]; hasOp {
+		err = json.Unmarshal([]byte(op), &clusterOpQueue)
+	}
+	return
+}
+
+func enqueueCurrentClusterOpForRetry(statefulSet *appsv1.StatefulSet) (hasOp bool, err error) {
+	clusterOp, err := GetCurrentClusterOp(statefulSet)
+	if err != nil || clusterOp == nil {
+		return false, err
+	}
+	clusterOpRetryQueue, err := GetClusterOpRetryQueue(statefulSet)
+	if err != nil {
+		return true, err
+	}
+	clusterOpRetryQueue = append(clusterOpRetryQueue, *clusterOp)
+	clearClusterOpLock(statefulSet)
+	return true, setClusterOpRetryQueue(statefulSet, clusterOpRetryQueue)
+}
+
+func retryNextQueuedClusterOp(statefulSet *appsv1.StatefulSet) (hasOp bool, err error) {
+	clusterOpRetryQueue, err := GetClusterOpRetryQueue(statefulSet)
+	if err != nil {
+		return hasOp, err
+	}
+	hasOp = len(clusterOpRetryQueue) > 0
+	if len(clusterOpRetryQueue) > 0 {
+		nextOp := clusterOpRetryQueue[0]
+		nextOp.LastStartTime = metav1.Now()
+		err = setClusterOpLock(statefulSet, nextOp)
+		if err != nil {
+			return hasOp, err
+		}
+		err = setClusterOpRetryQueue(statefulSet, clusterOpRetryQueue[1:])
+	}
+	return hasOp, err
+}
+
+func retryNextQueuedClusterOpWithQueue(statefulSet *appsv1.StatefulSet, clusterOpQueue []SolrClusterOp) (hasOp bool, err error) {
+	if err != nil {
+		return hasOp, err
+	}
+	hasOp = len(clusterOpQueue) > 0
+	if len(clusterOpQueue) > 0 {
+		nextOp := clusterOpQueue[0]
+		nextOp.LastStartTime = metav1.Now()
+		err = setClusterOpLock(statefulSet, nextOp)
+		if err != nil {
+			return hasOp, err
+		}
+		err = setClusterOpRetryQueue(statefulSet, clusterOpQueue[1:])
+	}
+	return hasOp, err
+}
+
+func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
 	desiredPods := int(*instance.Spec.Replicas)
 	configuredPods := int(*statefulSet.Spec.Replicas)
 	if desiredPods != configuredPods {
@@ -45,61 +156,63 @@
 			if len(podList) > configuredPods {
 				// There are too many pods, the statefulSet controller has yet to delete unwanted pods.
 				// Do not start the scale down until these extra pods are deleted.
-				return false, time.Second * 5, nil
+				return nil, time.Second * 5, nil
 			}
-
-			// Managed Scale down!
-			originalStatefulSet := statefulSet.DeepCopy()
-			statefulSet.Annotations[util.ClusterOpsLockAnnotation] = util.ScaleDownLock
-			// The scaleDown metadata is the number of nodes to scale down to.
-			// We only support scaling down one pod at-a-time when using a managed scale-down.
-			// If the user wishes to scale down by multiple nodes, this ClusterOp will be done once-per-node.
-			statefulSet.Annotations[util.ClusterOpsMetadataAnnotation] = strconv.Itoa(configuredPods - 1)
-			if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
-				logger.Error(err, "Error while patching StatefulSet to start clusterOp", "clusterOp", util.ScaleDownLock, "clusterOpMetadata", configuredPods-1)
-			} else {
-				clusterLockAcquired = true
+			clusterOp = &SolrClusterOp{
+				Operation: ScaleDownLock,
+				Metadata:  strconv.Itoa(configuredPods - 1),
 			}
 		} else if desiredPods > configuredPods && (instance.Spec.Scaling.PopulatePodsOnScaleUp == nil || *instance.Spec.Scaling.PopulatePodsOnScaleUp) {
 			if len(podList) < configuredPods {
 				// There are not enough pods, the statefulSet controller has yet to create the previously desired pods.
 				// Do not start the scale up until these missing pods are created.
-				return false, time.Second * 5, nil
+				return nil, time.Second * 5, nil
 			}
-			// Managed Scale up!
-			originalStatefulSet := statefulSet.DeepCopy()
-			statefulSet.Annotations[util.ClusterOpsLockAnnotation] = util.ScaleUpLock
-			// The scaleUp metadata is the number of nodes that existed before the scaleUp.
-			// This allows the scaleUp operation to know which pods will be empty after the statefulSet is scaledUp.
-			statefulSet.Annotations[util.ClusterOpsMetadataAnnotation] = strconv.Itoa(configuredPods)
-			// We want to set the number of replicas at the beginning of the scaleUp operation
-			statefulSet.Spec.Replicas = pointer.Int32(int32(desiredPods))
-			if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
-				logger.Error(err, "Error while patching StatefulSet to start clusterOp", "clusterOp", util.ScaleUpLock, "clusterOpMetadata", configuredPods, "newStatefulSetSize", desiredPods)
-			} else {
-				clusterLockAcquired = true
+			clusterOp = &SolrClusterOp{
+				Operation: ScaleUpLock,
+				Metadata:  strconv.Itoa(desiredPods),
 			}
 		} else {
 			err = scaleCloudUnmanaged(ctx, r, statefulSet, desiredPods, logger)
 		}
+	} else if scaleDownOpIsQueued {
+		// If the statefulSet and the solrCloud have the same number of pods configured, and the queued operation is a scaleDown,
+		// that means the scaleDown was reverted. So there's no reason to change the number of pods.
+		// However, a Replica Balancing should be done just in case, so do a ScaleUp, but don't change the number of pods.
+		clusterOp = &SolrClusterOp{
+			Operation: ScaleUpLock,
+			Metadata:  strconv.Itoa(desiredPods),
+		}
 	}
 	return
 }
 
 // handleManagedCloudScaleDown does the logic of a managed and "locked" cloud scale down operation.
 // This will likely take many reconcile loops to complete, as it is moving replicas away from the pods that will be scaled down.
-func handleManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownToRaw string, podList []corev1.Pod, logger logr.Logger) (retryLaterDuration time.Duration, err error) {
+func handleManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
 	var scaleDownTo int
-	if scaleDownTo, err = strconv.Atoi(scaleDownToRaw); err != nil {
-		logger.Error(err, "Could not convert statefulSet annotation to int for scale-down-to information", "annotation", util.ClusterOpsMetadataAnnotation, "value", scaleDownToRaw)
+	if scaleDownTo, err = strconv.Atoi(clusterOp.Metadata); err != nil {
+		logger.Error(err, "Could not convert ScaleDown metadata to int, as it represents the number of nodes to scale to", "metadata", clusterOp.Metadata)
 		return
 		// TODO: Create event for the CRD.
 	}
 
-	if scaleDownTo >= int(*statefulSet.Spec.Replicas) {
+	if len(podList) <= scaleDownTo {
+		// The number of pods is already at or below the scaleDown target, so we are done
+		return true, false, 0, nil
+	}
+	if int(*statefulSet.Spec.Replicas) <= scaleDownTo {
+		// We've done everything we need to do at this point. We just need to wait until the pods are deleted to be "done".
+		// So return and wait for the next reconcile loop, whenever it happens
+		return false, false, time.Second, nil
+	}
+	// TODO: It would be great to support a multi-node scale down when Solr supports evicting many SolrNodes at once.
+	if int(*statefulSet.Spec.Replicas) > scaleDownTo+1 {
 		// This shouldn't happen, but we don't want to be stuck if it does.
-		// Just remove the cluster Op, because the cluster has already been scaled down.
-		err = clearClusterOp(ctx, r, statefulSet, "statefulSet already scaled-down", logger)
+		// Just remove the cluster Op, because the cluster is bigger than it should be.
+		// We will retry the whole thing again, with the right metadata this time
+		operationComplete = true
+		return true, false, time.Second, nil
 	}
 
 	// Before doing anything to the pod, make sure that users cannot send requests to the pod anymore.
@@ -111,62 +224,75 @@
 		},
 	}
 
-	// TODO: It would be great to support a multi-node scale down when Solr supports evicting many SolrNodes at once.
 	// Only evict the last pod, even if we are trying to scale down multiple pods.
 	// Scale down will happen one pod at a time.
-	if replicaManagementComplete, evictErr := evictSinglePod(ctx, r, instance, scaleDownTo, podList, podStoppedReadinessConditions, logger); err != nil {
-		err = evictErr
-	} else if replicaManagementComplete {
-		originalStatefulSet := statefulSet.DeepCopy()
-		statefulSet.Spec.Replicas = pointer.Int32(int32(scaleDownTo))
-		delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
-		delete(statefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
-		if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
-			logger.Error(err, "Error while patching StatefulSet to finish the managed SolrCloud scale down clusterOp", "newStatefulSetReplicas", scaleDownTo)
+	var replicaManagementComplete bool
+	if replicaManagementComplete, requestInProgress, err = evictSinglePod(ctx, r, instance, scaleDownTo, podList, podStoppedReadinessConditions, logger); err == nil {
+		if replicaManagementComplete {
+			originalStatefulSet := statefulSet.DeepCopy()
+			statefulSet.Spec.Replicas = pointer.Int32(int32(scaleDownTo))
+			if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
+				logger.Error(err, "Error while patching StatefulSet to scale down pods after eviction", "newStatefulSetReplicas", scaleDownTo)
+			}
+			// Return and wait for the pods to be deleted, which will call another reconcile
+			retryLaterDuration = 0
+		} else {
+			// Retry after five seconds to check if the replica management commands have been completed
+			retryLaterDuration = time.Second * 5
 		}
-
-		// TODO: Create event for the CRD.
-	} else {
-		// Retry after five seconds to check if the replica management commands have been completed
-		retryLaterDuration = time.Second * 5
 	}
 	return
 }
 
 // handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
 // This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
-func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleUpFromRaw string, logger logr.Logger) (retryLaterDuration time.Duration, err error) {
-	// TODO: Think about bad pod specs, that will never come up healthy. We want to try a rolling restart in between if necessary
-	if balanceComplete, balanceErr := util.BalanceReplicasForCluster(ctx, instance, statefulSet, "scaleUp", scaleUpFromRaw, logger); err != nil {
-		err = balanceErr
-	} else if balanceComplete {
-		// Once the replica balancing is complete, finish the cluster operation by deleting the statefulSet annotations
+func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
+	desiredPods, err := strconv.Atoi(clusterOp.Metadata)
+	if err != nil {
+		logger.Error(err, "Could not convert ScaleUp metadata to int, as it represents the number of nodes to scale to", "metadata", clusterOp.Metadata)
+		return
+	}
+	configuredPods := int(*statefulSet.Spec.Replicas)
+	if configuredPods < desiredPods {
+		// The first thing to do is increase the number of pods the statefulSet is running
 		originalStatefulSet := statefulSet.DeepCopy()
-		delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
-		delete(statefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
-		if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
-			logger.Error(err, "Error while patching StatefulSet to finish the managed SolrCloud scale up clusterOp")
-		}
+		statefulSet.Spec.Replicas = pointer.Int32(int32(desiredPods))
 
-		// TODO: Create event for the CRD.
+		err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet))
+		if err != nil {
+			logger.Error(err, "Error while patching StatefulSet to increase the number of pods for the ScaleUp")
+		}
+		// Return and wait for the pods to be created, which will call another reconcile
+		return false, false, 0, err
 	} else {
-		// Retry after five seconds to check if the replica management commands have been completed
-		retryLaterDuration = time.Second * 5
+		// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
+		readinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
+			util.SolrIsNotStoppedReadinessCondition: {
+				reason:  PodStarted,
+				message: "Pod is not being deleted, traffic to the pod must be started",
+				status:  true,
+			},
+		}
+		for _, pod := range podList {
+			if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
+				err = e
+				return
+			} else {
+				pod = *updatedPod
+			}
+		}
+		if operationComplete, requestInProgress, err = util.BalanceReplicasForCluster(ctx, instance, statefulSet, "scaleUp", clusterOp.Metadata, logger); !operationComplete && err == nil {
+			// Retry after five seconds to check if the replica management commands have been completed
+			retryLaterDuration = time.Second * 5
+		}
 	}
 	return
 }
 
-func determineRollingUpdateClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, outOfDatePods util.OutOfDatePodSegmentation, logger logr.Logger) (clusterLockAcquired bool, retryLaterDuration time.Duration, err error) {
+func determineRollingUpdateClusterOpLockIfNecessary(instance *solrv1beta1.SolrCloud, outOfDatePods util.OutOfDatePodSegmentation) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
 	if instance.Spec.UpdateStrategy.Method == solrv1beta1.ManagedUpdate && !outOfDatePods.IsEmpty() {
-		// Managed Rolling Upgrade!
-		originalStatefulSet := statefulSet.DeepCopy()
-		statefulSet.Annotations[util.ClusterOpsLockAnnotation] = util.UpdateLock
-		// No rolling update metadata is currently required
-		statefulSet.Annotations[util.ClusterOpsMetadataAnnotation] = ""
-		if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
-			logger.Error(err, "Error while patching StatefulSet to start clusterOp", "clusterOp", util.UpdateLock, "clusterOpMetadata", "")
-		} else {
-			clusterLockAcquired = true
+		clusterOp = &SolrClusterOp{
+			Operation: UpdateLock,
 		}
 	}
 	return
@@ -174,21 +300,19 @@
 
 // handleManagedCloudRollingUpdate does the logic of a managed and "locked" cloud rolling update operation.
 // This will take many reconcile loops to complete, as it is deleting pods/moving replicas.
-func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, outOfDatePods util.OutOfDatePodSegmentation, hasReadyPod bool, availableUpdatedPodCount int, logger logr.Logger) (retryLaterDuration time.Duration, err error) {
+func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, outOfDatePods util.OutOfDatePodSegmentation, hasReadyPod bool, availableUpdatedPodCount int, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
 	// Manage the updating of out-of-spec pods, if the Managed UpdateStrategy has been specified.
 	updateLogger := logger.WithName("ManagedUpdateSelector")
 
-	// First check if all pods are up to date. If so the rolling update is complete
-	if outOfDatePods.IsEmpty() {
-		// Once the rolling update is complete, finish the cluster operation by deleting the statefulSet annotations
-		originalStatefulSet := statefulSet.DeepCopy()
-		delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
-		delete(statefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
-		if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
-			logger.Error(err, "Error while patching StatefulSet to finish the managed SolrCloud rollingUpdate clusterOp")
-		}
-
-		// TODO: Create event for the CRD.
+	// First check if all pods are up to date and ready. If so the rolling update is complete
+	configuredPods := int(*statefulSet.Spec.Replicas)
+	if configuredPods == availableUpdatedPodCount {
+		// The configured number of pods are all healthy and up to date. The operation is complete
+		operationComplete = true
+		return
+	} else if outOfDatePods.IsEmpty() {
+		// Just return and wait for the updated pods to come up healthy, these will call new reconciles, so there is nothing for us to do
+		return
 	} else {
 		// The out of date pods that have not been started, should all be updated immediately.
 		// There is no use "safely" updating pods which have not been started yet.
@@ -211,7 +335,8 @@
 
 		// Only actually delete a running pod if it has been evicted, or doesn't need eviction (persistent storage)
 		for _, pod := range podsToUpdate {
-			retryLaterDurationTemp, errTemp := DeletePodForUpdate(ctx, r, instance, &pod, podsHaveReplicas[pod.Name], updateLogger)
+			retryLaterDurationTemp, inProgTmp, errTemp := DeletePodForUpdate(ctx, r, instance, &pod, podsHaveReplicas[pod.Name], updateLogger)
+			requestInProgress = requestInProgress || inProgTmp
 
 			// Use the retryLaterDuration of the pod that requires a retry the soonest (smallest duration > 0)
 			if retryLaterDurationTemp > 0 && (retryLaterDurationTemp < retryLaterDuration || retryLaterDuration == 0) {
@@ -221,7 +346,6 @@
 				err = errTemp
 			}
 		}
-
 		if retryLater && retryLaterDuration == 0 {
 			retryLaterDuration = time.Second * 10
 		}
@@ -229,21 +353,50 @@
 	return
 }
 
-// clearClusterOp simply removes any clusterOp for the given statefulSet.
-// This should only be used as a "break-glass" scenario. Do not use this to finish off successful clusterOps.
-func clearClusterOp(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
-	logger = logger.WithValues("reason", reason, "clusterOp", statefulSet.Annotations[util.ClusterOpsLockAnnotation], "clusterOpMetadata", statefulSet.Annotations[util.ClusterOpsMetadataAnnotation])
+// clearClusterOpLockWithPatch simply removes any clusterOp for the given statefulSet.
+func clearClusterOpLockWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
 	originalStatefulSet := statefulSet.DeepCopy()
-	delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
-	delete(statefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
+	clearClusterOpLock(statefulSet)
 	if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
-		logger.Error(err, "Error while patching StatefulSet to remove unneeded clusterLockOp annotation")
+		logger.Error(err, "Error while patching StatefulSet to remove unneeded clusterOpLock annotation", "reason", reason)
 	} else {
-		logger.Error(err, "Removed unneeded clusterLockOp annotation from statefulSet")
+		logger.Info("Removed unneeded clusterOpLock annotation from statefulSet", "reason", reason)
 	}
 	return
 }
 
+// enqueueCurrentClusterOpForRetryWithPatch adds the current clusterOp to the clusterOpRetryQueue, and clears the current cluster Op.
+// This method will send the StatefulSet patch to the API Server.
+func enqueueCurrentClusterOpForRetryWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
+	originalStatefulSet := statefulSet.DeepCopy()
+	hasOp, err := enqueueCurrentClusterOpForRetry(statefulSet)
+	if hasOp && err == nil {
+		err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet))
+	}
+	if err != nil {
+		logger.Error(err, "Error while patching StatefulSet to enqueue clusterOp for retry", "reason", reason)
+	} else if hasOp {
+		logger.Info("Enqueued current clusterOp to continue later", "reason", reason)
+	}
+	return err
+}
+
+// retryNextQueuedClusterOpWithPatch removes the first clusterOp from the clusterOpRetryQueue, and sets it as the current cluster Op.
+// This method will send the StatefulSet patch to the API Server.
+func retryNextQueuedClusterOpWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, clusterOpQueue []SolrClusterOp, logger logr.Logger) (err error) {
+	originalStatefulSet := statefulSet.DeepCopy()
+	hasOp, err := retryNextQueuedClusterOpWithQueue(statefulSet, clusterOpQueue)
+	if hasOp && err == nil {
+		err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet))
+	}
+	if err != nil {
+		logger.Error(err, "Error while patching StatefulSet to retry next queued clusterOp")
+	} else if hasOp {
+		logger.Info("Retrying next queued clusterOp")
+	}
+	return err
+}
+
 // scaleCloudUnmanaged does simple scaling of a SolrCloud without moving replicas.
 // This is not a "locked" cluster operation, and does not block other cluster operations from taking place.
 func scaleCloudUnmanaged(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, scaleTo int, logger logr.Logger) (err error) {
@@ -283,7 +436,7 @@
 	return
 }
 
-func evictSinglePod(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, scaleDownTo int, podList []corev1.Pod, readinessConditions map[corev1.PodConditionType]podReadinessConditionChange, logger logr.Logger) (podIsEmpty bool, err error) {
+func evictSinglePod(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, scaleDownTo int, podList []corev1.Pod, readinessConditions map[corev1.PodConditionType]podReadinessConditionChange, logger logr.Logger) (podIsEmpty bool, requestInProgress bool, err error) {
 	var pod *corev1.Pod
 	podName := instance.GetSolrPodName(scaleDownTo)
 	for _, p := range podList {
@@ -295,14 +448,14 @@
 
 	podHasReplicas := true
 	if replicas, e := getReplicasForPod(ctx, instance, podName, logger); e != nil {
-		return false, e
+		return false, false, e
 	} else {
 		podHasReplicas = len(replicas) > 0
 	}
 
 	// The pod doesn't exist, we cannot empty it
 	if pod == nil {
-		return !podHasReplicas, errors.New("Could not find pod " + podName + " when trying to migrate replicas to scale down pod.")
+		return !podHasReplicas, false, errors.New("Could not find pod " + podName + " when trying to migrate replicas to scale down pod.")
 	}
 
 	if updatedPod, e := EnsurePodReadinessConditions(ctx, r, pod, readinessConditions, logger); e != nil {
@@ -313,8 +466,8 @@
 	}
 
 	// Only evict from the pod if it contains replicas in the clusterState
-	if e, canDeletePod := util.EvictReplicasForPodIfNecessary(ctx, instance, pod, podHasReplicas, "scaleDown", logger); e != nil {
-		err = e
+	var canDeletePod bool
+	if err, canDeletePod, requestInProgress = util.EvictReplicasForPodIfNecessary(ctx, instance, pod, podHasReplicas, "scaleDown", logger); err != nil {
 		logger.Error(err, "Error while evicting replicas on Pod, when scaling down SolrCloud", "pod", pod.Name)
 	} else if canDeletePod {
 		// The pod previously had replicas, so loop back in the next reconcile to make sure that the pod doesn't
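The new lock and retry queue are plain JSON stored in StatefulSet annotations. A minimal, standalone sketch of how an operation round-trips through the `solr.apache.org/clusterOpsLock` annotation (not part of the patch; it mirrors `SolrClusterOp` with `time.Time` standing in for `metav1.Time` so it runs without the Kubernetes libraries):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// clusterOp mirrors the SolrClusterOp fields introduced in the patch above.
type clusterOp struct {
	Operation     string    `json:"operation"`
	LastStartTime time.Time `json:"lastStartTime"`
	Metadata      string    `json:"metadata"`
}

func main() {
	annotations := map[string]string{}

	// "Acquire" the lock: marshal the operation and store it under the lock key,
	// as setClusterOpLock does on the StatefulSet.
	op := clusterOp{Operation: "ScalingDown", LastStartTime: time.Now(), Metadata: "2"}
	raw, err := json.Marshal(op)
	if err != nil {
		panic(err)
	}
	annotations["solr.apache.org/clusterOpsLock"] = string(raw)

	// Read the lock back, as GetCurrentClusterOp does.
	var current clusterOp
	if val, ok := annotations["solr.apache.org/clusterOpsLock"]; ok {
		if err := json.Unmarshal([]byte(val), &current); err != nil {
			panic(err)
		}
	}
	fmt.Printf("current op: %s (metadata %q)\n", current.Operation, current.Metadata)

	// "Release" the lock, as clearClusterOpLock does.
	delete(annotations, "solr.apache.org/clusterOpsLock")
}
```

The retry queue under `solr.apache.org/clusterOpsRetryQueue` works the same way, except the annotation value is a JSON array of these objects.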
diff --git a/controllers/solr_pod_lifecycle_util.go b/controllers/solr_pod_lifecycle_util.go
index 203fc13..8af455b 100644
--- a/controllers/solr_pod_lifecycle_util.go
+++ b/controllers/solr_pod_lifecycle_util.go
@@ -46,7 +46,7 @@
 	ScaleDown        PodConditionChangeReason = "ScaleDown"
 )
 
-func DeletePodForUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, pod *corev1.Pod, podHasReplicas bool, logger logr.Logger) (requeueAfterDuration time.Duration, err error) {
+func DeletePodForUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, pod *corev1.Pod, podHasReplicas bool, logger logr.Logger) (requeueAfterDuration time.Duration, requestInProgress bool, err error) {
 	// Before doing anything to the pod, make sure that users cannot send requests to the pod anymore.
 	ps := PodStarted
 	podStoppedReadinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
@@ -75,10 +75,12 @@
 	deletePod := false
 	if PodConditionEquals(pod, util.SolrReplicasNotEvictedReadinessCondition, EvictingReplicas) {
 		// Only evict pods that contain replicas in the clusterState
-		if evictError, canDeletePod := util.EvictReplicasForPodIfNecessary(ctx, instance, pod, podHasReplicas, "podUpdate", logger); evictError != nil {
+		if evictError, canDeletePod, inProgTmp := util.EvictReplicasForPodIfNecessary(ctx, instance, pod, podHasReplicas, "podUpdate", logger); evictError != nil {
+			requestInProgress = true
 			err = evictError
 			logger.Error(err, "Error while evicting replicas on pod", "pod", pod.Name)
 		} else if canDeletePod {
+			requestInProgress = inProgTmp
 			if podHasReplicas {
 				// The pod previously had replicas, so loop back in the next reconcile to make sure that the pod doesn't
 				// have replicas anymore even if the previous evict command was successful.
@@ -88,6 +90,7 @@
 				deletePod = true
 			}
 		} else {
+			requestInProgress = inProgTmp
 			// Try again in 5 seconds if we cannot delete a pod.
 			requeueAfterDuration = time.Second * 5
 		}
diff --git a/controllers/solrcloud_controller.go b/controllers/solrcloud_controller.go
index 56fdbd7..100522f 100644
--- a/controllers/solrcloud_controller.go
+++ b/controllers/solrcloud_controller.go
@@ -453,37 +453,115 @@
 	// Update or Scale, one-at-a-time. We do not want to do both.
 	hasReadyPod := newStatus.ReadyReplicas > 0
 	var retryLaterDuration time.Duration
-	if clusterOpLock, hasAnn := statefulSet.Annotations[util.ClusterOpsLockAnnotation]; hasAnn {
-		clusterOpMetadata := statefulSet.Annotations[util.ClusterOpsMetadataAnnotation]
-		switch clusterOpLock {
-		case util.UpdateLock:
-			retryLaterDuration, err = handleManagedCloudRollingUpdate(ctx, r, instance, statefulSet, outOfDatePods, hasReadyPod, availableUpdatedPodCount, logger)
-		case util.ScaleDownLock:
-			retryLaterDuration, err = handleManagedCloudScaleDown(ctx, r, instance, statefulSet, clusterOpMetadata, podList, logger)
-		case util.ScaleUpLock:
-			retryLaterDuration, err = handleManagedCloudScaleUp(ctx, r, instance, statefulSet, clusterOpMetadata, logger)
+	if clusterOp, opErr := GetCurrentClusterOp(statefulSet); clusterOp != nil && opErr == nil {
+		var operationComplete, requestInProgress bool
+		operationFound := true
+		shortTimeoutForRequeue := true
+		switch clusterOp.Operation {
+		case UpdateLock:
+			operationComplete, requestInProgress, retryLaterDuration, err = handleManagedCloudRollingUpdate(ctx, r, instance, statefulSet, outOfDatePods, hasReadyPod, availableUpdatedPodCount, logger)
+			// Rolling Updates should not be requeued quickly. The operation is expected to take a long time, so it uses the longer timeout unless errors are seen.
+			shortTimeoutForRequeue = false
+		case ScaleDownLock:
+			operationComplete, requestInProgress, retryLaterDuration, err = handleManagedCloudScaleDown(ctx, r, instance, statefulSet, clusterOp, podList, logger)
+		case ScaleUpLock:
+			operationComplete, requestInProgress, retryLaterDuration, err = handleManagedCloudScaleUp(ctx, r, instance, statefulSet, clusterOp, podList, logger)
 		default:
+			operationFound = false
 			// This shouldn't happen, but we don't want to be stuck if it does.
 			// Just remove the cluster Op, because the solr operator version running does not support it.
-			err = clearClusterOp(ctx, r, statefulSet, "clusterOp not supported", logger)
+			err = clearClusterOpLockWithPatch(ctx, r, statefulSet, "clusterOp not supported", logger)
+		}
+		if operationFound {
+			if operationComplete {
+				// Once the operation is complete, finish the cluster operation by deleting the statefulSet annotations
+				err = clearClusterOpLockWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" complete", logger)
+
+				// TODO: Create event for the CRD.
+			} else if !requestInProgress {
+				// If the cluster operation is in a stoppable place (not currently doing an async operation), and either:
+				//   - the operation hit an error and has taken more than 1 minute
+				//   - the operation has a short timeout and has taken more than 1 minute
+				//   - the operation has a long timeout and has taken more than 10 minutes
+				// then continue the operation later.
+				// (it will likely immediately continue, since it is unlikely there is another operation to run)
+				clusterOpRuntime := time.Since(clusterOp.LastStartTime.Time)
+				queueForLaterReason := ""
+				if err != nil && clusterOpRuntime > time.Minute {
+					queueForLaterReason = "hit an error during operation"
+				} else if shortTimeoutForRequeue && clusterOpRuntime > time.Minute {
+					queueForLaterReason = "timed out during operation (1 minute)"
+				} else if clusterOpRuntime > time.Minute*10 {
+					queueForLaterReason = "timed out during operation (10 minutes)"
+				}
+				if queueForLaterReason != "" {
+					err = enqueueCurrentClusterOpForRetryWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" "+queueForLaterReason, logger)
+
+					// TODO: Create event for the CRD.
+				}
+			}
+		}
+	} else if opErr == nil {
+		if clusterOpQueue, opErr := GetClusterOpRetryQueue(statefulSet); opErr == nil {
+			queuedRetryOps := map[SolrClusterOperationType]int{}
+
+			for i, op := range clusterOpQueue {
+				queuedRetryOps[op.Operation] = i
+			}
+			// Start cluster operations if needed.
+			// The operations will be actually run in future reconcile loops, but a clusterOpLock will be acquired here.
+			// And that lock will tell future reconcile loops that the operation needs to be done.
+			clusterOp, retryLaterDuration, err = determineRollingUpdateClusterOpLockIfNecessary(instance, outOfDatePods)
+			// If the new clusterOperation is an update to a queued clusterOp, just change the operation that is already queued
+			if queueIdx, opIsQueued := queuedRetryOps[UpdateLock]; clusterOp != nil && opIsQueued {
+				clusterOpQueue[queueIdx] = *clusterOp
+				clusterOp = nil
+			}
+
+			// If a non-managed scale needs to take place, this method will update the StatefulSet without starting
+			// a "locked" cluster operation
+			if clusterOp == nil {
+				_, scaleDownOpIsQueued := queuedRetryOps[ScaleDownLock]
+				clusterOp, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, scaleDownOpIsQueued, podList, logger)
+
+				// If the new clusterOperation is an update to a queued clusterOp, just change the operation that is already queued
+				if clusterOp != nil {
+					// Only one of ScaleUp or ScaleDown can be queued at one time
+					if queueIdx, opIsQueued := queuedRetryOps[ScaleDownLock]; opIsQueued {
+						clusterOpQueue[queueIdx] = *clusterOp
+						clusterOp = nil
+					}
+					if queueIdx, opIsQueued := queuedRetryOps[ScaleUpLock]; opIsQueued {
+						clusterOpQueue[queueIdx] = *clusterOp
+						clusterOp = nil
+					}
+				}
+			}
+
+			if clusterOp != nil {
+				// Starting a locked cluster operation!
+				originalStatefulSet := statefulSet.DeepCopy()
+				clusterOp.LastStartTime = metav1.Now()
+				err = setClusterOpLock(statefulSet, *clusterOp)
+				if err == nil {
+					err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet))
+				}
+				if err != nil {
+					logger.Error(err, "Error while patching StatefulSet to start locked clusterOp", "clusterOp", clusterOp.Operation, "clusterOpMetadata", clusterOp.Metadata)
+				} else {
+					logger.Info("Started locked clusterOp", "clusterOp", clusterOp.Operation, "clusterOpMetadata", clusterOp.Metadata)
+				}
+			} else {
+				// No new clusterOperation has been started, retry the next queued clusterOp, if there are any operations in the retry queue.
+				err = retryNextQueuedClusterOpWithPatch(ctx, r, statefulSet, clusterOpQueue, logger)
+			}
+
+			// After a lock is acquired, the reconcile will be started again because the StatefulSet is being watched for changes
+		} else {
+			err = opErr
 		}
 	} else {
-		lockAcquired := false
-		// Start cluster operations if needed.
-		// The operations will be actually run in future reconcile loops, but a clusterOpLock will be acquired here.
-		// And that lock will tell future reconcile loops that the operation needs to be done.
-		// If a non-managed scale needs to take place, this method will update the StatefulSet without starting
-		// a "locked" cluster operation
-		lockAcquired, retryLaterDuration, err = determineRollingUpdateClusterOpLockIfNecessary(ctx, r, instance, statefulSet, outOfDatePods, logger)
-		// Start cluster operations if needed.
-		// The operations will be actually run in future reconcile loops, but a clusterOpLock will be acquired here.
-		// And that lock will tell future reconcile loops that the operation needs to be done.
-		// If a non-managed scale needs to take place, this method will update the StatefulSet without starting
-		// a "locked" cluster operation
-		if !lockAcquired {
-			lockAcquired, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, podList, logger)
-		}
-		// After a lock is acquired, the reconcile will be started again because the StatefulSet is being watched
+		err = opErr
 	}
 	if err != nil && retryLaterDuration == 0 {
 		retryLaterDuration = time.Second * 5
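The requeue rules applied above can be distilled into a small decision function: a lock-holding operation is only paused once it has no async request in flight, and only after it has errored for over a minute, exceeded its 1-minute short timeout, or exceeded the 10-minute long timeout. The helper below is a hypothetical restatement for illustration, not code from the patch:

```go
package main

import (
	"fmt"
	"time"
)

// shouldQueueForRetry condenses the requeue rules from the reconcile loop above.
func shouldQueueForRetry(runtime time.Duration, hadErr, requestInProgress, shortTimeout bool) (bool, string) {
	if requestInProgress {
		return false, "" // never interrupt an in-flight async Solr request
	}
	switch {
	case hadErr && runtime > time.Minute:
		return true, "hit an error during operation"
	case shortTimeout && runtime > time.Minute:
		return true, "timed out during operation (1 minute)"
	case runtime > 10*time.Minute:
		return true, "timed out during operation (10 minutes)"
	}
	return false, ""
}

func main() {
	queue, reason := shouldQueueForRetry(2*time.Minute, false, false, true)
	fmt.Println(queue, reason) // prints: true timed out during operation (1 minute)
}
```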
diff --git a/controllers/util/solr_scale_util.go b/controllers/util/solr_scale_util.go
index 1f7be5c..a88531d 100644
--- a/controllers/util/solr_scale_util.go
+++ b/controllers/util/solr_scale_util.go
@@ -31,7 +31,7 @@
 // a successful status returned from the command. So if we delete the asyncStatus, and then something happens in the operator,
 // and we lose our state, then we will need to retry the balanceReplicas command. This should be ok since calling
 // balanceReplicas multiple times should not be bad when the replicas for the cluster are already balanced.
-func BalanceReplicasForCluster(ctx context.Context, solrCloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, balanceReason string, balanceCmdUniqueId string, logger logr.Logger) (balanceComplete bool, err error) {
+func BalanceReplicasForCluster(ctx context.Context, solrCloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, balanceReason string, balanceCmdUniqueId string, logger logr.Logger) (balanceComplete bool, requestInProgress bool, err error) {
 	logger = logger.WithValues("balanceReason", balanceReason)
 	// If the Cloud has 1 or zero pods, there is no reason to balance replicas.
 	if statefulSet.Spec.Replicas == nil || *statefulSet.Spec.Replicas < 1 {
@@ -65,9 +65,11 @@
 				} else if apiError != nil {
 					err = apiError
 				}
-				if err == nil {
+
+				if !balanceComplete && err == nil {
 					logger.Info("Started balancing replicas across cluster.", "requestId", requestId)
-				} else {
+					requestInProgress = true
+				} else if err != nil {
 					logger.Error(err, "Could not balance replicas across the cluster. Will try again.")
 				}
 			}
@@ -79,6 +81,8 @@
 				logger.Info("Replica Balancing command completed successfully")
 			} else if asyncState == "failed" {
 				logger.Info("Replica Balancing command failed. Will try again", "message", message)
+			} else {
+				requestInProgress = true
 			}
 
 			// Delete the async request Id if the async request is successful or failed.
@@ -87,6 +91,7 @@
 				if _, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
 					logger.Error(err, "Could not delete Async request status.", "requestId", requestId)
 					balanceComplete = false
+					requestInProgress = true
 				}
 			}
 		}
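Both `BalanceReplicasForCluster` (above) and `EvictReplicasForPodIfNecessary` (below) now report `requestInProgress`, which the controller uses to avoid pausing an operation while Solr is still working on an async request. A condensed, hypothetical sketch of that state mapping, following the async-state handling in the patch:

```go
package main

import "fmt"

// asyncOutcome maps a Solr async request state onto the two flags the
// cluster-op handlers return: whether the command finished, and whether a
// request is still in flight (so the operation must not be paused yet).
func asyncOutcome(asyncState string, statusDeleted bool) (complete, requestInProgress bool) {
	switch asyncState {
	case "completed":
		complete = true
	case "failed":
		// The command will simply be re-submitted on a later reconcile.
	default:
		// "submitted", "running", etc.: Solr is still working on it.
		requestInProgress = true
	}
	// Finished requests must have their async status record deleted before the
	// operation is considered done; a failed delete keeps the request "live".
	if (asyncState == "completed" || asyncState == "failed") && !statusDeleted {
		complete = false
		requestInProgress = true
	}
	return
}

func main() {
	fmt.Println(asyncOutcome("running", false))   // false true
	fmt.Println(asyncOutcome("completed", true))  // true false
	fmt.Println(asyncOutcome("completed", false)) // false true
}
```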
diff --git a/controllers/util/solr_update_util.go b/controllers/util/solr_update_util.go
index e0ca2f0..7d39cd9 100644
--- a/controllers/util/solr_update_util.go
+++ b/controllers/util/solr_update_util.go
@@ -504,7 +504,7 @@
 // EvictReplicasForPodIfNecessary takes a solr Pod and migrates all replicas off of that Pod.
 // For updates this will only be called for pods using ephemeral data.
 // For scale-down operations, this can be called for pods using ephemeral or persistent data.
-func EvictReplicasForPodIfNecessary(ctx context.Context, solrCloud *solr.SolrCloud, pod *corev1.Pod, podHasReplicas bool, evictionReason string, logger logr.Logger) (err error, canDeletePod bool) {
+func EvictReplicasForPodIfNecessary(ctx context.Context, solrCloud *solr.SolrCloud, pod *corev1.Pod, podHasReplicas bool, evictionReason string, logger logr.Logger) (err error, canDeletePod bool, requestInProgress bool) {
 	logger = logger.WithValues("evictionReason", evictionReason)
 	// If the Cloud has 1 or zero pods, and this is the "-0" pod, then delete the data since we can't move it anywhere else
 	// Otherwise, move the replicas to other pods
@@ -537,6 +537,7 @@
 				}
 				if err == nil {
 					logger.Info("Migrating all replicas off of pod before deletion.", "requestId", requestId, "pod", pod.Name)
+					requestInProgress = true
 				} else {
 					logger.Error(err, "Could not migrate all replicas off of pod before deletion. Will try again.")
 				}
@@ -552,6 +553,8 @@
 				logger.Info("Migration of all replicas off of pod before deletion complete. Pod can now be deleted.", "pod", pod.Name)
 			} else if asyncState == "failed" {
 				logger.Info("Migration of all replicas off of pod before deletion failed. Will try again.", "pod", pod.Name, "message", message)
+			} else {
+				requestInProgress = true
 			}
 
 			// Delete the async request Id if the async request is successful or failed.
@@ -560,9 +563,10 @@
 				if _, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
 					logger.Error(err, "Could not delete Async request status.", "requestId", requestId)
 					canDeletePod = false
+					requestInProgress = true
 				}
 			}
 		}
 	}
-	return err, canDeletePod
+	return err, canDeletePod, requestInProgress
 }
diff --git a/controllers/util/solr_util.go b/controllers/util/solr_util.go
index 2d09d0b..88949d7 100644
--- a/controllers/util/solr_util.go
+++ b/controllers/util/solr_util.go
@@ -53,11 +53,8 @@
 
 	// Protected StatefulSet annotations
 	// These are to be saved on a statefulSet update
-	ClusterOpsLockAnnotation     = "solr.apache.org/clusterOpsLock"
-	ScaleDownLock                = "scalingDown"
-	ScaleUpLock                  = "scalingUp"
-	UpdateLock                   = "rollingUpdate"
-	ClusterOpsMetadataAnnotation = "solr.apache.org/clusterOpsMetadata"
+	ClusterOpsLockAnnotation       = "solr.apache.org/clusterOpsLock"
+	ClusterOpsRetryQueueAnnotation = "solr.apache.org/clusterOpsRetryQueue"
 
 	SolrIsNotStoppedReadinessCondition       = "solr.apache.org/isNotStopped"
 	SolrReplicasNotEvictedReadinessCondition = "solr.apache.org/replicasNotEvicted"
@@ -624,8 +621,20 @@
 	// Cluster Operations are saved in the annotations of the SolrCloud StatefulSet.
 	// ClusterOps information is saved to the statefulSet independently of the general StatefulSet update.
 	// These annotations can also not be overridden set by the user.
-	expected.Annotations[ClusterOpsLockAnnotation] = found.Annotations[ClusterOpsLockAnnotation]
-	expected.Annotations[ClusterOpsMetadataAnnotation] = found.Annotations[ClusterOpsMetadataAnnotation]
+	if found.Annotations != nil {
+		if lock, hasLock := found.Annotations[ClusterOpsLockAnnotation]; hasLock {
+			if expected.Annotations == nil {
+				expected.Annotations = make(map[string]string, 1)
+			}
+			expected.Annotations[ClusterOpsLockAnnotation] = lock
+		}
+		if queue, hasQueue := found.Annotations[ClusterOpsRetryQueueAnnotation]; hasQueue {
+			if expected.Annotations == nil {
+				expected.Annotations = make(map[string]string, 1)
+			}
+			expected.Annotations[ClusterOpsRetryQueueAnnotation] = queue
+		}
+	}
 
 	// Scaling (i.e. changing) the number of replicas in the SolrCloud statefulSet is handled during the clusterOps
 	// section of the SolrCloud reconcile loop
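The annotation-preservation change above copies the lock and retry-queue annotations from the StatefulSet found in the cluster onto the freshly generated one, creating the annotation map only when there is something to keep. A small standalone illustration of the same pattern (the helper name is made up for this sketch):

```go
package main

import "fmt"

// preserveClusterOpAnnotations carries the protected clusterOp annotations from
// the live StatefulSet ("found") onto the newly generated one ("expected").
func preserveClusterOpAnnotations(expected, found map[string]string) map[string]string {
	for _, key := range []string{
		"solr.apache.org/clusterOpsLock",
		"solr.apache.org/clusterOpsRetryQueue",
	} {
		if val, ok := found[key]; ok {
			if expected == nil {
				expected = map[string]string{}
			}
			expected[key] = val
		}
	}
	return expected
}

func main() {
	found := map[string]string{"solr.apache.org/clusterOpsLock": `{"operation":"RollingUpdate"}`}
	fmt.Println(preserveClusterOpAnnotations(nil, found))
}
```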
diff --git a/docs/solr-cloud/cluster-operations.md b/docs/solr-cloud/cluster-operations.md
new file mode 100644
index 0000000..6b39886
--- /dev/null
+++ b/docs/solr-cloud/cluster-operations.md
@@ -0,0 +1,95 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+ -->
+
+# Cluster Operations
+_Since v0.8.0_
+
+Solr Clouds are complex distributed systems, and thus any operations that deal with data availability should be handled with care.
+
+## Cluster Operation Locks
+
+Since cluster operations deal with Solr's index data (either its availability or moving it), it's safest to only allow one operation to take place at a time.
+That is why these operations must first obtain a lock on the SolrCloud before execution can be started.
+
+### Lockable Operations
+
+- [Managed Rolling Updates](managed-updates.md)
+- [Scaling Down with Replica Migrations](scaling.md#solr-pod-scale-down)
+- [Scaling Up with Replica Migrations](scaling.md#solr-pod-scale-up)
+
+### How is the Lock Implemented?
+
+The lock is implemented as an annotation on the SolrCloud's `StatefulSet`.
+The cluster operation retry queue is also implemented as an annotation.
+Both can be viewed at the following annotation keys:
+
+- `solr.apache.org/clusterOpsLock` - The cluster operation that currently holds a lock on the SolrCloud and is executing.
+- `solr.apache.org/clusterOpsRetryQueue` - The queue of cluster operations that timed out and will be retried in order after the `clusterOpsLock` is given up.
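+
+For example, a scale-down lock might contain the following (illustrative values; `metadata` here is the number of pods to scale down to):
+
+```json
+{"operation":"ScalingDown","lastStartTime":"2023-07-01T12:00:00Z","metadata":"2"}
+```
+
+The retry queue is stored as a JSON list of the same objects.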
+
+
+### Avoiding Deadlocks
+
+If all cluster operations executed without any issues, there would be no need to worry about deadlocks.
+Cluster operations give up the lock when the operation is complete, and then other operations that have been waiting can proceed.
+Unfortunately, these cluster operations can and will fail for a number of reasons:
+
+- Replicas have no other pod to be placed on when moving off of a node. (Due to the [Replica Placement Plugin](https://solr.apache.org/guide/solr/latest/configuration-guide/replica-placement-plugins.html) used)
+- There are insufficient resources to create new Solr Pods.
+- The Solr Pod Template has an error and new Solr Pods cannot be started successfully.
+
+If this is the case, then we need to be able to stop the locked cluster operation if it hasn't succeeded in a certain time period.
+The cluster operation can only be stopped if there is no background task (async request) being executed in the Solr Cluster.
+Once a cluster operation reaches a point at which it can stop, and the lock timeout has been exceeded or an error has occurred, the operation is _paused_ and added to a queue to retry later.
+The _timeout_ is different per-operation:
+- Scaling (Up or Down): **1 minute**
+- Rolling restarts: **10 minutes**
+
+Immediately afterwards, the Solr Operator checks whether any other operations need to take place before the queued cluster operation is restarted.
+This allows for users to make changes to fix the reason why the cluster operation was failing.
+Examples:
+
+- **If there are insufficient resources to create new Solr Pods** \
+  The user can decrease the resource requirements in the Pod Template. \
+  This will create a `Rolling Update` cluster operation that will run once the `Scale Up` is paused. \
+  The `Scale Up` will be dequeued when the `Rolling Update` is complete, and can now complete because there are more available resources in the Kubernetes Cluster.
+
+- **Scale Down is failing because a replica from the scaled-down pod has nowhere to be moved to** \
+  The user can see this error in the logs, and know that the scale down won't work for their use case. \
+  Instead they will have to scale the SolrCloud to the number of pods that the `StatefulSet` is currently running. \
+  Once the `Scale Down` is paused, it will be replaced by a `Scale Up` operation to the current number of running pods. \
+  This doesn't actually increase the number of pods, but it will issue a command to Solr to balance replicas across all pods, to make sure the cluster is well-balanced after the failed `ScaleDown`.
+
+If a queued operation is going to be retried, the Solr Operator first makes sure that its values are still valid.
+For the `Scale Down` example above, when the Solr Operator tries to restart the queued `Scale Down` operation, it sees that the `SolrCloud.Spec.Replicas` is no longer lower than the current number of Solr Pods.
+Therefore, the `Scale Down` does not need to be retried, and a "fake" `Scale Up` needs to take place.
+
+### In the case of an emergency
+
+When all else fails, and you need to stop a cluster operation, you can remove the lock annotation from the `StatefulSet` manually.
+
+Edit the StatefulSet (e.g. `kubectl edit statefulset <name>`) and remove the cluster operation lock annotation: `solr.apache.org/clusterOpsLock`
+
+This can be done via the following command:
+
+```bash
+$ kubectl annotate statefulset ${statefulSetName} solr.apache.org/clusterOpsLock-
+```
+
+This only removes the currently running cluster operation. If other cluster operations have been queued, they will be retried once the lock annotation is removed.
+Also, if the operation is still needed to put the SolrCloud in its expected state, it will be retried once a lock can be acquired.
+The only way to have the cluster operation not run again is to put the SolrCloud back to its previous state (for scaling, set `SolrCloud.Spec.replicas` to the value found in `StatefulSet.Spec.replicas`).
+If the SolrCloud requires a rolling restart, it cannot be "put back to its previous state". The only way to move forward is to either delete the `StatefulSet` (a very dangerous operation), or find a way to allow the `RollingUpdate` operation to succeed.
\ No newline at end of file
diff --git a/docs/solr-cloud/managed-updates.md b/docs/solr-cloud/managed-updates.md
index 8eda96f..b3b8c47 100644
--- a/docs/solr-cloud/managed-updates.md
+++ b/docs/solr-cloud/managed-updates.md
@@ -24,6 +24,8 @@
 
 The operator will find all pods that have not been updated yet and choose the next set of pods to delete for an update, given the following workflow.
 
+Note: Managed Updates are executed via [Cluster Operation Locks](cluster-operations.md); please refer to that documentation for more information about how these operations are executed.
+
 ## Pod Update Workflow
 
 The logic goes as follows:
diff --git a/docs/solr-cloud/scaling.md b/docs/solr-cloud/scaling.md
index 73d6df9..491cbba 100644
--- a/docs/solr-cloud/scaling.md
+++ b/docs/solr-cloud/scaling.md
@@ -44,6 +44,8 @@
 
 For now Replicas are not scaled up and down themselves, they are just moved to utilize new Solr pods or vacate soon-to-be-deleted Solr pods.
 
+Note: Scaling actions with replica movements are executed via [Cluster Operation Locks](cluster-operations.md); please refer to that documentation for more information about how these operations are executed.
+
 ### Solr Pod Scale-Down
 
 When the desired number of Solr Pods that should be run `SolrCloud.Spec.Replicas` is decreased,
diff --git a/helm/solr-operator/Chart.yaml b/helm/solr-operator/Chart.yaml
index 41c10ad..2977fe9 100644
--- a/helm/solr-operator/Chart.yaml
+++ b/helm/solr-operator/Chart.yaml
@@ -91,6 +91,17 @@
           url: https://github.com/apache/solr-operator/issues/560
         - name: Github PR
           url: https://github.com/apache/solr-operator/pull/586
+        - name: Documentation
+          url: https://apache.github.io/solr-operator/docs/solr-cloud/cluster-operations.html
+    - kind: added
+      description: Cluster Operation Locks now give other operations a chance to run, every minute, to eliminate the risk of deadlocks
+      links:
+        - name: Github Issue
+          url: https://github.com/apache/solr-operator/issues/560
+        - name: Github PR
+          url: https://github.com/apache/solr-operator/pull/596
+        - name: Documentation
+          url: https://apache.github.io/solr-operator/docs/solr-cloud/cluster-operations.html#avoiding-deadlocks
   artifacthub.io/images: |
     - name: solr-operator
       image: apache/solr-operator:v0.8.0-prerelease
diff --git a/tests/e2e/resource_utils_test.go b/tests/e2e/resource_utils_test.go
index 3c6c9fb..75bdb83 100644
--- a/tests/e2e/resource_utils_test.go
+++ b/tests/e2e/resource_utils_test.go
@@ -282,6 +282,13 @@
 	}).Should(MatchError("pods \""+podName+"\" not found"), "Pod exists when it should not")
 }
 
+func expectNoPodNow(ctx context.Context, parentResource client.Object, podName string, additionalOffset ...int) {
+	ExpectWithOffset(
+		resolveOffset(additionalOffset),
+		k8sClient.Get(ctx, resourceKey(parentResource, podName), &corev1.Pod{}),
+	).To(MatchError("pods \""+podName+"\" not found"), "Pod exists when it should not")
+}
+
 func expectService(ctx context.Context, parentResource client.Object, serviceName string, selectorLables map[string]string, isHeadless bool, additionalOffset ...int) *corev1.Service {
 	return expectServiceWithChecks(ctx, parentResource, serviceName, selectorLables, isHeadless, nil, resolveOffset(additionalOffset))
 }
diff --git a/tests/e2e/solrcloud_rolling_upgrade_test.go b/tests/e2e/solrcloud_rolling_upgrade_test.go
index c3fe789..914cb66 100644
--- a/tests/e2e/solrcloud_rolling_upgrade_test.go
+++ b/tests/e2e/solrcloud_rolling_upgrade_test.go
@@ -20,7 +20,7 @@
 import (
 	"context"
 	solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
-	"github.com/apache/solr-operator/controllers/util"
+	"github.com/apache/solr-operator/controllers"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	"k8s.io/apimachinery/pkg/util/intstr"
@@ -89,7 +89,10 @@
 				}
 			})
 			statefulSet := expectStatefulSet(ctx, solrCloud, solrCloud.StatefulSetName())
-			Expect(statefulSet.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.UpdateLock), "StatefulSet does not have a RollingUpdate lock after starting managed update.")
+			clusterOp, err := controllers.GetCurrentClusterOp(statefulSet)
+			Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+			Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a RollingUpdate lock.")
+			Expect(clusterOp.Operation).To(Equal(controllers.UpdateLock), "StatefulSet does not have a RollingUpdate lock after starting managed update.")
 
 			By("waiting for the rolling restart to complete")
 			// Expect the SolrCloud to be up-to-date, or in a valid restarting state
@@ -144,7 +147,9 @@
 			}
 
 			statefulSet = expectStatefulSet(ctx, solrCloud, solrCloud.StatefulSetName())
-			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a RollingUpdate lock after finishing a managed update.")
+			clusterOp, err = controllers.GetCurrentClusterOp(statefulSet)
+			Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+			Expect(clusterOp).To(BeNil(), "StatefulSet should not have a RollingUpdate lock after finishing a managed update.")
 
 			By("checking that the collections can be queried after the restart")
 			queryCollection(ctx, solrCloud, solrCollection1, 0)
diff --git a/tests/e2e/solrcloud_scaling_test.go b/tests/e2e/solrcloud_scaling_test.go
index b906176..d3a0868 100644
--- a/tests/e2e/solrcloud_scaling_test.go
+++ b/tests/e2e/solrcloud_scaling_test.go
@@ -20,7 +20,7 @@
 import (
 	"context"
 	solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
-	"github.com/apache/solr-operator/controllers/util"
+	"github.com/apache/solr-operator/controllers"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	appsv1 "k8s.io/api/apps/v1"
@@ -67,36 +67,66 @@
 			By("waiting for the scaleDown of first pod to begin")
 			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
 				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should still have 3 pods, because the scale down should first move Solr replicas")
-				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
-				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsMetadataAnnotation, "2"), "StatefulSet scaling lock operation has the wrong metadata.")
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Operation).To(Equal(controllers.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Metadata).To(Equal("2"), "StatefulSet scaling lock operation has the wrong metadata.")
 			})
 			queryCollection(ctx, solrCloud, solrCollection2, 0)
 
 			By("waiting for the scaleDown of the first pod to finish")
 			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
 				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(2)), "StatefulSet should now have 2 pods, after the replicas have been moved off the first pod.")
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Operation).To(Equal(controllers.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Metadata).To(Equal("2"), "StatefulSet scaling lock operation has the wrong metadata.")
 			})
 			queryCollection(ctx, solrCloud, solrCollection2, 0)
 
-			By("waiting for the scaleDown of second pod to begin")
+			// Wait till the pod has actually been deleted
 			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
-				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(2)), "StatefulSet should still have 2 pods, because the scale down should first move Solr replicas")
-				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
-				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsMetadataAnnotation, "1"), "StatefulSet scaling lock operation has the wrong metadata.")
+				g.Expect(found.Status.Replicas).To(HaveValue(BeEquivalentTo(2)), "StatefulSet should now have 2 running pods, after the first scaled-down pod has been deleted.")
 			})
-			queryCollection(ctx, solrCloud, solrCollection1, 0)
+
+			By("waiting for the scaleDown of second pod to begin")
+			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Operation).To(Equal(controllers.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Metadata).To(Equal("1"), "StatefulSet scaling lock operation has the wrong metadata.")
+			})
+			// By the time the next scaleDown begins, the 3rd solr pod (ordinal 2) should already be gone and the
+			// statefulSet replicas should be 2 across the board (spec and status); the first scaleDown is not complete until then.
+			Expect(statefulSet.Spec.Replicas).To(HaveValue(BeEquivalentTo(2)), "StatefulSet should still have 2 pods configured, because the scale down should first move Solr replicas")
+			Expect(statefulSet.Status.Replicas).To(HaveValue(BeEquivalentTo(2)), "StatefulSet should only have 2 pods running, because previous pod scale down should have completely finished")
 			// This pod check must happen after the above clusterLock and replicas check.
 			// The StatefulSet controller might take a good amount of time to actually delete the pod,
 			// and the replica migration/cluster op might already be done by the time the first pod is deleted.
-			expectNoPod(ctx, solrCloud, solrCloud.GetSolrPodName(2))
+			expectNoPodNow(ctx, solrCloud, solrCloud.GetSolrPodName(2))
+			queryCollection(ctx, solrCloud, solrCollection1, 0)
 
 			By("waiting for the scaleDown to finish")
-			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+			statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
 				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(1)), "StatefulSet should now have 1 pods, after the replicas have been moved.")
 			})
+			// Even once the scaleDown has actually occurred (spec.replicas = 1), the clusterOp is not yet complete.
+			// We need to wait till the last pod is deleted.
+			clusterOp, err := controllers.GetCurrentClusterOp(statefulSet)
+			Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+			Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleDown lock.")
+			Expect(clusterOp.Operation).To(Equal(controllers.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
+			Expect(clusterOp.Metadata).To(Equal("1"), "StatefulSet scaling lock operation has the wrong metadata.")
+
+			// Wait for the last pod to be deleted
+			statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Status.Replicas).To(HaveValue(BeEquivalentTo(1)), "StatefulSet should now have 1 pod running, after the last scaled-down pod has been deleted.")
+			})
 			// Once the scale down actually occurs, the statefulSet annotations should already be removed
-			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock after scaling is complete.")
-			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have scaling lock metadata after scaling is complete.")
+			clusterOp, err = controllers.GetCurrentClusterOp(statefulSet)
+			Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+			Expect(clusterOp).To(BeNil(), "StatefulSet should not have a ScaleDown lock after scaling is complete.")
 
 			expectNoPod(ctx, solrCloud, solrCloud.GetSolrPodName(1))
 			queryCollection(ctx, solrCloud, solrCollection1, 0)
@@ -117,9 +147,10 @@
 			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale down")
 
 			By("make sure scaleDown happens without a clusterLock and eventually the replicas are removed")
-			statefulSet := expectStatefulSetWithConsistentChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, statefulSet *appsv1.StatefulSet) {
-				g.Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock while scaling unmanaged.")
-				g.Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have scaling lock metadata while scaling unmanaged.")
+			statefulSet := expectStatefulSetWithConsistentChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a scaling lock, since scaleDown is unmanaged.")
 			})
 			Expect(statefulSet.Spec.Replicas).To(HaveValue(BeEquivalentTo(1)), "StatefulSet should immediately have 1 pod, since the scaleDown is unmanaged.")
 
@@ -166,19 +197,31 @@
 			By("triggering a scale down via solrCloud replicas")
 			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale up")
 
-			By("waiting for the scaleDown of first pod to begin")
+			By("waiting for the scaleUp to begin")
 			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleUp lock.")
+				g.Expect(clusterOp.Operation).To(Equal(controllers.ScaleUpLock), "StatefulSet does not have a scaleUp lock.")
+				g.Expect(clusterOp.Metadata).To(Equal("3"), "StatefulSet scaling lock operation has the wrong metadata.")
+			})
+
+			// The first step is to increase the number of pods
+			statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
 				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should still have 3 pods, because the scale down should first move Solr replicas")
 			})
-			Expect(statefulSet.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleUpLock), "StatefulSet does not have a scaleUp lock after starting managed scaleUp.")
-			Expect(statefulSet.Annotations).To(HaveKeyWithValue(util.ClusterOpsMetadataAnnotation, "1"), "StatefulSet scaleUp lock operation has the wrong metadata.")
+			clusterOp, err := controllers.GetCurrentClusterOp(statefulSet)
+			Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+			Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleUp lock.")
+			Expect(clusterOp.Operation).To(Equal(controllers.ScaleUpLock), "StatefulSet does not have a scaleUp lock.")
+			Expect(clusterOp.Metadata).To(Equal("3"), "StatefulSet scaling lock operation has the wrong metadata.")
 
 			By("waiting for the scaleUp to finish")
 			statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
-				g.Expect(found.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock after scaling is complete.")
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a scaling lock after scaling is complete.")
 			})
-			// Once the scale down actually occurs, the statefulSet annotations should already be removed
-			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have scaling lock metadata after scaling is complete.")
 
 			queryCollection(ctx, solrCloud, solrCollection1, 0)
 			queryCollection(ctx, solrCloud, solrCollection2, 0)
@@ -201,8 +244,9 @@
 			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
 				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should immediately have 3 pods.")
 			})
-			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock, since scaleUp is unmanaged.")
-			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have a scaling lock metadata, since scaleUp is unmanaged.")
+			clusterOp, err := controllers.GetCurrentClusterOp(statefulSet)
+			Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+			Expect(clusterOp).To(BeNil(), "StatefulSet should not have a scaling lock, since scaleUp is unmanaged.")
 
 			By("Waiting for the new solrCloud pods to become ready")
 			solrCloud = expectSolrCloudToBeReady(ctx, solrCloud)
@@ -212,3 +256,75 @@
 		})
 	})
 })
+
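+// Covers abandoning a managed scaleDown: the scaleDown is reverted part-way through, and instead of
+// finishing it the operator re-balances replicas back across the restored pods via a "scaleUp" op.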
+var _ = Describe("E2E - SolrCloud - Scale Down Abandon", func() {
+	var (
+		solrCloud *solrv1beta1.SolrCloud
+
+		solrCollection1 = "e2e-1"
+	)
+
+	BeforeEach(func() {
+		solrCloud = generateBaseSolrCloudWithPlacementPolicy(2, "minimizecores")
+	})
+
+	JustBeforeEach(func(ctx context.Context) {
+		By("creating the SolrCloud")
+		Expect(k8sClient.Create(ctx, solrCloud)).To(Succeed())
+		DeferCleanup(func(ctx context.Context) {
+			cleanupTest(ctx, solrCloud)
+		})
+
+		By("Waiting for the SolrCloud to come up healthy")
+		solrCloud = expectSolrCloudToBeReady(ctx, solrCloud)
+
+		By("creating a first Solr Collection")
+		createAndQueryCollection(ctx, solrCloud, solrCollection1, 1, 2)
+	})
+
+	Context("with replica migration", func() {
+
+		It("Abandons the ScaleDown", func(ctx context.Context) {
+			originalSolrCloud := solrCloud.DeepCopy()
+			solrCloud.Spec.Replicas = pointer.Int32(int32(1))
+			By("triggering a scale down via solrCloud replicas")
+			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale down")
+
+			By("waiting for the scaleDown to begin")
+			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(2)), "StatefulSet should still have 2 pods, because the scale down should first move Solr replicas")
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Operation).To(Equal(controllers.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
+				g.Expect(clusterOp.Metadata).To(Equal("1"), "StatefulSet scaling lock operation has the wrong metadata.")
+			})
+
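+			// The scaleDown stalls because the collection's two replicas (1 shard x 2) cannot all fit on the one
+			// remaining pod, so revert the SolrCloud back to 2 replicas to abandon the operation.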
+			By("Undoing the scaleDown because the replicas cannot fit on the remaining pod")
+			originalSolrCloud = solrCloud.DeepCopy()
+			solrCloud.Spec.Replicas = pointer.Int32(int32(2))
+			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to cancel scale down")
+
+			By("Make sure that the operation is changed to a fake 'scaleUp' to redistribute replicas")
+			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleUp lock.")
+				g.Expect(clusterOp.Operation).To(Equal(controllers.ScaleUpLock), "StatefulSet does not have a scaleUp lock.")
+				g.Expect(clusterOp.Metadata).To(Equal("2"), "StatefulSet scaling lock operation has the wrong metadata.")
+			})
+
+			By("waiting for the fake scaleUp to finish")
+			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a scaling lock after scaling is complete.")
+			})
+
+			Expect(statefulSet.Spec.Replicas).To(HaveValue(BeEquivalentTo(2)), "After everything, the StatefulSet should be configured to have 2 pods")
+			Expect(statefulSet.Status.Replicas).To(HaveValue(BeEquivalentTo(2)), "After everything, the StatefulSet should have 2 pods running")
+
+			queryCollection(ctx, solrCloud, solrCollection1, 0)
+		})
+	})
+})
diff --git a/tests/e2e/test_utils_test.go b/tests/e2e/test_utils_test.go
index 2568ed8..2330e99 100644
--- a/tests/e2e/test_utils_test.go
+++ b/tests/e2e/test_utils_test.go
@@ -519,3 +519,15 @@
 		},
 	}
 }
+
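+// generateBaseSolrCloudWithPlacementPolicy returns a base SolrCloud with the given number of Solr pods and the
+// default replica placement plugin overridden via the SOLR_PLACEMENTPLUGIN_DEFAULT environment variable.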
+func generateBaseSolrCloudWithPlacementPolicy(replicas int, placementPlugin string) *solrv1beta1.SolrCloud {
+	solrCloud := generateBaseSolrCloud(replicas)
+	solrCloud.Spec.CustomSolrKubeOptions.PodOptions.EnvVariables = []corev1.EnvVar{
+		{
+			Name:  "SOLR_PLACEMENTPLUGIN_DEFAULT",
+			Value: placementPlugin,
+		},
+	}
+
+	return solrCloud
+}