blob: 9659e1c9c5c1ea2c6b676575f25dfe5968166a14 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinished;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinished;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinished;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinished;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
import org.apache.hadoop.util.StringUtils;
/**
* {@link JobBuilder} builds one job. It processes a sequence of
* {@link HistoryEvent}s.
*/
public class JobBuilder {
/** Number of bytes in one megabyte, obtained by parsing the string "1m". */
private static final long BYTES_IN_MEG =
StringUtils.TraditionalBinaryPrefix.string2long("1m");
// The job ID handed to the constructor.
private String jobID;
// Set to true by build(); afterwards both process(...) overloads throw.
private boolean finalized = false;
// The LoggedJob being assembled; build() returns this object.
private LoggedJob result = new LoggedJob();
// Lookup tables from task ID string to LoggedTask, one per task kind
// (map, reduce, everything else). They are filled by getOrMakeTask.
private Map<String, LoggedTask> mapTasks = new HashMap<String, LoggedTask>();
private Map<String, LoggedTask> reduceTasks =
new HashMap<String, LoggedTask>();
private Map<String, LoggedTask> otherTasks =
new HashMap<String, LoggedTask>();
// Lookup table from attempt ID string to LoggedTaskAttempt, filled by
// getOrMakeTaskAttempt.
private Map<String, LoggedTaskAttempt> attempts =
new HashMap<String, LoggedTaskAttempt>();
// Every ParsedHost seen so far, mapped to itself, so that equal hosts can be
// replaced by one canonical instance (see getAndRecordParsedHost).
private Map<ParsedHost, ParsedHost> allHosts =
new HashMap<ParsedHost, ParsedHost>();
/**
* The maximum number of split locations a task can list. When a task lists
* more than this, preferredLocationForSplits ignores them all and returns
* null.
*/
private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
/**
* The regular expression used to parse task attempt IDs in job tracker logs.
* Capturing Group 1 holds the digits that follow the last underscore.
*/
private final static Pattern taskAttemptIDPattern =
Pattern.compile(".*_([0-9]+)");
// Percentile points used for every CDF; the instance initializer fills this
// with 5, 10, ..., 95.
private int[] attemptTimesPercentiles = null;
// Use this to search within the java options to get heap sizes.
// The heap size, including its one-letter order-of-magnitude suffix, is in
// Capturing Group 1. A value without a suffix is not matched.
private static final Pattern heapPattern =
Pattern.compile("-Xmx([0-9]+[kKmMgGtT])");
// The Properties most recently passed to process(Properties); build() hands
// them to the result. Stays null if no conf was processed.
private Properties jobConfigurationParameters = null;
/**
 * Creates a builder and remembers the given job ID.
 *
 * @param jobID
 *          the job ID; returned unchanged by {@link #getJobID()}
 */
public JobBuilder(String jobID) {
  this.jobID = jobID;
}
/**
 * @return the job ID that was passed to the constructor
 */
public String getJobID() {
  return this.jobID;
}
{
if (attemptTimesPercentiles == null) {
attemptTimesPercentiles = new int[19];
for (int i = 0; i < 19; ++i) {
attemptTimesPercentiles[i] = (i + 1) * 5;
}
}
}
/**
 * Process one {@link HistoryEvent} by handing it to the private handler that
 * matches its concrete type.
 *
 * @param event
 *          The {@link HistoryEvent} to be processed.
 * @throws IllegalStateException
 *           if {@link #build()} has already been called
 * @throws IllegalArgumentException
 *           if the event is none of the event types checked below
 */
public void process(HistoryEvent event) {
  if (finalized) {
    throw new IllegalStateException(
        "JobBuilder.process(HistoryEvent event) called after LoggedJob built");
  }

  // The checks are kept in lexicographical order by class name; the first
  // match handles the event and returns.
  if (event instanceof JobFinishedEvent) {
    processJobFinishedEvent((JobFinishedEvent) event);
    return;
  }
  if (event instanceof JobInfoChangeEvent) {
    processJobInfoChangeEvent((JobInfoChangeEvent) event);
    return;
  }
  if (event instanceof JobInitedEvent) {
    processJobInitedEvent((JobInitedEvent) event);
    return;
  }
  if (event instanceof JobPriorityChangeEvent) {
    processJobPriorityChangeEvent((JobPriorityChangeEvent) event);
    return;
  }
  if (event instanceof JobStatusChangedEvent) {
    processJobStatusChangedEvent((JobStatusChangedEvent) event);
    return;
  }
  if (event instanceof JobSubmittedEvent) {
    processJobSubmittedEvent((JobSubmittedEvent) event);
    return;
  }
  if (event instanceof JobUnsuccessfulCompletionEvent) {
    processJobUnsuccessfulCompletionEvent(
        (JobUnsuccessfulCompletionEvent) event);
    return;
  }
  if (event instanceof MapAttemptFinishedEvent) {
    processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
    return;
  }
  if (event instanceof ReduceAttemptFinishedEvent) {
    processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
    return;
  }
  if (event instanceof TaskAttemptFinishedEvent) {
    processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
    return;
  }
  if (event instanceof TaskAttemptStartedEvent) {
    processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event);
    return;
  }
  if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
    processTaskAttemptUnsuccessfulCompletionEvent(
        (TaskAttemptUnsuccessfulCompletionEvent) event);
    return;
  }
  if (event instanceof TaskFailedEvent) {
    processTaskFailedEvent((TaskFailedEvent) event);
    return;
  }
  if (event instanceof TaskFinishedEvent) {
    processTaskFinishedEvent((TaskFinishedEvent) event);
    return;
  }
  if (event instanceof TaskStartedEvent) {
    processTaskStartedEvent((TaskStartedEvent) event);
    return;
  }
  if (event instanceof TaskUpdatedEvent) {
    processTaskUpdatedEvent((TaskUpdatedEvent) event);
    return;
  }

  throw new IllegalArgumentException(
      "JobBuilder.process(HistoryEvent): unknown event type");
}
/**
 * Returns the value of the first candidate property that is set.
 *
 * @param conf
 *          the properties to search
 * @param names
 *          candidate property names, tried in array order
 * @param defaultValue
 *          the value to return when none of the candidates is set
 * @return the first non-null property value, or {@code defaultValue}
 */
