blob: 9516b2501b8349aed5e32a7ec09dbca5c0a13ede [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.load.balancer;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer;
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/** The RouteBalancer guides the cluster RegionGroups' leader distribution and routing priority. */
public class RouteBalancer implements IClusterStatusSubscriber {

  private static final Logger LOGGER = LoggerFactory.getLogger(RouteBalancer.class);

  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

  // Consensus protocol classes configured for SchemaRegions and DataRegions respectively.
  private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS =
      CONF.getSchemaRegionConsensusProtocolClass();
  private static final String DATA_REGION_CONSENSUS_PROTOCOL_CLASS =
      CONF.getDataRegionConsensusProtocolClass();

  // Automatic leader balancing is enabled for a region type only when its consensus protocol
  // supports (and is configured for) leader transfer, or is SimpleConsensus (single replica).
  private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION =
      (CONF.isEnableAutoLeaderBalanceForRatisConsensus()
              && ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
          || (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
              && ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
          // The simple consensus protocol will always automatically designate itself as the leader
          || ConsensusFactory.SIMPLE_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
  private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION =
      (CONF.isEnableAutoLeaderBalanceForRatisConsensus()
              && ConsensusFactory.RATIS_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS))
          || (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
              && ConsensusFactory.IOT_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS))
          // The simple consensus protocol will always automatically designate itself as the leader
          || ConsensusFactory.SIMPLE_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);

  // Upper bound (in ms) for waitForPriorityUpdate: slightly shorter than the procedure
  // timeout so callers fail here before the enclosing procedure times out, but never below 10s.
  private static final long REGION_PRIORITY_WAITING_TIMEOUT =
      Math.max(
          ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2),
          TimeUnit.SECONDS.toMillis(10));
  // Polling interval (in ms) between priority-map checks in waitForPriorityUpdate
  private static final long WAIT_PRIORITY_INTERVAL = 10;

  private final IManager configManager;

  // For generating optimal Region leader distribution
  private final AbstractLeaderBalancer leaderBalancer;
  // For generating optimal cluster Region routing priority
  private final IPriorityBalancer priorityRouter;

  // Guards regionPriorityMap: read lock for snapshots, write lock for rebalance/remove/clear
  private final ReentrantReadWriteLock priorityMapLock;
  // Map<RegionGroupId, Region priority>
  // The client requests are preferentially routed to the Region with the lowest index in the
  // TRegionReplicaSet
  private final Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap;

  // The interval of retrying to balance ratis leader after the last failed time (20s, in ns,
  // compared against System.nanoTime() samples)
  private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS = 20 * 1000L * 1000L * 1000L;
  // Map<RegionGroupId, nanoTime of the last failed leader-change attempt>; only consulted for
  // the Ratis protocol to back off repeated change-leader requests
  private final Map<TConsensusGroupId, Long> lastFailedTimeForLeaderBalance;

  /**
   * Construct a RouteBalancer whose leader-distribution and route-priority algorithms are chosen
   * from the ConfigNode configuration.
   *
   * @param configManager the ConfigNode manager facade used to reach Node/Partition/Load managers
   */
  public RouteBalancer(IManager configManager) {
    this.configManager = configManager;
    this.priorityMapLock = new ReentrantReadWriteLock();
    this.regionPriorityMap = new TreeMap<>();
    this.lastFailedTimeForLeaderBalance = new TreeMap<>();

    // Leader-distribution policy: GREEDY, or the min-cost-flow balancer (CFD and any
    // unrecognized value fall through to MinCostFlowLeaderBalancer)
    switch (CONF.getLeaderDistributionPolicy()) {
      case AbstractLeaderBalancer.GREEDY_POLICY:
        this.leaderBalancer = new GreedyLeaderBalancer();
        break;
      case AbstractLeaderBalancer.CFD_POLICY:
      default:
        this.leaderBalancer = new MinCostFlowLeaderBalancer();
        break;
    }

    // Route-priority policy: GREEDY, or leader-first (default for unrecognized values)
    switch (CONF.getRoutePriorityPolicy()) {
      case IPriorityBalancer.GREEDY_POLICY:
        this.priorityRouter = new GreedyPriorityBalancer();
        break;
      case IPriorityBalancer.LEADER_POLICY:
      default:
        this.priorityRouter = new LeaderPriorityBalancer();
        break;
    }
  }

  /** Balance cluster RegionGroup leader distribution through configured algorithm. */
  private synchronized void balanceRegionLeader() {
    if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) {
      balanceRegionLeader(TConsensusGroupType.SchemaRegion, SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);
    }
    if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION) {
      balanceRegionLeader(TConsensusGroupType.DataRegion, DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
    }
  }

  /**
   * Compute the optimal leader for every RegionGroup of the given type and transfer leadership
   * where the current leader differs.
   *
   * <p>For IoT/Simple consensus the leader is only a routing convention, so updating the cached
   * leader sample is sufficient. For Ratis, an explicit CHANGE_REGION_LEADER RPC is sent to the
   * target DataNode (unless replication factor is 1, where the cache is force-updated directly),
   * and failures are recorded so the group is not retried within
   * {@link #BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS}.
   *
   * @param regionGroupType SchemaRegion or DataRegion
   * @param consensusProtocolClass the consensus protocol configured for that region type
   */
  private void balanceRegionLeader(
      TConsensusGroupType regionGroupType, String consensusProtocolClass) {
    // Collect the latest data and generate the optimal leader distribution
    Map<TConsensusGroupId, Integer> currentLeaderMap = getLoadManager().getRegionLeaderMap();
    Map<TConsensusGroupId, Integer> optimalLeaderMap =
        leaderBalancer.generateOptimalLeaderDistribution(
            getPartitionManager().getAllRegionGroupIdMap(regionGroupType),
            getPartitionManager().getAllReplicaSetsMap(regionGroupType),
            currentLeaderMap,
            getLoadManager().getLoadCache().getCurrentDataNodeStatisticsMap(),
            getLoadManager().getLoadCache().getCurrentRegionStatisticsMap(regionGroupType));

    // Transfer leader to the optimal distribution
    long currentTime = System.nanoTime();
    // requestId doubles as the index that pairs each request with its response below
    AtomicInteger requestId = new AtomicInteger(0);
    AsyncClientHandler<TRegionLeaderChangeReq, TRegionLeaderChangeResp> clientHandler =
        new AsyncClientHandler<>(DataNodeRequestType.CHANGE_REGION_LEADER);
    // Leader changes that can be applied to the consensus-group cache immediately
    Map<TConsensusGroupId, ConsensusGroupHeartbeatSample> successTransferMap = new TreeMap<>();
    optimalLeaderMap.forEach(
        (regionGroupId, newLeaderId) -> {
          // Back off: skip Ratis groups whose last change-leader attempt failed recently
          if (ConsensusFactory.RATIS_CONSENSUS.equals(consensusProtocolClass)
              && currentTime - lastFailedTimeForLeaderBalance.getOrDefault(regionGroupId, 0L)
                  <= BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS) {
            return;
          }
          // -1 means the balancer found no valid leader candidate for this group
          if (newLeaderId != -1 && !newLeaderId.equals(currentLeaderMap.get(regionGroupId))) {
            LOGGER.info(
                "[LeaderBalancer] Try to change the leader of Region: {} to DataNode: {} ",
                regionGroupId,
                newLeaderId);
            switch (consensusProtocolClass) {
              case ConsensusFactory.IOT_CONSENSUS:
              case ConsensusFactory.SIMPLE_CONSENSUS:
                // For IoTConsensus or SimpleConsensus protocol, change RegionRouteMap is enough
                successTransferMap.put(
                    regionGroupId, new ConsensusGroupHeartbeatSample(currentTime, newLeaderId));
                break;
              case ConsensusFactory.RATIS_CONSENSUS:
              default:
                // For ratis protocol, the ConfigNode-leader will send a changeLeaderRequest to the
                // new leader. And the RegionRouteMap will be updated by Cluster-Heartbeat-Service
                // later if change leader success.
                // Force update region leader for ratis consensus when replication factor is 1
                // (timestamp 0 so any later real heartbeat sample wins — TODO confirm).
                if (TConsensusGroupType.SchemaRegion.equals(regionGroupType)
                    && CONF.getSchemaReplicationFactor() == 1) {
                  successTransferMap.put(
                      regionGroupId, new ConsensusGroupHeartbeatSample(0, newLeaderId));
                } else if (TConsensusGroupType.DataRegion.equals(regionGroupType)
                    && CONF.getDataReplicationFactor() == 1) {
                  successTransferMap.put(
                      regionGroupId, new ConsensusGroupHeartbeatSample(0, newLeaderId));
                } else {
                  // Queue an async change-leader RPC addressed to the intended new leader
                  TDataNodeLocation newLeader =
                      getNodeManager().getRegisteredDataNode(newLeaderId).getLocation();
                  TRegionLeaderChangeReq regionLeaderChangeReq =
                      new TRegionLeaderChangeReq(regionGroupId, newLeader);
                  int requestIndex = requestId.getAndIncrement();
                  clientHandler.putRequest(requestIndex, regionLeaderChangeReq);
                  clientHandler.putDataNodeLocation(requestIndex, newLeader);
                }
                break;
            }
          }
        });
    if (requestId.get() > 0) {
      // Don't retry ChangeLeader request
      AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNode(clientHandler);
      // Fold each response back by its request index: success -> cache update,
      // failure -> record the failure time for the Ratis back-off above
      for (int i = 0; i < requestId.get(); i++) {
        if (clientHandler.getResponseMap().get(i).getStatus().getCode()
            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          successTransferMap.put(
              clientHandler.getRequest(i).getRegionId(),
              new ConsensusGroupHeartbeatSample(
                  clientHandler.getResponseMap().get(i).getConsensusLogicalTimestamp(),
                  clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId()));
        } else {
          lastFailedTimeForLeaderBalance.put(
              clientHandler.getRequest(i).getRegionId(), currentTime);
          LOGGER.error(
              "[LeaderBalancer] Failed to change the leader of Region: {} to DataNode: {}",
              clientHandler.getRequest(i).getRegionId(),
              clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId());
        }
      }
    }
    getLoadManager().forceUpdateConsensusGroupCache(successTransferMap);
  }

  /**
   * Balance cluster RegionGroup route priority through configured algorithm.
   *
   * <p>Recomputes the optimal replica-ordering for every Schema/DataRegionGroup under the write
   * lock; any group whose priority changed is recorded and, after the lock is released, logged and
   * broadcast to all reachable DataNodes.
   */
  private synchronized void balanceRegionPriority() {
    priorityMapLock.writeLock().lock();
    AtomicBoolean needBroadcast = new AtomicBoolean(false);
    // Map<RegionGroupId, Pair<old priority (may be null), new priority>> of changed groups
    Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> differentPriorityMap =
        new TreeMap<>();
    try {
      Map<TConsensusGroupId, Integer> regionLeaderMap = getLoadManager().getRegionLeaderMap();
      Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores();

      // Balancing region priority in each SchemaRegionGroup
      Map<TConsensusGroupId, TRegionReplicaSet> optimalRegionPriorityMap =
          priorityRouter.generateOptimalRoutePriority(
              getPartitionManager().getAllReplicaSets(TConsensusGroupType.SchemaRegion),
              regionLeaderMap,
              dataNodeLoadScoreMap);
      // Balancing region priority in each DataRegionGroup
      optimalRegionPriorityMap.putAll(
          priorityRouter.generateOptimalRoutePriority(
              getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion),
              regionLeaderMap,
              dataNodeLoadScoreMap));

      optimalRegionPriorityMap.forEach(
          (regionGroupId, optimalRegionPriority) -> {
            TRegionReplicaSet currentRegionPriority = regionPriorityMap.get(regionGroupId);
            if (!optimalRegionPriority.equals(currentRegionPriority)) {
              differentPriorityMap.put(
                  regionGroupId, new Pair<>(currentRegionPriority, optimalRegionPriority));
              regionPriorityMap.put(regionGroupId, optimalRegionPriority);
              needBroadcast.set(true);
            }
          });
    } finally {
      priorityMapLock.writeLock().unlock();
    }

    // Log and broadcast outside the lock to keep the critical section short
    if (needBroadcast.get()) {
      recordRegionPriorityMap(differentPriorityMap);
      broadcastLatestRegionPriorityMap();
    }
  }

  /** Push the current RegionPriorityMap to every DataNode that is not Unknown. */
  private void broadcastLatestRegionPriorityMap() {
    // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
        getNodeManager()
            .filterDataNodeThroughStatus(
                NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly)
            .stream()
            .map(TDataNodeConfiguration::getLocation)
            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location));
    // broadcastTime lets DataNodes discard stale route maps that arrive out of order
    long broadcastTime = System.currentTimeMillis();
    Map<TConsensusGroupId, TRegionReplicaSet> tmpPriorityMap = getRegionPriorityMap();
    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
        new AsyncClientHandler<>(
            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
            new TRegionRouteReq(broadcastTime, tmpPriorityMap),
            dataNodeLocationMap);
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
  }

  /**
   * Log each changed RegionGroup priority as "old DataNodeIds -> new DataNodeIds".
   *
   * @param differentPriorityMap Map<RegionGroupId, Pair<old priority (nullable), new priority>>
   */
  private void recordRegionPriorityMap(
      Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> differentPriorityMap) {
    LOGGER.info("[RegionPriority] RegionPriorityMap: ");
    for (Map.Entry<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>>
        regionPriorityEntry : differentPriorityMap.entrySet()) {
      if (!Objects.equals(
          regionPriorityEntry.getValue().getRight(), regionPriorityEntry.getValue().getLeft())) {
        try {
          LOGGER.info(
              "[RegionPriority]\t {}: {}->{}",
              regionPriorityEntry.getKey(),
              // Old priority is null for a RegionGroup seen for the first time
              regionPriorityEntry.getValue().getLeft() == null
                  ? "null"
                  : regionPriorityEntry.getValue().getLeft().getDataNodeLocations().stream()
                      .map(TDataNodeLocation::getDataNodeId)
                      .collect(Collectors.toList()),
              regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream()
                  .map(TDataNodeLocation::getDataNodeId)
                  .collect(Collectors.toList()));
        } catch (Exception e) {
          // Logging must never break the balancing flow
          LOGGER.error("Unexpected exception", e);
        }
      }
    }
  }

  /**
   * Snapshot the current routing priorities.
   *
   * @return a defensive copy of Map<RegionGroupId, RegionPriority>
   */
  public Map<TConsensusGroupId, TRegionReplicaSet> getRegionPriorityMap() {
    priorityMapLock.readLock().lock();
    try {
      return new TreeMap<>(regionPriorityMap);
    } finally {
      priorityMapLock.readLock().unlock();
    }
  }

  /**
   * Drop the cached priority of the given RegionGroup (e.g. after the group is removed).
   *
   * @param regionGroupId the RegionGroup whose priority entry should be removed
   */
  public void removeRegionPriority(TConsensusGroupId regionGroupId) {
    priorityMapLock.writeLock().lock();
    try {
      regionPriorityMap.remove(regionGroupId);
    } finally {
      priorityMapLock.writeLock().unlock();
    }
  }

  /** Drop all cached RegionGroup priorities. */
  public void clearRegionPriority() {
    priorityMapLock.writeLock().lock();
    try {
      regionPriorityMap.clear();
    } finally {
      priorityMapLock.writeLock().unlock();
    }
  }

  /**
   * Wait for the specified RegionGroups to finish routing priority calculation.
   *
   * <p>Polls every {@link #WAIT_PRIORITY_INTERVAL} ms until every given RegionGroup has an entry in
   * the priority map, the timeout {@link #REGION_PRIORITY_WAITING_TIMEOUT} elapses, or the thread
   * is interrupted (the interrupt flag is restored in that case).
   *
   * @param regionGroupIds Specified RegionGroupIds
   */
  public void waitForPriorityUpdate(List<TConsensusGroupId> regionGroupIds) {
    long startTime = System.currentTimeMillis();
    LOGGER.info(
        "[RegionPriority] Wait for Region priority update of RegionGroups: {}", regionGroupIds);
    while (System.currentTimeMillis() - startTime <= REGION_PRIORITY_WAITING_TIMEOUT) {
      AtomicBoolean allRegionPriorityCalculated = new AtomicBoolean(true);
      priorityMapLock.readLock().lock();
      try {
        regionGroupIds.forEach(
            regionGroupId -> {
              if (!regionPriorityMap.containsKey(regionGroupId)) {
                allRegionPriorityCalculated.set(false);
              }
            });
      } finally {
        priorityMapLock.readLock().unlock();
      }
      if (allRegionPriorityCalculated.get()) {
        LOGGER.info(
            "[RegionPriority] The routing priority of RegionGroups: {} is calculated.",
            regionGroupIds);
        return;
      }
      try {
        TimeUnit.MILLISECONDS.sleep(WAIT_PRIORITY_INTERVAL);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        LOGGER.warn("Interrupt when wait for calculating Region priority", e);
        return;
      }
    }
    // NOTE(review): message says "10 heartbeat interval" but the actual timeout is
    // REGION_PRIORITY_WAITING_TIMEOUT, which is derived from the procedure timeout — the
    // wording may be stale.
    LOGGER.warn(
        "[RegionPriority] The routing priority of RegionGroups: {} is not determined after 10 heartbeat interval. Some function might fail.",
        regionGroupIds);
  }

  private NodeManager getNodeManager() {
    return configManager.getNodeManager();
  }

  private PartitionManager getPartitionManager() {
    return configManager.getPartitionManager();
  }

  private LoadManager getLoadManager() {
    return configManager.getLoadManager();
  }

  @Override
  public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
    // Node status changes may invalidate the current leader placement
    balanceRegionLeader();
  }

  @Override
  public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent event) {
    balanceRegionLeader();
  }

  @Override
  public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {
    // Leader changes affect both the leader distribution and the routing priorities
    balanceRegionLeader();
    balanceRegionPriority();
  }
}