blob: 96ed9fa4ffd0dab4c14565a0ccaa61613fe9d877 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedHashMap;
import java.util.TreeMap;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.RunningJob;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
import junit.framework.TestCase;
/**
* Test to verify localization of a job and localization of a task on a
* TaskTracker.
*
*/
public class TestTaskTrackerLocalization extends TestCase {
/**
 * Root of all test data. It starts as test.build.data (default /tmp), which
 * is where the static initializer creates the permission script; setUp()
 * re-points it to a per-class subdirectory that tearDown() deletes.
 */
private static File TEST_ROOT_DIR =
  new File(System.getProperty("test.build.data", "/tmp"));
/** Parent directory of the tracker's mapred local dirs. */
private File ROOT_MAPRED_LOCAL_DIR;
/** Directory published through the hadoop.log.dir system property. */
private File HADOOP_LOG_DIR;
/** Directory holding the permission-inspection script. */
private static File PERMISSION_SCRIPT_DIR;
/** Script that prints "permissions:owner:group" for its first argument. */
private static File PERMISSION_SCRIPT_FILE;
// Fields 1, 3 and 4 of `ls -l -d` are the mode, owner and group.
private static final String PERMISSION_SCRIPT_CONTENT = "ls -l -d $1 | " +
  "awk '{print $1\":\"$3\":\"$4}'";
/** Number of mapred local dirs configured for the tracker; never changes. */
private final int numLocalDirs = 6;
private static final Log LOG =
  LogFactory.getLog(TestTaskTrackerLocalization.class);
protected TaskTracker tracker;
protected UserGroupInformation taskTrackerUGI;
protected TaskController taskController;
/** Configuration handed to the TaskTracker under test. */
protected JobConf trackerFConf;
/** Job configuration as produced by job localization. */
private JobConf localizedJobConf;
protected JobID jobId;
protected TaskAttemptID taskId;
protected Task task;
protected String[] localDirs;
protected static LocalDirAllocator lDirAlloc =
  new LocalDirAllocator(MRConfig.LOCAL_DIR);
protected Path attemptWorkDir;
protected File[] attemptLogFiles;
/** Task configuration as seen by the child after localization. */
protected JobConf localizedTaskConf;
private TaskInProgress tip;
/** Job configuration as submitted (before localization). */
private JobConf jobConf;
/** The "uploaded" job.xml written by uploadJobConf(). */
private File jobConfFile;
/**
 * Hook that lets a subclass skip every test in this class when its
 * environment cannot support it. This base implementation never skips.
 *
 * @return whether the tests should run; always {@code true} here
 */
protected boolean canRun() {
  final boolean runnable = true;
  return runnable;
}
/**
 * Builds the fixture shared by every test:
 * <ul>
 * <li>a per-class test root with mapred-local and log directories;</li>
 * <li>a tracker configuration with {@code numLocalDirs} local dirs and ACLs
 * enabled;</li>
 * <li>a job configuration carrying job-view and queue-admin ACLs;</li>
 * <li>the "uploaded" job jar, job.xml and an empty job-token file;</li>
 * <li>a started TaskTracker;</li>
 * <li>a map task and its TaskInProgress.</li>
 * </ul>
 * Does nothing when {@link #canRun()} returns false.
 */
@Override
protected void setUp()
throws Exception {
if (!canRun()) {
return;
}
// Per-test-class root; tearDown() deletes exactly this directory.
TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp"), getClass()
.getSimpleName());
if (!TEST_ROOT_DIR.exists()) {
TEST_ROOT_DIR.mkdirs();
}
ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
ROOT_MAPRED_LOCAL_DIR.mkdirs();
HADOOP_LOG_DIR = new File(TEST_ROOT_DIR, "logs");
HADOOP_LOG_DIR.mkdir();
System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath());
trackerFConf = new JobConf();
// The local file system serves as the default FS for this test.
trackerFConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
localDirs = new String[numLocalDirs];
for (int i = 0; i < numLocalDirs; i++) {
localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
}
trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs);
trackerFConf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
// Create the job configuration file. Same as trackerConf in this test.
jobConf = new JobConf(trackerFConf);
// Set job view ACLs in conf so that validation of contents of jobACLsFile
// can be done against this value. Have both users and groups
String jobViewACLs = "user1,user2, group1,group2";
jobConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACLs);
// NOTE(review): presumably a zero retention lets the user-log cleaner remove
// the job's logs immediately (see verifyUserLogsRemoval) — confirm.
jobConf.setInt(MRJobConfig.USER_LOG_RETAIN_HOURS, 0);
jobConf.setUser(getJobOwner().getShortUserName());
String queue = "default";
// set job queue name in job conf
jobConf.setQueueName(queue);
// Set queue admins acl in job conf similar to what JobClient does so that
// it goes into job conf also.
jobConf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()),
"qAdmin1,qAdmin2 qAdminsGroup1,qAdminsGroup2");
Job job = Job.getInstance(jobConf);
String jtIdentifier = "200907202331";
// jobId must be set before uploadJobTokensFile(), which names its
// directory after it.
jobId = new JobID(jtIdentifier, 1);
// JobClient uploads the job jar to the file system and sets it in the
// jobConf.
uploadJobJar(job);
// JobClient uploads the jobConf to the file system.
jobConfFile = uploadJobConf(job.getConfiguration());
// create jobTokens file
uploadJobTokensFile();
taskTrackerUGI = UserGroupInformation.getCurrentUser();
startTracker();
// Set up the task to be localized
taskId =
new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
createTask();
// mimic register task
// create the tip
tip = tracker.new TaskInProgress(task, trackerFConf);
}
/**
 * Creates a brand-new TaskTracker configured with {@link #trackerFConf},
 * gives it a UserLogCleaner, and wires its collaborators via
 * initializeTracker(). testTrackerRestart() calls this again to imitate a
 * TaskTracker restart.
 *
 * @throws IOException if initializing the tracker's collaborators fails
 */
