[FLINK-30406] Detect when jobmanager never started

diff --git a/e2e-tests/test_application_kubernetes_ha.sh b/e2e-tests/test_application_kubernetes_ha.sh
index 3cafd0a..3c1a4d8 100755
--- a/e2e-tests/test_application_kubernetes_ha.sh
+++ b/e2e-tests/test_application_kubernetes_ha.sh
@@ -47,7 +47,7 @@
 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
 
-check_operator_log_for_errors '|grep -v "REST service in session cluster is bad now"' || exit 1
+check_operator_log_for_errors '|grep -v "REST service in session cluster timed out"' || exit 1
 
 echo "Successfully run the Flink Kubernetes application HA test"
 
diff --git a/e2e-tests/test_sessionjob_kubernetes_ha.sh b/e2e-tests/test_sessionjob_kubernetes_ha.sh
index b768d43..37c8c37 100755
--- a/e2e-tests/test_sessionjob_kubernetes_ha.sh
+++ b/e2e-tests/test_sessionjob_kubernetes_ha.sh
@@ -48,7 +48,7 @@
 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
 
-check_operator_log_for_errors '|grep -v "REST service in session cluster is bad now"' || exit 1
+check_operator_log_for_errors '|grep -v "REST service in session cluster timed out"' || exit 1
 
 echo "Successfully run the Flink Session Job HA test"
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index 41279e1..df4b61c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -51,7 +51,7 @@
                 rs.markReconciledSpecAsStable();
             }
         } catch (Exception e) {
-            logger.error("REST service in session cluster is bad now", e);
+            logger.error("REST service in session cluster timed out", e);
             if (e instanceof TimeoutException) {
                 // check for problems with the underlying deployment
                 observeJmDeployment(deployment, context, observerContext.getDeployedConfig());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 94d154a..411b58f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -27,9 +27,12 @@
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.autoscaler.JobAutoScaler;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -99,7 +102,7 @@
     }
 
     @Override
-    public final void reconcile(CR cr, Context<?> ctx) throws Exception {
+    public void reconcile(CR cr, Context<?> ctx) throws Exception {
         var spec = cr.getSpec();
         var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
         var status = cr.getStatus();
@@ -115,12 +118,7 @@
         // No further logic is required at this point.
         if (reconciliationStatus.isBeforeFirstDeployment()) {
             LOG.info("Deploying for the first time");
-
-            // Before we try to submit the job we record the current spec in the status so we can
-            // handle subsequent deployment and status update errors
-            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
-            statusRecorder.patchAndCacheStatus(cr);
-
+            updateStatusBeforeFirstDeployment(cr, spec, deployConfig, status);
             deploy(
                     cr,
                     spec,
@@ -189,6 +187,36 @@
     }
 
     /**
+     * Update the status before the first deployment. We have to record the upgrade mode based on
+     * the initial savepoint path provided, and record the to-be-deployed spec in the status.
+     *
+     * @param cr Related flink resource
+     * @param spec Spec to be deployed
+     * @param deployConfig Deploy configuration
+     * @param status Resource status
+     */
+    private void updateStatusBeforeFirstDeployment(
+            CR cr, SPEC spec, Configuration deployConfig, STATUS status) {
+        if (spec.getJob() != null) {
+            var initialUpgradeMode = UpgradeMode.STATELESS;
+            var initialSp = spec.getJob().getInitialSavepointPath();
+
+            if (initialSp != null) {
+                status.getJobStatus()
+                        .getSavepointInfo()
+                        .setLastSavepoint(Savepoint.of(initialSp, SavepointTriggerType.UNKNOWN));
+                initialUpgradeMode = UpgradeMode.SAVEPOINT;
+            }
+
+            spec.getJob().setUpgradeMode(initialUpgradeMode);
+        }
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
+        // Before we try to submit the job we record the current spec in the status so we can
+        // handle subsequent deployment and status update errors
+        statusRecorder.patchAndCacheStatus(cr);
+    }
+
+    /**
      * Get Flink configuration object for deploying the given spec using {@link #deploy}.
      *
      * @param meta ObjectMeta of the related resource.
@@ -260,7 +288,7 @@
             CR cr, Context<?> context, Configuration observeConfig) throws Exception;
 
     @Override
-    public final DeleteControl cleanup(CR resource, Context<?> context) {
+    public DeleteControl cleanup(CR resource, Context<?> context) {
         resourceScaler.cleanup(resource);
         return cleanupInternal(resource, context);
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 33b2841..5140f14 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -102,7 +102,7 @@
                 LOG.info("Upgrading/Restarting running job, suspending first...");
             }
             Optional<UpgradeMode> availableUpgradeMode =
-                    getAvailableUpgradeMode(resource, deployConfig, observeConfig);
+                    getAvailableUpgradeMode(resource, ctx, deployConfig, observeConfig);
             if (availableUpgradeMode.isEmpty()) {
                 return;
             }
@@ -122,7 +122,14 @@
                 ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
             }
         }
+
         if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
+            // We inherit the upgrade mode unless stateless upgrade requested
+            if (currentDeploySpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+                currentDeploySpec
+                        .getJob()
+                        .setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode());
+            }
             // We record the target spec into an upgrading state before deploying
             ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
             statusRecorder.patchAndCacheStatus(resource);
@@ -141,7 +148,7 @@
     }
 
     protected Optional<UpgradeMode> getAvailableUpgradeMode(
-            CR resource, Configuration deployConfig, Configuration observeConfig) {
+            CR resource, Context<?> ctx, Configuration deployConfig, Configuration observeConfig) {
         var status = resource.getStatus();
         var upgradeMode = resource.getSpec().getJob().getUpgradeMode();
 
@@ -150,7 +157,9 @@
             return Optional.of(UpgradeMode.STATELESS);
         }
 
-        if (ReconciliationUtils.isJobInTerminalState(status)) {
+        var flinkService = getFlinkService(resource, ctx);
+        if (ReconciliationUtils.isJobInTerminalState(status)
+                && !flinkService.isHaMetadataAvailable(observeConfig)) {
             LOG.info(
                     "Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
             return Optional.of(UpgradeMode.SAVEPOINT);
@@ -203,6 +212,7 @@
             throws Exception {
         var reconciliationStatus = resource.getStatus().getReconciliationStatus();
         var rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
+        rollbackSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
 
         UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();
 
@@ -249,6 +259,7 @@
             throws Exception {
         LOG.info("Resubmitting Flink job...");
         SPEC specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
+        specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
         restoreJob(
                 deployment,
                 specToRecover,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 54463dc..918ec0e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
@@ -94,11 +95,14 @@
 
     @Override
     protected Optional<UpgradeMode> getAvailableUpgradeMode(
-            FlinkDeployment deployment, Configuration deployConfig, Configuration observeConfig) {
+            FlinkDeployment deployment,
+            Context<?> ctx,
+            Configuration deployConfig,
+            Configuration observeConfig) {
 
         var status = deployment.getStatus();
         var availableUpgradeMode =
-                super.getAvailableUpgradeMode(deployment, deployConfig, observeConfig);
+                super.getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);
 
         if (availableUpgradeMode.isPresent()) {
             return availableUpgradeMode;
@@ -112,20 +116,27 @@
                 && !flinkVersionChanged(
                         ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
 
-            if (!flinkService.isHaMetadataAvailable(deployConfig)) {
-                if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
-                    // initial deployment failure, reset to allow for spec change to proceed
-                    return resetOnMissingStableSpec(deployment, deployConfig);
-                }
-            } else {
+            if (flinkService.isHaMetadataAvailable(deployConfig)) {
                 LOG.info(
                         "Job is not running but HA metadata is available for last state restore, ready for upgrade");
                 return Optional.of(UpgradeMode.LAST_STATE);
             }
         }
 
-        if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING
-                || status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR) {
+        var jmDeployStatus = status.getJobManagerDeploymentStatus();
+        if (jmDeployStatus != JobManagerDeploymentStatus.MISSING
+                && status.getReconciliationStatus()
+                                .deserializeLastReconciledSpec()
+                                .getJob()
+                                .getUpgradeMode()
+                        != UpgradeMode.LAST_STATE
+                && FlinkUtils.jmPodNeverStarted(ctx)) {
+            deleteJmThatNeverStarted(deployment, deployConfig);
+            return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);
+        }
+
+        if (jmDeployStatus == JobManagerDeploymentStatus.MISSING
+                || jmDeployStatus == JobManagerDeploymentStatus.ERROR) {
             throw new RecoveryFailureException(
                     "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
                             + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
@@ -138,21 +149,12 @@
         return Optional.empty();
     }
 
-    private Optional<UpgradeMode> resetOnMissingStableSpec(
-            FlinkDeployment deployment, Configuration deployConfig) {
-        // initial deployment failure, reset to allow for spec change to proceed
+    private void deleteJmThatNeverStarted(FlinkDeployment deployment, Configuration deployConfig) {
+        deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
         flinkService.deleteClusterDeployment(
                 deployment.getMetadata(), deployment.getStatus(), false);
         flinkService.waitForClusterShutdown(deployConfig);
-        if (!flinkService.isHaMetadataAvailable(deployConfig)) {
-            LOG.info("Job never entered stable state. Resetting status for initial deploy");
-            ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
-            return Optional.empty();
-        } else {
-            // proceed with upgrade if deployment succeeded between check and delete
-            LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade");
-            return Optional.of(UpgradeMode.LAST_STATE);
-        }
+        LOG.info("Deleted jobmanager deployment that never started.");
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 221d055..572845c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -38,7 +38,10 @@
 import io.fabric8.kubernetes.api.model.ConfigMapList;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +49,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
 
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
@@ -221,4 +225,35 @@
         return new JobID(
                 Preconditions.checkNotNull(uid).hashCode(), Preconditions.checkNotNull(generation));
     }
+
+    /**
+     * Check if the jobmanager pod has never successfully started. This is an important check to
+     * determine whether it is possible that the job has started and taken any checkpoints that we
+     * are unaware of.
+     *
+     * <p>The way we check this is by using the availability condition transition timestamp. If the
+     * deployment never transitioned out of the unavailable state, we can assume that the JM never
+     * started.
+     *
+     * @param context Resource context
+     * @return True only if we are sure that the jobmanager pod never started
+     */
+    public static boolean jmPodNeverStarted(Context<?> context) {
+        Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
+        if (depOpt.isPresent()) {
+            Deployment deployment = depOpt.get();
+            for (DeploymentCondition condition : deployment.getStatus().getConditions()) {
+                if (condition.getType().equals("Available")) {
+                    var createTs = deployment.getMetadata().getCreationTimestamp();
+                    if ("False".equals(condition.getStatus())
+                            && createTs.equals(condition.getLastTransitionTime())) {
+                        return true;
+                    }
+                }
+            }
+        }
+
+        // If unsure, return false to be on the safe side
+        return false;
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index aac61d8..1d99f25 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -57,6 +57,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -94,15 +95,27 @@
     }
 
     public static Deployment createDeployment(boolean ready) {
-        DeploymentStatus status = new DeploymentStatus();
+        String nowTs = Instant.now().toString();
+        var status = new DeploymentStatus();
         status.setAvailableReplicas(ready ? 1 : 0);
         status.setReplicas(1);
+        var availableCondition = new DeploymentCondition();
+        availableCondition.setType("Available");
+        availableCondition.setStatus(ready ? "True" : "False");
+        availableCondition.setLastTransitionTime(nowTs);
+        status.setConditions(List.of(availableCondition));
+
         DeploymentSpec spec = new DeploymentSpec();
         spec.setReplicas(1);
+
+        var meta = new ObjectMeta();
+        meta.setCreationTimestamp(nowTs);
+
         Deployment deployment = new Deployment();
-        deployment.setMetadata(new ObjectMeta());
+        deployment.setMetadata(meta);
         deployment.setSpec(spec);
         deployment.setStatus(status);
+
         return deployment;
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java
new file mode 100644
index 0000000..b624bf3
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+
+/** Testing wrapper for {@link ApplicationReconciler}. */
+public class TestingApplicationReconciler extends ApplicationReconciler {
+    public TestingApplicationReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder,
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        super(
+                kubernetesClient,
+                flinkService,
+                configManager,
+                eventRecorder,
+                statusRecorder,
+                operatorMetricGroup);
+    }
+
+    @Override
+    public void reconcile(FlinkDeployment flinkDeployment, Context<?> context) throws Exception {
+        var cr = ReconciliationUtils.clone(flinkDeployment);
+        cr.setStatus(flinkDeployment.getStatus());
+        super.reconcile(cr, context);
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkDeployment flinkDeployment, Context<?> context) {
+        var cr = ReconciliationUtils.clone(flinkDeployment);
+        cr.setStatus(flinkDeployment.getStatus());
+        return super.cleanup(cr, context);
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 22efa1f..72aa6c8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -96,6 +96,7 @@
     private final Set<String> sessions = new HashSet<>();
     private boolean isPortReady = true;
     private boolean haDataAvailable = true;
+    private boolean jobManagerReady = true;
     private boolean deployFailure = false;
     private Runnable sessionJobSubmittedCallback;
     private PodList podList = new PodList();
@@ -123,7 +124,7 @@
                 if (jobs.isEmpty() && sessions.isEmpty()) {
                     return Optional.empty();
                 }
-                return (Optional<T>) Optional.of(TestUtils.createDeployment(true));
+                return (Optional<T>) Optional.of(TestUtils.createDeployment(jobManagerReady));
             }
         };
     }
@@ -194,6 +195,10 @@
         this.haDataAvailable = haDataAvailable;
     }
 
+    public void setJobManagerReady(boolean jmReady) {
+        this.jobManagerReady = jmReady;
+    }
+
     public void setDeployFailure(boolean deployFailure) {
         this.deployFailure = deployFailure;
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
index 99a6ca3..f34d804 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
@@ -96,6 +96,9 @@
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+
+        // We started without savepoint
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         assertEquals(
                 appCluster.getSpec(),
                 appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 9772ab0..b0b09a3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -767,8 +767,12 @@
     private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception {
         flinkService.clear();
         testController.reconcile(appCluster, context);
+        var specClone = ReconciliationUtils.clone(appCluster.getSpec());
+        if (specClone.getJob() != null) {
+            specClone.getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        }
         assertEquals(
-                appCluster.getSpec(),
+                specClone,
                 appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
 
         flinkService.setPortReady(false);
@@ -786,8 +790,15 @@
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        var expectedSpec = ReconciliationUtils.clone(appCluster.getSpec());
+        if (expectedSpec.getJob() != null
+                && expectedSpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+            expectedSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        }
+
         assertEquals(
-                appCluster.getSpec(),
+                expectedSpec,
                 appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
 
         flinkService.setPortReady(true);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 07dae7d..a4f6754 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -100,9 +100,6 @@
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
-        assertEquals(
-                appCluster.getSpec(),
-                appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
     }
 
     private static Stream<Arguments> applicationTestParams() {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index a4f4acb..813c490 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingApplicationReconciler;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -94,6 +95,7 @@
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
     private TestingFlinkService flinkService;
     private ApplicationReconciler reconciler;
+
     private Context<FlinkDeployment> context;
     private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
 
@@ -107,7 +109,7 @@
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         reconciler =
-                new ApplicationReconciler(
+                new TestingApplicationReconciler(
                         kubernetesClient,
                         flinkService,
                         configManager,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 0e8549a..675055b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingApplicationReconciler;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -47,10 +48,14 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -75,7 +80,7 @@
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         reconciler =
-                new ApplicationReconciler(
+                new TestingApplicationReconciler(
                         kubernetesClient,
                         flinkService,
                         configManager,
@@ -260,36 +265,213 @@
     }
 
     @ParameterizedTest
-    @EnumSource(UpgradeMode.class)
-    public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+    @MethodSource("testUpgradeJmDeployCannotStartParams")
+    public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toMode)
+            throws Exception {
+
+        flinkService.setHaDataAvailable(true);
+        flinkService.setJobManagerReady(true);
+
+        // Prepare running deployment
+        var deployment = TestUtils.buildApplicationCluster();
+        var jobSpec = deployment.getSpec().getJob();
+        jobSpec.setUpgradeMode(fromMode);
+
+        reconciler.reconcile(deployment, context);
+        var runningJobs = flinkService.listJobs();
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
+        // Suspend running deployment and assert that correct upgradeMode is set
+        jobSpec.setState(JobState.SUSPENDED);
+        reconciler.reconcile(deployment, context);
+
+        var lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+        assertEquals(fromMode, lastReconciledSpec.getJob().getUpgradeMode());
+
+        // Restore deployment and assert that correct upgradeMode is set
+        jobSpec.setState(JobState.RUNNING);
+        jobSpec.setUpgradeMode(toMode);
+        reconciler.reconcile(deployment, context);
+
+        lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+        assertEquals(
+                toMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : fromMode,
+                lastReconciledSpec.getJob().getUpgradeMode());
+
+        // Simulate JM failure after deployment, we need this to test the actual upgrade behaviour
+        // with a jobmanager that never started
+        flinkService.setJobManagerReady(false);
         flinkService.setHaDataAvailable(false);
 
-        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+
+        // Send in a new upgrade while the jobmanager still not started
+        jobSpec.setState(JobState.RUNNING);
+        jobSpec.setEntryClass("newClass");
+        reconciler.reconcile(deployment, context);
+        lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+        // Make sure the upgrade was executed as long as we have the savepoint information
+        if (fromMode == UpgradeMode.LAST_STATE && toMode != UpgradeMode.STATELESS) {
+            // We cant make progress as no HA meta available after LAST_STATE, upgrade. It means the
+            // job started and terminated, but we didn't see...
+            assertEquals(
+                    JobManagerDeploymentStatus.DEPLOYING,
+                    deployment.getStatus().getJobManagerDeploymentStatus());
+            assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+        } else {
+            assertEquals(
+                    JobManagerDeploymentStatus.MISSING,
+                    deployment.getStatus().getJobManagerDeploymentStatus());
+            assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+            assertEquals(
+                    toMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : UpgradeMode.SAVEPOINT,
+                    lastReconciledSpec.getJob().getUpgradeMode());
+
+            // Complete upgrade and recover succesfully with the latest savepoint
+            reconciler.reconcile(deployment, context);
+            lastReconciledSpec =
+                    deployment
+                            .getStatus()
+                            .getReconciliationStatus()
+                            .deserializeLastReconciledSpec();
+
+            assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+            assertEquals(1, flinkService.listJobs().size());
+            if (fromMode == UpgradeMode.STATELESS || toMode == UpgradeMode.STATELESS) {
+                assertNull(flinkService.listJobs().get(0).f0);
+            } else {
+                assertEquals("savepoint_0", flinkService.listJobs().get(0).f0);
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("testInitialJmDeployCannotStartParams")
+    public void testInitialJmDeployCannotStart(UpgradeMode upgradeMode, boolean initSavepoint)
+            throws Exception {
+
+        // We simulate JM failure to test the initial submission/upgrade behavior when the JM can
+        // never start initially
+        flinkService.setHaDataAvailable(false);
+        flinkService.setJobManagerReady(false);
+
+        var deployment = TestUtils.buildApplicationCluster();
+        if (initSavepoint) {
+            deployment.getSpec().getJob().setInitialSavepointPath("init-sp");
+        }
 
         reconciler.reconcile(deployment, context);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
-        // Ready for spec changes, the reconciliation should be performed
+        var lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+        // Make sure savepoint path is recorded in status and upgradeMode set correctly for initial
+        // startup. Either stateless or savepoint depending only on the initialSavepointPath
+        // setting.
+        if (initSavepoint) {
+            assertEquals("init-sp", flinkService.listJobs().get(0).f0);
+            assertEquals(
+                    "init-sp",
+                    deployment
+                            .getStatus()
+                            .getJobStatus()
+                            .getSavepointInfo()
+                            .getLastSavepoint()
+                            .getLocation());
+            assertEquals(UpgradeMode.SAVEPOINT, lastReconciledSpec.getJob().getUpgradeMode());
+        } else {
+            assertNull(flinkService.listJobs().get(0).f0);
+            assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+            assertEquals(UpgradeMode.STATELESS, lastReconciledSpec.getJob().getUpgradeMode());
+        }
+
+        // JM is failed, but we submit an upgrade, this should always be possible on initial deploy
+        // failure
         final String newImage = "new-image-1";
         deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
         deployment.getSpec().setImage(newImage);
         reconciler.reconcile(deployment, context);
-        if (!UpgradeMode.STATELESS.equals(upgradeMode)) {
-            assertNull(deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
-            assertEquals(
-                    ReconciliationState.UPGRADING,
-                    deployment.getStatus().getReconciliationStatus().getState());
-            reconciler.reconcile(deployment, context);
-        }
         assertEquals(
-                newImage,
-                deployment
-                        .getStatus()
-                        .getReconciliationStatus()
-                        .deserializeLastReconciledSpec()
-                        .getImage());
+                ReconciliationState.UPGRADING,
+                deployment.getStatus().getReconciliationStatus().getState());
+        lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+        // We make sure that stateless upgrade request is respected (drop state)
+        assertEquals(
+                upgradeMode == UpgradeMode.STATELESS
+                        ? UpgradeMode.STATELESS
+                        : UpgradeMode.SAVEPOINT,
+                lastReconciledSpec.getJob().getUpgradeMode());
+
+        reconciler.reconcile(deployment, context);
+        lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        assertEquals(newImage, lastReconciledSpec.getImage());
+        assertEquals(
+                upgradeMode == UpgradeMode.STATELESS
+                        ? UpgradeMode.STATELESS
+                        : UpgradeMode.SAVEPOINT,
+                lastReconciledSpec.getJob().getUpgradeMode());
+        assertEquals(1, flinkService.listJobs().size());
+        assertEquals(
+                initSavepoint && upgradeMode != UpgradeMode.STATELESS ? "init-sp" : null,
+                flinkService.listJobs().get(0).f0);
+    }
+
+    private static Stream<Arguments> testInitialJmDeployCannotStartParams() {
+        return Stream.of(
+                Arguments.of(UpgradeMode.LAST_STATE, true),
+                Arguments.of(UpgradeMode.LAST_STATE, false),
+                Arguments.of(UpgradeMode.SAVEPOINT, true),
+                Arguments.of(UpgradeMode.SAVEPOINT, false),
+                Arguments.of(UpgradeMode.STATELESS, true),
+                Arguments.of(UpgradeMode.STATELESS, false));
+    }
+
+    private static Stream<Arguments> testUpgradeJmDeployCannotStartParams() {
+        var args = new ArrayList<Arguments>();
+        for (UpgradeMode from : UpgradeMode.values()) {
+            for (UpgradeMode to : UpgradeMode.values()) {
+                args.add(Arguments.of(from, to));
+            }
+        }
+        return args.stream();
+    }
+
+    @Test
+    public void testLastStateOnDeletedDeployment() throws Exception {
+        // Bootstrap running deployment
+        var deployment = TestUtils.buildApplicationCluster();
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+
+        reconciler.reconcile(deployment, context);
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        // Delete cluster and keep HA metadata
+        flinkService.deleteClusterDeployment(
+                deployment.getMetadata(), deployment.getStatus(), false);
+        flinkService.setHaDataAvailable(true);
+
+        // Submit upgrade
+        deployment.getSpec().setRestartNonce(123L);
+        reconciler.reconcile(deployment, context);
+
+        var lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+        // Make sure we correctly record upgrade mode to last state
+        assertEquals(UpgradeMode.LAST_STATE, lastReconciledSpec.getJob().getUpgradeMode());
+        assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index ca7d0c3..136e931 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -30,16 +30,22 @@
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
 import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.Test;
 
 import java.net.HttpURLConnection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -150,6 +156,43 @@
     }
 
     @Test
+    public void testJmNeverStartedDetection() {
+        var jmDeployment = new Deployment();
+        jmDeployment.setMetadata(new ObjectMeta());
+        jmDeployment.getMetadata().setCreationTimestamp("create-ts");
+        jmDeployment.setStatus(new DeploymentStatus());
+        var deployStatus = jmDeployment.getStatus();
+        var jmNeverStartedCondition =
+                new DeploymentCondition("create-ts", null, null, null, "False", "Available");
+        var jmStartedButStopped =
+                new DeploymentCondition("other-ts", null, null, null, "False", "Available");
+        var jmAvailable =
+                new DeploymentCondition("other-ts", null, null, null, "True", "Available");
+
+        var context =
+                new TestUtils.TestingContext<Deployment>() {
+                    @Override
+                    public <R> Optional<R> getSecondaryResource(Class<R> aClass, String name) {
+                        return (Optional<R>) Optional.of(jmDeployment);
+                    }
+                };
+
+        deployStatus.setConditions(Collections.emptyList());
+        assertFalse(FlinkUtils.jmPodNeverStarted(context));
+
+        deployStatus.setConditions(List.of(jmNeverStartedCondition));
+        assertTrue(FlinkUtils.jmPodNeverStarted(context));
+
+        deployStatus.setConditions(List.of(jmStartedButStopped));
+        assertFalse(FlinkUtils.jmPodNeverStarted(context));
+
+        deployStatus.setConditions(List.of(jmAvailable));
+        assertFalse(FlinkUtils.jmPodNeverStarted(context));
+
+        assertFalse(FlinkUtils.jmPodNeverStarted(TestUtils.createEmptyContext()));
+    }
+
+    @Test
     public void testDeleteJobGraphInKubernetesHAShouldNotUpdateWithEmptyConfigMap() {
         final String name = "empty-ha-configmap";
         final String clusterId = "cluster-id-2";