Reset the WAGED rebalancer once the controller newly acquires leadership. (#690)

This is to prevent any cached assignment information which is recorded during the previous session from impacting the rebalance result.
Detailed change list:

Move the stateful WAGED rebalancer to the GenericHelixController object instead of the rebalance stage. This is for resolving the possible race condition between the event processing thread and leader switch handling thread.
Adding a new test regarding leadership switch to verify that the WAGED rebalancer has been reset after the processing.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 3ccea41..e47c420 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -62,6 +62,8 @@
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
+import org.apache.helix.controller.rebalancer.StatefulRebalancer;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.ClusterEvent;
@@ -169,7 +171,6 @@
   Timer _onDemandRebalanceTimer = null;
   AtomicReference<RebalanceTask> _nextRebalanceTask = new AtomicReference<>();
 
-
   /**
    * A cache maintained across pipelines
    */
@@ -187,6 +188,17 @@
 
   private HelixManager _helixManager;
 
+  // Since the stateful rebalancer needs to be lazily constructed when the HelixManager instance is
+  // ready, the GenericHelixController is not constructed with a stateful rebalancer. This wrapper
+  // is to avoid the complexity of handling a nullable value in the event handling process.
+  // TODO Create the required stateful rebalancer only when it is used by any resource.
+  private final StatefulRebalancerRef _rebalancerRef = new StatefulRebalancerRef() {
+    @Override
+    protected StatefulRebalancer createRebalancer(HelixManager helixManager) {
+      return new WagedRebalancer(helixManager);
+    }
+  };
+
   /**
    * TODO: We should get rid of this once we move to:
    *  1) ZK callback should go to ClusterDataCache and trigger data cache refresh only
@@ -626,6 +638,22 @@
       return;
     }
 
+    // Event handling happens in a different thread from the onControllerChange processing thread.
+    // Thus, there are several possible conditions.
+    // 1. Event handled after leadership acquired. So we will have a valid rebalancer for the
+    // event processing.
+    // 2. Event handled shortly after leadership relinquished. And the rebalancer has not been
+    // marked as invalid yet. So the event will be processed the same as case one.
+    // 3. Event is leftover from the previous session, and it is handled when the controller
+    // regains the leadership. The rebalancer will be reset before being used. That is the
+    // expected behavior so as to avoid inconsistent rebalance result.
+    // 4. Event handled shortly after leadership relinquished. And the rebalancer has been marked
+    // as invalid. So we reset the rebalancer. But the later isLeader() check will return false and
+    // the pipeline will be triggered. So the reset rebalancer won't be used before the controller
+    // regains leadership.
+    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
+        _rebalancerRef.getRebalancer(manager));
+
     if (!manager.isLeader()) {
       logger.error("Cluster manager: " + manager.getInstanceName() + " is not leader for " + manager
           .getClusterName() + ". Pipeline will not be invoked");
@@ -1022,6 +1050,12 @@
       _clusterStatusMonitor.setMaintenance(_inMaintenanceMode);
     } else {
       enableClusterStatusMonitor(false);
+      // Note that onControllerChange is executed in parallel with the event processing thread. It
+      // is possible that the current WAGED rebalancer object is in use for handling callback. So
+      // mark the rebalancer invalid only, instead of closing it here.
+      // This to-be-closed WAGED rebalancer will be reset later on a later event processing if
+      // the controller becomes leader again.
+      _rebalancerRef.invalidateRebalancer();
     }
 
     logger.info("END: GenericClusterController.onControllerChange() for cluster " + _clusterName);
@@ -1125,6 +1159,8 @@
 
     enableClusterStatusMonitor(false);
 
+    _rebalancerRef.closeRebalancer();
+
     // TODO controller shouldn't be used in anyway after shutdown.
     // Need to record shutdown and throw Exception if the controller is used again.
   }
@@ -1202,7 +1238,6 @@
     return statusFlag;
   }
 
-
   // TODO: refactor this to use common/ClusterEventProcessor.
   @Deprecated
   private class ClusterEventProcessor extends Thread {
@@ -1258,4 +1293,59 @@
     eventThread.setDaemon(true);
     eventThread.start();
   }
+
+  /**
+   * A wrapper class for the stateful rebalancer instance that will be tracked in the
+   * GenericHelixController.
+   */
+  private abstract class StatefulRebalancerRef<T extends StatefulRebalancer> {
+    private T _rebalancer = null;
+    private boolean _isRebalancerValid = true;
+
+    /**
+     * @param helixManager
+     * @return A new stateful rebalancer instance with initial state.
+     */
+    protected abstract T createRebalancer(HelixManager helixManager);
+
+    /**
+     * Mark the current rebalancer object to be invalid, which indicates it needs to be reset before
+     * the next usage.
+     */
+    synchronized void invalidateRebalancer() {
+      _isRebalancerValid = false;
+    }
+
+    /**
+     * @return A valid rebalancer object.
+     *         If the rebalancer is no longer valid, it will be reset before returning.
+     * TODO: Make rebalancer volatile or make it singleton, if this method is called in multiple
+     * TODO: threads outside the controller object.
+     */
+    synchronized T getRebalancer(HelixManager helixManager) {
+      // Lazily initialize the stateful rebalancer instance since the GenericHelixController
+      // instance is instantiated without the HelixManager information that is required.
+      if (_rebalancer == null) {
+        _rebalancer = createRebalancer(helixManager);
+        _isRebalancerValid = true;
+      }
+      // If the rebalance exists but has been marked as invalid (due to leadership switch), it needs
+      // to be reset before return.
+      if (!_isRebalancerValid) {
+        _rebalancer.reset();
+        _isRebalancerValid = true;
+      }
+      return _rebalancer;
+    }
+
+    /**
+     * Proactively close the rebalance object to release the resources.
+     */
+    synchronized void closeRebalancer() {
+      if (_rebalancer != null) {
+        _rebalancer.close();
+        _rebalancer = null;
+      }
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/StatefulRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/StatefulRebalancer.java
new file mode 100644
index 0000000..94567bb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/StatefulRebalancer.java
@@ -0,0 +1,37 @@
+package org.apache.helix.controller.rebalancer;
+
+import java.util.Map;
+
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+
+
+/**
+ * Allows one to come up with custom implementation of a stateful rebalancer.<br/>
+ */
+public interface StatefulRebalancer<T extends BaseControllerDataProvider> {
+
+  /**
+   * Reset the rebalancer to the initial state.
+   */
+  void reset();
+
+  /**
+   * Release all the resources and clean up all the rebalancer state.
+   */
+  void close();
+
+  /**
+   * Compute the new IdealStates for all the input resources. The IdealStates include both new
+   * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
+   * @param clusterData The Cluster status data provider.
+   * @param resourceMap A map containing all the rebalancing resources.
+   * @param currentStateOutput The present Current States of the resources.
+   * @return A map of the new IdealStates with the resource name as key.
+   */
+  Map<String, IdealState> computeNewIdealStates(T clusterData, Map<String, Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput) throws HelixRebalanceException;
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index f1c85db..8a21bbb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -41,6 +41,7 @@
 import org.apache.helix.controller.changedetector.ResourceChangeDetector;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.StatefulRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
@@ -70,7 +71,7 @@
  *      Design Document
  *      </a>
  */
-public class WagedRebalancer {
+public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDataProvider> {
   private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
 
   // When any of the following change happens, the rebalancer needs to do a global rebalance which
@@ -228,7 +229,7 @@
     }
   }
 
-  // Clean up the internal cached rebalance status.
+  @Override
   public void reset() {
     if (_assignmentMetadataStore != null) {
       _assignmentMetadataStore.reset();
@@ -236,7 +237,8 @@
     _changeDetector.resetSnapshots();
   }
 
-  // Release all the resources.
+  // TODO the rebalancer should reject any other computing request after being closed.
+  @Override
   public void close() {
     if (_baselineCalculateExecutor != null) {
       _baselineCalculateExecutor.shutdownNow();
@@ -247,14 +249,7 @@
     _metricCollector.unregister();
   }
 
-  /**
-   * Compute the new IdealStates for all the input resources. The IdealStates include both new
-   * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
-   * @param clusterData The Cluster status data provider.
-   * @param resourceMap A map containing all the rebalancing resources.
-   * @param currentStateOutput The present Current States of the resources.
-   * @return A map of the new IdealStates with the resource name as key.
-   */
+  @Override
   public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
       Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
       throws HelixRebalanceException {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index a2b63f8..b570568 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -38,5 +38,6 @@
   AsyncFIFOWorkerPool,
   PipelineType,
   LastRebalanceFinishTimeStamp,
-  ControllerDataProvider
+  ControllerDataProvider,
+  STATEFUL_REBALANCER
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index a73b0c5..ffaac8f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -65,12 +65,6 @@
   private static final Logger logger =
       LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
 
-  // Lazy initialize the WAGED rebalancer instance since the BestPossibleStateCalcStage instance was
-  // instantiated without the HelixManager information that is required.
-  // TODO: Initialize the WAGED rebalancer in the BestPossibleStateCalcStage constructor once it is
-  // TODO: updated so as to accept a HelixManager or HelixZkClient information.
-  private WagedRebalancer _wagedRebalancer = null;
-
   @Override
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
@@ -110,30 +104,6 @@
     });
   }
 
-  // Need to keep a default constructor for backward compatibility
-  public BestPossibleStateCalcStage() {
-  }
-
-  // Construct the BestPossibleStateCalcStage with a given WAGED rebalancer for the callers other
-  // than the controller pipeline. Such as the verifiers and test cases.
-  public BestPossibleStateCalcStage(WagedRebalancer wagedRebalancer) {
-    _wagedRebalancer = wagedRebalancer;
-  }
-
-  private WagedRebalancer getWagedRebalancer(HelixManager helixManager,
-      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
-      boolean isAsyncGlobalRebalanceEnabled) {
-    // Create WagedRebalancer instance if it hasn't been already initialized
-    if (_wagedRebalancer == null) {
-      _wagedRebalancer = new WagedRebalancer(helixManager);
-    }
-    // Since the rebalance configuration can be updated at runtime, try to update the rebalancer
-    // before returning.
-    _wagedRebalancer.updateRebalancePreference(preferences);
-    _wagedRebalancer.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled);
-    return _wagedRebalancer;
-  }
-
   private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
       CurrentStateOutput currentStateOutput) {
     ResourceControllerDataProvider cache =
@@ -143,6 +113,7 @@
     HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
     ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    WagedRebalancer wagedRebalancer = event.getAttribute(AttributeName.STATEFUL_REBALANCER.name());
 
     // Check whether the offline/disabled instance count in the cluster reaches the set limit,
     // if yes, pause the rebalancer.
@@ -152,8 +123,8 @@
     final List<String> failureResources = new ArrayList<>();
 
     Map<String, Resource> calculatedResourceMap =
-        computeResourceBestPossibleStateWithWagedRebalancer(cache, currentStateOutput, helixManager,
-            resourceMap, output, failureResources);
+        computeResourceBestPossibleStateWithWagedRebalancer(wagedRebalancer, cache,
+            currentStateOutput, resourceMap, output, failureResources);
 
     Map<String, Resource> remainingResourceMap = new HashMap<>(resourceMap);
     remainingResourceMap.keySet().removeAll(calculatedResourceMap.keySet());
@@ -249,23 +220,33 @@
     return true;
   }
 
+  private void updateWagedRebalancer(WagedRebalancer wagedRebalancer, ClusterConfig clusterConfig) {
+    if (clusterConfig != null) {
+      // Since the rebalance configuration can be updated at runtime, try to update the rebalancer
+      // before calculating.
+      wagedRebalancer.updateRebalancePreference(clusterConfig.getGlobalRebalancePreference());
+      wagedRebalancer
+          .setGlobalRebalanceAsyncMode(clusterConfig.isGlobalRebalanceAsyncModeEnabled());
+    }
+  }
+
   /**
    * Rebalance with the WAGED rebalancer
    * The rebalancer only calculates the new ideal assignment for all the resources that are
    * configured to use the WAGED rebalancer.
    *
+   * @param wagedRebalancer    The WAGED rebalancer instance.
    * @param cache              Cluster data cache.
    * @param currentStateOutput The current state information.
-   * @param helixManager
    * @param resourceMap        The complete resource map. The method will filter the map for the compatible resources.
    * @param output             The best possible state output.
    * @param failureResources   The failure records that will be updated if any resource cannot be computed.
    * @return The map of all the calculated resources.
    */
   private Map<String, Resource> computeResourceBestPossibleStateWithWagedRebalancer(
-      ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput,
-      HelixManager helixManager, Map<String, Resource> resourceMap, BestPossibleStateOutput output,
-      List<String> failureResources) {
+      WagedRebalancer wagedRebalancer, ResourceControllerDataProvider cache,
+      CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap,
+      BestPossibleStateOutput output, List<String> failureResources) {
     if (cache.isMaintenanceModeEnabled()) {
       // The WAGED rebalancer won't be used while maintenance mode is enabled.
       return Collections.emptyMap();
@@ -282,22 +263,22 @@
 
     Map<String, IdealState> newIdealStates = new HashMap<>();
 
-    ClusterConfig clusterConfig = cache.getClusterConfig();
-    WagedRebalancer wagedRebalancer =
-        getWagedRebalancer(helixManager, clusterConfig.getGlobalRebalancePreference(),
-            clusterConfig.isGlobalRebalanceAsyncModeEnabled());
-    try {
-      newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
-          currentStateOutput));
-    } catch (HelixRebalanceException ex) {
-      // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result.
-      // Since it calculates for all the eligible resources globally, a partial result is invalid.
-      // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
+    if (wagedRebalancer != null) {
+      updateWagedRebalancer(wagedRebalancer, cache.getClusterConfig());
+      try {
+        newIdealStates.putAll(wagedRebalancer
+            .computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput));
+      } catch (HelixRebalanceException ex) {
+        // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result.
+        // Since it calculates for all the eligible resources globally, a partial result is invalid.
+        // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
+        LogUtil.logError(logger, _eventId, String
+            .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
+                wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
+      }
+    } else {
       LogUtil.logError(logger, _eventId,
-          String.format(
-              "Failed to calculate the new Ideal States using the rebalancer %s due to %s",
-              wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()),
-          ex);
+          "Skip rebalancing using the WAGED rebalancer since it is not configured in the rebalance pipeline.");
     }
 
     Iterator<Resource> itr = wagedRebalancedResourceMap.values().iterator();
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 6d2c119..66143fe 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -388,11 +388,12 @@
 
     runStage(event, new CurrentStateComputationStage());
     // Note the dryrunWagedRebalancer is just for one time usage
-    DryrunWagedRebalancer dryrunWagedRebalancer = new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
-        cache.getClusterConfig().getGlobalRebalancePreference());
+    DryrunWagedRebalancer dryrunWagedRebalancer =
+        new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
+            cache.getClusterConfig().getGlobalRebalancePreference());
+    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
     try {
-      // TODO: be caution here, should be handled statelessly.
-      runStage(event, new BestPossibleStateCalcStage(dryrunWagedRebalancer));
+      runStage(event, new BestPossibleStateCalcStage());
     } finally {
       dryrunWagedRebalancer.close();
     }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 5af3d61..96180bf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -412,12 +412,12 @@
   }
 
   @Test(dependsOnMethods = "test")
