[MINOR] Make sure factory method is used to instanciate DFSPathSelector (#2187)

* Move createSourceSelector into DFSPathSelector factory method
* Replace constructor call with factory method
* Added some javadoc
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 29fc195..e767909 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.utilities;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -46,7 +45,6 @@
 import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.Source;
-import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
@@ -377,22 +375,6 @@
     }
   }
 
-  public static DFSPathSelector createSourceSelector(TypedProperties props,
-      Configuration conf) throws IOException {
-    String sourceSelectorClass =
-        props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName());
-    try {
-      DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
-          new Class<?>[]{TypedProperties.class, Configuration.class},
-          props, conf);
-
-      LOG.info("Using path selector " + selector.getClass().getName());
-      return selector;
-    } catch (Throwable e) {
-      throw new IOException("Could not load source selector class " + sourceSelectorClass, e);
-    }
-  }
-
   public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProvider) {
     SchemaProvider originalProvider = schemaProvider;
     if (schemaProvider instanceof SchemaProviderWithPostProcessor) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index b8f29e8..1152cd6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -21,7 +21,6 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
 
@@ -46,7 +45,7 @@
   public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) throws IOException {
     super(props, sparkContext, sparkSession, schemaProvider);
-    this.pathSelector = UtilHelpers
+    this.pathSelector = DFSPathSelector
         .createSourceSelector(props, sparkContext.hadoopConfiguration());
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
index 3d158ba..dc40b47 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
@@ -79,7 +79,7 @@
       SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
-    this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
+    this.pathSelector = DFSPathSelector.createSourceSelector(props, sparkContext.hadoopConfiguration());
     if (schemaProvider != null) {
       sourceSchema = (StructType) SchemaConverters.toSqlType(schemaProvider.getSourceSchema())
           .dataType();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
index e66bfdf..d34289d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
@@ -38,7 +38,7 @@
   public JsonDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
-    this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
+    this.pathSelector = DFSPathSelector.createSourceSelector(props, sparkContext.hadoopConfiguration());
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index dea410a..55d2de2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -39,7 +39,7 @@
   public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
-    this.pathSelector = new DFSPathSelector(props, this.sparkContext.hadoopConfiguration());
+    this.pathSelector = DFSPathSelector.createSourceSelector(props, this.sparkContext.hadoopConfiguration());
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index c8690f5..6b58003 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -22,8 +22,10 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -66,6 +68,33 @@
     this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
   }
 
+  /**
+   * Factory method for creating custom DFSPathSelector. Default selector
+   * to use is {@link DFSPathSelector}
+   */
+  public static DFSPathSelector createSourceSelector(TypedProperties props,
+                                                     Configuration conf) {
+    String sourceSelectorClass = props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
+        DFSPathSelector.class.getName());
+    try {
+      DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
+          new Class<?>[]{TypedProperties.class, Configuration.class},
+          props, conf);
+
+      log.info("Using path selector " + selector.getClass().getName());
+      return selector;
+    } catch (Exception e) {
+      throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
+    }
+  }
+
+  /**
+   * Get the list of files changed since last checkpoint.
+   *
+   * @param lastCheckpointStr the last checkpoint time string, empty if first run
+   * @param sourceLimit       max bytes to read each time
+   * @return the list of files concatenated and their latest modified time
+   */
   public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
       long sourceLimit) {