Add managed scaleUp for SolrClouds (#575)

* Add a scaleUp cluster operation
* Use new balanceReplicas API, which is v2 and Solr 9.3+
* Wait for final state when replacingNode
diff --git a/api/v1beta1/solrcloud_types.go b/api/v1beta1/solrcloud_types.go
index 209a599..9ad8ff5 100644
--- a/api/v1beta1/solrcloud_types.go
+++ b/api/v1beta1/solrcloud_types.go
@@ -730,7 +730,18 @@
 	// VacatePodsOnScaleDown determines whether Solr replicas are moved off of a Pod before the Pod is
 	// deleted due to the SolrCloud scaling down.
 	// +kubebuilder:default=true
+	// +optional
 	VacatePodsOnScaleDown *bool `json:"vacatePodsOnScaleDown,omitempty"`
+
+	// PopulatePodsOnScaleUp determines whether Solr replicas should be moved to newly-created Pods that have been
+	// created due to the SolrCloud scaling up.
+	//
+	// This feature is only available to users using Solr 9.3 or newer.
+	// If this is set to "true" for a cloud that is running an unsupported version of Solr, the replicas will not be moved.
+	//
+	// +kubebuilder:default=true
+	// +optional
+	PopulatePodsOnScaleUp *bool `json:"populatePodsOnScaleUp,omitempty"`
 }
 
 // ZookeeperRef defines the zookeeper ensemble for solr to connect to
diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go
index cc3d0f4..3b9e0b7 100644
--- a/api/v1beta1/zz_generated.deepcopy.go
+++ b/api/v1beta1/zz_generated.deepcopy.go
@@ -651,6 +651,11 @@
 		*out = new(bool)
 		**out = **in
 	}
+	if in.PopulatePodsOnScaleUp != nil {
+		in, out := &in.PopulatePodsOnScaleUp, &out.PopulatePodsOnScaleUp
+		*out = new(bool)
+		**out = **in
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SolrAutoscalingOptions.
diff --git a/config/crd/bases/solr.apache.org_solrclouds.yaml b/config/crd/bases/solr.apache.org_solrclouds.yaml
index cc7aa27..5159336 100644
--- a/config/crd/bases/solr.apache.org_solrclouds.yaml
+++ b/config/crd/bases/solr.apache.org_solrclouds.yaml
@@ -92,6 +92,15 @@
               autoscaling:
                 description: Define how Solr nodes should be autoscaled.
                 properties:
+                  populatePodsOnScaleUp:
+                    default: true
+                    description: "PopulatePodsOnScaleUp determines whether Solr replicas
+                      should be moved to newly-created Pods that have been created
+                      due to the SolrCloud scaling up. \n This feature is only available
+                      to users using Solr 9.3 or newer. If this is set to \"true\"
+                      for a cloud that is running an unsupported version of Solr,
+                      the replicas will not be moved."
+                    type: boolean
                   vacatePodsOnScaleDown:
                     default: true
                     description: VacatePodsOnScaleDown determines whether Solr replicas
diff --git a/controllers/solr_cluster_ops_util.go b/controllers/solr_cluster_ops_util.go
index 9dead7e..66b718e 100644
--- a/controllers/solr_cluster_ops_util.go
+++ b/controllers/solr_cluster_ops_util.go
@@ -33,99 +33,75 @@
 	"time"
 )
 
-func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, podList []corev1.Pod, logger logr.Logger) (clusterOpLock string, clusterOpMetadata string, retryLaterDuration time.Duration, err error) {
+func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, podList []corev1.Pod, logger logr.Logger) (clusterLockAcquired bool, retryLaterDuration time.Duration, err error) {
 	desiredPods := int(*instance.Spec.Replicas)
 	configuredPods := int(*statefulSet.Spec.Replicas)
 	if desiredPods != configuredPods {
-		scaleTo := -1
-		// Start a scaling operation
-		if desiredPods < configuredPods {
-			// Scale down!
-			// The option is enabled by default, so treat "nil" like "true"
-			if instance.Spec.Autoscaling.VacatePodsOnScaleDown == nil || *instance.Spec.Autoscaling.VacatePodsOnScaleDown {
-				if desiredPods > 0 {
-					// We only support one scaling down one pod at-a-time if not scaling down to 0 pods
-					scaleTo = configuredPods - 1
-				} else {
-					// We do not do a "managed" scale-to-zero operation.
-					// Just scale down unmanaged.
-					err = scaleCloudUnmanaged(ctx, r, statefulSet, 0, logger)
-				}
-			} else {
-				// The cloud is not setup to use managed scale-down
-				err = scaleCloudUnmanaged(ctx, r, statefulSet, desiredPods, logger)
+		// We do not do a "managed" scale-to-zero operation.
+		// Only do a managed scale down if the desiredPods is positive.
+		// The VacatePodsOnScaleDown option is enabled by default, so treat "nil" like "true"
+		if desiredPods < configuredPods && desiredPods > 0 &&
+			(instance.Spec.Autoscaling.VacatePodsOnScaleDown == nil || *instance.Spec.Autoscaling.VacatePodsOnScaleDown) {
+			if len(podList) > configuredPods {
+				// There are too many pods, the statefulSet controller has yet to delete unwanted pods.
+				// Do not start the scale down until these extra pods are deleted.
+				return false, time.Second * 5, nil
 			}
-		} else if desiredPods > configuredPods {
-			// Scale up!
-			// TODO: replicasScaleUp is not supported, so do not make a clusterOp out of it, just do the patch
+
+			// Managed Scale down!
+			originalStatefulSet := statefulSet.DeepCopy()
+			statefulSet.Annotations[util.ClusterOpsLockAnnotation] = util.ScaleDownLock
+			// The scaleDown metadata is the number of nodes to scale down to.
+			// We only support scaling down one pod at-a-time when using a managed scale-down.
+			// If the user wishes to scale down by multiple nodes, this ClusterOp will be done once-per-node.
+			statefulSet.Annotations[util.ClusterOpsMetadataAnnotation] = strconv.Itoa(configuredPods - 1)
+			if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
+				logger.Error(err, "Error while patching StatefulSet to start clusterOp", "clusterOp", util.ScaleDownLock, "clusterOpMetadata", configuredPods-1)
+			} else {
+				clusterLockAcquired = true
+			}
+		} else if desiredPods > configuredPods && (instance.Spec.Autoscaling.PopulatePodsOnScaleUp == nil || *instance.Spec.Autoscaling.PopulatePodsOnScaleUp) {
+			if len(podList) < configuredPods {
+				// There are not enough pods, the statefulSet controller has yet to create the previously desired pods.
+				// Do not start the scale up until these missing pods are created.
+				return false, time.Second * 5, nil
+			}
+			// Managed Scale up!
+			originalStatefulSet := statefulSet.DeepCopy()
+			statefulSet.Annotations[util.ClusterOpsLockAnnotation] = util.ScaleUpLock
+			// The scaleUp metadata is the number of nodes that existed before the scaleUp.
+			// This allows the scaleUp operation to know which pods will be empty after the statefulSet is scaledUp.
+			statefulSet.Annotations[util.ClusterOpsMetadataAnnotation] = strconv.Itoa(configuredPods)
+			// We want to set the number of replicas at the beginning of the scaleUp operation
+			statefulSet.Spec.Replicas = pointer.Int32(int32(desiredPods))
+			if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
+				logger.Error(err, "Error while patching StatefulSet to start clusterOp", "clusterOp", util.ScaleUpLock, "clusterOpMetadata", configuredPods, "newStatefulSetSize", desiredPods)
+			} else {
+				clusterLockAcquired = true
+			}
+		} else {
 			err = scaleCloudUnmanaged(ctx, r, statefulSet, desiredPods, logger)
 		}
-		if scaleTo > -1 {
-			clusterOpLock = util.ScaleLock
-			clusterOpMetadata = strconv.Itoa(scaleTo)
-		}
 	}
 	return
 }
 
