[FLINK-28625] Guard against changing target session deployment for sessionjob
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index de3c7b6..93cd099 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -342,6 +342,7 @@
private Optional<String> validateSessionJobOnly(FlinkSessionJob sessionJob) {
return firstPresent(
+ validateDeploymentName(sessionJob.getSpec().getDeploymentName()),
validateJobNotEmpty(sessionJob),
validateNotLastStateUpgradeMode(sessionJob),
validateSpecChange(sessionJob),
@@ -400,16 +401,24 @@
private Optional<String> validateSpecChange(FlinkSessionJob sessionJob) {
FlinkSessionJobSpec newSpec = sessionJob.getSpec();
- if (sessionJob.getStatus() == null
- || sessionJob.getStatus().getReconciliationStatus() == null
- || sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec()
- == null) {
+ if (sessionJob.getStatus().getReconciliationStatus().isFirstDeployment()) {
// New job
if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
return Optional.of("Job must start in running state");
}
return Optional.empty();
+ } else {
+ var lastReconciledSpec =
+ sessionJob
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec();
+ if (!lastReconciledSpec
+ .getDeploymentName()
+ .equals(sessionJob.getSpec().getDeploymentName())) {
+ return Optional.of("The deploymentName can't be changed");
+ }
}
return Optional.empty();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 5189ab4..b6868bf 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -430,6 +430,17 @@
flinkDeployment -> {},
"Invalid session job flinkConfiguration key: kubernetes.operator.reconcile.interval."
+ " Allowed keys are [kubernetes.operator.user.artifacts.http.header]");
+
+ testSessionJobValidateWithModifier(
+ sessionJob -> {
+ sessionJob
+ .getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(sessionJob.getSpec(), sessionJob);
+ sessionJob.getSpec().setDeploymentName("new-deployment-name");
+ },
+ flinkDeployment -> {},
+ "The deploymentName can't be changed");
}
private void testSessionJobValidateWithModifier(