private void startTracker() throws IOException {
// Set up the TaskTracker
tracker = new TaskTracker();
tracker.setConf(trackerFConf);
tracker.setTaskLogCleanupThread(new UserLogCleaner(trackerFConf));
initializeTracker();
}
/**
 * Wires the current {@link #tracker} with the collaborators these tests
 * need, using setters and direct field assignments. Specifically:
 * <ul>
 * <li>an index cache;</li>
 * <li>the local FS, used as both the system and local file system;</li>
 * <li>TEST_ROOT_DIR as the system directory;</li>
 * <li>fresh, empty running-task and running-job maps;</li>
 * <li>an async disk service, whose volumes are cleaned up right away;</li>
 * <li>instrumentation;</li>
 * <li>the task controller returned by {@link #createTaskController()};</li>
 * <li>a Localizer over {@link #localDirs}.</li>
 * </ul>
 * testTrackerReinit() calls this again to imitate a tracker re-init.
 *
 * @throws IOException if any collaborator fails to initialize
 */
private void initializeTracker() throws IOException {
tracker.setIndexCache(new IndexCache(trackerFConf));
tracker.setTaskMemoryManagerEnabledFlag();
// for test case system FS is the local FS
tracker.systemFS = FileSystem.getLocal(trackerFConf);
tracker.setLocalFileSystem(tracker.systemFS);
// uploadJobTokensFile() writes the job-token file under this directory.
tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
// Fresh maps: the tracker starts with no running tasks or jobs.
tracker.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
tracker.runningJobs = new TreeMap<JobID, RunningJob>();
tracker.setAsyncDiskService(new MRAsyncDiskService(trackerFConf));
tracker.getAsyncDiskService().cleanupAllVolumes();
// Set up TaskTracker instrumentation
tracker.setTaskTrackerInstrumentation(
TaskTracker.createInstrumentation(tracker, trackerFConf));
// setup task controller
taskController = createTaskController();
taskController.setConf(trackerFConf);
taskController.setup();
tracker.setTaskController(taskController);
tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
taskController));
}
/**
 * Factory for the task controller under test. Subclasses may override it to
 * exercise a different {@link TaskController} implementation.
 *
 * @return a new {@link DefaultTaskController}
 */
protected TaskController createTaskController() {
  TaskController controller = new DefaultTaskController();
  return controller;
}
/**
 * (Re)creates the map task under test from the uploaded job.xml. It then
 * attaches the submitted job configuration and the job owner to the task.
 *
 * @throws IOException declared for callers; propagated from task setup
 */
private void createTask()
    throws IOException {
  String jobFileUri = jobConfFile.toURI().toString();
  task = new MapTask(jobFileUri, taskId, 1, null, 1);
  // The conf matters mainly for the user name it carries.
  task.setConf(jobConf);
  task.setUser(jobConf.getUser());
}
/**
 * Returns the user that owns the job being localized. The base class uses
 * the current user; subclasses may override this.
 *
 * @return the job owner's UGI
 * @throws IOException if the current user cannot be determined
 */
protected UserGroupInformation getJobOwner() throws IOException {
  UserGroupInformation owner = UserGroupInformation.getCurrentUser();
  return owner;
}
/**
* static block setting up the permission script which would be used by the
* checkFilePermissions
*/
static {
PERMISSION_SCRIPT_DIR = new File(TEST_ROOT_DIR, "permission_script_dir");
PERMISSION_SCRIPT_FILE = new File(PERMISSION_SCRIPT_DIR, "getperms.sh");
if(PERMISSION_SCRIPT_FILE.exists()) {
PERMISSION_SCRIPT_FILE.delete();
}
if(PERMISSION_SCRIPT_DIR.exists()) {
PERMISSION_SCRIPT_DIR.delete();
}
PERMISSION_SCRIPT_DIR.mkdir();
try {
PrintWriter writer = new PrintWriter(PERMISSION_SCRIPT_FILE);
writer.write(PERMISSION_SCRIPT_CONTENT);
writer.close();
} catch (FileNotFoundException fe) {
fail();
}
PERMISSION_SCRIPT_FILE.setExecutable(true, true);
}
/**
 * Mimics JobClient uploading the job jar. It writes a jar with two empty
 * entries, lib/lib1.jar and lib/lib2.jar, under TEST_ROOT_DIR and points the
 * job at it. checkJobLocalization() later verifies that both entries were
 * unjarred.
 *
 * @param job the job whose jar is set
 * @throws IOException if the jar cannot be written
 * @throws FileNotFoundException if the jar file cannot be created
 */
private void uploadJobJar(Job job)
    throws IOException,
    FileNotFoundException {
  File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
  JarOutputStream jstream =
      new JarOutputStream(new FileOutputStream(jobJarFile));
  try {
    for (String entryName : new String[] {"lib/lib1.jar", "lib/lib2.jar"}) {
      jstream.putNextEntry(new ZipEntry(entryName));
      jstream.closeEntry();
    }
    jstream.finish();
  } finally {
    // Close even when writing an entry fails so the handle is not leaked.
    jstream.close();
  }
  job.setJar(jobJarFile.toURI().toString());
}
/**
 * Mimics JobClient uploading job.xml: writes {@code conf} as XML to
 * jobconf-on-dfs.xml under TEST_ROOT_DIR.
 *
 * @param conf the configuration to write
 * @return the written job.xml file
 * @throws FileNotFoundException if the file cannot be created
 * @throws IOException if writing the configuration fails
 */
