/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import junit.framework.TestCase;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

/**
 * Verifies if TaskRunner.SetupWorkDir() is cleaning up files/dirs pointed
 * to by symlinks under work dir.
 */
public class TestSetupWorkDir extends TestCase {

  /**
   * Creates 1 subdirectory and 1 file under dir2. Creates 1 subdir, 1 file,
   * 1 symlink to a dir and a symlink to a file under dir1.
   * Creates dir1/subDir, dir1/file, dir2/subDir, dir2/file,
   * dir1/symlinkSubDir->dir2/subDir, dir1/symlinkFile->dir2/file.
   */
  static void createSubDirsAndSymLinks(JobConf jobConf, Path dir1, Path dir2)
       throws IOException {
    FileSystem fs = FileSystem.getLocal(jobConf);
    createSubDirAndFile(fs, dir1);
    createSubDirAndFile(fs, dir2);
    // now create symlinks under dir1 that point to file/dir under dir2
    FileUtil.symLink(dir2+"/subDir", dir1+"/symlinkSubDir");
    FileUtil.symLink(dir2+"/file", dir1+"/symlinkFile");
  }

  static void createSubDirAndFile(FileSystem fs, Path dir) throws IOException {
    Path subDir = new Path(dir, "subDir");
    fs.mkdirs(subDir);
    createFile(fs, dir, "file");
  }

  /**
   * Create a file 
   * 
   * @param fs filesystem
   * @param dir directory location of the file
   * @param fileName filename
   * @throws IOException
   */
  static void createFile(FileSystem fs, Path dir, String fileName) 
      throws IOException {
    Path p = new Path(dir, fileName);
    DataOutputStream out = fs.create(p);
    out.writeBytes("dummy input");
    out.close();    
  }
  
  void createEmptyDir(FileSystem fs, Path dir) throws IOException {
    if (fs.exists(dir)) {
      fs.delete(dir, true);
    }
    if (!fs.mkdirs(dir)) {
      throw new IOException("Unable to create directory " + dir);
    }
  }

  /**
   * Validates if TaskRunner.setupWorkDir() is properly cleaning up the
   * contents of workDir and creating tmp dir under it (even though workDir
   * contains symlinks to files/directories).
   */
  public void testSetupWorkDir() throws IOException {
    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
                            "testSetupWorkDir");
    Path myWorkDir = new Path(rootDir, "./work");
    Path myTargetDir = new Path(rootDir, "./tmp");
    JobConf jConf = new JobConf();
    FileSystem fs = FileSystem.getLocal(jConf);
    createEmptyDir(fs, myWorkDir);
    createEmptyDir(fs, myTargetDir);

    // create subDirs and symlinks under work dir
    createSubDirsAndSymLinks(jConf, myWorkDir, myTargetDir);

    assertTrue("Did not create symlinks/files/dirs properly. Check "
        + myWorkDir + " and " + myTargetDir,
        (fs.listStatus(myWorkDir).length == 4) &&
        (fs.listStatus(myTargetDir).length == 2));

    // let us disable creation of symlinks in setupWorkDir()
    jConf.set(MRJobConfig.CACHE_SYMLINK, "no");

    // Deletion of myWorkDir should not affect contents of myTargetDir.
    // myTargetDir is like $user/jobcache/distcache
    TaskRunner.setupWorkDir(jConf, new File(myWorkDir.toUri().getPath()));

    // Contents of myWorkDir should be cleaned up and a tmp dir should be
    // created under myWorkDir
    assertTrue(myWorkDir + " is not cleaned up properly.",
        fs.exists(myWorkDir) && (fs.listStatus(myWorkDir).length == 1));

    // Make sure that the dir under myWorkDir is tmp
    assertTrue(fs.listStatus(myWorkDir)[0].getPath().toUri().getPath()
               .toString().equals(myWorkDir.toString() + "/tmp"));

    // Make sure that myTargetDir is not changed/deleted
    assertTrue("Dir " + myTargetDir + " seem to be modified.",
        fs.exists(myTargetDir) && (fs.listStatus(myTargetDir).length == 2));

