[HUDI-1330] handle prefix filtering at directory level (#2157)

The current DFSPathSelector ignores the prefixes (_, .) only at the file level; files under
subdirectories, e.g. (.checkpoint/*), are still picked up, which results in bad-format
exceptions during reading.
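
For illustration, a condensed sketch of the old listing (taken from the removed code in the
diff below; the path .checkpoint/part-0 is a hypothetical example): fs.listFiles(root, true)
already recurses into every subdirectory, and the prefix check only ever sees the leaf file
name, so nothing prunes an ignored directory itself:

    // condensed from the removed code in the diff below
    RemoteIterator<LocatedFileStatus> fitr = fs.listFiles(rootPath, true);
    while (fitr.hasNext()) {
      LocatedFileStatus f = fitr.next();
      // getPath().getName() is just the leaf, e.g. "part-0" for ".checkpoint/part-0",
      // so the (_, .) prefix check never fires for files inside ignored directories
      boolean ignored = IGNORE_FILEPREFIX_LIST.stream()
          .anyMatch(pfx -> f.getPath().getName().startsWith(pfx));
    }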
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 6b58003..47419e0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -31,16 +31,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -63,7 +62,7 @@
   protected final TypedProperties props;
 
   public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
-    DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP));
     this.props = props;
     this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
   }
@@ -101,18 +100,8 @@
     try {
       // obtain all eligible files under root folder.
       log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit);
-      List<FileStatus> eligibleFiles = new ArrayList<>();
-      RemoteIterator<LocatedFileStatus> fitr =
-          fs.listFiles(new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
-      while (fitr.hasNext()) {
-        LocatedFileStatus fileStatus = fitr.next();
-        if (fileStatus.isDirectory()
-            || fileStatus.getLen() == 0
-            || IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
-          continue;
-        }
-        eligibleFiles.add(fileStatus);
-      }
+      long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+      List<FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), lastCheckpointTime);
       // sort them by modification time.
       eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
       // Filter based on checkpoint & input size, if needed
@@ -120,11 +109,6 @@
       long maxModificationTime = Long.MIN_VALUE;
       List<FileStatus> filteredFiles = new ArrayList<>();
       for (FileStatus f : eligibleFiles) {
-        if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get()).longValue()) {
-          // skip processed files
-          continue;
-        }
-
         if (currentBytes + f.getLen() >= sourceLimit) {
           // we have enough data, we are done
           break;
@@ -136,7 +120,7 @@
       }
 
       // no data to read
-      if (filteredFiles.size() == 0) {
+      if (filteredFiles.isEmpty()) {
         return new ImmutablePair<>(Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
       }
 
@@ -148,4 +132,25 @@
       throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
     }
   }
+
+  /**
+   * List files recursively, filtering out ineligible files/directories while doing so.
+   */
+  private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
+    // skip files/dirs whose names start with (_, ., etc)
+    FileStatus[] statuses = fs.listStatus(path, file ->
+        IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx)));
+    List<FileStatus> res = new ArrayList<>();
+    for (FileStatus status : statuses) {
+      if (status.isDirectory()) {
+        // do not recurse into symlinked dirs to avoid infinite loops
+        if (!status.isSymlink()) {
+          res.addAll(listEligibleFiles(fs, status.getPath(), lastCheckpointTime));
+        }
+      } else if (status.getModificationTime() > lastCheckpointTime && status.getLen() > 0) {
+        res.add(status);
+      }
+    }
+    return res;
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index f63f3e9..e02d00c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -175,5 +175,18 @@
     InputBatch<JavaRDD<GenericRecord>> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat(
         Option.empty(), Long.MAX_VALUE);
     assertEquals(10100, fetch5.getBatch().get().count());
+
+    // 6. Should skip files/directories whose names start with prefixes ("_", ".")
+    generateOneFile(".checkpoint/3", "002", 100);
+    generateOneFile("_checkpoint/3", "002", 100);
+    generateOneFile(".3", "002", 100);
+    generateOneFile("_3", "002", 100);
+    // should also work with nested directories
+    generateOneFile("foo/.bar/3", "002", 1); // skipped: under a "."-prefixed dir
+    generateOneFile("foo/bar/3", "002", 1); // included
+    // fetch everything from the beginning
+    InputBatch<JavaRDD<GenericRecord>> fetch6 = sourceFormatAdapter.fetchNewDataInAvroFormat(
+        Option.empty(), Long.MAX_VALUE);
+    assertEquals(10101, fetch6.getBatch().get().count());
   }
 }
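
Note on the expected count in step 6: of the six new files only foo/bar/3 (one record)
survives the filter, since .checkpoint/3, _checkpoint/3, .3 and _3 match an ignored prefix
and foo/.bar/3 sits under a dot-prefixed directory, hence 10100 + 1 = 10101.

A minimal standalone sketch of the directory-level check (plain Hadoop API; the class name
and paths are hypothetical), showing why the subtree under ".bar" is pruned before it is
ever listed:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class PrefixFilterSketch {
      public static void main(String[] args) {
        List<String> ignoredPrefixes = Arrays.asList("_", ".");
        // the same check the patch hands to fs.listStatus(path, filter); it runs
        // on directories too, so an ignored directory is never descended into
        PathFilter filter = p ->
            ignoredPrefixes.stream().noneMatch(pfx -> p.getName().startsWith(pfx));
        System.out.println(filter.accept(new Path("/root/foo/.bar"))); // false -> pruned
        System.out.println(filter.accept(new Path("/root/foo/bar")));  // true -> recursed into
      }
    }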