blob: 9a4b8a0731220d290ff52b818c5d6ed1f0663c4f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class DefaultContainerExecutor extends ContainerExecutor {
private static final Log LOG = LogFactory
.getLog(DefaultContainerExecutor.class);
private final FileContext lfs;
private static final String WRAPPER_LAUNCH_SCRIPT =
"default_container_executor.sh";
public DefaultContainerExecutor() {
try {
this.lfs = FileContext.getLocalFSFileContext();
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
}
DefaultContainerExecutor(FileContext lfs) {
this.lfs = lfs;
}
@Override
public void init() throws IOException {
// nothing to do or verify here
}
@Override
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
List<String> localDirs, List<String> logDirs)
throws IOException, InterruptedException {
ContainerLocalizer localizer =
new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
RecordFactoryProvider.getRecordFactory(getConf()));
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId, logDirs);
// TODO: Why pick first app dir. The same in LCE why not random?
Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
lfs.setWorkingDirectory(appStorageDir);
LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
// TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr);
}
@Override
public int launchContainer(Container container,
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
String userName, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException {
ContainerId containerId = container.getContainerID();
// create container dirs on all disks
String containerIdStr = ConverterUtils.toString(containerId);
String appIdStr =
ConverterUtils.toString(
container.getContainerID().getApplicationAttemptId().
getApplicationId());
for (String sLocalDir : localDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(appCacheDir, appIdStr);
Path containerDir = new Path(appDir, containerIdStr);
lfs.mkdir(containerDir, null, false);
}
// Create the container log-dirs on all disks
createContainerLogDirs(appIdStr, containerIdStr, logDirs);
Path tmpDir = new Path(containerWorkDir,
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
lfs.mkdir(tmpDir, null, false);
// copy launch script to work dir
Path launchDst =
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
// copy container tokens to work dir
Path tokenDst =
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
lfs.util().copy(nmPrivateTokensPath, tokenDst);
// Create new local launch wrapper script
Path wrapperScriptDst = new Path(containerWorkDir, WRAPPER_LAUNCH_SCRIPT);
DataOutputStream wrapperScriptOutStream =
lfs.create(wrapperScriptDst,
EnumSet.of(CREATE, OVERWRITE));
Path pidFile = getPidFilePath(containerId);
if (pidFile != null) {
writeLocalWrapperScript(wrapperScriptOutStream, launchDst.toUri()
.getPath().toString(), pidFile.toString());
} else {
LOG.info("Container " + containerIdStr
+ " was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
// create log dir under app
// fork script
ShellCommandExecutor shExec = null;
try {
lfs.setPermission(launchDst,
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
lfs.setPermission(wrapperScriptDst,
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
// Setup command to run
String[] command = {"bash",
wrapperScriptDst.toUri().getPath().toString()};
LOG.info("launchContainer: " + Arrays.toString(command));
shExec = new ShellCommandExecutor(
command,
new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment()); // sanitized env
if (isContainerActive(containerId)) {
shExec.execute();
}
else {
LOG.info("Container " + containerIdStr +
" was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
} catch (IOException e) {
if (null == shExec) {
return -1;
}
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from task is : " + exitCode);
String message = shExec.getOutput();
logOutput(message);
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
message));
return exitCode;
} finally {
; //
}
return 0;
}
private void writeLocalWrapperScript(DataOutputStream out,
String launchScriptDst, String pidFilePath) throws IOException {
// We need to do a move as writing to a file is not atomic
// Process reading a file being written to may get garbled data
// hence write pid to tmp file first followed by a mv
StringBuilder sb = new StringBuilder("#!/bin/bash\n\n");
sb.append("echo $$ > " + pidFilePath + ".tmp\n");
sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n");
sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec");
sb.append(" /bin/bash ");
sb.append("\"");
sb.append(launchScriptDst);
sb.append("\"\n");
PrintStream pout = null;
try {
pout = new PrintStream(out);
pout.append(sb);
} finally {
if (out != null) {
out.close();
}
}
}
@Override
public boolean signalContainer(String user, String pid, Signal signal)
throws IOException {
final String sigpid = ContainerExecutor.isSetsidAvailable
? "-" + pid
: pid;
LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid
+ " as user " + user);
try {
sendSignal(sigpid, Signal.NULL);
} catch (ExitCodeException e) {
return false;
}
try {
sendSignal(sigpid, signal);
} catch (IOException e) {
try {
sendSignal(sigpid, Signal.NULL);
} catch (IOException ignore) {
return false;
}
throw e;
}
return true;
}
/**
* Send a specified signal to the specified pid
*
* @param pid the pid of the process [group] to signal.
* @param signal signal to send
* (for logging).
*/
protected void sendSignal(String pid, Signal signal) throws IOException {
ShellCommandExecutor shexec = null;
String[] arg = { "kill", "-" + signal.getValue(), pid };
shexec = new ShellCommandExecutor(arg);
shexec.execute();
}
@Override
public void deleteAsUser(String user, Path subDir, Path... baseDirs)
throws IOException, InterruptedException {
if (baseDirs == null || baseDirs.length == 0) {
LOG.info("Deleting absolute path : " + subDir);
if (!lfs.delete(subDir, true)) {
//Maybe retry
LOG.warn("delete returned false for path: [" + subDir + "]");
}
return;
}
for (Path baseDir : baseDirs) {
Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
LOG.info("Deleting path : " + del);
if (!lfs.delete(del, true)) {
LOG.warn("delete returned false for path: [" + del + "]");
}
}
}
/** Permissions for user dir.
* $loaal.dir/usercache/$user */
private static final short USER_PERM = (short)0750;
/** Permissions for user appcache dir.
* $loaal.dir/usercache/$user/appcache */
private static final short APPCACHE_PERM = (short)0710;
/** Permissions for user filecache dir.
* $loaal.dir/usercache/$user/filecache */
private static final short FILECACHE_PERM = (short)0710;
/** Permissions for user app dir.
* $loaal.dir/usercache/$user/filecache */
private static final short APPDIR_PERM = (short)0710;
/** Permissions for user log dir.
* $logdir/$user/$appId */
private static final short LOGDIR_PERM = (short)0710;
private Path getFirstApplicationDir(List<String> localDirs, String user,
String appId) {
return getApplicationDir(new Path(localDirs.get(0)), user, appId);
}
private Path getApplicationDir(Path base, String user, String appId) {
return new Path(getAppcacheDir(base, user), appId);
}
private Path getUserCacheDir(Path base, String user) {
return new Path(new Path(base, ContainerLocalizer.USERCACHE), user);
}
private Path getAppcacheDir(Path base, String user) {
return new Path(getUserCacheDir(base, user),
ContainerLocalizer.APPCACHE);
}
private Path getFileCacheDir(Path base, String user) {
return new Path(getUserCacheDir(base, user),
ContainerLocalizer.FILECACHE);
}
/**
* Initialize the local directories for a particular user.
* <ul>
* <li>$local.dir/usercache/$user</li>
* </ul>
*/
private void createUserLocalDirs(List<String> localDirs, String user)
throws IOException {
boolean userDirStatus = false;
FsPermission userperms = new FsPermission(USER_PERM);
for (String localDir : localDirs) {
// create $local.dir/usercache/$user and its immediate parent
try {
lfs.mkdir(getUserCacheDir(new Path(localDir), user), userperms, true);
} catch (IOException e) {
LOG.warn("Unable to create the user directory : " + localDir, e);
continue;
}
userDirStatus = true;
}
if (!userDirStatus) {
throw new IOException("Not able to initialize user directories "
+ "in any of the configured local directories for user " + user);
}
}
/**
* Initialize the local cache directories for a particular user.
* <ul>
* <li>$local.dir/usercache/$user</li>
* <li>$local.dir/usercache/$user/appcache</li>
* <li>$local.dir/usercache/$user/filecache</li>
* </ul>
*/
private void createUserCacheDirs(List<String> localDirs, String user)
throws IOException {
LOG.info("Initializing user " + user);
boolean appcacheDirStatus = false;
boolean distributedCacheDirStatus = false;
FsPermission appCachePerms = new FsPermission(APPCACHE_PERM);
FsPermission fileperms = new FsPermission(FILECACHE_PERM);
for (String localDir : localDirs) {
// create $local.dir/usercache/$user/appcache
Path localDirPath = new Path(localDir);
final Path appDir = getAppcacheDir(localDirPath, user);
try {
lfs.mkdir(appDir, appCachePerms, true);
appcacheDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create app cache directory : " + appDir, e);
}
// create $local.dir/usercache/$user/filecache
final Path distDir = getFileCacheDir(localDirPath, user);
try {
lfs.mkdir(distDir, fileperms, true);
distributedCacheDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create file cache directory : " + distDir, e);
}
}
if (!appcacheDirStatus) {
throw new IOException("Not able to initialize app-cache directories "
+ "in any of the configured local directories for user " + user);
}
if (!distributedCacheDirStatus) {
throw new IOException(
"Not able to initialize distributed-cache directories "
+ "in any of the configured local directories for user "
+ user);
}
}
/**
* Initialize the local directories for a particular user.
* <ul>
* <li>$local.dir/usercache/$user/appcache/$appid</li>
* </ul>
* @param localDirs
*/
private void createAppDirs(List<String> localDirs, String user, String appId)
throws IOException {
boolean initAppDirStatus = false;
FsPermission appperms = new FsPermission(APPDIR_PERM);
for (String localDir : localDirs) {
Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
// create $local.dir/usercache/$user/appcache/$appId
try {
lfs.mkdir(fullAppDir, appperms, true);
initAppDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create app directory " + fullAppDir.toString(), e);
}
}
if (!initAppDirStatus) {
throw new IOException("Not able to initialize app directories "
+ "in any of the configured local directories for app "
+ appId.toString());
}
}
/**
* Create application log directories on all disks.
*/
private void createAppLogDirs(String appId, List<String> logDirs)
throws IOException {
boolean appLogDirStatus = false;
FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
for (String rootLogDir : logDirs) {
// create $log.dir/$appid
Path appLogDir = new Path(rootLogDir, appId);
try {
lfs.mkdir(appLogDir, appLogDirPerms, true);
} catch (IOException e) {
LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
continue;
}
appLogDirStatus = true;
}
if (!appLogDirStatus) {
throw new IOException("Not able to initialize app-log directories "
+ "in any of the configured local directories for app " + appId);
}
}
/**
* Create application log directories on all disks.
*/
private void createContainerLogDirs(String appId, String containerId,
List<String> logDirs) throws IOException {
boolean containerLogDirStatus = false;
FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
for (String rootLogDir : logDirs) {
// create $log.dir/$appid/$containerid
Path appLogDir = new Path(rootLogDir, appId);
Path containerLogDir = new Path(appLogDir, containerId);
try {
lfs.mkdir(containerLogDir, containerLogDirPerms, true);
} catch (IOException e) {
LOG.warn("Unable to create the container-log directory : "
+ appLogDir, e);
continue;
}
containerLogDirStatus = true;
}
if (!containerLogDirStatus) {
throw new IOException(
"Not able to initialize container-log directories "
+ "in any of the configured local directories for container "
+ containerId);
}
}
/**
* @return the list of paths of given local directories
*/
private static List<Path> getPaths(List<String> dirs) {
List<Path> paths = new ArrayList<Path>(dirs.size());
for (int i = 0; i < dirs.size(); i++) {
paths.add(new Path(dirs.get(i)));
}
return paths;
}
}