static String extract(Properties conf, String[] names, String defaultValue) {
  for (int i = 0; i < names.length; ++i) {
    final String value = conf.getProperty(names[i]);
    if (value != null) {
      return value;
    }
  }
  return defaultValue;
}
/**
 * Reads a java-options string from the first candidate property that is set
 * and converts its {@code -Xmx} heap size to megabytes.
 *
 * @param conf
 *          the properties to search
 * @param names
 *          candidate property names, tried in array order
 * @return the heap size in megabytes taken from the last {@code -Xmx} match,
 *         or null when no candidate is set or nothing matches
 */
private Integer extractMegabytes(Properties conf, String[] names) {
  final String javaOptions = extract(conf, names, null);
  if (javaOptions == null) {
    return null;
  }

  // Walk over every -Xmx occurrence; each match overwrites the previous one,
  // so the last occurrence determines the result.
  Integer megabytes = null;
  for (Matcher m = heapPattern.matcher(javaOptions); m.find();) {
    final long bytes =
        StringUtils.TraditionalBinaryPrefix.string2long(m.group(1));
    megabytes = (int) (bytes / BYTES_IN_MEG);
  }
  return megabytes;
}
/**
 * Stores the heap size on the result, unless no value was found.
 *
 * @param megabytes
 *          heap size in megabytes, or null to leave the result untouched
 */
private void maybeSetHeapMegabytes(Integer megabytes) {
  if (megabytes == null) {
    return;
  }
  result.setHeapMegabytes(megabytes);
}
/**
 * Stores the map-side megabytes on the result, unless no value was found.
 *
 * @param megabytes
 *          size in megabytes, or null to leave the result untouched
 */
private void maybeSetJobMapMB(Integer megabytes) {
  if (megabytes == null) {
    return;
  }
  result.setJobMapMB(megabytes);
}
/**
 * Stores the reduce-side megabytes on the result, unless no value was found.
 *
 * @param megabytes
 *          size in megabytes, or null to leave the result untouched
 */
private void maybeSetJobReduceMB(Integer megabytes) {
  if (megabytes == null) {
    return;
  }
  result.setJobReduceMB(megabytes);
}
/**
 * Process a collection of JobConf {@link Properties}. We do not restrict it
 * to be called once. It is okay to process a conf before, during or after the
 * events.
 *
 * <p>The queue name falls back to "default" when the conf names no queue.
 * The job name and the heap settings are only written when the conf actually
 * provides them, so a conf processed after the events cannot erase a job name
 * that was already recorded from a job-submitted event.
 *
 * @param conf
 *          The job conf properties to be added.
 * @throws IllegalStateException
 *           if {@link #build()} has already been called
 */
public void process(Properties conf) {
  if (finalized) {
    throw new IllegalStateException(
        "JobBuilder.process(Properties conf) called after LoggedJob built");
  }

  //TODO remove this once the deprecate APIs in LoggedJob are removed
  result.setQueue(extract(conf,
      JobConfPropertyNames.QUEUE_NAMES.getCandidates(), "default"));

  // Only overwrite the job name when the conf carries one; otherwise keep
  // whatever was set earlier.
  String jobName =
      extract(conf, JobConfPropertyNames.JOB_NAMES.getCandidates(), null);
  if (jobName != null) {
    result.setJobName(jobName);
  }

  maybeSetHeapMegabytes(extractMegabytes(conf,
      JobConfPropertyNames.TASK_JAVA_OPTS_S.getCandidates()));
  maybeSetJobMapMB(extractMegabytes(conf,
      JobConfPropertyNames.MAP_JAVA_OPTS_S.getCandidates()));
  maybeSetJobReduceMB(extractMegabytes(conf,
      JobConfPropertyNames.REDUCE_JAVA_OPTS_S.getCandidates()));

  // Remembered so that build() can attach the properties to the result.
  this.jobConfigurationParameters = conf;
}
/**
 * Request the builder to build the final object. Once called, the
 * {@link JobBuilder} would accept no more events or job-conf properties.
 *
 * <p>Besides attaching the last processed job configuration, this derives
 * the per-job statistics from the recorded attempts: runtime CDFs for
 * successful and failed map attempts (one per locality distance), runtime
 * CDFs for successful and failed reduce attempts, and the distribution of
 * the attempt number at which mappers succeeded. Only attempts that have
 * both a positive start time and a positive finish time contribute a
 * runtime.
 *
 * @return Parsed {@link LoggedJob} object.
 */
