/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools.rumen;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * The main driver of the Rumen Parser.
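 *
 * <p>Typical invocation (a sketch assembled from the option parsing in
 * {@link MyOptions}; the jar name here is illustrative):
 * <pre>
 * hadoop jar rumen.jar org.apache.hadoop.tools.rumen.TraceBuilder \
 *     [-demuxer &lt;demuxer class&gt;] [-recursive] \
 *     &lt;trace output&gt; &lt;topology output&gt; &lt;input path&gt;...
 * </pre>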
 */
public class TraceBuilder extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(TraceBuilder.class);

  static final int RUN_METHOD_FAILED_EXIT_CODE = 3;

  TopologyBuilder topologyBuilder = new TopologyBuilder();
  Outputter<LoggedJob> traceWriter;
  Outputter<LoggedNetworkTopology> topologyWriter;

  static class MyOptions {
    Class<? extends InputDemuxer> inputDemuxerClass = DefaultInputDemuxer.class;

    @SuppressWarnings("unchecked")
    Class<? extends Outputter> clazzTraceOutputter = DefaultOutputter.class;
    Path traceOutput;
    Path topologyOutput;
    List<Path> inputs = new LinkedList<Path>();

    MyOptions(String[] args, Configuration conf) throws FileNotFoundException,
        IOException, ClassNotFoundException {
      int switchTop = 0;

      // whether to scan the input paths recursively
      boolean doRecursiveTraversal = false;

      while (args[switchTop].startsWith("-")) {
        if (args[switchTop].equalsIgnoreCase("-demuxer")) {
          inputDemuxerClass =
              Class.forName(args[++switchTop]).asSubclass(InputDemuxer.class);
        } else if (args[switchTop].equalsIgnoreCase("-recursive")) {
          doRecursiveTraversal = true;
        }
        ++switchTop;
      }
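
      // The remaining arguments are positional:
      //   <trace output> <topology output> <input path>...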
      traceOutput = new Path(args[0 + switchTop]);
      topologyOutput = new Path(args[1 + switchTop]);

      for (int i = 2 + switchTop; i < args.length; ++i) {
        inputs.addAll(processInputArgument(
            args[i], conf, doRecursiveTraversal));
      }
    }
    /**
     * Compares history file names, not full paths.
     * The job history file name format is such that sorting the file names
     * lexicographically yields the order of the jobs' submission times.
     */
    private static class HistoryLogsComparator
        implements Comparator<FileStatus>, Serializable {
      @Override
      public int compare(FileStatus file1, FileStatus file2) {
        return file1.getPath().getName().compareTo(file2.getPath().getName());
      }
    }
    /**
     * Processes the input file/folder argument. If the input is a file,
     * it is directly considered for further processing by TraceBuilder.
     * If the input is a folder, all the history logs in that folder are
     * considered for further processing.
     *
     * If isRecursive is true, the input path is scanned recursively for
     * job history logs to be processed by TraceBuilder.
     *
     * NOTE: If the input represents a globbed path, it is first flattened
     *       and then the individual paths represented by the globbed input
     *       path are considered for further processing.
     *
     * @param input       input path, possibly globbed
     * @param conf        configuration
     * @param isRecursive whether to recursively traverse the input paths to
     *                    find history logs
     * @return the input history log files' paths
     * @throws FileNotFoundException
     * @throws IOException
     */
    static List<Path> processInputArgument(String input, Configuration conf,
        boolean isRecursive) throws FileNotFoundException, IOException {
      Path inPath = new Path(input);
      FileSystem fs = inPath.getFileSystem(conf);
      FileStatus[] inStatuses = fs.globStatus(inPath);

      List<Path> inputPaths = new LinkedList<Path>();
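
      // globStatus() can return null (e.g. for a non-existent, non-glob
      // path), so treat both null and an empty match list as "no inputs".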
      if (inStatuses == null || inStatuses.length == 0) {
        return inputPaths;
      }

      for (FileStatus inStatus : inStatuses) {
        Path thisPath = inStatus.getPath();

        if (inStatus.isDirectory()) {
          // Find the files under this path (recursively, if the -recursive
          // option is specified).
          List<FileStatus> historyLogs = new ArrayList<FileStatus>();

          RemoteIterator<LocatedFileStatus> iter =
              fs.listFiles(thisPath, isRecursive);
          while (iter.hasNext()) {
            LocatedFileStatus child = iter.next();
            String fileName = child.getPath().getName();
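            // Skip checksum (.crc) files and hidden files.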
            if (!(fileName.endsWith(".crc") || fileName.startsWith("."))) {
              historyLogs.add(child);
            }
          }

          if (historyLogs.size() > 0) {
            // Add the history log files under this path to inputPaths,
            // sorted by file name.
            FileStatus[] sortableNames =
                historyLogs.toArray(new FileStatus[historyLogs.size()]);
            Arrays.sort(sortableNames, new HistoryLogsComparator());

            for (FileStatus historyLog : sortableNames) {
              inputPaths.add(historyLog.getPath());
            }
          }
        } else {
          inputPaths.add(thisPath);
        }
      }

      return inputPaths;
    }
  }
  public static void main(String[] args) {
    TraceBuilder builder = new TraceBuilder();
    int result = RUN_METHOD_FAILED_EXIT_CODE;

    try {
      result = ToolRunner.run(builder, args);
    } catch (Throwable t) {
      t.printStackTrace(System.err);
    } finally {
      try {
        builder.finish();
      } finally {
        if (result == 0) {
          return;
        }
        System.exit(result);
      }
    }
  }
  @SuppressWarnings("unchecked")
  @Override
  public int run(String[] args) throws Exception {
    MyOptions options = new MyOptions(args, getConf());
    traceWriter = options.clazzTraceOutputter.newInstance();
    traceWriter.init(options.traceOutput, getConf());
    topologyWriter = new DefaultOutputter<LoggedNetworkTopology>();
    topologyWriter.init(options.topologyOutput, getConf());

    try {
      JobBuilder jobBuilder = null;

      for (Path p : options.inputs) {
        InputDemuxer inputDemuxer = options.inputDemuxerClass.newInstance();

        try {
          inputDemuxer.bindTo(p, getConf());
        } catch (IOException e) {
          LOG.warn("Unable to bind Path " + p + ". Skipping...", e);
          continue;
        }

        Pair<String, InputStream> filePair = null;

        try {
          while ((filePair = inputDemuxer.getNext()) != null) {
            RewindableInputStream ris =
                new RewindableInputStream(filePair.second());
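
            // Buffered so the stream can be re-read: it is examined once to
            // choose a parser, or rewound before the job-conf XML parse below.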
            JobHistoryParser parser = null;

            try {
              String jobID = JobHistoryUtils.extractJobID(filePair.first());

              if (jobID == null) {
                LOG.warn("File skipped: Invalid file name: "
                    + filePair.first());
                continue;
              }
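
              // The sorted inputs deliver all files of a job consecutively,
              // so a new job ID means the previous job can be emitted.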
              if ((jobBuilder == null)
                  || (!jobBuilder.getJobID().equals(jobID))) {
                if (jobBuilder != null) {
                  traceWriter.output(jobBuilder.build());
                }
                jobBuilder = new JobBuilder(jobID);
              }

              if (JobHistoryUtils.isJobConfXml(filePair.first())) {
                processJobConf(JobConfigurationParser.parse(ris.rewind()),
                    jobBuilder);
              } else {
                parser = JobHistoryParserFactory.getParser(ris);
                if (parser == null) {
                  LOG.warn("File skipped: Cannot find suitable parser: "
                      + filePair.first());
                } else {
                  processJobHistory(parser, jobBuilder);
                }
              }
            } finally {
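              // Close whichever handle is live: once a parser exists, it
              // owns (and will close) the underlying stream.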
              if (parser == null) {
                ris.close();
              } else {
                parser.close();
                parser = null;
              }
            }
          }
        } catch (Throwable t) {
          if (filePair != null) {
            LOG.warn("TraceBuilder got an error while processing the"
                + " [possibly virtual] file " + filePair.first()
                + " within Path " + p, t);
          }
        } finally {
          inputDemuxer.close();
        }
      }

      if (jobBuilder != null) {
        traceWriter.output(jobBuilder.build());
        jobBuilder = null;
      } else {
        LOG.warn("No job found in traces.");
      }

      topologyWriter.output(topologyBuilder.build());
    } finally {
      traceWriter.close();
      topologyWriter.close();
    }

    return 0;
  }
  private void processJobConf(Properties properties, JobBuilder jobBuilder) {
    jobBuilder.process(properties);
    topologyBuilder.process(properties);
  }

  void processJobHistory(JobHistoryParser parser, JobBuilder jobBuilder)
      throws IOException {
    HistoryEvent e;
    while ((e = parser.nextEvent()) != null) {
      jobBuilder.process(e);
      topologyBuilder.process(e);
    }

    parser.close();
  }

  void finish() {
    IOUtils.cleanup(LOG, traceWriter, topologyWriter);
  }
}