Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak (#2180)
Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak
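Reuse the verifier's existing ZkClient when building the ZkBucketDataAccessor instead of opening a new connection, and tighten the close logic so that internally created clients are closed while externally supplied clients are left open, avoiding resource leaks.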
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index d1075d4..e94148e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -88,10 +88,5 @@
_bestPossibleAssignment = bestPossibleAssignment;
return true;
}
-
- @Override
- // BucketDataAccessor will be reused, won't be closed here.
- public void close() {
- }
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 521e3d7..c20b3f9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -41,12 +41,13 @@
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordJacksonSerializer;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
-import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
@@ -75,10 +76,11 @@
private final int _bucketSize;
private final long _versionTTLms;
- private ZkSerializer _zkSerializer;
- private RealmAwareZkClient _zkClient;
- private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
- private Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
+ private final ZkSerializer _zkSerializer;
+ private final RealmAwareZkClient _zkClient;
+ private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
+ private final Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
+ private boolean _usesExternalZkClient = false;
/**
* Constructor that allows a custom bucket size.
@@ -87,25 +89,21 @@
* @param versionTTLms in ms
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
- _zkClient = createRealmAwareZkClient(zkAddr);
- _zkClient.setZkSerializer(new ZkSerializer() {
- @Override
- public byte[] serialize(Object data) throws ZkMarshallingError {
- if (data instanceof byte[]) {
- return (byte[]) data;
- }
- throw new HelixException("ZkBucketDataAccesor only supports a byte array as an argument!");
- }
+ this(createRealmAwareZkClient(zkAddr), bucketSize, versionTTLms, false);
+ }
- @Override
- public Object deserialize(byte[] data) throws ZkMarshallingError {
- return data;
- }
- });
+ public ZkBucketDataAccessor(RealmAwareZkClient zkClient) {
+ this(zkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true);
+ }
+
+ private ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, long versionTTLms,
+ boolean usesExternalZkClient) {
+ _zkClient = zkClient;
_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
_zkSerializer = new ZNRecordJacksonSerializer();
_bucketSize = bucketSize;
_versionTTLms = versionTTLms;
+ _usesExternalZkClient = usesExternalZkClient;
}
/**
@@ -135,6 +133,7 @@
zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
}
+ zkClient.setZkSerializer(new ByteArraySerializer());
return zkClient;
}
@@ -258,7 +257,7 @@
@Override
public void disconnect() {
- if (!_zkClient.isClosed()) {
+ if (!_usesExternalZkClient && _zkClient != null && !_zkClient.isClosed()) {
_zkClient.close();
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 3b133a1..1997bea 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -34,6 +34,7 @@
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
@@ -91,6 +92,7 @@
_resources = resources;
_expectLiveInstances = expectLiveInstances;
_dataProvider = new ResourceControllerDataProvider();
+ // _zkClient should be closed when this BestPossibleExternalViewVerifier is closed
}
/**
@@ -105,7 +107,7 @@
public BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
Set<String> resources, Map<String, Map<String, String>> errStates,
Set<String> expectLiveInstances) {
- this(zkClient, clusterName, resources, errStates, expectLiveInstances, 0);
+ this(zkClient, clusterName, errStates, resources, expectLiveInstances, 0, true);
}
@Deprecated
@@ -114,11 +116,7 @@
Set<String> expectLiveInstances, int waitTillVerify) {
// usesExternalZkClient = true because ZkClient is given by the caller
// at close(), we will not close this ZkClient because it might be being used elsewhere
- super(zkClient, clusterName, true, waitTillVerify);
- _errStates = errStates;
- _resources = resources;
- _expectLiveInstances = expectLiveInstances;
- _dataProvider = new ResourceControllerDataProvider();
+ this(zkClient, clusterName, errStates, resources, expectLiveInstances, waitTillVerify, true);
}
private BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
@@ -144,7 +142,6 @@
private Set<String> _resources;
private Set<String> _expectLiveInstances;
private RealmAwareZkClient _zkClient;
- private boolean _usesExternalZkClient = false; // false by default
public Builder(String clusterName) {
_clusterName = clusterName;
@@ -155,11 +152,12 @@
throw new IllegalArgumentException("Cluster name is missing!");
}
+ // The caller supplied the ZkClient, so usesExternalZkClient is true and close() will not close it
if (_zkClient != null) {
- return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, _resources, _errStates,
- _expectLiveInstances, _waitPeriodTillVerify);
+ return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, _errStates, _resources,
+ _expectLiveInstances, _waitPeriodTillVerify, true);
}
-
+ // No ZkClient was supplied by the caller; one is created below, so usesExternalZkClient is false
if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig == null) {
// For backward-compatibility
return new BestPossibleExternalViewVerifier(_zkAddress, _clusterName, _resources,
@@ -170,7 +168,7 @@
return new BestPossibleExternalViewVerifier(
createZkClient(RealmAwareZkClient.RealmMode.SINGLE_REALM, _realmAwareZkConnectionConfig,
_realmAwareZkClientConfig, _zkAddress), _clusterName, _errStates, _resources,
- _expectLiveInstances, _waitPeriodTillVerify, _usesExternalZkClient);
+ _expectLiveInstances, _waitPeriodTillVerify, false);
}
public String getClusterName() {
@@ -210,7 +208,6 @@
public Builder setZkClient(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
- _usesExternalZkClient = true; // Set the flag since external ZkClient is used
return this;
}
}
@@ -435,18 +432,15 @@
RebalanceUtil.runStage(event, new CurrentStateComputationStage());
// Note the readOnlyWagedRebalancer is just for one time usage
- DryrunWagedRebalancer dryrunWagedRebalancer =
- new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
- cache.getClusterConfig().getGlobalRebalancePreference());
- event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
- try {
+
+ try (ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor(_zkClient);
+ DryrunWagedRebalancer dryrunWagedRebalancer = new DryrunWagedRebalancer(zkBucketDataAccessor,
+ cache.getClusterName(), cache.getClusterConfig().getGlobalRebalancePreference())) {
+ event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
- } finally {
- dryrunWagedRebalancer.close();
}
- BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
- return output;
+ return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
}
@Override
@@ -456,15 +450,17 @@
+ (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
}
+ // TODO: clean this up; Object.finalize() is deprecated since Java 9
@Override
public void finalize() {
close();
+ super.finalize();
}
- private class DryrunWagedRebalancer extends org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
- public DryrunWagedRebalancer(String metadataStoreAddress, String clusterName,
+ private static class DryrunWagedRebalancer extends ReadOnlyWagedRebalancer implements AutoCloseable {
+ public DryrunWagedRebalancer(ZkBucketDataAccessor zkBucketDataAccessor, String clusterName,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
- super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName, preferences);
+ super(zkBucketDataAccessor, clusterName, preferences);
}
@Override
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 623a91e..11071d3 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -44,7 +44,7 @@
import org.slf4j.LoggerFactory;
public abstract class ZkHelixClusterVerifier
- implements IZkChildListener, IZkDataListener, HelixClusterVerifier {
+ implements IZkChildListener, IZkDataListener, HelixClusterVerifier, AutoCloseable {
private static Logger LOG = LoggerFactory.getLogger(ZkHelixClusterVerifier.class);
protected static int DEFAULT_TIMEOUT = 300 * 1000;
protected static int DEFAULT_PERIOD = 500;
@@ -229,6 +229,11 @@
return verifyByPolling(DEFAULT_TIMEOUT, DEFAULT_PERIOD);
}
+ /**
+ * Implements close() for both {@link AutoCloseable} and {@link HelixClusterVerifier}.
+ * Resources that were not supplied externally should be closed in this method to prevent resource leaks.
+ */
+ @Override
public void close() {
if (_zkClient != null && !_usesExternalZkClient) {
_zkClient.close();
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index a1cfb66..27921f6 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -285,7 +285,7 @@
if (_zkBucketDataAccessor == null) {
synchronized (this) {
if (_zkBucketDataAccessor == null) {
- _zkBucketDataAccessor = new ZkBucketDataAccessor(_zkAddr);
+ _zkBucketDataAccessor = new ZkBucketDataAccessor(getByteArrayRealmAwareZkClient());
}
}
}