[HUDI-1137] Add option to configure different path selector
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index b67e21f..bfc8368 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -32,12 +32,17 @@
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A custom dfs path selector used only for the hudi test suite. To be used only if workload is not run inline.
*/
public class DFSTestSuitePathSelector extends DFSPathSelector {
+ private static volatile Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class); // NOTE(review): keyed to HoodieTestSuiteJob, not this class - confirm intended
public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf) {
super(props, hadoopConf);
@@ -54,9 +59,12 @@
lastBatchId = Integer.parseInt(lastCheckpointStr.get());
nextBatchId = lastBatchId + 1;
} else {
- lastBatchId = -1;
- nextBatchId = 0;
+ lastBatchId = 0;
+ nextBatchId = 1;
}
+
+ log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
+ + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
// obtain all eligible files for the batch
List<FileStatus> eligibleFiles = new ArrayList<>();
FileStatus[] fileStatuses = fs.globStatus(
@@ -73,6 +81,8 @@
}
}
}
+
+ log.info("Reading " + eligibleFiles.size() + " files. ");
// no data to readAvro
if (eligibleFiles.size() == 0) {
return new ImmutablePair<>(Option.empty(),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 0531196..14e16ab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -352,12 +352,17 @@
}
}
- public static DFSPathSelector createSourceSelector(String sourceSelectorClass, TypedProperties props,
+ public static DFSPathSelector createSourceSelector(TypedProperties props, // selector class name is now read from props
Configuration conf) throws IOException {
+ String sourceSelectorClass =
+ props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName()); // presumably DFSPathSelector is the fallback when the key is unset
try {
- return (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
+ DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass, // kept in a local so the loaded class can be logged
new Class<?>[]{TypedProperties.class, Configuration.class},
props, conf);
+
+ LOG.info("Using path selector " + selector.getClass().getName()); // reports the runtime class of the loaded selector
+ return selector;
} catch (Throwable e) {
throw new IOException("Could not load source selector class " + sourceSelectorClass, e);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index b5ce96f..b8f29e8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -47,8 +47,7 @@
SchemaProvider schemaProvider) throws IOException {
super(props, sparkContext, sparkSession, schemaProvider);
this.pathSelector = UtilHelpers
- .createSourceSelector(DFSPathSelector.class.getName(), props, sparkContext
- .hadoopConfiguration());
+ .createSourceSelector(props, sparkContext.hadoopConfiguration());
}
@Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 59263e4..5d56f2a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -52,6 +52,7 @@
public static class Config {
public static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";
+ public static final String SOURCE_INPUT_SELECTOR = "hoodie.deltastreamer.source.input.selector"; // key for the selector class name read by UtilHelpers.createSourceSelector
}
protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");