Add managed scale-down for SolrClouds (#561)

This also adds cluster-operation locking for SolrClouds, as described here:
https://github.com/apache/solr-operator/issues/560
diff --git a/api/v1beta1/solrcloud_types.go b/api/v1beta1/solrcloud_types.go
index b45a027..209a599 100644
--- a/api/v1beta1/solrcloud_types.go
+++ b/api/v1beta1/solrcloud_types.go
@@ -96,6 +96,10 @@
 	// +optional
 	Availability SolrAvailabilityOptions `json:"availability,omitempty"`
 
+	// Define how Solr nodes should be autoscaled.
+	// +optional
+	Autoscaling SolrAutoscalingOptions `json:"autoscaling,omitempty"`
+
 	// +optional
 	BusyBoxImage *ContainerImage `json:"busyBoxImage,omitempty"`
 
@@ -722,6 +726,13 @@
 	ClusterWidePDB SolrPodDisruptionBudgetMethod = "ClusterWide"
 )
 
+type SolrAutoscalingOptions struct {
+	// VacatePodsOnScaleDown determines whether Solr replicas are moved off of a Pod before the Pod is
+	// deleted due to the SolrCloud scaling down.
+	// +kubebuilder:default=true
+	VacatePodsOnScaleDown *bool `json:"vacatePodsOnScaleDown,omitempty"`
+}
+
 // ZookeeperRef defines the zookeeper ensemble for solr to connect to
 // If no ConnectionString is provided, the solr-cloud controller will create and manage an internal ensemble
 type ZookeeperRef struct {
diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go
index 18555db..cc3d0f4 100644
--- a/api/v1beta1/zz_generated.deepcopy.go
+++ b/api/v1beta1/zz_generated.deepcopy.go
@@ -644,6 +644,26 @@
 }
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SolrAutoscalingOptions) DeepCopyInto(out *SolrAutoscalingOptions) {
+	*out = *in
+	if in.VacatePodsOnScaleDown != nil {
+		in, out := &in.VacatePodsOnScaleDown, &out.VacatePodsOnScaleDown
+		*out = new(bool)
+		**out = **in
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SolrAutoscalingOptions.
+func (in *SolrAutoscalingOptions) DeepCopy() *SolrAutoscalingOptions {
+	if in == nil {
+		return nil
+	}
+	out := new(SolrAutoscalingOptions)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *SolrAvailabilityOptions) DeepCopyInto(out *SolrAvailabilityOptions) {
 	*out = *in
 	in.PodDisruptionBudget.DeepCopyInto(&out.PodDisruptionBudget)
@@ -902,6 +922,7 @@
 	in.SolrAddressability.DeepCopyInto(&out.SolrAddressability)
 	in.UpdateStrategy.DeepCopyInto(&out.UpdateStrategy)
 	in.Availability.DeepCopyInto(&out.Availability)
+	in.Autoscaling.DeepCopyInto(&out.Autoscaling)
 	if in.BusyBoxImage != nil {
 		in, out := &in.BusyBoxImage, &out.BusyBoxImage
 		*out = new(ContainerImage)
diff --git a/config/crd/bases/solr.apache.org_solrclouds.yaml b/config/crd/bases/solr.apache.org_solrclouds.yaml
index 8d75cd7..cc7aa27 100644
--- a/config/crd/bases/solr.apache.org_solrclouds.yaml
+++ b/config/crd/bases/solr.apache.org_solrclouds.yaml
@@ -89,6 +89,16 @@
                 items:
                   type: string
                 type: array
+              autoscaling:
+                description: Define how Solr nodes should be autoscaled.
+                properties:
+                  vacatePodsOnScaleDown:
+                    default: true
+                    description: VacatePodsOnScaleDown determines whether Solr replicas
+                      are moved off of a Pod before the Pod is deleted due to the
+                      SolrCloud scaling down.
+                    type: boolean
+                type: object
               availability:
                 description: Define how Solr nodes should be available.
                 properties:
diff --git a/controllers/solr_cluster_ops_util.go b/controllers/solr_cluster_ops_util.go
new file mode 100644
index 0000000..9dead7e
--- /dev/null
+++ b/controllers/solr_cluster_ops_util.go
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controllers
+
+import (
+	"context"
+	"errors"
+	solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
+	"github.com/apache/solr-operator/controllers/util"
+	"github.com/apache/solr-operator/controllers/util/solr_api"
+	"github.com/go-logr/logr"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/utils/pointer"
+	"net/url"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"strconv"
+	"time"
+)
+
+func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, podList []corev1.Pod, logger logr.Logger) (clusterOpLock string, clusterOpMetadata string, retryLaterDuration time.Duration, err error) {
+	desiredPods := int(*instance.Spec.Replicas)
+	configuredPods := int(*statefulSet.Spec.Replicas)
+	if desiredPods != configuredPods {
+		scaleTo := -1
+		// Start a scaling operation
+		if desiredPods < configuredPods {
+			// Scale down!
+			// The option is enabled by default, so treat "nil" like "true"
+			if instance.Spec.Autoscaling.VacatePodsOnScaleDown == nil || *instance.Spec.Autoscaling.VacatePodsOnScaleDown {
+				if desiredPods > 0 {
+					// We only support one scaling down one pod at-a-time if not scaling down to 0 pods
+					scaleTo = configuredPods - 1
+				} else {
+					// We do not do a "managed" scale-to-zero operation.
+					// Just scale down unmanaged.
+					err = scaleCloudUnmanaged(ctx, r, statefulSet, 0, logger)
+				}
+			} else {
+				// The cloud is not setup to use managed scale-down
+				err = scaleCloudUnmanaged(ctx, r, statefulSet, desiredPods, logger)
+			}
+		} else if desiredPods > configuredPods {
+			// Scale up!
+			// TODO: replicasScaleUp is not supported, so do not make a clusterOp out of it, just do the patch
+			err = scaleCloudUnmanaged(ctx, r, statefulSet, desiredPods, logger)
+		}
+		if scaleTo > -1 {
+			clusterOpLock = util.ScaleLock
+			clusterOpMetadata = strconv.Itoa(scaleTo)
+		}
+	}
+	return
+}
+
+func handleLockedClusterOpScale(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, podList []corev1.Pod, logger logr.Logger) (retryLaterDuration time.Duration, err error) {
+	if scalingToNodes, hasAnn := statefulSet.Annotations[util.ClusterOpsMetadataAnnotation]; hasAnn {
+		if scalingToNodesInt, convErr := strconv.Atoi(scalingToNodes); convErr != nil {
+			logger.Error(convErr, "Could not convert statefulSet annotation to int for scale-down-to information", "annotation", util.ClusterOpsMetadataAnnotation, "value", scalingToNodes)
+			err = convErr
+		} else {
+			replicaManagementComplete := false
+			if scalingToNodesInt < int(*statefulSet.Spec.Replicas) {
+				// Manage scaling down the SolrCloud
+				replicaManagementComplete, err = handleManagedCloudScaleDown(ctx, r, instance, statefulSet, scalingToNodesInt, podList, logger)
+				// } else if scalingToNodesInt > int(*statefulSet.Spec.Replicas) {
+				// TODO: Utilize the scaled-up nodes in the future, however Solr does not currently have APIs for this.
+				// TODO: Think about the order of scale-up and restart when individual nodeService IPs are injected into the pods.
+				// TODO: Will likely want to do a scale-up of the service first, then do the rolling restart of the cluster, then utilize the node.
+			} else {
+				// This shouldn't happen. The ScalingToNodesAnnotation is removed when the statefulSet size changes, through a Patch.
+				// But if it does happen, we should just remove the annotation and move forward.
+				patchedStatefulSet := statefulSet.DeepCopy()
+				delete(patchedStatefulSet.Annotations, util.ClusterOpsLockAnnotation)
+				delete(patchedStatefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
+				if err = r.Patch(ctx, patchedStatefulSet, client.StrategicMergeFrom(statefulSet)); err != nil {
+					logger.Error(err, "Error while patching StatefulSet to remove unneeded clusterLockOp annotation for scaling to the current amount of nodes")
+				} else {
+					statefulSet = patchedStatefulSet
+				}
+			}
+
+			// Scale the statefulSet to represent the new number of pods, if it is lower than the current number of pods
+			// Also remove the lock annotations, as the cluster operation is done. Other operations can now take place.
+			if replicaManagementComplete {
+				patchedStatefulSet := statefulSet.DeepCopy()
+				patchedStatefulSet.Spec.Replicas = pointer.Int32(int32(scalingToNodesInt))
+				delete(patchedStatefulSet.Annotations, util.ClusterOpsLockAnnotation)
+				delete(patchedStatefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
+				if err = r.Patch(ctx, patchedStatefulSet, client.StrategicMergeFrom(statefulSet)); err != nil {
+					logger.Error(err, "Error while patching StatefulSet to scale down SolrCloud", "newUtilizedNodes", scalingToNodesInt)
+				}
+
+				// TODO: Create event for the CRD.
+			} else {
+				// Retry after five minutes to check if the replica management commands have been completed
+				retryLaterDuration = time.Second * 5
+			}
+		}
+		// If everything succeeded, the statefulSet will have an annotation updated
+		// and the reconcile loop will be called again.
+
+		return
+	} else {
+		err = errors.New("no clusterOpMetadata annotation is present in the statefulSet")
+		logger.Error(err, "Cannot perform scaling operation when no scale-to-nodes is provided via the clusterOpMetadata")
+		return time.Second * 10, err
+	}
+}
+
+// handleManagedCloudScaleDown does the logic of a managed and "locked" cloud scale down operation.
+// This will likely take many reconcile loops to complete, as it is moving replicas away from the nodes that will be scaled down.
+func handleManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownTo int, podList []corev1.Pod, logger logr.Logger) (replicaManagementComplete bool, err error) {
+	// Before doing anything to the pod, make sure that users cannot send requests to the pod anymore.
+	podStoppedReadinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
+		util.SolrIsNotStoppedReadinessCondition: {
+			reason:  ScaleDown,
+			message: "Pod is being deleted, traffic to the pod must be stopped",
+			status:  false,
+		},
+	}
+
+	if scaleDownTo == 0 {
+		// Eventually we might want to delete all collections & data,
+		// the user wants no data left if scaling the solrcloud down to 0.
+		// However, for now we do not offer managed scale down to zero, so this line of code shouldn't even happen.
+		replicaManagementComplete = true
+	} else {
+		// Only evict the last pod, even if we are trying to scale down multiple pods.
+		// Scale down will happen one pod at a time.
+		replicaManagementComplete, err = evictSinglePod(ctx, r, instance, scaleDownTo, podList, podStoppedReadinessConditions, logger)
+	}
+	// TODO: It would be great to support a multi-node scale down when Solr supports evicting many SolrNodes at once.
+
+	return
+}
+
+// scaleCloudUnmanaged does simple scaling of a SolrCloud without moving replicas.
+// This is not a "locked" cluster operation, and does not block other cluster operations from taking place.
+func scaleCloudUnmanaged(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, scaleTo int, logger logr.Logger) (err error) {
+	// Before doing anything to the pod, make sure that users cannot send requests to the pod anymore.
+	patchedStatefulSet := statefulSet.DeepCopy()
+	patchedStatefulSet.Spec.Replicas = pointer.Int32(int32(scaleTo))
+	if err = r.Patch(ctx, patchedStatefulSet, client.StrategicMergeFrom(statefulSet)); err != nil {
+		logger.Error(err, "Error while patching StatefulSet to scale SolrCloud.", "fromNodes", *statefulSet.Spec.Replicas, "toNodes", scaleTo)
+	}
+	return err
+}
+
+// This is currently not used, use in the future if we want to delete all data when scaling down to zero
+func evictAllPods(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, podList []corev1.Pod, readinessConditions map[corev1.PodConditionType]podReadinessConditionChange, logger logr.Logger) (podsAreEmpty bool, err error) {
+	// If there are no pods, we can't empty them. Just return true
+	if len(podList) == 0 {
+		return true, nil
+	}
+
+	for i, pod := range podList {
+		if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
+			err = e
+			return
+		} else {
+			podList[i] = *updatedPod
+		}
+	}
+
+	// Delete all collections & data, the user wants no data left if scaling the solrcloud down to 0
+	// This is a much different operation to deleting the SolrCloud/StatefulSet all-together
+	// TODO: Implement delete all collections. Currently just leave the data
+	//if err, podsAreEmpty = util.DeleteAllCollectionsIfNecessary(ctx, instance, "scaleDown", logger); err != nil {
+	//	logger.Error(err, "Error while evicting all collections in SolrCloud, when scaling down SolrCloud to 0 pods")
+	//}
+	podsAreEmpty = true
+
+	return
+}
+
+func evictSinglePod(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, scaleDownTo int, podList []corev1.Pod, readinessConditions map[corev1.PodConditionType]podReadinessConditionChange, logger logr.Logger) (podIsEmpty bool, err error) {
+	var pod *corev1.Pod
+	podName := instance.GetSolrPodName(scaleDownTo)
+	for _, p := range podList {
+		if p.Name == podName {
+			pod = &p
+			break
+		}
+	}
+
+	podHasReplicas := true
+	if replicas, e := getReplicasForPod(ctx, instance, podName, logger); e != nil {
+		return false, e
+	} else {
+		podHasReplicas = len(replicas) > 0
+	}
+
+	// The pod doesn't exist, we cannot empty it
+	if pod == nil {
+		return !podHasReplicas, errors.New("Could not find pod " + podName + " when trying to migrate replicas to scale down pod.")
+	}
+
+	if updatedPod, e := EnsurePodReadinessConditions(ctx, r, pod, readinessConditions, logger); e != nil {
+		err = e
+		return
+	} else {
+		pod = updatedPod
+	}
+
+	// Only evict from the pod if it contains replicas in the clusterState
+	if e, canDeletePod := util.EvictReplicasForPodIfNecessary(ctx, instance, pod, podHasReplicas, "scaleDown", logger); e != nil {
+		err = e
+		logger.Error(err, "Error while evicting replicas on Pod, when scaling down SolrCloud", "pod", pod.Name)
+	} else if canDeletePod {
+		// The pod previously had replicas, so loop back in the next reconcile to make sure that the pod doesn't
+		// have replicas anymore even if the previous evict command was successful.
+		// If there are still replicas, it will start the eviction process again
+		podIsEmpty = !podHasReplicas
+	}
+
+	return
+}
+
+func getReplicasForPod(ctx context.Context, cloud *solrv1beta1.SolrCloud, podName string, logger logr.Logger) (replicas []string, err error) {
+	clusterResp := &solr_api.SolrClusterStatusResponse{}
+	queryParams := url.Values{}
+	queryParams.Add("action", "CLUSTERSTATUS")
+	err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, clusterResp)
+	if err == nil {
+		if hasError, apiErr := solr_api.CheckForCollectionsApiError("CLUSTERSTATUS", clusterResp.ResponseHeader); hasError {
+			err = apiErr
+		}
+	}
+	podNodeName := util.SolrNodeName(cloud, podName)
+	if err == nil {
+		for _, colState := range clusterResp.ClusterStatus.Collections {
+			for _, shardState := range colState.Shards {
+				for replica, replicaState := range shardState.Replicas {
+					if replicaState.NodeName == podNodeName {
+						replicas = append(replicas, replica)
+					}
+				}
+			}
+		}
+	} else {
+		logger.Error(err, "Error retrieving cluster status, cannot determine if pod has replicas")
+	}
+	return
+}
diff --git a/controllers/solr_pod_lifecycle_util.go b/controllers/solr_pod_lifecycle_util.go
index 0d3666f..203fc13 100644
--- a/controllers/solr_pod_lifecycle_util.go
+++ b/controllers/solr_pod_lifecycle_util.go
@@ -40,10 +40,10 @@
 type PodConditionChangeReason string
 
 const (
-	PodStarted           PodConditionChangeReason = "PodStarted"
-	PodUpdate            PodConditionChangeReason = "PodUpdate"
-	EvictingReplicas     PodConditionChangeReason = "EvictingReplicas"
-	StatefulSetScaleDown PodConditionChangeReason = "StatefulSetScaleDown"
+	PodStarted       PodConditionChangeReason = "PodStarted"
+	PodUpdate        PodConditionChangeReason = "PodUpdate"
+	EvictingReplicas PodConditionChangeReason = "EvictingReplicas"
+	ScaleDown        PodConditionChangeReason = "ScaleDown"
 )
 
 func DeletePodForUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, pod *corev1.Pod, podHasReplicas bool, logger logr.Logger) (requeueAfterDuration time.Duration, err error) {
@@ -73,14 +73,20 @@
 
 	// If the pod needs to be drained of replicas (i.e. upgrading a pod with ephemeral storage), do that before deleting the pod
 	deletePod := false
-	// TODO: After v0.7.0 release, can remove "podHasReplicas ||", as this is no longer needed
-	if podHasReplicas || PodConditionEquals(pod, util.SolrReplicasNotEvictedReadinessCondition, EvictingReplicas) {
+	if PodConditionEquals(pod, util.SolrReplicasNotEvictedReadinessCondition, EvictingReplicas) {
 		// Only evict pods that contain replicas in the clusterState
-		if evictError, canDeletePod := util.EvictReplicasForPodIfNecessary(ctx, instance, pod, podHasReplicas, logger); evictError != nil {
+		if evictError, canDeletePod := util.EvictReplicasForPodIfNecessary(ctx, instance, pod, podHasReplicas, "podUpdate", logger); evictError != nil {
 			err = evictError
 			logger.Error(err, "Error while evicting replicas on pod", "pod", pod.Name)
 		} else if canDeletePod {
-			deletePod = true
+			if podHasReplicas {
+				// The pod previously had replicas, so loop back in the next reconcile to make sure that the pod doesn't
+				// have replicas anymore even if the previous evict command was successful.
+				// If there are still replicas, it will start the eviction process again
+				requeueAfterDuration = time.Millisecond * 10
+			} else {
+				deletePod = true
+			}
 		} else {
 			// Try again in 5 seconds if we cannot delete a pod.
 			requeueAfterDuration = time.Second * 5
@@ -254,3 +260,14 @@
 
 	return false
 }
+
+// PodConditionEquals check if a podCondition equals what is expected
+func PodConditionHasStatus(pod *corev1.Pod, conditionType corev1.PodConditionType, status corev1.ConditionStatus) bool {
+	for _, condition := range pod.Status.Conditions {
+		if condition.Type == conditionType {
+			return status == condition.Status
+		}
+	}
+
+	return false
+}
diff --git a/controllers/solrcloud_controller.go b/controllers/solrcloud_controller.go
index e1ac4e3..47a125c 100644
--- a/controllers/solrcloud_controller.go
+++ b/controllers/solrcloud_controller.go
@@ -313,29 +313,59 @@
 		}
 	}
 
-	pvcLabelSelector := make(map[string]string, 0)
-	var statefulSetStatus appsv1.StatefulSetStatus
+	extAddressabilityOpts := instance.Spec.SolrAddressability.External
+	if extAddressabilityOpts != nil && extAddressabilityOpts.Method == solrv1beta1.Ingress {
+		// Generate Ingress
+		ingress := util.GenerateIngress(instance, solrNodeNames)
+
+		// Check if the Ingress already exists
+		ingressLogger := logger.WithValues("ingress", ingress.Name)
+		foundIngress := &netv1.Ingress{}
+		err = r.Get(ctx, types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, foundIngress)
+		if err != nil && errors.IsNotFound(err) {
+			ingressLogger.Info("Creating Ingress")
+			if err = controllerutil.SetControllerReference(instance, ingress, r.Scheme); err == nil {
+				err = r.Create(ctx, ingress)
+			}
+		} else if err == nil {
+			var needsUpdate bool
+			needsUpdate, err = util.OvertakeControllerRef(instance, foundIngress, r.Scheme)
+			needsUpdate = util.CopyIngressFields(ingress, foundIngress, ingressLogger) || needsUpdate
+
+			// Update the found Ingress and write the result back if there are any changes
+			if needsUpdate && err == nil {
+				ingressLogger.Info("Updating Ingress")
+				err = r.Update(ctx, foundIngress)
+			}
+		}
+		if err != nil {
+			return requeueOrNot, err
+		}
+	}
+
+	var statefulSet *appsv1.StatefulSet
 
 	if !blockReconciliationOfStatefulSet {
-		// Generate StatefulSet
-		statefulSet := util.GenerateStatefulSet(instance, &newStatus, hostNameIpMap, reconcileConfigInfo, tls, security)
+		// Generate StatefulSet that should exist
+		expectedStatefulSet := util.GenerateStatefulSet(instance, &newStatus, hostNameIpMap, reconcileConfigInfo, tls, security)
 
 		// Check if the StatefulSet already exists
-		statefulSetLogger := logger.WithValues("statefulSet", statefulSet.Name)
+		statefulSetLogger := logger.WithValues("statefulSet", expectedStatefulSet.Name)
 		foundStatefulSet := &appsv1.StatefulSet{}
-		err = r.Get(ctx, types.NamespacedName{Name: statefulSet.Name, Namespace: statefulSet.Namespace}, foundStatefulSet)
+		err = r.Get(ctx, types.NamespacedName{Name: expectedStatefulSet.Name, Namespace: expectedStatefulSet.Namespace}, foundStatefulSet)
 
+		// TODO: Move this logic down to the cluster ops and save the existing annotation in util.MaintainPreservedStatefulSetFields()
 		// Set the annotation for a scheduled restart, if necessary.
 		if nextRestartAnnotation, reconcileWaitDuration, schedulingErr := util.ScheduleNextRestart(instance.Spec.UpdateStrategy.RestartSchedule, foundStatefulSet.Spec.Template.Annotations); schedulingErr != nil {
 			logger.Error(schedulingErr, "Cannot parse restartSchedule cron", "cron", instance.Spec.UpdateStrategy.RestartSchedule)
 		} else {
 			if nextRestartAnnotation != "" {
 				// Set the new restart time annotation
-				statefulSet.Spec.Template.Annotations[util.SolrScheduledRestartAnnotation] = nextRestartAnnotation
+				expectedStatefulSet.Spec.Template.Annotations[util.SolrScheduledRestartAnnotation] = nextRestartAnnotation
 				// TODO: Create event for the CRD.
 			} else if existingRestartAnnotation, exists := foundStatefulSet.Spec.Template.Annotations[util.SolrScheduledRestartAnnotation]; exists {
 				// Keep the existing nextRestart annotation if it exists and we aren't setting a new one.
-				statefulSet.Spec.Template.Annotations[util.SolrScheduledRestartAnnotation] = existingRestartAnnotation
+				expectedStatefulSet.Spec.Template.Annotations[util.SolrScheduledRestartAnnotation] = existingRestartAnnotation
 			}
 			if reconcileWaitDuration != nil {
 				// Set the requeueAfter if it has not been set, or is greater than the time we need to wait to restart again
@@ -346,48 +376,52 @@
 		// Update or Create the StatefulSet
 		if err != nil && errors.IsNotFound(err) {
 			statefulSetLogger.Info("Creating StatefulSet")
-			if err = controllerutil.SetControllerReference(instance, statefulSet, r.Scheme); err == nil {
-				err = r.Create(ctx, statefulSet)
+			if err = controllerutil.SetControllerReference(instance, expectedStatefulSet, r.Scheme); err == nil {
+				err = r.Create(ctx, expectedStatefulSet)
 			}
-			// Find which labels the PVCs will be using, to use for the finalizer
-			pvcLabelSelector = statefulSet.Spec.Selector.MatchLabels
+			statefulSet = expectedStatefulSet
 		} else if err == nil {
-			statefulSetStatus = foundStatefulSet.Status
-			// Find which labels the PVCs will be using, to use for the finalizer
-			pvcLabelSelector = foundStatefulSet.Spec.Selector.MatchLabels
+			util.MaintainPreservedStatefulSetFields(expectedStatefulSet, foundStatefulSet)
 
 			// Check to see if the StatefulSet needs an update
 			var needsUpdate bool
 			needsUpdate, err = util.OvertakeControllerRef(instance, foundStatefulSet, r.Scheme)
-			needsUpdate = util.CopyStatefulSetFields(statefulSet, foundStatefulSet, statefulSetLogger) || needsUpdate
+			needsUpdate = util.CopyStatefulSetFields(expectedStatefulSet, foundStatefulSet, statefulSetLogger) || needsUpdate
 
 			// Update the found StatefulSet and write the result back if there are any changes
 			if needsUpdate && err == nil {
 				statefulSetLogger.Info("Updating StatefulSet")
 				err = r.Update(ctx, foundStatefulSet)
 			}
+			statefulSet = foundStatefulSet
 		}
 		if err != nil {
 			return requeueOrNot, err
 		}
 	} else {
 		// If we are blocking the reconciliation of the statefulSet, we still want to find information about it.
-		foundStatefulSet := &appsv1.StatefulSet{}
-		err = r.Get(ctx, types.NamespacedName{Name: instance.StatefulSetName(), Namespace: instance.Namespace}, foundStatefulSet)
-		if err == nil {
-			// Find the status
-			statefulSetStatus = foundStatefulSet.Status
-			// Find which labels the PVCs will be using, to use for the finalizer
-			pvcLabelSelector = foundStatefulSet.Spec.Selector.MatchLabels
-		} else if !errors.IsNotFound(err) {
-			return requeueOrNot, err
+		err = r.Get(ctx, types.NamespacedName{Name: instance.StatefulSetName(), Namespace: instance.Namespace}, statefulSet)
+		if err != nil {
+			if !errors.IsNotFound(err) {
+				return requeueOrNot, err
+			} else {
+				statefulSet = nil
+			}
 		}
 	}
 
+	// *********************************************************
+	// The operations after this require a statefulSet to exist,
+	// including updating the solrCloud status
+	// *********************************************************
+	if statefulSet == nil {
+		return requeueOrNot, err
+	}
+
 	// Do not reconcile the storage finalizer unless we have PVC Labels that we know the Solr data PVCs are using.
 	// Otherwise it will delete all PVCs possibly
-	if len(pvcLabelSelector) > 0 {
-		if err := r.reconcileStorageFinalizer(ctx, instance, pvcLabelSelector, logger); err != nil {
+	if len(statefulSet.Spec.Selector.MatchLabels) > 0 {
+		if err := r.reconcileStorageFinalizer(ctx, instance, statefulSet.Spec.Selector.MatchLabels, logger); err != nil {
 			logger.Error(err, "Cannot delete PVCs while garbage collecting after deletion.")
 			updateRequeueAfter(&requeueOrNot, time.Second*15)
 		}
@@ -403,13 +437,54 @@
 	// Make sure the SolrCloud status is up-to-date with the state of the cluster
 	var outOfDatePods util.OutOfDatePodSegmentation
 	var availableUpdatedPodCount int
-	outOfDatePods, availableUpdatedPodCount, err = createCloudStatus(instance, &newStatus, statefulSetStatus, podSelector, podList)
+	outOfDatePods, availableUpdatedPodCount, err = createCloudStatus(instance, &newStatus, statefulSet.Status, podSelector, podList)
 	if err != nil {
 		return requeueOrNot, err
 	}
 
-	// Manage the updating of out-of-spec pods, if the Managed UpdateStrategy has been specified.
+	// We only want to do one cluster operation at a time, so we use a lock to ensure that.
+	// Update or a Scale at a time. We do not want to do both.
+
+	var retryLaterDuration time.Duration
+	if clusterOpLock, hasAnn := statefulSet.Annotations[util.ClusterOpsLockAnnotation]; hasAnn {
+		switch clusterOpLock {
+		case util.ScaleLock:
+			retryLaterDuration, err = handleLockedClusterOpScale(ctx, r, instance, statefulSet, podList, logger)
+		}
+	} else {
+		// Start cluster operations if needed.
+		// The operations will be actually run in future reconcile loops, but a clusterOpLock will be acquired here.
+		// And that lock will tell future reconcile loops that the operation needs to be done.
+		clusterOpLock = ""
+		clusterOpMetadata := ""
+		// If a non-managed scale needs to take place, this method will update the StatefulSet without starting
+		// a "locked" cluster operation
+		clusterOpLock, clusterOpMetadata, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, podList, logger)
+
+		if retryLaterDuration <= 0 && err == nil && clusterOpLock != "" {
+			patchedStatefulSet := statefulSet.DeepCopy()
+			patchedStatefulSet.Annotations[util.ClusterOpsLockAnnotation] = clusterOpLock
+			patchedStatefulSet.Annotations[util.ClusterOpsMetadataAnnotation] = clusterOpMetadata
+			if err = r.Patch(ctx, patchedStatefulSet, client.StrategicMergeFrom(statefulSet)); err != nil {
+				logger.Error(err, "Error while patching StatefulSet to start clusterOp", "clusterOp", clusterOpLock, "clusterOpMetadata", clusterOpMetadata)
+			} else {
+				statefulSet = patchedStatefulSet
+			}
+		}
+	}
+	if err != nil && retryLaterDuration == 0 {
+		retryLaterDuration = time.Second * 5
+	}
+	if retryLaterDuration > 0 {
+		updateRequeueAfter(&requeueOrNot, retryLaterDuration)
+	}
+	if err != nil {
+		return requeueOrNot, err
+	}
+
+	// TODO: Move this logic into the ClusterOpLock, with the "rollingUpdate" lock
 	if instance.Spec.UpdateStrategy.Method == solrv1beta1.ManagedUpdate && len(outOfDatePods.NotStarted)+len(outOfDatePods.ScheduledForDeletion)+len(outOfDatePods.Running) > 0 {
+		// Manage the updating of out-of-spec pods, if the Managed UpdateStrategy has been specified.
 		updateLogger := logger.WithName("ManagedUpdateSelector")
 
 		// The out of date pods that have not been started, should all be updated immediately.
@@ -439,7 +514,6 @@
 			podsToUpdate = append(podsToUpdate, additionalPodsToUpdate...)
 		}
 
-		var retryLaterDuration time.Duration
 		// Only actually delete a running pod if it has been evicted, or doesn't need eviction (persistent storage)
 		for _, pod := range podsToUpdate {
 			retryLaterDurationTemp, errTemp := DeletePodForUpdate(ctx, r, instance, &pod, podsHaveReplicas[pod.Name], updateLogger)
@@ -465,7 +539,7 @@
 	}
 
 	// Upsert or delete solrcloud-wide PodDisruptionBudget(s) based on 'Enabled' flag.
-	pdb := util.GeneratePodDisruptionBudget(instance, pvcLabelSelector)
+	pdb := util.GeneratePodDisruptionBudget(instance, statefulSet.Spec.Selector.MatchLabels)
 	if instance.Spec.Availability.PodDisruptionBudget.Enabled != nil && *instance.Spec.Availability.PodDisruptionBudget.Enabled {
 		// Check if the PodDistruptionBudget already exists
 		pdbLogger := logger.WithValues("podDisruptionBudget", pdb.Name)
@@ -497,36 +571,6 @@
 		}
 	}
 
-	extAddressabilityOpts := instance.Spec.SolrAddressability.External
-	if extAddressabilityOpts != nil && extAddressabilityOpts.Method == solrv1beta1.Ingress {
-		// Generate Ingress
-		ingress := util.GenerateIngress(instance, solrNodeNames)
-
-		// Check if the Ingress already exists
-		ingressLogger := logger.WithValues("ingress", ingress.Name)
-		foundIngress := &netv1.Ingress{}
-		err = r.Get(ctx, types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, foundIngress)
-		if err != nil && errors.IsNotFound(err) {
-			ingressLogger.Info("Creating Ingress")
-			if err = controllerutil.SetControllerReference(instance, ingress, r.Scheme); err == nil {
-				err = r.Create(ctx, ingress)
-			}
-		} else if err == nil {
-			var needsUpdate bool
-			needsUpdate, err = util.OvertakeControllerRef(instance, foundIngress, r.Scheme)
-			needsUpdate = util.CopyIngressFields(ingress, foundIngress, ingressLogger) || needsUpdate
-
-			// Update the found Ingress and write the result back if there are any changes
-			if needsUpdate && err == nil {
-				ingressLogger.Info("Updating Ingress")
-				err = r.Update(ctx, foundIngress)
-			}
-		}
-		if err != nil {
-			return requeueOrNot, err
-		}
-	}
-
 	if !reflect.DeepEqual(instance.Status, newStatus) {
 		logger.Info("Updating SolrCloud Status", "status", newStatus)
 		oldInstance := instance.DeepCopy()
diff --git a/controllers/util/solr_update_util.go b/controllers/util/solr_update_util.go
index 715fe38..6d37092 100644
--- a/controllers/util/solr_update_util.go
+++ b/controllers/util/solr_update_util.go
@@ -503,82 +503,65 @@
 }
 
 // EvictReplicasForPodIfNecessary takes a solr Pod and migrates all replicas off of that Pod, if the Pod is using ephemeral storage.
-// If the pod is using persistent storage, this function is a no-op.
-// This function MUST be idempotent and return the same list of pods given the same kubernetes/solr state.
-func EvictReplicasForPodIfNecessary(ctx context.Context, solrCloud *solr.SolrCloud, pod *corev1.Pod, podHasReplicas bool, logger logr.Logger) (err error, canDeletePod bool) {
-	var solrDataVolume *corev1.Volume
+// For updates this will only be called for pods using ephemeral data.
+// For scale-down operations, this can be called for pods using ephemeral or persistent data.
+func EvictReplicasForPodIfNecessary(ctx context.Context, solrCloud *solr.SolrCloud, pod *corev1.Pod, podHasReplicas bool, evictionReason string, logger logr.Logger) (err error, canDeletePod bool) {
+	logger = logger.WithValues("evictionReason", evictionReason)
+	// If the Cloud has 1 or zero pods, and this is the "-0" pod, then delete the data since we can't move it anywhere else
+	// Otherwise, move the replicas to other pods
+	if (solrCloud.Spec.Replicas == nil || *solrCloud.Spec.Replicas < 2) && strings.HasSuffix(pod.Name, "-0") {
+		queryParams := url.Values{}
+		queryParams.Add("action", "DELETENODE")
+		queryParams.Add("node", SolrNodeName(solrCloud, pod.Name))
+		// TODO: Figure out a way to do this, since DeleteNode will not delete the last replica of every type...
+		canDeletePod = true
+	} else {
+		requestId := "move-replicas-" + pod.Name
 
-	// TODO: Remove these checks after v0.7.0, since it will be taken care by the evictReplicas podReadinessCondition
-	dataVolumeName := solrCloud.DataVolumeName()
-	for _, volume := range pod.Spec.Volumes {
-		if volume.Name == dataVolumeName {
-			solrDataVolume = &volume
-			break
-		}
-	}
-
-	// Only evict if the Data volume is not persistent
-	if solrDataVolume != nil && solrDataVolume.VolumeSource.PersistentVolumeClaim == nil {
-		// If the Cloud has 1 or zero pods, and this is the "-0" pod, then delete the data since we can't move it anywhere else
-		// Otherwise, move the replicas to other pods
-		if (solrCloud.Spec.Replicas == nil || *solrCloud.Spec.Replicas < 2) && strings.HasSuffix(pod.Name, "-0") {
-			queryParams := url.Values{}
-			queryParams.Add("action", "DELETENODE")
-			queryParams.Add("node", SolrNodeName(solrCloud, pod.Name))
-			// TODO: Figure out a way to do this, since DeleteNode will not delete the last replica of every type...
-			canDeletePod = true
-		} else {
-			requestId := "move-replicas-" + pod.Name
-
-			// First check to see if the Async Replace request has started
-			if asyncState, message, asyncErr := solr_api.CheckAsyncRequest(ctx, solrCloud, requestId); asyncErr != nil {
-				err = asyncErr
-			} else if asyncState == "notfound" {
-				if podHasReplicas {
-					// Submit new Replace Node request
-					replaceResponse := &solr_api.SolrAsyncResponse{}
-					queryParams := url.Values{}
-					queryParams.Add("action", "REPLACENODE")
-					queryParams.Add("parallel", "true")
-					queryParams.Add("sourceNode", SolrNodeName(solrCloud, pod.Name))
-					queryParams.Add("async", requestId)
-					err = solr_api.CallCollectionsApi(ctx, solrCloud, queryParams, replaceResponse)
-					if hasError, apiErr := solr_api.CheckForCollectionsApiError("REPLACENODE", replaceResponse.ResponseHeader); hasError {
-						err = apiErr
-					}
-					if err == nil {
-						logger.Info("Migrating all replicas off of pod before deletion.", "requestId", requestId, "pod", pod.Name)
-					} else {
-						logger.Error(err, "Could not migrate all replicas off of pod before deletion. Will try again later.", "requestId", requestId, "message", message)
-					}
+		// First check to see if the Async Replace request has started
+		if asyncState, message, asyncErr := solr_api.CheckAsyncRequest(ctx, solrCloud, requestId); asyncErr != nil {
+			err = asyncErr
+		} else if asyncState == "notfound" {
+			if podHasReplicas {
+				// Submit new Replace Node request
+				replaceResponse := &solr_api.SolrAsyncResponse{}
+				queryParams := url.Values{}
+				queryParams.Add("action", "REPLACENODE")
+				queryParams.Add("parallel", "true")
+				queryParams.Add("sourceNode", SolrNodeName(solrCloud, pod.Name))
+				queryParams.Add("async", requestId)
+				err = solr_api.CallCollectionsApi(ctx, solrCloud, queryParams, replaceResponse)
+				if hasError, apiErr := solr_api.CheckForCollectionsApiError("REPLACENODE", replaceResponse.ResponseHeader); hasError {
+					err = apiErr
+				}
+				if err == nil {
+					logger.Info("Migrating all replicas off of pod before deletion.", "requestId", requestId, "pod", pod.Name)
 				} else {
-					canDeletePod = true
+					logger.Error(err, "Could not migrate all replicas off of pod before deletion. Will try again later.", "requestId", requestId, "message", message)
 				}
-
 			} else {
-				logger.Info("Found async status", "requestId", requestId, "state", asyncState)
-				// Only continue to delete the pod if the ReplaceNode request is complete and successful
-				if asyncState == "completed" {
-					canDeletePod = true
-					logger.Info("Migration of all replicas off of pod before deletion complete. Pod can now be deleted.", "pod", pod.Name)
-				} else if asyncState == "failed" {
-					logger.Info("Migration of all replicas off of pod before deletion failed. Will try again.", "pod", pod.Name, "message", message)
-				}
+				canDeletePod = true
+			}
 
-				// Delete the async request Id if the async request is successful or failed.
-				// If the request failed, this will cause a retry since the next reconcile won't find the async requestId in Solr.
-				if asyncState == "completed" || asyncState == "failed" {
-					if message, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
-						logger.Error(err, "Could not delete Async request status.", "requestId", requestId, "message", message)
-					} else {
-						canDeletePod = false
-					}
+		} else {
+			logger.Info("Found async status", "requestId", requestId, "state", asyncState)
+			// Only continue to delete the pod if the ReplaceNode request is complete and successful
+			if asyncState == "completed" {
+				canDeletePod = true
+				logger.Info("Migration of all replicas off of pod before deletion complete. Pod can now be deleted.", "pod", pod.Name)
+			} else if asyncState == "failed" {
+				logger.Info("Migration of all replicas off of pod before deletion failed. Will try again.", "pod", pod.Name, "message", message)
+			}
+
+			// Delete the async request Id if the async request is successful or failed.
+			// If the request failed, this will cause a retry since the next reconcile won't find the async requestId in Solr.
+			if asyncState == "completed" || asyncState == "failed" {
+				if message, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
+					logger.Error(err, "Could not delete Async request status.", "requestId", requestId, "message", message)
+					canDeletePod = false
 				}
 			}
 		}
-	} else {
-		// The pod can be deleted, since it is using persistent data storage
-		canDeletePod = true
 	}
 	return err, canDeletePod
 }
diff --git a/controllers/util/solr_util.go b/controllers/util/solr_util.go
index 74a3588..1c44776 100644
--- a/controllers/util/solr_util.go
+++ b/controllers/util/solr_util.go
@@ -51,6 +51,13 @@
 	LogXmlMd5Annotation              = "solr.apache.org/logXmlMd5"
 	LogXmlFile                       = "log4j2.xml"
 
+	// Protected StatefulSet annotations
+	// These are to be saved on a statefulSet update
+	ClusterOpsLockAnnotation     = "solr.apache.org/clusterOpsLock"
+	ScaleLock                    = "scaling"
+	UpdateLock                   = "rollingUpdate"
+	ClusterOpsMetadataAnnotation = "solr.apache.org/clusterOpsMetadata"
+
 	SolrIsNotStoppedReadinessCondition       = "solr.apache.org/isNotStopped"
 	SolrReplicasNotEvictedReadinessCondition = "solr.apache.org/replicasNotEvicted"
 
@@ -608,6 +615,22 @@
 	return stateful
 }
 
+// MaintainPreservedStatefulSetFields makes sure that certain fields in the SolrCloud statefulSet are preserved
+// across updates to the statefulSet. The code that generates an "idempotent" statefulSet might not have the information
+// that was used when these values were populated, so they must be saved when the new "expected" statefulSet overwrites
+// all the information on the new "found" statefulSet.
+func MaintainPreservedStatefulSetFields(expected, found *appsv1.StatefulSet) {
+	// Cluster Operations are saved in the annotations of the SolrCloud StatefulSet.
+	// ClusterOps information is saved to the statefulSet independently of the general StatefulSet update.
+	// These annotations can also not be overridden set by the user.
+	expected.Annotations[ClusterOpsLockAnnotation] = found.Annotations[ClusterOpsLockAnnotation]
+	expected.Annotations[ClusterOpsMetadataAnnotation] = found.Annotations[ClusterOpsMetadataAnnotation]
+
+	// Scaling (i.e. changing) the number of replicas in the SolrCloud statefulSet is handled during the clusterOps
+	// section of the SolrCloud reconcile loop
+	expected.Spec.Replicas = found.Spec.Replicas
+}
+
 func generateSolrSetupInitContainers(solrCloud *solr.SolrCloud, solrCloudStatus *solr.SolrCloudStatus, solrDataVolumeName string, security *SecurityConfig) (containers []corev1.Container) {
 	// The setup of the solr.xml will always be necessary
 	volumeMounts := []corev1.VolumeMount{
diff --git a/docs/solr-cloud/README.md b/docs/solr-cloud/README.md
index 3b430b1..ead8009 100644
--- a/docs/solr-cloud/README.md
+++ b/docs/solr-cloud/README.md
@@ -17,9 +17,12 @@
 
 # Solr Clouds
 
-The Solr Operator supports creating and managing Solr Clouds.
+Child Pages:
+- [All SolrCloud CRD Options](solr-cloud-crd.md) - Please refer here when trying to find new SolrCloud options/features.
+- [Managed Updates](managed-updates.md)
+- [Autoscaling](autoscaling.md)
 
-To find how to configure the SolrCloud best for your use case, please refer to the [documentation on available SolrCloud CRD options](solr-cloud-crd.md).
+The Solr Operator supports creating and managing Solr Clouds.
 
 This page outlines how to create, update and delete a SolrCloud in Kubernetes.
 
@@ -76,6 +79,8 @@
 kubectl scale --replicas=5 solrcloud/example
 ```
 
+For more information on SolrCloud scaling, refer to the [autoscaling page](autoscaling.md).
+
 After issuing the scale command, start hitting the "Refresh" button in the Admin UI.
 You will see how the new Solr nodes are added.
 You can also watch the status via the `kubectl get solrclouds` command:
diff --git a/docs/solr-cloud/autoscaling.md b/docs/solr-cloud/autoscaling.md
new file mode 100644
index 0000000..64732a4
--- /dev/null
+++ b/docs/solr-cloud/autoscaling.md
@@ -0,0 +1,85 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+ -->
+
+# SolrCloud Scaling
+_Since v0.8.0_
+
+Solr Clouds are complex distributed systems, and thus require additional help when trying to scale up or down.
+
+Scaling/Autoscaling can mean different things in different situations, and this is true even within the `SolrCloud.spec.autoscaling` section.
+- Replicas can be moved when new nodes are added or when nodes need to be taken down
+- Nodes can be added/removed if more or less resources are desired.
+
+The following sections describes all the features that the Solr Operator currently supports to aid in scaling & autoscaling SolrClouds.
+
+## Configuration
+
+The `autoscaling` section in the SolrCloud CRD can be configured in the following ways
+
+```yaml
+spec:
+  autoscaling:
+    vacatePodsOnScaleDown: true # Default: true
+```
+
+## Replica Movement
+
+Solr can be scaled up & down either manually or by `HorizontalPodAutoscaler`'s, however no matter how the `SolrCloud.Spec.Replicas` value
+changes, the Solr Operator must implement this change the same way.
+
+For now Replicas are not scaled up and down themselves, they are just moved to utilize new Solr pods or vacate soon-to-be-deleted Solr pods.
+
+### Solr Pod Scale-Down
+
+When the desired number of Solr Pods that should be run `SolrCloud.Spec.Replicas` is decreased,
+the `SolrCloud.spec.autoscaling.vacatePodsOnScaleDown` option determines whether the Solr Operator should move replicas
+off of the pods that are about to be deleted.
+
+When a StatefulSet, which the Solr Operator uses to run Solr pods, has its size decreased by `x` pods, it's the last
+`x` pods that are deleted. So if a StatefulSet `tmp` has size 4, it will have pods `tmp-0`, `tmp-1`, `tmp-2` and `tmp-3`.
+If that `tmp` then is scaled down to size 2, then pod `tmp-3` will be deleted first, followed by `tmp-2` because they are `tmp`'s last pods numerically.
+
+If Solr has replicas placed on the pods that will be deleted as a part of the scale-down, then it has a problem.
+Solr will expect that these replicas will eventually come back online, because they are a part of the clusterState.
+The Solr Operator can update the cluster state to handle the scale-down operation by using Solr APIs
+to move replicas off of the soon-to-be-deleted pods.
+
+If `autoscaling.vacatePodsOnScaleDown` option is not enabled, then whenever the `SolrCloud.Spec.Replicas` is decreased,
+that change will be reflected in the StatefulSet immediately.
+Pods will be deleted even if replicas live on those pods.
+
+If `autoscaling.vacatePodsOnScaleDown` option is enabled, which it is by default, then the following steps occur:
+1. Acquire a cluster-ops lock on the SolrCloud. (This means other cluster operations, such as a rolling restart, cannot occur during the scale down operation)
+1. Scale down the last pod.
+   1. Mark the pod as "notReady" so that traffic is diverted away from this pod (for requests to the common endpoint, requests that target that node directly will not be affected).
+   1. Check to see if the last pod has any replicas.
+   1. If so, start an asynchronous command to remove replicas from this pod.
+   1. Check if the async command completed, if not then loop back until the command is finished.
+   1. If the command succeeded, continue, if not go back to step #2.3.
+   1. Scale down the StatefulSet by 1. This will delete the pod that was just vacated.
+1. If the StatefulSet size == the desired SolrCloud size, continue, otherwise go back to step #2.
+1. Give up the cluster-ops lock on the SolrCloud. The scale-down operation is complete.
+
+Because of the available Solr APIs, the statefulSet can only be scaled down 1 pod at-a-time,
+this is why the Scale down step is repeated until the statefulSet size reaches the desired size.
+
+#### Scale to Zero
+
+If the `SolrCloud.spec.replicas` is set to 0, then the SolrCloud will set the statefulSet replicas to 0 without moving or deleting replicas.
+
+The data will be saved in PVCs if the SolrCloud is set to use persistent storage, and `dataStorage.persistent.reclaimPolicy` is set to `Retain`.
+If the `reclaimPolicy` is set to `Delete`, these PVCs will be deleted when the pods are scaled down.
diff --git a/docs/solr-cloud/solr-cloud-crd.md b/docs/solr-cloud/solr-cloud-crd.md
index 5596cce..634a497 100644
--- a/docs/solr-cloud/solr-cloud-crd.md
+++ b/docs/solr-cloud/solr-cloud-crd.md
@@ -258,6 +258,17 @@
 This means that even if Solr sets the ACLs on znodes, they will not be enforced by Zookeeper. If your organization requires Solr to use ZK ACLs, then you'll need to 
 deploy Zookeeper to Kubernetes using another approach, such as using a Helm chart. 
 
+## Autoscaling
+_Since v0.8.0_
+
+```yaml
+spec:
+  autoscaling:
+    vacatePodsOnScaleDown: true
+```
+
+Please refer to the [Autoscaling page](autoscaling.md) for more information.
+
 ## Override Built-in Solr Configuration Files
 _Since v0.2.7_
 
diff --git a/docs/upgrade-notes.md b/docs/upgrade-notes.md
index 35e4fae..6d3cabf 100644
--- a/docs/upgrade-notes.md
+++ b/docs/upgrade-notes.md
@@ -109,6 +109,12 @@
 
 ## Upgrade Warnings and Notes
 
+### v0.8.0
+- The new `SolrCloud.spec.autoscaling.vacatePodsOnScaleDown` option is enabled by default.
+  This means that any SolrCloud that has its `spec.replicas` decreased will have the replicas migrated off of the soon-to-be-deleted pods by default.
+  Set this value to `false` to retain the previous functionality.
+  More information can be found in the [Solr Pod Scale-Down](solr-cloud/autoscaling.md#solr-pod-scale-down) documentation.
+
 ### v0.7.0
 - **Kubernetes support is now limited to 1.21+.**  
   If you are unable to use a newer version of Kubernetes, please install the `v0.6.0` version of the Solr Operator for use with Kubernetes `1.20` and below.
diff --git a/helm/solr-operator/Chart.yaml b/helm/solr-operator/Chart.yaml
index 74efaa2..35893b5 100644
--- a/helm/solr-operator/Chart.yaml
+++ b/helm/solr-operator/Chart.yaml
@@ -59,6 +59,15 @@
       links:
         - name: Github PR
           url: https://github.com/apache/solr-operator/pull/566
+    - kind: added
+      description: Replica migration is now managed on scale down of Solr Nodes, by default.
+      links:
+        - name: Github Issue
+          url: https://github.com/apache/solr-operator/issues/559
+        - name: Github PR
+          url: https://github.com/apache/solr-operator/pull/561
+        - name: Documentation
+          url: https://apache.github.io/solr-operator/docs/solr-cloud/autoscaling.html
   artifacthub.io/images: |
     - name: solr-operator
       image: apache/solr-operator:v0.8.0-prerelease
diff --git a/helm/solr-operator/crds/crds.yaml b/helm/solr-operator/crds/crds.yaml
index 8c70121..922545a 100644
--- a/helm/solr-operator/crds/crds.yaml
+++ b/helm/solr-operator/crds/crds.yaml
@@ -338,6 +338,16 @@
                 items:
                   type: string
                 type: array
+              autoscaling:
+                description: Define how Solr nodes should be autoscaled.
+                properties:
+                  vacatePodsOnScaleDown:
+                    default: true
+                    description: VacatePodsOnScaleDown determines whether Solr replicas
+                      are moved off of a Pod before the Pod is deleted due to the
+                      SolrCloud scaling down.
+                    type: boolean
+                type: object
               availability:
                 description: Define how Solr nodes should be available.
                 properties:
diff --git a/helm/solr/README.md b/helm/solr/README.md
index 0fa1663..a94fd21 100644
--- a/helm/solr/README.md
+++ b/helm/solr/README.md
@@ -112,6 +112,8 @@
 | serviceAccount.create | boolean | `false` | Create a serviceAccount to be used for all pods being deployed (Solr & ZK). If `serviceAccount.name` is not specified, the full name of the deployment will be used. |
 | serviceAccount.name | string |  | The optional default service account used for Solr and ZK unless overridden below. If `serviceAccount.create` is set to `false`, this serviceAccount must exist in the target namespace. |
 | backupRepositories | []object | | A list of BackupRepositories to connect your SolrCloud to. Visit the [SolrBackup docs](https://apache.github.io/solr-operator/docs/solr-backup) or run `kubectl explain solrcloud.spec.backupRepositories` to see the available options. |
+| autoscaling.vacatePodsOnScaleDown | boolean | `true` | While scaling down the SolrCloud, move replicas off of Solr Pods before they are deleted. This only affects pods that will not exist after the scaleDown operation.  |
+
 
 ### Data Storage Options
 
diff --git a/helm/solr/templates/solrcloud.yaml b/helm/solr/templates/solrcloud.yaml
index ebc03e3..c20d34c 100644
--- a/helm/solr/templates/solrcloud.yaml
+++ b/helm/solr/templates/solrcloud.yaml
@@ -114,6 +114,11 @@
       {{- toYaml .Values.availability.podDisruptionBudget | nindent 6 }}
   {{- end }}
 
+  {{- if .Values.autoscaling }}
+  autoscaling:
+    vacatePodsOnScaleDown: {{ .Values.autoscaling.vacatePodsOnScaleDown }}
+  {{- end }}
+
   {{- if .Values.dataStorage }}
   dataStorage:
     {{- if eq .Values.dataStorage.type "persistent" }}
diff --git a/helm/solr/values.yaml b/helm/solr/values.yaml
index 9fd103d..8c7501e 100644
--- a/helm/solr/values.yaml
+++ b/helm/solr/values.yaml
@@ -143,6 +143,10 @@
     enabled: true
     method: ClusterWide
 
+# Various settings to control autoscaling of Solr pods and replicas
+autoscaling:
+  vacatePodsOnScaleDown: true
+
 # A list of BackupRepositories to connect your SolrCloud to
 # See either for more information:
 # - https://apache.github.io/solr-operator/docs/solr-backup
diff --git a/tests/e2e/solrcloud_scaling_test.go b/tests/e2e/solrcloud_scaling_test.go
new file mode 100644
index 0000000..bb07fa6
--- /dev/null
+++ b/tests/e2e/solrcloud_scaling_test.go
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package e2e
+
+import (
+	"context"
+	solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
+	"github.com/apache/solr-operator/controllers/util"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	appsv1 "k8s.io/api/apps/v1"
+	"k8s.io/utils/pointer"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var _ = FDescribe("E2E - SolrCloud - Scaling", func() {
+	var (
+		solrCloud *solrv1beta1.SolrCloud
+
+		solrCollection1 = "e2e-1"
+		solrCollection2 = "e2e-2"
+	)
+
+	BeforeEach(func() {
+		solrCloud = generateBaseSolrCloud(3)
+	})
+
+	JustBeforeEach(func(ctx context.Context) {
+		By("creating the SolrCloud")
+		Expect(k8sClient.Create(ctx, solrCloud)).To(Succeed())
+		DeferCleanup(func(ctx context.Context) {
+			cleanupTest(ctx, solrCloud)
+		})
+
+		By("Waiting for the SolrCloud to come up healthy")
+		solrCloud = expectSolrCloudToBeReady(ctx, solrCloud)
+
+		By("creating a first Solr Collection")
+		createAndQueryCollection(ctx, solrCloud, solrCollection1, 1, 1, 1)
+
+		By("creating a first Solr Collection")
+		createAndQueryCollection(ctx, solrCloud, solrCollection2, 1, 1, 2)
+	})
+
+	FContext("Scale Down with replica migration", func() {
+		FIt("Scales Down", func(ctx context.Context) {
+			originalSolrCloud := solrCloud.DeepCopy()
+			solrCloud.Spec.Replicas = &one
+			By("triggering a scale down via solrCloud replicas")
+			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale down")
+
+			By("waiting for the scaleDown of first pod to begin")
+			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should still have 3 pods, because the scale down should first move Solr replicas")
+				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleLock), "StatefulSet does not have a scaling lock.")
+				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsMetadataAnnotation, "2"), "StatefulSet scaling lock operation has the wrong metadata.")
+			})
+			queryCollection(ctx, solrCloud, solrCollection2, 0)
+
+			By("waiting for the scaleDown of the first pod to finish")
+			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(2)), "StatefulSet should now have 2 pods, after the replicas have been moved off the first pod.")
+			})
+			queryCollection(ctx, solrCloud, solrCollection2, 0)
+
+			By("waiting for the scaleDown of second pod to begin")
+			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(2)), "StatefulSet should still have 2 pods, because the scale down should first move Solr replicas")
+				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleLock), "StatefulSet does not have a scaling lock.")
+				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsMetadataAnnotation, "1"), "StatefulSet scaling lock operation has the wrong metadata.")
+			})
+			queryCollection(ctx, solrCloud, solrCollection1, 0)
+			// This pod check must happen after the above clusterLock and replicas check.
+			// The StatefulSet controller might take a good amount of time to actually delete the pod,
+			// and the replica migration/cluster op might already be done by the time the first pod is deleted.
+			expectNoPod(ctx, solrCloud, solrCloud.GetSolrPodName(2))
+
+			By("waiting for the scaleDown to finish")
+			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(1)), "StatefulSet should now have 1 pods, after the replicas have been moved.")
+			})
+			// Once the scale down actually occurs, the statefulSet annotations should already be removed
+			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock after scaling is complete.")
+			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have scaling lock metadata after scaling is complete.")
+
+			expectNoPod(ctx, solrCloud, solrCloud.GetSolrPodName(1))
+			queryCollection(ctx, solrCloud, solrCollection1, 0)
+			queryCollection(ctx, solrCloud, solrCollection2, 0)
+		})
+	})
+
+	FContext("Scale Down without replica migration", func() {
+
+		BeforeEach(func() {
+			solrCloud.Spec.Autoscaling.VacatePodsOnScaleDown = pointer.Bool(false)
+		})
+
+		FIt("Scales Down", func(ctx context.Context) {
+			originalSolrCloud := solrCloud.DeepCopy()
+			solrCloud.Spec.Replicas = &one
+			By("triggering a scale down via solrCloud replicas")
+			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale down")
+
+			By("make sure scaleDown happens without a clusterLock and eventually the replicas are removed")
+			statefulSet := expectStatefulSetWithConsistentChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, statefulSet *appsv1.StatefulSet) {
+				g.Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock after scaling is complete.")
+				g.Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have scaling lock metadata after scaling is complete.")
+			})
+			Expect(statefulSet.Spec.Replicas).To(HaveValue(BeEquivalentTo(1)), "StatefulSet should now have 2 pods, after the replicas have been moved.")
+
+			expectNoPod(ctx, solrCloud, solrCloud.GetSolrPodName(1))
+			queryCollectionWithNoReplicaAvailable(ctx, solrCloud, solrCollection1)
+		})
+	})
+})