public LoggedJob build() {
  // From here on both process(...) overloads refuse further input.
  finalized = true;

  // set the conf
  result.setJobProperties(jobConfigurationParameters);

  // One histogram per locality distance plus a final bucket that is used
  // when no distance could be computed for an attempt.
  final int bucketCount = ParsedHost.numberOfDistances() + 1;
  Histogram[] successfulMapAttemptTimes = new Histogram[bucketCount];
  Histogram[] failedMapAttemptTimes = new Histogram[bucketCount];
  for (int i = 0; i < bucketCount; ++i) {
    successfulMapAttemptTimes[i] = new Histogram();
    failedMapAttemptTimes[i] = new Histogram();
  }

  Histogram successfulReduceAttemptTimes = new Histogram();
  Histogram failedReduceAttemptTimes = new Histogram();
  Histogram successfulNthMapperAttempts = new Histogram();

  for (LoggedTask task : result.getMapTasks()) {
    for (LoggedTaskAttempt attempt : task.getAttempts()) {
      // Without both timestamps there is no meaningful runtime to record.
      if (attempt.getFinishTime() <= 0 || attempt.getStartTime() <= 0) {
        continue;
      }
      long runtime = attempt.getFinishTime() - attempt.getStartTime();

      // Start in the last bucket; successful attempts may move to a closer
      // one below, failed attempts always stay here.
      int distance = bucketCount - 1;

      if (attempt.getResult() == Values.SUCCESS) {
        LoggedLocation host = attempt.getLocation();
        List<LoggedLocation> locs = task.getPreferredLocations();
        if (host != null && locs != null && !locs.isEmpty()) {
          // The attempt's own host does not change per preferred location,
          // so parse it once.
          ParsedHost attemptHost = new ParsedHost(host);
          for (LoggedLocation loc : locs) {
            ParsedHost preferedLoc = new ParsedHost(loc);
            distance = Math.min(distance, preferedLoc.distance(attemptHost));
          }
        }
        successfulMapAttemptTimes[distance].enter(runtime);

        // Record which attempt number finally succeeded.
        String attemptID = attempt.getAttemptID();
        if (attemptID != null) {
          Matcher matcher = taskAttemptIDPattern.matcher(attemptID);
          if (matcher.matches()) {
            int attemptNumber = Integer.parseInt(matcher.group(1));
            successfulNthMapperAttempts.enter(attemptNumber);
          }
        }
      } else if (attempt.getResult()
          == Pre21JobHistoryConstants.Values.FAILED) {
        failedMapAttemptTimes[distance].enter(runtime);
      }
    }
  }

  for (LoggedTask task : result.getReduceTasks()) {
    for (LoggedTaskAttempt attempt : task.getAttempts()) {
      // Same rule as for maps: an attempt that lacks a start or finish time
      // must not feed a bogus difference into the CDFs.
      if (attempt.getFinishTime() <= 0 || attempt.getStartTime() <= 0) {
        continue;
      }
      long runtime = attempt.getFinishTime() - attempt.getStartTime();

      if (attempt.getResult() == Values.SUCCESS) {
        successfulReduceAttemptTimes.enter(runtime);
      } else if (attempt.getResult()
          == Pre21JobHistoryConstants.Values.FAILED) {
        failedReduceAttemptTimes.enter(runtime);
      }
    }
  }

  result.setFailedMapAttemptCDFs(mapCDFArrayList(failedMapAttemptTimes));

  LoggedDiscreteCDF failedReduce = new LoggedDiscreteCDF();
  failedReduce.setCDF(failedReduceAttemptTimes, attemptTimesPercentiles, 100);
  result.setFailedReduceAttemptCDF(failedReduce);

  result.setSuccessfulMapAttemptCDFs(
      mapCDFArrayList(successfulMapAttemptTimes));

  LoggedDiscreteCDF succReduce = new LoggedDiscreteCDF();
  succReduce.setCDF(successfulReduceAttemptTimes, attemptTimesPercentiles,
      100);
  result.setSuccessfulReduceAttemptCDF(succReduce);

  // Turn the attempt-number histogram into fractions of all successes.
  long totalSuccessfulAttempts = 0L;
  long maxTriesToSucceed = 0L;
  for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
    totalSuccessfulAttempts += ent.getValue();
    maxTriesToSucceed = Math.max(maxTriesToSucceed, ent.getKey());
  }

  if (totalSuccessfulAttempts > 0L) {
    // A new double[] is already zero-filled, so attempt numbers that never
    // succeeded keep the value 0.0.
    double[] successAfterI = new double[(int) maxTriesToSucceed + 1];
    for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
      successAfterI[ent.getKey().intValue()] =
          ((double) ent.getValue()) / totalSuccessfulAttempts;
    }
    result.setMapperTriesToSucceed(successAfterI);
  } else {
    result.setMapperTriesToSucceed(null);
  }

  return result;
}
/**
 * Converts each histogram into a {@link LoggedDiscreteCDF}, using the
 * builder's percentile table and a scale of 100.
 *
 * @param data
 *          the histograms to convert
 * @return one CDF per histogram, in the same order as {@code data}
 */
