/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.procedure.env;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
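/**
 * Handles Region maintenance work for ConfigNode procedures: migrating Regions, creating, adding
 * and removing Region peers, transferring Region leaders, and removing DataNodes.
 */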
public class RegionMaintainHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(RegionMaintainHandler.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private final ConfigManager configManager;
/** region migrate lock */
private final LockQueue regionMigrateLock = new LockQueue();
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> dataNodeClientManager;
public RegionMaintainHandler(ConfigManager configManager) {
this.configManager = configManager;
dataNodeClientManager =
new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
.createClientManager(
new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
}
public static String getIdWithRpcEndpoint(TDataNodeLocation location) {
return String.format(
"[dataNodeId: %s, clientRpcEndPoint: %s]",
location.getDataNodeId(), location.getClientRpcEndPoint());
}
/**
 * Get all ConsensusGroup ids of the Regions on the given DataNode (ConfigRegion excluded)
*
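   * <p>Example (illustrative sketch; {@code handler} is hypothetical):
   *
   * <pre>{@code
   * List<TConsensusGroupId> regionIds = handler.getMigratedDataNodeRegions(removedDataNode);
   * // every returned region must be migrated away before removedDataNode can be removed
   * }</pre>
   *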
* @param removedDataNode the DataNode to be removed
* @return group id list to be migrated
*/
public List<TConsensusGroupId> getMigratedDataNodeRegions(TDataNodeLocation removedDataNode) {
return configManager.getPartitionManager().getAllReplicaSets().stream()
.filter(
replicaSet ->
replicaSet.getDataNodeLocations().contains(removedDataNode)
&& replicaSet.regionId.getType() != TConsensusGroupType.ConfigRegion)
.map(TRegionReplicaSet::getRegionId)
.collect(Collectors.toList());
}
/**
 * Broadcast to all other online DataNodes that the DataNode in the RemoveDataNodeReq is
 * disabled, so it will no longer accept read/write requests
*
* @param disabledDataNode TDataNodeLocation
*/
public void broadcastDisableDataNode(TDataNodeLocation disabledDataNode) {
LOGGER.info(
"DataNodeRemoveService start broadcastDisableDataNode to cluster, disabledDataNode: {}",
getIdWithRpcEndpoint(disabledDataNode));
List<TDataNodeConfiguration> otherOnlineDataNodes =
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
.filter(node -> !node.getLocation().equals(disabledDataNode))
.collect(Collectors.toList());
for (TDataNodeConfiguration node : otherOnlineDataNodes) {
TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
TSStatus status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
node.getLocation().getInternalEndPoint(),
disableReq,
DataNodeRequestType.DISABLE_DATA_NODE);
if (!isSucceed(status)) {
LOGGER.error(
"{}, BroadcastDisableDataNode meets error, disabledDataNode: {}, error: {}",
REMOVE_DATANODE_PROCESS,
getIdWithRpcEndpoint(disabledDataNode),
status);
return;
}
}
LOGGER.info(
"{}, DataNodeRemoveService finished broadcastDisableDataNode to cluster, disabledDataNode: {}",
REMOVE_DATANODE_PROCESS,
getIdWithRpcEndpoint(disabledDataNode));
}
/**
 * Find the destination DataNode for migrating the given region.
*
* @param regionId region id
* @return dest data node location
*/
public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
    List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
    if (regionReplicaNodes.isEmpty()) {
      LOGGER.warn("Cannot find region replica nodes, region: {}", regionId);
      return null;
    }
Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
if (!newNode.isPresent()) {
LOGGER.warn("No enough Data node to migrate region: {}", regionId);
return null;
}
return newNode.get();
}
/**
* Create a new RegionReplica and build the ConsensusGroup on the destined DataNode
*
 * <p>createNewRegionPeer should be invoked on a DataNode that doesn't contain any peer of the
 * specific ConsensusGroup, so that no DataNode ends up holding more than one replica of the
 * same Region.
*
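   * <p>Example (illustrative sketch; {@code handler}, {@code regionId} and {@code destDataNode}
   * are hypothetical):
   *
   * <pre>{@code
   * TSStatus status = handler.createNewRegionPeer(regionId, destDataNode);
   * if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
   *   // the new replica could not be created; abort or roll back the migration
   * }
   * }</pre>
   *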
* @param regionId The given ConsensusGroup
* @param destDataNode The destined DataNode where the new peer will be created
* @return status
*/
public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
TSStatus status;
List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
if (regionReplicaNodes.isEmpty()) {
LOGGER.warn(
"{}, Cannot find region replica nodes in createPeer, regionId: {}",
REGION_MIGRATE_PROCESS,
regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage("Not find region replica nodes in createPeer, regionId: " + regionId);
return status;
}
List<TDataNodeLocation> currentPeerNodes;
if (TConsensusGroupType.DataRegion.equals(regionId.getType())
&& IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
      // For IoTConsensus (formerly MultiLeader), createPeer needs the full peer list, including
      // the new peer
currentPeerNodes = new ArrayList<>(regionReplicaNodes);
currentPeerNodes.add(destDataNode);
} else {
      // For RatisConsensus, the peer list passed to createPeer can be empty
currentPeerNodes = Collections.emptyList();
}
String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
long ttl = Long.MAX_VALUE;
try {
ttl = configManager.getClusterSchemaManager().getDatabaseSchemaByName(storageGroup).getTTL();
} catch (DatabaseNotExistsException e) {
LOGGER.warn(
"Cannot find out the database which region {} belongs to, ttl will be set to Long.MAX_VALUE",
regionId);
}
req.setTtl(ttl);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
destDataNode.getInternalEndPoint(),
req,
DataNodeRequestType.CREATE_NEW_REGION_PEER);
if (isSucceed(status)) {
LOGGER.info(
"{}, Send action createNewRegionPeer finished, regionId: {}, newPeerDataNodeId: {}",
REGION_MIGRATE_PROCESS,
regionId,
getIdWithRpcEndpoint(destDataNode));
} else {
LOGGER.error(
"{}, Send action createNewRegionPeer error, regionId: {}, newPeerDataNodeId: {}, result: {}",
REGION_MIGRATE_PROCESS,
regionId,
getIdWithRpcEndpoint(destDataNode),
status);
}
return status;
}
/**
 * Order the specified ConsensusGroup to add a peer for the new RegionReplica.
 *
 * <p>The addRegionPeer interface can be invoked on any DataNode that contains one of the
 * RegionReplicas of the specified ConsensusGroup, except the new one
*
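   * <p>Example (illustrative sketch; {@code handler}, {@code procedureId}, {@code regionId} and
   * {@code destDataNode} are hypothetical):
   *
   * <pre>{@code
   * TDataNodeLocation coordinator =
   *     handler
   *         .filterDataNodeWithOtherRegionReplica(regionId, destDataNode)
   *         .orElseThrow(() -> new ProcedureException("No coordinator candidate found"));
   * TSStatus status =
   *     handler.submitAddRegionPeerTask(procedureId, destDataNode, regionId, coordinator);
   * }</pre>
   *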
 * @param procedureId the id of the procedure submitting this task
 * @param destDataNode The DataNodeLocation where the new RegionReplica is created
 * @param regionId region id
 * @param coordinator The DataNode that receives the request and executes the addRegionPeer task
 * @return TSStatus
*/
public TSStatus submitAddRegionPeerTask(
long procedureId,
TDataNodeLocation destDataNode,
TConsensusGroupId regionId,
TDataNodeLocation coordinator) {
TSStatus status;
    // Send addRegionPeer request to the coordinator DataNode;
    // destDataNode is where the new RegionReplica is created
TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, destDataNode, procedureId);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
coordinator.getInternalEndPoint(),
maintainPeerReq,
DataNodeRequestType.ADD_REGION_PEER);
LOGGER.info(
"{}, Send action addRegionPeer finished, regionId: {}, rpcDataNode: {}, destDataNode: {}, status: {}",
REGION_MIGRATE_PROCESS,
regionId,
getIdWithRpcEndpoint(coordinator),
getIdWithRpcEndpoint(destDataNode),
status);
return status;
}
/**
 * Order the specified ConsensusGroup to remove the peer of the old RegionReplica.
 *
 * <p>The removeRegionPeer interface can be invoked on any DataNode that contains one of the
 * RegionReplicas of the specified ConsensusGroup, except the original one
 *
 * @param procedureId the id of the procedure submitting this task
 * @param originalDataNode The DataNodeLocation that contains the original RegionReplica
 * @param regionId region id
 * @param coordinator The DataNode that receives the request and executes the removeRegionPeer
 *     task
 * @return TSStatus
*/
public TSStatus submitRemoveRegionPeerTask(
long procedureId,
TDataNodeLocation originalDataNode,
TConsensusGroupId regionId,
TDataNodeLocation coordinator) {
TSStatus status;
    // Send removeRegionPeer request to the coordinator DataNode
TMaintainPeerReq maintainPeerReq =
new TMaintainPeerReq(regionId, originalDataNode, procedureId);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
coordinator.getInternalEndPoint(),
maintainPeerReq,
DataNodeRequestType.REMOVE_REGION_PEER);
LOGGER.info(
"{}, Send action removeRegionPeer finished, regionId: {}, rpcDataNode: {}",
REGION_MIGRATE_PROCESS,
regionId,
getIdWithRpcEndpoint(coordinator));
return status;
}
/**
 * Delete the Region peer in the given ConsensusGroup, together with all of its data, on the
 * specified DataNode
 *
 * <p>If the originalDataNode is down, its local data and other cleanup work must be handled
 * manually.
 *
 * @param procedureId the id of the procedure submitting this task
 * @param originalDataNode The DataNodeLocation that contains the original RegionReplica
 * @param regionId region id
 * @return TSStatus
*/
public TSStatus submitDeleteOldRegionPeerTask(
long procedureId, TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
TSStatus status;
TMaintainPeerReq maintainPeerReq =
new TMaintainPeerReq(regionId, originalDataNode, procedureId);
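    // If the target DataNode's status is Unknown (it is likely down), try only once as a
    // best-effort cleanup; otherwise retry with the default policy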
status =
configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
== NodeStatus.Unknown
? SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithGivenRetry(
originalDataNode.getInternalEndPoint(),
maintainPeerReq,
DataNodeRequestType.DELETE_OLD_REGION_PEER,
1)
: SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
originalDataNode.getInternalEndPoint(),
maintainPeerReq,
DataNodeRequestType.DELETE_OLD_REGION_PEER);
LOGGER.info(
"{}, Send action deleteOldRegionPeer finished, regionId: {}, dataNodeId: {}",
REGION_MIGRATE_PROCESS,
regionId,
originalDataNode.getInternalEndPoint());
return status;
}
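  /**
   * Ask each DataNode in {@code dataNodeLocationMap} to reset the peer list of the given region to
   * {@code correctDataNodeLocations}, and collect every DataNode's response.
   *
   * @return the response of each DataNode, keyed as in {@code dataNodeLocationMap}
   */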
public Map<Integer, TSStatus> resetPeerList(
TConsensusGroupId regionId,
List<TDataNodeLocation> correctDataNodeLocations,
Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
AsyncClientHandler<TResetPeerListReq, TSStatus> clientHandler =
new AsyncClientHandler<>(
DataNodeRequestType.RESET_PEER_LIST,
new TResetPeerListReq(regionId, correctDataNodeLocations),
dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
return clientHandler.getResponseMap();
}
// TODO: will use 'procedure yield' to refactor later
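  /**
   * Poll the given DataNode until the region maintenance task identified by {@code taskId} leaves
   * the PROCESSING state, or until the DataNode is regarded as disconnected.
   */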
public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation dataNodeLocation) {
    // In some cases the DataNode is still working even though its status is Unknown. To let the
    // task continue under this circumstance, the DataNode is polled unconditionally for at least
    // six one-second rounds before its Unknown status is treated as a disconnection.
int unconditionallyRetry = 0;
while (unconditionallyRetry < 6
|| configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())
!= NodeStatus.Unknown) {
try (SyncDataNodeInternalServiceClient dataNodeClient =
dataNodeClientManager.borrowClient(dataNodeLocation.getInternalEndPoint())) {
TRegionMigrateResult report = dataNodeClient.getRegionMaintainResult(taskId);
if (report.getTaskStatus() != TRegionMaintainTaskStatus.PROCESSING) {
return report;
}
      } catch (Exception ignore) {
        // Ignore transient RPC failures; the next polling round will retry
} finally {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
unconditionallyRetry++;
}
LOGGER.warn(
"{} task {} cannot contact to DataNode {}",
REGION_MIGRATE_PROCESS,
taskId,
dataNodeLocation);
TRegionMigrateResult report = new TRegionMigrateResult();
report.setTaskStatus(TRegionMaintainTaskStatus.FAIL);
report.setFailedNodeAndReason(new HashMap<>());
report.getFailedNodeAndReason().put(dataNodeLocation, TRegionMigrateFailedType.Disconnect);
return report;
}
public void addRegionLocation(
TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus regionStatus) {
AddRegionLocationPlan req = new AddRegionLocationPlan(regionId, newLocation);
TSStatus status = configManager.getPartitionManager().addRegionLocation(req);
LOGGER.info(
"AddRegionLocation finished, add region {} to {}, result is {}",
regionId,
getIdWithRpcEndpoint(newLocation),
status);
configManager
.getLoadManager()
.forceAddRegionCache(regionId, newLocation.getDataNodeId(), regionStatus);
}
public void updateRegionCache(
TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus regionStatus) {
configManager
.getLoadManager()
.forceAddRegionCache(regionId, newLocation.getDataNodeId(), regionStatus);
}
public void removeRegionLocation(
TConsensusGroupId regionId, TDataNodeLocation deprecatedLocation) {
RemoveRegionLocationPlan req = new RemoveRegionLocationPlan(regionId, deprecatedLocation);
TSStatus status = configManager.getPartitionManager().removeRegionLocation(req);
LOGGER.info(
"RemoveRegionLocation remove region {} from DataNode {}, result is {}",
regionId,
getIdWithRpcEndpoint(deprecatedLocation),
status);
configManager
.getLoadManager()
.forceRemoveRegionCache(regionId, deprecatedLocation.getDataNodeId());
}
/**
 * Find all DataNodes that contain the given regionId
*
* @param regionId region id
* @return DataNode locations
*/
public List<TDataNodeLocation> findRegionLocations(TConsensusGroupId regionId) {
Optional<TRegionReplicaSet> regionReplicaSet =
configManager.getPartitionManager().getAllReplicaSets().stream()
.filter(rg -> rg.regionId.equals(regionId))
.findAny();
if (regionReplicaSet.isPresent()) {
return regionReplicaSet.get().getDataNodeLocations();
}
return Collections.emptyList();
}
private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
List<TDataNodeLocation> regionReplicaNodes) {
List<TDataNodeConfiguration> dataNodeConfigurations =
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
    // Randomly select to ensure basic load balancing
Collections.shuffle(dataNodeConfigurations);
return dataNodeConfigurations.stream()
.map(TDataNodeConfiguration::getLocation)
.filter(e -> !regionReplicaNodes.contains(e))
.findAny();
}
private boolean isSucceed(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
private boolean isFailed(TSStatus status) {
return !isSucceed(status);
}
/**
 * Stop the given DataNode by killing its process
 *
 * @param dataNode the DataNode to be stopped
*/
public void stopDataNode(TDataNodeLocation dataNode) {
LOGGER.info(
"{}, Begin to stop DataNode and kill the DataNode process {}",
REMOVE_DATANODE_PROCESS,
dataNode);
TSStatus status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithGivenRetry(
dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2);
configManager.getLoadManager().removeNodeCache(dataNode.getDataNodeId());
LOGGER.info(
"{}, Stop Data Node result: {}, stoppedDataNode: {}",
REMOVE_DATANODE_PROCESS,
status,
dataNode);
}
/**
 * Check whether the remove-DataNode request is legal
*
* @param removeDataNodePlan RemoveDataNodeReq
* @return SUCCEED_STATUS when request is legal.
*/
public DataNodeToStatusResp checkRemoveDataNodeRequest(RemoveDataNodePlan removeDataNodePlan) {
DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
dataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
TSStatus status = checkClusterProtocol();
if (isFailed(status)) {
dataSet.setStatus(status);
return dataSet;
}
status = checkRegionReplication(removeDataNodePlan);
if (isFailed(status)) {
dataSet.setStatus(status);
return dataSet;
}
status = checkDataNodeExist(removeDataNodePlan);
if (isFailed(status)) {
dataSet.setStatus(status);
return dataSet;
}
return dataSet;
}
/**
* Check whether all DataNodes to be deleted exist in the cluster
*
* @param removeDataNodePlan RemoveDataNodeReq
* @return SUCCEED_STATUS if all DataNodes to be deleted exist in the cluster, DATANODE_NOT_EXIST
* otherwise
*/
private TSStatus checkDataNodeExist(RemoveDataNodePlan removeDataNodePlan) {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<TDataNodeLocation> allDataNodes =
configManager.getNodeManager().getRegisteredDataNodes().stream()
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
boolean hasNotExistNode =
removeDataNodePlan.getDataNodeLocations().stream()
.anyMatch(loc -> !allDataNodes.contains(loc));
if (hasNotExistNode) {
status.setCode(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode());
status.setMessage("there exist Data Node in request but not in cluster");
}
return status;
}
/**
* Check whether the cluster has enough DataNodes to maintain RegionReplicas
*
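   * <p>For example (illustrative figures): with 5 DataNodes in Running/ReadOnly status and a
   * minimum of 3 DataNodes required by the replication factor, at most 5 - 3 = 2 DataNodes can be
   * removed in one request.
   *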
* @param removeDataNodePlan RemoveDataNodeReq
 * @return SUCCEED_STATUS if the number of DataNodes is enough, NO_ENOUGH_DATANODE otherwise
*/
private TSStatus checkRegionReplication(RemoveDataNodePlan removeDataNodePlan) {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<TDataNodeLocation> removedDataNodes = removeDataNodePlan.getDataNodeLocations();
int availableDatanodeSize =
configManager
.getNodeManager()
.filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly)
.size();
    // When the replication factor is one, removing a DataNode that is not in Running state must
    // fail, because its only replica cannot be migrated
if (CONF.getSchemaReplicationFactor() == 1 || CONF.getDataReplicationFactor() == 1) {
      // Removing elements inside a for-each loop would throw ConcurrentModificationException,
      // so use removeIf to drop the DataNodes that are not in Running state
      removedDataNodes.removeIf(
          dataNodeLocation -> {
            boolean isRunning =
                NodeStatus.Running.equals(
                    configManager
                        .getLoadManager()
                        .getNodeStatus(dataNodeLocation.getDataNodeId()));
            if (!isRunning) {
              LOGGER.error(
                  "Failed to remove DataNode {} because it is not in Running state while the cluster is configured with one replication",
                  dataNodeLocation);
            }
            return !isRunning;
          });
      if (removedDataNodes.isEmpty()) {
        status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
        status.setMessage("Failed to remove all requested data nodes");
        return status;
      }
}
int removedDataNodeSize =
(int)
removeDataNodePlan.getDataNodeLocations().stream()
.filter(
x ->
configManager.getLoadManager().getNodeStatus(x.getDataNodeId())
!= NodeStatus.Unknown)
.count();
if (availableDatanodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
status.setMessage(
String.format(
"Can't remove datanode due to the limit of replication factor, "
+ "availableDataNodeSize: %s, maxReplicaFactor: %s, max allowed removed Data Node size is: %s",
availableDatanodeSize,
NodeInfo.getMinimumDataNode(),
(availableDatanodeSize - NodeInfo.getMinimumDataNode())));
}
return status;
}
public LockQueue getRegionMigrateLock() {
return regionMigrateLock;
}
/**
 * Remove the DataNode from the persisted NodeInfo
*
* @param dataNodeLocation data node location
*/
public void removeDataNodePersistence(TDataNodeLocation dataNodeLocation) {
// Remove consensus record
List<TDataNodeLocation> removeDataNodes = Collections.singletonList(dataNodeLocation);
try {
configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
} catch (ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e);
}
// Adjust maxRegionGroupNum
configManager.getClusterSchemaManager().adjustMaxRegionGroupNum();
// Remove metrics
PartitionMetrics.unbindDataNodePartitionMetricsWhenUpdate(
MetricService.getInstance(),
NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint()));
}
/**
 * Change the leader of the given Region.
 *
 * <p>For IOT_CONSENSUS, use the `changeLeaderForIoTConsensus` method to update the
 * regionLeaderMap maintained in the ConfigNode.
 *
 * <p>For RATIS_CONSENSUS, invoke the `changeRegionLeader` DataNode RPC to change the leader.
*
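   * <p>Example (illustrative sketch; {@code handler}, {@code regionId} and
   * {@code originalDataNode} are hypothetical):
   *
   * <pre>{@code
   * try {
   *   handler.transferRegionLeader(regionId, originalDataNode);
   * } catch (ProcedureException e) {
   *   // no candidate leader was found, or the transfer failed after retries
   * }
   * }</pre>
   *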
 * @param regionId The region to be migrated
 * @param originalDataNode The DataNode where the region replica currently resides
*/
public void transferRegionLeader(TConsensusGroupId regionId, TDataNodeLocation originalDataNode)
throws ProcedureException {
Optional<TDataNodeLocation> newLeaderNode =
filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
newLeaderNode.orElseThrow(() -> new ProcedureException("Cannot find the new leader"));
    // Ratis needs the DataNode to perform the election itself
long timestamp = System.nanoTime();
if (TConsensusGroupType.SchemaRegion.equals(regionId.getType())
|| TConsensusGroupType.DataRegion.equals(regionId.getType())
&& RATIS_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
final int MAX_RETRY_TIME = 10;
int retryTime = 0;
while (true) {
TRegionLeaderChangeResp resp =
SyncDataNodeClientPool.getInstance()
.changeRegionLeader(
regionId, originalDataNode.getInternalEndPoint(), newLeaderNode.get());
if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
timestamp = resp.getConsensusLogicalTimestamp();
break;
}
        if (retryTime++ > MAX_RETRY_TIME) {
          throw new ProcedureException("Transfer leader failed");
        }
        LOGGER.warn("Call changeRegionLeader failed for the {} time", retryTime);
}
}
configManager
.getLoadManager()
.forceUpdateConsensusGroupCache(
Collections.singletonMap(
regionId,
new ConsensusGroupHeartbeatSample(timestamp, newLeaderNode.get().getDataNodeId())));
configManager.getLoadManager().getRouteBalancer().balanceRegionLeader();
configManager.getLoadManager().getRouteBalancer().balanceRegionPriority();
LOGGER.info(
"{}, Change region leader finished, regionId: {}, newLeaderNode: {}",
REGION_MIGRATE_PROCESS,
regionId,
newLeaderNode);
}
/**
 * Filter out a DataNode that contains another RegionReplica of the given region, excluding the
 * given one.
 *
 * <p>Only DataNodes whose status is in the allowed set are considered; the default overload
 * allows Running and ReadOnly DataNodes.
 *
 * <p>`addRegionPeer`, `removeRegionPeer` and `changeRegionLeader` invoke this method.
*
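   * <p>Example (illustrative sketch; {@code handler}, {@code regionId} and
   * {@code removedDataNode} are hypothetical):
   *
   * <pre>{@code
   * Optional<TDataNodeLocation> coordinator =
   *     handler.filterDataNodeWithOtherRegionReplica(regionId, removedDataNode);
   * if (!coordinator.isPresent()) {
   *   // no other alive replica holder exists, so the maintenance task cannot proceed
   * }
   * }</pre>
   *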
 * @param regionId The specific RegionId
 * @param filterLocation The DataNodeLocation that should be filtered out
 * @return A DataNodeLocation that contains another RegionReplica and differs from the
 *     filterLocation
*/
public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
return filterDataNodeWithOtherRegionReplica(
regionId, filterLocation, NodeStatus.Running, NodeStatus.ReadOnly);
}
public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
TConsensusGroupId regionId, TDataNodeLocation filterLocation, NodeStatus... allowingStatus) {
List<TDataNodeLocation> regionLocations = findRegionLocations(regionId);
if (regionLocations.isEmpty()) {
LOGGER.warn("Cannot find DataNodes contain the given region: {}", regionId);
return Optional.empty();
}
    // Consider only DataNodes whose status is in allowingStatus
    // (Running and ReadOnly in the default overload)
List<TDataNodeLocation> aliveDataNodes =
configManager.getNodeManager().filterDataNodeThroughStatus(allowingStatus).stream()
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
// TODO return the node which has lowest load.
for (TDataNodeLocation aliveDataNode : aliveDataNodes) {
if (regionLocations.contains(aliveDataNode) && !aliveDataNode.equals(filterLocation)) {
return Optional.of(aliveDataNode);
}
}
return Optional.empty();
}
/**
 * Check the consensus protocol of the cluster: removing a DataNode from a standalone
 * (SimpleConsensus) cluster is currently not supported
 *
 * @return SUCCEED_STATUS if the cluster does not use the SimpleConsensus protocol,
 *     REMOVE_DATANODE_ERROR otherwise
*/
private TSStatus checkClusterProtocol() {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (CONF.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)
|| CONF.getSchemaRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) {
status.setCode(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
status.setMessage("SimpleConsensus protocol is not supported to remove data node");
}
return status;
}
}