| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.confignode.manager; |
| |
| import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; |
| import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; |
| import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; |
| import org.apache.iotdb.common.rpc.thrift.TFlushReq; |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.consensus.ConsensusGroupId; |
| import org.apache.iotdb.confignode.client.DataNodeRequestType; |
| import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool; |
| import org.apache.iotdb.confignode.conf.ConfigNodeConfig; |
| import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; |
| import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan; |
| import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp; |
| import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp; |
| import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp; |
| import org.apache.iotdb.confignode.manager.load.LoadManager; |
| import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache; |
| import org.apache.iotdb.confignode.persistence.NodeInfo; |
| import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; |
| import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo; |
| import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; |
| import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig; |
| import org.apache.iotdb.confignode.rpc.thrift.TMergeReq; |
| import org.apache.iotdb.consensus.common.DataSet; |
| import org.apache.iotdb.consensus.common.Peer; |
| import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.stream.Collectors; |
| |
| /** NodeManager manages cluster node addition and removal requests */ |
| public class NodeManager { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class); |
| |
| private final IManager configManager; |
| private final NodeInfo nodeInfo; |
| |
| private final ReentrantLock removeConfigNodeLock; |
| |
/**
 * Creates a NodeManager.
 *
 * @param configManager manager through which the consensus, load and schema managers are reached
 * @param nodeInfo registry that answers the node queries of this manager
 */
