Make WagedRebalancer static by creating a ThreadLocal (#540)

ZKBucketDataAccessor has a GC logic, but this is only valid if the ZkClient inside it is active and not closed. Currently, WAGED rebalancer generates an instance of AssignmentMetadataStore every time it rebalances, which does not allow the internal ZkBucketDataAccessor to garbage collect the assignment metadata it wrote previously.

This diff makes the entire WagedRebalancer object a ThreadLocal, which has the effect of making it essentially static across different runs of the pipeline.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 6f442ea..1bd1fdf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -67,8 +67,12 @@
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
   private static final Logger logger =
       LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
-  // Create a ThreadLocal of MetricCollector. Metrics could only be updated by the controller thread only.
-  private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL = new ThreadLocal<>();
+  // Create a ThreadLocal of MetricCollector. Metrics could only be updated by the controller thread
+  // only.
+  private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL =
+      new ThreadLocal<>();
+  private static final ThreadLocal<WagedRebalancer> WAGED_REBALANCER_THREAD_LOCAL =
+      new ThreadLocal<>();
 
   @Override
   public void process(ClusterEvent event) throws Exception {
@@ -261,6 +265,7 @@
     Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
         cache.getClusterConfig().getGlobalRebalancePreference();
 
+    // Create MetricCollector ThreadLocal if it hasn't been already initialized
     if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) {
       try {
         // If HelixManager is null, we just pass in null for MetricCollector so that a
@@ -278,9 +283,13 @@
       }
     }
 
-    // TODO avoid creating the rebalancer on every rebalance call for performance enhancement
-    WagedRebalancer wagedRebalancer =
-        new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get());
+    // Create MetricCollector ThreadLocal if it hasn't been already initialized
+    if (WAGED_REBALANCER_THREAD_LOCAL.get() == null) {
+      WAGED_REBALANCER_THREAD_LOCAL
+          .set(new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get()));
+    }
+    WagedRebalancer wagedRebalancer = WAGED_REBALANCER_THREAD_LOCAL.get();
+
     try {
       newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
           currentStateOutput));
@@ -293,9 +302,8 @@
               "Failed to calculate the new Ideal States using the rebalancer %s due to %s",
               wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()),
           ex);
-    } finally {
-      wagedRebalancer.close();
     }
+
     Iterator<Resource> itr = wagedRebalancedResourceMap.values().iterator();
     while (itr.hasNext()) {
       Resource resource = itr.next();