[FLINK-36037][snapshot] Add backwards compatibility for job upgrades
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 565811f..8379417 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -293,6 +293,16 @@
FlinkStateSnapshotUtils
.getValidatedFlinkStateSnapshotPath(
ctx.getKubernetesClient(), ref));
+ if (savepointOpt.isEmpty()) {
+ savepointOpt =
+ Optional.ofNullable(
+ ctx.getResource()
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint())
+ .flatMap(s -> Optional.ofNullable(s.getLocation()));
+ }
}
deploy(ctx, spec, deployConfig, savepointOpt, requireHaMetadata);
@@ -445,13 +455,17 @@
throws Exception {
LOG.info("Resubmitting Flink job...");
SPEC specToRecover = ReconciliationUtils.getDeployedSpec(ctx.getResource());
- var lastSavepoint =
- Optional.ofNullable(
- ctx.getResource().getStatus().getJobStatus().getUpgradeSnapshotReference());
+
+ var upgradeSnapshotRef =
+ ctx.getResource().getStatus().getJobStatus().getUpgradeSnapshotReference();
+ var savepointLegacy =
+ ctx.getResource().getStatus().getJobStatus().getSavepointInfo().getLastSavepoint();
+ var lastSavepointKnown = upgradeSnapshotRef != null || savepointLegacy != null;
+
if (requireHaMetadata) {
specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
} else if (ctx.getResource().getSpec().getJob().getUpgradeMode() != UpgradeMode.STATELESS
- && lastSavepoint.isPresent()) {
+ && lastSavepointKnown) {
specToRecover.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
}
restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
index 6e4581e..7e730f6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
@@ -370,15 +370,23 @@
public static boolean lastSavepointKnown(CommonStatus<?> status) {
var lastSavepoint = status.getJobStatus().getUpgradeSnapshotReference();
- if (lastSavepoint == null) {
- return true;
+ if (lastSavepoint != null) {
+ if (StringUtils.isNotBlank(lastSavepoint.getName())) {
+ return true;
+ }
+ // Path-only reference: known unless the path is missing or the dummy placeholder.
+ var location = lastSavepoint.getPath();
+ return location != null
+ && !location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
}
- if (StringUtils.isNotBlank(lastSavepoint.getName())) {
+ // Legacy savepointInfo fallback; a savepoint without a location counts as unknown.
+ var lastSavepointLegacy = status.getJobStatus().getSavepointInfo().getLastSavepoint();
+ if (lastSavepointLegacy == null) {
return true;
}
-
- var location = lastSavepoint.getPath();
- return location != null && !location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
+ var legacyLocation = lastSavepointLegacy.getLocation();
+ return legacyLocation != null
+ && !legacyLocation.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 5e922ff..e411765 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -37,6 +37,7 @@
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -283,6 +284,56 @@
assertEquals("finished_sp", runningJobs.get(0).f0);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testUpgradeUsesLatestSnapshot(boolean useLegacyFields) throws Exception {
+ var savepointPath = "finished_sp";
+ var deployment = buildApplicationCluster(FlinkVersion.v1_19, UpgradeMode.SAVEPOINT);
+ // Initial deployment, then change the restart nonce and clear the test Flink service.
+ reconciler.reconcile(deployment, context);
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+ deployment.getSpec().setRestartNonce(100L);
+ flinkService.clear();
+ // Legacy field only, or new reference plus a conflicting legacy entry ("wrong_sp").
+ if (useLegacyFields) {
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .updateLastSavepoint(
+ new Savepoint(
+ 0L,
+ savepointPath,
+ SnapshotTriggerType.UPGRADE,
+ SavepointFormatType.CANONICAL,
+ 0L));
+ } else {
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .setUpgradeSnapshotReference(
+ FlinkStateSnapshotReference.fromPath(savepointPath));
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .updateLastSavepoint(
+ new Savepoint(
+ 0L,
+ "wrong_sp",
+ SnapshotTriggerType.UPGRADE,
+ SavepointFormatType.CANONICAL,
+ 0L));
+ }
+ // With the job marked FINISHED, reconcile twice before checking what was submitted.
+ deployment.getStatus().getJobStatus().setState("FINISHED");
+ reconciler.reconcile(deployment, context);
+ reconciler.reconcile(deployment, context);
+ // Exactly one running job, whose f0 (presumably its restore path) matches savepointPath.
+ assertEquals(1, flinkService.getRunningCount());
+ assertEquals(savepointPath, flinkService.listJobs().get(0).f0);
+ }
+
private FlinkDeployment cloneDeploymentWithUpgradeMode(
FlinkDeployment deployment, UpgradeMode upgradeMode) {
FlinkDeployment result = ReconciliationUtils.clone(deployment);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
index 1c7e6b1..bfdbe5f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
@@ -22,11 +22,15 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.logging.log4j.core.util.CronExpression;
import org.junit.jupiter.api.Test;
@@ -277,6 +281,36 @@
assertTrue(shouldTrigger);
}
+ @Test
+ public void testLastSavepointKnown() {
+ var status = new FlinkDeploymentStatus();
+ // Fresh status: with no reference and no legacy savepoint set, the result is "known".
+ assertTrue(SnapshotUtils.lastSavepointKnown(status));
+ // Legacy savepoint with a regular location is known.
+ var sp = new Savepoint();
+ sp.setLocation("sp1");
+ status.getJobStatus().getSavepointInfo().setLastSavepoint(sp);
+ assertTrue(SnapshotUtils.lastSavepointKnown(status));
+ // Legacy savepoint pointing at the dummy path is not known.
+ sp.setLocation(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
+ assertFalse(SnapshotUtils.lastSavepointKnown(status));
+ // A path-based upgrade snapshot reference takes precedence over the legacy field.
+ status.getJobStatus()
+ .setUpgradeSnapshotReference(FlinkStateSnapshotReference.fromPath("sp1"));
+ assertTrue(SnapshotUtils.lastSavepointKnown(status));
+ // A reference whose path is the dummy path is not known.
+ status.getJobStatus()
+ .setUpgradeSnapshotReference(
+ FlinkStateSnapshotReference.fromPath(
+ AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH));
+ assertFalse(SnapshotUtils.lastSavepointKnown(status));
+ // Name-based reference (presumably namespace/name/path with a null path) is known.
+ status.getJobStatus()
+ .setUpgradeSnapshotReference(
+ new FlinkStateSnapshotReference("namespace", "name", null));
+ assertTrue(SnapshotUtils.lastSavepointKnown(status));
+ }
+
private static void resetTrigger(FlinkDeployment deployment, SnapshotType snapshotType) {
switch (snapshotType) {
case SAVEPOINT: