/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer.command;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.TextStringBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.HostsFileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
* Common base class for command handling.
*/
public abstract class Command extends Configured implements Closeable {
private static final ObjectReader READER =
new ObjectMapper().readerFor(HashMap.class);
static final Logger LOG = LoggerFactory.getLogger(Command.class);
private Map<String, String> validArgs = new HashMap<>();
private URI clusterURI;
private FileSystem fs = null;
private DiskBalancerCluster cluster = null;
private int topNodes;
private PrintStream ps;
private static final Path DEFAULT_LOG_DIR = new Path("/system/diskbalancer");
private Path diskBalancerLogs;
/**
* Constructs a command.
*/
public Command(Configuration conf) {
this(conf, System.out);
}
/**
* Constructs a command.
*/
public Command(Configuration conf, final PrintStream ps) {
super(conf);
// These arguments are valid for all commands.
topNodes = 0;
this.ps = ps;
}
/**
* Cleans up any resources held by this command.
* <p>
* The main goal is to delete the id file created in
* {@link org.apache.hadoop.hdfs.server.balancer
* .NameNodeConnector#checkAndMarkRunning}; otherwise it is not possible
* to run multiple commands in a row.
* </p>
*/
@Override
public void close() throws IOException {
if (fs != null) {
fs.close();
}
}
/**
* Gets the print stream used for command output.
* @return print stream
*/
PrintStream getPrintStream() {
return ps;
}
/**
* Executes the client calls for this command.
*
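* <p>A minimal sketch of a hypothetical subclass; the class name and
* option names below are illustrative, not part of this API:</p>
* <pre>{@code
* public class FooCommand extends Command {
*   public FooCommand(Configuration conf) {
*     super(conf);
*     addValidCommandParameters("node", "Node to process.");
*   }
*
*   @Override
*   public void execute(CommandLine cmd) throws Exception {
*     verifyCommandOptions("foo", cmd);
*     readClusterInfo(cmd);
*     // command-specific work goes here
*   }
*
*   @Override
*   public void printHelp() {
*     // print usage for this command
*   }
* }
* }</pre>
*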
* @param cmd - Parsed command line.
* @throws Exception if the command fails.
*/
public abstract void execute(CommandLine cmd) throws Exception;
/**
* Gets extended help for this command.
*/
public abstract void printHelp();
/**
* Processes the URI and returns the cluster with its nodes set up. This is
* used by all commands.
*
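* <p>A typical call from a subclass, sketched:</p>
* <pre>{@code
* DiskBalancerCluster cluster = readClusterInfo(cmd);
* for (DiskBalancerDataNode node : cluster.getNodes()) {
*   // inspect the node and its volume sets here
* }
* }</pre>
*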
* @param cmd - CommandLine
* @return DiskBalancerCluster
* @throws Exception
*/
protected DiskBalancerCluster readClusterInfo(CommandLine cmd) throws
Exception {
Preconditions.checkNotNull(cmd);
setClusterURI(FileSystem.getDefaultUri(getConf()));
LOG.debug("using name node URI : {}", this.getClusterURI());
ClusterConnector connector = ConnectorFactory.getCluster(this.clusterURI,
getConf());
cluster = new DiskBalancerCluster(connector);
LOG.debug("Reading cluster info");
cluster.readClusterInfo();
return cluster;
}
/**
* Sets up the output path.
*
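* <p>A sketch of pairing this with {@link #create(String)}; the file name
* and the planJson variable are hypothetical:</p>
* <pre>{@code
* setOutputPath(null); // default: /system/diskbalancer/<timestamp>
* try (FSDataOutputStream out = create("node.plan.json")) {
*   out.write(planJson.getBytes(StandardCharsets.UTF_8));
* }
* }</pre>
*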
* @param path - Path or null to use default path.
* @throws IOException
*/
protected void setOutputPath(String path) throws IOException {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MMM-dd-HH-mm-ss");
Date now = new Date();
fs = FileSystem.get(getClusterURI(), getConf());
if (path == null || path.isEmpty()) {
if (getClusterURI().getScheme().startsWith("file")) {
diskBalancerLogs = new Path(
System.getProperty("user.dir") + DEFAULT_LOG_DIR.toString() +
Path.SEPARATOR + format.format(now));
} else {
diskBalancerLogs = new Path(DEFAULT_LOG_DIR.toString() +
Path.SEPARATOR + format.format(now));
}
} else {
diskBalancerLogs = new Path(path);
}
if (fs.exists(diskBalancerLogs)) {
LOG.debug("Another Diskbalancer instance is running ? - Target " +
"Directory already exists. {}", diskBalancerLogs);
throw new IOException("Another DiskBalancer files already exist at the " +
"target location. " + diskBalancerLogs.toString());
}
fs.mkdirs(diskBalancerLogs);
}
/**
* Sets the nodes to process.
*
* @param node - Node
*/
protected void setNodesToProcess(DiskBalancerDataNode node) {
List<DiskBalancerDataNode> nodelist = new LinkedList<>();
nodelist.add(node);
setNodesToProcess(nodelist);
}
/**
* Sets the list of Nodes to process.
*
* @param nodes Nodes.
*/
protected void setNodesToProcess(List<DiskBalancerDataNode> nodes) {
if (cluster == null) {
throw new IllegalStateException("Set nodes to process invoked before " +
"initializing cluster. Illegal usage.");
}
cluster.setNodesToProcess(nodes);
}
/**
* Returns a DiskBalancer node from the cluster, or null if not found.
*
* @param nodeName - can be the hostname, IP address or UUID of the node.
* @return DiskBalancerDataNode if found, null otherwise.
*/
DiskBalancerDataNode getNode(String nodeName) {
DiskBalancerDataNode node = null;
if (nodeName == null || nodeName.isEmpty()) {
return node;
}
if (cluster.getNodes().isEmpty()) {
return node;
}
node = cluster.getNodeByName(nodeName);
if (node != null) {
return node;
}
node = cluster.getNodeByIPAddress(nodeName);
if (node != null) {
return node;
}
node = cluster.getNodeByUUID(nodeName);
return node;
}
/**
* Gets the node set from a file or a string.
*
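* <p>Both accepted input forms, for illustration (the host names and
* file path are hypothetical):</p>
* <pre>{@code
* // Comma separated list of node names
* Set<String> byName = getNodeList("dn1.example.com,dn2.example.com");
*
* // Hosts file referenced by a file:// URL
* Set<String> byFile = getNodeList("file:///tmp/diskbalancer-hosts");
* }</pre>
*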
* @param listArg - String File URL or a comma separated list of node names.
* @return Set of node names
* @throws IOException
*/
protected Set<String> getNodeList(String listArg) throws IOException {
URL listURL;
String nodeData;
Set<String> resultSet = new TreeSet<>();
if ((listArg == null) || listArg.isEmpty()) {
return resultSet;
}
if (listArg.startsWith("file://")) {
listURL = new URL(listArg);
try {
HostsFileReader.readFileToSet("include",
Paths.get(listURL.getPath()).toString(), resultSet);
} catch (FileNotFoundException e) {
String warnMsg = String
.format("The input host file path '%s' is not a valid path. "
+ "Please make sure the host file exists.", listArg);
throw new DiskBalancerException(warnMsg,
DiskBalancerException.Result.INVALID_HOST_FILE_PATH);
}
} else {
nodeData = listArg;
String[] nodes = nodeData.split(",");
if (nodes.length == 0) {
String warnMsg = "The number of input nodes is 0. "
+ "Please input the valid nodes.";
throw new DiskBalancerException(warnMsg,
DiskBalancerException.Result.INVALID_NODE);
}
Collections.addAll(resultSet, nodes);
}
return resultSet;
}
/**
* Returns a list of DiskBalancer nodes from the cluster, or an empty list
* if listArg is null or empty.
*
* @param listArg String File URL or a comma separated list of node names.
* @return List of DiskBalancer nodes.
* @throws IOException
*/
protected List<DiskBalancerDataNode> getNodes(String listArg)
throws IOException {
Set<String> nodeNames = null;
List<DiskBalancerDataNode> nodeList = Lists.newArrayList();
List<String> invalidNodeList = Lists.newArrayList();
if ((listArg == null) || listArg.isEmpty()) {
return nodeList;
}
nodeNames = getNodeList(listArg);
DiskBalancerDataNode node = null;
if (!nodeNames.isEmpty()) {
for (String name : nodeNames) {
node = getNode(name);
if (node != null) {
nodeList.add(node);
} else {
invalidNodeList.add(name);
}
}
}
if (!invalidNodeList.isEmpty()) {
String invalidNodes = StringUtils.join(invalidNodeList.toArray(), ",");
String warnMsg = String.format(
"The node(s) '%s' not found. "
+ "Please make sure that '%s' exists in the cluster.",
invalidNodes, invalidNodes);
throw new DiskBalancerException(warnMsg,
DiskBalancerException.Result.INVALID_NODE);
}
return nodeList;
}
/**
* Verifies that the command line options are sane.
*
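* <p>Options are checked against the table populated via
* {@link #addValidCommandParameters(String, String)}; a sketch with
* illustrative option names:</p>
* <pre>{@code
* addValidCommandParameters("outFile", "File to dump output to.");
* // Throws IllegalArgumentException on any unregistered option.
* verifyCommandOptions("report", cmd);
* }</pre>
*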
* @param commandName - Name of the command
* @param cmd - Parsed Command Line
*/
protected void verifyCommandOptions(String commandName, CommandLine cmd) {
@SuppressWarnings("unchecked")
Iterator<Option> iter = cmd.iterator();
while (iter.hasNext()) {
Option opt = iter.next();
if (!validArgs.containsKey(opt.getLongOpt())) {
String errMessage = String
.format("%nInvalid argument found for command %s : %s%n",
commandName, opt.getLongOpt());
StringBuilder validArguments = new StringBuilder();
validArguments.append(String.format("Valid arguments are : %n"));
for (Map.Entry<String, String> args : validArgs.entrySet()) {
String key = args.getKey();
String desc = args.getValue();
String s = String.format("\t %s : %s %n", key, desc);
validArguments.append(s);
}
LOG.error(errMessage + validArguments.toString());
throw new IllegalArgumentException("Invalid Arguments found.");
}
}
}
/**
* Gets the cluster URI.
*
* @return cluster URI
*/
public URI getClusterURI() {
return clusterURI;
}
/**
* Sets the cluster URI.
*
* @param clusterURI - cluster URI
*/
public void setClusterURI(URI clusterURI) {
this.clusterURI = clusterURI;
}
/**
* Creates a connection to a datanode. (Copied from DFSAdmin.java.)
*
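* <p>An illustrative call; the host:port value is hypothetical:</p>
* <pre>{@code
* ClientDatanodeProtocol dn = getDataNodeProxy("dn1.example.com:9867");
* String volumesJson = dn.getDiskBalancerSetting(
*     DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
* }</pre>
*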
* @param datanode - datanode address as host:port.
* @return ClientDatanodeProtocol
* @throws IOException
*/
public ClientDatanodeProtocol getDataNodeProxy(String datanode)
throws IOException {
InetSocketAddress datanodeAddr = NetUtils.createSocketAddr(datanode);
// For the datanode proxy, the server principal should be the DN's.
getConf().set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
getConf().get(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, ""));
// Create the client
ClientDatanodeProtocol dnProtocol =
DFSUtilClient.createClientDatanodeProtocolProxy(datanodeAddr, getUGI(),
getConf(), NetUtils.getSocketFactory(getConf(),
ClientDatanodeProtocol
.class));
return dnProtocol;
}
/**
* Returns UGI.
*
* @return UserGroupInformation.
* @throws IOException
*/
private static UserGroupInformation getUGI()
throws IOException {
return UserGroupInformation.getCurrentUser();
}
/**
* Creates a file under the output path and returns a stream to it.
*
* @param fileName - file name to create.
* @return FSDataOutputStream.
* @throws IOException
*/
protected FSDataOutputStream create(String fileName) throws IOException {
Preconditions.checkNotNull(fileName);
if (fs == null) {
fs = FileSystem.get(getConf());
}
return fs.create(new Path(this.diskBalancerLogs, fileName));
}
/**
* Returns an input stream to read data from the given file.
*/
protected FSDataInputStream open(String fileName) throws IOException {
Preconditions.checkNotNull(fileName);
if (fs == null) {
fs = FileSystem.get(getConf());
}
return fs.open(new Path(fileName));
}
/**
* Returns the output path where the plan and snapshot get written.
*
* @return Path
*/
protected Path getOutputPath() {
return diskBalancerLogs;
}
/**
* Adds a valid parameter to the valid args table.
*
* @param key - long option name.
* @param desc - description of the option.
*/
protected void addValidCommandParameters(String key, String desc) {
validArgs.put(key, desc);
}
/**
* Returns the cluster.
*
* @return Cluster.
*/
@VisibleForTesting
DiskBalancerCluster getCluster() {
return cluster;
}
/**
* Returns the default top number of nodes.
* @return default top number of nodes.
*/
protected int getDefaultTop() {
return DiskBalancerCLI.DEFAULT_TOP;
}
/**
* Puts the output line into the log and the result builder.
*/
protected void recordOutput(final TextStringBuilder result,
final String outputLine) {
LOG.info(outputLine);
result.appendln(outputLine);
}
/**
* Parses the top number of nodes to be processed.
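*
* <p>Typical usage inside a report-style command, sketched:</p>
* <pre>{@code
* TextStringBuilder result = new TextStringBuilder();
* setTopNodes(parseTopNodes(cmd, result));
* getPrintStream().print(result.toString());
* }</pre>
*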
* @return top number of nodes to be processed.
*/
protected int parseTopNodes(final CommandLine cmd, final TextStringBuilder result)
throws IllegalArgumentException {
String outputLine = "";
int nodes = 0;
final String topVal = cmd.getOptionValue(DiskBalancerCLI.TOP);
if (StringUtils.isBlank(topVal)) {
outputLine = String.format(
"No top limit specified, using default top value %d.",
getDefaultTop());
LOG.info(outputLine);
result.appendln(outputLine);
nodes = getDefaultTop();
} else {
try {
nodes = Integer.parseInt(topVal);
} catch (NumberFormatException nfe) {
outputLine = String.format(
"Top limit input is not numeric, using default top value %d.",
getDefaultTop());
LOG.info(outputLine);
result.appendln(outputLine);
nodes = getDefaultTop();
}
if (nodes <= 0) {
throw new IllegalArgumentException(
"Top limit input should be a positive numeric value");
}
}
return Math.min(nodes, cluster.getNodes().size());
}
/**
* Reads the physical paths of the disks we are balancing. This is only
* needed to make the disk balancer output human friendly; the paths are
* not used in balancing.
*
* @param node - Disk Balancer Node.
*/
protected void populatePathNames(
DiskBalancerDataNode node) throws IOException {
// If the cluster is a local file system, there is no need to
// make an RPC call to the datanode.
if (getClusterURI().getScheme().startsWith("file")) {
return;
}
String dnAddress = node.getDataNodeIP() + ":" + node.getDataNodePort();
ClientDatanodeProtocol dnClient = getDataNodeProxy(dnAddress);
String volumeNameJson = dnClient.getDiskBalancerSetting(
DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
@SuppressWarnings("unchecked")
Map<String, String> volumeMap =
READER.readValue(volumeNameJson);
for (DiskBalancerVolumeSet set : node.getVolumeSets().values()) {
for (DiskBalancerVolume vol : set.getVolumes()) {
if (volumeMap.containsKey(vol.getUuid())) {
vol.setPath(volumeMap.get(vol.getUuid()));
}
}
}
}
/**
* Sets the top number of nodes to be processed.
*/
public void setTopNodes(int topNodes) {
this.topNodes = topNodes;
}
/**
* Gets the top number of nodes to be processed.
* @return top number of nodes to be processed.
*/
public int getTopNodes() {
return topNodes;
}
/**
* Sets the DiskBalancer cluster.
*/
@VisibleForTesting
public void setCluster(DiskBalancerCluster newCluster) {
this.cluster = newCluster;
}
}