MAPREDUCE-220. Collect cpu and memory statistics per task. Contributed by Scott Chen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@987585 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 13cccee..75e2fb8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,9 @@
MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.
(Joshua Harlow via shv)
+ MAPREDUCE-220. Collect cpu and memory statistics per task. (Scott Chen via
+ acmurthy)
+
IMPROVEMENTS
MAPREDUCE-1546. Redirect all job pages to corresponding history page
diff --git a/src/java/org/apache/hadoop/mapred/Task.java b/src/java/org/apache/hadoop/mapred/Task.java
index 0a350c6..18367a7 100644
--- a/src/java/org/apache/hadoop/mapred/Task.java
+++ b/src/java/org/apache/hadoop/mapred/Task.java
@@ -59,6 +59,8 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.*;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
@@ -77,7 +79,6 @@
LogFactory.getLog(Task.class);
public static String MERGED_OUTPUT_PREFIX = ".merged";
-
/**
* Counters to measure the usage of the different file systems.
@@ -133,7 +134,10 @@
private volatile long currentRecStartIndex;
private Iterator<Long> currentRecIndexIterator =
skipRanges.skipRangeIterator();
-
+
+ private ResourceCalculatorPlugin resourceCalculator = null;
+ private long initCpuCumulativeTime = 0;
+
protected JobConf conf;
protected MapOutputFile mapOutputFile = new MapOutputFile();
protected LocalDirAllocator lDirAlloc;
@@ -503,6 +507,16 @@
}
}
committer.setupTask(taskContext);
+ Class<? extends ResourceCalculatorPlugin> clazz =
+ conf.getClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+ null, ResourceCalculatorPlugin.class);
+ resourceCalculator = ResourceCalculatorPlugin
+ .getResourceCalculatorPlugin(clazz, conf);
+ LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculator);
+ if (resourceCalculator != null) {
+ initCpuCumulativeTime =
+ resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
+ }
}
@InterfaceAudience.Private
@@ -701,6 +715,24 @@
}
/**
+ * Update resource information counters
+ */
+ void updateResourceCounters() {
+ if (resourceCalculator == null) {
+ return;
+ }
+ ProcResourceValues res = resourceCalculator.getProcResourceValues();
+ long cpuTime = res.getCumulativeCpuTime();
+ long pMem = res.getPhysicalMemorySize();
+ long vMem = res.getVirtualMemorySize();
+      // Subtract the CPU time already consumed before this task started (JVM reuse)
+ cpuTime -= initCpuCumulativeTime;
+ counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
+ counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
+ counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
+ }
+
+ /**
* An updater that tracks the amount of time this task has spent in GC.
*/
class GcTimeUpdater {
@@ -799,6 +831,7 @@
}
gcUpdater.incrementGcCounter();
+ updateResourceCounters();
}
public void done(TaskUmbilicalProtocol umbilical,
diff --git a/src/java/org/apache/hadoop/mapreduce/TaskCounter.java b/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
index 74d103e..b6362e8 100644
--- a/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
+++ b/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
@@ -42,5 +42,8 @@
SHUFFLED_MAPS,
FAILED_SHUFFLE,
MERGED_MAP_OUTPUTS,
- GC_TIME_MILLIS
+ GC_TIME_MILLIS,
+ CPU_MILLISECONDS,
+ PHYSICAL_MEMORY_BYTES,
+ VIRTUAL_MEMORY_BYTES
}
diff --git a/src/java/org/apache/hadoop/mapreduce/util/LinuxResourceCalculatorPlugin.java b/src/java/org/apache/hadoop/mapreduce/util/LinuxResourceCalculatorPlugin.java
index 91663f3..62afe1d 100644
--- a/src/java/org/apache/hadoop/mapreduce/util/LinuxResourceCalculatorPlugin.java
+++ b/src/java/org/apache/hadoop/mapreduce/util/LinuxResourceCalculatorPlugin.java
@@ -90,6 +90,7 @@
private float cpuUsage = TaskTrackerStatus.UNAVAILABLE;
private long sampleTime = TaskTrackerStatus.UNAVAILABLE;
private long lastSampleTime = TaskTrackerStatus.UNAVAILABLE;
+ private ProcfsBasedProcessTree pTree = null;
boolean readMemInfoFile = false;
boolean readCpuInfoFile = false;
@@ -107,7 +108,8 @@
procfsCpuFile = PROCFS_CPUINFO;
procfsStatFile = PROCFS_STAT;
jiffyLengthInMillis = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS;
-
+ String pid = System.getenv().get("JVM_PID");
+ pTree = new ProcfsBasedProcessTree(pid);
}
/**
@@ -126,6 +128,8 @@
this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile;
this.jiffyLengthInMillis = jiffyLengthInMillis;
+ String pid = System.getenv().get("JVM_PID");
+ pTree = new ProcfsBasedProcessTree(pid);
}
/**
@@ -395,4 +399,13 @@
}
System.out.println("CPU usage % : " + plugin.getCpuUsage());
}
-}
\ No newline at end of file
+
+ @Override
+ public ProcResourceValues getProcResourceValues() {
+ pTree = pTree.getProcessTree();
+ long cpuTime = pTree.getCumulativeCpuTime();
+ long pMem = pTree.getCumulativeRssmem();
+ long vMem = pTree.getCumulativeVmem();
+ return new ProcResourceValues(cpuTime, pMem, vMem);
+ }
+}
diff --git a/src/java/org/apache/hadoop/mapreduce/util/ResourceCalculatorPlugin.java b/src/java/org/apache/hadoop/mapreduce/util/ResourceCalculatorPlugin.java
index 07f4823..541773c 100644
--- a/src/java/org/apache/hadoop/mapreduce/util/ResourceCalculatorPlugin.java
+++ b/src/java/org/apache/hadoop/mapreduce/util/ResourceCalculatorPlugin.java
@@ -91,6 +91,48 @@
public abstract float getCpuUsage();
/**
+   * Obtain the resource usage of the current process tree.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public abstract ProcResourceValues getProcResourceValues();
+
+ public class ProcResourceValues {
+ private final long cumulativeCpuTime;
+ private final long physicalMemorySize;
+ private final long virtualMemorySize;
+ public ProcResourceValues(long cumulativeCpuTime, long physicalMemorySize,
+ long virtualMemorySize) {
+ this.cumulativeCpuTime = cumulativeCpuTime;
+ this.physicalMemorySize = physicalMemorySize;
+ this.virtualMemorySize = virtualMemorySize;
+ }
+ /**
+   * Obtain the physical memory size used by the current process tree.
+ * @return physical memory size in bytes.
+ */
+ public long getPhysicalMemorySize() {
+ return physicalMemorySize;
+ }
+
+ /**
+   * Obtain the virtual memory size used by the current process tree.
+ * @return virtual memory size in bytes.
+ */
+ public long getVirtualMemorySize() {
+ return virtualMemorySize;
+ }
+
+ /**
+   * Obtain the cumulative CPU time used by the current process tree.
+ * @return cumulative CPU time in milliseconds
+ */
+ public long getCumulativeCpuTime() {
+ return cumulativeCpuTime;
+ }
+ }
+
+ /**
* Get the ResourceCalculatorPlugin from the class name and configure it. If
* class name is null, this method will try and return a memory calculator
* plugin available for this system.
@@ -120,4 +162,4 @@
// Not supported on this system.
return null;
}
-}
\ No newline at end of file
+}
diff --git a/src/test/mapred/org/apache/hadoop/mapred/DummyResourceCalculatorPlugin.java b/src/test/mapred/org/apache/hadoop/mapred/DummyResourceCalculatorPlugin.java
index 0354d5d..5fd3f9f 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/DummyResourceCalculatorPlugin.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/DummyResourceCalculatorPlugin.java
@@ -50,6 +50,15 @@
/** CPU usage percentage for testing */
public static final String CPU_USAGE =
"mapred.tasktracker.cpuusage.testing";
+ /** process cumulative CPU usage time for testing */
+ public static final String PROC_CUMULATIVE_CPU_TIME =
+ "mapred.tasktracker.proccumulativecputime.testing";
+ /** process pmem for testing*/
+ public static final String PROC_PMEM_TESTING_PROPERTY =
+ "mapred.tasktracker.procpmem.testing";
+ /** process vmem for testing*/
+ public static final String PROC_VMEM_TESTING_PROPERTY =
+ "mapred.tasktracker.procvmem.testing";
/** {@inheritDoc} */
@Override
@@ -98,4 +107,12 @@
public float getCpuUsage() {
return getConf().getFloat(CPU_USAGE, -1);
}
-}
\ No newline at end of file
+
+ @Override
+ public ProcResourceValues getProcResourceValues() {
+ long cpuTime = getConf().getLong(PROC_CUMULATIVE_CPU_TIME, -1);
+ long pMem = getConf().getLong(PROC_PMEM_TESTING_PROPERTY, -1);
+ long vMem = getConf().getLong(PROC_VMEM_TESTING_PROPERTY, -1);
+ return new ProcResourceValues(cpuTime, pMem, vMem);
+ }
+}
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java b/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java
index 849fc3a..821d4c6 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java
@@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.util.ToolRunner;
@@ -154,9 +155,46 @@
+ reportedNumProcessors + ","
+ reportedCpuUsage
+ ")";
- hasPassed = true;
- hasDynamicValuePassed = true;
LOG.info(message);
+ hasDynamicValuePassed = true;
+ // Check task resource status in task reports
+ for (TaskStatus taskStatus : status.getTaskReports()) {
+ Counters counters = taskStatus.getCounters();
+ // This should be zero because the initial CPU time is subtracted.
+ long procCumulativeCpuTime = 0;
+ long procVirtualMemorySize =
+ getConf().getLong("procVirtualMemorySize", -1);
+ long procPhysicalMemorySize =
+ getConf().getLong("procPhysicalMemorySize", -1);
+ long reportedProcCumulativeCpuTime =
+ counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue();
+ long reportedProcVirtualMemorySize =
+ counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).getValue();
+ long reportedProcPhysicalMemorySize =
+ counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue();
+ String procMessage =
+ "expected values : "
+ + "(procCumulativeCpuTime, procVirtualMemorySize,"
+ + " procPhysicalMemorySize) = ("
+ + procCumulativeCpuTime + ", "
+ + procVirtualMemorySize + ", "
+ + procPhysicalMemorySize + ")";
+ procMessage +=
+ "\nreported values : "
+ + "(procCumulativeCpuTime, procVirtualMemorySize,"
+ + " procPhysicalMemorySize) = ("
+ + reportedProcCumulativeCpuTime + ", "
+ + reportedProcVirtualMemorySize + ", "
+ + reportedProcPhysicalMemorySize + ")";
+ LOG.info(procMessage);
+ message += "\n" + procMessage;
+ if (procCumulativeCpuTime != reportedProcCumulativeCpuTime ||
+ procVirtualMemorySize != reportedProcVirtualMemorySize ||
+ procPhysicalMemorySize != reportedProcPhysicalMemorySize) {
+ hasDynamicValuePassed = false;
+ }
+ }
+ hasPassed = true;
if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
|| totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
|| mapSlotMemorySize != reportedMapSlotMemorySize
@@ -192,7 +230,11 @@
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
setUpCluster(conf);
- runSleepJob(miniMRCluster.createJobConf());
+ JobConf jobConf = miniMRCluster.createJobConf();
+ jobConf.setClass(
+ org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+ DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
+ runSleepJob(jobConf);
verifyTestResults();
} finally {
tearDownCluster();
@@ -218,6 +260,9 @@
conf.setLong("cpuFrequency", 2000000L);
conf.setInt("numProcessors", 8);
conf.setFloat("cpuUsage", 15.5F);
+ conf.setLong("procCumulativeCpuTime", 1000L);
+ conf.setLong("procVirtualMemorySize", 2 * 1024 * 1024 * 1024L);
+ conf.setLong("procPhysicalMemorySize", 1024 * 1024 * 1024L);
conf.setClass(
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
@@ -232,12 +277,19 @@
conf.setLong(DummyResourceCalculatorPlugin.CPU_FREQUENCY, 2000000L);
conf.setInt(DummyResourceCalculatorPlugin.NUM_PROCESSORS, 8);
conf.setFloat(DummyResourceCalculatorPlugin.CPU_USAGE, 15.5F);
-
try {
setUpCluster(conf);
JobConf jobConf = miniMRCluster.createJobConf();
jobConf.setMemoryForMapTask(1 * 1024L);
jobConf.setMemoryForReduceTask(2 * 1024L);
+ jobConf.setClass(
+ org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+ DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
+ jobConf.setLong(DummyResourceCalculatorPlugin.PROC_CUMULATIVE_CPU_TIME, 1000L);
+ jobConf.setLong(DummyResourceCalculatorPlugin.PROC_VMEM_TESTING_PROPERTY,
+ 2 * 1024 * 1024 * 1024L);
+ jobConf.setLong(DummyResourceCalculatorPlugin.PROC_PMEM_TESTING_PROPERTY,
+ 1024 * 1024 * 1024L);
runSleepJob(jobConf);
verifyTestResults();
} finally {