TEZ-4231: Fix multiple history parser and event converter issues (#123) (Laszlo Bodor reviewed by Rajesh Balamohan)

diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
index d28fd67..397a46f 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
@@ -98,7 +98,6 @@
           return message != null;
         } catch (java.io.EOFException e) {
           reader.close();
-
           if (!fileIt.hasNext()) {
             return false;
           } else {
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
index db3f648..c1711ce 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
@@ -236,11 +236,13 @@
         // time etc).
         if (dagJson == null) {
           dagJson = jsonObject;
-        } else if (dagJson.optJSONObject(ATSConstants.OTHER_INFO)
-            .optJSONObject(ATSConstants.DAG_PLAN) == null) {
-          // if DAG_PLAN is not filled already, let's try to fetch it from other
-          dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, jsonObject
-              .getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
+        } else {
+          if (dagJson.optJSONObject(ATSConstants.OTHER_INFO).optJSONObject(ATSConstants.DAG_PLAN) == null) {
+            // if DAG_PLAN is not filled already, let's try to fetch it from other
+            dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN,
+                jsonObject.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
+          }
+          mergeSubJSONArray(jsonObject, dagJson, Constants.EVENTS);
         }
         JSONArray relatedEntities = dagJson.optJSONArray(Constants
             .RELATED_ENTITIES);
@@ -268,6 +270,8 @@
         }
         if (!vertexJsonMap.containsKey(vertexName)) {
           vertexJsonMap.put(vertexName, jsonObject);
+        } else {
+          mergeSubJSONArray(jsonObject, vertexJsonMap.get(vertexName), Constants.EVENTS);
         }
         populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), vertexName, vertexJsonMap);
         break;
@@ -281,6 +285,8 @@
         }
         if (!taskJsonMap.containsKey(taskName)) {
           taskJsonMap.put(taskName, jsonObject);
+        } else {
+          mergeSubJSONArray(jsonObject, taskJsonMap.get(taskName), Constants.EVENTS);
         }
         populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskName, taskJsonMap);
         break;
@@ -294,6 +300,8 @@
         }
         if (!attemptJsonMap.containsKey(taskAttemptName)) {
           attemptJsonMap.put(taskAttemptName, jsonObject);
+        } else {
+          mergeSubJSONArray(jsonObject, attemptJsonMap.get(taskAttemptName), Constants.EVENTS);
         }
         populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskAttemptName, attemptJsonMap);
         break;
@@ -311,4 +319,17 @@
           "Please provide a valid/complete history log file containing " + dagId);
     }
   }
+
+  private void mergeSubJSONArray(JSONObject source, JSONObject destination, String key)
+      throws JSONException {
+    if (source.optJSONArray(key) == null) {
+      source.put(key, new JSONArray());
+    }
+    if (destination.optJSONArray(key) == null) {
+      destination.put(key, new JSONArray());
+    }
+    for (int i = 0; i < source.getJSONArray(key).length(); i++) {
+      destination.getJSONArray(key).put(source.getJSONArray(key).get(i));
+    }
+  }
 }
\ No newline at end of file
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
index 3f9666a..783f486 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
@@ -44,8 +44,20 @@
   BaseInfo(JSONObject jsonObject) throws JSONException {
     final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
     //parse tez counters
-    tezCounters = Utils.parseTezCountersFromJSON(
-        otherInfoNode.optJSONObject(Constants.COUNTERS));
+    JSONObject countersObj = otherInfoNode.optJSONObject(Constants.COUNTERS);
+    if (countersObj == null) {
+      /*
+       * This is a workaround for formatting differences, where a TaskFinishedEvent's
+       * counter is a correct json object shown as string, but VertexFinishedEvent's
+       * counter is an encoded json string, so the latter is interpreted as a String
+       * while parsing. The issue might be somewhere while converting these event objects
+       * to proto (HistoryEventProtoConverter). Even if should be fixed there,
+       * already generated events should be parsed correctly, hence this workaround.
+       * Will be investigated in the scope of TEZ-4324.
+       */
+      countersObj = new JSONObject(otherInfoNode.optString(Constants.COUNTERS));
+    }
+    tezCounters = Utils.parseTezCountersFromJSON(countersObj);
 
     //parse events
     eventList = Lists.newArrayList();
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
index 08eb92b..94b50a6 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
@@ -128,12 +128,12 @@
       JSONObject eventNode = eventNodes.optJSONObject(i);
       final String eventInfo = eventNode.optString(Constants.EVENT_INFO);
       final String eventType = eventNode.optString(Constants.EVENT_TYPE);
-      final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP);
+      final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP) == 0
+          ? eventNode.optLong(Constants.TIMESTAMP) : eventNode.optLong(Constants.EVENT_TIME_STAMP);
 
       Event event = new Event(eventInfo, eventType, time);
 
       eventList.add(event);
