TEZ-3957: Report TASK_DURATION_MILLIS as a Counter for completed tasks (Sergey Shelukhin, reviewed by Gopal V)

Signed-off-by: Gopal V <gopalv@apache.org>
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
index 5064c35..0a32d38 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
@@ -38,5 +38,7 @@
   NUM_UBER_SUBTASKS,
   NUM_FAILED_UBERTASKS,
   AM_CPU_MILLISECONDS,
+  /** Wall clock time taken by all the tasks. */
+  WALL_CLOCK_MILLIS,
   AM_GC_TIME_MILLIS
 }
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 2f18bc6..80424c7 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -74,6 +74,8 @@
   MERGED_MAP_OUTPUTS,
   GC_TIME_MILLIS,
   CPU_MILLISECONDS,
+  /** Wall clock time taken by the task initialization and execution. */
+  WALL_CLOCK_MILLISECONDS,
   PHYSICAL_MEMORY_BYTES,
   VIRTUAL_MEMORY_BYTES,
   COMMITTED_HEAP_BYTES,
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 7399979..3107330 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -179,6 +180,12 @@
   private TaskAttemptRecoveryData recoveryData;
   private long launchTime = 0;
   private long finishTime = 0;
+  /** System.nanoTime for task launch time, if recorded in this JVM. */
+  private Long launchTimeNs;
+  /** System.nanoTime for task finish time, if recorded in this JVM. */
+  private Long finishTimeNs;
+  /** Whether the task was recovered from a prior AM; see getDurationNs. */
+  private boolean isRecoveredDuration;
   private String trackerName;
   private int httpPort;
 
@@ -782,6 +789,25 @@
     }
   }
 
+
+  /** @return task runtime duration in NS. */
+  public long getDurationNs() {
+    readLock.lock();
+    try {
+      if (isRecoveredDuration) {
+        // NS values are not mappable between JVMs (per documentation, at
+        // least), so just use the clock after recovery.
+        return TimeUnit.MILLISECONDS.toNanos(launchTime == 0 ? 0
+            : (finishTime == 0 ? clock.getTime() : finishTime) - launchTime);
+      } else {
+        long ft = (finishTimeNs == null ? System.nanoTime() : finishTimeNs);
+        return (launchTimeNs == null) ? 0 : (ft - launchTimeNs);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   public long getCreationTime() {
     readLock.lock();
     try {
@@ -930,6 +956,8 @@
     // set the finish time only if launch time is set
     if (launchTime != 0 && finishTime == 0) {
       finishTime = clock.getTime();
+      // The default clock is not safe for measuring durations.
+      finishTimeNs = System.nanoTime();
     }
   }
 
@@ -957,6 +985,10 @@
       jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1);
     }
 
+    long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(
+        taskAttempt.getDurationNs());
+    jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);
+
     return jce;
   }
 
@@ -1032,6 +1064,14 @@
 //    */
 //  }
 
+  /**
+   * Records the launch time of the task.
+   */
+  private void setLaunchTime() {
+    launchTime = clock.getTime();
+    launchTimeNs = System.nanoTime();
+  }
+
   private void updateProgressSplits() {
 //    double newProgress = reportedStatus.progress;
 //    newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
@@ -1215,6 +1255,7 @@
             ta.recoveryData.getTaskAttemptStartedEvent();
         if (taStartedEvent != null) {
           ta.launchTime = taStartedEvent.getStartTime();
+          ta.isRecoveredDuration = true;
           TaskAttemptFinishedEvent taFinishedEvent =
               ta.recoveryData.getTaskAttemptFinishedEvent();
           if (taFinishedEvent == null) {
@@ -1383,6 +1424,7 @@
             .getTaskAttemptState(), helper.getFailureType(event));
       } else {
         ta.finishTime = ta.recoveryData.getTaskAttemptFinishedEvent().getFinishTime();
+        ta.isRecoveredDuration = true;
       }
 
       if (event instanceof RecoveryEvent) {
@@ -1419,7 +1461,7 @@
           .getNetworkLocation());
       ta.lastNotifyProgressTimestamp = ta.clock.getTime();
 
-      ta.launchTime = ta.clock.getTime();
+      ta.setLaunchTime();
 
       // TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr = NetUtils
@@ -1630,6 +1672,7 @@
           ta.sendEvent(new VertexEventRouteEvent(ta.getVertexID(), tezEvents));
         }
         ta.finishTime = taFinishedEvent.getFinishTime();
+        ta.isRecoveredDuration = true;
       } else {
         ta.setFinishTime();
         // Send out history event.
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 0ac916f..87ebb7b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -43,6 +43,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.hadoop.shim.HadoopShim;
@@ -57,6 +58,7 @@
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.RunnableWithNdc;
 import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -160,6 +162,8 @@
   private final boolean initializeProcessorFirst;
   private final boolean initializeProcessorIOSerially;
   private final TezExecutors sharedExecutor;
+  /** nanoTime of the task initialization start. */
+  private Long initStartTimeNs = null;
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
@@ -229,6 +233,9 @@
   public void initialize() throws Exception {
     Preconditions.checkState(this.state.get() == State.NEW, "Already initialized");
     this.state.set(State.INITED);
+    if (this.tezCounters != null) {
+      this.initStartTimeNs = System.nanoTime();
+    }
 
     this.processorContext = createProcessorContext();
     this.processor = createProcessor(processorDescriptor.getClassName(), processorContext);
@@ -1077,4 +1084,15 @@
   public Configuration getTaskConf() {
     return tezConf;
   }
+
+  @Override
+  public void setFrameworkCounters() {
+    super.setFrameworkCounters();
+    if (tezCounters != null && isUpdatingSystemCounters()) {
+      long timeNs = initStartTimeNs == null ? 0
+          : (System.nanoTime() - initStartTimeNs);
+      tezCounters.findCounter(TaskCounter.WALL_CLOCK_MILLISECONDS)
+          .setValue(TimeUnit.NANOSECONDS.toMillis(timeNs));
+    }
+  }
 }
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 7b86d4b..a53d0d2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -178,4 +178,8 @@
   }
 
   public abstract void abortTask();
+
+  protected final boolean isUpdatingSystemCounters() {
+    return counterUpdater != null;
+  }
 }
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index 07b9d33..6c25f0a 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -675,8 +675,8 @@
 
     // If Target <=0, assert counter count is exactly 0
     if (minTaskCounterCount <= 0) {
-      assertEquals(0, numTaskCounters);
-      assertEquals(0, numFsCounters);
+      assertEquals(tezCounters.toString(), 0, numTaskCounters);
+      assertEquals(tezCounters.toString(), 0, numFsCounters);
     } else {
       assertTrue(numTaskCounters >= minTaskCounterCount);
       assertTrue(numFsCounters >= minFsCounterCount);