blob: f2bf9fca91a242c864264627c3eadd8cef92795f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.core.assignment.segment.OfflineSegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@SuppressWarnings("unchecked")
public class ReplicaGroupSegmentAssignmentStrategyTest {
private static final int NUM_REPLICAS = 3;
private static final String SEGMENT_NAME_PREFIX = "segment_";
private static final int NUM_SEGMENTS = 12;
private static final List<String> SEGMENTS =
SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, NUM_SEGMENTS);
private static final String INSTANCE_NAME_PREFIX = "instance_";
private static final int NUM_INSTANCES = 18;
private static final List<String> INSTANCES =
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, NUM_INSTANCES);
private static final String RAW_TABLE_NAME_WITHOUT_PARTITION = "testTableWithoutPartition";
private static final String INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION =
InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME_WITHOUT_PARTITION);
private static final String RAW_TABLE_NAME_WITH_PARTITION = "testTableWithPartition";
private static final String OFFLINE_TABLE_NAME_WITH_PARTITION =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_WITH_PARTITION);
private static final String INSTANCE_PARTITIONS_NAME_WITH_PARTITION =
InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME_WITH_PARTITION);
private static final String PARTITION_COLUMN = "partitionColumn";
private static final int NUM_PARTITIONS = 3;
private SegmentAssignment _segmentAssignmentWithoutPartition;
private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMapWithoutPartition;
private SegmentAssignment _segmentAssignmentWithPartition;
private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMapWithPartition;
@BeforeClass
public void setUp() {
TableConfig tableConfigWithoutPartitions =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITHOUT_PARTITION)
.setNumReplicas(NUM_REPLICAS)
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
_segmentAssignmentWithoutPartition =
SegmentAssignmentFactory.getSegmentAssignment(null, tableConfigWithoutPartitions);
// {
// 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5],
// 0_1=[instance_6, instance_7, instance_8, instance_9, instance_10, instance_11],
// 0_2=[instance_12, instance_13, instance_14, instance_15, instance_16, instance_17]
// }
InstancePartitions instancePartitionsWithoutPartition =
new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION);
int numInstancesPerReplicaGroup = NUM_INSTANCES / NUM_REPLICAS;
int instanceIdToAdd = 0;
for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) {
List<String> instancesForReplicaGroup = new ArrayList<>(numInstancesPerReplicaGroup);
for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
instancesForReplicaGroup.add(INSTANCES.get(instanceIdToAdd++));
}
instancePartitionsWithoutPartition.setInstances(0, replicaGroupId, instancesForReplicaGroup);
}
_instancePartitionsMapWithoutPartition =
Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitionsWithoutPartition);
// Mock HelixManager
ZkHelixPropertyStore<ZNRecord> propertyStoreWithPartitions = mock(ZkHelixPropertyStore.class);
List<ZNRecord> segmentZKMetadataZNRecords = new ArrayList<>(NUM_SEGMENTS);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = SEGMENTS.get(segmentId);
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
int partitionId = segmentId % NUM_PARTITIONS;
segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN,
new ColumnPartitionMetadata(null, NUM_PARTITIONS, Collections.singleton(partitionId), null))));
ZNRecord segmentZKMetadataZNRecord = segmentZKMetadata.toZNRecord();
when(propertyStoreWithPartitions.get(
eq(ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME_WITH_PARTITION, segmentName)),
any(), anyInt())).thenReturn(segmentZKMetadataZNRecord);
segmentZKMetadataZNRecords.add(segmentZKMetadataZNRecord);
}
when(propertyStoreWithPartitions
.getChildren(eq(ZKMetadataProvider.constructPropertyStorePathForResource(OFFLINE_TABLE_NAME_WITH_PARTITION)),
any(), anyInt(), anyInt(), anyInt())).thenReturn(segmentZKMetadataZNRecords);
HelixManager helixManagerWithPartitions = mock(HelixManager.class);
when(helixManagerWithPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithPartitions);
int numInstancesPerPartition = numInstancesPerReplicaGroup / NUM_REPLICAS;
ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
new ReplicaGroupStrategyConfig(PARTITION_COLUMN, numInstancesPerPartition);
TableConfig tableConfigWithPartitions =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITH_PARTITION)
.setNumReplicas(NUM_REPLICAS)
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
.setReplicaGroupStrategyConfig(replicaGroupStrategyConfig).build();
_segmentAssignmentWithPartition =
SegmentAssignmentFactory.getSegmentAssignment(helixManagerWithPartitions, tableConfigWithPartitions);
// {
// 0_0=[instance_0, instance_1], 1_0=[instance_2, instance_3], 2_0=[instance_4, instance_5],
// 0_1=[instance_6, instance_7], 1_1=[instance_8, instance_9], 2_1=[instance_10, instance_11],
// 0_2=[instance_12, instance_13], 1_2=[instance_14, instance_15], 2_2=[instance_16, instance_17]
// }
InstancePartitions instancePartitionsWithPartition =
new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION);
instanceIdToAdd = 0;
for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) {
for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
List<String> instancesForPartition = new ArrayList<>(numInstancesPerPartition);
for (int i = 0; i < numInstancesPerPartition; i++) {
instancesForPartition.add(INSTANCES.get(instanceIdToAdd++));
}
instancePartitionsWithPartition.setInstances(partitionId, replicaGroupId, instancesForPartition);
}
}
_instancePartitionsMapWithPartition =
Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitionsWithPartition);
}
@Test
public void testFactory() {
assertTrue(_segmentAssignmentWithoutPartition instanceof OfflineSegmentAssignment);
assertTrue(_segmentAssignmentWithPartition instanceof OfflineSegmentAssignment);
}
@Test
public void testAssignSegmentWithoutPartition() {
int numInstancesPerReplicaGroup = NUM_INSTANCES / NUM_REPLICAS;
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = SEGMENTS.get(segmentId);
List<String> instancesAssigned = _segmentAssignmentWithoutPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 should be assigned to instance 0, 6, 12
// Segment 1 should be assigned to instance 1, 7, 13
// Segment 2 should be assigned to instance 2, 8, 14
// Segment 3 should be assigned to instance 3, 9, 15
// Segment 4 should be assigned to instance 4, 10, 16
// Segment 5 should be assigned to instance 5, 11, 17
// Segment 6 should be assigned to instance 0, 6, 12
// Segment 7 should be assigned to instance 1, 7, 13
// ...
for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) {
int expectedAssignedInstanceId =
segmentId % numInstancesPerReplicaGroup + replicaGroupId * numInstancesPerReplicaGroup;
assertEquals(instancesAssigned.get(replicaGroupId), INSTANCES.get(expectedAssignedInstanceId));
}
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
}
@Test
public void testAssignSegmentWithPartition() {
int numInstancesPerReplicaGroup = NUM_INSTANCES / NUM_REPLICAS;
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
int numInstancesPerPartition = numInstancesPerReplicaGroup / NUM_PARTITIONS;
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = SEGMENTS.get(segmentId);
List<String> instancesAssigned = _segmentAssignmentWithPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 (partition 0) should be assigned to instance 0, 6, 12
// Segment 1 (partition 1) should be assigned to instance 2, 8, 14
// Segment 2 (partition 2) should be assigned to instance 4, 10, 16
// Segment 3 (partition 0) should be assigned to instance 1, 7, 13
// Segment 4 (partition 1) should be assigned to instance 3, 9, 15
// Segment 5 (partition 2) should be assigned to instance 5, 11, 17
// Segment 6 (partition 0) should be assigned to instance 0, 6, 12
// Segment 7 (partition 1) should be assigned to instance 2, 8, 14
// ...
int partitionId = segmentId % NUM_PARTITIONS;
for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) {
int expectedAssignedInstanceId =
(segmentId % numInstancesPerReplicaGroup) / NUM_PARTITIONS + partitionId * numInstancesPerPartition
+ replicaGroupId * numInstancesPerReplicaGroup;
assertEquals(instancesAssigned.get(replicaGroupId), INSTANCES.get(expectedAssignedInstanceId));
}
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
}
@Test
public void testTableBalancedWithoutPartition() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithoutPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition);
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
assertEquals(currentAssignment.size(), NUM_SEGMENTS);
// Each segment should have 3 replicas
for (Map<String, String> instanceStateMap : currentAssignment.values()) {
assertEquals(instanceStateMap.size(), NUM_REPLICAS);
}
int[] numSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES);
int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
int numSegmentsPerInstance = NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES;
Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
assertEquals(_segmentAssignmentWithoutPartition
.rebalanceTable(currentAssignment, _instancePartitionsMapWithoutPartition, null, null,
new BaseConfiguration()),
currentAssignment);
}
@Test
public void testTableBalancedWithPartition() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition);
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
assertEquals(currentAssignment.size(), NUM_SEGMENTS);
// Each segment should have 3 replicas
for (Map<String, String> instanceStateMap : currentAssignment.values()) {
assertEquals(instanceStateMap.size(), NUM_REPLICAS);
}
int[] numSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES);
int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
int numSegmentsPerInstance = NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES;
Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
assertEquals(_segmentAssignmentWithPartition
.rebalanceTable(currentAssignment, _instancePartitionsMapWithPartition, null, null,
new BaseConfiguration()),
currentAssignment);
}
@Test
public void testBootstrapTableWithoutPartition() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithoutPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition);
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// Bootstrap table should reassign all segments based on their alphabetical order
Configuration rebalanceConfig = new BaseConfiguration();
rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
Map<String, Map<String, String>> newAssignment = _segmentAssignmentWithoutPartition
.rebalanceTable(currentAssignment, _instancePartitionsMapWithoutPartition, null, null, rebalanceConfig);
assertEquals(newAssignment.size(), NUM_SEGMENTS);
List<String> sortedSegments = new ArrayList<>(SEGMENTS);
sortedSegments.sort(null);
for (int i = 0; i < NUM_SEGMENTS; i++) {
assertEquals(newAssignment.get(sortedSegments.get(i)), currentAssignment.get(SEGMENTS.get(i)));
}
}
@Test
public void testBootstrapTableWithPartition() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition);
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// Bootstrap table should reassign all segments based on their alphabetical order within the partition
Configuration rebalanceConfig = new BaseConfiguration();
rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
Map<String, Map<String, String>> newAssignment = _segmentAssignmentWithPartition
.rebalanceTable(currentAssignment, _instancePartitionsMapWithPartition, null, null, rebalanceConfig);
assertEquals(newAssignment.size(), NUM_SEGMENTS);
int numSegmentsPerPartition = NUM_SEGMENTS / NUM_PARTITIONS;
String[][] partitionIdToSegmentsMap = new String[NUM_PARTITIONS][numSegmentsPerPartition];
for (int i = 0; i < NUM_SEGMENTS; i++) {
partitionIdToSegmentsMap[i % NUM_PARTITIONS][i / NUM_PARTITIONS] = SEGMENTS.get(i);
}
String[][] partitionIdToSortedSegmentsMap = new String[NUM_PARTITIONS][numSegmentsPerPartition];
for (int i = 0; i < NUM_PARTITIONS; i++) {
String[] sortedSegments = new String[numSegmentsPerPartition];
System.arraycopy(partitionIdToSegmentsMap[i], 0, sortedSegments, 0, numSegmentsPerPartition);
Arrays.sort(sortedSegments);
partitionIdToSortedSegmentsMap[i] = sortedSegments;
}
for (int i = 0; i < NUM_PARTITIONS; i++) {
for (int j = 0; j < numSegmentsPerPartition; j++) {
assertEquals(newAssignment.get(partitionIdToSortedSegmentsMap[i][j]),
currentAssignment.get(partitionIdToSegmentsMap[i][j]));
}
}
}
@Test
public void testRebalanceTableWithPartitionColumnAndInstancePartitionsMapWithOnePartition() {
// make an unbalanced assignment by assigning all segments to the first three instances
String instance0 = INSTANCE_NAME_PREFIX + "0";
String instance1 = INSTANCE_NAME_PREFIX + "1";
String instance2 = INSTANCE_NAME_PREFIX + "2";
Map<String, Map<String, String>> unbalancedAssignment = new TreeMap<>();
SEGMENTS.forEach(segName -> unbalancedAssignment.put(segName, ImmutableMap
.of(instance0, SegmentStateModel.ONLINE, instance1, SegmentStateModel.ONLINE, instance2,
SegmentStateModel.ONLINE)));
Map<String, Map<String, String>> balancedAssignment = _segmentAssignmentWithPartition
.rebalanceTable(unbalancedAssignment, _instancePartitionsMapWithoutPartition, null, null,
new BaseConfiguration());
int[] actualNumSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(balancedAssignment, INSTANCES);
int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
Arrays.fill(expectedNumSegmentsAssignedPerInstance, NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES);
assertEquals(actualNumSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance);
}
@Test
public void testOneReplicaWithPartition() {
// Mock HelixManager
ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
List<ZNRecord> segmentZKMetadataZNRecords = new ArrayList<>(NUM_SEGMENTS);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = SEGMENTS.get(segmentId);
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
int partitionId = segmentId % NUM_PARTITIONS;
segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN,
new ColumnPartitionMetadata(null, NUM_PARTITIONS, Collections.singleton(partitionId), null))));
ZNRecord segmentZKMetadataZNRecord = segmentZKMetadata.toZNRecord();
when(propertyStore.get(
eq(ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME_WITH_PARTITION, segmentName)),
any(), anyInt())).thenReturn(segmentZKMetadataZNRecord);
segmentZKMetadataZNRecords.add(segmentZKMetadataZNRecord);
}
when(propertyStore
.getChildren(eq(ZKMetadataProvider.constructPropertyStorePathForResource(OFFLINE_TABLE_NAME_WITH_PARTITION)),
any(), anyInt(), anyInt(), anyInt())).thenReturn(segmentZKMetadataZNRecords);
HelixManager helixManager = mock(HelixManager.class);
when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
int numInstancesPerPartition = NUM_INSTANCES / NUM_PARTITIONS;
ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
new ReplicaGroupStrategyConfig(PARTITION_COLUMN, numInstancesPerPartition);
TableConfig tableConfigWithPartitions =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITH_PARTITION).setNumReplicas(1)
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(helixManager, tableConfigWithPartitions);
// {
// 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5],
// 1_0=[instance_6, instance_7, instance_8, instance_9, instance_10, instance_11],
// 2_0=[instance_12, instance_13, instance_14, instance_15, instance_16, instance_17],
// }
InstancePartitions instancePartitions = new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION);
int instanceIdToAdd = 0;
for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
List<String> instancesForPartition = new ArrayList<>(numInstancesPerPartition);
for (int i = 0; i < numInstancesPerPartition; i++) {
instancesForPartition.add(INSTANCES.get(instanceIdToAdd++));
}
instancePartitions.setInstances(partitionId, 0, instancesForPartition);
}
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitions);
// Test assignment
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = SEGMENTS.get(segmentId);
List<String> instancesAssigned =
segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
assertEquals(instancesAssigned.size(), 1);
// Segment 0 (partition 0) should be assigned to instance 0
// Segment 1 (partition 1) should be assigned to instance 6
// Segment 2 (partition 2) should be assigned to instance 12
// Segment 3 (partition 0) should be assigned to instance 1
// Segment 4 (partition 1) should be assigned to instance 7
// Segment 5 (partition 2) should be assigned to instance 13
// Segment 6 (partition 0) should be assigned to instance 2
// Segment 7 (partition 1) should be assigned to instance 8
// ...
int partitionId = segmentId % NUM_PARTITIONS;
int expectedAssignedInstanceId =
(segmentId % NUM_INSTANCES) / NUM_PARTITIONS + partitionId * numInstancesPerPartition;
assertEquals(instancesAssigned.get(0), INSTANCES.get(expectedAssignedInstanceId));
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// Current assignment should already be balanced
assertEquals(
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, null, null, new BaseConfiguration()),
currentAssignment);
// Test bootstrap
// Bootstrap table should reassign all segments based on their alphabetical order within the partition
Configuration rebalanceConfig = new BaseConfiguration();
rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
Map<String, Map<String, String>> newAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, null, null, rebalanceConfig);
assertEquals(newAssignment.size(), NUM_SEGMENTS);
int numSegmentsPerPartition = NUM_SEGMENTS / NUM_PARTITIONS;
String[][] partitionIdToSegmentsMap = new String[NUM_PARTITIONS][numSegmentsPerPartition];
for (int i = 0; i < NUM_SEGMENTS; i++) {
partitionIdToSegmentsMap[i % NUM_PARTITIONS][i / NUM_PARTITIONS] = SEGMENTS.get(i);
}
String[][] partitionIdToSortedSegmentsMap = new String[NUM_PARTITIONS][numSegmentsPerPartition];
for (int i = 0; i < NUM_PARTITIONS; i++) {
String[] sortedSegments = new String[numSegmentsPerPartition];
System.arraycopy(partitionIdToSegmentsMap[i], 0, sortedSegments, 0, numSegmentsPerPartition);
Arrays.sort(sortedSegments);
partitionIdToSortedSegmentsMap[i] = sortedSegments;
}
for (int i = 0; i < NUM_PARTITIONS; i++) {
for (int j = 0; j < numSegmentsPerPartition; j++) {
assertEquals(newAssignment.get(partitionIdToSortedSegmentsMap[i][j]),
currentAssignment.get(partitionIdToSegmentsMap[i][j]));
}
}
}
}