| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo; |
| import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; |
| import org.apache.hadoop.mapred.TaskTracker.LocalStorage; |
| import org.apache.hadoop.util.ProcessTree.Signal; |
| import org.apache.hadoop.util.ProcessTree; |
| import org.apache.hadoop.util.Shell.ShellCommandExecutor; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| /** |
| * The default implementation for controlling tasks. |
| * |
| * This class provides an implementation for launching and killing |
| * tasks that need to be run as the tasktracker itself. Hence, |
| * many of the initializing or cleanup methods are not required here. |
| * |
| * <br/> |
| * |
| * NOTE: This class is internal only class and not intended for users!! |
| */ |
| public class DefaultTaskController extends TaskController { |
| |
| private static final Log LOG = |
| LogFactory.getLog(DefaultTaskController.class); |
| private FileSystem fs; |
| @Override |
| public void setConf(Configuration conf) { |
| super.setConf(conf); |
| try { |
| fs = FileSystem.getLocal(conf).getRaw(); |
| } catch (IOException ie) { |
| throw new RuntimeException("Failed getting LocalFileSystem", ie); |
| } |
| } |
| |
| @Override |
| public void createLogDir(TaskAttemptID taskID, |
| boolean isCleanup) throws IOException { |
| TaskLog.createTaskAttemptLogDir(taskID, isCleanup, |
| localStorage.getGoodLocalDirs()); |
| } |
| |
| /** |
| * Create all of the directories for the task and launches the child jvm. |
| * @param user the user name |
| * @param attemptId the attempt id |
| * @throws IOException |
| */ |
| @Override |
| public int launchTask(String user, |
| String jobId, |
| String attemptId, |
| List<String> setup, |
| List<String> jvmArguments, |
| File currentWorkDirectory, |
| String stdout, |
| String stderr) throws IOException { |
| ShellCommandExecutor shExec = null; |
| try { |
| FileSystem localFs = FileSystem.getLocal(getConf()); |
| |
| //create the attempt dirs |
| new Localizer(localFs, |
| getConf().getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)). |
| initializeAttemptDirs(user, jobId, attemptId); |
| |
| // create the working-directory of the task |
| if (!currentWorkDirectory.mkdir()) { |
| throw new IOException("Mkdirs failed to create " |
| + currentWorkDirectory.toString()); |
| } |
| |
| //mkdir the loglocation |
| String logLocation = TaskLog.getAttemptDir(jobId, attemptId).toString(); |
| if (!localFs.mkdirs(new Path(logLocation))) { |
| throw new IOException("Mkdirs failed to create " |
| + logLocation); |
| } |
| //read the configuration for the job |
| FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw(); |
| long logSize = 0; //TODO: Ref BUG:2854624 |
| // get the JVM command line. |
| String cmdLine = |
| TaskLog.buildCommandLine(setup, jvmArguments, |
| new File(stdout), new File(stderr), logSize, true); |
| |
| // write the command to a file in the |
| // task specific cache directory |
| // TODO copy to user dir |
| Path p = new Path(allocator.getLocalPathForWrite( |
| TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId), |
| getConf()), COMMAND_FILE); |
| |
| String commandFile = writeCommand(cmdLine, rawFs, p); |
| rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION); |
| shExec = new ShellCommandExecutor(new String[]{ |
| "bash", "-c", commandFile}, |
| currentWorkDirectory); |
| shExec.execute(); |
| } catch (Exception e) { |
| if (shExec == null) { |
| return -1; |
| } |
| int exitCode = shExec.getExitCode(); |
| LOG.warn("Exit code from task is : " + exitCode); |
| LOG.info("Output from DefaultTaskController's launchTask follows:"); |
| logOutput(shExec.getOutput()); |
| return exitCode; |
| } |
| return 0; |
| } |
| |
| /** |
| * This routine initializes the local file system for running a job. |
| * Details: |
| * <ul> |
| * <li>Copies the credentials file from the TaskTracker's private space to |
| * the job's private space </li> |
| * <li>Creates the job work directory and set |
| * {@link TaskTracker#JOB_LOCAL_DIR} in the configuration</li> |
| * <li>Downloads the job.jar, unjars it, and updates the configuration to |
| * reflect the localized path of the job.jar</li> |
| * <li>Creates a base JobConf in the job's private space</li> |
| * <li>Sets up the distributed cache</li> |
| * <li>Sets up the user logs directory for the job</li> |
| * </ul> |
| * This method must be invoked in the access control context of the job owner |
| * user. This is because the distributed cache is also setup here and the |
| * access to the hdfs files requires authentication tokens in case where |
| * security is enabled. |
| * @param user the user in question (the job owner) |
| * @param jobid the ID of the job in question |
| * @param credentials the path to the credentials file that the TaskTracker |
| * downloaded |
| * @param jobConf the path to the job configuration file that the TaskTracker |
| * downloaded |
| * @param taskTracker the connection to the task tracker |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| @Override |
| public void initializeJob(String user, String jobid, |
| Path credentials, Path jobConf, |
| TaskUmbilicalProtocol taskTracker, |
| InetSocketAddress ttAddr |
| ) throws IOException, InterruptedException { |
| final LocalDirAllocator lDirAlloc = allocator; |
| FileSystem localFs = FileSystem.getLocal(getConf()); |
| JobLocalizer localizer = new JobLocalizer((JobConf)getConf(), user, jobid); |
| localizer.createLocalDirs(); |
| localizer.createUserDirs(); |
| localizer.createJobDirs(); |
| |
| JobConf jConf = new JobConf(jobConf); |
| localizer.createWorkDir(jConf); |
| //copy the credential file |
| Path localJobTokenFile = lDirAlloc.getLocalPathForWrite( |
| TaskTracker.getLocalJobTokenFile(user, jobid), getConf()); |
| FileUtil.copy( |
| localFs, credentials, localFs, localJobTokenFile, false, getConf()); |
| |
| |
| //setup the user logs dir |
| localizer.initializeJobLogDir(); |
| |
| // Download the job.jar for this job from the system FS |
| // setup the distributed cache |
| // write job acls |
| // write localized config |
| localizer.localizeJobFiles(JobID.forName(jobid), jConf, localJobTokenFile, |
| taskTracker); |
| } |
| |
| @Override |
| public void signalTask(String user, int taskPid, Signal signal) { |
| if (ProcessTree.isSetsidAvailable) { |
| ProcessTree.killProcessGroup(Integer.toString(taskPid), signal); |
| } else { |
| ProcessTree.killProcess(Integer.toString(taskPid), signal); |
| } |
| } |
| |
| /** |
| * Delete the user's files under all of the task tracker root directories. |
| * @param user the user name |
| * @param subDir the path relative to the user's subdirectory under |
| * the task tracker root directories. |
| * @throws IOException |
| */ |
| @Override |
| public void deleteAsUser(String user, |
| String subDir) throws IOException { |
| String dir = TaskTracker.getUserDir(user) + Path.SEPARATOR + subDir; |
| for(Path fullDir: allocator.getAllLocalPathsToRead(dir, getConf())) { |
| fs.delete(fullDir, true); |
| } |
| } |
| |
| /** |
| * Delete the user's files under the userlogs directory. |
| * @param user the user to work as |
| * @param subDir the path under the userlogs directory. |
| * @throws IOException |
| */ |
| @Override |
| public void deleteLogAsUser(String user, |
| String subDir) throws IOException { |
| Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir); |
| //Delete the subDir in <hadoop.log.dir>/userlogs |
| File subDirPath = new File(dir.toString()); |
| FileUtil.fullyDelete( subDirPath ); |
| |
| //Delete the subDir in all good <mapred.local.dirs>/userlogs |
| String [] localDirs = localStorage.getGoodLocalDirs(); |
| for(String localdir : localDirs) { |
| String dirPath = localdir + File.separatorChar + |
| TaskLog.USERLOGS_DIR_NAME + File.separatorChar + |
| subDir; |
| try { |
| FileUtil.fullyDelete( new File(dirPath) ); |
| } catch(Exception e){ |
| //Skip bad dir for later deletion |
| LOG.warn("Could not delete dir: " + dirPath + |
| " , Reason : " + e.getMessage()); |
| } |
| } |
| } |
| |
| @Override |
| public void truncateLogsAsUser(String user, List<Task> allAttempts) |
| throws IOException { |
| Task firstTask = allAttempts.get(0); |
| TaskLogsTruncater trunc = new TaskLogsTruncater(getConf()); |
| |
| trunc.truncateLogs(new JVMInfo( |
| TaskLog.getAttemptDir(firstTask.getTaskID(), |
| firstTask.isTaskCleanupTask()), |
| allAttempts)); |
| } |
| |
| @Override |
| public void setup(LocalDirAllocator allocator, LocalStorage l) { |
| this.allocator = allocator; |
| this.localStorage = l; |
| } |
| |
| } |