blob: d67601110325c0189e6078f1cc2bebef37230115 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.ServerCommandLine;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isNumeric;
public class DataNodeServerCommandLine extends ServerCommandLine {
private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeServerCommandLine.class);
// join an established cluster
public static final String MODE_START = "-s";
// send a request to remove a node, more arguments: ip-of-removed-node
// metaport-of-removed-node
public static final String MODE_REMOVE = "-r";
private static final String USAGE =
"Usage: <-s|-r> "
+ "[-D{} <configure folder>] \n"
+ "-s: start the node to the cluster\n"
+ "-r: remove the node out of the cluster\n";
@Override
protected String getUsage() {
return USAGE;
}
@Override
protected int run(String[] args) throws Exception {
if (args.length < 1) {
usage(null);
return -1;
}
DataNode dataNode = DataNode.getInstance();
String mode = args[0];
LOGGER.info("Running mode {}", mode);
// Start IoTDB kernel first, then start the cluster module
if (MODE_START.equals(mode)) {
dataNode.doAddNode();
} else if (MODE_REMOVE.equals(mode)) {
doRemoveDataNode(args);
} else {
LOGGER.error("Unrecognized mode {}", mode);
}
return 0;
}
/**
* remove datanodes from cluster
*
* @param args id or ip:rpc_port for removed datanode
*/
private void doRemoveDataNode(String[] args)
throws BadNodeUrlException, TException, IoTDBException, ClientManagerException {
if (args.length != 2) {
LOGGER.info("Usage: <node-id>/<ip>:<rpc-port>");
return;
}
LOGGER.info("Starting to remove DataNode from cluster, parameter: {}, {}", args[0], args[1]);
// Load ConfigNodeList from system.properties file
ConfigNodeInfo.getInstance().loadConfigNodeList();
List<TDataNodeLocation> dataNodeLocations = buildDataNodeLocations(args[1]);
if (dataNodeLocations.isEmpty()) {
throw new BadNodeUrlException("No DataNode to remove");
}
LOGGER.info("Start to remove datanode, removed datanode endpoints: {}", dataNodeLocations);
TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
try (ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
LOGGER.info("Remove result {} ", removeResp);
if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new IoTDBException(
removeResp.getStatus().toString(), removeResp.getStatus().getCode());
}
LOGGER.info(
"Submit remove-datanode request successfully, but the process may fail. "
+ "more details are shown in the logs of confignode-leader and removed-datanode, "
+ "and after the process of removing datanode ends successfully, "
+ "you are supposed to delete directory and data of the removed-datanode manually");
}
}
/**
* fetch all datanode info from ConfigNode, then compare with input 'args'
*
* @param args datanode id or ip:rpc_port
* @return TDataNodeLocation list
*/
private List<TDataNodeLocation> buildDataNodeLocations(String args) {
List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
if (args == null || args.trim().isEmpty()) {
return dataNodeLocations;
}
// Now support only single datanode deletion
if (args.split(",").length > 1) {
LOGGER.info("Incorrect input format, usage: <id>/<ip>:<rpc-port>");
return dataNodeLocations;
}
// Below supports multiple datanode deletion, split by ',', and is reserved for extension
try {
List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args);
try (ConfigNodeClient client =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
dataNodeLocations =
client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
.map(TDataNodeConfiguration::getLocation)
.filter(location -> endPoints.contains(location.getClientRpcEndPoint()))
.collect(Collectors.toList());
} catch (TException | ClientManagerException e) {
LOGGER.error("Get data node locations failed", e);
}
} catch (BadNodeUrlException e) {
try (ConfigNodeClient client =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
for (String id : args.split(",")) {
if (!isNumeric(id)) {
LOGGER.warn("Incorrect id format {}, skipped...", id);
continue;
}
List<TDataNodeLocation> nodeLocationResult =
client
.getDataNodeConfiguration(Integer.parseInt(id))
.getDataNodeConfigurationMap()
.values()
.stream()
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
if (nodeLocationResult.isEmpty()) {
LOGGER.warn("DataNode {} is not in cluster, skipped...", id);
continue;
}
if (!dataNodeLocations.contains(nodeLocationResult.get(0))) {
dataNodeLocations.add(nodeLocationResult.get(0));
}
}
} catch (TException | ClientManagerException e1) {
LOGGER.error("Get data node locations failed", e);
}
}
return dataNodeLocations;
}
}