-
     }
   }
 
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
index 26e20ab..ef84b2e 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
@@ -556,12 +556,12 @@
     events.put(finishEvent);
     jsonObject.put(ATSConstants.EVENTS, events);
 
-    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+    long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);
 
     JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.START_TIME, startTime);
+    otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
     otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
-    otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
+    otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);
 
     otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
     otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
@@ -620,11 +620,13 @@
     events.put(finishEvent);
     jsonObject.put(ATSConstants.EVENTS, events);
 
-    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+    long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);
 
     JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
     otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
-    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getEventTime() - startTime));
+    otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);
+
     otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
     otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
     otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
index 6021c58..1f0a7ad 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.analyzer;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 
@@ -54,11 +53,4 @@
    * @return description of analyzer
    */
   public String getDescription();
-
-  /**
-   * Get config properties related to this analyzer
-   *
-   * @return config related to analyzer
-   */
-  public Configuration getConfiguration();
 }
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
index cad0d98..294527c 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
@@ -48,12 +48,18 @@
           "Print task-to-node assignment details of a DAG");
       pgd.addClass("TaskAttemptResultStatisticsAnalyzer", TaskAttemptResultStatisticsAnalyzer.class,
           "Print vertex:node:status level details of task attempt results");
+      pgd.addClass("InputReadErrorAnalyzer", InputReadErrorAnalyzer.class,
+          "Print INPUT_READ_ERROR sources");
       pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
           "Print the task concurrency details in a DAG");
       pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class,
           "Find critical path at vertex level in a DAG");
       pgd.addClass("OneOnOneEdgeAnalyzer", OneOnOneEdgeAnalyzer.class,
           "Find out schedule misses in 1:1 edges in a DAG");
