DRILL-8188: Convert HDF5 format to EVF2 (#2515)

* Bump JHDF to latest version

---------

Co-authored-by: cgivre <cgivre@gmail.com>
diff --git a/contrib/format-hdf5/pom.xml b/contrib/format-hdf5/pom.xml
index bb0309a..b168162 100644
--- a/contrib/format-hdf5/pom.xml
+++ b/contrib/format-hdf5/pom.xml
@@ -29,7 +29,7 @@
 
   <artifactId>drill-format-hdf5</artifactId>
   <name>Drill : Contrib : Format : HDF5</name>
-  
+
   <dependencies>
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
@@ -39,7 +39,7 @@
     <dependency>
       <groupId>io.jhdf</groupId>
       <artifactId>jhdf</artifactId>
-      <version>0.6.2</version>
+      <version>0.6.10</version>
     </dependency>
 
     <!-- Test dependencies -->
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
index 75c6f6d..67e2755 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
@@ -18,53 +18,8 @@
 
 package org.apache.drill.exec.store.hdf5;
 
-import io.jhdf.HdfFile;
-import io.jhdf.api.Attribute;
-import io.jhdf.api.Dataset;
-import io.jhdf.api.Group;
-import io.jhdf.api.Node;
-import io.jhdf.exceptions.HdfException;
-import io.jhdf.links.SoftLink;
-import io.jhdf.object.datatype.CompoundDataType;
-import io.jhdf.object.datatype.CompoundDataType.CompoundDataMember;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.CustomErrorContext;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
-import org.apache.drill.exec.physical.resultSet.RowSetLoader;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.MapBuilder;
-import org.apache.drill.exec.record.metadata.MetadataUtils;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.store.hdf5.writers.WriterSpec;
-import org.apache.drill.exec.vector.accessor.ArrayWriter;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.TupleWriter;
-
-import org.apache.hadoop.mapred.FileSplit;
-import org.jvnet.libpam.impl.CLibrary.group;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -74,7 +29,50 @@
 import java.util.List;
 import java.util.Map;
 
-public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.WriterSpec;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.jhdf.HdfFile;
+import io.jhdf.api.Attribute;
+import io.jhdf.api.Dataset;
+import io.jhdf.api.Group;
+import io.jhdf.api.Node;
+import io.jhdf.exceptions.HdfException;
+import io.jhdf.links.SoftLink;
+import io.jhdf.object.datatype.CompoundDataType;
+import io.jhdf.object.datatype.CompoundDataType.CompoundDataMember;
+
+public class HDF5BatchReader implements ManagedReader {
   private static final Logger logger = LoggerFactory.getLogger(HDF5BatchReader.class);
 
   private static final String PATH_COLUMN_NAME = "path";
@@ -119,49 +117,44 @@
 
   private final List<HDF5DataWriter> dataWriters;
 
-  private final int maxRecords;
+  private final String fileName;
 
-  private String fileName;
+  private final FileDescrip file;
 
-  private FileSplit split;
+  private final HdfFile hdfFile;
 
-  private HdfFile hdfFile;
+  private final RowSetLoader rowWriter;
 
-  private BufferedReader reader;
+  private final WriterSpec writerSpec;
 
-  private RowSetLoader rowWriter;
+  private final Iterator<HDF5DrillMetadata> metadataIterator;
 
-  private WriterSpec writerSpec;
+  private final ScalarWriter pathWriter;
 
-  private Iterator<HDF5DrillMetadata> metadataIterator;
+  private final ScalarWriter dataTypeWriter;
 
-  private ScalarWriter pathWriter;
+  private final ScalarWriter fileNameWriter;
 
-  private ScalarWriter dataTypeWriter;
+  private final ScalarWriter linkWriter;
 
-  private ScalarWriter fileNameWriter;
+  private final ScalarWriter dataSizeWriter;
 
-  private ScalarWriter linkWriter;
+  private final ScalarWriter elementCountWriter;
 
-  private ScalarWriter dataSizeWriter;
+  private final ScalarWriter datasetTypeWriter;
 
-  private ScalarWriter elementCountWriter;
+  private final ScalarWriter dimensionsWriter;
 
-  private ScalarWriter datasetTypeWriter;
+  private final CustomErrorContext errorContext;
 
-  private ScalarWriter dimensionsWriter;
+  private final boolean showMetadataPreview;
 
-  private CustomErrorContext errorContext;
-
-  private boolean showMetadataPreview;
-
-  private int[] dimensions;
+  private final int[] dimensions;
 
   public static class HDF5ReaderConfig {
+
     final HDF5FormatPlugin plugin;
-
     final String defaultPath;
-
     final HDF5FormatConfig formatConfig;
 
     public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) {
@@ -171,107 +164,104 @@
     }
   }
 
-  public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
-    this.readerConfig = readerConfig;
-    this.maxRecords = maxRecords;
-    dataWriters = new ArrayList<>();
-    this.showMetadataPreview = readerConfig.formatConfig.showPreview();
-  }
-
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
+  public HDF5BatchReader(HDF5ReaderConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
     errorContext = negotiator.parentErrorContext();
+    file = negotiator.file();
+    readerConfig = config;
+    dataWriters = new ArrayList<>();
+    showMetadataPreview = readerConfig.formatConfig.showPreview();
+
     // Since the HDF file reader uses a stream to actually read the file, the file name from the
     // module is incorrect.
-    fileName = split.getPath().getName();
-    try {
-      openFile(negotiator);
-    } catch (IOException e) {
-      throw UserException
-        .dataReadError(e)
-        .addContext("Failed to close input file: %s", split.getPath())
-        .addContext(errorContext)
-        .build(logger);
-    }
+    fileName = file.split().getPath().getName();
 
-    ResultSetLoader loader;
-    if (readerConfig.defaultPath == null) {
-      // Get file metadata
-      List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
-      metadataIterator = metadata.iterator();
-
-      // Schema for Metadata query
-      SchemaBuilder builder = new SchemaBuilder()
-        .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
-        .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
-        .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
-        .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
-        .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
-
-      negotiator.tableSchema(builder.buildSchema(), false);
-
-      loader = negotiator.build();
-      dimensions = new int[0];
-      rowWriter = loader.writer();
-
-    } else {
-      // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
-      // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
-      Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
-      dimensions = dataSet.getDimensions();
-
-      loader = negotiator.build();
-      rowWriter = loader.writer();
-      writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(),
-          negotiator.parentErrorContext());
-      if (dimensions.length <= 1) {
-        buildSchemaFor1DimensionalDataset(dataSet);
-      } else if (dimensions.length == 2) {
-        buildSchemaFor2DimensionalDataset(dataSet);
-      } else {
-        // Case for datasets of greater than 2D
-        // These are automatically flattened
-        buildSchemaFor2DimensionalDataset(dataSet);
+    { // Opens an HDF5 file
+      try (InputStream in = file.fileSystem().openPossiblyCompressedStream(file.split().getPath())) {
+        /*
+         * As a possible future improvement, the jhdf reader has the ability to read hdf5 files from
+         * a byte array or byte buffer. This implementation is better in that it does not require creating
+         * a temporary file which must be deleted later.  However, it could result in memory issues in the
+         * event of large files.
+         */
+        hdfFile = HdfFile.fromInputStream(in);
+      } catch (IOException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to open input file: %s", file.split().getPath())
+          .addContext(errorContext)
+          .build(logger);
       }
     }
-    if (readerConfig.defaultPath == null) {
-      pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
-      dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
-      fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
-      dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
-      linkWriter = rowWriter.scalar(IS_LINK_COLUMN_NAME);
-      elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
-      datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
-      dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
-    }
-    return true;
-  }
 
-  /**
-   * This function is called when the default path is set and the data set is a single dimension.
-   * This function will create an array of one dataWriter of the
-   * correct datatype
-   * @param dataset The HDF5 dataset
-   */
-  private void buildSchemaFor1DimensionalDataset(Dataset dataset) {
-    MinorType currentDataType = HDF5Utils.getDataType(dataset.getDataType());
+    { // Build the schema and initial the writer
+      ResultSetLoader loader;
+      if (readerConfig.defaultPath == null) {
+        // Get file metadata
+        List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
+        metadataIterator = metadata.iterator();
 
-    // Case for null or unknown data types:
-    if (currentDataType == null) {
-      logger.warn("Couldn't add {}", dataset.getJavaType().getName());
-      return;
+        // Schema for Metadata query
+        SchemaBuilder builder = new SchemaBuilder()
+          .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
+          .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
+          .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
+          .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
+          .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
+          .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
+          .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
+          .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
+
+        negotiator.tableSchema(builder.buildSchema(), false);
+
+        loader = negotiator.build();
+        rowWriter = loader.writer();
+
+        dimensions = null;
+        writerSpec = null;
+        pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
+        dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
+        fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
+        dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
+        linkWriter = rowWriter.scalar(IS_LINK_COLUMN_NAME);
+        elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
+        datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
+        dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
+      } else {
+        // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
+        // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
+        Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
+        dimensions = dataSet.getDimensions();
+
+        loader = negotiator.build();
+        rowWriter = loader.writer();
+        writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(), negotiator.parentErrorContext());
+        if (dimensions.length <= 1) {
+          buildSchemaFor1DimensionalDataset(dataSet);
+        } else if (dimensions.length == 2) {
+          buildSchemaFor2DimensionalDataset(dataSet);
+        } else {
+          // Case for datasets of greater than 2D
+          // These are automatically flattened
+          buildSchemaFor2DimensionalDataset(dataSet);
+        }
+        // Initial the final fields
+        metadataIterator = null;
+        pathWriter = null;
+        dataTypeWriter = null;
+        fileNameWriter = null;
+        dataSizeWriter = null;
+        linkWriter = null;
+        elementCountWriter = null;
+        datasetTypeWriter = null;
+        dimensionsWriter = null;
+      }
     }
-    dataWriters.add(buildWriter(currentDataType));
   }
 
   private HDF5DataWriter buildWriter(MinorType dataType) {
     switch (dataType) {
-      /*case GENERIC_OBJECT:
-        return new HDF5EnumDataWriter(hdfFile, writerSpec, readerConfig.defaultPath);*/
+      /* case GENERIC_OBJECT:
+        return new HDF5EnumDataWriter(hdfFile, writerSpec, readerConfig.defaultPath); */
       case VARCHAR:
         return new HDF5StringDataWriter(hdfFile, writerSpec, readerConfig.defaultPath);
       case TIMESTAMP:
@@ -292,13 +282,29 @@
   }
 
   /**
+   * This function is called when the default path is set and the data set is a single dimension.
+   * This function will create an array of one dataWriter of the correct datatype.
+   *
+   * @param dataset The HDF5 dataset
+   */
+  private void buildSchemaFor1DimensionalDataset(Dataset dataset) {
+    MinorType currentDataType = HDF5Utils.getDataType(dataset.getDataType());
+
+    // Case for null or unknown data types:
+    if (currentDataType == null) {
+      logger.warn("Couldn't add {}", dataset.getJavaType().getName());
+      return;
+    }
+    dataWriters.add(buildWriter(currentDataType));
+  }
+
+  /**
    * Builds a Drill schema from a dataset with 2 or more dimensions. HDF5 only
    * supports INT, LONG, DOUBLE and FLOAT for >2 data types so this function is
    * not as inclusive as the 1D function. This function will build the schema
    * by adding DataWriters to the dataWriters array.
    *
-   * @param dataset
-   *          The dataset which Drill will use to build a schema
+   * @param dataset The dataset which Drill will use to build a schema
    */
   private void buildSchemaFor2DimensionalDataset(Dataset dataset) {
     MinorType currentDataType = HDF5Utils.getDataType(dataset.getDataType());
@@ -333,43 +339,9 @@
       }
     }
   }
