| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import static org.apache.hadoop.fs.CreateFlag.CREATE; |
| import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PrintStream; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.regex.Pattern; |
| |
| import org.apache.commons.lang.math.RandomUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.UnsupportedFileSystemException; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.util.Shell.ShellCommandExecutor; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Strings; |
| |
| /** |
| * This executor will launch and run tasks inside Docker containers. It |
| * currently only supports simple authentication mode. It shares a lot of code |
| * with the DefaultContainerExecutor (and it may make sense to pull out those |
| * common pieces later). |
| */ |
| public class DockerContainerExecutor extends ContainerExecutor { |
| private static final Log LOG = LogFactory |
| .getLog(DockerContainerExecutor.class); |
| //The name of the script file that will launch the Docker containers |
| public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT = |
| "docker_container_executor"; |
| //The name of the session script that the DOCKER_CONTAINER_EXECUTOR_SCRIPT |
| //launches in turn |
| public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT = |
| "docker_container_executor_session"; |
| |
| //This validates that the image is a proper docker image and would not crash |
| //docker. The image name is not allowed to contain spaces. e.g. |
| //registry.somecompany.com:9999/containername:0.1 or |
| //containername:0.1 or |
| //containername |
| public static final String DOCKER_IMAGE_PATTERN = |
| "^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$"; |
| |
| private final FileContext lfs; |
| private final Pattern dockerImagePattern; |
| |
| public DockerContainerExecutor() { |
| try { |
| this.lfs = FileContext.getLocalFSFileContext(); |
| this.dockerImagePattern = Pattern.compile(DOCKER_IMAGE_PATTERN); |
| } catch (UnsupportedFileSystemException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| protected void copyFile(Path src, Path dst, String owner) throws IOException { |
| lfs.util().copy(src, dst); |
| } |
| |
| @Override |
| public void init() throws IOException { |
| String auth = |
| getConf().get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION); |
| if (auth != null && !auth.equals("simple")) { |
| throw new IllegalStateException( |
| "DockerContainerExecutor only works with simple authentication mode"); |
| } |
| String dockerExecutor = getConf().get( |
| YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, |
| YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME); |
| if (!new File(dockerExecutor).exists()) { |
| throw new IllegalStateException( |
| "Invalid docker exec path: " + dockerExecutor); |
| } |
| } |
| |
| @Override |
| public synchronized void startLocalizer(LocalizerStartContext ctx) |
| throws IOException, InterruptedException { |
| Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens(); |
| InetSocketAddress nmAddr = ctx.getNmAddr(); |
| String user = ctx.getUser(); |
| String appId = ctx.getAppId(); |
| String locId = ctx.getLocId(); |
| LocalDirsHandlerService dirsHandler = ctx.getDirsHandler(); |
| List<String> localDirs = dirsHandler.getLocalDirs(); |
| List<String> logDirs = dirsHandler.getLogDirs(); |
| |
| ContainerLocalizer localizer = |
| new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs), |
| RecordFactoryProvider.getRecordFactory(getConf())); |
| |
| createUserLocalDirs(localDirs, user); |
| createUserCacheDirs(localDirs, user); |
| createAppDirs(localDirs, user, appId); |
| createAppLogDirs(appId, logDirs, user); |
| |
| // randomly choose the local directory |
| Path appStorageDir = getWorkingDir(localDirs, user, appId); |
| |
| String tokenFn = |
| String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); |
| Path tokenDst = new Path(appStorageDir, tokenFn); |
| copyFile(nmPrivateContainerTokensPath, tokenDst, user); |
| LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst); |
| lfs.setWorkingDirectory(appStorageDir); |
| LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory()); |
| // TODO: DO it over RPC for maintaining similarity? |
| localizer.runLocalization(nmAddr); |
| } |
| |
| |
| @Override |
| public int launchContainer(ContainerStartContext ctx) throws IOException { |
| Container container = ctx.getContainer(); |
| Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath(); |
| Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath(); |
| String userName = ctx.getUser(); |
| Path containerWorkDir = ctx.getContainerWorkDir(); |
| List<String> localDirs = ctx.getLocalDirs(); |
| List<String> logDirs = ctx.getLogDirs(); |
| |
| //Variables for the launch environment can be injected from the command-line |
| //while submitting the application |
| String containerImageName = container.getLaunchContext().getEnvironment() |
| .get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("containerImageName from launchContext: " + containerImageName); |
| } |
| Preconditions.checkArgument(!Strings.isNullOrEmpty(containerImageName), |
| "Container image must not be null"); |
| containerImageName = containerImageName.replaceAll("['\"]", ""); |
| |
| Preconditions.checkArgument(saneDockerImage(containerImageName), "Image: " |
| + containerImageName + " is not a proper docker image"); |
| String dockerExecutor = getConf().get( |
| YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, |
| YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME); |
| |
| FsPermission dirPerm = new FsPermission(APPDIR_PERM); |
| ContainerId containerId = container.getContainerId(); |
| |
| // create container dirs on all disks |
| String containerIdStr = ConverterUtils.toString(containerId); |
| String appIdStr = ConverterUtils.toString( |
| containerId.getApplicationAttemptId().getApplicationId()); |
| for (String sLocalDir : localDirs) { |
| Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE); |
| Path userdir = new Path(usersdir, userName); |
| Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE); |
| Path appDir = new Path(appCacheDir, appIdStr); |
| Path containerDir = new Path(appDir, containerIdStr); |
| createDir(containerDir, dirPerm, true, userName); |
| } |
| |
| // Create the container log-dirs on all disks |
| createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName); |
| |
| Path tmpDir = new Path(containerWorkDir, |
| YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); |
| createDir(tmpDir, dirPerm, false, userName); |
| |
| // copy launch script to work dir |
| Path launchDst = |
| new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); |
| lfs.util().copy(nmPrivateContainerScriptPath, launchDst); |
| |
| // copy container tokens to work dir |
| Path tokenDst = |
| new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); |
| lfs.util().copy(nmPrivateTokensPath, tokenDst); |
| |
| String localDirMount = toMount(localDirs); |
| String logDirMount = toMount(logDirs); |
| String containerWorkDirMount = toMount(Collections.singletonList( |
| containerWorkDir.toUri().getPath())); |
| StringBuilder commands = new StringBuilder(); |
| //Use docker run to launch the docker container. See man pages for |
| //docker-run |
| //--rm removes the container automatically once the container finishes |
| //--net=host allows the container to take on the host's network stack |
| //--name sets the Docker Container name to the YARN containerId string |
| //-v is used to bind mount volumes for local, log and work dirs. |
| String commandStr = commands.append(dockerExecutor) |
| .append(" ") |
| .append("run") |
| .append(" ") |
| .append("--rm --net=host") |
| .append(" ") |
| .append(" --name " + containerIdStr) |
| .append(localDirMount) |
| .append(logDirMount) |
| .append(containerWorkDirMount) |
| .append(" ") |
| .append(containerImageName) |
| .toString(); |
| //Get the pid of the process which has been launched as a docker container |
| //using docker inspect |
| String dockerPidScript = "`" + dockerExecutor + |
| " inspect --format {{.State.Pid}} " + containerIdStr + "`"; |
| |
| // Create new local launch wrapper script |
| LocalWrapperScriptBuilder sb = new UnixLocalWrapperScriptBuilder( |
| containerWorkDir, commandStr, dockerPidScript); |
| Path pidFile = getPidFilePath(containerId); |
| if (pidFile != null) { |
| sb.writeLocalWrapperScript(launchDst, pidFile); |
| } else { |
| //Although the container was activated by ContainerLaunch before exec() |
| //was called, since then deactivateContainer() has been called. |
| LOG.info("Container " + containerIdStr |
| + " was marked as inactive. Returning terminated error"); |
| return ExitCode.TERMINATED.getExitCode(); |
| } |
| |
| ShellCommandExecutor shExec = null; |
| try { |
| lfs.setPermission(launchDst, |
| ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); |
| lfs.setPermission(sb.getWrapperScriptPath(), |
| ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); |
| |
| // Setup command to run |
| String[] command = getRunCommand(sb.getWrapperScriptPath().toString(), |
| containerIdStr, userName, pidFile, this.getConf()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("launchContainer: " + commandStr + " " + |
| Joiner.on(" ").join(command)); |
| } |
| shExec = new ShellCommandExecutor( |
| command, |
| new File(containerWorkDir.toUri().getPath()), |
| container.getLaunchContext().getEnvironment()); // sanitized env |
| if (isContainerActive(containerId)) { |
| shExec.execute(); |
| } else { |
| LOG.info("Container " + containerIdStr + |
| " was marked as inactive. Returning terminated error"); |
| return ExitCode.TERMINATED.getExitCode(); |
| } |
| } catch (IOException e) { |
| if (null == shExec) { |
| return -1; |
| } |
| int exitCode = shExec.getExitCode(); |
| LOG.warn("Exit code from container " + containerId + " is : " + exitCode); |
| // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was |
| // terminated/killed forcefully. In all other cases, log the |
| // container-executor's output |
| if (exitCode != ExitCode.FORCE_KILLED.getExitCode() |
| && exitCode != ExitCode.TERMINATED.getExitCode()) { |
| LOG.warn("Exception from container-launch with container ID: " |
| + containerId + " and exit code: " + exitCode, e); |
| logOutput(shExec.getOutput()); |
| String diagnostics = "Exception from container-launch: \n" |
| + StringUtils.stringifyException(e) + "\n" + shExec.getOutput(); |
| container.handle(new ContainerDiagnosticsUpdateEvent(containerId, |
| diagnostics)); |
| } else { |
| container.handle(new ContainerDiagnosticsUpdateEvent(containerId, |
| "Container killed on request. Exit code is " + exitCode)); |
| } |
| return exitCode; |
| } finally { |
| if (shExec != null) { |
| shExec.close(); |
| } |
| } |
| return 0; |
| } |
| |
| @Override |
| /** |
| * Filter the environment variables that may conflict with the ones set in |
| * the docker image and write them out to an OutputStream. |
| */ |
| public void writeLaunchEnv(OutputStream out, Map<String, String> environment, |
| Map<Path, List<String>> resources, List<String> command) |
| throws IOException { |
| ContainerLaunch.ShellScriptBuilder sb = |
| ContainerLaunch.ShellScriptBuilder.create(); |
| |
| //Remove environments that may conflict with the ones in Docker image. |
| Set<String> exclusionSet = new HashSet<String>(); |
| exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); |
| exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name()); |
| exclusionSet.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name()); |
| exclusionSet.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name()); |
| exclusionSet.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name()); |
| exclusionSet.add(ApplicationConstants.Environment.JAVA_HOME.name()); |
| |
| if (environment != null) { |
| for (Map.Entry<String,String> env : environment.entrySet()) { |
| if (!exclusionSet.contains(env.getKey())) { |
| sb.env(env.getKey().toString(), env.getValue().toString()); |
| } |
| } |
| } |
| if (resources != null) { |
| for (Map.Entry<Path,List<String>> entry : resources.entrySet()) { |
| for (String linkName : entry.getValue()) { |
| sb.symlink(entry.getKey(), new Path(linkName)); |
| } |
| } |
| } |
| |
| sb.command(command); |
| |
| PrintStream pout = null; |
| PrintStream ps = null; |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try { |
| pout = new PrintStream(out, false, "UTF-8"); |
| if (LOG.isDebugEnabled()) { |
| ps = new PrintStream(baos, false, "UTF-8"); |
| sb.write(ps); |
| } |
| sb.write(pout); |
| |
| } finally { |
| if (out != null) { |
| out.close(); |
| } |
| if (ps != null) { |
| ps.close(); |
| } |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Script: " + baos.toString("UTF-8")); |
| } |
| } |
| |
| private boolean saneDockerImage(String containerImageName) { |
| return dockerImagePattern.matcher(containerImageName).matches(); |
| } |
| |
| @Override |
| public boolean signalContainer(ContainerSignalContext ctx) |
| throws IOException { |
| String user = ctx.getUser(); |
| String pid = ctx.getPid(); |
| Signal signal = ctx.getSignal(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid |
| + " as user " + user); |
| } |
| if (!containerIsAlive(pid)) { |
| return false; |
| } |
| try { |
| killContainer(pid, signal); |
| } catch (IOException e) { |
| if (!containerIsAlive(pid)) { |
| return false; |
| } |
| throw e; |
| } |
| return true; |
| } |
| |
| @Override |
| public boolean isContainerAlive(ContainerLivenessContext ctx) |
| throws IOException { |
| String pid = ctx.getPid(); |
| |
| return containerIsAlive(pid); |
| } |
| |
| /** |
| * Returns true if the process with the specified pid is alive. |
| * |
| * @param pid String pid |
| * @return boolean true if the process is alive |
| */ |
| @VisibleForTesting |
| public static boolean containerIsAlive(String pid) throws IOException { |
| try { |
| new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid)) |
| .execute(); |
| // successful execution means process is alive |
| return true; |
| } |
| catch (Shell.ExitCodeException e) { |
| // failure (non-zero exit code) means process is not alive |
| return false; |
| } |
| } |
| |
| /** |
| * Send a specified signal to the specified pid |
| * |
| * @param pid the pid of the process [group] to signal. |
| * @param signal signal to send |
| * (for logging). |
| */ |
| protected void killContainer(String pid, Signal signal) throws IOException { |
| new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid)) |
| .execute(); |
| } |
| |
| @Override |
| public void deleteAsUser(DeletionAsUserContext ctx) |
| throws IOException, InterruptedException { |
| Path subDir = ctx.getSubDir(); |
| List<Path> baseDirs = ctx.getBasedirs(); |
| |
| if (baseDirs == null || baseDirs.size() == 0) { |
| LOG.info("Deleting absolute path : " + subDir); |
| if (!lfs.delete(subDir, true)) { |
| //Maybe retry |
| LOG.warn("delete returned false for path: [" + subDir + "]"); |
| } |
| return; |
| } |
| for (Path baseDir : baseDirs) { |
| Path del = subDir == null ? baseDir : new Path(baseDir, subDir); |
| LOG.info("Deleting path : " + del); |
| if (!lfs.delete(del, true)) { |
| LOG.warn("delete returned false for path: [" + del + "]"); |
| } |
| } |
| } |
| |
| /** |
| * Converts a directory list to a docker mount string |
| * @param dirs |
| * @return a string of mounts for docker |
| */ |
| private String toMount(List<String> dirs) { |
| StringBuilder builder = new StringBuilder(); |
| for (String dir : dirs) { |
| builder.append(" -v " + dir + ":" + dir); |
| } |
| return builder.toString(); |
| } |
| |
| //This class facilitates (only) the creation of platform-specific scripts that |
| //will be used to launch the containers |
| //TODO: This should be re-used from the DefaultContainerExecutor. |
| private abstract class LocalWrapperScriptBuilder { |
| |
| private final Path wrapperScriptPath; |
| |
| public Path getWrapperScriptPath() { |
| return wrapperScriptPath; |
| } |
| |
| public void writeLocalWrapperScript(Path launchDst, Path pidFile) |
| throws IOException { |
| DataOutputStream out = null; |
| PrintStream pout = null; |
| |
| try { |
| out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE)); |
| pout = new PrintStream(out, false, "UTF-8"); |
| writeLocalWrapperScript(launchDst, pidFile, pout); |
| } finally { |
| IOUtils.cleanup(LOG, pout, out); |
| } |
| } |
| |
| protected abstract void writeLocalWrapperScript(Path launchDst, |
| Path pidFile, PrintStream pout); |
| |
| protected LocalWrapperScriptBuilder(Path containerWorkDir) { |
| this.wrapperScriptPath = new Path(containerWorkDir, |
| Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SCRIPT)); |
| } |
| } |
| |
| //TODO: This class too should be used from DefaultContainerExecutor. |
| private final class UnixLocalWrapperScriptBuilder |
| extends LocalWrapperScriptBuilder { |
| private final Path sessionScriptPath; |
| private final String dockerCommand; |
| private final String dockerPidScript; |
| |
| public UnixLocalWrapperScriptBuilder(Path containerWorkDir, |
| String dockerCommand, String dockerPidScript) { |
| super(containerWorkDir); |
| this.dockerCommand = dockerCommand; |
| this.dockerPidScript = dockerPidScript; |
| this.sessionScriptPath = new Path(containerWorkDir, |
| Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT)); |
| } |
| |
| @Override |
| public void writeLocalWrapperScript(Path launchDst, Path pidFile) |
| throws IOException { |
| writeSessionScript(launchDst, pidFile); |
| super.writeLocalWrapperScript(launchDst, pidFile); |
| } |
| |
| @Override |
| public void writeLocalWrapperScript(Path launchDst, Path pidFile, |
| PrintStream pout) { |
| String exitCodeFile = ContainerLaunch.getExitCodeFile( |
| pidFile.toString()); |
| String tmpFile = exitCodeFile + ".tmp"; |
| pout.println("#!/usr/bin/env bash"); |
| pout.println("bash \"" + sessionScriptPath.toString() + "\""); |
| pout.println("rc=$?"); |
| pout.println("echo $rc > \"" + tmpFile + "\""); |
| pout.println("mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\""); |
| pout.println("exit $rc"); |
| } |
| |
| private void writeSessionScript(Path launchDst, Path pidFile) |
| throws IOException { |
| DataOutputStream out = null; |
| PrintStream pout = null; |
| try { |
| out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); |
| pout = new PrintStream(out, false, "UTF-8"); |
| // We need to do a move as writing to a file is not atomic |
| // Process reading a file being written to may get garbled data |
| // hence write pid to tmp file first followed by a mv |
| pout.println("#!/usr/bin/env bash"); |
| pout.println(); |
| pout.println("echo "+ dockerPidScript +" > " + pidFile.toString() |
| + ".tmp"); |
| pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); |
| pout.println(dockerCommand + " bash \"" + |
| launchDst.toUri().getPath().toString() + "\""); |
| } finally { |
| IOUtils.cleanup(LOG, pout, out); |
| } |
| lfs.setPermission(sessionScriptPath, |
| ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); |
| } |
| } |
| |
| protected void createDir(Path dirPath, FsPermission perms, |
| boolean createParent, String user) throws IOException { |
| lfs.mkdir(dirPath, perms, createParent); |
| if (!perms.equals(perms.applyUMask(lfs.getUMask()))) { |
| lfs.setPermission(dirPath, perms); |
| } |
| } |
| |
| /** |
| * Initialize the local directories for a particular user. |
| * <ul>.mkdir |
| * <li>$local.dir/usercache/$user</li> |
| * </ul> |
| */ |
| void createUserLocalDirs(List<String> localDirs, String user) |
| throws IOException { |
| boolean userDirStatus = false; |
| FsPermission userperms = new FsPermission(USER_PERM); |
| for (String localDir : localDirs) { |
| // create $local.dir/usercache/$user and its immediate parent |
| try { |
| createDir(getUserCacheDir(new Path(localDir), user), userperms, true, |
| user); |
| } catch (IOException e) { |
| LOG.warn("Unable to create the user directory : " + localDir, e); |
| continue; |
| } |
| userDirStatus = true; |
| } |
| if (!userDirStatus) { |
| throw new IOException("Not able to initialize user directories " |
| + "in any of the configured local directories for user " + user); |
| } |
| } |
| |
| |
| /** |
| * Initialize the local cache directories for a particular user. |
| * <ul> |
| * <li>$local.dir/usercache/$user</li> |
| * <li>$local.dir/usercache/$user/appcache</li> |
| * <li>$local.dir/usercache/$user/filecache</li> |
| * </ul> |
| */ |
| void createUserCacheDirs(List<String> localDirs, String user) |
| throws IOException { |
| LOG.info("Initializing user " + user); |
| |
| boolean appcacheDirStatus = false; |
| boolean distributedCacheDirStatus = false; |
| FsPermission appCachePerms = new FsPermission(APPCACHE_PERM); |
| FsPermission fileperms = new FsPermission(FILECACHE_PERM); |
| |
| for (String localDir : localDirs) { |
| // create $local.dir/usercache/$user/appcache |
| Path localDirPath = new Path(localDir); |
| final Path appDir = getAppcacheDir(localDirPath, user); |
| try { |
| createDir(appDir, appCachePerms, true, user); |
| appcacheDirStatus = true; |
| } catch (IOException e) { |
| LOG.warn("Unable to create app cache directory : " + appDir, e); |
| } |
| // create $local.dir/usercache/$user/filecache |
| final Path distDir = getFileCacheDir(localDirPath, user); |
| try { |
| createDir(distDir, fileperms, true, user); |
| distributedCacheDirStatus = true; |
| } catch (IOException e) { |
| LOG.warn("Unable to create file cache directory : " + distDir, e); |
| } |
| } |
| if (!appcacheDirStatus) { |
| throw new IOException("Not able to initialize app-cache directories " |
| + "in any of the configured local directories for user " + user); |
| } |
| if (!distributedCacheDirStatus) { |
| throw new IOException( |
| "Not able to initialize distributed-cache directories " |
| + "in any of the configured local directories for user " |
| + user); |
| } |
| } |
| |
| /** |
| * Initialize the local directories for a particular user. |
| * <ul> |
| * <li>$local.dir/usercache/$user/appcache/$appid</li> |
| * </ul> |
| * @param localDirs |
| */ |
| void createAppDirs(List<String> localDirs, String user, String appId) |
| throws IOException { |
| boolean initAppDirStatus = false; |
| FsPermission appperms = new FsPermission(APPDIR_PERM); |
| for (String localDir : localDirs) { |
| Path fullAppDir = getApplicationDir(new Path(localDir), user, appId); |
| // create $local.dir/usercache/$user/appcache/$appId |
| try { |
| createDir(fullAppDir, appperms, true, user); |
| initAppDirStatus = true; |
| } catch (IOException e) { |
| LOG.warn("Unable to create app directory " + fullAppDir.toString(), e); |
| } |
| } |
| if (!initAppDirStatus) { |
| throw new IOException("Not able to initialize app directories " |
| + "in any of the configured local directories for app " |
| + appId.toString()); |
| } |
| } |
| |
| |
| /** |
| * Create application log directories on all disks. |
| */ |
| void createContainerLogDirs(String appId, String containerId, |
| List<String> logDirs, String user) throws IOException { |
| |
| boolean containerLogDirStatus = false; |
| FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM); |
| for (String rootLogDir : logDirs) { |
| // create $log.dir/$appid/$containerid |
| Path appLogDir = new Path(rootLogDir, appId); |
| Path containerLogDir = new Path(appLogDir, containerId); |
| try { |
| createDir(containerLogDir, containerLogDirPerms, true, user); |
| } catch (IOException e) { |
| LOG.warn("Unable to create the container-log directory : " |
| + appLogDir, e); |
| continue; |
| } |
| containerLogDirStatus = true; |
| } |
| if (!containerLogDirStatus) { |
| throw new IOException( |
| "Not able to initialize container-log directories " |
| + "in any of the configured local directories for container " |
| + containerId); |
| } |
| } |
| |
| /** |
| * Permissions for user dir. |
| * $local.dir/usercache/$user |
| */ |
| static final short USER_PERM = (short) 0750; |
| /** |
| * Permissions for user appcache dir. |
| * $local.dir/usercache/$user/appcache |
| */ |
| static final short APPCACHE_PERM = (short) 0710; |
| /** |
| * Permissions for user filecache dir. |
| * $local.dir/usercache/$user/filecache |
| */ |
| static final short FILECACHE_PERM = (short) 0710; |
| /** |
| * Permissions for user app dir. |
| * $local.dir/usercache/$user/appcache/$appId |
| */ |
| static final short APPDIR_PERM = (short) 0710; |
| /** |
| * Permissions for user log dir. |
| * $logdir/$user/$appId |
| */ |
| static final short LOGDIR_PERM = (short) 0710; |
| |
| private long getDiskFreeSpace(Path base) throws IOException { |
| return lfs.getFsStatus(base).getRemaining(); |
| } |
| |
| private Path getApplicationDir(Path base, String user, String appId) { |
| return new Path(getAppcacheDir(base, user), appId); |
| } |
| |
| private Path getUserCacheDir(Path base, String user) { |
| return new Path(new Path(base, ContainerLocalizer.USERCACHE), user); |
| } |
| |
| private Path getAppcacheDir(Path base, String user) { |
| return new Path(getUserCacheDir(base, user), |
| ContainerLocalizer.APPCACHE); |
| } |
| |
| private Path getFileCacheDir(Path base, String user) { |
| return new Path(getUserCacheDir(base, user), |
| ContainerLocalizer.FILECACHE); |
| } |
| |
| protected Path getWorkingDir(List<String> localDirs, String user, |
| String appId) throws IOException { |
| Path appStorageDir = null; |
| long totalAvailable = 0L; |
| long[] availableOnDisk = new long[localDirs.size()]; |
| int i = 0; |
| // randomly choose the app directory |
| // the chance of picking a directory is proportional to |
| // the available space on the directory. |
| // firstly calculate the sum of all available space on these directories |
| for (String localDir : localDirs) { |
| Path curBase = getApplicationDir(new Path(localDir), |
| user, appId); |
| long space = 0L; |
| try { |
| space = getDiskFreeSpace(curBase); |
| } catch (IOException e) { |
| LOG.warn("Unable to get Free Space for " + curBase.toString(), e); |
| } |
| availableOnDisk[i++] = space; |
| totalAvailable += space; |
| } |
| |
| // throw an IOException if totalAvailable is 0. |
| if (totalAvailable <= 0L) { |
| throw new IOException("Not able to find a working directory for " |
| + user); |
| } |
| |
| // make probability to pick a directory proportional to |
| // the available space on the directory. |
| long randomPosition = RandomUtils.nextLong() % totalAvailable; |
| int dir = 0; |
| // skip zero available space directory, |
| // because totalAvailable is greater than 0 and randomPosition |
| // is less than totalAvailable, we can find a valid directory |
| // with nonzero available space. |
| while (availableOnDisk[dir] == 0L) { |
| dir++; |
| } |
| while (randomPosition > availableOnDisk[dir]) { |
| randomPosition -= availableOnDisk[dir++]; |
| } |
| appStorageDir = getApplicationDir(new Path(localDirs.get(dir)), |
| user, appId); |
| |
| return appStorageDir; |
| } |
| |
| /** |
| * Create application log directories on all disks. |
| */ |
| void createAppLogDirs(String appId, List<String> logDirs, String user) |
| throws IOException { |
| |
| boolean appLogDirStatus = false; |
| FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM); |
| for (String rootLogDir : logDirs) { |
| // create $log.dir/$appid |
| Path appLogDir = new Path(rootLogDir, appId); |
| try { |
| createDir(appLogDir, appLogDirPerms, true, user); |
| } catch (IOException e) { |
| LOG.warn("Unable to create the app-log directory : " + appLogDir, e); |
| continue; |
| } |
| appLogDirStatus = true; |
| } |
| if (!appLogDirStatus) { |
| throw new IOException("Not able to initialize app-log directories " |
| + "in any of the configured local directories for app " + appId); |
| } |
| } |
| |
| /** |
| * @return the list of paths of given local directories |
| */ |
| private static List<Path> getPaths(List<String> dirs) { |
| List<Path> paths = new ArrayList<Path>(dirs.size()); |
| for (int i = 0; i < dirs.size(); i++) { |
| paths.add(new Path(dirs.get(i))); |
| } |
| return paths; |
| } |
| |
| } |