HIVE-14323: Reduce number of FS permissions and redundant FS operations (Rajesh Balamohan via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <hashutosh@apache.org>
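
The theme running through the hunks below: on HDFS every FileSystem call is a NameNode round-trip, so an exists() probe in front of a listStatus() or delete() doubles the RPCs on a hot path. The patch drops those probes and handles the missing-path case directly. A minimal sketch of the before/after idiom (the class and method names are illustrative, not part of the patch):

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListStatusIdiom {

  // Before: two RPCs in the common case where the path exists.
  static FileStatus[] listIfExistsOld(FileSystem fs, Path dir) throws IOException {
    if (fs.exists(dir)) {          // RPC #1
      return fs.listStatus(dir);   // RPC #2
    }
    return null;
  }

  // After: one RPC; an absent path surfaces as FileNotFoundException.
  static FileStatus[] listIfExistsNew(FileSystem fs, Path dir) throws IOException {
    try {
      return fs.listStatus(dir);   // single RPC
    } catch (FileNotFoundException fnf) {
      return null;                 // treat a missing dir like an empty one
    }
  }
}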
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index ac08ab3..3d3dddf 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -546,7 +546,11 @@
} else {
try {
//set on the entire subtree
- HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs, firstNonExistentParent, true);
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf,
+ new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs,
+ firstNonExistentParent, true);
+ }
} catch (Exception e) {
LOG.warn("Error setting permissions of " + firstNonExistentParent, e);
}
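
On the FileUtils hunk above: constructing HdfsUtils.HadoopFileStatus does its own FS lookups (file status, plus ACLs where supported), and setFullFileStatus then applies them recursively. Gating the whole branch on inheritPerms means clusters with permission inheritance disabled pay for none of it. A hedged sketch of the gate as a standalone helper (the helper name and the default value are illustrative):

import org.apache.hadoop.conf.Configuration;

public class InheritPermsGate {

  // Skip the expensive status lookup and recursive chmod/chown entirely
  // when hive.warehouse.subdir.inherit.perms is off.
  static void propagateIfEnabled(Configuration conf, Runnable propagation) {
    // Same flag the hunk reads via ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS;
    // the 'false' default here is illustrative, not HiveConf's.
    if (conf.getBoolean("hive.warehouse.subdir.inherit.perms", false)) {
      propagation.run(); // e.g. HdfsUtils.setFullFileStatus(...)
    }
  }
}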
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index ec5d693..89893eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -245,7 +245,9 @@
if (mkdir) {
try {
- if (!FileUtils.mkdir(fs, dir, true, conf)) {
+ boolean inheritPerms = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+ if (!FileUtils.mkdir(fs, dir, inheritPerms, conf)) {
throw new IllegalStateException("Cannot create staging directory '" + dir.toString() + "'");
}
@@ -380,10 +382,9 @@
Path location = materializedTable.getDataLocation();
try {
FileSystem fs = location.getFileSystem(conf);
- if (fs.exists(location)) {
- fs.delete(location, true);
- LOG.info("Removed " + location + " for materialized " + materializedTable.getTableName());
- }
+ boolean status = fs.delete(location, true);
+ LOG.info("Removed " + location + " for materialized "
+ + materializedTable.getTableName() + ", status=" + status);
} catch (IOException e) {
// ignore
LOG.warn("Error removing " + location + " for materialized " + materializedTable.getTableName() +
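
On the Context.java hunks above: the staging-directory mkdir now honors hive.warehouse.subdir.inherit.perms instead of always inheriting, and the materialized-table cleanup drops its exists() probe, since FileSystem#delete returns false for a missing path rather than throwing. A minimal sketch of the delete idiom (names illustrative):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteIdiom {

  // One RPC instead of two: no exists() probe before the delete.
  static boolean removeRecursively(FileSystem fs, Path location) throws IOException {
    // delete() returns false, rather than throwing, when location is absent,
    // so the boolean result can simply be logged.
    return fs.delete(location, true);
  }
}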
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index cadda8f..fd25978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1388,10 +1388,11 @@
Path tmpPath = Utilities.toTempPath(specPath);
Path taskTmpPath = Utilities.toTaskTempPath(specPath);
if (success) {
- if (fs.exists(tmpPath)) {
+ FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+ tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
+ if (statuses != null && statuses.length > 0) {
// remove any tmp file or double-committed output files
- List<Path> emptyBuckets =
- Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx, conf, hconf);
+ List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, statuses, dpCtx, conf, hconf);
// create empty buckets if necessary
if (emptyBuckets.size() > 0) {
createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
@@ -1462,21 +1463,31 @@
removeTempOrDuplicateFiles(fs, path, null,null,null);
}
+ public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+ DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+ if (path == null) {
+ return null;
+ }
+ FileStatus[] stats = HiveStatsUtils.getFileStatusRecurse(path,
+ ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
+ return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf);
+ }
+
/**
* Remove all temporary files and duplicate (double-committed) files from a given directory.
*
* @return a list of path names corresponding to should-be-created empty buckets.
*/
- public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+ public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
- if (path == null) {
+ if (fileStats == null) {
return null;
}
List<Path> result = new ArrayList<Path>();
HashMap<String, FileStatus> taskIDToFile = null;
if (dpCtx != null) {
- FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
+ FileStatus parts[] = fileStats;
for (int i = 0; i < parts.length; ++i) {
assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
@@ -1512,7 +1523,10 @@
}
}
} else {
- FileStatus[] items = fs.listStatus(path);
+ FileStatus[] items = fileStats;
+ if (items.length == 0) {
+ return result;
+ }
taskIDToFile = removeTempOrDuplicateFiles(items, fs);
if(taskIDToFile != null && taskIDToFile.size() > 0 && conf != null && conf.getTable() != null
&& (conf.getTable().getNumBuckets() > taskIDToFile.size()) && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
@@ -2253,12 +2267,13 @@
public static boolean isEmptyPath(JobConf job, Path dirPath) throws Exception {
FileSystem inpFs = dirPath.getFileSystem(job);
-
- if (inpFs.exists(dirPath)) {
+ try {
FileStatus[] fStats = inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
if (fStats.length > 0) {
return false;
}
+ } catch (FileNotFoundException fnf) {
+ return true;
}
return true;
}
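
On the Utilities.java hunks above: removeTempOrDuplicateFiles is split into a Path-taking wrapper (which does the listing) and a FileStatus[]-taking worker, so a caller that has already listed the tree, as the first hunk now does via getFileStatusRecurse, passes its listing straight through instead of triggering a second one. Likewise isEmptyPath swaps its exists() probe for a FileNotFoundException catch. The shape of the wrapper/worker split, sketched with hypothetical names:

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OverloadSketch {

  // Old entry point, now a thin wrapper that lists exactly once.
  static void clean(FileSystem fs, Path path) throws IOException {
    FileStatus[] stats;
    try {
      stats = fs.listStatus(path);   // the only listing
    } catch (FileNotFoundException fnf) {
      return;                        // absent dir: nothing to clean
    }
    clean(fs, stats);                // hand the listing down
  }

  // New worker: operates on a listing the caller already paid for.
  static void clean(FileSystem fs, FileStatus[] stats) throws IOException {
    for (FileStatus s : stats) {
      // inspect or delete s.getPath(); the directory is never re-listed
    }
  }
}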
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 9d927bd..66a2c94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3191,20 +3191,20 @@
if (oldPath != null) {
boolean oldPathDeleted = false;
boolean isOldPathUnderDestf = false;
+ FileStatus[] statuses = null;
try {
FileSystem fs2 = oldPath.getFileSystem(conf);
- if (fs2.exists(oldPath)) {
- // Do not delete oldPath if:
- // - destf is subdir of oldPath
- //if ( !(fs2.equals(destf.getFileSystem(conf)) && FileUtils.isSubDir(oldPath, destf, fs2)))
- isOldPathUnderDestf = FileUtils.isSubDir(oldPath, destf, fs2);
- if (isOldPathUnderDestf) {
- // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
- // existing content might result in incorrect (extra) data.
- // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
- // not the destf or its subdir?
- oldPathDeleted = trashFilesUnderDir(fs2, oldPath, conf);
- }
+ statuses = fs2.listStatus(oldPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ // Do not delete oldPath if:
+ // - destf is subdir of oldPath
+ //if ( !(fs2.equals(destf.getFileSystem(conf)) && FileUtils.isSubDir(oldPath, destf, fs2)))
+ isOldPathUnderDestf = FileUtils.isSubDir(oldPath, destf, fs2);
+ if (isOldPathUnderDestf) {
+ // if oldPath is destf or its subdir, it should definitely be deleted; otherwise its
+ // existing content might result in incorrect (extra) data.
+ // It is not clear why HIVE-8750 stopped deleting oldPath when it is
+ // not destf or its subdir.
+ oldPathDeleted = trashFiles(fs2, statuses, conf);
}
} catch (IOException e) {
if (isOldPathUnderDestf) {
@@ -3216,14 +3216,18 @@
LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
}
}
- if (isOldPathUnderDestf && !oldPathDeleted) {
- throw new HiveException("Destination directory " + destf + " has not be cleaned up.");
+ if (statuses != null && statuses.length > 0) {
+ if (isOldPathUnderDestf && !oldPathDeleted) {
+ throw new HiveException("Destination directory " + destf + " has not been cleaned up.");
+ }
}
}
// first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
// destf with inherited permissions
- boolean destfExist = FileUtils.mkdir(destFs, destf, true, conf);
+ boolean inheritPerms = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+ boolean destfExist = FileUtils.mkdir(destFs, destf, inheritPerms, conf);
if(!destfExist) {
throw new IOException("Directory " + destf.toString()
+ " does not exist and could not be created.");
@@ -3255,16 +3259,18 @@
/**
* Trashes or deletes all files under a directory. Leaves the directory as is.
* @param fs FileSystem to use
- * @param f path of directory
+ * @param statuses FileStatus entries of the files to be trashed or deleted
* @param conf hive configuration
- * @param forceDelete whether to force delete files if trashing does not succeed
* @return true if deletion successful
* @throws IOException
*/
- private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
+ private boolean trashFiles(final FileSystem fs, final FileStatus[] statuses, final Configuration conf)
throws IOException {
- FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
boolean result = true;
+
+ if (statuses == null || statuses.length == 0) {
+ return false;
+ }
final List<Future<Boolean>> futures = new LinkedList<>();
final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
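
On the Hive.java hunks above: the surrounding method (replaceFiles) now lists oldPath once and reuses the FileStatus[] both for the emptiness check and for trashFiles (renamed from trashFilesUnderDir), which no longer lists the directory itself; the visible tail of the method fans the per-file moves out over a pool sized by ConfVars.HIVE_MOVE_FILES_THREAD_COUNT. A hedged sketch of that fan-out, with Hadoop's Trash.moveToAppropriateTrash standing in for the patch's per-file action:

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash;

public class TrashFanOut {

  static boolean trashAll(final FileSystem fs, FileStatus[] statuses,
      final Configuration conf, int threads) throws Exception {
    if (statuses == null || statuses.length == 0) {
      return false; // mirrors trashFiles: nothing listed, nothing trashed
    }
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Boolean>> futures = new LinkedList<>();
    for (final FileStatus status : statuses) {
      futures.add(pool.submit(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return Trash.moveToAppropriateTrash(fs, status.getPath(), conf);
        }
      }));
    }
    boolean result = true;
    try {
      for (Future<Boolean> future : futures) {
        result &= future.get(); // any failed move fails the whole batch
      }
    } finally {
      pool.shutdown();
    }
    return result;
  }
}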