// blob: b06788341fe5c71bf998fa5f9a9cedcaccd41c45 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class ContainerLaunch implements Callable<Integer> {
private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);

/** File name of the generated launch script. */
public static final String CONTAINER_SCRIPT = "launch_container.sh";
/** File name, inside the container working directory, advertised to the
 *  container through the token-file environment variable. */
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
/** Format of the pid file name; the argument is the container id string. */
private static final String PID_FILE_NAME_FMT = "%s.pid";

private final Dispatcher dispatcher;
private final ContainerExecutor exec;
private final Application app;
private final Container container;
private final Configuration conf;

// The AtomicBoolean references are never reassigned, so they are final:
// thread-safety comes from the atomic operations on the objects, and a
// volatile reference added nothing.

/** Flipped false -> true (compareAndSet) by whichever of call() and
 *  cleanupContainer() gets there first; the loser learns that the other
 *  side has already acted. */
private final AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
/** Set to true when call() leaves its launch section; polled while waiting
 *  for the pid file. */
private final AtomicBoolean completed = new AtomicBoolean(false);

/** Delay in ms between the first signal and the follow-up KILL; the
 *  constructor overwrites this initial value from the configuration. */
private long sleepDelayBeforeSigKill = 250;
/** Upper bound in ms on waiting for the pid file to appear; the
 *  constructor overwrites this initial value from the configuration. */
private long maxKillWaitTime = 2000;
/** Location of the container's pid file; null until call() has chosen it. */
private Path pidFilePath = null;

