[FLINK-32774] Improve checking for already upgraded deployments
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index d967175..9e732b9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -232,6 +232,11 @@
         var flinkDep = ctx.getResource();
         var status = flinkDep.getStatus();
 
+        if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
+            // We know that the current deployment is not missing, nothing to check
+            return false;
+        }
+
         // We are performing a full upgrade
         Optional<Deployment> depOpt = ctx.getJosdkContext().getSecondaryResource(Deployment.class);
 
@@ -241,6 +246,10 @@
         }
 
         var deployment = depOpt.get();
+        if (deployment.isMarkedForDeletion()) {
+            logger.debug("Deployment already marked for deletion, ignoring...");
+            return false;
+        }
 
         Map<String, String> annotations = deployment.getMetadata().getAnnotations();
         if (annotations == null) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 95b69f2..284a178 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -42,6 +42,7 @@
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.Preconditions;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -162,14 +163,12 @@
         setRandomJobResultStorePath(deployConfig);
 
         if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
-            if (!ReconciliationUtils.isJobInTerminalState(status)) {
-                LOG.error("Invalid status for deployment: {}", status);
-                throw new RuntimeException("This indicates a bug...");
-            }
+            Preconditions.checkArgument(ReconciliationUtils.isJobInTerminalState(status));
             LOG.info("Deleting deployment with terminated application before new deployment");
             flinkService.deleteClusterDeployment(
                     relatedResource.getMetadata(), status, deployConfig, true);
             flinkService.waitForClusterShutdown(deployConfig);
+            statusRecorder.patchAndCacheStatus(relatedResource);
         }
 
         setJobIdIfNecessary(spec, relatedResource, deployConfig);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
index 8cdd325..a822542 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
@@ -24,7 +24,6 @@
 import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -38,7 +37,7 @@
     protected TestingFlinkService flinkService;
     protected EventCollector eventCollector = new EventCollector();
     protected EventRecorder eventRecorder;
-    protected StatusRecorder statusRecorder = new TestingStatusRecorder();
+    protected TestingStatusRecorder statusRecorder = new TestingStatusRecorder();
     protected KubernetesOperatorMetricGroup operatorMetricGroup;
 
     protected Context<?> context;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 8db76f0..b60a752 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -670,12 +670,15 @@
         assertEquals(status.getReconciliationStatus().getState(), ReconciliationState.UPGRADING);
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
 
+        // Kubernetes Deployment is not there yet
         observer.observe(deployment, TestUtils.createEmptyContext());
         assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
 
+        // Kubernetes Deployment is there but without the correct label
         observer.observe(deployment, context);
         assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
 
+        // We set the correct generation label on the kubernetes deployment
         kubernetesDeployment
                 .getMetadata()
                 .getAnnotations()
@@ -684,8 +687,28 @@
         deployment.getMetadata().setGeneration(322L);
         deployment.getSpec().getJob().setParallelism(4);
 
+        // Simulate marked for deletion, make sure we don't recognize this as a valid deployment
+        kubernetesDeployment.getMetadata().setDeletionTimestamp(Instant.now().toString());
+        observer.observe(deployment, context);
+        assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
+        assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
+
+        // Reset deletion flag
+        kubernetesDeployment.getMetadata().setDeletionTimestamp(null);
+
+        // Simulate non-missing deployment, this happens in the middle of savepoint upgrades
+        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         observer.observe(deployment, context);
 
+        assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
+        assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());
+
+        // Reset to missing
+        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+
+        // Deployment is missing and kubernetes deployment matches the target generation
+        // should be recognized as deployed
+        observer.observe(deployment, context);
         assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index e9f50e6..9a88818 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -282,7 +282,6 @@
     @Test
     public void triggerSavepoint() throws Exception {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-
         reconciler.reconcile(deployment, context);
         var runningJobs = flinkService.listJobs();
         verifyAndSetRunningJobsToStatus(deployment, runningJobs);