protected File uploadJobConf(Configuration conf)
    throws FileNotFoundException,
    IOException {
  File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
  FileOutputStream out = new FileOutputStream(jobConfFile);
  try {
    conf.writeXml(out);
  } finally {
    // Do not leak the stream if serialization fails.
    out.close();
  }
  return jobConfFile;
}
/**
 * Creates a fake (empty) job-token file for {@link #jobId} under
 * TEST_ROOT_DIR/&lt;jobId&gt;.
 *
 * @throws IOException if the token file cannot be written
 */
protected void uploadJobTokensFile() throws IOException {
  File dir = new File(TEST_ROOT_DIR, jobId.toString());
  if (!dir.exists()) {
    assertTrue("faild to create dir=" + dir.getAbsolutePath(), dir.mkdirs());
  }
  // Use the absolute path. initializeTracker() builds the tracker's system
  // directory from TEST_ROOT_DIR.getAbsolutePath(), and prefixing a relative
  // path with "file:///" would resolve it against the filesystem root.
  Path tokenFile = new Path("file:///" + dir.getAbsolutePath(),
      TokenCache.JOB_TOKEN_HDFS_FILE);
  // writing empty file, we don't need the keys for this test
  new Credentials().writeTokenStorageFile(tokenFile, new Configuration());
}
/**
 * Removes everything created under the per-class test root. It is skipped
 * when {@link #canRun()} is false, mirroring setUp().
 */
@Override
protected void tearDown()
    throws Exception {
  if (canRun()) {
    FileUtil.fullyDelete(TEST_ROOT_DIR);
  }
}
/**
 * Runs the permission script on {@code path} through bash.
 *
 * @param path the file or directory to inspect
 * @return the script output split on ':' and newlines, normally
 *         {permissions, owner, group}
 * @throws IOException if the script cannot be executed
 */
protected static String[] getFilePermissionAttrs(String path)
    throws IOException {
  String scriptPath = PERMISSION_SCRIPT_FILE.getAbsolutePath();
  String[] bashCommand = new String[] {"bash", scriptPath, path};
  String scriptOutput = Shell.execCommand(bashCommand);
  return scriptOutput.split(":|\n");
}
/**
 * Asserts the mode string, owning user and owning group of {@code path}.
 * It relies on the permission script created by the static initializer.
 *
 * @param path the path to inspect
 * @param expectedPermissions expected `ls -l` mode, e.g. "drwx------"
 * @param expectedOwnerUser expected owning user
 * @param expectedOwnerGroup expected owning group
 * @throws IOException if the permission script cannot be run
 */
static void checkFilePermissions(String path, String expectedPermissions,
    String expectedOwnerUser, String expectedOwnerGroup)
    throws IOException {
  String[] attrs = getFilePermissionAttrs(path);
  int attrCount = attrs.length;
  assertTrue("File attrs length is not 3 but " + attrCount, attrCount == 3);
  String actualPermissions = attrs[0];
  String actualUser = attrs[1];
  String actualGroup = attrs[2];
  assertTrue("Path " + path + " has the permissions " + actualPermissions
      + " instead of the expected " + expectedPermissions,
      actualPermissions.equals(expectedPermissions));
  assertTrue("Path " + path + " is user owned not by " + expectedOwnerUser
      + " but by " + actualUser, actualUser.equals(expectedOwnerUser));
  assertTrue("Path " + path + " is group owned not by " + expectedOwnerGroup
      + " but by " + actualGroup, actualGroup.equals(expectedOwnerGroup));
}
/**
 * Verifies the effects of the task-controller setup performed in setUp().
 * Every local dir and the userlogs dir must exist as drwxr-xr-x, owned by
 * the task's user and the tracker's primary group.
 *
 * @throws IOException if a permission check cannot be run
 */
public void testTaskControllerSetup()
    throws IOException {
  if (!canRun()) {
    return;
  }
  for (int i = 0; i < localDirs.length; i++) {
    File dir = new File(localDirs[i]);
    assertTrue("localDir " + dir + " doesn't exists!", dir.exists());
    checkFilePermissions(dir.getAbsolutePath(), "drwxr-xr-x",
        task.getUser(), taskTrackerUGI.getGroupNames()[0]);
  }
  // Same expectations for the userlogs dir.
  File userLogDir = TaskLog.getUserLogDir();
  checkFilePermissions(userLogDir.getAbsolutePath(), "drwxr-xr-x",
      task.getUser(), taskTrackerUGI.getGroupNames()[0]);
}
/**
 * Tests user localization on the TT and its re-entrancy. After the user dirs
 * are deleted, a second initializeUserDirs() call must not recreate them.
 *
 * @throws IOException if localization or deletion fails
 */
public void testUserLocalization()
    throws IOException {
  if (!canRun()) {
    return;
  }
  String user = task.getUser();
  // The method under test.
  tracker.getLocalizer().initializeUserDirs(user);
  checkUserLocalization();
  // Re-entrancy: wipe the user dirs, call again, expect nothing recreated.
  for (String dir : localDirs) {
    File userDir = new File(dir, TaskTracker.getUserDir(user));
    if (!FileUtil.fullyDelete(userDir)) {
      throw new IOException("Uanble to delete " + userDir);
    }
  }
  tracker.getLocalizer().initializeUserDirs(user);
  for (String dir : localDirs) {
    File userDir = new File(dir, TaskTracker.getUserDir(user));
    assertFalse("Unexpectedly, user-dir " + userDir.getAbsolutePath()
        + " exists!", userDir.exists());
  }
}
/**
 * Verifies the per-user layout created by user localization in every
 * configured local dir. The TaskTracker sub-dir must exist. The user dir,
 * its jobcache dir and the user's private distributed-cache dir must exist
 * and be drwx------, owned by the task's user and the tracker's primary
 * group.
 *
 * @throws IOException if a permission check cannot be run
 */
