/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import junit.framework.TestCase;
/**
* Test class to verify memory management of tasks.
*/
public class TestTaskTrackerMemoryManager extends TestCase {
private static final Log LOG =
LogFactory.getLog(TestTaskTrackerMemoryManager.class);
private static String TEST_ROOT_DIR = new Path(System.getProperty(
"test.build.data", "/tmp")).toString().replace(' ', '+');
private MiniMRCluster miniMRCluster;
private String taskOverLimitPatternString =
"TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond.*memory-limits. "
+ "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
private void startCluster(JobConf conf)
throws Exception {
conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
conf.set(TTConfig.TT_MAP_SLOTS, "1");
conf.set(TTConfig.TT_REDUCE_SLOTS, "1");
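    // No grace period between SIGTERM and SIGKILL when killing tasks.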
conf.set(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL, "0");
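    // One TaskTracker over the local file system; racks/hosts left null.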
miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf);
}
@Override
protected void tearDown() {
if (miniMRCluster != null) {
miniMRCluster.shutdown();
}
}
private int runSleepJob(JobConf conf) throws Exception {
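    // SleepJob flags: -m <num maps>, -r <num reduces>,
    // -mt <map sleep ms>, -rt <reduce sleep ms>.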
String[] args = { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" };
return ToolRunner.run(conf, new SleepJob(), args);
}
private void runAndCheckSuccessfulJob(JobConf conf)
throws IOException {
Pattern taskOverLimitPattern =
Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*"));
Matcher mat = null;
// Start the job.
int ret;
try {
ret = runSleepJob(conf);
} catch (Exception e) {
ret = 1;
}
// Job has to succeed
assertTrue(ret == 0);
JobClient jClient = new JobClient(conf);
JobStatus[] jStatus = jClient.getAllJobs();
JobStatus js = jStatus[0]; // Our only job
RunningJob rj = jClient.getJob(js.getJobID());
// All events
TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
for (TaskCompletionEvent tce : taskComplEvents) {
String[] diagnostics =
rj.getTaskDiagnostics(tce.getTaskAttemptId());
if (diagnostics != null) {
for (String str : diagnostics) {
mat = taskOverLimitPattern.matcher(str);
// The error pattern shouldn't be there in any TIP's diagnostics
assertFalse(mat.find());
}
}
}
}
private boolean isProcfsBasedTreeAvailable() {
try {
if (!ProcfsBasedProcessTree.isAvailable()) {
LOG.info("Currently ProcessTree has only one implementation "
+ "ProcfsBasedProcessTree, which is not available on this "
+ "system. Not testing");
return false;
}
} catch (Exception e) {
LOG.info(StringUtils.stringifyException(e));
return false;
}
return true;
}
/**
* Test for verifying that nothing is killed when memory management is
* disabled on the TT, even when the tasks run over their limits.
*
* @throws Exception
*/
public void testTTLimitsDisabled()
throws Exception {
    // Run the test only if the procfs-based process tree is available.
if (!isProcfsBasedTreeAvailable()) {
return;
}
// Task-memory management disabled by default.
startCluster(new JobConf());
long PER_TASK_LIMIT = 1L; // Doesn't matter how low.
JobConf conf = miniMRCluster.createJobConf();
conf.setMemoryForMapTask(PER_TASK_LIMIT);
conf.setMemoryForReduceTask(PER_TASK_LIMIT);
runAndCheckSuccessfulJob(conf);
}
/**
* Test for verifying that tasks within limits, with the cumulative usage also
* under TT's limits succeed.
*
* @throws Exception
*/
public void testTasksWithinLimits()
throws Exception {
    // Run the test only if the procfs-based process tree is available.
if (!isProcfsBasedTreeAvailable()) {
return;
}
    // Large enough that the sleep job completes and the cumulative usage
    // stays within the TT's limit.
long PER_TASK_LIMIT = 2 * 1024L;
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
fConf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024L);
fConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024L);
    // Reserve only 1 MB of memory on the TaskTracker.
fConf.setLong(TTConfig.TT_RESERVED_PHYSCIALMEMORY_MB, 1L);
    startCluster(fConf);
JobConf conf = new JobConf(miniMRCluster.createJobConf());
conf.setMemoryForMapTask(PER_TASK_LIMIT);
conf.setMemoryForReduceTask(PER_TASK_LIMIT);
// Set task physical memory limits
conf.setLong(MRJobConfig.MAP_MEMORY_PHYSICAL_MB, PER_TASK_LIMIT);
conf.setLong(MRJobConfig.REDUCE_MEMORY_PHYSICAL_MB, PER_TASK_LIMIT);
runAndCheckSuccessfulJob(conf);
}
/**
* Test for verifying that tasks that go beyond limits get killed.
*
* @throws Exception
*/
public void testTasksBeyondLimits()
throws Exception {
    // Run the test only if the procfs-based process tree is available.
if (!isProcfsBasedTreeAvailable()) {
return;
}
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
// very small value, so that no task escapes to successful completion.
fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
String.valueOf(300));
fConf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024);
fConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024);
startCluster(fConf);
runJobExceedingMemoryLimit(false);
}
/**
* Test for verifying that tasks that go beyond physical limits get killed.
*
* @throws Exception
*/
public void testTasksBeyondPhysicalLimits()
throws Exception {
    // Run the test only if the procfs-based process tree is available.
if (!isProcfsBasedTreeAvailable()) {
return;
}
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
// very small value, so that no task escapes to successful completion.
fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
String.valueOf(300));
    // Reserve only 1 MB of memory on the TaskTracker.
fConf.setLong(TTConfig.TT_RESERVED_PHYSCIALMEMORY_MB, 1L);
startCluster(fConf);
runJobExceedingMemoryLimit(true);
}
/**
   * Runs a job whose tasks exceed their limits, using the old (deprecated)
   * configuration values on the TaskTracker.
*
* @throws Exception
*/
  public void testTaskMemoryMonitoringWithDeprecatedConfiguration()
throws Exception {
    // Run the test only if the procfs-based process tree is available.
if (!isProcfsBasedTreeAvailable()) {
return;
}
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
// very small value, so that no task escapes to successful completion.
fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
String.valueOf(300));
    // Set the old per-task max-vmem property and the upper limit on task
    // vmem: default maximum vmem of 2 GB, upper limit of 3 GB.
fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
(2L * 1024L * 1024L * 1024L));
fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
(3L * 1024L * 1024L * 1024L));
startCluster(fConf);
runJobExceedingMemoryLimit(false);
}
/**
   * Runs a job that should be failed by the memory monitor.
*
* @param doPhysicalMemory If it is true, use physical memory limit.
* Otherwise use virtual memory limit.
* @throws IOException
*/
private void runJobExceedingMemoryLimit(boolean doPhysicalMemory)
throws IOException {
long PER_TASK_LIMIT = 1L; // Low enough to kill off sleepJob tasks.
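    // The regex expects the limit in bytes, hence the
    // PER_TASK_LIMIT * 1024 * 1024 conversion below.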
    Pattern taskOverLimitPattern =
        Pattern.compile(String.format(taskOverLimitPatternString,
            String.valueOf(PER_TASK_LIMIT * 1024 * 1024L)));
Matcher mat = null;
// Set up job.
JobConf conf = new JobConf(miniMRCluster.createJobConf());
if (doPhysicalMemory) {
conf.setLong(MRJobConfig.MAP_MEMORY_PHYSICAL_MB, PER_TASK_LIMIT);
conf.setLong(MRJobConfig.REDUCE_MEMORY_PHYSICAL_MB, PER_TASK_LIMIT);
} else {
conf.setMemoryForMapTask(PER_TASK_LIMIT);
conf.setMemoryForReduceTask(PER_TASK_LIMIT);
}
conf.setMaxMapAttempts(1);
conf.setMaxReduceAttempts(1);
// Start the job.
int ret = 0;
try {
ret = runSleepJob(conf);
} catch (Exception e) {
ret = 1;
}
// Job has to fail
assertTrue(ret != 0);
JobClient jClient = new JobClient(conf);
JobStatus[] jStatus = jClient.getAllJobs();
JobStatus js = jStatus[0]; // Our only job
RunningJob rj = jClient.getJob(js.getJobID());
// All events
TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
for (TaskCompletionEvent tce : taskComplEvents) {
// Every task HAS to fail
      assertTrue(tce.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED
          || tce.getTaskStatus() == TaskCompletionEvent.Status.FAILED);
      String[] diagnostics =
          rj.getTaskDiagnostics(tce.getTaskAttemptId());
      // Every task HAS to spit out the out-of-memory errors
      assertNotNull(diagnostics);
for (String str : diagnostics) {
mat = taskOverLimitPattern.matcher(str);
// Every task HAS to spit out the out-of-memory errors in the same
// format. And these are the only diagnostic messages.
assertTrue(mat.find());
}
}
}
/**
* Test for verifying that tasks causing cumulative usage to go beyond TT's
* limit get killed even though they all are under individual limits. Memory
* management for tasks with disabled task-limits also traverses the same
* code-path, so we don't need a separate testTaskLimitsDisabled.
*
* @throws Exception
*/
public void testTasksCumulativelyExceedingTTLimits()
throws Exception {
    // Run the test only if the procfs-based process tree is available.
if (!isProcfsBasedTreeAvailable()) {
return;
}
// Large enough for SleepJob Tasks.
long PER_TASK_LIMIT = 100 * 1024L;
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
    fConf.setLong(MRConfig.MAPMEMORY_MB, 1L);
    fConf.setLong(MRConfig.REDUCEMEMORY_MB, 1L);
    // With one map slot and one reduce slot at 1 MB each, the total TT
    // limit is 2 MB.
long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L;
// very small value, so that no task escapes to successful completion.
fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
String.valueOf(300));
startCluster(fConf);
    Pattern taskOverLimitPattern =
        Pattern.compile(String.format(taskOverLimitPatternString,
            String.valueOf(PER_TASK_LIMIT)));
    Pattern trackerOverLimitPattern = Pattern.compile(
        "Killing one of the least progress tasks - .*, as "
            + "the cumulative memory usage of all the tasks on the TaskTracker"
            + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
Matcher mat = null;
// Set up job.
JobConf conf = new JobConf(miniMRCluster.createJobConf());
conf.setMemoryForMapTask(PER_TASK_LIMIT);
conf.setMemoryForReduceTask(PER_TASK_LIMIT);
JobClient jClient = new JobClient(conf);
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(conf);
// Start the job
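    // createJob(numMapper, numReducer, mapSleepTime, mapSleepCount,
    // reduceSleepTime, reduceSleepCount)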
Job job = sleepJob.createJob(1, 1, 5000, 1, 1000, 1);
job.submit();
boolean TTOverFlowMsgPresent = false;
while (true) {
List<TaskReport> allTaskReports = new ArrayList<TaskReport>();
allTaskReports.addAll(Arrays.asList(jClient
.getSetupTaskReports(JobID.downgrade(job.getJobID()))));
allTaskReports.addAll(Arrays.asList(jClient
.getMapTaskReports(JobID.downgrade(job.getJobID()))));
for (TaskReport tr : allTaskReports) {
String[] diag = tr.getDiagnostics();
for (String str : diag) {
mat = taskOverLimitPattern.matcher(str);
assertFalse(mat.find());
mat = trackerOverLimitPattern.matcher(str);
if (mat.find()) {
TTOverFlowMsgPresent = true;
}
}
}
if (TTOverFlowMsgPresent) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// nothing
}
}
    // Reaching this point without a test timeout means a task was killed for
    // crossing the cumulative TT limit. Test succeeded; kill the job.
job.killJob();
}
/**
* Test to verify the check for whether a process tree is over limit or not.
* @throws IOException if there was a problem setting up the
* fake procfs directories or files.
*/
public void testProcessTreeLimits() throws IOException {
// set up a dummy proc file system
File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
String[] pids = { "100", "200", "300", "400", "500", "600", "700" };
try {
TestProcfsBasedProcessTree.setupProcfsRootDir(procfsRootDir);
// create pid dirs.
TestProcfsBasedProcessTree.setupPidDirs(procfsRootDir, pids);
// create process infos.
TestProcfsBasedProcessTree.ProcessStatInfo[] procs =
new TestProcfsBasedProcessTree.ProcessStatInfo[7];
// assume pids 100, 500 are in 1 tree
// 200,300,400 are in another
// 600,700 are in a third
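      // Each ProcessStatInfo is built from a String[] whose field order is
      // assumed to be {pid, name, ppid, pgrpId, session, vmem-in-bytes},
      // mirroring the fake stat files TestProcfsBasedProcessTree writes.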
procs[0] = new TestProcfsBasedProcessTree.ProcessStatInfo(
new String[] {"100", "proc1", "1", "100", "100", "100000"});
procs[1] = new TestProcfsBasedProcessTree.ProcessStatInfo(
new String[] {"200", "proc2", "1", "200", "200", "200000"});
procs[2] = new TestProcfsBasedProcessTree.ProcessStatInfo(
new String[] {"300", "proc3", "200", "200", "200", "300000"});
procs[3] = new TestProcfsBasedProcessTree.ProcessStatInfo(
new String[] {"400", "proc4", "200", "200", "200", "400000"});
procs[4] = new TestProcfsBasedProcessTree.ProcessStatInfo(
new String[] {"500", "proc5", "100", "100", "100", "1500000"});
procs[5] = new TestProcfsBasedProcessTree.ProcessStatInfo(
new String[] {"600", "proc6", "1", "600", "600", "100000"});
procs[6] = new TestProcfsBasedProcessTree.ProcessStatInfo(
new String[] {"700", "proc7", "600", "600", "600", "100000"});
// write stat files.
TestProcfsBasedProcessTree.writeStatFiles(procfsRootDir, pids, procs);
// vmem limit
long limit = 700000;
      // Create a TaskMemoryManagerThread.
TaskMemoryManagerThread test = new TaskMemoryManagerThread(1000000L,
5000L);
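      // isProcessTreeOverLimit declares a tree over limit if its usage is
      // more than twice the limit on the first sighting, or stays over the
      // limit across two successive monitoring iterations.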
// create process trees
      // tree rooted at 100 is over limit immediately: its usage
      // (100000 + 1500000 = 1600000 bytes) is more than twice the limit.
ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree(
"100", true, 100L,
procfsRootDir.getAbsolutePath());
pTree.getProcessTree();
assertTrue("tree rooted at 100 should be over limit " +
"after first iteration.",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
      // the tree rooted at 200 (200000 + 300000 + 400000 = 900000 bytes) is
      // over the limit but not twice over, so it isn't flagged on the first
      // iteration.
pTree = new ProcfsBasedProcessTree("200", true, 100L,
procfsRootDir.getAbsolutePath());
pTree.getProcessTree();
assertFalse("tree rooted at 200 shouldn't be over limit " +
"after one iteration.",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
// second iteration - now the tree has been over limit twice,
// hence it should be declared over limit.
pTree.getProcessTree();
assertTrue("tree rooted at 200 should be over limit after 2 iterations",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
      // the tree rooted at 600 (100000 + 100000 = 200000 bytes) is never
      // over limit.
pTree = new ProcfsBasedProcessTree("600", true, 100L,
procfsRootDir.getAbsolutePath());
pTree.getProcessTree();
assertFalse("tree rooted at 600 should never be over limit.",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
// another iteration does not make any difference.
pTree.getProcessTree();
assertFalse("tree rooted at 600 should never be over limit.",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
} finally {
FileUtil.fullyDelete(procfsRootDir);
}
}
/**
* Test for verifying that tasks causing cumulative usage of physical memory
* to go beyond TT's limit get killed.
*
* @throws Exception
*/
public void testTasksCumulativelyExceedingTTPhysicalLimits()
throws Exception {
    // Run the test only if the procfs-based process tree is available.
if (!isProcfsBasedTreeAvailable()) {
return;
}
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
// very small value, so that no task escapes to successful completion.
fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
String.valueOf(300));
// reserve all memory on TT so that the job will exceed memory limits
LinuxResourceCalculatorPlugin memoryCalculatorPlugin =
new LinuxResourceCalculatorPlugin();
long totalPhysicalMemory = memoryCalculatorPlugin.getPhysicalMemorySize();
long reservedPhysicalMemory = totalPhysicalMemory / (1024 * 1024) + 1;
fConf.setLong(TTConfig.TT_RESERVED_PHYSCIALMEMORY_MB,
reservedPhysicalMemory);
long maxRssMemoryAllowedForAllTasks = totalPhysicalMemory -
reservedPhysicalMemory * 1024 * 1024L;
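    // Since reservedPhysicalMemory rounds the machine's total memory up to
    // the next MB, maxRssMemoryAllowedForAllTasks is negative: any running
    // task pushes the TT over its physical-memory limit.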
Pattern physicalMemoryOverLimitPattern = Pattern.compile(
"Killing one of the memory-consuming tasks - .*"
+ ", as the cumulative RSS memory usage of all the tasks on "
+ "the TaskTracker exceeds physical memory limit "
+ maxRssMemoryAllowedForAllTasks + ".");
startCluster(fConf);
Matcher mat = null;
// Set up job.
JobConf conf = new JobConf(miniMRCluster.createJobConf());
// Set per task physical memory limits to be a higher value
conf.setLong(MRJobConfig.MAP_MEMORY_PHYSICAL_MB, 2 * 1024L);
conf.setLong(MRJobConfig.REDUCE_MEMORY_PHYSICAL_MB, 2 * 1024L);
JobClient jClient = new JobClient(conf);
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(conf);
// Start the job
Job job = sleepJob.createJob(1, 1, 100000, 1, 100000, 1);
job.submit();
boolean TTOverFlowMsgPresent = false;
while (true) {
List<TaskReport> allTaskReports = new ArrayList<TaskReport>();
allTaskReports.addAll(Arrays.asList(jClient
.getSetupTaskReports(JobID.downgrade(job.getJobID()))));
allTaskReports.addAll(Arrays.asList(jClient
.getMapTaskReports(JobID.downgrade(job.getJobID()))));
for (TaskReport tr : allTaskReports) {
String[] diag = tr.getDiagnostics();
for (String str : diag) {
mat = physicalMemoryOverLimitPattern.matcher(str);
if (mat.find()) {
TTOverFlowMsgPresent = true;
}
}
}
if (TTOverFlowMsgPresent) {
break;
}
assertFalse("Job should not finish successfully", job.isSuccessful());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// nothing
}
}
    // Reaching this point without a test timeout means a task was killed for
    // crossing the cumulative physical-memory limit. Test succeeded; kill
    // the job.
job.killJob();
}
}