blob: 821d4c6de1e6e7d35157b0c011ac94a9f78270d8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.After;
/**
* This test class tests the functionality related to configuring, reporting
* and computing memory related parameters in a Map/Reduce cluster.
*
* Each test sets up a {@link MiniMRCluster} with a locally defined
* {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates
* the memory related configuration is correctly computed and reported from
* the tasktracker in
* {@link org.apache.hadoop.mapred.TaskScheduler#assignTasks(TaskTrackerStatus)}.
*/
public class TestTTResourceReporting extends TestCase {
static final Log LOG = LogFactory.getLog(TestTTResourceReporting.class);
private MiniMRCluster miniMRCluster;
/**
* Fake scheduler to test the proper reporting of memory values by TT
*/
public static class FakeTaskScheduler extends JobQueueTaskScheduler {
private boolean hasPassed = true;
private boolean hasDynamicValuePassed = true;
private String message;
public FakeTaskScheduler() {
super();
}
public boolean hasTestPassed() {
return hasPassed;
}
public boolean hasDynamicTestPassed() {
return hasDynamicValuePassed;
}
public String getFailureMessage() {
return message;
}
@Override
public List<Task> assignTasks(TaskTracker taskTracker)
throws IOException {
TaskTrackerStatus status = taskTracker.getStatus();
long totalVirtualMemoryOnTT =
getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
long totalPhysicalMemoryOnTT =
getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
long mapSlotMemorySize =
getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
long reduceSlotMemorySize =
getConf()
.getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
long availableVirtualMemoryOnTT =
getConf().getLong("availableVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
long availablePhysicalMemoryOnTT =
getConf().getLong("availablePmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
long cumulativeCpuTime =
getConf().getLong("cumulativeCpuTime", TaskTrackerStatus.UNAVAILABLE);
long cpuFrequency =
getConf().getLong("cpuFrequency", TaskTrackerStatus.UNAVAILABLE);
int numProcessors =
getConf().getInt("numProcessors", TaskTrackerStatus.UNAVAILABLE);
float cpuUsage =
getConf().getFloat("cpuUsage", TaskTrackerStatus.UNAVAILABLE);
long reportedTotalVirtualMemoryOnTT =
status.getResourceStatus().getTotalVirtualMemory();
long reportedTotalPhysicalMemoryOnTT =
status.getResourceStatus().getTotalPhysicalMemory();
long reportedMapSlotMemorySize =
status.getResourceStatus().getMapSlotMemorySizeOnTT();
long reportedReduceSlotMemorySize =
status.getResourceStatus().getReduceSlotMemorySizeOnTT();
long reportedAvailableVirtualMemoryOnTT =
status.getResourceStatus().getAvailabelVirtualMemory();
long reportedAvailablePhysicalMemoryOnTT =
status.getResourceStatus().getAvailablePhysicalMemory();
long reportedCumulativeCpuTime =
status.getResourceStatus().getCumulativeCpuTime();
long reportedCpuFrequency = status.getResourceStatus().getCpuFrequency();
int reportedNumProcessors = status.getResourceStatus().getNumProcessors();
float reportedCpuUsage = status.getResourceStatus().getCpuUsage();
message =
"expected values : "
+ "(totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
+ "availableVirtualMemoryOnTT, availablePhysicalMemoryOnTT, "
+ "mapSlotMemSize, reduceSlotMemorySize, cumulativeCpuTime, "
+ "cpuFrequency, numProcessors, cpuUsage) = ("
+ totalVirtualMemoryOnTT + ", "
+ totalPhysicalMemoryOnTT + ","
+ availableVirtualMemoryOnTT + ", "
+ availablePhysicalMemoryOnTT + ","
+ mapSlotMemorySize + ","
+ reduceSlotMemorySize + ","
+ cumulativeCpuTime + ","
+ cpuFrequency + ","
+ numProcessors + ","
+ cpuUsage
+")";
message +=
"\nreported values : "
+ "(totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
+ "availableVirtualMemoryOnTT, availablePhysicalMemoryOnTT, "
+ "reportedMapSlotMemorySize, reportedReduceSlotMemorySize, "
+ "reportedCumulativeCpuTime, reportedCpuFrequency, "
+ "reportedNumProcessors, cpuUsage) = ("
+ reportedTotalVirtualMemoryOnTT + ", "
+ reportedTotalPhysicalMemoryOnTT + ","
+ reportedAvailableVirtualMemoryOnTT + ", "
+ reportedAvailablePhysicalMemoryOnTT + ","
+ reportedMapSlotMemorySize + ","
+ reportedReduceSlotMemorySize + ","
+ reportedCumulativeCpuTime + ","
+ reportedCpuFrequency + ","
+ reportedNumProcessors + ","
+ reportedCpuUsage
+ ")";
LOG.info(message);
hasDynamicValuePassed = true;
// Check task resource status in task reports
for (TaskStatus taskStatus : status.getTaskReports()) {
Counters counters = taskStatus.getCounters();
// This should be zero because the initial CPU time is subtracted.
long procCumulativeCpuTime = 0;
long procVirtualMemorySize =
getConf().getLong("procVirtualMemorySize", -1);
long procPhysicalMemorySize =
getConf().getLong("procPhysicalMemorySize", -1);
long reportedProcCumulativeCpuTime =
counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue();
long reportedProcVirtualMemorySize =
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).getValue();
long reportedProcPhysicalMemorySize =
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue();
String procMessage =
"expected values : "
+ "(procCumulativeCpuTime, procVirtualMemorySize,"
+ " procPhysicalMemorySize) = ("
+ procCumulativeCpuTime + ", "
+ procVirtualMemorySize + ", "
+ procPhysicalMemorySize + ")";
procMessage +=
"\nreported values : "
+ "(procCumulativeCpuTime, procVirtualMemorySize,"
+ " procPhysicalMemorySize) = ("
+ reportedProcCumulativeCpuTime + ", "
+ reportedProcVirtualMemorySize + ", "
+ reportedProcPhysicalMemorySize + ")";
LOG.info(procMessage);
message += "\n" + procMessage;
if (procCumulativeCpuTime != reportedProcCumulativeCpuTime ||
procVirtualMemorySize != reportedProcVirtualMemorySize ||
procPhysicalMemorySize != reportedProcPhysicalMemorySize) {
hasDynamicValuePassed = false;
}
}
hasPassed = true;
if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
|| totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
|| mapSlotMemorySize != reportedMapSlotMemorySize
|| reduceSlotMemorySize != reportedReduceSlotMemorySize
|| numProcessors != reportedNumProcessors) {
hasPassed = false;
}
// These values changes every moment on the node so it can only be
// tested by DummyMemoryCalculatorPlugin. Need to check them separately
if (availableVirtualMemoryOnTT != reportedAvailableVirtualMemoryOnTT
|| availablePhysicalMemoryOnTT != reportedAvailablePhysicalMemoryOnTT
|| cumulativeCpuTime != reportedCumulativeCpuTime
|| cpuFrequency != reportedCpuFrequency
|| cpuUsage != reportedCpuUsage) {
hasDynamicValuePassed = false;
}
return super.assignTasks(taskTracker);
}
}
/**
* Test that verifies default values are configured and reported correctly.
*
* @throws Exception
*/
@Test
public void testDefaultResourceValues()
throws Exception {
JobConf conf = new JobConf();
try {
// Memory values are disabled by default.
conf.setClass(
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
setUpCluster(conf);
JobConf jobConf = miniMRCluster.createJobConf();
jobConf.setClass(
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
runSleepJob(jobConf);
verifyTestResults();
} finally {
tearDownCluster();
}
}
/**
* Test that verifies that configured values are reported correctly.
*
* @throws Exception
*/
@Test
public void testConfiguredResourceValues()
throws Exception {
JobConf conf = new JobConf();
conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
conf.setLong("mapSlotMemorySize", 1 * 512L);
conf.setLong("reduceSlotMemorySize", 1 * 1024L);
conf.setLong("availableVmemOnTT", 4 * 1024 * 1024 * 1024L);
conf.setLong("availablePmemOnTT", 2 * 1024 * 1024 * 1024L);
conf.setLong("cumulativeCpuTime", 10000L);
conf.setLong("cpuFrequency", 2000000L);
conf.setInt("numProcessors", 8);
conf.setFloat("cpuUsage", 15.5F);
conf.setLong("procCumulativeCpuTime", 1000L);
conf.setLong("procVirtualMemorySize", 2 * 1024 * 1024 * 1024L);
conf.setLong("procPhysicalMemorySize", 1024 * 1024 * 1024L);
conf.setClass(
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
conf.setLong(DummyResourceCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
4 * 1024 * 1024 * 1024L);
conf.setLong(DummyResourceCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
2 * 1024 * 1024 * 1024L);
conf.setLong(MRConfig.MAPMEMORY_MB, 512L);
conf.setLong(MRConfig.REDUCEMEMORY_MB, 1024L);
conf.setLong(DummyResourceCalculatorPlugin.CUMULATIVE_CPU_TIME, 10000L);
conf.setLong(DummyResourceCalculatorPlugin.CPU_FREQUENCY, 2000000L);
conf.setInt(DummyResourceCalculatorPlugin.NUM_PROCESSORS, 8);
conf.setFloat(DummyResourceCalculatorPlugin.CPU_USAGE, 15.5F);
try {
setUpCluster(conf);
JobConf jobConf = miniMRCluster.createJobConf();
jobConf.setMemoryForMapTask(1 * 1024L);
jobConf.setMemoryForReduceTask(2 * 1024L);
jobConf.setClass(
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
jobConf.setLong(DummyResourceCalculatorPlugin.PROC_CUMULATIVE_CPU_TIME, 1000L);
jobConf.setLong(DummyResourceCalculatorPlugin.PROC_VMEM_TESTING_PROPERTY,
2 * 1024 * 1024 * 1024L);
jobConf.setLong(DummyResourceCalculatorPlugin.PROC_PMEM_TESTING_PROPERTY,
1024 * 1024 * 1024L);
runSleepJob(jobConf);
verifyTestResults();
} finally {
tearDownCluster();
}
}
/**
* Test that verifies that total memory values are calculated and reported
* correctly.
*
* @throws Exception
*/
@Test
public void testResourceValuesOnLinux()
throws Exception {
if (!System.getProperty("os.name").startsWith("Linux")) {
return;
}
JobConf conf = new JobConf();
LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin();
// In this case, we only check these three fields because they are static
conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
conf.setLong("numProcessors", plugin.getNumProcessors());
try {
setUpCluster(conf);
runSleepJob(miniMRCluster.createJobConf());
verifyTestResults(true);
} finally {
tearDownCluster();
}
}
private void setUpCluster(JobConf conf)
throws Exception {
conf.setClass(JTConfig.JT_TASK_SCHEDULER,
TestTTResourceReporting.FakeTaskScheduler.class, TaskScheduler.class);
conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
}
private void runSleepJob(JobConf conf) throws Exception {
String[] args = { "-m", "1", "-r", "1",
"-mt", "10", "-rt", "10" };
ToolRunner.run(conf, new SleepJob(), args);
}
private void verifyTestResults() {
verifyTestResults(false);
}
private void verifyTestResults(boolean excludeDynamic) {
FakeTaskScheduler scheduler =
(FakeTaskScheduler)miniMRCluster.getJobTrackerRunner().
getJobTracker().getTaskScheduler();
assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed());
if (!excludeDynamic) {
assertTrue(scheduler.getFailureMessage(),
scheduler.hasDynamicTestPassed());
}
}
@After
private void tearDownCluster() {
if (miniMRCluster != null) {
miniMRCluster.shutdown();
}
}
}