[FLINK-37320] [Observer] FINISHED finite streaming jobs incorrectly being set to RECONCILING
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index 38d4b83..713e132 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -152,7 +152,10 @@
}
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
+
+ if (!ReconciliationUtils.isJobInTerminalState(deploymentStatus)) {
+ deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
+ }
if (previousJmStatus != JobManagerDeploymentStatus.MISSING
&& previousJmStatus != JobManagerDeploymentStatus.ERROR) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index b9adee6..83e7fad 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -876,4 +876,26 @@
observer.observe(deployment, TestUtils.createEmptyContext());
assertTrue(reconStatus.isBeforeFirstDeployment());
}
+
+ @Test
+ public void jobStatusNotOverwrittenWhenTerminal() throws Exception {
+ Configuration conf =
+ configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
+ flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
+ bringToReadyStatus(deployment);
+
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.FINISHED);
+
+ // Simulate missing deployment
+ var emptyContext = TestUtils.createEmptyContext();
+ deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+ observer.observe(deployment, emptyContext);
+
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.FINISHED,
+ deployment.getStatus().getJobStatus().getState());
+ }
}