MAPREDUCE-2804. Fixed a race condition in setting up the log directories
for tasks that are starting at the same time. (omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-204@1161328 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 71bca85..5f9ce6d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,9 @@
 
   BUG FIXES
 
+    MAPREDUCE-2804. Fixed a race condition in setting up the log directories
+    for tasks that are starting at the same time. (omalley)
+
     MAPREDUCE-2846. Fixed a race condition in writing the log index file that
     caused tasks to fail. (omalley)
 
diff --git a/src/core/org/apache/hadoop/fs/FileUtil.java b/src/core/org/apache/hadoop/fs/FileUtil.java
index bfb13d2..e9c3d70 100644
--- a/src/core/org/apache/hadoop/fs/FileUtil.java
+++ b/src/core/org/apache/hadoop/fs/FileUtil.java
@@ -25,6 +25,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell;
@@ -561,9 +563,25 @@
     } catch(InterruptedException e){
       //do nothing as of yet
     }
+    if (returnVal != 0) {
+      LOG.warn("Command '" + cmd + "' failed " + returnVal + 
+               " with: " + copyStderr(p));
+    }
     return returnVal;
   }
   
+  private static String copyStderr(Process p) throws IOException {
+    InputStream err = p.getErrorStream();
+    StringBuilder result = new StringBuilder();
+    byte[] buff = new byte[4096];
+    int len = err.read(buff);
+    while (len > 0) {
+      result.append(new String(buff, 0 , len));
+      len = err.read(buff);
+    }
+    return result.toString();
+  }
+
   /**
    * Change the permissions on a filename.
    * @param filename the name of the file to change
@@ -608,6 +626,75 @@
     }
     return shExec.getExitCode();
   }
+
+  /**
+   * Set permissions to the required value. Uses the java primitives instead
+   * of forking if group == other.
+   * @param f the file to change
+   * @param permission the new permissions
+   * @throws IOException
+   */
+  public static void setPermission(File f, FsPermission permission
+                                   ) throws IOException {
+    FsAction user = permission.getUserAction();
+    FsAction group = permission.getGroupAction();
+    FsAction other = permission.getOtherAction();
+    
+    // Fork chmod if group and other permissions are different...
+    if (group != other) {
+      execSetPermission(f, permission);
+      return;
+    }
+    
+    boolean rv = true;
+    
+    // read perms
+    rv = f.setReadable(group.implies(FsAction.READ), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
+      f.setReadable(user.implies(FsAction.READ), true);
+      checkReturnValue(rv, f, permission);
+    }
+
+    // write perms
+    rv = f.setWritable(group.implies(FsAction.WRITE), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
+      f.setWritable(user.implies(FsAction.WRITE), true);
+      checkReturnValue(rv, f, permission);
+    }
+
+    // exec perms
+    rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
+      f.setExecutable(user.implies(FsAction.EXECUTE), true);
+      checkReturnValue(rv, f, permission);
+    }
+  }
+
+  private static void checkReturnValue(boolean rv, File p, 
+                                       FsPermission permission
+                                       ) throws IOException {
+    if (!rv) {
+      throw new IOException("Failed to set permissions of path: " + p + " to " + 
+                            String.format("%04o", permission.toShort()));
+    }
+  }
+  
+  private static void execSetPermission(File f, FsPermission permission) 
+  throws IOException {
+    execCommand(f, Shell.SET_PERMISSION_COMMAND,
+        String.format("%04o", permission.toShort()));
+  }
+  
+  static String execCommand(File f, String... cmd) throws IOException {
+    String[] args = new String[cmd.length + 1];
+    System.arraycopy(cmd, 0, args, 0, cmd.length);
+    args[cmd.length] = f.getCanonicalPath();
+    String output = Shell.execCommand(args);
+    return output;
+  }
   
   /**
    * Create a tmp file for a base file.
diff --git a/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java b/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
index 1a8f085..f3c38a95 100644
--- a/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -21,7 +21,6 @@
 import java.io.*;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
@@ -258,6 +257,7 @@
     if (pathToFile(src).renameTo(pathToFile(dst))) {
       return true;
     }
+    LOG.debug("Falling through to a copy of " + src + " to " + dst);
     return FileUtil.copy(this, src, this, dst, true, getConf());
   }
   
@@ -415,8 +415,8 @@
       IOException e = null;
       try {
         StringTokenizer t = new StringTokenizer(
-            execCommand(new File(getPath().toUri()), 
-                        Shell.getGET_PERMISSION_COMMAND()));
+            FileUtil.execCommand(new File(getPath().toUri()), 
+                                 Shell.getGET_PERMISSION_COMMAND()));
         //expected format
         //-rw-------    1 username groupname ...
         String permission = t.nextToken();
@@ -466,11 +466,11 @@
     }
 
     if (username == null) {
-      execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 
+      FileUtil.execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 
     } else {
       //OWNER[:[GROUP]]
       String s = username + (groupname == null? "": ":" + groupname);
-      execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
+      FileUtil.execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
     }
   }
 
@@ -479,65 +479,8 @@
    */
   @Override
   public void setPermission(Path p, FsPermission permission
-      ) throws IOException {
-    FsAction user = permission.getUserAction();
-    FsAction group = permission.getGroupAction();
-    FsAction other = permission.getOtherAction();
-    
-    File f = pathToFile(p);
-    
-    // Fork chmod if group and other permissions are different...
-    if (group != other) {
-      execSetPermission(f, permission);
-      return;
-    }
-    
-    boolean rv = true;
-    
-    // read perms
-    rv = f.setReadable(group.implies(FsAction.READ), false);
-    checkReturnValue(rv, p, permission);
-    if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
-      f.setReadable(user.implies(FsAction.READ), true);
-      checkReturnValue(rv, p, permission);
-    }
-
-    // write perms
-    rv = f.setWritable(group.implies(FsAction.WRITE), false);
-    checkReturnValue(rv, p, permission);
-    if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
-      f.setWritable(user.implies(FsAction.WRITE), true);
-      checkReturnValue(rv, p, permission);
-    }
-
-    // exec perms
-    rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
-    checkReturnValue(rv, p, permission);
-    if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
-      f.setExecutable(user.implies(FsAction.EXECUTE), true);
-      checkReturnValue(rv, p, permission);
-    }
+                            ) throws IOException {
+    FileUtil.setPermission(pathToFile(p), permission);
   }
 