    // cleanup
    fs.delete(rootDir, true);
  }

  /**
   * Validates distributed cache symlink getting created fine
   * 
   * @throws IOException, URISyntaxException 
   */
  public void testSetupWorkDirDistCacheSymlinkValid() 
      throws IOException, URISyntaxException {
    JobConf jConf = new JobConf();
    FileSystem fs = FileSystem.getLocal(jConf);

    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
                            "testSetupWorkDirSymlinkFailure");

    // create file for DistributedCache and set it
    Path myTargetDir = new Path(rootDir, "./tmp");
    createEmptyDir(fs, myTargetDir);
    createFile(fs, myTargetDir, "cacheFile.txt");
    DistributedCache.setLocalFiles(jConf, 
        (myTargetDir.toString()+Path.SEPARATOR+"cacheFile.txt"));
    assertTrue("Did not create cache file in " + myTargetDir,
        (fs.listStatus(myTargetDir).length == 1));

    // let us enable creation of symlinks in setupWorkDir()
    jConf.set(MRJobConfig.CACHE_SYMLINK, "yes");

    // add a valid symlink
    Path myWorkDir = new Path(rootDir, "./work");
    createEmptyDir(fs, myWorkDir);
    DistributedCache.addCacheFile(new URI(myWorkDir.toString() +
        Path.SEPARATOR + "file.txt#valid"), jConf);

    // setupWorkDir should create symlinks
    TaskRunner.setupWorkDir(jConf, new File(myWorkDir.toUri().getPath()));

    // myWorkDir should have 2 entries, a tmp dir and the symlink valid
    assertTrue(myWorkDir + " does not have cache symlink.",
        fs.exists(myWorkDir) && (fs.listStatus(myWorkDir).length == 2));

    // make sure work dir has symlink valid
    boolean foundValid = false;
    for (FileStatus fstat : fs.listStatus(myWorkDir)) {
      if (fstat.getPath().toUri() != null &&
            fstat.getPath().toUri().getPath().toString()
              .equals(myWorkDir.toString() + Path.SEPARATOR+ "valid")) {
        foundValid = true;
      }
    }
    
    assertTrue("Valid symlink not created", foundValid);

    // cleanup
    fs.delete(rootDir, true);
  }

  /**
   * Invalid distributed cache files errors out with IOException
   * 
   * @throws IOException, URISyntaxException 
   */
  public void testSetupWorkDirDistCacheSymlinkInvalid() 
      throws IOException, URISyntaxException {
    JobConf jConf = new JobConf();
    FileSystem fs = FileSystem.getLocal(jConf);

    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
                            "testSetupWorkDirSymlinkFailure");

    // create file for DistributedCache and set it
    Path myTargetDir = new Path(rootDir, "./tmp");
    createEmptyDir(fs, myTargetDir);
    createFile(fs, myTargetDir, "cacheFile.txt");
    DistributedCache.setLocalFiles(jConf, (myTargetDir.toString() +
        Path.SEPARATOR+"cacheFile.txt"));
    assertTrue("Did not create cache file in " + myTargetDir,
        (fs.listStatus(myTargetDir).length == 1));

    // let us enable creation of symlinks in setupWorkDir()
    jConf.set(MRJobConfig.CACHE_SYMLINK, "yes");

    // add an invalid symlink
    Path myWorkDir = new Path(rootDir, "./work");
    createEmptyDir(fs, myWorkDir);
    DistributedCache.addCacheFile(new URI(myWorkDir.toString() + 
        Path.SEPARATOR+"file.txt#invalid/abc"), jConf);

    // setupWorkDir should throw exception
    try {
       TaskRunner.setupWorkDir(jConf, new File(myWorkDir.toUri().getPath()));
       assertFalse("TaskRunner.setupWorkDir() did not throw exception when" +
           " given invalid cache file", true);
    } catch(IOException e) {
        // this is correct behavior
        assertTrue(myWorkDir + " does not have cache symlink.",
               fs.exists(myWorkDir) && (fs.listStatus(myWorkDir).length == 0));
    }

    // cleanup
    fs.delete(rootDir, true);
  }
}
