/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
/**
* Job History Log Analyzer.
*
* <h3>Description.</h3>
* This is a tool for parsing and analyzing history logs of map-reduce jobs.
* History logs contain information about execution of jobs, tasks, and
* attempts. This tool focuses on submission, launch, start, and finish times,
* as well as the success or failure of jobs, tasks, and attempts.
* <p>
* The analyzer calculates <em>per hour slot utilization</em> for the cluster
* as follows.
* For each task attempt it divides the time segment from the start of the
* attempt t<sub>S</sub> to the finish t<sub>F</sub> into whole hours
* [t<sub>0</sub>, ..., t<sub>n</sub>], where t<sub>0</sub> <= t<sub>S</sub>
* is the maximal whole hour preceding t<sub>S</sub>, and
* t<sub>n</sub> >= t<sub>F</sub> is the minimal whole hour after t<sub>F</sub>.
* Thus, [t<sub>0</sub>, ..., t<sub>n</sub>] covers the segment
* [t<sub>S</sub>, t<sub>F</sub>], during which the attempt was executed.
* Each interval [t<sub>i</sub>, t<sub>i+1</sub>] fully contained in
* [t<sub>S</sub>, t<sub>F</sub>] corresponds to exactly one slot on
* a map-reduce cluster (usually MAP-slot or REDUCE-slot).
* If interval [t<sub>i</sub>, t<sub>i+1</sub>] only intersects with
* [t<sub>S</sub>, t<sub>F</sub>] then we say that the task
* attempt used just a fraction of the slot during this hour.
* The fraction equals the size of the intersection.
* Let slotTime(A, h) denote the number of slots calculated that way for a
* specific attempt A during hour h.
* The tool then sums all slots for all attempts for every hour.
* The result is the slot hour utilization of the cluster:
* <tt>slotTime(h) = SUM<sub>A</sub> slotTime(A,h)</tt>.
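* For example, an attempt that runs from 10:20 to 12:40 contributes 2/3 of a
* slot to the 10:00 hour, a full slot to the 11:00 hour, and 2/3 of a slot to
* the 12:00 hour.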
* <p>
* Log analyzer calculates slot hours for <em>MAP</em> and <em>REDUCE</em>
* attempts separately.
* <p>
* Log analyzer distinguishes between <em>successful</em> and <em>failed</em>
* attempts. A task attempt is considered successful if its own status is SUCCESS
* and the statuses of the task and the job it is a part of are also SUCCESS.
* Otherwise the task attempt is considered failed.
* <p>
* Map-reduce clusters are usually configured to have a fixed number of MAP
* and REDUCE slots per node. Thus the maximal possible number of slots on
* the cluster is <tt>total_slots = total_nodes * slots_per_node</tt>.
* Effective slot hours cannot exceed <tt>total_slots</tt> for successful
* attempts.
* <p>
* <em>Pending time</em> characterizes the wait time of attempts.
* It is calculated similarly to the slot hour except that the wait interval
* starts when the job is submitted and ends when an attempt starts execution.
* In addition, pending time also includes intervals between attempts
* of the same task if it was re-executed.
* <p>
* The history log analyzer calculates two variations of pending time. The first
* is based on the job submission time as described above; the second starts the
* wait interval when the job is launched rather than submitted.
*
* <h3>Input.</h3>
* The following input parameters can be specified in the argument string
* to the job log analyzer (a sample invocation follows the list):
* <ul>
* <li><tt>-historyDir inputDir</tt> specifies the location of the directory
* where analyzer will be looking for job history log files.</li>
* <li><tt>-resFile resultFile</tt> the name of the result file.</li>
* <li><tt>-usersIncluded | -usersExcluded userList</tt> slot utilization and
* pending time can be calculated either for the specified users only or for
* all users except the specified ones.
* <br>
* <tt>userList</tt> is a comma or semicolon separated list of users.</li>
* <li><tt>-gzip</tt> is used if history log files are compressed.
* Only {@link GzipCodec} is currently supported.</li>
* <li><tt>-jobDelimiter pattern</tt> one can concatenate original log files into
* larger file(s), using the specified delimiter to recognize where the log of
* one job ends and the next one begins.<br>
* <tt>pattern</tt> is a java regular expression
* {@link java.util.regex.Pattern}, which should match only the log delimiters.
* <br>
* E.g. the pattern <tt>".!!FILE=.*!!"</tt> matches delimiters that contain
* the original history log file names in the following form:<br>
* <tt>"$!!FILE=my.job.tracker.com_myJobId_user_wordcount.log!!"</tt></li>
* <li><tt>-clean</tt> cleans up default directories used by the analyzer.</li>
* <li><tt>-test</tt> test one file locally and exit;
* does not require map-reduce.</li>
* <li><tt>-help</tt> print usage.</li>
* </ul>
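* <p>
* A sample invocation (the directory and file names below are placeholders,
* and it assumes the analyzer class is available on the Hadoop classpath)
* could look like:<br>
* <tt>hadoop org.apache.hadoop.fs.JHLogAnalyzer -historyDir history
* -resFile jhla_result.txt -usersExcluded hadoop,hdfs -gzip</tt>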
*
* <h3>Output.</h3>
* The output file is formatted as a tab separated table consisting of four
* columns: <tt>SERIES, PERIOD, TYPE, SLOT_HOUR</tt>.
* <ul>
* <li><tt>SERIES</tt> one of the four statistical series;</li>
* <li><tt>PERIOD</tt> the start of the time interval in the following format:
* <tt>"yyyy-mm-dd hh:mm:ss"</tt>;</li>
* <li><tt>TYPE</tt> the slot type, e.g. MAP or REDUCE;</li>
* <li><tt>SLOT_HOUR</tt> the value of the slot usage during this
* time interval.</li>
* </ul>
*/
@SuppressWarnings("deprecation")
public class JHLogAnalyzer {
private static final Log LOG = LogFactory.getLog(JHLogAnalyzer.class);
// Constants
private static final String JHLA_ROOT_DIR =
System.getProperty("test.build.data", "stats/JHLA");
private static final Path INPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_input");
private static final String BASE_INPUT_FILE_NAME = "jhla_in_";
private static final Path OUTPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_output");
private static final Path RESULT_FILE =
new Path(JHLA_ROOT_DIR, "jhla_result.txt");
private static final Path DEFAULT_HISTORY_DIR = new Path("history");
private static final int DEFAULT_TIME_INTERVAL_MSEC = 1000*60*60; // 1 hour
static{
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
}
static enum StatSeries {
STAT_ALL_SLOT_TIME
(AccumulatingReducer.VALUE_TYPE_LONG + "allSlotTime"),
STAT_FAILED_SLOT_TIME
(AccumulatingReducer.VALUE_TYPE_LONG + "failedSlotTime"),
STAT_SUBMIT_PENDING_SLOT_TIME
(AccumulatingReducer.VALUE_TYPE_LONG + "submitPendingSlotTime"),
STAT_LAUNCHED_PENDING_SLOT_TIME
(AccumulatingReducer.VALUE_TYPE_LONG + "launchedPendingSlotTime");
private String statName = null;
private StatSeries(String name) {this.statName = name;}
public String toString() {return statName;}
}
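/**
* Creates the input control files for the map-reduce job in parallel.
* Each control file is a SequenceFile holding the path of one job history
* log file, which a mapper then opens and parses.
*/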
private static class FileCreateDaemon extends Thread {
private static final int NUM_CREATE_THREADS = 10;
private static volatile int numFinishedThreads;
private static volatile int numRunningThreads;
private static FileStatus[] jhLogFiles;
FileSystem fs;
int start;
int end;
FileCreateDaemon(FileSystem fs, int start, int end) {
this.fs = fs;
this.start = start;
this.end = end;
}
public void run() {
try {
for(int i=start; i < end; i++) {
String name = getFileName(i);
Path controlFile = new Path(INPUT_DIR, "in_file_" + name);
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
Text.class, LongWritable.class,
CompressionType.NONE);
String logFile = jhLogFiles[i].getPath().toString();
writer.append(new Text(logFile), new LongWritable(0));
} catch(Exception e) {
throw new IOException(e);
} finally {
if (writer != null)
writer.close();
writer = null;
}
}
} catch(IOException ex) {
LOG.error("FileCreateDaemon failed.", ex);
}
numFinishedThreads++;
}
private static void createControlFile(FileSystem fs, Path jhLogDir
) throws IOException {
fs.delete(INPUT_DIR, true);
jhLogFiles = fs.listStatus(jhLogDir);
numFinishedThreads = 0;
try {
int start = 0;
int step = jhLogFiles.length / NUM_CREATE_THREADS
+ ((jhLogFiles.length % NUM_CREATE_THREADS) > 0 ? 1 : 0);
FileCreateDaemon[] daemons = new FileCreateDaemon[NUM_CREATE_THREADS];
numRunningThreads = 0;
for(int tIdx=0; tIdx < NUM_CREATE_THREADS && start < jhLogFiles.length; tIdx++) {
int end = Math.min(start + step, jhLogFiles.length);
daemons[tIdx] = new FileCreateDaemon(fs, start, end);
start += step;
numRunningThreads++;
}
for(int tIdx=0; tIdx < numRunningThreads; tIdx++) {
daemons[tIdx].start();
}
} finally {
int prevValue = 0;
while(numFinishedThreads < numRunningThreads) {
if(prevValue < numFinishedThreads) {
LOG.info("Finished " + numFinishedThreads + " threads out of " + numRunningThreads);
prevValue = numFinishedThreads;
}
try {Thread.sleep(500);} catch (InterruptedException e) {}
}
}
}
}
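/**
* Lists the history logs in the given directory and builds one control file
* per log via {@link FileCreateDaemon}.
*/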
private static void createControlFile(FileSystem fs, Path jhLogDir
) throws IOException {
LOG.info("creating control file: JH log dir = " + jhLogDir);
FileCreateDaemon.createControlFile(fs, jhLogDir);
LOG.info("created control file: JH log dir = " + jhLogDir);
}
private static String getFileName(int fIdx) {
return BASE_INPUT_FILE_NAME + Integer.toString(fIdx);
}
/**
* If the token is of the form KEY="VALUE", then this will return [KEY, VALUE]
*/
private static String [] getKeyValue(String t) throws IOException {
String[] keyVal = t.split("=\"*|\"");
return keyVal;
}
/**
* JobHistory log record.
*/
private static class JobHistoryLog {
String JOBID;
String JOB_STATUS;
long SUBMIT_TIME;
long LAUNCH_TIME;
long FINISH_TIME;
long TOTAL_MAPS;
long TOTAL_REDUCES;
long FINISHED_MAPS;
long FINISHED_REDUCES;
String USER;
Map<String, TaskHistoryLog> tasks;
boolean isSuccessful() {
return (JOB_STATUS != null) && JOB_STATUS.equals("SUCCESS");
}
void parseLine(String line) throws IOException {
StringTokenizer tokens = new StringTokenizer(line);
if(!tokens.hasMoreTokens())
return;
String what = tokens.nextToken();
// Line should start with one of the following:
// Job, Task, MapAttempt, ReduceAttempt
if(what.equals("Job"))
updateJob(tokens);
else if(what.equals("Task"))
updateTask(tokens);
else if(what.indexOf("Attempt") >= 0)
updateTaskAttempt(tokens);
}
private void updateJob(StringTokenizer tokens) throws IOException {
while(tokens.hasMoreTokens()) {
String t = tokens.nextToken();
String[] keyVal = getKeyValue(t);
if(keyVal.length < 2) continue;
if(keyVal[0].equals("JOBID")) {
if(JOBID == null)
JOBID = new String(keyVal[1]);
else if(!JOBID.equals(keyVal[1])) {
LOG.error("Incorrect JOBID: "
+ keyVal[1].substring(0, Math.min(keyVal[1].length(), 100))
+ " expect " + JOBID);
return;
}
}
else if(keyVal[0].equals("JOB_STATUS"))
JOB_STATUS = new String(keyVal[1]);
else if(keyVal[0].equals("SUBMIT_TIME"))
SUBMIT_TIME = Long.parseLong(keyVal[1]);
else if(keyVal[0].equals("LAUNCH_TIME"))
LAUNCH_TIME = Long.parseLong(keyVal[1]);
else if(keyVal[0].equals("FINISH_TIME"))
FINISH_TIME = Long.parseLong(keyVal[1]);
else if(keyVal[0].equals("TOTAL_MAPS"))
TOTAL_MAPS = Long.parseLong(keyVal[1]);
else if(keyVal[0].equals("TOTAL_REDUCES"))
TOTAL_REDUCES = Long.parseLong(keyVal[1]);
else if(keyVal[0].equals("FINISHED_MAPS"))
FINISHED_MAPS = Long.parseLong(keyVal[1]);
else if(keyVal[0].equals("FINISHED_REDUCES"))
FINISHED_REDUCES = Long.parseLong(keyVal[1]);
else if(keyVal[0].equals("USER"))
USER = new String(keyVal[1]);
}
}
private void updateTask(StringTokenizer tokens) throws IOException {
// unpack
TaskHistoryLog task = new TaskHistoryLog().parse(tokens);
if(task.TASKID == null) {
LOG.error("TASKID = NULL for job " + JOBID);
return;
}
// update or insert
if(tasks == null)
tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES));
TaskHistoryLog existing = tasks.get(task.TASKID);
if(existing == null)
tasks.put(task.TASKID, task);
else
existing.updateWith(task);
}
private void updateTaskAttempt(StringTokenizer tokens) throws IOException {
// unpack
TaskAttemptHistoryLog attempt = new TaskAttemptHistoryLog();
String taskID = attempt.parse(tokens);
if(taskID == null) return;
if(tasks == null)
tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES));
TaskHistoryLog existing = tasks.get(taskID);
if(existing == null) {
existing = new TaskHistoryLog(taskID);
tasks.put(taskID, existing);
}
existing.updateWith(attempt);
}
}
/**
* TaskHistory log record.
*/
private static class TaskHistoryLog {
String TASKID;
String TASK_TYPE; // MAP, REDUCE, SETUP, CLEANUP
String TASK_STATUS;
long START_TIME;
long FINISH_TIME;
Map<String, TaskAttemptHistoryLog> attempts;
TaskHistoryLog() {}
TaskHistoryLog(String taskID) {
TASKID = taskID;
}
boolean isSuccessful() {
return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS");
}
TaskHistoryLog parse(StringTokenizer tokens) throws IOException {
while(tokens.hasMoreTokens()) {
String t = tokens.nextToken();
String[] keyVal = getKeyValue(t);
if(keyVal.length < 2) continue;
if(keyVal[0].equals("TASKID")) {
if(TASKID == null)
TASKID = new String(keyVal[1]);
else if(!TASKID.equals(keyVal[1])) {
LOG.error("Incorrect TASKID: "
+ keyVal[1].substring(0, Math.min(keyVal[1].length(), 100))
+ " expect " + TASKID);
continue;
}
}
else if(keyVal[0].equals("TASK_TYPE"))
TASK_TYPE = new String(keyVal[1]);
else if(keyVal[0].equals("TASK_STATUS"))
TASK_STATUS = new String(keyVal[1]);
else if(keyVal[0].equals("START_TIME"))
START_TIME = Long.parseLong(keyVal[1]);
else if(keyVal[0].equals("FINISH_TIME"))
FINISH_TIME = Long.parseLong(keyVal[1]);
}
return this;
}
/**
* Update with non-null fields of the same task log record.
*/
void updateWith(TaskHistoryLog from) throws IOException {
if(TASKID == null)
TASKID = from.TASKID;
else if(!TASKID.equals(from.TASKID)) {
throw new IOException("Incorrect TASKID: " + from.TASKID
+ " expect " + TASKID);
}
if(TASK_TYPE == null)
TASK_TYPE = from.TASK_TYPE;
else if(! TASK_TYPE.equals(from.TASK_TYPE)) {
LOG.error(
"Incorrect TASK_TYPE: " + from.TASK_TYPE + " expect " + TASK_TYPE
+ " for task " + TASKID);
return;
}
if(from.TASK_STATUS != null)
TASK_STATUS = from.TASK_STATUS;
if(from.START_TIME > 0)
START_TIME = from.START_TIME;
if(from.FINISH_TIME > 0)
FINISH_TIME = from.FINISH_TIME;
}
/**
* Update with non-null fields of the task attempt log record.
*/
void updateWith(TaskAttemptHistoryLog attempt) throws IOException {
if(attempt.TASK_ATTEMPT_ID == null) {
LOG.error("Unexpected TASK_ATTEMPT_ID = null for task " + TASKID);
return;
}
if(attempts == null)
attempts = new HashMap<String, TaskAttemptHistoryLog>();
TaskAttemptHistoryLog existing = attempts.get(attempt.TASK_ATTEMPT_ID);
if(existing == null)
attempts.put(attempt.TASK_ATTEMPT_ID, attempt);
else
existing.updateWith(attempt);
// update task start time
if(attempt.START_TIME > 0 &&
(this.START_TIME == 0 || this.START_TIME > attempt.START_TIME))
START_TIME = attempt.START_TIME;
}
}
/**
* TaskAttemptHistory log record.
*/
private static class TaskAttemptHistoryLog {
String TASK_ATTEMPT_ID;
String TASK_STATUS; // this task attempt status
long START_TIME;
long FINISH_TIME;
long HDFS_BYTES_READ;
long HDFS_BYTES_WRITTEN;
long FILE_BYTES_READ;
long FILE_BYTES_WRITTEN;
/**
* Task attempt is considered successful iff all three statuses
* of the attempt, the task, and the job equal "SUCCESS".
*/
boolean isSuccessful() {
return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS");
}
String parse(StringTokenizer tokens) throws IOException {
String taskID = null;
while(tokens.hasMoreTokens()) {
String t = tokens.nextToken();
String[] keyVal = getKeyValue(t);
if(keyVal.length < 2) continue;
if(keyVal[0].equals("TASKID")) {
if(taskID == null)
taskID = new String(keyVal[1]);
else if(!taskID.equals(keyVal[1])) {
LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID);
continue;
}
}
else if(keyVal[0].equals("TASK_ATTEMPT_ID")) {
if(TASK_ATTEMPT_ID == null)
TASK_ATTEMPT_ID = new String(keyVal[1]);
else if(!TASK_ATTEMPT_ID.equals(keyVal[1])) {
LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID);
continue;
}
}
else if(keyVal[0].equals("TASK_STATUS"))
TASK_STATUS = new String(keyVal[1]);
else if(keyVal[0].equals("START_TIME"))
START_TIME = Long.parseLong(keyVal[1]);
else if(keyVal[0].equals("FINISH_TIME"))
FINISH_TIME = Long.parseLong(keyVal[1]);
}
return taskID;
}
/**
* Update with non-null fields of the same task attempt log record.
*/
void updateWith(TaskAttemptHistoryLog from) throws IOException {
if(TASK_ATTEMPT_ID == null)
TASK_ATTEMPT_ID = from.TASK_ATTEMPT_ID;
else if(! TASK_ATTEMPT_ID.equals(from.TASK_ATTEMPT_ID)) {
throw new IOException(
"Incorrect TASK_ATTEMPT_ID: " + from.TASK_ATTEMPT_ID
+ " expect " + TASK_ATTEMPT_ID);
}
if(from.TASK_STATUS != null)
TASK_STATUS = from.TASK_STATUS;
if(from.START_TIME > 0)
START_TIME = from.START_TIME;
if(from.FINISH_TIME > 0)
FINISH_TIME = from.FINISH_TIME;
if(from.HDFS_BYTES_READ > 0)
HDFS_BYTES_READ = from.HDFS_BYTES_READ;
if(from.HDFS_BYTES_WRITTEN > 0)
HDFS_BYTES_WRITTEN = from.HDFS_BYTES_WRITTEN;
if(from.FILE_BYTES_READ > 0)
FILE_BYTES_READ = from.FILE_BYTES_READ;
if(from.FILE_BYTES_WRITTEN > 0)
FILE_BYTES_WRITTEN = from.FILE_BYTES_WRITTEN;
}
}
/**
* Key = statName*date-time*taskType
* Value = number of msec for the hour
*/
private static class IntervalKey {
static final String KEY_FIELD_DELIMITER = "*";
String statName;
String dateTime;
String taskType;
IntervalKey(String stat, long timeMSec, String taskType) {
statName = stat;
SimpleDateFormat dateF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
dateTime = dateF.format(new Date(timeMSec));
this.taskType = taskType;
}
IntervalKey(String key) {
StringTokenizer keyTokens = new StringTokenizer(key, KEY_FIELD_DELIMITER);
if(!keyTokens.hasMoreTokens()) return;
statName = keyTokens.nextToken();
if(!keyTokens.hasMoreTokens()) return;
dateTime = keyTokens.nextToken();
if(!keyTokens.hasMoreTokens()) return;
taskType = keyTokens.nextToken();
}
void setStatName(String stat) {
statName = stat;
}
String getStringKey() {
return statName + KEY_FIELD_DELIMITER +
dateTime + KEY_FIELD_DELIMITER +
taskType;
}
Text getTextKey() {
return new Text(getStringKey());
}
public String toString() {
return getStringKey();
}
}
/**
* Mapper class. It parses the history log file named by each input record and
* emits per-hour slot usage statistics.
*/
private static class JHLAMapper extends IOMapperBase<Object> {
/**
* A line pattern, which delimits history logs of different jobs,
* if multiple job logs are written in the same file.
* Null value means only one job log per file is expected.
* The pattern should be a regular expression as in
* {@link String#matches(String)}.
*/
String jobDelimiterPattern;
int maxJobDelimiterLineLength;
/** Count only these users jobs */
Collection<String> usersIncluded;
/** Exclude jobs of the following users */
Collection<String> usersExcluded;
/** Type of compression for compressed files: gzip */
Class<? extends CompressionCodec> compressionClass;
JHLAMapper() throws IOException {
}
JHLAMapper(Configuration conf) throws IOException {
configure(new JobConf(conf));
}
public void configure(JobConf conf) {
super.configure(conf);
usersIncluded = getUserList(conf.get("jhla.users.included", null));
usersExcluded = getUserList(conf.get("jhla.users.excluded", null));
String zipClassName = conf.get("jhla.compression.class", null);
try {
compressionClass = (zipClassName == null) ? null :
Class.forName(zipClassName).asSubclass(CompressionCodec.class);
} catch(Exception e) {
throw new RuntimeException("Compression codec not found: ", e);
}
jobDelimiterPattern = conf.get("jhla.job.delimiter.pattern", null);
maxJobDelimiterLineLength = conf.getInt("jhla.job.delimiter.length", 512);
}
@Override
public void map(Text key,
LongWritable value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String name = key.toString();
long longValue = value.get();
reporter.setStatus("starting " + name + " ::host = " + hostName);
long tStart = System.currentTimeMillis();
parseLogFile(fs, new Path(name), longValue, output, reporter);
long tEnd = System.currentTimeMillis();
long execTime = tEnd - tStart;
reporter.setStatus("finished " + name + " ::host = " + hostName +
" in " + execTime/1000 + " sec.");
}
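/** Unused: this mapper does all parsing and output collection in {@link #map}. */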
public Object doIO(Reporter reporter,
String path, // full path of history log file
long offset // starting offset within the file
) throws IOException {
return null;
}
void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
Object jobObjects) throws IOException {
}
private boolean isEndOfJobLog(String line) {
if(jobDelimiterPattern == null)
return false;
return line.matches(jobDelimiterPattern);
}
/**
* Collect information about one job.
*
* @param fs - file system
* @param filePath - full path of a history log file
* @param offset - starting offset in the history log file
* @throws IOException
*/
public void parseLogFile(FileSystem fs,
Path filePath,
long offset,
OutputCollector<Text, Text> output,
Reporter reporter
) throws IOException {
InputStream in = null;
try {
// open file & seek
FSDataInputStream stm = fs.open(filePath);
stm.seek(offset);
in = stm;
LOG.info("Opened " + filePath);
reporter.setStatus("Opened " + filePath);
// get a compression filter if specified
if(compressionClass != null) {
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(compressionClass, new Configuration());
in = codec.createInputStream(stm);
LOG.info("Codec created " + filePath);
reporter.setStatus("Codec created " + filePath);
}
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
LOG.info("Reader created " + filePath);
// skip to the next job log start
long processed = 0L;
if(jobDelimiterPattern != null) {
for(String line = reader.readLine();
line != null; line = reader.readLine()) {
if((stm.getPos() - processed) > 100000) {
processed = stm.getPos();
reporter.setStatus("Processing " + filePath + " at " + processed);
}
if(isEndOfJobLog(line))
break;
}
}
// parse lines and update job history
JobHistoryLog jh = new JobHistoryLog();
int jobLineCount = 0;
for(String line = readLine(reader);
line != null; line = readLine(reader)) {
jobLineCount++;
if((stm.getPos() - processed) > 20000) {
processed = stm.getPos();
long numTasks = (jh.tasks == null ? 0 : jh.tasks.size());
String txt = "Processing " + filePath + " at " + processed
+ " # tasks = " + numTasks;
reporter.setStatus(txt);
LOG.info(txt);
}
if(isEndOfJobLog(line)) {
if(jh.JOBID != null) {
LOG.info("Finished parsing job: " + jh.JOBID
+ " line count = " + jobLineCount);
collectJobStats(jh, output, reporter);
LOG.info("Collected stats for job: " + jh.JOBID);
}
jh = new JobHistoryLog();
jobLineCount = 0;
} else
jh.parseLine(line);
}
if(jh.JOBID == null) {
LOG.error("JOBID = NULL in " + filePath + " at " + processed);
return;
}
collectJobStats(jh, output, reporter);
} catch(Exception ie) {
// parsing errors can happen if the file has been truncated
LOG.error("JHLAMapper.parseLogFile", ie);
reporter.setStatus("JHLAMapper.parseLogFile failed "
+ StringUtils.stringifyException(ie));
throw new IOException("Job failed.", ie);
} finally {
if(in != null) in.close();
}
}
/**
* Read lines until one ends with a " ." or "\" "
*/
private StringBuffer resBuffer = new StringBuffer();
private String readLine(BufferedReader reader) throws IOException {
resBuffer.setLength(0);
reader.mark(maxJobDelimiterLineLength);
for(String line = reader.readLine();
line != null; line = reader.readLine()) {
if(isEndOfJobLog(line)) {
if(resBuffer.length() == 0)
resBuffer.append(line);
else
reader.reset();
break;
}
if(resBuffer.length() == 0)
resBuffer.append(line);
else if(resBuffer.length() < 32000)
resBuffer.append(line);
if(line.endsWith(" .") || line.endsWith("\" ")) {
break;
}
reader.mark(maxJobDelimiterLineLength);
}
String result = resBuffer.length() == 0 ? null : resBuffer.toString();
resBuffer.setLength(0);
return result;
}
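/**
* Splits the [start, finish) span into whole-hour intervals and, for each of
* the given statistics series, emits the number of milliseconds the span
* overlaps each interval, keyed by series name, interval start, and task type.
*/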
private void collectPerIntervalStats(OutputCollector<Text, Text> output,
long start, long finish, String taskType,
StatSeries ... stats) throws IOException {
long curInterval = (start / DEFAULT_TIME_INTERVAL_MSEC)
* DEFAULT_TIME_INTERVAL_MSEC;
long curTime = start;
long accumTime = 0;
while(curTime < finish) {
// how much of the task time belonged to current interval
long nextInterval = curInterval + DEFAULT_TIME_INTERVAL_MSEC;
long intervalTime = ((finish < nextInterval) ?
finish : nextInterval) - curTime;
IntervalKey key = new IntervalKey("", curInterval, taskType);
Text val = new Text(String.valueOf(intervalTime));
for(StatSeries statName : stats) {
key.setStatName(statName.toString());
output.collect(key.getTextKey(), val);
}
curTime = curInterval = nextInterval;
accumTime += intervalTime;
}
// For the pending stat speculative attempts may intersect.
// Only one of them is considered pending.
assert accumTime == finish - start || finish < start;
}
private void collectJobStats(JobHistoryLog jh,
OutputCollector<Text, Text> output,
Reporter reporter
) throws IOException {
if(jh == null)
return;
if(jh.tasks == null)
return;
if(jh.SUBMIT_TIME <= 0)
throw new IOException("Job " + jh.JOBID
+ " SUBMIT_TIME = " + jh.SUBMIT_TIME);
if(usersIncluded != null && !usersIncluded.contains(jh.USER))
return;
if(usersExcluded != null && usersExcluded.contains(jh.USER))
return;
int numAttempts = 0;
long totalTime = 0;
boolean jobSuccess = jh.isSuccessful();
long jobWaitTime = jh.LAUNCH_TIME - jh.SUBMIT_TIME;
// attemptSubmitTime is the job's LAUNCH_TIME for the first attempt of a task,
// or the previous attempt's FINISH_TIME for all subsequent attempts
for(TaskHistoryLog th : jh.tasks.values()) {
if(th.attempts == null)
continue;
// Task is successful iff both the task and the job are a "SUCCESS"
long attemptSubmitTime = jh.LAUNCH_TIME;
boolean taskSuccess = jobSuccess && th.isSuccessful();
for(TaskAttemptHistoryLog tah : th.attempts.values()) {
// Task attempt is considered successful iff all three statuses
// of the attempt, the task, and the job equal "SUCCESS"
boolean success = taskSuccess && tah.isSuccessful();
if(tah.START_TIME == 0) {
LOG.error("Start time 0 for task attempt " + tah.TASK_ATTEMPT_ID);
continue;
}
if(tah.FINISH_TIME < tah.START_TIME) {
LOG.error("Finish time " + tah.FINISH_TIME + " is less than " +
"Start time " + tah.START_TIME + " for task attempt " +
tah.TASK_ATTEMPT_ID);
tah.FINISH_TIME = tah.START_TIME;
}
if(!"MAP".equals(th.TASK_TYPE) && !"REDUCE".equals(th.TASK_TYPE) &&
!"CLEANUP".equals(th.TASK_TYPE) && !"SETUP".equals(th.TASK_TYPE)) {
LOG.error("Unexpected TASK_TYPE = " + th.TASK_TYPE
+ " for attempt " + tah.TASK_ATTEMPT_ID);
}
collectPerIntervalStats(output,
attemptSubmitTime, tah.START_TIME, th.TASK_TYPE,
StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME);
collectPerIntervalStats(output,
attemptSubmitTime - jobWaitTime, tah.START_TIME, th.TASK_TYPE,
StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME);
if(success)
collectPerIntervalStats(output,
tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE,
StatSeries.STAT_ALL_SLOT_TIME);
else
collectPerIntervalStats(output,
tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE,
StatSeries.STAT_ALL_SLOT_TIME,
StatSeries.STAT_FAILED_SLOT_TIME);
totalTime += (tah.FINISH_TIME - tah.START_TIME);
numAttempts++;
if(numAttempts % 500 == 0) {
reporter.setStatus("Processing " + jh.JOBID + " at " + numAttempts);
}
attemptSubmitTime = tah.FINISH_TIME;
}
}
LOG.info("Total Maps = " + jh.TOTAL_MAPS
+ " Reduces = " + jh.TOTAL_REDUCES);
LOG.info("Finished Maps = " + jh.FINISHED_MAPS
+ " Reduces = " + jh.FINISHED_REDUCES);
LOG.info("numAttempts = " + numAttempts);
LOG.info("totalTime = " + totalTime);
LOG.info("averageAttemptTime = "
+ (numAttempts==0 ? 0 : totalTime/numAttempts));
LOG.info("jobTotalTime = " + (jh.FINISH_TIME <= jh.SUBMIT_TIME? 0 :
jh.FINISH_TIME - jh.SUBMIT_TIME));
}
}
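/**
* Partitions map output so that each (statistics series, MAP/REDUCE) pair goes
* to its own reducer; any remaining keys fall into the last partition.
*/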
public static class JHLAPartitioner implements Partitioner<Text, Text> {
static final int NUM_REDUCERS = 9;
public void configure(JobConf conf) {}
public int getPartition(Text key, Text value, int numPartitions) {
IntervalKey intKey = new IntervalKey(key.toString());
if(intKey.statName.equals(StatSeries.STAT_ALL_SLOT_TIME.toString())) {
if(intKey.taskType.equals("MAP"))
return 0;
else if(intKey.taskType.equals("REDUCE"))
return 1;
} else if(intKey.statName.equals(
StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME.toString())) {
if(intKey.taskType.equals("MAP"))
return 2;
else if(intKey.taskType.equals("REDUCE"))
return 3;
} else if(intKey.statName.equals(
StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME.toString())) {
if(intKey.taskType.equals("MAP"))
return 4;
else if(intKey.taskType.equals("REDUCE"))
return 5;
} else if(intKey.statName.equals(
StatSeries.STAT_FAILED_SLOT_TIME.toString())) {
if(intKey.taskType.equals("MAP"))
return 6;
else if(intKey.taskType.equals("REDUCE"))
return 7;
}
return 8;
}
}
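/**
* Configures and runs the map-reduce job that parses the history logs listed
* in the control files and accumulates the per-interval statistics.
*/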
private static void runJHLA(
Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
Path outputDir,
Configuration fsConfig) throws IOException {
JobConf job = new JobConf(fsConfig, JHLogAnalyzer.class);
job.setPartitionerClass(JHLAPartitioner.class);
FileInputFormat.setInputPaths(job, INPUT_DIR);
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(mapperClass);
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, outputDir);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(JHLAPartitioner.NUM_REDUCERS);
JobClient.runJob(job);
}
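/** Collector used in -test mode: logs key-value pairs instead of emitting them to a reducer. */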
private static class LoggingCollector implements OutputCollector<Text, Text> {
public void collect(Text key, Text value) throws IOException {
LOG.info(key + " == " + value);
}
}
/**
* Run the job history log analyzer.
*/
public static void main(String[] args) {
Path resFileName = RESULT_FILE;
Configuration conf = new Configuration();
try {
conf.setInt("test.io.file.buffer.size", 0);
Path historyDir = DEFAULT_HISTORY_DIR;
String testFile = null;
boolean cleanup = false;
boolean initControlFiles = true;
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].equalsIgnoreCase("-historyDir")) {
historyDir = new Path(args[++i]);
} else if (args[i].equalsIgnoreCase("-resFile")) {
resFileName = new Path(args[++i]);
} else if (args[i].equalsIgnoreCase("-usersIncluded")) {
conf.set("jhla.users.included", args[++i]);
} else if (args[i].equalsIgnoreCase("-usersExcluded")) {
conf.set("jhla.users.excluded", args[++i]);
} else if (args[i].equalsIgnoreCase("-gzip")) {
conf.set("jhla.compression.class", GzipCodec.class.getCanonicalName());
} else if (args[i].equalsIgnoreCase("-jobDelimiter")) {
conf.set("jhla.job.delimiter.pattern", args[++i]);
} else if (args[i].equalsIgnoreCase("-jobDelimiterLength")) {
conf.setInt("jhla.job.delimiter.length", Integer.parseInt(args[++i]));
} else if(args[i].equalsIgnoreCase("-noInit")) {
initControlFiles = false;
} else if(args[i].equalsIgnoreCase("-test")) {
testFile = args[++i];
} else if(args[i].equalsIgnoreCase("-clean")) {
cleanup = true;
} else if(args[i].equalsIgnoreCase("-jobQueue")) {
conf.set("mapred.job.queue.name", args[++i]);
} else if(args[i].startsWith("-Xmx")) {
conf.set("mapred.child.java.opts", args[i]);
} else {
printUsage();
}
}
if(cleanup) {
cleanup(conf);
return;
}
if(testFile != null) {
LOG.info("Start JHLA test ============ ");
LocalFileSystem lfs = FileSystem.getLocal(conf);
conf.set("fs.default.name", "file:///");
JHLAMapper map = new JHLAMapper(conf);
map.parseLogFile(lfs, new Path(testFile), 0L,
new LoggingCollector(), Reporter.NULL);
return;
}
FileSystem fs = FileSystem.get(conf);
if(initControlFiles)
createControlFile(fs, historyDir);
long tStart = System.currentTimeMillis();
runJHLA(JHLAMapper.class, OUTPUT_DIR, conf);
long execTime = System.currentTimeMillis() - tStart;
analyzeResult(fs, 0, execTime, resFileName);
} catch(IOException e) {
System.err.print(StringUtils.stringifyException(e));
System.exit(-1);
}
}
private static void printUsage() {
String className = JHLogAnalyzer.class.getSimpleName();
System.err.println("Usage: " + className
+ "\n\t[-historyDir inputDir] | [-resFile resultFile] |"
+ "\n\t[-usersIncluded | -usersExcluded userList] |"
+ "\n\t[-gzip] | [-jobDelimiter pattern] |"
+ "\n\t[-help | -clean | -test testFile]");
System.exit(-1);
}
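/** Parses a comma- or semicolon-separated list of users; returns null if the argument is null. */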
private static Collection<String> getUserList(String users) {
if(users == null)
return null;
StringTokenizer tokens = new StringTokenizer(users, ",;");
Collection<String> userList = new ArrayList<String>(tokens.countTokens());
while(tokens.hasMoreTokens())
userList.add(tokens.nextToken());
return userList;
}
/**
* Result is combined from all reduce output files and is written to
* RESULT_FILE as a tab separated table with the columns
* SERIES, PERIOD, TYPE, SLOT_HOUR.
*/
private static void analyzeResult( FileSystem fs,
int testType,
long execTime,
Path resFileName
) throws IOException {
LOG.info("Analizing results ...");
DataOutputStream out = null;
BufferedWriter writer = null;
try {
out = new DataOutputStream(fs.create(resFileName));
writer = new BufferedWriter(new OutputStreamWriter(out));
writer.write("SERIES\tPERIOD\tTYPE\tSLOT_HOUR\n");
FileStatus[] reduceFiles = fs.listStatus(OUTPUT_DIR);
assert reduceFiles.length == JHLAPartitioner.NUM_REDUCERS;
for(int i = 0; i < JHLAPartitioner.NUM_REDUCERS; i++) {
DataInputStream in = null;
BufferedReader lines = null;
try {
in = fs.open(reduceFiles[i].getPath());
lines = new BufferedReader(new InputStreamReader(in));
String line;
while((line = lines.readLine()) != null) {
StringTokenizer tokens = new StringTokenizer(line, "\t*");
String attr = tokens.nextToken();
String dateTime = tokens.nextToken();
String taskType = tokens.nextToken();
double val = Long.parseLong(tokens.nextToken()) /
(double)DEFAULT_TIME_INTERVAL_MSEC;
writer.write(attr.substring(2)); // skip the stat type "l:"
writer.write("\t");
writer.write(dateTime);
writer.write("\t");
writer.write(taskType);
writer.write("\t");
writer.write(String.valueOf((float)val));
writer.newLine();
}
} finally {
if(lines != null) lines.close();
if(in != null) in.close();
}
}
} finally {
if(writer != null) writer.close();
if(out != null) out.close();
}
LOG.info("Analizing results ... done.");
}
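/** Deletes the analyzer's working directory (JHLA_ROOT_DIR) with all its contents. */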
private static void cleanup(Configuration conf) throws IOException {
LOG.info("Cleaning up test files");
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(JHLA_ROOT_DIR), true);
}
}