DRILL-8330: Convert ESRI Shape File Reader to EVF2 (#2670)
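
Converts the ESRI Shapefile reader from the EVF V1 scan framework to
EVF V2. Under EVF2 the reader is constructed once per file and receives
its FileSchemaNegotiator in the constructor (the separate open() call
goes away), file access flows through FileDescrip rather than a raw
FileSplit, and LIMIT is enforced by the scan framework itself, so the
maxRecords plumbing is removed. Also bumps esri-geometry-api from
2.2.3 to 2.2.4.

For reference, a minimal sketch of the reader shape EVF2 expects
(ExampleBatchReader and its single VARCHAR column are illustrative and
not part of this commit; the v3 ManagedReader, FileSchemaNegotiator,
and FileDescrip types are the APIs this patch migrates to):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.drill.common.AutoCloseables;
    import org.apache.drill.common.exceptions.UserException;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
    import org.apache.drill.exec.physical.resultSet.RowSetLoader;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExampleBatchReader implements ManagedReader {
      private static final Logger logger = LoggerFactory.getLogger(ExampleBatchReader.class);

      private final FileDescrip file;
      private final RowSetLoader rowWriter;
      private final InputStream fsStream;

      // EVF2: all setup happens here; there is no open() method to override.
      public ExampleBatchReader(FileSchemaNegotiator negotiator) {
        file = negotiator.file();
        negotiator.tableSchema(new SchemaBuilder()
            .addNullable("line", MinorType.VARCHAR)
            .buildSchema(), true);
        rowWriter = negotiator.build().writer();
        try {
          fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
        } catch (IOException e) {
          throw UserException.dataReadError(e)
            .message("Failed to open input file: %s", file.split().getPath())
            .addContext("User name", negotiator.userName())
            .build(logger);
        }
      }

      @Override
      public boolean next() {
        // Decode records into rowWriter until the batch fills, then return
        // true; return false once the input is exhausted. The framework,
        // not the reader, applies any LIMIT. This skeleton reports EOF
        // immediately.
        return false;
      }

      @Override
      public void close() {
        AutoCloseables.closeSilently(fsStream);
      }
    }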

diff --git a/contrib/format-esri/pom.xml b/contrib/format-esri/pom.xml
index cccabbe..84c445a 100644
--- a/contrib/format-esri/pom.xml
+++ b/contrib/format-esri/pom.xml
@@ -39,7 +39,7 @@
     <dependency>
       <groupId>com.esri.geometry</groupId>
       <artifactId>esri-geometry-api</artifactId>
-      <version>2.2.3</version>
+      <version>2.2.4</version>
     </dependency>
     <dependency>
       <groupId>org.jamel.dbf</groupId>
diff --git a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
index 5a7325e..49d5775 100644
--- a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
+++ b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
@@ -24,8 +24,9 @@
 import com.esri.core.geometry.ogc.OGCGeometry;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -34,7 +35,6 @@
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 import org.jamel.dbf.DbfReader;
 import org.jamel.dbf.structure.DbfField;
 import org.slf4j.Logger;
@@ -48,7 +48,7 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class ShpBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class ShpBatchReader implements ManagedReader {
 
   private static final Logger logger = LoggerFactory.getLogger(ShpBatchReader.class);
   private static final String GID_FIELD_NAME = "gid";
@@ -56,36 +56,28 @@
   private static final String SHAPE_TYPE_FIELD_NAME = "shapeType";
   private static final String GEOM_FIELD_NAME = "geom";
   private static final String SRID_PATTERN_TEXT = "AUTHORITY\\[\"\\w+\"\\s*,\\s*\"*(\\d+)\"*\\]\\]$";
-
-  private FileSplit split;
-  private ResultSetLoader loader;
-  private Path hadoopShp;
-  private Path hadoopDbf;
-  private Path hadoopPrj;
+  private final FileDescrip file;
+  private final Path hadoopShp;
+  private final Path hadoopDbf;
+  private final Path hadoopPrj;
   private InputStream fileReaderShp = null;
   private InputStream fileReaderDbf = null;
   private InputStream fileReaderPrj = null;
   private GeometryCursor geomCursor = null;
   private DbfReader dbfReader = null;
-  private ScalarWriter gidWriter;
-  private ScalarWriter sridWriter;
-  private ScalarWriter shapeTypeWriter;
-  private ScalarWriter geomWriter;
-  private RowSetLoader rowWriter;
+  private final ScalarWriter gidWriter;
+  private final ScalarWriter sridWriter;
+  private final ScalarWriter shapeTypeWriter;
+  private final ScalarWriter geomWriter;
+  private final RowSetLoader rowWriter;
   private int srid;
   private SpatialReference spatialReference;
-  private final int maxRecords;
 
-  public ShpBatchReader(int maxRecords) {
-    this.maxRecords = maxRecords;
-  }
+  public ShpBatchReader(FileSchemaNegotiator negotiator) {
+    this.file = negotiator.file();
+    hadoopShp = file.split().getPath();
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
-    hadoopShp = split.getPath();
-
-    String filePath = split.getPath().toString();
+    String filePath = file.split().getPath().toString();
     hadoopDbf = new Path(filePath.replace(".shp", ".dbf"));
     hadoopPrj = new Path(filePath.replace(".shp", ".prj"));
 
@@ -97,15 +89,13 @@
       .addNullable(GEOM_FIELD_NAME, TypeProtos.MinorType.VARBINARY);
 
     negotiator.tableSchema(builder.buildSchema(), false);
-    loader = negotiator.build();
+    ResultSetLoader loader = negotiator.build();
 
     rowWriter = loader.writer();
     gidWriter = rowWriter.scalar(GID_FIELD_NAME);
     sridWriter = rowWriter.scalar(SRID_FIELD_NAME);
     shapeTypeWriter = rowWriter.scalar(SHAPE_TYPE_FIELD_NAME);
     geomWriter = rowWriter.scalar(GEOM_FIELD_NAME);
-
-    return true;
   }
 
   @Override
@@ -125,7 +115,7 @@
 
   private void openFile(FileSchemaNegotiator negotiator) {
     try {
-      fileReaderShp = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      fileReaderShp = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
       byte[] shpBuf = new byte[fileReaderShp.available()];
       fileReaderShp.read(shpBuf);
 
@@ -135,10 +125,10 @@
       ShapefileReader shpReader = new ShapefileReader();
       geomCursor = shpReader.getGeometryCursor(byteBuffer);
 
-      fileReaderDbf = negotiator.fileSystem().openPossiblyCompressedStream(hadoopDbf);
+      fileReaderDbf = file.fileSystem().openPossiblyCompressedStream(hadoopDbf);
       dbfReader = new DbfReader(fileReaderDbf);
 
-      fileReaderPrj = negotiator.fileSystem().openPossiblyCompressedStream(hadoopPrj);
+      fileReaderPrj = file.fileSystem().openPossiblyCompressedStream(hadoopPrj);
       byte[] prjBuf = new byte[fileReaderPrj.available()];
       fileReaderPrj.read(prjBuf);
       fileReaderPrj.close();
@@ -156,17 +146,13 @@
     } catch (IOException e) {
       throw UserException
         .dataReadError(e)
-        .message("Failed to open open input file: %s", split.getPath())
+        .message("Failed to open open input file: %s",file.split().getPath())
         .addContext("User name", negotiator.userName())
         .build(logger);
     }
   }
 
   private void processShapefileSet(RowSetLoader rowWriter, final int gid, final Geometry geom, final Object[] dbfRow) {
-    if (rowWriter.limitReached(maxRecords)) {
-      return;
-    }
-
     rowWriter.start();
 
     gidWriter.setInt(gid);
diff --git a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
index 5023ff6..c8ffb1d 100644
--- a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
+++ b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
@@ -20,13 +20,11 @@
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
@@ -37,15 +35,9 @@
 
   public static class ShpReaderFactory extends FileReaderFactory {
 
-    private final int maxRecords;
-
-    public ShpReaderFactory(int maxRecords) {
-      this.maxRecords = maxRecords;
-    }
-
     @Override
-    public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
-      return new ShpBatchReader(maxRecords);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+      return new ShpBatchReader(negotiator);
     }
   }
 
@@ -53,18 +45,10 @@
     super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
   }
 
-  @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
-    return new ShpBatchReader(scan.getMaxRecords());
-  }
-
   @Override
-  protected FileScanFramework.FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
-    FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
-    builder.setReaderFactory(new ShpReaderFactory(scan.getMaxRecords()));
-    initScanBuilder(builder, scan);
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new ShpReaderFactory());
   }
 
   private static EasyFormatConfig easyConfig(Configuration fsConf, ShpFormatConfig pluginConfig) {
@@ -77,7 +62,7 @@
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(PLUGIN_NAME)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
         .build();
   }
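
A usage sketch of the sort of test that guards this conversion (the test
class, sample file path, and format bootstrap are hypothetical here;
ClusterTest, startCluster(), and queryBuilder() are Drill's standard
test fixtures):

    import static org.junit.Assert.assertEquals;

    import org.apache.drill.exec.physical.rowSet.RowSet;
    import org.apache.drill.test.ClusterFixture;
    import org.apache.drill.test.ClusterTest;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class ExampleShpLimitTest extends ClusterTest {

      @BeforeClass
      public static void setup() throws Exception {
        // Assumes the shp format is registered for the test workspace,
        // as the existing format-esri tests arrange.
        startCluster(ClusterFixture.builder(dirTestWatcher));
      }

      // EVF2 applies LIMIT in the framework, so the reader no longer
      // carries its own maxRecords check; this asserts the pushdown
      // still truncates the scan.
      @Test
      public void testLimitPushdown() throws Exception {
        String sql = "SELECT gid, shapeType FROM cp.`esri/sample.shp` LIMIT 5";
        RowSet results = queryBuilder().sql(sql).rowSet();
        assertEquals(5, results.rowCount());
        results.clear();
      }
    }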