private final LocalDirsHandlerService dirsHandler;
public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
ContainerExecutor exec, Application app, Container container,
LocalDirsHandlerService dirsHandler) {
this.conf = configuration;
this.app = app;
this.exec = exec;
this.container = container;
this.dispatcher = dispatcher;
this.dirsHandler = dirsHandler;
this.sleepDelayBeforeSigKill =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
this.maxKillWaitTime =
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
}
/**
* Prepares the container's launch script and token file, runs the container
* through the ContainerExecutor, and reports the outcome via the dispatcher.
*
* Order of work: expand the log-dir placeholder in commands and environment
* values; pick the nm-private script/token paths, the working directory and
* the pid file path; abort with DISKS_FAILED if the dirs handler reports
* unhealthy disks; write the script and the credentials; dispatch
* CONTAINER_LAUNCHED; then launch unless cleanupContainer() has already
* claimed the launch flag.
*
* Each path ends by dispatching one completion event:
* CONTAINER_EXITED_WITH_FAILURE (exception or non-zero exit),
* CONTAINER_KILLED_ON_REQUEST (FORCE_KILLED / TERMINATED exit code) or
* CONTAINER_EXITED_WITH_SUCCESS.
*
* @return the executor's exit code; ExitCode.TERMINATED's code when the
*         launch was skipped; on an exception, whatever ret held at that
*         point (-1 initially, or DISKS_FAILED)
*/
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public Integer call() {
final ContainerLaunchContext launchContext = container.getLaunchContext();
final Map<Path,List<String>> localResources =
container.getLocalizedResources();
ContainerId containerID = container.getContainerID();
String containerIdStr = ConverterUtils.toString(containerID);
final String user = launchContext.getUser();
final List<String> command = launchContext.getCommands();
// Stays -1 unless a later step assigns a more specific code.
int ret = -1;
try {
// /////////////////////////// Variable expansion
// Before the container script gets written out.
List<String> newCmds = new ArrayList<String>(command.size());
String appIdStr = app.getAppId().toString();
Path containerLogDir =
dirsHandler.getLogPathForWrite(ContainerLaunch
.getRelativeContainerLogDir(appIdStr, containerIdStr), false);
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toUri().getPath()));
}
// The expanded commands replace the originals on the launch context.
launchContext.setCommands(newCmds);
Map<String, String> environment = launchContext.getEnvironment();
// Expand the same placeholder in every environment value. No copy is made:
// the values are rewritten in place through Entry.setValue.
for (Entry<String, String> entry : environment.entrySet()) {
String value = entry.getValue();
entry.setValue(
value.replace(
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toUri().getPath())
);
}
// /////////////////////////// End of variable expansion
FileContext lfs = FileContext.getLocalFSFileContext();
// Script and token file both live in the container's nm-private directory.
Path nmPrivateContainerScriptPath =
dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ CONTAINER_SCRIPT);
Path nmPrivateTokensPath =
dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr)
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
containerIdStr));
DataOutputStream containerScriptOutStream = null;
DataOutputStream tokensOutStream = null;
// Select the working directory for the container
Path containerWorkDir =
dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
+ Path.SEPARATOR + user + Path.SEPARATOR
+ ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+ Path.SEPARATOR + containerIdStr,
LocalDirAllocator.SIZE_UNKNOWN, false);
String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
containerIdStr);
// pid file should be in nm private dir so that it is not
// accessible by users
pidFilePath = dirsHandler.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+ pidFileSuffix);
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
// Refuse to launch on unhealthy disks; ret carries DISKS_FAILED into the
// catch block below, which reports it in the failure event.
if (!dirsHandler.areDisksHealthy()) {
ret = YarnConfiguration.DISKS_FAILED;
throw new IOException("Most of the disks failed. "
+ dirsHandler.getDisksHealthReport());
}
try {
// /////////// Write out the container-script in the nmPrivate space.
// One application directory per local dir:
// <localDir>/USERCACHE/<user>/APPCACHE/<appId>
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
for (String localDir : localDirs) {
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, user);
Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
appDirs.add(new Path(appsdir, appIdStr));
}
containerScriptOutStream =
lfs.create(nmPrivateContainerScriptPath,
EnumSet.of(CREATE, OVERWRITE));
// Set the token location too.
// The advertised location is inside the container working directory, not
// the nm-private path the tokens are written to below.
environment.put(
ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
new Path(containerWorkDir,
FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
// Sanitize the container's environment
sanitizeEnv(environment, containerWorkDir, appDirs);
// Write out the environment
// (writeLaunchEnv also writes the resource symlinks and the command line,
// and closes the stream it is given.)
writeLaunchEnv(containerScriptOutStream, environment, localResources,
launchContext.getCommands());
// /////////// End of writing out container-script
// /////////// Write out the container-tokens in the nmPrivate space.
tokensOutStream =
lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE));
Credentials creds = container.getCredentials();
creds.writeTokenStorageToStream(tokensOutStream);
// /////////// End of writing out container-tokens
} finally {
// Closes whichever of the two streams were opened.
IOUtils.cleanup(LOG, containerScriptOutStream, tokensOutStream);
}
// LaunchContainer is a blocking call. We are here almost means the
// container is launched, so send out the event.
// NOTE(review): this event is dispatched before the launch flag is checked,
// so it is also sent when the launch is then skipped below.
dispatcher.getEventHandler().handle(new ContainerEvent(
containerID,
ContainerEventType.CONTAINER_LAUNCHED));
// Check if the container is signalled to be killed.
// The compareAndSet fails only if cleanupContainer() has already flipped the
// flag, in which case the executor is never invoked.
if (!shouldLaunchContainer.compareAndSet(false, true)) {
LOG.info("Container " + containerIdStr + " not launched as "
+ "cleanup already called");
ret = ExitCode.TERMINATED.getExitCode();
}
else {
exec.activateContainer(containerID, pidFilePath);
ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
nmPrivateTokensPath, user, appIdStr, containerWorkDir,
localDirs, logDirs);
}
} catch (Throwable e) {
// Any failure in preparation or launch is reported as a failed exit with
// the current value of ret and the exception message.
LOG.warn("Failed to launch container.", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent(
launchContext.getContainerId(),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
e.getMessage()));
return ret;
} finally {
// Runs on success and failure alike: stops the pid-file wait loop in
// getContainerPid() and deactivates the container in the executor.
completed.set(true);
exec.deactivateContainer(containerID);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Container " + containerIdStr + " completed with exit code "
+ ret);
}
if (ret == ExitCode.FORCE_KILLED.getExitCode()
|| ret == ExitCode.TERMINATED.getExitCode()) {
// If the process was killed, Send container_cleanedup_after_kill and
// just break out of this method.
dispatcher.getEventHandler().handle(
new ContainerExitEvent(launchContext.getContainerId(),
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret,
"Container exited with a non-zero exit code " + ret));
return ret;
}
// Any other non-zero exit code is an ordinary failure.
if (ret != 0) {
LOG.warn("Container exited with a non-zero exit code " + ret);
this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
launchContext.getContainerId(),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
"Container exited with a non-zero exit code " + ret));
return ret;
}
LOG.info("Container " + containerIdStr + " succeeded ");
dispatcher.getEventHandler().handle(
new ContainerEvent(launchContext.getContainerId(),
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
return 0;
}
/**
 * Cleanup the container.
 * Cancels the launch if launch has not started yet or signals
 * the executor to not execute the process if not already done so.
 * Also, if the process id is available, signals the process: a TERM
 * followed by a delayed KILL when the configured delay is positive, or an
 * immediate KILL when the delay is zero or negative.
 * Errors while locating or signalling the process are logged and not
 * rethrown; the pid file, if a path was chosen, is deleted in every case.
 *
 * @throws IOException propagated from the pid-file deletion
 */
public void cleanupContainer() throws IOException {
  ContainerId containerId = container.getContainerID();
  String containerIdStr = ConverterUtils.toString(containerId);
  LOG.info("Cleaning up container " + containerIdStr);

  // launch flag will be set to true if process already launched
  boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
  if (!alreadyLaunched) {
    // We won the race: call() will see the flag set and skip the launch.
    LOG.info("Container " + containerIdStr + " not launched."
        + " No cleanup needed to be done");
    return;
  }

  LOG.debug("Marking container " + containerIdStr + " as inactive");
  // this should ensure that if the container process has not launched
  // by this time, it will never be launched
  exec.deactivateContainer(containerId);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Getting pid for container " + containerIdStr + " to kill"
        + " from pid file "
        + (pidFilePath != null ? pidFilePath.toString() : "null"));
  }

  // however the container process may have already started
  try {
    // get process id from pid file if available
    String processId = null;
    if (pidFilePath != null) {
      processId = getContainerPid(pidFilePath);
    }

    // kill process
    if (processId != null) {
      String user = container.getLaunchContext().getUser();
      LOG.debug("Sending signal to pid " + processId
          + " as user " + user
          + " for container " + containerIdStr);

      // With a positive delay the process gets a TERM now and a KILL later.
      // With no delay there is nothing to wait for, so KILL right away;
      // previously no signal was sent at all in that case and the process
      // was left running.
      final boolean gracePeriod = sleepDelayBeforeSigKill > 0;
      final Signal signal = gracePeriod ? Signal.TERM : Signal.KILL;

      boolean result = exec.signalContainer(user, processId, signal);

      LOG.debug("Sent signal to pid " + processId
          + " as user " + user
          + " for container " + containerIdStr
          + ", result=" + (result? "success" : "failed"));

      if (gracePeriod) {
        new DelayedProcessKiller(user,
            processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
      }
    }
  } catch (Exception e) {
    // Best effort: keep going to the pid-file cleanup, but keep the stack
    // trace so the failure can be diagnosed.
    LOG.warn("Got error when trying to cleanup container " + containerIdStr
        + ", error=" + e.getMessage(), e);
  } finally {
    // cleanup pid file if present
    if (pidFilePath != null) {
      FileContext lfs = FileContext.getLocalFSFileContext();
      lfs.delete(pidFilePath, false);
    }
  }
}
/**
 * Polls the pid file until it yields a process id, the launch is marked
 * completed, or the configured maximum wait has been exceeded.
 *
 * @param pidFilePath File from which to read the process id
 * @return the process id, or null if none was obtained
 * @throws Exception if reading the pid file fails or the sleep between
 *                   polls is interrupted
 */
