[FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode

When calling ZooKeeperHaServices#closeAndCleanupAllData, a child of the zNode owned by the
ZooKeeperHaServices can be deleted concurrently (e.g. because a LeaderElectionService has
been shut down). In order to tolerate such concurrent deletions, we now retry the delete
operation if it fails with a NoNodeException.

This closes #7929.
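The retry pattern applied in the patch below can be sketched in isolation. This is a minimal, self-contained illustration, not Flink code: `FlakyDeleter` and its `NoNodeException` are hypothetical stand-ins for the Curator delete call and ZooKeeper's `KeeperException.NoNodeException`, with the first call failing as if a child had been deleted concurrently:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetryDeleteSketch {

	// Hypothetical stand-in for KeeperException.NoNodeException.
	static class NoNodeException extends Exception {}

	// Hypothetical stand-in for client.delete().deletingChildrenIfNeeded().forPath("/"):
	// the first invocation fails as if a child zNode was concurrently deleted.
	static class FlakyDeleter {
		private final AtomicInteger calls = new AtomicInteger();

		void deleteOwnedZNode() throws NoNodeException {
			if (calls.getAndIncrement() == 0) {
				throw new NoNodeException();
			}
		}
	}

	// Same shape as the patched deleteOwnedZNode(): loop until the delete succeeds,
	// swallowing NoNodeException caused by concurrent delete operations.
	static int deleteWithRetry(FlakyDeleter deleter) {
		boolean zNodeDeleted = false;
		int attempts = 0;
		while (!zNodeDeleted) {
			attempts++;
			try {
				deleter.deleteOwnedZNode();
				zNodeDeleted = true;
			} catch (NoNodeException ignored) {
				// Concurrent delete operation; try again.
			}
		}
		return attempts;
	}

	public static void main(String[] args) {
		// First attempt fails, second succeeds: two attempts in total.
		System.out.println(deleteWithRetry(new FlakyDeleter()));
	}
}
```

The loop terminates because `deletingChildrenIfNeeded()` only fails transiently here: each concurrent deletion removes a child, so a retry eventually finds a deletable (or already absent) tree.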
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 1b2ff44..2596981 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -251,7 +251,21 @@
 
 	private void deleteOwnedZNode() throws Exception {
 		// delete the HA_CLUSTER_ID znode which is owned by this cluster
-		client.delete().deletingChildrenIfNeeded().forPath("/");
+
+		// Curator version 2.12, which we currently use, has a bug in deleting the children
+		// of a zNode if there is a concurrent delete operation. Therefore, we need this retry
+		// logic. See https://issues.apache.org/jira/browse/CURATOR-430 for more information.
+		// The retry logic can be removed once we upgrade to Curator version >= 4.0.1.
+		boolean zNodeDeleted = false;
+		while (!zNodeDeleted) {
+			try {
+				client.delete().deletingChildrenIfNeeded().forPath("/");
+				zNodeDeleted = true;
+			} catch (KeeperException.NoNodeException ignored) {
+				// concurrent delete operation. Try again.
+				LOG.debug("Retrying to delete the owned znode because of a concurrent delete operation.");
+			}
+		}
 	}
 
 	/**