protected void checkUserLocalization()
    throws IOException {
  String user = task.getUser();
  for (String dir : localDirs) {
    File localDir = new File(dir);
    assertTrue(MRConfig.LOCAL_DIR + " " + localDir + " isn't created!",
        localDir.exists());
    File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
    assertTrue("taskTracker sub-dir in the local-dir " + localDir
        + " is not created!", taskTrackerSubDir.exists());
    File userDir = new File(taskTrackerSubDir, user);
    assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
        + " is not created!", userDir.exists());
    String group = taskTrackerUGI.getGroupNames()[0];
    checkFilePermissions(userDir.getAbsolutePath(), "drwx------", user,
        group);
    File jobCache = new File(userDir, TaskTracker.JOBCACHE);
    assertTrue("jobcache in the userDir " + userDir + " isn't created!",
        jobCache.exists());
    checkFilePermissions(jobCache.getAbsolutePath(), "drwx------", user,
        group);
    // Verify the distributed cache dir.
    File distributedCacheDir =
        new File(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
    assertTrue("distributed cache dir " + distributedCacheDir
        + " doesn't exists!", distributedCacheDir.exists());
    checkFilePermissions(distributedCacheDir.getAbsolutePath(),
        "drwx------", user, group);
  }
}
/**
 * Tests job localization on a TT: localization of job.xml and job.jar, the
 * resulting configuration settings, and the work done by
 * {@link TaskController#initializeJob(JobInitializationContext)}.
 *
 * @throws Exception if localization fails
 */
public void testJobLocalization()
    throws Exception {
  if (canRun()) {
    localizedJobConf = tracker.localizeJob(tip).getJobConf();
    checkJobLocalization();
  }
}
/**
 * Test that, if the job log dir can't be created, the job will fail
 * during localization rather than at the time when the task itself
 * tries to write into it.
 *
 * @throws Exception if changing permissions fails
 */
public void testJobLocalizationFailsIfLogDirUnwritable()
    throws Exception {
  if (!canRun()) {
    return;
  }
  File logDir = TaskLog.getJobDir(jobId);
  File logDirParent = logDir.getParentFile();
  try {
    assertTrue(logDirParent.mkdirs() || logDirParent.isDirectory());
    FileUtil.fullyDelete(logDir);
    // Make the parent unusable so the job log dir cannot be created.
    FileUtil.chmod(logDirParent.getAbsolutePath(), "000");
    tracker.localizeJob(tip);
    fail("No exception");
  } catch (IOException ioe) {
    LOG.info("Got exception", ioe);
    String message = ioe.getMessage();
    // Null-safe, and report the actual message, so an unexpected failure is
    // not masked by a NullPointerException.
    assertTrue("Unexpected exception message: " + message,
        message != null && message.contains("Could not create job user log"));
  } finally {
    // Put it back just to be safe
    FileUtil.chmod(logDirParent.getAbsolutePath(), "755");
  }
}
/**
 * Verifies the on-disk and configuration results of job localization:
 * <ul>
 * <li>private job dirs in every local dir;</li>
 * <li>the localized job.xml and job.jar, with its lib/ entries unjarred;</li>
 * <li>the job work dir;</li>
 * <li>the job-local-dir and job-jar settings in the localized job conf;</li>
 * <li>the job user-log dir and the job-acls.xml file inside it.</li>
 * </ul>
 *
 * @throws IOException if a permission check cannot be run
 */
