blob: 2e41b6dbeaef117654a5a4927efbad94243b7f70 [file] [log] [blame]
package org.apache.helix.controller.rebalancer.waged.model;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.HelixConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;
public class TestClusterModelProvider extends AbstractTestClusterModel {
// Resource configs keyed by resource name. prepareData() (re)fills this map, and the mocked
// testCache.getResourceConfig(name) stubbed there answers lookups from it.
Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>();
/**
 * Builds on the parent's mocked data provider: stubs a FULL_AUTO, WAGED-managed ideal state for
 * every resource, then registers two additional instances (suffixes "1" and "2" appended to
 * {@code _testInstanceId}) in the assignable instance config map and the assignable live
 * instance map.
 *
 * @return the mocked data provider with the extra stubs applied
 * @throws IOException declared by the parent implementation, which is invoked first
 */
@Override
protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
  ResourceControllerDataProvider cache = super.setupClusterDataCache();

  // One ideal state per resource; every partition gets an empty preference list.
  Map<String, IdealState> idealStatesByResource = new HashMap<>();
  for (String resourceName : _resourceNames) {
    IdealState idealState = new IdealState(resourceName);
    idealState.setNumPartitions(_partitionNames.size());
    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
    idealState.setStateModelDefRef("MasterSlave");
    idealState.setReplicas("3");
    idealState.setRebalancerClassName(WagedRebalancer.class.getName());
    for (String partitionName : _partitionNames) {
      idealState.setPreferenceList(partitionName, Collections.emptyList());
    }
    idealStatesByResource.put(resourceName, idealState);
  }
  // Answer ideal state lookups by the resource name passed to the mock.
  when(cache.getIdealState(anyString())).thenAnswer(
      (Answer<IdealState>) invocation -> idealStatesByResource.get(invocation.getArguments()[0]));

  // Register the two extra instances.
  for (int suffix = 1; suffix <= 2; suffix++) {
    String extraInstance = _testInstanceId + suffix;
    _instances.add(extraInstance);

    // Instance config: added to the map the mock already returns, then re-stubbed so that both
    // the assignable view and the full view hand back that same map.
    InstanceConfig extraConfig = createMockInstanceConfig(extraInstance);
    Map<String, InstanceConfig> configsByInstance = cache.getAssignableInstanceConfigMap();
    configsByInstance.put(extraInstance, extraConfig);
    when(cache.getAssignableInstanceConfigMap()).thenReturn(configsByInstance);
    when(cache.getInstanceConfigMap()).thenReturn(configsByInstance);

    // Live instance: same approach for the assignable live instance map.
    LiveInstance extraLiveInstance = createMockLiveInstance(extraInstance);
    Map<String, LiveInstance> liveByInstance = cache.getAssignableLiveInstances();
    liveByInstance.put(extraInstance, extraLiveInstance);
    when(cache.getAssignableLiveInstances()).thenReturn(liveByInstance);
  }
  return cache;
}
/**
 * Checks {@code DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica} with two of
 * the three instances live and enabled, over four scenarios: empty input, one partition whose
 * only replica sits on the offline instance, no partition short of replicas, and
 * minActiveReplica of 2 with two partitions short.
 */
