/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.security.auth.login.LoginException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* an archive creation utility.
* This class provides methods that can be used
* to create hadoop archives. For an understanding of
* Hadoop archives, see {@link HarFileSystem}.
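* <p>A typical command line invocation looks like the following
* (invoked via the hadoop command; the paths are illustrative):
* <pre>
* hadoop archive -archiveName foo.har -p /user/hadoop dir1 dir2 /user/outputdir
* </pre>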
*/
public class HadoopArchives implements Tool {
private static final Log LOG = LogFactory.getLog(HadoopArchives.class);
private static final String NAME = "har";
static final String SRC_LIST_LABEL = NAME + ".src.list";
static final String DST_DIR_LABEL = NAME + ".dest.path";
static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
static final String JOB_DIR_LABEL = NAME + ".job.dir";
static final String SRC_COUNT_LABEL = NAME + ".src.count";
static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
static final String DST_HAR_LABEL = NAME + ".archive.name";
static final String SRC_PARENT_LABEL = NAME + ".parent.path";
// size of each part file
// it is fixed for now.
static final long partSize = 2 * 1024 * 1024 * 1024L;
private static final String usage = "archive"
+ " -archiveName NAME -p <parent path> <src>* <dest>" +
"\n";
private JobConf conf;
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
} else {
this.conf = new JobConf(conf, HadoopArchives.class);
}
}
public Configuration getConf() {
return this.conf;
}
public HadoopArchives(Configuration conf) {
setConf(conf);
}
// check the src paths
private static void checkPaths(Configuration conf, List<Path> paths) throws
IOException {
for (Path p : paths) {
FileSystem fs = p.getFileSystem(conf);
if (!fs.exists(p)) {
throw new FileNotFoundException("Source " + p + " does not exist.");
}
}
}
/**
* this assumes that there are only two types of paths: files and directories
* @param fs the input filesystem
* @param p the top level path
* @param out the list of paths output of recursive ls
* @throws IOException
*/
private void recursivels(FileSystem fs, Path p, List<FileStatus> out)
throws IOException {
FileStatus fstatus = fs.getFileStatus(p);
if (!fstatus.isDir()) {
out.add(fstatus);
return;
}
else {
out.add(fstatus);
FileStatus[] listStatus = fs.listStatus(p);
for (FileStatus stat: listStatus) {
recursivels(fs, stat.getPath(), out);
}
}
}
/**
* Input format of a hadoop archive job responsible for
* generating splits of the file list
*/
static class HArchiveInputFormat implements InputFormat<LongWritable, Text> {
//generate input splits from the src file lists
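// each record in the src file list carries the file length (0 for
// directories) as its key and the index line as its value; splits are
// cut on record boundaries so that each split covers roughly
// totalSize/numSplits bytes of source data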
public InputSplit[] getSplits(JobConf jconf, int numSplits)
throws IOException {
String srcfilelist = jconf.get(SRC_LIST_LABEL, "");
if ("".equals(srcfilelist)) {
throw new IOException("Unable to get the " +
"src file for archive generation.");
}
long totalSize = jconf.getLong(TOTAL_SIZE_LABEL, -1);
if (totalSize == -1) {
throw new IOException("Invalid size of files to archive");
}
//we should be safe since this is set by our own code
Path src = new Path(srcfilelist);
FileSystem fs = src.getFileSystem(jconf);
FileStatus fstatus = fs.getFileStatus(src);
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
LongWritable key = new LongWritable();
Text value = new Text();
SequenceFile.Reader reader = null;
// the bytes of the src file list not yet assigned to a split
long remaining = fstatus.getLen();
// the cumulative size of the files counted so far
long currentCount = 0L;
// the end position of the split
long lastPos = 0L;
// the start position of the split
long startPos = 0L;
long targetSize = totalSize/numSplits;
// create splits of the target size so that all the maps
// have equally sized data to read and write.
try {
reader = new SequenceFile.Reader(fs, src, jconf);
while(reader.next(key, value)) {
if (currentCount + key.get() > targetSize && currentCount != 0){
long size = lastPos - startPos;
splits.add(new FileSplit(src, startPos, size, (String[]) null));
remaining = remaining - size;
startPos = lastPos;
currentCount = 0L;
}
currentCount += key.get();
lastPos = reader.getPosition();
}
// whatever remains (possibly smaller than the target size) becomes the last split
if (remaining != 0) {
splits.add(new FileSplit(src, startPos, remaining, (String[])null));
}
}
finally {
// reader may be null if the SequenceFile.Reader constructor threw
if (reader != null) {
reader.close();
}
}
return splits.toArray(new FileSplit[splits.size()]);
}
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
return new SequenceFileRecordReader<LongWritable, Text>(job,
(FileSplit)split);
}
}
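// a valid archive name is a single path component ending in .har,
// e.g. foo.har (the name here is illustrative)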
private boolean checkValidName(String name) {
Path tmp = new Path(name);
if (tmp.depth() != 1) {
return false;
}
if (name.endsWith(".har"))
return true;
return false;
}
private Path largestDepth(List<Path> paths) {
Path deepest = paths.get(0);
for (Path p: paths) {
if (p.depth() > deepest.depth()) {
deepest = p;
}
}
return deepest;
}
/**
* truncate the prefix root from the full path
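* For example (an illustrative case), truncating the root /home/user from
* the full path /home/user/dir1/file1 yields /dir1/file1.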
* @param fullPath the full path
* @param root the prefix root to be truncated
* @return the relative path
*/
private Path relPathToRoot(Path fullPath, Path root) {
// take some effort to build the relative path component by
// component rather than just using substring, so that this
// does not break at some later point
Path justRoot = new Path(Path.SEPARATOR);
if (fullPath.depth() == root.depth()) {
return justRoot;
}
else if (fullPath.depth() > root.depth()) {
Path retPath = new Path(fullPath.getName());
Path parent = fullPath.getParent();
for (int i=0; i < (fullPath.depth() - root.depth() -1); i++) {
retPath = new Path(parent.getName(), retPath);
parent = parent.getParent();
}
return new Path(justRoot, retPath);
}
return null;
}
/**
* this method writes all the valid top level directories
* into the srcWriter for indexing. This method is a little
* tricky. example-
* for an input with parent path /home/user/ and sources
* as /home/user/source/dir1, /home/user/source/dir2 - this
* will output <source, dir, dir1, dir2> (dir means that source is a dir
* with dir1 and dir2 as children) and <source/dir1, file, null>
* and <source/dir2, file, null>
* @param srcWriter the sequence file writer to write the
* directories to
* @param paths the source paths provided by the user. They
* are glob free and have full path (not relative paths)
* @param parentPath the parent path that you want the archives
* to be relative to. example - /home/user/dir1 can be archived with
* parent as /home or /home/user.
* @throws IOException
*/
private void writeTopLevelDirs(SequenceFile.Writer srcWriter,
List<Path> paths, Path parentPath) throws IOException {
//add all the directories
List<Path> justDirs = new ArrayList<Path>();
for (Path p: paths) {
if (!p.getFileSystem(getConf()).isFile(p)) {
justDirs.add(new Path(p.toUri().getPath()));
}
else {
justDirs.add(new Path(p.getParent().toUri().getPath()));
}
}
/* find all the common parents of paths that are valid archive
* paths. This is done so that we do not add a common path
* twice, and so that we only add the valid children of the
* paths specified by the user.
*/
TreeMap<String, HashSet<String>> allpaths = new TreeMap<String,
HashSet<String>>();
/* the largest depth among the paths is the max number of
* times we need to iterate
*/
Path deepest = largestDepth(paths);
Path root = new Path(Path.SEPARATOR);
for (int i = parentPath.depth(); i < deepest.depth(); i++) {
List<Path> parents = new ArrayList<Path>();
for (Path p: justDirs) {
if (p.compareTo(root) == 0){
//do nothing
}
else {
Path parent = p.getParent();
if (allpaths.containsKey(parent.toString())) {
HashSet<String> children = allpaths.get(parent.toString());
children.add(p.getName());
}
else {
HashSet<String> children = new HashSet<String>();
children.add(p.getName());
allpaths.put(parent.toString(), children);
}
parents.add(parent);
}
}
justDirs = parents;
}
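// at this point, for the example in the javadoc above (parent /home/user,
// sources /home/user/source/dir1 and /home/user/source/dir2), allpaths
// holds "/home/user" -> {source} and "/home/user/source" -> {dir1, dir2}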
Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
for (Map.Entry<String, HashSet<String>> entry : keyVals) {
Path relPath = relPathToRoot(new Path(entry.getKey()), parentPath);
if (relPath != null) {
String toWrite = relPath + " dir ";
HashSet<String> children = entry.getValue();
StringBuffer sbuff = new StringBuffer();
sbuff.append(toWrite);
for (String child: children) {
sbuff.append(child + " ");
}
toWrite = sbuff.toString();
srcWriter.append(new LongWritable(0L), new Text(toWrite));
}
}
}
/**archive the given source paths into
* the dest
* @param parentPath the parent path of all the source paths
* @param srcPaths the src paths to be archived
* @param dest the dest dir that will contain the archive
*/
void archive(Path parentPath, List<Path> srcPaths,
String archiveName, Path dest) throws IOException {
checkPaths(conf, srcPaths);
int numFiles = 0;
long totalSize = 0;
FileSystem fs = parentPath.getFileSystem(conf);
conf.set(DST_HAR_LABEL, archiveName);
conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
Path outputPath = new Path(dest, archiveName);
FileOutputFormat.setOutputPath(conf, outputPath);
FileSystem outFs = outputPath.getFileSystem(conf);
if (outFs.exists(outputPath) || outFs.isFile(dest)) {
throw new IOException("Invalid Output: " + outputPath);
}
conf.set(DST_DIR_LABEL, outputPath.toString());
final String randomId = DistCp.getRandomId();
Path stagingArea;
try {
stagingArea = JobSubmissionFiles.getStagingDir(new Cluster(conf),
conf);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
Path jobDirectory = new Path(stagingArea,
NAME + "_" + randomId);
FsPermission mapredSysPerms =
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(jobDirectory.getFileSystem(conf), jobDirectory,
mapredSysPerms);
conf.set(JOB_DIR_LABEL, jobDirectory.toString());
//get a tmp directory for input splits
FileSystem jobfs = jobDirectory.getFileSystem(conf);
Path srcFiles = new Path(jobDirectory, "_har_src_files");
conf.set(SRC_LIST_LABEL, srcFiles.toString());
SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
srcFiles, LongWritable.class, Text.class,
SequenceFile.CompressionType.NONE);
// get the list of files
// create single list of files and dirs
try {
// write the top level dirs in first
writeTopLevelDirs(srcWriter, srcPaths, parentPath);
srcWriter.sync();
// these are the input paths passed
// from the command line
// we do a recursive ls on these paths
// and then write them to the input file
// one at a time
for (Path src: srcPaths) {
ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
recursivels(fs, src, allFiles);
for (FileStatus stat: allFiles) {
String toWrite = "";
long len = stat.isDir()? 0:stat.getLen();
if (stat.isDir()) {
toWrite = "" + relPathToRoot(stat.getPath(), parentPath) + " dir ";
//get the children
FileStatus[] list = fs.listStatus(stat.getPath());
StringBuffer sbuff = new StringBuffer();
sbuff.append(toWrite);
for (FileStatus stats: list) {
sbuff.append(stats.getPath().getName() + " ");
}
toWrite = sbuff.toString();
}
else {
toWrite += relPathToRoot(stat.getPath(), parentPath) + " file ";
}
srcWriter.append(new LongWritable(len), new
Text(toWrite));
srcWriter.sync();
numFiles++;
totalSize += len;
}
}
} finally {
srcWriter.close();
}
//increase the replication of src files
jobfs.setReplication(srcFiles, (short) 10);
conf.setInt(SRC_COUNT_LABEL, numFiles);
conf.setLong(TOTAL_SIZE_LABEL, totalSize);
int numMaps = (int)(totalSize/partSize);
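// e.g. (illustrative) 5 GB of source data with the fixed 2 GB part size
// yields 2 map tasks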
//run at least one map.
conf.setNumMapTasks(numMaps == 0? 1:numMaps);
conf.setNumReduceTasks(1);
conf.setInputFormat(HArchiveInputFormat.class);
conf.setOutputFormat(NullOutputFormat.class);
conf.setMapperClass(HArchivesMapper.class);
conf.setReducerClass(HArchivesReducer.class);
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.set(JobContext.HISTORY_LOCATION, "none");
FileInputFormat.addInputPath(conf, jobDirectory);
//make sure no speculative execution is done
conf.setSpeculativeExecution(false);
JobClient.runJob(conf);
//delete the tmp job directory
try {
jobfs.delete(jobDirectory, true);
} catch(IOException ie) {
LOG.info("Unable to clean tmp directory " + jobDirectory);
}
}
static class HArchivesMapper
implements Mapper<LongWritable, Text, IntWritable, Text> {
private JobConf conf = null;
int partId = -1 ;
Path tmpOutputDir = null;
Path tmpOutput = null;
String partname = null;
Path rootPath = null;
FSDataOutputStream partStream = null;
FileSystem destFs = null;
byte[] buffer;
int buf_size = 128 * 1024;
// configure the mapper and create
// the part file.
// use map reduce framework to write into
// tmp files.
public void configure(JobConf conf) {
this.conf = conf;
// this is tightly tied to map reduce
// since it does not expose an api
// to get the partition
partId = conf.getInt(JobContext.TASK_PARTITION, -1);
// create a file name using the partition
// we need to write to this directory
tmpOutputDir = FileOutputFormat.getWorkOutputPath(conf);
// get the output path and write to the tmp
// directory
partname = "part-" + partId;
tmpOutput = new Path(tmpOutputDir, partname);
rootPath = (conf.get(SRC_PARENT_LABEL, null) == null) ? null :
new Path(conf.get(SRC_PARENT_LABEL));
if (rootPath == null) {
throw new RuntimeException("Unable to read parent " +
"path for har from config");
}
try {
destFs = tmpOutput.getFileSystem(conf);
// if it already exists, it is a stale copy
if (destFs.exists(tmpOutput)) {
destFs.delete(tmpOutput, false);
}
partStream = destFs.create(tmpOutput);
} catch(IOException ie) {
throw new RuntimeException("Unable to open output file " + tmpOutput);
}
buffer = new byte[buf_size];
}
// copy raw data.
public void copyData(Path input, FSDataInputStream fsin,
FSDataOutputStream fout, Reporter reporter) throws IOException {
try {
for (int cbread=0; (cbread = fsin.read(buffer))>= 0;) {
fout.write(buffer, 0,cbread);
reporter.progress();
}
} finally {
fsin.close();
}
}
static class MapStat {
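// parses one line of the src file list; a line looks like (illustrative)
// "<relative path> dir <child1> <child2> ..." for a directory, or
// "<relative path> file" for a plain file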
private String pathname;
private boolean isDir;
private List<String> children;
public MapStat(String line) {
String[] splits = line.split(" ");
pathname = splits[0];
if ("dir".equals(splits[1])) {
isDir = true;
}
else {
isDir = false;
}
if (isDir) {
children = new ArrayList<String>();
for (int i = 2; i < splits.length; i++) {
children.add(splits[i]);
}
}
}
}
/**
* resolve the path p against the parent, getting rid of the / at the beginning of p
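* e.g. (illustrative) p = /dir1/file1 with parent /home/user resolves
* to /home/user/dir1/file1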
* @param p the path relative to the archive root
* @param parent the parent to resolve against
* @return the resolved source path
*/
private Path realPath(Path p, Path parent) {
Path rootPath = new Path(Path.SEPARATOR);
if (rootPath.compareTo(p) == 0) {
return parent;
}
return new Path(parent, new Path(p.toString().substring(1)));
}
// read files from the split input
// and write it onto the part files.
// also output hash(name) and string
// for reducer to create index
// and masterindex files.
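// the value emitted for a file looks like (illustrative):
// "<relative path> file <partname> <startPos> <length> "
// and for a directory:
// "<relative path> dir none 0 0 <child1> <child2> ... "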
public void map(LongWritable key, Text value,
OutputCollector<IntWritable, Text> out,
Reporter reporter) throws IOException {
String line = value.toString();
MapStat mstat = new MapStat(line);
Path relPath = new Path(mstat.pathname);
int hash = HarFileSystem.getHarHash(relPath);
String towrite = null;
Path srcPath = realPath(relPath, rootPath);
long startPos = partStream.getPos();
if (mstat.isDir) {
towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " ";
StringBuffer sbuff = new StringBuffer();
sbuff.append(towrite);
for (String child: mstat.children) {
sbuff.append(child + " ");
}
towrite = sbuff.toString();
//reading directories is also progress
reporter.progress();
}
else {
FileSystem srcFs = srcPath.getFileSystem(conf);
FileStatus srcStatus = srcFs.getFileStatus(srcPath);
FSDataInputStream input = srcFs.open(srcStatus.getPath());
reporter.setStatus("Copying file " + srcStatus.getPath() +
" to archive.");
copyData(srcStatus.getPath(), input, partStream, reporter);
towrite = relPath.toString() + " file " + partname + " " + startPos
+ " " + srcStatus.getLen() + " ";
}
out.collect(new IntWritable(hash), new Text(towrite));
}
public void close() throws IOException {
// close the part files.
partStream.close();
}
}
/** the reduce for creating the index and the master index
*
*/
static class HArchivesReducer implements Reducer<IntWritable,
Text, Text, Text> {
private JobConf conf = null;
private long startIndex = 0;
private long endIndex = 0;
private long startPos = 0;
private Path masterIndex = null;
private Path index = null;
private FileSystem fs = null;
private FSDataOutputStream outStream = null;
private FSDataOutputStream indexStream = null;
private int numIndexes = 1000;
private Path tmpOutputDir = null;
private int written = 0;
private int keyVal = 0;
// configure
public void configure(JobConf conf) {
this.conf = conf;
tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
masterIndex = new Path(tmpOutputDir, "_masterindex");
index = new Path(tmpOutputDir, "_index");
try {
fs = masterIndex.getFileSystem(conf);
if (fs.exists(masterIndex)) {
fs.delete(masterIndex, false);
}
if (fs.exists(index)) {
fs.delete(index, false);
}
indexStream = fs.create(index);
outStream = fs.create(masterIndex);
String version = HarFileSystem.VERSION + " \n";
outStream.write(version.getBytes());
} catch(IOException e) {
throw new RuntimeException(e);
}
}
// create the index and master index. The input to
// the reduce is already sorted by the hash of the
// files, so we just need to write it to the index.
// We update the masterindex as soon as we have written
// numIndexes entries to the index.
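// each master index line has the form (illustrative):
// "<startHash> <endHash> <startOffset> <endOffset>"
// where the offsets point into the index file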
public void reduce(IntWritable key, Iterator<Text> values,
OutputCollector<Text, Text> out,
Reporter reporter) throws IOException {
keyVal = key.get();
while(values.hasNext()) {
Text value = values.next();
String towrite = value.toString() + "\n";
indexStream.write(towrite.getBytes());
written++;
if (written > numIndexes -1) {
// every numIndexes entries, report progress and add a master index entry
reporter.setStatus("Creating index for archives");
reporter.progress();
endIndex = keyVal;
String masterWrite = startIndex + " " + endIndex + " " + startPos
+ " " + indexStream.getPos() + " \n" ;
outStream.write(masterWrite.getBytes());
startPos = indexStream.getPos();
startIndex = endIndex;
written = 0;
}
}
}
public void close() throws IOException {
//write the last part of the master index.
if (written > 0) {
String masterWrite = startIndex + " " + keyVal + " " + startPos +
" " + indexStream.getPos() + " \n";
outStream.write(masterWrite.getBytes());
}
// close the streams
outStream.close();
indexStream.close();
// try increasing the replication
fs.setReplication(index, (short) 5);
fs.setReplication(masterIndex, (short) 5);
}
}
/** the main driver for creating the archives
* it takes the archive name, the parent path, the optional source
* paths and the destination as command line arguments. It does a
* recursive ls on the source paths. The mapper creates the archive
* part files and the reducer creates the archive index.
*/
public int run(String[] args) throws Exception {
try {
Path parentPath = null;
List<Path> srcPaths = new ArrayList<Path>();
Path destPath = null;
String archiveName = null;
if (args.length < 5) {
System.out.println(usage);
throw new IOException("Invalid usage.");
}
if (!"-archiveName".equals(args[0])) {
System.out.println(usage);
throw new IOException("Archive Name not specified.");
}
archiveName = args[1];
if (!checkValidName(archiveName)) {
System.out.println(usage);
throw new IOException("Invalid name for archives. " + archiveName);
}
int i = 2;
//check to see if relative parent has been provided or not
//this is a required parameter.
if (! "-p".equals(args[i])) {
System.out.println(usage);
throw new IOException("Parent path not specified.");
}
parentPath = new Path(args[i+1]);
i+=2;
//read the rest of the paths
for (; i < args.length; i++) {
if (i == (args.length - 1)) {
destPath = new Path(args[i]);
}
else {
Path argPath = new Path(args[i]);
if (argPath.isAbsolute()) {
System.out.println(usage);
throw new IOException("source path " + argPath +
" is not relative to "+ parentPath);
}
srcPaths.add(new Path(parentPath, argPath));
}
}
if (srcPaths.size() == 0) {
// assuming if the user does not specify path for sources
// the whole parent directory needs to be archived.
srcPaths.add(parentPath);
}
// do a glob on the srcPaths and then pass it on
List<Path> globPaths = new ArrayList<Path>();
for (Path p: srcPaths) {
FileSystem fs = p.getFileSystem(getConf());
FileStatus[] statuses = fs.globStatus(p);
for (FileStatus status: statuses) {
globPaths.add(fs.makeQualified(status.getPath()));
}
}
archive(parentPath, globPaths, archiveName, destPath);
} catch(IOException ie) {
System.err.println(ie.getLocalizedMessage());
return -1;
}
return 0;
}
/** the main function **/
public static void main(String[] args) {
JobConf job = new JobConf(HadoopArchives.class);
HadoopArchives harchives = new HadoopArchives(job);
int ret = 0;
try{
ret = ToolRunner.run(harchives, args);
} catch(Exception e) {
LOG.debug("Exception in archives ", e);
System.err.println("Exception in archives");
System.err.println(e.getLocalizedMessage());
System.exit(1);
}
System.exit(ret);
}
}