private ArrayList<LoggedDiscreteCDF> mapCDFArrayList(Histogram[] data) {
  final ArrayList<LoggedDiscreteCDF> cdfs =
      new ArrayList<LoggedDiscreteCDF>();
  for (int i = 0; i < data.length; ++i) {
    final LoggedDiscreteCDF cdf = new LoggedDiscreteCDF();
    cdf.setCDF(data[i], attemptTimesPercentiles, 100);
    cdfs.add(cdf);
  }
  return cdfs;
}
/**
 * Translates a task type or task status name into the matching
 * {@link Values} constant.
 *
 * <p>"JOB_CLEANUP" and "JOB_SETUP" map to {@link Values#CLEANUP} and
 * {@link Values#SETUP}; the name of {@link TaskStatus.State#SUCCEEDED} maps
 * to {@link Values#SUCCESS}. All three comparisons ignore case. Any other
 * name is upper-cased and resolved through {@code Values.valueOf}.
 *
 * @param name
 *          the name to translate; must not be null
 * @return the matching {@link Values} constant
 */
private static Values getPre21Value(String name) {
  if (name.equalsIgnoreCase("JOB_CLEANUP")) {
    return Values.CLEANUP;
  }
  if (name.equalsIgnoreCase("JOB_SETUP")) {
    return Values.SETUP;
  }

  // Note that pre-21, the task state of a successful task was logged as
  // SUCCESS while from 21 onwards, its logged as SUCCEEDED.
  if (name.equalsIgnoreCase(TaskStatus.State.SUCCEEDED.toString())) {
    return Values.SUCCESS;
  }

  // Upper-case with a fixed locale: under the default locale the result can
  // differ (e.g. a dotted capital I for "i" in Turkish), which would make
  // the constant lookup fail for names such as "failed".
  return Values.valueOf(name.toUpperCase(Locale.ROOT));
}
/**
 * Copies the finish time of a task-updated event onto the task it refers to.
 * Events for tasks that are not known yet are ignored.
 */
private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
  final LoggedTask known = getTask(event.getTaskId().toString());
  if (known != null) {
    known.setFinishTime(event.getFinishTime());
  }
}
/**
 * Handles a task-started event: creates the task if it is not known yet,
 * then records its start time and its preferred locations.
 */
private void processTaskStartedEvent(TaskStartedEvent event) {
  final TaskType type = event.getTaskType();
  final String taskId = event.getTaskId().toString();
  // This is the only handler that passes allowCreate = true.
  final LoggedTask task = getOrMakeTask(type, taskId, true);

  task.setStartTime(event.getStartTime());

  final String splits = event.getSplitLocations();
  task.setPreferredLocations(preferredLocationForSplits(splits));
}
/**
 * Handles a task-finished event: records finish time, status and counters
 * on the task. Events for tasks that are not known yet are ignored.
 */
private void processTaskFinishedEvent(TaskFinishedEvent event) {
  final TaskType type = event.getTaskType();
  final String taskId = event.getTaskId().toString();
  final LoggedTask task = getOrMakeTask(type, taskId, false);

  if (task != null) {
    task.setFinishTime(event.getFinishTime());
    task.setTaskStatus(getPre21Value(event.getTaskStatus()));
    final TaskFinished datum = (TaskFinished) event.getDatum();
    task.incorporateCounters(datum.counters);
  }
}
/**
 * Handles a task-failed event: records finish time and status on the task.
 * Events for tasks that are not known yet are ignored.
 */