protected void checkJobLocalization()
    throws IOException {
  String user = task.getUser();
  String jobIdStr = jobId.toString();
  // Check the directory structure
  for (String dir : localDirs) {
    File localDir = new File(dir);
    File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
    File userDir = new File(taskTrackerSubDir, user);
    File jobCache = new File(userDir, TaskTracker.JOBCACHE);
    File jobDir = new File(jobCache, jobIdStr);
    assertTrue("job-dir in " + jobCache + " isn't created!", jobDir.exists());
    // check the private permissions on the job directory
    checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", user,
        taskTrackerUGI.getGroupNames()[0]);
  }
  // check the localization of job.xml
  assertNotNull("job.xml is not localized on this TaskTracker!!",
      lDirAlloc.getLocalPathToRead(
          TaskTracker.getLocalJobConfFile(user, jobIdStr), trackerFConf));
  // check the localization of job.jar
  Path jarFileLocalized =
      lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(user, jobIdStr),
          trackerFConf);
  assertNotNull("job.jar is not localized on this TaskTracker!!",
      jarFileLocalized);
  String jarDirPrefix = jarFileLocalized.getParent() + Path.SEPARATOR;
  assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!",
      new File(jarDirPrefix + "lib/lib1.jar").exists());
  assertTrue("lib/lib2.jar is not unjarred on this TaskTracker!!",
      new File(jarDirPrefix + "lib/lib2.jar").exists());
  // check the creation of job work directory
  assertNotNull("job-work dir is not created on this TaskTracker!!",
      lDirAlloc.getLocalPathToRead(
          TaskTracker.getJobWorkDir(user, jobIdStr), trackerFConf));
  // Check the setting of mapreduce.job.local.dir and job.jar, which will
  // eventually be used by the user's task.
  String localizedJobLocalDir =
      localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR);
  String localizedJobJar = localizedJobConf.getJar();
  // Fail with a clear message instead of an NPE in the comparisons below.
  assertNotNull(TaskTracker.JOB_LOCAL_DIR + " is not set in the localized conf",
      localizedJobLocalDir);
  assertNotNull("mapreduce.job.jar is not set in the localized conf",
      localizedJobJar);
  String jobWorkDirSuffix = TaskTracker.getJobWorkDir(user, jobIdStr);
  String jobJarSuffix = TaskTracker.getJobJarFile(user, jobIdStr);
  boolean jobLocalDirFlag = false, mapredJarFlag = false;
  for (String localDir : localizedJobConf.getStrings(MRConfig.LOCAL_DIR)) {
    if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
        + jobWorkDirSuffix)) {
      jobLocalDirFlag = true;
    }
    if (localizedJobJar.equals(localDir + Path.SEPARATOR + jobJarSuffix)) {
      mapredJarFlag = true;
    }
  }
  assertTrue(TaskTracker.JOB_LOCAL_DIR
      + " is not set properly to the target users directory : "
      + localizedJobLocalDir, jobLocalDirFlag);
  assertTrue(
      "mapreduce.job.jar is not set properly to the target users directory : "
      + localizedJobJar, mapredJarFlag);
  // check job user-log directory permissions
  File jobLogDir = TaskLog.getJobDir(jobId);
  assertTrue("job log directory " + jobLogDir + " does not exist!", jobLogDir
      .exists());
  checkFilePermissions(jobLogDir.toString(), "drwx------", user,
      taskTrackerUGI.getGroupNames()[0]);
  // Make sure that the job ACLs file job-acls.xml exists in job userlog dir
  File jobACLsFile = new File(jobLogDir, TaskTracker.jobACLsFile);
  assertTrue("JobACLsFile is missing in the job userlog dir " + jobLogDir,
      jobACLsFile.exists());
  // With the default task controller, job-acls.xml is owned by the TT user
  // and readable/writable only by it: -rw------- (600).
  checkFilePermissions(jobACLsFile.getAbsolutePath(), "-rw-------",
      taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
  validateJobACLsFileContent();
}
/**
 * Validates the contents of the job ACLs file (user name, job-view ACL,
 * queue name and queue-admins ACL) against the localized job conf.
 * Mismatches are reported with both values rather than surfacing as a
 * NullPointerException when an entry is missing.
 */
protected void validateJobACLsFileContent() {
  JobConf jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(jobId);
  assertEquals("user.name in the job ACLs file",
      localizedJobConf.getUser(), jobACLsConf.get("user.name"));
  assertEquals(MRJobConfig.JOB_ACL_VIEW_JOB + " in the job ACLs file",
      localizedJobConf.get(MRJobConfig.JOB_ACL_VIEW_JOB),
      jobACLsConf.get(MRJobConfig.JOB_ACL_VIEW_JOB));
  String queue = localizedJobConf.getQueueName();
  String aclsFileQueue = jobACLsConf.getQueueName();
  // Queue names are compared case-insensitively, as before.
  assertTrue("queue name in the job ACLs file is " + aclsFileQueue
      + " instead of " + queue, queue.equalsIgnoreCase(aclsFileQueue));
  String qACLName = toFullPropertyName(queue,
      QueueACL.ADMINISTER_JOBS.getAclName());
  assertEquals(qACLName + " in the job ACLs file",
      localizedJobConf.get(qACLName), jobACLsConf.get(qACLName));
}
/**
 * Tests task localization on a TT. It localizes the job, then the task, and
 * verifies the resulting task layout and configuration.
 *
 * @throws Exception if localization fails
 */
public void testTaskLocalization()
    throws Exception {
  if (canRun()) {
    localizedJobConf = tracker.localizeJob(tip).getJobConf();
    initializeTask();
    checkTaskLocalization();
  }
}
/**
 * Localizes {@link #task} through {@link #tip} using
 * {@link #localizedJobConf}, then performs the follow-on setup done for a
 * launched task. The steps are:
 * <ol>
 * <li>check the attempt dir in every local dir;</li>
 * <li>resolve {@link #attemptWorkDir};</li>
 * <li>create the TaskRunner and the child task configuration;</li>
 * <li>create the child tmp dir and the log files
 * ({@link #attemptLogFiles});</li>
 * <li>load the task conf into {@link #localizedTaskConf} and set up the
 * child's mapred local dirs;</li>
 * <li>call {@link TaskController#initializeTask}.</li>
 * </ol>
 * localizedJobConf must already hold the conf of a localized job.
 *
 * @throws IOException if any localization step fails
 */
