ZkClient should not keep retrying getChildren() due to large number of children (#1109)

For ZkClient's getChildren() operation, if a znode has a large number of children and the response packet size exceeds the client-side jute.maxbuffer limit (4 MB by default), ZkClient gets a ConnectionLossException and keeps retrying the connection to ZK indefinitely. As a consequence, the infinite retry may cause heavy GC on the ZK server and eventually kill it.

This commit implements a workaround that exits the retry loop for getChildren() when the connection loss is caused by a large number of children.
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 1b4d1ca..89b3c7a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -71,8 +71,14 @@
  * WARN: Do not use this class directly, use {@link org.apache.helix.manager.zk.ZkClient} instead.
 public class ZkClient implements Watcher {
-  private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
-  private static long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
+  private static final Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+  private static final long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
+  // If number of children exceeds this limit, getChildren() should not retry on connection loss.
+  // This is a workaround for exiting retry on connection loss because of large number of children.
+  // TODO: remove it once we have a better way to exit retry for this case
+  private static final int NUM_CHILDREN_LIMIT;
   private final IZkConnection _connection;
   private final long _operationRetryTimeoutInMillis;
@@ -90,6 +96,13 @@
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private ZkClientMonitor _monitor;
+  static {
+    // Assigned in a static initializer instead of at the declaration so that the value is not
+    // inlined by the compiler as a constant; unit tests rely on reflection to change it.
+    // 100K is specific to helix messages, which use UUIDs and make the packet length land
+    // just below 4 MB.
+    NUM_CHILDREN_LIMIT = 100_000;
+  }
   private class IZkDataListenerEntry {
     final IZkDataListener _dataListener;
     final boolean _prefetchData;
@@ -713,11 +726,33 @@
   protected List<String> getChildren(final String path, final boolean watch) {
     long startT = System.currentTimeMillis();
     try {
       List<String> children = retryUntilConnected(new Callable<List<String>>() {
+        private int connectionLossRetryCount = 0;
         public List<String> call() throws Exception {
-          return getConnection().getChildren(path, watch);
+          try {
+            return getConnection().getChildren(path, watch);
+          } catch (ConnectionLossException e) {
+            // Issue: https://github.com/apache/helix/issues/962
+            // Connection loss might be caused by an excessive number of children.
+            // Infinitely retrying connecting may cause high GC in ZK server and kill ZK server.
+            // This is a workaround to check numChildren to have a chance to exit retry loop.
+            // Only check the numChildren stat on every 3rd connection loss, because it is
+            // more likely that the connection loss is caused by other factors such as network
+            // connectivity, session expiry, etc.
+            // TODO: remove this check once we have a better way to exit infinite retry
+            ++connectionLossRetryCount;
+            if (connectionLossRetryCount >= 3) {
+              checkNumChildrenLimit(path);
+              connectionLossRetryCount = 0;
+            }
+            // Re-throw the ConnectionLossException for retryUntilConnected() to catch and retry.
+            throw e;
+          }
       record(path, null, startT, ZkClientMonitor.AccessType.READ);
@@ -1765,4 +1800,25 @@
+  private void checkNumChildrenLimit(String path) throws KeeperException {
+    Stat stat = getStat(path);
+    if (stat == null) {
+      return;
+    }
+    if (stat.getNumChildren() > NUM_CHILDREN_LIMIT) {
+      LOG.error("Failed to get children for path {} because of connection loss. "
+              + "Number of children {} exceeds limit {}, aborting retry.", path,
+          stat.getNumChildren(),
+      // MarshallingErrorException could represent transport error: exceeding the
+      // Jute buffer size. So use it to exit retry loop and tell that zk is not able to
+      // transport the data because packet length is too large.
+      throw new KeeperException.MarshallingErrorException();
+    } else {
+      LOG.debug("Number of children {} is less than limit {}, not exiting retry.",
+          stat.getNumChildren(), NUM_CHILDREN_LIMIT);
+    }
+  }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index ce109b7..9cea4c2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -19,20 +19,10 @@
  * under the License.
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
@@ -42,12 +32,8 @@
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.*;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
@@ -55,6 +41,20 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 public class TestRawZkClient extends ZkUnitTestBase {
   private final String TEST_TAG = "test_monitor";
   private final String TEST_ROOT = "/my_cluster/IDEALSTATES";
@@ -412,4 +412,67 @@
+  /*
+   * Tests getChildren() when there are an excessive number of children and connection loss happens,
+   * the operation should terminate and exit retry loop.
+   *
+   * The test overrides two static fields through reflection (ClientCnxn.packetLen and
+   * ZkClient.NUM_CHILDREN_LIMIT). Every override happens inside the try block so that the
+   * finally block restores the original values even if creating the children fails.
+   */
+  @Test
+  public void testGetChildrenOnLargeNumChildren() throws Exception {
+    // Default packetLen is 4M. It is static final and initialized
+    // when first zkClient is created.
+    // So we could not just set "jute.maxbuffer" to change the value.
+    // Reflection is needed to change the value.
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    boolean isModifierAccessible = modifiersField.isAccessible();
+    modifiersField.setAccessible(true);
+    Field packetLenField = ClientCnxn.class.getDeclaredField("packetLen");
+    Field childrenLimitField =
+            org.apache.helix.manager.zk.zookeeper.ZkClient.class.getDeclaredField("NUM_CHILDREN_LIMIT");
+    // Remove "final" modifier
+    modifiersField.setInt(packetLenField, packetLenField.getModifiers() & ~Modifier.FINAL);
+    modifiersField.setInt(childrenLimitField, childrenLimitField.getModifiers() & ~Modifier.FINAL);
+    // Capture the original values and accessibility flags before anything is overridden.
+    boolean isPacketLenAccessible = packetLenField.isAccessible();
+    packetLenField.setAccessible(true);
+    int originPacketLen = packetLenField.getInt(null);
+    boolean isChildrenLimitAccessible = childrenLimitField.isAccessible();
+    childrenLimitField.setAccessible(true);
+    int originChildrenLimit = childrenLimitField.getInt(null);
+    String path = "/" + TestHelper.getTestMethodName();
+    try {
+      // Keep 150 bytes for successfully creating each child node.
+      packetLenField.set(null, 150);
+      childrenLimitField.set(null, 2);
+      // Create 5 children to make packet length of children exceed 150 bytes
+      // and cause connection loss for getChildren() operation
+      for (int i = 0; i < 5; i++) {
+        _zkClient.createPersistent(path + "/" + UUID.randomUUID().toString(), true);
+      }
+      _zkClient.getChildren(path);
+      Assert.fail("Should not successfully get children.");
+    } catch (ZkException expected) {
+      Assert.assertEquals(expected.getMessage(),
+              "org.apache.zookeeper.KeeperException$MarshallingErrorException: "
+                      + "KeeperErrorCode = MarshallingError");
+    } finally {
+      // Restore the static values first so that later tests see the defaults again.
+      packetLenField.set(null, originPacketLen);
+      packetLenField.setAccessible(isPacketLenAccessible);
+      childrenLimitField.set(null, originChildrenLimit);
+      childrenLimitField.setAccessible(isChildrenLimitAccessible);
+      modifiersField.setAccessible(isModifierAccessible);
+      Assert.assertTrue(TestHelper.verify(() -> {
+        _zkClient.deleteRecursively(path);
+        return !_zkClient.exists(path);
+      }, TestHelper.WAIT_DURATION));
+    }
+  }