blob: d2d59ae15fc21bc6fc857af2c58ecdb8ab7dc085 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* The default implementation for controlling tasks.
*
* This class provides an implementation for launching and killing
* tasks that need to be run as the tasktracker itself. Hence,
* many of the initializing or cleanup methods are not required here.
*
* <br/>
*
* NOTE: This class is internal only class and not intended for users!!
*/
public class DefaultTaskController extends TaskController {
private static final Log LOG =
LogFactory.getLog(DefaultTaskController.class);
private FileSystem fs;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
try {
fs = FileSystem.getLocal(conf).getRaw();
} catch (IOException ie) {
throw new RuntimeException("Failed getting LocalFileSystem", ie);
}
}
/**
* Create all of the directories for the task and launches the child jvm.
* @param user the user name
* @param attemptId the attempt id
* @throws IOException
*/
@Override
public int launchTask(String user,
String jobId,
String attemptId,
List<String> setup,
List<String> jvmArguments,
File currentWorkDirectory,
String stdout,
String stderr) throws IOException {
ShellCommandExecutor shExec = null;
try {
FileSystem localFs = FileSystem.getLocal(getConf());
//create the attempt dirs
new Localizer(localFs,
getConf().getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)).
initializeAttemptDirs(user, jobId, attemptId);
// create the working-directory of the task
if (!currentWorkDirectory.mkdir()) {
throw new IOException("Mkdirs failed to create "
+ currentWorkDirectory.toString());
}
//mkdir the loglocation
String logLocation = TaskLog.getAttemptDir(jobId, attemptId).toString();
if (!localFs.mkdirs(new Path(logLocation))) {
throw new IOException("Mkdirs failed to create "
+ logLocation);
}
//read the configuration for the job
FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
long logSize = 0; //TODO: Ref BUG:2854624
// get the JVM command line.
String cmdLine =
TaskLog.buildCommandLine(setup, jvmArguments,
new File(stdout), new File(stderr), logSize, true);
// write the command to a file in the
// task specific cache directory
// TODO copy to user dir
Path p = new Path(allocator.getLocalPathForWrite(
TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
getConf()), COMMAND_FILE);
String commandFile = writeCommand(cmdLine, rawFs, p);
rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION);
shExec = new ShellCommandExecutor(new String[]{
"bash", "-c", commandFile},
currentWorkDirectory);
shExec.execute();
} catch (Exception e) {
if (shExec == null) {
return -1;
}
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from task is : " + exitCode);
LOG.info("Output from DefaultTaskController's launchTask follows:");
logOutput(shExec.getOutput());
return exitCode;
}
return 0;
}
/**
* This routine initializes the local file system for running a job.
* Details:
* <ul>
* <li>Copies the credentials file from the TaskTracker's private space to
* the job's private space </li>
* <li>Creates the job work directory and set
* {@link TaskTracker#JOB_LOCAL_DIR} in the configuration</li>
* <li>Downloads the job.jar, unjars it, and updates the configuration to
* reflect the localized path of the job.jar</li>
* <li>Creates a base JobConf in the job's private space</li>
* <li>Sets up the distributed cache</li>
* <li>Sets up the user logs directory for the job</li>
* </ul>
* This method must be invoked in the access control context of the job owner
* user. This is because the distributed cache is also setup here and the
* access to the hdfs files requires authentication tokens in case where
* security is enabled.
* @param user the user in question (the job owner)
* @param jobid the ID of the job in question
* @param credentials the path to the credentials file that the TaskTracker
* downloaded
* @param jobConf the path to the job configuration file that the TaskTracker
* downloaded
* @param taskTracker the connection to the task tracker
* @throws IOException
* @throws InterruptedException
*/
@Override
public void initializeJob(String user, String jobid,
Path credentials, Path jobConf,
TaskUmbilicalProtocol taskTracker,
InetSocketAddress ttAddr
) throws IOException, InterruptedException {
final LocalDirAllocator lDirAlloc = allocator;
FileSystem localFs = FileSystem.getLocal(getConf());
JobLocalizer localizer = new JobLocalizer((JobConf)getConf(), user, jobid);
localizer.createLocalDirs();
localizer.createUserDirs();
localizer.createJobDirs();
JobConf jConf = new JobConf(jobConf);
localizer.createWorkDir(jConf);
//copy the credential file
Path localJobTokenFile = lDirAlloc.getLocalPathForWrite(
TaskTracker.getLocalJobTokenFile(user, jobid), getConf());
FileUtil.copy(
localFs, credentials, localFs, localJobTokenFile, false, getConf());
//setup the user logs dir
localizer.initializeJobLogDir();
// Download the job.jar for this job from the system FS
// setup the distributed cache
// write job acls
// write localized config
localizer.localizeJobFiles(JobID.forName(jobid), jConf, localJobTokenFile,
taskTracker);
}
@Override
public boolean signalTask(String user, int taskPid, Signal signal)
throws IOException {
final int sigpid = TaskController.isSetsidAvailable
? -1 * taskPid
: taskPid;
try {
sendSignal(sigpid, Signal.NULL);
} catch (ExitCodeException e) {
return false;
}
try {
sendSignal(sigpid, signal);
} catch (IOException e) {
try {
sendSignal(sigpid, Signal.NULL);
} catch (IOException ignore) {
return false;
}
throw e;
}
return true;
}
/**
* Send a specified signal to the specified pid
*
* @param pid the pid of the process [group] to signal.
* @param signal signal to send
* (for logging).
*/
protected void sendSignal(int pid, Signal signal) throws IOException {
ShellCommandExecutor shexec = null;
String[] arg = { "kill", "-" + signal.getValue(), Integer.toString(pid) };
shexec = new ShellCommandExecutor(arg);
shexec.execute();
}
/**
* Delete the user's files under all of the task tracker root directories.
* @param user the user name
* @param subDir the path relative to base directories
* @param baseDirs the base directories (absolute paths)
* @throws IOException
*/
@Override
public void deleteAsUser(String user,
String subDir,
String... baseDirs) throws IOException {
if (baseDirs == null || baseDirs.length == 0) {
LOG.info("Deleting absolute path : " + subDir);
fs.delete(new Path(subDir), true);
return;
}
for (String baseDir : baseDirs) {
LOG.info("Deleting path : " + baseDir + Path.SEPARATOR + subDir);
fs.delete(new Path(baseDir + Path.SEPARATOR + subDir), true);
}
}
/**
* Delete the user's files under the userlogs directory.
* @param user the user to work as
* @param subDir the path under the userlogs directory.
* @throws IOException
*/
@Override
public void deleteLogAsUser(String user,
String subDir) throws IOException {
Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir);
fs.delete(dir, true);
}
@Override
public void setup(LocalDirAllocator allocator) {
this.allocator = allocator;
}
}