@Test
public void testFindToBeAssignedReplicasForMinActiveReplica() throws IOException {
  ResourceControllerDataProvider cache = setupClusterDataCache();
  String liveNodeA = _testInstanceId;
  String downNode = _testInstanceId + "1";
  String liveNodeB = _testInstanceId + "2";

  // Only liveNodeA and liveNodeB are reported as live and enabled.
  Map<String, LiveInstance> liveNodes = new HashMap<>();
  liveNodes.put(liveNodeA, createMockLiveInstance(liveNodeA));
  liveNodes.put(liveNodeB, createMockLiveInstance(liveNodeB));
  Set<String> enabledNodes = new HashSet<>();
  enabledNodes.add(liveNodeA);
  enabledNodes.add(liveNodeB);
  when(cache.getAssignableLiveInstances()).thenReturn(liveNodes);
  when(cache.getEnabledLiveInstances()).thenReturn(enabledNodes);

  // Scenario 0: nothing in, nothing out.
  Set<AssignableReplica> fromEmptyInput =
      DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(cache, Collections.emptySet(),
          enabledNodes, Collections.emptyMap(), new HashMap<>());
  Assert.assertEquals(fromEmptyInput, Collections.emptySet());

  // Scenario 1: minActiveReplica == 1 and one partition is hosted only by the offline instance.
  Map<String, Map<String, Map<String, String>>> layout = ImmutableMap.of(
      _resourceNames.get(0),
      ImmutableMap.of(
          _partitionNames.get(0), ImmutableMap.of("MASTER", liveNodeA),
          _partitionNames.get(1), ImmutableMap.of("OFFLINE", downNode)), // expect Partition2-MASTER
      _resourceNames.get(1),
      ImmutableMap.of(
          _partitionNames.get(2), ImmutableMap.of("MASTER", liveNodeA),
          _partitionNames.get(3), ImmutableMap.of("SLAVE", liveNodeA))
  );
  Map<String, Set<AssignableReplica>> replicasByResource = new HashMap<>(); // filled by prepareData
  Map<String, ResourceAssignment> assignmentByResource = new HashMap<>(); // filled by prepareData
  prepareData(layout, replicasByResource, assignmentByResource, cache, 1);
  Map<String, Set<AssignableReplica>> allocatedByInstance = new HashMap<>();
  Set<AssignableReplica> pendingReplicas =
      DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(cache, replicasByResource.keySet(),
          enabledNodes, assignmentByResource, allocatedByInstance);
  Assert.assertEquals(pendingReplicas.size(), 1);
  Set<String> pendingNames =
      pendingReplicas.stream().map(AssignableReplica::toString).collect(Collectors.toSet());
  Assert.assertTrue(pendingNames.contains("Resource1-Partition2-MASTER"));
  AssignableReplica onlyPending = pendingReplicas.iterator().next();
  Assert.assertEquals(onlyPending.getReplicaState(), "MASTER");
  Assert.assertEquals(onlyPending.getPartitionName(), "Partition2");

  // Scenario 2: every partition has a replica on a live instance, so nothing is pending.
  cache = setupClusterDataCache();
  when(cache.getAssignableLiveInstances()).thenReturn(liveNodes);
  when(cache.getEnabledLiveInstances()).thenReturn(enabledNodes);
  layout = ImmutableMap.of(
      _resourceNames.get(0),
      ImmutableMap.of(
          _partitionNames.get(0), ImmutableMap.of("MASTER", liveNodeA),
          _partitionNames.get(1), ImmutableMap.of("SLAVE", liveNodeB)),
      _resourceNames.get(1),
      ImmutableMap.of(
          _partitionNames.get(2), ImmutableMap.of("MASTER", liveNodeA),
          _partitionNames.get(3), ImmutableMap.of("SLAVE", liveNodeB))
  );
  replicasByResource = new HashMap<>();
  assignmentByResource = new HashMap<>();
  prepareData(layout, replicasByResource, assignmentByResource, cache, 1);
  allocatedByInstance = new HashMap<>();
  pendingReplicas =
      DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(cache, replicasByResource.keySet(),
          enabledNodes, assignmentByResource, allocatedByInstance);
  Assert.assertTrue(pendingReplicas.isEmpty());
  Assert.assertEquals(allocatedByInstance.get(liveNodeA).size(), 2);
  Assert.assertEquals(allocatedByInstance.get(liveNodeB).size(), 2);

  // Scenario 3: minActiveReplica == 2 and two partitions have one of their replicas offline.
  cache = setupClusterDataCache();
  when(cache.getAssignableLiveInstances()).thenReturn(liveNodes);
  when(cache.getEnabledLiveInstances()).thenReturn(enabledNodes);
  layout = ImmutableMap.of(
      _resourceNames.get(0),
      ImmutableMap.of(
          _partitionNames.get(0), ImmutableMap.of("MASTER", liveNodeA, "SLAVE", liveNodeB),
          _partitionNames.get(1), ImmutableMap.of("MASTER", liveNodeA, "OFFLINE", downNode)), // expect Partition2-SLAVE
      _resourceNames.get(1),
      ImmutableMap.of(
          _partitionNames.get(2), ImmutableMap.of("MASTER", liveNodeA, "SLAVE", liveNodeB),
          _partitionNames.get(3), ImmutableMap.of("SLAVE", liveNodeA, "OFFLINE", downNode)) // expect Partition4-MASTER
  );
  replicasByResource = new HashMap<>();
  assignmentByResource = new HashMap<>();
  prepareData(layout, replicasByResource, assignmentByResource, cache, 2);
  allocatedByInstance = new HashMap<>();
  pendingReplicas =
      DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(cache, replicasByResource.keySet(),
          enabledNodes, assignmentByResource, allocatedByInstance);
  Assert.assertEquals(pendingReplicas.size(), 2);
  Assert.assertEquals(pendingReplicas.stream().map(AssignableReplica::toString).collect(Collectors.toSet()),
      ImmutableSet.of("Resource1-Partition2-SLAVE", "Resource2-Partition4-MASTER"));
  Assert.assertEquals(allocatedByInstance.get(liveNodeA).size(), 4);
  Assert.assertEquals(allocatedByInstance.get(liveNodeB).size(), 2);
}
/**
 * Checks the cluster model produced by
 * {@code ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites} when one of the
 * three instances is offline, first with minActiveReplica of 1 and then with minActiveReplica
 * of 2.
 */