-  private void checkReturnValue(boolean rv, Path p, FsPermission permission) 
-  throws IOException {
-    if (!rv) {
-      throw new IOException("Failed to set permissions of path: " + p + " to " + 
-                            String.format("%04o", permission.toShort()));
-    }
-  }
-  
-  private void execSetPermission(File f, FsPermission permission) 
-  throws IOException {
-    execCommand(f, Shell.SET_PERMISSION_COMMAND,
-        String.format("%04o", permission.toShort()));
-  }
-  
-  private static String execCommand(File f, String... cmd) throws IOException {
-    String[] args = new String[cmd.length + 1];
-    System.arraycopy(cmd, 0, args, 0, cmd.length);
-    args[cmd.length] = f.getCanonicalPath();
-    String output = Shell.execCommand(args);
-    return output;
-  }
 }
diff --git a/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java b/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
index d5c4ce4..9ab8095 100644
--- a/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
+++ b/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
@@ -68,12 +68,8 @@
   @Override
   public void createLogDir(TaskAttemptID taskID, 
 		  			boolean isCleanup) throws IOException {
-	boolean b = TaskLog.createTaskAttemptLogDir(taskID, isCleanup, 
-	    		 		localStorage.getGoodLocalDirs());
-	if (!b) {
-	    LOG.warn("Creation of attempt log dir for " + taskID
-	                 + " failed. Ignoring");
-	}
+    TaskLog.createTaskAttemptLogDir(taskID, isCleanup, 
+                                    localStorage.getGoodLocalDirs());
   }
   
   /**
diff --git a/src/mapred/org/apache/hadoop/mapred/TaskLog.java b/src/mapred/org/apache/hadoop/mapred/TaskLog.java
index 4b3a303..5d502c2 100644
--- a/src/mapred/org/apache/hadoop/mapred/TaskLog.java
+++ b/src/mapred/org/apache/hadoop/mapred/TaskLog.java
@@ -23,7 +23,6 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -50,7 +49,6 @@
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -92,51 +90,29 @@
    * @param taskID attempt-id for which log dir is to be created
    * @param isCleanup Is this attempt a cleanup attempt ?
    * @param localDirs mapred local directories
-   * @return true if attempt log directory creation is succeeded
    * @throws IOException
    */
