blob: 25dbc1dd2ea82cad0e44fc5961bfcae367ec50d4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestNodeManagerShutdown {
static final File basedir =
new File("target", TestNodeManagerShutdown.class.getName());
static final File tmpDir = new File(basedir, "tmpDir");
static final File logsDir = new File(basedir, "logs");
static final File remoteLogsDir = new File(basedir, "remotelogs");
static final File nmLocalDir = new File(basedir, "nm0");
static final File processStartFile = new File(tmpDir, "start_file.txt")
.getAbsoluteFile();
static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
static final String user = "nobody";
private FileContext localFS;
private ContainerId cId;
private NodeManager nm;
@Before
public void setup() throws UnsupportedFileSystemException {
localFS = FileContext.getLocalFSFileContext();
tmpDir.mkdirs();
logsDir.mkdirs();
remoteLogsDir.mkdirs();
nmLocalDir.mkdirs();
// Construct the Container-id
cId = createContainerId();
}
@After
public void tearDown() throws IOException, InterruptedException {
if (nm != null) {
nm.stop();
}
localFS.delete(new Path(basedir.getPath()), true);
}
@Test
public void testStateStoreRemovalOnDecommission() throws IOException {
final File recoveryDir = new File(basedir, "nm-recovery");
nm = new TestNodeManager();
YarnConfiguration conf = createNMConfig();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.NM_RECOVERY_DIR, recoveryDir.getAbsolutePath());
// verify state store is not removed on normal shutdown
nm.init(conf);
nm.start();
Assert.assertTrue(recoveryDir.exists());
Assert.assertTrue(recoveryDir.isDirectory());
nm.stop();
nm = null;
Assert.assertTrue(recoveryDir.exists());
Assert.assertTrue(recoveryDir.isDirectory());
// verify state store is removed on decommissioned shutdown
nm = new TestNodeManager();
nm.init(conf);
nm.start();
Assert.assertTrue(recoveryDir.exists());
Assert.assertTrue(recoveryDir.isDirectory());
nm.getNMContext().setDecommissioned(true);
nm.stop();
nm = null;
Assert.assertFalse(recoveryDir.exists());
}
@Test
public void testKillContainersOnShutdown() throws IOException,
YarnException {
nm = new TestNodeManager();
int port = ServerSocketUtil.getPort(49157, 10);
nm.init(createNMConfig(port));
nm.start();
startContainer(nm, cId, localFS, tmpDir, processStartFile, port);
final int MAX_TRIES=20;
int numTries = 0;
while (!processStartFile.exists() && numTries < MAX_TRIES) {
try {
Thread.sleep(500);
} catch (InterruptedException ex) {ex.printStackTrace();}
numTries++;
}
nm.stop();
// Now verify the contents of the file. Script generates a message when it
// receives a sigterm so we look for that. We cannot perform this check on
// Windows, because the process is not notified when killed by winutils.
// There is no way for the process to trap and respond. Instead, we can
// verify that the job object with ID matching container ID no longer exists.
if (Shell.WINDOWS) {
Assert.assertFalse("Process is still alive!",
DefaultContainerExecutor.containerIsAlive(cId.toString()));
} else {
BufferedReader reader =
new BufferedReader(new FileReader(processStartFile));
boolean foundSigTermMessage = false;
while (true) {
String line = reader.readLine();
if (line == null) {
break;
}
if (line.contains("SIGTERM")) {
foundSigTermMessage = true;
break;
}
}
Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
reader.close();
}
}
public static void startContainer(NodeManager nm, ContainerId cId,
FileContext localFS, File scriptFileDir, File processStartFile,
final int port)
throws IOException, YarnException {
File scriptFile =
createUnhaltingScriptFile(cId, scriptFileDir, processStartFile);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
NodeId nodeId = BuilderUtils.newNodeId(InetAddress.getByName("localhost")
.getCanonicalHostName(), port);
URL localResourceUri =
URL.fromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource localResource =
recordFactory.newRecordInstance(LocalResource.class);
localResource.setResource(localResourceUri);
localResource.setSize(-1);
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
localResource.setType(LocalResourceType.FILE);
localResource.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, localResource);
containerLaunchContext.setLocalResources(localResources);
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
final InetSocketAddress containerManagerBindAddress =
NetUtils.createSocketAddrForHost("127.0.0.1", port);
UserGroupInformation currentUser = UserGroupInformation
.createRemoteUser(cId.toString());
org.apache.hadoop.security.token.Token<NMTokenIdentifier> nmToken =
ConverterUtils.convertFromYarn(
nm.getNMContext().getNMTokenSecretManager()
.createNMToken(cId.getApplicationAttemptId(), nodeId, user),
containerManagerBindAddress);
currentUser.addToken(nmToken);
ContainerManagementProtocol containerManager =
currentUser.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
@Override
public ContainerManagementProtocol run() {
Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress containerManagerBindAddress =
NetUtils.createSocketAddrForHost("127.0.0.1", port);
return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
containerManagerBindAddress, conf);
}
});
StartContainerRequest scRequest =
StartContainerRequest.newInstance(containerLaunchContext,
TestContainerManager.createContainerToken(cId, 0,
nodeId, user, nm.getNMContext().getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
List<ContainerId> containerIds = new ArrayList<ContainerId>();
containerIds.add(cId);
GetContainerStatusesRequest request =
GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
Assert.assertTrue(EnumSet.of(ContainerState.RUNNING)
.contains(containerStatus.getState()));
}
public static ContainerId createContainerId() {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 0);
return containerId;
}
private YarnConfiguration createNMConfig(int port) throws IOException {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:" + port);
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:"
+ ServerSocketUtil.getPort(49158, 10));
conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS,
"127.0.0.1:" + ServerSocketUtil
.getPort(YarnConfiguration.DEFAULT_NM_WEBAPP_PORT, 10));
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf;
}
private YarnConfiguration createNMConfig() throws IOException {
return createNMConfig(ServerSocketUtil.getPort(49157, 10));
}
/**
* Creates a script to run a container that will run forever unless
* stopped by external means.
*/
private static File createUnhaltingScriptFile(ContainerId cId,
File scriptFileDir, File processStartFile) throws IOException {
File scriptFile = Shell.appendScriptExtension(scriptFileDir, "scriptFile");
PrintWriter fileWriter = new PrintWriter(scriptFile);
if (Shell.WINDOWS) {
fileWriter.println("@echo \"Running testscript for delayed kill\"");
fileWriter.println("@echo \"Writing pid to start file\"");
fileWriter.println("@echo " + cId + ">> " + processStartFile);
fileWriter.println("@pause");
} else {
fileWriter.write("#!/bin/bash\n\n");
fileWriter.write("echo \"Running testscript for delayed kill\"\n");
fileWriter.write("hello=\"Got SIGTERM\"\n");
fileWriter.write("umask 0\n");
fileWriter.write("trap \"echo $hello >> " + processStartFile +
"\" SIGTERM\n");
fileWriter.write("echo \"Writing pid to start file\"\n");
fileWriter.write("echo $$ >> " + processStartFile + "\n");
fileWriter.write("while true; do\ndate >> /dev/null;\n done\n");
}
fileWriter.close();
return scriptFile;
}
class TestNodeManager extends NodeManager {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
MockNodeStatusUpdater myNodeStatusUpdater =
new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics);
return myNodeStatusUpdater;
}
public void setMasterKey(MasterKey masterKey) {
getNMContext().getContainerTokenSecretManager().setMasterKey(masterKey);
}
}
}