@Test(dependsOnMethods = "testFindToBeAssignedReplicasForMinActiveReplica")
public void testClusterModelForDelayedRebalanceOverwrite() throws IOException {
  ResourceControllerDataProvider cache = setupClusterDataCache();
  String liveNodeA = _testInstanceId;
  String downNode = _testInstanceId + "1";
  String liveNodeB = _testInstanceId + "2";

  // Only liveNodeA and liveNodeB are reported as live and enabled.
  Map<String, LiveInstance> liveNodes = new HashMap<>();
  liveNodes.put(liveNodeA, createMockLiveInstance(liveNodeA));
  liveNodes.put(liveNodeB, createMockLiveInstance(liveNodeB));
  Set<String> enabledNodes = new HashSet<>();
  enabledNodes.add(liveNodeA);
  enabledNodes.add(liveNodeB);
  when(cache.getAssignableLiveInstances()).thenReturn(liveNodes);
  when(cache.getEnabledLiveInstances()).thenReturn(enabledNodes);

  // Scenario 1: minActiveReplica == 1; each resource has one partition hosted only offline.
  Map<String, Map<String, Map<String, String>>> layout = ImmutableMap.of(
      _resourceNames.get(0),
      ImmutableMap.of(
          _partitionNames.get(0), ImmutableMap.of("MASTER", liveNodeA),
          _partitionNames.get(1), ImmutableMap.of("OFFLINE", downNode), // expect Partition2-MASTER
          _partitionNames.get(2), ImmutableMap.of("MASTER", liveNodeB),
          _partitionNames.get(3), ImmutableMap.of("MASTER", liveNodeB)),
      _resourceNames.get(1),
      ImmutableMap.of(
          _partitionNames.get(0), ImmutableMap.of("MASTER", liveNodeB),
          _partitionNames.get(1), ImmutableMap.of("MASTER", liveNodeB),
          _partitionNames.get(2), ImmutableMap.of("MASTER", liveNodeA),
          _partitionNames.get(3), ImmutableMap.of("OFFLINE", downNode)) // expect Partition4-MASTER
  );
  Map<String, Set<AssignableReplica>> replicasByResource = new HashMap<>(); // filled by prepareData
  Map<String, ResourceAssignment> assignmentByResource = new HashMap<>(); // filled by prepareData
  prepareData(layout, replicasByResource, assignmentByResource, cache, 1);

  Map<String, Resource> resources =
      _resourceNames.stream().collect(Collectors.toMap(resource -> resource, Resource::new));
  ClusterModel model = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(cache,
      resources, enabledNodes, assignmentByResource);

  // Only the two live instances are assignable, and they keep their current replicas.
  Assert.assertEquals(model.getAssignableNodes().size(), 2);
  Assert.assertTrue(model.getAssignableNodes().containsKey(liveNodeA));
  Assert.assertTrue(model.getAssignableNodes().containsKey(liveNodeB));
  Assert.assertEquals(model.getAssignableNodes().get(liveNodeA).getAssignedReplicas().size(), 2);
  Assert.assertEquals(model.getAssignableNodes().get(liveNodeB).getAssignedReplicas().size(), 4);
  // Exactly one replica per resource still needs a home.
  Assert.assertEquals(model.getAssignableReplicaMap().get("Resource1").size(), 1);
  Assert.assertEquals(model.getAssignableReplicaMap().get("Resource1").iterator().next().toString(),
      "Resource1-Partition2-MASTER");
  Assert.assertEquals(model.getAssignableReplicaMap().get("Resource2").size(), 1);
  Assert.assertEquals(model.getAssignableReplicaMap().get("Resource2").iterator().next().toString(),
      "Resource2-Partition4-MASTER");

  // Scenario 2: minActiveReplica == 2; three partitions fall short.
  cache = setupClusterDataCache();
  when(cache.getAssignableLiveInstances()).thenReturn(liveNodes);
  when(cache.getEnabledLiveInstances()).thenReturn(enabledNodes);
  layout = ImmutableMap.of(
      _resourceNames.get(0),
      ImmutableMap.of(
          _partitionNames.get(0), ImmutableMap.of("MASTER", liveNodeA, "SLAVE", liveNodeB),
          _partitionNames.get(1), ImmutableMap.of("MASTER", liveNodeA, "OFFLINE", downNode), // expect Partition2-SLAVE
          _partitionNames.get(2), ImmutableMap.of("OFFLINE", downNode, "SLAVE", liveNodeB), // expect Partition3-MASTER
          _partitionNames.get(3), ImmutableMap.of("MASTER", liveNodeA, "SLAVE", liveNodeB)),
      _resourceNames.get(1),
      ImmutableMap.of(
          _partitionNames.get(0), ImmutableMap.of("MASTER", liveNodeA, "SLAVE", liveNodeB),
          _partitionNames.get(1), ImmutableMap.of("MASTER", liveNodeA, "SLAVE", liveNodeB),
          _partitionNames.get(2), ImmutableMap.of("MASTER", liveNodeA, "SLAVE", liveNodeB),
          _partitionNames.get(3), ImmutableMap.of("OFFLINE", downNode, "ERROR", liveNodeB)) // expect Partition4-MASTER
  );
  replicasByResource = new HashMap<>();
  assignmentByResource = new HashMap<>();
  prepareData(layout, replicasByResource, assignmentByResource, cache, 2);
  model = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(cache,
      resources, enabledNodes, assignmentByResource);

  Assert.assertEquals(model.getAssignableNodes().size(), 2);
  Assert.assertTrue(model.getAssignableNodes().containsKey(liveNodeA));
  Assert.assertTrue(model.getAssignableNodes().containsKey(liveNodeB));
  Assert.assertEquals(model.getAssignableNodes().get(liveNodeA).getAssignedReplicas().size(), 6);
  Assert.assertEquals(model.getAssignableNodes().get(liveNodeB).getAssignedReplicas().size(), 7);

  Set<String> pendingNames = namesOfAssignableReplicas(model, _resourceNames.get(0));
  Assert.assertEquals(pendingNames.size(), 2);
  Assert.assertTrue(pendingNames.contains("Resource1-Partition2-SLAVE"));
  Assert.assertTrue(pendingNames.contains("Resource1-Partition3-MASTER"));

  pendingNames = namesOfAssignableReplicas(model, _resourceNames.get(1));
  Assert.assertEquals(pendingNames.size(), 1);
  Assert.assertTrue(pendingNames.contains("Resource2-Partition4-MASTER"));
}

