[FLINK-38033] Fix accidental upgrade snapshot dispose bug
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 023396b..9e79982 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -409,25 +409,30 @@
                         conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE)
                                 .name());
 
-        FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
-                conf,
-                ctx.getOperatorConfig(),
-                ctx.getKubernetesClient(),
-                ctx.getResource(),
-                savepointFormatType,
-                savepointLocation);
+        var snapshotCrOpt =
+                FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
+                        conf,
+                        ctx.getOperatorConfig(),
+                        ctx.getKubernetesClient(),
+                        ctx.getResource(),
+                        savepointFormatType,
+                        savepointLocation);
         var jobStatus = ctx.getResource().getStatus().getJobStatus();
         jobStatus.setUpgradeSavepointPath(savepointLocation);
 
-        // Register created savepoint in the now deprecated savepoint info and history
-        var savepoint =
-                new Savepoint(
-                        cancelTs.toEpochMilli(),
-                        savepointLocation,
-                        SnapshotTriggerType.UPGRADE,
-                        savepointFormatType,
-                        null);
-        jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
+        if (snapshotCrOpt.isEmpty()) {
+            // Register created savepoint in the now deprecated savepoint info and history
+            // only if snapshot CR was not created, otherwise it would be double recorded
+            // and disposed immediately
+            var savepoint =
+                    new Savepoint(
+                            cancelTs.toEpochMilli(),
+                            savepointLocation,
+                            SnapshotTriggerType.UPGRADE,
+                            savepointFormatType,
+                            null);
+            jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
+        }
     }
 
     /**
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index b31361b..3d8ff29 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -90,6 +90,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.ThrowingConsumer;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -112,6 +113,7 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 
 import static org.apache.flink.api.common.JobStatus.FINISHED;
 import static org.apache.flink.api.common.JobStatus.RECONCILING;
@@ -137,6 +139,7 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
  * @link JobStatusObserver unit tests
@@ -235,9 +238,12 @@
     }
 
     @ParameterizedTest
-    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
-    public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
+    @MethodSource("upgradeArgs")
+    public void testUpgrade(FlinkVersion flinkVersion, boolean snapshotResource) throws Exception {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
+        conf.set(SNAPSHOT_RESOURCE_ENABLED, snapshotResource);
+        configManager.updateDefaultConfig(conf);
+        operatorConfig = configManager.getOperatorConfiguration();
 
         reconciler.reconcile(deployment, context);
         var runningJobs = flinkService.listJobs();
@@ -305,26 +311,35 @@
         assertEquals(0, flinkService.getRunningCount());
 
         var spInfo = statefulUpgrade.getStatus().getJobStatus().getSavepointInfo();
-        assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
-        assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
-        assertEquals(
-                spInfo.getLastSavepoint(),
-                new LinkedList<>(spInfo.getSavepointHistory()).getLast());
+        if (snapshotResource) {
+            assertNull(spInfo.getLastSavepoint());
+        } else {
+            assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
+            assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
+            assertEquals(
+                    spInfo.getLastSavepoint(),
+                    new LinkedList<>(spInfo.getSavepointHistory()).getLast());
+        }
 
         reconciler.reconcile(statefulUpgrade, context);
 
         runningJobs = flinkService.listJobs();
         assertEquals(1, flinkService.getRunningCount());
         var snapshots = TestUtils.getFlinkStateSnapshotsForResource(kubernetesClient, deployment);
-        assertThat(snapshots).isNotEmpty();
-        assertThat(snapshots.get(0).getSpec().getSavepoint().getPath()).isEqualTo("savepoint_0");
-        assertEquals(
-                SnapshotTriggerType.UPGRADE.name(),
-                snapshots
-                        .get(0)
-                        .getMetadata()
-                        .getLabels()
-                        .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
+        if (snapshotResource) {
+            assertThat(snapshots).isNotEmpty();
+            assertThat(snapshots.get(0).getSpec().getSavepoint().getPath())
+                    .isEqualTo("savepoint_0");
+            assertEquals(
+                    SnapshotTriggerType.UPGRADE.name(),
+                    snapshots
+                            .get(0)
+                            .getMetadata()
+                            .getLabels()
+                            .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
+        } else {
+            assertThat(snapshots).isEmpty();
+        }
 
         // Make sure jobId rotated on savepoint
         verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
@@ -370,6 +385,13 @@
         verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
     }
 
+    private static Stream<Arguments> upgradeArgs() {
+        return Stream.of(
+                arguments(FlinkVersion.v1_16, true),
+                arguments(FlinkVersion.v1_20, true),
+                arguments(FlinkVersion.v1_20, false));
+    }
+
     private void verifyJobId(
             FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) {
         // jobId set by operator