DRILL-8325: Convert PDF Format Plugin to EVF V2 (#2664)
diff --git a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
index 48a6bdd..94b4caf 100644
--- a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
+++ b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
@@ -24,15 +24,15 @@
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,17 +52,16 @@
import java.util.GregorianCalendar;
import java.util.List;
-public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+public class PdfBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
private static final String NEW_FIELD_PREFIX = "field_";
- private final int maxRecords;
private final List<PdfColumnWriter> writers;
private final PdfReaderConfig config;
private final int startingTableIndex;
+ private final FileDescrip file;
private PdfMetadataReader metadataReader;
- private FileSplit split;
private CustomErrorContext errorContext;
private RowSetLoader rowWriter;
private PDDocument document;
@@ -73,7 +72,7 @@
private int currentTableIndex;
private List<String> firstRow;
private PdfRowIterator rowIterator;
- private FileScanFramework.FileSchemaNegotiator negotiator;
+ private final FileSchemaNegotiator negotiator;
private int unregisteredColumnCount;
// Tables
@@ -86,21 +85,16 @@
}
}
- public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
- this.maxRecords = maxRecords;
+ public PdfBatchReader(PdfReaderConfig readerConfig, FileSchemaNegotiator negotiator) {
this.unregisteredColumnCount = 0;
this.writers = new ArrayList<>();
this.config = readerConfig;
this.startingTableIndex = readerConfig.plugin.getConfig().defaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().defaultTableIndex();
this.currentTableIndex = this.startingTableIndex;
this.columnHeaders = new ArrayList<>();
- }
- @Override
- public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
this.negotiator = negotiator;
-
- split = negotiator.split();
+ this.file = negotiator.file();
errorContext = negotiator.parentErrorContext();
builder = new SchemaBuilder();
@@ -132,7 +126,7 @@
// Support provided schema
TupleMetadata schema = null;
- if (negotiator.hasProvidedSchema()) {
+ if (negotiator.providedSchema() != null) {
schema = negotiator.providedSchema();
negotiator.tableSchema(schema, false);
} else {
@@ -143,23 +137,19 @@
rowWriter = loader.writer();
metadataReader.setRowWriter(rowWriter);
// Build the schema
- if (negotiator.hasProvidedSchema()) {
+ if (negotiator.providedSchema() != null) {
buildWriterListFromProvidedSchema(schema);
} else {
buildWriterList();
}
metadataReader.addImplicitColumnsToSchema();
- return true;
}
@Override
public boolean next() {
while(!rowWriter.isFull()) {
- if (rowWriter.limitReached(maxRecords)) {
- // Stop reading if the limit has been reached
- return false;
- } else if (config.plugin.getConfig().combinePages() &&
+ if (config.plugin.getConfig().combinePages() &&
(!rowIterator.hasNext()) &&
currentTableIndex < (tables.size() - 1)) {
// Case for end of current page but more tables exist and combinePages is set to true.
@@ -227,7 +217,7 @@
*/
private void openFile() {
try {
- InputStream fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      InputStream fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
if (Strings.isNullOrEmpty(config.plugin.getConfig().password())) {
document = PDDocument.load(fsStream);
} else {
@@ -239,7 +229,7 @@
} catch (Exception e) {
throw UserException
.dataReadError(e)
- .addContext("Failed to open open input file: %s", split.getPath().toString())
+      .addContext("Failed to open input file: %s", file.split().getPath().toString())
.addContext(errorContext)
.build(logger);
}
@@ -410,7 +400,7 @@
public static class DatePdfColumnWriter extends PdfColumnWriter {
private String dateFormat;
- DatePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) {
+ DatePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
@@ -441,7 +431,7 @@
public static class TimePdfColumnWriter extends PdfColumnWriter {
private String dateFormat;
- TimePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) {
+ TimePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
@@ -476,7 +466,7 @@
super(columnIndex, columnName, rowWriter.scalar(columnName));
}
- TimestampPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) {
+ TimestampPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
diff --git a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
index 380653d..bd28978 100644
--- a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
+++ b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
@@ -21,15 +21,13 @@
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
@@ -39,16 +37,14 @@
private static class PdfReaderFactory extends FileReaderFactory {
private final PdfBatchReader.PdfReaderConfig readerConfig;
- private final int maxRecords;
- public PdfReaderFactory(PdfBatchReader.PdfReaderConfig config, int maxRecords) {
+ public PdfReaderFactory(PdfBatchReader.PdfReaderConfig config) {
readerConfig = config;
- this.maxRecords = maxRecords;
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new PdfBatchReader(readerConfig, maxRecords);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+ return new PdfBatchReader(readerConfig, negotiator);
}
}
@@ -68,24 +64,15 @@
.extensions(pluginConfig.extensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
- return new PdfBatchReader(formatConfig.getReaderConfig(this), scan.getMaxRecords());
- }
-
- @Override
- protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
- FileScanBuilder builder = new FileScanBuilder();
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
PdfBatchReader.PdfReaderConfig readerConfig = new PdfBatchReader.PdfReaderConfig(this);
- builder.setReaderFactory(new PdfReaderFactory(readerConfig, scan.getMaxRecords()));
-
- initScanBuilder(builder, scan);
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
- return builder;
+ builder.readerFactory(new PdfReaderFactory(readerConfig));
}
}
diff --git a/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java b/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java
index b304629..4985605 100644
--- a/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java
+++ b/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java
@@ -148,7 +148,7 @@
@Test
public void testNoHeaders() throws RpcException {
- String sql = "SELECT * " +
+ String sql = "SELECT field_0, field_1, field_2, field_3 " +
"FROM table(cp.`pdf/argentina_diputados_voting_record.pdf` " +
"(type => 'pdf', combinePages => false, extractHeaders => false)) WHERE field_2 = 'Rio Negro'";