MAPREDUCE-2104. [Rumen] Add Cpu, Memory and Heap usages to TraceBuilder's output. (amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1132529 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 99cef35..b410256 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,9 @@
 
   IMPROVEMENTS
 
+    MAPREDUCE-2104. [Rumen] Add Cpu, Memory and Heap usages to 
+    TraceBuilder's output. (amarrk)
+
     MAPREDUCE-2554. [Gridmix]  Add distributed cache emulation system tests 
     to Gridmix. (Vinay Kumar Thota via amarrk)
  
diff --git a/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java b/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
index 8a1be6b..c57875a 100644
--- a/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
+++ b/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
@@ -33,11 +33,13 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
@@ -230,7 +232,7 @@
       parser = new Hadoop20JHParser(ris);
       ArrayList<String> seenEvents = new ArrayList<String>(150);
 
-      getHistoryEvents(parser, seenEvents); // get events into seenEvents
+      getHistoryEvents(parser, seenEvents, null); // get events into seenEvents
 
       // Validate the events seen by history parser from
       // history file v20-single-input-log.gz
@@ -512,7 +514,11 @@
       // Test if the JobHistoryParserFactory can detect the parser correctly
       parser = JobHistoryParserFactory.getParser(ris);
 
-      getHistoryEvents(parser, seenEvents); // get events into seenEvents
+      // create a job builder
+      JobBuilder builder = new JobBuilder(id.toString());
+
+      // get events into seenEvents and also process them using builder
+      getHistoryEvents(parser, seenEvents, builder); 
 
       // Check against the gold standard
       System.out.println("testCurrentJHParser validating using gold std ");
@@ -523,6 +529,26 @@
       };
 
       validateSeenHistoryEvents(seenEvents, goldLinesExpected);
+      
+      // validate resource usage metrics
+      //  get the counters of the first map task
+      Counters counters = job.getTaskReports(TaskType.MAP)[0].getTaskCounters();
+      
+      //  get the logged job
+      LoggedJob loggedJob = builder.build();
+      //  get the first attempt of the first logged map task
+      LoggedTaskAttempt attempt = 
+        loggedJob.getMapTasks().get(0).getAttempts().get(0);
+      //  get the resource usage metrics
+      ResourceUsageMetrics metrics = attempt.getResourceUsageMetrics();
+      
+      //  check with the actual values
+      testResourceUsageMetricViaDeepCompare(metrics, 
+          counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue(), 
+          counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).getValue(),
+          counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue(),
+          counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES).getValue(),
+          true);
     } finally {
       // stop the MR cluster
       mrCluster.shutdown();
@@ -687,6 +713,141 @@
     }
 
 