-func handleLockedClusterOpScale(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, podList []corev1.Pod, logger logr.Logger) (retryLaterDuration time.Duration, err error) {
-	if scalingToNodes, hasAnn := statefulSet.Annotations[util.ClusterOpsMetadataAnnotation]; hasAnn {
-		if scalingToNodesInt, convErr := strconv.Atoi(scalingToNodes); convErr != nil {
-			logger.Error(convErr, "Could not convert statefulSet annotation to int for scale-down-to information", "annotation", util.ClusterOpsMetadataAnnotation, "value", scalingToNodes)
-			err = convErr
-		} else {
-			replicaManagementComplete := false
-			if scalingToNodesInt < int(*statefulSet.Spec.Replicas) {
-				// Manage scaling down the SolrCloud
-				replicaManagementComplete, err = handleManagedCloudScaleDown(ctx, r, instance, statefulSet, scalingToNodesInt, podList, logger)
-				// } else if scalingToNodesInt > int(*statefulSet.Spec.Replicas) {
-				// TODO: Utilize the scaled-up nodes in the future, however Solr does not currently have APIs for this.
-				// TODO: Think about the order of scale-up and restart when individual nodeService IPs are injected into the pods.
-				// TODO: Will likely want to do a scale-up of the service first, then do the rolling restart of the cluster, then utilize the node.
-			} else {
-				// This shouldn't happen. The ScalingToNodesAnnotation is removed when the statefulSet size changes, through a Patch.
-				// But if it does happen, we should just remove the annotation and move forward.
-				patchedStatefulSet := statefulSet.DeepCopy()
-				delete(patchedStatefulSet.Annotations, util.ClusterOpsLockAnnotation)
-				delete(patchedStatefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
-				if err = r.Patch(ctx, patchedStatefulSet, client.StrategicMergeFrom(statefulSet)); err != nil {
-					logger.Error(err, "Error while patching StatefulSet to remove unneeded clusterLockOp annotation for scaling to the current amount of nodes")
-				} else {
-					statefulSet = patchedStatefulSet
-				}
-			}
-
-			// Scale the statefulSet to represent the new number of pods, if it is lower than the current number of pods
-			// Also remove the lock annotations, as the cluster operation is done. Other operations can now take place.
-			if replicaManagementComplete {
-				patchedStatefulSet := statefulSet.DeepCopy()
-				patchedStatefulSet.Spec.Replicas = pointer.Int32(int32(scalingToNodesInt))
-				delete(patchedStatefulSet.Annotations, util.ClusterOpsLockAnnotation)
-				delete(patchedStatefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
-				if err = r.Patch(ctx, patchedStatefulSet, client.StrategicMergeFrom(statefulSet)); err != nil {
-					logger.Error(err, "Error while patching StatefulSet to scale down SolrCloud", "newUtilizedNodes", scalingToNodesInt)
-				}
-
-				// TODO: Create event for the CRD.
-			} else {
-				// Retry after five minutes to check if the replica management commands have been completed
-				retryLaterDuration = time.Second * 5
-			}
-		}
-		// If everything succeeded, the statefulSet will have an annotation updated
-		// and the reconcile loop will be called again.
-
-		return
-	} else {
-		err = errors.New("no clusterOpMetadata annotation is present in the statefulSet")
-		logger.Error(err, "Cannot perform scaling operation when no scale-to-nodes is provided via the clusterOpMetadata")
-		return time.Second * 10, err
-	}
-}
-
 // handleManagedCloudScaleDown does the logic of a managed and "locked" cloud scale down operation.
-// This will likely take many reconcile loops to complete, as it is moving replicas away from the nodes that will be scaled down.
-func handleManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownTo int, podList []corev1.Pod, logger logr.Logger) (replicaManagementComplete bool, err error) {
+// This will likely take many reconcile loops to complete, as it is moving replicas away from the pods that will be scaled down.
+func handleManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownToRaw string, podList []corev1.Pod, logger logr.Logger) (retryLaterDuration time.Duration, err error) {
+	var scaleDownTo int
+	if scaleDownTo, err = strconv.Atoi(scaleDownToRaw); err != nil {
+		logger.Error(err, "Could not convert statefulSet annotation to int for scale-down-to information", "annotation", util.ClusterOpsMetadataAnnotation, "value", scaleDownToRaw)
+		return
+		// TODO: Create event for the CRD.
+	}
+
+	if scaleDownTo >= int(*statefulSet.Spec.Replicas) {
+		// This shouldn't happen, but we don't want to be stuck if it does.
+		// Just remove the cluster Op, because the cluster has already been scaled down.
+		err = clearClusterOp(ctx, r, statefulSet, "statefulSet already scaled-down", logger)
+	}
+
 	// Before doing anything to the pod, make sure that users cannot send requests to the pod anymore.
 	podStoppedReadinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
 		util.SolrIsNotStoppedReadinessCondition: {
@@ -135,18 +111,63 @@
 		},
 	}
 
-	if scaleDownTo == 0 {
-		// Eventually we might want to delete all collections & data,
-		// the user wants no data left if scaling the solrcloud down to 0.
-		// However, for now we do not offer managed scale down to zero, so this line of code shouldn't even happen.
-		replicaManagementComplete = true
-	} else {
-		// Only evict the last pod, even if we are trying to scale down multiple pods.
-		// Scale down will happen one pod at a time.
-		replicaManagementComplete, err = evictSinglePod(ctx, r, instance, scaleDownTo, podList, podStoppedReadinessConditions, logger)
-	}
 	// TODO: It would be great to support a multi-node scale down when Solr supports evicting many SolrNodes at once.
+	// Only evict the last pod, even if we are trying to scale down multiple pods.
+	// Scale down will happen one pod at a time.
+	if replicaManagementComplete, evictErr := evictSinglePod(ctx, r, instance, scaleDownTo, podList, podStoppedReadinessConditions, logger); err != nil {
+		err = evictErr
+	} else if replicaManagementComplete {
+		originalStatefulSet := statefulSet.DeepCopy()
+		statefulSet.Spec.Replicas = pointer.Int32(int32(scaleDownTo))
+		delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
+		delete(statefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
+		if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
+			logger.Error(err, "Error while patching StatefulSet to finish the managed SolrCloud scale down clusterOp", "newStatefulSetReplicas", scaleDownTo)
+		}
 
+		// TODO: Create event for the CRD.
+	} else {
+		// Retry after five seconds to check if the replica management commands have been completed
+		retryLaterDuration = time.Second * 5
+	}
+	return
+}
+
+// handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
+// This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
+func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleUpFromRaw string, logger logr.Logger) (retryLaterDuration time.Duration, err error) {
+	// TODO: Think about bad pod specs, that will never come up healthy. We want to try a rolling restart in between if necessary
+	if balanceComplete, balanceErr := util.BalanceReplicasForCluster(ctx, instance, statefulSet, "scaleUp", scaleUpFromRaw, logger); err != nil {
+		err = balanceErr
+	} else if balanceComplete {
+		// Once the replica balancing is complete, finish the cluster operation by deleting the statefulSet annotations
+		originalStatefulSet := statefulSet.DeepCopy()
+		delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
+		delete(statefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
+		if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
+			logger.Error(err, "Error while patching StatefulSet to finish the managed SolrCloud scale up clusterOp")
+		}
+
+		// TODO: Create event for the CRD.
+	} else {
+		// Retry after five seconds to check if the replica management commands have been completed
+		retryLaterDuration = time.Second * 5
+	}
+	return
+}
+
+// clearClusterOp simply removes any clusterOp for the given statefulSet.
+// This should only be used as a "break-glass" scenario. Do not use this to finish off successful clusterOps.
+func clearClusterOp(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
+	logger = logger.WithValues("reason", reason, "clusterOp", statefulSet.Annotations[util.ClusterOpsLockAnnotation], "clusterOpMetadata", statefulSet.Annotations[util.ClusterOpsMetadataAnnotation])
+	originalStatefulSet := statefulSet.DeepCopy()
+	delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
+	delete(statefulSet.Annotations, util.ClusterOpsMetadataAnnotation)
+	if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
+		logger.Error(err, "Error while patching StatefulSet to remove unneeded clusterLockOp annotation")
+	} else {
+		logger.Error(err, "Removed unneeded clusterLockOp annotation from statefulSet")
+	}
 	return
 }
 
