[FLINK-33803] Set observedGeneration at end of reconciliation

diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 649e5fb..030e043 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -243,6 +243,7 @@
 | ----------| ---- | ---- |
 | jobStatus | org.apache.flink.kubernetes.operator.api.status.JobStatus | Last observed status of the Flink job on Application/Session cluster. |
 | error | java.lang.String | Error information about the FlinkDeployment/FlinkSessionJob. |
+| observedGeneration | java.lang.Long | Last observed generation of the FlinkDeployment/FlinkSessionJob. |
 | lifecycleState | org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | Lifecycle state of the Flink resource (including being rolled back, failed etc.). |
 | clusterInfo | java.util.Map<java.lang.String,java.lang.String> | Information from running clusters. |
 | jobManagerDeploymentStatus | org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus | Last observed status of the JobManager deployment. |
@@ -270,6 +271,7 @@
 | ----------| ---- | ---- |
 | jobStatus | org.apache.flink.kubernetes.operator.api.status.JobStatus | Last observed status of the Flink job on Application/Session cluster. |
 | error | java.lang.String | Error information about the FlinkDeployment/FlinkSessionJob. |
+| observedGeneration | java.lang.Long | Last observed generation of the FlinkDeployment/FlinkSessionJob. |
 | lifecycleState | org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | Lifecycle state of the Flink resource (including being rolled back, failed etc.). |
 | reconciliationStatus | org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus | Status of the last reconcile operation. |
 
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 6c54c4c..605c1e7 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -45,6 +45,9 @@
     /** Error information about the FlinkDeployment/FlinkSessionJob. */
     private String error;
 
+    /** Last observed generation of the FlinkDeployment/FlinkSessionJob. */
+    private Long observedGeneration;
+
     /** Lifecycle state of the Flink resource (including being rolled back, failed etc.). */
     @PrinterColumn(name = "Lifecycle State")
     // Calculated from the status, requires no setter. The purpose of this is to expose as a printer
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
index 88a9dc9..51203f1 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
@@ -81,6 +81,7 @@
     public void serializeAndSetLastReconciledSpec(
             SPEC spec, AbstractFlinkResource<SPEC, ?> resource) {
         setLastReconciledSpec(SpecUtils.writeSpecWithMeta(spec, resource));
+        resource.getStatus().setObservedGeneration(resource.getMetadata().getGeneration());
     }
 
     public void markReconciledSpecAsStable() {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index a4c97be..5929bfc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -420,10 +420,7 @@
                 // running
                 deployment.getSpec().getJob().setState(JobState.RUNNING);
             }
-            deployment
-                    .getMetadata()
-                    .setGeneration(
-                            lastReconciledSpecWithMeta.getMeta().getMetadata().getGeneration());
+            deployment.getMetadata().setGeneration(status.getObservedGeneration());
             return true;
         }
     }
@@ -469,16 +466,9 @@
      * @return The spec generation for the upgrade.
      */
     public static Long getUpgradeTargetGeneration(AbstractFlinkResource<?, ?> resource) {
-        var lastSpecWithMeta =
-                resource.getStatus()
-                        .getReconciliationStatus()
-                        .deserializeLastReconciledSpecWithMeta();
+        var observedGeneration = resource.getStatus().getObservedGeneration();
 
-        if (lastSpecWithMeta.getMeta() == null) {
-            return -1L;
-        }
-
-        return lastSpecWithMeta.getMeta().getMetadata().getGeneration();
+        return observedGeneration == null ? -1L : observedGeneration;
     }
 
     /**
@@ -569,5 +559,6 @@
 
         reconciliationStatus.setLastReconciledSpec(
                 SpecUtils.writeSpecWithMeta(lastSpecWithMeta.getSpec(), newMeta));
+        resource.getStatus().setObservedGeneration(resource.getMetadata().getGeneration());
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index dc161da..63751ce 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -34,6 +34,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for {@link org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils}. */
@@ -98,4 +99,16 @@
                 ReconciliationUtils.toUpdateControl(operatorConfiguration, current, previous, true);
         assertEquals(0, updateControl.getScheduleDelay().get());
     }
+
+    @Test
+    public void testObservedGenerationStatus() throws Exception {
+        FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
+        app.getSpec().getJob().setState(JobState.RUNNING);
+        app.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
+        app.getMetadata().setGeneration(1L);
+        assertNull(app.getStatus().getObservedGeneration());
+        ReconciliationUtils.updateStatusForDeployedSpec(app, new Configuration());
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(app, new Configuration());
+        assertEquals(1L, app.getStatus().getObservedGeneration());
+    }
 }
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 14de115..66fd486 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9832,6 +9832,8 @@
                 type: object
               error:
                 type: string
+              observedGeneration:
+                type: integer
               lifecycleState:
                 enum:
                 - CREATED
diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index d9b758f..d3ef621 100644
--- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -217,6 +217,8 @@
                 type: object
               error:
                 type: string
+              observedGeneration:
+                type: integer
               lifecycleState:
                 enum:
                 - CREATED