[FLINK-35108] Do not trigger deployment recovery for finished/failed jobs
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index be99fb1..986e048 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -505,9 +506,16 @@
 
     private boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
         var deployedJob = ReconciliationUtils.getDeployedSpec(deployment).getJob();
-        return (deployedJob == null || deployedJob.getState() == JobState.RUNNING)
-                && (deployment.getStatus().getJobManagerDeploymentStatus()
-                        == JobManagerDeploymentStatus.MISSING);
+        var status = deployment.getStatus();
+        var jobStatus = status.getJobStatus();
+        boolean sessionCluster = deployedJob == null;
+        boolean nonTerminalApplication =
+                !sessionCluster
+                        && deployedJob.getState() == JobState.RUNNING
+                        && !JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState();
+        boolean jmShouldBeRunning = sessionCluster || nonTerminalApplication;
+        return jmShouldBeRunning
+                && (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING);
     }
 
     protected boolean flinkVersionChanged(SPEC oldSpec, SPEC newSpec) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 394be6e..e80942d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -82,6 +82,7 @@
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.ThrowingConsumer;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -941,18 +942,32 @@
     }
 
     @Test
-    public void testTerminalJmTtl() throws Exception {
-        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+    public void testTerminalJmTtlOnSuspend() throws Throwable {
+        testTerminalJmTtl(
+                dep -> {
+                    getJobSpec(dep).setState(JobState.SUSPENDED);
+                    reconciler.reconcile(dep, context);
+                });
+    }
+
+    @Test
+    public void testTerminalJmTtlOnFinished() throws Throwable {
+        testTerminalJmTtl(dep -> dep.getStatus().getJobStatus().setState("FINISHED"));
+    }
+
+    @Test
+    public void testTerminalJmTtlOnFailed() throws Throwable {
+        testTerminalJmTtl(dep -> dep.getStatus().getJobStatus().setState("FAILED"));
+    }
+
+    public void testTerminalJmTtl(ThrowingConsumer<FlinkDeployment> deploymentSetup)
+            throws Throwable {
+        var deployment = TestUtils.buildApplicationCluster();
         getJobSpec(deployment).setUpgradeMode(UpgradeMode.SAVEPOINT);
         reconciler.reconcile(deployment, context);
         verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
-
-        getJobSpec(deployment).setState(JobState.SUSPENDED);
-        reconciler.reconcile(deployment, context);
+        deploymentSetup.accept(deployment);
         var status = deployment.getStatus();
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.FINISHED.toString(),
-                status.getJobStatus().getState());
         assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());
 
         deployment
@@ -976,6 +991,9 @@
                 .setClock(Clock.fixed(now.plus(Duration.ofMinutes(6)), ZoneId.systemDefault()));
         reconciler.reconcile(deployment, context);
         assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
+        // Make sure we don't resubmit
+        reconciler.reconcile(deployment, context);
+        assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
     }
 
     @ParameterizedTest