-  public static boolean createTaskAttemptLogDir(TaskAttemptID taskID,
+  public static void createTaskAttemptLogDir(TaskAttemptID taskID,
       boolean isCleanup, String[] localDirs) throws IOException{
     String cleanupSuffix = isCleanup ? ".cleanup" : "";
     String strAttemptLogDir = getTaskAttemptLogDir(taskID, 
         cleanupSuffix, localDirs);
     File attemptLogDir = new File(strAttemptLogDir);
-    boolean isSucceeded = attemptLogDir.mkdirs();
-    if(isSucceeded) {
-      String strLinkAttemptLogDir = getJobDir(
-          taskID.getJobID()).getAbsolutePath() + File.separatorChar + 
-          taskID.toString() + cleanupSuffix;
-      if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
-        LOG.warn("Creation of symlink to attempt log dir failed.");
-        isSucceeded = false;
-      }
-
-      File linkAttemptLogDir = new File(strLinkAttemptLogDir);
-      // Set permissions for job dir in userlogs
-      if (!Localizer.PermissionsHandler.setPermissions(
-          linkAttemptLogDir.getParentFile(),
-          Localizer.PermissionsHandler.sevenZeroZero)) {
-        LOG.warn("Setting permissions to "
-                 + linkAttemptLogDir.getParentFile() + " failed.");
-        isSucceeded = false;
-      }
-      //Set permissions for target attempt log dir 
-      if (!Localizer.PermissionsHandler.setPermissions(attemptLogDir,
-          Localizer.PermissionsHandler.sevenZeroZero)) {
-        LOG.warn("Setting permissions to the real attempt log dir "
-                 + attemptLogDir + " failed.");
-        isSucceeded = false;
-      }
-      //Set permissions for target job log dir
-      if (!Localizer.PermissionsHandler.setPermissions(
-          attemptLogDir.getParentFile(),
-          Localizer.PermissionsHandler.sevenZeroZero)) {
-        LOG.warn("Setting permissions to the real job log dir "
-                 + attemptLogDir.getParentFile() + " failed.");
-        isSucceeded = false;
-      }
+    if (!attemptLogDir.mkdirs()) {
+      throw new IOException("Creation of " + attemptLogDir + " failed.");
     }
-    return isSucceeded;
+    String strLinkAttemptLogDir = 
+        getJobDir(taskID.getJobID()).getAbsolutePath() + File.separatorChar + 
+        taskID.toString() + cleanupSuffix;
+    if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
+      throw new IOException("Creation of symlink from " + 
+                            strLinkAttemptLogDir + " to " + strAttemptLogDir +
+                            " failed.");
+    }
+
+    //Set permissions for target attempt log dir 
+    FsPermission userOnly = new FsPermission((short) 0700);
+    FileUtil.setPermission(attemptLogDir, userOnly);
   }
 
   /**
diff --git a/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java b/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
index 3339538..1a564ec 100644
--- a/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
+++ b/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
@@ -28,10 +28,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController;
-import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapreduce.JobID;
 
 /**
  * 
@@ -98,51 +96,6 @@
     }
 
     /**
-     * Set permission on the given file path using the specified permissions
-     * information. We use java api to set permission instead of spawning chmod
-     * processes. This saves a lot of time. Using this, one can set all possible
-     * combinations of permissions for the owner of the file. But permissions
-     * for the group and all others can only be set together, i.e. permissions
-     * for group cannot be set different from those for others and vice versa.
-     * 
-     * This method should satisfy the needs of most of the applications. For
-     * those it doesn't, {@link FileUtil#chmod} can be used.
-     * 
-     * @param f file path
-     * @param pInfo permissions information
-     * @return true if success, false otherwise
-     */
-    public static boolean setPermissions(File f, PermissionsInfo pInfo) {
-      if (pInfo == null) {
-        LOG.debug(" PermissionsInfo is null, returning.");
-        return true;
-      }
-
-      LOG.debug("Setting permission for " + f.getAbsolutePath());
-
-      boolean ret = true;
-
-      // Clear all the flags
-      ret = f.setReadable(false, false) && ret;
-      ret = f.setWritable(false, false) && ret;
-      ret = f.setExecutable(false, false) && ret;
-
-      ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
-      LOG.debug("Readable status for " + f + " set to " + ret);
-      ret =
-          f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
-              && ret;
-      LOG.debug("Writable status for " + f + " set to " + ret);
-      ret =
-          f.setExecutable(pInfo.executablePermissions,
-              pInfo.executePermsOwnerOnly)
-              && ret;
-
-      LOG.debug("Executable status for " + f + " set to " + ret);
-      return ret;
-    }
-
-    /**
      * Permissions rwxr_xr_x
      */
     public static final PermissionsInfo sevenFiveFive =
