[FLINK-28625] Guard against changing target session deployment for sessionjob

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index de3c7b6..93cd099 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -342,6 +342,7 @@
 
     private Optional<String> validateSessionJobOnly(FlinkSessionJob sessionJob) {
         return firstPresent(
+                validateDeploymentName(sessionJob.getSpec().getDeploymentName()),
                 validateJobNotEmpty(sessionJob),
                 validateNotLastStateUpgradeMode(sessionJob),
                 validateSpecChange(sessionJob),
@@ -400,16 +401,24 @@
     private Optional<String> validateSpecChange(FlinkSessionJob sessionJob) {
         FlinkSessionJobSpec newSpec = sessionJob.getSpec();
 
-        if (sessionJob.getStatus() == null
-                || sessionJob.getStatus().getReconciliationStatus() == null
-                || sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec()
-                        == null) {
+        if (sessionJob.getStatus().getReconciliationStatus().isFirstDeployment()) {
             // New job
             if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
                 return Optional.of("Job must start in running state");
             }
 
             return Optional.empty();
+        } else {
+            var lastReconciledSpec =
+                    sessionJob
+                            .getStatus()
+                            .getReconciliationStatus()
+                            .deserializeLastReconciledSpec();
+            if (!lastReconciledSpec
+                    .getDeploymentName()
+                    .equals(sessionJob.getSpec().getDeploymentName())) {
+                return Optional.of("The deploymentName can't be changed");
+            }
         }
 
         return Optional.empty();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 5189ab4..b6868bf 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -430,6 +430,17 @@
                 flinkDeployment -> {},
                 "Invalid session job flinkConfiguration key: kubernetes.operator.reconcile.interval."
                         + " Allowed keys are [kubernetes.operator.user.artifacts.http.header]");
+
+        testSessionJobValidateWithModifier(
+                sessionJob -> {
+                    sessionJob
+                            .getStatus()
+                            .getReconciliationStatus()
+                            .serializeAndSetLastReconciledSpec(sessionJob.getSpec(), sessionJob);
+                    sessionJob.getSpec().setDeploymentName("new-deployment-name");
+                },
+                flinkDeployment -> {},
+                "The deploymentName can't be changed");
     }
 
     private void testSessionJobValidateWithModifier(