[FLINK-27273] Zookeeper HA support

diff --git a/docs/content/docs/concepts/overview.md b/docs/content/docs/concepts/overview.md
index 1b3111c..a67e091 100644
--- a/docs/content/docs/concepts/overview.md
+++ b/docs/content/docs/concepts/overview.md
@@ -88,7 +88,7 @@
 ## Known Issues & Limitations
 
 ### JobManager High-availability
-The Operator leverages [Kubernetes HA Services](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/) for providing High-availability for Flink jobs. The HA solution can benefit form using additional [Standby replicas](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/overview/), it will result in a faster recovery time, but Flink jobs will still restart when the Leader JobManager goes down.
+The Operator supports both [Kubernetes HA Services](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/) and [Zookeeper HA Services](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/) for providing High-availability for Flink jobs. The HA solution can benefit from using additional [Standby replicas](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/overview/), it will result in a faster recovery time, but Flink jobs will still restart when the Leader JobManager goes down.
 
 ### JobResultStore Resource Leak
 To mitigate the impact of [FLINK-27569](https://issues.apache.org/jira/browse/FLINK-27569) the operator introduced a workaround [FLINK-27573](https://issues.apache.org/jira/browse/FLINK-27573) by setting `job-result-store.delete-on-commit=false` and a unique value for `job-result-store.storage-path` for every cluster launch. The storage path for older runs must be cleaned up manually, keeping the latest directory always:
diff --git a/docs/content/docs/custom-resource/job-management.md b/docs/content/docs/custom-resource/job-management.md
index da0fc4a..2081096 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -86,14 +86,14 @@
 
 |                        | Stateless               | Last State                                 | Savepoint                              |
 |------------------------|-------------------------|--------------------------------------------|----------------------------------------|
-| Config Requirement     | None                    | Checkpointing & Kubernetes HA Enabled      | Checkpoint/Savepoint directory defined |
+| Config Requirement     | None                    | Checkpointing & HA Enabled                 | Checkpoint/Savepoint directory defined |
 | Job Status Requirement | None                    | HA metadata available                      | Job Running*                           |
 | Suspend Mechanism      | Cancel / Delete         | Delete Flink deployment (keep HA metadata) | Cancel with savepoint                  |
 | Restore Mechanism      | Deploy from empty state | Recover last state using HA metadata       | Restore From savepoint                 |
 | Production Use         | Not recommended         | Recommended                                | Recommended                            |
 
 
-*\* When Kubernetes HA is enabled the `savepoint` upgrade mode may fall back to the `last-state` behaviour in cases where the job is in an unhealthy state.*
+*\* When HA is enabled the `savepoint` upgrade mode may fall back to the `last-state` behaviour in cases where the job is in an unhealthy state.*
 
 The three upgrade modes are intended to support different scenarios:
 
@@ -216,7 +216,7 @@
 
 ## Recovery of missing job deployments
 
-When Kubernetes HA is enabled, the operator can recover the Flink cluster deployments in cases when it was accidentally deleted
+When HA is enabled, the operator can recover the Flink cluster deployments in cases when it was accidentally deleted
 by the user or some external process. Deployment recovery can be turned off in the configuration by setting `kubernetes.operator.jm-deployment-recovery.enabled` to `false`, however it is recommended to keep this setting on the default `true` value.
 
 This is not something that would usually happen during normal operation and can also indicate a deeper problem,
@@ -234,7 +234,7 @@
 
 ## Restart of unhealthy job deployments
 
-When Kubernetes HA is enabled, the operator can restart the Flink cluster deployments in cases when it was considered
+When HA is enabled, the operator can restart the Flink cluster deployments in cases when it was considered
 unhealthy. Unhealthy deployment restart can be turned on in the configuration by setting `kubernetes.operator.cluster.health-check.enabled` to `true` (default: `false`).  
 In order this feature to work one must enable [recovery of missing job deployments](#recovery-of-missing-job-deployments).
 
@@ -266,7 +266,7 @@
 kubernetes.operator.deployment.rollback.enabled: true
 ```
 
-Kubernetes HA is currently required for the rollback functionality.
+HA is currently required for the rollback functionality.
 
 Applications are never rolled back to a previous running state if they were suspended before the upgrade.
 In these cases no rollback will be performed.
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index c928d33..7029e40 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -167,6 +167,20 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator-test.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <!-- Exclude an older version of junit-jupiter-api -->
+                <exclusion>
+                    <groupId>org.junit.jupiter</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-kubernetes-operator-api</artifactId>
             <version>${project.version}</version>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 1542b30..dc42385 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
@@ -340,6 +341,9 @@
         // Set cluster config
         effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
         effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
+        if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {
+            effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
+        }
         return effectiveConfig;
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 7cdd1e2..71cf9b7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -38,6 +38,7 @@
 import org.apache.flink.kubernetes.operator.exception.ValidationException;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.client.CustomResource;
@@ -277,7 +278,7 @@
 
         return previousUpgradeMode != UpgradeMode.LAST_STATE
                 && currentUpgradeMode == UpgradeMode.LAST_STATE
-                && !FlinkUtils.isKubernetesHAActivated(observeConfig);
+                && !HighAvailabilityMode.isHighAvailabilityModeActivated(observeConfig);
     }
 
     public static <SPEC extends AbstractFlinkSpec> SPEC getDeployedSpec(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 5c5bcd0..afd2a9d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -41,8 +41,8 @@
 import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -428,7 +428,7 @@
 
             if (jmMissingForRunningDeployment(deployment)) {
                 LOG.debug("Jobmanager deployment is missing, trying to recover");
-                if (FlinkUtils.isKubernetesHAActivated(conf)) {
+                if (HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
                     LOG.debug("HA is enabled, recovering lost jobmanager deployment");
                     result = true;
                 } else {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 4db3302..c941873 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -41,6 +41,7 @@
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -85,8 +86,8 @@
         if (deployConfig.getBoolean(
                         KubernetesOperatorConfigOptions
                                 .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
-                && FlinkUtils.isKubernetesHAActivated(deployConfig)
-                && FlinkUtils.isKubernetesHAActivated(ctx.getObserveConfig())
+                && HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
+                && HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
                 && !flinkVersionChanged(
                         ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
 
@@ -128,7 +129,7 @@
             FlinkService flinkService, FlinkDeployment deployment, Configuration deployConfig) {
         deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
         flinkService.deleteClusterDeployment(
-                deployment.getMetadata(), deployment.getStatus(), false);
+                deployment.getMetadata(), deployment.getStatus(), deployConfig, false);
         flinkService.waitForClusterShutdown(deployConfig);
         LOG.info("Deleted jobmanager deployment that never started.");
     }
@@ -164,7 +165,8 @@
                 throw new RuntimeException("This indicates a bug...");
             }
             LOG.info("Deleting deployment with terminated application before new deployment");
-            flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, true);
+            flinkService.deleteClusterDeployment(
+                    relatedResource.getMetadata(), status, deployConfig, true);
             flinkService.waitForClusterShutdown(deployConfig);
         }
 
@@ -223,7 +225,10 @@
         // The job has already stopped. Delete the deployment and we are ready.
         ctx.getFlinkService()
                 .deleteClusterDeployment(
-                        ctx.getResource().getMetadata(), ctx.getResource().getStatus(), false);
+                        ctx.getResource().getMetadata(),
+                        ctx.getResource().getStatus(),
+                        ctx.getDeployConfig(ctx.getResource().getSpec()),
+                        false);
     }
 
     // Workaround for https://issues.apache.org/jira/browse/FLINK-27569
@@ -299,7 +304,8 @@
                     if (deployment.getSpec().getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
                         LOG.debug("Stateless job, recovering unhealthy jobmanager deployment");
                         restartNeeded = true;
-                    } else if (FlinkUtils.isKubernetesHAActivated(observeConfig)) {
+                    } else if (HighAvailabilityMode.isHighAvailabilityModeActivated(
+                            observeConfig)) {
                         LOG.debug("HA is enabled, recovering unhealthy jobmanager deployment");
                         restartNeeded = true;
                     } else {
@@ -337,7 +343,8 @@
                                             .plus(ttl));
             if (ttlPassed) {
                 LOG.info("Removing JobManager deployment for terminal application.");
-                flinkService.deleteClusterDeployment(deployment.getMetadata(), status, false);
+                flinkService.deleteClusterDeployment(
+                        deployment.getMetadata(), status, observeConfig, false);
                 return true;
             }
         }
@@ -349,8 +356,10 @@
     protected DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
         var deployment = ctx.getResource();
         var status = deployment.getStatus();
+        var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
         if (status.getReconciliationStatus().isBeforeFirstDeployment()) {
-            ctx.getFlinkService().deleteClusterDeployment(deployment.getMetadata(), status, true);
+            ctx.getFlinkService()
+                    .deleteClusterDeployment(deployment.getMetadata(), status, conf, true);
         } else {
             ctx.getFlinkService()
                     .cancelJob(deployment, UpgradeMode.STATELESS, ctx.getObserveConfig());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index fe5ad86..b1f93c2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -84,8 +84,10 @@
 
     private void deleteSessionCluster(FlinkResourceContext<FlinkDeployment> ctx) {
         var deployment = ctx.getResource();
+        var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
         ctx.getFlinkService()
-                .deleteClusterDeployment(deployment.getMetadata(), deployment.getStatus(), false);
+                .deleteClusterDeployment(
+                        deployment.getMetadata(), deployment.getStatus(), conf, false);
         ctx.getFlinkService().waitForClusterShutdown(ctx.getObserveConfig());
     }
 
@@ -163,9 +165,10 @@
                                     .toMillis());
         } else {
             LOG.info("Stopping session cluster");
+            var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
             ctx.getFlinkService()
                     .deleteClusterDeployment(
-                            deployment.getMetadata(), deployment.getStatus(), true);
+                            deployment.getMetadata(), deployment.getStatus(), conf, true);
             return DeleteControl.defaultDelete();
         }
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 933ab74..4c5d9fc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -166,13 +166,17 @@
         LOG.info(
                 "Deploying application cluster{}",
                 requireHaMetadata ? " requiring last-state from HA metadata" : "");
+
+        // If Kubernetes or Zookeeper HA is activated, delete the job graph in HA storage so that
+        // the newly changed job config (e.g. parallelism) can take effect
         if (FlinkUtils.isKubernetesHAActivated(conf)) {
             final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
             final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
-            // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
-            // parallelism) could take effect
             FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+        } else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+            FlinkUtils.deleteJobGraphInZookeeperHA(conf);
         }
+
         if (requireHaMetadata) {
             validateHaMetadataExists(conf);
         }
@@ -182,7 +186,13 @@
 
     @Override
     public boolean isHaMetadataAvailable(Configuration conf) {
-        return FlinkUtils.isHaMetadataAvailable(conf, kubernetesClient);
+        if (FlinkUtils.isKubernetesHAActivated(conf)) {
+            return FlinkUtils.isKubernetesHaMetadataAvailable(conf, kubernetesClient);
+        } else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+            return FlinkUtils.isZookeeperHaMetadataAvailable(conf);
+        }
+
+        return false;
     }
 
     @Override
@@ -283,7 +293,7 @@
                             LOG.error("Could not shut down cluster gracefully, deleting...", e);
                         }
                     }
-                    deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
+                    deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, conf, true);
                     break;
                 case SAVEPOINT:
                     final String savepointDirectory =
@@ -329,11 +339,14 @@
                     }
                     if (deleteClusterAfterSavepoint) {
                         LOG.info("Cleaning up deployment after stop-with-savepoint");
-                        deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
+
+                        deleteClusterDeployment(
+                                deployment.getMetadata(), deploymentStatus, conf, true);
                     }
                     break;
                 case LAST_STATE:
-                    deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, false);
+                    deleteClusterDeployment(
+                            deployment.getMetadata(), deploymentStatus, conf, false);
                     break;
                 default:
                     throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
@@ -907,8 +920,11 @@
 
     @Override
     public final void deleteClusterDeployment(
-            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
-        deleteClusterInternal(meta, deleteHaData);
+            ObjectMeta meta,
+            FlinkDeploymentStatus status,
+            Configuration conf,
+            boolean deleteHaData) {
+        deleteClusterInternal(meta, conf, deleteHaData);
         updateStatusAfterClusterDeletion(status);
     }
 
@@ -917,9 +933,33 @@
      * allows deleting the native kubernetes HA resources as well.
      *
      * @param meta ObjectMeta of the deployment
-     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     * @param conf Configuration of the Flink application
+     * @param deleteHaData Flag to indicate whether k8s or Zookeeper HA metadata should be removed
+     *     as well
      */
-    protected abstract void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps);
+    protected abstract void deleteClusterInternal(
+            ObjectMeta meta, Configuration conf, boolean deleteHaData);
+
+    protected void deleteHAData(String namespace, String clusterId, Configuration conf) {
+        // We need to wait for cluster shutdown otherwise HA data might be recreated
+        waitForClusterShutdown(
+                namespace,
+                clusterId,
+                configManager
+                        .getOperatorConfiguration()
+                        .getFlinkShutdownClusterTimeout()
+                        .toSeconds());
+
+        if (FlinkUtils.isKubernetesHAActivated(conf)) {
+            LOG.info("Deleting Kubernetes HA metadata");
+            FlinkUtils.deleteKubernetesHAMetadata(clusterId, namespace, kubernetesClient);
+        } else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+            LOG.info("Deleting Zookeeper HA metadata");
+            FlinkUtils.deleteZookeeperHAMetadata(conf);
+        } else {
+            LOG.warn("Can't delete HA metadata since HA is not enabled");
+        }
+    }
 
     protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 6c75d12..7583ba1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -73,7 +73,10 @@
             throws Exception;
 
     void deleteClusterDeployment(
-            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData);
+            ObjectMeta meta,
+            FlinkDeploymentStatus status,
+            Configuration conf,
+            boolean deleteHaData);
 
     void cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
             throws Exception;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 955185c..9c752f7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -38,8 +38,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
-
 /**
  * Implementation of {@link FlinkService} submitting and interacting with Native Kubernetes Flink
  * clusters and jobs.
@@ -109,14 +107,15 @@
     }
 
     @Override
-    protected void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps) {
+    protected void deleteClusterInternal(
+            ObjectMeta meta, Configuration conf, boolean deleteHaData) {
 
         String namespace = meta.getNamespace();
         String clusterId = meta.getName();
 
         LOG.info(
                 "Deleting JobManager deployment {}.",
-                deleteHaConfigmaps ? "and HA metadata" : "while preserving HA metadata");
+                deleteHaData ? "and HA metadata" : "while preserving HA metadata");
         kubernetesClient
                 .apps()
                 .deployments()
@@ -124,22 +123,8 @@
                 .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .delete();
 
-        if (deleteHaConfigmaps) {
-            // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
-            waitForClusterShutdown(
-                    namespace,
-                    clusterId,
-                    configManager
-                            .getOperatorConfiguration()
-                            .getFlinkShutdownClusterTimeout()
-                            .toSeconds());
-            kubernetesClient
-                    .configMaps()
-                    .inNamespace(namespace)
-                    .withLabels(
-                            KubernetesUtils.getConfigMapLabels(
-                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
-                    .delete();
+        if (deleteHaData) {
+            deleteHAData(namespace, clusterId, conf);
         }
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index de46787..6c7d705 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -36,7 +36,6 @@
 import org.apache.flink.kubernetes.operator.standalone.KubernetesStandaloneClusterDescriptor;
 import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
-import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -48,8 +47,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
-
 /**
  * Implementation of {@link FlinkService} submitting and interacting with Standalone Kubernetes
  * Flink clusters and jobs.
@@ -131,7 +128,8 @@
     }
 
     @Override
-    protected void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps) {
+    protected void deleteClusterInternal(
+            ObjectMeta meta, Configuration conf, boolean deleteHaData) {
         final String clusterId = meta.getName();
         final String namespace = meta.getNamespace();
 
@@ -150,23 +148,8 @@
                 .inNamespace(namespace)
                 .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
                 .delete();
-
-        if (deleteHaConfigmaps) {
-            // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
-            waitForClusterShutdown(
-                    namespace,
-                    clusterId,
-                    configManager
-                            .getOperatorConfiguration()
-                            .getFlinkShutdownClusterTimeout()
-                            .toSeconds());
-            kubernetesClient
-                    .configMaps()
-                    .inNamespace(namespace)
-                    .withLabels(
-                            KubernetesUtils.getConfigMapLabels(
-                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
-                    .delete();
+        if (deleteHaData) {
+            deleteHAData(namespace, clusterId, conf);
         }
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 572845c..50fcce6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -28,6 +28,8 @@
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -51,6 +53,8 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
 
@@ -102,6 +106,40 @@
         }
     }
 
+    public static void deleteZookeeperHAMetadata(Configuration conf) {
+        try (var curator = ZooKeeperUtils.startCuratorFramework(conf, exception -> {})) {
+            try {
+                curator.asCuratorFramework().delete().deletingChildrenIfNeeded().forPath("/");
+            } catch (Exception e) {
+                LOG.error(
+                        "Could not delete HA Metadata at path {} in Zookeeper",
+                        ZooKeeperUtils.generateZookeeperPath(
+                                conf.get(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT),
+                                conf.get(HighAvailabilityOptions.HA_CLUSTER_ID)),
+                        e);
+            }
+        }
+    }
+
+    public static void deleteKubernetesHAMetadata(
+            String clusterId, String namespace, KubernetesClient kubernetesClient) {
+        kubernetesClient
+                .configMaps()
+                .inNamespace(namespace)
+                .withLabels(
+                        KubernetesUtils.getConfigMapLabels(
+                                clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                .delete();
+    }
+
+    public static void deleteJobGraphInZookeeperHA(Configuration conf) throws Exception {
+        try (var curator = ZooKeeperUtils.startCuratorFramework(conf, exception -> {})) {
+            ZooKeeperUtils.deleteZNode(
+                    curator.asCuratorFramework(),
+                    conf.get(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH));
+        }
+    }
+
     public static void deleteJobGraphInKubernetesHA(
             String clusterId, String namespace, KubernetesClient kubernetesClient) {
         // The HA ConfigMap names have been changed from 1.15, so we use the labels to filter out
@@ -133,7 +171,25 @@
         }
     }
 
-    public static boolean isHaMetadataAvailable(
+    public static boolean isZookeeperHaMetadataAvailable(Configuration conf) {
+        try (var curator = ZooKeeperUtils.startCuratorFramework(conf, exception -> {})) {
+            if (curator.asCuratorFramework().checkExists().forPath("/") != null) {
+                return curator.asCuratorFramework().getChildren().forPath("/").size() != 0;
+            }
+            return false;
+        } catch (Exception e) {
+            LOG.error(
+                    "Could not check whether the HA metadata exists at path {} in Zookeeper",
+                    ZooKeeperUtils.generateZookeeperPath(
+                            conf.get(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT),
+                            conf.get(HighAvailabilityOptions.HA_CLUSTER_ID)),
+                    e);
+        }
+
+        return false;
+    }
+
+    public static boolean isKubernetesHaMetadataAvailable(
             Configuration conf, KubernetesClient kubernetesClient) {
 
         String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
@@ -171,6 +227,11 @@
         return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX);
     }
 
+    public static boolean isZookeeperHAActivated(Configuration configuration) {
+        return HighAvailabilityMode.fromConfig(configuration)
+                .equals(HighAvailabilityMode.ZOOKEEPER);
+    }
+
     public static boolean isKubernetesHAActivated(Configuration configuration) {
         String haMode = configuration.get(HighAvailabilityOptions.HA_MODE);
         return haMode.equalsIgnoreCase(KubernetesHaServicesFactory.class.getCanonicalName())
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 4ddf490..4af79bc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -39,10 +40,10 @@
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.util.StringUtils;
 
@@ -60,7 +61,9 @@
             Pattern.compile("[a-z]([-a-z\\d]{0,43}[a-z\\d])?");
     private static final String[] FORBIDDEN_CONF_KEYS =
             new String[] {
-                KubernetesConfigOptions.NAMESPACE.key(), KubernetesConfigOptions.CLUSTER_ID.key()
+                KubernetesConfigOptions.NAMESPACE.key(),
+                KubernetesConfigOptions.CLUSTER_ID.key(),
+                HighAvailabilityOptions.HA_CLUSTER_ID.key()
             };
 
     private static final Set<String> ALLOWED_LOG_CONF_KEYS =
@@ -169,8 +172,8 @@
         }
 
         if (conf.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
-                && !FlinkUtils.isKubernetesHAActivated(conf)) {
-            return Optional.of("Kubernetes HA must be enabled for rollback support.");
+                && !HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+            return Optional.of("HA must be enabled for rollback support.");
         }
 
         if (conf.get(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)
@@ -213,9 +216,8 @@
 
         Configuration configuration = Configuration.fromMap(confMap);
         if (job.getUpgradeMode() == UpgradeMode.LAST_STATE
-                && !FlinkUtils.isKubernetesHAActivated(configuration)) {
-            return Optional.of(
-                    "Job could not be upgraded with last-state while Kubernetes HA disabled");
+                && !HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
+            return Optional.of("Job could not be upgraded with last-state while HA disabled");
         }
 
         if (job.getUpgradeMode() != UpgradeMode.STATELESS) {
@@ -295,9 +297,10 @@
         if (replicas < 1) {
             return Optional.of("JobManager replicas should not be configured less than one.");
         } else if (replicas > 1
-                && !FlinkUtils.isKubernetesHAActivated(Configuration.fromMap(confMap))) {
+                && !HighAvailabilityMode.isHighAvailabilityModeActivated(
+                        Configuration.fromMap(confMap))) {
             return Optional.of(
-                    "Kubernetes High availability should be enabled when starting standby JobManagers.");
+                    "High availability should be enabled when starting standby JobManagers.");
         }
         return Optional.empty();
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 72aa6c8..711bb8d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -48,6 +48,7 @@
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -188,7 +189,7 @@
 
     @Override
     public boolean isHaMetadataAvailable(Configuration conf) {
-        return FlinkUtils.isKubernetesHAActivated(conf) && haDataAvailable;
+        return HighAvailabilityMode.isHighAvailabilityModeActivated(conf) && haDataAvailable;
     }
 
     public void setHaDataAvailable(boolean haDataAvailable) {
@@ -404,7 +405,8 @@
     }
 
     @Override
-    protected void deleteClusterInternal(ObjectMeta meta, boolean deleteHaMeta) {
+    protected void deleteClusterInternal(
+            ObjectMeta meta, Configuration conf, boolean deleteHaMeta) {
         jobs.clear();
         sessions.remove(meta.getName());
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 42b0c7c..ca8ece2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -443,8 +443,9 @@
         verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
 
         // Delete cluster and keep HA metadata
+        var conf = Configuration.fromMap(deployment.getSpec().getFlinkConfiguration());
         flinkService.deleteClusterDeployment(
-                deployment.getMetadata(), deployment.getStatus(), false);
+                deployment.getMetadata(), deployment.getStatus(), conf, false);
         flinkService.setHaDataAvailable(true);
 
         // Submit upgrade
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index 716665e..4787106 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -79,7 +79,7 @@
 
         var requestsBeforeDelete = mockServer.getRequestCount();
         flinkStandaloneService.deleteClusterDeployment(
-                flinkDeployment.getMetadata(), flinkDeployment.getStatus(), false);
+                flinkDeployment.getMetadata(), flinkDeployment.getStatus(), configuration, false);
 
         assertEquals(2, mockServer.getRequestCount() - requestsBeforeDelete);
         assertTrue(mockServer.getLastRequest().getPath().contains("taskmanager"));
@@ -100,7 +100,7 @@
         assertEquals(2, deployments.size());
 
         flinkStandaloneService.deleteClusterDeployment(
-                flinkDeployment.getMetadata(), flinkDeployment.getStatus(), true);
+                flinkDeployment.getMetadata(), flinkDeployment.getStatus(), configuration, true);
 
         deployments = kubernetesClient.apps().deployments().list().getItems();
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 136e931..93d9273 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -98,11 +98,11 @@
     }
 
     @Test
-    public void haMetaDataCheckTest() {
+    public void kubernetesHaMetaDataCheckTest() {
         var cr = TestUtils.buildApplicationCluster();
         var confManager = new FlinkConfigManager(new Configuration());
         assertFalse(
-                FlinkUtils.isHaMetadataAvailable(
+                FlinkUtils.isKubernetesHaMetadataAvailable(
                         confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
                         kubernetesClient));
 
@@ -113,7 +113,7 @@
                 cr.getMetadata().getName(),
                 null);
         assertFalse(
-                FlinkUtils.isHaMetadataAvailable(
+                FlinkUtils.isKubernetesHaMetadataAvailable(
                         confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
                         kubernetesClient));
 
@@ -123,14 +123,14 @@
                 cr.getMetadata().getName(),
                 null);
         assertTrue(
-                FlinkUtils.isHaMetadataAvailable(
+                FlinkUtils.isKubernetesHaMetadataAvailable(
                         confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
                         kubernetesClient));
 
         // Flink 1.13-1.14
         kubernetesClient.configMaps().inAnyNamespace().delete();
         assertFalse(
-                FlinkUtils.isHaMetadataAvailable(
+                FlinkUtils.isKubernetesHaMetadataAvailable(
                         confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
                         kubernetesClient));
 
@@ -140,7 +140,7 @@
                 cr.getMetadata().getName(),
                 null);
         assertFalse(
-                FlinkUtils.isHaMetadataAvailable(
+                FlinkUtils.isKubernetesHaMetadataAvailable(
                         confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
                         kubernetesClient));
 
@@ -150,7 +150,7 @@
                 cr.getMetadata().getName(),
                 null);
         assertTrue(
-                FlinkUtils.isHaMetadataAvailable(
+                FlinkUtils.isKubernetesHaMetadataAvailable(
                         confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
                         kubernetesClient));
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsZookeeperHATest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsZookeeperHATest.java
new file mode 100644
index 0000000..2b64a22
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsZookeeperHATest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.dispatcher.NoOpJobGraphListener;
+import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for Zookeeper functions in FlinkUtils. */
public class FlinkUtilsZookeeperHATest {

    // Flink configuration pointing at the embedded Zookeeper test server.
    Configuration configuration;
    // In-process Zookeeper server, started fresh for every test.
    TestingServer testingServer;
    // Backing storage directory for HA metadata.
    // NOTE(review): this is the JUnit4 rule used programmatically under JUnit5 — works, but
    // consider migrating to @TempDir.
    TemporaryFolder temporaryFolder;
    // Curator client connected to the test server; presumably namespaced to the same HA root
    // FlinkUtils operates on — TODO confirm against ZooKeeperUtils.startCuratorFramework.
    CuratorFramework curator;
    // Id of the JobGraph stored during setup; each test starts with exactly this one graph.
    JobID jobID = JobID.generate();

    /** Creates a Curator client for the given configuration with a fatal test error handler. */
    public CuratorFrameworkWithUnhandledErrorListener getTestCurator(Configuration configuration) {
        return ZooKeeperUtils.startCuratorFramework(configuration, new TestingFatalErrorHandler());
    }

    /**
     * Starts an embedded Zookeeper, configures Zookeeper HA against it and stores one empty
     * JobGraph so every test begins with non-empty HA metadata.
     */
    @BeforeEach
    public void setupZookeeper() throws Exception {
        // Start a ZK server
        testingServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();

        // Generate configuration for the Curator
        configuration = new Configuration();
        configuration.setString(
                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.toString());
        configuration.setString(
                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
        temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        configuration.setString(
                HighAvailabilityOptions.HA_STORAGE_PATH,
                temporaryFolder.newFolder().getAbsolutePath());

        // Create the Curator
        curator = getTestCurator(configuration).asCuratorFramework();

        // Populate the HA metadata with a test JobGraph
        var jobGraphStore = ZooKeeperUtils.createJobGraphs(curator, configuration);
        jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
        var jobGraph = JobGraphTestUtils.emptyJobGraph();
        jobGraph.setJobID(jobID);
        jobGraphStore.putJobGraph(jobGraph);
        jobGraphStore.stop();
    }

    /** Tears down the Curator client, the Zookeeper server and the temporary HA storage dir. */
    @AfterEach
    public void cleanupZookeeper() throws Exception {
        curator.close();
        testingServer.close();
        temporaryFolder.delete();
    }

    /** deleteZookeeperHAMetadata must wipe the entire HA namespace, root node included. */
    @Test
    public void testDeleteZookeeperHAMetadata() throws Exception {
        // First verify that the HA metadata path exists and is not empty
        assertNotNull(curator.checkExists().forPath("/"));
        assertTrue(curator.getChildren().forPath("/").size() != 0);

        // Now delete all data
        FlinkUtils.deleteZookeeperHAMetadata(configuration);

        // Verify that the root path doesn't exist anymore
        assertNull(curator.checkExists().forPath("/"));
    }

    /** deleteJobGraphInZookeeperHA must remove the JobGraph znode stored in setup. */
    @Test
    public void testDeleteJobGraphInZookeeperHA() throws Exception {
        // First verify that the JobGraph exists in ZK for the test JobID
        var jobGraphPath = configuration.get(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
        assertEquals(List.of(jobID.toString()), curator.getChildren().forPath(jobGraphPath));

        // Now delete the JobGraph
        FlinkUtils.deleteJobGraphInZookeeperHA(configuration);

        // Verify the JobGraph path doesn't exist anymore
        assertNull(curator.checkExists().forPath(jobGraphPath));
    }

    /** isZookeeperHaMetadataAvailable must flip from true to false once the data is gone. */
    @Test
    public void zookeeperHaMetaDataCheckTest() throws Exception {
        // Verify that the HA metadata exists since it was created in setupZookeeper()
        assertTrue(FlinkUtils.isZookeeperHaMetadataAvailable(configuration));

        // Now delete all data
        curator.delete().deletingChildrenIfNeeded().forPath("/");

        // Verify that the HA metadata no longer exists
        assertFalse(FlinkUtils.isZookeeperHaMetadataAvailable(configuration));
    }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 9576c3e..a519863 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -109,7 +109,7 @@
                     dep.getSpec().setFlinkConfiguration(new HashMap<>());
                     dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
                 },
-                "Job could not be upgraded with last-state while Kubernetes HA disabled");
+                "Job could not be upgraded with last-state while HA disabled");
 
         testError(
                 dep -> {
@@ -190,6 +190,15 @@
                 dep ->
                         dep.getSpec()
                                 .setFlinkConfiguration(
+                                        Collections.singletonMap(
+                                                HighAvailabilityOptions.HA_CLUSTER_ID.key(),
+                                                "my-cluster-id")),
+                "Forbidden Flink config key");
+
+        testError(
+                dep ->
+                        dep.getSpec()
+                                .setFlinkConfiguration(
                                         Map.of(
                                                 KubernetesOperatorConfigOptions
                                                                 .OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED
@@ -240,7 +249,19 @@
                     dep.getSpec().setFlinkConfiguration(new HashMap<>());
                     dep.getSpec().getJobManager().setReplicas(2);
                 },
-                "Kubernetes High availability should be enabled when starting standby JobManagers.");
+                "High availability should be enabled when starting standby JobManagers.");
+
+        testError(
+                dep ->
+                        dep.getSpec()
+                                .setFlinkConfiguration(
+                                        Map.of(
+                                                KubernetesOperatorConfigOptions
+                                                        .DEPLOYMENT_ROLLBACK_ENABLED
+                                                        .key(),
+                                                "true")),
+                "HA must be enabled for rollback support.");
+
         testError(
                 dep -> dep.getSpec().getJobManager().setReplicas(0),
                 "JobManager replicas should not be configured less than one.");
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
index 3ecea77..0b54fd9 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -64,7 +64,7 @@
                         .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
                         .addToArgs(JOBMANAGER_ENTRYPOINT_ARG);
 
-        if (kubernetesJobManagerParameters.isKubernetesHA()) {
+        if (kubernetesJobManagerParameters.isHAEnabled()) {
             containerBuilder.addToArgs(POD_IP_ARG);
         }
 
@@ -104,7 +104,7 @@
             args.addAll(kubernetesJobManagerParameters.getJobSpecArgs());
         }
 
-        if (kubernetesJobManagerParameters.isKubernetesHA()) {
+        if (kubernetesJobManagerParameters.isHAEnabled()) {
             args.add("--host");
             args.add(POD_IP_ARG);
         }
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
index 3b887c0..c5d0253 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
@@ -20,13 +20,13 @@
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,10 +39,6 @@
  */
 public class StandaloneKubernetesJobManagerParameters extends KubernetesJobManagerParameters {
 
-    private static final String KUBERNETES_HA_FQN_FACTORY_CLASS =
-            "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory";
-    private static final String KUBERNETES_HA_MODE = "KUBERNETES";
-
     public StandaloneKubernetesJobManagerParameters(
             Configuration flinkConfig, ClusterSpecification clusterSpecification) {
         super(flinkConfig, clusterSpecification);
@@ -113,9 +109,7 @@
         return null;
     }
 
-    public boolean isKubernetesHA() {
-        String haMode = flinkConfig.getValue(HighAvailabilityOptions.HA_MODE);
-        return haMode.equals(KUBERNETES_HA_FQN_FACTORY_CLASS)
-                || haMode.equalsIgnoreCase(KUBERNETES_HA_MODE);
    /**
     * Returns true when any Flink high-availability mode is configured for this deployment
     * (not only Kubernetes HA — Zookeeper HA also qualifies).
     */
    public boolean isHAEnabled() {
        return HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig);
    }
 }
diff --git a/pom.xml b/pom.xml
index 789850c..f401389 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
 
         <okhttp.version>4.10.0</okhttp.version>
         <snakeyaml.version>1.33</snakeyaml.version>
+        <curator-test.version>5.2.0</curator-test.version>
     </properties>
 
     <dependencyManagement>