| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.jar.JarOutputStream; |
| import java.util.zip.ZipEntry; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.MRConfig; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; |
| import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; |
| import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.mapred.JvmManager.JvmEnv; |
| import org.apache.hadoop.mapred.TaskController.JobInitializationContext; |
| import org.apache.hadoop.mapred.TaskController.TaskControllerContext; |
| import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; |
| import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue; |
| |
| import junit.framework.TestCase; |
| |
| /** |
| * Test to verify localization of a job and localization of a task on a |
| * TaskTracker. |
| * |
| */ |
| public class TestTaskTrackerLocalization extends TestCase { |
| |
| private static File TEST_ROOT_DIR = |
| new File(System.getProperty("test.build.data", "/tmp")); |
| private File ROOT_MAPRED_LOCAL_DIR; |
| private File HADOOP_LOG_DIR; |
| private static File PERMISSION_SCRIPT_DIR; |
| private static File PERMISSION_SCRIPT_FILE; |
| private static final String PERMISSION_SCRIPT_CONTENT = "ls -l -d $1 | " + |
| "awk '{print $1\":\"$3\":\"$4}'"; |
| |
| private int numLocalDirs = 6; |
| private static final Log LOG = |
| LogFactory.getLog(TestTaskTrackerLocalization.class); |
| |
| protected TaskTracker tracker; |
| protected UserGroupInformation taskTrackerUGI; |
| protected TaskController taskController; |
| protected JobConf trackerFConf; |
| private JobConf localizedJobConf; |
| protected JobID jobId; |
| protected TaskAttemptID taskId; |
| protected Task task; |
| protected String[] localDirs; |
| protected static LocalDirAllocator lDirAlloc = |
| new LocalDirAllocator(MRConfig.LOCAL_DIR); |
| protected Path attemptWorkDir; |
| protected File[] attemptLogFiles; |
| protected JobConf localizedTaskConf; |
| |
| /** |
| * Dummy method in this base class. Only derived classes will define this |
| * method for checking if a test can be run. |
| */ |
| protected boolean canRun() { |
| return true; |
| } |
| |
| @Override |
| protected void setUp() |
| throws Exception { |
| if (!canRun()) { |
| return; |
| } |
| TEST_ROOT_DIR = |
| new File(System.getProperty("test.build.data", "/tmp"), getClass() |
| .getSimpleName()); |
| if (!TEST_ROOT_DIR.exists()) { |
| TEST_ROOT_DIR.mkdirs(); |
| } |
| |
| ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local"); |
| ROOT_MAPRED_LOCAL_DIR.mkdirs(); |
| |
| HADOOP_LOG_DIR = new File(TEST_ROOT_DIR, "logs"); |
| HADOOP_LOG_DIR.mkdir(); |
| System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath()); |
| |
| trackerFConf = new JobConf(); |
| trackerFConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); |
| localDirs = new String[numLocalDirs]; |
| for (int i = 0; i < numLocalDirs; i++) { |
| localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath(); |
| } |
| trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs); |
| |
| // Create the job configuration file. Same as trackerConf in this test. |
| Job job = new Job(trackerFConf); |
| |
| job.setUGIAndUserGroupNames(); |
| |
| // JobClient uploads the job jar to the file system and sets it in the |
| // jobConf. |
| uploadJobJar(job); |
| |
| // JobClient uploads the jobConf to the file system. |
| File jobConfFile = uploadJobConf(job.getConfiguration()); |
| |
| // Set up the TaskTracker |
| tracker = new TaskTracker(); |
| tracker.setConf(trackerFConf); |
| |
| // for test case system FS is the local FS |
| tracker.systemFS = FileSystem.getLocal(trackerFConf); |
| tracker.setLocalFileSystem(tracker.systemFS); |
| tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath()); |
| |
| taskTrackerUGI = UserGroupInformation.login(trackerFConf); |
| |
| // Set up the task to be localized |
| String jtIdentifier = "200907202331"; |
| jobId = new JobID(jtIdentifier, 1); |
| taskId = |
| new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0); |
| task = |
| new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1); |
| task.setConf(job.getConfiguration()); // Set conf. Set user name in particular. |
| |
| // create jobTokens file |
| uploadJobTokensFile(); |
| |
| |
| taskController = new DefaultTaskController(); |
| taskController.setConf(trackerFConf); |
| taskController.setup(); |
| |
| tracker.setTaskController(taskController); |
| tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs, |
| taskController)); |
| } |
| |
| /** |
| * static block setting up the permission script which would be used by the |
| * checkFilePermissions |
| */ |
| static { |
| PERMISSION_SCRIPT_DIR = new File(TEST_ROOT_DIR, "permission_script_dir"); |
| PERMISSION_SCRIPT_FILE = new File(PERMISSION_SCRIPT_DIR, "getperms.sh"); |
| |
| if(PERMISSION_SCRIPT_FILE.exists()) { |
| PERMISSION_SCRIPT_FILE.delete(); |
| } |
| |
| if(PERMISSION_SCRIPT_DIR.exists()) { |
| PERMISSION_SCRIPT_DIR.delete(); |
| } |
| |
| PERMISSION_SCRIPT_DIR.mkdir(); |
| |
| try { |
| PrintWriter writer = new PrintWriter(PERMISSION_SCRIPT_FILE); |
| writer.write(PERMISSION_SCRIPT_CONTENT); |
| writer.close(); |
| } catch (FileNotFoundException fe) { |
| fail(); |
| } |
| PERMISSION_SCRIPT_FILE.setExecutable(true, true); |
| } |
| |
| /** |
| * @param job |
| * @throws IOException |
| * @throws FileNotFoundException |
| */ |
| private void uploadJobJar(Job job) |
| throws IOException, |
| FileNotFoundException { |
| File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar"); |
| JarOutputStream jstream = |
| new JarOutputStream(new FileOutputStream(jobJarFile)); |
| ZipEntry ze = new ZipEntry("lib/lib1.jar"); |
| jstream.putNextEntry(ze); |
| jstream.closeEntry(); |
| ze = new ZipEntry("lib/lib2.jar"); |
| jstream.putNextEntry(ze); |
| jstream.closeEntry(); |
| jstream.finish(); |
| jstream.close(); |
| job.setJar(jobJarFile.toURI().toString()); |
| } |
| |
| /** |
| * @param conf |
| * @return |
| * @throws FileNotFoundException |
| * @throws IOException |
| */ |
| protected File uploadJobConf(Configuration conf) |
| throws FileNotFoundException, |
| IOException { |
| File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml"); |
| FileOutputStream out = new FileOutputStream(jobConfFile); |
| conf.writeXml(out); |
| out.close(); |
| return jobConfFile; |
| } |
| |
| /** |
| * create fake JobTokens file |
| * @return |
| * @throws IOException |
| */ |
| protected void uploadJobTokensFile() throws IOException { |
| |
| File dir = new File(TEST_ROOT_DIR, jobId.toString()); |
| if(!dir.exists()) |
| assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs()); |
| |
| File jobTokenFile = new File(dir, SecureShuffleUtils.JOB_TOKEN_FILENAME); |
| FileOutputStream fos = new FileOutputStream(jobTokenFile); |
| java.io.DataOutputStream out = new java.io.DataOutputStream(fos); |
| Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(); |
| jt.write(out); // writing empty file, we don't the keys for this test |
| out.close(); |
| } |
| |
| @Override |
| protected void tearDown() |
| throws Exception { |
| if (!canRun()) { |
| return; |
| } |
| FileUtil.fullyDelete(TEST_ROOT_DIR); |
| } |
| |
| protected static String[] getFilePermissionAttrs(String path) |
| throws IOException { |
| String[] command = {"bash",PERMISSION_SCRIPT_FILE.getAbsolutePath(), path}; |
| String output=Shell.execCommand(command); |
| return output.split(":|\n"); |
| } |
| |
| |
| /** |
| * Utility method to check permission of a given path. Requires the permission |
| * script directory to be setup in order to call. |
| * |
| * |
| * @param path |
| * @param expectedPermissions |
| * @param expectedOwnerUser |
| * @param expectedOwnerGroup |
| * @throws IOException |
| */ |
| static void checkFilePermissions(String path, String expectedPermissions, |
| String expectedOwnerUser, String expectedOwnerGroup) |
| throws IOException { |
| String[] attrs = getFilePermissionAttrs(path); |
| assertTrue("File attrs length is not 3 but " + attrs.length, |
| attrs.length == 3); |
| assertTrue("Path " + path + " has the permissions " + attrs[0] |
| + " instead of the expected " + expectedPermissions, attrs[0] |
| .equals(expectedPermissions)); |
| assertTrue("Path " + path + " is user owned not by " + expectedOwnerUser |
| + " but by " + attrs[1], attrs[1].equals(expectedOwnerUser)); |
| assertTrue("Path " + path + " is group owned not by " + expectedOwnerGroup |
| + " but by " + attrs[2], attrs[2].equals(expectedOwnerGroup)); |
| } |
| |
| /** |
| * Verify the task-controller's setup functionality |
| * |
| * @throws IOException |
| */ |
| public void testTaskControllerSetup() |
| throws IOException { |
| if (!canRun()) { |
| return; |
| } |
| // Task-controller is already set up in the test's setup method. Now verify. |
| for (String localDir : localDirs) { |
| |
| // Verify the local-dir itself. |
| File lDir = new File(localDir); |
| assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists()); |
| checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", task |
| .getUser(), taskTrackerUGI.getGroupNames()[0]); |
| } |
| |
| // Verify the pemissions on the userlogs dir |
| File taskLog = TaskLog.getUserLogDir(); |
| checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", task |
| .getUser(), taskTrackerUGI.getGroupNames()[0]); |
| } |
| |
| /** |
| * Test the localization of a user on the TT. |
| * |
| * @throws IOException |
| */ |
| public void testUserLocalization() |
| throws IOException { |
| if (!canRun()) { |
| return; |
| } |
| // /////////// The main method being tested |
| tracker.getLocalizer().initializeUserDirs(task.getUser()); |
| // /////////// |
| |
| // Check the directory structure and permissions |
| checkUserLocalization(); |
| |
| // For the sake of testing re-entrancy of initializeUserDirs(), we remove |
| // the user directories now and make sure that further calls of the method |
| // don't create directories any more. |
| for (String dir : localDirs) { |
| File userDir = new File(dir, TaskTracker.getUserDir(task.getUser())); |
| FileUtil.fullyDelete(userDir); |
| } |
| |
| // Now call the method again. |
| tracker.getLocalizer().initializeUserDirs(task.getUser()); |
| |
| // Files should not be created now and so shouldn't be there anymore. |
| for (String dir : localDirs) { |
| File userDir = new File(dir, TaskTracker.getUserDir(task.getUser())); |
| assertFalse("Unexpectedly, user-dir " + userDir.getAbsolutePath() |
| + " exists!", userDir.exists()); |
| } |
| } |
| |
| protected void checkUserLocalization() |
| throws IOException { |
| for (String dir : localDirs) { |
| |
| File localDir = new File(dir); |
| assertTrue(MRConfig.LOCAL_DIR + localDir + " isn'task created!", |
| localDir.exists()); |
| |
| File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR); |
| assertTrue("taskTracker sub-dir in the local-dir " + localDir |
| + "is not created!", taskTrackerSubDir.exists()); |
| |
| File userDir = new File(taskTrackerSubDir, task.getUser()); |
| assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir |
| + "is not created!", userDir.exists()); |
| checkFilePermissions(userDir.getAbsolutePath(), "drwx------", task |
| .getUser(), taskTrackerUGI.getGroupNames()[0]); |
| |
| File jobCache = new File(userDir, TaskTracker.JOBCACHE); |
| assertTrue("jobcache in the userDir " + userDir + " isn't created!", |
| jobCache.exists()); |
| checkFilePermissions(jobCache.getAbsolutePath(), "drwx------", task |
| .getUser(), taskTrackerUGI.getGroupNames()[0]); |
| |
| // Verify the distributed cache dir. |
| File distributedCacheDir = |
| new File(localDir, TaskTracker |
| .getPrivateDistributedCacheDir(task.getUser())); |
| assertTrue("distributed cache dir " + distributedCacheDir |
| + " doesn't exists!", distributedCacheDir.exists()); |
| checkFilePermissions(distributedCacheDir.getAbsolutePath(), |
| "drwx------", task.getUser(), taskTrackerUGI.getGroupNames()[0]); |
| } |
| } |
| |
| /** |
| * Test job localization on a TT. Tests localization of job.xml, job.jar and |
| * corresponding setting of configuration. Also test |
| * {@link TaskController#initializeJob(JobInitializationContext)} |
| * |
| * @throws IOException |
| */ |
| public void testJobLocalization() |
| throws IOException { |
| if (!canRun()) { |
| return; |
| } |
| tracker.getLocalizer().initializeUserDirs(task.getUser()); |
| |
| // /////////// The main method being tested |
| localizedJobConf = tracker.localizeJobFiles(task); |
| // /////////// |
| |
| // Now initialize the job via task-controller so as to set |
| // ownership/permissions of jars, job-work-dir |
| JobInitializationContext context = new JobInitializationContext(); |
| context.jobid = jobId; |
| context.user = task.getUser(); |
| context.workDir = |
| new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR)); |
| |
| // /////////// The method being tested |
| taskController.initializeJob(context); |
| // /////////// |
| |
| checkJobLocalization(); |
| } |
| |
| protected void checkJobLocalization() |
| throws IOException { |
| // Check the directory structure |
| for (String dir : localDirs) { |
| |
| File localDir = new File(dir); |
| File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR); |
| File userDir = new File(taskTrackerSubDir, task.getUser()); |
| File jobCache = new File(userDir, TaskTracker.JOBCACHE); |
| |
| File jobDir = new File(jobCache, jobId.toString()); |
| assertTrue("job-dir in " + jobCache + " isn't created!", jobDir.exists()); |
| |
| // check the private permissions on the job directory |
| checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", task |
| .getUser(), taskTrackerUGI.getGroupNames()[0]); |
| } |
| |
| // check the localization of job.xml |
| assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc |
| .getLocalPathToRead(TaskTracker.getLocalJobConfFile(task.getUser(), |
| jobId.toString()), trackerFConf) != null); |
| |
| // check the localization of job.jar |
| Path jarFileLocalized = |
| lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task.getUser(), |
| jobId.toString()), trackerFConf); |
| assertTrue("job.jar is not localized on this TaskTracker!!", |
| jarFileLocalized != null); |
| assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File( |
| jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib1.jar") |
| .exists()); |
| assertTrue("lib/lib2.jar is not unjarred on this TaskTracker!!", new File( |
| jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib2.jar") |
| .exists()); |
| |
| // check the creation of job work directory |
| assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc |
| .getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(), jobId |
| .toString()), trackerFConf) != null); |
| |
| // Check the setting of mapreduce.job.local.dir and job.jar which will eventually be |
| // used by the user's task |
| boolean jobLocalDirFlag = false, mapredJarFlag = false; |
| String localizedJobLocalDir = |
| localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR); |
| String localizedJobJar = localizedJobConf.getJar(); |
| for (String localDir : localizedJobConf.getStrings(MRConfig.LOCAL_DIR)) { |
| if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR |
| + TaskTracker.getJobWorkDir(task.getUser(), jobId.toString()))) { |
| jobLocalDirFlag = true; |
| } |
| if (localizedJobJar.equals(localDir + Path.SEPARATOR |
| + TaskTracker.getJobJarFile(task.getUser(), jobId.toString()))) { |
| mapredJarFlag = true; |
| } |
| } |
| assertTrue(TaskTracker.JOB_LOCAL_DIR |
| + " is not set properly to the target users directory : " |
| + localizedJobLocalDir, jobLocalDirFlag); |
| assertTrue( |
| "mapreduce.job.jar is not set properly to the target users directory : " |
| + localizedJobJar, mapredJarFlag); |
| } |
| |
| /** |
| * Test task localization on a TT. |
| * |
| * @throws IOException |
| */ |
| public void testTaskLocalization() |
| throws IOException { |
| if (!canRun()) { |
| return; |
| } |
| tracker.getLocalizer().initializeUserDirs(task.getUser()); |
| localizedJobConf = tracker.localizeJobFiles(task); |
| |
| // Now initialize the job via task-controller so as to set |
| // ownership/permissions of jars, job-work-dir |
| JobInitializationContext jobContext = new JobInitializationContext(); |
| jobContext.jobid = jobId; |
| jobContext.user = task.getUser(); |
| jobContext.workDir = |
| new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR)); |
| taskController.initializeJob(jobContext); |
| |
| TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf); |
| tip.setJobConf(localizedJobConf); |
| |
| // ////////// The central method being tested |
| tip.localizeTask(task); |
| // ////////// |
| |
| // check the functionality of localizeTask |
| for (String dir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) { |
| File attemptDir = |
| new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId |
| .toString(), taskId.toString())); |
| assertTrue("attempt-dir " + attemptDir + " in localDir " + dir |
| + " is not created!!", attemptDir.exists()); |
| } |
| |
| attemptWorkDir = |
| lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir( |
| task.getUser(), task.getJobID().toString(), task.getTaskID() |
| .toString(), task.isTaskCleanupTask()), trackerFConf); |
| assertTrue("atttempt work dir for " + taskId.toString() |
| + " is not created in any of the configured dirs!!", |
| attemptWorkDir != null); |
| |
| TaskRunner runner = task.createRunner(tracker, tip); |
| |
| // /////// Few more methods being tested |
| runner.setupChildTaskConfiguration(lDirAlloc); |
| TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()), |
| localizedJobConf); |
| attemptLogFiles = TaskRunner.prepareLogFiles(task.getTaskID()); |
| |
| // Make sure the task-conf file is created |
| Path localTaskFile = |
| lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task |
| .getUser(), task.getJobID().toString(), task.getTaskID() |
| .toString(), task.isTaskCleanupTask()), trackerFConf); |
| assertTrue("Task conf file " + localTaskFile.toString() |
| + " is not created!!", new File(localTaskFile.toUri().getPath()) |
| .exists()); |
| |
| // /////// One more method being tested. This happens in child space. |
| localizedTaskConf = new JobConf(localTaskFile); |
| TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf); |
| // /////// |
| |
| // Initialize task via TaskController |
| TaskControllerContext taskContext = |
| new TaskController.TaskControllerContext(); |
| taskContext.env = |
| new JvmEnv(null, null, null, null, -1, new File(localizedJobConf |
| .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf); |
| taskContext.task = task; |
| // /////////// The method being tested |
| taskController.initializeTask(taskContext); |
| // /////////// |
| |
| checkTaskLocalization(); |
| } |
| |
| protected void checkTaskLocalization() |
| throws IOException { |
| // Make sure that the mapreduce.cluster.local.dir is sandboxed |
| for (String childMapredLocalDir : localizedTaskConf |
| .getStrings(MRConfig.LOCAL_DIR)) { |
| assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!", |
| childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task |
| .getUser(), jobId.toString(), taskId.toString(), false))); |
| } |
| |
| // Make sure task task.getJobFile is changed and pointed correctly. |
| assertTrue(task.getJobFile().endsWith( |
| TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId |
| .toString(), false))); |
| |
| // Make sure that the tmp directories are created |
| assertTrue("tmp dir is not created in workDir " |
| + attemptWorkDir.toUri().getPath(), new File(attemptWorkDir.toUri() |
| .getPath(), "tmp").exists()); |
| |
| // Make sure that the logs are setup properly |
| File logDir = |
| new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR |
| + task.getTaskID().toString()); |
| assertTrue("task's log dir " + logDir.toString() + " doesn't exist!", |
| logDir.exists()); |
| checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task |
| .getUser(), taskTrackerUGI.getGroupNames()[0]); |
| |
| File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString()); |
| assertTrue("stdout log file is improper. Expected : " |
| + expectedStdout.toString() + " Observed : " |
| + attemptLogFiles[0].toString(), expectedStdout.toString().equals( |
| attemptLogFiles[0].toString())); |
| File expectedStderr = |
| new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString()); |
| assertTrue("stderr log file is improper. Expected : " |
| + expectedStderr.toString() + " Observed : " |
| + attemptLogFiles[1].toString(), expectedStderr.toString().equals( |
| attemptLogFiles[1].toString())); |
| } |
| |
| /** |
| * Validates the removal of $taskid and $tasid/work under mapred-local-dir |
| * in cases where those directories cannot be deleted without adding |
| * write permission to the newly created directories under $taskid and |
| * $taskid/work |
| * Also see TestSetupWorkDir.createFileAndSetPermissions for details |
| */ |
| void validateRemoveFiles(boolean needCleanup, boolean jvmReuse, |
| TaskInProgress tip) throws IOException { |
| // create files and set permissions 555. Verify if task controller sets |
| // the permissions for TT to delete the taskDir or workDir |
| String dir = (!needCleanup || jvmReuse) ? |
| TaskTracker.getTaskWorkDir(task.getUser(), task.getJobID().toString(), |
| taskId.toString(), task.isTaskCleanupTask()) |
| : TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(), |
| taskId.toString(), task.isTaskCleanupTask()); |
| |
| Path[] paths = tracker.getLocalFiles(localizedJobConf, dir); |
| for (Path p : paths) { |
| if (tracker.getLocalFileSystem().exists(p)) { |
| TestSetupWorkDir.createFileAndSetPermissions(localizedJobConf, p); |
| } |
| } |
| |
| InlineCleanupQueue cleanupQueue = new InlineCleanupQueue(); |
| tracker.setCleanupThread(cleanupQueue); |
| |
| tip.removeTaskFiles(needCleanup, taskId); |
| |
| if (jvmReuse) { |
| // work dir should still exist and cleanup queue should be empty |
| assertTrue("cleanup queue is not empty after removeTaskFiles() in case " |
| + "of jvm reuse.", cleanupQueue.isQueueEmpty()); |
| boolean workDirExists = false; |
| for (Path p : paths) { |
| if (tracker.getLocalFileSystem().exists(p)) { |
| workDirExists = true; |
| } |
| } |
| assertTrue("work dir does not exist in case of jvm reuse", workDirExists); |
| |
| // now try to delete the work dir and verify that there are no stale paths |
| JvmManager.deleteWorkDir(tracker, task); |
| } |
| tracker.removeJobFiles(task.getUser(), jobId.toString()); |
| |
| assertTrue("Some task files are not deleted!! Number of stale paths is " |
| + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0); |
| } |
| |
| /** |
| * Validates if task cleanup is done properly for a succeeded task |
| * @throws IOException |
| */ |
| public void testTaskCleanup() |
| throws IOException { |
| if (!canRun()) { |
| return; |
| } |
| testTaskCleanup(false, false);// no needCleanup; no jvmReuse |
| } |
| |
| /** |
| * Validates if task cleanup is done properly for a task that is not succeeded |
| * @throws IOException |
| */ |
| public void testFailedTaskCleanup() |
| throws IOException { |
| if (!canRun()) { |
| return; |
| } |
| testTaskCleanup(true, false);// needCleanup; no jvmReuse |
| } |
| |
| /** |
| * Validates if task cleanup is done properly for a succeeded task |
| * @throws IOException |
| */ |
| public void testTaskCleanupWithJvmUse() |
| throws IOException { |
| if (!canRun()) { |
| return; |
| } |
| testTaskCleanup(false, true);// no needCleanup; jvmReuse |
| } |
| |
| /** |
| * Validates if task cleanup is done properly |
| */ |
| private void testTaskCleanup(boolean needCleanup, boolean jvmReuse) |
| throws IOException { |
| // Localize job and localize task. |
| tracker.getLocalizer().initializeUserDirs(task.getUser()); |
| localizedJobConf = tracker.localizeJobFiles(task); |
| if (jvmReuse) { |
| localizedJobConf.setNumTasksToExecutePerJvm(2); |
| } |
| // Now initialize the job via task-controller so as to set |
| // ownership/permissions of jars, job-work-dir |
| JobInitializationContext jobContext = new JobInitializationContext(); |
| jobContext.jobid = jobId; |
| jobContext.user = localizedJobConf.getUser(); |
| jobContext.workDir = |
| new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR)); |
| taskController.initializeJob(jobContext); |
| TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf); |
| tip.setJobConf(localizedJobConf); |
| tip.localizeTask(task); |
| Path workDir = |
| lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir( |
| task.getUser(), task.getJobID().toString(), task.getTaskID() |
| .toString(), task.isTaskCleanupTask()), trackerFConf); |
| TaskRunner runner = task.createRunner(tracker, tip); |
| tip.setTaskRunner(runner); |
| runner.setupChildTaskConfiguration(lDirAlloc); |
| TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()), |
| localizedJobConf); |
| TaskRunner.prepareLogFiles(task.getTaskID()); |
| Path localTaskFile = |
| lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task |
| .getUser(), task.getJobID().toString(), task.getTaskID() |
| .toString(), task.isTaskCleanupTask()), trackerFConf); |
| JobConf localizedTaskConf = new JobConf(localTaskFile); |
| TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf); |
| TaskControllerContext taskContext = |
| new TaskController.TaskControllerContext(); |
| taskContext.env = |
| new JvmEnv(null, null, null, null, -1, new File(localizedJobConf |
| .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf); |
| taskContext.task = task; |
| // /////////// The method being tested |
| taskController.initializeTask(taskContext); |
| |
| // TODO: Let the task run and create files. |
| |
| // create files and set permissions 555. Verify if task controller sets |
| // the permissions for TT to delete the task dir or work dir properly |
| validateRemoveFiles(needCleanup, jvmReuse, tip); |
| |
| // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still |
| // there. |
| for (String localDir : localDirs) { |
| Path userDir = |
| new Path(localDir, TaskTracker.getUserDir(task.getUser())); |
| assertTrue("User directory " + userDir + " is not present!!", |
| tracker.getLocalFileSystem().exists(userDir)); |
| } |
| |
| // Test userlogs cleanup. |
| verifyUserLogsCleanup(); |
| } |
| |
| /** |
| * Test userlogs cleanup. |
| * |
| * @throws IOException |
| */ |
| private void verifyUserLogsCleanup() |
| throws IOException { |
| Path logDir = |
| new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME |
| + Path.SEPARATOR + task.getTaskID().toString()); |
| |
| // Logs should be there before cleanup. |
| assertTrue("Userlogs dir " + logDir + " is not presen as expected!!", |
| tracker.getLocalFileSystem().exists(logDir)); |
| |
| // ////////// Another being tested |
| TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file |
| // modification time behind retainTimeStatmp |
| // ////////// |
| |
| // Logs should be gone after cleanup. |
| assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!", |
| tracker.getLocalFileSystem().exists(logDir)); |
| } |
| } |