blob: 8cef7ef57ec83ecadd154e42a4ca892310bb1276 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.controller.helix.core.assignment.segment.OfflineSegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
public class BalancedNumSegmentAssignmentStrategyTest {
private static final int NUM_REPLICAS = 3;
private static final String SEGMENT_NAME_PREFIX = "segment_";
private static final int NUM_SEGMENTS = 100;
private static final List<String> SEGMENTS =
SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, NUM_SEGMENTS);
private static final String INSTANCE_NAME_PREFIX = "instance_";
private static final int NUM_INSTANCES = 10;
private static final List<String> INSTANCES =
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, NUM_INSTANCES);
private static final String RAW_TABLE_NAME = "assignmentTable";
private static final String INSTANCE_PARTITIONS_NAME =
InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME);
private SegmentAssignment _segmentAssignment;
private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMap;
@BeforeClass
public void setUp() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
_segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, tableConfig);
// {
// 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7,
// instance_8, instance_9]
// }
InstancePartitions instancePartitions = new InstancePartitions(INSTANCE_PARTITIONS_NAME);
instancePartitions.setInstances(0, 0, INSTANCES);
_instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitions);
}
@Test
public void testFactory() {
assertTrue(_segmentAssignment instanceof OfflineSegmentAssignment);
}
@Test
public void testAssignSegment() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
// Segment 0 should be assigned to instance 0, 1, 2
// Segment 1 should be assigned to instance 3, 4, 5
// Segment 2 should be assigned to instance 6, 7, 8
// Segment 3 should be assigned to instance 9, 0, 1
// Segment 4 should be assigned to instance 2, 3, 4
// ...
int expectedAssignedInstanceId = 0;
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned =
_segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
assertEquals(instancesAssigned.get(replicaId), INSTANCES.get(expectedAssignedInstanceId));
expectedAssignedInstanceId = (expectedAssignedInstanceId + 1) % NUM_INSTANCES;
}
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
}
@Test
public void testTableBalanced() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned =
_segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap);
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// There should be 100 segments assigned
assertEquals(currentAssignment.size(), NUM_SEGMENTS);
// Each segment should have 3 replicas
for (Map<String, String> instanceStateMap : currentAssignment.values()) {
assertEquals(instanceStateMap.size(), NUM_REPLICAS);
}
// Each instance should have 30 segments assigned
int[] numSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES);
int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
int numSegmentsPerInstance = NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES;
Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
assertEquals(_segmentAssignment
.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, new BaseConfiguration()),
currentAssignment);
}
@Test
public void testBootstrapTable() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned =
_segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap);
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// Bootstrap table should reassign all segments based on their alphabetical order
Configuration rebalanceConfig = new BaseConfiguration();
rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
Map<String, Map<String, String>> newAssignment =
_segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, rebalanceConfig);
assertEquals(newAssignment.size(), NUM_SEGMENTS);
List<String> sortedSegments = new ArrayList<>(SEGMENTS);
sortedSegments.sort(null);
for (int i = 0; i < NUM_SEGMENTS; i++) {
assertEquals(newAssignment.get(sortedSegments.get(i)), currentAssignment.get(SEGMENTS.get(i)));
}
}
}