[FLINK-35108] Do not trigger deployment recovery for finished/failed jobs
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index be99fb1..986e048 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -505,9 +506,30 @@
+ /**
+ * Checks whether the JobManager deployment is missing although it should be running.
+ *
+ * <p>A session cluster (no job in the deployed spec) always needs its JobManager. An
+ * application cluster only needs it while the deployed spec requests a RUNNING job and the
+ * recorded job status is not globally terminal, so finished/failed jobs do not trigger
+ * deployment recovery.
+ *
+ * @param deployment resource whose last deployed spec and current status are inspected
+ * @return true if the JobManager should be running but its deployment status is MISSING
+ */
private boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
var deployedJob = ReconciliationUtils.getDeployedSpec(deployment).getJob();
- return (deployedJob == null || deployedJob.getState() == JobState.RUNNING)
- && (deployment.getStatus().getJobManagerDeploymentStatus()
- == JobManagerDeploymentStatus.MISSING);
+ var status = deployment.getStatus();
+ var jobStatus = status.getJobStatus();
+ boolean sessionCluster = deployedJob == null;
+ // Enum.valueOf rejects null with a NullPointerException, so a job state that has not been
+ // recorded is treated as non-terminal (recovery then behaves as before this check existed).
+ boolean nonTerminalApplication =
+ !sessionCluster
+ && deployedJob.getState() == JobState.RUNNING
+ && (jobStatus.getState() == null
+ || !JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState());
+ boolean jmShouldBeRunning = sessionCluster || nonTerminalApplication;
+ return jmShouldBeRunning
+ && (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING);
}
protected boolean flinkVersionChanged(SPEC oldSpec, SPEC newSpec) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 394be6e..e80942d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -82,6 +82,7 @@
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.ThrowingConsumer;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
@@ -941,18 +942,32 @@
}
@Test
- public void testTerminalJmTtl() throws Exception {
- FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ public void testTerminalJmTtlOnSuspend() throws Throwable {
+ testTerminalJmTtl(
+ dep -> {
+ getJobSpec(dep).setState(JobState.SUSPENDED);
+ reconciler.reconcile(dep, context);
+ });
+ }
+
+ @Test
+ public void testTerminalJmTtlOnFinished() throws Throwable {
+ testTerminalJmTtl(dep -> dep.getStatus().getJobStatus().setState("FINISHED"));
+ }
+
+ @Test
+ public void testTerminalJmTtlOnFailed() throws Throwable {
+ testTerminalJmTtl(dep -> dep.getStatus().getJobStatus().setState("FAILED"));
+ }
+
+ public void testTerminalJmTtl(ThrowingConsumer<FlinkDeployment> deploymentSetup)
+ throws Throwable {
+ var deployment = TestUtils.buildApplicationCluster();
getJobSpec(deployment).setUpgradeMode(UpgradeMode.SAVEPOINT);
reconciler.reconcile(deployment, context);
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
-
- getJobSpec(deployment).setState(JobState.SUSPENDED);
- reconciler.reconcile(deployment, context);
+ deploymentSetup.accept(deployment);
var status = deployment.getStatus();
- assertEquals(
- org.apache.flink.api.common.JobStatus.FINISHED.toString(),
- status.getJobStatus().getState());
assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());
deployment
@@ -976,6 +991,9 @@
.setClock(Clock.fixed(now.plus(Duration.ofMinutes(6)), ZoneId.systemDefault()));
reconciler.reconcile(deployment, context);
assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
+ // Make sure a subsequent reconcile does not resubmit the terminated job
+ reconciler.reconcile(deployment, context);
+ assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
}
@ParameterizedTest