Add atomic recursive delete to ZK client and use for drop instance (#2994)

Add atomic recursive delete to ZK client and use for drop instance
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d5998a1..ae91468 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -19,7 +19,6 @@
  * under the License.
  */
 
-import com.google.common.collect.ImmutableMap;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -274,18 +273,16 @@
           "Node " + instanceName + " is still alive for cluster " + clusterName + ", can't drop.");
     }
 
-    // delete config path
-    String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
-    ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
-    // delete instance path
-    dropInstancePathRecursively(instancePath, instanceConfig.getInstanceName());
+    dropInstancePathsRecursively(clusterName, instanceName);
   }
 
-  private void dropInstancePathRecursively(String instancePath, String instanceName) {
+  private void dropInstancePathsRecursively(String clusterName, String instanceName) {
+    String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
+    String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
     int retryCnt = 0;
     while (true) {
       try {
-        _zkClient.deleteRecursively(instancePath);
+        _zkClient.deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));
         return;
       } catch (ZkClientException e) {
         if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause()
@@ -333,11 +330,7 @@
 
   private void purgeInstance(String clusterName, String instanceName) {
     logger.info("Purge instance {} from cluster {}.", instanceName, clusterName);
-
-    String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
-    _zkClient.delete(instanceConfigPath);
-    String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
-    dropInstancePathRecursively(instancePath, instanceName);
+    dropInstancePathsRecursively(clusterName, instanceName);
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index b54be8c..b60b909 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -237,7 +237,7 @@
         new ZkException("ZkException: failed to delete " + instancePath,
             new KeeperException.NotEmptyException(
                 "NotEmptyException: directory" + instancePath + " is not empty"))))
-        .when(mockZkClient).deleteRecursively(instancePath);
+        .when(mockZkClient).deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));
 
     HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient);
     try {
@@ -1342,4 +1342,29 @@
       _gSetupTool.deleteCluster(clusterName);
     }
   }
+
+  @Test
+  public void testDropInstance() {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    int numInstances = 5;
+    final String clusterName = getShortClassName();
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
+
+    // Add instances to cluster
+    for (int i = 0; i < numInstances; i++) {
+      admin.addInstance(clusterName, new InstanceConfig("localhost_" + i));
+      // Create dummy message nodes
+      _gZkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, "localhost_" + i, ""+i));
+    }
+    Assert.assertTrue(admin.getInstancesInCluster(clusterName).size() == numInstances, "Instances should be added");
+
+    for (int i = 0; i < 5; i++) {
+      admin.dropInstance(clusterName, new InstanceConfig("localhost_" + i));
+    }
+    Assert.assertTrue(admin.getInstancesInCluster(clusterName).isEmpty(), "Instances should be removed");
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index e24cc6e..e18e868 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -252,6 +252,9 @@
 
   void deleteRecursively(String path);
 
+  void deleteRecursivelyAtomic(String path);
+  void deleteRecursivelyAtomic(List<String> paths);
+
   boolean delete(final String path);
 
   boolean delete(final String path, final int expectedVersion);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
index a1a06f4..e214806 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
@@ -384,6 +384,20 @@
   }
 
   @Override
+  public void deleteRecursivelyAtomic(String path) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.deleteRecursivelyAtomic(path);
+  }
+
+  @Override
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    for (String path : paths) {
+      checkIfPathContainsShardingKey(path);
+    }
+    _rawZkClient.deleteRecursivelyAtomic(paths);
+  }
+
+  @Override
   public boolean delete(String path) {
     return delete(path, -1);
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 219ad97..7943bc2 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -44,6 +44,7 @@
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.ZooDefs;
@@ -371,6 +372,20 @@
   }
 
   @Override
+  public void deleteRecursivelyAtomic(String path) {
+    getZkClient(path).deleteRecursivelyAtomic(path);
+  }
+
+  @Override
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    // Check if all paths are in the same realm. If not, throw error as we cannot guarantee atomicity across clients.
+    if (paths.stream().map(this::getZkRealm).distinct().count() > 1) {
+      throw new IllegalArgumentException("Cannot atomically delete paths across different realms");
+    }
+    getZkClient(paths.get(0)).deleteRecursivelyAtomic(paths);
+  }
+
+  @Override
   public boolean delete(String path) {
     return delete(path, -1);
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
index 3676288..210e82b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
@@ -413,6 +413,20 @@
   }
 
   @Override
+  public void deleteRecursivelyAtomic(String path) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.deleteRecursivelyAtomic(path);
+  }
+
+  @Override
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    for (String path : paths) {
+      checkIfPathContainsShardingKey(path);
+    }
+    _innerSharedZkClient.deleteRecursivelyAtomic(paths);
+  }
+
+  @Override
   public boolean delete(String path) {
     return delete(path, -1);
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 9d21e76..33b8bc1 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -20,12 +20,17 @@
  */
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.OptionalLong;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -1820,6 +1825,84 @@
     }
   }
 
