// blob: 34d3e9908ef5c2c60af2f5fcdc5fb4a272e41382
package org.apache.blur.mapreduce.lib.update;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Merges several HDFS client configurations, each declaring a nameservice via
 * {@code dfs.nameservices}, into one {@link Configuration} that lists all of
 * the nameservices together.
 *
 * <p>
 * Configurations are gathered from every {@code *.xml} file found under a
 * directory tree. From each configuration only the properties whose key
 * contains that configuration's nameservice id are copied into the merged
 * result, and only when the host of its {@code dfs.namenode.rpc-address.*}
 * entry can be resolved.
 */
public class HdfsConfigurationNamespaceMerge {

  private static final String DFS_NAMESERVICES = "dfs.nameservices";
  private static final Log LOG = LogFactory.getLog(HdfsConfigurationNamespaceMerge.class);

  /**
   * Manual smoke test: merges the configurations under
   * {@code ./src/main/scripts/conf/hdfs} and prints the root listing of every
   * nameservice named in the merged configuration.
   *
   * @param args
   *          ignored.
   * @throws IOException
   *           if gathering, merging or listing fails.
   */
  public static void main(String[] args) throws IOException {
    Path p = new Path("./src/main/scripts/conf/hdfs");
    Configuration configuration = mergeHdfsConfigs(p.getFileSystem(new Configuration()), p);
    // Debug aid: configuration.writeXml(System.out);
    Collection<String> nameServices = configuration.getStringCollection(DFS_NAMESERVICES);
    for (String name : nameServices) {
      Path path = new Path("hdfs://" + name + "/");
      FileSystem fileSystem = path.getFileSystem(configuration);
      FileStatus[] listStatus = fileSystem.listStatus(path);
      for (FileStatus fileStatus : listStatus) {
        System.out.println(fileStatus.getPath());
      }
    }
  }

  /**
   * Checks whether the given host name can be resolved.
   *
   * @param host
   *          the host name to look up.
   * @return {@code true} if the lookup succeeds, {@code false} (after logging
   *         a warning) if the host is unknown.
   */
  private static boolean checkHostName(String host) {
    try {
      InetAddress.getAllByName(host);
      return true;
    } catch (UnknownHostException e) {
      // An unresolvable host is an expected outcome here, so it is reported as
      // a warning instead of being propagated.
      LOG.warn("Host not found [" + host + "]");
      return false;
    }
  }

  /**
   * Gathers every {@code *.xml} configuration under {@code p} and merges them.
   *
   * @param fs
   *          the file system used to walk and read {@code p}.
   * @param p
   *          a configuration file or a directory tree of configuration files.
   * @return the merged configuration, see {@link #merge(List)}.
   * @throws IOException
   *           if the files cannot be read or two configurations declare the
   *           same nameservice.
   */
  public static Configuration mergeHdfsConfigs(FileSystem fs, Path p) throws IOException {
    List<Configuration> configList = new ArrayList<Configuration>();
    gatherConfigs(fs, p, configList);
    return merge(configList);
  }

  /**
   * Merges the given configurations into a new, defaults-free
   * {@link Configuration}.
   *
   * <p>
   * For each configuration the value of {@code dfs.nameservices} is read.
   * Configurations that do not define it (for example an unrelated XML file
   * that was gathered from the same directory) are skipped with a warning. For
   * the rest, every property whose key contains the nameservice id is copied,
   * provided {@link #shouldAdd(Configuration, String)} accepts the
   * configuration. The merged {@code dfs.nameservices} lists every nameservice
   * that was seen, including those whose properties were not copied.
   *
   * @param configList
   *          the configurations to merge.
   * @return the merged configuration.
   * @throws IOException
   *           if two configurations declare the same nameservice.
   */
  public static Configuration merge(List<Configuration> configList) throws IOException {
    Configuration merge = new Configuration(false);
    Set<String> nameServices = new HashSet<String>();
    for (Configuration configuration : configList) {
      String nameService = configuration.get(DFS_NAMESERVICES);
      if (StringUtils.isBlank(nameService)) {
        // Without a nameservice id there is nothing to select keys by, and the
        // key matching below would fail on null.
        LOG.warn("Skipping configuration without [" + DFS_NAMESERVICES + "] defined");
        continue;
      }
      if (!nameServices.add(nameService)) {
        throw new IOException("Multiple confs define namespace [" + nameService + "]");
      }
      if (shouldAdd(configuration, nameService)) {
        for (Entry<String, String> e : configuration) {
          String key = e.getKey();
          if (key.contains(nameService)) {
            merge.set(key, e.getValue());
          }
        }
      }
    }
    merge.set(DFS_NAMESERVICES, StringUtils.join(nameServices, ","));
    return merge;
  }

  /**
   * Decides whether the properties of a configuration should be merged.
   *
   * @param configuration
   *          the configuration being considered.
   * @param nameService
   *          the nameservice id declared by {@code configuration}.
   * @return the result of resolving the host of the first
   *         {@code dfs.namenode.rpc-address.*} entry whose key contains the
   *         nameservice id, or {@code false} if there is no such entry.
   */
  private static boolean shouldAdd(Configuration configuration, String nameService) {
    for (Entry<String, String> e : configuration) {
      String key = e.getKey();
      if (key.contains(nameService) && key.startsWith("dfs.namenode.rpc-address.")) {
        return checkHostName(getHost(e.getValue()));
      }
    }
    return false;
  }

  /**
   * Extracts the host part of a {@code host:port} address.
   *
   * @param host
   *          an address, with or without a {@code :port} suffix.
   * @return everything before the first {@code ':'}, or the whole string when
   *         it contains no {@code ':'}.
   */
  private static String getHost(String host) {
    int portSeparator = host.indexOf(':');
    // An address without a port has no separator; substring(0, -1) would throw.
    return portSeparator < 0 ? host : host.substring(0, portSeparator);
  }

  /**
   * Recursively collects a {@link Configuration} for every {@code *.xml} file
   * at or below {@code p}. Files with any other extension are skipped.
   *
   * @param fs
   *          the file system used to walk and read {@code p}.
   * @param p
   *          a file or directory to inspect.
   * @param configList
   *          receives one configuration per XML file found.
   * @throws IOException
   *           if the file system cannot be listed or a file cannot be opened.
   */
  public static void gatherConfigs(FileSystem fs, Path p, List<Configuration> configList) throws IOException {
    if (fs.isFile(p)) {
      if (p.getName().endsWith(".xml")) {
        LOG.info("Loading file [" + p + "]");
        configList.add(loadConfiguration(fs, p));
      } else {
        LOG.info("Skipping file [" + p + "]");
      }
      return;
    }
    FileStatus[] listStatus = fs.listStatus(p);
    if (listStatus == null) {
      // NOTE(review): older Hadoop releases may return null for a missing
      // path instead of throwing; treat that as "nothing to gather".
      return;
    }
    for (FileStatus fileStatus : listStatus) {
      gatherConfigs(fs, fileStatus.getPath(), configList);
    }
  }

  /**
   * Loads one XML configuration file through the given file system.
   *
   * <p>
   * The file is opened via {@code fs} because
   * {@code Configuration.addResource(Path)} is documented to examine the local
   * file system directly, which would ignore a non-local {@code fs}.
   */
  private static Configuration loadConfiguration(FileSystem fs, Path p) throws IOException {
    Configuration configuration = new Configuration(false);
    InputStream in = fs.open(p);
    try {
      configuration.addResource(in);
      // Resources are parsed lazily; touch the properties now so the stream is
      // consumed before it is closed.
      configuration.size();
    } finally {
      in.close();
    }
    return configuration;
  }
}