@@ -210,9 +163,8 @@
         if (fs.exists(userDir) || fs.mkdirs(userDir)) {
 
           // Set permissions on the user-directory
-          PermissionsHandler.setPermissions(
-              new File(userDir.toUri().getPath()),
-              PermissionsHandler.sevenZeroZero);
+          FsPermission userOnly = new FsPermission((short) 0700);
+          FileUtil.setPermission(new File(userDir.toUri().getPath()), userOnly);
           userDirStatus = true;
 
           // Set up the jobcache directory
@@ -220,8 +172,7 @@
               new File(localDir, TaskTracker.getJobCacheSubdir(user));
           if (jobCacheDir.exists() || jobCacheDir.mkdirs()) {
             // Set permissions on the jobcache-directory
-            PermissionsHandler.setPermissions(jobCacheDir,
-                PermissionsHandler.sevenZeroZero);
+            FileUtil.setPermission(jobCacheDir, userOnly);
             jobCacheDirStatus = true;
           } else {
             LOG.warn("Unable to create job cache directory : "
@@ -233,8 +184,7 @@
               new File(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
           if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
             // Set permissions on the distcache-directory
-            PermissionsHandler.setPermissions(distributedCacheDir,
-                PermissionsHandler.sevenZeroZero);
+            FileUtil.setPermission(distributedCacheDir, userOnly);
             distributedCacheDirStatus = true;
           } else {
             LOG.warn("Unable to create distributed-cache directory : "
@@ -266,52 +216,6 @@
   }
 
   /**
-   * Prepare the job directories for a given job. To be called by the job
-   * localization code, only if the job is not already localized.
-   * 
-   * <br>
-   * Here, we set 700 permissions on the job directories created on all disks.
-   * This we do so as to avoid any misuse by other users till the time
-   * {@link TaskController#initializeJob} is run at a
-   * later time to set proper private permissions on the job directories. <br>
-   * 
-   * @param user
-   * @param jobId
-   * @throws IOException
-   */
-  public void initializeJobDirs(String user, JobID jobId)
-      throws IOException {
-    boolean initJobDirStatus = false;
-    String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
-    for (String localDir : localDirs) {
-      Path jobDir = new Path(localDir, jobDirPath);
-      if (fs.exists(jobDir)) {
-        // this will happen on a partial execution of localizeJob. Sometimes
-        // copying job.xml to the local disk succeeds but copying job.jar might
-        // throw out an exception. We should clean up and then try again.
-        fs.delete(jobDir, true);
-      }
-
-      boolean jobDirStatus = fs.mkdirs(jobDir);
-      if (!jobDirStatus) {
-        LOG.warn("Not able to create job directory " + jobDir.toString());
-      }
-
-      initJobDirStatus = initJobDirStatus || jobDirStatus;
-
-      // job-dir has to be private to the TT
-      Localizer.PermissionsHandler.setPermissions(new File(jobDir.toUri()
-          .getPath()), Localizer.PermissionsHandler.sevenZeroZero);
-    }
-
-    if (!initJobDirStatus) {
-      throw new IOException("Not able to initialize job directories "
-          + "in any of the configured local directories for job "
-          + jobId.toString());
-    }
-  }
-
-  /**
    * Create taskDirs on all the disks. Otherwise, in some cases, like when
    * LinuxTaskController is in use, child might wish to balance load across
    * disks but cannot itself create attempt directory because of the fact that
@@ -347,22 +251,4 @@
           + attemptId);
     }
   }
-
-  /**
-   * Create job log directory and set appropriate permissions for the directory.
-   * 
-   * @param jobId
-   */
-  public void initializeJobLogDir(JobID jobId) {
-    File jobUserLogDir = TaskLog.getJobDir(jobId);
-    if (!jobUserLogDir.exists()) {
-      boolean ret = jobUserLogDir.mkdirs();
-      if (!ret) {
-        LOG.warn("Could not create job user log directory: " + jobUserLogDir);
-        return;
-      }
-    }
-    Localizer.PermissionsHandler.setPermissions(jobUserLogDir,
-        Localizer.PermissionsHandler.sevenZeroZero);
-  }
 }