| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PrintStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; |
| import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.util.StringUtils; |
| |
| public abstract class ContainerExecutor implements Configurable { |
| |
| private static final Log LOG = LogFactory.getLog(ContainerExecutor.class); |
| final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION = |
| FsPermission.createImmutable((short) 0700); |
| |
| private Configuration conf; |
| |
| private ConcurrentMap<ContainerId, Path> pidFiles = |
| new ConcurrentHashMap<ContainerId, Path>(); |
| |
| private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| private final ReadLock readLock = lock.readLock(); |
| private final WriteLock writeLock = lock.writeLock(); |
| |
| @Override |
| public void setConf(Configuration conf) { |
| this.conf = conf; |
| } |
| |
| @Override |
| public Configuration getConf() { |
| return conf; |
| } |
| |
| /** |
| * Run the executor initialization steps. |
| * Verify that the necessary configs, permissions are in place. |
| * @throws IOException |
| */ |
| public abstract void init() throws IOException; |
| |
| /** |
| * On Windows the ContainerLaunch creates a temporary special jar manifest of |
| * other jars to workaround the CLASSPATH length. In a secure cluster this |
| * jar must be localized so that the container has access to it. |
| * This function localizes on-demand the jar. |
| * |
| * @param classPathJar |
| * @param owner |
| * @throws IOException |
| */ |
| public Path localizeClasspathJar(Path classPathJar, Path pwd, String owner) |
| throws IOException { |
| // Non-secure executor simply use the classpath created |
| // in the NM fprivate folder |
| return classPathJar; |
| } |
| |
| |
| /** |
| * Prepare the environment for containers in this application to execute. |
| * <pre> |
| * For $x in local.dirs |
| * create $x/$user/$appId |
| * Copy $nmLocal/appTokens {@literal ->} $N/$user/$appId |
| * For $rsrc in private resources |
| * Copy $rsrc {@literal ->} $N/$user/filecache/[idef] |
| * For $rsrc in job resources |
| * Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef |
| * </pre> |
| * @param ctx LocalizerStartContext that encapsulates necessary information |
| * for starting a localizer. |
| * @throws IOException For most application init failures |
| * @throws InterruptedException If application init thread is halted by NM |
| */ |
| public abstract void startLocalizer(LocalizerStartContext ctx) |
| throws IOException, InterruptedException; |
| |
| |
| /** |
| * Launch the container on the node. This is a blocking call and returns only |
| * when the container exits. |
| * @param ctx Encapsulates information necessary for launching containers. |
| * @return the return status of the launch |
| * @throws IOException |
| */ |
| public abstract int launchContainer(ContainerStartContext ctx) throws |
| IOException; |
| |
| /** |
| * Signal container with the specified signal. |
| * @param ctx Encapsulates information necessary for signaling containers. |
| * @return returns true if the operation succeeded |
| * @throws IOException |
| */ |
| public abstract boolean signalContainer(ContainerSignalContext ctx) |
| throws IOException; |
| |
| /** |
| * Delete specified directories as a given user. |
| * @param ctx Encapsulates information necessary for deletion. |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| public abstract void deleteAsUser(DeletionAsUserContext ctx) |
| throws IOException, InterruptedException; |
| |
| /** |
| * Check if a container is alive. |
| * @param ctx Encapsulates information necessary for container liveness check. |
| * @return true if container is still alive |
| * @throws IOException |
| */ |
| public abstract boolean isContainerAlive(ContainerLivenessContext ctx) |
| throws IOException; |
| |
| /** |
| * Recover an already existing container. This is a blocking call and returns |
| * only when the container exits. Note that the container must have been |
| * activated prior to this call. |
| * @param ctx encapsulates information necessary to reacquire container |
| * @return The exit code of the pre-existing container |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| public int reacquireContainer(ContainerReacquisitionContext ctx) |
| throws IOException, InterruptedException { |
| Container container = ctx.getContainer(); |
| String user = ctx.getUser(); |
| ContainerId containerId = ctx.getContainerId(); |
| |
| |
| Path pidPath = getPidFilePath(containerId); |
| if (pidPath == null) { |
| LOG.warn(containerId + " is not active, returning terminated error"); |
| return ExitCode.TERMINATED.getExitCode(); |
| } |
| |
| String pid = null; |
| pid = ProcessIdFileReader.getProcessId(pidPath); |
| if (pid == null) { |
| throw new IOException("Unable to determine pid for " + containerId); |
| } |
| |
| LOG.info("Reacquiring " + containerId + " with pid " + pid); |
| ContainerLivenessContext livenessContext = new ContainerLivenessContext |
| .Builder() |
| .setContainer(container) |
| .setUser(user) |
| .setPid(pid) |
| .build(); |
| while(isContainerAlive(livenessContext)) { |
| Thread.sleep(1000); |
| } |
| |
| // wait for exit code file to appear |
| String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString()); |
| File file = new File(exitCodeFile); |
| final int sleepMsec = 100; |
| int msecLeft = 2000; |
| while (!file.exists() && msecLeft >= 0) { |
| if (!isContainerActive(containerId)) { |
| LOG.info(containerId + " was deactivated"); |
| return ExitCode.TERMINATED.getExitCode(); |
| } |
| |
| Thread.sleep(sleepMsec); |
| |
| msecLeft -= sleepMsec; |
| } |
| if (msecLeft < 0) { |
| throw new IOException("Timeout while waiting for exit code from " |
| + containerId); |
| } |
| |
| try { |
| return Integer.parseInt(FileUtils.readFileToString(file).trim()); |
| } catch (NumberFormatException e) { |
| throw new IOException("Error parsing exit code from pid " + pid, e); |
| } |
| } |
| |
| /** |
| * This method writes out the launch environment of a container. This can be |
| * overridden by extending ContainerExecutors to provide different behaviors |
| * @param out the output stream to which the environment is written (usually |
| * a script file which will be executed by the Launcher) |
| * @param environment The environment variables and their values |
| * @param resources The resources which have been localized for this container |
| * Symlinks will be created to these localized resources |
| * @param command The command that will be run. |
| * @throws IOException if any errors happened writing to the OutputStream, |
| * while creating symlinks |
| */ |
| public void writeLaunchEnv(OutputStream out, Map<String, String> environment, |
| Map<Path, List<String>> resources, List<String> command) throws IOException{ |
| ContainerLaunch.ShellScriptBuilder sb = |
| ContainerLaunch.ShellScriptBuilder.create(); |
| Set<String> whitelist = new HashSet<String>(); |
| whitelist.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); |
| whitelist.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name()); |
| whitelist.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name()); |
| whitelist.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name()); |
| whitelist.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name()); |
| whitelist.add(ApplicationConstants.Environment.JAVA_HOME.name()); |
| if (environment != null) { |
| for (Map.Entry<String,String> env : environment.entrySet()) { |
| if (!whitelist.contains(env.getKey())) { |
| sb.env(env.getKey().toString(), env.getValue().toString()); |
| } else { |
| sb.whitelistedEnv(env.getKey().toString(), env.getValue().toString()); |
| } |
| } |
| } |
| if (resources != null) { |
| for (Map.Entry<Path,List<String>> entry : resources.entrySet()) { |
| for (String linkName : entry.getValue()) { |
| sb.symlink(entry.getKey(), new Path(linkName)); |
| } |
| } |
| } |
| |
| sb.command(command); |
| |
| PrintStream pout = null; |
| try { |
| pout = new PrintStream(out, false, "UTF-8"); |
| sb.write(pout); |
| } finally { |
| if (out != null) { |
| out.close(); |
| } |
| } |
| } |
| |
| public enum ExitCode { |
| FORCE_KILLED(137), |
| TERMINATED(143), |
| LOST(154); |
| private final int code; |
| |
| private ExitCode(int exitCode) { |
| this.code = exitCode; |
| } |
| |
| public int getExitCode() { |
| return code; |
| } |
| |
| @Override |
| public String toString() { |
| return String.valueOf(code); |
| } |
| } |
| |
| /** |
| * The constants for the signals. |
| */ |
| public enum Signal { |
| NULL(0, "NULL"), QUIT(3, "SIGQUIT"), |
| KILL(9, "SIGKILL"), TERM(15, "SIGTERM"); |
| private final int value; |
| private final String str; |
| private Signal(int value, String str) { |
| this.str = str; |
| this.value = value; |
| } |
| public int getValue() { |
| return value; |
| } |
| @Override |
| public String toString() { |
| return str; |
| } |
| } |
| |
| protected void logOutput(String output) { |
| String shExecOutput = output; |
| if (shExecOutput != null) { |
| for (String str : shExecOutput.split("\n")) { |
| LOG.info(str); |
| } |
| } |
| } |
| |
| /** |
| * Get the pidFile of the container. |
| * @param containerId |
| * @return the path of the pid-file for the given containerId. |
| */ |
| protected Path getPidFilePath(ContainerId containerId) { |
| try { |
| readLock.lock(); |
| return (this.pidFiles.get(containerId)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| protected String[] getRunCommand(String command, String groupId, |
| String userName, Path pidFile, Configuration conf) { |
| return getRunCommand(command, groupId, userName, pidFile, conf, null); |
| } |
| |
| /** |
| * Return a command to execute the given command in OS shell. |
| * On Windows, the passed in groupId can be used to launch |
| * and associate the given groupId in a process group. On |
| * non-Windows, groupId is ignored. |
| */ |
| protected String[] getRunCommand(String command, String groupId, |
| String userName, Path pidFile, Configuration conf, Resource resource) { |
| boolean containerSchedPriorityIsSet = false; |
| int containerSchedPriorityAdjustment = |
| YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY; |
| |
| if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != |
| null) { |
| containerSchedPriorityIsSet = true; |
| containerSchedPriorityAdjustment = conf |
| .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, |
| YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY); |
| } |
| |
| if (Shell.WINDOWS) { |
| int cpuRate = -1; |
| int memory = -1; |
| if (resource != null) { |
| if (conf |
| .getBoolean( |
| YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, |
| YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) { |
| memory = resource.getMemory(); |
| } |
| |
| if (conf.getBoolean( |
| YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, |
| YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) { |
| int containerVCores = resource.getVirtualCores(); |
| int nodeVCores = NodeManagerHardwareUtils.getVCores(conf); |
| int nodeCpuPercentage = |
| NodeManagerHardwareUtils.getNodeCpuPercentage(conf); |
| |
| float containerCpuPercentage = |
| (float) (nodeCpuPercentage * containerVCores) / nodeVCores; |
| |
| // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit |
| // should be set as 20 * 100. |
| cpuRate = Math.min(10000, (int) (containerCpuPercentage * 100)); |
| } |
| } |
| return new String[] { Shell.WINUTILS, "task", "create", "-m", |
| String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId, |
| "cmd /c " + command }; |
| } else { |
| List<String> retCommand = new ArrayList<String>(); |
| if (containerSchedPriorityIsSet) { |
| retCommand.addAll(Arrays.asList("nice", "-n", |
| Integer.toString(containerSchedPriorityAdjustment))); |
| } |
| retCommand.addAll(Arrays.asList("bash", command)); |
| return retCommand.toArray(new String[retCommand.size()]); |
| } |
| |
| } |
| |
| /** |
| * Is the container still active? |
| * @param containerId |
| * @return true if the container is active else false. |
| */ |
| protected boolean isContainerActive(ContainerId containerId) { |
| try { |
| readLock.lock(); |
| return (this.pidFiles.containsKey(containerId)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Mark the container as active |
| * |
| * @param containerId |
| * the ContainerId |
| * @param pidFilePath |
| * Path where the executor should write the pid of the launched |
| * process |
| */ |
| public void activateContainer(ContainerId containerId, Path pidFilePath) { |
| try { |
| writeLock.lock(); |
| this.pidFiles.put(containerId, pidFilePath); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Mark the container as inactive. |
| * Done iff the container is still active. Else treat it as |
| * a no-op |
| */ |
| public void deactivateContainer(ContainerId containerId) { |
| try { |
| writeLock.lock(); |
| this.pidFiles.remove(containerId); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Get the process-identifier for the container |
| * |
| * @param containerID |
| * @return the processid of the container if it has already launched, |
| * otherwise return null |
| */ |
| public String getProcessId(ContainerId containerID) { |
| String pid = null; |
| Path pidFile = pidFiles.get(containerID); |
| if (pidFile == null) { |
| // This container isn't even launched yet. |
| return pid; |
| } |
| try { |
| pid = ProcessIdFileReader.getProcessId(pidFile); |
| } catch (IOException e) { |
| LOG.error("Got exception reading pid from pid-file " + pidFile, e); |
| } |
| return pid; |
| } |
| |
| public static class DelayedProcessKiller extends Thread { |
| private Container container; |
| private final String user; |
| private final String pid; |
| private final long delay; |
| private final Signal signal; |
| private final ContainerExecutor containerExecutor; |
| |
| public DelayedProcessKiller(Container container, String user, String pid, |
| long delay, Signal signal, ContainerExecutor containerExecutor) { |
| this.container = container; |
| this.user = user; |
| this.pid = pid; |
| this.delay = delay; |
| this.signal = signal; |
| this.containerExecutor = containerExecutor; |
| setName("Task killer for " + pid); |
| setDaemon(false); |
| } |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(delay); |
| containerExecutor.signalContainer(new ContainerSignalContext.Builder() |
| .setContainer(container) |
| .setUser(user) |
| .setPid(pid) |
| .setSignal(signal) |
| .build()); |
| } catch (InterruptedException e) { |
| return; |
| } catch (IOException e) { |
| String message = "Exception when user " + user + " killing task " + pid |
| + " in DelayedProcessKiller: " + StringUtils.stringifyException(e); |
| LOG.warn(message); |
| container.handle(new ContainerDiagnosticsUpdateEvent(container |
| .getContainerId(), message)); |
| } |
| } |
| } |
| } |