-  /**
-   * Opens an HDF5 file.
-   * @param negotiator The negotiator represents Drill's interface with the file system
-   */
-  private void openFile(FileSchemaNegotiator negotiator) throws IOException {
-    InputStream in = null;
-    try {
-      /*
-       * As a possible future improvement, the jhdf reader has the ability to read hdf5 files from
-       * a byte array or byte buffer. This implementation is better in that it does not require creating
-       * a temporary file which must be deleted later.  However, it could result in memory issues in the
-       * event of large files.
-       */
-
-      in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
-      hdfFile = HdfFile.fromInputStream(in);
-    } catch (Exception e) {
-      if (in != null) {
-        in.close();
-      }
-      throw UserException
-        .dataReadError(e)
-        .message("Failed to open input file: %s", split.getPath())
-        .addContext(errorContext)
-        .build(logger);
-    }
-    reader = new BufferedReader(new InputStreamReader(in));
-  }
 
   @Override
   public boolean next() {
-
-    // Limit pushdown
-    if (rowWriter.limitReached(maxRecords)) {
-      return false;
-    }
-
     while (!rowWriter.isFull()) {
       if (readerConfig.defaultPath == null || readerConfig.defaultPath.isEmpty()) {
         if (!metadataIterator.hasNext()){
@@ -395,7 +367,6 @@
         int currentRowCount = 0;
         HDF5DataWriter currentDataWriter;
         rowWriter.start();
-
         for (int i = 0; i < dimensions[1]; i++) {
           currentDataWriter = dataWriters.get(i);
           currentDataWriter.write();
@@ -410,8 +381,23 @@
     return true;
   }
 
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(hdfFile);
+    /*
+     * The current implementation of the HDF5 reader creates a temp file,
+     * these files are deleted only before application exits.
+     * The temporary file needs to be removed when the batch reader is closed.
+     */
+    boolean result = hdfFile.getFile().delete();
+    if (!result) {
+      logger.warn("Failed to delete HDF5 temp file {}", hdfFile.getFile().getName());
+    }
+  }
+
   /**
    * Writes one row of HDF5 metadata.
+   *
    * @param rowWriter The input rowWriter object
    */
   private void projectMetadataRow(RowSetLoader rowWriter) {
@@ -423,7 +409,7 @@
     fileNameWriter.setString(fileName);
     linkWriter.setBoolean(metadataRow.isLink());
 
-    //Write attributes if present
+    // Write attributes if present
     if (metadataRow.getAttributes().size() > 0) {
       writeAttributes(rowWriter, metadataRow);
     }
@@ -448,9 +434,9 @@
    * Gets the file metadata from a given HDF5 file. It will extract the file
    * name the path, and adds any information to the metadata List.
    *
-   * @param group A list of paths from which the metadata will be extracted
-   * @param metadata The HDF5 metadata object from which the metadata will be extracted
-   * @return A list of metadata from the given file paths
+   * @param group     A list of paths from which the metadata will be extracted
+   * @param metadata  The HDF5 metadata object from which the metadata will be extracted
+   * @return          A list of metadata from the given file paths
    */
   private List<HDF5DrillMetadata> getFileMetadata(Group group, List<HDF5DrillMetadata> metadata) {
     Map<String, HDF5Attribute> attribs;
@@ -495,10 +481,10 @@
   }
 
   /**
-   * Gets the attributes of a HDF5 dataset and returns them into a HashMap
+   * Gets the attributes of a HDF5 dataset and returns them into a HashMap.
    *
    * @param path The path for which you wish to retrieve attributes
-   * @return Map The attributes for the given path.  Empty Map if no attributes present
+   * @return Map The attributes for the given path. Empty Map if no attributes present
    */
   private Map<String, HDF5Attribute> getAttributes(String path) {
     Map<String, Attribute> attributeList;
@@ -543,13 +529,11 @@
    * Writes one row of data in a metadata query. The number of dimensions here
    * is n+1. So if the actual dataset is a 1D column, it will be written as a list.
    * This is function is only called in metadata queries as the schema is not
-   * known in advance. If the datasize is greater than 16MB, the function does
-   * not project the dataset
+   * known in advance. If the data size is greater than 16MB, the function does
+   * not project the dataset.
    *
-   * @param rowWriter
-   *          The rowWriter to which the data will be written
-   * @param datapath
-   *          The datapath from which the data will be read
+   * @param rowWriter The rowWriter to which the data will be written
+   * @param datapath  The data path from which the data will be read
    */
 
   private void projectDataset(RowSetLoader rowWriter, String datapath) {
@@ -562,7 +546,7 @@
     }
 
     int[] dimensions = dataset.getDimensions();
-    //Case for single dimensional data
+    // Case for single dimensional data
     if (dimensions.length == 1) {
       MinorType currentDataType = HDF5Utils.getDataType(dataset.getDataType());
 
@@ -632,7 +616,7 @@
           logger.warn("{} not implemented.", currentDataType.name());
       }
     } else if (dimensions.length == 2) {
-      // Case for 2D data sets.  These are projected as lists of lists or maps of maps
+      // Case for 2D data sets. These are projected as lists of lists or maps of maps
       int cols = dimensions[1];
       int rows = dimensions[0];
 
@@ -685,22 +669,23 @@
   }
 
   /**
-   * Helper function to write a 1D boolean column
+   * Helper function to write a 1D boolean column.
    *
    * @param rowWriter The row to which the data will be written
-   * @param name The column name
-   * @param value The value to be written
+   * @param name      The column name
+   * @param value     The value to be written
    */
+  @SuppressWarnings("unused")
   private void writeBooleanColumn(TupleWriter rowWriter, String name, int value) {
     writeBooleanColumn(rowWriter, name, value != 0);
   }
 
   /**
-   * Helper function to write a 1D boolean column
+   * Helper function to write a 1D boolean column.
    *
    * @param rowWriter The row to which the data will be written
-   * @param name The column name
-   * @param value The value to be written
+   * @param name      The column name
+   * @param value     The value to be written
    */
   private void writeBooleanColumn(TupleWriter rowWriter, String name, boolean value) {
     ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.BIT);
@@ -708,11 +693,11 @@
   }
 
   /**
-   * Helper function to write a 1D short column
+   * Helper function to write a 1D short column.
    *
    * @param rowWriter The row to which the data will be written
-   * @param name The column name
-   * @param value The value to be written
+   * @param name      The column name
+   * @param value     The value to be written
    */
   private void writeSmallIntColumn(TupleWriter rowWriter, String name, short value) {
     ScalarWriter colWriter = getColWriter(rowWriter, name, MinorType.SMALLINT);
@@ -720,10 +705,11 @@
   }
 
   /**
-   * Helper function to write a 2D short list
-   * @param rowWriter the row to which the data will be written
-   * @param name the name of the outer list
-   * @param list the list of data
+   * Helper function to write a 2D short list.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name      The name of the outer list
+   * @param list      The list of data
    */
   private void writeSmallIntColumn(TupleWriter rowWriter, String name, short[] list) {
     int index = rowWriter.tupleSchema().index(name);
@@ -740,11 +726,11 @@
   }
 
   /**
-   * Helper function to write a 1D byte column
+   * Helper function to write a 1D byte column.
    *
    * @param rowWriter The row to which the data will be written
-   * @param name The column name
-   * @param value The value to be written
+   * @param name      The column name
+   * @param value     The value to be written
    */
   private void writeByteColumn(TupleWriter rowWriter, String name, byte value) {
     ScalarWriter colWriter = getColWriter(rowWriter, name, MinorType.TINYINT);
@@ -752,10 +738,11 @@
   }
 
   /**
-   * Helper function to write a 2D byte list
-   * @param rowWriter the row to which the data will be written
-   * @param name the name of the outer list
-   * @param list the list of data
+   * Helper function to write a 2D byte list.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name      The name of the outer list
+   * @param list      The list of data
    */
   private void writeByteListColumn(TupleWriter rowWriter, String name, byte[] list) {
     int index = rowWriter.tupleSchema().index(name);
@@ -772,11 +759,11 @@
   }
 
   /**
-   * Helper function to write a 1D int column
+   * Helper function to write a 1D integer column.
    *
    * @param rowWriter The row to which the data will be written
-   * @param name The column name
-   * @param value The value to be written
+   * @param name      The column name
+   * @param value     The value to be written
    */
   private void writeIntColumn(TupleWriter rowWriter, String name, int value) {
     ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.INT);
@@ -784,10 +771,11 @@
   }
 
   /**
-   * Helper function to write a 2D int list
-   * @param rowWriter the row to which the data will be written
-   * @param name the name of the outer list
-   * @param list the list of data
+   * Helper function to write a 2D integer list.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name      The name of the outer list
+   * @param list      The list of data
    */
   private void writeIntListColumn(TupleWriter rowWriter, String name, int[] list) {
     int index = rowWriter.tupleSchema().index(name);
@@ -945,7 +933,7 @@
   }
 
   private void floatMatrixHelper(float[][] colData, int cols, int rows, RowSetLoader rowWriter) {
-    // This is the case where a dataset is projected in a metadata query.  The result should be a list of lists
+    // This is the case where a dataset is projected in a metadata query. The result should be a list of lists
 
     TupleMetadata nestedSchema = new SchemaBuilder()
       .addRepeatedList(FLOAT_COLUMN_NAME)
@@ -992,7 +980,7 @@
   }
 
   private void doubleMatrixHelper(double[][] colData, int cols, int rows, RowSetLoader rowWriter) {
-    // This is the case where a dataset is projected in a metadata query.  The result should be a list of lists
+    // This is the case where a dataset is projected in a metadata query. The result should be a list of lists
 
     TupleMetadata nestedSchema = new SchemaBuilder()
       .addRepeatedList(DOUBLE_COLUMN_NAME)
@@ -1040,7 +1028,7 @@
   }
 
   private void bigIntMatrixHelper(long[][] colData, int cols, int rows, RowSetLoader rowWriter) {
-    // This is the case where a dataset is projected in a metadata query.  The result should be a list of lists
+    // This is the case where a dataset is projected in a metadata query. The result should be a list of lists
 
     TupleMetadata nestedSchema = new SchemaBuilder()
       .addRepeatedList(LONG_COLUMN_NAME)
@@ -1099,13 +1087,11 @@
   }
 
   /**
-   * Gets the attributes for an HDF5 datapath. These attributes are projected as
+   * Gets the attributes for an HDF5 data path. These attributes are projected as
    * a map in select * queries when the defaultPath is null.
    *
-   * @param rowWriter
-   *          the row to which the data will be written
-   * @param record
-   *          the record for the attributes
+   * @param rowWriter The row to which the data will be written
+   * @param record    The record for the attributes
    */
   private void writeAttributes(TupleWriter rowWriter, HDF5DrillMetadata record) {
     Map<String, HDF5Attribute> attribs = getAttributes(record.getPath());
@@ -1152,7 +1138,7 @@
           writeTimestampColumn(mapWriter, key, (Long) attrib.getValue());
           break;
         case GENERIC_OBJECT:
-          //This is the case for HDF5 enums
+          // This is the case for HDF5 enums
           String enumText = attrib.getValue().toString();
           writeStringColumn(mapWriter, key, enumText);
           break;
@@ -1166,9 +1152,9 @@
    * Processes the MAP data type which can be found in HDF5 files.
    * It automatically flattens anything greater than 2 dimensions.
    *
-   * @param path the HDF5 path tp the compound data
-   * @param reader the HDF5 reader for the data file
-   * @param rowWriter the rowWriter to write the data
+   * @param path      The HDF5 path of the compound data
+   * @param reader    The HDF5 reader for the data file
+   * @param rowWriter The rowWriter to write the data
    */
   private void getAndMapCompoundData(String path, HdfFile reader, RowSetLoader rowWriter) {
 
@@ -1227,7 +1213,8 @@
       String dataType = dataMember.getDataType().getJavaType().getName();
       String fieldName = dataMember.getName();
       int[] dataLength = reader.getDatasetByPath(path).getDimensions();
-      Object rawData = ((LinkedHashMap<String, ?>)reader.getDatasetByPath(path).getData()).get(fieldName);
+      @SuppressWarnings("rawtypes")
+      Object rawData = ((LinkedHashMap) reader.getDatasetByPath(path).getData()).get(fieldName);
 
       ArrayWriter innerWriter = listWriter.array(fieldName);
       for (int i = 0; i < dataLength[0]; i++) {
@@ -1267,21 +1254,4 @@
       }
     }
   }
-
-  @Override
-  public void close() {
-    AutoCloseables.closeSilently(hdfFile);
-    /*
-     * The current implementation of the HDF5 reader creates a temp file which needs to be removed
-     * when the batch reader is closed.  A possible future functionality might be to use the
-     */
-    boolean result = hdfFile.getFile().delete();
-    if (!result) {
-      logger.warn("Failed to delete HDF5 temp file {}", hdfFile.getFile().getName());
-    }
-    hdfFile = null;
-
-    AutoCloseables.closeSilently(reader);
-    reader = null;
-  }
 }
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
index d56ffa4..0a51e60 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
@@ -21,23 +21,21 @@
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
-import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.store.hdf5.HDF5BatchReader.HDF5ReaderConfig;
-
+import org.apache.hadoop.conf.Configuration;
 
 public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
 
   public static final String DEFAULT_NAME = "hdf5";
 
-
   public HDF5FormatPlugin(String name, DrillbitContext context,
                           Configuration fsConf,
                           StoragePluginConfig storageConfig,
@@ -51,38 +49,33 @@
         .writable(false)
         .blockSplittable(false)
         .compressible(true)
-        .supportsProjectPushdown(true)
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
-        .defaultName(DEFAULT_NAME)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
+        .supportsProjectPushdown(true)
+        .defaultName(DEFAULT_NAME)
         .build();
   }
 
-  @Override
-  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
-    FileScanBuilder builder = new FileScanBuilder();
-
-    builder.setReaderFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig), scan.getMaxRecords()));
-    initScanBuilder(builder, scan);
-    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
-    return builder;
-  }
-
-  public static class HDF5ReaderFactory extends FileScanFramework.FileReaderFactory {
+  public static class HDF5ReaderFactory extends FileReaderFactory {
     private final HDF5ReaderConfig readerConfig;
-    private final int maxRecords;
+    private final EasySubScan scan;
 
-
-    HDF5ReaderFactory(HDF5ReaderConfig config, int maxRecords) {
+    HDF5ReaderFactory(HDF5ReaderConfig config, EasySubScan scan) {
       readerConfig = config;
-      this.maxRecords = maxRecords;
+      this.scan = scan;
     }
 
     @Override
-    public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
-      return new HDF5BatchReader(readerConfig, maxRecords);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+      return new HDF5BatchReader(readerConfig, scan, negotiator);
     }
   }
+
+  @Override
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    builder.readerFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig), scan));
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
index 7b14c41..3dd864a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
@@ -158,7 +158,7 @@
     assertTrue(scan.buildSchema());
     {
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-           .build();
+          .build();
       RowSetUtilities.verify(expected,
           fixture.wrap(scan.batchAccessor().container()));
     }
