| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.service; |
| |
| import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; |
| import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; |
| import org.apache.iotdb.common.rpc.thrift.TEndPoint; |
| import org.apache.iotdb.commons.ServerCommandLine; |
| import org.apache.iotdb.commons.exception.BadNodeUrlException; |
| import org.apache.iotdb.commons.exception.ConfigurationException; |
| import org.apache.iotdb.commons.utils.NodeUrlUtils; |
| import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp; |
| import org.apache.iotdb.db.client.ConfigNodeClient; |
| import org.apache.iotdb.db.client.ConfigNodeInfo; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.conf.IoTDBStopCheck; |
| |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| |
| public class DataNodeServerCommandLine extends ServerCommandLine { |
| |
| private static final Logger logger = LoggerFactory.getLogger(DataNodeServerCommandLine.class); |
| |
| // join an established cluster |
| private static final String MODE_START = "-s"; |
| // send a request to remove a node, more arguments: ip-of-removed-node |
| // metaport-of-removed-node |
| private static final String MODE_REMOVE = "-r"; |
| |
| private static final String USAGE = |
| "Usage: <-s|-r> " |
| + "[-D{} <configure folder>] \n" |
| + "-s: start the node to the cluster\n" |
| + "-r: remove the node out of the cluster\n"; |
| |
| @Override |
| protected String getUsage() { |
| return USAGE; |
| } |
| |
| @Override |
| protected int run(String[] args) { |
| if (args.length < 1) { |
| usage(null); |
| return -1; |
| } |
| |
| DataNode dataNode = DataNode.getInstance(); |
| // check config of iotdb,and set some configs in cluster mode |
| try { |
| dataNode.serverCheckAndInit(); |
| } catch (ConfigurationException | IOException e) { |
| logger.error("meet error when doing start checking", e); |
| return -1; |
| } |
| String mode = args[0]; |
| logger.info("Running mode {}", mode); |
| |
| // initialize the current node and its services |
| if (!dataNode.initLocalEngines()) { |
| logger.error("initLocalEngines error, stop process!"); |
| return -1; |
| } |
| |
| // we start IoTDB kernel first. then we start the cluster module. |
| if (MODE_START.equals(mode)) { |
| dataNode.doAddNode(args); |
| } else if (MODE_REMOVE.equals(mode)) { |
| doRemoveNode(args); |
| } else { |
| logger.error("Unrecognized mode {}", mode); |
| } |
| return 0; |
| } |
| |
| /** |
| * remove datanodes from cluster |
| * |
| * @param args IPs for removed datanodes, split with ',' |
| */ |
| private void doRemoveNode(String[] args) { |
| try { |
| removePrepare(args); |
| removeNodesFromCluster(args); |
| removeTail(); |
| } catch (Exception e) { |
| logger.error("remove Data Nodes error", e); |
| } |
| } |
| |
| private void removePrepare(String[] args) throws BadNodeUrlException { |
| ConfigNodeInfo.getInstance() |
| .updateConfigNodeList(IoTDBDescriptor.getInstance().getConfig().getTargetConfigNodeList()); |
| try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) { |
| TDataNodeConfigurationResp resp = configNodeClient.getDataNodeConfiguration(-1); |
| // 1. online Data Node size - removed Data Node size < replication,NOT ALLOW remove |
| // But replication size is set in Config Node's configuration, so check it in remote Config |
| // Node |
| |
| // 2. removed Data Node IP not contained in below map, CAN NOT remove. |
| Map<Integer, TDataNodeConfiguration> nodeIdToNodeConfiguration = |
| resp.getDataNodeConfigurationMap(); |
| List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args[1]); |
| List<String> removedDataNodeIps = |
| endPoints.stream().map(TEndPoint::getIp).collect(Collectors.toList()); |
| |
| List<String> onlineDataNodeIps = |
| nodeIdToNodeConfiguration.values().stream() |
| .map(TDataNodeConfiguration::getLocation) |
| .map(TDataNodeLocation::getInternalEndPoint) |
| .map(TEndPoint::getIp) |
| .collect(Collectors.toList()); |
| IoTDBStopCheck.getInstance().checkIpInCluster(removedDataNodeIps, onlineDataNodeIps); |
| } catch (TException e) { |
| logger.error("remove Data Nodes check failed", e); |
| } |
| } |
| |
| private void removeNodesFromCluster(String[] args) throws BadNodeUrlException { |
| logger.info("start to remove DataNode from cluster"); |
| List<TDataNodeLocation> dataNodeLocations = buildDataNodeLocations(args[1]); |
| if (dataNodeLocations.isEmpty()) { |
| logger.error("data nodes location is empty"); |
| // throw Exception OR return? |
| return; |
| } |
| logger.info( |
| "there has data nodes location will be removed. size is: {}, detail: {}", |
| dataNodeLocations.size(), |
| dataNodeLocations); |
| TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations); |
| try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) { |
| TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq); |
| logger.info("Remove result {} ", removeResp.toString()); |
| } catch (TException e) { |
| logger.error("send remove Data Node request failed!", e); |
| } |
| } |
| |
| /** |
| * fetch all datanode info from ConfigNode, then compare with input 'ips' |
| * |
| * @param endPorts data node ip:port, split with ',' |
| * @return TDataNodeLocation list |
| */ |
| private List<TDataNodeLocation> buildDataNodeLocations(String endPorts) |
| throws BadNodeUrlException { |
| List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(); |
| if (endPorts == null || endPorts.trim().isEmpty()) { |
| return dataNodeLocations; |
| } |
| |
| List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(endPorts); |
| |
| try (ConfigNodeClient client = new ConfigNodeClient()) { |
| dataNodeLocations = |
| client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream() |
| .map(TDataNodeConfiguration::getLocation) |
| .filter(location -> endPoints.contains(location.getClientRpcEndPoint())) |
| .collect(Collectors.toList()); |
| } catch (TException e) { |
| logger.error("get data node locations failed", e); |
| } |
| |
| if (endPoints.size() != dataNodeLocations.size()) { |
| logger.error( |
| "build DataNode locations error, " |
| + "because number of input ip NOT EQUALS the number of fetched DataNodeLocations, will return empty locations"); |
| return dataNodeLocations; |
| } |
| |
| return dataNodeLocations; |
| } |
| |
| private void removeTail() {} |
| } |