+  /**
+   * Test {@link ResourceUsageMetrics}.
+   */
+  @Test
+  public void testResourceUsageMetrics() throws Exception {
+    final long cpuUsage = 100;
+    final long pMemUsage = 200;
+    final long vMemUsage = 300;
+    final long heapUsage = 400;
+    
+    // test ResourceUsageMetrics's setters
+    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+    metrics.setCumulativeCpuUsage(cpuUsage);
+    metrics.setPhysicalMemoryUsage(pMemUsage);
+    metrics.setVirtualMemoryUsage(vMemUsage);
+    metrics.setHeapUsage(heapUsage);
+    // test cpu usage value
+    assertEquals("Cpu usage values mismatch via set", cpuUsage, 
+                 metrics.getCumulativeCpuUsage());
+    // test pMem usage value
+    assertEquals("Physical memory usage values mismatch via set", pMemUsage, 
+                 metrics.getPhysicalMemoryUsage());
+    // test vMem usage value
+    assertEquals("Virtual memory usage values mismatch via set", vMemUsage, 
+                 metrics.getVirtualMemoryUsage());
+    // test heap usage value
+    assertEquals("Heap usage values mismatch via set", heapUsage, 
+                 metrics.getHeapUsage());
+    
+    // test deepCompare() (pass case)
+    testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, 
+                                          pMemUsage, heapUsage, true);
+    
+    // test deepCompare (fail case)
+    // test cpu usage mismatch
+    testResourceUsageMetricViaDeepCompare(metrics, 0, vMemUsage, pMemUsage, 
+                                          heapUsage, false);
+    // test pMem usage mismatch
+    testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, 0, 
+                                          heapUsage, false);
+    // test vMem usage mismatch
+    testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, 0, pMemUsage, 
+                                          heapUsage, false);
+    // test heap usage mismatch
+    testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, 
+                                          pMemUsage, 0, false);
+    
+    // define a metric with a fixed value of size()
+    ResourceUsageMetrics metrics2 = new ResourceUsageMetrics() {
+      @Override
+      public int size() {
+        return -1;
+      }
+    };
+    metrics2.setCumulativeCpuUsage(cpuUsage);
+    metrics2.setPhysicalMemoryUsage(pMemUsage);
+    metrics2.setVirtualMemoryUsage(vMemUsage);
+    metrics2.setHeapUsage(heapUsage);
+    
+    // test with size mismatch
+    testResourceUsageMetricViaDeepCompare(metrics2, cpuUsage, vMemUsage, 
+                                          pMemUsage, heapUsage, false);
+  }
+  
+  // assert that ResourceUsageMetrics.deepCompare() passes or fails as expected
+  private static void testResourceUsageMetricViaDeepCompare(
+                        ResourceUsageMetrics metrics, long cpuUsage, 
+                        long vMemUsage, long pMemUsage, long heapUsage,
+                        boolean shouldPass) {
+    ResourceUsageMetrics testMetrics = new ResourceUsageMetrics();
+    testMetrics.setCumulativeCpuUsage(cpuUsage);
+    testMetrics.setPhysicalMemoryUsage(pMemUsage);
+    testMetrics.setVirtualMemoryUsage(vMemUsage);
+    testMetrics.setHeapUsage(heapUsage);
+    
+    Boolean passed = null;
+    try {
+      metrics.deepCompare(testMetrics, new TreePath(null, "<root>"));
+      passed = true;
+    } catch (DeepInequalityException die) {
+      passed = false;
+    }
+    
+    assertEquals("ResourceUsageMetrics deepCompare() failed!", 
+                 shouldPass, passed);
+  }
+  
+  /**
+   * Testing {@link ResourceUsageMetrics} using {@link HadoopLogsAnalyzer}.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testResourceUsageMetricsWithHadoopLogsAnalyzer() 
+  throws IOException {
+    Configuration conf = new Configuration();
+    // get the input trace file
+    Path rootInputDir =
+      new Path(System.getProperty("test.tools.input.dir", ""));
+    Path rootInputSubFolder = new Path(rootInputDir, "rumen/small-trace-test");
+    Path traceFile = new Path(rootInputSubFolder, "v20-resource-usage-log.gz");
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    
+    // define the root test directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp"));
+
+    // define output directory
+    Path outputDir = 
+      new Path(rootTempDir, "testResourceUsageMetricsWithHadoopLogsAnalyzer");
+    lfs.delete(outputDir, true);
+    lfs.deleteOnExit(outputDir);
+    
+    // run HadoopLogsAnalyzer
+    HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
+    analyzer.setConf(conf);
+    Path traceOutput = new Path(outputDir, "trace.json");
+    analyzer.run(new String[] {"-write-job-trace", traceOutput.toString(), 
+                               "-v1", traceFile.toString()});
+    
+    // test HadoopLogsAnalyzer's output w.r.t ResourceUsageMetrics
+    //  create a parser for the output trace file
+    JsonObjectMapperParser<LoggedJob> traceParser =
+      new JsonObjectMapperParser<LoggedJob>(traceOutput, LoggedJob.class, 
+                                            conf);
+    
+    //  get the logged job from the output trace file
+    LoggedJob job = traceParser.getNext();
+    LoggedTaskAttempt attempt = job.getMapTasks().get(0).getAttempts().get(0);
+    ResourceUsageMetrics metrics = attempt.getResourceUsageMetrics();
+    
+    //  test via deepCompare()
+    testResourceUsageMetricViaDeepCompare(metrics, 200, 100, 75, 50, true);
+  }
+  
   @Test
   public void testTopologyBuilder() throws Exception {
     final TopologyBuilder subject = new TopologyBuilder();
@@ -795,12 +956,15 @@
    * @throws IOException
    */
   private void getHistoryEvents(JobHistoryParser parser,
-      ArrayList<String> events) throws IOException {
+      ArrayList<String> events, JobBuilder builder) throws IOException {
     HistoryEvent e;
     while ((e = parser.nextEvent()) != null) {
       String eventString = e.getClass().getSimpleName();
       System.out.println(eventString);
       events.add(eventString);
+      if (builder != null) {
+        builder.process(e);
+      }
     }
   }
 
diff --git a/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz b/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz
new file mode 100644
index 0000000..6d0dbeb
--- /dev/null
+++ b/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz
Binary files differ
diff --git a/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java b/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
index c393147..b8044ec 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
@@ -1208,6 +1208,38 @@
         attempt.spilledRecords = val;
       }
     }, counterString, "SPILLED_RECORDS");
