blob: b3519d7eaa3d11126138db0a905afc4003c4acc5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
 * NodeManager manages cluster node addition and removal requests.
 *
 * <p>It answers DataNode registration/removal requests, ConfigNode apply/removal requests, exposes
 * the registered node lists, and fans maintenance commands (merge, flush, clear cache) out to
 * DataNodes. All collaborating managers are reached through the injected {@link IManager}.
 */
public class NodeManager {

  private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);

  private final IManager configManager;
  private final NodeInfo nodeInfo;

  /** Serializes the ConfigNode removal pre-check and the removal itself. */
  private final ReentrantLock removeConfigNodeLock;

  public NodeManager(IManager configManager, NodeInfo nodeInfo) {
    this.configManager = configManager;
    this.nodeInfo = nodeInfo;
    this.removeConfigNodeLock = new ReentrantLock();
  }

  /**
   * Copies the cluster-wide settings from the local ConfigNode configuration into the response.
   *
   * @param dataSet the registration response that receives the {@link TGlobalConfig}
   */
  private void setGlobalConfig(DataNodeRegisterResp dataSet) {
    // Set TGlobalConfig
    final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
    TGlobalConfig globalConfig = new TGlobalConfig();
    globalConfig.setDataRegionConsensusProtocolClass(conf.getDataRegionConsensusProtocolClass());
    globalConfig.setSchemaRegionConsensusProtocolClass(
        conf.getSchemaRegionConsensusProtocolClass());
    globalConfig.setSeriesPartitionSlotNum(conf.getSeriesPartitionSlotNum());
    globalConfig.setSeriesPartitionExecutorClass(conf.getSeriesPartitionExecutorClass());
    globalConfig.setTimePartitionInterval(conf.getTimePartitionInterval());
    globalConfig.setReadConsistencyLevel(conf.getReadConsistencyLevel());
    dataSet.setGlobalConfig(globalConfig);
  }

  /**
   * Register DataNode
   *
   * @param registerDataNodePlan RegisterDataNodeReq
   * @return DataNodeConfigurationDataSet. The TSStatus will be set to SUCCESS_STATUS when register
   *     success, and DATANODE_ALREADY_REGISTERED when the DataNode is already exist. The response
   *     also carries the DataNode id, the registered ConfigNode list and the global config.
   */
  public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
    DataNodeRegisterResp dataSet = new DataNodeRegisterResp();
    TSStatus status = new TSStatus();
    final TDataNodeLocation location = registerDataNodePlan.getInfo().getLocation();

    if (nodeInfo.isRegisteredDataNode(location)) {
      status.setCode(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
      status.setMessage("DataNode already registered.");
    } else if (location.getDataNodeId() < 0) {
      // Generating a new dataNodeId only when current DataNode doesn't exist yet
      location.setDataNodeId(nodeInfo.generateNextNodeId());
      getConsensusManager().write(registerDataNodePlan);

      // Adjust the maximum RegionGroup number of each StorageGroup
      getClusterSchemaManager().adjustMaxRegionGroupCount();

      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
      status.setMessage("registerDataNode success.");
    }
    // NOTE(review): an unregistered DataNode that already carries a non-negative id falls through
    // both branches, so the status is returned without code or message - confirm this is intended.

    dataSet.setStatus(status);
    dataSet.setDataNodeId(location.getDataNodeId());
    dataSet.setConfigNodeList(getRegisteredConfigNodes());
    setGlobalConfig(dataSet);
    return dataSet;
  }

  /**
   * Remove DataNodes
   *
   * <p>The request is validated first; a failed pre-check is returned as-is. Otherwise the request
   * is handed to the procedure manager and the caller is only told whether it was accepted.
   *
   * @param removeDataNodePlan RemoveDataNodeReq
   * @return DataNodeToStatusResp, The TSStatue will be SUCCEED_STATUS when request is accept,
   *     DATANODE_NOT_EXIST when some datanode not exist.
   */
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
    LOGGER.info("Node manager start to remove DataNode {}", removeDataNodePlan);

    DataNodeRemoveHandler dataNodeRemoveHandler =
        new DataNodeRemoveHandler((ConfigManager) configManager);
    DataNodeToStatusResp preCheckStatus =
        dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
    if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      LOGGER.error(
          "the remove Data Node request check failed. req: {}, check result: {}",
          removeDataNodePlan,
          preCheckStatus.getStatus());
      return preCheckStatus;
    }

    // if add request to queue, then return to client
    DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
    boolean accepted = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
    TSStatus status;
    if (accepted) {
      status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
      status.setMessage("Server accept the request");
    } else {
      status = new TSStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR.getStatusCode());
      status.setMessage("Server reject the request, maybe request is too much");
    }
    dataSet.setStatus(status);

    LOGGER.info("Node manager finished to remove DataNode {}", removeDataNodePlan);
    return dataSet;
  }

  /**
   * Get TDataNodeConfiguration
   *
   * @param req GetDataNodeConfigurationPlan
   * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in
   *     GetDataNodeConfigurationPlan is -1
   */
  public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
    return (DataNodeConfigurationResp) getConsensusManager().read(req).getDataset();
  }

  /**
   * Only leader use this interface
   *
   * @return The number of registered DataNodes
   */
  public int getRegisteredDataNodeCount() {
    return nodeInfo.getRegisteredDataNodeCount();
  }

  /**
   * Only leader use this interface
   *
   * @return The number of registered TotalNodes
   */
  public int getRegisteredNodeCount() {
    return nodeInfo.getRegisteredNodeCount();
  }

  /**
   * Only leader use this interface
   *
   * @return The number of total cpu cores in online DataNodes
   */
  public int getTotalCpuCoreCount() {
    return nodeInfo.getTotalCpuCoreCount();
  }

  /**
   * Only leader use this interface
   *
   * @return All registered DataNodes
   */
  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
    return nodeInfo.getRegisteredDataNodes();
  }

  /**
   * Builds a fresh map from DataNode id to location for every registered DataNode.
   *
   * @return a new map keyed by DataNode id; callers own the returned instance
   */
  public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
    for (TDataNodeConfiguration dataNodeConfiguration : nodeInfo.getRegisteredDataNodes()) {
      TDataNodeLocation location = dataNodeConfiguration.getLocation();
      dataNodeLocations.put(location.getDataNodeId(), location);
    }
    return dataNodeLocations;
  }

  /** Looks up the heartbeat cache of a node; returns null when the load manager has none. */
  private INodeCache getNodeCache(int nodeId) {
    return getLoadManager().getNodeCacheMap().get(nodeId);
  }

  /** Returns the cached status of a node, or "Unknown" when no heartbeat cache exists for it. */
  private String getNodeStatus(int nodeId) {
    INodeCache nodeCache = getNodeCache(nodeId);
    return nodeCache == null ? "Unknown" : nodeCache.getNodeStatus().getStatus();
  }

  /**
   * Describes every registered DataNode: id, status and client RPC endpoint.
   *
   * @return one entry per registered DataNode, empty if none; the region counters are initialised
   *     to 0 here
   */
  public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
    List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
    List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
    if (registeredDataNodes != null) {
      registeredDataNodes.forEach(
          (dataNodeInfo) -> {
            TDataNodeInfo info = new TDataNodeInfo();
            int dataNodeId = dataNodeInfo.getLocation().getDataNodeId();
            info.setDataNodeId(dataNodeId);
            info.setStatus(getNodeStatus(dataNodeId));
            info.setRpcAddresss(dataNodeInfo.getLocation().getClientRpcEndPoint().getIp());
            info.setRpcPort(dataNodeInfo.getLocation().getClientRpcEndPoint().getPort());
            info.setDataRegionNum(0);
            info.setSchemaRegionNum(0);
            dataNodeInfoList.add(info);
          });
    }
    return dataNodeInfoList;
  }

  /**
   * Describes every registered ConfigNode: id, status and internal endpoint.
   *
   * @return one entry per registered ConfigNode, sorted by ConfigNode id; empty if none
   */
  public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
    List<TConfigNodeInfo> configNodeInfoList = new ArrayList<>();
    List<TConfigNodeLocation> registeredConfigNodes = this.getRegisteredConfigNodes();
    if (registeredConfigNodes != null) {
      registeredConfigNodes.forEach(
          (configNodeLocation) -> {
            TConfigNodeInfo info = new TConfigNodeInfo();
            int configNodeId = configNodeLocation.getConfigNodeId();
            info.setConfigNodeId(configNodeId);
            info.setStatus(getNodeStatus(configNodeId));
            info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
            info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
            configNodeInfoList.add(info);
          });
    }
    configNodeInfoList.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId));
    return configNodeInfoList;
  }

  /**
   * Only leader use this interface, record the new ConfigNode's information
   *
   * @param configNodeLocation The new ConfigNode; its id is overwritten with a newly generated one
   */
  public void applyConfigNode(TConfigNodeLocation configNodeLocation) {
    // Generate new ConfigNode's index
    configNodeLocation.setConfigNodeId(nodeInfo.generateNextNodeId());
    ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation);
    getConsensusManager().write(applyConfigNodePlan);
  }

  public void addMetrics() {
    nodeInfo.addMetrics();
  }

  /**
   * Removes a ConfigNode from the consensus group and, on success, drops its heartbeat cache and
   * writes the {@link RemoveConfigNodePlan} through consensus.
   *
   * @param tConfigNodeLocation the ConfigNode to remove
   * @return the status of the consensus write, or REMOVE_CONFIGNODE_FAILED when the peer could not
   *     be removed from the consensus group
   */
  public TSStatus removeConfigNodePeer(TConfigNodeLocation tConfigNodeLocation) {
    // Block until the lock is held. An unchecked tryLock() would let this section run unprotected
    // and make the unlock() below throw IllegalMonitorStateException when another thread owns it.
    removeConfigNodeLock.lock();
    try {
      // Execute removePeer
      if (getConsensusManager().removeConfigNodePeer(tConfigNodeLocation)) {
        configManager
            .getLoadManager()
            .removeNodeHeartbeatHandCache(tConfigNodeLocation.getConfigNodeId());
        return getConsensusManager()
            .write(new RemoveConfigNodePlan(tConfigNodeLocation))
            .getStatus();
      } else {
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
            .setMessage(
                "Remove ConfigNode failed because update ConsensusGroup peer information failed.");
      }
    } finally {
      removeConfigNodeLock.unlock();
    }
  }

  /**
   * Validates a ConfigNode removal request.
   *
   * <p>The request is rejected when at most one ConfigNode is online, when the target is not
   * registered, or when no leader is currently known. If the target is the leader, leadership is
   * transferred first and the result of that transfer is returned.
   *
   * @param removeConfigNodePlan the removal request to validate
   * @return SUCCESS_STATUS when the removal may proceed, otherwise the failure or redirection
   *     status described above
   */
  public TSStatus checkConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
    // Block until the lock is held; see removeConfigNodePeer for why tryLock() is not used.
    removeConfigNodeLock.lock();
    try {
      // Check OnlineConfigNodes number
      if (getLoadManager().getOnlineConfigNodes().size() <= 1) {
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
            .setMessage(
                "Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
      }

      // Check whether the registeredConfigNodes contain the ConfigNode to be removed.
      if (!getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
            .setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
      }

      // Check whether the remove ConfigNode is leader
      TConfigNodeLocation leader = getConsensusManager().getLeader();
      if (leader == null) {
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
            .setMessage(
                "Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
      }

      if (leader
          .getInternalEndPoint()
          .equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
        // transfer leader
        return transferLeader(removeConfigNodePlan, getConsensusManager().getConsensusGroupId());
      }

      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
          .setMessage("Success remove confignode.");
    } finally {
      removeConfigNodeLock.unlock();
    }
  }

  /**
   * Hands leadership of the ConfigNode consensus group to any online ConfigNode other than the one
   * being removed.
   *
   * @param removeConfigNodePlan the removal request whose target currently leads the group
   * @param groupId the ConfigNode consensus group id
   * @return NEED_REDIRECTION pointing at the new leader when the transfer succeeded,
   *     REMOVE_CONFIGNODE_FAILED when no candidate exists or the transfer failed
   */
  private TSStatus transferLeader(
      RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
    TConfigNodeLocation newLeader =
        getLoadManager().getOnlineConfigNodes().stream()
            .filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation()))
            .findAny()
            .orElse(null);
    if (newLeader == null) {
      // The online set may contain only the node being removed; report a failure status instead
      // of letting an unchecked Optional.get() throw NoSuchElementException.
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
          .setMessage(
              "Remove ConfigNode failed because there is no other online ConfigNode to transfer leader to.");
    }

    ConsensusGenericResponse resp =
        getConsensusManager()
            .getConsensusImpl()
            .transferLeader(groupId, new Peer(groupId, newLeader.getConsensusEndPoint()));
    if (!resp.isSuccess()) {
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
          .setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.");
    }
    return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
        .setRedirectNode(newLeader.getInternalEndPoint())
        .setMessage(
            "The ConfigNode to be removed is leader, already transfer Leader to "
                + newLeader
                + ".");
  }

  /**
   * Selects the DataNodes a maintenance request is addressed to.
   *
   * @param dataNodeId the requested DataNode id, or -1 to address every registered DataNode
   * @return id-to-location map of the targets; empty when the id matches no registered DataNode
   */
  private Map<Integer, TDataNodeLocation> selectTargetDataNodes(int dataNodeId) {
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
        configManager.getNodeManager().getRegisteredDataNodeLocations();
    if (dataNodeId == -1) {
      return dataNodeLocationMap;
    }
    return dataNodeLocationMap.entrySet().stream()
        .filter(e -> e.getKey() == dataNodeId)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /**
   * Sends a MERGE request to the targeted DataNode(s).
   *
   * @param req the merge request; only its dataNodeId is read here (-1 addresses all DataNodes),
   *     no payload is forwarded
   * @return the list handed to the client pool for collecting the DataNodes' response statuses
   */
  public List<TSStatus> merge(TMergeReq req) {
    Map<Integer, TDataNodeLocation> dataNodeLocationMap = selectTargetDataNodes(req.dataNodeId);
    List<TSStatus> dataNodeResponseStatus =
        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
    AsyncDataNodeClientPool.getInstance()
        .sendAsyncRequestToDataNodeWithRetry(
            null, dataNodeLocationMap, DataNodeRequestType.MERGE, dataNodeResponseStatus);
    return dataNodeResponseStatus;
  }

  /**
   * Sends a FLUSH request to the targeted DataNode(s).
   *
   * @param req the flush request, forwarded as the payload; its dataNodeId selects the target (-1
   *     addresses all DataNodes)
   * @return the list handed to the client pool for collecting the DataNodes' response statuses
   */
  public List<TSStatus> flush(TFlushReq req) {
    Map<Integer, TDataNodeLocation> dataNodeLocationMap = selectTargetDataNodes(req.dataNodeId);
    List<TSStatus> dataNodeResponseStatus =
        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
    AsyncDataNodeClientPool.getInstance()
        .sendAsyncRequestToDataNodeWithRetry(
            req, dataNodeLocationMap, DataNodeRequestType.FLUSH, dataNodeResponseStatus);
    return dataNodeResponseStatus;
  }

  /**
   * Sends a CLEAR_CACHE request to the targeted DataNode(s).
   *
   * @param req the clear-cache request; only its dataNodeId is read here (-1 addresses all
   *     DataNodes), no payload is forwarded
   * @return the list handed to the client pool for collecting the DataNodes' response statuses
   */
  public List<TSStatus> clearCache(TClearCacheReq req) {
    Map<Integer, TDataNodeLocation> dataNodeLocationMap = selectTargetDataNodes(req.dataNodeId);
    List<TSStatus> dataNodeResponseStatus =
        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
    AsyncDataNodeClientPool.getInstance()
        .sendAsyncRequestToDataNodeWithRetry(
            null, dataNodeLocationMap, DataNodeRequestType.CLEAR_CACHE, dataNodeResponseStatus);
    return dataNodeResponseStatus;
  }

  /** @return All registered ConfigNodes as recorded in {@link NodeInfo} */
  public List<TConfigNodeLocation> getRegisteredConfigNodes() {
    return nodeInfo.getRegisteredConfigNodes();
  }

  private ConsensusManager getConsensusManager() {
    return configManager.getConsensusManager();
  }

  private ClusterSchemaManager getClusterSchemaManager() {
    return configManager.getClusterSchemaManager();
  }

  private LoadManager getLoadManager() {
    return configManager.getLoadManager();
  }
}