@@ -211,7 +211,7 @@
     assertTrue(scan.buildSchema());
     {
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-           .build();
+          .build();
       RowSetUtilities.verify(expected,
           fixture.wrap(scan.batchAccessor().container()));
     }
@@ -254,13 +254,13 @@
         .add("a", MinorType.INT)
         .add("d", MinorType.BIGINT)
         .add("e", MinorType.BIGINT)
-         .buildSchema();
+        .buildSchema();
 
     // Initial schema
     assertTrue(scan.buildSchema());
     {
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-           .build();
+          .build();
       RowSetUtilities.verify(expected,
           fixture.wrap(scan.batchAccessor().container()));
     }
@@ -303,13 +303,13 @@
     TupleMetadata expectedSchema = new SchemaBuilder()
         .add("d", MinorType.BIGINT)
         .add("e", MinorType.BIGINT)
-         .buildSchema();
+        .buildSchema();
 
     // Initial schema
     assertTrue(scan.buildSchema());
     {
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-           .build();
+          .build();
       RowSetUtilities.verify(expected,
           fixture.wrap(scan.batchAccessor().container()));
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
index 56ffa97..09f62fc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
@@ -19,7 +19,9 @@
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
-
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.listIntArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.listLongArray;
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -324,4 +326,31 @@
 
     RowSetUtilities.verify(fixture.wrap(input), fixture.wrap(output));
   }
