| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.APPID; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.APPLICATION_LOCAL_DIRS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_ID_STR; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_LAUNCH_PREFIX_COMMANDS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_LOCAL_DIRS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_LOG_DIRS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_RUN_CMDS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_WORK_DIR; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.FILECACHE_DIRS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.LOCALIZED_RESOURCES; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.LOCAL_DIRS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.LOG_DIRS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.NM_PRIVATE_CONTAINER_SCRIPT_PATH; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.NM_PRIVATE_KEYSTORE_PATH; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.NM_PRIVATE_TOKENS_PATH; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.NM_PRIVATE_TRUSTSTORE_PATH; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.PID; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.PID_FILE_PATH; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RESOURCES_OPTIONS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RUN_AS_USER; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.SIGNAL; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.TC_COMMAND_FILE; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_FILECACHE_DIRS; |
| import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_LOCAL_DIRS; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.regex.Pattern; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ConfigurationException; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DefaultLinuxContainerRuntime; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DelegatingLinuxContainerRuntime; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.OCIContainerRuntime; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler; |
| import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; |
| import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
 * <p>This class provides {@link Container} execution using a native
 * {@code container-executor} binary. By using a helper written in native
 * code, this class is able to do several things that the
 * {@link DefaultContainerExecutor} cannot, such as executing applications
 * as the applications' owners, providing localization that takes advantage
 * of mapping the application owner to a UID on the execution host, managing
 * resources through Linux CGROUPS, and supporting Docker.</p>
| * |
| * <p>If {@code hadoop.security.authentication} is set to {@code simple}, |
| * then the |
| * {@code yarn.nodemanager.linux-container-executor.nonsecure-mode.limit-users} |
| * property will determine whether the {@code LinuxContainerExecutor} runs |
| * processes as the application owner or as the default user, as set in the |
| * {@code yarn.nodemanager.linux-container-executor.nonsecure-mode.local-user} |
| * property.</p> |
| * |
| * <p>The {@code LinuxContainerExecutor} will manage applications through an |
| * appropriate {@link LinuxContainerRuntime} instance. This class uses a |
| * {@link DelegatingLinuxContainerRuntime} instance, which will delegate calls |
 * to either a {@link DefaultLinuxContainerRuntime} instance or an