/**
 * Collects the {@code toString()} form of every assignable replica the model holds for the
 * given resource.
 */
private Set<String> namesOfAssignableReplicas(ClusterModel model, String resourceName) {
  return model.getAssignableReplicaMap().get(resourceName)
      .stream()
      .map(AssignableReplica::toString)
      .collect(Collectors.toSet());
}
/**
 * Prepares mock objects from the given input. This method fills {@code replicaMap} and
 * {@code currentAssignment}, and stubs {@code testCache} so that it serves the ideal states,
 * resource configs and current states derived from the input.
 *
 * @param input <resource, <partition, <state, instance> > >
 * @param replicaMap The data map to prepare, a set of AssignableReplica by resource name.
 * @param currentAssignment The data map to prepare, resourceAssignment by resource name
 * @param testCache The mock object to prepare
 * @param minActiveReplica The min active replica count set on every resource config and ideal state
 */
private void prepareData(Map<String, Map<String, Map<String, String>>> input,
    Map<String, Set<AssignableReplica>> replicaMap,
    Map<String, ResourceAssignment> currentAssignment,
    ResourceControllerDataProvider testCache,
    int minActiveReplica) {
  // Set up the mock ideal state and the resource config of every known resource.
  Map<String, IdealState> isMap = new HashMap<>();
  for (String resource : _resourceNames) {
    ResourceConfig resourceConfig = new ResourceConfig.Builder(resource)
        .setMinActiveReplica(minActiveReplica)
        .setNumReplica(3)
        .build();
    _resourceConfigMap.put(resource, resourceConfig);
    IdealState is = new IdealState(resource);
    is.setNumPartitions(_partitionNames.size());
    is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
    is.setStateModelDefRef("MasterSlave");
    is.setReplicas("3");
    is.setMinActiveReplicas(minActiveReplica);
    is.setRebalancerClassName(WagedRebalancer.class.getName());
    _partitionNames.forEach(partition -> is.setPreferenceList(partition, Collections.emptyList()));
    isMap.put(resource, is);
  }
  when(testCache.getIdealState(anyString())).thenAnswer(
      (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
  when(testCache.getResourceConfig(anyString())).thenAnswer(
      (Answer<ResourceConfig>) invocationOnMock -> _resourceConfigMap.get(invocationOnMock.getArguments()[0]));

  // Pivot the input into <instance, <resource, <partition, state>>> and collect one replica per
  // (partition, state) pair along the way.
  Map<String, Map<String, Map<String, String>>> stateByInstanceByResourceByPartition = new HashMap<>();
  for (Map.Entry<String, Map<String, Map<String, String>>> resourceEntry : input.entrySet()) {
    String resource = resourceEntry.getKey();
    Set<AssignableReplica> replicas = new HashSet<>();
    replicaMap.put(resource, replicas);
    ResourceConfig resourceConfig = _resourceConfigMap.get(resource);
    for (Map.Entry<String, Map<String, String>> partitionEntry : resourceEntry.getValue().entrySet()) {
      String partition = partitionEntry.getKey();
      partitionEntry.getValue().forEach((state, instance) -> {
        stateByInstanceByResourceByPartition
            .computeIfAbsent(instance, k -> new HashMap<>())
            .computeIfAbsent(resource, k -> new HashMap<>())
            .put(partition, state);
        replicas.add(new MockAssignableReplica(resourceConfig, partition, state));
      });
    }
  }

  // Mock one CurrentState per (instance, resource) and serve them from the cache through both
  // getCurrentState overloads used here.
  for (Map.Entry<String, Map<String, Map<String, String>>> instanceEntry
      : stateByInstanceByResourceByPartition.entrySet()) {
    String instance = instanceEntry.getKey();
    // <resource, CurrentState>
    Map<String, CurrentState> currentStateByResource = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> stateEntry : instanceEntry.getValue().entrySet()) {
      currentStateByResource.put(stateEntry.getKey(),
          mockCurrentStateResource(stateEntry.getKey(), stateEntry.getValue()));
    }
    when(testCache.getCurrentState(instance, _sessionId)).thenReturn(currentStateByResource);
    when(testCache.getCurrentState(instance, _sessionId, false)).thenReturn(currentStateByResource);
  }

  // Mock a baseline assignment based on the current states.
  for (String resource : _resourceNames) {
    // <partition, <instance, state>>
    Map<String, Map<String, String>> assignmentMap = new HashMap<>();
    boolean hasCurrentState = false;
    for (String instance : _instances) {
      CurrentState cs = testCache.getCurrentState(instance, _sessionId).get(resource);
      if (cs == null) {
        continue;
      }
      hasCurrentState = true;
      for (Map.Entry<String, String> stateEntry : cs.getPartitionStateMap().entrySet()) {
        assignmentMap.computeIfAbsent(stateEntry.getKey(), k -> new HashMap<>())
            .put(instance, stateEntry.getValue());
      }
    }
    // Build the assignment once, after all instances were merged. A resource that no instance
    // reports a current state for gets no entry at all.
    if (hasCurrentState) {
      ResourceAssignment assignment = new ResourceAssignment(resource);
      assignmentMap.forEach(
          (partition, stateMap) -> assignment.addReplicaMap(new Partition(partition), stateMap));
      currentAssignment.put(resource, assignment);
    }
  }
}

/**
 * Creates a mocked {@link CurrentState} of the given resource that reports the given partition
 * states, the "MasterSlave" state model and the test session id.
 *
 * @param resourceName the resource the current state belongs to
 * @param partitionState <partition, state> reported by the mock
 * @return the mocked current state
 */
private CurrentState mockCurrentStateResource(String resourceName, Map<String, String> partitionState) {
  CurrentState testCurrentStateResource = Mockito.mock(CurrentState.class);
  // Report the owning resource rather than always the first one in _resourceNames.
  when(testCurrentStateResource.getResourceName()).thenReturn(resourceName);
  when(testCurrentStateResource.getPartitionStateMap()).thenReturn(partitionState);
  when(testCurrentStateResource.getStateModelDefRef()).thenReturn("MasterSlave");
  when(testCurrentStateResource.getSessionId()).thenReturn(_sessionId);
  for (Map.Entry<String, String> entry : partitionState.entrySet()) {
    when(testCurrentStateResource.getState(entry.getKey())).thenReturn(entry.getValue());
  }
  return testCurrentStateResource;
}
/**
 * Checks {@code ClusterModelProvider.generateClusterModelForBaseline} with an empty assignment,
 * with different sets of active instances, with a baseline assignment mocked from the current
 * states, and with cluster-config and resource-config change notifications.
 */
@Test
public void testGenerateClusterModel() throws IOException {
  ResourceControllerDataProvider testCache = setupClusterDataCache();
  // 1. test generating a cluster model with empty assignment
  ClusterModel clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
      newResourceMap(), _instances, Collections.emptyMap(), Collections.emptyMap());
  // There should be no existing assignment.
  Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
      .anyMatch(resourceMap -> !resourceMap.isEmpty()));
  Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
      .anyMatch(node -> node.getAssignedReplicaCount() != 0));
  // Have all 3 instances
  Assert.assertEquals(
      clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
          .collect(Collectors.toSet()), _instances);
  // Shall have 2 resources and 4 replicas, since all nodes are in the same fault zone.
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
      .allMatch(replicaSet -> replicaSet.size() == 4));

  // Adjust instance fault zone, so they have different fault zones.
  testCache.getAssignableInstanceConfigMap().values()
      .forEach(config -> config.setZoneId(config.getInstanceName()));
  clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
      newResourceMap(), _instances, Collections.emptyMap(), Collections.emptyMap());
  // Shall have 2 resources and 12 replicas after fault zone adjusted.
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
      .allMatch(replicaSet -> replicaSet.size() == 12));

  // 2. test with only one active node
  clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
      newResourceMap(), Collections.singleton(_testInstanceId), Collections.emptyMap(),
      Collections.emptyMap());
  // Have only one instance
  Assert.assertEquals(
      clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
          .collect(Collectors.toSet()), Collections.singleton(_testInstanceId));
  // Shall have 4 assignable replicas because there is only one valid node.
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
      .allMatch(replicaSet -> replicaSet.size() == 4));

  // 3. test with no active instance
  clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
      newResourceMap(), Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap());
  // Have no instance at all
  Assert.assertEquals(clusterModel.getAssignableNodes().size(), 0);
  // Shall have 0 assignable replicas because there are 0 valid nodes.
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
      .allMatch(replicaSet -> replicaSet.isEmpty()));

  // 4. test with baseline assignment
  // Mock a baseline assignment based on the current states.
  Map<String, ResourceAssignment> baselineAssignment = mockAssignmentFromCurrentStates(testCache);
  // Generate a cluster model based on the baseline assignment
  clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
      newResourceMap(), _instances, Collections.emptyMap(), baselineAssignment);
  // There should be 4 existing assignments in total (each resource has 2) in the specified instance
  Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
      .allMatch(resourceMap -> resourceMap.values().stream()
          .allMatch(partitionSet -> partitionSet.size() == 2)));
  Assert.assertEquals(
      clusterModel.getAssignableNodes().get(_testInstanceId).getAssignedReplicaCount(), 4);
  // Since each resource has 2 replicas assigned, the assignable replica count should be 10.
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
      .allMatch(replicaSet -> replicaSet.size() == 10));

  // 5. test with the same baseline assignment but a cluster config change is reported
  clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
      newResourceMap(), _instances,
      Collections.singletonMap(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet()),
      baselineAssignment);
  // There should be no existing assignment since the topology change invalidates all existing assignment
  Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
      .allMatch(resourceMap -> resourceMap.isEmpty()));
  Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
      .anyMatch(node -> node.getAssignedReplicaCount() != 0));
  // Shall have 2 resources and 12 replicas
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
      .allMatch(replicaSet -> replicaSet.size() == 12));

  // 6. test with the same baseline assignment and one resource config change
  // Generate a cluster model based on the same assignment, but resource1 config is changed
  String changedResourceName = _resourceNames.get(0);
  clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
      newResourceMap(), _instances,
      Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG,
          Collections.singleton(changedResourceName)), baselineAssignment);
  // There should be no existing assignment for all the resource except for resource2
  Assert.assertEquals(clusterModel.getContext().getAssignmentForFaultZoneMap().size(), 1);
  Map<String, Set<String>> resourceAssignmentMap =
      clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testInstanceId);
  // Should be only resource2 in the map
  Assert.assertEquals(resourceAssignmentMap.size(), 1);
  for (String resource : _resourceNames) {
    Assert
        .assertEquals(resourceAssignmentMap.getOrDefault(resource, Collections.emptySet()).size(),
            resource.equals(changedResourceName) ? 0 : 2);
  }
  // Only the first instance will have 2 assignment from resource2.
  for (String instance : _instances) {
    Assert.assertEquals(clusterModel.getAssignableNodes().get(instance).getAssignedReplicaCount(),
        instance.equals(_testInstanceId) ? 2 : 0);
  }
  // Shall have 2 resources: 12 assignable replicas for the changed one, 10 for the other.
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().keySet().size(), 2);
  for (String resource : _resourceNames) {
    Assert.assertEquals(clusterModel.getAssignableReplicaMap().get(resource).size(),
        resource.equals(changedResourceName) ? 12 : 10);
  }

  // 7. test with the same baseline assignment but the instance becomes inactive
  // Generate a cluster model based on the baseline assignment, but the assigned node is disabled
  Set<String> limitedActiveInstances = new HashSet<>(_instances);
  limitedActiveInstances.remove(_testInstanceId);
  clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
      newResourceMap(), limitedActiveInstances, Collections.emptyMap(), baselineAssignment);
  // There should be no existing assignment.
  Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
      .anyMatch(resourceMap -> !resourceMap.isEmpty()));
  Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
      .anyMatch(node -> node.getAssignedReplicaCount() != 0));
  // Have only 2 instances
  Assert.assertEquals(
      clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
          .collect(Collectors.toSet()), limitedActiveInstances);
  // Since only 2 instances are active, we shall have 8 assignable replicas in each resource.
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
      .allMatch(replicaSet -> replicaSet.size() == 8));
}

