[FLINK-30222] Operator should handle 'kubernetes' as the 'high-availability' config key
Co-authored-by: Peter Vary <peter_vary4@apple.com>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index c17a249..b438a14 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -160,9 +160,10 @@
}
public static boolean isKubernetesHAActivated(Configuration configuration) {
- return configuration
- .get(HighAvailabilityOptions.HA_MODE)
- .equalsIgnoreCase(KubernetesHaServicesFactory.class.getCanonicalName());
+ String haMode = configuration.get(HighAvailabilityOptions.HA_MODE);
+ return haMode.equalsIgnoreCase(KubernetesHaServicesFactory.class.getCanonicalName())
+ // Hardcoded config value should be removed when upgrading Flink dependency to 1.16
+ || haMode.equalsIgnoreCase("kubernetes");
}
public static boolean clusterShutdownDisabled(FlinkDeploymentSpec spec) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 80fb6af..fe2e316 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -416,6 +416,18 @@
"spec.serviceAccount must be defined. If you use helm, its value should be the same with the name of jobServiceAccount.");
testSuccess(dep -> dep.getSpec().setServiceAccount("flink"));
+
+ testSuccess(
+ dep -> {
+ dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ dep.getSpec()
+ .getFlinkConfiguration()
+ .put(
+ HighAvailabilityOptions.HA_MODE.key(),
+ // Hardcoded config value should be removed when upgrading Flink
+ // dependency to 1.16
+ "kubernetes");
+ });
}
@Test