blob: 1db91cb9a4bbb900a538eaf4c830f090ff7579b2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Segment assignment strategy for replica-group based instance partitions.
 *
 * <p>The strategy is configured through {@link #init(HelixManager, TableConfig)}. Everything captured there is
 * stored in instance fields, so each strategy instance keeps the settings of the table it was initialized with.
 */
class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
  private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaGroupSegmentAssignmentStrategy.class);

  // NOTE: These are intentionally instance fields. They are written by init(), which is an instance method; if they
  //       were static, initializing the strategy for one table would overwrite the table name, partition column and
  //       Helix manager used by every other strategy instance.
  private HelixManager _helixManager;
  private String _tableName;
  private String _partitionColumn;
  private int _replication;
  private TableConfig _tableConfig;

  /**
   * Captures the table-level settings used by the assignment methods: the table name, the replication number and
   * the (optional) partition column from the replica-group strategy config.
   *
   * @param helixManager Helix manager passed through to the segment partition id lookups
   * @param tableConfig table config to read the settings from; its validation config must not be null
   * @throws IllegalStateException if the validation config of the table is null
   */
  @Override
  public void init(HelixManager helixManager, TableConfig tableConfig) {
    _helixManager = helixManager;
    _tableConfig = tableConfig;
    _tableName = tableConfig.getTableName();
    SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
    Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null");
    _replication = validationAndRetentionConfig.getReplicationNumber();
    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
        validationAndRetentionConfig.getReplicaGroupStrategyConfig();
    // The partition column is optional: it stays null when no replica-group strategy config is present
    _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
    if (_partitionColumn == null) {
      LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
          + "with replication: {} without partition column for table: {} ", _replication, _tableName);
    } else {
      LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
          + "with replication: {} and partition column: {} for table: {}", _replication, _partitionColumn, _tableName);
    }
  }

  /**
   * Assigns the segment for the replica-group based segment assignment strategy and returns the assigned instances.
   *
   * <p>When no partition column is configured, or the instance partitions contain a single partition, the segment
   * goes to instance partition 0. Otherwise the segment partition id (looked up via the OFFLINE or REALTIME helper
   * depending on the table type) is taken modulo the number of instance partitions.
   *
   * @param segmentName name of the segment to assign
   * @param currentAssignment current segment assignment of the table
   * @param instancePartitions instance partitions to assign the segment to
   * @param instancePartitionsType type of the instance partitions (not read by this implementation)
   * @return instances the segment is assigned to
   */
  @Override
  public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
      InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
    int numPartitions = instancePartitions.getNumPartitions();
    checkReplication(instancePartitions, _replication, _tableName);
    int partitionId;
    if (_partitionColumn == null || numPartitions == 1) {
      partitionId = 0;
    } else {
      // Uniformly spray the segment partitions over the instance partitions
      if (_tableConfig.getTableType() == TableType.OFFLINE) {
        partitionId = SegmentAssignmentUtils
            .getOfflineSegmentPartitionId(segmentName, _tableName, _helixManager, _partitionColumn) % numPartitions;
      } else {
        partitionId = SegmentAssignmentUtils
            .getRealtimeSegmentPartitionId(segmentName, _tableName, _helixManager, _partitionColumn) % numPartitions;
      }
    }
    return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId);
  }

  /**
   * Recomputes the assignment of all segments in the current assignment over the given instance partitions.
   *
   * @param currentAssignment current segment assignment of the table
   * @param instancePartitions instance partitions to rebalance the segments onto
   * @param instancePartitionsType type of the instance partitions (not read by this implementation)
   * @return the new segment assignment
   */
  @Override
  public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String, String>> currentAssignment,
      InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
    int numPartitions = instancePartitions.getNumPartitions();
    checkReplication(instancePartitions, _replication, _tableName);
    if (_partitionColumn == null || numPartitions == 1) {
      // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
      //       servers, which might cause hotspot servers because queries tend to hit the new segments. Use the
      //       table name hash as the random seed for the shuffle so that the result is deterministic.
      List<String> segments = new ArrayList<>(currentAssignment.keySet());
      Collections.shuffle(segments, new Random(_tableName.hashCode()));
      Map<String, Map<String, String>> newAssignment = new TreeMap<>();
      SegmentAssignmentUtils
          .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0, segments, newAssignment);
      return newAssignment;
    } else {
      Map<Integer, List<String>> instancePartitionIdToSegmentsMap;
      if (_tableConfig.getTableType() == TableType.OFFLINE) {
        instancePartitionIdToSegmentsMap = SegmentAssignmentUtils
            .getOfflineInstancePartitionIdToSegmentsMap(currentAssignment.keySet(), numPartitions, _tableName,
                _helixManager, _partitionColumn);
      } else {
        instancePartitionIdToSegmentsMap = SegmentAssignmentUtils
            .getRealtimeInstancePartitionIdToSegmentsMap(currentAssignment.keySet(), numPartitions, _tableName,
                _helixManager, _partitionColumn);
      }
      // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
      //       servers, which might cause hotspot servers because queries tend to hit the new segments. Use the
      //       table name hash as the random seed for the shuffle so that the result is deterministic.
      Random random = new Random(_tableName.hashCode());
      for (List<String> segments : instancePartitionIdToSegmentsMap.values()) {
        Collections.shuffle(segments, random);
      }
      return SegmentAssignmentUtils
          .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, instancePartitionIdToSegmentsMap);
    }
  }

  /**
   * Helper method to check whether the number of replica-groups matches the table replication for replica-group based
   * instance partitions. Logs a warning if they do not match; the warning reports the number of replica-groups from
   * the instance partitions as the value being used. The mismatch can happen when table is not configured correctly
   * (table replication and numReplicaGroups does not match or replication changed without reassigning instances).
   *
   * <p>This method only logs; it does not modify anything or throw on a mismatch.
   */
  private static void checkReplication(InstancePartitions instancePartitions, int replication, String tableName) {
    int numReplicaGroups = instancePartitions.getNumReplicaGroups();
    if (numReplicaGroups != replication) {
      LOGGER.warn(
          "Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for "
              + "table: {}, using: {}", instancePartitions.getInstancePartitionsName(), numReplicaGroups, replication,
          tableName, numReplicaGroups);
    }
  }
}