blob: 57a3ad0bc834e81155529df8ea76d2b1019c04e6 [file] [log] [blame]
package org.apache.helix.monitoring.mbeans;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BaseStageTest;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.testng.Assert;
import org.testng.annotations.Test;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
public class TestRebalancerMetrics extends BaseStageTest {
@Test
public void testRecoveryRebalanceMetrics() {
System.out
.println("START testRecoveryRebalanceMetrics at " + new Date(System.currentTimeMillis()));
String resource = "testResourceName";
int numPartition = 100;
int numReplica = 3;
int maxPending = 3;
setupIdealState(5, new String[] {resource}, numPartition,
numReplica, IdealState.RebalanceMode.FULL_AUTO,
BuiltInStateModelDefinitions.MasterSlave.name());
setupInstances(5);
setupLiveInstances(5);
setupStateModel();
Map<String, Resource> resourceMap =
getResourceMap(new String[] {resource}, numPartition,
BuiltInStateModelDefinitions.MasterSlave.name());
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
monitor.active();
event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
runStage(event, new ReadClusterDataStage());
ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
setupThrottleConfig(cache.getClusterConfig(),
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, maxPending);
runStage(event, new BestPossibleStateCalcStage());
runStage(event, new MessageGenerationPhase());
runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());
ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
ResourceMonitor resourceMonitor = clusterStatusMonitor.getResourceMonitor(resource);
Assert.assertEquals(resourceMonitor.getNumPendingRecoveryRebalanceReplicas(),
numPartition * numReplica - resourceMonitor.getNumPendingLoadRebalanceReplicas());
Assert.assertEquals(resourceMonitor.getNumRecoveryRebalanceThrottledReplicas(),
numPartition * numReplica - resourceMonitor.getNumPendingLoadRebalanceReplicas() - maxPending);
System.out
.println("END testRecoveryRebalanceMetrics at " + new Date(System.currentTimeMillis()));
}
@Test
public void testLoadBalanceMetrics() {
System.out
.println("START testLoadBalanceMetrics at " + new Date(System.currentTimeMillis()));
String resource = "testResourceName";
int numPartition = 100;
int numReplica = 3;
int maxPending = 3;
setupIdealState(5, new String[] {resource}, numPartition,
numReplica, IdealState.RebalanceMode.FULL_AUTO,
BuiltInStateModelDefinitions.MasterSlave.name());
setupInstances(5);
setupLiveInstances(4);
setupStateModel();
Map<String, Resource> resourceMap =
getResourceMap(new String[] {resource}, numPartition,
BuiltInStateModelDefinitions.MasterSlave.name());
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
monitor.active();
event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
runStage(event, new ReadClusterDataStage());
runStage(event, new BestPossibleStateCalcStage());
BestPossibleStateOutput bestPossibleStateOutput =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
currentStateOutput = copyCurrentStateFromBestPossible(bestPossibleStateOutput, resource);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
setupLiveInstances(4);
ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
cache.clearCachedResourceAssignments();
runStage(event, new ReadClusterDataStage());
setupThrottleConfig(cache.getClusterConfig(),
StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, maxPending);
runStage(event, new BestPossibleStateCalcStage());
runStage(event, new MessageGenerationPhase());
runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());
ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
ResourceMonitor resourceMonitor = clusterStatusMonitor.getResourceMonitor(resource);
long numPendingLoadBalance = resourceMonitor.getNumPendingLoadRebalanceReplicas();
Assert.assertTrue(numPendingLoadBalance > 0);
Assert.assertEquals(resourceMonitor.getNumLoadRebalanceThrottledReplicas(), numPendingLoadBalance - maxPending);
System.out
.println("END testLoadBalanceMetrics at " + new Date(System.currentTimeMillis()));
}
private void setupThrottleConfig(ClusterConfig clusterConfig,
StateTransitionThrottleConfig.RebalanceType rebalanceType, int maxPending) {
StateTransitionThrottleConfig resourceThrottle =
new StateTransitionThrottleConfig(rebalanceType,
StateTransitionThrottleConfig.ThrottleScope.RESOURCE, maxPending);
clusterConfig.setStateTransitionThrottleConfigs(Arrays.asList(resourceThrottle));
}
private CurrentStateOutput copyCurrentStateFromBestPossible(
BestPossibleStateOutput bestPossibleStateOutput, String resource) {
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
PartitionStateMap partitionStateMap = bestPossibleStateOutput.getPartitionStateMap(resource);
for (Partition partition : partitionStateMap.partitionSet()) {
Map<String, String> stateMap = partitionStateMap.getPartitionMap(partition);
for (String instance : stateMap.keySet()) {
currentStateOutput.setCurrentState(resource, partition, instance, stateMap.get(instance));
}
}
return currentStateOutput;
}
}