TEZ-4231: Fix multiple history parser and event converter issues (#123) (Laszlo Bodor reviewed by Rajesh Balamohan)
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
index d28fd67..397a46f 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
@@ -98,7 +98,6 @@
return message != null;
} catch (java.io.EOFException e) {
reader.close();
-
if (!fileIt.hasNext()) {
return false;
} else {
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
index db3f648..c1711ce 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
@@ -236,11 +236,13 @@
// time etc).
if (dagJson == null) {
dagJson = jsonObject;
- } else if (dagJson.optJSONObject(ATSConstants.OTHER_INFO)
- .optJSONObject(ATSConstants.DAG_PLAN) == null) {
- // if DAG_PLAN is not filled already, let's try to fetch it from other
- dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, jsonObject
- .getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
+ } else {
+ if (dagJson.optJSONObject(ATSConstants.OTHER_INFO).optJSONObject(ATSConstants.DAG_PLAN) == null) {
+ // if DAG_PLAN is not filled already, let's try to fetch it from other
+ dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN,
+ jsonObject.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
+ }
+ mergeSubJSONArray(jsonObject, dagJson, Constants.EVENTS);
}
JSONArray relatedEntities = dagJson.optJSONArray(Constants
.RELATED_ENTITIES);
@@ -268,6 +270,8 @@
}
if (!vertexJsonMap.containsKey(vertexName)) {
vertexJsonMap.put(vertexName, jsonObject);
+ } else {
+ mergeSubJSONArray(jsonObject, vertexJsonMap.get(vertexName), Constants.EVENTS);
}
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), vertexName, vertexJsonMap);
break;
@@ -281,6 +285,8 @@
}
if (!taskJsonMap.containsKey(taskName)) {
taskJsonMap.put(taskName, jsonObject);
+ } else {
+ mergeSubJSONArray(jsonObject, taskJsonMap.get(taskName), Constants.EVENTS);
}
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskName, taskJsonMap);
break;
@@ -294,6 +300,8 @@
}
if (!attemptJsonMap.containsKey(taskAttemptName)) {
attemptJsonMap.put(taskAttemptName, jsonObject);
+ } else {
+ mergeSubJSONArray(jsonObject, attemptJsonMap.get(taskAttemptName), Constants.EVENTS);
}
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskAttemptName, attemptJsonMap);
break;
@@ -311,4 +319,17 @@
"Please provide a valid/complete history log file containing " + dagId);
}
}
+
+ private void mergeSubJSONArray(JSONObject source, JSONObject destination, String key)
+ throws JSONException {
+ if (source.optJSONArray(key) == null) {
+ source.put(key, new JSONArray());
+ }
+ if (destination.optJSONArray(key) == null) {
+ destination.put(key, new JSONArray());
+ }
+ for (int i = 0; i < source.getJSONArray(key).length(); i++) {
+ destination.getJSONArray(key).put(source.getJSONArray(key).get(i));
+ }
+ }
}
\ No newline at end of file
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
index 3f9666a..783f486 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
@@ -44,8 +44,20 @@
BaseInfo(JSONObject jsonObject) throws JSONException {
final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
//parse tez counters
- tezCounters = Utils.parseTezCountersFromJSON(
- otherInfoNode.optJSONObject(Constants.COUNTERS));
+ JSONObject countersObj = otherInfoNode.optJSONObject(Constants.COUNTERS);
+ if (countersObj == null) {
+ /*
+ * This is a workaround for formatting differences, where a TaskFinishedEvent's
+ * counter is a correct json object shown as string, but VertexFinishedEvent's
+ * counter is an encoded json string, so the latter is interpreted as a String
+ * while parsing. The issue might be somewhere while converting these event objects
+ * to proto (HistoryEventProtoConverter). Even if should be fixed there,
+ * already generated events should be parsed correctly, hence this workaround.
+ * Will be investigated in the scope of TEZ-4324.
+ */
+ countersObj = new JSONObject(otherInfoNode.optString(Constants.COUNTERS));
+ }
+ tezCounters = Utils.parseTezCountersFromJSON(countersObj);
//parse events
eventList = Lists.newArrayList();
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
index 08eb92b..94b50a6 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
@@ -128,12 +128,12 @@
JSONObject eventNode = eventNodes.optJSONObject(i);
final String eventInfo = eventNode.optString(Constants.EVENT_INFO);
final String eventType = eventNode.optString(Constants.EVENT_TYPE);
- final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP);
+ final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP) == 0
+ ? eventNode.optLong(Constants.TIMESTAMP) : eventNode.optLong(Constants.EVENT_TIME_STAMP);
Event event = new Event(eventInfo, eventType, time);
eventList.add(event);
-
}
}
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
index 26e20ab..ef84b2e 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
@@ -556,12 +556,12 @@
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
- long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+ long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);
JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_TIME, startTime);
+ otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
- otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
+ otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);
otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
@@ -620,11 +620,13 @@
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
- long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+ long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);
JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
- otherInfo.put(ATSConstants.TIME_TAKEN, (event.getEventTime() - startTime));
+ otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);
+
otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
index 6021c58..1f0a7ad 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
@@ -18,7 +18,6 @@
package org.apache.tez.analyzer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
@@ -54,11 +53,4 @@
* @return description of analyzer
*/
public String getDescription();
-
- /**
- * Get config properties related to this analyzer
- *
- * @return config related to analyzer
- */
- public Configuration getConfiguration();
}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
index cad0d98..294527c 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
@@ -48,12 +48,18 @@
"Print task-to-node assignment details of a DAG");
pgd.addClass("TaskAttemptResultStatisticsAnalyzer", TaskAttemptResultStatisticsAnalyzer.class,
"Print vertex:node:status level details of task attempt results");
+ pgd.addClass("InputReadErrorAnalyzer", InputReadErrorAnalyzer.class,
+ "Print INPUT_READ_ERROR sources");
pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
"Print the task concurrency details in a DAG");
pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class,
"Find critical path at vertex level in a DAG");
pgd.addClass("OneOnOneEdgeAnalyzer", OneOnOneEdgeAnalyzer.class,
"Find out schedule misses in 1:1 edges in a DAG");
+ pgd.addClass("DagOverviewAnalyzer", DagOverviewAnalyzer.class,
+ "Print basic dag information (dag/vertex events)");
+ pgd.addClass("TaskHangAnalyzer", HungTaskAnalyzer.class,
+ "Print all vertices/tasks and their last attempts with status/duration/node");
exitCode = pgd.run(argv);
} catch(Throwable e){
e.printStackTrace();
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
index 5b862f8..553ff0e 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
@@ -39,15 +39,13 @@
*/
public class ContainerReuseAnalyzer extends TezAnalyzerBase implements Analyzer {
- private final Configuration config;
-
private static final String[] headers =
{ "vertexName", "taskAttempts", "node", "containerId", "reuseCount" };
private final CSVResult csvResult;
public ContainerReuseAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
this.csvResult = new CSVResult(headers);
}
@@ -82,11 +80,6 @@
return "Get details on container reuse analysis";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 387b0cf..3f5e300 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -113,10 +113,11 @@
ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();
public CriticalPathAnalyzer() {
+ super(new Configuration());
}
public CriticalPathAnalyzer(Configuration conf) {
- setConf(conf);
+ super(conf);
}
@Override
@@ -643,13 +644,9 @@
return "Analyze critical path of the DAG";
}
- @Override
- public Configuration getConfiguration() {
- return getConf();
- }
-
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args);
+ Configuration config = new Configuration();
+ int res = ToolRunner.run(config, new CriticalPathAnalyzer(config), args);
System.exit(res);
}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java
new file mode 100644
index 0000000..b193c30
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.analyzer.plugins;
+
+import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.Event;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+public class DagOverviewAnalyzer extends TezAnalyzerBase implements Analyzer {
+ private final String[] headers =
+ { "name", "id", "event_type", "status", "event_time", "event_time_str", "vertex_task_stats", "diagnostics" };
+ private final CSVResult csvResult;
+ private static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+
+ public DagOverviewAnalyzer(Configuration config) {
+ super(config);
+ csvResult = new CSVResult(headers);
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ for (Event event : dagInfo.getEvents()) {
+ csvResult.addRecord(new String[] { dagInfo.getDagId(), dagInfo.getDagId(), event.getType(),
+ dagInfo.getStatus(), Long.toString(event.getTime()), toDateStr(event.getTime()), "", "" });
+ }
+ for (VertexInfo vertex : dagInfo.getVertices()) {
+ for (Event event : vertex.getEvents()) {
+ String vertexFailureInfoIfAny = "";
+ for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+ if (attempt.getStatus().contains("FAILED")) {
+ vertexFailureInfoIfAny = attempt.getTaskAttemptId() + ": "
+ + attempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ");
+ break;
+ }
+ }
+ csvResult.addRecord(new String[] { vertex.getVertexName(), vertex.getVertexId(),
+ event.getType(), vertex.getStatus(), Long.toString(event.getTime()),
+ toDateStr(event.getTime()), getTaskStats(vertex), vertexFailureInfoIfAny });
+ }
+
+ // a failed task can lead to dag failure, so hopefully holds valuable information
+ for (TaskInfo failedTask : vertex.getFailedTasks()) {
+ for (Event failedTaskEvent : failedTask.getEvents()) {
+ if (failedTaskEvent.getType().equalsIgnoreCase("TASK_FINISHED")) {
+ csvResult.addRecord(new String[] { vertex.getVertexName(), failedTask.getTaskId(),
+ failedTaskEvent.getType(), failedTask.getStatus(), Long.toString(failedTaskEvent.getTime()),
+ toDateStr(failedTaskEvent.getTime()), getTaskStats(vertex),
+ failedTask.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") });
+ }
+ }
+ // if we already found a failing task, let's scan the failing attempts as well
+ for (TaskAttemptInfo failedAttempt : failedTask.getFailedTaskAttempts()) {
+ for (Event failedTaskAttemptEvent : failedAttempt.getEvents()) {
+ if (failedTaskAttemptEvent.getType().equalsIgnoreCase("TASK_ATTEMPT_FINISHED")) {
+ csvResult.addRecord(new String[] { vertex.getVertexName(),
+ failedAttempt.getTaskAttemptId(), failedTaskAttemptEvent.getType(),
+ failedAttempt.getStatus(), Long.toString(failedTaskAttemptEvent.getTime()),
+ toDateStr(failedTaskAttemptEvent.getTime()), getTaskStats(vertex),
+ failedAttempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") });
+ }
+ }
+ }
+ }
+ }
+
+ csvResult.sort(new Comparator<String[]>() {
+ public int compare(String[] first, String[] second) {
+ return (int) (Long.parseLong(first[4]) - Long.parseLong(second[4]));
+ }
+ });
+ }
+
+ private String getTaskStats(VertexInfo vertex) {
+ return String.format("numTasks: %d failedTasks: %d completedTasks: %d", vertex.getNumTasks(),
+ vertex.getFailedTasksCount(), vertex.getCompletedTasksCount());
+ }
+
+ private static synchronized String toDateStr(long time) {
+ return FORMAT.format(new Date(time));
+ }
+
+ @Override
+ public Result getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "Dag overview analyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "High level dag events overview (dag, vertex event summary)."
+ + " Helps understand the overall progress of a dag by simply listing the dag/vertex related events";
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ DagOverviewAnalyzer analyzer = new DagOverviewAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/HungTaskAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/HungTaskAnalyzer.java
new file mode 100644
index 0000000..9a38e28
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/HungTaskAnalyzer.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Gives insights about hanging task attempts by providing details about last attempts of all tasks.
+ */
+public class HungTaskAnalyzer extends TezAnalyzerBase implements Analyzer {
+ private final String[] headers = { "vertex", "task", " number_of_attempts", "last_attempt_id",
+ "last_attempt_status", "last_attempt_duration_ms", "last_attempt_node" };
+ private final CSVResult csvResult;
+
+ private static final String HEADER_NUM_ATTEMPTS = "num_attempts";
+ private static final String HEADER_LAST_ATTEMPT_ID_AND_STATUS = "last_attempt_id_and_status";
+ private static final String HEADER_LAST_ATTEMPT_STATUS = "last_attempt_status";
+ private static final String HEADER_LAST_ATTEMPT_NODE = "last_attempt_node";
+ private static final String HEADER_LAST_ATTEMPT_DURATION_MS = "last_attempt_duration_ms";
+
+ public HungTaskAnalyzer(Configuration config) {
+ super(config);
+ csvResult = new CSVResult(headers);
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ Map<String, Map<String, String>> taskData = new HashMap<>(); // task attempt count per task
+ for (VertexInfo vertex : dagInfo.getVertices()) {
+ taskData.clear();
+ for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+ String taskId = attempt.getTaskInfo().getTaskId();
+
+ int numAttemptsForTask = attempt.getTaskInfo().getNumberOfTaskAttempts();
+ Map<String, String> thisTaskData = taskData.get(taskId);
+
+ if (thisTaskData == null) {
+ thisTaskData = new HashMap<>();
+ thisTaskData.put(HEADER_NUM_ATTEMPTS, Integer.toString(numAttemptsForTask));
+ taskData.put(taskId, thisTaskData);
+ }
+
+ int attemptNumber = TezTaskAttemptID.fromString(attempt.getTaskAttemptId()).getId();
+ if (attemptNumber == numAttemptsForTask - 1) {
+ thisTaskData.put(HEADER_LAST_ATTEMPT_ID_AND_STATUS, String.format("%s/%s", attempt.getTaskAttemptId(), attempt.getStatus()));
+ thisTaskData.put(HEADER_LAST_ATTEMPT_STATUS, attempt.getDetailedStatus());
+ thisTaskData.put(HEADER_LAST_ATTEMPT_NODE, attempt.getNodeId());
+
+ thisTaskData.put(HEADER_LAST_ATTEMPT_DURATION_MS,
+ (attempt.getFinishTime() == 0 || attempt.getStartTime() == 0) ? "-1"
+ : Long.toString(attempt.getFinishTime() - attempt.getStartTime()));
+ }
+ }
+ for (Map.Entry<String, Map<String, String>> task : taskData.entrySet()) {
+ addARecord(vertex.getVertexName(), task.getKey(), task.getValue().get(HEADER_NUM_ATTEMPTS),
+ task.getValue().get(HEADER_LAST_ATTEMPT_ID_AND_STATUS), task.getValue().get(HEADER_LAST_ATTEMPT_STATUS),
+ task.getValue().get(HEADER_LAST_ATTEMPT_DURATION_MS),
+ task.getValue().get(HEADER_LAST_ATTEMPT_NODE));
+ }
+ }
+
+ csvResult.sort(new Comparator<String[]>() {
+ public int compare(String[] first, String[] second) {
+ int vertexOrder = first[0].compareTo(second[0]);
+ int lastAttemptStatusOrder =
+ (first[4] == null || second[4] == null) ? 0 : first[4].compareTo(second[4]);
+ int attemptNumberOrder = Integer.valueOf(second[2]).compareTo(Integer.valueOf(first[2]));
+
+ return vertexOrder == 0
+ ? (lastAttemptStatusOrder == 0 ? attemptNumberOrder : lastAttemptStatusOrder)
+ : vertexOrder;
+ }
+ });
+ }
+
+ private void addARecord(String vertexName, String taskId, String numAttempts,
+ String lastAttemptId, String lastAttemptStatus, String lastAttemptDuration,
+ String lastAttemptNode) {
+ String[] record = new String[7];
+ record[0] = vertexName;
+ record[1] = taskId;
+ record[2] = numAttempts;
+ record[3] = lastAttemptId;
+ record[4] = lastAttemptStatus;
+ record[5] = lastAttemptDuration;
+ record[6] = lastAttemptNode;
+
+ csvResult.addRecord(record);
+ }
+
+ @Override
+ public Result getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "Hung Task Analyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "TaskHandAnalyzer can give quick insights about hanging task attempts"
+ + " by giving an overview of all tasks and their last attempts' status, duration, etc.";
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ HungTaskAnalyzer analyzer = new HungTaskAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/InputReadErrorAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/InputReadErrorAnalyzer.java
new file mode 100644
index 0000000..3cb523f
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/InputReadErrorAnalyzer.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.analyzer.plugins;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.Event;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+/**
+ * Helps finding the root cause of shuffle errors, e.g. which node(s) can be blamed for them.
+ */
+public class InputReadErrorAnalyzer extends TezAnalyzerBase implements Analyzer {
+ private final String[] headers = { "vertex:attempt", "status", "time", "node", "diagnostics" };
+ private final CSVResult csvResult;
+
+ public InputReadErrorAnalyzer(Configuration config) {
+ super(config);
+ csvResult = new CSVResult(headers);
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ for (VertexInfo vertex : dagInfo.getVertices()) {
+ for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+ String terminationCause = attempt.getTerminationCause();
+ if ("INPUT_READ_ERROR".equalsIgnoreCase(terminationCause)
+ || "OUTPUT_LOST".equalsIgnoreCase(terminationCause)
+ || "NODE_FAILED".equalsIgnoreCase(terminationCause)) {
+ for (Event event : attempt.getEvents()) {
+ if (event.getType().equalsIgnoreCase("TASK_ATTEMPT_FINISHED")) {
+ csvResult.addRecord(new String[] {
+ vertex.getVertexName() + ":" + attempt.getTaskAttemptId(),
+ attempt.getDetailedStatus(), String.valueOf(event.getTime()), attempt.getNodeId(),
+ attempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") });
+ }
+ }
+ }
+ }
+ }
+
+ csvResult.sort(new Comparator<String[]>() {
+ public int compare(String[] first, String[] second) {
+ return (int) (Long.parseLong(second[2]) - Long.parseLong(first[2]));
+ }
+ });
+ }
+
+ @Override
+ public Result getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "Input read error analyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Prints every task attempt (with node) which are related to input read errors";
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ InputReadErrorAnalyzer analyzer = new InputReadErrorAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
index ec72df1..d640704 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
@@ -52,12 +52,10 @@
private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio";
private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f;
- private final Configuration config;
-
private final CSVResult csvResult;
public LocalityAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
csvResult = new CSVResult(headers);
}
@@ -119,7 +117,7 @@
record.add(otherTaskResult.avgHDFSBytesRead + "");
String recommendation = "";
- if (dataLocalRatio < config.getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) {
+ if (dataLocalRatio < getConf().getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) {
recommendation = "Data locality is poor for this vertex. Try tuning "
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", "
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", "
@@ -182,10 +180,6 @@
return "Analyze for locality information (data local, rack local, off-rack)";
}
- @Override public Configuration getConfiguration() {
- return config;
- }
-
/**
* Placeholder for task attempt details
*/
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java
index 2ba715e..a6cb3f1 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java
@@ -55,12 +55,10 @@
// DataMovementType::ONE_TO_ONE
private static final String ONE_TO_ONE = "ONE_TO_ONE";
- private final Configuration config;
-
private final CSVResult csvResult;
public OneOnOneEdgeAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
csvResult = new CSVResult(headers);
}
@@ -140,11 +138,6 @@
return "To understand the locality miss in 1:1 edge";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
OneOnOneEdgeAnalyzer analyzer = new OneOnOneEdgeAnalyzer(conf);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
index 57e91c6..f8f9112 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
@@ -66,14 +66,12 @@
private final CSVResult csvResult = new CSVResult(headers);
- private final Configuration config;
-
private final float realWorkDoneRatio;
private final long minShuffleRecords;
public ShuffleTimeAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
realWorkDoneRatio = config.getFloat
(REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT);
@@ -208,11 +206,6 @@
+ "and the real work done in the task";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
ShuffleTimeAnalyzer analyzer = new ShuffleTimeAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
index 6025541..a7d14fa 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
@@ -85,14 +85,12 @@
private final CSVResult csvResult = new CSVResult(headers);
- private final Configuration config;
-
private final float minRatio;
private final float maxRatio;
private final long maxShuffleBytesPerSource;
public SkewAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
maxRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO,
ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT);
minRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO,
@@ -214,7 +212,7 @@
if (vertexNumTasks > 1) {
if (ratio > maxRatio) {
//input records > 60% of vertex level record count
- if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.60)) {
+ if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.6f)) {
List<String> result = Lists.newLinkedList();
result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
result.add(attemptInfo.getTaskAttemptId());
@@ -305,12 +303,7 @@
@Override
public String getDescription() {
- return "Analyzer reducer skews by mining reducer task counters";
- }
-
- @Override
- public Configuration getConfiguration() {
- return null;
+ return "Analyze reducer skews by mining reducer task counters";
}
public static void main(String[] args) throws Exception {
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
index a810a8a..9e573c2 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
@@ -59,10 +59,8 @@
private final CSVResult csvResult = new CSVResult(headers);
- private final Configuration config;
-
public SlowNodeAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
}
@Override
@@ -182,11 +180,6 @@
return sb.toString();
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
SlowNodeAnalyzer analyzer = new SlowNodeAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
index d2474ad..7c9958b 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
@@ -51,10 +51,8 @@
private static final String NO_OF_TASKS = "tez.slow-task-analyzer.task.count";
private static final int NO_OF_TASKS_DEFAULT = 100;
- private final Configuration config;
-
public SlowTaskIdentifier(Configuration config) {
- this.config = config;
+ super(config);
this.csvResult = new CSVResult(headers);
}
@@ -75,7 +73,7 @@
});
int limit = Math.min(taskAttempts.size(),
- Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
+ Math.max(0, getConf().getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
if (limit == 0) {
return;
@@ -111,11 +109,6 @@
return "Identifies slow tasks in the DAG";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
SlowTaskIdentifier analyzer = new SlowTaskIdentifier(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
index 33f2421..efa39a3 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
@@ -49,7 +49,6 @@
private final CSVResult csvResult = new CSVResult(headers);
- private final Configuration config;
private final MetricRegistry metrics = new MetricRegistry();
private Histogram taskAttemptRuntimeHistorgram;
@@ -59,7 +58,7 @@
private final long vertexRuntimeThreshold;
public SlowestVertexAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME,
MAX_VERTEX_RUNTIME_DEFAULT));
@@ -204,11 +203,6 @@
return "Identify the slowest vertex in the DAG, which needs to be looked into first";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
SlowestVertexAnalyzer analyzer = new SlowestVertexAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
index d69ca23..026dd15 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
@@ -60,10 +60,8 @@
private final long minOutputBytesPerTask;
- private final Configuration config;
-
public SpillAnalyzerImpl(Configuration config) {
- this.config = config;
+ super(config);
minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD,
OUTPUT_BYTES_THRESHOLD_DEFAULT));
this.csvResult = new CSVResult(headers);
@@ -130,11 +128,6 @@
return "Analyze spill details in the task";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
SpillAnalyzerImpl analyzer = new SpillAnalyzerImpl(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
index ce6fa41..02b821f 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
@@ -36,29 +36,27 @@
*/
public class TaskAssignmentAnalyzer extends TezAnalyzerBase
implements Analyzer {
- private final String[] headers = { "vertex", "node", "numTasks", "load" };
- private final Configuration config;
+ private final String[] headers = { "vertex", "node", "numTaskAttempts", "load" };
private final CSVResult csvResult;
public TaskAssignmentAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
csvResult = new CSVResult(headers);
}
@Override
public void analyze(DagInfo dagInfo) throws TezException {
- Map<String, Integer> map = new HashMap<>();
+ Map<String, Integer> taskAttemptsPerNode = new HashMap<>();
for (VertexInfo vertex : dagInfo.getVertices()) {
- map.clear();
+ taskAttemptsPerNode.clear();
for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
- Integer previousValue = map.get(attempt.getNodeId());
- map.put(attempt.getNodeId(),
- previousValue == null ? 1 : previousValue + 1);
+ Integer previousValue = taskAttemptsPerNode.get(attempt.getNodeId());
+ taskAttemptsPerNode.put(attempt.getNodeId(), previousValue == null ? 1 : previousValue + 1);
}
- double mean = vertex.getTaskAttempts().size() / Math.max(1.0, map.size());
- for (Map.Entry<String, Integer> assignment : map.entrySet()) {
- addARecord(vertex.getVertexName(), assignment.getKey(),
- assignment.getValue(), assignment.getValue() * 100 / mean);
+ double mean = vertex.getTaskAttempts().size() / Math.max(1.0, taskAttemptsPerNode.size());
+ for (Map.Entry<String, Integer> assignment : taskAttemptsPerNode.entrySet()) {
+ addARecord(vertex.getVertexName(), assignment.getKey(), assignment.getValue(),
+ assignment.getValue() * 100 / mean);
}
}
}
@@ -88,11 +86,6 @@
return "Get the Task assignments on different nodes of the cluster";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
TaskAssignmentAnalyzer analyzer = new TaskAssignmentAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
index df2f95c..cf6b2f0 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
@@ -44,11 +44,10 @@
public class TaskAttemptResultStatisticsAnalyzer extends TezAnalyzerBase implements Analyzer {
private final String[] headers =
{ "vertex (+task stats: all/succeeded/failed/killed)", "node", "status", "numAttempts" };
- private final Configuration config;
private final CSVResult csvResult;
public TaskAttemptResultStatisticsAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
csvResult = new CSVResult(headers);
}
@@ -71,7 +70,8 @@
}
map.forEach((key, value) -> {
- addARecord(key.split("#")[0], key.split("#")[1], key.split("#")[2], value);
+ String[] keys = key.split("#");
+ addARecord(keys[0], keys[1], keys.length > 2 ? keys[2] : "", value);
});
csvResult.sort(new Comparator<String[]>() {
@@ -110,11 +110,6 @@
return "Get statistics about task attempts states in vertex:node:status level";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
TaskAttemptResultStatisticsAnalyzer analyzer = new TaskAttemptResultStatisticsAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
index 72f3b36..91f51b4 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
@@ -41,11 +41,10 @@
private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
private final CSVResult csvResult;
- private final Configuration config;
public TaskConcurrencyAnalyzer(Configuration conf) {
+ super(conf);
this.csvResult = new CSVResult(headers);
- this.config = conf;
}
private enum EventType {START, FINISH}
@@ -153,11 +152,6 @@
+ "would be helpful in understanding whether any starvation was there or not.";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
index 75a55a7..705c6e9 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
@@ -33,6 +33,7 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
@@ -67,7 +68,11 @@
private String outputDir;
private boolean saveResults = false;
-
+
+ public TezAnalyzerBase(Configuration config) {
+ setConf(config);
+ }
+
@SuppressWarnings("static-access")
private static Options buildOptions() {
Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
index 06b8983..78a4d41 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
@@ -44,8 +44,6 @@
* Identify a set of vertices which fall in the critical path in a DAG.
*/
public class VertexLevelCriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
- private final Configuration config;
-
private static final String[] headers = { "CriticalPath", "Score" };
private final CSVResult csvResult;
@@ -58,7 +56,7 @@
private static final String CONNECTOR = "-->";
public VertexLevelCriticalPathAnalyzer(Configuration config) {
- this.config = config;
+ super(config);
this.csvResult = new CSVResult(headers);
this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
}
@@ -105,11 +103,6 @@
return "Analyze vertex level critical path of the DAG";
}
- @Override
- public Configuration getConfiguration() {
- return config;
- }
-
private static Map<String, Long> sortByValues(Map<String, Long> result) {
//Sort result by time in reverse order
final Ordering<String> reversValueOrdering =