/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;

import junit.framework.TestCase;

/**
 * Test to verify localization of a job and localization of a task on a
 * TaskTracker.
 * 
 */
public class TestTaskTrackerLocalization extends TestCase {

  private static File TEST_ROOT_DIR = 
    new File(System.getProperty("test.build.data", "/tmp"));
  private File ROOT_MAPRED_LOCAL_DIR;
  private File HADOOP_LOG_DIR;
  private static File PERMISSION_SCRIPT_DIR;
  private static File PERMISSION_SCRIPT_FILE;
  private static final String PERMISSION_SCRIPT_CONTENT = "ls -l -d $1 | " +
  		"awk '{print $1\":\"$3\":\"$4}'";

  private int numLocalDirs = 6;
  private static final Log LOG =
      LogFactory.getLog(TestTaskTrackerLocalization.class);

  protected TaskTracker tracker;
  protected UserGroupInformation taskTrackerUGI;
  protected TaskController taskController;
  protected JobConf trackerFConf;
  private JobConf localizedJobConf;
  protected JobID jobId;
  protected TaskAttemptID taskId;
  protected Task task;
  protected String[] localDirs;
  protected static LocalDirAllocator lDirAlloc =
      new LocalDirAllocator(MRConfig.LOCAL_DIR);
  protected Path attemptWorkDir;
  protected File[] attemptLogFiles;
  protected JobConf localizedTaskConf;

  /**
   * Dummy method in this base class. Only derived classes will define this
   * method for checking if a test can be run.
   */
  protected boolean canRun() {
    return true;
  }

  @Override
  protected void setUp()
      throws Exception {
    if (!canRun()) {
      return;
    }
    TEST_ROOT_DIR =
        new File(System.getProperty("test.build.data", "/tmp"), getClass()
            .getSimpleName());
    if (!TEST_ROOT_DIR.exists()) {
      TEST_ROOT_DIR.mkdirs();
    }

    ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
    ROOT_MAPRED_LOCAL_DIR.mkdirs();

    HADOOP_LOG_DIR = new File(TEST_ROOT_DIR, "logs");
    HADOOP_LOG_DIR.mkdir();
    System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath());

