// blob: ccf2f5caaed8cf89657300e8e7b6fe9a07f5051a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.load.balancer;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
import org.apache.iotdb.commons.structure.BalanceTreeMap;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.partition.DataPartitionPolicyTable;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * The PartitionBalancer provides interfaces to generate optimal Partition allocation and migration
 * plans.
 *
 * <p>It keeps one {@link DataPartitionPolicyTable} per database in a {@link ConcurrentHashMap}.
 * Every access to a policy table made by this class is wrapped in that table's {@code
 * acquireLock()} / {@code releaseLock()} pair.
 */
public class PartitionBalancer {

  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionBalancer.class);

  private final IManager configManager;

  // Map<DatabaseName, DataPartitionPolicyTable>
  private final Map<String, DataPartitionPolicyTable> dataPartitionPolicyTableMap;

  public PartitionBalancer(IManager configManager) {
    this.configManager = configManager;
    this.dataPartitionPolicyTableMap = new ConcurrentHashMap<>();
  }

  /**
   * Allocate SchemaPartitions.
   *
   * <p>Each unassigned SeriesPartitionSlot is greedily assigned to the SchemaRegionGroup that
   * currently holds the fewest SchemaPartitions; the counter is updated after every assignment.
   *
   * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned
   * @return {@code Map<DatabaseName, SchemaPartitionTable>}, the allocating result
   * @throws NoAvailableRegionGroupException propagated from the PartitionManager lookups
   */
  public Map<String, SchemaPartitionTable> allocateSchemaPartition(
      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
      throws NoAvailableRegionGroupException {
    Map<String, SchemaPartitionTable> result = new HashMap<>();

    for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry :
        unassignedSchemaPartitionSlotsMap.entrySet()) {
      final String database = slotsMapEntry.getKey();
      final List<TSeriesPartitionSlot> unassignedPartitionSlots = slotsMapEntry.getValue();

      // Collect the SchemaRegionGroups reported by the PartitionManager
      // together with the number of SchemaPartitions each one already holds
      BalanceTreeMap<TConsensusGroupId, Integer> counter =
          buildRegionGroupCounter(database, TConsensusGroupType.SchemaRegion);

      // Enumerate SeriesPartitionSlot
      Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new HashMap<>();
      for (TSeriesPartitionSlot seriesPartitionSlot : unassignedPartitionSlots) {
        // Greedy allocation: allocate the unassigned SchemaPartition to
        // the RegionGroup whose allocated SchemaPartitions is the least
        TConsensusGroupId consensusGroupId = counter.getKeyWithMinValue();
        schemaPartitionMap.put(seriesPartitionSlot, consensusGroupId);
        increaseCounter(counter, consensusGroupId);
      }
      result.put(database, new SchemaPartitionTable(schemaPartitionMap));
    }

    return result;
  }

  /**
   * Allocate DataPartitions.
   *
   * <p>For every database the corresponding DataPartitionPolicyTable is locked while its
   * SeriesPartitionSlots are processed. The TimePartitionSlots of each SeriesPartitionSlot are
   * sorted in place by start time (the list held by the given {@link TTimeSlotList} is reordered)
   * and then assigned one by one.
   *
   * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned
   * @return {@code Map<DatabaseName, DataPartitionTable>}, the allocating result
   * @throws NoAvailableRegionGroupException propagated from the PartitionManager lookups
   * @throws IllegalStateException if no DataPartitionPolicyTable is registered for one of the
   *     given databases
   */
  public Map<String, DataPartitionTable> allocateDataPartition(
      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
      throws NoAvailableRegionGroupException {
    Map<String, DataPartitionTable> result = new HashMap<>();

    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> slotsMapEntry :
        unassignedDataPartitionSlotsMap.entrySet()) {
      final String database = slotsMapEntry.getKey();
      final Map<TSeriesPartitionSlot, TTimeSlotList> unassignedPartitionSlotsMap =
          slotsMapEntry.getValue();

      // Collect the DataRegionGroups reported by the PartitionManager
      // together with the number of DataPartitions each one already holds
      BalanceTreeMap<TConsensusGroupId, Integer> availableDataRegionGroupCounter =
          buildRegionGroupCounter(database, TConsensusGroupType.DataRegion);

      // The policy table may be absent, e.g. after clearPartitionBalancer() or
      // clearDataPartitionPolicyTable(database). Fail with an explicit message instead of
      // a NullPointerException that would additionally be masked by the finally block.
      DataPartitionPolicyTable allotTable = dataPartitionPolicyTableMap.get(database);
      if (allotTable == null) {
        throw new IllegalStateException(
            "[PartitionBalancer] No DataPartitionPolicyTable is registered for database: "
                + database);
      }

      DataPartitionTable dataPartitionTable = new DataPartitionTable();
      // Acquire the lock before entering the try block, so that the finally block
      // never releases a lock that was not successfully acquired
      allotTable.acquireLock();
      try {
        // Enumerate SeriesPartitionSlot
        for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry :
            unassignedPartitionSlotsMap.entrySet()) {
          TSeriesPartitionSlot seriesPartitionSlot = seriesPartitionEntry.getKey();

          // Enumerate TimePartitionSlot in ascending order
          List<TTimePartitionSlot> timePartitionSlots =
              seriesPartitionEntry.getValue().getTimePartitionSlots();
          timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));

          SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
          for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
            TConsensusGroupId targetGroupId =
                selectDataRegionGroup(
                    database,
                    seriesPartitionSlot,
                    timePartitionSlot,
                    allotTable,
                    availableDataRegionGroupCounter);
            seriesPartitionTable.putDataPartition(timePartitionSlot, targetGroupId);
            increaseCounter(availableDataRegionGroupCounter, targetGroupId);
          }

          dataPartitionTable.getDataPartitionMap().put(seriesPartitionSlot, seriesPartitionTable);
        }
      } finally {
        allotTable.releaseLock();
      }
      result.put(database, dataPartitionTable);
    }

    return result;
  }

  /**
   * Select the DataRegionGroup that should hold the given DataPartition.
   *
   * <p>The candidates are tried in this order, and the first one contained in the counter wins:
   * successor DataPartition, the group given by the DataPartitionPolicyTable, predecessor
   * DataPartition. If none of them is available, the group with the least DataPartitions is chosen
   * and a warning is logged.
   *
   * @param database Database name
   * @param seriesPartitionSlot the SeriesPartitionSlot being allocated
   * @param timePartitionSlot the TimePartitionSlot being allocated
   * @param allotTable the policy table of the database; the caller must hold its lock
   * @param availableDataRegionGroupCounter the available DataRegionGroups and their load
   * @return the selected DataRegionGroup
   * @throws NoAvailableRegionGroupException propagated from the PartitionManager lookups
   */
  private TConsensusGroupId selectDataRegionGroup(
      String database,
      TSeriesPartitionSlot seriesPartitionSlot,
      TTimePartitionSlot timePartitionSlot,
      DataPartitionPolicyTable allotTable,
      BalanceTreeMap<TConsensusGroupId, Integer> availableDataRegionGroupCounter)
      throws NoAvailableRegionGroupException {
    // 1. The historical DataPartition will try to inherit successor DataPartition first
    TConsensusGroupId successor =
        getPartitionManager()
            .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
    if (successor != null && availableDataRegionGroupCounter.containsKey(successor)) {
      return successor;
    }

    // 2. Assign DataPartition base on the DataAllotTable
    TConsensusGroupId allotGroupId =
        allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
    if (availableDataRegionGroupCounter.containsKey(allotGroupId)) {
      return allotGroupId;
    }

    // 3. The allotDataRegionGroup is unavailable,
    // try to inherit predecessor DataPartition
    TConsensusGroupId predecessor =
        getPartitionManager()
            .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
    if (predecessor != null && availableDataRegionGroupCounter.containsKey(predecessor)) {
      return predecessor;
    }

    // 4. Assign the DataPartition to DataRegionGroup with the least DataPartitions
    // If the above DataRegionGroups are unavailable
    TConsensusGroupId greedyGroupId = availableDataRegionGroupCounter.getKeyWithMinValue();
    LOGGER.warn(
        "[PartitionBalancer] The SeriesSlot: {} in TimeSlot: {} will be allocated to DataRegionGroup: {}, because the original target: {} is currently unavailable.",
        seriesPartitionSlot,
        timePartitionSlot,
        greedyGroupId,
        allotGroupId);
    return greedyGroupId;
  }

  /**
   * Build a counter that maps every RegionGroup returned by the PartitionManager for the given
   * database and type to the number of partition slots it currently holds.
   *
   * @param database Database name
   * @param type SchemaRegion or DataRegion
   * @return the counter, keyed by RegionGroup id
   * @throws NoAvailableRegionGroupException propagated from the PartitionManager lookup
   */
  private BalanceTreeMap<TConsensusGroupId, Integer> buildRegionGroupCounter(
      String database, TConsensusGroupType type) throws NoAvailableRegionGroupException {
    BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>();
    List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
        getPartitionManager().getSortedRegionGroupSlotsCounter(database, type);
    for (Pair<Long, TConsensusGroupId> pair : regionSlotsCounter) {
      counter.put(pair.getRight(), pair.getLeft().intValue());
    }
    return counter;
  }

  /** Record one more partition on the given RegionGroup. */
  private static void increaseCounter(
      BalanceTreeMap<TConsensusGroupId, Integer> counter, TConsensusGroupId groupId) {
    counter.put(groupId, counter.get(groupId) + 1);
  }

  /**
   * Re-balance the DataPartitionPolicyTable.
   *
   * <p>A new, empty policy table is registered first if the database has none yet. If the database
   * does not exist, the error is logged and the method returns normally.
   *
   * @param database Database name
   */
  public void reBalanceDataPartitionPolicy(String database) {
    DataPartitionPolicyTable dataPartitionPolicyTable =
        dataPartitionPolicyTableMap.computeIfAbsent(
            database, empty -> new DataPartitionPolicyTable());

    // Acquire before try: the finally block must only release a lock that is held
    dataPartitionPolicyTable.acquireLock();
    try {
      dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
          getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion));
      dataPartitionPolicyTable.logDataAllotTable(database);
    } catch (DatabaseNotExistsException e) {
      LOGGER.error("Database {} not exists when updateDataAllotTable", database, e);
    } finally {
      dataPartitionPolicyTable.releaseLock();
    }
  }

  /** Set up the PartitionBalancer when the current ConfigNode becomes leader. */
  public void setupPartitionBalancer() {
    dataPartitionPolicyTableMap.clear();
    getClusterSchemaManager()
        .getDatabaseNames()
        .forEach(
            database -> {
              DataPartitionPolicyTable dataPartitionPolicyTable = new DataPartitionPolicyTable();
              dataPartitionPolicyTableMap.put(database, dataPartitionPolicyTable);

              // Acquire before try: the finally block must only release a lock that is held
              dataPartitionPolicyTable.acquireLock();
              try {
                // Put all DataRegionGroups into the DataPartitionPolicyTable
                dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
                    getPartitionManager()
                        .getAllRegionGroupIds(database, TConsensusGroupType.DataRegion));
                // Load the last DataAllotTable
                dataPartitionPolicyTable.setDataAllotMap(
                    getPartitionManager().getLastDataAllotTable(database));
              } catch (DatabaseNotExistsException e) {
                LOGGER.error("Database {} not exists when setupPartitionBalancer", database, e);
              } finally {
                dataPartitionPolicyTable.releaseLock();
              }
            });
  }

  /** Clear the PartitionBalancer when the current ConfigNode is no longer the leader. */
  public void clearPartitionBalancer() {
    dataPartitionPolicyTableMap.clear();
  }

  /**
   * Remove the DataPartitionPolicyTable of the given database, if one is registered.
   *
   * @param database Database name
   */
  public void clearDataPartitionPolicyTable(String database) {
    dataPartitionPolicyTableMap.remove(database);
  }

  private ClusterSchemaManager getClusterSchemaManager() {
    return configManager.getClusterSchemaManager();
  }

  private PartitionManager getPartitionManager() {
    return configManager.getPartitionManager();
  }
}