MAPREDUCE-2238. Fix permissions handling to avoid leaving undeletable directories in local dirs. Contributed by Todd Lipcon


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1063088 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index bfad2b2..2062d70 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -489,6 +489,9 @@
     MAPREDUCE-2282. Fix TestMRServerPorts for the changes in
     TestHDFSServerPorts.  (shv via szetszwo)
 
+    MAPREDUCE-2238. Fix permissions handling to avoid leaving undeletable directories
+    in local dirs. (todd)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES
diff --git a/src/java/org/apache/hadoop/mapred/TaskController.java b/src/java/org/apache/hadoop/mapred/TaskController.java
index 3e5dced..6939327 100644
--- a/src/java/org/apache/hadoop/mapred/TaskController.java
+++ b/src/java/org/apache/hadoop/mapred/TaskController.java
@@ -27,9 +27,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
@@ -78,6 +78,8 @@
    * </ul>
    */
   public void setup() throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+
     for (String localDir : this.mapredLocalDirs) {
       // Set up the mapreduce.cluster.local.directories.
       File mapredlocalDir = new File(localDir);
@@ -85,8 +87,8 @@
         LOG.warn("Unable to create mapreduce.cluster.local.directory : "
             + mapredlocalDir.getPath());
       } else {
-        Localizer.PermissionsHandler.setPermissions(mapredlocalDir,
-            Localizer.PermissionsHandler.sevenFiveFive);
+        localFs.setPermission(new Path(mapredlocalDir.getCanonicalPath()),
+                              new FsPermission((short)0755));
       }
     }
 
@@ -95,8 +97,8 @@
     if (!taskLog.isDirectory() && !taskLog.mkdirs()) {
       LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
     } else {
-      Localizer.PermissionsHandler.setPermissions(taskLog,
-          Localizer.PermissionsHandler.sevenFiveFive);
+      localFs.setPermission(new Path(taskLog.getCanonicalPath()),
+                            new FsPermission((short)0755));
     }
     DiskChecker.checkDir(TaskLog.getUserLogDir());
   }
diff --git a/src/java/org/apache/hadoop/mapred/TaskRunner.java b/src/java/org/apache/hadoop/mapred/TaskRunner.java
index 31a20ab..0161c79 100644
--- a/src/java/org/apache/hadoop/mapred/TaskRunner.java
+++ b/src/java/org/apache/hadoop/mapred/TaskRunner.java
@@ -39,12 +39,12 @@
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
@@ -279,8 +279,9 @@
     if (!b) {
       LOG.warn("mkdirs failed. Ignoring");
     } else {
-      Localizer.PermissionsHandler.setPermissions(logDir,
-          Localizer.PermissionsHandler.sevenZeroZero);
+      FileSystem localFs = FileSystem.getLocal(conf);
+      localFs.setPermission(new Path(logDir.getCanonicalPath()),
+                            new FsPermission((short)0700));
     }
 
     return logFiles;
diff --git a/src/java/org/apache/hadoop/mapred/TaskTracker.java b/src/java/org/apache/hadoop/mapred/TaskTracker.java
index e13a8cb..cd17c9c 100644
--- a/src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -1160,7 +1160,7 @@
 
     FileOutputStream out;
     try {
-      out = SecureIOUtils.createForWrite(aclFile, 0600);
+      out = SecureIOUtils.createForWrite(aclFile, 0700);
     } catch (SecureIOUtils.AlreadyExistsException aee) {
       LOG.warn("Job ACL file already exists at " + aclFile, aee);
       return;
@@ -1170,8 +1170,6 @@
     } finally {
       out.close();
     }
