[FLINK-33803]remove metadata/generation fields from ReconciliationMetadata
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/reconciler/ReconciliationMetadata.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/reconciler/ReconciliationMetadata.java
index c2f5166..cfcb52e 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/reconciler/ReconciliationMetadata.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/reconciler/ReconciliationMetadata.java
@@ -21,7 +21,6 @@
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import com.fasterxml.jackson.annotation.JsonInclude;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -35,19 +34,15 @@
private String apiVersion;
- private ObjectMeta metadata;
-
private boolean firstDeployment;
public static ReconciliationMetadata from(AbstractFlinkResource<?, ?> resource) {
- ObjectMeta metadata = new ObjectMeta();
- metadata.setGeneration(resource.getMetadata().getGeneration());
var firstDeploy =
resource.getStatus().getReconciliationStatus().isBeforeFirstDeployment()
|| isFirstDeployment(resource);
- return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy);
+ return new ReconciliationMetadata(resource.getApiVersion(), firstDeploy);
}
private static boolean isFirstDeployment(AbstractFlinkResource<?, ?> resource) {
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
index 0c09bda..458dd69 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
@@ -50,12 +50,12 @@
try {
ObjectNode wrapper = (ObjectNode) objectMapper.readTree(specWithMetaString);
ObjectNode internalMeta = (ObjectNode) wrapper.remove(INTERNAL_METADATA_JSON_KEY);
-
if (internalMeta == null) {
// migrating from old format
wrapper.remove("apiVersion");
return new SpecWithMeta<>(objectMapper.treeToValue(wrapper, specClass), null);
} else {
+ internalMeta.remove("metadata");
return new SpecWithMeta<>(
objectMapper.treeToValue(wrapper.get("spec"), specClass),
objectMapper.convertValue(internalMeta, ReconciliationMetadata.class));
diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
index 8e0b761..dfb48e4 100644
--- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
+++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
@@ -35,13 +35,11 @@
@Test
public void testSpecSerializationWithVersion() throws JsonProcessingException {
FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
- app.getMetadata().setGeneration(12L);
String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
- assertEquals(12L, internalMeta.get("metadata").get("generation").asLong());
assertEquals(
app.getSpec(),
SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
@@ -56,4 +54,26 @@
migrated.getSpec().getJob().getJarURI());
assertNull(migrated.getMeta());
}
+
+ @Test
+ public void testSpecSerializationWithoutGeneration() throws JsonProcessingException {
+ // with regards to ReconcialiationMetadata & SpecWithMeta
+ FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
+ app.getMetadata().setGeneration(12L);
+ String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
+ ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
+
+ ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
+ assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
+ assertEquals(
+ app.getSpec(),
+ SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
+ assertNull(app.getStatus().getObservedGeneration());
+
+ // test backward compatibility
+ String oldSerialized =
+ "{\"apiVersion\":\"flink.apache.org/v1beta1\",\"metadata\":{\"generation\":5},\"firstDeployment\":false}";
+ var migrated = SpecUtils.deserializeSpecWithMeta(oldSerialized, FlinkDeploymentSpec.class);
+ assertNull(migrated.getMeta());
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 47805ce..7cf1b74 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -856,7 +856,7 @@
status.getJobManagerDeploymentStatus());
var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
- assertEquals(321L, specWithMeta.getMeta().getMetadata().getGeneration());
+ assertEquals(321L, status.getObservedGeneration());
assertEquals(JobState.RUNNING, specWithMeta.getSpec().getJob().getState());
assertEquals(5, specWithMeta.getSpec().getJob().getParallelism());
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
index c08cd46..1b748ec 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
@@ -152,7 +152,7 @@
status.getJobManagerDeploymentStatus());
var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
- assertEquals(321L, specWithMeta.getMeta().getMetadata().getGeneration());
+ assertEquals(321L, status.getObservedGeneration());
assertEquals("1", specWithMeta.getSpec().getFlinkConfiguration().get("k"));
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 507912a..5836777 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -1260,30 +1260,18 @@
reconciler.reconcile(deployment, context);
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
- assertEquals(
- 1L,
- deployment
- .getStatus()
- .getReconciliationStatus()
- .deserializeLastReconciledSpecWithMeta()
- .getMeta()
- .getMetadata()
- .getGeneration());
+ assertEquals(1L, deployment.getStatus().getObservedGeneration());
// Submit no-op upgrade
deployment.getSpec().getFlinkConfiguration().put("kubernetes.operator.test", "value");
deployment.getMetadata().setGeneration(2L);
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
reconciler.reconcile(deployment, context);
- assertEquals(
- 2L,
- deployment
- .getStatus()
- .getReconciliationStatus()
- .deserializeLastReconciledSpecWithMeta()
- .getMeta()
- .getMetadata()
- .getGeneration());
+ assertEquals(2L, deployment.getStatus().getObservedGeneration());
}
@ParameterizedTest