[FLINK-38033] Fix accidental upgrade snapshot dispose bug
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 023396b..9e79982 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -409,25 +409,30 @@
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE)
.name());
- FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
- conf,
- ctx.getOperatorConfig(),
- ctx.getKubernetesClient(),
- ctx.getResource(),
- savepointFormatType,
- savepointLocation);
+ var snapshotCrOpt =
+ FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
+ conf,
+ ctx.getOperatorConfig(),
+ ctx.getKubernetesClient(),
+ ctx.getResource(),
+ savepointFormatType,
+ savepointLocation);
var jobStatus = ctx.getResource().getStatus().getJobStatus();
jobStatus.setUpgradeSavepointPath(savepointLocation);
- // Register created savepoint in the now deprecated savepoint info and history
- var savepoint =
- new Savepoint(
- cancelTs.toEpochMilli(),
- savepointLocation,
- SnapshotTriggerType.UPGRADE,
- savepointFormatType,
- null);
- jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
+ if (snapshotCrOpt.isEmpty()) {
+ // Register created savepoint in the now deprecated savepoint info and history
+ // only if snapshot CR was not created, otherwise it would be double recorded
+ // and disposed immediately
+ var savepoint =
+ new Savepoint(
+ cancelTs.toEpochMilli(),
+ savepointLocation,
+ SnapshotTriggerType.UPGRADE,
+ savepointFormatType,
+ null);
+ jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
+ }
}
/**
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index b31361b..3d8ff29 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -90,6 +90,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.ThrowingConsumer;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -112,6 +113,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.stream.Stream;
import static org.apache.flink.api.common.JobStatus.FINISHED;
import static org.apache.flink.api.common.JobStatus.RECONCILING;
@@ -137,6 +139,7 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
/**
* @link JobStatusObserver unit tests
@@ -235,9 +238,12 @@
}
@ParameterizedTest
- @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
- public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
+ @MethodSource("upgradeArgs")
+ public void testUpgrade(FlinkVersion flinkVersion, boolean snapshotResource) throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
+ conf.set(SNAPSHOT_RESOURCE_ENABLED, snapshotResource);
+ configManager.updateDefaultConfig(conf);
+ operatorConfig = configManager.getOperatorConfiguration();
reconciler.reconcile(deployment, context);
var runningJobs = flinkService.listJobs();
@@ -305,26 +311,35 @@
assertEquals(0, flinkService.getRunningCount());
var spInfo = statefulUpgrade.getStatus().getJobStatus().getSavepointInfo();
- assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
- assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
- assertEquals(
- spInfo.getLastSavepoint(),
- new LinkedList<>(spInfo.getSavepointHistory()).getLast());
+ if (snapshotResource) {
+ assertNull(spInfo.getLastSavepoint());
+ } else {
+ assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
+ assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
+ assertEquals(
+ spInfo.getLastSavepoint(),
+ new LinkedList<>(spInfo.getSavepointHistory()).getLast());
+ }
reconciler.reconcile(statefulUpgrade, context);
runningJobs = flinkService.listJobs();
assertEquals(1, flinkService.getRunningCount());
var snapshots = TestUtils.getFlinkStateSnapshotsForResource(kubernetesClient, deployment);
- assertThat(snapshots).isNotEmpty();
- assertThat(snapshots.get(0).getSpec().getSavepoint().getPath()).isEqualTo("savepoint_0");
- assertEquals(
- SnapshotTriggerType.UPGRADE.name(),
- snapshots
- .get(0)
- .getMetadata()
- .getLabels()
- .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
+ if (snapshotResource) {
+ assertThat(snapshots).isNotEmpty();
+ assertThat(snapshots.get(0).getSpec().getSavepoint().getPath())
+ .isEqualTo("savepoint_0");
+ assertEquals(
+ SnapshotTriggerType.UPGRADE.name(),
+ snapshots
+ .get(0)
+ .getMetadata()
+ .getLabels()
+ .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
+ } else {
+ assertThat(snapshots).isEmpty();
+ }
// Make sure jobId rotated on savepoint
verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
@@ -370,6 +385,13 @@
verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
}
+ private static Stream<Arguments> upgradeArgs() {
+ return Stream.of(
+ arguments(FlinkVersion.v1_16, true),
+ arguments(FlinkVersion.v1_20, true),
+ arguments(FlinkVersion.v1_20, false));
+ }
+
private void verifyJobId(
FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) {
// jobId set by operator