DRILL-8312: Convert Format Plugins to EVF V2 (#2656)
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
index 0305f8b..80b8fe2 100644
--- a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
@@ -29,8 +29,10 @@
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.MaterializedField;
@@ -39,7 +41,6 @@
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.parquet.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,18 +55,15 @@
import java.util.Date;
import java.util.List;
-public class SasBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+public class SasBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(SasBatchReader.class);
- private final int maxRecords;
private final List<SasColumnWriter> writerList;
- private FileSplit split;
+ private final FileDescrip file;
private InputStream fsStream;
private SasFileReader sasFileReader;
- private CustomErrorContext errorContext;
- private RowSetLoader rowWriter;
+ private final CustomErrorContext errorContext;
+ private final RowSetLoader rowWriter;
private Object[] firstRow;
-
-
private String compressionMethod;
private String fileLabel;
private String fileType;
@@ -113,26 +111,15 @@
}
}
- public static class SasReaderConfig {
- protected final SasFormatPlugin plugin;
- public SasReaderConfig(SasFormatPlugin plugin) {
- this.plugin = plugin;
- }
- }
-
- public SasBatchReader(int maxRecords) {
- this.maxRecords = maxRecords;
+ public SasBatchReader(FileSchemaNegotiator negotiator) {
writerList = new ArrayList<>();
- }
- @Override
- public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
- split = negotiator.split();
+ file = negotiator.file();
errorContext = negotiator.parentErrorContext();
- openFile(negotiator);
+ openFile();
TupleMetadata schema;
- if (negotiator.hasProvidedSchema()) {
+ if (negotiator.providedSchema() != null) {
schema = negotiator.providedSchema();
} else {
schema = buildSchema();
@@ -143,19 +130,17 @@
ResultSetLoader loader = negotiator.build();
rowWriter = loader.writer();
buildWriterList(schema);
-
- return true;
}
- private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
+ private void openFile() {
try {
- fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
sasFileReader = new SasFileReaderImpl(fsStream);
firstRow = sasFileReader.readNext();
} catch (IOException e) {
throw UserException
.dataReadError(e)
- .message("Unable to open SAS File %s", split.getPath())
+ .message("Unable to open SAS File %s", file.split().getPath())
.addContext(e.getMessage())
.addContext(errorContext)
.build(logger);
@@ -170,7 +155,7 @@
String columnType = column.getType().getSimpleName();
ColumnFormat columnFormat = column.getFormat();
try {
- MinorType type = null;
+ MinorType type;
if (DateTimeConstants.TIME_FORMAT_STRINGS.contains(columnFormat.getName())) {
type = MinorType.TIME;
} else if (DateTimeConstants.DATE_FORMAT_STRINGS.containsKey(columnFormat.getName())) {
@@ -276,9 +261,6 @@
}
private boolean processNextRow() {
- if (rowWriter.limitReached(maxRecords)) {
- return false;
- }
Object[] row;
try {
// Process first row
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
index b5a135d..07b753b 100644
--- a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
@@ -21,16 +21,13 @@
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
@@ -40,15 +37,9 @@
private static class SasReaderFactory extends FileReaderFactory {
- private final int maxRecords;
-
- public SasReaderFactory(int maxRecords) {
- this.maxRecords = maxRecords;
- }
-
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new SasBatchReader(maxRecords);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+ return new SasBatchReader(negotiator);
}
}
@@ -69,23 +60,14 @@
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
- return new SasBatchReader(scan.getMaxRecords());
- }
-
- @Override
- protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
- FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new SasReaderFactory(scan.getMaxRecords()));
-
- initScanBuilder(builder, scan);
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
- return builder;
+ builder.readerFactory(new SasReaderFactory());
}
}