[FLINK-33803] Set observedGeneration at end of reconciliation
diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 649e5fb..030e043 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -243,6 +243,7 @@
| ----------| ---- | ---- |
| jobStatus | org.apache.flink.kubernetes.operator.api.status.JobStatus | Last observed status of the Flink job on Application/Session cluster. |
| error | java.lang.String | Error information about the FlinkDeployment/FlinkSessionJob. |
+| observedGeneration | java.lang.Long | Last observed generation of the FlinkDeployment/FlinkSessionJob. |
| lifecycleState | org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | Lifecycle state of the Flink resource (including being rolled back, failed etc.). |
| clusterInfo | java.util.Map<java.lang.String,java.lang.String> | Information from running clusters. |
| jobManagerDeploymentStatus | org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus | Last observed status of the JobManager deployment. |
@@ -270,6 +271,7 @@
| ----------| ---- | ---- |
| jobStatus | org.apache.flink.kubernetes.operator.api.status.JobStatus | Last observed status of the Flink job on Application/Session cluster. |
| error | java.lang.String | Error information about the FlinkDeployment/FlinkSessionJob. |
+| observedGeneration | java.lang.Long | Last observed generation of the FlinkDeployment/FlinkSessionJob. |
| lifecycleState | org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | Lifecycle state of the Flink resource (including being rolled back, failed etc.). |
| reconciliationStatus | org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus | Status of the last reconcile operation. |
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 6c54c4c..605c1e7 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -45,6 +45,9 @@
/** Error information about the FlinkDeployment/FlinkSessionJob. */
private String error;
+ /** Last observed generation of the FlinkDeployment/FlinkSessionJob. */
+ private Long observedGeneration;
+
/** Lifecycle state of the Flink resource (including being rolled back, failed etc.). */
@PrinterColumn(name = "Lifecycle State")
// Calculated from the status, requires no setter. The purpose of this is to expose as a printer
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
index 88a9dc9..51203f1 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
@@ -81,6 +81,7 @@
public void serializeAndSetLastReconciledSpec(
SPEC spec, AbstractFlinkResource<SPEC, ?> resource) {
setLastReconciledSpec(SpecUtils.writeSpecWithMeta(spec, resource));
+ resource.getStatus().setObservedGeneration(resource.getMetadata().getGeneration());
}
public void markReconciledSpecAsStable() {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index a4c97be..5929bfc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -420,10 +420,7 @@
// running
deployment.getSpec().getJob().setState(JobState.RUNNING);
}
- deployment
- .getMetadata()
- .setGeneration(
- lastReconciledSpecWithMeta.getMeta().getMetadata().getGeneration());
+ deployment.getMetadata().setGeneration(status.getObservedGeneration());
return true;
}
}
@@ -469,16 +466,9 @@
* @return The spec generation for the upgrade.
*/
public static Long getUpgradeTargetGeneration(AbstractFlinkResource<?, ?> resource) {
- var lastSpecWithMeta =
- resource.getStatus()
- .getReconciliationStatus()
- .deserializeLastReconciledSpecWithMeta();
+ var observedGeneration = resource.getStatus().getObservedGeneration();
- if (lastSpecWithMeta.getMeta() == null) {
- return -1L;
- }
-
- return lastSpecWithMeta.getMeta().getMetadata().getGeneration();
+ return observedGeneration == null ? -1L : observedGeneration;
}
/**
@@ -569,5 +559,6 @@
reconciliationStatus.setLastReconciledSpec(
SpecUtils.writeSpecWithMeta(lastSpecWithMeta.getSpec(), newMeta));
+ resource.getStatus().setObservedGeneration(resource.getMetadata().getGeneration());
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index dc161da..63751ce 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -34,6 +34,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for {@link org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils}. */
@@ -98,4 +99,16 @@
ReconciliationUtils.toUpdateControl(operatorConfiguration, current, previous, true);
assertEquals(0, updateControl.getScheduleDelay().get());
}
+
+ @Test
+ public void testObservedGenerationStatus() throws Exception {
+ FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
+ app.getSpec().getJob().setState(JobState.RUNNING);
+ app.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
+ app.getMetadata().setGeneration(1L);
+ assertNull(app.getStatus().getObservedGeneration());
+ ReconciliationUtils.updateStatusForDeployedSpec(app, new Configuration());
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(app, new Configuration());
+ assertEquals(1L, app.getStatus().getObservedGeneration());
+ }
}
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 14de115..66fd486 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9832,6 +9832,8 @@
type: object
error:
type: string
+ observedGeneration:
+ type: integer
lifecycleState:
enum:
- CREATED
diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index d9b758f..d3ef621 100644
--- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -217,6 +217,8 @@
type: object
error:
type: string
+ observedGeneration:
+ type: integer
lifecycleState:
enum:
- CREATED