blob: 83ae95b0e47b5ab7e31a20719ff828a41f1f8dfb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
/**
* Controls initialization, finalization and clean up of tasks, and
* also the launching and killing of task JVMs.
*
* This class defines the API for initializing, finalizing and cleaning
* up of tasks, as also the launching and killing task JVMs.
* Subclasses of this class will implement the logic required for
* performing the actual actions.
*
* <br/>
*
* NOTE: This class is internal only class and not intended for users!!
*/
public abstract class TaskController implements Configurable {
/**
* The constants for the signals.
*/
public enum Signal {
NULL(0, "NULL"), QUIT(3, "SIGQUIT"),
KILL(9, "SIGKILL"), TERM(15, "SIGTERM");
private final int value;
private final String str;
private Signal(int value, String str) {
this.str = str;
this.value = value;
}
public int getValue() {
return value;
}
@Override
public String toString() {
return str;
}
}
private Configuration conf;
public static final Log LOG = LogFactory.getLog(TaskController.class);
//Name of the executable script that will contain the child
// JVM command line. See writeCommand for details.
protected static final String COMMAND_FILE = "taskjvm.sh";
protected LocalDirAllocator allocator;
final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
FsPermission.createImmutable((short) 0700); // rwx--------
public Configuration getConf() {
return conf;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
/**
* Does initialization and setup.
* @param allocator the local dir allocator to use
*/
public abstract void setup(LocalDirAllocator allocator) throws IOException;
/**
* Create all of the directories necessary for the job to start and download
* all of the job and private distributed cache files.
* Creates both the user directories and the job log directory.
* @param user the user name
* @param jobid the job
* @param credentials a filename containing the job secrets
* @param jobConf the path to the localized configuration file
* @param taskTracker the connection the task tracker
* @param ttAddr the tasktracker's RPC address
* @throws IOException
*/
public abstract void initializeJob(String user, String jobid,
Path credentials, Path jobConf,
TaskUmbilicalProtocol taskTracker,
InetSocketAddress ttAddr)
throws IOException, InterruptedException;
/**
* Create all of the directories for the task and launches the child jvm.
* @param user the user name
* @param jobId the jobId in question
* @param attemptId the attempt id (cleanup attempts have .cleanup suffix)
* @param setup list of shell commands to execute before the jvm
* @param jvmArguments list of jvm arguments
* @param currentWorkDirectory the full path of the cwd for the task
* @param stdout the file to redirect stdout to
* @param stderr the file to redirect stderr to
* @return the exit code for the task
* @throws IOException
*/
public abstract
int launchTask(String user,
String jobId,
String attemptId,
List<String> setup,
List<String> jvmArguments,
File currentWorkDirectory,
String stdout,
String stderr) throws IOException;
/**
* Send a signal to a task pid as the user. Always signal the process group.
* An implementation may elect to signal the pid directly if the former is
* unavailable or fails.
* @param user the user name
* @param taskPid the pid of the task
* @param signal the id of the signal to send
* @return false if the process does not exist
* @throws IOException If the task controller failed to signal the process
* (group), but the process exists.
*/
public abstract boolean signalTask(String user, int taskPid,
Signal signal) throws IOException;
/**
* Delete the user's files under all of the task tracker root directories.
* @param user the user name
* @param subDir the path relative to base directories
* @param baseDirs the base directories (absolute paths)
* @throws IOException
*/
public abstract void deleteAsUser(String user,
String subDir,
String... baseDirs) throws IOException;
/**
* Delete the user's files under the userlogs directory.
* @param user the user to work as
* @param subDir the path under the userlogs directory.
* @throws IOException
*/
public abstract void deleteLogAsUser(String user,
String subDir) throws IOException;
static class DeletionContext extends CleanupQueue.PathDeletionContext {
private TaskController controller;
private boolean isLog;
private String user;
private String subDir;
private String[] baseDirs;
DeletionContext(TaskController controller, boolean isLog, String user,
String subDir, String[] baseDirs) {
super(null, null);
this.controller = controller;
this.isLog = isLog;
this.user = user;
this.subDir = subDir;
this.baseDirs = baseDirs;
}
@Override
protected void deletePath() throws IOException {
if (isLog) {
controller.deleteLogAsUser(user, subDir);
} else {
controller.deleteAsUser(user, subDir, baseDirs);
}
}
@Override
public String toString() {
return (isLog ? "log(" : "dir(") +
user + "," + subDir + ")";
}
}
/**
* Returns the local unix user that a given job will run as.
*/
public String getRunAsUser(JobConf conf) {
return System.getProperty("user.name");
}
//Write the JVM command line to a file under the specified directory
// Note that the JVM will be launched using a setuid executable, and
// could potentially contain strings defined by a user. Hence, to
// prevent special character attacks, we write the command line to
// a file and execute it.
protected static String writeCommand(String cmdLine, FileSystem fs,
Path commandFile) throws IOException {
PrintWriter pw = null;
LOG.info("Writing commands to " + commandFile);
try {
pw = new PrintWriter(FileSystem.create(
fs, commandFile, TASK_LAUNCH_SCRIPT_PERMISSION));
pw.write(cmdLine);
} catch (IOException ioe) {
LOG.error("Caught IOException while writing JVM command line to file. ",
ioe);
} finally {
if (pw != null) {
pw.close();
}
}
return commandFile.makeQualified(fs).toUri().getPath();
}
protected void logOutput(String output) {
String shExecOutput = output;
if (shExecOutput != null) {
for (String str : shExecOutput.split("\n")) {
LOG.info(str);
}
}
}
public static final boolean isSetsidAvailable = isSetsidSupported();
private static boolean isSetsidSupported() {
ShellCommandExecutor shexec = null;
boolean setsidSupported = true;
try {
String[] args = {"setsid", "bash", "-c", "echo $$"};
shexec = new ShellCommandExecutor(args);
shexec.execute();
} catch (IOException ioe) {
LOG.warn("setsid is not available on this machine. So not using it.");
setsidSupported = false;
} finally { // handle the exit code
LOG.info("setsid exited with exit code " + shexec.getExitCode());
}
return setsidSupported;
}
public static class DelayedProcessKiller extends Thread {
private final String user;
private final int pid;
private final long delay;
private final Signal signal;
private final TaskController taskController;
public DelayedProcessKiller(String user, int pid, long delay, Signal signal,
TaskController taskController) {
this.user = user;
this.pid = pid;
this.delay = delay;
this.signal = signal;
this.taskController = taskController;
setName("Task killer for " + pid);
setDaemon(false);
}
@Override
public void run() {
try {
Thread.sleep(delay);
taskController.signalTask(user, pid, signal);
} catch (InterruptedException e) {
return;
} catch (IOException e) {
LOG.warn("Exception when killing task " + pid, e);
}
}
}
}