Refactor/clean up code without logic change (#1760)

This commmit contains:
1. Remove unused functions and logic
2. Combine the Resource/Task message generation into one stage, since they are both relying on best possible result.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 4baac38..b8fac92 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,7 +19,6 @@
  * under the License.
  */
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -82,6 +81,7 @@
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.PersistAssignmentStage;
@@ -92,9 +92,7 @@
 import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
 import org.apache.helix.controller.stages.TopStateHandoffReportStage;
 import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.controller.stages.task.TaskMessageDispatchStage;
-import org.apache.helix.controller.stages.task.TaskMessageGenerationPhase;
 import org.apache.helix.controller.stages.task.TaskPersistDataStage;
 import org.apache.helix.controller.stages.task.TaskSchedulingStage;
 import org.apache.helix.model.ClusterConfig;
@@ -516,7 +514,7 @@
       // Need to add MaintenanceRecoveryStage here because MAX_PARTITIONS_PER_INSTANCE check could
       // only occur after IntermediateStateCalcStage calculation
       rebalancePipeline.addStage(new MaintenanceRecoveryStage());
-      rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+      rebalancePipeline.addStage(new MessageGenerationPhase());
       rebalancePipeline.addStage(new MessageSelectionStage());
       // The IntermediateStateCalcStage should be applied after message selection
       // Messages are throttled already removed by IntermediateStateCalcStage in MessageSelection output
@@ -598,7 +596,7 @@
       rebalancePipeline.addStage(new TaskSchedulingStage());
       rebalancePipeline.addStage(new TaskPersistDataStage());
       rebalancePipeline.addStage(new TaskGarbageCollectionStage());
-      rebalancePipeline.addStage(new TaskMessageGenerationPhase());
+      rebalancePipeline.addStage(new MessageGenerationPhase());
       rebalancePipeline.addStage(new TaskMessageDispatchStage());
 
       // backward compatibility check
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index b06352d..36bc9cb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -582,73 +582,6 @@
   }
 
   /**
-   * For a partition, given its preferenceList, bestPossibleState, and currentState, determine which
-   * type of rebalance is needed to model IdealState's states defined by the state model definition.
-   * @return RebalanceType needed to bring the replicas to idea states
-   *         RECOVERY_BALANCE - not all required states (replicas) are available through all
-   *         replicas, or the partition is disabled
-   *         NONE - current state matches the ideal state
-   *         LOAD_BALANCE - although all replicas required exist, Helix needs to optimize the
-   *         allocation
-   */
-  private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
-      Map<String, String> bestPossibleMap, List<String> preferenceList,
-      StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
-      IdealState idealState, String partitionName) {
-    if (preferenceList == null) {
-      preferenceList = Collections.emptyList();
-    }
-
-    // If there is a minimum active replica number specified in IS, we should respect it.
-    // TODO: We should implement the per replica level throttling with generated message
-    // Issue: https://github.com/apache/helix/issues/343
-    int replica = idealState.getMinActiveReplicas() == -1
-        ? idealState.getReplicaCount(preferenceList.size())
-        : idealState.getMinActiveReplicas();
-    Set<String> activeList = new HashSet<>(preferenceList);
-    activeList.retainAll(cache.getEnabledLiveInstances());
-
-    // For each state, check that this partition currently has the required number of that state as
-    // required by StateModelDefinition.
-    LinkedHashMap<String, Integer> expectedStateCountMap =
-        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
-    // Current counts without disabled partitions or disabled instances
-    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
-    currentStateMapWithoutDisabled.keySet().removeAll(
-        cache.getDisabledInstancesForPartition(idealState.getResourceName(), partitionName));
-    Map<String, Integer> currentStateCounts =
-        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
-
-    // Go through each state and compare counts
-    for (String state : expectedStateCountMap.keySet()) {
-      Integer expectedCount = expectedStateCountMap.get(state);
-      Integer currentCount = currentStateCounts.get(state);
-      expectedCount = expectedCount == null ? 0 : expectedCount;
-      currentCount = currentCount == null ? 0 : currentCount;
-
-      // If counts do not match up, this partition requires recovery
-      if (currentCount < expectedCount) {
-        // Recovery is not needed in cases where this partition just started, was dropped, or is in
-        // error
-        if (!state.equals(HelixDefinedState.DROPPED.name())
-            && !state.equals(HelixDefinedState.ERROR.name())
-            && !state.equals(stateModelDef.getInitialState())) {
-          return RebalanceType.RECOVERY_BALANCE;
-        }
-      }
-    }
-    // No recovery needed, all expected replicas exist
-    // Check if this partition is actually in the BestPossibleState
-    if (currentStateMap.equals(bestPossibleMap)) {
-      return RebalanceType.NONE; // No further action required
-    } else {
-      return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order to
-      // achieve BestPossibleState, load balance may be required
-      // to shift replicas around
-    }
-  }
-
-  /**
    * Determine the message rebalance type with message and current states.
    * @param desiredStates         Ideally how may states we needed for guarantee the health of replica
    * @param message               The message to be determined what is the rebalance type
@@ -914,30 +847,4 @@
       });
     }
   }
-
-  /**
-   * Handle a partition with a pending message so that the partition will not be double-charged or double-assigned during recovery and load balance.
-   * @param partition
-   * @param partitionsNeedRecovery
-   * @param partitionsNeedLoadbalance
-   * @param rebalanceType
-   */
-  private void handlePendingStateTransitionsForThrottling(Partition partition,
-      Set<Partition> partitionsNeedRecovery, Set<Partition> partitionsNeedLoadbalance,
-      RebalanceType rebalanceType, PartitionStateMap bestPossiblePartitionStateMap,
-      PartitionStateMap intermediatePartitionStateMap) {
-    // Pass the best possible state directly into intermediatePartitionStateMap
-    // This is safe to do so because we already have a pending transition for this partition, implying that the assignment has been made in previous pipeline
-    intermediatePartitionStateMap
-        .setState(partition, bestPossiblePartitionStateMap.getPartitionMap(partition));
-    // Remove the partition's name from the set of partition (names) that need to be charged and assigned to prevent double-processing
-    switch (rebalanceType) {
-    case RECOVERY_BALANCE:
-      partitionsNeedRecovery.remove(partition);
-      break;
-    case LOAD_BALANCE:
-      partitionsNeedLoadbalance.remove(partition);
-      break;
-    }
-  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 3112327..836e5df 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -58,7 +58,7 @@
 /**
  * Compares the currentState, pendingState with IdealState and generate messages
  */
-public abstract class MessageGenerationPhase extends AbstractBaseStage {
+public class MessageGenerationPhase extends AbstractBaseStage {
   private final static String NO_DESIRED_STATE = "NoDesiredState";
 
   // If we see there is any invalid pending message leaving on host, i.e. message
@@ -75,8 +75,10 @@
 
   private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);
 
-  protected void processEvent(ClusterEvent event, ResourcesStateMap resourcesStateMap)
-      throws Exception {
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     _eventId = event.getEventId();
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
     BaseControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
@@ -86,9 +88,9 @@
 
     Map<String, Map<String, Message>> messagesToCleanUp = new HashMap<>();
     if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
-        || resourcesStateMap == null) {
+        || bestPossibleStateOutput == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|INTERMEDIATE_STATE");
+          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BESTPOSSIBLE_STATE");
     }
 
     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
@@ -101,7 +103,7 @@
 
     for (Resource resource : resourceMap.values()) {
       try {
-        generateMessage(resource, cache, resourcesStateMap, currentStateOutput, manager,
+        generateMessage(resource, cache, bestPossibleStateOutput, currentStateOutput, manager,
             sessionIdMap, event.getEventType(), output, messagesToCleanUp);
       } catch (HelixException ex) {
         LogUtil.logError(logger, _eventId,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
deleted file mode 100644
index 73b309e..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.controller.stages.resource;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.IntermediateStateOutput;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages for regular resource
- */
-public class ResourceMessageGenerationPhase extends MessageGenerationPhase {
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
-    processEvent(event, bestPossibleStateOutput);
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
deleted file mode 100644
index 5ef6c93..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.controller.stages.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages for task pipeline
- */
-
-public class TaskMessageGenerationPhase extends MessageGenerationPhase {
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
-    processEvent(event, bestPossibleStateOutput);
-  }
-}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
index b231e6d..2fcfc3e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -100,15 +100,16 @@
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
 
     // set up resource state map
-    ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
     PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
     Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap();
     Map<String, String> instanceStateMap = new HashMap<>();
     instanceStateMap.put(TEST_INSTANCE, HelixDefinedState.DROPPED.name());
     stateMap.put(partition, instanceStateMap);
-    resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+    bestPossibleStateOutput.setState(TEST_RESOURCE, partition, instanceStateMap);
 
-    processEvent(event, resourcesStateMap);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    process(event);
     MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
     Assert.assertEquals(output.getMessages(TEST_RESOURCE, partition).size(), 1);
   }
@@ -194,16 +195,17 @@
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
 
     // set up resource state map
-    ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
     PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
     Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap();
     Map<String, String> instanceStateMap = new HashMap<>();
     instanceStateMap.put(TEST_INSTANCE, currentState);
     stateMap.put(partition, instanceStateMap);
-    resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+    bestPossibleStateOutput.setState(TEST_RESOURCE, partition, instanceStateMap);
 
     // Process the event
-    processEvent(event, resourcesStateMap);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    process(event);
     MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
 
     return output.getMessages(TEST_RESOURCE, partition);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index d262b7b..8b1b7bc 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -38,7 +38,6 @@
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -97,7 +96,7 @@
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -133,7 +132,7 @@
 
     Pipeline messagePipeline = new Pipeline();
     messagePipeline.addStage(new BestPossibleStateCalcStage());
-    messagePipeline.addStage(new ResourceMessageGenerationPhase());
+    messagePipeline.addStage(new MessageGenerationPhase());
     messagePipeline.addStage(new MessageSelectionStage());
     messagePipeline.addStage(new IntermediateStateCalcStage());
     messagePipeline.addStage(new MessageThrottleStage());
@@ -334,7 +333,7 @@
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -431,7 +430,7 @@
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -510,7 +509,7 @@
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -590,7 +589,7 @@
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
index 9539b98..899a427 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
@@ -30,10 +30,10 @@
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageOutput;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.task.TaskSynchronizedTestBase;
@@ -77,7 +77,7 @@
     runStage(event, new CurrentStateComputationStage());
     runStage(event, new BestPossibleStateCalcStage());
     Assert.assertEquals(cache.getCachedIdealMapping().size(), 1);
-    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageGenerationPhase());
     runStage(event, new MessageSelectionStage());
     runStage(event, new IntermediateStateCalcStage());
 
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
index 1f1d7bd..649ff7e 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -37,12 +37,12 @@
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
@@ -100,7 +100,7 @@
     _fullPipeline.addStage(new ResourceComputationStage());
     _fullPipeline.addStage(new CurrentStateComputationStage());
     _fullPipeline.addStage(new BestPossibleStateCalcStage());
-    _fullPipeline.addStage(new ResourceMessageGenerationPhase());
+    _fullPipeline.addStage(new MessageGenerationPhase());
     _fullPipeline.addStage(new MessageSelectionStage());
     _fullPipeline.addStage(new IntermediateStateCalcStage());
     _fullPipeline.addStage(new MessageThrottleStage());
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 1d164c5..40d5c97 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -33,11 +33,11 @@
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageOutput;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -87,13 +87,13 @@
     _fullPipeline = new Pipeline("FullPipeline");
     _fullPipeline.addStage(new ReadClusterDataStage());
     _fullPipeline.addStage(new BestPossibleStateCalcStage());
-    _fullPipeline.addStage(new ResourceMessageGenerationPhase());
+    _fullPipeline.addStage(new MessageGenerationPhase());
     _fullPipeline.addStage(new MessageSelectionStage());
     _fullPipeline.addStage(new IntermediateStateCalcStage());
     _fullPipeline.addStage(new MessageThrottleStage());
 
     _messagePipeline = new Pipeline("MessagePipeline");
-    _messagePipeline.addStage(new ResourceMessageGenerationPhase());
+    _messagePipeline.addStage(new MessageGenerationPhase());
     _messagePipeline.addStage(new MessageSelectionStage());
     _messagePipeline.addStage(new IntermediateStateCalcStage());
     _messagePipeline.addStage(new MessageThrottleStage());
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index cec0d2b..307022f 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -33,11 +33,11 @@
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageOutput;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -199,7 +199,7 @@
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
 
     pipeline = new Pipeline("test");
-    pipeline.addStage(new ResourceMessageGenerationPhase());
+    pipeline.addStage(new MessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
     pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new MessageThrottleStage());
@@ -223,7 +223,7 @@
 
 
     pipeline = new Pipeline("test");
-    pipeline.addStage(new ResourceMessageGenerationPhase());
+    pipeline.addStage(new MessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
     pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new MessageThrottleStage());
@@ -246,7 +246,7 @@
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     pipeline = new Pipeline("test");
-    pipeline.addStage(new ResourceMessageGenerationPhase());
+    pipeline.addStage(new MessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
     pipeline.addStage(new MessageThrottleStage());
 
@@ -359,7 +359,7 @@
     Pipeline pipeline = new Pipeline("test");
     pipeline.addStage(new ReadClusterDataStage());
     pipeline.addStage(new BestPossibleStateCalcStage());
-    pipeline.addStage(new ResourceMessageGenerationPhase());
+    pipeline.addStage(new MessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
     pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new MessageThrottleStage());
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
index f685bf4..ea2a4aa 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
@@ -35,9 +35,8 @@
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.stages.IntermediateStateOutput;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageOutput;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.ClusterConfig;
@@ -61,7 +60,7 @@
   @Test
   public void testP2PWithStateCancellationMessage() {
     ClusterEvent event = generateClusterEvent();
-    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageGenerationPhase());
     MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_ALL.name());
     // No message should be sent for partition 0
     Assert.assertEquals(messageOutput.getMessages(RESOURCE_NAME, new Partition("0")).size(), 0);
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index ccc1431..57a3ad0 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -13,9 +13,9 @@
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -79,7 +79,7 @@
     setupThrottleConfig(cache.getClusterConfig(),
         StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, maxPending);
     runStage(event, new BestPossibleStateCalcStage());
-    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageGenerationPhase());
     runStage(event, new MessageSelectionStage());
     runStage(event, new IntermediateStateCalcStage());
 
@@ -140,7 +140,7 @@
     setupThrottleConfig(cache.getClusterConfig(),
         StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, maxPending);
     runStage(event, new BestPossibleStateCalcStage());
-    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageGenerationPhase());
     runStage(event, new MessageSelectionStage());
     runStage(event, new IntermediateStateCalcStage());