(TWILL-208) add Location.mkdirs(String permissions)

This closes #26 from GitHub.

Signed-off-by: anew <anew@apache.org>
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
index 37ec3d0..fd5470b 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
@@ -98,8 +98,8 @@
 
   @Override
   public OutputStream getOutputStream(String permission) throws IOException {
-    ensureDirectory(file.getParentFile());
     Set<PosixFilePermission> permissions = parsePermissions(permission);
+    ensureDirectory(file.getParentFile(), permissions);
     Path path = file.toPath();
     WritableByteChannel channel = Files.newByteChannel(path,
                                                        EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.WRITE),
@@ -110,7 +110,7 @@
   }
 
   /**
-   * @return Returns the name of the file or directory denoteed by this abstract pathname.
+   * @return Returns the name of the file or directory denoted by this abstract pathname.
    */
   @Override
   public String getName() {
@@ -251,6 +251,34 @@
     return file.mkdirs();
   }
 
+  @Override
+  public boolean mkdirs(String permission) throws IOException {
+    Set<PosixFilePermission> posixPermissions = parsePermissions(permission);
+    return mkdirs(file, posixPermissions);
+  }
+
+  private boolean mkdirs(File file, Set<PosixFilePermission> permissions) throws IOException {
+    if (file.exists()) {
+      return false;
+    }
+    if (file.mkdir()) {
+      Files.setPosixFilePermissions(file.toPath(), permissions);
+      return true;
+    }
+    File parent = file.getParentFile();
+    if (parent == null) {
+      return false;
+    }
+    if (!mkdirs(parent, permissions) && !parent.exists()) {
+      return false;
+    }
+    if (file.mkdir()) {
+      Files.setPosixFilePermissions(file.toPath(), permissions);
+      return true;
+    }
+    return false;
+  }
+
   /**
    * @return Length of file.
    */
@@ -322,6 +350,16 @@
   }
 
   /**
+   * Ensures the given {@link File} is a directory. If it doesn't exist, it will be created.
+   */
+  private void ensureDirectory(File dir, Set<PosixFilePermission> permission) throws IOException {
+    // Check, create, check to resolve race conditions if there are concurrent creation of the directory.
+    if (!dir.isDirectory() && !mkdirs(dir, permission) && !dir.isDirectory()) {
+      throw new IOException("Failed to create directory " + dir);
+    }
+  }
+
+  /**
    * Parses the given permission to a set of {@link PosixFilePermission}.
    *
    * @param permission the permission as passed to the {@link #createNew(String)} or {@link #getOutputStream(String)}
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/Location.java b/twill-common/src/main/java/org/apache/twill/filesystem/Location.java
index 973cb92..757a296 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/Location.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/Location.java
@@ -173,7 +173,7 @@
    * pathname denotes a directory, then the directory must be empty in order
    * to be deleted.
    *
-   * @return true if and only if the file or directory is successfully delete; false otherwise.
+   * @return true if and only if the file or directory is successfully deleted; false otherwise.
    */
   boolean delete() throws IOException;
 
@@ -185,7 +185,7 @@
    * failure during deletion will have some entries inside the directory being deleted while some are not.
    *
    * @param recursive Indicate if recursively delete a directory. Ignored if the pathname represents a file.
-   * @return true if and only if the file or directory is successfully delete; false otherwise.
+   * @return true if and only if the file or directory is successfully deleted; false otherwise.
    */
   boolean delete(boolean recursive) throws IOException;
 
@@ -202,11 +202,27 @@
    * Creates the directory named by this abstract pathname, including any necessary
    * but nonexistent parent directories.
    *