+    
+    // incorporate CPU usage
+    incorporateCounter(new SetField(attempt2) {
+      @Override
+      void set(long val) {
+        attempt.getResourceUsageMetrics().setCumulativeCpuUsage(val);
+      }
+    }, counterString, "CPU_MILLISECONDS");
+    
+    // incorporate virtual memory usage
+    incorporateCounter(new SetField(attempt2) {
+      @Override
+      void set(long val) {
+        attempt.getResourceUsageMetrics().setVirtualMemoryUsage(val);
+      }
+    }, counterString, "VIRTUAL_MEMORY_BYTES");
+    
+    // incorporate physical memory usage
+    incorporateCounter(new SetField(attempt2) {
+      @Override
+      void set(long val) {
+        attempt.getResourceUsageMetrics().setPhysicalMemoryUsage(val);
+      }
+    }, counterString, "PHYSICAL_MEMORY_BYTES");
+    
+    // incorporate heap usage
+    incorporateCounter(new SetField(attempt2) {
+      @Override
+      void set(long val) {
+        attempt.getResourceUsageMetrics().setHeapUsage(val);
+      }
+    }, counterString, "COMMITTED_HEAP_BYTES");
   }
 
   private ParsedHost getAndRecordParsedHost(String hostName) {
diff --git a/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java b/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
index 18b26ca..0df30e6 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
@@ -28,7 +28,6 @@
 //                the Jackson implementation of JSON doesn't handle a 
 //                superclass-valued field.
 
-import org.apache.hadoop.mapreduce.jobhistory.Events;
 import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
 import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
 import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
@@ -69,6 +68,9 @@
 
   LoggedLocation location;
 
+  // Initialize to default object for backward compatibility
+  ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+  
   LoggedTaskAttempt() {
     super();
   }
@@ -354,8 +356,50 @@
         attempt.spilledRecords = val;
       }
     }, counters, "SPILLED_RECORDS");
+    
+    // incorporate CPU usage
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        metrics.setCumulativeCpuUsage(val);
+      }
+    }, counters, "CPU_MILLISECONDS");
+    
+    // incorporate virtual memory usage
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        metrics.setVirtualMemoryUsage(val);
+      }
+    }, counters, "VIRTUAL_MEMORY_BYTES");
+    
+    // incorporate physical memory usage
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        metrics.setPhysicalMemoryUsage(val);
+      }
+    }, counters, "PHYSICAL_MEMORY_BYTES");
+    
+    // incorporate heap usage
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        metrics.setHeapUsage(val);
+      }
+    }, counters, "COMMITTED_HEAP_BYTES");
   }
 
