blob: 91f51b4c2199902a78a303242b5193bce5b07a19 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.analyzer.plugins;
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.collect.TreeMultiset;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;
import java.util.Comparator;
import java.util.List;
/**
* Analyze concurrent tasks running in every vertex at regular intervals.
*/
public class TaskConcurrencyAnalyzer extends TezAnalyzerBase implements Analyzer {
private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
private final CSVResult csvResult;
public TaskConcurrencyAnalyzer(Configuration conf) {
super(conf);
this.csvResult = new CSVResult(headers);
}
private enum EventType {START, FINISH}
static class TimeInfo {
EventType eventType;
long timestamp;
int concurrentTasks;
public TimeInfo(EventType eventType, long timestamp) {
this.eventType = eventType;
this.timestamp = timestamp;
}
}
@Override
public void analyze(DagInfo dagInfo) throws TezException {
//For each vertex find the concurrent tasks running at any point
for (VertexInfo vertexInfo : dagInfo.getVertices()) {
List<TaskAttemptInfo> taskAttempts =
Lists.newLinkedList(vertexInfo.getTaskAttempts(true, null));
String vertexName = vertexInfo.getVertexName();
/**
* - Get sorted multi-set of timestamps (S1, S2,...E1, E2..). Possible to have multiple
* tasks starting/ending at same time.
* - Walk through the set
* - Increment concurrent tasks when start event is encountered
* - Decrement concurrent tasks when start event is encountered
*/
TreeMultiset<TimeInfo> timeInfoSet = TreeMultiset.create(new Comparator<TimeInfo>() {
@Override public int compare(TimeInfo o1, TimeInfo o2) {
if (o1.timestamp < o2.timestamp) {
return -1;
}
if (o1.timestamp > o2.timestamp) {
return 1;
}
if (o1.timestamp == o2.timestamp) {
//check event type
if (o1.eventType.equals(o2.eventType)) {
return 0;
}
if (o1.eventType.equals(EventType.START)
&& o2.eventType.equals(EventType.FINISH)) {
return -1;
} else {
return 1;
}
}
return 0;
}
});
for (TaskAttemptInfo attemptInfo : taskAttempts) {
TimeInfo startTimeInfo = new TimeInfo(EventType.START, attemptInfo.getStartTime());
TimeInfo stopTimeInfo = new TimeInfo(EventType.FINISH, attemptInfo.getFinishTime());
timeInfoSet.add(startTimeInfo);
timeInfoSet.add(stopTimeInfo);
}
//Compute concurrent tasks in the list now.
int concurrentTasks = 0;
for(TimeInfo timeInfo : timeInfoSet.elementSet()) {
switch (timeInfo.eventType) {
case START:
concurrentTasks += timeInfoSet.count(timeInfo);
break;
case FINISH:
concurrentTasks -= timeInfoSet.count(timeInfo);
break;
default:
break;
}
timeInfo.concurrentTasks = concurrentTasks;
addToResult(vertexName, timeInfo.timestamp, timeInfo.concurrentTasks);
}
}
}
private void addToResult(String vertexName, long currentTime, int concurrentTasks) {
String[] record = { currentTime + "", vertexName, concurrentTasks + "" };
csvResult.addRecord(record);
}
@Override
public CSVResult getResult() throws TezException {
return csvResult;
}
@Override
public String getName() {
return "TaskConcurrencyAnalyzer";
}
@Override
public String getDescription() {
return "Analyze how many tasks were running in every vertex at given point in time. This "
+ "would be helpful in understanding whether any starvation was there or not.";
}
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(config);
int res = ToolRunner.run(config, analyzer, args);
analyzer.printResults();
System.exit(res);
}
}