    trackerFConf = new JobConf();
    trackerFConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
    localDirs = new String[numLocalDirs];
    for (int i = 0; i < numLocalDirs; i++) {
      localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
    }
    trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs);

    // Create the job configuration file. Same as trackerConf in this test.
    Job job = new Job(trackerFConf);

    job.setUGIAndUserGroupNames();

    // JobClient uploads the job jar to the file system and sets it in the
    // jobConf.
    uploadJobJar(job);

    // JobClient uploads the jobConf to the file system.
    File jobConfFile = uploadJobConf(job.getConfiguration());
    
        // Set up the TaskTracker
    tracker = new TaskTracker();
    tracker.setConf(trackerFConf);

    // for test case system FS is the local FS
    tracker.systemFS = FileSystem.getLocal(trackerFConf);
    tracker.setLocalFileSystem(tracker.systemFS);
    tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
    
    taskTrackerUGI = UserGroupInformation.login(trackerFConf);

    // Set up the task to be localized
    String jtIdentifier = "200907202331";
    jobId = new JobID(jtIdentifier, 1);
    taskId =
        new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
    task =
        new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
    task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.

    // create jobTokens file
    uploadJobTokensFile(); 
    
    
    taskController = new DefaultTaskController();
    taskController.setConf(trackerFConf);
    taskController.setup();

    tracker.setTaskController(taskController);
    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
        taskController));
  }

  /**
   * static block setting up the permission script which would be used by the 
   * checkFilePermissions
   */
  static {
    PERMISSION_SCRIPT_DIR = new File(TEST_ROOT_DIR, "permission_script_dir");
    PERMISSION_SCRIPT_FILE = new File(PERMISSION_SCRIPT_DIR, "getperms.sh");
    
    if(PERMISSION_SCRIPT_FILE.exists()) {
      PERMISSION_SCRIPT_FILE.delete();
    }
    
    if(PERMISSION_SCRIPT_DIR.exists()) {
      PERMISSION_SCRIPT_DIR.delete();
    }
    
    PERMISSION_SCRIPT_DIR.mkdir();
    
    try {
      PrintWriter writer = new PrintWriter(PERMISSION_SCRIPT_FILE);
      writer.write(PERMISSION_SCRIPT_CONTENT);
      writer.close();
    } catch (FileNotFoundException fe) {
      fail();
    }
    PERMISSION_SCRIPT_FILE.setExecutable(true, true);
  }

  /**
   * @param job
   * @throws IOException
   * @throws FileNotFoundException
   */
  private void uploadJobJar(Job job)
      throws IOException,
      FileNotFoundException {
    File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
    JarOutputStream jstream =
        new JarOutputStream(new FileOutputStream(jobJarFile));
    ZipEntry ze = new ZipEntry("lib/lib1.jar");
    jstream.putNextEntry(ze);
    jstream.closeEntry();
    ze = new ZipEntry("lib/lib2.jar");
    jstream.putNextEntry(ze);
    jstream.closeEntry();
    jstream.finish();
    jstream.close();
    job.setJar(jobJarFile.toURI().toString());
  }

  /**
   * @param conf
   * @return
   * @throws FileNotFoundException
   * @throws IOException
   */
  protected File uploadJobConf(Configuration conf)
      throws FileNotFoundException,
      IOException {
    File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
    FileOutputStream out = new FileOutputStream(jobConfFile);
    conf.writeXml(out);
    out.close();
    return jobConfFile;
  }
  
  /**
   * create fake JobTokens file
   * @return
   * @throws IOException
   */
  protected void uploadJobTokensFile() throws IOException {
    
    File dir = new File(TEST_ROOT_DIR, jobId.toString());
    if(!dir.exists())
      assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
    
    File jobTokenFile = new File(dir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
    FileOutputStream fos = new FileOutputStream(jobTokenFile);
    java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
    jt.write(out); // writing empty file, we don't the keys for this test 
    out.close();
  }

  @Override
  protected void tearDown()
      throws Exception {
    if (!canRun()) {
      return;
    }
    FileUtil.fullyDelete(TEST_ROOT_DIR);
  }

  protected static String[] getFilePermissionAttrs(String path)
      throws IOException {
    String[] command = {"bash",PERMISSION_SCRIPT_FILE.getAbsolutePath(), path};
    String output=Shell.execCommand(command);
    return output.split(":|\n");
  }


  /**
   * Utility method to check permission of a given path. Requires the permission
   * script directory to be setup in order to call.
   * 
   * 
   * @param path
   * @param expectedPermissions
   * @param expectedOwnerUser
   * @param expectedOwnerGroup
   * @throws IOException
   */
  static void checkFilePermissions(String path, String expectedPermissions,
      String expectedOwnerUser, String expectedOwnerGroup)
      throws IOException {
    String[] attrs = getFilePermissionAttrs(path);
    assertTrue("File attrs length is not 3 but " + attrs.length,
        attrs.length == 3);
    assertTrue("Path " + path + " has the permissions " + attrs[0]
        + " instead of the expected " + expectedPermissions, attrs[0]
        .equals(expectedPermissions));
    assertTrue("Path " + path + " is user owned not by " + expectedOwnerUser
        + " but by " + attrs[1], attrs[1].equals(expectedOwnerUser));
    assertTrue("Path " + path + " is group owned not by " + expectedOwnerGroup
        + " but by " + attrs[2], attrs[2].equals(expectedOwnerGroup));
  }

  /**
   * Verify the task-controller's setup functionality
   * 
   * @throws IOException
   */
  public void testTaskControllerSetup()
      throws IOException {
    if (!canRun()) {
      return;
    }
    // Task-controller is already set up in the test's setup method. Now verify.
    for (String localDir : localDirs) {

      // Verify the local-dir itself.
      File lDir = new File(localDir);
      assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists());
      checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", task
          .getUser(), taskTrackerUGI.getGroupNames()[0]);
    }

    // Verify the pemissions on the userlogs dir
    File taskLog = TaskLog.getUserLogDir();
    checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", task
        .getUser(), taskTrackerUGI.getGroupNames()[0]);
  }

  /**
   * Test the localization of a user on the TT.
   * 
   * @throws IOException
   */
  public void testUserLocalization()
      throws IOException {
    if (!canRun()) {
      return;
    }
    // /////////// The main method being tested
    tracker.getLocalizer().initializeUserDirs(task.getUser());
    // ///////////

    // Check the directory structure and permissions
    checkUserLocalization();

    // For the sake of testing re-entrancy of initializeUserDirs(), we remove
    // the user directories now and make sure that further calls of the method
    // don't create directories any more.
    for (String dir : localDirs) {
      File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
      FileUtil.fullyDelete(userDir);
    }

    // Now call the method again.
    tracker.getLocalizer().initializeUserDirs(task.getUser());

    // Files should not be created now and so shouldn't be there anymore.
    for (String dir : localDirs) {
      File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
      assertFalse("Unexpectedly, user-dir " + userDir.getAbsolutePath()
          + " exists!", userDir.exists());
    }
  }

  protected void checkUserLocalization()
      throws IOException {
    for (String dir : localDirs) {

      File localDir = new File(dir);
      assertTrue(MRConfig.LOCAL_DIR + localDir + " isn'task created!",
          localDir.exists());

      File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
      assertTrue("taskTracker sub-dir in the local-dir " + localDir
          + "is not created!", taskTrackerSubDir.exists());

      File userDir = new File(taskTrackerSubDir, task.getUser());
      assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
          + "is not created!", userDir.exists());
      checkFilePermissions(userDir.getAbsolutePath(), "drwx------", task
          .getUser(), taskTrackerUGI.getGroupNames()[0]);

      File jobCache = new File(userDir, TaskTracker.JOBCACHE);
      assertTrue("jobcache in the userDir " + userDir + " isn't created!",
          jobCache.exists());
      checkFilePermissions(jobCache.getAbsolutePath(), "drwx------", task
          .getUser(), taskTrackerUGI.getGroupNames()[0]);

      // Verify the distributed cache dir.
      File distributedCacheDir =
          new File(localDir, TaskTracker
              .getPrivateDistributedCacheDir(task.getUser()));
      assertTrue("distributed cache dir " + distributedCacheDir
          + " doesn't exists!", distributedCacheDir.exists());
      checkFilePermissions(distributedCacheDir.getAbsolutePath(),
          "drwx------", task.getUser(), taskTrackerUGI.getGroupNames()[0]);
    }
  }

  /**
   * Test job localization on a TT. Tests localization of job.xml, job.jar and
   * corresponding setting of configuration. Also test
   * {@link TaskController#initializeJob(JobInitializationContext)}
   * 
   * @throws IOException
   */
  public void testJobLocalization()
      throws IOException {
    if (!canRun()) {
      return;
    }
    tracker.getLocalizer().initializeUserDirs(task.getUser());

    // /////////// The main method being tested
    localizedJobConf = tracker.localizeJobFiles(task);
    // ///////////

    // Now initialize the job via task-controller so as to set
    // ownership/permissions of jars, job-work-dir
    JobInitializationContext context = new JobInitializationContext();
    context.jobid = jobId;
    context.user = task.getUser();
    context.workDir =
        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));

    // /////////// The method being tested
    taskController.initializeJob(context);
    // ///////////

    checkJobLocalization();
  }

  protected void checkJobLocalization()
      throws IOException {
    // Check the directory structure
    for (String dir : localDirs) {

      File localDir = new File(dir);
      File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
      File userDir = new File(taskTrackerSubDir, task.getUser());
      File jobCache = new File(userDir, TaskTracker.JOBCACHE);

      File jobDir = new File(jobCache, jobId.toString());
      assertTrue("job-dir in " + jobCache + " isn't created!", jobDir.exists());

      // check the private permissions on the job directory
      checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", task
          .getUser(), taskTrackerUGI.getGroupNames()[0]);
    }

    // check the localization of job.xml
    assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
        .getLocalPathToRead(TaskTracker.getLocalJobConfFile(task.getUser(),
            jobId.toString()), trackerFConf) != null);

    // check the localization of job.jar
    Path jarFileLocalized =
        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task.getUser(),
            jobId.toString()), trackerFConf);
    assertTrue("job.jar is not localized on this TaskTracker!!",
        jarFileLocalized != null);
    assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
        jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib1.jar")
        .exists());
    assertTrue("lib/lib2.jar is not unjarred on this TaskTracker!!", new File(
        jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib2.jar")
        .exists());

    // check the creation of job work directory
    assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc
        .getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(), jobId
            .toString()), trackerFConf) != null);

    // Check the setting of mapreduce.job.local.dir and job.jar which will eventually be
    // used by the user's task
    boolean jobLocalDirFlag = false, mapredJarFlag = false;
    String localizedJobLocalDir =
        localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR);
    String localizedJobJar = localizedJobConf.getJar();
    for (String localDir : localizedJobConf.getStrings(MRConfig.LOCAL_DIR)) {
      if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
          + TaskTracker.getJobWorkDir(task.getUser(), jobId.toString()))) {
        jobLocalDirFlag = true;
      }
      if (localizedJobJar.equals(localDir + Path.SEPARATOR
          + TaskTracker.getJobJarFile(task.getUser(), jobId.toString()))) {
        mapredJarFlag = true;
      }
    }
    assertTrue(TaskTracker.JOB_LOCAL_DIR
        + " is not set properly to the target users directory : "
        + localizedJobLocalDir, jobLocalDirFlag);
    assertTrue(
        "mapreduce.job.jar is not set properly to the target users directory : "
            + localizedJobJar, mapredJarFlag);
  }

  /**
   * Test task localization on a TT.
   * 
   * @throws IOException
   */
  public void testTaskLocalization()
      throws IOException {
    if (!canRun()) {
      return;
    }
    tracker.getLocalizer().initializeUserDirs(task.getUser());
    localizedJobConf = tracker.localizeJobFiles(task);

    // Now initialize the job via task-controller so as to set
    // ownership/permissions of jars, job-work-dir
    JobInitializationContext jobContext = new JobInitializationContext();
    jobContext.jobid = jobId;
    jobContext.user = task.getUser();
    jobContext.workDir =
        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
    taskController.initializeJob(jobContext);

    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
    tip.setJobConf(localizedJobConf);

    // ////////// The central method being tested
    tip.localizeTask(task);
    // //////////

    // check the functionality of localizeTask
    for (String dir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) {
      File attemptDir =
          new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
              .toString(), taskId.toString()));
      assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
          + " is not created!!", attemptDir.exists());
    }

    attemptWorkDir =
        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
            task.getUser(), task.getJobID().toString(), task.getTaskID()
                .toString(), task.isTaskCleanupTask()), trackerFConf);
    assertTrue("atttempt work dir for " + taskId.toString()
        + " is not created in any of the configured dirs!!",
        attemptWorkDir != null);

    TaskRunner runner = task.createRunner(tracker, tip);

    // /////// Few more methods being tested
    runner.setupChildTaskConfiguration(lDirAlloc);
    TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
        localizedJobConf);
    attemptLogFiles = TaskRunner.prepareLogFiles(task.getTaskID());

    // Make sure the task-conf file is created
    Path localTaskFile =
        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
            .getUser(), task.getJobID().toString(), task.getTaskID()
            .toString(), task.isTaskCleanupTask()), trackerFConf);
    assertTrue("Task conf file " + localTaskFile.toString()
        + " is not created!!", new File(localTaskFile.toUri().getPath())
        .exists());

    // /////// One more method being tested. This happens in child space.
    localizedTaskConf = new JobConf(localTaskFile);
    TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
    // ///////

    // Initialize task via TaskController
    TaskControllerContext taskContext =
        new TaskController.TaskControllerContext();
    taskContext.env =
        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
    taskContext.task = task;
    // /////////// The method being tested
    taskController.initializeTask(taskContext);
    // ///////////

    checkTaskLocalization();
  }

  protected void checkTaskLocalization()
      throws IOException {
    // Make sure that the mapreduce.cluster.local.dir is sandboxed
    for (String childMapredLocalDir : localizedTaskConf
        .getStrings(MRConfig.LOCAL_DIR)) {
      assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
          childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
              .getUser(), jobId.toString(), taskId.toString(), false)));
    }

    // Make sure task task.getJobFile is changed and pointed correctly.
    assertTrue(task.getJobFile().endsWith(
        TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
            .toString(), false)));

    // Make sure that the tmp directories are created
    assertTrue("tmp dir is not created in workDir "
        + attemptWorkDir.toUri().getPath(), new File(attemptWorkDir.toUri()
        .getPath(), "tmp").exists());

    // Make sure that the logs are setup properly
    File logDir =
        new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
            + task.getTaskID().toString());
    assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
        logDir.exists());
    checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
        .getUser(), taskTrackerUGI.getGroupNames()[0]);

    File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
    assertTrue("stdout log file is improper. Expected : "
        + expectedStdout.toString() + " Observed : "
        + attemptLogFiles[0].toString(), expectedStdout.toString().equals(
        attemptLogFiles[0].toString()));
    File expectedStderr =
        new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString());
    assertTrue("stderr log file is improper. Expected : "
        + expectedStderr.toString() + " Observed : "
        + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
        attemptLogFiles[1].toString()));
  }

  /**
   * Validates the removal of $taskid and $tasid/work under mapred-local-dir
   * in cases where those directories cannot be deleted without adding
   * write permission to the newly created directories under $taskid and
   * $taskid/work
   * Also see TestSetupWorkDir.createFileAndSetPermissions for details
   */
  void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
                           TaskInProgress tip) throws IOException {
    // create files and set permissions 555. Verify if task controller sets
    // the permissions for TT to delete the taskDir or workDir
    String dir = (!needCleanup || jvmReuse) ?
        TaskTracker.getTaskWorkDir(task.getUser(), task.getJobID().toString(),
          taskId.toString(), task.isTaskCleanupTask())
      : TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(),
          taskId.toString(), task.isTaskCleanupTask());

    Path[] paths = tracker.getLocalFiles(localizedJobConf, dir);
    for (Path p : paths) {
      if (tracker.getLocalFileSystem().exists(p)) {
        TestSetupWorkDir.createFileAndSetPermissions(localizedJobConf, p);
      }
    }

    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
    tracker.setCleanupThread(cleanupQueue);

    tip.removeTaskFiles(needCleanup, taskId);

    if (jvmReuse) {
      // work dir should still exist and cleanup queue should be empty
      assertTrue("cleanup queue is not empty after removeTaskFiles() in case "
          + "of jvm reuse.", cleanupQueue.isQueueEmpty());
      boolean workDirExists = false;
      for (Path p : paths) {
        if (tracker.getLocalFileSystem().exists(p)) {
          workDirExists = true;
        }
      }
      assertTrue("work dir does not exist in case of jvm reuse", workDirExists);

      // now try to delete the work dir and verify that there are no stale paths
      JvmManager.deleteWorkDir(tracker, task);
    }
    tracker.removeJobFiles(task.getUser(), jobId.toString());

    assertTrue("Some task files are not deleted!! Number of stale paths is "
        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
  }

  /**
   * Validates if task cleanup is done properly for a succeeded task
   * @throws IOException
   */
  public void testTaskCleanup()
      throws IOException {
    if (!canRun()) {
      return;
    }
    testTaskCleanup(false, false);// no needCleanup; no jvmReuse
  }

  /**
   * Validates if task cleanup is done properly for a task that is not succeeded
   * @throws IOException
   */
  public void testFailedTaskCleanup()
  throws IOException {
    if (!canRun()) {
      return;
    }
    testTaskCleanup(true, false);// needCleanup; no jvmReuse
  }

  /**
   * Validates if task cleanup is done properly for a succeeded task
   * @throws IOException
   */
  public void testTaskCleanupWithJvmUse()
      throws IOException {
    if (!canRun()) {
      return;
    }
    testTaskCleanup(false, true);// no needCleanup; jvmReuse
  }

  /**
   * Validates if task cleanup is done properly
   */
  private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
      throws IOException {
    // Localize job and localize task.
    tracker.getLocalizer().initializeUserDirs(task.getUser());
    localizedJobConf = tracker.localizeJobFiles(task);
    if (jvmReuse) {
      localizedJobConf.setNumTasksToExecutePerJvm(2);
    }
    // Now initialize the job via task-controller so as to set
    // ownership/permissions of jars, job-work-dir
    JobInitializationContext jobContext = new JobInitializationContext();
    jobContext.jobid = jobId;
    jobContext.user = localizedJobConf.getUser();
    jobContext.workDir =
        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
    taskController.initializeJob(jobContext);
    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
    tip.setJobConf(localizedJobConf);
    tip.localizeTask(task);
    Path workDir =
        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
            task.getUser(), task.getJobID().toString(), task.getTaskID()
                .toString(), task.isTaskCleanupTask()), trackerFConf);
    TaskRunner runner = task.createRunner(tracker, tip);
    tip.setTaskRunner(runner);
    runner.setupChildTaskConfiguration(lDirAlloc);
    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
        localizedJobConf);
    TaskRunner.prepareLogFiles(task.getTaskID());
    Path localTaskFile =
        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
            .getUser(), task.getJobID().toString(), task.getTaskID()
            .toString(), task.isTaskCleanupTask()), trackerFConf);
    JobConf localizedTaskConf = new JobConf(localTaskFile);
    TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
    TaskControllerContext taskContext =
        new TaskController.TaskControllerContext();
    taskContext.env =
        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
    taskContext.task = task;
    // /////////// The method being tested
    taskController.initializeTask(taskContext);

    // TODO: Let the task run and create files.

    // create files and set permissions 555. Verify if task controller sets
    // the permissions for TT to delete the task dir or work dir properly
    validateRemoveFiles(needCleanup, jvmReuse, tip);

    // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still
    // there.
    for (String localDir : localDirs) {
      Path userDir =
          new Path(localDir, TaskTracker.getUserDir(task.getUser()));
      assertTrue("User directory " + userDir + " is not present!!",
          tracker.getLocalFileSystem().exists(userDir));
    }

    // Test userlogs cleanup.
    verifyUserLogsCleanup();
  }

  /**
   * Test userlogs cleanup.
   * 
   * @throws IOException
   */
  private void verifyUserLogsCleanup()
      throws IOException {
    Path logDir =
        new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
            + Path.SEPARATOR + task.getTaskID().toString());

    // Logs should be there before cleanup.
    assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
        tracker.getLocalFileSystem().exists(logDir));

    // ////////// Another being tested
    TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
    // modification time behind retainTimeStatmp
    // //////////

    // Logs should be gone after cleanup.
    assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
        tracker.getLocalFileSystem().exists(logDir));
  }
}