private String getContainerPid(Path pidFilePath) throws Exception {
  final String id = ConverterUtils.toString(container.getContainerID());
  LOG.debug("Accessing pid for container " + id
      + " from pid file " + pidFilePath);

  final int pollIntervalMs = 100;

  // Stop polling as soon as the launch is flagged completed (something went
  // wrong or the container already finished) or the wait budget is spent.
  for (int polls = 0; !completed.get(); ++polls) {
    final String pid = ProcessIdFileReader.getProcessId(pidFilePath);
    if (pid != null) {
      LOG.debug("Got pid " + pid + " for container "
          + id);
      return pid;
    }
    if ((polls * pollIntervalMs) > maxKillWaitTime) {
      LOG.info("Could not get pid for " + id
          + ". Waited for " + maxKillWaitTime + " ms.");
      return null;
    }
    Thread.sleep(pollIntervalMs);
  }
  return null;
}
/**
 * Builds the container's log directory path relative to a log root:
 * the application id, a path separator, then the container id.
 *
 * @param appIdStr       application id as a string
 * @param containerIdStr container id as a string
 * @return the relative log directory
 */
public static String getRelativeContainerLogDir(String appIdStr,
    String containerIdStr) {
  final StringBuilder relativeDir = new StringBuilder();
  relativeDir.append(appIdStr);
  relativeDir.append(Path.SEPARATOR);
  relativeDir.append(containerIdStr);
  return relativeDir.toString();
}
/**
 * Returns the container's directory below the application's nm-private
 * directory, with a trailing path separator.
 */