-    Localizer.PermissionsHandler.setPermissions(aclFile,
-        Localizer.PermissionsHandler.sevenZeroZero);
   }
 
   /**
diff --git a/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java b/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
index ccb5b7c..00f9388 100644
--- a/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
+++ b/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskTracker;
@@ -59,105 +60,6 @@
     taskController = tc;
   }
 
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  public static class PermissionsHandler {
-    /**
-     * Permission information useful for setting permissions for a given path.
-     * Using this, one can set all possible combinations of permissions for the
-     * owner of the file. But permissions for the group and all others can only
-     * be set together, i.e. permissions for group cannot be set different from
-     * those for others and vice versa.
-     */
-    @InterfaceAudience.Private
-    @InterfaceStability.Unstable
-    public static class PermissionsInfo {
-      public boolean readPermissions;
-      public boolean writePermissions;
-      public boolean executablePermissions;
-      public boolean readPermsOwnerOnly;
-      public boolean writePermsOwnerOnly;
-      public boolean executePermsOwnerOnly;
-
-      /**
-       * Create a permissions-info object with the given attributes
-       * 
-       * @param readPerms
-       * @param writePerms
-       * @param executePerms
-       * @param readOwnerOnly
-       * @param writeOwnerOnly
-       * @param executeOwnerOnly
-       */
-      public PermissionsInfo(boolean readPerms, boolean writePerms,
-          boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
-          boolean executeOwnerOnly) {
-        readPermissions = readPerms;
-        writePermissions = writePerms;
-        executablePermissions = executePerms;
-        readPermsOwnerOnly = readOwnerOnly;
-        writePermsOwnerOnly = writeOwnerOnly;
-        executePermsOwnerOnly = executeOwnerOnly;
-      }
-    }
-
-    /**
-     * Set permission on the given file path using the specified permissions
-     * information. We use java api to set permission instead of spawning chmod
-     * processes. This saves a lot of time. Using this, one can set all possible
-     * combinations of permissions for the owner of the file. But permissions
-     * for the group and all others can only be set together, i.e. permissions
-     * for group cannot be set different from those for others and vice versa.
-     * 
-     * This method should satisfy the needs of most of the applications. For
-     * those it doesn't, {@link FileUtil#chmod} can be used.
-     * 
-     * @param f file path
-     * @param pInfo permissions information
-     * @return true if success, false otherwise
-     */
-    public static boolean setPermissions(File f, PermissionsInfo pInfo) {
-      if (pInfo == null) {
-        LOG.debug(" PermissionsInfo is null, returning.");
-        return true;
-      }
-
-      LOG.debug("Setting permission for " + f.getAbsolutePath());
-
-      boolean ret = true;
-
-      // Clear all the flags
-      ret = f.setReadable(false, false) && ret;
-      ret = f.setWritable(false, false) && ret;
-      ret = f.setExecutable(false, false) && ret;
-
-      ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
-      LOG.debug("Readable status for " + f + " set to " + ret);
-      ret =
-          f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
-              && ret;
-      LOG.debug("Writable status for " + f + " set to " + ret);
-      ret =
-          f.setExecutable(pInfo.executablePermissions,
-              pInfo.executePermsOwnerOnly)
-              && ret;
-
-      LOG.debug("Executable status for " + f + " set to " + ret);
-      return ret;
-    }
-
-    /**
-     * Permissions rwxr_xr_x
-     */
-    public static PermissionsInfo sevenFiveFive =
-        new PermissionsInfo(true, true, true, false, true, false);
-    /**
-     * Completely private permissions
-     */
-    public static PermissionsInfo sevenZeroZero =
-        new PermissionsInfo(true, true, true, true, true, true);
-  }
-
   // Data-structure for synchronizing localization of user directories.
   private Map<String, AtomicBoolean> localizedUsers =
       new HashMap<String, AtomicBoolean>();
@@ -214,35 +116,31 @@
         if (fs.exists(userDir) || fs.mkdirs(userDir)) {
 
           // Set permissions on the user-directory
-          PermissionsHandler.setPermissions(
-              new File(userDir.toUri().getPath()),
-              PermissionsHandler.sevenZeroZero);
+          fs.setPermission(userDir, new FsPermission((short)0700));
           userDirStatus = true;
 
           // Set up the jobcache directory
-          File jobCacheDir =
-              new File(localDir, TaskTracker.getJobCacheSubdir(user));
-          if (jobCacheDir.exists() || jobCacheDir.mkdirs()) {
+          Path jobCacheDir =
+              new Path(localDir, TaskTracker.getJobCacheSubdir(user));
+          if (fs.exists(jobCacheDir) || fs.mkdirs(jobCacheDir)) {
             // Set permissions on the jobcache-directory
-            PermissionsHandler.setPermissions(jobCacheDir,
-                PermissionsHandler.sevenZeroZero);
+            fs.setPermission(jobCacheDir, new FsPermission((short)0700));
             jobCacheDirStatus = true;
           } else {
             LOG.warn("Unable to create job cache directory : "
-                + jobCacheDir.getPath());
+                + jobCacheDir);
           }
 
           // Set up the cache directory used for distributed cache files
-          File distributedCacheDir =
-              new File(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
-          if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
+          Path distributedCacheDir =
+              new Path(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
+          if (fs.exists(distributedCacheDir) || fs.mkdirs(distributedCacheDir)) {
             // Set permissions on the distcache-directory
-            PermissionsHandler.setPermissions(distributedCacheDir,
-                PermissionsHandler.sevenZeroZero);
+            fs.setPermission(distributedCacheDir, new FsPermission((short)0700));
             distributedCacheDirStatus = true;
           } else {
             LOG.warn("Unable to create distributed-cache directory : "
-                + distributedCacheDir.getPath());
+                + distributedCacheDir);
           }
         } else {
           LOG.warn("Unable to create the user directory : " + userDir);
@@ -311,8 +209,7 @@
       initJobDirStatus = initJobDirStatus || jobDirStatus;
 
       // job-dir has to be private to the TT
-      Localizer.PermissionsHandler.setPermissions(new File(jobDir.toUri()
-          .getPath()), Localizer.PermissionsHandler.sevenZeroZero);
+      fs.setPermission(jobDir, new FsPermission((short)0700));
     }
 
     if (!initJobDirStatus) {
@@ -366,15 +263,11 @@
    * @param jobId
    */
   public void initializeJobLogDir(JobID jobId) throws IOException {
-    File jobUserLogDir = TaskLog.getJobDir(jobId);
-    if (!jobUserLogDir.exists()) {
-      boolean ret = jobUserLogDir.mkdirs();
-      if (!ret) {
-        throw new IOException("Could not create job user log directory: " +
-            jobUserLogDir);
-      }
+    Path jobUserLogDir = new Path(TaskLog.getJobDir(jobId).getCanonicalPath());
+    if (!fs.mkdirs(jobUserLogDir)) {
+      throw new IOException("Could not create job user log directory: " +
+                            jobUserLogDir);
     }
-    Localizer.PermissionsHandler.setPermissions(jobUserLogDir,
-        Localizer.PermissionsHandler.sevenZeroZero);
+    fs.setPermission(jobUserLogDir, new FsPermission((short)0700));
   }
 }