Initial Commit (#2654)

diff --git a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
index 46cff86..ce77e83 100644
--- a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
+++ b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
@@ -24,14 +24,14 @@
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.mapred.FileSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,26 +41,17 @@
 import java.util.List;
 import java.util.Map;
 
-public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class SpssBatchReader implements ManagedReader {
 
   private static final Logger logger = LoggerFactory.getLogger(SpssBatchReader.class);
 
   private static final String VALUE_LABEL = "_value";
-
-  private final int maxRecords;
-
-  private FileSplit split;
-
+  private final FileDescrip file;
   private InputStream fsStream;
-
   private SpssDataFileReader spssReader;
-
   private RowSetLoader rowWriter;
-
   private List<SpssVariable> variableList;
-
   private List<SpssColumnWriter> writerList;
-
   private CustomErrorContext errorContext;
 
 
@@ -73,21 +64,14 @@
     }
   }
 
-  public SpssBatchReader(int maxRecords) {
-    this.maxRecords = maxRecords;
-  }
-
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
-    openFile(negotiator);
+  public SpssBatchReader(FileSchemaNegotiator negotiator) {
+    file = negotiator.file();
+    openFile();
     negotiator.tableSchema(buildSchema(), true);
     errorContext = negotiator.parentErrorContext();
     ResultSetLoader loader = negotiator.build();
     rowWriter = loader.writer();
     buildReaderList();
-
-    return true;
   }
 
   @Override
@@ -108,14 +92,14 @@
     }
   }
 
-  private void openFile(FileSchemaNegotiator negotiator) {
+  private void openFile() {
     try {
-      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
       spssReader = new SpssDataFileReader(fsStream);
     } catch (IOException e) {
       throw UserException
         .dataReadError(e)
-        .message("Unable to open SPSS File %s", split.getPath())
+        .message("Unable to open SPSS File %s", file.split().getPath())
         .addContext(e.getMessage())
         .addContext(errorContext)
         .build(logger);
@@ -123,11 +107,6 @@
   }
 
   private boolean processNextRow() {
-    // Check to see if the limit has been reached
-    if (rowWriter.limitReached(maxRecords)) {
-      return false;
-    }
-
     try {
       // Stop reading when you run out of data
       if (!spssReader.readNextCase()) {
diff --git a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
index 35210e7..b0d42f1 100644
--- a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
+++ b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
@@ -21,16 +21,13 @@
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 
@@ -40,15 +37,15 @@
 
   private static class SpssReaderFactory extends FileReaderFactory {
 
-    private final int maxRecords;
+    private final EasySubScan scan;
 
-    public SpssReaderFactory(int maxRecords) {
-      this.maxRecords = maxRecords;
+    public SpssReaderFactory(EasySubScan scan) {
+      this.scan = scan;
     }
 
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new SpssBatchReader(maxRecords);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+      return new SpssBatchReader(negotiator);
     }
   }
 
@@ -68,23 +65,14 @@
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
         .build();
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options)  {
-    return new SpssBatchReader(scan.getMaxRecords());
-  }
-
-  @Override
-  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
-    FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new SpssReaderFactory(scan.getMaxRecords()));
-
-    initScanBuilder(builder, scan);
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new SpssReaderFactory(scan));
   }
 }