private void processTaskFailedEvent(TaskFailedEvent event) {
  final TaskType type = event.getTaskType();
  final String taskId = event.getTaskId().toString();
  final LoggedTask task = getOrMakeTask(type, taskId, false);

  if (task != null) {
    task.setFinishTime(event.getFinishTime());
    task.setTaskStatus(getPre21Value(event.getTaskStatus()));
  }
}
/**
 * Handles an unsuccessful attempt completion: records result, location (when
 * the host name can be parsed) and finish time on the attempt. Events whose
 * task is not known yet are ignored.
 */
private void processTaskAttemptUnsuccessfulCompletionEvent(
    TaskAttemptUnsuccessfulCompletionEvent event) {
  final TaskType type = event.getTaskType();
  final String taskId = event.getTaskId().toString();
  final String attemptId = event.getTaskAttemptId().toString();
  final LoggedTaskAttempt attempt =
      getOrMakeTaskAttempt(type, taskId, attemptId);

  if (attempt != null) {
    attempt.setResult(getPre21Value(event.getTaskStatus()));

    final ParsedHost host = getAndRecordParsedHost(event.getHostname());
    if (host != null) {
      attempt.setLocation(host.makeLoggedLocation());
    }

    attempt.setFinishTime(event.getFinishTime());
  }
}
/**
 * Handles an attempt-started event: records the start time on the attempt.
 * Events whose task is not known yet are ignored.
 */
private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
  final TaskType type = event.getTaskType();
  final String taskId = event.getTaskId().toString();
  final String attemptId = event.getTaskAttemptId().toString();
  final LoggedTaskAttempt attempt =
      getOrMakeTaskAttempt(type, taskId, attemptId);

  if (attempt != null) {
    attempt.setStartTime(event.getStartTime());
  }
}
/**
 * Handles an attempt-finished event: records result, location, finish time
 * and counters on the attempt. Events whose task is not known yet are
 * ignored.
 *
 * <p>The location is only set when the event's host name can be parsed; an
 * unparseable host name leaves the location untouched instead of failing
 * with a NullPointerException.
 */
private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
  LoggedTaskAttempt attempt =
      getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
          event.getAttemptId().toString());
  if (attempt == null) {
    return;
  }

  attempt.setResult(getPre21Value(event.getTaskStatus()));

  // getAndRecordParsedHost returns null when the host name cannot be parsed.
  ParsedHost parsedHost = getAndRecordParsedHost(event.getHostname());
  if (parsedHost != null) {
    attempt.setLocation(parsedHost.makeLoggedLocation());
  }

  attempt.setFinishTime(event.getFinishTime());
  attempt
      .incorporateCounters(((TaskAttemptFinished) event.getDatum()).counters);
}
/**
 * Handles a reduce-attempt-finished event: records result, host name, finish
 * time, shuffle and sort finish times, and counters on the attempt. Events
 * whose task is not known yet are ignored.
 */
private void processReduceAttemptFinishedEvent(
    ReduceAttemptFinishedEvent event) {
  final TaskType type = event.getTaskType();
  final String taskId = event.getTaskId().toString();
  final String attemptId = event.getAttemptId().toString();
  final LoggedTaskAttempt attempt =
      getOrMakeTaskAttempt(type, taskId, attemptId);

  if (attempt != null) {
    attempt.setResult(getPre21Value(event.getTaskStatus()));
    attempt.setHostName(event.getHostname());
    // XXX There may be redundant location info available in the event.
    // We might consider extracting it from this event. Currently this
    // is redundant, but making this will add future-proofing.
    attempt.setFinishTime(event.getFinishTime());
    attempt.setShuffleFinished(event.getShuffleFinishTime());
    attempt.setSortFinished(event.getSortFinishTime());

    final ReduceAttemptFinished datum =
        (ReduceAttemptFinished) event.getDatum();
    attempt.incorporateCounters(datum.counters);
  }
}
/**
 * Handles a map-attempt-finished event: records result, host name, finish
 * time and counters on the attempt. Events whose task is not known yet are
 * ignored.
 */
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
  final TaskType type = event.getTaskType();
  final String taskId = event.getTaskId().toString();
  final String attemptId = event.getAttemptId().toString();
  final LoggedTaskAttempt attempt =
      getOrMakeTaskAttempt(type, taskId, attemptId);

  if (attempt != null) {
    attempt.setResult(getPre21Value(event.getTaskStatus()));
    attempt.setHostName(event.getHostname());
    // XXX There may be redundant location info available in the event.
    // We might consider extracting it from this event. Currently this
    // is redundant, but making this will add future-proofing.
    attempt.setFinishTime(event.getFinishTime());

    final MapAttemptFinished datum = (MapAttemptFinished) event.getDatum();
    attempt.incorporateCounters(datum.counters);
  }
}
/**
 * Records the outcome and finish time of a job that did not complete
 * successfully. The event's status string must name a {@link Values}
 * constant exactly.
 */