-   * @return true if and only if the renaming succeeded; false otherwise
+   * @return true if and only if the directory was created; false otherwise
    */
   boolean mkdirs() throws IOException;
 
   /**
+   * Creates the directory named by this abstract pathname, including any necessary
+   * but nonexistent parent directories. The directories will be created with the
+   * exact given permissions, regardless of a possible umask setting.
+   *
+   * @param permission A permission string. It has to be either a three digit or a nine character string.
+   *                   For the three digit string, it is similar to the UNIX permission numeric representation.
+   *                   The first digit is the permission for owner, second digit is the permission for group and
+   *                   the third digit is the permission for all.
+   *                   For the nine character string, it uses the format as specified by the
+   *                   {@link PosixFilePermissions#fromString(String)} method.
+   *
+   * @return true if and only if the directory was created; false otherwise
+   */
+  boolean mkdirs(String permission) throws IOException;
+
+  /**
    * @return Length of file.
    */
   long length() throws IOException;
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java
index 9530384..c50a301 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java
@@ -199,6 +199,9 @@
 
   @Override
   public boolean mkdirs() throws IOException {
+    if (fc.util().exists(path)) {
+      return false;
+    }
     try {
       fc.mkdir(path, null, true);
       return true;
@@ -208,6 +211,60 @@
   }
 
   @Override
+  public boolean mkdirs(String permission) throws IOException {
+    return mkdirs(path, parsePermissions(permission));
+  }
+
+  /**
+   * Helper to create a directory and its parents id necessary, all with the given permissions.
+   * We cannot use the fs.mkdirs() because that would apply the umask to the permissions.
+   */
+  private boolean mkdirs(Path path, FsPermission permission) throws IOException {
+    if (fc.util().exists(path)) {
+      return false;
+    }
+    Path parent = path.getParent();
+    if (null == parent) {
+      return false;
+    }
+    // if parent exists, attempt to create the path as a directory.
+    if (fc.util().exists(parent)) {
+      return mkdir(path, permission);
+    }
+    // attempt to create the parent with the proper permissions
+    if (!mkdirs(parent, permission) && !isDirectory(parent)) {
+      return false;
+    }
+    // now the parent exists and we can create this directory
+    return mkdir(path, permission);
+  }
+
+  /**
+   * Helper to create a directory (but not its parents) with the given permissions.
+   * We cannot use fs.mkdirs() and then apply the permissions to override the umask.
+   */
+  private boolean mkdir(Path path, FsPermission permission) throws IOException {
+    try {
+      fc.mkdir(path, null, true);
+    } catch (FileAlreadyExistsException e) {
+      if (!isDirectory(path)) {
+        return false;
+      }
+    }
+    // explicitly set permissions to get around the umask
+    fc.setPermission(path, permission);
+    return true;
+  }
+
+  private boolean isDirectory(Path path) throws IOException {
+    try {
+      return fc.getFileStatus(path).isDirectory();
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  @Override
   public long length() throws IOException {
     return fc.getFileStatus(path).getLen();
   }
@@ -219,11 +276,7 @@
 
   @Override
   public boolean isDirectory() throws IOException {
-    try {
-      return fc.getFileStatus(path).isDirectory();
-    } catch (FileNotFoundException e) {
-      return false;
-    }
+    return isDirectory(path);
   }
 
   @Override
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
index 097bcc5..43b37da 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -221,15 +221,62 @@
     }
   }
 
-  /**
-   * Creates the directory named by this abstract pathname, including any necessary
-   * but nonexistent parent directories.
-   *
-   * @return true if and only if the renaming succeeded; false otherwise
-   */
   @Override
   public boolean mkdirs() throws IOException {
-    return fs.mkdirs(path);
+    if (fs.exists(path)) {
+      return false;
+    }
+    try {
+      return fs.mkdirs(path);
+    } catch (FileAlreadyExistsException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean mkdirs(String permission) throws IOException {
+    return mkdirs(path, parsePermissions(permission));
+  }
+
+  /**
+   * Helper to create a directory and its parents id necessary, all with the given permissions.
+   * We cannot use the fs.mkdirs() because that would apply the umask to the permissions.
+   */
+  private boolean mkdirs(Path path, FsPermission permission) throws IOException {
+    if (fs.exists(path)) {
+      return false;
+    }
+    Path parent = path.getParent();
+    if (null == parent) {
+      return false;
+    }
+    // if parent exists, attempt to create the path as a directory.
+    if (fs.exists(parent)) {
+      return mkdir(path, permission);
+    }
+    // attempt to create the parent with the proper permissions
+    if (!mkdirs(parent, permission) && !fs.isDirectory(parent)) {
+      return false;
+    }
+    // now the parent exists and we can create this directory
+    return mkdir(path, permission);
+  }
+
+  /**
+   * Helper to create a directory (but not its parents) with the given permissions.
+   * We cannot use fs.mkdirs() and then apply the permissions to override the umask.
+   */
+  private boolean mkdir(Path path, FsPermission permission) throws IOException {
+    try {
+      if (!fs.mkdirs(path) && !fs.isDirectory(path)) {
+        return false;
+      }
+    } catch (FileAlreadyExistsException e) {
+      return false;
+    }
+    // explicitly set permissions to get around the umask
+    fs.setPermission(path, permission);
+    return true;
   }
 
   /**
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
index e4c3774..4ed59a7 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
@@ -17,25 +17,36 @@
  */
 package org.apache.twill.filesystem;
 
+import com.google.common.base.Throwables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 import java.io.IOException;
+import java.security.PrivilegedAction;
 
 /**
  *
  */
 public class FileContextLocationTest extends LocationTestBase {
 
-  private static MiniDFSCluster dfsCluster;
+  public static MiniDFSCluster dfsCluster;
+  private static UserGroupInformation testUGI;
 
   @BeforeClass
   public static void init() throws IOException {
     Configuration conf = new Configuration();
     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
     dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    // make root world-writable so that we can create all location factories as unprivileged user
+    dfsCluster.getFileSystem().setPermission(new Path("/"), FsPermission.valueOf("-rwxrwxrwx"));
+    // to run these tests not as superuser, make sure to use a user name other than the JVM user
+    String userName = System.getProperty("user.name").equals("tester") ? "twiller" : "tester";
+    testUGI = UserGroupInformation.createUserForTesting(userName, new String[] { "testgroup" });
   }
 
   @AfterClass
@@ -44,7 +55,39 @@
   }
 
   @Override
+  protected String getUserName() {
+    return testUGI.getUserName();
+  }
+
+  @Override
+  protected String getUserGroup(String ignoredGroupName) {
+    return testUGI.getGroupNames()[testUGI.getGroupNames().length - 1];
+  }
+
+  @Override
   protected LocationFactory createLocationFactory(String pathBase) throws Exception {
+    return createLocationFactory(pathBase, testUGI);
+  }
+
+  @Override
+  protected LocationFactory createLocationFactory(final String pathBase, UserGroupInformation ugi) throws Exception {
+    LocationFactory factory = ugi.doAs(new PrivilegedAction<LocationFactory>() {
+      @Override
+      public LocationFactory run() {
+        try {
+          return doCreateLocationFactory(pathBase);
+        } catch (IOException e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    });
+    // make sure the root of the location factory exists and only permits the test user
+    Location root = factory.create("/");
+    root.mkdirs("rwx------");
+    return factory;
+  }
+
+  protected LocationFactory doCreateLocationFactory(String pathBase) throws IOException {
     return new FileContextLocationFactory(dfsCluster.getFileSystem().getConf(), pathBase);
   }
 }
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
index d57d49f..de0b4c5 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
@@ -17,34 +17,15 @@
  */
 package org.apache.twill.filesystem;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
 import java.io.IOException;
 
 /**
- *
+ * Everything here is set up the same as for FileContextLocation; except that we use an HDFSLocationFactory.
  */
-public class HDFSLocationTest extends LocationTestBase {
-
-  private static MiniDFSCluster dfsCluster;
-
-  @BeforeClass
-  public static void init() throws IOException {
-    Configuration conf = new Configuration();
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
-    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-  }
-
-  @AfterClass
-  public static void finish() {
-    dfsCluster.shutdown();
-  }
+public class HDFSLocationTest extends FileContextLocationTest {
 
   @Override
-  protected LocationFactory createLocationFactory(String pathBase) throws Exception {
+  protected LocationFactory doCreateLocationFactory(String pathBase) throws IOException {
     return new HDFSLocationFactory(dfsCluster.getFileSystem(), pathBase);
   }
 }
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
index 6bdba27..a6b98e8 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.filesystem;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
 
 import java.io.File;
@@ -27,12 +28,28 @@
 public class LocalLocationTest extends LocationTestBase {
 
   @Override
+  protected String getUserName() {
+    return System.getProperty("user.name");
+  }
+
+  @Override
+  protected String getUserGroup(String groupName) {
+    String newGroup = System.getProperty("new.group.name");
+    return newGroup != null ? newGroup : groupName;
+  }
+
+  @Override
   protected LocationFactory createLocationFactory(String pathBase) throws Exception {
     File basePath = new File(tmpFolder.newFolder(), pathBase);
+    //noinspection ResultOfMethodCallIgnored
     basePath.mkdirs();
     return new LocalLocationFactory(basePath);
   }
 
+  protected LocationFactory createLocationFactory(String pathBase, UserGroupInformation ugi) throws Exception {
+    return null;
+  }
+
   @Override
   public void testHomeLocation() throws Exception {
     // For Local location, UGI won't take an effect.
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
index 135b469..5931ada 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
@@ -22,6 +22,7 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.io.CharStreams;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -34,7 +35,6 @@
 import java.io.Reader;
 import java.io.Writer;
 import java.net.URI;
-import java.security.PrivilegedExceptionAction;
 import java.util.List;
 
 /**
@@ -101,17 +101,12 @@
     LocationFactory locationFactory = createLocationFactory("/");
 
     // Without UGI, the home location should be the same as the user
-    Assert.assertEquals(System.getProperty("user.name"), locationFactory.getHomeLocation().getName());
+    Assert.assertEquals(getUserName(), locationFactory.getHomeLocation().getName());
 
     // With UGI, the home location should be based on the UGI current user
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(System.getProperty("user.name") + "1");
-    locationFactory = ugi.doAs(new PrivilegedExceptionAction<LocationFactory>() {
-      @Override
-      public LocationFactory run() throws Exception {
-        return createLocationFactory("/");
-      }
-    });
-
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+      getUserName() + "1", UserGroupInformation.getCurrentUser().getGroupNames());
+    locationFactory = createLocationFactory("/", ugi);
     Assert.assertEquals(ugi.getUserName(), locationFactory.getHomeLocation().getName());
   }
 
@@ -192,17 +187,12 @@
     LocationFactory factory = locationFactoryCache.getUnchecked("ownergroup");
     Location location = factory.create("ogtest");
     location.createNew();
-    Assert.assertEquals(System.getProperty("user.name"), location.getOwner());
-    String group = location.getGroup();
-    // we only have one group and user in a unit test (or at least that is all we know)
-    // this allows us to pass in a new group name via system property if another group is known
-    String newGroup = System.getProperty("new.group.name");
-    newGroup = newGroup == null ? group : newGroup;
+    Assert.assertEquals(getUserName(), location.getOwner());
+    String newGroup =  getUserGroup(location.getGroup());
     location.setGroup(newGroup);
     Assert.assertEquals(newGroup, location.getGroup());
   }
 
-
   @Test
   public void testPermissions() throws IOException {
     LocationFactory factory = locationFactoryCache.getUnchecked("permission1");
@@ -276,5 +266,101 @@
     Assert.assertEquals("-w--w--w-", location.getPermissions());
   }
 
+  @Test
+  public void testDirPermissions() throws IOException {
+    LocationFactory factory = locationFactoryCache.getUnchecked("permissionD");
+
+    Location location = factory.create("nn");
+    String permissions = "rwxr-x---";
+    location.mkdirs(permissions);
+    Assert.assertTrue(location.exists());
+    Assert.assertTrue(location.isDirectory());
+    Assert.assertEquals(permissions, location.getPermissions());
+
+    permissions = "rwx------";
+    location.setPermissions(permissions);
+    Assert.assertEquals(permissions, location.getPermissions());
+
+    Location child = location.append("p1");
+    Location grandchild = child.append("xx");
+    permissions = "rwx-w--w-";
+    grandchild.mkdirs(permissions);
+    Assert.assertTrue(child.isDirectory());
+    Assert.assertTrue(grandchild.isDirectory());
+    Assert.assertEquals(permissions, child.getPermissions());
+    Assert.assertEquals(permissions, grandchild.getPermissions());
+
+    permissions = "rwx------";
+    child.delete(true);
+    Assert.assertFalse(child.exists());
+    Location textfile = grandchild.append("a.txt");
+    textfile.getOutputStream(permissions).close();
+    Assert.assertTrue(child.isDirectory());
+    Assert.assertTrue(grandchild.isDirectory());
+    Assert.assertFalse(textfile.isDirectory());
+    Assert.assertEquals(permissions, child.getPermissions());
+    Assert.assertEquals(permissions, grandchild.getPermissions());
+    Assert.assertEquals(permissions, textfile.getPermissions());
+
+    // mkdirs of existing file
+    Location file = factory.create("existingfile");
+    Assert.assertTrue(file.createNew("rwx------"));
+    Assert.assertFalse(file.mkdirs());
+    Assert.assertFalse(file.mkdirs("rwxrwx---"));
+
+    // mkdirs where parent is existing file
+    file = file.append("newdir");
+    Assert.assertFalse(file.mkdirs());
+    Assert.assertFalse(file.mkdirs("rwxrwx---"));
+
+    // mkdirs of existing directory
+    Location dir = factory.create("existingdir");
+    Assert.assertTrue(dir.mkdirs());
+    Assert.assertFalse(dir.mkdirs());
+    Assert.assertFalse(dir.mkdirs("rwxrwx---"));
+
+    // mkdirs for existing parent with different permissions -> should not change
+    dir.setPermissions("rwx------");
+    Assert.assertEquals("rwx------", dir.getPermissions());
+    Location newdir = dir.append("newdir");
+    Assert.assertTrue(newdir.mkdirs("rwxrwx---"));
+    Assert.assertEquals("rwxrwx---", newdir.getPermissions());
+    Assert.assertEquals("rwx------", dir.getPermissions());
+
+    // mkdirs whithout permission for parent
+    Assert.assertTrue(newdir.delete(true));
+    dir.setPermissions("r-x------");
+    Assert.assertEquals("r-x------", dir.getPermissions());
+    try {
+      Assert.assertFalse(newdir.mkdirs());
+    } catch (AccessControlException e) {
+      // expected
+    }
+    try {
+      Assert.assertFalse(newdir.mkdirs("rwxrwx---"));
+    } catch (AccessControlException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Create a location factory rooted at a given path.
+   */
   protected abstract LocationFactory createLocationFactory(String pathBase) throws Exception;
+
+  /**
+   * Create a location factory rooted at a given path, for the given UGI.
+   */
+  protected abstract LocationFactory createLocationFactory(String pathBase, UserGroupInformation ugi) throws Exception;
+
+  /**
+   * Get the user name used for {@link #createLocationFactory(String)}.
+   */
+  protected abstract String getUserName();
+
+  /**
+   * Given the group name of a location, return a valid group name to test changing the location's group.
+   * If no suitable group name is known, the passed-in group name can be returned.
+   */
+  protected abstract String getUserGroup(String groupName);
 }