Refactor/clean up code without logic change (#1760)
This commmit contains:
1. Remove unused functions and logic
2. Combine the Resource/Task message generation into one stage, since they are both relying on best possible result.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 4baac38..b8fac92 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,7 +19,6 @@
* under the License.
*/
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -82,6 +81,7 @@
import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.PersistAssignmentStage;
@@ -92,9 +92,7 @@
import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
import org.apache.helix.controller.stages.TopStateHandoffReportStage;
import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.controller.stages.task.TaskMessageDispatchStage;
-import org.apache.helix.controller.stages.task.TaskMessageGenerationPhase;
import org.apache.helix.controller.stages.task.TaskPersistDataStage;
import org.apache.helix.controller.stages.task.TaskSchedulingStage;
import org.apache.helix.model.ClusterConfig;
@@ -516,7 +514,7 @@
// Need to add MaintenanceRecoveryStage here because MAX_PARTITIONS_PER_INSTANCE check could
// only occur after IntermediateStateCalcStage calculation
rebalancePipeline.addStage(new MaintenanceRecoveryStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
// The IntermediateStateCalcStage should be applied after message selection
// Messages are throttled already removed by IntermediateStateCalcStage in MessageSelection output
@@ -598,7 +596,7 @@
rebalancePipeline.addStage(new TaskSchedulingStage());
rebalancePipeline.addStage(new TaskPersistDataStage());
rebalancePipeline.addStage(new TaskGarbageCollectionStage());
- rebalancePipeline.addStage(new TaskMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new TaskMessageDispatchStage());
// backward compatibility check
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index b06352d..36bc9cb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -582,73 +582,6 @@
}
/**
- * For a partition, given its preferenceList, bestPossibleState, and currentState, determine which
- * type of rebalance is needed to model IdealState's states defined by the state model definition.
- * @return RebalanceType needed to bring the replicas to idea states
- * RECOVERY_BALANCE - not all required states (replicas) are available through all
- * replicas, or the partition is disabled
- * NONE - current state matches the ideal state
- * LOAD_BALANCE - although all replicas required exist, Helix needs to optimize the
- * allocation
- */
- private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
- Map<String, String> bestPossibleMap, List<String> preferenceList,
- StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
- IdealState idealState, String partitionName) {
- if (preferenceList == null) {
- preferenceList = Collections.emptyList();
- }
-
- // If there is a minimum active replica number specified in IS, we should respect it.
- // TODO: We should implement the per replica level throttling with generated message
- // Issue: https://github.com/apache/helix/issues/343
- int replica = idealState.getMinActiveReplicas() == -1
- ? idealState.getReplicaCount(preferenceList.size())
- : idealState.getMinActiveReplicas();
- Set<String> activeList = new HashSet<>(preferenceList);
- activeList.retainAll(cache.getEnabledLiveInstances());
-
- // For each state, check that this partition currently has the required number of that state as
- // required by StateModelDefinition.
- LinkedHashMap<String, Integer> expectedStateCountMap =
- stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
- // Current counts without disabled partitions or disabled instances
- Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
- currentStateMapWithoutDisabled.keySet().removeAll(
- cache.getDisabledInstancesForPartition(idealState.getResourceName(), partitionName));
- Map<String, Integer> currentStateCounts =
- StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
-
- // Go through each state and compare counts
- for (String state : expectedStateCountMap.keySet()) {
- Integer expectedCount = expectedStateCountMap.get(state);
- Integer currentCount = currentStateCounts.get(state);
- expectedCount = expectedCount == null ? 0 : expectedCount;
- currentCount = currentCount == null ? 0 : currentCount;
-
- // If counts do not match up, this partition requires recovery
- if (currentCount < expectedCount) {
- // Recovery is not needed in cases where this partition just started, was dropped, or is in
- // error
- if (!state.equals(HelixDefinedState.DROPPED.name())
- && !state.equals(HelixDefinedState.ERROR.name())
- && !state.equals(stateModelDef.getInitialState())) {
- return RebalanceType.RECOVERY_BALANCE;
- }
- }
- }
- // No recovery needed, all expected replicas exist
- // Check if this partition is actually in the BestPossibleState
- if (currentStateMap.equals(bestPossibleMap)) {
- return RebalanceType.NONE; // No further action required
- } else {
- return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order to
- // achieve BestPossibleState, load balance may be required
- // to shift replicas around
- }
- }
-
- /**
* Determine the message rebalance type with message and current states.
* @param desiredStates Ideally how may states we needed for guarantee the health of replica
* @param message The message to be determined what is the rebalance type
@@ -914,30 +847,4 @@
});
}
}
-
- /**
- * Handle a partition with a pending message so that the partition will not be double-charged or double-assigned during recovery and load balance.
- * @param partition
- * @param partitionsNeedRecovery
- * @param partitionsNeedLoadbalance
- * @param rebalanceType
- */
- private void handlePendingStateTransitionsForThrottling(Partition partition,
- Set<Partition> partitionsNeedRecovery, Set<Partition> partitionsNeedLoadbalance,
- RebalanceType rebalanceType, PartitionStateMap bestPossiblePartitionStateMap,
- PartitionStateMap intermediatePartitionStateMap) {
- // Pass the best possible state directly into intermediatePartitionStateMap
- // This is safe to do so because we already have a pending transition for this partition, implying that the assignment has been made in previous pipeline
- intermediatePartitionStateMap
- .setState(partition, bestPossiblePartitionStateMap.getPartitionMap(partition));
- // Remove the partition's name from the set of partition (names) that need to be charged and assigned to prevent double-processing
- switch (rebalanceType) {
- case RECOVERY_BALANCE:
- partitionsNeedRecovery.remove(partition);
- break;
- case LOAD_BALANCE:
- partitionsNeedLoadbalance.remove(partition);
- break;
- }
- }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 3112327..836e5df 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -58,7 +58,7 @@
/**
* Compares the currentState, pendingState with IdealState and generate messages
*/
-public abstract class MessageGenerationPhase extends AbstractBaseStage {
+public class MessageGenerationPhase extends AbstractBaseStage {
private final static String NO_DESIRED_STATE = "NoDesiredState";
// If we see there is any invalid pending message leaving on host, i.e. message
@@ -75,8 +75,10 @@
private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);
- protected void processEvent(ClusterEvent event, ResourcesStateMap resourcesStateMap)
- throws Exception {
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
_eventId = event.getEventId();
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
BaseControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
@@ -86,9 +88,9 @@
Map<String, Map<String, Message>> messagesToCleanUp = new HashMap<>();
if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
- || resourcesStateMap == null) {
+ || bestPossibleStateOutput == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|INTERMEDIATE_STATE");
+ + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BESTPOSSIBLE_STATE");
}
Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
@@ -101,7 +103,7 @@
for (Resource resource : resourceMap.values()) {
try {
- generateMessage(resource, cache, resourcesStateMap, currentStateOutput, manager,
+ generateMessage(resource, cache, bestPossibleStateOutput, currentStateOutput, manager,
sessionIdMap, event.getEventType(), output, messagesToCleanUp);
} catch (HelixException ex) {
LogUtil.logError(logger, _eventId,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
deleted file mode 100644
index 73b309e..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.controller.stages.resource;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.IntermediateStateOutput;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages for regular resource
- */
-public class ResourceMessageGenerationPhase extends MessageGenerationPhase {
- @Override
- public void process(ClusterEvent event) throws Exception {
- BestPossibleStateOutput bestPossibleStateOutput =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
- processEvent(event, bestPossibleStateOutput);
- }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
deleted file mode 100644
index 5ef6c93..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.controller.stages.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages for task pipeline
- */
-
-public class TaskMessageGenerationPhase extends MessageGenerationPhase {
- @Override
- public void process(ClusterEvent event) throws Exception {
- BestPossibleStateOutput bestPossibleStateOutput =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
- processEvent(event, bestPossibleStateOutput);
- }
-}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
index b231e6d..2fcfc3e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -100,15 +100,16 @@
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
// set up resource state map
- ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+ BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap();
Map<String, String> instanceStateMap = new HashMap<>();
instanceStateMap.put(TEST_INSTANCE, HelixDefinedState.DROPPED.name());
stateMap.put(partition, instanceStateMap);
- resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+ bestPossibleStateOutput.setState(TEST_RESOURCE, partition, instanceStateMap);
- processEvent(event, resourcesStateMap);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+ process(event);
MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
Assert.assertEquals(output.getMessages(TEST_RESOURCE, partition).size(), 1);
}
@@ -194,16 +195,17 @@
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
// set up resource state map
- ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+ BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap();
Map<String, String> instanceStateMap = new HashMap<>();
instanceStateMap.put(TEST_INSTANCE, currentState);
stateMap.put(partition, instanceStateMap);
- resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+ bestPossibleStateOutput.setState(TEST_RESOURCE, partition, instanceStateMap);
// Process the event
- processEvent(event, resourcesStateMap);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+ process(event);
MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
return output.getMessages(TEST_RESOURCE, partition);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index d262b7b..8b1b7bc 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -38,7 +38,6 @@
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -97,7 +96,7 @@
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
@@ -133,7 +132,7 @@
Pipeline messagePipeline = new Pipeline();
messagePipeline.addStage(new BestPossibleStateCalcStage());
- messagePipeline.addStage(new ResourceMessageGenerationPhase());
+ messagePipeline.addStage(new MessageGenerationPhase());
messagePipeline.addStage(new MessageSelectionStage());
messagePipeline.addStage(new IntermediateStateCalcStage());
messagePipeline.addStage(new MessageThrottleStage());
@@ -334,7 +333,7 @@
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
@@ -431,7 +430,7 @@
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
@@ -510,7 +509,7 @@
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
@@ -590,7 +589,7 @@
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
index 9539b98..899a427 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
@@ -30,10 +30,10 @@
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageOutput;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.task.TaskSynchronizedTestBase;
@@ -77,7 +77,7 @@
runStage(event, new CurrentStateComputationStage());
runStage(event, new BestPossibleStateCalcStage());
Assert.assertEquals(cache.getCachedIdealMapping().size(), 1);
- runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageGenerationPhase());
runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
index 1f1d7bd..649ff7e 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -37,12 +37,12 @@
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
@@ -100,7 +100,7 @@
_fullPipeline.addStage(new ResourceComputationStage());
_fullPipeline.addStage(new CurrentStateComputationStage());
_fullPipeline.addStage(new BestPossibleStateCalcStage());
- _fullPipeline.addStage(new ResourceMessageGenerationPhase());
+ _fullPipeline.addStage(new MessageGenerationPhase());
_fullPipeline.addStage(new MessageSelectionStage());
_fullPipeline.addStage(new IntermediateStateCalcStage());
_fullPipeline.addStage(new MessageThrottleStage());
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 1d164c5..40d5c97 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -33,11 +33,11 @@
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageOutput;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -87,13 +87,13 @@
_fullPipeline = new Pipeline("FullPipeline");
_fullPipeline.addStage(new ReadClusterDataStage());
_fullPipeline.addStage(new BestPossibleStateCalcStage());
- _fullPipeline.addStage(new ResourceMessageGenerationPhase());
+ _fullPipeline.addStage(new MessageGenerationPhase());
_fullPipeline.addStage(new MessageSelectionStage());
_fullPipeline.addStage(new IntermediateStateCalcStage());
_fullPipeline.addStage(new MessageThrottleStage());
_messagePipeline = new Pipeline("MessagePipeline");
- _messagePipeline.addStage(new ResourceMessageGenerationPhase());
+ _messagePipeline.addStage(new MessageGenerationPhase());
_messagePipeline.addStage(new MessageSelectionStage());
_messagePipeline.addStage(new IntermediateStateCalcStage());
_messagePipeline.addStage(new MessageThrottleStage());
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index cec0d2b..307022f 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -33,11 +33,11 @@
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageOutput;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -199,7 +199,7 @@
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
pipeline = new Pipeline("test");
- pipeline.addStage(new ResourceMessageGenerationPhase());
+ pipeline.addStage(new MessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new MessageThrottleStage());
@@ -223,7 +223,7 @@
pipeline = new Pipeline("test");
- pipeline.addStage(new ResourceMessageGenerationPhase());
+ pipeline.addStage(new MessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new MessageThrottleStage());
@@ -246,7 +246,7 @@
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
pipeline = new Pipeline("test");
- pipeline.addStage(new ResourceMessageGenerationPhase());
+ pipeline.addStage(new MessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
pipeline.addStage(new MessageThrottleStage());
@@ -359,7 +359,7 @@
Pipeline pipeline = new Pipeline("test");
pipeline.addStage(new ReadClusterDataStage());
pipeline.addStage(new BestPossibleStateCalcStage());
- pipeline.addStage(new ResourceMessageGenerationPhase());
+ pipeline.addStage(new MessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new MessageThrottleStage());
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
index f685bf4..ea2a4aa 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
@@ -35,9 +35,8 @@
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.stages.IntermediateStateOutput;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageOutput;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.ClusterConfig;
@@ -61,7 +60,7 @@
@Test
public void testP2PWithStateCancellationMessage() {
ClusterEvent event = generateClusterEvent();
- runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageGenerationPhase());
MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_ALL.name());
// No message should be sent for partition 0
Assert.assertEquals(messageOutput.getMessages(RESOURCE_NAME, new Partition("0")).size(), 0);
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index ccc1431..57a3ad0 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -13,9 +13,9 @@
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -79,7 +79,7 @@
setupThrottleConfig(cache.getClusterConfig(),
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, maxPending);
runStage(event, new BestPossibleStateCalcStage());
- runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageGenerationPhase());
runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());
@@ -140,7 +140,7 @@
setupThrottleConfig(cache.getClusterConfig(),
StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, maxPending);
runStage(event, new BestPossibleStateCalcStage());
- runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageGenerationPhase());
runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());