[FLINK-36037][snapshot] Add backwards compatibility for job upgrades
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 565811f..8379417 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -293,6 +293,16 @@
                                             FlinkStateSnapshotUtils
                                                     .getValidatedFlinkStateSnapshotPath(
                                                             ctx.getKubernetesClient(), ref));
+            if (savepointOpt.isEmpty()) {
+                savepointOpt =
+                        Optional.ofNullable(
+                                        ctx.getResource()
+                                                .getStatus()
+                                                .getJobStatus()
+                                                .getSavepointInfo()
+                                                .getLastSavepoint())
+                                .flatMap(s -> Optional.ofNullable(s.getLocation()));
+            }
         }
 
         deploy(ctx, spec, deployConfig, savepointOpt, requireHaMetadata);
@@ -445,13 +455,17 @@
             throws Exception {
         LOG.info("Resubmitting Flink job...");
         SPEC specToRecover = ReconciliationUtils.getDeployedSpec(ctx.getResource());
-        var lastSavepoint =
-                Optional.ofNullable(
-                        ctx.getResource().getStatus().getJobStatus().getUpgradeSnapshotReference());
+
+        var upgradeSnapshotRef =
+                ctx.getResource().getStatus().getJobStatus().getUpgradeSnapshotReference();
+        var savepointLegacy =
+                ctx.getResource().getStatus().getJobStatus().getSavepointInfo().getLastSavepoint();
+        var lastSavepointKnown = upgradeSnapshotRef != null || savepointLegacy != null;
+
         if (requireHaMetadata) {
             specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
         } else if (ctx.getResource().getSpec().getJob().getUpgradeMode() != UpgradeMode.STATELESS
-                && lastSavepoint.isPresent()) {
+                && lastSavepointKnown) {
             specToRecover.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         }
         restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
index 6e4581e..7e730f6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
@@ -370,15 +370,23 @@
     public static boolean lastSavepointKnown(CommonStatus<?> status) {
         var lastSavepoint = status.getJobStatus().getUpgradeSnapshotReference();
 
-        if (lastSavepoint == null) {
-            return true;
+        if (lastSavepoint != null) {
+            if (StringUtils.isNotBlank(lastSavepoint.getName())) {
+                return true;
+            }
+
+            var location = lastSavepoint.getPath();
+            return location != null
+                    && !location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
         }
 
-        if (StringUtils.isNotBlank(lastSavepoint.getName())) {
+        // Check legacy savepoint field too
+        var lastSavepointLegacy = status.getJobStatus().getSavepointInfo().getLastSavepoint();
+        if (lastSavepointLegacy == null) {
             return true;
         }
-
-        var location = lastSavepoint.getPath();
-        return location != null && !location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
+        return !lastSavepointLegacy
+                .getLocation()
+                .equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 5e922ff..e411765 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -37,6 +37,7 @@
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
 import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
 import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -283,6 +284,56 @@
         assertEquals("finished_sp", runningJobs.get(0).f0);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testUpgradeUsesLatestSnapshot(boolean useLegacyFields) throws Exception {
+        var savepointPath = "finished_sp";
+        var deployment = buildApplicationCluster(FlinkVersion.v1_19, UpgradeMode.SAVEPOINT);
+
+        reconciler.reconcile(deployment, context);
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+        deployment.getSpec().setRestartNonce(100L);
+        flinkService.clear();
+
+        if (useLegacyFields) {
+            deployment
+                    .getStatus()
+                    .getJobStatus()
+                    .getSavepointInfo()
+                    .updateLastSavepoint(
+                            new Savepoint(
+                                    0L,
+                                    savepointPath,
+                                    SnapshotTriggerType.UPGRADE,
+                                    SavepointFormatType.CANONICAL,
+                                    0L));
+        } else {
+            deployment
+                    .getStatus()
+                    .getJobStatus()
+                    .setUpgradeSnapshotReference(
+                            FlinkStateSnapshotReference.fromPath(savepointPath));
+            deployment
+                    .getStatus()
+                    .getJobStatus()
+                    .getSavepointInfo()
+                    .updateLastSavepoint(
+                            new Savepoint(
+                                    0L,
+                                    "wrong_sp",
+                                    SnapshotTriggerType.UPGRADE,
+                                    SavepointFormatType.CANONICAL,
+                                    0L));
+        }
+
+        deployment.getStatus().getJobStatus().setState("FINISHED");
+        reconciler.reconcile(deployment, context);
+        reconciler.reconcile(deployment, context);
+
+        assertEquals(1, flinkService.getRunningCount());
+        assertEquals(savepointPath, flinkService.listJobs().get(0).f0);
+    }
+
     private FlinkDeployment cloneDeploymentWithUpgradeMode(
             FlinkDeployment deployment, UpgradeMode upgradeMode) {
         FlinkDeployment result = ReconciliationUtils.clone(deployment);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
index 1c7e6b1..bfdbe5f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
@@ -22,11 +22,15 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
 import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
 
 import org.apache.logging.log4j.core.util.CronExpression;
 import org.junit.jupiter.api.Test;
@@ -277,6 +281,36 @@
         assertTrue(shouldTrigger);
     }
 
+    @Test
+    public void testLastSavepointKnown() {
+        var status = new FlinkDeploymentStatus();
+
+        assertTrue(SnapshotUtils.lastSavepointKnown(status));
+
+        var sp = new Savepoint();
+        sp.setLocation("sp1");
+        status.getJobStatus().getSavepointInfo().setLastSavepoint(sp);
+        assertTrue(SnapshotUtils.lastSavepointKnown(status));
+
+        sp.setLocation(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
+        assertFalse(SnapshotUtils.lastSavepointKnown(status));
+
+        status.getJobStatus()
+                .setUpgradeSnapshotReference(FlinkStateSnapshotReference.fromPath("sp1"));
+        assertTrue(SnapshotUtils.lastSavepointKnown(status));
+
+        status.getJobStatus()
+                .setUpgradeSnapshotReference(
+                        FlinkStateSnapshotReference.fromPath(
+                                AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH));
+        assertFalse(SnapshotUtils.lastSavepointKnown(status));
+
+        status.getJobStatus()
+                .setUpgradeSnapshotReference(
+                        new FlinkStateSnapshotReference("namespace", "name", null));
+        assertTrue(SnapshotUtils.lastSavepointKnown(status));
+    }
+
     private static void resetTrigger(FlinkDeployment deployment, SnapshotType snapshotType) {
         switch (snapshotType) {
             case SAVEPOINT: