Add atomic recursive delete to ZK client and use for drop instance (#2994)
Add atomic recursive delete to ZK client and use for drop instance
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d5998a1..ae91468 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -19,7 +19,6 @@
* under the License.
*/
-import com.google.common.collect.ImmutableMap;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -274,18 +273,16 @@
"Node " + instanceName + " is still alive for cluster " + clusterName + ", can't drop.");
}
- // delete config path
- String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
- ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
- // delete instance path
- dropInstancePathRecursively(instancePath, instanceConfig.getInstanceName());
+ dropInstancePathsRecursively(clusterName, instanceName);
}
- private void dropInstancePathRecursively(String instancePath, String instanceName) {
+ private void dropInstancePathsRecursively(String clusterName, String instanceName) {
+ String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
+ String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
int retryCnt = 0;
while (true) {
try {
- _zkClient.deleteRecursively(instancePath);
+ _zkClient.deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));
return;
} catch (ZkClientException e) {
if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause()
@@ -333,11 +330,7 @@
private void purgeInstance(String clusterName, String instanceName) {
logger.info("Purge instance {} from cluster {}.", instanceName, clusterName);
-
- String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
- _zkClient.delete(instanceConfigPath);
- String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
- dropInstancePathRecursively(instancePath, instanceName);
+ // The helper builds both the instance-config path and the instance path and removes them in a
+ // single deleteRecursivelyAtomic call, replacing the previous two separate deletes.
+ dropInstancePathsRecursively(clusterName, instanceName);
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index b54be8c..b60b909 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -237,7 +237,7 @@
new ZkException("ZkException: failed to delete " + instancePath,
new KeeperException.NotEmptyException(
"NotEmptyException: directory" + instancePath + " is not empty"))))
- .when(mockZkClient).deleteRecursively(instancePath);
+ .when(mockZkClient).deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));
HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient);
try {
@@ -1342,4 +1342,29 @@
_gSetupTool.deleteCluster(clusterName);
}
}
+
+  /**
+   * Verifies that dropInstance removes every instance from the cluster, including instances whose
+   * instance path still contains message nodes.
+   */
+  @Test
+  public void testDropInstance() {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    int numInstances = 5;
+    final String clusterName = getShortClassName();
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
+
+    try {
+      // Add instances to cluster
+      for (int i = 0; i < numInstances; i++) {
+        String instanceName = "localhost_" + i;
+        admin.addInstance(clusterName, new InstanceConfig(instanceName));
+        // Create a dummy message node so the instance path has a child that must be deleted too
+        _gZkClient.createPersistent(
+            PropertyPathBuilder.instanceMessage(clusterName, instanceName, "" + i));
+      }
+      // assertEquals reports the actual count on failure, unlike assertTrue(a == b)
+      Assert.assertEquals(admin.getInstancesInCluster(clusterName).size(), numInstances,
+          "Instances should be added");
+
+      // Drop exactly the instances that were added above
+      for (int i = 0; i < numInstances; i++) {
+        admin.dropInstance(clusterName, new InstanceConfig("localhost_" + i));
+      }
+      Assert.assertTrue(admin.getInstancesInCluster(clusterName).isEmpty(), "Instances should be removed");
+    } finally {
+      // Clean up the cluster even when an assertion fails, as the other tests in this class do
+      _gSetupTool.deleteCluster(clusterName);
+    }
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index e24cc6e..e18e868 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -252,6 +252,9 @@
void deleteRecursively(String path);
+ /**
+ * Deletes the given path together with all of its children as a single all-or-nothing operation.
+ * @param path ZK path to delete
+ */
+ void deleteRecursivelyAtomic(String path);
+ /**
+ * Deletes the given paths together with all of their children as a single all-or-nothing operation.
+ * NOTE(review): the federated implementation in this change rejects paths that span more than one
+ * realm with an IllegalArgumentException; confirm whether that belongs in this contract.
+ * @param paths ZK paths to delete
+ */
+ void deleteRecursivelyAtomic(List<String> paths);
+
boolean delete(final String path);
boolean delete(final String path, final int expectedVersion);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
index a1a06f4..e214806 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
@@ -384,6 +384,20 @@
}
+ /**
+ * Deletes the path and its children by delegating to _rawZkClient.deleteRecursivelyAtomic.
+ * checkIfPathContainsShardingKey runs on the path before the delegate call; presumably it rejects
+ * paths that do not belong to this client's sharding key (confirm against its definition).
+ * @param path ZK path to delete
+ */
@Override
+ public void deleteRecursivelyAtomic(String path) {
+ checkIfPathContainsShardingKey(path);
+ _rawZkClient.deleteRecursivelyAtomic(path);
+ }
+
+  /**
+   * Deletes all given paths and their children by delegating to _rawZkClient.deleteRecursivelyAtomic.
+   * Every path is run through checkIfPathContainsShardingKey before the delegate call is made.
+   * @param paths ZK paths to delete
+   */
+  @Override
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    // Check every path first; the delete is only issued once all checks have returned normally
+    paths.forEach(candidate -> checkIfPathContainsShardingKey(candidate));
+    _rawZkClient.deleteRecursivelyAtomic(paths);
+  }
+
+ @Override
public boolean delete(String path) {
return delete(path, -1);
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 219ad97..7943bc2 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -44,6 +44,7 @@
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
@@ -371,6 +372,20 @@
}
+ /**
+ * Deletes the path and its children by delegating to the ZkClient that getZkClient(path) returns.
+ * @param path ZK path to delete
+ */
@Override
+ public void deleteRecursivelyAtomic(String path) {
+ getZkClient(path).deleteRecursivelyAtomic(path);
+ }
+
+  /**
+   * Deletes all given paths and their children through the single ZkClient that serves their realm.
+   * An empty list is a no-op.
+   * @param paths ZK paths to delete; all of them must resolve to the same realm
+   * @throws IllegalArgumentException if the paths resolve to more than one realm
+   */
+  @Override
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    // Nothing to delete. Returning here also keeps paths.get(0) below from throwing
+    // IndexOutOfBoundsException on an empty list.
+    if (paths.isEmpty()) {
+      return;
+    }
+    // Check if all paths are in the same realm. If not, throw error as we cannot guarantee atomicity across clients.
+    if (paths.stream().map(this::getZkRealm).distinct().count() > 1) {
+      throw new IllegalArgumentException("Cannot atomically delete paths across different realms");
+    }
+    // All paths share one realm, so the client for the first path serves every path in the list
+    getZkClient(paths.get(0)).deleteRecursivelyAtomic(paths);
+  }
+
+ @Override
public boolean delete(String path) {
return delete(path, -1);
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
index 3676288..210e82b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
@@ -413,6 +413,20 @@
}
+ /**
+ * Deletes the path and its children by delegating to _innerSharedZkClient.deleteRecursivelyAtomic.
+ * checkIfPathContainsShardingKey runs on the path before the delegate call; presumably it rejects
+ * paths that do not belong to this client's sharding key (confirm against its definition).
+ * @param path ZK path to delete
+ */
@Override
+ public void deleteRecursivelyAtomic(String path) {
+ checkIfPathContainsShardingKey(path);
+ _innerSharedZkClient.deleteRecursivelyAtomic(path);
+ }
+
+  /**
+   * Deletes all given paths and their children by delegating to
+   * _innerSharedZkClient.deleteRecursivelyAtomic. Every path is run through
+   * checkIfPathContainsShardingKey before the delegate call is made.
+   * @param paths ZK paths to delete
+   */
+  @Override
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    // Check every path first; the delete is only issued once all checks have returned normally
+    paths.forEach(candidate -> checkIfPathContainsShardingKey(candidate));
+    _innerSharedZkClient.deleteRecursivelyAtomic(paths);
+  }
+
+ @Override
public boolean delete(String path) {
return delete(path, -1);
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 9d21e76..33b8bc1 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -20,12 +20,17 @@
*/
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.OptionalLong;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -1820,6 +1825,84 @@
}
}
+  /**
+   * Atomically removes the node at the given path along with all of its children: either every node is
+   * deleted or none is. The call may fail if another agent concurrently creates or deletes nodes under
+   * the path.
+   * @param path ZK path to delete
+   */
+  public void deleteRecursivelyAtomic(String path) {
+    // Single-path convenience form of the list-based overload
+    deleteRecursivelyAtomic(Collections.singletonList(path));
+  }
+
+  /**
+   * Delete the paths as well as all their children. This operation is atomic and will either delete all nodes or none.
+   * This operation may fail if another agent is concurrently creating or deleting nodes under any of the paths.
+   * Paths that do not exist contribute no operations; if no path exists the call returns without contacting ZK.
+   * @param paths ZK paths to delete
+   * @throws ZkClientException if the multi call throws or any individual delete operation reports an error
+   */
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    // Collect the delete operations for every requested subtree into one multi request
+    List<Op> ops = new ArrayList<>();
+    for (String path : paths) {
+      ops.addAll(getOpsForRecursiveDelete(path));
+    }
+
+    // Return early if no operations to execute
+    if (ops.isEmpty()) {
+      return;
+    }
+
+    List<OpResult> opResults;
+    try {
+      opResults = multi(ops);
+    } catch (Exception e) {
+      // The exception is the trailing argument with no placeholder of its own, so the logger records
+      // it as the throwable (with stack trace) rather than formatting it into the message text.
+      LOG.error("zkclient {}, Failed to delete paths {}", _uid, paths, e);
+      throw new ZkClientException("Failed to delete paths " + paths, e);
+    }
+
+    // Check if any of the operations failed. Create mapping of failed paths to error codes
+    Map<String, KeeperException.Code> failedPathsMap = new HashMap<>();
+    for (int i = 0; i < opResults.size(); i++) {
+      if (opResults.get(i) instanceof OpResult.ErrorResult) {
+        failedPathsMap.put(ops.get(i).getPath(),
+            KeeperException.Code.get(((OpResult.ErrorResult) opResults.get(i)).getErr()));
+      }
+    }
+
+    // Log and throw exception if any of the operations failed.
+    // The map is keyed by path with the error code as value, so values() fills the "error codes" slot
+    // and keySet() fills the "paths" slot.
+    if (!failedPathsMap.isEmpty()) {
+      LOG.error("zkclient {}, Failed to delete paths {}, multi returned with error codes {} for sub-paths {}",
+          _uid, paths, failedPathsMap.values(), failedPathsMap.keySet());
+      throw new ZkClientException("Failed to delete paths " + paths + " with ZK KeeperException error codes: "
+          + failedPathsMap.values() + " for paths: " + failedPathsMap.keySet());
+    }
+  }
+
+  /**
+   * Build the delete operations needed to remove the given root and everything beneath it. The returned
+   * list is ordered so that every child's delete comes before its parent's delete.
+   * @param root the root node to delete
+   * @return the ordered delete operations, or an empty list if the root does not exist
+   */
+  private List<Op> getOpsForRecursiveDelete(String root) {
+    List<Op> deleteOps = new ArrayList<>();
+    if (exists(root)) {
+      // Breadth-first walk: each node is visited before its children, giving parent-to-child order
+      Queue<String> pending = new LinkedList<>();
+      pending.offer(root);
+      while (!pending.isEmpty()) {
+        String current = pending.poll();
+        for (String childName : getChildren(current, false)) {
+          pending.offer(current + "/" + childName);
+        }
+        deleteOps.add(Op.delete(current, -1));
+      }
+      // Flip to child-to-parent order so no delete targets a node that still has children
+      Collections.reverse(deleteOps);
+    }
+    return deleteOps;
+  }
+
private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
final String path = event.getPath();
final boolean pathExists = event.getType() != EventType.NodeDeleted;
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index e5b589f..0ce0349 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -21,6 +21,7 @@
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -51,6 +52,7 @@
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
@@ -1231,4 +1233,58 @@
}
}
}
+
+  /**
+   * Exercises deleteRecursivelyAtomic: rejected requests (duplicate path, overlapping parent/child paths)
+   * must leave the tree untouched, while a single-node delete and a multi-path delete that includes a
+   * non-existent path must succeed.
+   */
+  @Test
+  void testDeleteRecursivelyAtomic() {
+    System.out.println("Start test: " + TestHelper.getTestMethodName());
+    String grandParent = "/testDeleteRecursively";
+    String parent = grandParent + "/parent";
+    String child1 = parent + "/child1";
+    String child2 = parent + "/child2";
+    _zkClient.createPersistent(grandParent);
+    _zkClient.createPersistent(parent);
+    _zkClient.createPersistent(child1);
+    _zkClient.createPersistent(child2);
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test calling delete on same path twice
+    try {
+      _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, grandParent));
+      Assert.fail("Operation should not succeed when attempting to delete same path twice");
+    } catch (ZkClientException expected) {
+      // Caught expected exception
+    }
+
+    // The failed request must not have removed anything
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test calling delete on path that is child of another path in the list
+    try {
+      _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, parent));
+      Assert.fail("Operation should not succeed when one path is a child of another path in the list");
+    } catch (ZkClientException expected) {
+      // Caught expected exception
+    }
+
+    // Test calling delete on single node
+    Assert.assertTrue(_zkClient.exists(child2));
+    _zkClient.deleteRecursivelyAtomic(child2);
+    Assert.assertFalse(_zkClient.exists(child2));
+
+    // Only child2 was removed; the rest of the tree is still in place
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test successfully delete multiple paths. Also that operation succeeds when attempting to delete non-existent path
+    String newNode = "/newNode";
+    _zkClient.createPersistent(newNode);
+    Assert.assertTrue(_zkClient.exists(newNode));
+
+    String nonexistentPath = grandParent + "/nonexistent";
+    Assert.assertFalse(_zkClient.exists(nonexistentPath));
+
+    _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, newNode, nonexistentPath));
+    Assert.assertFalse(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.exists(newNode));
+  }
}