DRILL-8350: Convert PCAP Format Plugin to EVF2 (#2698)

diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index fd7fef9..a09c3e3 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -18,8 +18,9 @@
 package org.apache.drill.exec.store.pcap;
 
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -30,7 +31,6 @@
 import org.apache.drill.exec.store.pcap.schema.Schema;
 import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.mapred.FileSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,13 +42,10 @@
 
 import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
 
-public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
-
+public class PcapBatchReader implements ManagedReader {
   protected static final int BUFFER_SIZE = 500_000;
-
   private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class);
-
-  private FileSplit split;
+  private final FileDescrip file;
   private PacketDecoder decoder;
   private InputStream fsStream;
   private RowSetLoader rowWriter;
@@ -97,22 +94,17 @@
   private ScalarWriter remoteDataVolumeWriter;
   private ScalarWriter hostDataWriter;
   private ScalarWriter remoteDataWriter;
-  private final int maxRecords;
   private Map<Long, TcpSession> sessionQueue;
 
 
-  public PcapBatchReader(PcapFormatConfig readerConfig, int maxRecords) {
+  public PcapBatchReader(PcapFormatConfig readerConfig, FileSchemaNegotiator negotiator) {
     this.readerConfig = readerConfig;
     if (readerConfig.getSessionizeTCPStreams()) {
       sessionQueue = new HashMap<>();
     }
-    this.maxRecords = maxRecords;
-  }
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
-    openFile(negotiator);
+    file = negotiator.file();
+    openFile();
     SchemaBuilder builder = new SchemaBuilder();
     Schema pcapSchema = new Schema(readerConfig.getSessionizeTCPStreams());
     TupleMetadata schema = pcapSchema.buildSchema(builder);
@@ -122,8 +114,6 @@
     // Creates writers for all fields (Since schema is known)
     rowWriter = loader.writer();
     populateColumnWriters(rowWriter);
-
-    return true;
   }
 
   @Override
@@ -159,16 +149,16 @@
     decoder = null;
   }
 
-  private void openFile(FileSchemaNegotiator negotiator) {
+  private void openFile() {
     try {
-      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
       decoder = new PacketDecoder(fsStream);
       buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()];
       validBytes = fsStream.read(buffer);
     } catch (IOException io) {
       throw UserException
         .dataReadError(io)
-        .addContext("File name:", split.getPath().toString())
+        .addContext("File name:", file.split().getPath().toString())
         .build(logger);
     }
   }
@@ -234,12 +224,6 @@
   }
 
   private boolean parseNextPacket(RowSetLoader rowWriter) {
-
-    // Push down limit
-    if (rowWriter.limitReached(maxRecords)) {
-      return false;
-    }
-
     // Decode the packet
     Packet packet = new Packet();
 
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
index 87b7c95..fe24907 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
@@ -19,19 +19,17 @@
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.drill.exec.store.pcap.PcapBatchReader;
 import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
 import org.apache.drill.exec.store.pcapng.PcapngBatchReader;
@@ -64,7 +62,7 @@
         .compressible(true)
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
         .supportsProjectPushdown(true)
         .defaultName(PcapFormatConfig.NAME)
@@ -87,8 +85,9 @@
      * @return PCAP or PCAPNG batch reader
      */
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      if (fileFramework().isPresent()) { // todo: can be simplified with java9 ifPresentOrElse
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+      if (negotiator.file().fileSystem() != null) { // todo: can be simplified with java9
+        // ifPresentOrElse
         Path path = scan.getWorkUnits().stream()
                 .findFirst()
                 .orElseThrow(() -> UserException.
@@ -96,7 +95,7 @@
                         .addContext("There are no files for scanning")
                         .build(logger))
                 .getPath();
-        fileFormat = getFileFormat(fileFramework().get().fileSystem(), path);
+        fileFormat = getFileFormat(negotiator.file().fileSystem(), path);
         if (config.getExtensions().stream()
                 .noneMatch(f -> f.equals(fileFormat.name().toLowerCase()))) {
           logger.error("File format {} is not within plugin extensions: {}. Trying to use default PCAP format plugin to " +
@@ -106,32 +105,23 @@
         logger.error("It is not possible to detect file format, because the File Framework is not initialized. " +
                 "Trying to use default PCAP format plugin to read the file");
       }
-      return createReader(scan, config);
+      return createReader(scan, config, negotiator);
     }
   }
 
-  @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
-    return createReader(scan, formatConfig);
-  }
-
-  private static ManagedReader<? extends FileSchemaNegotiator> createReader(EasySubScan scan, PcapFormatConfig config) {
+  private static ManagedReader createReader(EasySubScan scan, PcapFormatConfig config,
+    FileSchemaNegotiator negotiator) {
     switch(fileFormat) {
-      case PCAPNG: return new PcapngBatchReader(config, scan);
+      case PCAPNG: return new PcapngBatchReader(config, scan, negotiator);
       case PCAP:
       case UNKNOWN:
-      default: return new PcapBatchReader(config, scan.getMaxRecords());
+      default: return new PcapBatchReader(config, negotiator);
     }
   }
-
   @Override
-  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
-    FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new PcapReaderFactory(formatConfig, scan));
-
-    initScanBuilder(builder, scan);
-    builder.nullType(Types.optional(MinorType.VARCHAR));
-    return builder;
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    builder.readerFactory(new PcapReaderFactory(formatConfig, scan));
   }
 
   /**
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
index e8367c5..2e8d0aa 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
@@ -32,8 +32,9 @@
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -51,13 +52,12 @@
 import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
 import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
 
-public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
-
+public class PcapngBatchReader implements ManagedReader {
   private static final Logger logger = LoggerFactory.getLogger(PcapngBatchReader.class);
-
   private final PcapFormatConfig config;
   private final EasySubScan scan;
-  private final int maxRecords;
+  private final FileDescrip file;
+
   private CustomErrorContext errorContext;
   private List<SchemaPath> columns;
   private List<ColumnDefn> projectedColumns;
@@ -67,20 +67,17 @@
   private InputStream in;
   private Path path;
 
-  public PcapngBatchReader(final PcapFormatConfig config, final EasySubScan scan) {
+  public PcapngBatchReader(final PcapFormatConfig config, final EasySubScan scan,
+    FileSchemaNegotiator negotiator) {
     this.config = config;
     this.scan = scan;
-    this.maxRecords = scan.getMaxRecords();
     this.columns = scan.getColumns();
-  }
-
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
+    this.file = negotiator.file();
     try {
       // init InputStream for pcap file
       errorContext = negotiator.parentErrorContext();
-      DrillFileSystem dfs = negotiator.fileSystem();
-      path = dfs.makeQualified(negotiator.split().getPath());
+      DrillFileSystem dfs = file.fileSystem();
+      path = dfs.makeQualified(file.split().getPath());
       in = dfs.openPossiblyCompressedStream(path);
       // decode the pcap file
       PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
@@ -106,7 +103,6 @@
     loader = resultSetLoader.writer();
     // bind the writer for columns
     bindColumns(loader);
-    return true;
   }
 
   /**
@@ -135,9 +131,6 @@
         continue;
       }
       processBlock();
-      if (loader.limitReached(maxRecords)) {
-        return false;
-      }
     }
     return true;
   }
@@ -273,4 +266,4 @@
       writer.setString(value);
     }
   }
-}
\ No newline at end of file
+}