DRILL-8325: Convert PDF Format Plugin to EVF V2 (#2664)

diff --git a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
index 48a6bdd..94b4caf 100644
--- a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
+++ b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
@@ -24,15 +24,15 @@
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.pdfbox.pdmodel.PDDocument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,17 +52,16 @@
 import java.util.GregorianCalendar;
 import java.util.List;
 
-public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+public class PdfBatchReader implements ManagedReader {
 
   private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
   private static final String NEW_FIELD_PREFIX = "field_";
-  private final int maxRecords;
 
   private final List<PdfColumnWriter> writers;
   private final PdfReaderConfig config;
   private final int startingTableIndex;
+  private final FileDescrip file;
   private PdfMetadataReader metadataReader;
-  private FileSplit split;
   private CustomErrorContext errorContext;
   private RowSetLoader rowWriter;
   private PDDocument document;
@@ -73,7 +72,7 @@
   private int currentTableIndex;
   private List<String> firstRow;
   private PdfRowIterator rowIterator;
-  private FileScanFramework.FileSchemaNegotiator negotiator;
+  private final FileSchemaNegotiator negotiator;
   private int unregisteredColumnCount;
 
   // Tables
@@ -86,21 +85,16 @@
     }
   }
 
-  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
-    this.maxRecords = maxRecords;
+  public PdfBatchReader(PdfReaderConfig readerConfig, FileSchemaNegotiator negotiator) {
     this.unregisteredColumnCount = 0;
     this.writers = new ArrayList<>();
     this.config = readerConfig;
     this.startingTableIndex = readerConfig.plugin.getConfig().defaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().defaultTableIndex();
     this.currentTableIndex = this.startingTableIndex;
     this.columnHeaders = new ArrayList<>();
-  }
 
-  @Override
-  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
     this.negotiator = negotiator;
-
-    split = negotiator.split();
+    this.file = negotiator.file();
     errorContext = negotiator.parentErrorContext();
     builder = new SchemaBuilder();
 
@@ -132,7 +126,7 @@
 
     // Support provided schema
     TupleMetadata schema = null;
-    if (negotiator.hasProvidedSchema()) {
+    if (negotiator.providedSchema() != null) {
       schema = negotiator.providedSchema();
       negotiator.tableSchema(schema, false);
     } else {
@@ -143,23 +137,19 @@
     rowWriter = loader.writer();
     metadataReader.setRowWriter(rowWriter);
     // Build the schema
-    if (negotiator.hasProvidedSchema()) {
+    if (negotiator.providedSchema() != null) {
       buildWriterListFromProvidedSchema(schema);
     } else {
       buildWriterList();
     }
     metadataReader.addImplicitColumnsToSchema();
-    return true;
   }
 
   @Override
   public boolean next() {
 
     while(!rowWriter.isFull()) {
-      if (rowWriter.limitReached(maxRecords)) {
-        // Stop reading if the limit has been reached
-        return false;
-      } else if (config.plugin.getConfig().combinePages() &&
+      if (config.plugin.getConfig().combinePages() &&
                 (!rowIterator.hasNext()) &&
                   currentTableIndex < (tables.size() - 1)) {
         // Case for end of current page but more tables exist and combinePages is set to true.
@@ -227,7 +217,7 @@
    */
   private void openFile() {
     try {
-      InputStream fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      InputStream fsStream = negotiator.file().fileSystem().openPossiblyCompressedStream(file.split().getPath());
       if (Strings.isNullOrEmpty(config.plugin.getConfig().password())) {
         document = PDDocument.load(fsStream);
       } else {
@@ -239,7 +229,7 @@
     } catch (Exception e) {
       throw UserException
         .dataReadError(e)
-        .addContext("Failed to open open input file: %s", split.getPath().toString())
+        .addContext("Failed to open open input file: %s", file.split().getPath().toString())
         .addContext(errorContext)
         .build(logger);
     }
@@ -410,7 +400,7 @@
   public static class DatePdfColumnWriter extends PdfColumnWriter {
     private String dateFormat;
 
-    DatePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) {
+    DatePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) {
       super(columnIndex, columnName, rowWriter.scalar(columnName));
 
       ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
@@ -441,7 +431,7 @@
   public static class TimePdfColumnWriter extends PdfColumnWriter {
     private String dateFormat;
 
-    TimePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) {
+    TimePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) {
       super(columnIndex, columnName, rowWriter.scalar(columnName));
 
       ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
@@ -476,7 +466,7 @@
       super(columnIndex, columnName, rowWriter.scalar(columnName));
     }
 
-    TimestampPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) {
+    TimestampPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) {
       super(columnIndex, columnName, rowWriter.scalar(columnName));
 
       ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
diff --git a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
index 380653d..bd28978 100644
--- a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
+++ b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
@@ -21,15 +21,13 @@
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 
@@ -39,16 +37,14 @@
 
   private static class PdfReaderFactory extends FileReaderFactory {
     private final PdfBatchReader.PdfReaderConfig readerConfig;
-    private final int maxRecords;
 
-    public PdfReaderFactory(PdfBatchReader.PdfReaderConfig config, int maxRecords) {
+    public PdfReaderFactory(PdfBatchReader.PdfReaderConfig config) {
       readerConfig = config;
-      this.maxRecords = maxRecords;
     }
 
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new PdfBatchReader(readerConfig, maxRecords);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+      return new PdfBatchReader(readerConfig, negotiator);
     }
   }
 
@@ -68,24 +64,15 @@
       .extensions(pluginConfig.extensions())
       .fsConf(fsConf)
       .defaultName(DEFAULT_NAME)
-      .scanVersion(ScanFrameworkVersion.EVF_V1)
+      .scanVersion(ScanFrameworkVersion.EVF_V2)
       .supportsLimitPushdown(true)
       .build();
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
-    return new PdfBatchReader(formatConfig.getReaderConfig(this), scan.getMaxRecords());
-  }
-
-  @Override
-  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
-    FileScanBuilder builder = new FileScanBuilder();
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     PdfBatchReader.PdfReaderConfig readerConfig = new PdfBatchReader.PdfReaderConfig(this);
-    builder.setReaderFactory(new PdfReaderFactory(readerConfig, scan.getMaxRecords()));
-
-    initScanBuilder(builder, scan);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new PdfReaderFactory(readerConfig));
   }
 }
diff --git a/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java b/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java
index b304629..4985605 100644
--- a/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java
+++ b/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java
@@ -148,7 +148,7 @@
 
   @Test
   public void testNoHeaders() throws RpcException {
-    String sql = "SELECT * " +
+    String sql = "SELECT field_0, field_1, field_2, field_3 " +
       "FROM table(cp.`pdf/argentina_diputados_voting_record.pdf` " +
       "(type => 'pdf', combinePages => false, extractHeaders => false)) WHERE field_2 = 'Rio Negro'";