| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PrintStream; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ConfigurationException; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; |
| import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.util.StringUtils; |
| |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.CONTAINER_PRE_LAUNCH_STDERR; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.CONTAINER_PRE_LAUNCH_STDOUT; |
| |
| /** |
| * This class is an abstraction of the mechanism used to launch a container on |
| * the underlying OS. All executor implementations must extend |
| * ContainerExecutor. |
| */ |
| public abstract class ContainerExecutor implements Configurable { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(ContainerExecutor.class); |
| protected static final String WILDCARD = "*"; |
| |
| /** |
| * The permissions to use when creating the launch script. |
| */ |
| public static final FsPermission TASK_LAUNCH_SCRIPT_PERMISSION = |
| FsPermission.createImmutable((short)0700); |
| |
| /** |
| * The relative path to which debug information will be written. |
| * |
| * @see ContainerLaunch.ShellScriptBuilder#listDebugInformation |
| */ |
| public static final String DIRECTORY_CONTENTS = "directory.info"; |
| |
| private Configuration conf; |
| private final ConcurrentMap<ContainerId, Path> pidFiles = |
| new ConcurrentHashMap<>(); |
| private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| private final ReadLock readLock = lock.readLock(); |
| private final WriteLock writeLock = lock.writeLock(); |
| private String[] whitelistVars; |
| |
| @Override |
| public void setConf(Configuration conf) { |
| this.conf = conf; |
| if (conf != null) { |
| whitelistVars = conf.get(YarnConfiguration.NM_ENV_WHITELIST, |
| YarnConfiguration.DEFAULT_NM_ENV_WHITELIST).split(","); |
| } |
| } |
| |
| @Override |
| public Configuration getConf() { |
| return conf; |
| } |
| |
| /** |
| * Run the executor initialization steps. |
| * Verify that the necessary configs and permissions are in place. |
| * |
| * @param nmContext Context of NM |
| * @throws IOException if initialization fails |
| */ |
| public abstract void init(Context nmContext) throws IOException; |
| |
| /** |
| * This function localizes the JAR file on-demand. |
| * On Windows the ContainerLaunch creates a temporary special JAR manifest of |
| * other JARs to workaround the CLASSPATH length. In a secure cluster this |
| * JAR must be localized so that the container has access to it. |
| * The default implementation returns the classpath passed to it, which |
| * is expected to have been created in the node manager's <i>fprivate</i> |
| * folder, which will not work with secure Windows clusters. |
| * |
| * @param jarPath the path to the JAR to localize |
| * @param target the directory where the JAR file should be localized |
| * @param owner the name of the user who should own the localized file |
| * @return the path to the localized JAR file |
| * @throws IOException if localization fails |
| */ |
| public Path localizeClasspathJar(Path jarPath, Path target, String owner) |
| throws IOException { |
| return jarPath; |
| } |
| |
| |
| /** |
| * Prepare the environment for containers in this application to execute. |
| * <pre> |
| * For $x in local.dirs |
| * create $x/$user/$appId |
| * Copy $nmLocal/appTokens {@literal ->} $N/$user/$appId |
| * For $rsrc in private resources |
| * Copy $rsrc {@literal ->} $N/$user/filecache/[idef] |
| * For $rsrc in job resources |
| * Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef |
| * </pre> |
| * |
| * @param ctx LocalizerStartContext that encapsulates necessary information |
| * for starting a localizer. |
| * @throws IOException for most application init failures |
| * @throws InterruptedException if application init thread is halted by NM |
| */ |
| public abstract void startLocalizer(LocalizerStartContext ctx) |
| throws IOException, InterruptedException; |
| |
| |
| /** |
| * Launch the container on the node. This is a blocking call and returns only |
| * when the container exits. |
| * @param ctx Encapsulates information necessary for launching containers. |
| * @return the return status of the launch |
| * @throws IOException if the container launch fails |
| * @throws ConfigurationException if config error was found |
| */ |
| public abstract int launchContainer(ContainerStartContext ctx) throws |
| IOException, ConfigurationException; |
| |
| /** |
| * Signal container with the specified signal. |
| * |
| * @param ctx Encapsulates information necessary for signaling containers. |
| * @return returns true if the operation succeeded |
| * @throws IOException if signaling the container fails |
| */ |
| public abstract boolean signalContainer(ContainerSignalContext ctx) |
| throws IOException; |
| |
| /** |
| * Delete specified directories as a given user. |
| * |
| * @param ctx Encapsulates information necessary for deletion. |
| * @throws IOException if delete fails |
| * @throws InterruptedException if interrupted while waiting for the deletion |
| * operation to complete |
| */ |
| public abstract void deleteAsUser(DeletionAsUserContext ctx) |
| throws IOException, InterruptedException; |
| |
| /** |
| * Create a symlink file which points to the target. |
| * @param target The target for symlink |
| * @param symlink the symlink file |
| * @throws IOException Error when creating symlinks |
| */ |
| public abstract void symLink(String target, String symlink) |
| throws IOException; |
| |
| /** |
| * Check if a container is alive. |
| * @param ctx Encapsulates information necessary for container liveness check. |
| * @return true if container is still alive |
| * @throws IOException if there is a failure while checking the container |
| * status |
| */ |
| public abstract boolean isContainerAlive(ContainerLivenessContext ctx) |
| throws IOException; |
| |
| /** |
| * Recover an already existing container. This is a blocking call and returns |
| * only when the container exits. Note that the container must have been |
| * activated prior to this call. |
| * |
| * @param ctx encapsulates information necessary to reacquire container |
| * @return The exit code of the pre-existing container |
| * @throws IOException if there is a failure while reacquiring the container |
| * @throws InterruptedException if interrupted while waiting to reacquire |
| * the container |
| */ |
| public int reacquireContainer(ContainerReacquisitionContext ctx) |
| throws IOException, InterruptedException { |
| Container container = ctx.getContainer(); |
| String user = ctx.getUser(); |
| ContainerId containerId = ctx.getContainerId(); |
| Path pidPath = getPidFilePath(containerId); |
| |
| if (pidPath == null) { |
| LOG.warn(containerId + " is not active, returning terminated error"); |
| |
| return ExitCode.TERMINATED.getExitCode(); |
| } |
| |
| String pid = ProcessIdFileReader.getProcessId(pidPath); |
| |
| if (pid == null) { |
| throw new IOException("Unable to determine pid for " + containerId); |
| } |
| |
| LOG.info("Reacquiring " + containerId + " with pid " + pid); |
| |
| ContainerLivenessContext livenessContext = new ContainerLivenessContext |
| .Builder() |
| .setContainer(container) |
| .setUser(user) |
| .setPid(pid) |
| .build(); |
| |
| while (isContainerAlive(livenessContext)) { |
| Thread.sleep(1000); |
| } |
| |
| // wait for exit code file to appear |
| final int sleepMsec = 100; |
| int msecLeft = 2000; |
| String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString()); |
| File file = new File(exitCodeFile); |
| |
| while (!file.exists() && msecLeft >= 0) { |
| if (!isContainerActive(containerId)) { |
| LOG.info(containerId + " was deactivated"); |
| |
| return ExitCode.TERMINATED.getExitCode(); |
| } |
| |
| Thread.sleep(sleepMsec); |
| |
| msecLeft -= sleepMsec; |
| } |
| |
| if (msecLeft < 0) { |
| throw new IOException("Timeout while waiting for exit code from " |
| + containerId); |
| } |
| |
| try { |
| return Integer.parseInt(FileUtils.readFileToString(file).trim()); |
| } catch (NumberFormatException e) { |
| throw new IOException("Error parsing exit code from pid " + pid, e); |
| } |
| } |
| |
| /** |
| * This method writes out the launch environment of a container to the |
| * default container launch script. For the default container script path see |
| * {@link ContainerLaunch#CONTAINER_SCRIPT}. |
| * |
| * @param out the output stream to which the environment is written (usually |
| * a script file which will be executed by the Launcher) |
| * @param environment the environment variables and their values |
| * @param resources the resources which have been localized for this |
| * container. Symlinks will be created to these localized resources |
| * @param command the command that will be run |
| * @param logDir the log dir to which to copy debugging information |
| * @param user the username of the job owner |
| * @throws IOException if any errors happened writing to the OutputStream, |
| * while creating symlinks |
| */ |
| public void writeLaunchEnv(OutputStream out, Map<String, String> environment, |
| Map<Path, List<String>> resources, List<String> command, Path logDir, |
| String user) throws IOException { |
| this.writeLaunchEnv(out, environment, resources, command, logDir, user, |
| ContainerLaunch.CONTAINER_SCRIPT); |
| } |
| |
| /** |
| * This method writes out the launch environment of a container to a specified |
| * path. |
| * |
| * @param out the output stream to which the environment is written (usually |
| * a script file which will be executed by the Launcher) |
| * @param environment the environment variables and their values |
| * @param resources the resources which have been localized for this |
| * container. Symlinks will be created to these localized resources |
| * @param command the command that will be run |
| * @param logDir the log dir to which to copy debugging information |
| * @param user the username of the job owner |
| * @param outFilename the path to which to write the launch environment |
| * @throws IOException if any errors happened writing to the OutputStream, |
| * while creating symlinks |
| */ |
| @VisibleForTesting |
| public void writeLaunchEnv(OutputStream out, Map<String, String> environment, |
| Map<Path, List<String>> resources, List<String> command, Path logDir, |
| String user, String outFilename) throws IOException { |
| updateEnvForWhitelistVars(environment); |
| |
| ContainerLaunch.ShellScriptBuilder sb = |
| ContainerLaunch.ShellScriptBuilder.create(); |
| |
| // Add "set -o pipefail -e" to validate launch_container script. |
| sb.setExitOnFailure(); |
| |
| //Redirect stdout and stderr for launch_container script |
| sb.stdout(logDir, CONTAINER_PRE_LAUNCH_STDOUT); |
| sb.stderr(logDir, CONTAINER_PRE_LAUNCH_STDERR); |
| |
| |
| if (environment != null) { |
| sb.echo("Setting up env variables"); |
| for (Map.Entry<String, String> env : |
| sb.orderEnvByDependencies(environment).entrySet()) { |
| sb.env(env.getKey(), env.getValue()); |
| } |
| } |
| |
| if (resources != null) { |
| sb.echo("Setting up job resources"); |
| for (Map.Entry<Path, List<String>> resourceEntry : |
| resources.entrySet()) { |
| for (String linkName : resourceEntry.getValue()) { |
| if (new Path(linkName).getName().equals(WILDCARD)) { |
| // If this is a wildcarded path, link to everything in the |
| // directory from the working directory |
| for (File wildLink : readDirAsUser(user, resourceEntry.getKey())) { |
| sb.symlink(new Path(wildLink.toString()), |
| new Path(wildLink.getName())); |
| } |
| } else { |
| sb.symlink(resourceEntry.getKey(), new Path(linkName)); |
| } |
| } |
| } |
| } |
| |
| // dump debugging information if configured |
| if (getConf() != null && |
| getConf().getBoolean(YarnConfiguration.NM_LOG_CONTAINER_DEBUG_INFO, |
| YarnConfiguration.DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO)) { |
| sb.echo("Copying debugging information"); |
| sb.copyDebugInformation(new Path(outFilename), |
| new Path(logDir, outFilename)); |
| sb.listDebugInformation(new Path(logDir, DIRECTORY_CONTENTS)); |
| } |
| sb.echo("Launching container"); |
| sb.command(command); |
| |
| PrintStream pout = null; |
| try { |
| pout = new PrintStream(out, false, "UTF-8"); |
| sb.write(pout); |
| } finally { |
| if (out != null) { |
| out.close(); |
| } |
| } |
| } |
| |
| /** |
| * Return the files in the target directory. If retrieving the list of files |
| * requires specific access rights, that access will happen as the |
| * specified user. The list will not include entries for "." or "..". |
| * |
| * @param user the user as whom to access the target directory |
| * @param dir the target directory |
| * @return a list of files in the target directory |
| */ |
| protected File[] readDirAsUser(String user, Path dir) { |
| return new File(dir.toString()).listFiles(); |
| } |
| |
/**
 * The container exit codes known to the executor.
 */
