| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import static org.apache.hadoop.fs.CreateFlag.CREATE; |
| import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.PrintStream; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.commons.lang.math.RandomUtils; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.UnsupportedFileSystemException; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.util.Shell.CommandExecutor; |
| import org.apache.hadoop.util.Shell.ExitCodeException; |
| import org.apache.hadoop.util.Shell.ShellCommandExecutor; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ConfigurationException; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Optional; |
| |
| /** |
| * The {@code DefaultContainerExecutor} class offers generic container |
| * execution services. Process execution is handled in a platform-independent |
| * way via {@link ProcessBuilder}. |
| */ |
| public class DefaultContainerExecutor extends ContainerExecutor { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(DefaultContainerExecutor.class); |
| |
| private static final int WIN_MAX_PATH = 260; |
| |
| /** |
| * A {@link FileContext} for the local file system. |
| */ |
| protected final FileContext lfs; |
| |
| private String logDirPermissions = null; |
| |
| /** |
| * Default constructor for use in testing. |
| */ |
| @VisibleForTesting |
| public DefaultContainerExecutor() { |
| try { |
| this.lfs = FileContext.getLocalFSFileContext(); |
| } catch (UnsupportedFileSystemException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
/**
 * Build an executor that performs its file operations through the supplied
 * {@link FileContext}.
 *
 * @param lfs the {@link FileContext} to store in {@link #lfs}
 */
DefaultContainerExecutor(FileContext lfs) {
  super();
  this.lfs = lfs;
}
| |
| /** |
| * Copy a file using the {@link #lfs} {@link FileContext}. |
| * |
| * @param src the file to copy |
| * @param dst where to copy the file |
| * @param owner the owner of the new copy. Used only in secure Windows |
| * clusters |
| * @throws IOException when the copy fails |
| * @see WindowsSecureContainerExecutor |
| */ |
| protected void copyFile(Path src, Path dst, String owner) throws IOException { |
| lfs.util().copy(src, dst, false, true); |
| } |
| |
| /** |
| * Make a file executable using the {@link #lfs} {@link FileContext}. |
| * |
| * @param script the path to make executable |
| * @param owner the new owner for the file. Used only in secure Windows |
| * clusters |
| * @throws IOException when the change mode operation fails |
| * @see WindowsSecureContainerExecutor |
| */ |
| protected void setScriptExecutable(Path script, String owner) |
| throws IOException { |
| lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); |
| } |
| |
| @Override |
| public void init() throws IOException { |
| // nothing to do or verify here |
| } |
| |
| @Override |
| public void startLocalizer(LocalizerStartContext ctx) |
| throws IOException, InterruptedException { |
| Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens(); |
| InetSocketAddress nmAddr = ctx.getNmAddr(); |
| String user = ctx.getUser(); |
| String appId = ctx.getAppId(); |
| String locId = ctx.getLocId(); |
| LocalDirsHandlerService dirsHandler = ctx.getDirsHandler(); |
| |
| List<String> localDirs = dirsHandler.getLocalDirs(); |
| List<String> logDirs = dirsHandler.getLogDirs(); |
| |
| createUserLocalDirs(localDirs, user); |
| createUserCacheDirs(localDirs, user); |
| createAppDirs(localDirs, user, appId); |
| createAppLogDirs(appId, logDirs, user); |
| |
| // randomly choose the local directory |
| Path appStorageDir = getWorkingDir(localDirs, user, appId); |
| |
| String tokenFn = |
| String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); |
| Path tokenDst = new Path(appStorageDir, tokenFn); |
| copyFile(nmPrivateContainerTokensPath, tokenDst, user); |
| LOG.info("Copying from " + nmPrivateContainerTokensPath |
| + " to " + tokenDst); |
| |
| |
| FileContext localizerFc = |
| FileContext.getFileContext(lfs.getDefaultFileSystem(), getConf()); |
| localizerFc.setUMask(lfs.getUMask()); |
| localizerFc.setWorkingDirectory(appStorageDir); |
| LOG.info("Localizer CWD set to " + appStorageDir + " = " |
| + localizerFc.getWorkingDirectory()); |
| |
| ContainerLocalizer localizer = |
| createContainerLocalizer(user, appId, locId, localDirs, localizerFc); |
| // TODO: DO it over RPC for maintaining similarity? |
| localizer.runLocalization(nmAddr); |
| } |
| |
| /** |
| * Create a new {@link ContainerLocalizer} instance. |
| * |
| * @param user the user who owns the job for which the localization is being |
| * run |
| * @param appId the ID of the application for which the localization is being |
| * run |
| * @param locId the ID of the container for which the localization is being |
| * run |
| * @param localDirs a list of directories to use as destinations for the |
| * localization |
| * @param localizerFc the {@link FileContext} to use when localizing files |
| * @return the new {@link ContainerLocalizer} instance |
| * @throws IOException if {@code user} or {@code locId} is {@code null} or if |
| * the container localizer has an initialization failure |
| */ |
| @Private |
| @VisibleForTesting |
| protected ContainerLocalizer createContainerLocalizer(String user, |
| String appId, String locId, List<String> localDirs, |
| FileContext localizerFc) throws IOException { |
| ContainerLocalizer localizer = |
| new ContainerLocalizer(localizerFc, user, appId, locId, |
| getPaths(localDirs), |
| RecordFactoryProvider.getRecordFactory(getConf())); |
| return localizer; |
| } |
| |
| @Override |
| public int launchContainer(ContainerStartContext ctx) |
| throws IOException, ConfigurationException { |
| Container container = ctx.getContainer(); |
| Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath(); |
| Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath(); |
| String user = ctx.getUser(); |
| Path containerWorkDir = ctx.getContainerWorkDir(); |
| List<String> localDirs = ctx.getLocalDirs(); |
| List<String> logDirs = ctx.getLogDirs(); |
| |
| FsPermission dirPerm = new FsPermission(APPDIR_PERM); |
| ContainerId containerId = container.getContainerId(); |
| |
| // create container dirs on all disks |
| String containerIdStr = containerId.toString(); |
| String appIdStr = |
| containerId.getApplicationAttemptId(). |
| getApplicationId().toString(); |
| for (String sLocalDir : localDirs) { |
| Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE); |
| Path userdir = new Path(usersdir, user); |
| Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE); |
| Path appDir = new Path(appCacheDir, appIdStr); |
| Path containerDir = new Path(appDir, containerIdStr); |
| createDir(containerDir, dirPerm, true, user); |
| } |
| |
| // Create the container log-dirs on all disks |
| createContainerLogDirs(appIdStr, containerIdStr, logDirs, user); |
| |
| Path tmpDir = new Path(containerWorkDir, |
| YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); |
| createDir(tmpDir, dirPerm, false, user); |
| |
| |
| // copy container tokens to work dir |
| Path tokenDst = |
| new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); |
| copyFile(nmPrivateTokensPath, tokenDst, user); |
| |
| // copy launch script to work dir |
| Path launchDst = |
| new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); |
| copyFile(nmPrivateContainerScriptPath, launchDst, user); |
| |
| // Create new local launch wrapper script |
| LocalWrapperScriptBuilder sb = getLocalWrapperScriptBuilder( |
| containerIdStr, containerWorkDir); |
| |
| // Fail fast if attempting to launch the wrapper script would fail due to |
| // Windows path length limitation. |
| if (Shell.WINDOWS && |
| sb.getWrapperScriptPath().toString().length() > WIN_MAX_PATH) { |
| throw new IOException(String.format( |
| "Cannot launch container using script at path %s, because it exceeds " + |
| "the maximum supported path length of %d characters. Consider " + |
| "configuring shorter directories in %s.", sb.getWrapperScriptPath(), |
| WIN_MAX_PATH, YarnConfiguration.NM_LOCAL_DIRS)); |
| } |
| |
| Path pidFile = getPidFilePath(containerId); |
| if (pidFile != null) { |
| sb.writeLocalWrapperScript(launchDst, pidFile); |
| } else { |
| LOG.info("Container " + containerIdStr |
| + " pid file not set. Returning terminated error"); |
| return ExitCode.TERMINATED.getExitCode(); |
| } |
| |
| // create log dir under app |
| // fork script |
| Shell.CommandExecutor shExec = null; |
| try { |
| setScriptExecutable(launchDst, user); |
| setScriptExecutable(sb.getWrapperScriptPath(), user); |
| |
| shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(), |
| containerIdStr, user, pidFile, container.getResource(), |
| new File(containerWorkDir.toUri().getPath()), |
| container.getLaunchContext().getEnvironment()); |
| |
| if (isContainerActive(containerId)) { |
| shExec.execute(); |
| } else { |
| LOG.info("Container " + containerIdStr + |
| " was marked as inactive. Returning terminated error"); |
| return ExitCode.TERMINATED.getExitCode(); |
| } |
| } catch (IOException e) { |
| if (null == shExec) { |
| return -1; |
| } |
| int exitCode = shExec.getExitCode(); |
| LOG.warn("Exit code from container " + containerId + " is : " + exitCode); |
| // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was |
| // terminated/killed forcefully. In all other cases, log the |
| // container-executor's output |
| if (exitCode != ExitCode.FORCE_KILLED.getExitCode() |
| && exitCode != ExitCode.TERMINATED.getExitCode()) { |
| LOG.warn("Exception from container-launch with container ID: " |
| + containerId + " and exit code: " + exitCode , e); |
| |
| StringBuilder builder = new StringBuilder(); |
| builder.append("Exception from container-launch.\n"); |
| builder.append("Container id: ").append(containerId).append("\n"); |
| builder.append("Exit code: ").append(exitCode).append("\n"); |
| if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) { |
| builder.append("Exception message: "); |
| builder.append(e.getMessage()).append("\n"); |
| } |
| |
| if (!shExec.getOutput().isEmpty()) { |
| builder.append("Shell output: "); |
| builder.append(shExec.getOutput()).append("\n"); |
| } |
| String diagnostics = builder.toString(); |
| logOutput(diagnostics); |
| container.handle(new ContainerDiagnosticsUpdateEvent(containerId, |
| diagnostics)); |
| } else { |
| container.handle(new ContainerDiagnosticsUpdateEvent(containerId, |
| "Container killed on request. Exit code is " + exitCode)); |
| } |
| return exitCode; |
| } finally { |
| if (shExec != null) shExec.close(); |
| } |
| return 0; |
| } |
| |
| /** |
| * Create a new {@link ShellCommandExecutor} using the parameters. |
| * |
| * @param wrapperScriptPath the path to the script to execute |
| * @param containerIdStr the container ID |
| * @param user the application owner's username |
| * @param pidFile the path to the container's PID file |
| * @param resource this parameter controls memory and CPU limits. |
| * @param workDir If not-null, specifies the directory which should be set |
| * as the current working directory for the command. If null, |
| * the current working directory is not modified. |
| * @param environment the container environment |
| * @return the new {@link ShellCommandExecutor} |
| * @see ShellCommandExecutor |
| */ |
| protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, |
| String containerIdStr, String user, Path pidFile, Resource resource, |
| File workDir, Map<String, String> environment) { |
| |
| String[] command = getRunCommand(wrapperScriptPath, |
| containerIdStr, user, pidFile, this.getConf(), resource); |
| |
| LOG.info("launchContainer: " + Arrays.toString(command)); |
| return new ShellCommandExecutor( |
| command, |
| workDir, |
| environment, |
| 0L, |
| false); |
| } |
| |
| /** |
| * Create a {@link LocalWrapperScriptBuilder} for the given container ID |
| * and path that is appropriate to the current platform. |
| * |
| * @param containerIdStr the container ID |
| * @param containerWorkDir the container's working directory |
| * @return a new {@link LocalWrapperScriptBuilder} |
| */ |
| protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder( |
| String containerIdStr, Path containerWorkDir) { |
| return Shell.WINDOWS ? |
| new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) : |
| new UnixLocalWrapperScriptBuilder(containerWorkDir); |
| } |
| |
| /** |
| * This class is a utility to create a wrapper script that is platform |
| * appropriate. |
| */ |
| protected abstract class LocalWrapperScriptBuilder { |
| |
| private final Path wrapperScriptPath; |
| |
| /** |
| * Return the path for the wrapper script. |
| * |
| * @return the path for the wrapper script |
| */ |
| public Path getWrapperScriptPath() { |
| return wrapperScriptPath; |
| } |
| |
| /** |
| * Write out the wrapper script for the container launch script. This method |
| * will create the script at the configured wrapper script path. |
| * |
| * @param launchDst the script to launch |
| * @param pidFile the file that will hold the PID |
| * @throws IOException if the wrapper script cannot be created |
| * @see #getWrapperScriptPath |
| */ |
| public void writeLocalWrapperScript(Path launchDst, Path pidFile) |
| throws IOException { |
| try (DataOutputStream out = |
| lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE)); |
| PrintStream pout = |
| new PrintStream(out, false, "UTF-8")) { |
| writeLocalWrapperScript(launchDst, pidFile, pout); |
| } |
| } |
| |
| /** |
| * Write out the wrapper script for the container launch script. |
| * |
| * @param launchDst the script to launch |
| * @param pidFile the file that will hold the PID |
| * @param pout the stream to use to write out the wrapper script |
| */ |
| protected abstract void writeLocalWrapperScript(Path launchDst, |
| Path pidFile, PrintStream pout); |
| |
| /** |
| * Create an instance for the given container working directory. |
| * |
| * @param containerWorkDir the working directory for the container |
| */ |
| protected LocalWrapperScriptBuilder(Path containerWorkDir) { |
| this.wrapperScriptPath = new Path(containerWorkDir, |
| Shell.appendScriptExtension("default_container_executor")); |
| } |
| } |
| |
| /** |
| * This class is an instance of {@link LocalWrapperScriptBuilder} for |
| * non-Windows hosts. |
| */ |
| private final class UnixLocalWrapperScriptBuilder |
| extends LocalWrapperScriptBuilder { |
| private final Path sessionScriptPath; |
| |
| /** |
| * Create an instance for the given container path. |
| * |
| * @param containerWorkDir the container's working directory |
| */ |
| public UnixLocalWrapperScriptBuilder(Path containerWorkDir) { |
| super(containerWorkDir); |
| this.sessionScriptPath = new Path(containerWorkDir, |
| Shell.appendScriptExtension("default_container_executor_session")); |
| } |
| |
| @Override |
| public void writeLocalWrapperScript(Path launchDst, Path pidFile) |
| throws IOException { |
| writeSessionScript(launchDst, pidFile); |
| super.writeLocalWrapperScript(launchDst, pidFile); |
| } |
| |
| @Override |
| public void writeLocalWrapperScript(Path launchDst, Path pidFile, |
| PrintStream pout) { |
| String exitCodeFile = ContainerLaunch.getExitCodeFile( |
| pidFile.toString()); |
| String tmpFile = exitCodeFile + ".tmp"; |
| pout.println("#!/bin/bash"); |
| pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\""); |
| pout.println("rc=$?"); |
| pout.println("echo $rc > \"" + tmpFile + "\""); |
| pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\""); |
| pout.println("exit $rc"); |
| } |
| |
| private void writeSessionScript(Path launchDst, Path pidFile) |
| throws IOException { |
| try (DataOutputStream out = |
| lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); |
| PrintStream pout = |
| new PrintStream(out, false, "UTF-8")) { |
| // We need to do a move as writing to a file is not atomic |
| // Process reading a file being written to may get garbled data |
| // hence write pid to tmp file first followed by a mv |
| pout.println("#!/bin/bash"); |
| pout.println(); |
| pout.println("echo $$ > " + pidFile.toString() + ".tmp"); |
| pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); |
| String exec = Shell.isSetsidAvailable? "exec setsid" : "exec"; |
| pout.printf("%s /bin/bash \"%s\"", exec, launchDst.toUri().getPath()); |
| } |
| lfs.setPermission(sessionScriptPath, |
| ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); |
| } |
| } |
| |
| /** |
| * This class is an instance of {@link LocalWrapperScriptBuilder} for |
| * Windows hosts. |
| */ |
| private final class WindowsLocalWrapperScriptBuilder |
| extends LocalWrapperScriptBuilder { |
| |
| private final String containerIdStr; |
| |
| /** |
| * Create an instance for the given container and working directory. |
| * |
| * @param containerIdStr the container ID |
| * @param containerWorkDir the container's working directory |
| */ |
| public WindowsLocalWrapperScriptBuilder(String containerIdStr, |
| Path containerWorkDir) { |
| |
| super(containerWorkDir); |
| this.containerIdStr = containerIdStr; |
| } |
| |
| @Override |
| public void writeLocalWrapperScript(Path launchDst, Path pidFile, |
| PrintStream pout) { |
| // TODO: exit code script for Windows |
| |
| // On Windows, the pid is the container ID, so that it can also serve as |
| // the name of the job object created by winutils for task management. |
| // Write to temp file followed by atomic move. |
| String normalizedPidFile = new File(pidFile.toString()).getPath(); |
| pout.println("@echo " + containerIdStr + " > " + normalizedPidFile + |
| ".tmp"); |
| pout.println("@move /Y " + normalizedPidFile + ".tmp " + |
| normalizedPidFile); |
| pout.println("@call " + launchDst.toString()); |
| } |
| } |
| |
| @Override |
| public boolean signalContainer(ContainerSignalContext ctx) |
| throws IOException { |
| String user = ctx.getUser(); |
| String pid = ctx.getPid(); |
| Signal signal = ctx.getSignal(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid |
| + " as user " + user); |
| } |
| if (!containerIsAlive(pid)) { |
| return false; |
| } |
| try { |
| killContainer(pid, signal); |
| } catch (IOException e) { |
| if (!containerIsAlive(pid)) { |
| return false; |
| } |
| throw e; |
| } |
| return true; |
| } |
| |
| @Override |
| public boolean isContainerAlive(ContainerLivenessContext ctx) |
| throws IOException { |
| String pid = ctx.getPid(); |
| |
| return containerIsAlive(pid); |
| } |
| |
| /** |
| * Returns true if the process with the specified pid is alive. |
| * |
| * @param pid String pid |
| * @return boolean true if the process is alive |
| * @throws IOException if the command to test process liveliness fails |
| */ |
| @VisibleForTesting |
| public static boolean containerIsAlive(String pid) throws IOException { |
| try { |
| new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid)) |
| .execute(); |
| // successful execution means process is alive |
| return true; |
| } |
| catch (ExitCodeException e) { |
| // failure (non-zero exit code) means process is not alive |
| return false; |
| } |
| } |
| |
| /** |
| * Send a specified signal to the specified pid |
| * |
| * @param pid the pid of the process [group] to signal. |
| * @param signal signal to send |
| * @throws IOException if the command to kill the process fails |
| */ |
| protected void killContainer(String pid, Signal signal) throws IOException { |
| new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid)) |
| .execute(); |
| } |
| |
| @Override |
| public void deleteAsUser(DeletionAsUserContext ctx) |
| throws IOException, InterruptedException { |
| Path subDir = ctx.getSubDir(); |
| List<Path> baseDirs = ctx.getBasedirs(); |
| |
| if (baseDirs == null || baseDirs.size() == 0) { |
| LOG.info("Deleting absolute path : " + subDir); |
| if (!lfs.delete(subDir, true)) { |
| //Maybe retry |
| LOG.warn("delete returned false for path: [" + subDir + "]"); |
| } |
| return; |
| } |
| for (Path baseDir : baseDirs) { |
| Path del = subDir == null ? baseDir : new Path(baseDir, subDir); |
| LOG.info("Deleting path : " + del); |
| try { |
| if (!lfs.delete(del, true)) { |
| LOG.warn("delete returned false for path: [" + del + "]"); |
| } |
| } catch (FileNotFoundException e) { |
| continue; |
| } |
| } |
| } |
| |
| @Override |
| public void symLink(String target, String symlink) throws IOException { |
| FileUtil.symLink(target, symlink); |
| } |
| |
| /** |
| * Permissions for user dir. |
| * $local.dir/usercache/$user |
| */ |
| static final short USER_PERM = (short)0750; |
| /** |
| * Permissions for user appcache dir. |
| * $local.dir/usercache/$user/appcache |
| */ |
| static final short APPCACHE_PERM = (short)0710; |
| /** |
| * Permissions for user filecache dir. |
| * $local.dir/usercache/$user/filecache |
| */ |
| static final short FILECACHE_PERM = (short)0710; |
| /** |
| * Permissions for user app dir. |
| * $local.dir/usercache/$user/appcache/$appId |
| */ |
| static final short APPDIR_PERM = (short)0710; |
| |
| private long getDiskFreeSpace(Path base) throws IOException { |
| return lfs.getFsStatus(base).getRemaining(); |
| } |
| |
| private Path getApplicationDir(Path base, String user, String appId) { |
| return new Path(getAppcacheDir(base, user), appId); |
| } |
| |
| private Path getUserCacheDir(Path base, String user) { |
| return new Path(new Path(base, ContainerLocalizer.USERCACHE), user); |
| } |
| |
| private Path getAppcacheDir(Path base, String user) { |
| return new Path(getUserCacheDir(base, user), |
| ContainerLocalizer.APPCACHE); |
| } |
| |
| private Path getFileCacheDir(Path base, String user) { |
| return new Path(getUserCacheDir(base, user), |
| ContainerLocalizer.FILECACHE); |
| } |
| |
| /** |
| * Return a randomly chosen application directory from a list of local storage |
| * directories. The probability of selecting a directory is proportional to |
| * its size. |
| * |
| * @param localDirs the target directories from which to select |
| * @param user the user who owns the application |
| * @param appId the application ID |
| * @return the selected directory |
| * @throws IOException if no application directories for the user can be |
| * found |
| */ |
| protected Path getWorkingDir(List<String> localDirs, String user, |
| String appId) throws IOException { |
| long totalAvailable = 0L; |
| long[] availableOnDisk = new long[localDirs.size()]; |
| int i = 0; |
| // randomly choose the app directory |
| // the chance of picking a directory is proportional to |
| // the available space on the directory. |
| // firstly calculate the sum of all available space on these directories |
| for (String localDir : localDirs) { |
| Path curBase = getApplicationDir(new Path(localDir), user, appId); |
| long space = 0L; |
| try { |
| space = getDiskFreeSpace(curBase); |
| } catch (IOException e) { |
| LOG.warn("Unable to get Free Space for " + curBase.toString(), e); |
| } |
| availableOnDisk[i++] = space; |
| totalAvailable += space; |
| } |
| |
| // throw an IOException if totalAvailable is 0. |
| if (totalAvailable <= 0L) { |
| throw new IOException("Not able to find a working directory for " + user); |
| } |
| |
| // make probability to pick a directory proportional to |
| // the available space on the directory. |
| long randomPosition = RandomUtils.nextLong() % totalAvailable; |
| int dir = pickDirectory(randomPosition, availableOnDisk); |
| |
| return getApplicationDir(new Path(localDirs.get(dir)), user, appId); |
| } |
| |
| /** |
| * Picks a directory based on the input random number and |
| * available size at each dir. |
| */ |
| @Private |
| @VisibleForTesting |
| int pickDirectory(long randomPosition, final long[] availableOnDisk) { |
| int dir = 0; |
| // skip zero available space directory, |
| // because totalAvailable is greater than 0 and randomPosition |
| // is less than totalAvailable, we can find a valid directory |
| // with nonzero available space. |
| while (availableOnDisk[dir] == 0L) { |
| dir++; |
| } |
| while (randomPosition >= availableOnDisk[dir]) { |
| randomPosition -= availableOnDisk[dir++]; |
| } |
| return dir; |
| } |
| |
| /** |
| * Use the {@link #lfs} {@link FileContext} to create the target directory. |
| * |
| * @param dirPath the target directory |
| * @param perms the target permissions for the target directory |
| * @param createParent whether the parent directories should also be created |
| * @param user the user as whom the target directory should be created. |
| * Used only on secure Windows hosts. |
| * @throws IOException if there's a failure performing a file operation |
| * @see WindowsSecureContainerExecutor |
| */ |
| protected void createDir(Path dirPath, FsPermission perms, |
| boolean createParent, String user) throws IOException { |
| lfs.mkdir(dirPath, perms, createParent); |
| if (!perms.equals(perms.applyUMask(lfs.getUMask()))) { |
| lfs.setPermission(dirPath, perms); |
| } |
| } |
| |
| /** |
| * Initialize the local directories for a particular user. |
| * <ul>.mkdir |
| * <li>$local.dir/usercache/$user</li> |
| * </ul> |
| * |
| * @param localDirs the target directories to create |
| * @param user the user whose local cache directories should be initialized |
| * @throws IOException if there's an issue initializing the user local |
| * directories |
| */ |
| void createUserLocalDirs(List<String> localDirs, String user) |
| throws IOException { |
| boolean userDirStatus = false; |
| FsPermission userperms = new FsPermission(USER_PERM); |
| for (String localDir : localDirs) { |
| // create $local.dir/usercache/$user and its immediate parent |
| try { |
| createDir(getUserCacheDir(new Path(localDir), user), userperms, true, |
| user); |
| } catch (IOException e) { |
| LOG.warn("Unable to create the user directory : " + localDir, e); |
| continue; |
| } |
| userDirStatus = true; |
| } |
| if (!userDirStatus) { |
| throw new IOException("Not able to initialize user directories " |
| + "in any of the configured local directories for user " + user); |
| } |
| } |
| |
| |
| /** |
| * Initialize the local cache directories for a particular user. |
| * <ul> |
| * <li>$local.dir/usercache/$user</li> |
| * <li>$local.dir/usercache/$user/appcache</li> |
| * <li>$local.dir/usercache/$user/filecache</li> |
| * </ul> |
| * |
| * @param localDirs the target directories to create |
| * @param user the user whose local cache directories should be initialized |
| * @throws IOException if there's an issue initializing the cache |
| * directories |
| */ |
| void createUserCacheDirs(List<String> localDirs, String user) |
| throws IOException { |
| LOG.info("Initializing user " + user); |
| |
| boolean appcacheDirStatus = false; |
| boolean distributedCacheDirStatus = false; |
| FsPermission appCachePerms = new FsPermission(APPCACHE_PERM); |
| FsPermission fileperms = new FsPermission(FILECACHE_PERM); |
| |
| for (String localDir : localDirs) { |
| // create $local.dir/usercache/$user/appcache |
| Path localDirPath = new Path(localDir); |
| final Path appDir = getAppcacheDir(localDirPath, user); |
| try { |
| createDir(appDir, appCachePerms, true, user); |
| appcacheDirStatus = true; |
| } catch (IOException e) { |
| LOG.warn("Unable to create app cache directory : " + appDir, e); |
| } |
| // create $local.dir/usercache/$user/filecache |
| final Path distDir = getFileCacheDir(localDirPath, user); |
| try { |
| createDir(distDir, fileperms, true, user); |
| distributedCacheDirStatus = true; |
| } catch (IOException e) { |
| LOG.warn("Unable to create file cache directory : " + distDir, e); |
| } |
| } |
| if (!appcacheDirStatus) { |
| throw new IOException("Not able to initialize app-cache directories " |
| + "in any of the configured local directories for user " + user); |
| } |
| if (!distributedCacheDirStatus) { |
| throw new IOException( |
| "Not able to initialize distributed-cache directories " |
| + "in any of the configured local directories for user " |
| + user); |
| } |
| } |
| |
| /** |
| * Initialize the local directories for a particular user. |
| * <ul> |
| * <li>$local.dir/usercache/$user/appcache/$appid</li> |
| * </ul> |
| * |
| * @param localDirs the target directories to create |
| * @param user the user whose local cache directories should be initialized |
| * @param appId the application ID |
| * @throws IOException if there's an issue initializing the application |
| * directories |
| */ |
| void createAppDirs(List<String> localDirs, String user, String appId) |
| throws IOException { |
| boolean initAppDirStatus = false; |
| FsPermission appperms = new FsPermission(APPDIR_PERM); |
| for (String localDir : localDirs) { |
| Path fullAppDir = getApplicationDir(new Path(localDir), user, appId); |
| // create $local.dir/usercache/$user/appcache/$appId |
| try { |
| createDir(fullAppDir, appperms, true, user); |
| initAppDirStatus = true; |
| } catch (IOException e) { |
| LOG.warn("Unable to create app directory " + fullAppDir.toString(), e); |
| } |
| } |
| if (!initAppDirStatus) { |
| throw new IOException("Not able to initialize app directories " |
| + "in any of the configured local directories for app " |
| + appId.toString()); |
| } |
| } |
| |
| /** |
| * Create application log directories on all disks. |
| * |
| * @param appId the application ID |
| * @param logDirs the target directories to create |
| * @param user the user whose local cache directories should be initialized |
| * @throws IOException if there's an issue initializing the application log |
| * directories |
| */ |
| void createAppLogDirs(String appId, List<String> logDirs, String user) |
| throws IOException { |
| |
| boolean appLogDirStatus = false; |
| FsPermission appLogDirPerms = new |
| FsPermission(getLogDirPermissions()); |
| for (String rootLogDir : logDirs) { |
| // create $log.dir/$appid |
| Path appLogDir = new Path(rootLogDir, appId); |
| try { |
| createDir(appLogDir, appLogDirPerms, true, user); |
| } catch (IOException e) { |
| LOG.warn("Unable to create the app-log directory : " + appLogDir, e); |
| continue; |
| } |
| appLogDirStatus = true; |
| } |
| if (!appLogDirStatus) { |
| throw new IOException("Not able to initialize app-log directories " |
| + "in any of the configured local directories for app " + appId); |
| } |
| } |
| |
| /** |
| * Create application log directories on all disks. |
| * |
| * @param appId the application ID |
| * @param containerId the container ID |
| * @param logDirs the target directories to create |
| * @param user the user as whom the directories should be created. |
| * Used only on secure Windows hosts. |
| * @throws IOException if there's an issue initializing the container log |
| * directories |
| */ |
| void createContainerLogDirs(String appId, String containerId, |
| List<String> logDirs, String user) throws IOException { |
| boolean containerLogDirStatus = false; |
| FsPermission containerLogDirPerms = new |
| FsPermission(getLogDirPermissions()); |
| for (String rootLogDir : logDirs) { |
| // create $log.dir/$appid/$containerid |
| Path appLogDir = new Path(rootLogDir, appId); |
| Path containerLogDir = new Path(appLogDir, containerId); |
| try { |
| createDir(containerLogDir, containerLogDirPerms, true, user); |
| } catch (IOException e) { |
| LOG.warn("Unable to create the container-log directory : " |
| + appLogDir, e); |
| continue; |
| } |
| containerLogDirStatus = true; |
| } |
| if (!containerLogDirStatus) { |
| throw new IOException( |
| "Not able to initialize container-log directories " |
| + "in any of the configured local directories for container " |
| + containerId); |
| } |
| } |
| |
| /** |
| * Return the default container log directory permissions. |
| * |
| * @return the default container log directory permissions |
| */ |
| @VisibleForTesting |
| public String getLogDirPermissions() { |
| if (this.logDirPermissions==null) { |
| this.logDirPermissions = getConf().get( |
| YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR_LOG_DIRS_PERMISSIONS, |
| YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR_LOG_DIRS_PERMISSIONS_DEFAULT); |
| } |
| return this.logDirPermissions; |
| } |
| |
| /** |
| * Clear the internal variable for repeatable testing. |
| */ |
| @VisibleForTesting |
| public void clearLogDirPermissions() { |
| this.logDirPermissions = null; |
| } |
| |
| /** |
| * Return the list of paths of given local directories. |
| * |
| * @return the list of paths of given local directories |
| */ |
| private static List<Path> getPaths(List<String> dirs) { |
| List<Path> paths = new ArrayList<>(dirs.size()); |
| for (int i = 0; i < dirs.size(); i++) { |
| paths.add(new Path(dirs.get(i))); |
| } |
| return paths; |
| } |
| } |