+  // Get the resource usage metrics
+  public ResourceUsageMetrics getResourceUsageMetrics() {
+    return metrics;
+  }
+  
+  // Set the resource usage metrics
+  void setResourceUsageMetrics(ResourceUsageMetrics metrics) {
+    this.metrics = metrics;
+  }
+  
   private static String canonicalizeCounterName(String nonCanonicalName) {
     String result = nonCanonicalName.toLowerCase();
 
diff --git a/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java b/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java
new file mode 100644
index 0000000..a0944c8
--- /dev/null
+++ b/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Captures the resource usage metrics.
+ */
+public class ResourceUsageMetrics implements Writable, DeepCompare  {
+  private long cumulativeCpuUsage;
+  private long virtualMemoryUsage;
+  private long physicalMemoryUsage;
+  private long heapUsage;
+  
+  public ResourceUsageMetrics() {
+  }
+  
+  /**
+   * Get the cumulative CPU usage.
+   */
+  public long getCumulativeCpuUsage() {
+    return cumulativeCpuUsage;
+  }
+  
+  /**
+   * Set the cumulative CPU usage.
+   */
+  public void setCumulativeCpuUsage(long usage) {
+    cumulativeCpuUsage = usage;
+  }
+  
+  /**
+   * Get the virtual memory usage.
+   */
+  public long getVirtualMemoryUsage() {
+    return virtualMemoryUsage;
+  }
+  
+  /**
+   * Set the virtual memory usage.
+   */
+  public void setVirtualMemoryUsage(long usage) {
+    virtualMemoryUsage = usage;
+  }
+  
+  /**
+   * Get the physical memory usage.
+   */
+  public long getPhysicalMemoryUsage() {
+    return physicalMemoryUsage;
+  }
+  
+  /**
+   * Set the physical memory usage.
+   */
+  public void setPhysicalMemoryUsage(long usage) {
+    physicalMemoryUsage = usage;
+  }
+  
+  /**
+   * Get the total heap usage.
+   */
+  public long getHeapUsage() {
+    return heapUsage;
+  }
+  
+  /**
+   * Set the total heap usage.
+   */
+  public void setHeapUsage(long usage) {
+    heapUsage = usage;
+  }
+  
+  /**
+   * Returns the size of the serialized data
+   */
+  public int size() {
+    int size = 0;
+    size += WritableUtils.getVIntSize(cumulativeCpuUsage);   // long #1
+    size += WritableUtils.getVIntSize(virtualMemoryUsage);   // long #2
+    size += WritableUtils.getVIntSize(physicalMemoryUsage);  // long #3
+    size += WritableUtils.getVIntSize(heapUsage);            // long #4
+    return size;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    cumulativeCpuUsage = WritableUtils.readVLong(in);  // long #1
+    virtualMemoryUsage = WritableUtils.readVLong(in);  // long #2
+    physicalMemoryUsage = WritableUtils.readVLong(in); // long #3
+    heapUsage = WritableUtils.readVLong(in);           // long #4
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // TODO: also write a resources version number
+    WritableUtils.writeVLong(out, cumulativeCpuUsage);  // long #1
+    WritableUtils.writeVLong(out, virtualMemoryUsage);  // long #2
+    WritableUtils.writeVLong(out, physicalMemoryUsage); // long #3
+    WritableUtils.writeVLong(out, heapUsage);           // long #4
+  }
+
+  private static void compareMetric(long m1, long m2, TreePath loc) 
+  throws DeepInequalityException {
+    if (m1 != m2) {
+      throw new DeepInequalityException("Value miscompared:" + loc.toString(), 
+                                        loc);
+    }
+  }
+  
+  private static void compareSize(ResourceUsageMetrics m1, 
+                                  ResourceUsageMetrics m2, TreePath loc) 
+  throws DeepInequalityException {
+    if (m1.size() != m2.size()) {
+      throw new DeepInequalityException("Size miscompared: " + loc.toString(), 
+                                        loc);
+    }
+  }
+  
+  @Override
+  public void deepCompare(DeepCompare other, TreePath loc)
+      throws DeepInequalityException {
+    if (!(other instanceof ResourceUsageMetrics)) {
+      throw new DeepInequalityException("Comparand has wrong type", loc);
+    }
+
+    ResourceUsageMetrics metrics2 = (ResourceUsageMetrics) other;
+    compareMetric(getCumulativeCpuUsage(), metrics2.getCumulativeCpuUsage(), 
+                  new TreePath(loc, "cumulativeCpu"));
+    compareMetric(getVirtualMemoryUsage(), metrics2.getVirtualMemoryUsage(), 
+                  new TreePath(loc, "virtualMemory"));
+    compareMetric(getPhysicalMemoryUsage(), metrics2.getPhysicalMemoryUsage(), 
+                  new TreePath(loc, "physicalMemory"));
+    compareMetric(getHeapUsage(), metrics2.getHeapUsage(), 
+                  new TreePath(loc, "heapUsage"));
+    compareSize(this, metrics2, new TreePath(loc, "size"));
+  }
+}
+
diff --git a/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java b/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
index 4cc6816..9aa6373 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
@@ -23,14 +23,22 @@
   private final long bytesOut;
   private final int recsOut;
   private final long maxMemory;
+  private final ResourceUsageMetrics metrics;
 
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
       long maxMemory) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 
+         new ResourceUsageMetrics());
+  }
+  
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+                  long maxMemory, ResourceUsageMetrics metrics) {
     this.bytesIn = bytesIn;
     this.recsIn = recsIn;
     this.bytesOut = bytesOut;
     this.recsOut = recsOut;
     this.maxMemory = maxMemory;
+    this.metrics = metrics;
   }
 
   /**
@@ -70,4 +78,10 @@
     return maxMemory;
   }
 
+  /**
+   * @return Resource usage metrics
+   */
+  public ResourceUsageMetrics getResourceUsageMetrics() {
+    return metrics;
+  }
 }
diff --git a/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java b/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
index 7c2160e..838995d 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
@@ -636,6 +636,7 @@
     long outputBytes = -1;
     long outputRecords = -1;
     long heapMegabytes = -1;
+    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
 
     Values type = loggedTask.getTaskType();
     if ((type != Values.MAP) && (type != Values.REDUCE)) {
@@ -670,12 +671,15 @@
             (job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job
                 .getHeapMegabytes();
       }
+      // set the resource usage metrics
+      metrics = attempt.getResourceUsageMetrics();
       break;
     }
 
     TaskInfo taskInfo =
         new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
-            (int) outputRecords, (int) heapMegabytes);
+            (int) outputRecords, (int) heapMegabytes,
+            metrics);
     return taskInfo;
   }