public enum ExitCode {
  SUCCESS(0),
  FORCE_KILLED(137),
  TERMINATED(143),
  LOST(154);

  /** Numeric exit status carried by this constant. */
  private final int exitStatus;

  ExitCode(int exitCode) {
    exitStatus = exitCode;
  }

  /**
   * Get the exit code as an int.
   *
   * @return the exit code as an int
   */
  public int getExitCode() {
    return exitStatus;
  }

  /**
   * Render the exit code as its decimal string.
   *
   * @return the numeric exit code in decimal form
   */
  @Override
  public String toString() {
    return Integer.toString(exitStatus);
  }
}
| |
/**
 * The signals that can be sent to a container, each with its number and
 * display name.
 */
public enum Signal {
  NULL(0, "NULL"),
  QUIT(3, "SIGQUIT"),
  KILL(9, "SIGKILL"),
  TERM(15, "SIGTERM");

  /** Signal number. */
  private final int signalNumber;
  /** Text returned by {@link #toString()}. */
  private final String signalName;

  Signal(int value, String str) {
    signalNumber = value;
    signalName = str;
  }

  /**
   * Get the signal number.
   *
   * @return the signal number
   */
  public int getValue() {
    return signalNumber;
  }

  /**
   * Get the display name of the signal.
   *
   * @return the signal name, e.g. {@code SIGKILL}
   */
  @Override
  public String toString() {
    return signalName;
  }
}
| |
| /** |
| * Log each line of the output string as INFO level log messages. |
| * |
| * @param output the output string to log |
| */ |
| protected void logOutput(String output) { |
| String shExecOutput = output; |
| |
| if (shExecOutput != null) { |
| for (String str : shExecOutput.split("\n")) { |
| LOG.info(str); |
| } |
| } |
| } |
| |
| /** |
| * Get the pidFile of the container. |
| * |
| * @param containerId the container ID |
| * @return the path of the pid-file for the given containerId. |
| */ |
| protected Path getPidFilePath(ContainerId containerId) { |
| try { |
| readLock.lock(); |
| return (this.pidFiles.get(containerId)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Return a command line to execute the given command in the OS shell. |
| * On Windows, the {code}groupId{code} parameter can be used to launch |
| * and associate the given GID with a process group. On |
| * non-Windows hosts, the {code}groupId{code} parameter is ignored. |
| * |
| * @param command the command to execute |
| * @param groupId the job owner's GID |
| * @param userName the job owner's username |
| * @param pidFile the path to the container's PID file |
| * @param config the configuration |
| * @return the command line to execute |
| */ |
| protected String[] getRunCommand(String command, String groupId, |
| String userName, Path pidFile, Configuration config) { |
| return getRunCommand(command, groupId, userName, pidFile, config, null); |
| } |
| |
| /** |
| * Return a command line to execute the given command in the OS shell. |
| * On Windows, the {code}groupId{code} parameter can be used to launch |
| * and associate the given GID with a process group. On |
| * non-Windows hosts, the {code}groupId{code} parameter is ignored. |
| * |
| * @param command the command to execute |
| * @param groupId the job owner's GID for Windows. On other operating systems |
| * it is ignored. |
| * @param userName the job owner's username for Windows. On other operating |
| * systems it is ignored. |
| * @param pidFile the path to the container's PID file on Windows. On other |
| * operating systems it is ignored. |
| * @param config the configuration |
| * @param resource on Windows this parameter controls memory and CPU limits. |
| * If null, no limits are set. On other operating systems it is ignored. |
| * @return the command line to execute |
| */ |
| protected String[] getRunCommand(String command, String groupId, |
| String userName, Path pidFile, Configuration config, Resource resource) { |
| if (Shell.WINDOWS) { |
| return getRunCommandForWindows(command, groupId, userName, pidFile, |
| config, resource); |
| } else { |
| return getRunCommandForOther(command, config); |
| } |
| |
| } |
| |
| /** |
| * Return a command line to execute the given command in the OS shell. |
| * The {code}groupId{code} parameter can be used to launch |
| * and associate the given GID with a process group. |
| * |
| * @param command the command to execute |
| * @param groupId the job owner's GID |
| * @param userName the job owner's username |
| * @param pidFile the path to the container's PID file |
| * @param config the configuration |
| * @param resource this parameter controls memory and CPU limits. |
| * If null, no limits are set. |
| * @return the command line to execute |
| */ |
| protected String[] getRunCommandForWindows(String command, String groupId, |
| String userName, Path pidFile, Configuration config, Resource resource) { |
| int cpuRate = -1; |
| int memory = -1; |
| |
| if (resource != null) { |
| if (config.getBoolean( |
| YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, |
| YarnConfiguration. |
| DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) { |
| memory = (int) resource.getMemorySize(); |
| } |
| |
| if (config.getBoolean( |
| YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, |
| YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) { |
| int containerVCores = resource.getVirtualCores(); |
| int nodeVCores = NodeManagerHardwareUtils.getVCores(config); |
| int nodeCpuPercentage = |
| NodeManagerHardwareUtils.getNodeCpuPercentage(config); |
| |
| float containerCpuPercentage = |
| (float)(nodeCpuPercentage * containerVCores) / nodeVCores; |
| |
| // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit |
| // should be set as 20 * 100. |
| cpuRate = Math.min(10000, (int)(containerCpuPercentage * 100)); |
| } |
| } |
| |
| return new String[] { |
| Shell.getWinUtilsPath(), |
| "task", |
| "create", |
| "-m", |
| String.valueOf(memory), |
| "-c", |
| String.valueOf(cpuRate), |
| groupId, |
| "cmd /c " + command |
| }; |
| } |
| |
| /** |
| * Return a command line to execute the given command in the OS shell. |
| * |
| * @param command the command to execute |
| * @param config the configuration |
| * @return the command line to execute |
| */ |
| protected String[] getRunCommandForOther(String command, |
| Configuration config) { |
| List<String> retCommand = new ArrayList<>(); |
| boolean containerSchedPriorityIsSet = false; |
| int containerSchedPriorityAdjustment = |
| YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY; |
| |
| if (config.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != |
| null) { |
| containerSchedPriorityIsSet = true; |
| containerSchedPriorityAdjustment = config |
| .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, |
| YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY); |
| } |
| |
| if (containerSchedPriorityIsSet) { |
| retCommand.addAll(Arrays.asList("nice", "-n", |
| Integer.toString(containerSchedPriorityAdjustment))); |
| } |
| |
| retCommand.addAll(Arrays.asList("bash", command)); |
| |
| return retCommand.toArray(new String[retCommand.size()]); |
| } |
| |
| /** |
| * Return whether the container is still active. |
| * |
| * @param containerId the target container's ID |
| * @return true if the container is active |
| */ |
| protected boolean isContainerActive(ContainerId containerId) { |
| try { |
| readLock.lock(); |
| |
| return (this.pidFiles.containsKey(containerId)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Propagate variables from the nodemanager's environment into the |
| * container's environment if unspecified by the container. |
| * @param env the environment to update |
| * @see org.apache.hadoop.yarn.conf.YarnConfiguration#NM_ENV_WHITELIST |
| */ |
| protected void updateEnvForWhitelistVars(Map<String, String> env) { |
| for(String var : whitelistVars) { |
| if (!env.containsKey(var)) { |
| String val = getNMEnvVar(var); |
| if (val != null) { |
| env.put(var, val); |
| } |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| protected String getNMEnvVar(String varname) { |
| return System.getenv(varname); |
| } |
| |
| /** |
| * Mark the container as active. |
| * |
| * @param containerId the container ID |
| * @param pidFilePath the path where the executor should write the PID |
| * of the launched process |
| */ |
| public void activateContainer(ContainerId containerId, Path pidFilePath) { |
| try { |
| writeLock.lock(); |
| this.pidFiles.put(containerId, pidFilePath); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| // LinuxContainerExecutor overrides this method and behaves differently. |
| public String[] getIpAndHost(Container container) |
| throws ContainerExecutionException { |
| return getLocalIpAndHost(container); |
| } |
| |
| // ipAndHost[0] contains ip. |
| // ipAndHost[1] contains hostname. |
| public static String[] getLocalIpAndHost(Container container) { |
| String[] ipAndHost = new String[2]; |
| try { |
| InetAddress address = InetAddress.getLocalHost(); |
| ipAndHost[0] = address.getHostAddress(); |
| ipAndHost[1] = address.getHostName(); |
| } catch (UnknownHostException e) { |
| LOG.error("Unable to get Local hostname and ip for " + container |
| .getContainerId(), e); |
| } |
| return ipAndHost; |
| } |
| |
| /** |
| * Mark the container as inactive. For inactive containers this |
| * method has no effect. |
| * |
| * @param containerId the container ID |
| */ |
| public void deactivateContainer(ContainerId containerId) { |
| try { |
| writeLock.lock(); |
| this.pidFiles.remove(containerId); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Pause the container. The default implementation is to raise a kill event. |
| * Specific executor implementations can override this behavior. |
| * @param container |
| * the Container |
| */ |
| public void pauseContainer(Container container) { |
| LOG.warn(container.getContainerId() + " doesn't support pausing."); |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Resume the container from pause state. The default implementation ignores |
| * this event. Specific implementations can override this behavior. |
| * @param container |
| * the Container |
| */ |
| public void resumeContainer(Container container) { |
| LOG.warn(container.getContainerId() + " doesn't support resume."); |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Get the process-identifier for the container. |
| * |
| * @param containerID the container ID |
| * @return the process ID of the container if it has already launched, |
| * or null otherwise |
| */ |
| public String getProcessId(ContainerId containerID) { |
| String pid = null; |
| Path pidFile = pidFiles.get(containerID); |
| |
| // If PID is null, this container hasn't launched yet. |
| if (pidFile != null) { |
| try { |
| pid = ProcessIdFileReader.getProcessId(pidFile); |
| } catch (IOException e) { |
| LOG.error("Got exception reading pid from pid-file " + pidFile, e); |
| } |
| } |
| |
| return pid; |
| } |
| |
| /** |
| * This class will signal a target container after a specified delay. |
| * @see #signalContainer |
| */ |
| public static class DelayedProcessKiller extends Thread { |
| private final Container container; |
| private final String user; |
| private final String pid; |
| private final long delay; |
| private final Signal signal; |
| private final ContainerExecutor containerExecutor; |
| |
| /** |
| * Basic constructor. |
| * |
| * @param container the container to signal |
| * @param user the user as whow to send the signal |
| * @param pid the PID of the container process |
| * @param delayMS the period of time to wait in millis before signaling |
| * the container |
| * @param signal the signal to send |
| * @param containerExecutor the executor to use to send the signal |
| */ |
| public DelayedProcessKiller(Container container, String user, String pid, |
| long delayMS, Signal signal, ContainerExecutor containerExecutor) { |
| this.container = container; |
| this.user = user; |
| this.pid = pid; |
| this.delay = delayMS; |
| this.signal = signal; |
| this.containerExecutor = containerExecutor; |
| setName("Task killer for " + pid); |
| setDaemon(false); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(delay); |
| containerExecutor.signalContainer(new ContainerSignalContext.Builder() |
| .setContainer(container) |
| .setUser(user) |
| .setPid(pid) |
| .setSignal(signal) |
| .build()); |
| } catch (InterruptedException e) { |
| interrupt(); |
| } catch (IOException e) { |
| String message = "Exception when user " + user + " killing task " + pid |
| + " in DelayedProcessKiller: " + StringUtils.stringifyException(e); |
| LOG.warn(message); |
| container.handle(new ContainerDiagnosticsUpdateEvent( |
| container.getContainerId(), message)); |
| } |
| } |
| } |
| } |