HIVE-14323: Reduce number of FS permission calls and redundant FS operations (Rajesh Balamohan via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <hashutosh@apache.org>
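
Every FileSystem metadata call (exists, listStatus, getFileStatus) is a round
trip to the HDFS NameNode, so probing with exists() before the real operation
doubles the RPC count on hot paths. The recurring idiom in this patch is to
drop the probe and let the operation itself report the missing-path case. A
minimal sketch of the before/after shapes (hypothetical helper names, not
part of the patch):

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class FsCallSketch {
      // Two NameNode RPCs: exists() is itself a getFileStatus() probe.
      static FileStatus[] listWithProbe(FileSystem fs, Path p) throws IOException {
        return fs.exists(p) ? fs.listStatus(p) : new FileStatus[0];
      }

      // One NameNode RPC: the missing-path case surfaces as an exception.
      static FileStatus[] listDirect(FileSystem fs, Path p) throws IOException {
        try {
          return fs.listStatus(p);
        } catch (FileNotFoundException fnf) {
          return new FileStatus[0];
        }
      }
    }
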
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index ac08ab3..3d3dddf 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -546,7 +546,11 @@
       } else {
         try {
           //set on the entire subtree
-          HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs, firstNonExistentParent, true);
+          if (inheritPerms) {
+            HdfsUtils.setFullFileStatus(conf,
+                new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs,
+                firstNonExistentParent, true);
+          }
         } catch (Exception e) {
           LOG.warn("Error setting permissions of " + firstNonExistentParent, e);
         }
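
The hunk above makes permission inheritance conditional: setFullFileStatus
walks the newly created subtree and issues additional RPCs per directory, so
it is now skipped unless hive.warehouse.subdir.inherit.perms is enabled. A
sketch, using plain FileSystem APIs rather than Hive's HdfsUtils, of roughly
what inheriting from a parent costs per created directory:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class InheritSketch {
      // One getFileStatus on the parent plus setPermission/setOwner RPCs for
      // each directory. (setOwner generally requires superuser rights on HDFS.)
      static void inheritFromParent(FileSystem fs, Path parent, Path child)
          throws IOException {
        FileStatus p = fs.getFileStatus(parent);        // 1 RPC
        fs.setPermission(child, p.getPermission());     // 1 RPC
        fs.setOwner(child, p.getOwner(), p.getGroup()); // 1 RPC
      }
    }
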
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index ec5d693..89893eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -245,7 +245,9 @@
 
       if (mkdir) {
         try {
-          if (!FileUtils.mkdir(fs, dir, true, conf)) {
+          boolean inheritPerms = HiveConf.getBoolVar(conf,
+              HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+          if (!FileUtils.mkdir(fs, dir, inheritPerms, conf)) {
             throw new IllegalStateException("Cannot create staging directory  '" + dir.toString() + "'");
           }
 
@@ -380,10 +382,9 @@
       Path location = materializedTable.getDataLocation();
       try {
         FileSystem fs = location.getFileSystem(conf);
-        if (fs.exists(location)) {
-          fs.delete(location, true);
-          LOG.info("Removed " + location + " for materialized " + materializedTable.getTableName());
-        }
+        boolean status = fs.delete(location, true);
+        LOG.info("Removed " + location + " for materialized "
+            + materializedTable.getTableName() + ", status=" + status);
       } catch (IOException e) {
         // ignore
         LOG.warn("Error removing " + location + " for materialized " + materializedTable.getTableName() +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index cadda8f..fd25978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1388,10 +1388,11 @@
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
-      if (fs.exists(tmpPath)) {
+      FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+          tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
+      if (statuses != null && statuses.length > 0) {
         // remove any tmp file or double-committed output files
-        List<Path> emptyBuckets =
-            Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx, conf, hconf);
+        List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, statuses, dpCtx, conf, hconf);
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
@@ -1462,21 +1463,31 @@
     removeTempOrDuplicateFiles(fs, path, null,null,null);
   }
 
+  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+    if (path == null) {
+      return null;
+    }
+    FileStatus[] stats = HiveStatsUtils.getFileStatusRecurse(path,
+        ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
+    return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf);
+  }
+
   /**
    * Remove all temporary files and duplicate (double-committed) files from a given directory.
    *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
-    if (path == null) {
+    if (fileStats == null) {
       return null;
     }
 
     List<Path> result = new ArrayList<Path>();
     HashMap<String, FileStatus> taskIDToFile = null;
     if (dpCtx != null) {
-      FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
+      FileStatus parts[] = fileStats;
 
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
@@ -1512,7 +1523,10 @@
         }
       }
     } else {
-      FileStatus[] items = fs.listStatus(path);
+      FileStatus[] items = fileStats;
+      if (items.length == 0) {
+        return result;
+      }
       taskIDToFile = removeTempOrDuplicateFiles(items, fs);
       if(taskIDToFile != null && taskIDToFile.size() > 0 && conf != null && conf.getTable() != null
           && (conf.getTable().getNumBuckets() > taskIDToFile.size()) && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
@@ -2253,12 +2267,13 @@
 
   public static boolean isEmptyPath(JobConf job, Path dirPath) throws Exception {
     FileSystem inpFs = dirPath.getFileSystem(job);
-
-    if (inpFs.exists(dirPath)) {
+    try {
       FileStatus[] fStats = inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
       if (fStats.length > 0) {
         return false;
       }
+    } catch (FileNotFoundException fnf) {
+      return true;
     }
     return true;
   }
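
The Utilities.java changes share one listing across steps: the caller in the
first hunk runs HiveStatsUtils.getFileStatusRecurse once and hands the
resulting FileStatus[] to the new removeTempOrDuplicateFiles overload, rather
than having the helper re-list the same tree. isEmptyPath, meanwhile, swaps
its exists() probe for a FileNotFoundException handler; an equivalent,
slightly tighter form of the new logic (the explicit catch and the trailing
return both yield true) would be:

    import java.io.FileNotFoundException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;
    import org.apache.hadoop.mapred.JobConf;

    public static boolean isEmptyPath(JobConf job, Path dirPath) throws Exception {
      FileSystem inpFs = dirPath.getFileSystem(job);
      try {
        // One listStatus RPC; names starting with "_" or "." are filtered out.
        return inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER).length == 0;
      } catch (FileNotFoundException fnf) {
        return true; // a missing directory is trivially empty
      }
    }
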
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 9d927bd..66a2c94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3191,20 +3191,20 @@
       if (oldPath != null) {
         boolean oldPathDeleted = false;
         boolean isOldPathUnderDestf = false;
+        FileStatus[] statuses = null;
         try {
           FileSystem fs2 = oldPath.getFileSystem(conf);
-          if (fs2.exists(oldPath)) {
-            // Do not delete oldPath if:
-            //  - destf is subdir of oldPath
-            //if ( !(fs2.equals(destf.getFileSystem(conf)) && FileUtils.isSubDir(oldPath, destf, fs2)))
-            isOldPathUnderDestf = FileUtils.isSubDir(oldPath, destf, fs2);
-            if (isOldPathUnderDestf) {
-              // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
-              // existing content might result in incorrect (extra) data.
-              // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
-              // not the destf or its subdir?
-              oldPathDeleted = trashFilesUnderDir(fs2, oldPath, conf);
-            }
+          statuses = fs2.listStatus(oldPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
+          // Delete oldPath only when it is destf itself or a subdirectory of
+          // destf (the old guard is kept below for reference):
+          //if ( !(fs2.equals(destf.getFileSystem(conf)) && FileUtils.isSubDir(oldPath, destf, fs2)))
+          isOldPathUnderDestf = FileUtils.isSubDir(oldPath, destf, fs2);
+          if (isOldPathUnderDestf) {
+            // If oldPath is destf or one of its subdirectories, it should
+            // definitely be deleted; otherwise its existing content might
+            // result in incorrect (extra) data. It is unclear why HIVE-8750
+            // changed this to not delete oldPath when it is outside destf.
+            oldPathDeleted = trashFiles(fs2, statuses, conf);
           }
         } catch (IOException e) {
           if (isOldPathUnderDestf) {
@@ -3216,14 +3216,18 @@
             LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
           }
         }
-        if (isOldPathUnderDestf && !oldPathDeleted) {
-          throw new HiveException("Destination directory " + destf + " has not be cleaned up.");
+        if (statuses != null && statuses.length > 0) {
+          if (isOldPathUnderDestf && !oldPathDeleted) {
+            throw new HiveException("Destination directory " + destf + " has not been cleaned up.");
+          }
         }
       }
 
       // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
       // destf with inherited permissions
-      boolean destfExist = FileUtils.mkdir(destFs, destf, true, conf);
+      boolean inheritPerms = HiveConf.getBoolVar(conf,
+          HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+      boolean destfExist = FileUtils.mkdir(destFs, destf, inheritPerms, conf);
       if(!destfExist) {
         throw new IOException("Directory " + destf.toString()
             + " does not exist and could not be created.");
@@ -3255,16 +3259,18 @@
   /**
    * Trashes or deletes all files under a directory. Leaves the directory as is.
    * @param fs FileSystem to use
-   * @param f path of directory
+   * @param statuses FileStatus objects of the files to be deleted
    * @param conf hive configuration
-   * @param forceDelete whether to force delete files if trashing does not succeed
    * @return true if deletion successful
    * @throws IOException
    */
-  private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
+  private boolean trashFiles(final FileSystem fs, final FileStatus[] statuses, final Configuration conf)
       throws IOException {
-    FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
     boolean result = true;
+
+    if (statuses == null || statuses.length == 0) {
+      return false;
+    }
     final List<Future<Boolean>> futures = new LinkedList<>();
     final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
         Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
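
The hunk ends mid-method, but the surrounding code shows the shape: trashFiles
sizes a fixed thread pool from ConfVars.HIVE_MOVE_FILES_THREAD_COUNT (falling
back to 25), submits one deletion task per FileStatus, and ANDs the results.
A self-contained sketch of that fan-out pattern, with fs.delete standing in
for the real move-to-trash call:

    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;

    final class PooledDeleteSketch {
      static boolean deleteAll(final FileSystem fs, FileStatus[] statuses,
          int threads) throws IOException {
        if (statuses == null || statuses.length == 0) {
          return false; // mirrors trashFiles: nothing listed, nothing deleted
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Boolean>> futures = new LinkedList<>();
        for (final FileStatus status : statuses) {
          futures.add(pool.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
              return fs.delete(status.getPath(), true);
            }
          }));
        }
        pool.shutdown();
        boolean result = true;
        try {
          for (Future<Boolean> future : futures) {
            result &= future.get(); // any failed deletion flips the result
          }
        } catch (InterruptedException | ExecutionException e) {
          throw new IOException("Parallel deletion failed", e);
        }
        return result;
      }
    }
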