/**
 * Checks {@code ClusterModelProvider.generateClusterModelForPartialRebalance} with empty
 * assignments, with identical baseline and best possible assignments, with the assigned
 * instance inactive, and with a resource present in only one of the two assignments.
 */
@Test (dependsOnMethods = "testGenerateClusterModel")
public void testGenerateClusterModelForPartialRebalance() throws IOException {
  ResourceControllerDataProvider testCache = setupClusterDataCache();
  // 1. test generating a cluster model with empty assignment
  ClusterModel clusterModel = ClusterModelProvider
      .generateClusterModelForPartialRebalance(testCache, newResourceMap(),
          _instances, Collections.emptyMap(), Collections.emptyMap());
  // There should be no existing assignment.
  Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
      .anyMatch(resourceMap -> !resourceMap.isEmpty()));
  Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
      .anyMatch(node -> node.getAssignedReplicaCount() != 0));
  // Have all 3 instances
  Assert.assertEquals(
      clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
          .collect(Collectors.toSet()), _instances);
  // Shall have 0 resources and 0 replicas since the baseline is empty. The partial rebalance
  // should not rebalance any replica.
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);

  // Adjust instance fault zone, so they have different fault zones.
  testCache.getAssignableInstanceConfigMap().values()
      .forEach(config -> config.setZoneId(config.getInstanceName()));

  // 2. test with a pair of identical best possible assignment and baseline assignment
  // Mock a best possible assignment based on the current states.
  Map<String, ResourceAssignment> bestPossibleAssignment = mockAssignmentFromCurrentStates(testCache);
  Map<String, ResourceAssignment> baseline = new HashMap<>(bestPossibleAssignment);
  // Generate a cluster model for partial rebalance
  clusterModel = ClusterModelProvider.generateClusterModelForPartialRebalance(testCache,
      newResourceMap(), _instances, baseline, bestPossibleAssignment);
  // There should be 4 existing assignments in total (each resource has 2) in the specified instance
  Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
      .allMatch(resourceMap -> resourceMap.values().stream()
          .allMatch(partitionSet -> partitionSet.size() == 2)));
  Assert.assertEquals(
      clusterModel.getAssignableNodes().get(_testInstanceId).getAssignedReplicaCount(), 4);
  // Since the best possible matches the baseline, no replica needs to be reassigned.
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);

  // 3. test with inactive instance in the baseline and the best possible assignment
  Set<String> partialInstanceList = new HashSet<>(_instances);
  partialInstanceList.remove(_testInstanceId);
  clusterModel = ClusterModelProvider.generateClusterModelForPartialRebalance(testCache,
      newResourceMap(), partialInstanceList, baseline, bestPossibleAssignment);
  // Have the other 2 active instances
  Assert.assertEquals(clusterModel.getAssignableNodes().size(), 2);
  // All the replicas in the existing assignment should be rebalanced.
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
      .allMatch(replicaSet -> replicaSet.size() == 2));
  // Shall have 0 assigned replicas
  Assert.assertTrue(clusterModel.getAssignableNodes().values().stream()
      .allMatch(assignableNode -> assignableNode.getAssignedReplicaCount() == 0));

  // 4. test with one resource that is only in the baseline
  String resourceInBaselineOnly = _resourceNames.get(0);
  Map<String, ResourceAssignment> partialBestPossibleAssignment =
      new HashMap<>(bestPossibleAssignment);
  partialBestPossibleAssignment.remove(resourceInBaselineOnly);
  // Generate a cluster model with the adjusted best possible assignment
  clusterModel = ClusterModelProvider.generateClusterModelForPartialRebalance(testCache,
      newResourceMap(), _instances, baseline, partialBestPossibleAssignment);
  // There should be 2 existing assignments in total in the specified instance
  Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
      .allMatch(resourceMap -> resourceMap.values().stream()
          .allMatch(partitionSet -> partitionSet.size() == 2)));
  // Only the replicas of one resource require rebalance
  Assert.assertEquals(
      clusterModel.getAssignableNodes().get(_testInstanceId).getAssignedReplicaCount(), 2);
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 1);
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().containsKey(resourceInBaselineOnly));
  Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
      .allMatch(replicaSet -> replicaSet.size() == 2));

  // 5. test with one resource only in the best possible assignment
  String resourceInBestPossibleOnly = _resourceNames.get(1);
  Map<String, ResourceAssignment> partialBaseline = new HashMap<>(baseline);
  partialBaseline.remove(resourceInBestPossibleOnly);
  // Generate a cluster model with the adjusted baseline
  clusterModel = ClusterModelProvider.generateClusterModelForPartialRebalance(testCache,
      newResourceMap(), _instances, partialBaseline, bestPossibleAssignment);
  // The specified instance keeps 2 assigned replicas; every recorded partition set has size 2.
  Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
      .allMatch(resourceMap -> resourceMap.values().stream()
          .allMatch(partitionSet -> partitionSet.size() == 2)));
  Assert.assertEquals(
      clusterModel.getAssignableNodes().get(_testInstanceId).getAssignedReplicaCount(), 2);
  // No need to rebalance the replicas that are not in the baseline yet.
  Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);
}