private void processJobUnsuccessfulCompletionEvent(
    JobUnsuccessfulCompletionEvent event) {
  final Values outcome =
      Pre21JobHistoryConstants.Values.valueOf(event.getStatus());
  result.setOutcome(outcome);
  result.setFinishTime(event.getFinishTime());
}
/**
 * Copies job ID, job name, user and submit time from a job-submitted event
 * onto the result.
 */
private void processJobSubmittedEvent(JobSubmittedEvent event) {
  final String submittedJobId = event.getJobId().toString();
  result.setJobID(submittedJobId);
  result.setJobName(event.getJobName());
  result.setUser(event.getUserName());
  result.setSubmitTime(event.getSubmitTime());
  // job queue name is set when conf file is processed.
  // See JobBuilder.process(Properties) method for details.
}
/**
 * Records the new job status as the outcome of the result. The event's
 * status string must name a {@link Values} constant exactly.
 */
private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
  final Values outcome =
      Pre21JobHistoryConstants.Values.valueOf(event.getStatus());
  result.setOutcome(outcome);
}
/**
 * Records the job's new priority, looked up in {@link LoggedJob.JobPriority}
 * by the name of the event's priority.
 */
private void processJobPriorityChangeEvent(JobPriorityChangeEvent event) {
  final String priorityName = event.getPriority().toString();
  result.setPriority(LoggedJob.JobPriority.valueOf(priorityName));
}
/**
 * Copies launch time and the total numbers of maps and reduces from a
 * job-inited event onto the result.
 */
private void processJobInitedEvent(JobInitedEvent event) {
  this.result.setLaunchTime(event.getLaunchTime());
  this.result.setTotalMaps(event.getTotalMaps());
  this.result.setTotalReduces(event.getTotalReduces());
}
/**
 * Copies the launch time from a job-info-change event onto the result.
 */
private void processJobInfoChangeEvent(JobInfoChangeEvent event) {
  this.result.setLaunchTime(event.getLaunchTime());
}
/**
 * Marks the job as successfully finished: records the finish time, sets the
 * job ID to the one this builder was created with, and sets the outcome to
 * {@link Values#SUCCESS}.
 */
private void processJobFinishedEvent(JobFinishedEvent event) {
  this.result.setFinishTime(event.getFinishTime());
  this.result.setJobID(this.jobID);
  this.result.setOutcome(Values.SUCCESS);
}
/**
 * Looks a task up by ID without knowing its type.
 *
 * @param taskIDname
 *          the task ID name, as a string
 * @return the task found in the map, reduce or other table (searched in that
 *         order), or null if none of them knows the ID
 */
private LoggedTask getTask(String taskIDname) {
  LoggedTask found = mapTasks.get(taskIDname);
  if (found == null) {
    found = reduceTasks.get(taskIDname);
  }
  if (found == null) {
    found = otherTasks.get(taskIDname);
  }
  return found;
}
/**
 * Finds the task with the given ID in the table for its type, optionally
 * creating and registering it when it is missing.
 *
 * @param type
 *          the task type
 * @param taskIDname
 *          the task ID name, as a string
 * @param allowCreate
 *          if true, we can create a task.
 * @return the existing or newly created task, or null when the task is
 *         unknown and {@code allowCreate} is false
 */
