[FLINK-33803]remove metadata/generation fields from ReconciliationMetadata

diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/reconciler/ReconciliationMetadata.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/reconciler/ReconciliationMetadata.java
index c2f5166..cfcb52e 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/reconciler/ReconciliationMetadata.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/reconciler/ReconciliationMetadata.java
@@ -21,7 +21,6 @@
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -35,19 +34,15 @@
 
     private String apiVersion;
 
-    private ObjectMeta metadata;
-
     private boolean firstDeployment;
 
     public static ReconciliationMetadata from(AbstractFlinkResource<?, ?> resource) {
-        ObjectMeta metadata = new ObjectMeta();
-        metadata.setGeneration(resource.getMetadata().getGeneration());
 
         var firstDeploy =
                 resource.getStatus().getReconciliationStatus().isBeforeFirstDeployment()
                         || isFirstDeployment(resource);
 
-        return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy);
+        return new ReconciliationMetadata(resource.getApiVersion(), firstDeploy);
     }
 
     private static boolean isFirstDeployment(AbstractFlinkResource<?, ?> resource) {
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
index 0c09bda..458dd69 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
@@ -50,12 +50,12 @@
         try {
             ObjectNode wrapper = (ObjectNode) objectMapper.readTree(specWithMetaString);
             ObjectNode internalMeta = (ObjectNode) wrapper.remove(INTERNAL_METADATA_JSON_KEY);
-
             if (internalMeta == null) {
                 // migrating from old format
                 wrapper.remove("apiVersion");
                 return new SpecWithMeta<>(objectMapper.treeToValue(wrapper, specClass), null);
             } else {
+                internalMeta.remove("metadata");
                 return new SpecWithMeta<>(
                         objectMapper.treeToValue(wrapper.get("spec"), specClass),
                         objectMapper.convertValue(internalMeta, ReconciliationMetadata.class));
diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
index 8e0b761..dfb48e4 100644
--- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
+++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
@@ -35,13 +35,11 @@
     @Test
     public void testSpecSerializationWithVersion() throws JsonProcessingException {
         FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
-        app.getMetadata().setGeneration(12L);
         String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
         ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
 
         ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
         assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
-        assertEquals(12L, internalMeta.get("metadata").get("generation").asLong());
         assertEquals(
                 app.getSpec(),
                 SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
@@ -56,4 +54,26 @@
                 migrated.getSpec().getJob().getJarURI());
         assertNull(migrated.getMeta());
     }
+
+    @Test
+    public void testSpecSerializationWithoutGeneration() throws JsonProcessingException {
+        // with regards to ReconcialiationMetadata & SpecWithMeta
+        FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
+        app.getMetadata().setGeneration(12L);
+        String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
+        ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
+
+        ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
+        assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
+        assertEquals(
+                app.getSpec(),
+                SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
+        assertNull(app.getStatus().getObservedGeneration());
+
+        // test backward compatibility
+        String oldSerialized =
+                "{\"apiVersion\":\"flink.apache.org/v1beta1\",\"metadata\":{\"generation\":5},\"firstDeployment\":false}";
+        var migrated = SpecUtils.deserializeSpecWithMeta(oldSerialized, FlinkDeploymentSpec.class);
+        assertNull(migrated.getMeta());
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 47805ce..7cf1b74 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -856,7 +856,7 @@
                 status.getJobManagerDeploymentStatus());
 
         var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
-        assertEquals(321L, specWithMeta.getMeta().getMetadata().getGeneration());
+        assertEquals(321L, status.getObservedGeneration());
         assertEquals(JobState.RUNNING, specWithMeta.getSpec().getJob().getState());
         assertEquals(5, specWithMeta.getSpec().getJob().getParallelism());
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
index c08cd46..1b748ec 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
@@ -152,7 +152,7 @@
                 status.getJobManagerDeploymentStatus());
 
         var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
-        assertEquals(321L, specWithMeta.getMeta().getMetadata().getGeneration());
+        assertEquals(321L, status.getObservedGeneration());
         assertEquals("1", specWithMeta.getSpec().getFlinkConfiguration().get("k"));
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 507912a..5836777 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -1260,30 +1260,18 @@
         reconciler.reconcile(deployment, context);
         verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
 
-        assertEquals(
-                1L,
-                deployment
-                        .getStatus()
-                        .getReconciliationStatus()
-                        .deserializeLastReconciledSpecWithMeta()
-                        .getMeta()
-                        .getMetadata()
-                        .getGeneration());
+        assertEquals(1L, deployment.getStatus().getObservedGeneration());
 
         // Submit no-op upgrade
         deployment.getSpec().getFlinkConfiguration().put("kubernetes.operator.test", "value");
         deployment.getMetadata().setGeneration(2L);
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
 
         reconciler.reconcile(deployment, context);
-        assertEquals(
-                2L,
-                deployment
-                        .getStatus()
-                        .getReconciliationStatus()
-                        .deserializeLastReconciledSpecWithMeta()
-                        .getMeta()
-                        .getMetadata()
-                        .getGeneration());
+        assertEquals(2L, deployment.getStatus().getObservedGeneration());
     }
 
     @ParameterizedTest