@@ -237,10 +258,8 @@
 	queryParams := url.Values{}
 	queryParams.Add("action", "CLUSTERSTATUS")
 	err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, clusterResp)
-	if err == nil {
-		if hasError, apiErr := solr_api.CheckForCollectionsApiError("CLUSTERSTATUS", clusterResp.ResponseHeader); hasError {
-			err = apiErr
-		}
+	if _, apiError := solr_api.CheckForCollectionsApiError("CLUSTERSTATUS", clusterResp.ResponseHeader, clusterResp.Error); apiError != nil {
+		err = apiError
 	}
 	podNodeName := util.SolrNodeName(cloud, podName)
 	if err == nil {
diff --git a/controllers/solrcloud_controller.go b/controllers/solrcloud_controller.go
index 47a125c..a4f91ec 100644
--- a/controllers/solrcloud_controller.go
+++ b/controllers/solrcloud_controller.go
@@ -447,30 +447,24 @@
 
 	var retryLaterDuration time.Duration
 	if clusterOpLock, hasAnn := statefulSet.Annotations[util.ClusterOpsLockAnnotation]; hasAnn {
+		clusterOpMetadata := statefulSet.Annotations[util.ClusterOpsMetadataAnnotation]
 		switch clusterOpLock {
-		case util.ScaleLock:
-			retryLaterDuration, err = handleLockedClusterOpScale(ctx, r, instance, statefulSet, podList, logger)
+		case util.ScaleDownLock:
+			retryLaterDuration, err = handleManagedCloudScaleDown(ctx, r, instance, statefulSet, clusterOpMetadata, podList, logger)
+		case util.ScaleUpLock:
+			retryLaterDuration, err = handleManagedCloudScaleUp(ctx, r, instance, statefulSet, clusterOpMetadata, logger)
+		default:
+			// This shouldn't happen, but we don't want to be stuck if it does.
+			// Just remove the cluster Op, because the solr operator version running does not support it.
+			err = clearClusterOp(ctx, r, statefulSet, "clusterOp not supported", logger)
 		}
 	} else {
 		// Start cluster operations if needed.
 		// The operations will be actually run in future reconcile loops, but a clusterOpLock will be acquired here.
 		// And that lock will tell future reconcile loops that the operation needs to be done.
-		clusterOpLock = ""
-		clusterOpMetadata := ""
 		// If a non-managed scale needs to take place, this method will update the StatefulSet without starting
 		// a "locked" cluster operation
-		clusterOpLock, clusterOpMetadata, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, podList, logger)
-
-		if retryLaterDuration <= 0 && err == nil && clusterOpLock != "" {
-			patchedStatefulSet := statefulSet.DeepCopy()
-			patchedStatefulSet.Annotations[util.ClusterOpsLockAnnotation] = clusterOpLock
-			patchedStatefulSet.Annotations[util.ClusterOpsMetadataAnnotation] = clusterOpMetadata
-			if err = r.Patch(ctx, patchedStatefulSet, client.StrategicMergeFrom(statefulSet)); err != nil {
-				logger.Error(err, "Error while patching StatefulSet to start clusterOp", "clusterOp", clusterOpLock, "clusterOpMetadata", clusterOpMetadata)
-			} else {
-				statefulSet = patchedStatefulSet
-			}
-		}
+		_, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, podList, logger)
 	}
 	if err != nil && retryLaterDuration == 0 {
 		retryLaterDuration = time.Second * 5
diff --git a/controllers/util/backup_util.go b/controllers/util/backup_util.go
index 882884b..ca4056e 100644
--- a/controllers/util/backup_util.go
+++ b/controllers/util/backup_util.go
@@ -133,11 +133,10 @@
 
 func DeleteAsyncInfoForBackup(ctx context.Context, cloud *solr.SolrCloud, collection string, backupName string, logger logr.Logger) (err error) {
 	logger.Info("Calling to delete async info for backup command.", "solrCloud", cloud.Name, "collection", collection)
-	var message string
-	message, err = solr_api.DeleteAsyncRequest(ctx, cloud, AsyncIdForCollectionBackup(collection, backupName))
+	_, err = solr_api.DeleteAsyncRequest(ctx, cloud, AsyncIdForCollectionBackup(collection, backupName))
 
 	if err != nil {
-		logger.Error(err, "Error deleting async data for collection backup", "solrCloud", cloud.Name, "collection", collection, "message", message)
+		logger.Error(err, "Error deleting async data for collection backup", "solrCloud", cloud.Name, "collection", collection)
 	}
 
 	return err
@@ -217,10 +216,8 @@
 	queryParams := url.Values{}
 	queryParams.Add("action", "LIST")
 	err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, resp)
-	if err == nil {
-		if hasError, apiErr := solr_api.CheckForCollectionsApiError("LIST", resp.ResponseHeader); hasError {
-			err = apiErr
-		}
+	if _, apiErr := solr_api.CheckForCollectionsApiError("LIST", resp.ResponseHeader, resp.Error); apiErr != nil {
+		err = apiErr
 	}
 	return resp.Collections, err
 }
diff --git a/controllers/util/solr_api/api.go b/controllers/util/solr_api/api.go
index e64d863..b73ad9b 100644
--- a/controllers/util/solr_api/api.go
+++ b/controllers/util/solr_api/api.go
@@ -23,7 +23,7 @@
 	"encoding/json"
 	"fmt"
 	solr "github.com/apache/solr-operator/api/v1beta1"
-	"io/ioutil"
+	"io"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"net/http"
 	"net/url"
@@ -55,6 +55,9 @@
 
 	// +optional
 	Status SolrAsyncStatus `json:"status,omitempty"`
+
+	// +optional
+	Error *SolrErrorResponse `json:"error,omitempty"`
 }
 
 type SolrResponseHeader struct {
@@ -77,6 +80,9 @@
 
 	// +optional
 	Status SolrAsyncStatus `json:"status,omitempty"`
+
+	// +optional
+	Error *SolrErrorResponse `json:"error,omitempty"`
 }
 
 type SolrDeleteRequestStatus struct {
@@ -85,6 +91,9 @@
 	// Status of the delete request
 	// +optional
 	Status string `json:"status,omitempty"`
+
+	// +optional
+	Error *SolrErrorResponse `json:"error,omitempty"`
 }
 
 type SolrCollectionsListing struct {
@@ -92,6 +101,9 @@
 
 	// +optional
 	Collections []string `json:"collections,omitempty"`
+
+	// +optional
+	Error *SolrErrorResponse `json:"error,omitempty"`
 }
 
 func CheckAsyncRequest(ctx context.Context, cloud *solr.SolrCloud, asyncId string) (asyncState string, message string, err error) {
@@ -100,25 +112,30 @@
 	queryParams := url.Values{}
 	queryParams.Set("action", "REQUESTSTATUS")
 	queryParams.Set("requestid", asyncId)
-	if err = CallCollectionsApi(ctx, cloud, queryParams, asyncStatus); err == nil {
-		if _, err = CheckForCollectionsApiError("REQUESTSTATUS", asyncStatus.ResponseHeader); err == nil {
-			asyncState = asyncStatus.Status.AsyncState
-			message = asyncStatus.Status.Message
-		}
+	err = CallCollectionsApi(ctx, cloud, queryParams, asyncStatus)
+	if _, apiErr := CheckForCollectionsApiError("REQUESTSTATUS", asyncStatus.ResponseHeader, asyncStatus.Error); apiErr != nil {
+		err = apiErr
+	}
+	if err == nil {
+		asyncState = asyncStatus.Status.AsyncState
+		message = asyncStatus.Status.Message
 	}
 
 	return
 }
 
-func DeleteAsyncRequest(ctx context.Context, cloud *solr.SolrCloud, asyncId string) (message string, err error) {
+func DeleteAsyncRequest(ctx context.Context, cloud *solr.SolrCloud, asyncId string) (status string, err error) {
 	deleteStatus := &SolrDeleteRequestStatus{}
 
 	queryParams := url.Values{}
 	queryParams.Set("action", "DELETESTATUS")
 	queryParams.Set("requestid", asyncId)
-	if err = CallCollectionsApi(ctx, cloud, queryParams, deleteStatus); err == nil {
-		_, err = CheckForCollectionsApiError("DELETESTATUS", deleteStatus.ResponseHeader)
-		message = deleteStatus.Status
+	err = CallCollectionsApi(ctx, cloud, queryParams, deleteStatus)
+	if _, apiErr := CheckForCollectionsApiError("DELETESTATUS", deleteStatus.ResponseHeader, deleteStatus.Error); apiErr != nil {
+		err = apiErr
+	}
+	if err == nil {
+		status = deleteStatus.Status
 	}
 
 	return
@@ -153,8 +170,8 @@
 
 	defer resp.Body.Close()
 
-	if err == nil && resp.StatusCode != 200 {
-		b, _ := ioutil.ReadAll(resp.Body)
+	if err == nil && resp.StatusCode >= 400 {
+		b, _ := io.ReadAll(resp.Body)
 		err = errors.NewServiceUnavailable(fmt.Sprintf("Recieved bad response code of %d from solr with response: %s", resp.StatusCode, string(b)))
 	}
 
diff --git a/controllers/util/solr_api/cluster_status.go b/controllers/util/solr_api/cluster_status.go
index ac39bc7..ac9633a 100644
--- a/controllers/util/solr_api/cluster_status.go
+++ b/controllers/util/solr_api/cluster_status.go
@@ -33,6 +33,9 @@
 
 	// +optional
 	CollectionQueueSize int `json:"overseer_collection_queue_size,omitempty"`
+
+	// +optional
+	Error *SolrErrorResponse `json:"error,omitempty"`
 }
 
 type SolrClusterStatusResponse struct {
@@ -40,6 +43,9 @@
 
 	// +optional
 	ClusterStatus SolrClusterStatus `json:"cluster,omitempty"`
+
+	// +optional
+	Error *SolrErrorResponse `json:"error,omitempty"`
 }
 
 type SolrClusterStatus struct {
diff --git a/controllers/util/solr_api/errors.go b/controllers/util/solr_api/errors.go
index 5d28fbb..30dafc2 100644
--- a/controllers/util/solr_api/errors.go
+++ b/controllers/util/solr_api/errors.go
@@ -19,15 +19,17 @@
 
 import "fmt"
 
-func CheckForCollectionsApiError(action string, header SolrResponseHeader) (hasError bool, err error) {
-	if header.Status > 0 {
-		hasError = true
+func CheckForCollectionsApiError(action string, header SolrResponseHeader, errorBody *SolrErrorResponse) (apiUnsupported bool, err error) {
+	if errorBody != nil {
+		err = *errorBody
+		apiUnsupported = IsNotSupportedApiError(errorBody)
+	} else if header.Status > 0 {
 		err = APIError{
 			Detail: fmt.Sprintf("Error occurred while calling the Collections api for action=%s", action),
 			Status: header.Status,
 		}
 	}
-	return hasError, err
+	return
 }
 
 func CollectionsAPIError(action string, responseStatus int) error {
@@ -48,3 +50,21 @@
 	}
 	return fmt.Sprintf("Solr response status: %d. %s", e.Status, e.Detail)
 }
+
+type SolrErrorResponse struct {
+	Metadata SolrErrorMetadata `json:"metadata,omitempty"`
+
+	Message string `json:"msg,omitempty"`
+
+	Code int `json:"code,omitempty"`
+}
+
+type SolrErrorMetadata struct {
+	ErrorClass string `json:"error-class,omitempty"`
+
+	RootErrorClass string `json:"root-error-class,omitempty"`
+}
+
+func (e SolrErrorResponse) Error() string {
+	return fmt.Sprintf("Error returned from Solr API: %d. %s", e.Code, e.Message)
+}
diff --git a/controllers/util/solr_api/v2.go b/controllers/util/solr_api/v2.go
new file mode 100644
index 0000000..6323478
--- /dev/null
+++ b/controllers/util/solr_api/v2.go
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package solr_api
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	solr "github.com/apache/solr-operator/api/v1beta1"
+	"io"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"net/http"
+	"net/url"
+	"strings"
+)
+
+type SolrRebalanceRequest struct {
+	Nodes []string `json:"nodes,omitempty"`
+
+	WaitForFinalState bool `json:"waitForFinalState,omitempty"`
+
+	Async string `json:"async,omitempty"`
+}
+
+func CallCollectionsApiV2(ctx context.Context, cloud *solr.SolrCloud, urlMethod string, urlPath string, urlParams url.Values, body interface{}, response interface{}) (err error) {
+	client := noVerifyTLSHttpClient
+	if mTLSHttpClient != nil {
+		client = mTLSHttpClient
+	}
+
+	cloudUrl := solr.InternalURLForCloud(cloud)
+	if !strings.HasPrefix(urlPath, "/") {
+		urlPath = "/" + urlPath
+	}
+
+	cloudUrl += urlPath
+	if len(urlParams) > 0 {
+		cloudUrl += "?" + urlParams.Encode()
+	}
+
+	resp := &http.Response{}
+
+	var b *bytes.Buffer
+	if body != nil {
+		b = new(bytes.Buffer)
+		if err = json.NewEncoder(b).Encode(body); err != nil {
+			return
+		}
+	}
+	var req *http.Request
+	if req, err = http.NewRequestWithContext(ctx, urlMethod, cloudUrl, b); err != nil {
+		return err
+	}
+
+	// Any custom HTTP headers passed through the Context
+	if httpHeaders, hasHeaders := ctx.Value(HTTP_HEADERS_CONTEXT_KEY).(map[string]string); hasHeaders {
+		for key, header := range httpHeaders {
+			req.Header.Add(key, header)
+		}
+	}
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Accept", "application/json")
+
+	if resp, err = client.Do(req); err != nil {
+		return err
+	}
+
+	defer resp.Body.Close()
+
+	if err == nil && resp.StatusCode >= 400 {
+		b, _ := io.ReadAll(resp.Body)
+		err = errors.NewServiceUnavailable(fmt.Sprintf("Recieved bad response code of %d from solr with response: %s", resp.StatusCode, string(b)))
+		// try to read the response, just in case Solr returned an error that we can read
+		json.NewDecoder(bytes.NewReader(b)).Decode(&response)
+	}
+
+	if err == nil {
+		err = json.NewDecoder(resp.Body).Decode(&response)
+	}
+
+	return err
+}
+
+func IsNotSupportedApiError(errorBody *SolrErrorResponse) bool {
+	return errorBody.Code == 404 &&
+		(strings.Contains(errorBody.Message, "Cannot find API for the path") || strings.Contains(errorBody.Message, "no core retrieved for null"))
+}
diff --git a/controllers/util/solr_scale_util.go b/controllers/util/solr_scale_util.go
new file mode 100644
index 0000000..1f7be5c
--- /dev/null
+++ b/controllers/util/solr_scale_util.go
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
+import (
+	"context"
+	solr "github.com/apache/solr-operator/api/v1beta1"
+	"github.com/apache/solr-operator/controllers/util/solr_api"
+	"github.com/go-logr/logr"
+	appsv1 "k8s.io/api/apps/v1"
+)
+
+// BalanceReplicasForCluster takes a SolrCloud and balances all replicas across the Pods that are currently alive.
+//
+// Note: unlike EvictReplicasForPodIfNecessary, the only way we know that we are done balancingReplicas is by seeing
+// a successful status returned from the command. So if we delete the asyncStatus, and then something happens in the operator,
+// and we lose our state, then we will need to retry the balanceReplicas command. This should be ok since calling
+// balanceReplicas multiple times should not be bad when the replicas for the cluster are already balanced.
+func BalanceReplicasForCluster(ctx context.Context, solrCloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, balanceReason string, balanceCmdUniqueId string, logger logr.Logger) (balanceComplete bool, err error) {
+	logger = logger.WithValues("balanceReason", balanceReason)
+	// If the Cloud has 1 or zero pods, there is no reason to balance replicas.
+	if statefulSet.Spec.Replicas == nil || *statefulSet.Spec.Replicas < 1 {
+		balanceComplete = true
+	} else {
+		requestId := "balance-replicas-" + balanceCmdUniqueId
+
+		// First check to see if the Async Balance request has started
+		if asyncState, message, asyncErr := solr_api.CheckAsyncRequest(ctx, solrCloud, requestId); asyncErr != nil {
+			err = asyncErr
+			logger.Error(err, "Error occurred while checking the status of the balance replicas task. Will try again.", "requestId", requestId)
+		} else if asyncState == "notfound" {
+			// Only start the balance command if all pods are ready
+			if *statefulSet.Spec.Replicas != statefulSet.Status.ReadyReplicas {
+				logger.Info("Cannot start balancing replicas until all pods are ready.", "pods", *statefulSet.Spec.Replicas, "readyPods", statefulSet.Status.ReadyReplicas)
+			} else {
+				// Submit new BalanceReplicas request
+				rebalanceRequest := &solr_api.SolrRebalanceRequest{
+					WaitForFinalState: true,
+					Async:             requestId,
+				}
+				rebalanceResponse := &solr_api.SolrAsyncResponse{}
+				err = solr_api.CallCollectionsApiV2(ctx, solrCloud, "POST", "/api/cluster/replicas/balance", nil, rebalanceRequest, rebalanceResponse)
+				if isUnsupportedApi, apiError := solr_api.CheckForCollectionsApiError("BALANCE_REPLICAS", rebalanceResponse.ResponseHeader, rebalanceResponse.Error); isUnsupportedApi {
+					// TODO: Remove this if-statement when Solr 9.3 is the lowest supported version
+					logger.Error(err, "Could not balance replicas across the cluster, because the SolrCloud's version does not support this feature.")
+					// Swallow the error after logging it, because it's not a real error.
+					// Balancing is not supported, so we just need to finish the clusterOp.
+					err = nil
+					balanceComplete = true
+				} else if apiError != nil {
+					err = apiError
+				}
+				if err == nil {
+					logger.Info("Started balancing replicas across cluster.", "requestId", requestId)
+				} else {
+					logger.Error(err, "Could not balance replicas across the cluster. Will try again.")
+				}
+			}
+		} else {
+			logger.Info("Found async status", "requestId", requestId, "state", asyncState)
+			// Only continue to delete the pod if the ReplaceNode request is complete and successful
+			if asyncState == "completed" {
+				balanceComplete = true
+				logger.Info("Replica Balancing command completed successfully")
+			} else if asyncState == "failed" {
+				logger.Info("Replica Balancing command failed. Will try again", "message", message)
+			}
+
+			// Delete the async request Id if the async request is successful or failed.
+			// If the request failed, this will cause a retry since the next reconcile won't find the async requestId in Solr.
+			if asyncState == "completed" || asyncState == "failed" {
+				if _, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
+					logger.Error(err, "Could not delete Async request status.", "requestId", requestId)
+					balanceComplete = false
+				}
+			}
+		}
+	}
+	return
+}
diff --git a/controllers/util/solr_update_util.go b/controllers/util/solr_update_util.go
index 6d37092..a5235f5 100644
--- a/controllers/util/solr_update_util.go
+++ b/controllers/util/solr_update_util.go
@@ -116,14 +116,10 @@
 			queryParams.Add("action", "CLUSTERSTATUS")
 			err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, clusterResp)
 			if err == nil {
-				if hasError, apiErr := solr_api.CheckForCollectionsApiError("CLUSTERSTATUS", clusterResp.ResponseHeader); hasError {
+				queryParams.Set("action", "OVERSEERSTATUS")
+				err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, overseerResp)
+				if _, apiErr := solr_api.CheckForCollectionsApiError("OVERSEERSTATUS", overseerResp.ResponseHeader, overseerResp.Error); apiErr != nil {
 					err = apiErr
-				} else {
-					queryParams.Set("action", "OVERSEERSTATUS")
-					err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, overseerResp)
-					if hasError, apiErr = solr_api.CheckForCollectionsApiError("OVERSEERSTATUS", clusterResp.ResponseHeader); hasError {
-						err = apiErr
-					}
 				}
 			}
 			if err != nil {
@@ -502,7 +498,7 @@
 	return allNodeNames
 }
 
-// EvictReplicasForPodIfNecessary takes a solr Pod and migrates all replicas off of that Pod, if the Pod is using ephemeral storage.
+// EvictReplicasForPodIfNecessary takes a solr Pod and migrates all replicas off of that Pod.
 // For updates this will only be called for pods using ephemeral data.
 // For scale-down operations, this can be called for pods using ephemeral or persistent data.
 func EvictReplicasForPodIfNecessary(ctx context.Context, solrCloud *solr.SolrCloud, pod *corev1.Pod, podHasReplicas bool, evictionReason string, logger logr.Logger) (err error, canDeletePod bool) {
@@ -521,6 +517,7 @@
 		// First check to see if the Async Replace request has started
 		if asyncState, message, asyncErr := solr_api.CheckAsyncRequest(ctx, solrCloud, requestId); asyncErr != nil {
 			err = asyncErr
+			logger.Error(err, "Error occurred while checking the status of the ReplaceNode task. Will try again.", "requestId", requestId)
 		} else if asyncState == "notfound" {
 			if podHasReplicas {
 				// Submit new Replace Node request
@@ -529,15 +526,16 @@
 				queryParams.Add("action", "REPLACENODE")
 				queryParams.Add("parallel", "true")
 				queryParams.Add("sourceNode", SolrNodeName(solrCloud, pod.Name))
+				queryParams.Add("waitForFinalState", "true")
 				queryParams.Add("async", requestId)
 				err = solr_api.CallCollectionsApi(ctx, solrCloud, queryParams, replaceResponse)
-				if hasError, apiErr := solr_api.CheckForCollectionsApiError("REPLACENODE", replaceResponse.ResponseHeader); hasError {
+				if _, apiErr := solr_api.CheckForCollectionsApiError("REPLACENODE", replaceResponse.ResponseHeader, replaceResponse.Error); apiErr != nil {
 					err = apiErr
 				}
 				if err == nil {
 					logger.Info("Migrating all replicas off of pod before deletion.", "requestId", requestId, "pod", pod.Name)
 				} else {
-					logger.Error(err, "Could not migrate all replicas off of pod before deletion. Will try again later.", "requestId", requestId, "message", message)
+					logger.Error(err, "Could not migrate all replicas off of pod before deletion. Will try again.")
 				}
 			} else {
 				canDeletePod = true
@@ -556,8 +554,8 @@
 			// Delete the async request Id if the async request is successful or failed.
 			// If the request failed, this will cause a retry since the next reconcile won't find the async requestId in Solr.
 			if asyncState == "completed" || asyncState == "failed" {
-				if message, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
-					logger.Error(err, "Could not delete Async request status.", "requestId", requestId, "message", message)
+				if _, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
+					logger.Error(err, "Could not delete Async request status.", "requestId", requestId)
 					canDeletePod = false
 				}
 			}
diff --git a/controllers/util/solr_util.go b/controllers/util/solr_util.go
index 1c44776..2d09d0b 100644
--- a/controllers/util/solr_util.go
+++ b/controllers/util/solr_util.go
@@ -54,7 +54,8 @@
 	// Protected StatefulSet annotations
 	// These are to be saved on a statefulSet update
 	ClusterOpsLockAnnotation     = "solr.apache.org/clusterOpsLock"
-	ScaleLock                    = "scaling"
+	ScaleDownLock                = "scalingDown"
+	ScaleUpLock                  = "scalingUp"
 	UpdateLock                   = "rollingUpdate"
 	ClusterOpsMetadataAnnotation = "solr.apache.org/clusterOpsMetadata"
 
diff --git a/docs/solr-cloud/autoscaling.md b/docs/solr-cloud/autoscaling.md
index 64732a4..78be501 100644
--- a/docs/solr-cloud/autoscaling.md
+++ b/docs/solr-cloud/autoscaling.md
@@ -34,6 +34,7 @@
 spec:
   autoscaling:
     vacatePodsOnScaleDown: true # Default: true
+    populatePodsOnScaleUp: true # Default: true
 ```
 
 ## Replica Movement
@@ -63,7 +64,7 @@
 Pods will be deleted even if replicas live on those pods.
 
 If `autoscaling.vacatePodsOnScaleDown` option is enabled, which it is by default, then the following steps occur:
-1. Acquire a cluster-ops lock on the SolrCloud. (This means other cluster operations, such as a rolling restart, cannot occur during the scale down operation)
+1. Acquire a cluster-ops lock on the SolrCloud. (This means other cluster operations, such as a rolling restart and scale up, cannot occur during the scale down operation)
 1. Scale down the last pod.
    1. Mark the pod as "notReady" so that traffic is diverted away from this pod (for requests to the common endpoint, requests that target that node directly will not be affected).
    1. Check to see if the last pod has any replicas.
@@ -83,3 +84,33 @@
 
 The data will be saved in PVCs if the SolrCloud is set to use persistent storage, and `dataStorage.persistent.reclaimPolicy` is set to `Retain`.
 If the `reclaimPolicy` is set to `Delete`, these PVCs will be deleted when the pods are scaled down.
+
+### Solr Pod Scale-Up
+
+When the desired number of Solr Pods that should be run `SolrCloud.Spec.Replicas` is increased,
+the `SolrCloud.spec.autoscaling.populatePodsOnScaleUp` option determines whether the Solr Operator should move replicas
+onto the pods that have been created because of the scale-up.
+
+If `autoscaling.populatePodsOnScaleUp` option is not enabled, then whenever the `SolrCloud.Spec.Replicas` is increased,
+the StatefulSet's replicas will be increased, and no other actions will be taken by the Solr Operator.
+This means that the new pods that are created will likely remain empty until the user takes an action themselves.
+This could be creating collections, migrating replicas or scaling up existing shards/collections.
+
+If `autoscaling.populatePodsOnScaleUp` option is enabled, which it is by default, then the following steps occur:
+1. Acquire a cluster-ops lock on the SolrCloud. (This means other cluster operations, such as a rolling restart and scale down, cannot occur during the scale up operation)
+1. Scale up to the StatefulSet to the desired `spec.replicas` (number of pods).
+1. Wait for all pods in the cluster to become healthy.
+   * Rolling restarts cannot occur at the same time, so most likely every existing pod will be ready, and we will just be waiting for the newly created pods.
+1. Start an asynchronous command to balance replicas across all pods. (This does not just target the newly created pods)
+1. Check if the async command completed, if not then loop back until the command is finished.
+1. If the command succeeded, continue, if not go back to step #4.
+1. Give up the cluster-ops lock on the SolrCloud. The scale-up operation is complete.
+
+
+#### Solr Version Compatibility
+
+The managed scale-up option relies on the BalanceReplicas API in Solr, which was added in Solr 9.3.
+Therefore, this option cannot be used with Solr versions < 9.3.
+If `autoscaling.populatePodsOnScaleUp` option is enabled and an unsupported version of Solr is used, the cluster lock will
+be given up after the BalanceReplicas API call fails.
+This behavior is very similar to `autoscaling.populatePodsOnScaleUp` being disabled.
diff --git a/helm/solr-operator/Chart.yaml b/helm/solr-operator/Chart.yaml
index b7932e3..f7e4586 100644
--- a/helm/solr-operator/Chart.yaml
+++ b/helm/solr-operator/Chart.yaml
@@ -60,14 +60,23 @@
         - name: Github PR
           url: https://github.com/apache/solr-operator/pull/566
     - kind: added
-      description: Replica migration is now managed on scale down of Solr Nodes, by default.
+      description: Scale down of Solr Pods includes Replica migration by default.
       links:
         - name: Github Issue
           url: https://github.com/apache/solr-operator/issues/559
         - name: Github PR
           url: https://github.com/apache/solr-operator/pull/561
         - name: Documentation
-          url: https://apache.github.io/solr-operator/docs/solr-cloud/autoscaling.html
+          url: https://apache.github.io/solr-operator/docs/solr-cloud/autoscaling.html#solr-pod-scale-down
+    - kind: added
+      description: Scale up of Solr Pods includes Replica migration by default, for Solr 9.3+.
+      links:
+        - name: Github Issue
+          url: https://github.com/apache/solr-operator/issues/567
+        - name: Github PR
+          url: https://github.com/apache/solr-operator/pull/575
+        - name: Documentation
+          url: https://apache.github.io/solr-operator/docs/solr-cloud/autoscaling.html#solr-pod-scale-up
     - kind: changed
       description: The Solr Operator is now built with Go 1.20
       links:
diff --git a/helm/solr-operator/crds/crds.yaml b/helm/solr-operator/crds/crds.yaml
index 922545a..846658e 100644
--- a/helm/solr-operator/crds/crds.yaml
+++ b/helm/solr-operator/crds/crds.yaml
@@ -341,6 +341,15 @@
               autoscaling:
                 description: Define how Solr nodes should be autoscaled.
                 properties:
+                  populatePodsOnScaleUp:
+                    default: true
+                    description: "PopulatePodsOnScaleUp determines whether Solr replicas
+                      should be moved to newly-created Pods that have been created
+                      due to the SolrCloud scaling up. \n This feature is only available
+                      to users using Solr 9.3 or newer. If this is set to \"true\"
+                      for a cloud that is running an unsupported version of Solr,
+                      the replicas will not be moved."
+                    type: boolean
                   vacatePodsOnScaleDown:
                     default: true
                     description: VacatePodsOnScaleDown determines whether Solr replicas
diff --git a/helm/solr/README.md b/helm/solr/README.md
index a94fd21..407c4f9 100644
--- a/helm/solr/README.md
+++ b/helm/solr/README.md
@@ -113,7 +113,7 @@
 | serviceAccount.name | string |  | The optional default service account used for Solr and ZK unless overridden below. If `serviceAccount.create` is set to `false`, this serviceAccount must exist in the target namespace. |
 | backupRepositories | []object | | A list of BackupRepositories to connect your SolrCloud to. Visit the [SolrBackup docs](https://apache.github.io/solr-operator/docs/solr-backup) or run `kubectl explain solrcloud.spec.backupRepositories` to see the available options. |
 | autoscaling.vacatePodsOnScaleDown | boolean | `true` | While scaling down the SolrCloud, move replicas off of Solr Pods before they are deleted. This only affects pods that will not exist after the scaleDown operation.  |
-
+| autoscaling.populatePodsOnScaleUp | boolean | `true` | While scaling up the SolrCloud, migrate replicas onto the new Solr Pods after they are created. This uses the Balance Replicas API in Solr that is only available in Solr 9.3+. This option will be ignored if using an unsupported version of Solr.  |
 
 ### Data Storage Options
 
diff --git a/helm/solr/templates/solrcloud.yaml b/helm/solr/templates/solrcloud.yaml
index c20d34c..b351ece 100644
--- a/helm/solr/templates/solrcloud.yaml
+++ b/helm/solr/templates/solrcloud.yaml
@@ -117,6 +117,7 @@
   {{- if .Values.autoscaling }}
   autoscaling:
     vacatePodsOnScaleDown: {{ .Values.autoscaling.vacatePodsOnScaleDown }}
+    populatePodsOnScaleUp: {{ .Values.autoscaling.populatePodsOnScaleUp }}
   {{- end }}
 
   {{- if .Values.dataStorage }}
diff --git a/helm/solr/values.yaml b/helm/solr/values.yaml
index 8c7501e..d1d6d36 100644
--- a/helm/solr/values.yaml
+++ b/helm/solr/values.yaml
@@ -146,6 +146,7 @@
 # Various settings to control autoscaling of Solr pods and replicas
 autoscaling:
   vacatePodsOnScaleDown: true
+  populatePodsOnScaleUp: true
 
 # A list of BackupRepositories to connect your SolrCloud to
 # See either for more information:
diff --git a/tests/e2e/solrcloud_scaling_test.go b/tests/e2e/solrcloud_scaling_test.go
index bb07fa6..e0f2955 100644
--- a/tests/e2e/solrcloud_scaling_test.go
+++ b/tests/e2e/solrcloud_scaling_test.go
@@ -28,7 +28,7 @@
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-var _ = FDescribe("E2E - SolrCloud - Scaling", func() {
+var _ = FDescribe("E2E - SolrCloud - Scale Down", func() {
 	var (
 		solrCloud *solrv1beta1.SolrCloud
 
@@ -53,21 +53,21 @@
 		By("creating a first Solr Collection")
 		createAndQueryCollection(ctx, solrCloud, solrCollection1, 1, 1, 1)
 
-		By("creating a first Solr Collection")
+		By("creating a second Solr Collection")
 		createAndQueryCollection(ctx, solrCloud, solrCollection2, 1, 1, 2)
 	})
 
-	FContext("Scale Down with replica migration", func() {
+	FContext("with replica migration", func() {
 		FIt("Scales Down", func(ctx context.Context) {
 			originalSolrCloud := solrCloud.DeepCopy()
-			solrCloud.Spec.Replicas = &one
+			solrCloud.Spec.Replicas = pointer.Int32(1)
 			By("triggering a scale down via solrCloud replicas")
 			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale down")
 
 			By("waiting for the scaleDown of first pod to begin")
 			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
 				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should still have 3 pods, because the scale down should first move Solr replicas")
-				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleLock), "StatefulSet does not have a scaling lock.")
+				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
 				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsMetadataAnnotation, "2"), "StatefulSet scaling lock operation has the wrong metadata.")
 			})
 			queryCollection(ctx, solrCloud, solrCollection2, 0)
@@ -81,7 +81,7 @@
 			By("waiting for the scaleDown of second pod to begin")
 			expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
 				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(2)), "StatefulSet should still have 2 pods, because the scale down should first move Solr replicas")
-				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleLock), "StatefulSet does not have a scaling lock.")
+				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleDownLock), "StatefulSet does not have a scaleDown lock.")
 				g.Expect(found.Annotations).To(HaveKeyWithValue(util.ClusterOpsMetadataAnnotation, "1"), "StatefulSet scaling lock operation has the wrong metadata.")
 			})
 			queryCollection(ctx, solrCloud, solrCollection1, 0)
@@ -104,7 +104,7 @@
 		})
 	})
 
-	FContext("Scale Down without replica migration", func() {
+	FContext("without replica migration", func() {
 
 		BeforeEach(func() {
 			solrCloud.Spec.Autoscaling.VacatePodsOnScaleDown = pointer.Bool(false)
@@ -112,19 +112,103 @@
 
 		FIt("Scales Down", func(ctx context.Context) {
 			originalSolrCloud := solrCloud.DeepCopy()
-			solrCloud.Spec.Replicas = &one
+			solrCloud.Spec.Replicas = pointer.Int32(int32(1))
 			By("triggering a scale down via solrCloud replicas")
 			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale down")
 
 			By("make sure scaleDown happens without a clusterLock and eventually the replicas are removed")
 			statefulSet := expectStatefulSetWithConsistentChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, statefulSet *appsv1.StatefulSet) {
-				g.Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock after scaling is complete.")
-				g.Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have scaling lock metadata after scaling is complete.")
+				g.Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock while scaling unmanaged.")
+				g.Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have scaling lock metadata while scaling unmanaged.")
 			})
-			Expect(statefulSet.Spec.Replicas).To(HaveValue(BeEquivalentTo(1)), "StatefulSet should now have 2 pods, after the replicas have been moved.")
+			Expect(statefulSet.Spec.Replicas).To(HaveValue(BeEquivalentTo(1)), "StatefulSet should immediately have 1 pod, since the scaleDown is unmanaged.")
 
 			expectNoPod(ctx, solrCloud, solrCloud.GetSolrPodName(1))
 			queryCollectionWithNoReplicaAvailable(ctx, solrCloud, solrCollection1)
 		})
 	})
 })
+
+var _ = FDescribe("E2E - SolrCloud - Scale Up", func() {
+	var (
+		solrCloud *solrv1beta1.SolrCloud
+
+		solrCollection1 = "e2e-1"
+		solrCollection2 = "e2e-2"
+	)
+
+	BeforeEach(func() {
+		solrCloud = generateBaseSolrCloud(1)
+	})
+
+	JustBeforeEach(func(ctx context.Context) {
+		By("creating the SolrCloud")
+		Expect(k8sClient.Create(ctx, solrCloud)).To(Succeed())
+		DeferCleanup(func(ctx context.Context) {
+			cleanupTest(ctx, solrCloud)
+		})
+
+		By("Waiting for the SolrCloud to come up healthy")
+		solrCloud = expectSolrCloudToBeReady(ctx, solrCloud)
+
+		By("creating a first Solr Collection")
+		createAndQueryCollection(ctx, solrCloud, solrCollection1, 1, 1)
+
+		By("creating a second Solr Collection")
+		createAndQueryCollection(ctx, solrCloud, solrCollection2, 2, 1)
+	})
+
+	FContext("with replica migration", func() {
+
+		FIt("Scales Up", func(ctx context.Context) {
+			originalSolrCloud := solrCloud.DeepCopy()
+			solrCloud.Spec.Replicas = pointer.Int32(int32(3))
+			By("triggering a scale down via solrCloud replicas")
+			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale up")
+
+			By("waiting for the scaleDown of first pod to begin")
+			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should still have 3 pods, because the scale down should first move Solr replicas")
+			})
+			Expect(statefulSet.Annotations).To(HaveKeyWithValue(util.ClusterOpsLockAnnotation, util.ScaleUpLock), "StatefulSet does not have a scaleUp lock after starting managed scaleUp.")
+			Expect(statefulSet.Annotations).To(HaveKeyWithValue(util.ClusterOpsMetadataAnnotation, "1"), "StatefulSet scaleUp lock operation has the wrong metadata.")
+
+			By("waiting for the scaleUp to finish")
+			statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock after scaling is complete.")
+			})
+			// Once the scale down actually occurs, the statefulSet annotations should already be removed
+			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have scaling lock metadata after scaling is complete.")
+
+			queryCollection(ctx, solrCloud, solrCollection1, 0)
+			queryCollection(ctx, solrCloud, solrCollection2, 0)
+		})
+	})
+
+	FContext("without replica migration", func() {
+
+		BeforeEach(func() {
+			solrCloud.Spec.Autoscaling.PopulatePodsOnScaleUp = pointer.Bool(false)
+		})
+
+		FIt("Scales Up", func(ctx context.Context) {
+			originalSolrCloud := solrCloud.DeepCopy()
+			solrCloud.Spec.Replicas = pointer.Int32(int32(3))
+			By("triggering a scale down via solrCloud replicas")
+			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale down")
+
+			By("make sure scaleDown happens without a clusterLock and eventually the replicas are removed")
+			statefulSet := expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
+				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should immediately have 3 pods.")
+			})
+			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsLockAnnotation)), "StatefulSet should not have a scaling lock, since scaleUp is unmanaged.")
+			Expect(statefulSet.Annotations).To(Not(HaveKey(util.ClusterOpsMetadataAnnotation)), "StatefulSet should not have a scaling lock metadata, since scaleUp is unmanaged.")
+
+			By("Waiting for the new solrCloud pods to become ready")
+			solrCloud = expectSolrCloudToBeReady(ctx, solrCloud)
+
+			queryCollection(ctx, solrCloud, solrCollection1, 0)
+			queryCollection(ctx, solrCloud, solrCollection2, 0)
+		})
+	})
+})
diff --git a/tests/e2e/test_utils_test.go b/tests/e2e/test_utils_test.go
index a1adff8..055254b 100644
--- a/tests/e2e/test_utils_test.go
+++ b/tests/e2e/test_utils_test.go
@@ -229,7 +229,7 @@
 			[]string{
 				"curl",
 				fmt.Sprintf(
-					"http://localhost:%d/solr/admin/collections?action=CREATE&name=%s&replicationFactor=%d&numShards=%d%s&async=%s",
+					"http://localhost:%d/solr/admin/collections?action=CREATE&name=%s&replicationFactor=%d&numShards=%d%s&async=%s&maxShardsPerNode=10",
 					solrCloud.Spec.SolrAddressability.PodPort,
 					collection,
 					replicasPerShard,
@@ -328,7 +328,7 @@
 		)
 		g.Expect(err).ToNot(HaveOccurred(), "Error occurred while querying empty Solr Collection")
 		g.Expect(response).To(ContainSubstring("Error trying to proxy request for url"), "Wrong occurred while querying Solr Collection '%s', expected a proxy forwarding error", collection)
-	}, time.Second*5).WithContext(ctx).Should(Succeed(), "Could not successfully query collection")
+	}, time.Second*5).WithContext(ctx).Should(Succeed(), "Collection query did not fail in the correct way")
 }
 
 func getPrometheusExporterPod(ctx context.Context, solrPrometheusExporter *solrv1beta1.SolrPrometheusExporter) (podName string) {