blob: cc0c3196ffb565f3b06436d5e25e210b7730ef46 [file] [log] [blame]
package org.apache.helix.controller.stages;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomUtils;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestCurrentStateComputationStageForHandlingCapacity {
private static final int INSTANCE_COUNT = 3;
private static final int RESOURCE_COUNT = 2;
private static final int PARTITION_COUNT = 3;
private static final List<String> CAPACITY_KEYS = Lists.newArrayList("CU", "PARTCOUNT", "DISK");
private static final Map<String, Integer> DEFAULT_INSTANCE_CAPACITY_MAP =
ImmutableMap.of("CU", 100, "PARTCOUNT", 10, "DISK", 100);
private static final Map<String, Integer> DEFAULT_PART_CAPACITY_MAP =
ImmutableMap.of("CU", 10, "PARTCOUNT", 1, "DISK", 1);
private ResourceControllerDataProvider _clusterData;
private Map<String, Resource> _resourceMap;
private CurrentStateOutput _currentStateOutput;
private WagedInstanceCapacity _wagedInstanceCapacity;
private CurrentStateComputationStage _currentStateComputationStage;
@BeforeMethod
public void setUp() {
// prepare cluster data
_clusterData = Mockito.spy(new ResourceControllerDataProvider());
Map<String, InstanceConfig> instanceConfigMap = generateInstanceCapacityConfigs();
_clusterData.setInstanceConfigMap(instanceConfigMap);
_clusterData.setResourceConfigMap(generateResourcePartitionCapacityConfigs());
_clusterData.setIdealStates(generateIdealStates());
Mockito.doReturn(ImmutableMap.of()).when(_clusterData).getAllInstancesMessages();
ClusterConfig clusterConfig = new ClusterConfig("test");
clusterConfig.setTopologyAwareEnabled(false);
clusterConfig.setInstanceCapacityKeys(CAPACITY_KEYS);
_clusterData.setClusterConfig(clusterConfig);
// prepare current state output
_resourceMap = generateResourceMap();
_currentStateOutput = populateCurrentStatesForResources(_resourceMap, instanceConfigMap.keySet());
// prepare instance of waged-instance capacity
_wagedInstanceCapacity = new WagedInstanceCapacity(_clusterData);
_currentStateComputationStage = new CurrentStateComputationStage();
}
@Test
public void testProcessEventWithNoWagedResources() {
// We create ideal states with all WAGED enabled.
Map<String, IdealState> idealStates = _clusterData.getIdealStates();
// remove one WAGED resource from all resources.
idealStates.forEach((resourceName, idealState) -> {
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
});
ClusterEvent clusterEvent = new ClusterEvent("test", ClusterEventType.CurrentStateChange);
clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(), _clusterData);
clusterEvent.addAttribute(AttributeName.RESOURCES.name(), _resourceMap);
_currentStateComputationStage.handleResourceCapacityCalculation(clusterEvent, _clusterData, _currentStateOutput);
// validate that we did not compute and set the capacity map.
Assert.assertNull(_clusterData.getWagedInstanceCapacity());
}
@Test
public void testProcessEventWithSomeWagedResources() {
// We create ideal states with all WAGED enabled.
Map<String, IdealState> idealStates = _clusterData.getIdealStates();
// remove WAGED from one resource.
IdealState firstIdealState = idealStates.entrySet().iterator().next().getValue();
firstIdealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
firstIdealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
int totalIdealStates = idealStates.size();
ClusterEvent clusterEvent = new ClusterEvent("test", ClusterEventType.CurrentStateChange);
clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(), _clusterData);
clusterEvent.addAttribute(AttributeName.RESOURCES.name(), _resourceMap);
_currentStateComputationStage.handleResourceCapacityCalculation(clusterEvent, _clusterData, _currentStateOutput);
// validate that we did not compute and set the capacity map.
WagedInstanceCapacity wagedInstanceCapacity = _clusterData.getWagedInstanceCapacity();
Assert.assertNotNull(wagedInstanceCapacity);
Map<String, Map<String, Set<String>>> allocatedPartitionsMap = wagedInstanceCapacity.getAllocatedPartitionsMap();
Set<String> resourcesAllocated = allocatedPartitionsMap.values().stream()
.map(Map::keySet)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
Assert.assertEquals(resourcesAllocated.size(), totalIdealStates - 1);
}
@Test
public void testProcessEventWithAllWagedResources() {
// We create ideal states with all WAGED enabled.
Map<String, IdealState> idealStates = _clusterData.getIdealStates();
ClusterEvent clusterEvent = new ClusterEvent("test", ClusterEventType.CurrentStateChange);
clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(), _clusterData);
clusterEvent.addAttribute(AttributeName.RESOURCES.name(), _resourceMap);
_currentStateComputationStage.handleResourceCapacityCalculation(clusterEvent, _clusterData, _currentStateOutput);
// validate that we did not compute and set the capacity map.
WagedInstanceCapacity wagedInstanceCapacity = _clusterData.getWagedInstanceCapacity();
Assert.assertNotNull(wagedInstanceCapacity);
Map<String, Map<String, Set<String>>> allocatedPartitionsMap = wagedInstanceCapacity.getAllocatedPartitionsMap();
Set<String> resourcesAllocated = allocatedPartitionsMap.values().stream()
.map(Map::keySet)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
Assert.assertEquals(resourcesAllocated.size(), idealStates.size());
}
@Test
public void testSkipCapacityCalculation() {
// case: when resource-map is null
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
new ResourceControllerDataProvider(), null, new ClusterEvent(ClusterEventType.LiveInstanceChange)));
// case: when resource-map is empty
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
new ResourceControllerDataProvider(), ImmutableMap.of(), new ClusterEvent(ClusterEventType.LiveInstanceChange)));
// case: when instance capacity is null
Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
new ResourceControllerDataProvider(), _resourceMap, new ClusterEvent(ClusterEventType.LiveInstanceChange)));
// case: when event is of no-op
ResourceControllerDataProvider dataProvider = Mockito.mock(ResourceControllerDataProvider.class);
WagedInstanceCapacity instanceCapacity = Mockito.mock(WagedInstanceCapacity.class);
Mockito.when(dataProvider.getWagedInstanceCapacity()).thenReturn(instanceCapacity);
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.CustomizedStateChange)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.CustomizedViewChange)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.CustomizeStateConfigChange)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.ExternalViewChange)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.IdealStateChange)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.OnDemandRebalance)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.Resume)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.RetryRebalance)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.StateVerifier)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.TargetExternalViewChange)));
Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.TaskCurrentStateChange)));
Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.LiveInstanceChange)));
Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.CurrentStateChange)));
Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.MessageChange)));
Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
dataProvider, _resourceMap, new ClusterEvent(ClusterEventType.PeriodicalRebalance)));
}
// -- static helpers
private Map<String, InstanceConfig> generateInstanceCapacityConfigs() {
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
for (int i = 0; i < INSTANCE_COUNT; i ++) {
String instanceName = "instance-" + i;
InstanceConfig config = new InstanceConfig(instanceName);
config.setInstanceCapacityMap(DEFAULT_INSTANCE_CAPACITY_MAP);
instanceConfigMap.put(instanceName, config);
}
return instanceConfigMap;
}
private Map<String, ResourceConfig> generateResourcePartitionCapacityConfigs() {
Map<String, ResourceConfig> resourceConfigMap = new HashMap<>();
try {
Map<String, Map<String, Integer>> partitionsCapacityMap = new HashMap<>();
partitionsCapacityMap.put("DEFAULT", DEFAULT_PART_CAPACITY_MAP);
for (String resourceName : getResourceNames()) {
ResourceConfig config = new ResourceConfig(resourceName);
config.setPartitionCapacityMap(partitionsCapacityMap);
resourceConfigMap.put(resourceName, config);
}
} catch(IOException e) {
throw new RuntimeException("error while setting partition capacity map");
}
return resourceConfigMap;
}
private List<IdealState> generateIdealStates() {
return getResourceNames().stream()
.map(resourceName -> {
IdealState idealState = new IdealState(resourceName);
idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
idealState.setRebalancerClassName(WagedRebalancer.class.getName());
return idealState;
})
.collect(Collectors.toList());
}
private static CurrentStateOutput populateCurrentStatesForResources(
Map<String, Resource> resourceMap, Set<String> instanceNames) {
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
resourceMap.forEach((resourceName, resource) ->
resource.getPartitions().forEach(partition -> {
int masterPartIdx = RandomUtils.nextInt(0, instanceNames.size());
int idx = 0;
for (Iterator<String> it = instanceNames.iterator(); it.hasNext(); idx ++) {
currentStateOutput.setCurrentState(
resourceName, partition, it.next(), (idx == masterPartIdx) ? "MASTER" : "SLAVE");
}
}));
return currentStateOutput;
}
private static Map<String, Resource> generateResourceMap() {
return getResourceNames().stream()
.map(resourceName -> {
Resource resource = new Resource(resourceName);
IntStream.range(0, PARTITION_COUNT)
.mapToObj(i -> "partition-" + i)
.forEach(resource::addPartition);
return resource;
})
.collect(Collectors.toMap(Resource::getResourceName, Function.identity()));
}
private static List<String> getResourceNames() {
return IntStream.range(0, RESOURCE_COUNT)
.mapToObj(i -> "resource-" + i)
.collect(Collectors.toList());
}
}