/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.analyzer.plugins;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.analyzer.Result;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.ATSImportTool;
import org.apache.tez.history.parser.ATSFileParser;
import org.apache.tez.history.parser.ProtoHistoryParser;
import org.apache.tez.history.parser.SimpleHistoryParser;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.common.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
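
/**
 * Base class for command line Tez analyzers. A concrete analyzer implements
 * {@link Analyzer#analyze} and {@link Analyzer#getResult()}; this base class handles option
 * parsing, locating or downloading the event data, building the {@link DagInfo}, and
 * optionally saving the analyzer output to the output directory.
 *
 * <p>A rough usage sketch; {@code MyAnalyzer}, the dag id and the event file path below are
 * illustrative placeholders, not part of this class:
 *
 * <pre>
 *   MyAnalyzer analyzer = new MyAnalyzer(new Configuration());   // hypothetical subclass
 *   int exitCode = analyzer.run(new String[] {
 *       "--dagId=dag_1234567890123_0001_1",
 *       "--eventFileName=/path/to/history_file",
 *       "--fromSimpleHistory",
 *       "--saveResults"});
 *   analyzer.printResults();
 * </pre>
 */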
public abstract class TezAnalyzerBase extends Configured implements Tool, Analyzer {
private static final Logger LOG = LoggerFactory.getLogger(TezAnalyzerBase.class);
private static final String EVENT_FILE_NAME = "eventFileName";
private static final String OUTPUT_DIR = "outputDir";
private static final String SAVE_RESULTS = "saveResults";
private static final String DAG_ID = "dagId";
private static final String FROM_SIMPLE_HISTORY = "fromSimpleHistory";
private static final String FROM_PROTO_HISTORY = "fromProtoHistory";
private static final String HELP = "help";
private static final int SEPARATOR_WIDTH = 80;
private static final int MIN_COL_WIDTH = 12;
private String outputDir;
private boolean saveResults = false;
public TezAnalyzerBase(Configuration config) {
setConf(config);
}
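
  /**
   * Builds the command line options shared by every analyzer: dagId (required), outputDir,
   * saveResults, eventFileName, fromSimpleHistory, fromProtoHistory and help.
   */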
@SuppressWarnings("static-access")
private static Options buildOptions() {
Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
.withDescription("DagId that needs to be analyzed").hasArg().isRequired(true).create();
Option outputDirOption = OptionBuilder.withArgName(OUTPUT_DIR).withLongOpt(OUTPUT_DIR)
.withDescription("Directory to write outputs to.").hasArg().isRequired(false).create();
Option saveResults = OptionBuilder.withArgName(SAVE_RESULTS).withLongOpt(SAVE_RESULTS)
.withDescription("Saves results to output directory (optional)")
.hasArg(false).isRequired(false).create();
    Option eventFileNameOption = OptionBuilder.withArgName(EVENT_FILE_NAME)
        .withLongOpt(EVENT_FILE_NAME)
        .withDescription("File with event data for the DAG").hasArg()
        .isRequired(false).create();
    Option fromSimpleHistoryOption = OptionBuilder.withArgName(FROM_SIMPLE_HISTORY)
        .withLongOpt(FROM_SIMPLE_HISTORY)
        .withDescription("Event data from Simple History logging. Must also specify event file")
        .isRequired(false).create();
Option fromProtoHistoryOption =
OptionBuilder.withArgName(FROM_PROTO_HISTORY).withLongOpt(FROM_PROTO_HISTORY)
.withDescription("Event data from Proto History logging. Must also specify event file")
.isRequired(false).create();
    Option help = OptionBuilder.withArgName(HELP).withLongOpt(HELP)
        .withDescription("print help")
        .isRequired(false).create();
Options opts = new Options();
opts.addOption(dagIdOption);
opts.addOption(outputDirOption);
opts.addOption(saveResults);
opts.addOption(eventFileNameOption);
opts.addOption(fromSimpleHistoryOption);
opts.addOption(fromProtoHistoryOption);
opts.addOption(help);
return opts;
}
protected String getOutputDir() {
return outputDir;
}
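
  /** Prints the supported options and their descriptions to stderr. */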
private void printUsage() {
    System.err.println("Analyzer base options are:");
Options options = buildOptions();
for (Object obj : options.getOptions()) {
Option option = (Option) obj;
System.err.println(option.getArgName() + " : " + option.getDescription());
}
}
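
  /**
   * Parses the command line, obtains the event data (from the given event file(s), or by
   * downloading from ATS when no file is specified), builds the {@link DagInfo} with the
   * matching history parser, runs the analysis, and optionally saves the dag info and CSV
   * result to the output directory.
   *
   * @return 0 on success or when only help was requested, -1 for invalid options, -2 when an
   *         event file is required but missing, -3 when the ATS download fails
   */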
@Override
public int run(String[] args) throws Exception {
    // Parse command line options
CommandLine cmdLine = null;
try {
cmdLine = new GnuParser().parse(buildOptions(), args);
    } catch (ParseException e) {
      System.err.println("Invalid options on command line: " + e.getMessage());
printUsage();
return -1;
}
saveResults = cmdLine.hasOption(SAVE_RESULTS);
    if (cmdLine.hasOption(HELP)) {
printUsage();
return 0;
}
outputDir = cmdLine.getOptionValue(OUTPUT_DIR);
if (outputDir == null) {
outputDir = System.getProperty("user.dir");
}
String dagId = cmdLine.getOptionValue(DAG_ID);
List<File> files = new ArrayList<File>();
if (cmdLine.hasOption(EVENT_FILE_NAME)) {
for (String file : cmdLine.getOptionValue(EVENT_FILE_NAME).split(",")) {
File fileOrDir = new File(file);
if (fileOrDir.exists()) {
if (fileOrDir.isFile()) {
files.add(fileOrDir);
} else {
files.addAll(collectFilesForDagId(fileOrDir, dagId));
}
}
}
}
DagInfo dagInfo = null;
if (files.isEmpty()) {
if (cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
System.err.println("Event file name must be specified when using simple history");
printUsage();
return -2;
}
if (cmdLine.hasOption(FROM_PROTO_HISTORY)) {
System.err.println("Proto file name must be specified when using proto history");
printUsage();
return -2;
}
// using ATS - try to download directly
String[] importArgs = { "--dagId=" + dagId, "--downloadDir=" + outputDir };
int result = ATSImportTool.process(importArgs);
if (result != 0) {
System.err.println("Error downloading data from ATS");
return -3;
}
      // Parse the downloaded contents
      files.add(new File(outputDir + Path.SEPARATOR + dagId + ".zip"));
}
Preconditions.checkState(!files.isEmpty());
if (cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
SimpleHistoryParser parser = new SimpleHistoryParser(files);
dagInfo = parser.getDAGData(dagId);
} else if (cmdLine.hasOption(FROM_PROTO_HISTORY)) {
ProtoHistoryParser parser = new ProtoHistoryParser(files);
dagInfo = parser.getDAGData(dagId);
} else {
ATSFileParser parser = new ATSFileParser(files);
dagInfo = parser.getDAGData(dagId);
}
Preconditions.checkState(dagInfo.getDagId().equals(dagId));
analyze(dagInfo);
Result result = getResult();
if (saveResults) {
String dagInfoFileName = outputDir + File.separator + this.getClass().getName() + "_"
+ dagInfo.getDagId() + ".dag";
FileUtils.writeStringToFile(new File(dagInfoFileName), dagInfo.toExtendedString());
LOG.info("Saved dag info in " + dagInfoFileName);
if (result instanceof CSVResult) {
String fileName = outputDir + File.separator + this.getClass().getName() + "_"
+ dagInfo.getDagId() + ".csv";
((CSVResult) result).dumpToFile(fileName);
LOG.info("Saved results in " + fileName);
}
}
return 0;
}
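
  /**
   * Collects all files under the given directory whose names contain the dag id.
   *
   * @throws RuntimeException if no matching file is found
   */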
private List<File> collectFilesForDagId(File parentDir, String dagId) {
    File[] arrFiles = parentDir.listFiles((dir, name) -> name.contains(dagId));
if (arrFiles == null || arrFiles.length == 0) {
throw new RuntimeException(
String.format("cannot find relevant files for dag: '%s' in dir: %s", dagId, parentDir));
}
List<File> files = Arrays.asList(arrFiles);
    LOG.info("Collected files for dag {}: {}", dagId,
        files.stream().map(File::getAbsolutePath).collect(Collectors.joining(", ")));
return files;
}
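
  /**
   * Logs the result of this analyzer at DEBUG level. A {@link CSVResult} is rendered as a
   * fixed width table: header row, separator lines and one line per record; other result
   * types are not printed.
   */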
public void printResults() throws TezException {
Result result = getResult();
if (result instanceof CSVResult) {
String[] headers = ((CSVResult) result).getHeaders();
StringBuilder formatBuilder = new StringBuilder();
int size = Math.max(MIN_COL_WIDTH, SEPARATOR_WIDTH / headers.length);
for (int i = 0; i < headers.length; i++) {
formatBuilder.append("%-").append(size).append("s ");
}
String format = formatBuilder.toString();
StringBuilder sep = new StringBuilder();
for (int i = 0; i < SEPARATOR_WIDTH; i++) {
sep.append("-");
}
String separator = sep.toString();
LOG.debug(separator);
      LOG.debug(String.format(format, (Object[]) headers));
LOG.debug(separator);
Iterator<String[]> recordsIterator = ((CSVResult) result).getRecordsIterator();
while (recordsIterator.hasNext()) {
        String line = String.format(format, (Object[]) recordsIterator.next());
LOG.debug(line);
}
LOG.debug(separator);
}
}
}