MAPREDUCE-2104. [Rumen] Add Cpu, Memory and Heap usages to TraceBuilder's output. (amarrk)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1132529 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 99cef35..b410256 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,9 @@
IMPROVEMENTS
+ MAPREDUCE-2104. [Rumen] Add Cpu, Memory and Heap usages to
+ TraceBuilder's output. (amarrk)
+
MAPREDUCE-2554. [Gridmix] Add distributed cache emulation system tests
to Gridmix. (Vinay Kumar Thota via amarrk)
diff --git a/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java b/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
index 8a1be6b..c57875a 100644
--- a/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
+++ b/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
@@ -33,11 +33,13 @@
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
@@ -230,7 +232,7 @@
parser = new Hadoop20JHParser(ris);
ArrayList<String> seenEvents = new ArrayList<String>(150);
- getHistoryEvents(parser, seenEvents); // get events into seenEvents
+ getHistoryEvents(parser, seenEvents, null); // get events into seenEvents
// Validate the events seen by history parser from
// history file v20-single-input-log.gz
@@ -512,7 +514,11 @@
// Test if the JobHistoryParserFactory can detect the parser correctly
parser = JobHistoryParserFactory.getParser(ris);
- getHistoryEvents(parser, seenEvents); // get events into seenEvents
+ // create a job builder
+ JobBuilder builder = new JobBuilder(id.toString());
+
+ // get events into seenEvents and also process them using builder
+ getHistoryEvents(parser, seenEvents, builder);
// Check against the gold standard
System.out.println("testCurrentJHParser validating using gold std ");
@@ -523,6 +529,26 @@
};
validateSeenHistoryEvents(seenEvents, goldLinesExpected);
+
+ // validate resource usage metrics
+ // get the job counters
+ Counters counters = job.getTaskReports(TaskType.MAP)[0].getTaskCounters();
+
+ // get the logged job
+ LoggedJob loggedJob = builder.build();
+ // get the logged attempts
+ LoggedTaskAttempt attempt =
+ loggedJob.getMapTasks().get(0).getAttempts().get(0);
+ // get the resource usage metrics
+ ResourceUsageMetrics metrics = attempt.getResourceUsageMetrics();
+
+ // check with the actual values
+ testResourceUsageMetricViaDeepCompare(metrics,
+ counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue(),
+ counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).getValue(),
+ counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue(),
+ counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES).getValue(),
+ true);
} finally {
// stop the MR cluster
mrCluster.shutdown();
@@ -687,6 +713,141 @@
}
+ /**
+ * Test {@link ResourceUsageMetrics}.
+ */
+ @Test
+ public void testResourceUsageMetrics() throws Exception {
+ final long cpuUsage = 100;
+ final long pMemUsage = 200;
+ final long vMemUsage = 300;
+ final long heapUsage = 400;
+
+ // test ResourceUsageMetrics's setters
+ ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+ metrics.setCumulativeCpuUsage(cpuUsage);
+ metrics.setPhysicalMemoryUsage(pMemUsage);
+ metrics.setVirtualMemoryUsage(vMemUsage);
+ metrics.setHeapUsage(heapUsage);
+ // test cpu usage value
+ assertEquals("Cpu usage values mismatch via set", cpuUsage,
+ metrics.getCumulativeCpuUsage());
+ // test pMem usage value
+ assertEquals("Physical memory usage values mismatch via set", pMemUsage,
+ metrics.getPhysicalMemoryUsage());
+ // test vMem usage value
+ assertEquals("Virtual memory usage values mismatch via set", vMemUsage,
+ metrics.getVirtualMemoryUsage());
+ // test heap usage value
+ assertEquals("Heap usage values mismatch via set", heapUsage,
+ metrics.getHeapUsage());
+
+ // test deepCompare() (pass case)
+ testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage,
+ pMemUsage, heapUsage, true);
+
+ // test deepCompare (fail case)
+ // test cpu usage mismatch
+ testResourceUsageMetricViaDeepCompare(metrics, 0, vMemUsage, pMemUsage,
+ heapUsage, false);
+ // test pMem usage mismatch
+ testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, 0,
+ heapUsage, false);
+ // test vMem usage mismatch
+ testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, 0, pMemUsage,
+ heapUsage, false);
+ // test heap usage mismatch
+ testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage,
+ pMemUsage, 0, false);
+
+ // define a metric with a fixed value of size()
+ ResourceUsageMetrics metrics2 = new ResourceUsageMetrics() {
+ @Override
+ public int size() {
+ return -1;
+ }
+ };
+ metrics2.setCumulativeCpuUsage(cpuUsage);
+ metrics2.setPhysicalMemoryUsage(pMemUsage);
+ metrics2.setVirtualMemoryUsage(vMemUsage);
+ metrics2.setHeapUsage(heapUsage);
+
+ // test with size mismatch
+ testResourceUsageMetricViaDeepCompare(metrics2, cpuUsage, vMemUsage,
+ pMemUsage, heapUsage, false);
+ }
+
+ // test ResourceUsageMetric's deepCompare() method
+ private static void testResourceUsageMetricViaDeepCompare(
+ ResourceUsageMetrics metrics, long cpuUsage,
+ long vMemUsage, long pMemUsage, long heapUsage,
+ boolean shouldPass) {
+ ResourceUsageMetrics testMetrics = new ResourceUsageMetrics();
+ testMetrics.setCumulativeCpuUsage(cpuUsage);
+ testMetrics.setPhysicalMemoryUsage(pMemUsage);
+ testMetrics.setVirtualMemoryUsage(vMemUsage);
+ testMetrics.setHeapUsage(heapUsage);
+
+ Boolean passed = null;
+ try {
+ metrics.deepCompare(testMetrics, new TreePath(null, "<root>"));
+ passed = true;
+ } catch (DeepInequalityException die) {
+ passed = false;
+ }
+
+ assertEquals("ResourceUsageMetrics deepCompare() failed!",
+ shouldPass, passed);
+ }
+
+ /**
+ * Testing {@link ResourceUsageMetrics} using {@link HadoopLogsAnalyzer}.
+ */
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testResourceUsageMetricsWithHadoopLogsAnalyzer()
+ throws IOException {
+ Configuration conf = new Configuration();
+ // get the input trace file
+ Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", ""));
+ Path rootInputSubFolder = new Path(rootInputDir, "rumen/small-trace-test");
+ Path traceFile = new Path(rootInputSubFolder, "v20-resource-usage-log.gz");
+
+ FileSystem lfs = FileSystem.getLocal(conf);
+
+ // define the root test directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp"));
+
+ // define output directory
+ Path outputDir =
+ new Path(rootTempDir, "testResourceUsageMetricsWithHadoopLogsAnalyzer");
+ lfs.delete(outputDir, true);
+ lfs.deleteOnExit(outputDir);
+
+ // run HadoopLogsAnalyzer
+ HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
+ analyzer.setConf(conf);
+ Path traceOutput = new Path(outputDir, "trace.json");
+ analyzer.run(new String[] {"-write-job-trace", traceOutput.toString(),
+ "-v1", traceFile.toString()});
+
+ // test HadoopLogsAnalyzer's output w.r.t ResourceUsageMetrics
+ // get the logged job
+ JsonObjectMapperParser<LoggedJob> traceParser =
+ new JsonObjectMapperParser<LoggedJob>(traceOutput, LoggedJob.class,
+ conf);
+
+ // get the logged job from the output trace file
+ LoggedJob job = traceParser.getNext();
+ LoggedTaskAttempt attempt = job.getMapTasks().get(0).getAttempts().get(0);
+ ResourceUsageMetrics metrics = attempt.getResourceUsageMetrics();
+
+ // test via deepCompare()
+ testResourceUsageMetricViaDeepCompare(metrics, 200, 100, 75, 50, true);
+ }
+
@Test
public void testTopologyBuilder() throws Exception {
final TopologyBuilder subject = new TopologyBuilder();
@@ -795,12 +956,15 @@
* @throws IOException
*/
private void getHistoryEvents(JobHistoryParser parser,
- ArrayList<String> events) throws IOException {
+ ArrayList<String> events, JobBuilder builder) throws IOException {
HistoryEvent e;
while ((e = parser.nextEvent()) != null) {
String eventString = e.getClass().getSimpleName();
System.out.println(eventString);
events.add(eventString);
+ if (builder != null) {
+ builder.process(e);
+ }
}
}
diff --git a/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz b/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz
new file mode 100644
index 0000000..6d0dbeb
--- /dev/null
+++ b/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz
Binary files differ
diff --git a/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java b/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
index c393147..b8044ec 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
@@ -1208,6 +1208,38 @@
attempt.spilledRecords = val;
}
}, counterString, "SPILLED_RECORDS");
+
+ // incorporate CPU usage
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.getResourceUsageMetrics().setCumulativeCpuUsage(val);
+ }
+ }, counterString, "CPU_MILLISECONDS");
+
+ // incorporate virtual memory usage
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.getResourceUsageMetrics().setVirtualMemoryUsage(val);
+ }
+ }, counterString, "VIRTUAL_MEMORY_BYTES");
+
+ // incorporate physical memory usage
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.getResourceUsageMetrics().setPhysicalMemoryUsage(val);
+ }
+ }, counterString, "PHYSICAL_MEMORY_BYTES");
+
+ // incorporate heap usage
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.getResourceUsageMetrics().setHeapUsage(val);
+ }
+ }, counterString, "COMMITTED_HEAP_BYTES");
}
private ParsedHost getAndRecordParsedHost(String hostName) {
diff --git a/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java b/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
index 18b26ca..0df30e6 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
@@ -28,7 +28,6 @@
// the Jackson implementation of JSON doesn't handle a
// superclass-valued field.
-import org.apache.hadoop.mapreduce.jobhistory.Events;
import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
@@ -69,6 +68,9 @@
LoggedLocation location;
+ // Initialize to default object for backward compatibility
+ ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+
LoggedTaskAttempt() {
super();
}
@@ -354,8 +356,50 @@
attempt.spilledRecords = val;
}
}, counters, "SPILLED_RECORDS");
+
+ // incorporate CPU usage
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ metrics.setCumulativeCpuUsage(val);
+ }
+ }, counters, "CPU_MILLISECONDS");
+
+ // incorporate virtual memory usage
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ metrics.setVirtualMemoryUsage(val);
+ }
+ }, counters, "VIRTUAL_MEMORY_BYTES");
+
+ // incorporate physical memory usage
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ metrics.setPhysicalMemoryUsage(val);
+ }
+ }, counters, "PHYSICAL_MEMORY_BYTES");
+
+ // incorporate heap usage
+ incorporateCounter(new SetField(this) {
+ @Override
+ void set(long val) {
+ metrics.setHeapUsage(val);
+ }
+ }, counters, "COMMITTED_HEAP_BYTES");
}
+ // Get the resource usage metrics
+ public ResourceUsageMetrics getResourceUsageMetrics() {
+ return metrics;
+ }
+
+ // Set the resource usage metrics
+ void setResourceUsageMetrics(ResourceUsageMetrics metrics) {
+ this.metrics = metrics;
+ }
+
private static String canonicalizeCounterName(String nonCanonicalName) {
String result = nonCanonicalName.toLowerCase();
diff --git a/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java b/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java
new file mode 100644
index 0000000..a0944c8
--- /dev/null
+++ b/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Captures the resource usage metrics.
+ */
+public class ResourceUsageMetrics implements Writable, DeepCompare {
+ private long cumulativeCpuUsage;
+ private long virtualMemoryUsage;
+ private long physicalMemoryUsage;
+ private long heapUsage;
+
+ public ResourceUsageMetrics() {
+ }
+
+ /**
+ * Get the cumulative CPU usage.
+ */
+ public long getCumulativeCpuUsage() {
+ return cumulativeCpuUsage;
+ }
+
+ /**
+ * Set the cumulative CPU usage.
+ */
+ public void setCumulativeCpuUsage(long usage) {
+ cumulativeCpuUsage = usage;
+ }
+
+ /**
+ * Get the virtual memory usage.
+ */
+ public long getVirtualMemoryUsage() {
+ return virtualMemoryUsage;
+ }
+
+ /**
+ * Set the virtual memory usage.
+ */
+ public void setVirtualMemoryUsage(long usage) {
+ virtualMemoryUsage = usage;
+ }
+
+ /**
+ * Get the physical memory usage.
+ */
+ public long getPhysicalMemoryUsage() {
+ return physicalMemoryUsage;
+ }
+
+ /**
+ * Set the physical memory usage.
+ */
+ public void setPhysicalMemoryUsage(long usage) {
+ physicalMemoryUsage = usage;
+ }
+
+ /**
+ * Get the total heap usage.
+ */
+ public long getHeapUsage() {
+ return heapUsage;
+ }
+
+ /**
+ * Set the total heap usage.
+ */
+ public void setHeapUsage(long usage) {
+ heapUsage = usage;
+ }
+
+ /**
+ * Returns the size of the serialized data
+ */
+ public int size() {
+ int size = 0;
+ size += WritableUtils.getVIntSize(cumulativeCpuUsage); // long #1
+ size += WritableUtils.getVIntSize(virtualMemoryUsage); // long #2
+ size += WritableUtils.getVIntSize(physicalMemoryUsage); // long #3
+ size += WritableUtils.getVIntSize(heapUsage); // long #4
+ return size;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ cumulativeCpuUsage = WritableUtils.readVLong(in); // long #1
+ virtualMemoryUsage = WritableUtils.readVLong(in); // long #2
+ physicalMemoryUsage = WritableUtils.readVLong(in); // long #3
+ heapUsage = WritableUtils.readVLong(in); // long #4
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ //TODO Write resources version no too
+ WritableUtils.writeVLong(out, cumulativeCpuUsage); // long #1
+ WritableUtils.writeVLong(out, virtualMemoryUsage); // long #2
+ WritableUtils.writeVLong(out, physicalMemoryUsage); // long #3
+ WritableUtils.writeVLong(out, heapUsage); // long #4
+ }
+
+ private static void compareMetric(long m1, long m2, TreePath loc)
+ throws DeepInequalityException {
+ if (m1 != m2) {
+ throw new DeepInequalityException("Value miscompared:" + loc.toString(),
+ loc);
+ }
+ }
+
+ private static void compareSize(ResourceUsageMetrics m1,
+ ResourceUsageMetrics m2, TreePath loc)
+ throws DeepInequalityException {
+ if (m1.size() != m2.size()) {
+ throw new DeepInequalityException("Size miscompared: " + loc.toString(),
+ loc);
+ }
+ }
+
+ @Override
+ public void deepCompare(DeepCompare other, TreePath loc)
+ throws DeepInequalityException {
+ if (!(other instanceof ResourceUsageMetrics)) {
+ throw new DeepInequalityException("Comparand has wrong type", loc);
+ }
+
+ ResourceUsageMetrics metrics2 = (ResourceUsageMetrics) other;
+ compareMetric(getCumulativeCpuUsage(), metrics2.getCumulativeCpuUsage(),
+ new TreePath(loc, "cumulativeCpu"));
+ compareMetric(getVirtualMemoryUsage(), metrics2.getVirtualMemoryUsage(),
+ new TreePath(loc, "virtualMemory"));
+ compareMetric(getPhysicalMemoryUsage(), metrics2.getPhysicalMemoryUsage(),
+ new TreePath(loc, "physicalMemory"));
+ compareMetric(getHeapUsage(), metrics2.getHeapUsage(),
+ new TreePath(loc, "heapUsage"));
+ compareSize(this, metrics2, new TreePath(loc, "size"));
+ }
+}
+
diff --git a/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java b/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
index 4cc6816..9aa6373 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
@@ -23,14 +23,22 @@
private final long bytesOut;
private final int recsOut;
private final long maxMemory;
+ private final ResourceUsageMetrics metrics;
public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
long maxMemory) {
+ this(bytesIn, recsIn, bytesOut, recsOut, maxMemory,
+ new ResourceUsageMetrics());
+ }
+
+ public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+ long maxMemory, ResourceUsageMetrics metrics) {
this.bytesIn = bytesIn;
this.recsIn = recsIn;
this.bytesOut = bytesOut;
this.recsOut = recsOut;
this.maxMemory = maxMemory;
+ this.metrics = metrics;
}
/**
@@ -70,4 +78,10 @@
return maxMemory;
}
+ /**
+ * @return Resource usage metrics
+ */
+ public ResourceUsageMetrics getResourceUsageMetrics() {
+ return metrics;
+ }
}
diff --git a/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java b/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
index 7c2160e..838995d 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
@@ -636,6 +636,7 @@
long outputBytes = -1;
long outputRecords = -1;
long heapMegabytes = -1;
+ ResourceUsageMetrics metrics = new ResourceUsageMetrics();
Values type = loggedTask.getTaskType();
if ((type != Values.MAP) && (type != Values.REDUCE)) {
@@ -670,12 +671,15 @@
(job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job
.getHeapMegabytes();
}
+ // set the resource usage metrics
+ metrics = attempt.getResourceUsageMetrics();
break;
}
TaskInfo taskInfo =
new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
- (int) outputRecords, (int) heapMegabytes);
+ (int) outputRecords, (int) heapMegabytes,
+ metrics);
return taskInfo;
}