/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.analyzer.plugins;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType;
import org.apache.tez.analyzer.utils.SVGUtils;
import org.apache.tez.common.Preconditions;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.history.parser.datamodel.Container;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
import org.apache.tez.history.parser.datamodel.TaskInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
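
/**
 * Finds and annotates the critical path of a Tez DAG. The path is built by
 * walking backwards from the last task attempt to finish, following data,
 * scheduling, retry and commit dependencies. Each step is then analyzed for
 * stragglers, wave inefficiencies and container allocation overhead. Results
 * are reported as CSV and, optionally, rendered as an SVG timeline.
 */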
public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
private static final Logger LOG = LoggerFactory.getLogger(CriticalPathAnalyzer.class);
private static final String SUCCEEDED_STATE = TaskAttemptState.SUCCEEDED.name();
private static final String FAILED_STATE = TaskAttemptState.FAILED.name();
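
  /**
   * The reason a step is on the critical path relative to the step after it:
   * it produced the last data event consumed downstream (DATA_DEPENDENCY),
   * vertex initialization gated the attempt (INIT_DEPENDENCY), DAG commit
   * followed the last attempt (COMMIT_DEPENDENCY), a failed previous attempt
   * of the same task triggered a retry (RETRY_DEPENDENCY), or the attempt was
   * re-run to re-create output needed downstream (OUTPUT_RECREATE_DEPENDENCY).
   */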
public enum CriticalPathDependency {
DATA_DEPENDENCY,
INIT_DEPENDENCY,
COMMIT_DEPENDENCY,
RETRY_DEPENDENCY,
OUTPUT_RECREATE_DEPENDENCY
}
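
  /** Config flag: when true (the default), also render the critical path as an SVG. */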
public static final String DRAW_SVG = "tez.critical-path-analyzer.draw-svg";
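  /** Config key for a fallback output directory when none is set on the analyzer. */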
public static final String OUTPUT_DIR = "output-dir";
public static class CriticalPathStep {
public enum EntityType {
ATTEMPT,
VERTEX_INIT,
DAG_COMMIT
}
EntityType type;
TaskAttemptInfo attempt;
CriticalPathDependency reason; // reason linking this to the previous step on the critical path
long startCriticalPathTime; // time at which attempt is on critical path
long stopCriticalPathTime; // time at which attempt is off critical path
List<String> notes = Lists.newLinkedList();
public CriticalPathStep(TaskAttemptInfo attempt, EntityType type) {
this.type = type;
this.attempt = attempt;
}
public EntityType getType() {
return type;
}
public TaskAttemptInfo getAttempt() {
return attempt;
}
public long getStartCriticalTime() {
return startCriticalPathTime;
}
public long getStopCriticalTime() {
return stopCriticalPathTime;
}
public CriticalPathDependency getReason() {
return reason;
}
public List<String> getNotes() {
return notes;
}
}
List<CriticalPathStep> criticalPath = Lists.newLinkedList();
Map<String, TaskAttemptInfo> attempts = Maps.newHashMap();
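  // peak number of concurrently running attempts, and the concurrency observed
  // at each attempt start/finish timestamp (sorted by time)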
int maxConcurrency = 0;
ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();
public CriticalPathAnalyzer() {
super(new Configuration());
}
public CriticalPathAnalyzer(Configuration conf) {
super(conf);
}
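
  /** Entry point: builds the critical path for the given DAG and analyzes it. */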
@Override
public void analyze(DagInfo dagInfo) throws TezException {
    // get all attempts in the DAG and find the last failed/succeeded attempt;
    // ignore killed attempts so that kills issued at DAG completion do not skew the result
TaskAttemptInfo lastAttempt = null;
long lastAttemptFinishTime = 0;
for (VertexInfo vertex : dagInfo.getVertices()) {
for (TaskInfo task : vertex.getTasks()) {
for (TaskAttemptInfo attempt : task.getTaskAttempts()) {
attempts.put(attempt.getTaskAttemptId(), attempt);
if (attempt.getStatus().equals(SUCCEEDED_STATE) ||
attempt.getStatus().equals(FAILED_STATE)) {
if (lastAttemptFinishTime < attempt.getFinishTime()) {
lastAttempt = attempt;
lastAttemptFinishTime = attempt.getFinishTime();
}
}
}
}
}
if (lastAttempt == null) {
LOG.info("Cannot find last attempt to finish in DAG " + dagInfo.getDagId());
return;
}
createCriticalPath(dagInfo, lastAttempt, lastAttemptFinishTime, attempts);
analyzeCriticalPath(dagInfo);
if (getConf().getBoolean(DRAW_SVG, true)) {
saveCriticalPathAsSVG(dagInfo);
}
}
public List<CriticalPathStep> getCriticalPath() {
return criticalPath;
}
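
  // Writes the critical path SVG to <outputDir>/<dagId>.svg, preferring the
  // analyzer's own output dir and falling back to the OUTPUT_DIR config key.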
private void saveCriticalPathAsSVG(DagInfo dagInfo) {
SVGUtils svg = new SVGUtils();
String outputDir = getOutputDir();
if (outputDir == null) {
outputDir = getConf().get(OUTPUT_DIR);
}
String outputFileName = outputDir + File.separator + dagInfo.getDagId() + ".svg";
LOG.info("Writing output to: " + outputFileName);
svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath);
}
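
  /**
   * A task attempt start or finish event, used as a sweep-line point when
   * computing how many attempts run concurrently at any instant.
   */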
static class TimeInfo implements Comparable<TimeInfo> {
long timestamp;
int count;
boolean start;
TimeInfo(long timestamp, boolean start) {
this.timestamp = timestamp;
this.start = start;
}
@Override
public int compareTo(TimeInfo o) {
return Long.compare(this.timestamp, o.timestamp);
}
@Override
public int hashCode() {
return (int)((timestamp >> 32) ^ timestamp);
}
@Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || o.getClass() != this.getClass()) {
        return false;
      }
      TimeInfo other = (TimeInfo) o;
      return this.compareTo(other) == 0;
    }
}
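
  /**
   * Sweep-line concurrency count: sort all attempt start/finish events by
   * timestamp, then scan once, incrementing the running count on a start and
   * decrementing it on a finish. Events at the same timestamp collapse into a
   * single TimeInfo carrying the final count for that instant.
   */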
private void determineConcurrency(DagInfo dag) {
ArrayList<TimeInfo> timeInfo = Lists.newArrayList();
for (VertexInfo v : dag.getVertices()) {
for (TaskInfo t : v.getTasks()) {
for (TaskAttemptInfo a : t.getTaskAttempts()) {
if (a.getStartTime() > 0) {
timeInfo.add(new TimeInfo(a.getStartTime(), true));
timeInfo.add(new TimeInfo(a.getFinishTime(), false));
}
}
}
}
Collections.sort(timeInfo);
int concurrency = 0;
TimeInfo lastTimeInfo = null;
for (TimeInfo t : timeInfo) {
concurrency += (t.start) ? 1 : -1;
maxConcurrency = (concurrency > maxConcurrency) ? concurrency : maxConcurrency;
if (lastTimeInfo == null || lastTimeInfo.timestamp < t.timestamp) {
lastTimeInfo = t;
lastTimeInfo.count = concurrency;
concurrencyByTime.add(lastTimeInfo);
} else {
// lastTimeInfo.timestamp == t.timestamp
lastTimeInfo.count = concurrency;
}
}
}
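
  // Maximum number of concurrently running attempts observed within
  // [begin, end]; relies on concurrencyByTime being sorted by timestamp.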
private int getIntervalMaxConcurrency(long begin, long end) {
int concurrency = 0;
for (TimeInfo timeInfo : concurrencyByTime) {
if (timeInfo.timestamp < begin) {
continue;
}
if (timeInfo.timestamp > end) {
break;
}
if (timeInfo.count > concurrency) {
concurrency = timeInfo.count;
}
}
return concurrency;
}
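
  /**
   * For each critical path attempt that got its container only after becoming
   * critical, explains the allocation wait: prior use of the same container by
   * other attempts, multiple waves of the same vertex in one container, or
   * internal preemptions of other vertices that overlapped the wait.
   */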
private void analyzeAllocationOverhead(DagInfo dag) {
List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList();
for (VertexInfo v : dag.getVertices()) {
for (TaskInfo t : v.getTasks()) {
for (TaskAttemptInfo a : t.getTaskAttempts()) {
if (a.getTerminationCause().equals(
TaskAttemptTerminationCause.INTERNAL_PREEMPTION.name())) {
LOG.debug("Found preempted attempt " + a.getTaskAttemptId());
preemptedAttempts.add(a);
}
}
}
}
for (int i = 0; i < criticalPath.size(); ++i) {
CriticalPathStep step = criticalPath.get(i);
TaskAttemptInfo attempt = step.attempt;
if (step.getType() != EntityType.ATTEMPT) {
continue;
}
long creationTime = attempt.getCreationTime();
long allocationTime = attempt.getAllocationTime();
long finishTime = attempt.getFinishTime();
if (allocationTime < step.startCriticalPathTime) {
// allocated before it became critical
continue;
}
// the attempt is critical before allocation. So allocation overhead needs analysis
Container container = attempt.getContainer();
if (container != null) {
Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container);
if (attempts != null && !attempts.isEmpty()) {
// arrange attempts by allocation time
List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts);
Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
// walk the list to record allocation time before the current attempt
long containerPreviousAllocatedTime = 0;
int reUsesForVertex = 1;
for (TaskAttemptInfo containerAttempt : attemptsList) {
if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
break;
}
if (containerAttempt.getTaskInfo().getVertexInfo().getVertexId().equals(
attempt.getTaskInfo().getVertexInfo().getVertexId())) {
// another task from the same vertex ran in this container. So there are multiple
// waves for this vertex on this container.
reUsesForVertex++;
}
long cAllocTime = containerAttempt.getAllocationTime();
long cFinishTime = containerAttempt.getFinishTime();
if (cFinishTime > creationTime) {
              // containerAttempt held the container while this attempt was waiting:
              // add the time the container was allocated to it, counting only the
              // portion after this attempt was created.
containerPreviousAllocatedTime +=
(cFinishTime - (cAllocTime > creationTime ? cAllocTime : creationTime));
}
}
int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks();
int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime);
double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency);
if (reUsesForVertex > 1) {
step.notes.add("Container ran multiple tasks for this vertex. ");
if (numWaves < 1) {
// less than 1 wave total but still ran more than 1 on this container
step.notes.add("Vertex potentially seeing contention from other branches in the DAG. ");
}
}
if (containerPreviousAllocatedTime == 0) {
step.notes.add("Container newly allocated.");
} else {
if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) {
step.notes.add("Container was fully allocated");
} else {
step.notes.add("Container in use for " +
SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) +
" of allocation wait time");
}
}
}
// look for internal preemptions while attempt was waiting for allocation
for (TaskAttemptInfo a : preemptedAttempts) {
if (a.getTaskInfo().getVertexInfo().getVertexId()
.equals(attempt.getTaskInfo().getVertexInfo().getVertexId())) {
          // a task of the same vertex would not have been preempted to make room for
          // this one. Ideally this should compare priorities, but we don't have them.
continue;
}
if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime) {
// found an attempt that was preempted within this time interval
step.notes.add("Potentially waited for preemption of " + a.getShortName());
}
}
}
}
}
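
  // Number of "waves" = tasks / available concurrency: how many rounds of
  // execution the vertex needed, rounded to one decimal place.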
private double getWaves(int numTasks, int concurrency) {
    double numWaves = (numTasks * 1.0) / concurrency;
    // round to one decimal place
    numWaves = Math.round(numWaves * 10d) / 10d;
    return numWaves;
}
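
  // Annotates critical path attempts whose vertex ran in more than one wave,
  // flagging a small trailing partial wave that under-uses the concurrency.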
private void analyzeWaves(DagInfo dag) {
for (int i = 0; i < criticalPath.size(); ++i) {
CriticalPathStep step = criticalPath.get(i);
TaskAttemptInfo attempt = step.attempt;
if (step.getType() != EntityType.ATTEMPT) {
continue;
}
long creationTime = attempt.getCreationTime();
long finishTime = attempt.getFinishTime();
int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks();
if (numVertexTasks <= 1) {
continue;
}
int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime);
double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency);
step.notes.add("Vertex ran " + numVertexTasks
+ " tasks in " + numWaves
+ " waves with available concurrency of " + intervalMaxConcurrency);
      if (numWaves > 1 && numWaves % 1 < 0.5) {
        // more than one wave was needed and the last wave is small
        step.notes.add("Last partial wave did not use full concurrency. ");
      }
}
}
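
  /**
   * Flags a critical path attempt as a potential straggler when its
   * post-data-arrival execution time exceeds the vertex average by more than
   * 25%. Attempts with multiple last-data events (read errors) are skipped
   * because their delay may not be intrinsic to the attempt.
   */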
private void analyzeStragglers(DagInfo dag) {
long dagStartTime = dag.getStartTime();
long dagTime = dag.getFinishTime() - dagStartTime;
long totalAttemptCriticalTime = 0;
for (int i = 0; i < criticalPath.size(); ++i) {
CriticalPathStep step = criticalPath.get(i);
totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime);
TaskAttemptInfo attempt = step.attempt;
if (step.getType() == EntityType.ATTEMPT) {
// analyze execution overhead
        if (attempt.getLastDataEvents().size() > 1) {
          // there were read errors that could have delayed the attempt; skip it
          continue;
}
long avgPostDataExecutionTime = attempt.getTaskInfo().getVertexInfo()
.getAvgPostDataExecutionTimeInterval();
if (avgPostDataExecutionTime <= 0) {
continue;
}
long attemptExecTime = attempt.getPostDataExecutionTimeInterval();
if (avgPostDataExecutionTime * 1.25 < attemptExecTime) {
step.notes
.add("Potential straggler. Post Data Execution time " +
SVGUtils.getTimeStr(attemptExecTime)
+ " compared to vertex average of " +
SVGUtils.getTimeStr(avgPostDataExecutionTime));
}
}
}
LOG.debug("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime
+ " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime);
}
private void analyzeCriticalPath(DagInfo dag) {
if (!criticalPath.isEmpty()) {
determineConcurrency(dag);
analyzeStragglers(dag);
analyzeWaves(dag);
analyzeAllocationOverhead(dag);
}
}
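
  /**
   * Builds the critical path by walking backwards from the last attempt to
   * finish. Each iteration records the current attempt as a step and then
   * follows the dependency that kept it waiting: the producer of its last data
   * event, the causal attempt that triggered its (re)scheduling, or, when
   * neither exists, vertex initialization, which ends the walk at DAG start
   * time. Steps are accumulated last-to-first and reversed at the end.
   */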
private void createCriticalPath(DagInfo dagInfo, TaskAttemptInfo lastAttempt,
long lastAttemptFinishTime, Map<String, TaskAttemptInfo> attempts) {
List<CriticalPathStep> tempCP = Lists.newLinkedList();
if (lastAttempt != null) {
TaskAttemptInfo currentAttempt = lastAttempt;
CriticalPathStep currentStep = new CriticalPathStep(currentAttempt, EntityType.DAG_COMMIT);
long currentAttemptStopCriticalPathTime = lastAttemptFinishTime;
// add the commit step
if (dagInfo.getFinishTime() > 0) {
currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
} else {
        // AM crashed and no DAG finish event was written
currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
}
currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY;
tempCP.add(currentStep);
while (true) {
Preconditions.checkState(currentAttempt != null);
Preconditions.checkState(currentAttemptStopCriticalPathTime > 0);
LOG.debug("Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId());
currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT);
currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
// consider the last data event seen immediately preceding the current critical path
// stop time for this attempt
long currentStepLastDataEventTime = 0;
String currentStepLastDataTA = null;
        DataDependencyEvent item =
            currentAttempt.getLastDataEventInfo(currentStep.stopCriticalPathTime);
        if (item != null) {
          currentStepLastDataEventTime = item.getTimestamp();
          currentStepLastDataTA = item.getTaskAttemptId();
        }
        // sanity check for loops in the critical path
        for (CriticalPathStep previousStep : tempCP) {
          if (previousStep.type == EntityType.ATTEMPT
              && previousStep.attempt.getTaskAttemptId().equals(currentAttempt.getTaskAttemptId())) {
            // found a loop. This should only happen when currentAttempt had read errors
            List<DataDependencyEvent> dataEvents = currentAttempt.getLastDataEvents();
            // must have received both the original and the retry data events
            Preconditions.checkState(dataEvents.size() > 1);
            // the new event must be earlier than the last one
            Preconditions.checkState(currentStepLastDataEventTime < dataEvents
                .get(dataEvents.size() - 1).getTimestamp());
          }
        }
tempCP.add(currentStep);
// find the next attempt on the critical path
boolean dataDependency = false;
// find out predecessor dependency
if (currentStepLastDataEventTime > currentAttempt.getCreationTime()) {
dataDependency = true;
}
long startCriticalPathTime = 0;
String nextAttemptId = null;
CriticalPathDependency reason = null;
if (dataDependency) {
// last data event was produced after the attempt was scheduled. use
// data dependency
// typically the case when scheduling ahead of time
LOG.debug("Has data dependency");
if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
// there is a valid data causal TA. Use it.
nextAttemptId = currentStepLastDataTA;
reason = CriticalPathDependency.DATA_DEPENDENCY;
startCriticalPathTime = currentStepLastDataEventTime;
LOG.debug("Using data dependency " + nextAttemptId);
} else {
            // there is no valid data causal TA: the last data event came from an
            // external input of the vertex
VertexInfo vertex = currentAttempt.getTaskInfo().getVertexInfo();
Preconditions.checkState(!vertex.getAdditionalInputInfoList().isEmpty(),
"Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event "
+ "TA is null for " + currentAttempt.getTaskAttemptId());
nextAttemptId = null;
reason = CriticalPathDependency.INIT_DEPENDENCY;
LOG.debug("Using init dependency");
}
} else {
// attempt was scheduled after last data event. use scheduling dependency
// typically happens for retries
LOG.debug("Has scheduling dependency");
if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) {
// there is a scheduling causal TA. Use it.
nextAttemptId = currentAttempt.getCreationCausalTA();
reason = CriticalPathDependency.RETRY_DEPENDENCY;
            TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
            // guard against a missing attempt record, not the id we already checked
            if (nextAttempt != null) {
VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo();
VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo();
if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())){
// cause from different vertex. Might be rerun to re-generate outputs
for (VertexInfo outVertex : currentVertex.getOutputVertices()) {
if (nextVertex.getVertexName().equals(outVertex.getVertexName())) {
// next vertex is an output vertex
reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
break;
}
}
}
}
if (reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
              // rescheduled due to a read error: the critical path starts at the read
              // error report time. For now, use our own creation time as a proxy for it.
startCriticalPathTime = currentAttempt.getCreationTime();
} else {
// rescheduled due to own previous attempt failure
// we are critical when the previous attempt fails
Preconditions.checkState(nextAttempt != null);
Preconditions.checkState(nextAttempt.getTaskInfo().getTaskId().equals(
currentAttempt.getTaskInfo().getTaskId()));
startCriticalPathTime = nextAttempt.getFinishTime();
}
LOG.debug("Using scheduling dependency " + nextAttemptId);
} else {
// there is no scheduling causal TA.
if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
// there is a data event going to the vertex. Count the time between data event and
// creation time as Initializer/Manager overhead and follow data dependency
nextAttemptId = currentStepLastDataTA;
reason = CriticalPathDependency.DATA_DEPENDENCY;
startCriticalPathTime = currentStepLastDataEventTime;
long overhead = currentAttempt.getCreationTime() - currentStepLastDataEventTime;
currentStep.notes
.add("Initializer/VertexManager scheduling overhead " + SVGUtils.getTimeStr(overhead));
LOG.debug("Using data dependency " + nextAttemptId);
} else {
              // there is no scheduling causal TA and no data event causal TA.
// the vertex has external input that sent the last data events
// or the vertex has external input but does not use events
// or the vertex has no external inputs or edges
nextAttemptId = null;
reason = CriticalPathDependency.INIT_DEPENDENCY;
LOG.debug("Using init dependency");
}
}
}
currentStep.startCriticalPathTime = startCriticalPathTime;
currentStep.reason = reason;
Preconditions.checkState(currentStep.stopCriticalPathTime >= currentStep.startCriticalPathTime);
if (Strings.isNullOrEmpty(nextAttemptId)) {
Preconditions.checkState(reason.equals(CriticalPathDependency.INIT_DEPENDENCY));
Preconditions.checkState(startCriticalPathTime == 0);
          // no predecessor attempt found; this is the last step in the critical path.
          // Assume the attempt's critical path starts when it was scheduled; before
          // that is vertex initialization time.
currentStep.startCriticalPathTime = currentStep.attempt.getCreationTime();
// add vertex init step
long initStepStopCriticalTime = currentStep.startCriticalPathTime;
currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT);
currentStep.stopCriticalPathTime = initStepStopCriticalTime;
currentStep.startCriticalPathTime = dagInfo.getStartTime();
currentStep.reason = CriticalPathDependency.INIT_DEPENDENCY;
tempCP.add(currentStep);
          // reverse the temporary list so the critical path is in execution order
          for (int i = tempCP.size() - 1; i >= 0; --i) {
            criticalPath.add(tempCP.get(i));
          }
return;
}
currentAttempt = attempts.get(nextAttemptId);
currentAttemptStopCriticalPathTime = startCriticalPathTime;
}
}
}
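
  // One CSV row per critical path step: entity (attempt id, vertex name for
  // init steps, or "DAG COMMIT"), dependency reason, status, critical time
  // interval, and accumulated notes.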
@Override
public CSVResult getResult() throws TezException {
String[] headers = { "Entity", "PathReason", "Status", "CriticalStartTime",
"CriticalStopTime", "Notes" };
CSVResult csvResult = new CSVResult(headers);
for (CriticalPathStep step : criticalPath) {
String entity = (step.getType() == EntityType.ATTEMPT ? step.getAttempt().getTaskAttemptId()
: (step.getType() == EntityType.VERTEX_INIT
? step.attempt.getTaskInfo().getVertexInfo().getVertexName() : "DAG COMMIT"));
String [] record = {entity, step.getReason().name(),
step.getAttempt().getDetailedStatus(), String.valueOf(step.getStartCriticalTime()),
String.valueOf(step.getStopCriticalTime()),
Joiner.on(";").join(step.getNotes())};
csvResult.addRecord(record);
}
return csvResult;
}
@Override
public String getName() {
return "CriticalPathAnalyzer";
}
@Override
public String getDescription() {
return "Analyze critical path of the DAG";
}
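
  // Standard Hadoop Tool entry point, run via ToolRunner.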
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
int res = ToolRunner.run(config, new CriticalPathAnalyzer(config), args);
System.exit(res);
}
}