+      pgd.addClass("DagOverviewAnalyzer", DagOverviewAnalyzer.class,
+          "Print basic dag information (dag/vertex events)");
+      pgd.addClass("TaskHangAnalyzer", HungTaskAnalyzer.class,
+          "Print all vertices/tasks and their last attempts with status/duration/node");
       exitCode = pgd.run(argv);
     } catch(Throwable e){
       e.printStackTrace();
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
index 5b862f8..553ff0e 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
@@ -39,15 +39,13 @@
  */
 public class ContainerReuseAnalyzer extends TezAnalyzerBase implements Analyzer {
 
-  private final Configuration config;
-
   private static final String[] headers =
       { "vertexName", "taskAttempts", "node", "containerId", "reuseCount" };
 
   private final CSVResult csvResult;
 
   public ContainerReuseAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
     this.csvResult = new CSVResult(headers);
   }
 
@@ -82,11 +80,6 @@
     return "Get details on container reuse analysis";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration config = new Configuration();
     ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 387b0cf..3f5e300 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -113,10 +113,11 @@
   ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();
 
   public CriticalPathAnalyzer() {
+    super(new Configuration());
   }
 
   public CriticalPathAnalyzer(Configuration conf) {
-    setConf(conf);
+    super(conf);
   }
 
   @Override 
@@ -643,13 +644,9 @@
     return "Analyze critical path of the DAG";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return getConf();
-  }
-  
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args);
+    Configuration config = new Configuration();
+    int res = ToolRunner.run(config, new CriticalPathAnalyzer(config), args);
     System.exit(res);
   }
 
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java
new file mode 100644
index 0000000..b193c30
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.analyzer.plugins;
+
+import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.Event;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+public class DagOverviewAnalyzer extends TezAnalyzerBase implements Analyzer {
+  private final String[] headers =
+      { "name", "id", "event_type", "status", "event_time", "event_time_str", "vertex_task_stats", "diagnostics" };
+  private final CSVResult csvResult;
+  private static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+
+  public DagOverviewAnalyzer(Configuration config) {
+    super(config);
+    csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (Event event : dagInfo.getEvents()) {
+      csvResult.addRecord(new String[] { dagInfo.getDagId(), dagInfo.getDagId(), event.getType(),
+          dagInfo.getStatus(), Long.toString(event.getTime()), toDateStr(event.getTime()), "", "" });
+    }
+    for (VertexInfo vertex : dagInfo.getVertices()) {
+      for (Event event : vertex.getEvents()) {
+        String vertexFailureInfoIfAny = "";
+        for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+          if (attempt.getStatus().contains("FAILED")) {
+            vertexFailureInfoIfAny = attempt.getTaskAttemptId() + ": "
+                + attempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ");
+            break;
+          }
+        }
+        csvResult.addRecord(new String[] { vertex.getVertexName(), vertex.getVertexId(),
+            event.getType(), vertex.getStatus(), Long.toString(event.getTime()),
+            toDateStr(event.getTime()), getTaskStats(vertex), vertexFailureInfoIfAny });
+      }
+
+      // a failed task can lead to dag failure, so hopefully holds valuable information
+      for (TaskInfo failedTask : vertex.getFailedTasks()) {
+        for (Event failedTaskEvent : failedTask.getEvents()) {
+          if (failedTaskEvent.getType().equalsIgnoreCase("TASK_FINISHED")) {
+            csvResult.addRecord(new String[] { vertex.getVertexName(), failedTask.getTaskId(),
+                failedTaskEvent.getType(), failedTask.getStatus(), Long.toString(failedTaskEvent.getTime()),
+                toDateStr(failedTaskEvent.getTime()), getTaskStats(vertex),
+                failedTask.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") });
+          }
+        }
+        // if we already found a failing task, let's scan the failing attempts as well
+        for (TaskAttemptInfo failedAttempt : failedTask.getFailedTaskAttempts()) {
+          for (Event failedTaskAttemptEvent : failedAttempt.getEvents()) {
+            if (failedTaskAttemptEvent.getType().equalsIgnoreCase("TASK_ATTEMPT_FINISHED")) {
+              csvResult.addRecord(new String[] { vertex.getVertexName(),
+                  failedAttempt.getTaskAttemptId(), failedTaskAttemptEvent.getType(),
+                  failedAttempt.getStatus(), Long.toString(failedTaskAttemptEvent.getTime()),
+                  toDateStr(failedTaskAttemptEvent.getTime()), getTaskStats(vertex),
+                  failedAttempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") });
+            }
+          }
+        }
+      }
+    }
+
+    csvResult.sort(new Comparator<String[]>() {
+      public int compare(String[] first, String[] second) {
+        return (int) (Long.parseLong(first[4]) - Long.parseLong(second[4]));
+      }
+    });
+  }
+
+  private String getTaskStats(VertexInfo vertex) {
+    return String.format("numTasks: %d failedTasks: %d completedTasks: %d", vertex.getNumTasks(),
+        vertex.getFailedTasksCount(), vertex.getCompletedTasksCount());
+  }
+
+  private static synchronized String toDateStr(long time) {
+    return FORMAT.format(new Date(time));
+  }
+
+  @Override
+  public Result getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Dag overview analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "High level dag events overview (dag, vertex event summary)."
+        + " Helps understand the overall progress of a dag by simply listing the dag/vertex related events";
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    DagOverviewAnalyzer analyzer = new DagOverviewAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/HungTaskAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/HungTaskAnalyzer.java
new file mode 100644
index 0000000..9a38e28
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/HungTaskAnalyzer.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Gives insights about hanging task attempts by providing details about last attempts of all tasks.
+ */
+public class HungTaskAnalyzer extends TezAnalyzerBase implements Analyzer {
+  private final String[] headers = { "vertex", "task", " number_of_attempts", "last_attempt_id",
+      "last_attempt_status", "last_attempt_duration_ms", "last_attempt_node" };
+  private final CSVResult csvResult;
+
+  private static final String HEADER_NUM_ATTEMPTS = "num_attempts";
+  private static final String HEADER_LAST_ATTEMPT_ID_AND_STATUS = "last_attempt_id_and_status";
+  private static final String HEADER_LAST_ATTEMPT_STATUS = "last_attempt_status";
+  private static final String HEADER_LAST_ATTEMPT_NODE = "last_attempt_node";
+  private static final String HEADER_LAST_ATTEMPT_DURATION_MS = "last_attempt_duration_ms";
+
+  public HungTaskAnalyzer(Configuration config) {
+    super(config);
+    csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Map<String, Map<String, String>> taskData = new HashMap<>(); // task attempt count per task
+    for (VertexInfo vertex : dagInfo.getVertices()) {
+      taskData.clear();
+      for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+        String taskId = attempt.getTaskInfo().getTaskId();
+
+        int numAttemptsForTask = attempt.getTaskInfo().getNumberOfTaskAttempts();
+        Map<String, String> thisTaskData = taskData.get(taskId);
+
+        if (thisTaskData == null) {
+          thisTaskData = new HashMap<>();
+          thisTaskData.put(HEADER_NUM_ATTEMPTS, Integer.toString(numAttemptsForTask));
+          taskData.put(taskId, thisTaskData);
+        }
+
+        int attemptNumber = TezTaskAttemptID.fromString(attempt.getTaskAttemptId()).getId();
+        if (attemptNumber == numAttemptsForTask - 1) {
+          thisTaskData.put(HEADER_LAST_ATTEMPT_ID_AND_STATUS, String.format("%s/%s", attempt.getTaskAttemptId(), attempt.getStatus()));
+          thisTaskData.put(HEADER_LAST_ATTEMPT_STATUS, attempt.getDetailedStatus());
+          thisTaskData.put(HEADER_LAST_ATTEMPT_NODE, attempt.getNodeId());
+
+          thisTaskData.put(HEADER_LAST_ATTEMPT_DURATION_MS,
+              (attempt.getFinishTime() == 0 || attempt.getStartTime() == 0) ? "-1"
+                : Long.toString(attempt.getFinishTime() - attempt.getStartTime()));
+        }
+      }
+      for (Map.Entry<String, Map<String, String>> task : taskData.entrySet()) {
+        addARecord(vertex.getVertexName(), task.getKey(), task.getValue().get(HEADER_NUM_ATTEMPTS),
+            task.getValue().get(HEADER_LAST_ATTEMPT_ID_AND_STATUS), task.getValue().get(HEADER_LAST_ATTEMPT_STATUS),
+            task.getValue().get(HEADER_LAST_ATTEMPT_DURATION_MS),
+            task.getValue().get(HEADER_LAST_ATTEMPT_NODE));
+      }
+    }
+
+    csvResult.sort(new Comparator<String[]>() {
+      public int compare(String[] first, String[] second) {
+        int vertexOrder = first[0].compareTo(second[0]);
+        int lastAttemptStatusOrder =
+            (first[4] == null || second[4] == null) ? 0 : first[4].compareTo(second[4]);
+        int attemptNumberOrder = Integer.valueOf(second[2]).compareTo(Integer.valueOf(first[2]));
+
+        return vertexOrder == 0
+          ? (lastAttemptStatusOrder == 0 ? attemptNumberOrder : lastAttemptStatusOrder)
+          : vertexOrder;
+      }
+    });
+  }
+
+  private void addARecord(String vertexName, String taskId, String numAttempts,
+      String lastAttemptId, String lastAttemptStatus, String lastAttemptDuration,
+      String lastAttemptNode) {
+    String[] record = new String[7];
+    record[0] = vertexName;
+    record[1] = taskId;
+    record[2] = numAttempts;
+    record[3] = lastAttemptId;
+    record[4] = lastAttemptStatus;
+    record[5] = lastAttemptDuration;
+    record[6] = lastAttemptNode;
+
+    csvResult.addRecord(record);
+  }
+
+  @Override
+  public Result getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Hung Task Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "TaskHandAnalyzer can give quick insights about hanging task attempts"
+        + " by giving an overview of all tasks and their last attempts' status, duration, etc.";
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    HungTaskAnalyzer analyzer = new HungTaskAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/InputReadErrorAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/InputReadErrorAnalyzer.java
new file mode 100644
index 0000000..3cb523f
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/InputReadErrorAnalyzer.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.analyzer.plugins;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.Event;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+/**
+ * Helps finding the root cause of shuffle errors, e.g. which node(s) can be blamed for them.
+ */
+public class InputReadErrorAnalyzer extends TezAnalyzerBase implements Analyzer {
+  private final String[] headers = { "vertex:attempt", "status", "time", "node", "diagnostics" };
+  private final CSVResult csvResult;
+
+  public InputReadErrorAnalyzer(Configuration config) {
+    super(config);
+    csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertex : dagInfo.getVertices()) {
+      for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+        String terminationCause = attempt.getTerminationCause();
+        if ("INPUT_READ_ERROR".equalsIgnoreCase(terminationCause)
+            || "OUTPUT_LOST".equalsIgnoreCase(terminationCause)
+            || "NODE_FAILED".equalsIgnoreCase(terminationCause)) {
+          for (Event event : attempt.getEvents()) {
+            if (event.getType().equalsIgnoreCase("TASK_ATTEMPT_FINISHED")) {
+              csvResult.addRecord(new String[] {
+                  vertex.getVertexName() + ":" + attempt.getTaskAttemptId(),
+                  attempt.getDetailedStatus(), String.valueOf(event.getTime()), attempt.getNodeId(),
+                  attempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") });
+            }
+          }
+        }
+      }
+    }
+
+    csvResult.sort(new Comparator<String[]>() {
+      public int compare(String[] first, String[] second) {
+        return (int) (Long.parseLong(second[2]) - Long.parseLong(first[2]));
+      }
+    });
+  }
+
+  @Override
+  public Result getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Input read error analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Prints every task attempt (with node) which are related to input read errors";
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    InputReadErrorAnalyzer analyzer = new InputReadErrorAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
index ec72df1..d640704 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
@@ -52,12 +52,10 @@
   private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio";
   private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f;
 
-  private final Configuration config;
-
   private final CSVResult csvResult;
 
   public LocalityAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
     csvResult = new CSVResult(headers);
   }
 
@@ -119,7 +117,7 @@
         record.add(otherTaskResult.avgHDFSBytesRead + "");
 
         String recommendation = "";
-        if (dataLocalRatio < config.getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) {
+        if (dataLocalRatio < getConf().getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) {
           recommendation = "Data locality is poor for this vertex. Try tuning "
               + TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", "
               + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", "
@@ -182,10 +180,6 @@
     return "Analyze for locality information (data local, rack local, off-rack)";
   }
 
-  @Override public Configuration getConfiguration() {
-    return config;
-  }
-
   /**
    * Placeholder for task attempt details
    */
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java
index 2ba715e..a6cb3f1 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java
@@ -55,12 +55,10 @@
 
   // DataMovementType::ONE_TO_ONE
   private static final String ONE_TO_ONE = "ONE_TO_ONE";
-  private final Configuration config;
-
   private final CSVResult csvResult;
 
   public OneOnOneEdgeAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
     csvResult = new CSVResult(headers);
   }
 
@@ -140,11 +138,6 @@
     return "To understand the locality miss in 1:1 edge";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     OneOnOneEdgeAnalyzer analyzer = new OneOnOneEdgeAnalyzer(conf);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
index 57e91c6..f8f9112 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
@@ -66,14 +66,12 @@
 
   private final CSVResult csvResult = new CSVResult(headers);
 
-  private final Configuration config;
-
   private final float realWorkDoneRatio;
   private final long minShuffleRecords;
 
 
   public ShuffleTimeAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
 
     realWorkDoneRatio = config.getFloat
         (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT);
@@ -208,11 +206,6 @@
         + "and the real work done in the task";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration config = new Configuration();
     ShuffleTimeAnalyzer analyzer = new ShuffleTimeAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
index 6025541..a7d14fa 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
@@ -85,14 +85,12 @@
 
   private final CSVResult csvResult = new CSVResult(headers);
 
-  private final Configuration config;
-
   private final float minRatio;
   private final float maxRatio;
   private final long maxShuffleBytesPerSource;
 
   public SkewAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
     maxRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO,
         ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT);
     minRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO,
@@ -214,7 +212,7 @@
       if (vertexNumTasks > 1) {
         if (ratio > maxRatio) {
           //input records > 60% of vertex level record count
-          if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.60)) {
+          if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.6f)) {
             List<String> result = Lists.newLinkedList();
             result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
             result.add(attemptInfo.getTaskAttemptId());
@@ -305,12 +303,7 @@
 
   @Override
   public String getDescription() {
-    return "Analyzer reducer skews by mining reducer task counters";
-  }
-
-  @Override
-  public Configuration getConfiguration() {
-    return null;
+    return "Analyze reducer skews by mining reducer task counters";
   }
 
   public static void main(String[] args) throws Exception {
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
index a810a8a..9e573c2 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
@@ -59,10 +59,8 @@
 
   private final CSVResult csvResult = new CSVResult(headers);
 
-  private final Configuration config;
-
   public SlowNodeAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
   }
 
   @Override
@@ -182,11 +180,6 @@
     return sb.toString();
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration config = new Configuration();
     SlowNodeAnalyzer analyzer = new SlowNodeAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
index d2474ad..7c9958b 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
@@ -51,10 +51,8 @@
   private static final String NO_OF_TASKS = "tez.slow-task-analyzer.task.count";
   private static final int NO_OF_TASKS_DEFAULT = 100;
 
-  private final Configuration config;
-
   public SlowTaskIdentifier(Configuration config) {
-    this.config = config;
+    super(config);
     this.csvResult = new CSVResult(headers);
   }
 
@@ -75,7 +73,7 @@
     });
 
     int limit = Math.min(taskAttempts.size(),
-        Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
+        Math.max(0, getConf().getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
 
     if (limit == 0) {
       return;
@@ -111,11 +109,6 @@
     return "Identifies slow tasks in the DAG";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration config = new Configuration();
     SlowTaskIdentifier analyzer = new SlowTaskIdentifier(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
index 33f2421..efa39a3 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
@@ -49,7 +49,6 @@
 
   private final CSVResult csvResult = new CSVResult(headers);
 
-  private final Configuration config;
   private final MetricRegistry metrics = new MetricRegistry();
   private Histogram taskAttemptRuntimeHistorgram;
 
@@ -59,7 +58,7 @@
   private final long vertexRuntimeThreshold;
 
   public SlowestVertexAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
     this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME,
         MAX_VERTEX_RUNTIME_DEFAULT));
 
@@ -204,11 +203,6 @@
     return "Identify the slowest vertex in the DAG, which needs to be looked into first";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration config = new Configuration();
     SlowestVertexAnalyzer analyzer = new SlowestVertexAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
index d69ca23..026dd15 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
@@ -60,10 +60,8 @@
 
   private final long minOutputBytesPerTask;
 
-  private final Configuration config;
-
   public SpillAnalyzerImpl(Configuration config) {
-    this.config = config;
+    super(config);
     minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD,
         OUTPUT_BYTES_THRESHOLD_DEFAULT));
     this.csvResult = new CSVResult(headers);
@@ -130,11 +128,6 @@
     return "Analyze spill details in the task";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration config = new Configuration();
     SpillAnalyzerImpl analyzer = new SpillAnalyzerImpl(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
index ce6fa41..02b821f 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
@@ -36,29 +36,27 @@
  */
 public class TaskAssignmentAnalyzer extends TezAnalyzerBase
     implements Analyzer {
-  private final String[] headers = { "vertex", "node", "numTasks", "load" };
-  private final Configuration config;
+  private final String[] headers = { "vertex", "node", "numTaskAttempts", "load" };
   private final CSVResult csvResult;
 
   public TaskAssignmentAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
     csvResult = new CSVResult(headers);
   }
 
   @Override
   public void analyze(DagInfo dagInfo) throws TezException {
-    Map<String, Integer> map = new HashMap<>();
+    Map<String, Integer> taskAttemptsPerNode = new HashMap<>();
     for (VertexInfo vertex : dagInfo.getVertices()) {
-      map.clear();
+      taskAttemptsPerNode.clear();
       for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
-        Integer previousValue = map.get(attempt.getNodeId());
-        map.put(attempt.getNodeId(),
-            previousValue == null ? 1 : previousValue + 1);
+        Integer previousValue = taskAttemptsPerNode.get(attempt.getNodeId());
+        taskAttemptsPerNode.put(attempt.getNodeId(), previousValue == null ? 1 : previousValue + 1);
       }
-      double mean = vertex.getTaskAttempts().size() / Math.max(1.0, map.size());
-      for (Map.Entry<String, Integer> assignment : map.entrySet()) {
-        addARecord(vertex.getVertexName(), assignment.getKey(),
-            assignment.getValue(), assignment.getValue() * 100 / mean);
+      double mean = vertex.getTaskAttempts().size() / Math.max(1.0, taskAttemptsPerNode.size());
+      for (Map.Entry<String, Integer> assignment : taskAttemptsPerNode.entrySet()) {
+        addARecord(vertex.getVertexName(), assignment.getKey(), assignment.getValue(),
+            assignment.getValue() * 100 / mean);
       }
     }
   }
@@ -88,11 +86,6 @@
     return "Get the Task assignments on different nodes of the cluster";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration config = new Configuration();
     TaskAssignmentAnalyzer analyzer = new TaskAssignmentAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
index df2f95c..cf6b2f0 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
@@ -44,11 +44,10 @@
 public class TaskAttemptResultStatisticsAnalyzer extends TezAnalyzerBase implements Analyzer {
   private final String[] headers =
       { "vertex (+task stats: all/succeeded/failed/killed)", "node", "status", "numAttempts" };
-  private final Configuration config;
   private final CSVResult csvResult;
 
   public TaskAttemptResultStatisticsAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
     csvResult = new CSVResult(headers);
   }
 
@@ -71,7 +70,8 @@
     }
 
     map.forEach((key, value) -> {
-      addARecord(key.split("#")[0], key.split("#")[1], key.split("#")[2], value);
+      String[] keys = key.split("#");
+      addARecord(keys[0], keys[1], keys.length > 2 ? keys[2] : "", value);
     });
 
     csvResult.sort(new Comparator<String[]>() {
@@ -110,11 +110,6 @@
     return "Get statistics about task attempts states in vertex:node:status level";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration config = new Configuration();
     TaskAttemptResultStatisticsAnalyzer analyzer = new TaskAttemptResultStatisticsAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
index 72f3b36..91f51b4 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
@@ -41,11 +41,10 @@
   private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
 
   private final CSVResult csvResult;
-  private final Configuration config;
 
   public TaskConcurrencyAnalyzer(Configuration conf) {
+    super(conf);
     this.csvResult = new CSVResult(headers);
-    this.config = conf;
   }
 
   private enum EventType {START, FINISH}
@@ -153,11 +152,6 @@
         + "would be helpful in understanding whether any starvation was there or not.";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration config = new Configuration();
     TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(config);
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
index 75a55a7..705c6e9 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
@@ -33,6 +33,7 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Tool;
@@ -67,7 +68,11 @@
 
   private String outputDir;
   private boolean saveResults = false;
-  
+
+  public TezAnalyzerBase(Configuration config) {
+    setConf(config);
+  }
+
   @SuppressWarnings("static-access")
   private static Options buildOptions() {
     Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
index 06b8983..78a4d41 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
@@ -44,8 +44,6 @@
  * Identify a set of vertices which fall in the critical path in a DAG.
  */
 public class VertexLevelCriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
-  private final Configuration config;
-
   private static final String[] headers = { "CriticalPath", "Score" };
 
   private final CSVResult csvResult;
@@ -58,7 +56,7 @@
   private static final String CONNECTOR = "-->";
 
   public VertexLevelCriticalPathAnalyzer(Configuration config) {
-    this.config = config;
+    super(config);
     this.csvResult = new CSVResult(headers);
     this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
   }
@@ -105,11 +103,6 @@
     return "Analyze vertex level critical path of the DAG";
   }
 
-  @Override
-  public Configuration getConfiguration() {
-    return config;
-  }
-
   private static Map<String, Long> sortByValues(Map<String, Long> result) {
     //Sort result by time in reverse order
     final Ordering<String> reversValueOrdering =