// blob: 191319779abc91398645a5c24047e21d48b10d58 [file] [log] [blame]
package org.apache.helix.controller.rebalancer;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.helix.HelixException;
import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint;
import org.apache.helix.controller.rebalancer.constraint.TotalCapacityConstraint;
import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockCapacityProvider;
import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockPartitionWeightProvider;
import org.apache.helix.controller.rebalancer.strategy.ConstraintRebalanceStrategy;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Tests for {@link ConstraintRebalanceStrategy}.
 *
 * <p>Every test assigns {@code nResources} resources, each with {@code nPartitions} partitions and
 * {@code nReplicas} replicas per partition, onto {@code nParticipants} mock instances and then
 * inspects the accumulated partition weight per instance.
 */
public class TestConstraintRebalanceStrategy {
  private static final Logger _logger =
      LoggerFactory.getLogger(TestConstraintRebalanceStrategy.class);

  final String resourceNamePrefix = "resource";
  final int nParticipants = 40;
  final int nResources = 20;
  final int nPartitions = 100;
  final int nReplicas = 3;
  final int defaultCapacity = 6000; // total = 6000*40 = 240000
  final int resourceWeight = 10; // total = 20*100*3*10 = 60000
  final String topState = "ONLINE";

  final List<String> resourceNames = new ArrayList<>();
  final List<String> instanceNames = new ArrayList<>();
  final List<String> partitions = new ArrayList<>(nPartitions);
  // Cluster data shared by all tests that do not need a topology-aware cluster.
  final ResourceControllerDataProvider cache = new ResourceControllerDataProvider();
  // State name -> number of replicas expected in that state for every partition.
  final LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);

  /**
   * Generates the resource, instance and partition names and populates the shared mock cluster.
   */
  @BeforeClass
  public void beforeClass() {
    for (int i = 0; i < nResources; i++) {
      resourceNames.add(resourceNamePrefix + i);
    }
    for (int i = 0; i < nParticipants; i++) {
      instanceNames.add("node" + i);
    }
    for (int i = 0; i < nPartitions; i++) {
      partitions.add(Integer.toString(i));
    }
    setupMockCluster();
  }

  /**
   * Registers every instance as live with a default {@link InstanceConfig}, installs a
   * non-topology-aware {@link ClusterConfig} into the shared cache, and fills in the expected
   * replica count per state (no OFFLINE replicas, {@code nReplicas} replicas in the top state).
   */
  private void setupMockCluster() {
    List<LiveInstance> liveInstanceList = new ArrayList<>();
    Map<String, InstanceConfig> instanceConfigs = new HashMap<>();
    for (String instance : instanceNames) {
      liveInstanceList.add(new LiveInstance(instance));
      instanceConfigs.put(instance, new InstanceConfig(instance));
    }
    cache.setLiveInstances(liveInstanceList);
    cache.setInstanceConfigMap(instanceConfigs);

    ClusterConfig clusterConfig = new ClusterConfig("test");
    clusterConfig.setTopologyAwareEnabled(false);
    cache.setClusterConfig(clusterConfig);

    states.put("OFFLINE", 0);
    states.put(topState, nReplicas);
  }

  /**
   * Builds a capacity map that gives every instance the same capacity.
   *
   * @param capacityPerInstance capacity assigned to each instance
   * @return a new mutable map from instance name to {@code capacityPerInstance}
   */
  private Map<String, Integer> uniformCapacity(int capacityPerInstance) {
    Map<String, Integer> capacity = new HashMap<>();
    for (String instance : instanceNames) {
      capacity.put(instance, capacityPerInstance);
    }
    return capacity;
  }

  /**
   * Computes the assignment of all resources with a strategy built from the given constraints,
   * using the shared non-topology-aware cluster and no preferred assignment.
   *
   * @param hardConstraints hard constraints handed to the strategy
   * @param softConstraints soft constraints handed to the strategy
   * @return resource name -> partition name -> (instance name -> state)
   */
  private Map<String, Map<String, Map<String, String>>> calculateAssignment(
      List<AbstractRebalanceHardConstraint> hardConstraints,
      List<AbstractRebalanceSoftConstraint> softConstraints) {
    ConstraintRebalanceStrategy strategy =
        new ConstraintRebalanceStrategy(hardConstraints, softConstraints);
    return computeAssignment(strategy, cache,
        Collections.<String, Map<String, Map<String, String>>>emptyMap());
  }

  /**
   * Runs one strategy instance over all resources, in {@code resourceNames} order, and collects
   * the resulting partition assignments.
   *
   * @param strategy the strategy to drive; the same instance is re-initialized for every resource
   * @param clusterData cluster data provider passed to the strategy
   * @param preferredAssignment resource name -> current/preferred partition mapping; resources
   *        without an entry are computed from an empty mapping
   * @return resource name -> partition name -> (instance name -> state)
   */
  private Map<String, Map<String, Map<String, String>>> computeAssignment(
      ConstraintRebalanceStrategy strategy, ResourceControllerDataProvider clusterData,
      Map<String, Map<String, Map<String, String>>> preferredAssignment) {
    Map<String, Map<String, Map<String, String>>> result = new HashMap<>();
    for (String resourceName : resourceNames) {
      Map<String, Map<String, String>> currentMapping = preferredAssignment.get(resourceName);
      if (currentMapping == null) {
        // Hand the strategy a fresh mutable map rather than a shared immutable one.
        currentMapping = new HashMap<>();
      }
      // NOTE(review): Integer.MAX_VALUE presumably means "no per-node partition limit" — confirm.
      strategy.init(resourceName, partitions, states, Integer.MAX_VALUE);
      Map<String, Map<String, String>> partitionMap = new HashMap<>(
          strategy.computePartitionAssignment(instanceNames, instanceNames, currentMapping,
              clusterData).getMapFields());
      result.put(resourceName, partitionMap);
    }
    return result;
  }

  /**
   * Verifies that every partition has exactly the expected number of replicas per state and sums
   * up the partition weight placed on each instance.
   *
   * <p>Every known instance is present in the returned map, with a usage of 0 if it received no
   * replica, so that evenness checks on the result cannot overlook an idle instance.
   *
   * @param assignment resource name -> partition name -> (instance name -> state)
   * @param weightProvider source of the weight of each partition
   * @return instance name -> total weight of all replicas assigned to that instance
   */
  private Map<String, Integer> checkPartitionUsage(
      Map<String, Map<String, Map<String, String>>> assignment,
      PartitionWeightProvider weightProvider) {
    Map<String, Integer> weightCount = new HashMap<>();
    for (String instance : instanceNames) {
      weightCount.put(instance, 0);
    }

    for (Map.Entry<String, Map<String, Map<String, String>>> resourceEntry : assignment
        .entrySet()) {
      String resource = resourceEntry.getKey();
      for (Map.Entry<String, Map<String, String>> partitionEntry : resourceEntry.getValue()
          .entrySet()) {
        String partition = partitionEntry.getKey();
        Map<String, String> stateMap = partitionEntry.getValue();

        // check states: count down from the expected replica count of each state
        Map<String, Integer> remaining = new HashMap<>(states);
        for (String state : stateMap.values()) {
          Assert.assertTrue(remaining.containsKey(state),
              "Unexpected state " + state + " for " + resource + "/" + partition);
          remaining.put(state, remaining.get(state) - 1);
        }
        for (int count : remaining.values()) {
          Assert.assertEquals(count, 0,
              "Wrong replica count for " + resource + "/" + partition);
        }

        // report weight
        int partitionWeight = weightProvider.getPartitionWeight(resource, partition);
        for (String instance : stateMap.keySet()) {
          Integer current = weightCount.get(instance);
          weightCount.put(instance,
              current == null ? partitionWeight : current + partitionWeight);
        }
      }
    }
    return weightCount;
  }

  /**
   * Asserts that the difference between the most and the least loaded instance is at most
   * {@code defaultCapacity / 100}.
   *
   * @param weightCount instance name -> accumulated partition weight
   */
  private void assertUsageIsEven(Map<String, Integer> weightCount) {
    int max = Collections.max(weightCount.values());
    int min = Collections.min(weightCount.values());
    // Since the accuracy of Default evenness constraint is 0.01, diff should be 1/100 of
    // participant capacity in max.
    Assert.assertTrue((max - min) <= defaultCapacity / 100,
        "Usage spread " + (max - min) + " (max=" + max + ", min=" + min + ") exceeds "
            + defaultCapacity / 100);
  }

  /**
   * With uniform capacity and uniform partition weight, a total-capacity hard constraint plus a
   * weight-aware evenness soft constraint must yield an even usage across instances.
   */
  @Test
  public void testEvenness() {
    // capacity / weight
    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
    CapacityProvider capacityProvider =
        new MockCapacityProvider(uniformCapacity(defaultCapacity), 0);
    TotalCapacityConstraint capacityConstraint =
        new TotalCapacityConstraint(weightProvider, capacityProvider);
    PartitionWeightAwareEvennessConstraint evenConstraint =
        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);

    Map<String, Map<String, Map<String, String>>> assignment = calculateAssignment(
        Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));

    assertUsageIsEven(checkPartitionUsage(assignment, weightProvider));
  }

  /**
   * A strategy created with the no-argument constructor must also spread replicas evenly; each
   * replica is counted with weight 1 here.
   */
  @Test
  public void testEvennessByDefaultConstraint() {
    Map<String, Map<String, Map<String, String>>> result =
        computeAssignment(new ConstraintRebalanceStrategy(), cache,
            Collections.<String, Map<String, Map<String, String>>>emptyMap());

    Map<String, Integer> weightCount = checkPartitionUsage(result, new PartitionWeightProvider() {
      @Override
      public int getPartitionWeight(String resource, String partition) {
        return 1;
      }
    });
    // NOTE(review): usage is a replica count here, yet the tolerance is still derived from
    // defaultCapacity — confirm that this bound is intended.
    assertUsageIsEven(weightCount);
  }

  /**
   * With instance capacities of 1x, 2x and 3x {@code defaultCapacity} and randomized partition
   * weights, the usage of each instance must be roughly proportional to its capacity.
   */
  @Test
  public void testCapacityAwareEvenness() {
    // capacity / weight
    int totalBucket = 0;
    Map<String, Integer> capacity = new HashMap<>();
    for (int i = 0; i < instanceNames.size(); i++) {
      int bucket = 1 + i % 3;
      capacity.put(instanceNames.get(i), defaultCapacity * bucket);
      totalBucket += bucket;
    }

    int partitionWeightGranularity = (int) (resourceWeight * 1.5);
    int totalPartitionWeight = 0;
    // Log the seed so that a failing run can be reproduced.
    long seed = System.currentTimeMillis();
    _logger.info("testCapacityAwareEvenness uses random seed {}", seed);
    Random ran = new Random(seed);
    Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>();
    for (String resource : resourceNames) {
      Map<String, Integer> weights = new HashMap<>();
      for (String partition : partitions) {
        int weight = resourceWeight / 2 + ran.nextInt(resourceWeight);
        weights.put(partition, weight);
        totalPartitionWeight += weight * nReplicas;
      }
      partitionWeightMap.put(resource, weights);
    }

    PartitionWeightProvider weightProvider =
        new MockPartitionWeightProvider(partitionWeightMap, resourceWeight);
    CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0);
    PartitionWeightAwareEvennessConstraint evenConstraint =
        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);

    Map<String, Map<String, Map<String, String>>> assignment =
        calculateAssignment(Collections.<AbstractRebalanceHardConstraint>emptyList(),
            Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
    Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider);

    for (int i = 0; i < instanceNames.size(); i++) {
      String instanceName = instanceNames.get(i);
      // Do the whole computation in floating point and truncate only the final result; casting
      // before the division would silently turn it into an integer division.
      int expectedUsage = (int) (((double) totalPartitionWeight) / totalBucket * (1 + i % 3));
      Integer realUsage = weightCount.get(instanceName);
      Assert.assertNotNull(realUsage, "No usage recorded for " + instanceName);
      // When have different capacity, calculation in the rebalance algorithm would have more
      // fractions, so loosen the restriction to 90% to 110% compared with the ideal value.
      Assert.assertTrue((expectedUsage - partitionWeightGranularity) * 0.9 <= realUsage
              && (expectedUsage + partitionWeightGranularity) * 1.1 >= realUsage,
          "Usage " + realUsage + " of " + instanceName + " is too far from the expected "
              + expectedUsage);
    }
  }

  /**
   * The assignment must fail with a {@link HelixException} when the total-capacity hard
   * constraint cannot be satisfied.
   */
  @Test
  public void testHardConstraintFails() {
    // capacity / weight
    // insufficient capacity: 40 * (6000 / 100) = 2400 in total vs. a total weight of 60000
    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
    CapacityProvider capacityProvider =
        new MockCapacityProvider(uniformCapacity(defaultCapacity / 100), 0);
    TotalCapacityConstraint capacityConstraint =
        new TotalCapacityConstraint(weightProvider, capacityProvider);

    try {
      calculateAssignment(
          Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
          Collections.<AbstractRebalanceSoftConstraint>emptyList());
      Assert.fail("Assignment should fail because of insufficient capacity.");
    } catch (HelixException e) {
      // expected
    }
  }

  /**
   * The assignment must fail with a {@link HelixException} when one hard constraint can be
   * satisfied but a second one, backed by an empty capacity map, cannot.
   */
  @Test(dependsOnMethods = "testHardConstraintFails")
  public void testConflictConstraint() {
    // capacity / weight
    // sufficient capacity for the first constraint
    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
    CapacityProvider capacityProvider =
        new MockCapacityProvider(uniformCapacity(defaultCapacity), 0);
    TotalCapacityConstraint normalCapacityConstraint =
        new TotalCapacityConstraint(weightProvider, capacityProvider);
    // No per-instance capacity at all; the second argument (0) is presumably the fallback
    // capacity, which would leave no room for any partition — TODO confirm.
    TotalCapacityConstraint conflictingCapacityConstraint =
        new TotalCapacityConstraint(weightProvider,
            new MockCapacityProvider(Collections.<String, Integer>emptyMap(), 0));

    List<AbstractRebalanceHardConstraint> constraints = new ArrayList<>();
    constraints.add(normalCapacityConstraint);
    constraints.add(conflictingCapacityConstraint);

    try {
      calculateAssignment(constraints,
          Collections.<AbstractRebalanceSoftConstraint>emptyList());
      Assert.fail("Assignment should fail because of the conflicting capacity constraint.");
    } catch (HelixException e) {
      // expected
    }
  }

  /**
   * When capacity is insufficient but only enforced through a soft constraint, the assignment
   * must still be produced and must still be even.
   */
  @Test(dependsOnMethods = "testEvenness")
  public void testSoftConstraintFails() {
    // capacity / weight
    // insufficient capacity: 40 * (6000 / 50) = 4800 in total vs. a total weight of 60000
    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
    CapacityProvider capacityProvider =
        new MockCapacityProvider(uniformCapacity(defaultCapacity / 50), 0);
    PartitionWeightAwareEvennessConstraint evenConstraint =
        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);

    Map<String, Map<String, Map<String, String>>> assignment =
        calculateAssignment(Collections.<AbstractRebalanceHardConstraint>emptyList(),
            Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));

    assertUsageIsEven(checkPartitionUsage(assignment, weightProvider));
  }

  /**
   * A valid preferred assignment must be kept, an invalid one (too few replicas) must be replaced
   * by a valid one, and the overall usage must stay even.
   */
  @Test(dependsOnMethods = "testEvenness")
  public void testRebalanceWithPreferredAssignment() {
    // capacity / weight
    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
    CapacityProvider capacityProvider =
        new MockCapacityProvider(uniformCapacity(defaultCapacity), 0);
    PartitionWeightAwareEvennessConstraint evenConstraint =
        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);

    Map<String, Map<String, Map<String, String>>> preferredAssignment = new HashMap<>();

    // inject valid partition assignment for one resource into preferred assignment.
    List<String> validInstances = instanceNames.subList(0, nReplicas);
    Map<String, String> replicaState = new HashMap<>();
    for (String instance : validInstances) {
      replicaState.put(instance, topState);
    }
    Map<String, Map<String, String>> validPartitionAssignment = new HashMap<>();
    validPartitionAssignment.put(partitions.get(0), replicaState);
    preferredAssignment.put(resourceNames.get(0), validPartitionAssignment);

    // inject invalid partition assignment (one replica short) for another resource.
    Map<String, String> invalidReplicaState = new HashMap<>();
    for (String instance : instanceNames.subList(0, nReplicas - 1)) {
      invalidReplicaState.put(instance, topState);
    }
    Map<String, Map<String, String>> invalidPartitionAssignment = new HashMap<>();
    invalidPartitionAssignment.put(partitions.get(0), invalidReplicaState);
    preferredAssignment.put(resourceNames.get(1), invalidPartitionAssignment);

    ConstraintRebalanceStrategy strategy = new ConstraintRebalanceStrategy(
        Collections.<AbstractRebalanceHardConstraint>emptyList(),
        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
    Map<String, Map<String, Map<String, String>>> assignment =
        computeAssignment(strategy, cache, preferredAssignment);

    // Even with preferred assignment, the weight should still be balance
    assertUsageIsEven(checkPartitionUsage(assignment, weightProvider));

    // the resource 0 assignment should be kept the same
    Collection<String> resource0Instances =
        assignment.get(resourceNames.get(0)).get(partitions.get(0)).keySet();
    Assert.assertEquals(resource0Instances.size(), nReplicas);
    Assert.assertTrue(resource0Instances.containsAll(validInstances),
        "Preferred assignment of " + resourceNames.get(0) + " was not kept");

    // the resource 1 assignment should be set to a valid one
    Assert.assertEquals(assignment.get(resourceNames.get(1)).get(partitions.get(0)).size(),
        nReplicas);
  }

  /**
   * In a topology-aware cluster with "Rack" as the fault zone type, the default strategy must
   * balance replicas across instances and never place two replicas of one partition in the same
   * rack.
   */
  @Test
  public void testTopologyAwareAssignment() {
    // Topology Aware configuration
    ResourceControllerDataProvider topologyAwareCache = new ResourceControllerDataProvider();
    List<LiveInstance> liveInstanceList = new ArrayList<>();
    Map<String, InstanceConfig> instanceConfigs = new HashMap<>();
    for (int i = 0; i < instanceNames.size(); i++) {
      String instance = instanceNames.get(i);
      liveInstanceList.add(new LiveInstance(instance));
      InstanceConfig config = new InstanceConfig(instance);
      // Spread the instances round-robin over nParticipants / 5 racks.
      config.setDomain(String.format("Rack=%s,Host=%s", i % (nParticipants / 5), instance));
      instanceConfigs.put(instance, config);
    }
    topologyAwareCache.setLiveInstances(liveInstanceList);
    topologyAwareCache.setInstanceConfigMap(instanceConfigs);

    ClusterConfig clusterConfig = new ClusterConfig("test");
    clusterConfig.setTopologyAwareEnabled(true);
    clusterConfig.setTopology("/Rack/Host");
    clusterConfig.setFaultZoneType("Rack");
    topologyAwareCache.setClusterConfig(clusterConfig);

    Map<String, Map<String, Map<String, String>>> result =
        computeAssignment(new ConstraintRebalanceStrategy(), topologyAwareCache,
            Collections.<String, Map<String, Map<String, String>>>emptyMap());

    Map<String, Integer> weightCount = checkPartitionUsage(result, new PartitionWeightProvider() {
      @Override
      public int getPartitionWeight(String resource, String partition) {
        return defaultCapacity;
      }
    });
    // Every replica weighs defaultCapacity, so the allowed spread of defaultCapacity / 100 is
    // only met when all instances hold exactly the same number of replicas.
    assertUsageIsEven(weightCount);

    // check for domain assignment: within a resource, the replicas of a partition must all be
    // located in different racks.
    for (Map<String, Map<String, String>> partitionMap : result.values()) {
      for (Map.Entry<String, Map<String, String>> partitionEntry : partitionMap.entrySet()) {
        Set<String> racksOfPartition = new HashSet<>();
        for (String instance : partitionEntry.getValue().keySet()) {
          // The domain string starts with "Rack=<id>"; extract <id>.
          String rack =
              instanceConfigs.get(instance).getDomainAsString().split(",")[0].split("=")[1];
          Assert.assertTrue(racksOfPartition.add(rack),
              "Partition " + partitionEntry.getKey() + " has more than one replica in rack "
                  + rack);
        }
      }
    }
  }
}