blob: abddf1fe9c14bdab0492c10b95fe03faa91d3094 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.SecureRandom;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.net.SocketFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.SignedBytes;
import com.google.protobuf.BlockingService;
@InterfaceAudience.Private
public class DFSUtil {
public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
public static final byte[] EMPTY_BYTES = {};
/** Compare two byte arrays by lexicographical order. */
public static int compareBytes(byte[] left, byte[] right) {
if (left == null) {
left = EMPTY_BYTES;
}
if (right == null) {
right = EMPTY_BYTES;
}
return SignedBytes.lexicographicalComparator().compare(left, right);
}
private DFSUtil() { /* Hidden constructor */ }
private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
@Override
protected Random initialValue() {
return new Random();
}
};
private static final ThreadLocal<SecureRandom> SECURE_RANDOM = new ThreadLocal<SecureRandom>() {
@Override
protected SecureRandom initialValue() {
return new SecureRandom();
}
};
/** @return a pseudo random number generator. */
public static Random getRandom() {
return RANDOM.get();
}
/** @return a pseudo secure random number generator. */
public static SecureRandom getSecureRandom() {
return SECURE_RANDOM.get();
}
/** Shuffle the elements in the given array. */
public static <T> T[] shuffle(final T[] array) {
if (array != null && array.length > 0) {
final Random random = getRandom();
for (int n = array.length; n > 1; ) {
final int randomIndex = random.nextInt(n);
n--;
if (n != randomIndex) {
final T tmp = array[randomIndex];
array[randomIndex] = array[n];
array[n] = tmp;
}
}
}
return array;
}
/**
* Compartor for sorting DataNodeInfo[] based on decommissioned states.
* Decommissioned nodes are moved to the end of the array on sorting with
* this compartor.
*/
public static final Comparator<DatanodeInfo> DECOM_COMPARATOR =
new Comparator<DatanodeInfo>() {
@Override
public int compare(DatanodeInfo a, DatanodeInfo b) {
return a.isDecommissioned() == b.isDecommissioned() ? 0 :
a.isDecommissioned() ? 1 : -1;
}
};
/**
* Comparator for sorting DataNodeInfo[] based on decommissioned/stale states.
* Decommissioned/stale nodes are moved to the end of the array on sorting
* with this comparator.
*/
@InterfaceAudience.Private
public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
private long staleInterval;
/**
* Constructor of DecomStaleComparator
*
* @param interval
* The time interval for marking datanodes as stale is passed from
* outside, since the interval may be changed dynamically
*/
public DecomStaleComparator(long interval) {
this.staleInterval = interval;
}
@Override
public int compare(DatanodeInfo a, DatanodeInfo b) {
// Decommissioned nodes will still be moved to the end of the list
if (a.isDecommissioned()) {
return b.isDecommissioned() ? 0 : 1;
} else if (b.isDecommissioned()) {
return -1;
}
// Stale nodes will be moved behind the normal nodes
boolean aStale = a.isStale(staleInterval);
boolean bStale = b.isStale(staleInterval);
return aStale == bStale ? 0 : (aStale ? 1 : -1);
}
}
/**
* Address matcher for matching an address to local address
*/
static final AddressMatcher LOCAL_ADDRESS_MATCHER = new AddressMatcher() {
@Override
public boolean match(InetSocketAddress s) {
return NetUtils.isLocalAddress(s.getAddress());
};
};
/**
* Whether the pathname is valid. Currently prohibits relative paths,
* names which contain a ":" or "//", or other non-canonical paths.
*/
public static boolean isValidName(String src) {
// Path must be absolute.
if (!src.startsWith(Path.SEPARATOR)) {
return false;
}
// Check for ".." "." ":" "/"
String[] components = StringUtils.split(src, '/');
for (int i = 0; i < components.length; i++) {
String element = components[i];
if (element.equals(".") ||
(element.indexOf(":") >= 0) ||
(element.indexOf("/") >= 0)) {
return false;
}
// ".." is allowed in path starting with /.reserved/.inodes
if (element.equals("..")) {
if (components.length > 4
&& components[1].equals(FSDirectory.DOT_RESERVED_STRING)
&& components[2].equals(FSDirectory.DOT_INODES_STRING)) {
continue;
}
return false;
}
// The string may start or end with a /, but not have
// "//" in the middle.
if (element.isEmpty() && i != components.length - 1 &&
i != 0) {
return false;
}
}
return true;
}
/**
* Converts a byte array to a string using UTF8 encoding.
*/
public static String bytes2String(byte[] bytes) {
return bytes2String(bytes, 0, bytes.length);
}
/**
* Decode a specific range of bytes of the given byte array to a string
* using UTF8.
*
* @param bytes The bytes to be decoded into characters
* @param offset The index of the first byte to decode
* @param length The number of bytes to decode
* @return The decoded string
*/
public static String bytes2String(byte[] bytes, int offset, int length) {
try {
return new String(bytes, offset, length, "UTF8");
} catch(UnsupportedEncodingException e) {
assert false : "UTF8 encoding is not supported ";
}
return null;
}
/**
* Converts a string to a byte array using UTF8 encoding.
*/
public static byte[] string2Bytes(String str) {
return str.getBytes(Charsets.UTF_8);
}
/**
* Given a list of path components returns a path as a UTF8 String
*/
public static String byteArray2PathString(byte[][] pathComponents) {
if (pathComponents.length == 0) {
return "";
} else if (pathComponents.length == 1
&& (pathComponents[0] == null || pathComponents[0].length == 0)) {
return Path.SEPARATOR;
}
StringBuilder result = new StringBuilder();
for (int i = 0; i < pathComponents.length; i++) {
result.append(new String(pathComponents[i], Charsets.UTF_8));
if (i < pathComponents.length - 1) {
result.append(Path.SEPARATOR_CHAR);
}
}
return result.toString();
}
/**
* Given a list of path components returns a byte array
*/
public static byte[] byteArray2bytes(byte[][] pathComponents) {
if (pathComponents.length == 0) {
return EMPTY_BYTES;
} else if (pathComponents.length == 1
&& (pathComponents[0] == null || pathComponents[0].length == 0)) {
return new byte[]{(byte) Path.SEPARATOR_CHAR};
}
int length = 0;
for (int i = 0; i < pathComponents.length; i++) {
length += pathComponents[i].length;
if (i < pathComponents.length - 1) {
length++; // for SEPARATOR
}
}
byte[] path = new byte[length];
int index = 0;
for (int i = 0; i < pathComponents.length; i++) {
System.arraycopy(pathComponents[i], 0, path, index,
pathComponents[i].length);
index += pathComponents[i].length;
if (i < pathComponents.length - 1) {
path[index] = (byte) Path.SEPARATOR_CHAR;
index++;
}
}
return path;
}
/** Convert an object representing a path to a string. */
public static String path2String(final Object path) {
return path == null? null
: path instanceof String? (String)path
: path instanceof byte[][]? byteArray2PathString((byte[][])path)
: path.toString();
}
/**
* Splits the array of bytes into array of arrays of bytes
* on byte separator
* @param bytes the array of bytes to split
* @param separator the delimiting byte
*/
public static byte[][] bytes2byteArray(byte[] bytes, byte separator) {
return bytes2byteArray(bytes, bytes.length, separator);
}
/**
* Splits first len bytes in bytes to array of arrays of bytes
* on byte separator
* @param bytes the byte array to split
* @param len the number of bytes to split
* @param separator the delimiting byte
*/
public static byte[][] bytes2byteArray(byte[] bytes,
int len,
byte separator) {
assert len <= bytes.length;
int splits = 0;
if (len == 0) {
return new byte[][]{null};
}
// Count the splits. Omit multiple separators and the last one
for (int i = 0; i < len; i++) {
if (bytes[i] == separator) {
splits++;
}
}
int last = len - 1;
while (last > -1 && bytes[last--] == separator) {
splits--;
}
if (splits == 0 && bytes[0] == separator) {
return new byte[][]{null};
}
splits++;
byte[][] result = new byte[splits][];
int startIndex = 0;
int nextIndex = 0;
int index = 0;
// Build the splits
while (index < splits) {
while (nextIndex < len && bytes[nextIndex] != separator) {
nextIndex++;
}
result[index] = new byte[nextIndex - startIndex];
System.arraycopy(bytes, startIndex, result[index], 0, nextIndex
- startIndex);
index++;
startIndex = nextIndex + 1;
nextIndex = startIndex;
}
return result;
}
/**
* Convert a LocatedBlocks to BlockLocations[]
* @param blocks a LocatedBlocks
* @return an array of BlockLocations
*/
public static BlockLocation[] locatedBlocks2Locations(LocatedBlocks blocks) {
if (blocks == null) {
return new BlockLocation[0];
}
return locatedBlocks2Locations(blocks.getLocatedBlocks());
}
/**
* Convert a List<LocatedBlock> to BlockLocation[]
* @param blocks A List<LocatedBlock> to be converted
* @return converted array of BlockLocation
*/
public static BlockLocation[] locatedBlocks2Locations(List<LocatedBlock> blocks) {
if (blocks == null) {
return new BlockLocation[0];
}
int nrBlocks = blocks.size();
BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
if (nrBlocks == 0) {
return blkLocations;
}
int idx = 0;
for (LocatedBlock blk : blocks) {
assert idx < nrBlocks : "Incorrect index";
DatanodeInfo[] locations = blk.getLocations();
String[] hosts = new String[locations.length];
String[] xferAddrs = new String[locations.length];
String[] racks = new String[locations.length];
for (int hCnt = 0; hCnt < locations.length; hCnt++) {
hosts[hCnt] = locations[hCnt].getHostName();
xferAddrs[hCnt] = locations[hCnt].getXferAddr();
NodeBase node = new NodeBase(xferAddrs[hCnt],
locations[hCnt].getNetworkLocation());
racks[hCnt] = node.toString();
}
DatanodeInfo[] cachedLocations = blk.getCachedLocations();
String[] cachedHosts = new String[cachedLocations.length];
for (int i=0; i<cachedLocations.length; i++) {
cachedHosts[i] = cachedLocations[i].getHostName();
}
blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
racks,
blk.getStartOffset(),
blk.getBlockSize(),
blk.isCorrupt());
idx++;
}
return blkLocations;
}
/**
* Returns collection of nameservice Ids from the configuration.
* @param conf configuration
* @return collection of nameservice Ids, or null if not specified
*/
public static Collection<String> getNameServiceIds(Configuration conf) {
return conf.getTrimmedStringCollection(DFS_NAMESERVICES);
}
/**
* @return <code>coll</code> if it is non-null and non-empty. Otherwise,
* returns a list with a single null value.
*/
private static Collection<String> emptyAsSingletonNull(Collection<String> coll) {
if (coll == null || coll.isEmpty()) {
return Collections.singletonList(null);
} else {
return coll;
}
}
/**
* Namenode HighAvailability related configuration.
* Returns collection of namenode Ids from the configuration. One logical id
* for each namenode in the in the HA setup.
*
* @param conf configuration
* @param nsId the nameservice ID to look at, or null for non-federated
* @return collection of namenode Ids
*/
public static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
return conf.getTrimmedStringCollection(key);
}
/**
* Given a list of keys in the order of preference, returns a value
* for the key in the given order from the configuration.
* @param defaultValue default value to return, when key was not found
* @param keySuffix suffix to add to the key, if it is not null
* @param conf Configuration
* @param keys list of keys in the order of preference
* @return value of the key or default if a key was not found in configuration
*/
private static String getConfValue(String defaultValue, String keySuffix,
Configuration conf, String... keys) {
String value = null;
for (String key : keys) {
key = addSuffix(key, keySuffix);
value = conf.get(key);
if (value != null) {
break;
}
}
if (value == null) {
value = defaultValue;
}
return value;
}
/** Add non empty and non null suffix to a key */
private static String addSuffix(String key, String suffix) {
if (suffix == null || suffix.isEmpty()) {
return key;
}
assert !suffix.startsWith(".") :
"suffix '" + suffix + "' should not already have '.' prepended.";
return key + "." + suffix;
}
/** Concatenate list of suffix strings '.' separated */
private static String concatSuffixes(String... suffixes) {
if (suffixes == null) {
return null;
}
return Joiner.on(".").skipNulls().join(suffixes);
}
/**
* Return configuration key of format key.suffix1.suffix2...suffixN
*/
public static String addKeySuffixes(String key, String... suffixes) {
String keySuffix = concatSuffixes(suffixes);
return addSuffix(key, keySuffix);
}
/**
* Returns the configured address for all NameNodes in the cluster.
* @param conf configuration
* @param defaultAddress default address to return in case key is not found.
* @param keys Set of keys to look for in the order of preference
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
private static Map<String, Map<String, InetSocketAddress>>
getAddresses(Configuration conf,
String defaultAddress, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
// Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
// across all of the configured nameservices and namenodes.
Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
Map<String, InetSocketAddress> isas =
getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
if (!isas.isEmpty()) {
ret.put(nsId, isas);
}
}
return ret;
}
/**
* Get all of the RPC addresses of the individual NNs in a given nameservice.
*
* @param conf Configuration
* @param nsId the nameservice whose NNs addresses we want.
* @param defaultValue default address to return in case key is not found.
* @return A map from nnId -> RPC address of each NN in the nameservice.
*/
public static Map<String, InetSocketAddress> getRpcAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue) {
return getAddressesForNameserviceId(conf, nsId, defaultValue,
DFS_NAMENODE_RPC_ADDRESS_KEY);
}
private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue,
String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId);
String address = getConfValue(defaultValue, suffix, conf, keys);
if (address != null) {
InetSocketAddress isa = NetUtils.createSocketAddr(address);
if (isa.isUnresolved()) {
LOG.warn("Namenode for " + nsId +
" remains unresolved for ID " + nnId +
". Check your hdfs-site.xml file to " +
"ensure namenodes are configured properly.");
}
ret.put(nnId, isa);
}
}
return ret;
}
/**
* @return a collection of all configured NN Kerberos principals.
*/
public static Set<String> getAllNnPrincipals(Configuration conf) throws IOException {
Set<String> principals = new HashSet<String>();
for (String nsId : DFSUtil.getNameServiceIds(conf)) {
if (HAUtil.isHAEnabled(conf, nsId)) {
for (String nnId : DFSUtil.getNameNodeIds(conf, nsId)) {
Configuration confForNn = new Configuration(conf);
NameNode.initializeGenericKeys(confForNn, nsId, nnId);
String principal = SecurityUtil.getServerPrincipal(confForNn
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(confForNn).getHostName());
principals.add(principal);
}
} else {
Configuration confForNn = new Configuration(conf);
NameNode.initializeGenericKeys(confForNn, nsId, null);
String principal = SecurityUtil.getServerPrincipal(confForNn
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(confForNn).getHostName());
principals.add(principal);
}
}
return principals;
}
/**
* Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
* the configuration.
*
* @param conf configuration
* @return list of InetSocketAddresses
*/
public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
Configuration conf) {
return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/**
* Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
* the configuration.
*
* @return list of InetSocketAddresses
*/
public static Map<String, Map<String, InetSocketAddress>> getHaNnWebHdfsAddresses(
Configuration conf, String scheme) {
if (WebHdfsFileSystem.SCHEME.equals(scheme)) {
return getAddresses(conf, null,
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
} else if (SWebHdfsFileSystem.SCHEME.equals(scheme)) {
return getAddresses(conf, null,
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY);
} else {
throw new IllegalArgumentException("Unsupported scheme: " + scheme);
}
}
/**
* Resolve an HDFS URL into real INetSocketAddress. It works like a DNS resolver
* when the URL points to an non-HA cluster. When the URL points to an HA
* cluster, the resolver further resolves the logical name (i.e., the authority
* in the URL) into real namenode addresses.
*/
public static InetSocketAddress[] resolveWebHdfsUri(URI uri, Configuration conf)
throws IOException {
int defaultPort;
String scheme = uri.getScheme();
if (WebHdfsFileSystem.SCHEME.equals(scheme)) {
defaultPort = DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
} else if (SWebHdfsFileSystem.SCHEME.equals(scheme)) {
defaultPort = DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT;
} else {
throw new IllegalArgumentException("Unsupported scheme: " + scheme);
}
ArrayList<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
if (!HAUtil.isLogicalUri(conf, uri)) {
InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
defaultPort);
ret.add(addr);
} else {
Map<String, Map<String, InetSocketAddress>> addresses = DFSUtil
.getHaNnWebHdfsAddresses(conf, scheme);
for (Map<String, InetSocketAddress> addrs : addresses.values()) {
for (InetSocketAddress addr : addrs.values()) {
ret.add(addr);
}
}
}
InetSocketAddress[] r = new InetSocketAddress[ret.size()];
return ret.toArray(r);
}
/**
* Returns list of InetSocketAddress corresponding to backup node rpc
* addresses from the configuration.
*
* @param conf configuration
* @return list of InetSocketAddresses
* @throws IOException on error
*/
public static Map<String, Map<String, InetSocketAddress>> getBackupNodeAddresses(
Configuration conf) throws IOException {
Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf,
null, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
if (addressList.isEmpty()) {
throw new IOException("Incorrect configuration: backup node address "
+ DFS_NAMENODE_BACKUP_ADDRESS_KEY + " is not configured.");
}
return addressList;
}
/**
* Returns list of InetSocketAddresses of corresponding to secondary namenode
* http addresses from the configuration.
*
* @param conf configuration
* @return list of InetSocketAddresses
* @throws IOException on error
*/
public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAddresses(
Configuration conf) throws IOException {
Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf, null,
DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
if (addressList.isEmpty()) {
throw new IOException("Incorrect configuration: secondary namenode address "
+ DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY + " is not configured.");
}
return addressList;
}
/**
* Returns list of InetSocketAddresses corresponding to namenodes from the
* configuration. Note this is to be used by datanodes to get the list of
* namenode addresses to talk to.
*
* Returns namenode address specifically configured for datanodes (using
* service ports), if found. If not, regular RPC address configured for other
* clients is returned.
*
* @param conf configuration
* @return list of InetSocketAddress
* @throws IOException on error
*/
public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddresses(
Configuration conf) throws IOException {
// Use default address as fall back
String defaultAddress;
try {
defaultAddress = NetUtils.getHostPortString(NameNode.getAddress(conf));
} catch (IllegalArgumentException e) {
defaultAddress = null;
}
Map<String, Map<String, InetSocketAddress>> addressList =
getAddresses(conf, defaultAddress,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
if (addressList.isEmpty()) {
throw new IOException("Incorrect configuration: namenode address "
+ DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
+ DFS_NAMENODE_RPC_ADDRESS_KEY
+ " is not configured.");
}
return addressList;
}
/**
* Flatten the given map, as returned by other functions in this class,
* into a flat list of {@link ConfiguredNNAddress} instances.
*/
public static List<ConfiguredNNAddress> flattenAddressMap(
Map<String, Map<String, InetSocketAddress>> map) {
List<ConfiguredNNAddress> ret = Lists.newArrayList();
for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
map.entrySet()) {
String nsId = entry.getKey();
Map<String, InetSocketAddress> nnMap = entry.getValue();
for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
String nnId = e2.getKey();
InetSocketAddress addr = e2.getValue();
ret.add(new ConfiguredNNAddress(nsId, nnId, addr));
}
}
return ret;
}
/**
* Format the given map, as returned by other functions in this class,
* into a string suitable for debugging display. The format of this string
* should not be considered an interface, and is liable to change.
*/
public static String addressMapToString(
Map<String, Map<String, InetSocketAddress>> map) {
StringBuilder b = new StringBuilder();
for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
map.entrySet()) {
String nsId = entry.getKey();
Map<String, InetSocketAddress> nnMap = entry.getValue();
b.append("Nameservice <").append(nsId).append(">:").append("\n");
for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
b.append(" NN ID ").append(e2.getKey())
.append(" => ").append(e2.getValue()).append("\n");
}
}
return b.toString();
}
public static String nnAddressesAsString(Configuration conf) {
Map<String, Map<String, InetSocketAddress>> addresses =
getHaNnRpcAddresses(conf);
return addressMapToString(addresses);
}
/**
* Represent one of the NameNodes configured in the cluster.
*/
public static class ConfiguredNNAddress {
private final String nameserviceId;
private final String namenodeId;
private final InetSocketAddress addr;
private ConfiguredNNAddress(String nameserviceId, String namenodeId,
InetSocketAddress addr) {
this.nameserviceId = nameserviceId;
this.namenodeId = namenodeId;
this.addr = addr;
}
public String getNameserviceId() {
return nameserviceId;
}
public String getNamenodeId() {
return namenodeId;
}
public InetSocketAddress getAddress() {
return addr;
}
@Override
public String toString() {
return "ConfiguredNNAddress[nsId=" + nameserviceId + ";" +
"nnId=" + namenodeId + ";addr=" + addr + "]";
}
}
/**
* Get a URI for each configured nameservice. If a nameservice is
* HA-enabled, then the logical URI of the nameservice is returned. If the
* nameservice is not HA-enabled, then a URI corresponding to an RPC address
* of the single NN for that nameservice is returned, preferring the service
* RPC address over the client RPC address.
*
* @param conf configuration
* @return a collection of all configured NN URIs, preferring service
* addresses
*/
public static Collection<URI> getNsServiceRpcUris(Configuration conf) {
return getNameServiceUris(conf,
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/**
* Get a URI for each configured nameservice. If a nameservice is
* HA-enabled, then the logical URI of the nameservice is returned. If the
* nameservice is not HA-enabled, then a URI corresponding to the address of
* the single NN for that nameservice is returned.
*
* @param conf configuration
* @param keys configuration keys to try in order to get the URI for non-HA
* nameservices
* @return a collection of all configured NN URIs
*/
public static Collection<URI> getNameServiceUris(Configuration conf,
String... keys) {
Set<URI> ret = new HashSet<URI>();
// We're passed multiple possible configuration keys for any given NN or HA
// nameservice, and search the config in order of these keys. In order to
// make sure that a later config lookup (e.g. fs.defaultFS) doesn't add a
// URI for a config key for which we've already found a preferred entry, we
// keep track of non-preferred keys here.
Set<URI> nonPreferredUris = new HashSet<URI>();
for (String nsId : getNameServiceIds(conf)) {
if (HAUtil.isHAEnabled(conf, nsId)) {
// Add the logical URI of the nameservice.
try {
ret.add(new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId));
} catch (URISyntaxException ue) {
throw new IllegalArgumentException(ue);
}
} else {
// Add the URI corresponding to the address of the NN.
boolean uriFound = false;
for (String key : keys) {
String addr = conf.get(concatSuffixes(key, nsId));
if (addr != null) {
URI uri = createUri(HdfsConstants.HDFS_URI_SCHEME,
NetUtils.createSocketAddr(addr));
if (!uriFound) {
uriFound = true;
ret.add(uri);
} else {
nonPreferredUris.add(uri);
}
}
}
}
}
// Add the generic configuration keys.
boolean uriFound = false;
for (String key : keys) {
String addr = conf.get(key);
if (addr != null) {
URI uri = createUri("hdfs", NetUtils.createSocketAddr(addr));
if (!uriFound) {
uriFound = true;
ret.add(uri);
} else {
nonPreferredUris.add(uri);
}
}
}
// Add the default URI if it is an HDFS URI.
URI defaultUri = FileSystem.getDefaultUri(conf);
// checks if defaultUri is ip:port format
// and convert it to hostname:port format
if (defaultUri != null && (defaultUri.getPort() != -1)) {
defaultUri = createUri(defaultUri.getScheme(),
NetUtils.createSocketAddr(defaultUri.getHost(),
defaultUri.getPort()));
}
if (defaultUri != null &&
HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
!nonPreferredUris.contains(defaultUri)) {
ret.add(defaultUri);
}
return ret;
}
/**
* Given the InetSocketAddress this method returns the nameservice Id
* corresponding to the key with matching address, by doing a reverse
* lookup on the list of nameservices until it finds a match.
*
* Since the process of resolving URIs to Addresses is slightly expensive,
* this utility method should not be used in performance-critical routines.
*
* @param conf - configuration
* @param address - InetSocketAddress for configured communication with NN.
* Configured addresses are typically given as URIs, but we may have to
* compare against a URI typed in by a human, or the server name may be
* aliased, so we compare unambiguous InetSocketAddresses instead of just
* comparing URI substrings.
* @param keys - list of configured communication parameters that should
* be checked for matches. For example, to compare against RPC addresses,
* provide the list DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
* DFS_NAMENODE_RPC_ADDRESS_KEY. Use the generic parameter keys,
* not the NameServiceId-suffixed keys.
* @return nameserviceId, or null if no match found
*/
public static String getNameServiceIdFromAddress(final Configuration conf,
final InetSocketAddress address, String... keys) {
// Configuration with a single namenode and no nameserviceId
String[] ids = getSuffixIDs(conf, address, keys);
return (ids != null) ? ids[0] : null;
}
/**
* return server http or https address from the configuration for a
* given namenode rpc address.
* @param conf
* @param namenodeAddr - namenode RPC address
* @param scheme - the scheme (http / https)
* @return server http or https address
* @throws IOException
*/
public static URI getInfoServer(InetSocketAddress namenodeAddr,
Configuration conf, String scheme) throws IOException {
String[] suffixes = null;
if (namenodeAddr != null) {
// if non-default namenode, try reverse look up
// the nameServiceID if it is available
suffixes = getSuffixIDs(conf, namenodeAddr,
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
String authority;
if ("http".equals(scheme)) {
authority = getSuffixedConf(conf, DFS_NAMENODE_HTTP_ADDRESS_KEY,
DFS_NAMENODE_HTTP_ADDRESS_DEFAULT, suffixes);
} else if ("https".equals(scheme)) {
authority = getSuffixedConf(conf, DFS_NAMENODE_HTTPS_ADDRESS_KEY,
DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT, suffixes);
} else {
throw new IllegalArgumentException("Invalid scheme:" + scheme);
}
if (namenodeAddr != null) {
authority = substituteForWildcardAddress(authority,
namenodeAddr.getHostName());
}
return URI.create(scheme + "://" + authority);
}
/**
* Lookup the HTTP / HTTPS address of the namenode, and replace its hostname
* with defaultHost when it found out that the address is a wildcard / local
* address.
*
* @param defaultHost
* The default host name of the namenode.
* @param conf
* The configuration
* @param scheme
* HTTP or HTTPS
* @throws IOException
*/
public static URI getInfoServerWithDefaultHost(String defaultHost,
Configuration conf, final String scheme) throws IOException {
URI configuredAddr = getInfoServer(null, conf, scheme);
String authority = substituteForWildcardAddress(
configuredAddr.getAuthority(), defaultHost);
return URI.create(scheme + "://" + authority);
}
/**
* Determine whether HTTP or HTTPS should be used to connect to the remote
* server. Currently the client only connects to the server via HTTPS if the
* policy is set to HTTPS_ONLY.
*
* @return the scheme (HTTP / HTTPS)
*/
public static String getHttpClientScheme(Configuration conf) {
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
return policy == HttpConfig.Policy.HTTPS_ONLY ? "https" : "http";
}
/**
* Substitute a default host in the case that an address has been configured
* with a wildcard. This is used, for example, when determining the HTTP
* address of the NN -- if it's configured to bind to 0.0.0.0, we want to
* substitute the hostname from the filesystem URI rather than trying to
* connect to 0.0.0.0.
* @param configuredAddress the address found in the configuration
* @param defaultHost the host to substitute with, if configuredAddress
* is a local/wildcard address.
* @return the substituted address
* @throws IOException if it is a wildcard address and security is enabled
*/
@VisibleForTesting
static String substituteForWildcardAddress(String configuredAddress,
String defaultHost) throws IOException {
InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
InetSocketAddress defaultSockAddr = NetUtils.createSocketAddr(defaultHost
+ ":0");
if (sockAddr.getAddress().isAnyLocalAddress()) {
if (UserGroupInformation.isSecurityEnabled() &&
defaultSockAddr.getAddress().isAnyLocalAddress()) {
throw new IOException("Cannot use a wildcard address with security. " +
"Must explicitly set bind address for Kerberos");
}
return defaultHost + ":" + sockAddr.getPort();
} else {
return configuredAddress;
}
}
private static String getSuffixedConf(Configuration conf,
String key, String defaultVal, String[] suffixes) {
String ret = conf.get(DFSUtil.addKeySuffixes(key, suffixes));
if (ret != null) {
return ret;
}
return conf.get(key, defaultVal);
}
/**
* Sets the node specific setting into generic configuration key. Looks up
* value of "key.nameserviceId.namenodeId" and if found sets that value into
* generic key in the conf. If this is not found, falls back to
* "key.nameserviceId" and then the unmodified key.
*
* Note that this only modifies the runtime conf.
*
* @param conf
* Configuration object to lookup specific key and to set the value
* to the key passed. Note the conf object is modified.
* @param nameserviceId
* nameservice Id to construct the node specific key. Pass null if
* federation is not configuration.
* @param nnId
* namenode Id to construct the node specific key. Pass null if
* HA is not configured.
* @param keys
* The key for which node specific value is looked up
*/
public static void setGenericConf(Configuration conf,
String nameserviceId, String nnId, String... keys) {
for (String key : keys) {
String value = conf.get(addKeySuffixes(key, nameserviceId, nnId));
if (value != null) {
conf.set(key, value);
continue;
}
value = conf.get(addKeySuffixes(key, nameserviceId));
if (value != null) {
conf.set(key, value);
}
}
}
/** Return used as percentage of capacity */
public static float getPercentUsed(long used, long capacity) {
return capacity <= 0 ? 100 : (used * 100.0f)/capacity;
}
/** Return remaining as percentage of capacity */
public static float getPercentRemaining(long remaining, long capacity) {
return capacity <= 0 ? 0 : (remaining * 100.0f)/capacity;
}
/** Convert percentage to a string. */
public static String percent2String(double percentage) {
return StringUtils.format("%.2f%%", percentage);
}
/**
* Round bytes to GiB (gibibyte)
* @param bytes number of bytes
* @return number of GiB
*/
public static int roundBytesToGB(long bytes) {
return Math.round((float)bytes/ 1024 / 1024 / 1024);
}
/** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
connectToDnViaHostname, locatedBlock);
}
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
boolean connectToDnViaHostname) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(
datanodeid, conf, socketTimeout, connectToDnViaHostname);
}
/** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
}
/**
* Get nameservice Id for the {@link NameNode} based on namenode RPC address
* matching the local node address.
*/
public static String getNamenodeNameServiceId(Configuration conf) {
return getNameServiceId(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/**
* Get nameservice Id for the BackupNode based on backup node RPC address
* matching the local node address.
*/
public static String getBackupNameServiceId(Configuration conf) {
return getNameServiceId(conf, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
}
/**
* Get nameservice Id for the secondary node based on secondary http address
* matching the local node address.
*/
public static String getSecondaryNameServiceId(Configuration conf) {
return getNameServiceId(conf, DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
}
/**
* Get the nameservice Id by matching the {@code addressKey} with the
* the address of the local node.
*
* If {@link DFSConfigKeys#DFS_NAMESERVICE_ID} is not specifically
* configured, and more than one nameservice Id is configured, this method
* determines the nameservice Id by matching the local node's address with the
* configured addresses. When a match is found, it returns the nameservice Id
* from the corresponding configuration key.
*
* @param conf Configuration
* @param addressKey configuration key to get the address.
* @return nameservice Id on success, null if federation is not configured.
* @throws HadoopIllegalArgumentException on error
*/
private static String getNameServiceId(Configuration conf, String addressKey) {
String nameserviceId = conf.get(DFS_NAMESERVICE_ID);
if (nameserviceId != null) {
return nameserviceId;
}
Collection<String> nsIds = getNameServiceIds(conf);
if (1 == nsIds.size()) {
return nsIds.toArray(new String[1])[0];
}
String nnId = conf.get(DFS_HA_NAMENODE_ID_KEY);
return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
}
/**
* Returns nameservice Id and namenode Id when the local host matches the
* configuration parameter {@code addressKey}.<nameservice Id>.<namenode Id>
*
* @param conf Configuration
* @param addressKey configuration key corresponding to the address.
* @param knownNsId only look at configs for the given nameservice, if not-null
* @param knownNNId only look at configs for the given namenode, if not null
* @param matcher matching criteria for matching the address
* @return Array with nameservice Id and namenode Id on success. First element
* in the array is nameservice Id and second element is namenode Id.
* Null value indicates that the configuration does not have the the
* Id.
* @throws HadoopIllegalArgumentException on error
*/
static String[] getSuffixIDs(final Configuration conf, final String addressKey,
String knownNsId, String knownNNId,
final AddressMatcher matcher) {
String nameserviceId = null;
String namenodeId = null;
int found = 0;
Collection<String> nsIds = getNameServiceIds(conf);
for (String nsId : emptyAsSingletonNull(nsIds)) {
if (knownNsId != null && !knownNsId.equals(nsId)) {
continue;
}
Collection<String> nnIds = getNameNodeIds(conf, nsId);
for (String nnId : emptyAsSingletonNull(nnIds)) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("addressKey: %s nsId: %s nnId: %s",
addressKey, nsId, nnId));
}
if (knownNNId != null && !knownNNId.equals(nnId)) {
continue;
}
String key = addKeySuffixes(addressKey, nsId, nnId);
String addr = conf.get(key);
if (addr == null) {
continue;
}
InetSocketAddress s = null;
try {
s = NetUtils.createSocketAddr(addr);
} catch (Exception e) {
LOG.warn("Exception in creating socket address " + addr, e);
continue;
}
if (!s.isUnresolved() && matcher.match(s)) {
nameserviceId = nsId;
namenodeId = nnId;
found++;
}
}
}
if (found > 1) { // Only one address must match the local address
String msg = "Configuration has multiple addresses that match "
+ "local node's address. Please configure the system with "
+ DFS_NAMESERVICE_ID + " and "
+ DFS_HA_NAMENODE_ID_KEY;
throw new HadoopIllegalArgumentException(msg);
}
return new String[] { nameserviceId, namenodeId };
}
/**
* For given set of {@code keys} adds nameservice Id and or namenode Id
* and returns {nameserviceId, namenodeId} when address match is found.
* @see #getSuffixIDs(Configuration, String, AddressMatcher)
*/
static String[] getSuffixIDs(final Configuration conf,
final InetSocketAddress address, final String... keys) {
AddressMatcher matcher = new AddressMatcher() {
@Override
public boolean match(InetSocketAddress s) {
return address.equals(s);
}
};
for (String key : keys) {
String[] ids = getSuffixIDs(conf, key, null, null, matcher);
if (ids != null && (ids [0] != null || ids[1] != null)) {
return ids;
}
}
return null;
}
private interface AddressMatcher {
public boolean match(InetSocketAddress s);
}
/** Create a URI from the scheme and address */
public static URI createUri(String scheme, InetSocketAddress address) {
try {
return new URI(scheme, null, address.getHostName(), address.getPort(),
null, null, null);
} catch (URISyntaxException ue) {
throw new IllegalArgumentException(ue);
}
}
/**
* Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}
* @param conf configuration
* @param protocol Protocol interface
* @param service service that implements the protocol
* @param server RPC server to which the protocol & implementation is added to
* @throws IOException
*/
public static void addPBProtocol(Configuration conf, Class<?> protocol,
BlockingService service, RPC.Server server) throws IOException {
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
}
/**
* Map a logical namenode ID to its service address. Use the given
* nameservice if specified, or the configured one if none is given.
*
* @param conf Configuration
* @param nsId which nameservice nnId is a part of, optional
* @param nnId the namenode ID to get the service addr for
* @return the service addr, null if it could not be determined
*/
public static String getNamenodeServiceAddr(final Configuration conf,
String nsId, String nnId) {
if (nsId == null) {
nsId = getOnlyNameServiceIdOrNull(conf);
}
String serviceAddrKey = concatSuffixes(
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsId, nnId);
String addrKey = concatSuffixes(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnId);
String serviceRpcAddr = conf.get(serviceAddrKey);
if (serviceRpcAddr == null) {
serviceRpcAddr = conf.get(addrKey);
}
return serviceRpcAddr;
}
/**
* If the configuration refers to only a single nameservice, return the
* name of that nameservice. If it refers to 0 or more than 1, return null.
*/
public static String getOnlyNameServiceIdOrNull(Configuration conf) {
Collection<String> nsIds = getNameServiceIds(conf);
if (1 == nsIds.size()) {
return nsIds.toArray(new String[1])[0];
} else {
// No nameservice ID was given and more than one is configured
return null;
}
}
public static Options helpOptions = new Options();
public static Option helpOpt = new Option("h", "help", false,
"get help information");
static {
helpOptions.addOption(helpOpt);
}
/**
* Parse the arguments for commands
*
* @param args the argument to be parsed
* @param helpDescription help information to be printed out
* @param out Printer
* @param printGenericCommandUsage whether to print the
* generic command usage defined in ToolRunner
* @return true when the argument matches help option, false if not
*/
public static boolean parseHelpArgument(String[] args,
String helpDescription, PrintStream out, boolean printGenericCommandUsage) {
if (args.length == 1) {
try {
CommandLineParser parser = new PosixParser();
CommandLine cmdLine = parser.parse(helpOptions, args);
if (cmdLine.hasOption(helpOpt.getOpt())
|| cmdLine.hasOption(helpOpt.getLongOpt())) {
// should print out the help information
out.println(helpDescription + "\n");
if (printGenericCommandUsage) {
ToolRunner.printGenericCommandUsage(out);
}
return true;
}
} catch (ParseException pe) {
return false;
}
}
return false;
}
/**
* Get DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION from configuration.
*
* @param conf Configuration
* @return Value of DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION
*/
public static float getInvalidateWorkPctPerIteration(Configuration conf) {
float blocksInvalidateWorkPct = conf.getFloat(
DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION,
DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT);
Preconditions.checkArgument(
(blocksInvalidateWorkPct > 0 && blocksInvalidateWorkPct <= 1.0f),
DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION +
" = '" + blocksInvalidateWorkPct + "' is invalid. " +
"It should be a positive, non-zero float value, not greater than 1.0f, " +
"to indicate a percentage.");
return blocksInvalidateWorkPct;
}
/**
* Get DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION from
* configuration.
*
* @param conf Configuration
* @return Value of DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
*/
public static int getReplWorkMultiplier(Configuration conf) {
int blocksReplWorkMultiplier = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
Preconditions.checkArgument(
(blocksReplWorkMultiplier > 0),
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION +
" = '" + blocksReplWorkMultiplier + "' is invalid. " +
"It should be a positive, non-zero integer value.");
return blocksReplWorkMultiplier;
}
/**
* Get SPNEGO keytab Key from configuration
*
* @param conf
* Configuration
* @param defaultKey
* @return DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY if the key is not empty
* else return defaultKey
*/
public static String getSpnegoKeytabKey(Configuration conf, String defaultKey) {
String value =
conf.get(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
return (value == null || value.isEmpty()) ?
defaultKey : DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY;
}
/**
* Get http policy. Http Policy is chosen as follows:
* <ol>
* <li>If hadoop.ssl.enabled is set, http endpoints are not started. Only
* https endpoints are started on configured https ports</li>
* <li>This configuration is overridden by dfs.https.enable configuration, if
* it is set to true. In that case, both http and https endpoints are stared.</li>
* <li>All the above configurations are overridden by dfs.http.policy
* configuration. With this configuration you can set http-only, https-only
* and http-and-https endpoints.</li>
* </ol>
* See hdfs-default.xml documentation for more details on each of the above
* configuration settings.
*/
public static HttpConfig.Policy getHttpPolicy(Configuration conf) {
String httpPolicy = conf.get(DFSConfigKeys.DFS_HTTP_POLICY_KEY,
DFSConfigKeys.DFS_HTTP_POLICY_DEFAULT);
HttpConfig.Policy policy = HttpConfig.Policy.fromString(httpPolicy);
if (policy == HttpConfig.Policy.HTTP_ONLY) {
boolean httpsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HTTPS_ENABLE_KEY,
DFSConfigKeys.DFS_HTTPS_ENABLE_DEFAULT);
boolean hadoopSslEnabled = conf.getBoolean(
CommonConfigurationKeys.HADOOP_SSL_ENABLED_KEY,
CommonConfigurationKeys.HADOOP_SSL_ENABLED_DEFAULT);
if (hadoopSslEnabled) {
LOG.warn(CommonConfigurationKeys.HADOOP_SSL_ENABLED_KEY
+ " is deprecated. Please use "
+ DFSConfigKeys.DFS_HTTPS_ENABLE_KEY + ".");
policy = HttpConfig.Policy.HTTPS_ONLY;
} else if (httpsEnabled) {
LOG.warn(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY
+ " is deprecated. Please use "
+ DFSConfigKeys.DFS_HTTPS_ENABLE_KEY + ".");
policy = HttpConfig.Policy.HTTP_AND_HTTPS;
}
}
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, policy.name());
return policy;
}
public static HttpServer.Builder loadSslConfToHttpServerBuilder(HttpServer.Builder builder,
Configuration sslConf) {
return builder
.needsClientAuth(
sslConf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT))
.keyPassword(sslConf.get("ssl.server.keystore.keypassword"))
.keyStore(sslConf.get("ssl.server.keystore.location"),
sslConf.get("ssl.server.keystore.password"),
sslConf.get("ssl.server.keystore.type", "jks"))
.trustStore(sslConf.get("ssl.server.truststore.location"),
sslConf.get("ssl.server.truststore.password"),
sslConf.get("ssl.server.truststore.type", "jks"));
}
/**
* Converts a Date into an ISO-8601 formatted datetime string.
*/
public static String dateToIso8601String(Date date) {
SimpleDateFormat df =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH);
return df.format(date);
}
/**
* Converts a time duration in milliseconds into DDD:HH:MM:SS format.
*/
public static String durationToString(long durationMs) {
boolean negative = false;
if (durationMs < 0) {
negative = true;
durationMs = -durationMs;
}
// Chop off the milliseconds
long durationSec = durationMs / 1000;
final int secondsPerMinute = 60;
final int secondsPerHour = 60*60;
final int secondsPerDay = 60*60*24;
final long days = durationSec / secondsPerDay;
durationSec -= days * secondsPerDay;
final long hours = durationSec / secondsPerHour;
durationSec -= hours * secondsPerHour;
final long minutes = durationSec / secondsPerMinute;
durationSec -= minutes * secondsPerMinute;
final long seconds = durationSec;
final long milliseconds = durationMs % 1000;
String format = "%03d:%02d:%02d:%02d.%03d";
if (negative) {
format = "-" + format;
}
return String.format(format, days, hours, minutes, seconds, milliseconds);
}
/**
* Converts a relative time string into a duration in milliseconds.
*/
public static long parseRelativeTime(String relTime) throws IOException {
if (relTime.length() < 2) {
throw new IOException("Unable to parse relative time value of " + relTime
+ ": too short");
}
String ttlString = relTime.substring(0, relTime.length()-1);
long ttl;
try {
ttl = Long.parseLong(ttlString);
} catch (NumberFormatException e) {
throw new IOException("Unable to parse relative time value of " + relTime
+ ": " + ttlString + " is not a number");
}
if (relTime.endsWith("s")) {
// pass
} else if (relTime.endsWith("m")) {
ttl *= 60;
} else if (relTime.endsWith("h")) {
ttl *= 60*60;
} else if (relTime.endsWith("d")) {
ttl *= 60*60*24;
} else {
throw new IOException("Unable to parse relative time value of " + relTime
+ ": unknown time unit " + relTime.charAt(relTime.length() - 1));
}
return ttl*1000;
}
/**
* Load HTTPS-related configuration.
*/
public static Configuration loadSslConfiguration(Configuration conf) {
Configuration sslConf = new Configuration(false);
sslConf.addResource(conf.get(
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
boolean requireClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
sslConf.setBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY, requireClientAuth);
return sslConf;
}
/**
* Return a HttpServer.Builder that the journalnode / namenode / secondary
* namenode can use to initialize their HTTP / HTTPS server.
*
*/
public static HttpServer.Builder httpServerTemplateForNNAndJN(
Configuration conf, final InetSocketAddress httpAddr,
final InetSocketAddress httpsAddr, String name, String spnegoUserNameKey,
String spnegoKeytabFileKey) throws IOException {
HttpConfig.Policy policy = getHttpPolicy(conf);
HttpServer.Builder builder = new HttpServer.Builder().setName(name)
.setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
.setUsernameConfKey(spnegoUserNameKey)
.setKeytabConfKey(getSpnegoKeytabKey(conf, spnegoKeytabFileKey));
// initialize the webserver for uploading/downloading files.
LOG.info("Starting web server as: "
+ SecurityUtil.getServerPrincipal(conf.get(spnegoUserNameKey),
httpAddr.getHostName()));
if (policy.isHttpEnabled()) {
if (httpAddr.getPort() == 0) {
builder.setFindPort(true);
}
URI uri = URI.create("http://" + NetUtils.getHostPortString(httpAddr));
builder.addEndpoint(uri);
LOG.info("Starting Web-server for " + name + " at: " + uri);
}
if (policy.isHttpsEnabled() && httpsAddr != null) {
Configuration sslConf = loadSslConfiguration(conf);
loadSslConfToHttpServerBuilder(builder, sslConf);
if (httpsAddr.getPort() == 0) {
builder.setFindPort(true);
}
URI uri = URI.create("https://" + NetUtils.getHostPortString(httpsAddr));
builder.addEndpoint(uri);
LOG.info("Starting Web-server for " + name + " at: " + uri);
}
return builder;
}
/**
* Assert that all objects in the collection are equal. Returns silently if
* so, throws an AssertionError if any object is not equal. All null values
* are considered equal.
*
* @param objects the collection of objects to check for equality.
*/
public static void assertAllResultsEqual(Collection<?> objects) {
Object[] resultsArray = objects.toArray();
if (resultsArray.length == 0)
return;
for (int i = 0; i < resultsArray.length; i++) {
if (i == 0)
continue;
else {
Object currElement = resultsArray[i];
Object lastElement = resultsArray[i - 1];
if ((currElement == null && currElement != lastElement) ||
(currElement != null && !currElement.equals(lastElement))) {
throw new AssertionError("Not all elements match in results: " +
Arrays.toString(resultsArray));
}
}
}
}
}