Fix early deletion of PVCs during scale down (#689)

diff --git a/api/v1beta1/solrcloud_types.go b/api/v1beta1/solrcloud_types.go
index 99dc3f4..c028337 100644
--- a/api/v1beta1/solrcloud_types.go
+++ b/api/v1beta1/solrcloud_types.go
@@ -1115,7 +1115,7 @@
 	//+listMapKey:=name
 	SolrNodes []SolrNodeStatus `json:"solrNodes"`
 
-	// Replicas is the number of desired replicas in the cluster
+	// Replicas is the number of pods created by the StatefulSet
 	// +kubebuilder:validation:Minimum=0
 	// +kubebuilder:default=0
 	Replicas int32 `json:"replicas"`
@@ -1123,7 +1123,7 @@
 	// PodSelector for SolrCloud pods, required by the HPA
 	PodSelector string `json:"podSelector"`
 
-	// ReadyReplicas is the number of ready replicas in the cluster
+	// ReadyReplicas is the number of ready pods in the cluster
 	// +kubebuilder:validation:Minimum=0
 	// +kubebuilder:default=0
 	ReadyReplicas int32 `json:"readyReplicas"`
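
These doc tweaks pin down what the status counters actually report: both mirror the StatefulSet rather than the desired count in `Spec.Replicas`, which is what lets the PVC cleanup later in this change distinguish "still rolling down" from "done". A minimal sketch of that relationship, assuming the status is copied from the StatefulSet during reconcile (the actual status-update code is not part of this diff):

```go
// Illustrative only: how the two status fields above relate to the
// StatefulSet. During a managed scale down, Status.Replicas tracks what the
// StatefulSet has actually created, which can lag behind cloud.Spec.Replicas.
func updateReplicaStatus(cloud *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet) {
	cloud.Status.Replicas = statefulSet.Status.Replicas
	cloud.Status.ReadyReplicas = statefulSet.Status.ReadyReplicas
}
```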
diff --git a/config/crd/bases/solr.apache.org_solrclouds.yaml b/config/crd/bases/solr.apache.org_solrclouds.yaml
index ded6503..90c7313 100644
--- a/config/crd/bases/solr.apache.org_solrclouds.yaml
+++ b/config/crd/bases/solr.apache.org_solrclouds.yaml
@@ -16674,14 +16674,13 @@
                 type: string
               readyReplicas:
                 default: 0
-                description: ReadyReplicas is the number of ready replicas in the
-                  cluster
+                description: ReadyReplicas is the number of ready pods in the cluster
                 format: int32
                 minimum: 0
                 type: integer
               replicas:
                 default: 0
-                description: Replicas is the number of desired replicas in the cluster
+                description: Replicas is the number of pods created by the StatefulSet
                 format: int32
                 minimum: 0
                 type: integer
diff --git a/controllers/solr_cluster_ops_util.go b/controllers/solr_cluster_ops_util.go
index f97011a..c9f352a 100644
--- a/controllers/solr_cluster_ops_util.go
+++ b/controllers/solr_cluster_ops_util.go
@@ -410,8 +410,8 @@
 	return
 }
 
-// cleanupManagedCloudScaleDown does the logic of cleaning-up an incomplete scale down operation.
-// This will remove any bad readinessConditions that the scaleDown might have set when trying to scaleDown pods.
+// cleanupManagedCloudRollingUpdate does the logic of cleaning-up an incomplete rolling update operation.
+// This will remove any bad readinessConditions that the rollingUpdate might have set when trying to restart pods.
 func cleanupManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, podList []corev1.Pod, logger logr.Logger) (err error) {
 	// First though, the scaleDown op might have set some pods to be "unready" before deletion. Undo that.
 	// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
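
For context on what this cleanup undoes: a managed operation (or an aborted one) can leave pods carrying a readiness condition that was set to drain traffic before deletion. A minimal sketch of flipping such a condition back, assuming a controller-runtime client; the condition type shown is a placeholder, and the operator's real helpers and names may differ:

```go
// Sketch: restore an operator-managed readiness-gate condition so the pod
// becomes routable again once the aborted operation is cleaned up. The type
// "solr.apache.org/stopped" is hypothetical, not the operator's real name.
func markPodReady(ctx context.Context, c client.Client, pod *corev1.Pod) error {
	patchBase := client.MergeFrom(pod.DeepCopy())
	for i, cond := range pod.Status.Conditions {
		if cond.Type == "solr.apache.org/stopped" && cond.Status == corev1.ConditionFalse {
			pod.Status.Conditions[i].Status = corev1.ConditionTrue
		}
	}
	return c.Status().Patch(ctx, pod, patchBase)
}
```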
diff --git a/controllers/solrcloud_controller.go b/controllers/solrcloud_controller.go
index 706c066..5ad2213 100644
--- a/controllers/solrcloud_controller.go
+++ b/controllers/solrcloud_controller.go
@@ -428,7 +428,7 @@
 	// Do not reconcile the storage finalizer unless we have PVC Labels that we know the Solr data PVCs are using.
 	// Otherwise it will delete all PVCs possibly
 	if len(statefulSet.Spec.Selector.MatchLabels) > 0 {
-		if err = r.reconcileStorageFinalizer(ctx, instance, statefulSet.Spec.Selector.MatchLabels, logger); err != nil {
+		if err = r.reconcileStorageFinalizer(ctx, instance, statefulSet, logger); err != nil {
 			logger.Error(err, "Cannot delete PVCs while garbage collecting after deletion.")
 			updateRequeueAfter(&requeueOrNot, time.Second*15)
 		}
@@ -481,6 +481,7 @@
 			err = clearClusterOpLockWithPatch(ctx, r, statefulSet, "clusterOp not supported", logger)
 		}
 		if operationFound {
+			err = nil
 			if operationComplete {
 				if nextClusterOperation == nil {
 					// Once the operation is complete, finish the cluster operation by deleting the statefulSet annotations
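
The added `err = nil` matters because `err` is shared across the whole clusterOp block: by this point it can still hold a value from the per-operation handling earlier in the function, and without the reset a stale error would leak into the completion bookkeeping below, which assigns its own results. Condensed from this hunk, with branch bodies elided:

```go
// Condensed control flow from the hunk above; "..." elides the diff's bodies.
if operationFound {
	err = nil // start the completion bookkeeping with a clean slate
	if operationComplete {
		// ... clear the clusterOp lock, or chain into nextClusterOperation
	}
}
```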
@@ -926,9 +927,10 @@
 // Logic derived from:
 // - https://book.kubebuilder.io/reference/using-finalizers.html
 // - https://github.com/pravega/zookeeper-operator/blob/v0.2.9/pkg/controller/zookeepercluster/zookeepercluster_controller.go#L629
-func (r *SolrCloudReconciler) reconcileStorageFinalizer(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string, logger logr.Logger) error {
+func (r *SolrCloudReconciler) reconcileStorageFinalizer(ctx context.Context, cloud *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, logger logr.Logger) error {
 	// If persistentStorage is being used by the cloud, and the reclaim policy is set to "Delete",
 	// then set a finalizer for the storage on the cloud, and delete the PVCs if the solrcloud has been deleted.
+	pvcLabelSelector := statefulSet.Spec.Selector.MatchLabels
 
 	if cloud.Spec.StorageOptions.PersistentStorage != nil && cloud.Spec.StorageOptions.PersistentStorage.VolumeReclaimPolicy == solrv1beta1.VolumeReclaimPolicyDelete {
 		if cloud.ObjectMeta.DeletionTimestamp.IsZero() {
@@ -940,7 +942,7 @@
 					return err
 				}
 			}
-			return r.cleanupOrphanPVCs(ctx, cloud, pvcLabelSelector, logger)
+			return r.cleanupOrphanPVCs(ctx, cloud, statefulSet, pvcLabelSelector, logger)
 		} else if util.ContainsString(cloud.ObjectMeta.Finalizers, util.SolrStorageFinalizer) {
 			// The object is being deleted
 			logger.Info("Deleting PVCs for SolrCloud")
@@ -977,17 +979,22 @@
 	return pvcCount, nil
 }
 
-func (r *SolrCloudReconciler) cleanupOrphanPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string, logger logr.Logger) (err error) {
+func (r *SolrCloudReconciler) cleanupOrphanPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, pvcLabelSelector map[string]string, logger logr.Logger) (err error) {
 	// this check should make sure we do not delete the PVCs before the STS has scaled down
 	if cloud.Status.ReadyReplicas == cloud.Status.Replicas {
 		pvcList, err := r.getPVCList(ctx, cloud, pvcLabelSelector)
 		if err != nil {
 			return err
 		}
+		// We only want to delete PVCs that will not be used in the future, i.e. the user has asked for fewer replicas.
+		// Even if the statefulSet currently has fewer replicas, we don't want to delete the PVCs if we will eventually scale back up.
 		if len(pvcList.Items) > int(*cloud.Spec.Replicas) {
 			for _, pvcItem := range pvcList.Items {
 				// delete only Orphan PVCs
-				if util.IsPVCOrphan(pvcItem.Name, *cloud.Spec.Replicas) {
+				// For orphans, compare against the StatefulSet's replica count (which the SolrCloud status mirrors).
+				// Don't use the SolrCloud's Spec replicas here, because we might be rolling down 1-by-1, and the PVCs
+				// for soon-to-be-deleted pods should not be deleted until the pods themselves are gone.
+				if util.IsPVCOrphan(pvcItem.Name, *statefulSet.Spec.Replicas) {
 					r.deletePVC(ctx, pvcItem, logger)
 				}
 			}
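
This is the heart of the fix. `cloud.Spec.Replicas` jumps to the user's target immediately, while a managed scale down walks the StatefulSet down one pod at a time; comparing PVC ordinals against `statefulSet.Spec.Replicas` therefore deletes only the PVCs of pods the StatefulSet has actually released. `util.IsPVCOrphan` itself is not shown in this diff; a plausible sketch of what it checks, using only the standard library (`strings`, `strconv`):

```go
// Sketch of util.IsPVCOrphan (the real implementation may differ). StatefulSet
// PVC names end in the pod's ordinal, e.g. "data-foo-solrcloud-2", so a PVC is
// an orphan when that ordinal is at or beyond the given replica count.
func IsPVCOrphan(pvcName string, replicas int32) bool {
	idx := strings.LastIndex(pvcName, "-")
	if idx == -1 {
		return false
	}
	ordinal, err := strconv.Atoi(pvcName[idx+1:])
	if err != nil {
		return false
	}
	return int32(ordinal) >= replicas
}
```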
diff --git a/controllers/util/solr_scale_util.go b/controllers/util/solr_scale_util.go
index 550bd15..3f556af 100644
--- a/controllers/util/solr_scale_util.go
+++ b/controllers/util/solr_scale_util.go
@@ -70,7 +70,7 @@
 				if !balanceComplete && err == nil {
 					logger.Info("Started balancing replicas across cluster.", "requestId", requestId)
 					requestInProgress = true
-				} else if err == nil {
+				} else if err != nil {
 					logger.Error(err, "Could not balance replicas across the cluster. Will try again.")
 				}
 			}
@@ -88,7 +88,7 @@
 
 			// Delete the async request Id if the async request is successful or failed.
 			// If the request failed, this will cause a retry since the next reconcile won't find the async requestId in Solr.
-			if asyncState == "completed" || asyncState == "failed" {
+			if !requestInProgress {
 				if _, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
 					logger.Error(err, "Could not delete Async request status.", "requestId", requestId)
 					balanceComplete = false
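
Two related fixes in this file: the first hunk repairs an inverted error check (the failure branch previously ran only when `err == nil`, so balance failures were never logged), and the second keys the async-request cleanup on `!requestInProgress` instead of matching two state strings, so every terminal state triggers the deletion. A sketch of the assumed state mapping; the helper is illustrative, only the state names are Solr's:

```go
// Solr's async request states: "submitted" and "running" are in flight;
// "completed", "failed", and "notfound" are all terminal. Keying the cleanup
// on !requestInProgress covers every terminal state, which the old
// completed/failed match did not.
func asyncRequestInProgress(asyncState string) bool {
	return asyncState == "submitted" || asyncState == "running"
}
```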
diff --git a/helm/solr-operator/Chart.yaml b/helm/solr-operator/Chart.yaml
index 765525f..61a8a14 100644
--- a/helm/solr-operator/Chart.yaml
+++ b/helm/solr-operator/Chart.yaml
@@ -61,6 +61,13 @@
           url: https://github.com/apache/solr-operator/issues/624
         - name: Github PR
           url: https://github.com/apache/solr-operator/pull/648
+    - kind: fixed
+      description: SolrCloud scaling is now safe when using persistent storage with a 'Delete' reclaim policy
+      links:
+        - name: Github Issue
+          url: https://github.com/apache/solr-operator/issues/688
+        - name: Github PR
+          url: https://github.com/apache/solr-operator/pull/689
   artifacthub.io/images: |
     - name: solr-operator
       image: apache/solr-operator:v0.9.0-prerelease
diff --git a/helm/solr-operator/crds/crds.yaml b/helm/solr-operator/crds/crds.yaml
index bd311a5..e94093e 100644
--- a/helm/solr-operator/crds/crds.yaml
+++ b/helm/solr-operator/crds/crds.yaml
@@ -16923,14 +16923,13 @@
                 type: string
               readyReplicas:
                 default: 0
-                description: ReadyReplicas is the number of ready replicas in the
-                  cluster
+                description: ReadyReplicas is the number of ready pods in the cluster
                 format: int32
                 minimum: 0
                 type: integer
               replicas:
                 default: 0
-                description: Replicas is the number of desired replicas in the cluster
+                description: Replicas is the number of pods created by the StatefulSet
                 format: int32
                 minimum: 0
                 type: integer
diff --git a/tests/e2e/resource_utils_test.go b/tests/e2e/resource_utils_test.go
index 7dba163..a58904c 100644
--- a/tests/e2e/resource_utils_test.go
+++ b/tests/e2e/resource_utils_test.go
@@ -320,6 +320,19 @@
 	}).Should(MatchError("statefulsets.apps \""+statefulSetName+"\" not found"), "StatefulSet exists when it should not")
 }
 
+func expectNoPvc(ctx context.Context, parentResource client.Object, pvcName string, additionalOffset ...int) {
+	EventuallyWithOffset(resolveOffset(additionalOffset), func() error {
+		return k8sClient.Get(ctx, resourceKey(parentResource, pvcName), &corev1.PersistentVolumeClaim{})
+	}).Should(MatchError("persistentvolumeclaims \""+pvcName+"\" not found"), "PVC exists when it should not")
+}
+
+func expectNoPvcNow(ctx context.Context, parentResource client.Object, pvcName string, additionalOffset ...int) {
+	ExpectWithOffset(
+		resolveOffset(additionalOffset),
+		k8sClient.Get(ctx, resourceKey(parentResource, pvcName), &corev1.PersistentVolumeClaim{}),
+	).To(MatchError("persistentvolumeclaims \""+pvcName+"\" not found"), "PVC exists when it should not")
+}
+
 func expectNoPod(ctx context.Context, parentResource client.Object, podName string, additionalOffset ...int) {
 	EventuallyWithOffset(resolveOffset(additionalOffset), func() error {
 		return k8sClient.Get(ctx, resourceKey(parentResource, podName), &corev1.Pod{})
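
The two new helpers are the polling and point-in-time flavors of the same assertion: `expectNoPvc` wraps Gomega's `Eventually` and retries the Get until it fails with not-found, while `expectNoPvcNow` asserts via a bare `Expect` that the PVC is already gone at call time. Illustrative usage, with the `data-` prefix matching the PVC names used in the scaling test below:

```go
// Poll until the scaled-down pod's PVC has been garbage collected...
expectNoPvc(ctx, solrCloud, "data-"+solrCloud.GetSolrPodName(2))
// ...then assert, without waiting, that it has not been recreated.
expectNoPvcNow(ctx, solrCloud, "data-"+solrCloud.GetSolrPodName(2))
```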
diff --git a/tests/e2e/solrcloud_scaling_test.go b/tests/e2e/solrcloud_scaling_test.go
index 81ab608..53d3630 100644
--- a/tests/e2e/solrcloud_scaling_test.go
+++ b/tests/e2e/solrcloud_scaling_test.go
@@ -24,6 +24,8 @@
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/utils/pointer"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"strings"
@@ -138,6 +140,46 @@
 		})
 	})
 
+	FContext("with replica migration and deleted persistent data", func() {
+
+		BeforeEach(func() {
+			solrCloud.Spec.StorageOptions.PersistentStorage = &solrv1beta1.SolrPersistentDataStorageOptions{
+				VolumeReclaimPolicy: solrv1beta1.VolumeReclaimPolicyDelete,
+				PersistentVolumeClaimTemplate: solrv1beta1.PersistentVolumeClaimTemplate{
+					ObjectMeta: solrv1beta1.TemplateMeta{},
+					Spec: corev1.PersistentVolumeClaimSpec{
+						Resources: corev1.ResourceRequirements{
+							Requests: map[corev1.ResourceName]resource.Quantity{"storage": resource.MustParse("5Gi")},
+						},
+					},
+				},
+			}
+		})
+
+		FIt("Scales Down", func(ctx context.Context) {
+			originalSolrCloud := solrCloud.DeepCopy()
+			solrCloud.Spec.Replicas = pointer.Int32(1)
+			By("triggering a scale down via solrCloud replicas")
+			Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale down")
+
+			By("waiting for the scaleDown of cloud to finish")
+			expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Minute*2, time.Millisecond*100, func(g Gomega, found *appsv1.StatefulSet) {
+				clusterOp, err := controllers.GetCurrentClusterOp(found)
+				g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
+				g.Expect(clusterOp).To(BeNil(), "No more cluster operations should be running")
+				g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(1)), "StatefulSet should eventually have 1 pod, after the scale down is complete")
+			})
+
+			expectNoPod(ctx, solrCloud, solrCloud.GetSolrPodName(2))
+			expectNoPvc(ctx, solrCloud, "data-"+solrCloud.GetSolrPodName(2))
+			expectNoPod(ctx, solrCloud, solrCloud.GetSolrPodName(1))
+			expectNoPvc(ctx, solrCloud, "data-"+solrCloud.GetSolrPodName(1))
+
+			queryCollection(ctx, solrCloud, solrCollection1, 0)
+			queryCollection(ctx, solrCloud, solrCollection2, 0)
+		})
+	})
+
 	FContext("without replica migration", func() {
 
 		BeforeEach(func() {