blob: 43dcf460383f7aff0520ef59eb00bb87f0b6d207 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.load.balancer;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionAllocator;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import java.util.List;
import java.util.Map;
/**
 * Builds Region allocation plans for the cluster.
 *
 * <p>Placement itself is delegated to an {@link IRegionAllocator}, chosen from the ConfigNode
 * configuration each time a plan is generated.
 */
public class RegionBalancer {

  private static final ConfigNodeConfig CONFIG_NODE_CONFIG =
      ConfigNodeDescriptor.getInstance().getConf();

  private final IManager configManager;

  /**
   * @param configManager the manager used to reach the node, schema and partition managers
   */
  public RegionBalancer(IManager configManager) {
    this.configManager = configManager;
  }

  /**
   * Generate an allocation plan (CreateRegionGroupsPlan) for new Regions.
   *
   * <p>For each StorageGroup the requested number of Regions is allocated one by one. Running
   * DataNodes are used as candidates whenever there are at least as many of them as the
   * StorageGroup's replication factor; otherwise the candidates are the Running plus Unknown
   * DataNodes. Every newly allocated Region is added to the list of already allocated Regions
   * that is handed to the allocator for the following allocations.
   *
   * @param allotmentMap {@code Map<StorageGroupName, Region allotment>}
   * @param consensusGroupType TConsensusGroupType of the new Regions
   * @return the plan holding every newly allocated Region, grouped by StorageGroup
   * @throws NotEnoughDataNodeException When the number of Running plus Unknown DataNodes is
   *     smaller than the replication factor of some StorageGroup in {@code allotmentMap}
   * @throws StorageGroupNotExistsException When some StorageGroups don't exist
   */
  public CreateRegionGroupsPlan genRegionsAllocationPlan(
      Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
      throws NotEnoughDataNodeException, StorageGroupNotExistsException {
    // First choice: DataNodes that are currently Running
    List<TDataNodeConfiguration> runningNodes =
        getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
    // Fallback: Running and Unknown DataNodes together,
    // used when the Running ones alone cannot hold all replicas
    List<TDataNodeConfiguration> runningOrUnknownNodes =
        getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown);

    // Reject the whole request before any Region group id is generated
    ensureEnoughDataNodes(allotmentMap, consensusGroupType, runningOrUnknownNodes);

    CreateRegionGroupsPlan plan = new CreateRegionGroupsPlan();
    IRegionAllocator allocator = genRegionAllocator();

    // The allocator only needs to see existing Regions of the requested ConsensusGroupType.
    // NOTE(review): this filters the list returned by getAllReplicaSets() in place;
    // presumably that list is a fresh copy -- confirm against PartitionManager.
    List<TRegionReplicaSet> existingRegions = getPartitionManager().getAllReplicaSets();
    existingRegions.removeIf(
        replicaSet -> replicaSet.getRegionId().getType() != consensusGroupType);

    for (Map.Entry<String, Integer> allotmentEntry : allotmentMap.entrySet()) {
      String storageGroup = allotmentEntry.getKey();
      int regionCount = allotmentEntry.getValue();
      int replicationFactor =
          getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType);

      List<TDataNodeConfiguration> candidateNodes;
      if (replicationFactor <= runningNodes.size()) {
        candidateNodes = runningNodes;
      } else {
        candidateNodes = runningOrUnknownNodes;
      }

      for (int remaining = regionCount; remaining > 0; remaining--) {
        TConsensusGroupId regionGroupId =
            new TConsensusGroupId(
                consensusGroupType, getPartitionManager().generateNextRegionGroupId());
        TRegionReplicaSet allocatedRegion =
            allocator.allocateRegion(
                candidateNodes, existingRegions, replicationFactor, regionGroupId);
        plan.addRegionGroup(storageGroup, allocatedRegion);
        // Let the following allocations take this new Region into account
        existingRegions.add(allocatedRegion);
      }
    }
    return plan;
  }

  /**
   * Check that the candidate DataNodes can hold a full replica set for every StorageGroup.
   *
   * @param allotmentMap the requested allotment; only its keys (StorageGroup names) are read
   * @param consensusGroupType TConsensusGroupType whose replication factor is looked up
   * @param candidateDataNodes the DataNodes that may receive new Regions
   * @throws NotEnoughDataNodeException When fewer candidates exist than some replication factor
   * @throws StorageGroupNotExistsException When some StorageGroups don't exist
   */
  private void ensureEnoughDataNodes(
      Map<String, Integer> allotmentMap,
      TConsensusGroupType consensusGroupType,
      List<TDataNodeConfiguration> candidateDataNodes)
      throws NotEnoughDataNodeException, StorageGroupNotExistsException {
    for (String storageGroup : allotmentMap.keySet()) {
      int requiredReplicas =
          getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType);
      if (requiredReplicas > candidateDataNodes.size()) {
        throw new NotEnoughDataNodeException();
      }
    }
  }

  /**
   * Create the allocator selected by the configured strategy.
   *
   * @return a CopySetRegionAllocator for COPY_SET; a GreedyRegionAllocator for every other
   *     strategy and when no strategy is configured
   */
  private IRegionAllocator genRegionAllocator() {
    RegionBalancer.RegionAllocateStrategy configuredStrategy =
        CONFIG_NODE_CONFIG.getRegionAllocateStrategy();
    if (configuredStrategy == RegionAllocateStrategy.COPY_SET) {
      return new CopySetRegionAllocator();
    }
    return new GreedyRegionAllocator();
  }

  /** @return the NodeManager of the owning config manager */
  private NodeManager getNodeManager() {
    return configManager.getNodeManager();
  }

  /** @return the ClusterSchemaManager of the owning config manager */
  private ClusterSchemaManager getClusterSchemaManager() {
    return configManager.getClusterSchemaManager();
  }

  /** @return the PartitionManager of the owning config manager */
  private PartitionManager getPartitionManager() {
    return configManager.getPartitionManager();
  }

  /** Strategies for choosing the IRegionAllocator implementation. */
  public enum RegionAllocateStrategy {
    COPY_SET,
    GREEDY
  }
}