/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Formatter;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Preconditions;
/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
* when some datanodes become full or when new empty nodes join the cluster.
* The tool is deployed as an application program that can be run by the
* cluster administrator on a live HDFS cluster while applications
* are adding and deleting files.
*
* <p>SYNOPSIS
* <pre>
* To start:
* bin/start-balancer.sh [-threshold <threshold>]
* Example: bin/start-balancer.sh
* start the balancer with a default threshold of 10%
* bin/start-balancer.sh -threshold 5
* start the balancer with a threshold of 5%
* To stop:
* bin/stop-balancer.sh
* </pre>
*
* <p>DESCRIPTION
* <p>The threshold parameter is a fraction in the range [1%, 100%] with a
* default value of 10%. The threshold sets a target for whether the cluster
* is balanced. A cluster is balanced if for each datanode, the utilization
* of the node (ratio of used space at the node to total capacity of the node)
* differs from the utilization of the cluster (ratio of used space in the cluster
* to total capacity of the cluster) by no more than the threshold value.
* The smaller the threshold, the more balanced a cluster will become.
* It takes more time to run the balancer for small threshold values.
* Also for a very small threshold the cluster may not be able to reach the
* balanced state when applications write and delete files concurrently.
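*
* <p>As an illustrative sketch (the helper below is hypothetical, not part
* of the balancer), the balanced condition can be expressed as:
* <pre>
* // true if every node's utilization is within +/- threshold of the
* // cluster-wide utilization (all values in percent)
* static boolean isBalanced(double[] nodeUtilizations,
*     double clusterUtilization, double threshold) {
*   for (double u : nodeUtilizations) {
*     if (Math.abs(u - clusterUtilization) > threshold) {
*       return false;
*     }
*   }
*   return true;
* }
* </pre>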
*
* <p>The tool moves blocks from highly utilized datanodes to poorly
* utilized datanodes iteratively. In each iteration a datanode moves or
* receives no more than the lesser of 10 GB or the threshold fraction
* of its capacity. Each iteration runs no more than 20 minutes.
* At the end of each iteration, the balancer obtains updated datanodes
* information from the namenode.
*
* <p>A configuration property that limits the balancer's use of bandwidth is
* defined in the default configuration file:
* <pre>
* <property>
* <name>dfs.balance.bandwidthPerSec</name>
* <value>1048576</value>
* <description> Specifies the maximum bandwidth that each datanode
* can utilize for the balancing purpose in terms of the number of bytes
* per second. </description>
* </property>
* </pre>
*
* <p>This property determines the maximum speed at which a block will be
* moved from one datanode to another. The default value is 1MB/s. The higher
* the bandwidth, the faster a cluster can reach the balanced state,
* but with greater competition with application processes. If an
* administrator changes the value of this property in the configuration
* file, the change is observed when HDFS is next restarted.
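*
* <p>As a back-of-envelope illustration: at the default 1 MB/s, moving the
* per-iteration maximum of 10 GB takes 10 * 1024 = 10240 seconds, that is,
* nearly three hours per datanode.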
*
* <p>MONITORING BALANCER PROGRESS
* <p>After the balancer is started, an output file name where the balancer
* progress will be recorded is printed on the screen. The administrator
* can monitor the running of the balancer by reading the output file.
* The output shows the balancer's status iteration by iteration. In each
* iteration it prints the starting time, the iteration number, the total
* number of bytes that have been moved in the previous iterations,
* the total number of bytes that are left to move in order for the cluster
* to be balanced, and the number of bytes that are being moved in this
* iteration. Normally "Bytes Already Moved" is increasing while "Bytes Left
* To Move" is decreasing.
*
* <p>Running multiple instances of the balancer in an HDFS cluster is
* prohibited by the tool.
*
* <p>The balancer automatically exits when any of the following five
* conditions is satisfied:
* <ol>
* <li>The cluster is balanced;
* <li>No block can be moved;
* <li>No block has been moved for five consecutive iterations;
* <li>An IOException occurs while communicating with the namenode;
* <li>Another balancer is running.
* </ol>
*
* <p>Upon exit, a balancer returns an exit code and prints one of the
* following messages to the output file, corresponding to the above exit
* reasons:
* <ol>
* <li>The cluster is balanced. Exiting...
* <li>No block can be moved. Exiting...
* <li>No block has been moved for 5 iterations. Exiting...
* <li>Received an IO exception: failure reason. Exiting...
* <li>Another balancer is running. Exiting...
* </ol>
*
* <p>The administrator can interrupt the execution of the balancer at any
* time by running the command "stop-balancer.sh" on the machine where the
* balancer is running.
*/
@InterfaceAudience.Private
public class Balancer {
static final Log LOG = LogFactory.getLog(Balancer.class);
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final long GB = 1L << 30; //1GB
private static final long MAX_SIZE_TO_MOVE = 10*GB;
private static final String USAGE = "Usage: java "
+ Balancer.class.getSimpleName()
+ "\n\t[-policy <policy>]\tthe balancing policy: "
+ BalancingPolicy.Node.INSTANCE.getName() + " or "
+ BalancingPolicy.Pool.INSTANCE.getName()
+ "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+ "\n\t[-exclude [-f <hosts-file> | comma-sperated list of hosts]]"
+ "\tExcludes the specified datanodes."
+ "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]"
+ "\tIncludes only the specified datanodes.";
private final Dispatcher dispatcher;
private final BalancingPolicy policy;
private final double threshold;
// all datanode lists
private final Collection<Source> overUtilized = new LinkedList<Source>();
private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
private final Collection<StorageGroup> belowAvgUtilized
= new LinkedList<StorageGroup>();
private final Collection<StorageGroup> underUtilized
= new LinkedList<StorageGroup>();
/* Check that this Balancer is compatible with the Block Placement Policy
* used by the Namenode.
*/
private static void checkReplicationPolicyCompatibility(Configuration conf
) throws UnsupportedActionException {
if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof
BlockPlacementPolicyDefault)) {
throw new UnsupportedActionException(
"Balancer without BlockPlacementPolicyDefault");
}
}
/**
* Construct a balancer.
* It sets the threshold and policy from the given parameters, and
* creates the dispatcher that communicates with the namenode, with
* thread pool sizes and the moved-blocks window width taken from the
* given configuration.
*/
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
final long movedWinWidth = conf.getLong(
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
final int moverThreads = conf.getInt(
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
final int dispatcherThreads = conf.getInt(
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
final int maxConcurrentMovesPerNode = conf.getInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
maxConcurrentMovesPerNode, conf);
this.threshold = p.threshold;
this.policy = p.policy;
}
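/** Sum the capacities of the given datanode's storages of the given type. */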
private static long getCapacity(DatanodeStorageReport report, StorageType t) {
long capacity = 0L;
for(StorageReport r : report.getStorageReports()) {
if (r.getStorage().getStorageType() == t) {
capacity += r.getCapacity();
}
}
return capacity;
}
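/** Sum the remaining (free) space of the given datanode's storages of the given type. */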
private static long getRemaining(DatanodeStorageReport report, StorageType t) {
long remaining = 0L;
for(StorageReport r : report.getStorageReports()) {
if (r.getStorage().getStorageType() == t) {
remaining += r.getRemaining();
}
}
return remaining;
}
/**
* Given a datanode storage set, build a network topology and decide
* over-utilized storages, above average utilized storages,
* below average utilized storages, and underutilized storages.
* The input datanode storage set is shuffled in order to randomize
* the storage matching later on.
*
* @return the number of bytes needed to move in order to balance the cluster.
*/
private long init(List<DatanodeStorageReport> reports) {
// compute average utilization
for (DatanodeStorageReport r : reports) {
policy.accumulateSpaces(r);
}
policy.initAvgUtilization();
// create network topology and classify utilization collections:
// over-utilized, above-average, below-average and under-utilized.
long overLoadedBytes = 0L, underLoadedBytes = 0L;
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r);
for(StorageType t : StorageType.asList()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
continue;
}
final long capacity = getCapacity(r, t);
final double utilizationDiff = utilization - policy.getAvgUtilization(t);
final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
final long maxSize2Move = computeMaxSize2Move(capacity,
getRemaining(r, t), utilizationDiff, threshold);
final StorageGroup g;
if (utilizationDiff > 0) {
final Source s = dn.addSource(t, maxSize2Move, dispatcher);
if (thresholdDiff <= 0) { // within threshold
aboveAvgUtilized.add(s);
} else {
overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
overUtilized.add(s);
}
g = s;
} else {
g = dn.addStorageGroup(t, maxSize2Move);
if (thresholdDiff <= 0) { // within threshold
belowAvgUtilized.add(g);
} else {
underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
underUtilized.add(g);
}
}
dispatcher.getStorageGroupMap().put(g);
}
}
logUtilizationCollections();
Preconditions.checkState(dispatcher.getStorageGroupMap().size()
== overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
+ belowAvgUtilized.size(),
"Mismatched number of storage groups");
// return number of bytes to be moved in order to make the cluster balanced
return Math.max(overLoadedBytes, underLoadedBytes);
}
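/**
* Compute the maximum number of bytes this storage may schedule to move:
* the smaller of the threshold and the utilization gap, converted to
* bytes of capacity; for targets (utilizationDiff < 0) additionally
* capped by the remaining space; and always capped by MAX_SIZE_TO_MOVE.
*/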
private static long computeMaxSize2Move(final long capacity, final long remaining,
final double utilizationDiff, final double threshold) {
final double diff = Math.min(threshold, Math.abs(utilizationDiff));
long maxSizeToMove = percentage2bytes(diff, capacity);
if (utilizationDiff < 0) {
maxSizeToMove = Math.min(remaining, maxSizeToMove);
}
return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
}
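/**
* Convert a percentage of the given capacity into a number of bytes.
* For example, percentage2bytes(10.0, 100 * GB) returns 10 * GB.
*/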
private static long percentage2bytes(double percentage, long capacity) {
Preconditions.checkArgument(percentage >= 0,
"percentage = " + percentage + " < 0");
return (long)(percentage * capacity / 100.0);
}
/* log the over-utilized & under-utilized nodes */
private void logUtilizationCollections() {
logUtilizationCollection("over-utilized", overUtilized);
if (LOG.isTraceEnabled()) {
logUtilizationCollection("above-average", aboveAvgUtilized);
logUtilizationCollection("below-average", belowAvgUtilized);
}
logUtilizationCollection("underutilized", underUtilized);
}
private static <T extends StorageGroup>
void logUtilizationCollection(String name, Collection<T> items) {
LOG.info(items.size() + " " + name + ": " + items);
}
/**
* Decide all <source, target> pairs and
* the number of bytes to move from a source to a target.
* The maximum number of bytes to be moved per storage group is
* min(the threshold fraction of its capacity, MAX_SIZE_TO_MOVE).
* @return total number of bytes to move in this iteration
*/
private long chooseStorageGroups() {
// First, match nodes on the same node group if cluster is node group aware
if (dispatcher.getCluster().isNodeGroupAware()) {
chooseStorageGroups(Matcher.SAME_NODE_GROUP);
}
// Then, match nodes on the same rack
chooseStorageGroups(Matcher.SAME_RACK);
// At last, match all remaining nodes
chooseStorageGroups(Matcher.ANY_OTHER);
return dispatcher.bytesToMove();
}
/** Decide all <source, target> pairs according to the matcher. */
private void chooseStorageGroups(final Matcher matcher) {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
chooseStorageGroups(overUtilized, underUtilized, matcher);
/* match each remaining overutilized datanode (source) to
* below-average utilized datanodes (targets).
* Note only overutilized datanodes whose max bytes to move were not
* satisfied in step 1 are selected.
*/
chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
/* match each remaining underutilized datanode (target) to
* above-average utilized datanodes (sources).
* Note only underutilized datanodes whose max bytes to move were not
* satisfied in step 1 are selected.
*/
chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
}
/**
* For each datanode, choose matching nodes from the candidates. Either the
* datanodes or the candidates are source nodes with (utilization > Avg), and
* the others are target nodes with (utilization < Avg).
*/
private <G extends StorageGroup, C extends StorageGroup>
void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
Matcher matcher) {
for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
final G g = i.next();
for(; choose4One(g, candidates, matcher); );
if (!g.hasSpaceForScheduling()) {
i.remove();
}
}
}
/**
* For the given datanode, choose a candidate and then schedule it.
* @return true if a candidate is chosen; false if no candidate is chosen.
*/
private <C extends StorageGroup> boolean choose4One(StorageGroup g,
Collection<C> candidates, Matcher matcher) {
final Iterator<C> i = candidates.iterator();
final C chosen = chooseCandidate(g, i, matcher);
if (chosen == null) {
return false;
}
if (g instanceof Source) {
matchSourceWithTargetToMove((Source)g, chosen);
} else {
matchSourceWithTargetToMove((Source)chosen, g);
}
if (!chosen.hasSpaceForScheduling()) {
i.remove();
}
return true;
}
private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
final Task task = new Task(target, size);
source.addTask(task);
target.incScheduledSize(task.getSize());
dispatcher.add(source, target);
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+ source.getDisplayName() + " to " + target.getDisplayName());
}
/** Choose a candidate for the given datanode. */
private <G extends StorageGroup, C extends StorageGroup>
C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
if (g.hasSpaceForScheduling()) {
for(; candidates.hasNext(); ) {
final C c = candidates.next();
if (!c.hasSpaceForScheduling()) {
candidates.remove();
} else if (matcher.match(dispatcher.getCluster(),
g.getDatanodeInfo(), c.getDatanodeInfo())) {
return c;
}
}
}
return null;
}
/* reset all fields in a balancer in preparation for the next iteration */
private void resetData(Configuration conf) {
this.overUtilized.clear();
this.aboveAvgUtilized.clear();
this.belowAvgUtilized.clear();
this.underUtilized.clear();
this.policy.reset();
dispatcher.reset(conf);
}
/** Run an iteration for all datanodes. */
private ExitStatus run(int iteration, Formatter formatter,
Configuration conf) {
try {
final List<DatanodeStorageReport> reports = dispatcher.init();
final long bytesLeftToMove = init(reports);
if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting...");
return ExitStatus.SUCCESS;
} else {
LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+ " to make the cluster balanced." );
}
/* Decide all the nodes that will participate in the block move and
* the number of bytes that need to be moved from one node to another
* in this iteration. The maximum number of bytes to be moved per node
* is min(the threshold fraction of its capacity, MAX_SIZE_TO_MOVE).
*/
final long bytesToMove = chooseStorageGroups();
if (bytesToMove == 0) {
System.out.println("No block can be moved. Exiting...");
return ExitStatus.NO_MOVE_BLOCK;
} else {
LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
" in this iteration");
}
formatter.format("%-24s %10d %19s %18s %17s%n",
DateFormat.getDateTimeInstance().format(new Date()),
iteration,
StringUtils.byteDesc(dispatcher.getBytesMoved()),
StringUtils.byteDesc(bytesLeftToMove),
StringUtils.byteDesc(bytesToMove)
);
/* For each <source, target> pair, start a thread that repeatedly
* decides a block to be moved and its proxy source, then initiates
* the move, until all bytes are moved or no more blocks are
* available to move.
* Exit if no byte has been moved for 5 consecutive iterations.
*/
if (!dispatcher.dispatchAndCheckContinue()) {
return ExitStatus.NO_MOVE_PROGRESS;
}
return ExitStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS;
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.IO_EXCEPTION;
} catch (InterruptedException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.INTERRUPTED;
} finally {
dispatcher.shutdownNow();
}
}
/**
* Balance all namenodes.
* For each iteration,
* for each namenode,
* execute a {@link Balancer} to work through all datanodes once.
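*
* <p>A usage sketch (the configuration and namenode URIs are assumptions,
* obtained here the same way {@link Cli#run} obtains them):
* <pre>
*   Configuration conf = new HdfsConfiguration();
*   Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
*   int exitCode = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
* </pre>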
*/
static int run(Collection<URI> namenodes, final Parameters p,
Configuration conf) throws IOException, InterruptedException {
final long sleeptime = 2000*conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
LOG.info("namenodes = " + namenodes);
LOG.info("parameters = " + p);
final Formatter formatter = new Formatter(System.out);
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
final List<NameNodeConnector> connectors
= new ArrayList<NameNodeConnector>(namenodes.size());
try {
for (URI uri : namenodes) {
final NameNodeConnector nnc = new NameNodeConnector(
Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
nnc.getKeyManager().startBlockKeyUpdater();
connectors.add(nnc);
}
boolean done = false;
for(int iteration = 0; !done; iteration++) {
done = true;
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf);
final ExitStatus r = b.run(iteration, formatter, conf);
// clean all lists
b.resetData(conf);
if (r == ExitStatus.IN_PROGRESS) {
done = false;
} else if (r != ExitStatus.SUCCESS) {
// must be an error status; return.
return r.getExitCode();
}
}
if (!done) {
Thread.sleep(sleeptime);
}
}
} finally {
for(NameNodeConnector nnc : connectors) {
nnc.close();
}
}
return ExitStatus.SUCCESS.getExitCode();
}
/* Given elapsedTime in ms, return a printable string, e.g. 90000 ms -> "1.5 minutes". */
private static String time2Str(long elapsedTime) {
String unit;
double time = elapsedTime;
if (elapsedTime < 1000) {
unit = "milliseconds";
} else if (elapsedTime < 60*1000) {
unit = "seconds";
time = time/1000;
} else if (elapsedTime < 3600*1000) {
unit = "minutes";
time = time/(60*1000);
} else {
unit = "hours";
time = time/(3600*1000);
}
return time+" "+unit;
}
static class Parameters {
static final Parameters DEFAULT = new Parameters(
BalancingPolicy.Node.INSTANCE, 10.0,
Collections.<String> emptySet(), Collections.<String> emptySet());
final BalancingPolicy policy;
final double threshold;
// exclude the nodes in this set from balancing operations
Set<String> nodesToBeExcluded;
//include only these nodes in balancing operations
Set<String> nodesToBeIncluded;
Parameters(BalancingPolicy policy, double threshold,
Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
this.policy = policy;
this.threshold = threshold;
this.nodesToBeExcluded = nodesToBeExcluded;
this.nodesToBeIncluded = nodesToBeIncluded;
}
@Override
public String toString() {
return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
+ "[" + policy + ", threshold=" + threshold +
", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
}
}
static class Cli extends Configured implements Tool {
/**
* Parse arguments and then run Balancer.
*
* @param args command specific arguments.
* @return exit code. 0 indicates success, non-zero indicates failure.
*/
@Override
public int run(String[] args) {
final long startTime = Time.now();
final Configuration conf = getConf();
try {
checkReplicationPolicyCompatibility(conf);
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
return Balancer.run(namenodes, parse(args), conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.IO_EXCEPTION.getExitCode();
} catch (InterruptedException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.INTERRUPTED.getExitCode();
} finally {
System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
System.out.println("Balancing took " + time2Str(Time.now()-startTime));
}
}
/**
* Parse command line arguments.
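*
* <p>For example, <code>parse(new String[] {"-threshold", "5"})</code>
* returns parameters with a threshold of 5.0 and the default policy.
*/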
static Parameters parse(String[] args) {
BalancingPolicy policy = Parameters.DEFAULT.policy;
double threshold = Parameters.DEFAULT.threshold;
Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
if (args != null) {
try {
for(int i = 0; i < args.length; i++) {
if ("-threshold".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length,
"Threshold value is missing: args = " + Arrays.toString(args));
try {
threshold = Double.parseDouble(args[i]);
if (threshold < 1 || threshold > 100) {
throw new IllegalArgumentException(
"Number out of range: threshold = " + threshold);
}
LOG.info( "Using a threshold of " + threshold );
} catch(IllegalArgumentException e) {
System.err.println(
"Expecting a number in the range of [1.0, 100.0]: "
+ args[i]);
throw e;
}
} else if ("-policy".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length,
"Policy value is missing: args = " + Arrays.toString(args));
try {
policy = BalancingPolicy.parse(args[i]);
} catch(IllegalArgumentException e) {
System.err.println("Illegal policy name: " + args[i]);
throw e;
}
} else if ("-exclude".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length,
"List of nodes to exclude | -f <filename> is missing: args = "
+ Arrays.toString(args));
if ("-f".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length,
"File containing nodes to exclude is not specified: args = "
+ Arrays.toString(args));
nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude");
} else {
nodesTobeExcluded = Util.parseHostList(args[i]);
}
} else if ("-include".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length,
"List of nodes to include | -f <filename> is missing: args = "
+ Arrays.toString(args));
if ("-f".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length,
"File containing nodes to include is not specified: args = "
+ Arrays.toString(args));
nodesTobeIncluded = Util.getHostListFromFile(args[i], "include");
} else {
nodesTobeIncluded = Util.parseHostList(args[i]);
}
} else {
throw new IllegalArgumentException("args = "
+ Arrays.toString(args));
}
}
checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(),
"-exclude and -include options cannot be specified together.");
} catch(RuntimeException e) {
printUsage(System.err);
throw e;
}
}
return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded);
}
private static void printUsage(PrintStream out) {
out.println(USAGE + "\n");
}
}
/**
* Run a balancer.
* @param args Command line arguments
*/
public static void main(String[] args) {
if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
System.exit(0);
}
try {
System.exit(ToolRunner.run(new HdfsConfiguration(), new Cli(), args));
} catch (Throwable e) {
LOG.error("Exiting balancer due an exception", e);
System.exit(-1);
}
}
}