blob: bbe3f2fe4d6214600ff93572e42abd9f1cc13cfc [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.pinot.controller.helix.core.assignment.segment;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.Pairs;
* Utility class for segment assignment.
public class SegmentAssignmentUtils {
private SegmentAssignmentUtils() {
* Returns the number of segments assigned to each instance.
public static int[] getNumSegmentsAssignedPerInstance(Map<String, Map<String, String>> segmentAssignment,
List<String> instances) {
int[] numSegmentsPerInstance = new int[instances.size()];
Map<String, Integer> instanceNameToIdMap = getInstanceNameToIdMap(instances);
for (Map<String, String> instanceStateMep : segmentAssignment.values()) {
for (String instanceName : instanceStateMep.keySet()) {
Integer instanceId = instanceNameToIdMap.get(instanceName);
if (instanceId != null) {
return numSegmentsPerInstance;
private static Map<String, Integer> getInstanceNameToIdMap(List<String> instances) {
int numInstances = instances.size();
Map<String, Integer> instanceNameToIdMap = new HashMap<>();
for (int i = 0; i < numInstances; i++) {
instanceNameToIdMap.put(instances.get(i), i);
return instanceNameToIdMap;
* Returns instances for non-replica-group based assignment.
public static List<String> getInstancesForNonReplicaGroupBasedAssignment(InstancePartitions instancePartitions,
int replication) {
.checkState(instancePartitions.getNumReplicaGroups() == 1 && instancePartitions.getNumPartitions() == 1,
"Instance partitions: %s should contain 1 replica and 1 partition for non-replica-group based assignment",
List<String> instances = instancePartitions.getInstances(0, 0);
int numInstances = instances.size();
Preconditions.checkState(numInstances >= replication,
"There are less instances: %s in instance partitions: %s than the table replication: %s", numInstances,
instancePartitions.getInstancePartitionsName(), replication);
return instances;
* Assigns the segment for the non-replica-group based segment assignment strategy and returns the assigned instances.
public static List<String> assignSegmentWithoutReplicaGroup(Map<String, Map<String, String>> currentAssignment,
InstancePartitions instancePartitions, int replication) {
List<String> instances = getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, replication);
int[] numSegmentsAssignedPerInstance = getNumSegmentsAssignedPerInstance(currentAssignment, instances);
int numInstances = numSegmentsAssignedPerInstance.length;
PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, Pairs.intPairComparator());
for (int instanceId = 0; instanceId < numInstances; instanceId++) {
heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], instanceId));
List<String> instancesAssigned = new ArrayList<>(replication);
for (int i = 0; i < replication; i++) {
return instancesAssigned;
* Assigns the segment for the replica-group based segment assignment strategy and returns the assigned instances.
public static List<String> assignSegmentWithReplicaGroup(Map<String, Map<String, String>> currentAssignment,
InstancePartitions instancePartitions, int partitionId) {
// First assign the segment to replica-group 0
List<String> instances = instancePartitions.getInstances(partitionId, 0);
int[] numSegmentsAssignedPerInstance = getNumSegmentsAssignedPerInstance(currentAssignment, instances);
int minNumSegmentsAssigned = numSegmentsAssignedPerInstance[0];
int instanceIdWithLeastSegmentsAssigned = 0;
int numInstances = numSegmentsAssignedPerInstance.length;
for (int instanceId = 1; instanceId < numInstances; instanceId++) {
if (numSegmentsAssignedPerInstance[instanceId] < minNumSegmentsAssigned) {
minNumSegmentsAssigned = numSegmentsAssignedPerInstance[instanceId];
instanceIdWithLeastSegmentsAssigned = instanceId;
// Mirror the assignment to all replica-groups
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
List<String> instancesAssigned = new ArrayList<>(numReplicaGroups);
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
.add(instancePartitions.getInstances(partitionId, replicaGroupId).get(instanceIdWithLeastSegmentsAssigned));
return instancesAssigned;
* Rebalances the table with Helix AutoRebalanceStrategy.
public static Map<String, Map<String, String>> rebalanceTableWithHelixAutoRebalanceStrategy(
Map<String, Map<String, String>> currentAssignment, List<String> instances, int replication) {
// Use Helix AutoRebalanceStrategy to rebalance the table
LinkedHashMap<String, Integer> states = new LinkedHashMap<>();
states.put(SegmentStateModel.ONLINE, replication);
AutoRebalanceStrategy autoRebalanceStrategy =
new AutoRebalanceStrategy(null, new ArrayList<>(currentAssignment.keySet()), states);
// Make a copy of the current assignment because this step might change the passed in assignment
Map<String, Map<String, String>> currentAssignmentCopy = new TreeMap<>();
for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
currentAssignmentCopy.put(segmentName, new TreeMap<>(instanceStateMap));
return autoRebalanceStrategy.computePartitionAssignment(instances, instances, currentAssignmentCopy, null)
* Rebalances the table for the replica-group based segment assignment strategy by uniformly spraying group of
* segments belonging to each instancePartitionId to the instances of that instance partition.
public static Map<String, Map<String, String>> rebalanceReplicaGroupBasedTable(
Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions,
Map<Integer, List<String>> instancePartitionIdToSegmentsMap) {
Map<String, Map<String, String>> newAssignment = new TreeMap<>();
for (Map.Entry<Integer, List<String>> entry : instancePartitionIdToSegmentsMap.entrySet()) {
// Uniformly spray the segment partitions over the instance partitions
int instancePartitionId = entry.getKey();
List<String> segments = entry.getValue();
rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, instancePartitionId, segments,
return newAssignment;
* Rebalances one partition of the table for the replica-group based segment assignment strategy.
* <ul>
* <li>
* 1. Calculate the target number of segments on each instance
* </li>
* <li>
* 2. Loop over all the segments and keep the assignment if target number of segments for the instance has not
* been reached and track the not assigned segments
* </li>
* <li>
* 3. Assign the left-over segments to the instances with the least segments, or the smallest index if there is a
* tie
* </li>
* <li>
* 4. Mirror the assignment to other replica-groups
* </li>
* </ul>
public static void rebalanceReplicaGroupBasedPartition(Map<String, Map<String, String>> currentAssignment,
InstancePartitions instancePartitions, int partitionId, List<String> segments,
Map<String, Map<String, String>> newAssignment) {
// Fetch instances in replica-group 0
List<String> instances = instancePartitions.getInstances(partitionId, 0);
Map<String, Integer> instanceNameToIdMap = getInstanceNameToIdMap(instances);
// Calculate target number of segments per instance
// NOTE: in order to minimize the segment movements, use the ceiling of the quotient
int numInstances = instances.size();
int numSegments = segments.size();
int targetNumSegmentsPerInstance = (numSegments + numInstances - 1) / numInstances;
// Do not move segment if target number of segments is not reached, track the segments need to be moved
int[] numSegmentsAssignedPerInstance = new int[numInstances];
List<String> segmentsNotAssigned = new ArrayList<>();
for (String segmentName : segments) {
boolean segmentAssigned = false;
for (String instanceName : currentAssignment.get(segmentName).keySet()) {
Integer instanceId = instanceNameToIdMap.get(instanceName);
if (instanceId != null && numSegmentsAssignedPerInstance[instanceId] < targetNumSegmentsPerInstance) {
.put(segmentName, getReplicaGroupBasedInstanceStateMap(instancePartitions, partitionId, instanceId));
segmentAssigned = true;
if (!segmentAssigned) {
// Assign each not assigned segment to the instance with the least segments, or the smallest id if there is a tie
PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, Pairs.intPairComparator());
for (int instanceId = 0; instanceId < numInstances; instanceId++) {
heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], instanceId));
for (String segmentName : segmentsNotAssigned) {
Pairs.IntPair intPair = heap.remove();
int instanceId = intPair.getRight();
newAssignment.put(segmentName, getReplicaGroupBasedInstanceStateMap(instancePartitions, partitionId, instanceId));
intPair.setLeft(intPair.getLeft() + 1);
* Returns the map from instance name to Helix partition state for the replica-group based segment assignment
* strategy, which can be put into the segment assignment. The instances are picked from the instance partitions by
* the given partition id and instance id.
private static Map<String, String> getReplicaGroupBasedInstanceStateMap(InstancePartitions instancePartitions,
int partitionId, int instanceId) {
Map<String, String> instanceStateMap = new TreeMap<>();
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
.put(instancePartitions.getInstances(partitionId, replicaGroupId).get(instanceId), SegmentStateModel.ONLINE);
return instanceStateMap;
* Returns the map from instance name to Helix partition state, which can be put into the segment assignment.
public static Map<String, String> getInstanceStateMap(Collection<String> instances, String state) {
Map<String, String> instanceStateMap = new TreeMap<>();
for (String instanceName : instances) {
instanceStateMap.put(instanceName, state);
return instanceStateMap;
* Returns a map from instance name to number of segments to be moved to it.
public static Map<String, Integer> getNumSegmentsToBeMovedPerInstance(Map<String, Map<String, String>> oldAssignment,
Map<String, Map<String, String>> newAssignment) {
Map<String, Integer> numSegmentsToBeMovedPerInstance = new TreeMap<>();
for (Map.Entry<String, Map<String, String>> entry : newAssignment.entrySet()) {
String segmentName = entry.getKey();
Set<String> newInstancesAssigned = entry.getValue().keySet();
Set<String> oldInstancesAssigned = oldAssignment.get(segmentName).keySet();
// For each new assigned instance, check if the segment needs to be moved to it
for (String instanceName : newInstancesAssigned) {
if (!oldInstancesAssigned.contains(instanceName)) {
numSegmentsToBeMovedPerInstance.merge(instanceName, 1, Integer::sum);
return numSegmentsToBeMovedPerInstance;
* Class that splits segment assignment into COMPLETED, CONSUMING and OFFLINE segments.
static class CompletedConsumingOfflineSegmentAssignment {
private final Map<String, Map<String, String>> _completedSegmentAssignment = new TreeMap<>();
private final Map<String, Map<String, String>> _consumingSegmentAssignment = new TreeMap<>();
private final Map<String, Map<String, String>> _offlineSegmentAssignment = new TreeMap<>();
// NOTE: split the segments based on the following criteria:
// 1. At least one instance ONLINE -> COMPLETED segment
// 2. At least one instance CONSUMING -> CONSUMING segment
// 3. All instances OFFLINE (all instances encountered error while consuming) -> OFFLINE segment
CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, String>> segmentAssignment) {
for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
_completedSegmentAssignment.put(segmentName, instanceStateMap);
} else if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
_consumingSegmentAssignment.put(segmentName, instanceStateMap);
} else {
_offlineSegmentAssignment.put(segmentName, instanceStateMap);
Map<String, Map<String, String>> getCompletedSegmentAssignment() {
return _completedSegmentAssignment;
Map<String, Map<String, String>> getConsumingSegmentAssignment() {
return _consumingSegmentAssignment;
Map<String, Map<String, String>> getOfflineSegmentAssignment() {
return _offlineSegmentAssignment;
* Takes a segment assignment and splits them up based on which tiers the segments are eligible for. Only considers
* ONLINE segments.
* Tiers are selected according to the order provided in the tiers list.
static class TierSegmentAssignment {
private final Map<String, Map<String, Map<String, String>>> _tierNameToSegmentAssignmentMap = new TreeMap<>();
private final Map<String, Map<String, String>> _nonTierSegmentAssignment = new TreeMap<>();
* Creates a TierSegmentAssignment from the given segmentAssignment
* @param tableNameWithType table to which the segment assignment belongs
* @param sortedTiers list of tiers, pre-sorted as per desired order by caller
* @param segmentAssignment segment assignment of the table
TierSegmentAssignment(String tableNameWithType, List<Tier> sortedTiers,
Map<String, Map<String, String>> segmentAssignment) {
// initialize tier to segmentAssignment map
sortedTiers.forEach(t -> _tierNameToSegmentAssignmentMap.put(t.getName(), new TreeMap<>()));
// iterate over all segments
for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
boolean selected = false;
// only consider ONLINE segments for tiers
if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
// find an eligible tier for the segment, from the ordered list of tiers
for (Tier tier : sortedTiers) {
if (tier.getSegmentSelector().selectSegment(tableNameWithType, segmentName)) {
_tierNameToSegmentAssignmentMap.get(tier.getName()).put(segmentName, instanceStateMap);
selected = true;
// if segment not eligible for any tier, put in ordinary segments map
if (!selected) {
_nonTierSegmentAssignment.put(segmentName, instanceStateMap);
// remove tiers with no eligible segments
_tierNameToSegmentAssignmentMap.entrySet().removeIf(e -> e.getValue().isEmpty());
* Returns a map from tier name to segment assignments for segments which are eligible for that tier
public Map<String, Map<String, Map<String, String>>> getTierNameToSegmentAssignmentMap() {
return _tierNameToSegmentAssignmentMap;
* Returns segment assignments of segments which were not eligible for any tier
public Map<String, Map<String, String>> getNonTierSegmentAssignment() {
return _nonTierSegmentAssignment;
* Returns a partition id for offline table
public static int getOfflineSegmentPartitionId(String segmentName, String offlineTableName, HelixManager helixManager,
@Nullable String partitionColumn) {
SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), offlineTableName, segmentName);
.checkState(segmentZKMetadata != null, "Failed to find segment ZK metadata for segment: %s of table: %s",
segmentName, offlineTableName);
return getPartitionId(segmentZKMetadata, offlineTableName, partitionColumn);
private static int getPartitionId(SegmentZKMetadata segmentZKMetadata, String offlineTableName,
@Nullable String partitionColumn) {
String segmentName = segmentZKMetadata.getSegmentName();
ColumnPartitionMetadata partitionMetadata =
Preconditions.checkState(partitionMetadata != null,
"Segment ZK metadata for segment: %s of table: %s does not contain partition metadata for column: %s",
segmentName, offlineTableName, partitionColumn);
Set<Integer> partitions = partitionMetadata.getPartitions();
Preconditions.checkState(partitions.size() == 1,
"Segment ZK metadata for segment: %s of table: %s contains multiple partitions for column: %s", segmentName,
offlineTableName, partitionColumn);
return partitions.iterator().next();
* Returns map of instance partition id to segments for offline tables
public static Map<Integer, List<String>> getOfflineInstancePartitionIdToSegmentsMap(Set<String> segments,
int numInstancePartitions, String offlineTableName, HelixManager helixManager, @Nullable String partitionColumn) {
// Fetch partition id from segment ZK metadata
List<SegmentZKMetadata> segmentsZKMetadata =
ZKMetadataProvider.getSegmentsZKMetadata(helixManager.getHelixPropertyStore(), offlineTableName);
Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>();
Set<String> segmentsWithoutZKMetadata = new HashSet<>(segments);
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
String segmentName = segmentZKMetadata.getSegmentName();
if (segmentsWithoutZKMetadata.remove(segmentName)) {
int partitionId = getPartitionId(segmentZKMetadata, offlineTableName, partitionColumn);
int instancePartitionId = partitionId % numInstancePartitions;
instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName);
Preconditions.checkState(segmentsWithoutZKMetadata.isEmpty(), "Failed to find ZK metadata for segments: %s",
return instancePartitionIdToSegmentsMap;
* Returns a partition id for realtime table
public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
HelixManager helixManager, @Nullable String partitionColumn) {
Integer segmentPartitionId =
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, realtimeTableName, helixManager, partitionColumn);
if (segmentPartitionId == null) {
// This case is for the uploaded segments for which there's no partition information.
// A random, but consistent, partition id is calculated based on the hash code of the segment name.
// Note that '% 10K' is used to prevent having partition ids with large value which will be problematic later in
// instance assignment formula.
segmentPartitionId = Math.abs(segmentName.hashCode() % 10_000);
return segmentPartitionId;
* Returns map of instance partition id to segments for realtime tables
public static Map<Integer, List<String>> getRealtimeInstancePartitionIdToSegmentsMap(Set<String> segments,
int numInstancePartitions, String realtimeTableName, HelixManager helixManager,
@Nullable String partitionColumn) {
Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>();
for (String segmentName : segments) {
int instancePartitionId =
getRealtimeSegmentPartitionId(segmentName, realtimeTableName, helixManager, partitionColumn)
% numInstancePartitions;
instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName);
return instancePartitionIdToSegmentsMap;