| * {@link OCIContainerRuntime} instance, depending on the job's |
| * configuration.</p> |
| * |
| * @see LinuxContainerRuntime |
| * @see DelegatingLinuxContainerRuntime |
| * @see DefaultLinuxContainerRuntime |
| * @see DockerLinuxContainerRuntime |
| * @see OCIContainerRuntime#isOCICompliantContainerRequested |
| */ |
| public class LinuxContainerExecutor extends ContainerExecutor { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(LinuxContainerExecutor.class); |
| |
| private String nonsecureLocalUser; |
| private Pattern nonsecureLocalUserPattern; |
| private LCEResourcesHandler resourcesHandler; |
| private boolean containerSchedPriorityIsSet = false; |
| private int containerSchedPriorityAdjustment = 0; |
| private boolean containerLimitUsers; |
| private ResourceHandler resourceHandlerChain; |
| private LinuxContainerRuntime linuxContainerRuntime; |
| private Context nmContext; |
| |
/**
 * Numeric exit statuses known to this executor. Each constant carries its
 * number, and {@link #toString()} renders that number (not the constant
 * name).
 */
public enum ExitCode {
  SUCCESS(0),
  INVALID_ARGUMENT_NUMBER(1),
  INVALID_COMMAND_PROVIDED(3),
  INVALID_NM_ROOT_DIRS(5),
  SETUID_OPER_FAILED(6),
  UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
  UNABLE_TO_SIGNAL_CONTAINER(8),
  INVALID_CONTAINER_PID(9),
  OUT_OF_MEMORY(18),
  INITIALIZE_USER_FAILED(20),
  PATH_TO_DELETE_IS_NULL(21),
  INVALID_CONTAINER_EXEC_PERMISSIONS(22),
  INVALID_CONFIG_FILE(24),
  SETSID_OPER_FAILED(25),
  WRITE_PIDFILE_FAILED(26),
  WRITE_CGROUP_FAILED(27),
  TRAFFIC_CONTROL_EXECUTION_FAILED(28),
  DOCKER_RUN_FAILED(29),
  ERROR_OPENING_DOCKER_FILE(30),
  ERROR_READING_DOCKER_FILE(31),
  FEATURE_DISABLED(32),
  COULD_NOT_CREATE_SCRIPT_COPY(33),
  COULD_NOT_CREATE_CREDENTIALS_FILE(34),
  COULD_NOT_CREATE_WORK_DIRECTORIES(35),
  COULD_NOT_CREATE_APP_LOG_DIRECTORIES(36),
  COULD_NOT_CREATE_TMP_DIRECTORIES(37),
  ERROR_CREATE_CONTAINER_DIRECTORIES_ARGUMENTS(38);

  /** The numeric exit status for this constant. */
  private final int code;

  ExitCode(int value) {
    code = value;
  }

  /**
   * Returns the numeric exit status.
   *
   * @return the exit status as an {@code int}
   */
  public int getExitCode() {
    return code;
  }

  /**
   * Renders the numeric exit status in decimal, e.g. {@code "24"}.
   *
   * @return the exit status as a string
   */
  @Override
  public String toString() {
    return Integer.toString(code);
  }
}
| |
| /** |
| * Default constructor to allow for creation through reflection. |
| */ |
| public LinuxContainerExecutor() { |
| } |
| |
/**
 * Creates an executor that uses the supplied runtime instead of building
 * one during {@link #init(Context)}. Intended mainly for tests.
 *
 * <p>Note that {@code init()} only creates and initializes a runtime when
 * none is set, so a runtime passed here is used as-is.</p>
 *
 * @param linuxContainerRuntime the runtime to use; if {@code null},
 *     {@code init()} falls back to a {@link DelegatingLinuxContainerRuntime}
 */
public LinuxContainerExecutor(LinuxContainerRuntime linuxContainerRuntime) {
  this();
  this.linuxContainerRuntime = linuxContainerRuntime;
}
| |
| @Override |
| public void setConf(Configuration conf) { |
| super.setConf(conf); |
| |
| resourcesHandler = getResourcesHandler(conf); |
| |
| containerSchedPriorityIsSet = false; |
| if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) |
| != null) { |
| containerSchedPriorityIsSet = true; |
| containerSchedPriorityAdjustment = conf |
| .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, |
| YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY); |
| } |
| nonsecureLocalUser = conf.get( |
| YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, |
| YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER); |
| nonsecureLocalUserPattern = Pattern.compile( |
| conf.get(YarnConfiguration.NM_NONSECURE_MODE_USER_PATTERN_KEY, |
| YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_USER_PATTERN)); |
| containerLimitUsers = conf.getBoolean( |
| YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS, |
| YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LIMIT_USERS); |
| if (!containerLimitUsers) { |
| LOG.warn("{}: impersonation without authentication enabled", |
| YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS); |
| } |
| } |
| |
| private LCEResourcesHandler getResourcesHandler(Configuration conf) { |
| LCEResourcesHandler handler = ReflectionUtils.newInstance( |
| conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER, |
| DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf); |
| |
| // Stop using CgroupsLCEResourcesHandler |
| // use the resource handler chain instead |
| // ResourceHandlerModule will create the cgroup cpu module if |
| // CgroupsLCEResourcesHandler is set |
| if (handler instanceof CgroupsLCEResourcesHandler) { |
| handler = |
| ReflectionUtils.newInstance(DefaultLCEResourcesHandler.class, conf); |
| } |
| handler.setConf(conf); |
| return handler; |
| } |
| |
| void verifyUsernamePattern(String user) { |
| if (!UserGroupInformation.isSecurityEnabled() && |
| !nonsecureLocalUserPattern.matcher(user).matches()) { |
| throw new IllegalArgumentException("Invalid user name '" + user + "'," + |
| " it must match '" + nonsecureLocalUserPattern.pattern() + "'"); |
| } |
| } |
| |
| String getRunAsUser(String user) { |
| if (UserGroupInformation.isSecurityEnabled() || |
| !containerLimitUsers) { |
| return user; |
| } else { |
| return nonsecureLocalUser; |
| } |
| } |
| |
| /** |
| * Get the path to the {@code container-executor} binary. The path will |
| * be absolute. |
| * |
| * @param conf the {@link Configuration} |
| * @return the path to the {@code container-executor} binary |
| */ |
| protected String getContainerExecutorExecutablePath(Configuration conf) { |
| String yarnHomeEnvVar = |
| System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key()); |
| File hadoopBin = new File(yarnHomeEnvVar, "bin"); |
| String defaultPath = |
| new File(hadoopBin, "container-executor").getAbsolutePath(); |
| return null == conf |
| ? defaultPath |
| : conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, |
| defaultPath); |
| } |
| |
| /** |
| * Add a niceness level to the process that will be executed. Adds |
| * {@code -n <nice>} to the given command. The niceness level will be |
| * taken from the |
| * {@link YarnConfiguration#NM_CONTAINER_EXECUTOR_SCHED_PRIORITY} property. |
| * |
| * @param command the command to which to add the niceness setting. |
| */ |
| protected void addSchedPriorityCommand(List<String> command) { |
| if (containerSchedPriorityIsSet) { |
| command.addAll(Arrays.asList("nice", "-n", |
| Integer.toString(containerSchedPriorityAdjustment))); |
| } |
| } |
| |
| protected PrivilegedOperationExecutor getPrivilegedOperationExecutor() { |
| return PrivilegedOperationExecutor.getInstance(getConf()); |
| } |
| |
| @Override |
| public void init(Context context) throws IOException { |
| Configuration conf = super.getConf(); |
| this.nmContext = context; |
| |
| // Send command to executor which will just start up, |
| // verify configuration/permissions and exit |
| try { |
| PrivilegedOperation checkSetupOp = new PrivilegedOperation( |
| PrivilegedOperation.OperationType.CHECK_SETUP); |
| PrivilegedOperationExecutor privilegedOperationExecutor = |
| getPrivilegedOperationExecutor(); |
| |
| privilegedOperationExecutor.executePrivilegedOperation(checkSetupOp, |
| false); |
| } catch (PrivilegedOperationException e) { |
| int exitCode = e.getExitCode(); |
| LOG.warn("Exit code from container executor initialization is : {}", |
| exitCode, e); |
| |
| throw new IOException("Linux container executor not configured properly" |
| + " (error=" + exitCode + ")", e); |
| } |
| |
| try { |
| resourceHandlerChain = ResourceHandlerModule |
| .getConfiguredResourceHandlerChain(conf, nmContext); |
| LOG.debug("Resource handler chain enabled = {}", |
| (resourceHandlerChain != null)); |
| if (resourceHandlerChain != null) { |
| LOG.debug("Bootstrapping resource handler chain: {}", |
| resourceHandlerChain); |
| resourceHandlerChain.bootstrap(conf); |
| } |
| } catch (ResourceHandlerException e) { |
| LOG.error("Failed to bootstrap configured resource subsystems! ", e); |
| throw new IOException( |
| "Failed to bootstrap configured resource subsystems!"); |
| } |
| |
| try { |
| if (linuxContainerRuntime == null) { |
| LinuxContainerRuntime runtime = new DelegatingLinuxContainerRuntime(); |
| |
| runtime.initialize(conf, nmContext); |
| this.linuxContainerRuntime = runtime; |
| } |
| } catch (ContainerExecutionException e) { |
| LOG.error("Failed to initialize linux container runtime(s)!", e); |
| throw new IOException("Failed to initialize linux container runtime(s)!"); |
| } |
| |
| resourcesHandler.init(this); |
| } |
| |
| @Override |
| public void start() { |
| super.start(); |
| linuxContainerRuntime.start(); |
| } |
| |
| @Override |
| public void stop() { |
| super.stop(); |
| linuxContainerRuntime.stop(); |
| } |
| |
| @Override |
| public void startLocalizer(LocalizerStartContext ctx) |
| throws IOException, InterruptedException { |
| Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens(); |
| InetSocketAddress nmAddr = ctx.getNmAddr(); |
| String user = ctx.getUser(); |
| String appId = ctx.getAppId(); |
| String locId = ctx.getLocId(); |
| LocalDirsHandlerService dirsHandler = ctx.getDirsHandler(); |
| List<String> localDirs = dirsHandler.getLocalDirs(); |
| List<String> logDirs = dirsHandler.getLogDirs(); |
| |
| verifyUsernamePattern(user); |
| String runAsUser = getRunAsUser(user); |
| PrivilegedOperation initializeContainerOp = new PrivilegedOperation( |
| PrivilegedOperation.OperationType.INITIALIZE_CONTAINER); |
| List<String> prefixCommands = new ArrayList<>(); |
| |
| addSchedPriorityCommand(prefixCommands); |
| initializeContainerOp.appendArgs( |
| runAsUser, |
| user, |
| Integer.toString( |
| PrivilegedOperation.RunAsUserCommand.INITIALIZE_CONTAINER |
| .getValue()), |
| appId, |
| locId, |
| nmPrivateContainerTokensPath.toUri().getPath().toString(), |
| StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR, |
| localDirs), |
| StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR, |
| logDirs)); |
| |
| File jvm = // use same jvm as parent |
| new File(new File(System.getProperty("java.home"), "bin"), "java"); |
| initializeContainerOp.appendArgs(jvm.toString()); |
| initializeContainerOp.appendArgs("-classpath"); |
| initializeContainerOp.appendArgs(System.getProperty("java.class.path")); |
| String javaLibPath = System.getProperty("java.library.path"); |
| if (javaLibPath != null) { |
| initializeContainerOp.appendArgs("-Djava.library.path=" + javaLibPath); |
| } |
| |
| initializeContainerOp.appendArgs(ContainerLocalizer.getJavaOpts(getConf())); |
| |
| List<String> localizerArgs = new ArrayList<>(); |
| |
| buildMainArgs(localizerArgs, user, appId, locId, nmAddr, |
| nmPrivateContainerTokensPath.getName(), localDirs); |
| Path containerLogDir = getContainerLogDir(dirsHandler, appId, locId); |
| localizerArgs = replaceWithContainerLogDir(localizerArgs, containerLogDir); |
| |
| initializeContainerOp.appendArgs(localizerArgs); |
| |
| try { |
| Configuration conf = super.getConf(); |
| PrivilegedOperationExecutor privilegedOperationExecutor = |
| getPrivilegedOperationExecutor(); |
| |
| privilegedOperationExecutor.executePrivilegedOperation(prefixCommands, |
| initializeContainerOp, null, null, false, true); |
| |
| } catch (PrivilegedOperationException e) { |
| int exitCode = e.getExitCode(); |
| LOG.warn("Exit code from container {} startLocalizer is : {}", |
| locId, exitCode, e); |
| |
| throw new IOException("Application " + appId + " initialization failed" + |
| " (exitCode=" + exitCode + ") with output: " + e.getOutput(), e); |
| } |
| } |
| |
| private List<String> replaceWithContainerLogDir(List<String> commands, |
| Path containerLogDir) { |
| List<String> newCmds = new ArrayList<>(commands.size()); |
| |
| for (String item : commands) { |
| newCmds.add(item.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR, |
| containerLogDir.toString())); |
| } |
| |
| return newCmds; |
| } |
| |
| private Path getContainerLogDir(LocalDirsHandlerService dirsHandler, |
| String appId, String containerId) throws IOException { |
| String relativeContainerLogDir = ContainerLaunch |
| .getRelativeContainerLogDir(appId, containerId); |
| |
| return dirsHandler.getLogPathForWrite(relativeContainerLogDir, |
| false); |
| } |
| |
| /** |
| * Set up the {@link ContainerLocalizer}. |
| * |
| * @param command the current ShellCommandExecutor command line |
| * @param user localization user |
| * @param appId localized app id |
| * @param locId localizer id |
| * @param nmAddr nodemanager address |
| * @param localDirs list of local dirs |
| * @see ContainerLocalizer#buildMainArgs |
| */ |
| @VisibleForTesting |
| public void buildMainArgs(List<String> command, String user, String appId, |
| String locId, InetSocketAddress nmAddr, String tokenFileName, |
| List<String> localDirs) { |
| ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr, |
| tokenFileName, localDirs, super.getConf()); |
| } |
| |
| @Override |
| public void prepareContainer(ContainerPrepareContext ctx) throws IOException { |
| |
| ContainerRuntimeContext.Builder builder = |
| new ContainerRuntimeContext.Builder(ctx.getContainer()); |
| |
| builder.setExecutionAttribute(LOCALIZED_RESOURCES, |
| ctx.getLocalizedResources()) |
| .setExecutionAttribute(USER, ctx.getUser()) |
| .setExecutionAttribute(CONTAINER_LOCAL_DIRS, |
| ctx.getContainerLocalDirs()) |
| .setExecutionAttribute(CONTAINER_RUN_CMDS, ctx.getCommands()) |
| .setExecutionAttribute(CONTAINER_ID_STR, |
| ctx.getContainer().getContainerId().toString()); |
| |
| try { |
| linuxContainerRuntime.prepareContainer(builder.build()); |
| } catch (ContainerExecutionException e) { |
| throw new IOException("Unable to prepare container: ", e); |
| } |
| } |
| |
| @Override |
| public int launchContainer(ContainerStartContext ctx) |
| throws IOException, ConfigurationException { |
| return handleLaunchForLaunchType(ctx, |
| ApplicationConstants.ContainerLaunchType.LAUNCH); |
| } |
| |
| @Override |
| public int relaunchContainer(ContainerStartContext ctx) |
| throws IOException, ConfigurationException { |
| return handleLaunchForLaunchType(ctx, |
| ApplicationConstants.ContainerLaunchType.RELAUNCH); |
| } |
| |
| private int handleLaunchForLaunchType(ContainerStartContext ctx, |
| ApplicationConstants.ContainerLaunchType type) throws IOException, |
| ConfigurationException { |
| Container container = ctx.getContainer(); |
| String user = ctx.getUser(); |
| |
| verifyUsernamePattern(user); |
| |
| ContainerId containerId = container.getContainerId(); |
| |
| resourcesHandler.preExecute(containerId, |
| container.getResource()); |
| String resourcesOptions = resourcesHandler.getResourcesOption(containerId); |
| String tcCommandFile = null; |
| List<String> numaArgs = null; |
| |
| try { |
| if (resourceHandlerChain != null) { |
| List<PrivilegedOperation> ops = resourceHandlerChain |
| .preStart(container); |
| |
| if (ops != null) { |
| List<PrivilegedOperation> resourceOps = new ArrayList<>(); |
| |
| resourceOps.add(new PrivilegedOperation( |
| PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, |
| resourcesOptions)); |
| |
| for (PrivilegedOperation op : ops) { |
| switch (op.getOperationType()) { |
| case ADD_PID_TO_CGROUP: |
| resourceOps.add(op); |
| break; |
| case TC_MODIFY_STATE: |
| tcCommandFile = op.getArguments().get(0); |
| break; |
| case ADD_NUMA_PARAMS: |
| numaArgs = op.getArguments(); |
| break; |
| default: |
| LOG.warn("PrivilegedOperation type unsupported in launch: {}", |
| op.getOperationType()); |
| } |
| } |
| |
| if (resourceOps.size() > 1) { |
| //squash resource operations |
| try { |
| PrivilegedOperation operation = PrivilegedOperationExecutor |
| .squashCGroupOperations(resourceOps); |
| resourcesOptions = operation.getArguments().get(0); |
| } catch (PrivilegedOperationException e) { |
| LOG.error("Failed to squash cgroup operations!", e); |
| throw new ResourceHandlerException( |
| "Failed to squash cgroup operations!"); |
| } |
| } |
| } |
| } |
| } catch (ResourceHandlerException e) { |
| LOG.error("ResourceHandlerChain.preStart() failed!", e); |
| throw new IOException("ResourceHandlerChain.preStart() failed!", e); |
| } |
| |
| try { |
| Path pidFilePath = getPidFilePath(containerId); |
| if (pidFilePath != null) { |
| |
| ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext( |
| ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs); |
| |
| if (type.equals(ApplicationConstants.ContainerLaunchType.RELAUNCH)) { |
| linuxContainerRuntime.relaunchContainer(runtimeContext); |
| } else { |
| linuxContainerRuntime.launchContainer(runtimeContext); |
| } |
| |
| } else { |
| LOG.info( |
| "Container was marked as inactive. Returning terminated error"); |
| return ContainerExecutor.ExitCode.TERMINATED.getExitCode(); |
| } |
| } catch (ContainerExecutionException e) { |
| return handleExitCode(e, container, containerId); |
| } finally { |
| resourcesHandler.postExecute(containerId); |
| postComplete(containerId); |
| } |
| |
| return 0; |
| } |
| |
| private int handleExitCode(ContainerExecutionException e, Container container, |
| ContainerId containerId) throws ConfigurationException { |
| int exitCode = e.getExitCode(); |
| LOG.warn("Exit code from container {} is : {}", containerId, exitCode); |
| // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was |
| // terminated/killed forcefully. In all other cases, log the |
| // output |
| if (exitCode != ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() |
| && exitCode != ContainerExecutor.ExitCode.TERMINATED.getExitCode()) { |
| LOG.warn("Exception from container-launch with container ID: {} " |
| + "and exit code: {}", containerId, exitCode, e); |
| |
| StringBuilder builder = new StringBuilder(); |
| builder.append("Exception from container-launch.\n") |
| .append("Container id: " + containerId + "\n") |
| .append("Exit code: " + exitCode + "\n") |
| .append("Exception message: " + e.getMessage() + "\n"); |
| if (!Optional.ofNullable(e.getErrorOutput()).orElse("").isEmpty()) { |
| builder.append("Shell error output: " + e.getErrorOutput() + "\n"); |
| } |
| //Skip stack trace |
| String output = e.getOutput(); |
| if (output != null && !output.isEmpty()) { |
| builder.append("Shell output: " + output + "\n"); |
| } |
| String diagnostics = builder.toString(); |
| logOutput(diagnostics); |
| container.handle(new ContainerDiagnosticsUpdateEvent(containerId, |
| diagnostics)); |
| if (exitCode == |
| ExitCode.INVALID_CONTAINER_EXEC_PERMISSIONS.getExitCode() || |
| exitCode == |
| ExitCode.INVALID_CONFIG_FILE.getExitCode()) { |
| throw new ConfigurationException( |
| "Linux Container Executor reached unrecoverable exception", e); |
| } |
| } else { |
| container.handle(new ContainerDiagnosticsUpdateEvent(containerId, |
| "Container killed on request. Exit code is " + exitCode)); |
| } |
| return exitCode; |
| } |
| |
| private ContainerRuntimeContext buildContainerRuntimeContext( |
| ContainerStartContext ctx, Path pidFilePath, String resourcesOptions, |
| String tcCommandFile, List<String> numaArgs) { |
| |
| List<String> prefixCommands = new ArrayList<>(); |
| addSchedPriorityCommand(prefixCommands); |
| addNumaArgsToCommand(prefixCommands, numaArgs); |
| |
| Container container = ctx.getContainer(); |
| |
| ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext |
| .Builder(container); |
| if (prefixCommands.size() > 0) { |
| builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS, |
| prefixCommands); |
| } |
| |
| builder.setExecutionAttribute(LOCALIZED_RESOURCES, |
| ctx.getLocalizedResources()) |
| .setExecutionAttribute(RUN_AS_USER, getRunAsUser(ctx.getUser())) |
| .setExecutionAttribute(USER, ctx.getUser()) |
| .setExecutionAttribute(APPID, ctx.getAppId()) |
| .setExecutionAttribute(CONTAINER_ID_STR, |
| container.getContainerId().toString()) |
| .setExecutionAttribute(CONTAINER_WORK_DIR, ctx.getContainerWorkDir()) |
| .setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH, |
| ctx.getNmPrivateContainerScriptPath()) |
| .setExecutionAttribute(NM_PRIVATE_TOKENS_PATH, |
| ctx.getNmPrivateTokensPath()) |
| .setExecutionAttribute(NM_PRIVATE_KEYSTORE_PATH, |
| ctx.getNmPrivateKeystorePath()) |
| .setExecutionAttribute(NM_PRIVATE_TRUSTSTORE_PATH, |
| ctx.getNmPrivateTruststorePath()) |
| .setExecutionAttribute(PID_FILE_PATH, pidFilePath) |
| .setExecutionAttribute(LOCAL_DIRS, ctx.getLocalDirs()) |
| .setExecutionAttribute(LOG_DIRS, ctx.getLogDirs()) |
| .setExecutionAttribute(FILECACHE_DIRS, ctx.getFilecacheDirs()) |
| .setExecutionAttribute(USER_LOCAL_DIRS, ctx.getUserLocalDirs()) |
| .setExecutionAttribute(CONTAINER_LOCAL_DIRS, ctx.getContainerLocalDirs()) |
| .setExecutionAttribute(USER_FILECACHE_DIRS, ctx.getUserFilecacheDirs()) |
| .setExecutionAttribute(APPLICATION_LOCAL_DIRS, |
| ctx.getApplicationLocalDirs()) |
| .setExecutionAttribute(CONTAINER_LOG_DIRS, ctx.getContainerLogDirs()) |
| .setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions); |
| |
| if (tcCommandFile != null) { |
| builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile); |
| } |
| |
| return builder.build(); |
| } |
| |
| @Override |
| public String[] getIpAndHost(Container container) |
| throws ContainerExecutionException { |
| return linuxContainerRuntime.getIpAndHost(container); |
| } |
| |
| private void addNumaArgsToCommand(List<String> prefixCommands, |
| List<String> numaArgs) { |
| if (numaArgs != null) { |
| prefixCommands.addAll(numaArgs); |
| } |
| } |
| |
| @Override |
| public int reacquireContainer(ContainerReacquisitionContext ctx) |
| throws IOException, InterruptedException { |
| ContainerId containerId = ctx.getContainerId(); |
| |
| try { |
| //Resource handler chain needs to reacquire container state |
| //as well |
| if (resourceHandlerChain != null) { |
| try { |
| resourceHandlerChain.reacquireContainer(containerId); |
| } catch (ResourceHandlerException e) { |
| LOG.warn("ResourceHandlerChain.reacquireContainer failed for " + |
| "containerId: {} Exception: ", containerId, e); |
| } |
| } |
| |
| return super.reacquireContainer(ctx); |
| } finally { |
| resourcesHandler.postExecute(containerId); |
| postComplete(containerId); |
| } |
| } |
| |
| @Override |
| public boolean signalContainer(ContainerSignalContext ctx) |
| throws IOException { |
| Container container = ctx.getContainer(); |
| String user = ctx.getUser(); |
| String pid = ctx.getPid(); |
| Signal signal = ctx.getSignal(); |
| |
| verifyUsernamePattern(user); |
| String runAsUser = getRunAsUser(user); |
| |
| ContainerRuntimeContext runtimeContext = new ContainerRuntimeContext |
| .Builder(container) |
| .setExecutionAttribute(RUN_AS_USER, runAsUser) |
| .setExecutionAttribute(USER, user) |
| .setExecutionAttribute(PID, pid) |
| .setExecutionAttribute(SIGNAL, signal) |
| .build(); |
| |
| try { |
| linuxContainerRuntime.signalContainer(runtimeContext); |
| } catch (ContainerExecutionException e) { |
| int retCode = e.getExitCode(); |
| if (retCode == PrivilegedOperation.ResultCode.INVALID_CONTAINER_PID |
| .getValue()) { |
| return false; |
| } |
| LOG.warn("Error in signalling container {} with {}; exit = {}", |
| pid, signal, retCode, e); |
| logOutput(e.getOutput()); |
| throw new IOException("Problem signalling container " + pid + " with " |
| + signal + "; output: " + e.getOutput() + " and exitCode: " |
| + retCode, e); |
| } |
| return true; |
| } |
| |
| /** |
| * Performs the tasks necessary to reap the container. |
| * |
| * @param ctx Encapsulates information necessary for reaping containers. |
| * @return true if the reaping was successful. |
| * @throws IOException if an error occurs while reaping the container. |
| */ |
| @Override |
| public boolean reapContainer(ContainerReapContext ctx) throws IOException { |
| Container container = ctx.getContainer(); |
| String user = ctx.getUser(); |
| String runAsUser = getRunAsUser(user); |
| ContainerRuntimeContext runtimeContext = new ContainerRuntimeContext |
| .Builder(container) |
| .setExecutionAttribute(RUN_AS_USER, runAsUser) |
| .setExecutionAttribute(USER, user) |
| .build(); |
| try { |
| linuxContainerRuntime.reapContainer(runtimeContext); |
| } catch (ContainerExecutionException e) { |
| int retCode = e.getExitCode(); |
| if (retCode != 0) { |
| return false; |
| } |
| LOG.warn("Error in reaping container {} exit = {}", |
| container.getContainerId(), retCode, e); |
| logOutput(e.getOutput()); |
| throw new IOException("Error in reaping container " |
| + container.getContainerId().toString() + " exit = " + retCode, e); |
| } finally { |
| postComplete(container.getContainerId()); |
| } |
| return true; |
| } |
| |
| /** |
| * Performs container exec. |
| * |
| * @param ctx Encapsulates information necessary for exec container. |
| * @return stdin and stdout of container exec. |
| * @throws ContainerExecutionException if container exec fails. |
| */ |
| @Override |
| public IOStreamPair execContainer(ContainerExecContext ctx) |
| throws ContainerExecutionException { |
| IOStreamPair res; |
| try { |
| res = linuxContainerRuntime.execContainer(ctx); |
| } catch (ContainerExecutionException e) { |
| int retCode = e.getExitCode(); |
| if (retCode != 0) { |
| return new IOStreamPair(null, null); |
| } |
| LOG.warn("Error in executing container interactive shell {} exit = {}", |
| ctx, retCode, e); |
| logOutput(e.getOutput()); |
| throw new ContainerExecutionException( |
| "Error in executing container interactive shel" + ctx.getContainer() |
| .getContainerId().toString() + " exit = " + retCode); |
| } |
| return res; |
| } |
| |
| @Override |
| public void deleteAsUser(DeletionAsUserContext ctx) { |
| String user = ctx.getUser(); |
| Path dir = ctx.getSubDir(); |
| List<Path> baseDirs = ctx.getBasedirs(); |
| |
| verifyUsernamePattern(user); |
| |
| String runAsUser = getRunAsUser(user); |
| String dirString = dir == null ? "" : dir.toUri().getPath(); |
| |
| PrivilegedOperation deleteAsUserOp = new PrivilegedOperation( |
| PrivilegedOperation.OperationType.DELETE_AS_USER, (String) null); |
| |
| deleteAsUserOp.appendArgs( |
| runAsUser, |
| user, |
| Integer.toString(PrivilegedOperation. |
| RunAsUserCommand.DELETE_AS_USER.getValue()), |
| dirString); |
| |
| List<String> pathsToDelete = new ArrayList<String>(); |
| if (baseDirs == null || baseDirs.size() == 0) { |
| LOG.info("Deleting absolute path : {}", dir); |
| pathsToDelete.add(dirString); |
| } else { |
| for (Path baseDir : baseDirs) { |
| Path del = dir == null ? baseDir : new Path(baseDir, dir); |
| LOG.info("Deleting path : {}", del); |
| pathsToDelete.add(del.toString()); |
| deleteAsUserOp.appendArgs(baseDir.toUri().getPath()); |
| } |
| } |
| |
| try { |
| Configuration conf = super.getConf(); |
| PrivilegedOperationExecutor privilegedOperationExecutor = |
| getPrivilegedOperationExecutor(); |
| |
| privilegedOperationExecutor.executePrivilegedOperation(deleteAsUserOp, |
| false); |
| } catch (PrivilegedOperationException e) { |
| int exitCode = e.getExitCode(); |
| LOG.error("DeleteAsUser for {} returned with exit code: {}", |
| StringUtils.join(" ", pathsToDelete), exitCode, e); |
| } |
| } |
| |
| @Override |
| protected File[] readDirAsUser(String user, Path dir) { |
| List<File> files = new ArrayList<>(); |
| PrivilegedOperation listAsUserOp = new PrivilegedOperation( |
| PrivilegedOperation.OperationType.LIST_AS_USER, (String)null); |
| String runAsUser = getRunAsUser(user); |
| String dirString = ""; |
| |
| if (dir != null) { |
| dirString = dir.toUri().getPath(); |
| } |
| |
| listAsUserOp.appendArgs(runAsUser, user, |
| Integer.toString( |
| PrivilegedOperation.RunAsUserCommand.LIST_AS_USER.getValue()), |
| dirString); |
| |
| try { |
| PrivilegedOperationExecutor privOpExecutor = |
| getPrivilegedOperationExecutor(); |
| |
| String results = |
| privOpExecutor.executePrivilegedOperation(listAsUserOp, true); |
| |
| for (String file: results.split("\n")) { |
| // The container-executor always dumps its log output to stdout, which |
| // includes 3 lines that start with "main : " |
| if (!file.startsWith("main :")) { |
| files.add(new File(new File(dirString), file)); |
| } |
| } |
| } catch (PrivilegedOperationException e) { |
| LOG.error("ListAsUser for {} returned with exit code: {}", |
| dir, e.getExitCode(), e); |
| } |
| |
| return files.toArray(new File[files.size()]); |
| } |
| |
| @Override |
| public void symLink(String target, String symlink) { |
| |
| } |
| |
| @Override |
| public boolean isContainerAlive(ContainerLivenessContext ctx) |
| throws IOException { |
| String user = ctx.getUser(); |
| String pid = ctx.getPid(); |
| Container container = ctx.getContainer(); |
| |
| // Send a test signal to the process as the user to see if it's alive |
| return signalContainer(new ContainerSignalContext.Builder() |
| .setContainer(container) |
| .setUser(user) |
| .setPid(pid) |
| .setSignal(Signal.NULL) |
| .build()); |
| } |
| |
| /** |
| * Mount a CGROUPS controller at the requested mount point and create |
| * a hierarchy for the NodeManager to manage. |
| * |
| * @param cgroupKVs a key-value pair of the form |
| * {@code controller=mount-path} |
| * @param hierarchy the top directory of the hierarchy for the NodeManager |
| * @throws IOException if there is a problem mounting the CGROUPS |
| */ |
| public void mountCgroups(List<String> cgroupKVs, String hierarchy) |
| throws IOException { |
| try { |
| PrivilegedOperation mountCGroupsOp = new PrivilegedOperation( |
| PrivilegedOperation.OperationType.MOUNT_CGROUPS, hierarchy); |
| Configuration conf = super.getConf(); |
| |
| mountCGroupsOp.appendArgs(cgroupKVs); |
| PrivilegedOperationExecutor privilegedOperationExecutor = |
| getPrivilegedOperationExecutor(); |
| |
| privilegedOperationExecutor.executePrivilegedOperation(mountCGroupsOp, |
| false); |
| } catch (PrivilegedOperationException e) { |
| int exitCode = e.getExitCode(); |
| LOG.warn("Exception in LinuxContainerExecutor mountCgroups ", e); |
| |
| throw new IOException("Problem mounting cgroups " + cgroupKVs + |
| "; exit code = " + exitCode + " and output: " + e.getOutput(), |
| e); |
| } |
| } |
| |
| @VisibleForTesting |
| public ResourceHandler getResourceHandler() { |
| return resourceHandlerChain; |
| } |
| |
| /** |
| * Remove the docker container referenced in the context. |
| * |
| * @param containerId the containerId for the container. |
| */ |
| public void removeDockerContainer(String containerId) { |
| try { |
| PrivilegedOperationExecutor privOpExecutor = |
| PrivilegedOperationExecutor.getInstance(super.getConf()); |
| if (DockerCommandExecutor.isRemovable( |
| DockerCommandExecutor.getContainerStatus(containerId, privOpExecutor, |
| nmContext))) { |
| LOG.info("Removing Docker container : {}", containerId); |
| DockerRmCommand dockerRmCommand = new DockerRmCommand(containerId, |
| ResourceHandlerModule.getCgroupsRelativeRoot()); |
| DockerCommandExecutor.executeDockerCommand(dockerRmCommand, containerId, |
| null, privOpExecutor, false, nmContext); |
| } |
| } catch (ContainerExecutionException e) { |
| LOG.warn("Unable to remove docker container: {}", containerId); |
| } |
| } |
| |
| @VisibleForTesting |
| void postComplete(final ContainerId containerId) { |
| try { |
| if (resourceHandlerChain != null) { |
| LOG.debug("{} post complete", containerId); |
| resourceHandlerChain.postComplete(containerId); |
| } |
| } catch (ResourceHandlerException e) { |
| LOG.warn("ResourceHandlerChain.postComplete failed for " + |
| "containerId: {}. Exception: ", containerId, e); |
| } |
| } |
| |
| @Override |
| public synchronized void updateYarnSysFS(Context ctx, String user, |
| String appId, String spec) throws IOException { |
| LocalDirsHandlerService dirsHandler = nmContext.getLocalDirsHandler(); |
| Path sysFSPath = dirsHandler.getLocalPathForWrite( |
| "nmPrivate/" + appId + "/sysfs/app.json"); |
| File file = new File(sysFSPath.toString()); |
| List<String> localDirs = dirsHandler.getLocalDirs(); |
| if (file.exists()) { |
| if (!file.delete()) { |
| LOG.warn("Unable to delete {}", sysFSPath); |
| } |
| } |
| if (file.createNewFile()) { |
| FileOutputStream output = new FileOutputStream(file); |
| try { |
| output.write(spec.getBytes("UTF-8")); |
| } finally { |
| output.close(); |
| } |
| } |
| PrivilegedOperation privOp = new PrivilegedOperation( |
| PrivilegedOperation.OperationType.SYNC_YARN_SYSFS); |
| String runAsUser = getRunAsUser(user); |
| privOp.appendArgs(runAsUser, |
| user, |
| Integer.toString(PrivilegedOperation.RunAsUserCommand |
| .SYNC_YARN_SYSFS.getValue()), |
| appId, StringUtils.join(PrivilegedOperation |
| .LINUX_FILE_PATH_SEPARATOR, localDirs)); |
| privOp.disableFailureLogging(); |
| PrivilegedOperationExecutor privilegedOperationExecutor = |
| PrivilegedOperationExecutor.getInstance(nmContext.getConf()); |
| try { |
| privilegedOperationExecutor.executePrivilegedOperation(null, |
| privOp, null, null, false, false); |
| } catch (PrivilegedOperationException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| public String getExposedPorts(Container container) |
| throws ContainerExecutionException { |
| return linuxContainerRuntime.getExposedPorts(container); |
| } |
| |
| @Override |
| public Map<String, LocalResource> getLocalResources(Container container) |
| throws IOException { |
| return linuxContainerRuntime.getLocalResources(container); |
| } |
| } |