private void initializeTask() throws IOException {
tip.setJobConf(localizedJobConf);
// ////////// The central method being tested
tip.localizeTask(task);
// //////////
// check the functionality of localizeTask
for (String dir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) {
File attemptDir =
new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
.toString(), taskId.toString(), task.isTaskCleanupTask()));
assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
+ " is not created!!", attemptDir.exists());
}
attemptWorkDir =
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
task.getUser(), task.getJobID().toString(), task.getTaskID()
.toString(), task.isTaskCleanupTask()), trackerFConf);
assertTrue("atttempt work dir for " + taskId.toString()
+ " is not created in any of the configured dirs!!",
attemptWorkDir != null);
TaskRunner runner = task.createRunner(tracker, tip);
tip.setTaskRunner(runner);
// /////// Few more methods being tested
runner.setupChildTaskConfiguration(lDirAlloc);
TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
localizedJobConf);
// Index 0 is expected to be stdout and index 1 stderr
// (see checkTaskLocalization).
attemptLogFiles = runner.prepareLogFiles(task.getTaskID(),
task.isTaskCleanupTask());
// Make sure the task-conf file is created
Path localTaskFile =
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
.getUser(), task.getJobID().toString(), task.getTaskID()
.toString(), task.isTaskCleanupTask()), trackerFConf);
assertTrue("Task conf file " + localTaskFile.toString()
+ " is not created!!", new File(localTaskFile.toUri().getPath())
.exists());
// /////// One more method being tested. This happens in child space.
localizedTaskConf = new JobConf(localTaskFile);
TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
// ///////
// Initialize task via TaskController
TaskControllerContext taskContext =
new TaskController.TaskControllerContext();
// Only the job's local dir (as a File) and the localized job conf are
// supplied; the remaining JvmEnv arguments are null / -1 in this test.
taskContext.env =
new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
.get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
taskContext.task = task;
// /////////// The method being tested
taskController.initializeTask(taskContext);
// ///////////
}
/**
 * Verifies the results of task localization:
 * <ul>
 * <li>the child's mapred local dirs are sandboxed into the attempt dir;</li>
 * <li>the task's job file points at the localized task conf;</li>
 * <li>the work dir contains a tmp dir;</li>
 * <li>the attempt log dir is drwx------ and holds the stdout/stderr files
 * prepared for the task.</li>
 * </ul>
 *
 * @throws IOException if a permission check cannot be run
 */
protected void checkTaskLocalization()
    throws IOException {
  String user = task.getUser();
  String jobIdStr = jobId.toString();
  String attemptIdStr = taskId.toString();
  boolean isCleanupTask = task.isTaskCleanupTask();
  // Make sure that the mapreduce.cluster.local.dir is sandboxed
  String localTaskDir =
      TaskTracker.getLocalTaskDir(user, jobIdStr, attemptIdStr, isCleanupTask);
  for (String childMapredLocalDir : localizedTaskConf
      .getStrings(MRConfig.LOCAL_DIR)) {
    assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
        childMapredLocalDir.endsWith(localTaskDir));
  }
  // Make sure task task.getJobFile is changed and pointed correctly.
  assertTrue(task.getJobFile().endsWith(
      TaskTracker.getTaskConfFile(user, jobIdStr, attemptIdStr,
          isCleanupTask)));
  // Make sure that the tmp directories are created
  String workDirPath = attemptWorkDir.toUri().getPath();
  assertTrue("tmp dir is not created in workDir " + workDirPath,
      new File(workDirPath, "tmp").exists());
  // Make sure that the logs are setup properly
  File logDir = TaskLog.getAttemptDir(taskId, isCleanupTask);
  assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
      logDir.exists());
  checkFilePermissions(logDir.getAbsolutePath(), "drwx------", user,
      taskTrackerUGI.getGroupNames()[0]);
  // Both log files are resolved the same way, directly under logDir.
  File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
  assertTrue("stdout log file is improper. Expected : "
      + expectedStdout.toString() + " Observed : "
      + attemptLogFiles[0].toString(), expectedStdout.toString().equals(
      attemptLogFiles[0].toString()));
  File expectedStderr = new File(logDir, TaskLog.LogName.STDERR.toString());
  assertTrue("stderr log file is improper. Expected : "
      + expectedStderr.toString() + " Observed : "
      + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
      attemptLogFiles[1].toString()));
}
/**
 * Creates dir/subDir and dir/subDir/file, then recursively sets subDir to
 * a=rx (r-xr-xr-x). Nobody can then delete them without a chmod first.
 * A non-zero chmod exit code is only logged.
 *
 * @param jobConf configuration used to obtain the local file system
 * @param dir the directory in which to create subDir
 * @throws IOException if the directory or file cannot be created
 */
static void createFileAndSetPermissions(JobConf jobConf, Path dir)
    throws IOException {
  Path subDir = new Path(dir, "subDir");
  FileSystem fs = FileSystem.getLocal(jobConf);
  fs.mkdirs(subDir);
  Path p = new Path(subDir, "file");
  java.io.DataOutputStream out = fs.create(p);
  try {
    out.writeBytes("dummy input");
  } finally {
    // Close even if the write fails so the handle is not leaked.
    out.close();
  }
  // no write permission for subDir and subDir/file
  try {
    int ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true);
    if (ret != 0) {
      LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
    }
  } catch (InterruptedException e) {
    LOG.warn("Interrupted while doing chmod for " + subDir);
    // Restore the interrupt status instead of silently clearing it.
    Thread.currentThread().interrupt();
  }
}
/**
 * Validates removal of $taskid or $taskid/work under the mapred local dirs
 * when they contain entries the TaskTracker can only delete after restoring
 * write permission (see createFileAndSetPermissions).
 * <p>
 * The read-only files go into the whole attempt dir when the task needs
 * cleanup and JVM reuse is off; otherwise into the attempt work dir. After
 * {@code tip.removeTaskFiles()} the inline cleanup queue must report no
 * stale paths. With JVM reuse the work dir must survive removeTaskFiles(),
 * and is then deleted through JvmManager.deleteWorkDir().
 *
 * @param needCleanup passed to removeTaskFiles()
 * @param jvmReuse whether the job was localized with JVM reuse
 * @param tip the task-in-progress whose files are removed
 * @throws IOException if creating or inspecting files fails
 */