private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
    boolean allowCreate) {
  // Anything that is neither MAP nor REDUCE goes into the "other" table.
  Map<String, LoggedTask> table = otherTasks;
  List<LoggedTask> jobTasks = this.result.getOtherTasks();

  switch (type) {
  case MAP:
    table = mapTasks;
    jobTasks = this.result.getMapTasks();
    break;
  case REDUCE:
    table = reduceTasks;
    jobTasks = this.result.getReduceTasks();
    break;
  default:
    // no code
  }

  final LoggedTask existing = table.get(taskIDname);
  if (existing != null || !allowCreate) {
    return existing;
  }

  // Register the new task both in the lookup table and in the job's list.
  final LoggedTask created = new LoggedTask();
  created.setTaskType(getPre21Value(type.toString()));
  created.setTaskID(taskIDname);
  table.put(taskIDname, created);
  jobTasks.add(created);
  return created;
}
/**
 * Finds the attempt with the given ID, creating it and attaching it to its
 * task when the attempt is new and the task is already known.
 *
 * @param type
 *          the task type
 * @param taskIDName
 *          the ID of the task the attempt belongs to
 * @param taskAttemptName
 *          the attempt ID, as a string
 * @return the existing or newly created attempt, or null when the attempt is
 *         new and its task is unknown
 */
private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
    String taskIDName, String taskAttemptName) {
  // The task itself is never created here (allowCreate = false).
  final LoggedTask owner = getOrMakeTask(type, taskIDName, false);
  final LoggedTaskAttempt known = attempts.get(taskAttemptName);
  if (known != null || owner == null) {
    return known;
  }

  final LoggedTaskAttempt created = new LoggedTaskAttempt();
  created.setAttemptID(taskAttemptName);
  attempts.put(taskAttemptName, created);
  owner.getAttempts().add(created);
  return created;
}
/**
 * Parses a host name and returns the canonical {@link ParsedHost} for it,
 * remembering the host if it has not been seen before.
 *
 * @param hostName
 *          the host name to parse
 * @return the previously recorded equal instance if there is one, otherwise
 *         the freshly parsed instance; null when parsing yields null
 */
private ParsedHost getAndRecordParsedHost(String hostName) {
  final ParsedHost parsed = ParsedHost.parse(hostName);
  if (parsed == null) {
    return null;
  }

  ParsedHost canonical = allHosts.get(parsed);
  if (canonical == null) {
    // First sighting: this instance becomes the canonical one.
    allHosts.put(parsed, parsed);
    canonical = parsed;
  }
  return canonical;
}
/**
 * Turns a comma-separated list of split locations into logged locations.
 *
 * @param splits
 *          comma-separated host names, may be null
 * @return one location per host name that could be parsed; null when
 *         {@code splits} is null or lists more than
 *         MAXIMUM_PREFERRED_LOCATIONS entries
 */
private ArrayList<LoggedLocation> preferredLocationForSplits(String splits) {
  if (splits == null) {
    return null;
  }

  final StringTokenizer tok = new StringTokenizer(splits, ",", false);
  if (tok.countTokens() > MAXIMUM_PREFERRED_LOCATIONS) {
    // Too many locations: ignore them all.
    return null;
  }

  final ArrayList<LoggedLocation> locations = new ArrayList<LoggedLocation>();
  while (tok.hasMoreTokens()) {
    // Every token is recorded as a host; unparseable ones are skipped.
    final ParsedHost node = getAndRecordParsedHost(tok.nextToken());
    if (node != null) {
      locations.add(node.makeLoggedLocation());
    }
  }
  return locations;
}
}