Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak (#2180)

Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak

Reuse the verifier's ZkClient for the ZkBucketDataAccessor and tighten resource-closing logic to avoid resource leaks.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index d1075d4..e94148e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -88,10 +88,5 @@
       _bestPossibleAssignment = bestPossibleAssignment;
       return true;
     }
-
-    @Override
-    // BucketDataAccessor will be reused, won't be closed here.
-    public void close() {
-    }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 521e3d7..c20b3f9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -41,12 +41,13 @@
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordJacksonSerializer;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
-import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.slf4j.Logger;
@@ -75,10 +76,11 @@
 
   private final int _bucketSize;
   private final long _versionTTLms;
-  private ZkSerializer _zkSerializer;
-  private RealmAwareZkClient _zkClient;
-  private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
-  private Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
+  private final ZkSerializer _zkSerializer;
+  private final RealmAwareZkClient _zkClient;
+  private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
+  private final Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
+  private final boolean _usesExternalZkClient; // if true, disconnect() leaves _zkClient open
 
   /**
    * Constructor that allows a custom bucket size.
@@ -87,25 +89,21 @@
    * @param versionTTLms in ms
    */
   public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
-    _zkClient = createRealmAwareZkClient(zkAddr);
-    _zkClient.setZkSerializer(new ZkSerializer() {
-      @Override
-      public byte[] serialize(Object data) throws ZkMarshallingError {
-        if (data instanceof byte[]) {
-          return (byte[]) data;
-        }
-        throw new HelixException("ZkBucketDataAccesor only supports a byte array as an argument!");
-      }
+    this(createRealmAwareZkClient(zkAddr), bucketSize, versionTTLms, false);
+  }
 
-      @Override
-      public Object deserialize(byte[] data) throws ZkMarshallingError {
-        return data;
-      }
-    });
+  public ZkBucketDataAccessor(RealmAwareZkClient zkClient) {
+    this(zkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true);
+  }
+
+  private ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, long versionTTLms,
+      boolean usesExternalZkClient) {
+    _zkClient = zkClient;
     _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
     _zkSerializer = new ZNRecordJacksonSerializer();
     _bucketSize = bucketSize;
     _versionTTLms = versionTTLms;
+    _usesExternalZkClient = usesExternalZkClient;
   }
 
   /**
@@ -135,6 +133,7 @@
       zkClient = DedicatedZkClientFactory.getInstance()
           .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     }
+    zkClient.setZkSerializer(new ByteArraySerializer());
     return zkClient;
   }
 
@@ -258,7 +257,7 @@
 
   @Override
   public void disconnect() {
-    if (!_zkClient.isClosed()) {
+    if (!_usesExternalZkClient && _zkClient != null && !_zkClient.isClosed()) {
       _zkClient.close();
     }
   }
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 3b133a1..1997bea 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -34,6 +34,7 @@
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
 import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
@@ -91,6 +92,7 @@
     _resources = resources;
     _expectLiveInstances = expectLiveInstances;
     _dataProvider = new ResourceControllerDataProvider();
+    // _zkClient should be closed together with this BestPossibleExternalViewVerifier (see close())
   }
 
   /**
@@ -105,7 +107,7 @@
   public BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
       Set<String> resources, Map<String, Map<String, String>> errStates,
       Set<String> expectLiveInstances) {
-    this(zkClient, clusterName, resources, errStates, expectLiveInstances, 0);
+    this(zkClient, clusterName, errStates, resources, expectLiveInstances, 0, true);
   }
 
   @Deprecated
@@ -114,11 +116,7 @@
       Set<String> expectLiveInstances, int waitTillVerify) {
     // usesExternalZkClient = true because ZkClient is given by the caller
     // at close(), we will not close this ZkClient because it might be being used elsewhere
-    super(zkClient, clusterName, true, waitTillVerify);
-    _errStates = errStates;
-    _resources = resources;
-    _expectLiveInstances = expectLiveInstances;
-    _dataProvider = new ResourceControllerDataProvider();
+    this(zkClient, clusterName, errStates, resources, expectLiveInstances, waitTillVerify, true);
   }
 
   private BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
@@ -144,7 +142,6 @@
     private Set<String> _resources;
     private Set<String> _expectLiveInstances;
     private RealmAwareZkClient _zkClient;
-    private boolean _usesExternalZkClient = false; // false by default
 
     public Builder(String clusterName) {
       _clusterName = clusterName;
@@ -155,11 +152,12 @@
         throw new IllegalArgumentException("Cluster name is missing!");
       }
 
+      // A caller-supplied ZkClient is set: build with usesExternalZkClient = true
       if (_zkClient != null) {
-        return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, _resources, _errStates,
-            _expectLiveInstances, _waitPeriodTillVerify);
+        return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, _errStates, _resources,
+            _expectLiveInstances, _waitPeriodTillVerify, true);
       }
-
+      // No ZkClient was supplied: build with usesExternalZkClient = false
       if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig == null) {
         // For backward-compatibility
         return new BestPossibleExternalViewVerifier(_zkAddress, _clusterName, _resources,
@@ -170,7 +168,7 @@
       return new BestPossibleExternalViewVerifier(
           createZkClient(RealmAwareZkClient.RealmMode.SINGLE_REALM, _realmAwareZkConnectionConfig,
               _realmAwareZkClientConfig, _zkAddress), _clusterName, _errStates, _resources,
-          _expectLiveInstances, _waitPeriodTillVerify, _usesExternalZkClient);
+          _expectLiveInstances, _waitPeriodTillVerify, false);
     }
 
     public String getClusterName() {
@@ -210,7 +208,6 @@
 
     public Builder setZkClient(RealmAwareZkClient zkClient) {
       _zkClient = zkClient;
-      _usesExternalZkClient = true; // Set the flag since external ZkClient is used
       return this;
     }
   }
@@ -435,18 +432,15 @@
 
     RebalanceUtil.runStage(event, new CurrentStateComputationStage());
     // Note the readOnlyWagedRebalancer is just for one time usage
-    DryrunWagedRebalancer dryrunWagedRebalancer =
-        new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
-            cache.getClusterConfig().getGlobalRebalancePreference());
-    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
-    try {
+
+    try (ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor(_zkClient);
+        DryrunWagedRebalancer dryrunWagedRebalancer = new DryrunWagedRebalancer(zkBucketDataAccessor,
+            cache.getClusterName(), cache.getClusterConfig().getGlobalRebalancePreference())) {
+      event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
       RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
-    } finally {
-      dryrunWagedRebalancer.close();
     }
 
-    BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
-    return output;
+    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
   }
 
   @Override
@@ -456,15 +450,17 @@
        + (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
   }
 
+  // TODO: remove this finalizer; Object.finalize() has been deprecated since Java 9
   @Override
   public void finalize() {
     close();
+    super.finalize();
   }
 
-  private class DryrunWagedRebalancer extends org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
-    public DryrunWagedRebalancer(String metadataStoreAddress, String clusterName,
+  private static class DryrunWagedRebalancer extends ReadOnlyWagedRebalancer implements AutoCloseable {
+    public DryrunWagedRebalancer(ZkBucketDataAccessor zkBucketDataAccessor, String clusterName,
         Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-      super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName, preferences);
+      super(zkBucketDataAccessor, clusterName, preferences);
     }
 
     @Override
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 623a91e..11071d3 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -44,7 +44,7 @@
 import org.slf4j.LoggerFactory;
 
 public abstract class ZkHelixClusterVerifier
-    implements IZkChildListener, IZkDataListener, HelixClusterVerifier {
+    implements IZkChildListener, IZkDataListener, HelixClusterVerifier, AutoCloseable {
   private static Logger LOG = LoggerFactory.getLogger(ZkHelixClusterVerifier.class);
   protected static int DEFAULT_TIMEOUT = 300 * 1000;
   protected static int DEFAULT_PERIOD = 500;
@@ -229,6 +229,11 @@
     return verifyByPolling(DEFAULT_TIMEOUT, DEFAULT_PERIOD);
   }
 
+  /**
+   * Implements close() for {@link AutoCloseable} and {@link HelixClusterVerifier}.
+   * Non-external resources should be closed in this method to prevent resource leaks.
+   */
+  @Override
   public void close() {
     if (_zkClient != null && !_usesExternalZkClient) {
       _zkClient.close();
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index a1cfb66..27921f6 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -285,7 +285,7 @@
     if (_zkBucketDataAccessor == null) {
       synchronized (this) {
         if (_zkBucketDataAccessor == null) {
-          _zkBucketDataAccessor = new ZkBucketDataAccessor(_zkAddr);
+          _zkBucketDataAccessor = new ZkBucketDataAccessor(getByteArrayRealmAwareZkClient());
         }
       }
     }