+
+  @Test
+  public void testListArray() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .addRepeatedList("int_list")
+          .addArray(MinorType.INT)
+          .resumeSchema()
+        .addRepeatedList("long_list")
+          .addArray(MinorType.BIGINT)
+          .resumeSchema()
+        .buildSchema();
+
+    final VectorContainer input = fixture.rowSetBuilder(schema)
+    .addRow(listIntArray(intArray(1, 2, 3)), listLongArray(new long[] { 1l, 2l, 3l }))
+    .addRow(listIntArray(intArray(10, 20, 30)), listLongArray(new long[] { 10l, 20l, 30l }))
+    .build()
+    .container();
+
+    final OutputBatchBuilder builder = new OutputBatchBuilder(schema,
+        Collections.singletonList(new BatchSource(schema, input)),
+        fixture.allocator());
+
+    builder.load(input.getRecordCount());
+    VectorContainer output = builder.outputContainer();
+
+    RowSetUtilities.verify(fixture.wrap(input), fixture.wrap(output));
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index 5a2a465..d03ba1b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -276,6 +276,14 @@
     return elements;
   }
 
+  public static int[][] listIntArray(int[]... elements) {
+    return elements;
+  }
+
+  public static long[][] listLongArray(long[]... elements) {
+    return elements;
+  }
+
   public static Object[] singleList(Object element) {
     return new Object[] { element };
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
index 002fdde..614452b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
@@ -253,7 +253,7 @@
         variants[i].events().saveRow();
       }
     }
-   }
+  }
 
   @Override
   public void preRollover() {
@@ -323,4 +323,3 @@
   @Override
   public boolean isProjected() { return unionMemberShim.isProjected(); }
 }
-