/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.regex.Pattern;

import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.TestProcfsBasedProcessTree;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestContainersMonitor extends BaseContainerManagerTest {
  public TestContainersMonitor() throws UnsupportedFileSystemException {
    super();
  }

  static {
    LOG = LogFactory.getLog(TestContainersMonitor.class);
  }

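  // The monitor is pointed at the procfs-backed LinuxResourceCalculatorPlugin
  // so that memory accounting in these tests is driven by /proc (the real one,
  // or the fake one set up below).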
  @Before
  public void setup() throws IOException {
    conf.setClass(
        ContainersMonitorImpl.RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY,
        LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
    super.setup();
  }

  /**
   * Test to verify the check for whether a process tree is over limit or not.
   *
   * @throws IOException
   *           if there was a problem setting up the fake procfs directories or
   *           files.
   */
  @Test
  public void testProcessTreeLimits() throws IOException {
    // set up a dummy proc file system
    File procfsRootDir = new File(localDir, "proc");
    String[] pids = { "100", "200", "300", "400", "500", "600", "700" };
    try {
      TestProcfsBasedProcessTree.setupProcfsRootDir(procfsRootDir);
      // create pid dirs.
      TestProcfsBasedProcessTree.setupPidDirs(procfsRootDir, pids);
      // create process infos.
      TestProcfsBasedProcessTree.ProcessStatInfo[] procs =
          new TestProcfsBasedProcessTree.ProcessStatInfo[7];
      // assume pids 100, 500 are in one tree,
      // 200, 300, 400 are in another, and
      // 600, 700 are in a third.
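      // Each ProcessStatInfo row is assumed to be (pid, command, ppid, pgrpId,
      // session id, vmem in bytes), matching the fake /proc/<pid>/stat layout
      // that TestProcfsBasedProcessTree writes.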
      procs[0] = new TestProcfsBasedProcessTree.ProcessStatInfo(
          new String[] { "100", "proc1", "1", "100", "100", "100000" });
      procs[1] = new TestProcfsBasedProcessTree.ProcessStatInfo(
          new String[] { "200", "proc2", "1", "200", "200", "200000" });
      procs[2] = new TestProcfsBasedProcessTree.ProcessStatInfo(
          new String[] { "300", "proc3", "200", "200", "200", "300000" });
      procs[3] = new TestProcfsBasedProcessTree.ProcessStatInfo(
          new String[] { "400", "proc4", "200", "200", "200", "400000" });
      procs[4] = new TestProcfsBasedProcessTree.ProcessStatInfo(
          new String[] { "500", "proc5", "100", "100", "100", "1500000" });
      procs[5] = new TestProcfsBasedProcessTree.ProcessStatInfo(
          new String[] { "600", "proc6", "1", "600", "600", "100000" });
      procs[6] = new TestProcfsBasedProcessTree.ProcessStatInfo(
          new String[] { "700", "proc7", "600", "600", "600", "100000" });
      // write stat files.
      TestProcfsBasedProcessTree.writeStatFiles(procfsRootDir, pids, procs);
      // vmem limit
      long limit = 700000;
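      // With a limit of 700000 the three trees cover the interesting cases:
      //  - tree rooted at 100: 100000 + 1500000 = 1600000 bytes, more than
      //    twice the limit, so it is flagged on the very first check;
      //  - tree rooted at 200: 200000 + 300000 + 400000 = 900000 bytes, over
      //    the limit but under 2x, so it is flagged only after being seen
      //    over the limit on two consecutive checks;
      //  - tree rooted at 600: 100000 + 100000 = 200000 bytes, always under.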
      ContainersMonitorImpl test = new ContainersMonitorImpl(null, null, null);

      // create process trees
      // tree rooted at 100 is over limit immediately, as it is
      // twice over the mem limit.
      ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree(
          "100", true,
          procfsRootDir.getAbsolutePath());
      pTree.getProcessTree();
      assertTrue("tree rooted at 100 should be over limit " +
          "after first iteration.",
          test.isProcessTreeOverLimit(pTree, "dummyId", limit));

      // the tree rooted at 200 is initially below limit.
      pTree = new ProcfsBasedProcessTree("200", true,
          procfsRootDir.getAbsolutePath());
      pTree.getProcessTree();
      assertFalse("tree rooted at 200 shouldn't be over limit " +
          "after one iteration.",
          test.isProcessTreeOverLimit(pTree, "dummyId", limit));
      // second iteration - now the tree has been over limit twice,
      // hence it should be declared over limit.
      pTree.getProcessTree();
      assertTrue(
          "tree rooted at 200 should be over limit after 2 iterations",
          test.isProcessTreeOverLimit(pTree, "dummyId", limit));

      // the tree rooted at 600 is never over limit.
      pTree = new ProcfsBasedProcessTree("600", true,
          procfsRootDir.getAbsolutePath());
      pTree.getProcessTree();
      assertFalse("tree rooted at 600 should never be over limit.",
          test.isProcessTreeOverLimit(pTree, "dummyId", limit));
      // another iteration does not make any difference.
      pTree.getProcessTree();
      assertFalse("tree rooted at 600 should never be over limit.",
          test.isProcessTreeOverLimit(pTree, "dummyId", limit));
    } finally {
      FileUtil.fullyDelete(procfsRootDir);
    }
  }

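  /**
   * Verify that a container whose process tree exceeds its memory limit is
   * killed by the monitor, exits with ExitCode.KILLED, and carries a
   * diagnostic that describes the offending process tree.
   */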
  @Test
  public void testContainerKillOnMemoryOverflow() throws IOException,
      InterruptedException {
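
    // ProcfsBasedProcessTree needs a real /proc, so this check makes the test
    // effectively Linux-only; on other platforms it bails out quietly.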
    if (!ProcfsBasedProcessTree.isAvailable()) {
      return;
    }

    containerManager.start();

    File scriptFile = new File(tmpDir, "scriptFile.sh");
    PrintWriter fileWriter = new PrintWriter(scriptFile);
    File processStartFile =
        new File(tmpDir, "start_file.txt").getAbsoluteFile();
    fileWriter.write("\numask 0"); // So that start file is readable by the test.
    fileWriter.write("\necho Hello World! > " + processStartFile);
    fileWriter.write("\necho $$ >> " + processStartFile);
    fileWriter.write("\nsleep 15");
    fileWriter.close();
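
    // The script above records the shell's own pid ($$) in the start file and
    // then sleeps, keeping the process tree alive long enough for the monitor
    // to sample its memory usage.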

    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);

    // ////// Construct the Container-id
    ApplicationId appId =
        recordFactory.newRecordInstance(ApplicationId.class);
    ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
    cId.setAppId(appId);
    cId.setId(0);
    containerLaunchContext.setContainerId(cId);
    containerLaunchContext.setUser(user);

    URL resource_alpha =
        ConverterUtils.getYarnUrlFromPath(localFS
            .makeQualified(new Path(scriptFile.getAbsolutePath())));
    LocalResource rsrc_alpha =
        recordFactory.newRecordInstance(LocalResource.class);
    rsrc_alpha.setResource(resource_alpha);
    rsrc_alpha.setSize(-1);
    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
    rsrc_alpha.setType(LocalResourceType.FILE);
    rsrc_alpha.setTimestamp(scriptFile.lastModified());
    String destinationFile = "dest_file";
    containerLaunchContext.setLocalResource(destinationFile, rsrc_alpha);
    containerLaunchContext.addCommand("/bin/bash");
    containerLaunchContext.addCommand(scriptFile.getAbsolutePath());
    containerLaunchContext.setResource(recordFactory
        .newRecordInstance(Resource.class));
    containerLaunchContext.getResource().setMemory(8 * 1024 * 1024);
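    // 8 * 1024 * 1024 is a limit even a bare bash process exceeds if read as
    // bytes (8MB), so the monitor is expected to trip. This assumes the units
    // at this revision are bytes; later YARN APIs express resource memory in MB.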

    StartContainerRequest startRequest =
        recordFactory.newRecordInstance(StartContainerRequest.class);
    startRequest.setContainerLaunchContext(containerLaunchContext);
    containerManager.startContainer(startRequest);

    int timeoutSecs = 0;
    while (!processStartFile.exists() && timeoutSecs++ < 20) {
      Thread.sleep(1000);
      LOG.info("Waiting for process start-file to be created");
    }
    Assert.assertTrue("ProcessStartFile doesn't exist!",
        processStartFile.exists());

    // Now verify the contents of the file
    BufferedReader reader =
        new BufferedReader(new FileReader(processStartFile));
    Assert.assertEquals("Hello World!", reader.readLine());
    // Get the pid of the process
    String pid = reader.readLine().trim();
    // No more lines
    Assert.assertNull(reader.readLine());
    reader.close();

    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
        ContainerState.COMPLETE, 60);
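
    // If the monitor had not killed the container, the script would finish its
    // 15-second sleep and exit normally, and the KILLED exit status asserted
    // below would not match.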
    GetContainerStatusRequest gcsRequest =
        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
    gcsRequest.setContainerId(cId);
    ContainerStatus containerStatus =
        containerManager.getContainerStatus(gcsRequest).getStatus();
    Assert.assertEquals(String.valueOf(ExitCode.KILLED.getExitCode()),
        containerStatus.getExitStatus());

    String expectedMsgPattern =
        "Container \\[pid=" + pid + ",containerID=" + cId
            + "\\] is running beyond memory-limits. Current usage : "
            + "[0-9]*bytes. Limit : [0-9]*"
            + "bytes. Killing container. \nDump of the process-tree for "
            + cId + " : \n";
    Pattern pat = Pattern.compile(expectedMsgPattern);
    Assert.assertTrue("Expected message pattern is: " + expectedMsgPattern
        + "\n\nObserved message is: " + containerStatus.getDiagnostics(),
        pat.matcher(containerStatus.getDiagnostics()).find());
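
    // Signal.NULL is in effect a "kill -0" probe: no signal is delivered, and
    // the call only reports whether the pid still exists.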
    // Assert that the process is not alive anymore
    Assert.assertFalse("Process is still alive!",
        exec.signalContainer(user, pid, Signal.NULL));
  }
}