-  public void testNewInstances()
-      throws InterruptedException {
+  public void testNewInstances() throws InterruptedException {
     ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
     clusterConfig.setGlobalRebalancePreference(ImmutableMap
-        .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 0, ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 10));
+        .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 0,
+            ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 10));
     configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
 
     int i = 0;
@@ -473,6 +473,61 @@
     }
   }
 
+  /**
+   * The stateful WAGED rebalancer will be reset while the controller regains the leadership.
+   * This test is to verify if the reset has been done and the rebalancer has forgotten any previous
+   * status after leadership switched.
+   */
+  @Test(dependsOnMethods = "test")
+  public void testRebalancerReset() throws Exception {
+    // Configure the rebalance preference so as to trigger more partition movements for evenness.
+    // This is to ensure the controller will try to move something if the rebalancer has been reset.
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setGlobalRebalancePreference(ImmutableMap
+        .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 10,
+            ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 0));
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    // TODO remove this sleep after fix https://github.com/apache/helix/issues/526
+    Thread.sleep(300);
+    validate(_replica);
+
+    // Adding one more resource. Since it is added after the other resources, the assignment is
+    // impacted because of the other resources' assignment.
+    String moreDB = "More-Test-DB";
+    createResourceWithWagedRebalance(CLUSTER_NAME, moreDB,
+        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, moreDB, _replica);
+    _allDBs.add(moreDB);
+    // TODO remove this sleep after fix https://github.com/apache/helix/issues/526
+    Thread.sleep(300);
+    validate(_replica);
+    ExternalView oldEV =
+        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, moreDB);
+
+    // Expire the controller session so it will reset the internal rebalancer's status.
+    simulateSessionExpiry(_controller.getZkClient());
+    // After reset done, the rebalancer will try to rebalance all the partitions since it has
+    // forgotten the previous state.
+    // TODO remove this sleep after fix https://github.com/apache/helix/issues/526
+    Thread.sleep(300);
+    validate(_replica);
+    ExternalView newEV =
+        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, moreDB);
+
+    // To verify that the controller has moved some partitions.
+    Assert.assertFalse(newEV.equals(oldEV));
+  }
+
   private void validate(int expectedReplica) {
     HelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)