[HUDI-1330] handle prefix filtering at directory level (#2157)
The current DFSPathSelector only ignores prefixes (_, .) at the file level, while files under subdirectories
e.g. (.checkpoint/*) are still considered, which results in a bad-format exception during reading.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 6b58003..47419e0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -31,16 +31,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -63,7 +62,7 @@
protected final TypedProperties props;
public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
- DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
+ DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP));
this.props = props;
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
}
@@ -101,18 +100,8 @@
try {
// obtain all eligible files under root folder.
log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit);
- List<FileStatus> eligibleFiles = new ArrayList<>();
- RemoteIterator<LocatedFileStatus> fitr =
- fs.listFiles(new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
- while (fitr.hasNext()) {
- LocatedFileStatus fileStatus = fitr.next();
- if (fileStatus.isDirectory()
- || fileStatus.getLen() == 0
- || IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
- continue;
- }
- eligibleFiles.add(fileStatus);
- }
+ long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+ List<FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), lastCheckpointTime);
// sort them by modification time.
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
// Filter based on checkpoint & input size, if needed
@@ -120,11 +109,6 @@
long maxModificationTime = Long.MIN_VALUE;
List<FileStatus> filteredFiles = new ArrayList<>();
for (FileStatus f : eligibleFiles) {
- if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get()).longValue()) {
- // skip processed files
- continue;
- }
-
if (currentBytes + f.getLen() >= sourceLimit) {
// we have enough data, we are done
break;
@@ -136,7 +120,7 @@
}
// no data to read
- if (filteredFiles.size() == 0) {
+ if (filteredFiles.isEmpty()) {
return new ImmutablePair<>(Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
}
@@ -148,4 +132,25 @@
throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
}
}
+
+ /**
+ * List files recursively, filter out ineligible files/directories while doing so.
+ */
+ private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
+ // skip files/dirs whose names start with (_, ., etc)
+ FileStatus[] statuses = fs.listStatus(path, file ->
+ IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx)));
+ List<FileStatus> res = new ArrayList<>();
+ for (FileStatus status: statuses) {
+ if (status.isDirectory()) {
+ // avoid infinite loop
+ if (!status.isSymlink()) {
+ res.addAll(listEligibleFiles(fs, status.getPath(), lastCheckpointTime));
+ }
+ } else if (status.getModificationTime() > lastCheckpointTime && status.getLen() > 0) {
+ res.add(status);
+ }
+ }
+ return res;
+ }
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index f63f3e9..e02d00c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -175,5 +175,18 @@
InputBatch<JavaRDD<GenericRecord>> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat(
Option.empty(), Long.MAX_VALUE);
assertEquals(10100, fetch5.getBatch().get().count());
+
+ // 6. Should skip files/directories whose names start with prefixes ("_", ".")
+ generateOneFile(".checkpoint/3", "002", 100);
+ generateOneFile("_checkpoint/3", "002", 100);
+ generateOneFile(".3", "002", 100);
+ generateOneFile("_3", "002", 100);
+ // also work with nested directory
+ generateOneFile("foo/.bar/3", "002", 1); // not ok
+ generateOneFile("foo/bar/3", "002", 1); // ok
+ // fetch everything from the beginning
+ InputBatch<JavaRDD<GenericRecord>> fetch6 = sourceFormatAdapter.fetchNewDataInAvroFormat(
+ Option.empty(), Long.MAX_VALUE);
+ assertEquals(10101, fetch6.getBatch().get().count());
}
}