private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
  final String appPrivateDir = getAppPrivateDir(appIdStr);
  final String containerDir = appPrivateDir + Path.SEPARATOR + containerIdStr;
  // Callers append a file name after their own separator; the trailing
  // separator here is part of the returned value.
  return containerDir + Path.SEPARATOR;
}
/**
 * Returns the application's directory below the nm-private directory
 * (no trailing separator).
 */
private String getAppPrivateDir(String appIdStr) {
  return new StringBuilder()
      .append(ResourceLocalizationService.NM_PRIVATE_DIR)
      .append(Path.SEPARATOR)
      .append(appIdStr)
      .toString();
}
/**
 * Accumulates the text of a bash script line by line. The default
 * constructor starts the script with a {@code #!/bin/bash} header.
 */
private static class ShellScriptBuilder {

  private final StringBuilder sb;

  /** Starts a new script containing only the bash shebang line. */
  public ShellScriptBuilder() {
    this(new StringBuilder("#!/bin/bash\n\n"));
  }

  /** Continues a script whose text so far is held in {@code sb}. */
  protected ShellScriptBuilder(StringBuilder sb) {
    this.sb = sb;
  }

  /**
   * Appends {@code export key="value"}.
   * NOTE(review): key and value are written verbatim; characters such as
   * an embedded double quote are not escaped here.
   *
   * @return this builder
   */
  public ShellScriptBuilder env(String key, String value) {
    line("export ", key, "=\"", value, "\"");
    return this;
  }

  /** Same as {@link #symlink(Path, Path)} with {@code dst} given as text. */
  public ShellScriptBuilder symlink(Path src, String dst) throws IOException {
    return symlink(src, new Path(dst));
  }

  /**
   * Appends the commands that link {@code dst} to {@code src}, preceded by a
   * {@code mkdir -p} for the link's parent when {@code dst} has one.
   *
   * @param src link target; must be absolute
   * @param dst link name; must be relative
   * @return this builder
   * @throws IOException if {@code src} is relative or {@code dst} is absolute
   */
  public ShellScriptBuilder symlink(Path src, Path dst) throws IOException {
    if (!src.isAbsolute()) {
      throw new IOException("Source must be absolute");
    }
    if (dst.isAbsolute()) {
      throw new IOException("Destination must be relative");
    }
    if (dst.toUri().getPath().indexOf('/') != -1) {
      // Quote the directory exactly like the ln arguments below; unquoted, a
      // parent containing whitespace was split into several directories and
      // the following ln then failed.
      line("mkdir -p \"", dst.getParent().toString(), "\"");
    }
    line("ln -sf \"", src.toUri().getPath(), "\" \"", dst.toString(), "\"");
    return this;
  }

  /** Writes the script text accumulated so far to {@code out}. */
  public void write(PrintStream out) throws IOException {
    out.append(sb);
  }

  /** Appends the given fragments back to back, followed by a newline. */
  public void line(String... command) {
    for (String s : command) {
      sb.append(s);
    }
    sb.append("\n");
  }

  @Override
  public String toString() {
    return sb.toString();
  }
}
/**
 * Stores {@code variable=value} in the environment, replacing any existing
 * entry, unless {@code value} is null, in which case nothing happens.
 */
private static void putEnvIfNotNull(
    Map<String, String> environment, String variable, String value) {
  if (value == null) {
    return;
  }
  environment.put(variable, value);
}
/**
 * If the environment has no (non-null) value for {@code variable}, copies
 * the value from this process's own environment, when one is set there.
 */
