/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.procedure.env;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.node.BaseNodeCache;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
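/**
* Handles the removal of a DataNode from the cluster, migrating all of its region replicas to
* other DataNodes.
*
* <p>A rough sketch of the intended call order (hedged; the authoritative sequence lives in the
* remove-DataNode and region-migration procedures that drive this handler):
*
* <ol>
* <li>{@link #checkRemoveDataNodeRequest} validates the request
* <li>{@link #broadcastDisableDataNode} stops new reads/writes on the node
* <li>per region: {@link #findDestDataNode}, {@link #createNewRegionPeer}, {@link #addRegionPeer},
* {@link #removeRegionPeer}, {@link #deleteOldRegionPeer}, then {@link #updateRegionLocationCache}
* <li>{@link #stopDataNode} and {@link #removeDataNodePersistence} finish the removal
* </ol>
*/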
public class DataNodeRemoveHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private final ConfigManager configManager;
/** region migrate lock */
private final LockQueue regionMigrateLock = new LockQueue();
public DataNodeRemoveHandler(ConfigManager configManager) {
this.configManager = configManager;
}
/**
* Get the ids of all consensus groups (regions) hosted on the given DataNode, excluding the
* PartitionRegion
*
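* <p>Illustrative use (a hedged sketch; {@code handler} and {@code removedNode} are hypothetical
* names):
*
* <pre>{@code
* for (TConsensusGroupId regionId : handler.getDataNodeRegionIds(removedNode)) {
*   TDataNodeLocation dest = handler.findDestDataNode(regionId);
*   // ... migrate the replica of regionId from removedNode to dest
* }
* }</pre>
*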
* @param dataNodeLocation data node location
* @return group id list
*/
public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLocation) {
return configManager.getPartitionManager().getAllReplicaSets().stream()
.filter(
rg ->
rg.getDataNodeLocations().contains(dataNodeLocation)
&& rg.regionId.getType() != TConsensusGroupType.PartitionRegion)
.map(TRegionReplicaSet::getRegionId)
.collect(Collectors.toList());
}
/**
* Broadcast to all other online DataNodes that the given DataNode is disabled, so that it will
* no longer accept read/write requests
*
* @param disabledDataNode the DataNode being disabled
*/
public void broadcastDisableDataNode(TDataNodeLocation disabledDataNode) {
LOGGER.info(
"DataNodeRemoveService start broadcastDisableDataNode to cluster, disabledDataNode: {}",
getIdWithRpcEndpoint(disabledDataNode));
List<TDataNodeConfiguration> otherOnlineDataNodes =
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
.filter(node -> !node.getLocation().equals(disabledDataNode))
.collect(Collectors.toList());
for (TDataNodeConfiguration node : otherOnlineDataNodes) {
TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
TSStatus status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
node.getLocation().getInternalEndPoint(),
disableReq,
DataNodeRequestType.DISABLE_DATA_NODE);
if (!isSucceed(status)) {
LOGGER.error(
"broadcastDisableDataNode meets error, disabledDataNode: {}, error: {}",
getIdWithRpcEndpoint(disabledDataNode),
status);
return;
}
}
LOGGER.info(
"DataNodeRemoveService finished broadcastDisableDataNode to cluster, disabledDataNode: {}",
getIdWithRpcEndpoint(disabledDataNode));
}
/**
* Find the destination DataNode that will host the migrated region replica
*
* @param regionId region id
* @return the destination DataNode location, or null if none is available
*/
public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
if (regionReplicaNodes.isEmpty()) {
LOGGER.warn("Cannot find region replica nodes, region: {}", regionId);
return null;
}
Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
if (!newNode.isPresent()) {
LOGGER.warn("Not enough DataNodes to migrate region: {}", regionId);
return null;
}
return newNode.get();
}
/**
* Order the specified ConsensusGroup to add a peer for the new RegionReplica.
*
* <p>The add-peer interface can be invoked on any DataNode that contains one of the
* RegionReplicas of the specified ConsensusGroup, except the new one
*
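* <p>A hedged sketch of the create/add-peer sequence during a migration ({@code handler} is a
* hypothetical variable; the real driver is the region-migration procedure):
*
* <pre>{@code
* TDataNodeLocation dest = handler.findDestDataNode(regionId);
* if (dest != null
*     && handler.createNewRegionPeer(regionId, dest).getCode()
*         == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
*   TSStatus status = handler.addRegionPeer(dest, regionId);
* }
* }</pre>
*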
* @param destDataNode The DataNodeLocation where the new RegionReplica is created
* @param regionId region id
* @return TSStatus
*/
public TSStatus addRegionPeer(TDataNodeLocation destDataNode, TConsensusGroupId regionId) {
TSStatus status;
// Pick a DataNode that holds one of the RegionReplicas of the specified ConsensusGroup,
// except the new one, so that the original ConsensusGroup is notified that another peer has
// been created and should be asked to join
Optional<TDataNodeLocation> selectedDataNode =
filterDataNodeWithOtherRegionReplica(regionId, destDataNode);
if (!selectedDataNode.isPresent()) {
LOGGER.warn(
"No other DataNode can be selected to perform the add peer process, "
+ "please check RegionGroup: {} by SQL: show regions",
regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(
"No other DataNode can be selected to perform the add peer process, "
+ "please check by SQL: show regions");
return status;
}
// Send addRegionPeer request to the selected DataNode,
// destDataNode is where the new RegionReplica is created
TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, destDataNode);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
selectedDataNode.get().getInternalEndPoint(),
maintainPeerReq,
DataNodeRequestType.ADD_REGION_PEER);
LOGGER.info(
"Send action addRegionPeer, wait it finished, regionId: {}, dataNode: {}",
regionId,
getIdWithRpcEndpoint(selectedDataNode.get()));
return status;
}
/**
* Order the specified ConsensusGroup to remove the peer of the old RegionReplica.
*
* <p>The remove-peer interface can be invoked on any DataNode that contains one of the
* RegionReplicas of the specified ConsensusGroup, except the original one
*
* @param originalDataNode The DataNodeLocation that contains the original RegionReplica
* @param destDataNode The DataNodeLocation of the new RegionReplica, used as a fallback RPC
*     target when no other replica holder is alive
* @param regionId region id
*/
public TSStatus removeRegionPeer(
TDataNodeLocation originalDataNode,
TDataNodeLocation destDataNode,
TConsensusGroupId regionId) {
TSStatus status;
// Pick a DataNode that holds one of the RegionReplicas of the specified ConsensusGroup,
// except the original one, so that the ConsensusGroup is notified that the original peer
// should secede now. If no such DataNode is alive, fall back to the destDataNode.
Optional<TDataNodeLocation> selectedDataNode =
filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
TDataNodeLocation rpcClientDataNode = selectedDataNode.orElse(destDataNode);
// Send removeRegionPeer request to the rpcClientDataNode
TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
rpcClientDataNode.getInternalEndPoint(),
maintainPeerReq,
DataNodeRequestType.REMOVE_REGION_PEER);
LOGGER.info(
"Send action removeRegionPeer, wait it finished, regionId: {}, dataNode: {}",
regionId,
rpcClientDataNode.getInternalEndPoint());
return status;
}
/**
* Delete a Region peer in the given ConsensusGroup, together with all of its data, on the
* specified DataNode
*
* <p>If the originalDataNode is down, the local data and other cleanup work must be handled
* manually.
*
* @param originalDataNode The DataNodeLocation who contains the original RegionReplica
* @param regionId region id
* @return TSStatus
*/
public TSStatus deleteOldRegionPeer(
TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
// When SchemaReplicationFactor == 1, executing deleteOldRegionPeer would fail;
// the user must delete the related data manually
if (CONF.getSchemaReplicationFactor() == 1
&& TConsensusGroupType.SchemaRegion.equals(regionId.getType())) {
String errorMessage =
"deleteOldRegionPeer is not supported for SchemaRegion when SchemaReplicationFactor equals 1, "
+ "please delete the region data on the DataNode manually";
LOGGER.info(errorMessage);
TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(errorMessage);
return status;
}
// When DataReplicationFactor == 1, executing deleteOldRegionPeer would fail;
// the user must delete the related data manually
// TODO: can multi-leader consensus support deleteOldRegionPeer when DataReplicationFactor == 1?
if (CONF.getDataReplicationFactor() == 1
&& TConsensusGroupType.DataRegion.equals(regionId.getType())) {
String errorMessage =
"deleteOldRegionPeer is not supported for DataRegion when DataReplicationFactor equals 1, "
+ "please delete the region data on the DataNode manually";
LOGGER.info(errorMessage);
TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(errorMessage);
return status;
}
TSStatus status;
TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
originalDataNode.getInternalEndPoint(),
maintainPeerReq,
DataNodeRequestType.DELETE_OLD_REGION_PEER);
LOGGER.info(
"Send action deleteOldRegionPeer to regionId {} on dataNodeId {}, wait it finished",
regionId,
originalDataNode.getInternalEndPoint());
return status;
}
/**
* Update the region location cache
*
* @param regionId region id
* @param originalDataNode the DataNode the region is migrated from
* @param destDataNode the DataNode the region is migrated to
*/
public void updateRegionLocationCache(
TConsensusGroupId regionId,
TDataNodeLocation originalDataNode,
TDataNodeLocation destDataNode) {
LOGGER.info(
"start to update region {} location from {} to {} when it migrate succeed",
regionId,
originalDataNode.getInternalEndPoint().getIp(),
destDataNode.getInternalEndPoint().getIp());
UpdateRegionLocationPlan req =
new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
LOGGER.info(
"update region {} location finished, result:{}, old:{}, new:{}",
regionId,
status,
originalDataNode.getInternalEndPoint().getIp(),
destDataNode.getInternalEndPoint().getIp());
// Broadcast the latest RegionRouteMap when Region migration finished
configManager.getLoadManager().broadcastLatestRegionRouteMap();
}
/**
* Find the DataNodes that host replicas of the given region
*
* @param regionId region id
* @return the DataNode locations of the region's replicas, or an empty list if the region is
*     unknown
*/
public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
List<TRegionReplicaSet> regionReplicaSets =
configManager.getPartitionManager().getAllReplicaSets().stream()
.filter(rg -> rg.regionId.equals(regionId))
.collect(Collectors.toList());
if (regionReplicaSets.isEmpty()) {
LOGGER.warn("not find TRegionReplica for region: {}", regionId);
return Collections.emptyList();
}
return regionReplicaSets.get(0).getDataNodeLocations();
}
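/** Pick any Running DataNode that does not already hold a replica of the given region. */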
private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
List<TDataNodeLocation> regionReplicaNodes) {
return configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
.map(TDataNodeConfiguration::getLocation)
.filter(e -> !regionReplicaNodes.contains(e))
.findAny();
}
/**
* Create a new RegionReplica and build the ConsensusGroup on the destined DataNode
*
* <p>createNewRegionPeer should be invoked on a DataNode that doesn't contain any peer of the
* specific ConsensusGroup, to avoid a single DataNode ending up with more than one replica of
* the same region.
*
* @param regionId The given ConsensusGroup
* @param destDataNode The destined DataNode where the new peer will be created
* @return status
*/
public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
TSStatus status;
List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
if (regionReplicaNodes.isEmpty()) {
LOGGER.warn("Not find region replica nodes in createPeer, regionId: {}", regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage("Not find region replica nodes in createPeer, regionId: " + regionId);
return status;
}
List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionReplicaNodes);
currentPeerNodes.add(destDataNode);
String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
// TODO replace with real ttl
req.setTtl(Long.MAX_VALUE);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
destDataNode.getInternalEndPoint(),
req,
DataNodeRequestType.CREATE_NEW_REGION_PEER);
LOGGER.info(
"Send action createNewRegionPeer, regionId: {}, dataNode: {}", regionId, destDataNode);
if (isFailed(status)) {
LOGGER.error(
"Send action createNewRegionPeer, regionId: {}, dataNode: {}, result: {}",
regionId,
destDataNode,
status);
}
return status;
}
private boolean isSucceed(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
private boolean isFailed(TSStatus status) {
return !isSucceed(status);
}
/**
* Stop the given (old) DataNode
*
* @param dataNode the DataNode to stop
* @return status
* @throws ProcedureException procedure exception
*/
public TSStatus stopDataNode(TDataNodeLocation dataNode) throws ProcedureException {
LOGGER.info("Begin to stop Data Node {}", dataNode);
AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
TSStatus status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
LOGGER.info("stop Data Node {} result: {}", dataNode, status);
return status;
}
/**
* Check whether the remove-DataNode request is legal
*
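* <p>Illustrative use (a hedged sketch, assuming the resp exposes the status set below via the
* usual getStatus() accessor):
*
* <pre>{@code
* DataNodeToStatusResp resp = handler.checkRemoveDataNodeRequest(removeDataNodePlan);
* if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
*   return resp; // reject the illegal request
* }
* }</pre>
*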
* @param removeDataNodePlan RemoveDataNodeReq
* @return SUCCEED_STATUS when request is legal.
*/
public DataNodeToStatusResp checkRemoveDataNodeRequest(RemoveDataNodePlan removeDataNodePlan) {
DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
dataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
TSStatus status = checkClusterProtocol();
if (isFailed(status)) {
dataSet.setStatus(status);
return dataSet;
}
status = checkRegionReplication(removeDataNodePlan);
if (isFailed(status)) {
dataSet.setStatus(status);
return dataSet;
}
status = checkDataNodeExist(removeDataNodePlan);
if (isFailed(status)) {
dataSet.setStatus(status);
return dataSet;
}
return dataSet;
}
/**
* Check whether all DataNodes to be deleted exist in the cluster
*
* @param removeDataNodePlan RemoveDataNodeReq
* @return SUCCEED_STATUS if all DataNodes to be deleted exist in the cluster, DATANODE_NOT_EXIST
* otherwise
*/
private TSStatus checkDataNodeExist(RemoveDataNodePlan removeDataNodePlan) {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<TDataNodeLocation> allDataNodes =
configManager.getNodeManager().getRegisteredDataNodes().stream()
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
boolean hasNotExistNode =
removeDataNodePlan.getDataNodeLocations().stream()
.anyMatch(loc -> !allDataNodes.contains(loc));
if (hasNotExistNode) {
status.setCode(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode());
status.setMessage("there exist Data Node in request but not in cluster");
}
return status;
}
/**
* Check whether the cluster has enough DataNodes to maintain RegionReplicas
*
* @param removeDataNodePlan RemoveDataNodeReq
* @return SUCCEED_STATUS if the number of DataNodes is enough, LACK_REPLICATION otherwise
*/
private TSStatus checkRegionReplication(RemoveDataNodePlan removeDataNodePlan) {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<TDataNodeLocation> removedDataNodes = removeDataNodePlan.getDataNodeLocations();
int allDataNodeSize = configManager.getNodeManager().getRegisteredDataNodeCount();
// When the replication factor is one, removing a DataNode that is not in Running state
// would lose the only replica, so such nodes are dropped from the removal list
if (CONF.getSchemaReplicationFactor() == 1 || CONF.getDataReplicationFactor() == 1) {
// removeIf avoids a ConcurrentModificationException while filtering the list in place;
// a missing cache entry is treated as not Running
removedDataNodes.removeIf(
dataNodeLocation -> {
BaseNodeCache nodeCache =
configManager.getNodeManager().getNodeCacheMap().get(dataNodeLocation.getDataNodeId());
if (nodeCache == null || !NodeStatus.Running.equals(nodeCache.getNodeStatus())) {
LOGGER.error(
"Failed to remove DataNode {} because it is not in Running state and the cluster replication factor is one",
dataNodeLocation);
return true;
}
return false;
});
if (removedDataNodes.isEmpty()) {
status.setCode(TSStatusCode.LACK_REPLICATION.getStatusCode());
status.setMessage("Failed to remove all requested data nodes");
return status;
}
}
int removedDataNodeSize = removeDataNodePlan.getDataNodeLocations().size();
if (allDataNodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
status.setCode(TSStatusCode.LACK_REPLICATION.getStatusCode());
status.setMessage(
"Lack replication: at most "
+ (allDataNodeSize - NodeInfo.getMinimumDataNode())
+ " DataNode(s) can be removed");
}
return status;
}
public LockQueue getRegionMigrateLock() {
return regionMigrateLock;
}
/**
* Persist the removal of the given DataNode to the node info via the consensus layer
*
* @param tDataNodeLocation data node location
*/
public void removeDataNodePersistence(TDataNodeLocation tDataNodeLocation) {
List<TDataNodeLocation> removeDataNodes = new ArrayList<>();
removeDataNodes.add(tDataNodeLocation);
configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
}
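/**
* Transfer the leadership of the given region away from the given DataNode, if another alive
* DataNode holding a replica of that region can be found.
*
* @param regionId the region whose leader should be changed
* @param tDataNodeLocation the DataNode that should give up leadership of the region
*/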
public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
Optional<TDataNodeLocation> newLeaderNode =
filterDataNodeWithOtherRegionReplica(regionId, tDataNodeLocation);
if (newLeaderNode.isPresent()) {
SyncDataNodeClientPool.getInstance()
.changeRegionLeader(
regionId, tDataNodeLocation.getInternalEndPoint(), newLeaderNode.get());
LOGGER.info(
"Change region leader finished, region is {}, newLeaderNode is {}",
regionId,
newLeaderNode.get());
}
}
/**
* Pick a DataNode that holds a RegionReplica of the given region, excluding the given location
*
* @param regionId The specific RegionId
* @param filterLocation The DataNodeLocation that should be excluded
* @return A DataNodeLocation that contains a RegionReplica and differs from the
*     filterLocation
*/
private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
if (regionReplicaNodes.isEmpty()) {
LOGGER.warn("Not find region replica nodes, region: {}", regionId);
return Optional.empty();
}
List<TDataNodeLocation> aliveDataNodes =
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
// TODO: replace findAny() with picking the DataNode with the lowest load
return regionReplicaNodes.stream()
.filter(e -> aliveDataNodes.contains(e) && !e.equals(filterLocation))
.findAny();
}
private String getIdWithRpcEndpoint(TDataNodeLocation location) {
return String.format(
"dataNodeId: %s, clientRpcEndPoint: %s",
location.getDataNodeId(), location.getClientRpcEndPoint());
}
/**
* Check the consensus protocol of the cluster; removing a DataNode is currently not supported
* under the standalone protocol
*
* @return SUCCEED_STATUS if the cluster does not use the standalone protocol,
*     REMOVE_DATANODE_FAILED otherwise
*/
private TSStatus checkClusterProtocol() {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (CONF.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.StandAloneConsensus)
|| CONF.getSchemaRegionConsensusProtocolClass()
.equals(ConsensusFactory.StandAloneConsensus)) {
status.setCode(TSStatusCode.REMOVE_DATANODE_FAILED.getStatusCode());
status.setMessage("standalone protocol is not supported to remove data node");
}
return status;
}
}