+  /**
+   * Delete the path as well as all its children. This operation is atomic and will either delete all nodes or none.
+   * This operation may fail if another agent is concurrently creating or deleting nodes under the path.
+   * @param path ZK path to delete
+   */
+  public void deleteRecursivelyAtomic(String path) {
+    deleteRecursivelyAtomic(Arrays.asList(path));
+  }
+
+  /**
+   * Delete the paths as well as all their children. This operation is atomic and will either delete all nodes or none.
+   * This operation may fail if another agent is concurrently creating or deleting nodes under any of the paths.
+   * @param paths ZK paths to delete
+   */
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    List<Op> ops = new ArrayList<>();
+    List<OpResult> opResults;
+    for (String path : paths) {
+      ops.addAll(getOpsForRecursiveDelete(path));
+    }
+
+    // Return early if no operations to execute
+    if (ops.isEmpty()) {
+      return;
+    }
+
+    try {
+      opResults = multi(ops);
+    } catch (Exception e) {
+      LOG.error("zkclient {}, Failed to delete paths {}, exception {}", _uid, paths, e);
+      throw new ZkClientException("Failed to delete paths " + paths, e);
+    }
+
+    // Check if any of the operations failed. Create mapping of failed paths to error codes
+    Map<String, KeeperException.Code> failedPathsMap = new HashMap<>();
+    for (int i = 0; i < opResults.size(); i++) {
+      if (opResults.get(i) instanceof OpResult.ErrorResult) {
+        failedPathsMap.put(ops.get(i).getPath(),
+            KeeperException.Code.get(((OpResult.ErrorResult) opResults.get(i)).getErr()));
+      }
+    }
+
+    // Log and throw exception if any of the operations failed
+    if (!failedPathsMap.isEmpty()) {
+      LOG.error("zkclient {}, Failed to delete paths {}, multi returned with error codes {} for sub-paths {}",
+          _uid, paths, failedPathsMap.keySet(), failedPathsMap.values());
+      throw new ZkClientException("Failed to delete paths " + paths + " with ZK KeeperException error codes: "
+          + failedPathsMap.keySet() + " for paths: " + failedPathsMap.values());
+    }
+  }
+
+  /**
+   * Get the list of operations to delete the given root and all its children. Ops will be ordered so that deletion of
+   * children will come before parent nodes.
+   * @param root the root node to delete
+   * @return the list of ZK operations to delete the given root and all its children
+   */
+  private List<Op> getOpsForRecursiveDelete(String root) {
+    List<Op> ops = new ArrayList<>();
+    // Return empty list if the root does not exist
+    if (!exists(root)) {
+      return ops;
+    }
+
+    // Level order traversal of tree, adding deleting operation for each node
+    // This will produce list of operations ordered from parent to children nodes
+    Queue<String> nodes = new LinkedList<>();
+    nodes.offer(root);
+    while (!nodes.isEmpty()) {
+      String node = nodes.poll();
+      getChildren(node, false).stream().forEach(child -> nodes.offer(node + "/" + child));
+      ops.add(Op.delete(node, -1));
+    }
+   // Reverse the list so that operations are ordered from children to parent nodes
+    Collections.reverse(ops);
+    return ops;
+  }
+
   private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
     final String path = event.getPath();
     final boolean pathExists = event.getType() != EventType.NodeDeleted;
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index e5b589f..0ce0349 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -21,6 +21,7 @@
 
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -51,6 +52,7 @@
 import org.apache.helix.zookeeper.zkclient.ZkServer;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
 import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
@@ -1231,4 +1233,58 @@
       }
     }
   }
+
+  @Test
+  void testDeleteRecursivelyAtomic() {
+    System.out.println("Start test: " + TestHelper.getTestMethodName());
+    String grandParent = "/testDeleteRecursively";
+    String parent = grandParent + "/parent";
+    String child1 = parent + "/child1";
+    String child2 = parent + "/child2";
+    _zkClient.createPersistent(grandParent);
+    _zkClient.createPersistent(parent);
+    _zkClient.createPersistent(child1);
+    _zkClient.createPersistent(child2);
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test calling delete on same path twice
+    try {
+      _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, grandParent));
+      Assert.fail("Operation should not succeed when attempting to delete same path twice");
+    } catch (ZkClientException expected) {
+      // Caught expected exception
+    }
+
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test calling delete on path that is child of another path in the list
+    try {
+      _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, parent));
+      Assert.fail("Operation should not succeed when attempting to delete same path twice");
+    } catch (ZkClientException expected) {
+      // Caught expected exception
+    }
+
+    // Test calling delete on single node
+    Assert.assertTrue(_zkClient.exists(child2));
+    _zkClient.deleteRecursivelyAtomic(child2);
+    Assert.assertFalse(_zkClient.exists(child2));
+
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test successfully delete multiple paths. Also that operation succeeds when attempting to delete non-existent path
+    String newNode = "/newNode";
+    _zkClient.createPersistent(newNode);
+    Assert.assertTrue(_zkClient.exists(newNode));
+
+    String nonexistentPath = grandParent + "/nonexistent";
+    Assert.assertFalse(_zkClient.exists(nonexistentPath));
+
+    _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, newNode, nonexistentPath));
+    Assert.assertFalse(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.exists(newNode));
+  }
 }