/**
 * Builds a fresh resource-name to {@link Resource} map covering every test resource. A new map
 * with new Resource objects is created on each call.
 */
private Map<String, Resource> newResourceMap() {
  return _resourceNames.stream().collect(Collectors.toMap(resource -> resource, Resource::new));
}

/**
 * Mocks an assignment that mirrors the current states the cache reports for the default test
 * instance. Resources without a current state on that instance get no entry.
 *
 * @param testCache the mocked data provider to read the current states from
 * @return resource assignments by resource name
 */
private Map<String, ResourceAssignment> mockAssignmentFromCurrentStates(
    ResourceControllerDataProvider testCache) {
  Map<String, ResourceAssignment> assignments = new HashMap<>();
  for (String resource : _resourceNames) {
    CurrentState cs = testCache.getCurrentState(_testInstanceId, _sessionId).get(resource);
    if (cs == null) {
      continue;
    }
    // <partition, <instance, state>>
    Map<String, Map<String, String>> assignmentMap = new HashMap<>();
    for (Map.Entry<String, String> stateEntry : cs.getPartitionStateMap().entrySet()) {
      assignmentMap.computeIfAbsent(stateEntry.getKey(), k -> new HashMap<>())
          .put(_testInstanceId, stateEntry.getValue());
    }
    ResourceAssignment assignment = new ResourceAssignment(resource);
    assignmentMap.forEach(
        (partition, stateMap) -> assignment.addReplicaMap(new Partition(partition), stateMap));
    assignments.put(resource, assignment);
  }
  return assignments;
}
/**
 * Test-only {@link AssignableReplica} that supplies a throwaway cluster config so tests only
 * have to provide the resource config, the partition name and the replica state.
 */
static class MockAssignableReplica extends AssignableReplica {
  private static final String MOCK_CLUSTER_NAME = "testCluster";

  MockAssignableReplica(ResourceConfig resourceConfig, String partition, String replicaState) {
    // NOTE(review): the trailing 1 is presumably the state priority - confirm against AssignableReplica.
    super(newMockClusterConfig(), resourceConfig, partition, replicaState, 1);
  }

  /** Creates a new cluster config for every replica, as the constructor has always done. */
  private static ClusterConfig newMockClusterConfig() {
    return new ClusterConfig(MOCK_CLUSTER_NAME);
  }
}
}