private static void putEnvIfAbsent(
    Map<String, String> environment, String variable) {
  final boolean alreadySet = environment.get(variable) != null;
  if (alreadySet) {
    return;
  }
  putEnvIfNotNull(environment, variable, System.getenv(variable));
}
/**
 * Adjusts the container environment before it is written to the launch
 * script. Three groups are applied in order: values set here regardless of
 * what the container supplied, whitelisted variables inherited from this
 * process only when the container did not set them, and finally the
 * administrator-configured environment string.
 *
 * @param environment environment map to modify in place
 * @param pwd         working directory, exported as PWD
 * @param appDirs     application directories, exported comma-joined
 */
public void sanitizeEnv(Map<String, String> environment,
    Path pwd, List<Path> appDirs) {
  // ---- Non-modifiable variables: a non-null value here replaces any
  // ---- entry already in the map.
  putEnvIfNotNull(environment, Environment.USER.name(), container.getUser());
  putEnvIfNotNull(environment, Environment.LOGNAME.name(),
      container.getUser());

  final String homeDir = conf.get(
      YarnConfiguration.NM_USER_HOME_DIR,
      YarnConfiguration.DEFAULT_NM_USER_HOME_DIR);
  putEnvIfNotNull(environment, Environment.HOME.name(), homeDir);

  putEnvIfNotNull(environment, Environment.PWD.name(), pwd.toString());

  final String confDirVar = Environment.HADOOP_CONF_DIR.name();
  putEnvIfNotNull(environment, confDirVar, System.getenv(confDirVar));

  final String joinedAppDirs = StringUtils.join(",", appDirs);
  putEnvIfNotNull(environment, ApplicationConstants.LOCAL_DIR_ENV,
      joinedAppDirs);

  if (!Shell.WINDOWS) {
    // Left as the literal "$$" for the shell to expand when the script runs.
    environment.put("JVM_PID", "$$");
  }

  // ---- Modifiable variables: the container's own value wins; otherwise
  // ---- the value is inherited from this process if set there.
  final String whitelistSetting = conf.get(
      YarnConfiguration.NM_ENV_WHITELIST,
      YarnConfiguration.DEFAULT_NM_ENV_WHITELIST);
  for (String name : whitelistSetting.split(",")) {
    putEnvIfAbsent(environment, name.trim());
  }

  // ---- Admin-configured variables: forced in, even if the container has
  // ---- specified them.
  final String adminEnv = conf.get(
      YarnConfiguration.NM_ADMIN_USER_ENV,
      YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV);
  Apps.setEnvFromInputString(environment, adminEnv);
}
/**
 * Writes the container launch script to {@code out} and closes it.
 * The script consists of one {@code export} per environment entry, the
 * symlink commands for every link name of every resource, and a final
 * {@code exec /bin/bash -c "..."} line holding the space-separated command.
 *
 * NOTE(review): the command fragments are placed between double quotes
 * verbatim; an embedded double quote is not escaped.
 *
 * @param out         destination stream; always closed before returning
 * @param environment variables to export; may be null
 * @param resources   absolute resource path to its relative link names;
 *                    may be null
 * @param command     command fragments to run
 * @throws IOException if a symlink path is invalid, if writing the script
 *                     fails, or if closing {@code out} fails
 */
static void writeLaunchEnv(OutputStream out,
    Map<String,String> environment, Map<Path,List<String>> resources,
    List<String> command)
    throws IOException {
  ShellScriptBuilder sb = new ShellScriptBuilder();
  if (environment != null) {
    for (Map.Entry<String,String> env : environment.entrySet()) {
      sb.env(env.getKey().toString(), env.getValue().toString());
    }
  }
  if (resources != null) {
    for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
      for (String linkName : entry.getValue()) {
        sb.symlink(entry.getKey(), linkName);
      }
    }
  }

  // exec /bin/bash -c "<fragment> <fragment> ... "
  List<String> cmd = new ArrayList<String>(2 * command.size() + 5);
  cmd.add("exec /bin/bash ");
  cmd.add("-c ");
  cmd.add("\"");
  for (String cs : command) {
    cmd.add(cs.toString());
    cmd.add(" ");
  }
  cmd.add("\"");
  sb.line(cmd.toArray(new String[cmd.size()]));

  try {
    PrintStream pout = new PrintStream(out);
    sb.write(pout);
    // PrintStream never throws on a failed write; it only records the
    // failure. Without this check a partially written script would go
    // unnoticed. checkError() also flushes the stream.
    if (pout.checkError()) {
      throw new IOException("Failed to write the container launch script");
    }
  } finally {
    if (out != null) {
      out.close();
    }
  }
}
}