public NodeManager(IManager configManager, NodeInfo nodeInfo) {
  this.removeConfigNodeLock = new ReentrantLock();
  this.nodeInfo = nodeInfo;
  this.configManager = configManager;
}
| |
| private void setGlobalConfig(DataNodeRegisterResp dataSet) { |
| // Set TGlobalConfig |
| final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf(); |
| TGlobalConfig globalConfig = new TGlobalConfig(); |
| globalConfig.setDataRegionConsensusProtocolClass(conf.getDataRegionConsensusProtocolClass()); |
| globalConfig.setSchemaRegionConsensusProtocolClass( |
| conf.getSchemaRegionConsensusProtocolClass()); |
| globalConfig.setSeriesPartitionSlotNum(conf.getSeriesPartitionSlotNum()); |
| globalConfig.setSeriesPartitionExecutorClass(conf.getSeriesPartitionExecutorClass()); |
| globalConfig.setTimePartitionInterval(conf.getTimePartitionInterval()); |
| globalConfig.setReadConsistencyLevel(conf.getReadConsistencyLevel()); |
| dataSet.setGlobalConfig(globalConfig); |
| } |
| |
| /** |
| * Register DataNode |
| * |
| * @param registerDataNodePlan RegisterDataNodeReq |
| * @return DataNodeConfigurationDataSet. The TSStatus will be set to SUCCESS_STATUS when register |
| * success, and DATANODE_ALREADY_REGISTERED when the DataNode is already exist. |
| */ |
| public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) { |
| DataNodeRegisterResp dataSet = new DataNodeRegisterResp(); |
| TSStatus status = new TSStatus(); |
| |
| if (nodeInfo.isRegisteredDataNode(registerDataNodePlan.getInfo().getLocation())) { |
| status.setCode(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()); |
| status.setMessage("DataNode already registered."); |
| } else if (registerDataNodePlan.getInfo().getLocation().getDataNodeId() < 0) { |
| // Generating a new dataNodeId only when current DataNode doesn't exist yet |
| registerDataNodePlan.getInfo().getLocation().setDataNodeId(nodeInfo.generateNextNodeId()); |
| getConsensusManager().write(registerDataNodePlan); |
| |
| // Adjust the maximum RegionGroup number of each StorageGroup |
| getClusterSchemaManager().adjustMaxRegionGroupCount(); |
| |
| status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); |
| status.setMessage("registerDataNode success."); |
| } |
| |
| dataSet.setStatus(status); |
| dataSet.setDataNodeId(registerDataNodePlan.getInfo().getLocation().getDataNodeId()); |
| dataSet.setConfigNodeList(getRegisteredConfigNodes()); |
| setGlobalConfig(dataSet); |
| return dataSet; |
| } |
| |
| /** |
| * Remove DataNodes |
| * |
| * @param removeDataNodePlan RemoveDataNodeReq |
| * @return DataNodeToStatusResp, The TSStatue will be SUCCEED_STATUS when request is accept, |
| * DATANODE_NOT_EXIST when some datanode not exist. |
| */ |
| public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) { |
| LOGGER.info("Node manager start to remove DataNode {}", removeDataNodePlan); |
| |
| DataNodeRemoveHandler dataNodeRemoveHandler = |
| new DataNodeRemoveHandler((ConfigManager) configManager); |
| DataNodeToStatusResp preCheckStatus = |
| dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan); |
| if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| LOGGER.error( |
| "the remove Data Node request check failed. req: {}, check result: {}", |
| removeDataNodePlan, |
| preCheckStatus.getStatus()); |
| return preCheckStatus; |
| } |
| // if add request to queue, then return to client |
| DataNodeToStatusResp dataSet = new DataNodeToStatusResp(); |
| boolean registerSucceed = |
| configManager.getProcedureManager().removeDataNode(removeDataNodePlan); |
| TSStatus status; |
| if (registerSucceed) { |
| status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); |
| status.setMessage("Server accept the request"); |
| } else { |
| status = new TSStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR.getStatusCode()); |
| status.setMessage("Server reject the request, maybe request is too much"); |
| } |
| dataSet.setStatus(status); |
| |
| LOGGER.info("Node manager finished to remove DataNode {}", removeDataNodePlan); |
| return dataSet; |
| } |
| |
| /** |
| * Get TDataNodeConfiguration |
| * |
| * @param req GetDataNodeConfigurationPlan |
| * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in |
| * GetDataNodeConfigurationPlan is -1 |
| */ |
| public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) { |
| return (DataNodeConfigurationResp) getConsensusManager().read(req).getDataset(); |
| } |
| |
| /** |
| * Only leader use this interface |
| * |
| * @return The number of registered DataNodes |
| */ |
| public int getRegisteredDataNodeCount() { |
| return nodeInfo.getRegisteredDataNodeCount(); |
| } |
| |
| /** |
| * Only leader use this interface |
| * |
| * @return The number of registered TotalNodes |
| */ |
| public int getRegisteredNodeCount() { |
| return nodeInfo.getRegisteredNodeCount(); |
| } |
| |
| /** |
| * Only leader use this interface |
| * |
| * @return The number of total cpu cores in online DataNodes |
| */ |
| public int getTotalCpuCoreCount() { |
| return nodeInfo.getTotalCpuCoreCount(); |
| } |
| |
| /** |
| * Only leader use this interface |
| * |
| * @return All registered DataNodes |
| */ |
| public List<TDataNodeConfiguration> getRegisteredDataNodes() { |
| return nodeInfo.getRegisteredDataNodes(); |
| } |
| |
| public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() { |
| Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>(); |
| nodeInfo |
| .getRegisteredDataNodes() |
| .forEach( |
| dataNodeConfiguration -> |
| dataNodeLocations.put( |
| dataNodeConfiguration.getLocation().getDataNodeId(), |
| dataNodeConfiguration.getLocation())); |
| return dataNodeLocations; |
| } |
| |
| private INodeCache getNodeCache(int nodeId) { |
| return getLoadManager().getNodeCacheMap().get(nodeId); |
| } |
| |
| private String getNodeStatus(int nodeId) { |
| INodeCache nodeCache = getNodeCache(nodeId); |
| return nodeCache == null ? "Unknown" : nodeCache.getNodeStatus().getStatus(); |
| } |
| |
| public List<TDataNodeInfo> getRegisteredDataNodeInfoList() { |
| List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>(); |
| List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes(); |
| if (registeredDataNodes != null) { |
| registeredDataNodes.forEach( |
| (dataNodeInfo) -> { |
| TDataNodeInfo info = new TDataNodeInfo(); |
| int dataNodeId = dataNodeInfo.getLocation().getDataNodeId(); |
| info.setDataNodeId(dataNodeId); |
| info.setStatus(getNodeStatus(dataNodeId)); |
| info.setRpcAddresss(dataNodeInfo.getLocation().getClientRpcEndPoint().getIp()); |
| info.setRpcPort(dataNodeInfo.getLocation().getClientRpcEndPoint().getPort()); |
| info.setDataRegionNum(0); |
| info.setSchemaRegionNum(0); |
| dataNodeInfoList.add(info); |
| }); |
| } |
| return dataNodeInfoList; |
| } |
| |
| public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() { |
| List<TConfigNodeInfo> configNodeInfoList = new ArrayList<>(); |
| List<TConfigNodeLocation> registeredConfigNodes = this.getRegisteredConfigNodes(); |
| if (registeredConfigNodes != null) { |
| registeredConfigNodes.forEach( |
| (configNodeLocation) -> { |
| TConfigNodeInfo info = new TConfigNodeInfo(); |
| int configNodeId = configNodeLocation.getConfigNodeId(); |
| info.setConfigNodeId(configNodeId); |
| info.setStatus(getNodeStatus(configNodeId)); |
| info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp()); |
| info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort()); |
| configNodeInfoList.add(info); |
| }); |
| } |
| configNodeInfoList.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId)); |
| return configNodeInfoList; |
| } |
| |
| /** |
| * Only leader use this interface, record the new ConfigNode's information |
| * |
| * @param configNodeLocation The new ConfigNode |
| */ |
| public void applyConfigNode(TConfigNodeLocation configNodeLocation) { |
| // Generate new ConfigNode's index |
| configNodeLocation.setConfigNodeId(nodeInfo.generateNextNodeId()); |
| ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation); |
| getConsensusManager().write(applyConfigNodePlan); |
| } |
| |
| public void addMetrics() { |
| nodeInfo.addMetrics(); |
| } |
| |
| public TSStatus removeConfigNodePeer(TConfigNodeLocation tConfigNodeLocation) { |
| removeConfigNodeLock.tryLock(); |
| try { |
| // Execute removePeer |
| if (getConsensusManager().removeConfigNodePeer(tConfigNodeLocation)) { |
| configManager |
| .getLoadManager() |
| .removeNodeHeartbeatHandCache(tConfigNodeLocation.getConfigNodeId()); |
| return getConsensusManager() |
| .write(new RemoveConfigNodePlan(tConfigNodeLocation)) |
| .getStatus(); |
| } else { |
| return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()) |
| .setMessage( |
| "Remove ConfigNode failed because update ConsensusGroup peer information failed."); |
| } |
| } finally { |
| removeConfigNodeLock.unlock(); |
| } |
| } |
| |
| public TSStatus checkConfigNode(RemoveConfigNodePlan removeConfigNodePlan) { |
| removeConfigNodeLock.tryLock(); |
| try { |
| // Check OnlineConfigNodes number |
| if (getLoadManager().getOnlineConfigNodes().size() <= 1) { |
| return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()) |
| .setMessage( |
| "Remove ConfigNode failed because there is only one ConfigNode in current Cluster."); |
| } |
| |
| // Check whether the registeredConfigNodes contain the ConfigNode to be removed. |
| if (!getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) { |
| return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()) |
| .setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster."); |
| } |
| |
| // Check whether the remove ConfigNode is leader |
| TConfigNodeLocation leader = getConsensusManager().getLeader(); |
| if (leader == null) { |
| return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()) |
| .setMessage( |
| "Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry."); |
| } |
| |
| if (leader |
| .getInternalEndPoint() |
| .equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) { |
| // transfer leader |
| return transferLeader(removeConfigNodePlan, getConsensusManager().getConsensusGroupId()); |
| } |
| |
| } finally { |
| removeConfigNodeLock.unlock(); |
| } |
| |
| return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()) |
| .setMessage("Success remove confignode."); |
| } |
| |
| private TSStatus transferLeader( |
| RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) { |
| TConfigNodeLocation newLeader = |
| getLoadManager().getOnlineConfigNodes().stream() |
| .filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation())) |
| .findAny() |
| .get(); |
| ConsensusGenericResponse resp = |
| getConsensusManager() |
| .getConsensusImpl() |
| .transferLeader(groupId, new Peer(groupId, newLeader.getConsensusEndPoint())); |
| if (!resp.isSuccess()) { |
| return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()) |
| .setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed."); |
| } |
| return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode()) |
| .setRedirectNode(newLeader.getInternalEndPoint()) |
| .setMessage( |
| "The ConfigNode to be removed is leader, already transfer Leader to " |
| + newLeader |
| + "."); |
| } |
| |
| public List<TSStatus> merge(TMergeReq req) { |
| Map<Integer, TDataNodeLocation> dataNodeLocationMap = |
| configManager.getNodeManager().getRegisteredDataNodeLocations(); |
| if (req.dataNodeId != -1) { |
| dataNodeLocationMap = |
| dataNodeLocationMap.entrySet().stream() |
| .filter((e) -> req.dataNodeId == e.getKey()) |
| .collect(Collectors.toMap((e) -> e.getKey(), (e) -> e.getValue())); |
| } |
| List<TSStatus> dataNodeResponseStatus = |
| Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size())); |
| AsyncDataNodeClientPool.getInstance() |
| .sendAsyncRequestToDataNodeWithRetry( |
| null, dataNodeLocationMap, DataNodeRequestType.MERGE, dataNodeResponseStatus); |
| return dataNodeResponseStatus; |
| } |
| |
| public List<TSStatus> flush(TFlushReq req) { |
| Map<Integer, TDataNodeLocation> dataNodeLocationMap = |
| configManager.getNodeManager().getRegisteredDataNodeLocations(); |
| if (req.dataNodeId != -1) { |
| dataNodeLocationMap = |
| dataNodeLocationMap.entrySet().stream() |
| .filter((e) -> req.dataNodeId == e.getKey()) |
| .collect(Collectors.toMap((e) -> e.getKey(), (e) -> e.getValue())); |
| } |
| List<TSStatus> dataNodeResponseStatus = |
| Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size())); |
| AsyncDataNodeClientPool.getInstance() |
| .sendAsyncRequestToDataNodeWithRetry( |
| req, dataNodeLocationMap, DataNodeRequestType.FLUSH, dataNodeResponseStatus); |
| return dataNodeResponseStatus; |
| } |
| |
| public List<TSStatus> clearCache(TClearCacheReq req) { |
| Map<Integer, TDataNodeLocation> dataNodeLocationMap = |
| configManager.getNodeManager().getRegisteredDataNodeLocations(); |
| if (req.dataNodeId != -1) { |
| dataNodeLocationMap = |
| dataNodeLocationMap.entrySet().stream() |
| .filter((e) -> req.dataNodeId == e.getKey()) |
| .collect(Collectors.toMap((e) -> e.getKey(), (e) -> e.getValue())); |
| } |
| List<TSStatus> dataNodeResponseStatus = |
| Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size())); |
| AsyncDataNodeClientPool.getInstance() |
| .sendAsyncRequestToDataNodeWithRetry( |
| null, dataNodeLocationMap, DataNodeRequestType.CLEAR_CACHE, dataNodeResponseStatus); |
| return dataNodeResponseStatus; |
| } |
| |
| public List<TConfigNodeLocation> getRegisteredConfigNodes() { |
| return nodeInfo.getRegisteredConfigNodes(); |
| } |
| |
| private ConsensusManager getConsensusManager() { |
| return configManager.getConsensusManager(); |
| } |
| |
| private ClusterSchemaManager getClusterSchemaManager() { |
| return configManager.getClusterSchemaManager(); |
| } |
| |
| private LoadManager getLoadManager() { |
| return configManager.getLoadManager(); |
| } |
| } |