void validateRemoveTaskFiles(boolean needCleanup, boolean jvmReuse,
    TaskInProgress tip) throws IOException {
  String user = task.getUser();
  String jobIdStr = task.getJobID().toString();
  String attemptIdStr = taskId.toString();
  boolean isCleanupTask = task.isTaskCleanupTask();
  String dir;
  if (needCleanup && !jvmReuse) {
    dir = TaskTracker.getLocalTaskDir(user, jobIdStr, attemptIdStr,
        isCleanupTask);
  } else {
    dir = TaskTracker.getTaskWorkDir(user, jobIdStr, attemptIdStr,
        isCleanupTask);
  }
  Path[] paths = tracker.getLocalFiles(localizedJobConf, dir);
  assertTrue("No paths found", paths.length > 0);
  for (int i = 0; i < paths.length; i++) {
    if (tracker.getLocalFileSystem().exists(paths[i])) {
      createFileAndSetPermissions(localizedJobConf, paths[i]);
    }
  }
  InlineCleanupQueue inlineQueue = new InlineCleanupQueue();
  tracker.setCleanupThread(inlineQueue);
  tip.removeTaskFiles(needCleanup, taskId);
  if (jvmReuse) {
    // Nothing may be queued, and the work dir must still be present.
    assertTrue("cleanup queue is not empty after removeTaskFiles() in case "
        + "of jvm reuse.", inlineQueue.isQueueEmpty());
    boolean anyWorkDirLeft = false;
    for (int i = 0; i < paths.length && !anyWorkDirLeft; i++) {
      anyWorkDirLeft = tracker.getLocalFileSystem().exists(paths[i]);
    }
    assertTrue("work dir does not exist in case of jvm reuse",
        anyWorkDirLeft);
    // Deleting the work dir must not leave stale paths either.
    JvmManager.deleteWorkDir(tracker, task);
  }
  int staleCount = inlineQueue.stalePaths.size();
  assertTrue("Some task files are not deleted!! Number of stale paths is "
      + staleCount, staleCount == 0);
}
/**
 * Validates task-file cleanup for a succeeded task (no cleanup needed)
 * without JVM reuse.
 *
 * @throws Exception on localization or cleanup failure
 */
public void testTaskFilesRemoval()
    throws Exception {
  if (canRun()) {
    final boolean needCleanup = false;
    final boolean jvmReuse = false;
    testTaskFilesRemoval(needCleanup, jvmReuse);
  }
}
/**
 * Validates task-file cleanup for a task that did not succeed. The failed
 * attempt is removed first. A cleanup attempt for the same task is then
 * localized, verified and removed as well. JVM reuse is off throughout.
 *
 * @throws Exception on localization or cleanup failure
 */
public void testFailedTaskFilesRemoval()
    throws Exception {
  if (!canRun()) {
    return;
  }
  final boolean needCleanup = true;
  final boolean jvmReuse = false;
  // The failed attempt itself.
  testTaskFilesRemoval(needCleanup, jvmReuse);
  // Turn the task into a cleanup attempt, localize and verify it...
  task.setTaskCleanupTask();
  initializeTask();
  checkTaskLocalization();
  // ...then verify the cleanup attempt's files are removed too.
  testTaskFilesRemoval(needCleanup, jvmReuse);
}
/**
 * Validates task-file cleanup for a succeeded task (no cleanup needed)
 * when JVM reuse is enabled.
 *
 * @throws Exception on localization or cleanup failure
 */
public void testTaskFilesRemovalWithJvmUse()
    throws Exception {
  if (canRun()) {
    final boolean needCleanup = false;
    final boolean jvmReuse = true;
    testTaskFilesRemoval(needCleanup, jvmReuse);
  }
}
/**
 * Localizes the job and the task, optionally with two tasks per JVM, then
 * checks task-file removal through validateRemoveTaskFiles().
 *
 * @param needCleanup whether the task is treated as needing cleanup
 * @param jvmReuse whether the localized job runs two tasks per JVM
 * @throws Exception on localization or cleanup failure
 */
private void testTaskFilesRemoval(boolean needCleanup, boolean jvmReuse)
    throws Exception {
  localizedJobConf = tracker.localizeJob(tip).getJobConf();
  if (jvmReuse) {
    localizedJobConf.setNumTasksToExecutePerJvm(2);
  }
  initializeTask();
  // TODO: Let the task run and create files.
  // Read-only files are created inside validateRemoveTaskFiles(), which then
  // checks that the TaskTracker still manages to delete them.
  validateRemoveTaskFiles(needCleanup, jvmReuse, tip);
}
/**
 * Checks that the job's user-log dir exists. It then purges the job and
 * runs the log cleaner's completed-job processing, after which the dir
 * must be gone.
 *
 * @throws IOException if purging or log cleanup fails
 */
private void verifyUserLogsRemoval()
    throws IOException {
  File jobUserLogDir = TaskLog.getJobDir(jobId);
  String dirLabel = "Userlogs dir " + jobUserLogDir;
  assertTrue(dirLabel + " is not present as expected!!",
      jobUserLogDir.exists());
  tracker.purgeJob(new KillJobAction(jobId));
  tracker.getTaskLogCleanupThread().processCompletedJobs();
  assertFalse(dirLabel + " is not deleted as expected!!",
      jobUserLogDir.exists());
}
/**
 * Test job cleanup by doing the following:
 * <ul>
 * <li>create files with no write permissions to TT under job-work-dir;</li>
 * <li>create files with no write permissions to TT under the task's local
 * dir;</li>
 * <li>remove the job files and check that both dirs are gone, the user logs
 * are removed, and the per-user dirs survive.</li>
 * </ul>
 *
 * @throws IOException if localization or cleanup fails
 * @throws InterruptedException if localization is interrupted
 */
