blob: b8e385d3f484d45292cd78b981cec55718f0a745 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.load;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.NodeManager;
import org.apache.iotdb.confignode.manager.PartitionManager;
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* The LoadManager at ConfigNodeGroup-Leader is active. It proactively implements the cluster
* dynamic load balancing policy and passively accepts the PartitionTable expansion request.
*/
public class LoadManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private final IManager configManager;
/** Balancers */
private final RegionBalancer regionBalancer;
private final PartitionBalancer partitionBalancer;
private final RouteBalancer routeBalancer;
/** Load balancing executor service */
private Future<?> currentLoadBalancingFuture;
private final ScheduledExecutorService loadBalancingExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
// Monitor for leadership change
private final Object scheduleMonitor = new Object();
public LoadManager(IManager configManager) {
this.configManager = configManager;
this.regionBalancer = new RegionBalancer(configManager);
this.partitionBalancer = new PartitionBalancer(configManager);
this.routeBalancer = new RouteBalancer(configManager);
MetricService.getInstance().addMetricSet(new LoadManagerMetrics(configManager));
}
/**
* Generate an optimal CreateRegionGroupsPlan
*
* @param allotmentMap Map<StorageGroupName, Region allotment>
* @param consensusGroupType TConsensusGroupType of RegionGroup to be allocated
* @return CreateRegionGroupsPlan
* @throws NotEnoughDataNodeException If there are not enough DataNodes
* @throws StorageGroupNotExistsException If some specific StorageGroups don't exist
*/
public CreateRegionGroupsPlan allocateRegionGroups(
Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
throws NotEnoughDataNodeException, StorageGroupNotExistsException {
return regionBalancer.genRegionsAllocationPlan(allotmentMap, consensusGroupType);
}
/**
* Allocate SchemaPartitions
*
* @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned
* @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
*/
public Map<String, SchemaPartitionTable> allocateSchemaPartition(
Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) {
return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
}
/**
* Allocate DataPartitions
*
* @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned
* @return Map<StorageGroupName, DataPartitionTable>, the allocating result
*/
public Map<String, DataPartitionTable> allocateDataPartition(
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
unassignedDataPartitionSlotsMap) {
return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
}
/**
* Generate an optimal real-time read/write requests routing policy.
*
* @return Map<TConsensusGroupId, TRegionReplicaSet>, The routing policy of read/write requests
* for each Region is based on the order in the TRegionReplicaSet. The replica with higher
* sorting result have higher priority.
*/
public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap() {
// Always take the latest locations of RegionGroups as the input parameter
return routeBalancer.genLatestRegionRouteMap(getPartitionManager().getAllReplicaSets());
}
/** Start the load balancing service */
public void startLoadBalancingService() {
synchronized (scheduleMonitor) {
if (currentLoadBalancingFuture == null) {
currentLoadBalancingFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
loadBalancingExecutor,
this::updateNodeLoadStatistic,
0,
NodeManager.HEARTBEAT_INTERVAL,
TimeUnit.MILLISECONDS);
LOGGER.info("LoadBalancing service is started successfully.");
}
}
}
/** Stop the load balancing service */
public void stopLoadBalancingService() {
synchronized (scheduleMonitor) {
if (currentLoadBalancingFuture != null) {
currentLoadBalancingFuture.cancel(false);
currentLoadBalancingFuture = null;
LOGGER.info("LoadBalancing service is stopped successfully.");
}
}
}
private void updateNodeLoadStatistic() {
AtomicBoolean existDataNodeChangesStatus = new AtomicBoolean(false);
AtomicBoolean existSchemaRegionGroupChangesLeader = new AtomicBoolean(false);
AtomicBoolean existDataRegionGroupChangesLeader = new AtomicBoolean(false);
boolean isNeedBroadcast = false;
getNodeManager()
.getNodeCacheMap()
.values()
.forEach(
nodeCache -> {
boolean updateResult = nodeCache.updateNodeStatus();
if (nodeCache instanceof DataNodeHeartbeatCache) {
// Check if some DataNodes changes status
existDataNodeChangesStatus.compareAndSet(false, updateResult);
}
});
getPartitionManager()
.getRegionGroupCacheMap()
.values()
.forEach(
regionGroupCache -> {
boolean updateResult = regionGroupCache.updateRegionStatistics();
switch (regionGroupCache.getConsensusGroupId().getType()) {
// Check if some RegionGroups change their leader
case SchemaRegion:
existSchemaRegionGroupChangesLeader.compareAndSet(false, updateResult);
break;
case DataRegion:
existDataRegionGroupChangesLeader.compareAndSet(false, updateResult);
break;
}
});
if (existDataNodeChangesStatus.get()) {
// The RegionRouteMap must be broadcast if some DataNodes change status
isNeedBroadcast = true;
}
if (RouteBalancer.LEADER_POLICY.equals(CONF.getRoutingPolicy())) {
// Check the condition of leader routing policy
if (existSchemaRegionGroupChangesLeader.get()) {
// Broadcast the RegionRouteMap if some SchemaRegionGroups change their leader
isNeedBroadcast = true;
}
if (!ConsensusFactory.MultiLeaderConsensus.equals(CONF.getDataRegionConsensusProtocolClass())
&& existDataRegionGroupChangesLeader.get()) {
// Broadcast the RegionRouteMap if some DataRegionGroups change their leader
// and the consensus protocol isn't MultiLeader
isNeedBroadcast = true;
}
}
if (isNeedBroadcast) {
broadcastLatestRegionRouteMap();
}
}
public void broadcastLatestRegionRouteMap() {
Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genLatestRegionRouteMap();
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
getNodeManager()
.filterDataNodeThroughStatus(NodeStatus.Running)
.forEach(
onlineDataNode ->
dataNodeLocationMap.put(
onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
LOGGER.info("[latestRegionRouteMap] Begin to broadcast RegionRouteMap:");
long broadcastTime = System.currentTimeMillis();
printRegionRouteMap(broadcastTime, latestRegionRouteMap);
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
dataNodeLocationMap,
DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
null);
LOGGER.info("[latestRegionRouteMap] Broadcast the latest RegionRouteMap finished.");
}
public static void printRegionRouteMap(
long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> regionRouteMap) {
LOGGER.info("[latestRegionRouteMap] timestamp:{}", timestamp);
LOGGER.info("[latestRegionRouteMap] RegionRouteMap:");
for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> entry : regionRouteMap.entrySet()) {
LOGGER.info(
"[latestRegionRouteMap]\t {}={}",
entry.getKey(),
entry.getValue().getDataNodeLocations().stream()
.map(TDataNodeLocation::getDataNodeId)
.collect(Collectors.toList()));
}
}
public RouteBalancer getRouteBalancer() {
return routeBalancer;
}
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
private NodeManager getNodeManager() {
return configManager.getNodeManager();
}
private ClusterSchemaManager getClusterSchemaManager() {
return configManager.getClusterSchemaManager();
}
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
}