[FLINK-30222] Operator should handle 'kubernetes' as the 'high-availability' config key

Co-authored-by: Peter Vary <peter_vary4@apple.com>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index c17a249..b438a14 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -160,9 +160,10 @@
     }
 
     public static boolean isKubernetesHAActivated(Configuration configuration) {
-        return configuration
-                .get(HighAvailabilityOptions.HA_MODE)
-                .equalsIgnoreCase(KubernetesHaServicesFactory.class.getCanonicalName());
+        String haMode = configuration.get(HighAvailabilityOptions.HA_MODE);
+        return haMode.equalsIgnoreCase(KubernetesHaServicesFactory.class.getCanonicalName())
+                // Hardcoded config value should be removed when upgrading Flink dependency to 1.16
+                || haMode.equalsIgnoreCase("kubernetes");
     }
 
     public static boolean clusterShutdownDisabled(FlinkDeploymentSpec spec) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 80fb6af..fe2e316 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -416,6 +416,18 @@
                 "spec.serviceAccount must be defined. If you use helm, its value should be the same with the name of jobServiceAccount.");
 
         testSuccess(dep -> dep.getSpec().setServiceAccount("flink"));
+
+        testSuccess(
+                dep -> {
+                    dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+                    dep.getSpec()
+                            .getFlinkConfiguration()
+                            .put(
+                                    HighAvailabilityOptions.HA_MODE.key(),
+                                    // Hardcoded config value should be removed when upgrading Flink
+                                    // dependency to 1.16
+                                    "kubernetes");
+                });
     }
 
     @Test