public void testJobFilesRemoval() throws IOException, InterruptedException {
  if (!canRun()) {
    return;
  }
  LOG.info("Running testJobFilesRemoval()");
  // Localize job and localize task.
  TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
  localizedJobConf = rjob.getJobConf();
  // Set an inline cleanup queue
  InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
  tracker.setCleanupThread(cleanupQueue);
  // Create a file in job's work-dir with 555
  String jobWorkDir =
      TaskTracker.getJobWorkDir(task.getUser(), task.getJobID().toString());
  Path[] jPaths = tracker.getLocalFiles(localizedJobConf, jobWorkDir);
  assertTrue("No paths found for job", jPaths.length > 0);
  createReadOnlyFilesIn(jPaths);
  // Initialize task dirs
  tip.setJobConf(localizedJobConf);
  tip.localizeTask(task);
  // Create a file in task local dir with 555. This simulates JVM reuse,
  // where files in the task-attempt-local-dir are left behind to be cleaned
  // up when the job finishes.
  String taskLocalDir =
      TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(),
          task.getTaskID().toString(), false);
  Path[] tPaths = tracker.getLocalFiles(localizedJobConf, taskLocalDir);
  assertTrue("No paths found for task", tPaths.length > 0);
  createReadOnlyFilesIn(tPaths);
  // remove the job work dir
  tracker.removeJobFiles(task.getUser(), task.getJobID());
  // Verify that the TaskTracker (via the task-controller) cleans up the dirs.
  assertFalse("Task " + task.getTaskID() + " local dir exists after cleanup",
      anyLocalPathExists(tPaths));
  assertFalse("Job " + task.getJobID() + " work dir exists after cleanup",
      anyLocalPathExists(jPaths));
  // Test userlogs cleanup.
  verifyUserLogsRemoval();
  // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
  // there.
  for (String localDir : localDirs) {
    Path userDir =
        new Path(localDir, TaskTracker.getUserDir(task.getUser()));
    assertTrue("User directory " + userDir + " is not present!!",
        tracker.getLocalFileSystem().exists(userDir));
  }
}

/** Creates read-only (a=rx) content in each of {@code paths} that exists. */
private void createReadOnlyFilesIn(Path[] paths) throws IOException {
  for (Path p : paths) {
    if (tracker.getLocalFileSystem().exists(p)) {
      createFileAndSetPermissions(localizedJobConf, p);
    }
  }
}

/** Returns whether any of {@code paths} exists on the tracker's local FS. */
private boolean anyLocalPathExists(Path[] paths) throws IOException {
  for (Path p : paths) {
    if (tracker.getLocalFileSystem().exists(p)) {
      return true;
    }
  }
  return false;
}
/**
 * Tests TaskTracker restart after the localization.
 *
 * This tests the following steps:
 *
 * Localize Job, initialize a task.
 * Then restart the Tracker (a brand-new TaskTracker instance).
 * launch a cleanup attempt for the task.
 *
 * @throws IOException
 * @throws InterruptedException
 */
public void testTrackerRestart() throws IOException, InterruptedException {
  if (!canRun()) {
    return;
  }
  localizeJobAndInitializeTask();
  // imitate tracker restart
  startTracker();
  relocalizeCleanupAttemptAndVerify();
}

/**
 * Tests TaskTracker re-init after the localization.
 *
 * This tests the following steps:
 *
 * Localize Job, initialize a task.
 * Then reinit the Tracker (re-wire the existing TaskTracker instance).
 * launch a cleanup attempt for the task.
 *
 * @throws IOException
 * @throws InterruptedException
 */
public void testTrackerReinit() throws IOException, InterruptedException {
  if (!canRun()) {
    return;
  }
  localizeJobAndInitializeTask();
  // imitate tracker reinit
  initializeTracker();
  relocalizeCleanupAttemptAndVerify();
}

/** Localizes the job for {@link #tip}, then localizes its task. */
private void localizeJobAndInitializeTask()
    throws IOException, InterruptedException {
  TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
  localizedJobConf = rjob.getJobConf();
  initializeTask();
}

/**
 * Registers a fresh cleanup attempt of the task with the current tracker.
 * It localizes the job again and verifies it, then localizes and verifies
 * the cleanup attempt.
 */
private void relocalizeCleanupAttemptAndVerify()
    throws IOException, InterruptedException {
  // create a task cleanup attempt
  createTask();
  task.setTaskCleanupTask();
  // register task
  tip = tracker.new TaskInProgress(task, trackerFConf);
  // localize the job again.
  TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
  localizedJobConf = rjob.getJobConf();
  checkJobLocalization();
  // localize task cleanup attempt
  initializeTask();
  checkTaskLocalization();
}
/**
 * Localizes a cleanup attempt of the task and validates the resulting job
 * and task layout, including permissions.
 *
 * @throws IOException if localization fails
 * @throws InterruptedException if localization is interrupted
 */
public void testCleanupTaskLocalization() throws IOException,
    InterruptedException {
  if (!canRun()) {
    return;
  }
  // Turn the task under test into a cleanup attempt and register it.
  task.setTaskCleanupTask();
  tip = tracker.new TaskInProgress(task, trackerFConf);
  // Localize the job and verify it.
  localizedJobConf = tracker.localizeJob(tip).getJobConf();
  checkJobLocalization();
  // Localize and verify the cleanup attempt itself.
  initializeTask();
  checkTaskLocalization();
}
}