MAPREDUCE-220. Collect CPU and memory statistics per task. Contributed by Scott Chen.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@987585 13f79535-47bb-0310-9956-ffa450edef68
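
The new per-task counters (CPU_MILLISECONDS, PHYSICAL_MEMORY_BYTES,
VIRTUAL_MEMORY_BYTES) are filled in from the TaskTracker's
ResourceCalculatorPlugin, which Task.initialize() loads from the
TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN setting; if no plugin is configured or
none is available for the platform, the counters are simply left untouched.
As a rough sketch (the JobConf wiring below is illustrative and not part of
this patch), a site or test configuration can select the procfs-based plugin
like this:

    // Illustrative sketch only: select the Linux procfs-based plugin so the
    // new CPU/memory task counters get populated. Assumes
    // org.apache.hadoop.mapred.JobConf,
    // org.apache.hadoop.mapreduce.server.tasktracker.TTConfig and the plugin
    // classes from org.apache.hadoop.mapreduce.util are on the classpath.
    JobConf conf = new JobConf();
    conf.setClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
        LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
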
diff --git a/CHANGES.txt b/CHANGES.txt
index 13cccee..75e2fb8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,9 @@
     MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.
     (Joshua Harlow via shv)
 
+    MAPREDUCE-220. Collect CPU and memory statistics per task. (Scott Chen via
+    acmurthy)
+
   IMPROVEMENTS
 
     MAPREDUCE-1546. Redirect all job pages to corresponding history page
diff --git a/src/java/org/apache/hadoop/mapred/Task.java b/src/java/org/apache/hadoop/mapred/Task.java
index 0a350c6..18367a7 100644
--- a/src/java/org/apache/hadoop/mapred/Task.java
+++ b/src/java/org/apache/hadoop/mapred/Task.java
@@ -59,6 +59,8 @@
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.ProcResourceValues;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
@@ -77,7 +79,6 @@
     LogFactory.getLog(Task.class);
 
   public static String MERGED_OUTPUT_PREFIX = ".merged";
-  
 
   /**
    * Counters to measure the usage of the different file systems.
@@ -133,7 +134,10 @@
   private volatile long currentRecStartIndex; 
   private Iterator<Long> currentRecIndexIterator = 
     skipRanges.skipRangeIterator();
-  
+
+  private ResourceCalculatorPlugin resourceCalculator = null;
+  private long initCpuCumulativeTime = 0;
+
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
   protected LocalDirAllocator lDirAlloc;
@@ -503,6 +507,16 @@
       }
     }
     committer.setupTask(taskContext);
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+            null, ResourceCalculatorPlugin.class);
+    resourceCalculator = ResourceCalculatorPlugin
+        .getResourceCalculatorPlugin(clazz, conf);
+    LOG.info("Using ResourceCalculatorPlugin: " + resourceCalculator);
+    if (resourceCalculator != null) {
+      initCpuCumulativeTime =
+        resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
+    }
   }
   
   @InterfaceAudience.Private
@@ -701,6 +715,24 @@
   }
 
   /**
+   * Update the CPU and memory resource usage counters for this task.
+   */
+  void updateResourceCounters() {
+    if (resourceCalculator == null) {
+      return;
+    }
+    ProcResourceValues res = resourceCalculator.getProcResourceValues();
+    long cpuTime = res.getCumulativeCpuTime();
+    long pMem = res.getPhysicalMemorySize();
+    long vMem = res.getVirtualMemorySize();
+    // Subtract the CPU time consumed before this task started
+    // (e.g. by earlier tasks in a reused JVM)
+    cpuTime -= initCpuCumulativeTime;
+    counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
+    counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
+    counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
+  }
+
+  /**
    * An updater that tracks the amount of time this task has spent in GC.
    */
   class GcTimeUpdater {
@@ -799,6 +831,7 @@
     }
 
     gcUpdater.incrementGcCounter();
+    updateResourceCounters();
   }
 
   public void done(TaskUmbilicalProtocol umbilical,
diff --git a/src/java/org/apache/hadoop/mapreduce/TaskCounter.java b/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
index 74d103e..b6362e8 100644
--- a/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
+++ b/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
@@ -42,5 +42,8 @@
   SHUFFLED_MAPS, 
   FAILED_SHUFFLE,
   MERGED_MAP_OUTPUTS,
-  GC_TIME_MILLIS
+  GC_TIME_MILLIS,
+  CPU_MILLISECONDS,
+  PHYSICAL_MEMORY_BYTES,
+  VIRTUAL_MEMORY_BYTES
 }
diff --git a/src/java/org/apache/hadoop/mapreduce/util/LinuxResourceCalculatorPlugin.java b/src/java/org/apache/hadoop/mapreduce/util/LinuxResourceCalculatorPlugin.java
index 91663f3..62afe1d 100644
--- a/src/java/org/apache/hadoop/mapreduce/util/LinuxResourceCalculatorPlugin.java
+++ b/src/java/org/apache/hadoop/mapreduce/util/LinuxResourceCalculatorPlugin.java
@@ -90,6 +90,7 @@
   private float cpuUsage = TaskTrackerStatus.UNAVAILABLE;
   private long sampleTime = TaskTrackerStatus.UNAVAILABLE;
   private long lastSampleTime = TaskTrackerStatus.UNAVAILABLE;
+  private ProcfsBasedProcessTree pTree = null;
 
   boolean readMemInfoFile = false;
   boolean readCpuInfoFile = false;
@@ -107,7 +108,8 @@
     procfsCpuFile = PROCFS_CPUINFO;
     procfsStatFile = PROCFS_STAT;
     jiffyLengthInMillis = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS;
-    
+    String pid = System.getenv("JVM_PID");
+    pTree = new ProcfsBasedProcessTree(pid);
   }
   
   /**
@@ -126,6 +128,8 @@
     this.procfsCpuFile = procfsCpuFile;
     this.procfsStatFile = procfsStatFile;
     this.jiffyLengthInMillis = jiffyLengthInMillis;
+    String pid = System.getenv("JVM_PID");
+    pTree = new ProcfsBasedProcessTree(pid);
   }
 
   /**
@@ -395,4 +399,13 @@
     }
     System.out.println("CPU usage % : " + plugin.getCpuUsage());
   }
-}
\ No newline at end of file
+
+  @Override
+  public ProcResourceValues getProcResourceValues() {
+    pTree = pTree.getProcessTree();
+    long cpuTime = pTree.getCumulativeCpuTime();
+    long pMem = pTree.getCumulativeRssmem();
+    long vMem = pTree.getCumulativeVmem();
+    return new ProcResourceValues(cpuTime, pMem, vMem);
+  }
+}
diff --git a/src/java/org/apache/hadoop/mapreduce/util/ResourceCalculatorPlugin.java b/src/java/org/apache/hadoop/mapreduce/util/ResourceCalculatorPlugin.java
index 07f4823..541773c 100644
--- a/src/java/org/apache/hadoop/mapreduce/util/ResourceCalculatorPlugin.java
+++ b/src/java/org/apache/hadoop/mapreduce/util/ResourceCalculatorPlugin.java
@@ -91,6 +91,48 @@
   public abstract float getCpuUsage();
 
   /**
+   * Obtain the resource usage of the current process tree.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public abstract ProcResourceValues getProcResourceValues();
+
+  public class ProcResourceValues {
+    private final long cumulativeCpuTime;
+    private final long physicalMemorySize;
+    private final long virtualMemorySize;
+    public ProcResourceValues(long cumulativeCpuTime, long physicalMemorySize,
+                              long virtualMemorySize) {
+      this.cumulativeCpuTime = cumulativeCpuTime;
+      this.physicalMemorySize = physicalMemorySize;
+      this.virtualMemorySize = virtualMemorySize;
+    }
+    /**
+     * Obtain the physical memory size used by the current process tree.
+     * @return physical memory size in bytes.
+     */
+    public long getPhysicalMemorySize() {
+      return physicalMemorySize;
+    }
+
+    /**
+     * Obtain the virtual memory size used by the current process tree.
+     * @return virtual memory size in bytes.
+     */
+    public long getVirtualMemorySize() {
+      return virtualMemorySize;
+    }
+
+    /**
+     * Obtain the cumulative CPU time used by the current process tree.
+     * @return cumulative CPU time in milliseconds.
+     */
+    public long getCumulativeCpuTime() {
+      return cumulativeCpuTime;
+    }
+  }
+
+  /**
    * Get the ResourceCalculatorPlugin from the class name and configure it. If
    * class name is null, this method will try and return a memory calculator
    * plugin available for this system.
@@ -120,4 +162,4 @@
     // Not supported on this system.
     return null;
   }
-}
\ No newline at end of file
+}
diff --git a/src/test/mapred/org/apache/hadoop/mapred/DummyResourceCalculatorPlugin.java b/src/test/mapred/org/apache/hadoop/mapred/DummyResourceCalculatorPlugin.java
index 0354d5d..5fd3f9f 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/DummyResourceCalculatorPlugin.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/DummyResourceCalculatorPlugin.java
@@ -50,6 +50,15 @@
   /** CPU usage percentage for testing */
   public static final String CPU_USAGE =
       "mapred.tasktracker.cpuusage.testing";
+  /** process cumulative CPU time for testing */
+  public static final String PROC_CUMULATIVE_CPU_TIME =
+      "mapred.tasktracker.proccumulativecputime.testing";
+  /** process physical memory (pmem) for testing */
+  public static final String PROC_PMEM_TESTING_PROPERTY =
+      "mapred.tasktracker.procpmem.testing";
+  /** process virtual memory (vmem) for testing */
+  public static final String PROC_VMEM_TESTING_PROPERTY =
+      "mapred.tasktracker.procvmem.testing";
 
   /** {@inheritDoc} */
   @Override
@@ -98,4 +107,12 @@
   public float getCpuUsage() {
     return getConf().getFloat(CPU_USAGE, -1);
   }
-}
\ No newline at end of file
+
+  @Override
+  public ProcResourceValues getProcResourceValues() {
+    long cpuTime = getConf().getLong(PROC_CUMULATIVE_CPU_TIME, -1);
+    long pMem = getConf().getLong(PROC_PMEM_TESTING_PROPERTY, -1);
+    long vMem = getConf().getLong(PROC_VMEM_TESTING_PROPERTY, -1);
+    return new ProcResourceValues(cpuTime, pMem, vMem);
+  }
+}
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java b/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java
index 849fc3a..821d4c6 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.util.ToolRunner;
@@ -154,9 +155,46 @@
               + reportedNumProcessors + ","
               + reportedCpuUsage
                + ")";
-      hasPassed = true;
-      hasDynamicValuePassed = true;
       LOG.info(message);
+      hasDynamicValuePassed = true;
+      // Check task resource status in task reports
+      for (TaskStatus taskStatus : status.getTaskReports()) {
+        Counters counters = taskStatus.getCounters();
+        // This should be zero because the initial CPU time is subtracted.
+        long procCumulativeCpuTime = 0;
+        long procVirtualMemorySize =
+          getConf().getLong("procVirtualMemorySize", -1);
+        long procPhysicalMemorySize =
+          getConf().getLong("procPhysicalMemorySize", -1);
+        long reportedProcCumulativeCpuTime =
+          counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue();
+        long reportedProcVirtualMemorySize =
+          counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).getValue();
+        long reportedProcPhysicalMemorySize =
+          counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue();
+        String procMessage =
+          "expected values : "
+              + "(procCumulativeCpuTime, procVirtualMemorySize,"
+              + " procPhysicalMemorySize) = ("
+              + procCumulativeCpuTime + ", "
+              + procVirtualMemorySize + ", "
+              + procPhysicalMemorySize + ")";
+        procMessage +=
+          "\nreported values : "
+              + "(procCumulativeCpuTime, procVirtualMemorySize,"
+              + " procPhysicalMemorySize) = ("
+              + reportedProcCumulativeCpuTime + ", "
+              + reportedProcVirtualMemorySize + ", "
+              + reportedProcPhysicalMemorySize + ")";
+        LOG.info(procMessage);
+        message += "\n" + procMessage;
+        if (procCumulativeCpuTime != reportedProcCumulativeCpuTime ||
+            procVirtualMemorySize != reportedProcVirtualMemorySize ||
+            procPhysicalMemorySize != reportedProcPhysicalMemorySize) {
+          hasDynamicValuePassed = false;
+        }
+      }
+      hasPassed = true;
       if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
           || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
           || mapSlotMemorySize != reportedMapSlotMemorySize
@@ -192,7 +230,11 @@
           org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,       
           DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
       setUpCluster(conf);
-      runSleepJob(miniMRCluster.createJobConf());
+      JobConf jobConf = miniMRCluster.createJobConf();
+      jobConf.setClass(
+          org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+          DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
+      runSleepJob(jobConf);
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -218,6 +260,9 @@
     conf.setLong("cpuFrequency", 2000000L);
     conf.setInt("numProcessors", 8);
     conf.setFloat("cpuUsage", 15.5F);
+    conf.setLong("procCumulativeCpuTime", 1000L);
+    conf.setLong("procVirtualMemorySize", 2 * 1024 * 1024 * 1024L);
+    conf.setLong("procPhysicalMemorySize", 1024 * 1024 * 1024L);
 
     conf.setClass(
         org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,       
@@ -232,12 +277,19 @@
     conf.setLong(DummyResourceCalculatorPlugin.CPU_FREQUENCY, 2000000L);
     conf.setInt(DummyResourceCalculatorPlugin.NUM_PROCESSORS, 8);
     conf.setFloat(DummyResourceCalculatorPlugin.CPU_USAGE, 15.5F);
-    
     try {
       setUpCluster(conf);
       JobConf jobConf = miniMRCluster.createJobConf();
       jobConf.setMemoryForMapTask(1 * 1024L);
       jobConf.setMemoryForReduceTask(2 * 1024L);
+      jobConf.setClass(
+        org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+        DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
+      jobConf.setLong(DummyResourceCalculatorPlugin.PROC_CUMULATIVE_CPU_TIME, 1000L);
+      jobConf.setLong(DummyResourceCalculatorPlugin.PROC_VMEM_TESTING_PROPERTY,
+                      2 * 1024 * 1024 * 1024L);
+      jobConf.setLong(DummyResourceCalculatorPlugin.PROC_PMEM_TESTING_PROPERTY,
+                      1024 * 1024 * 1024L);
       runSleepJob(jobConf);
       verifyTestResults();
     } finally {