DRILL-7233: Format Plugin for HDF5

closes #1778
diff --git a/contrib/format-hdf5/README.md b/contrib/format-hdf5/README.md
new file mode 100644
index 0000000..6f6dcdd
--- /dev/null
+++ b/contrib/format-hdf5/README.md
@@ -0,0 +1,135 @@
+# Drill HDF5 Format Plugin
+Per Wikipedia, Hierarchical Data Format (HDF) is a set of file formats designed to store and organize large amounts of data.[1] Originally developed at the National Center for Supercomputing Applications, it is supported by The HDF Group, a non-profit corporation whose mission is to ensure continued development of HDF5 technologies and the continued accessibility of data stored in HDF.[2]
+
+This plugin enables Apache Drill to query HDF5 files.
+
+## Configuration
+There are three configuration variables in this plugin:
+* `type`: This should be set to `hdf5`.
+* `extensions`: This is a list of the file extensions used to identify HDF5 files. Typically HDF5 uses `.h5` or `.hdf5` as file extensions. This defaults to `.h5`.
+* `defaultPath`: The default path defines which path Drill will query for data. Typically this should be left as `null` in the configuration file. Its usage is explained below.
+
+### Example Configuration
+For most uses, the configuration below will suffice to enable Drill to query HDF5 files.
+```
+"hdf5": {
+      "type": "hdf5",
+      "extensions": [
+        "h5"
+      ],
+      "defaultPath": null
+    }
+```
+## Usage
+Since HDF5 can be viewed as a file system within a file, a single file can contain many datasets. For instance, if you have a simple HDF5 file, a star query will produce the following result:
+```
+apache drill> select * from dfs.test.`dset.h5`;
++-------+-----------+-----------+--------------------------------------------------------------------------+
+| path  | data_type | file_name |                                 int_data                                 |
++-------+-----------+-----------+--------------------------------------------------------------------------+
+| /dset | DATASET   | dset.h5   | [[1,2,3,4,5,6],[7,8,9,10,11,12],[13,14,15,16,17,18],[19,20,21,22,23,24]] |
++-------+-----------+-----------+--------------------------------------------------------------------------+
+```
+The actual data in this file is mapped to a column called `int_data`. In order to effectively access the data, you should use Drill's `FLATTEN()` function on the `int_data` column, which produces the following result.
+
+```
+apache drill> select flatten(int_data) as int_data from dfs.test.`dset.h5`;
++---------------------+
+|      int_data       |
++---------------------+
+| [1,2,3,4,5,6]       |
+| [7,8,9,10,11,12]    |
+| [13,14,15,16,17,18] |
+| [19,20,21,22,23,24] |
++---------------------+
+```
+Once the data is in this form, you can access it similarly to how you might access nested data in JSON or other files. 
+
+```
+apache drill> SELECT int_data[0] as col_0,
+. .semicolon> int_data[1] as col_1,
+. .semicolon> int_data[2] as col_2
+. .semicolon> FROM ( SELECT flatten(int_data) AS int_data
+. . . . . .)> FROM dfs.test.`dset.h5`
+. . . . . .)> );
++-------+-------+-------+
+| col_0 | col_1 | col_2 |
++-------+-------+-------+
+| 1     | 2     | 3     |
+| 7     | 8     | 9     |
+| 13    | 14    | 15    |
+| 19    | 20    | 21    |
++-------+-------+-------+
+```
+
+However, a better way to query the actual data in an HDF5 file is to use the `defaultPath` field. If `defaultPath` is defined either in the query or in the plugin configuration, Drill will return only the data, rather than the file metadata.
+
+**Note:** Once you have determined which data set you are querying, it is advisable to use this method to query HDF5 data.
+
+You can set the `defaultPath` variable either in the plugin configuration or at query time using the `table()` function, as shown in the example below:
+
+```
+SELECT *
+FROM table(dfs.test.`dset.h5` (type => 'hdf5', defaultPath => '/dset'))
+```
+This query will return the result below:
+
+```
+apache drill> SELECT * FROM table(dfs.test.`dset.h5` (type => 'hdf5', defaultPath => '/dset'));
++-----------+-----------+-----------+-----------+-----------+-----------+
+| int_col_0 | int_col_1 | int_col_2 | int_col_3 | int_col_4 | int_col_5 |
++-----------+-----------+-----------+-----------+-----------+-----------+
+| 1         | 2         | 3         | 4         | 5         | 6         |
+| 7         | 8         | 9         | 10        | 11        | 12        |
+| 13        | 14        | 15        | 16        | 17        | 18        |
+| 19        | 20        | 21        | 22        | 23        | 24        |
++-----------+-----------+-----------+-----------+-----------+-----------+
+4 rows selected (0.223 seconds)
+```
+
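+To set `defaultPath` in the plugin configuration instead, you can reuse the example configuration from above; a minimal sketch:
+```
+"hdf5": {
+      "type": "hdf5",
+      "extensions": [
+        "h5"
+      ],
+      "defaultPath": "/dset"
+    }
+```
+Note that a `defaultPath` set in the configuration applies to every file read with that configuration, which is why it is typically left as `null` there.
+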
+If the data in `defaultPath` is a column, the column name will be the last part of the path. If the data is multidimensional, the columns will be named `<data_type>_col_n`. Therefore, columns of integers will be named `int_col_0`, `int_col_1`, and so on.
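+
+For example, for a hypothetical file `weather.h5` containing a one-dimensional dataset at the path `/temperature`, a query such as the one below would return a single column named `temperature`:
+```
+SELECT temperature
+FROM table(dfs.test.`weather.h5` (type => 'hdf5', defaultPath => '/temperature'))
+```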
+
+### Attributes
+Occasionally, HDF5 paths will contain attributes. Drill will map these to a map data structure called `attributes`, as shown in the query below.
+```
+apache drill> SELECT attributes FROM dfs.test.`browsing.h5`;
++----------------------------------------------------------------------------------+
+|                                    attributes                                    |
++----------------------------------------------------------------------------------+
+| {}                                                                               |
+| {"__TYPE_VARIANT__":"TIMESTAMP_MILLISECONDS_SINCE_START_OF_THE_EPOCH"}           |
+| {}                                                                               |
+| {}                                                                               |
+| {"important":false,"__TYPE_VARIANT__timestamp__":"TIMESTAMP_MILLISECONDS_SINCE_START_OF_THE_EPOCH","timestamp":1550033296762} |
+| {}                                                                               |
+| {}                                                                               |
+| {}                                                                               |
++----------------------------------------------------------------------------------+
+8 rows selected (0.292 seconds)
+```
+You can access the individual fields within the `attributes` map by using the structure `table.map.key`. Note that you will have to give the table an alias for this to work properly.
+```
+apache drill> SELECT path, data_type, file_name
+FROM dfs.test.`browsing.h5` AS t1 WHERE t1.attributes.important = false;
++---------+-----------+-------------+
+|  path   | data_type |  file_name  |
++---------+-----------+-------------+
+| /groupB | GROUP     | browsing.h5 |
++---------+-----------+-------------+
+```
+
+### Known Limitations
+There are several limitations with the HDF5 format plugin in Drill.
+* Drill cannot read unsigned 64-bit integers. When the plugin encounters this data type, it will write an INFO message to the log.
+* While Drill can read compressed HDF5 files, Drill cannot read individual compressed fields within an HDF5 file.
+* HDF5 files can contain nested datasets of up to `n` dimensions. Since Drill works best with two-dimensional data, datasets with more than two dimensions are reduced to two dimensions.
+
+[1]: https://en.wikipedia.org/wiki/Hierarchical_Data_Format
+[2]: https://www.hdfgroup.org
+ 
\ No newline at end of file
diff --git a/contrib/format-hdf5/pom.xml b/contrib/format-hdf5/pom.xml
new file mode 100644
index 0000000..86eb023
--- /dev/null
+++ b/contrib/format-hdf5/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.18.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-format-hdf5</artifactId>
+  <name>contrib/format-hdf5</name>
+  
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>cisd</groupId>
+      <artifactId>jhdf5</artifactId>
+      <version>14.12.6</version>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-java-sources</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/hdf5
+              </outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/java/org/apache/drill/exec/store/hdf5</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5Attribute.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5Attribute.java
new file mode 100644
index 0000000..570a9ef
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5Attribute.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hdf5;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+
+/**
+ * This class represents an HDF5 attribute and is used when the attributes are projected.
+ */
+public class HDF5Attribute {
+  private final MinorType dataType;
+  private final String key;
+  private final Object value;
+
+  public HDF5Attribute(TypeProtos.MinorType type, String key, Object value) {
+    this.dataType = type;
+    this.key = key;
+    this.value = value;
+  }
+
+  public MinorType getDataType() { return dataType; }
+
+  public String getKey() { return key; }
+
+  public Object getValue() {
+    return value;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s: %s type: %s", getKey(), getValue(), getDataType());
+  }
+}
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
new file mode 100644
index 0000000..277d4dd
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
@@ -0,0 +1,1120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation;
+import ch.systemsx.cisd.hdf5.HDF5DataSetInformation;
+import ch.systemsx.cisd.hdf5.HDF5FactoryProvider;
+import ch.systemsx.cisd.hdf5.HDF5LinkInformation;
+import ch.systemsx.cisd.hdf5.IHDF5Factory;
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5EnumDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
+  private static final Logger logger = LoggerFactory.getLogger(HDF5BatchReader.class);
+
+  private static final String PATH_COLUMN_NAME = "path";
+
+  private static final String DATA_TYPE_COLUMN_NAME = "data_type";
+
+  private static final String FILE_NAME_COLUMN_NAME = "file_name";
+
+  private static final String INT_COLUMN_PREFIX = "int_col_";
+
+  private static final String LONG_COLUMN_PREFIX = "long_col_";
+
+  private static final String FLOAT_COLUMN_PREFIX = "float_col_";
+
+  private static final String DOUBLE_COLUMN_PREFIX = "double_col_";
+
+  private static final String INT_COLUMN_NAME = "int_data";
+
+  private static final String FLOAT_COLUMN_NAME = "float_data";
+
+  private static final String DOUBLE_COLUMN_NAME = "double_data";
+
+  private static final String LONG_COLUMN_NAME = "long_data";
+
+  private final HDF5ReaderConfig readerConfig;
+
+  private final List<HDF5DataWriter> dataWriters;
+
+  private FileSplit split;
+
+  private IHDF5Reader hdf5Reader;
+
+  private File inFile;
+
+  private BufferedReader reader;
+
+  private RowSetLoader rowWriter;
+
+  private Iterator<HDF5DrillMetadata> metadataIterator;
+
+  private ScalarWriter pathWriter;
+
+  private ScalarWriter dataTypeWriter;
+
+  private ScalarWriter fileNameWriter;
+
+  private long[] dimensions;
+
+  public static class HDF5ReaderConfig {
+    final HDF5FormatPlugin plugin;
+
+    final String defaultPath;
+
+    final HDF5FormatConfig formatConfig;
+
+    final File tempDirectory;
+
+    public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) {
+      this.plugin = plugin;
+      this.formatConfig = formatConfig;
+      defaultPath = formatConfig.getDefaultPath();
+      tempDirectory = plugin.getTmpDir();
+    }
+  }
+
+  public HDF5BatchReader(HDF5ReaderConfig readerConfig) {
+    this.readerConfig = readerConfig;
+    dataWriters = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    try {
+      openFile(negotiator);
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: %s", split.getPath())
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+
+    ResultSetLoader loader;
+    if (readerConfig.defaultPath == null) {
+      // Get file metadata
+      List<HDF5DrillMetadata> metadata = getFileMetadata(hdf5Reader.object().getGroupMemberInformation("/", true), new ArrayList<>());
+      metadataIterator = metadata.iterator();
+
+      // Schema for Metadata query
+      SchemaBuilder builder = new SchemaBuilder()
+        .addNullable(PATH_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(DATA_TYPE_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR);
+
+      negotiator.setTableSchema(builder.buildSchema(), false);
+
+      loader = negotiator.build();
+      dimensions = new long[0];
+      rowWriter = loader.writer();
+
+    } else {
+      // This is the case when the default path is specified. Since the user is explicitly asking for a dataset,
+      // Drill can obtain the schema by reading the data types below and ultimately mapping that schema to columns.
+      HDF5DataSetInformation dsInfo = hdf5Reader.object().getDataSetInformation(readerConfig.defaultPath);
+      dimensions = dsInfo.getDimensions();
+
+      loader = negotiator.build();
+      rowWriter = loader.writer();
+      if (dimensions.length <= 1) {
+        buildSchemaFor1DimensionalDataset(dsInfo);
+      } else if (dimensions.length == 2) {
+        buildSchemaFor2DimensionalDataset(dsInfo);
+      } else {
+        // Case for datasets of greater than 2D
+        // These are automatically flattened
+        buildSchemaFor2DimensionalDataset(dsInfo);
+      }
+    }
+    if (readerConfig.defaultPath == null) {
+      pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
+      dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
+      fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
+    }
+    return true;
+  }
+
+  /**
+   * This function is called when the default path is set and the dataset has a single dimension.
+   * It creates a list containing one data writer of the correct data type.
+   * @param dsInfo The HDF5 dataset information
+   */
+  private void buildSchemaFor1DimensionalDataset(HDF5DataSetInformation dsInfo) {
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+
+    // Case for null or unknown data types:
+    if (currentDataType == null) {
+      logger.warn("Couldn't add {}", dsInfo.getTypeInformation().tryGetJavaType().toGenericString());
+      return;
+    }
+
+    switch (currentDataType) {
+      case GENERIC_OBJECT:
+        dataWriters.add(new HDF5EnumDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case VARCHAR:
+        dataWriters.add(new HDF5StringDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case TIMESTAMP:
+        dataWriters.add(new HDF5TimestampDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case INT:
+        dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case BIGINT:
+        dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case FLOAT8:
+        dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case FLOAT4:
+        dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case MAP:
+        dataWriters.add(new HDF5MapDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+    }
+  }
+
+  /**
+   * This function builds a Drill schema from a dataset with two or more dimensions. HDF5 only supports INT, LONG, DOUBLE and FLOAT for datasets
+   * with more than one dimension, so this function is not as inclusive as the 1D function. The schema is built by adding DataWriters to the dataWriters list.
+   * @param dsInfo The dataset which Drill will use to build a schema
+   */
+  private void buildSchemaFor2DimensionalDataset(HDF5DataSetInformation dsInfo) {
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+    // Case for null or unknown data types:
+    if (currentDataType == null) {
+      logger.warn("Couldn't add {}", dsInfo.getTypeInformation().tryGetJavaType().toGenericString());
+      return;
+    }
+    long cols = dimensions[1];
+
+    String tempFieldName;
+    for (int i = 0; i < cols; i++) {
+      switch (currentDataType) {
+        case INT:
+          tempFieldName = INT_COLUMN_PREFIX + i;
+          dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i));
+          break;
+        case BIGINT:
+          tempFieldName = LONG_COLUMN_PREFIX + i;
+          dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i));
+          break;
+        case FLOAT8:
+          tempFieldName = DOUBLE_COLUMN_PREFIX + i;
+          dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i));
+          break;
+        case FLOAT4:
+          tempFieldName = FLOAT_COLUMN_PREFIX + i;
+          dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i));
+          break;
+      }
+    }
+  }
+  /**
+   * This function contains the logic to open an HDF5 file.
+   * @param negotiator The negotiator represents Drill's interface with the file system
+   */
+  private void openFile(FileSchemaNegotiator negotiator) throws IOException {
+    InputStream in = null;
+    try {
+      in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      IHDF5Factory factory = HDF5FactoryProvider.get();
+      inFile = convertInputStreamToFile(in);
+      hdf5Reader = factory.openForReading(inFile);
+    } catch (Exception e) {
+      if (in != null) {
+        in.close();
+      }
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: %s", split.getPath())
+        .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(in));
+  }
+
+  /**
+   * This function converts the Drill InputStream into a File object for the HDF5 library. This function
+   * exists due to a known limitation in the HDF5 library which cannot parse HDF5 directly from an input stream. A future
+   * release of the library will support this.
+   *
+   * @param stream The input stream to be converted to a File
+   * @return File The file which was converted from an InputStream
+   */
+  private File convertInputStreamToFile(InputStream stream) {
+    File tmpDir = readerConfig.tempDirectory;
+    String tempFileName = tmpDir.getPath() + "/~" + split.getPath().getName();
+    File targetFile = new File(tempFileName);
+
+    try {
+      java.nio.file.Files.copy(stream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+    } catch (Exception e) {
+      if (targetFile.exists()) {
+        if (!targetFile.delete()) {
+          logger.warn("{} not deleted.", targetFile.getName());
+        }
+      }
+      throw UserException
+        .dataWriteError(e)
+        .message("Failed to create temp HDF5 file: %s", split.getPath())
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+
+    IOUtils.closeQuietly(stream);
+    return targetFile;
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (readerConfig.defaultPath == null || readerConfig.defaultPath.isEmpty()) {
+        if (!metadataIterator.hasNext()){
+          return false;
+        }
+        projectMetadataRow(rowWriter);
+      } else if (dimensions.length <= 1 && dataWriters.get(0).isCompound()) {
+        // Case for compound data types
+        if (!dataWriters.get(0).hasNext()) {
+          return false;
+        }
+        dataWriters.get(0).write();
+      } else if (dimensions.length <= 1) {
+        // Case for one-dimensional datasets
+        if (!dataWriters.get(0).hasNext()) {
+          return false;
+        }
+        rowWriter.start();
+        dataWriters.get(0).write();
+        rowWriter.save();
+      } else {
+        int currentRowCount = 0;
+        HDF5DataWriter currentDataWriter;
+        rowWriter.start();
+
+        for (int i = 0; i < dimensions[1]; i++) {
+          currentDataWriter = dataWriters.get(i);
+          currentDataWriter.write();
+          currentRowCount = currentDataWriter.currentRowCount();
+        }
+        rowWriter.save();
+        if (currentRowCount >= dimensions[0]) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * This function writes one row of HDF5 metadata.
+   * @param rowWriter The input rowWriter object
+   */
+  private void projectMetadataRow(RowSetLoader rowWriter) {
+    String realFileName = inFile.getName().replace("~", "");
+    HDF5DrillMetadata metadataRow = metadataIterator.next();
+    rowWriter.start();
+
+    pathWriter.setString(metadataRow.getPath());
+    dataTypeWriter.setString(metadataRow.getDataType());
+    fileNameWriter.setString(realFileName);
+
+    //Write attributes if present
+    if (metadataRow.getAttributes().size() > 0) {
+      writeAttributes(rowWriter, metadataRow);
+    }
+
+    if (metadataRow.getDataType().equalsIgnoreCase("DATASET")) {
+      projectDataset(rowWriter, metadataRow.getPath());
+    }
+    rowWriter.save();
+  }
+
+  /**
+   * This function gets the file metadata from a given HDF5 file. It extracts the path, the data type and any attributes, and adds them to the
+   * metadata list.
+   * @param members A list of paths from which the metadata will be extracted
+   * @param metadata The list to which the extracted metadata will be added
+   * @return A list of metadata from the given file paths
+   */
+  private List<HDF5DrillMetadata> getFileMetadata(List<HDF5LinkInformation> members, List<HDF5DrillMetadata> metadata) {
+    for (HDF5LinkInformation info : members) {
+      HDF5DrillMetadata metadataRow = new HDF5DrillMetadata();
+
+      metadataRow.setPath(info.getPath());
+      metadataRow.setDataType(info.getType().toString());
+
+      switch (info.getType()) {
+        case DATASET:
+          metadataRow.setAttributes(getAttributes(info.getPath()));
+          metadata.add(metadataRow);
+          break;
+        case SOFT_LINK:
+          // Soft links cannot have attributes
+          metadata.add(metadataRow);
+          break;
+        case GROUP:
+          metadataRow.setAttributes(getAttributes(info.getPath()));
+          metadata.add(metadataRow);
+          metadata = getFileMetadata(hdf5Reader.object().getGroupMemberInformation(info.getPath(), true), metadata);
+          break;
+        default:
+          logger.warn("Unknown data type: {}", info.getType());
+      }
+    }
+    return metadata;
+  }
+
+  /**
+   * Gets the attributes of an HDF5 dataset and returns them in a HashMap
+   *
+   * @param path The path for which you wish to retrieve attributes
+   * @return Map The attributes for the given path.  Empty Map if no attributes present
+   */
+  private Map<String, HDF5Attribute> getAttributes(String path) {
+    Map<String, HDF5Attribute> attributes = new HashMap<>();
+
+    long attrCount = hdf5Reader.object().getObjectInformation(path).getNumberOfAttributes();
+    if (attrCount > 0) {
+      List<String> attrNames = hdf5Reader.object().getAllAttributeNames(path);
+      for (String name : attrNames) {
+        try {
+          HDF5Attribute attribute = HDF5Utils.getAttribute(path, name, hdf5Reader);
+          if (attribute == null) {
+            continue;
+          }
+          attributes.put(attribute.getKey(), attribute);
+        } catch (Exception e) {
+          logger.warn("Couldn't add attribute: {} - {}", path, name);
+        }
+      }
+    }
+    return attributes;
+  }
+
+  /**
+   * This function writes one row of data in a metadata query. The number of dimensions here is n+1, so if the actual dataset is a 1D column, it will be written as a list.
+   * This function is only called in metadata queries, as the schema is not known in advance.
+   *
+   * @param rowWriter The rowWriter to which the data will be written
+   * @param datapath The datapath from which the data will be read
+   */
+  private void projectDataset(RowSetLoader rowWriter, String datapath) {
+    String fieldName = HDF5Utils.getNameFromPath(datapath);
+    IHDF5Reader reader = hdf5Reader;
+    HDF5DataSetInformation dsInfo = reader.object().getDataSetInformation(datapath);
+    long[] dimensions = dsInfo.getDimensions();
+    //Case for single dimensional data
+    if (dimensions.length <= 1) {
+      TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+
+      assert currentDataType != null;
+      switch (currentDataType) {
+        case GENERIC_OBJECT:
+          logger.warn("Couldn't read {}", datapath );
+          break;
+        case VARCHAR:
+          String[] data = hdf5Reader.readStringArray(datapath);
+          writeStringListColumn(rowWriter, fieldName, data);
+          break;
+        case TIMESTAMP:
+          long[] longList = hdf5Reader.time().readTimeStampArray(datapath);
+          writeTimestampListColumn(rowWriter, fieldName, longList);
+          break;
+        case INT:
+          if (!dsInfo.getTypeInformation().isSigned()) {
+            if (dsInfo.getTypeInformation().getElementSize() > 4) {
+                longList = hdf5Reader.uint64().readArray(datapath);
+                writeLongListColumn(rowWriter, fieldName, longList);
+            }
+          } else {
+            int[] intList = hdf5Reader.readIntArray(datapath);
+            writeIntListColumn(rowWriter, fieldName, intList);
+          }
+          break;
+        case FLOAT4:
+          float[] tempFloatList = hdf5Reader.readFloatArray(datapath);
+          writeFloat4ListColumn(rowWriter, fieldName, tempFloatList);
+          break;
+        case FLOAT8:
+          double[] tempDoubleList = hdf5Reader.readDoubleArray(datapath);
+          writeFloat8ListColumn(rowWriter, fieldName, tempDoubleList);
+          break;
+        case BIGINT:
+          if (!dsInfo.getTypeInformation().isSigned()) {
+            logger.warn("Drill does not support unsigned 64bit integers.");
+            break;
+          }
+          long[] tempBigIntList = hdf5Reader.readLongArray(datapath);
+          writeLongListColumn(rowWriter, fieldName, tempBigIntList);
+          break;
+        case MAP:
+          try {
+            getAndMapCompoundData(datapath, new ArrayList<>(), hdf5Reader, rowWriter);
+          } catch (Exception e) {
+            throw UserException
+              .dataReadError()
+              .message("Error writing Compound Field: ")
+              .addContext(e.getMessage())
+              .build(logger);
+          }
+          break;
+
+        default:
+          // Case for data types that cannot be read
+          logger.warn("{} not implemented yet.", dsInfo.getTypeInformation().tryGetJavaType());
+      }
+    } else if (dimensions.length == 2) {
+      // Case for 2D data sets.  These are projected as lists of lists or maps of maps
+      long cols = dimensions[1];
+      long rows = dimensions[0];
+      switch (HDF5Utils.getDataType(dsInfo)) {
+        case INT:
+          int[][] colData = hdf5Reader.readIntMatrix(datapath);
+          mapIntMatrixField(colData, (int) cols, (int) rows, rowWriter);
+          break;
+        case FLOAT4:
+          float[][] floatData = hdf5Reader.readFloatMatrix(datapath);
+          mapFloatMatrixField(floatData, (int) cols, (int) rows, rowWriter);
+          break;
+        case FLOAT8:
+          double[][] doubleData = hdf5Reader.readDoubleMatrix(datapath);
+          mapDoubleMatrixField(doubleData, (int) cols, (int) rows, rowWriter);
+          break;
+        case BIGINT:
+          long[][] longData = hdf5Reader.readLongMatrix(datapath);
+          mapBigIntMatrixField(longData, (int) cols, (int) rows, rowWriter);
+          break;
+        default:
+          logger.warn("{} not implemented.", HDF5Utils.getDataType(dsInfo));
+      }
+    } else {
+      // Case for data sets with dimensions > 2
+      long cols = dimensions[1];
+      long rows = dimensions[0];
+      switch (HDF5Utils.getDataType(dsInfo)) {
+        case INT:
+          int[][] colData = hdf5Reader.int32().readMDArray(datapath).toMatrix();
+          mapIntMatrixField(colData, (int) cols, (int) rows, rowWriter);
+          break;
+        case FLOAT4:
+          float[][] floatData = hdf5Reader.float32().readMDArray(datapath).toMatrix();
+          mapFloatMatrixField(floatData, (int) cols, (int) rows, rowWriter);
+          break;
+        case FLOAT8:
+          double[][] doubleData = hdf5Reader.float64().readMDArray(datapath).toMatrix();
+          mapDoubleMatrixField(doubleData, (int) cols, (int) rows, rowWriter);
+          break;
+        case BIGINT:
+          long[][] longData = hdf5Reader.int64().readMDArray(datapath).toMatrix();
+          mapBigIntMatrixField(longData, (int) cols, (int) rows, rowWriter);
+          break;
+        default:
+          logger.warn("{} not implemented.", HDF5Utils.getDataType(dsInfo));
+      }
+    }
+  }
+
+  /**
+   * Helper function to write a 1D boolean column
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeBooleanColumn(TupleWriter rowWriter, String name, int value) {
+    writeBooleanColumn(rowWriter, name, value != 0);
+  }
+
+  /**
+   * Helper function to write a 1D boolean column
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeBooleanColumn(TupleWriter rowWriter, String name, boolean value) {
+    ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.BIT);
+    colWriter.setBoolean(value);
+  }
+
+  /**
+   * Helper function to write a 1D int column
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeIntColumn(TupleWriter rowWriter, String name, int value) {
+    ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.INT);
+    colWriter.setInt(value);
+  }
+
+  /**
+   * Helper function to write a 2D int list
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the outer list
+   * @param list the list of data
+   */
+  private void writeIntListColumn(TupleWriter rowWriter, String name, int[] list) {
+    int index = rowWriter.tupleSchema().index(name);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED);
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
+    for (int value : list) {
+      arrayWriter.setInt(value);
+    }
+  }
+
+  private void mapIntMatrixField(int[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    // If the default path is not null, auto flatten the data
+    // The end result is that a 2D array gets mapped to Drill columns
+    if (readerConfig.defaultPath != null) {
+      for (int i = 0; i < rows; i++) {
+        rowWriter.start();
+        for (int k = 0; k < cols; k++) {
+          String tempColumnName = INT_COLUMN_PREFIX + k;
+          writeIntColumn(rowWriter, tempColumnName, colData[i][k]);
+        }
+        rowWriter.save();
+      }
+    } else {
+      intMatrixHelper(colData, cols, rows, rowWriter);
+    }
+  }
+
+  private void intMatrixHelper(int[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    // This is the case where a dataset is projected in a metadata query.  The result should be a list of lists
+
+    TupleMetadata nestedSchema = new SchemaBuilder()
+      .addRepeatedList(INT_COLUMN_NAME)
+      .addArray(TypeProtos.MinorType.INT)
+      .resumeSchema()
+      .buildSchema();
+
+    int index = rowWriter.tupleSchema().index(INT_COLUMN_NAME);
+    if (index == -1) {
+      index = rowWriter.addColumn(nestedSchema.column(INT_COLUMN_NAME));
+    }
+
+    // The outer array
+    ArrayWriter listWriter = rowWriter.column(index).array();
+    // The inner array
+    ArrayWriter innerWriter = listWriter.array();
+    // The int values within the inner array
+    ScalarWriter intWriter = innerWriter.scalar();
+
+    for (int i = 0; i < rows; i++) {
+      for (int k = 0; k < cols; k++) {
+        intWriter.setInt(colData[i][k]);
+      }
+      listWriter.save();
+    }
+  }
+
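+  /**
+   * Helper function to write a 1D long (BIGINT) column
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */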
+  private void writeLongColumn(TupleWriter rowWriter, String name, long value) {
+    ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.BIGINT);
+    colWriter.setLong(value);
+  }
+
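+  /**
+   * Helper function to write a long array as a repeated BIGINT column
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the outer list
+   * @param list the list of data
+   */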
+  private void writeLongListColumn(TupleWriter rowWriter, String name, long[] list) {
+    int index = rowWriter.tupleSchema().index(name);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.REPEATED);
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
+    for (long l : list) {
+      arrayWriter.setLong(l);
+    }
+  }
+
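+  /**
+   * Helper function to write a 1D VARCHAR column
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */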
+  private void writeStringColumn(TupleWriter rowWriter, String name, String value) {
+    ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.VARCHAR);
+    colWriter.setString(value);
+  }
+
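+  /**
+   * Helper function to write a string array as a repeated VARCHAR column
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the outer list
+   * @param list the list of data
+   */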
+  private void writeStringListColumn(TupleWriter rowWriter, String name, String[] list) {
+    int index = rowWriter.tupleSchema().index(name);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REPEATED);
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
+    for (String s : list) {
+      arrayWriter.setString(s);
+    }
+  }
+
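+  /**
+   * Helper function to write a 1D double (FLOAT8) column
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */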
+  private void writeFloat8Column(TupleWriter rowWriter, String name, double value) {
+    ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.FLOAT8);
+    colWriter.setDouble(value);
+  }
+
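+  /**
+   * Helper function to write a double array as a repeated FLOAT8 column
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the outer list
+   * @param list the list of data
+   */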
+  private void writeFloat8ListColumn(TupleWriter rowWriter, String name, double[] list) {
+    int index = rowWriter.tupleSchema().index(name);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REPEATED);
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
+    for (double v : list) {
+      arrayWriter.setDouble(v);
+    }
+  }
+
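+  /**
+   * Helper function to write a 1D float (FLOAT4) column
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */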
+  private void writeFloat4Column(TupleWriter rowWriter, String name, float value) {
+    ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.FLOAT4);
+    colWriter.setDouble(value);
+  }
+
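+  /**
+   * Helper function to write a float array as a repeated FLOAT4 column
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the outer list
+   * @param list the list of data
+   */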
+  private void writeFloat4ListColumn(TupleWriter rowWriter, String name, float[] list) {
+    int index = rowWriter.tupleSchema().index(name);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REPEATED);
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
+    for (float v : list) {
+      arrayWriter.setDouble(v);
+    }
+  }
+
+  private void mapFloatMatrixField(float[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    // If the default path is not null, auto flatten the data
+    // The end result is that a 2D array gets mapped to Drill columns
+    if (readerConfig.defaultPath != null) {
+      for (int i = 0; i < rows; i++) {
+        rowWriter.start();
+        for (int k = 0; k < cols; k++) {
+          String tempColumnName = FLOAT_COLUMN_PREFIX + k;
+          writeFloat4Column(rowWriter, tempColumnName, colData[i][k]);
+        }
+        rowWriter.save();
+      }
+    } else {
+      floatMatrixHelper(colData, cols, rows, rowWriter);
+    }
+  }
+
+  private void floatMatrixHelper(float[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    // This is the case where a dataset is projected in a metadata query.  The result should be a list of lists
+
+    TupleMetadata nestedSchema = new SchemaBuilder()
+      .addRepeatedList(FLOAT_COLUMN_NAME)
+      .addArray(TypeProtos.MinorType.FLOAT4)
+      .resumeSchema()
+      .buildSchema();
+
+    int index = rowWriter.tupleSchema().index(FLOAT_COLUMN_NAME);
+    if (index == -1) {
+      index = rowWriter.addColumn(nestedSchema.column(FLOAT_COLUMN_NAME));
+    }
+
+    // The outer array
+    ArrayWriter listWriter = rowWriter.column(index).array();
+    // The inner array
+    ArrayWriter innerWriter = listWriter.array();
+    // The float values within the inner array
+    ScalarWriter floatWriter = innerWriter.scalar();
+    for (int i = 0; i < rows; i++) {
+      for (int k = 0; k < cols; k++) {
+        floatWriter.setDouble(colData[i][k]);
+      }
+      listWriter.save();
+    }
+  }
+
+  private void mapDoubleMatrixField(double[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    // If the default path is not null, auto flatten the data
+    // The end result is that a 2D array gets mapped to Drill columns
+    if (readerConfig.defaultPath != null) {
+      for (int i = 0; i < rows; i++) {
+        rowWriter.start();
+        for (int k = 0; k < cols; k++) {
+          String tempColumnName = DOUBLE_COLUMN_PREFIX + k;
+          writeFloat8Column(rowWriter, tempColumnName, colData[i][k]);
+        }
+        rowWriter.save();
+      }
+    } else {
+      doubleMatrixHelper(colData, cols, rows, rowWriter);
+    }
+  }
+
+  private void doubleMatrixHelper(double[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    // This is the case where a dataset is projected in a metadata query.  The result should be a list of lists
+
+    TupleMetadata nestedSchema = new SchemaBuilder()
+      .addRepeatedList(DOUBLE_COLUMN_NAME)
+      .addArray(TypeProtos.MinorType.FLOAT8)
+      .resumeSchema()
+      .buildSchema();
+
+    int index = rowWriter.tupleSchema().index(DOUBLE_COLUMN_NAME);
+    if (index == -1) {
+      index = rowWriter.addColumn(nestedSchema.column(DOUBLE_COLUMN_NAME));
+    }
+
+    // The outer array
+    ArrayWriter listWriter = rowWriter.column(index).array();
+    // The inner array
+    ArrayWriter innerWriter = listWriter.array();
+    // The double values within the inner array
+    ScalarWriter floatWriter = innerWriter.scalar();
+
+    for (int i = 0; i < rows; i++) {
+      for (int k = 0; k < cols; k++) {
+        floatWriter.setDouble(colData[i][k]);
+      }
+      listWriter.save();
+    }
+  }
+
+  private void mapBigIntMatrixField(long[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    // If the default path is not null, auto flatten the data
+    // The end result is that a 2D array gets mapped to Drill columns
+    if (readerConfig.defaultPath != null) {
+      for (int i = 0; i < rows; i++) {
+        rowWriter.start();
+        for (int k = 0; k < cols; k++) {
+          String tempColumnName = LONG_COLUMN_PREFIX + k;
+          writeLongColumn(rowWriter, tempColumnName, colData[i][k]);
+        }
+        rowWriter.save();
+      }
+    } else {
+      bigIntMatrixHelper(colData, cols, rows, rowWriter);
+    }
+  }
+
+  private void bigIntMatrixHelper(long[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    // This is the case where a dataset is projected in a metadata query.  The result should be a list of lists
+
+    TupleMetadata nestedSchema = new SchemaBuilder()
+      .addRepeatedList(LONG_COLUMN_NAME)
+      .addArray(TypeProtos.MinorType.BIGINT)
+      .resumeSchema()
+      .buildSchema();
+
+    int index = rowWriter.tupleSchema().index(LONG_COLUMN_NAME);
+    if (index == -1) {
+      index = rowWriter.addColumn(nestedSchema.column(LONG_COLUMN_NAME));
+    }
+
+    // The outer array
+    ArrayWriter listWriter = rowWriter.column(index).array();
+    // The inner array
+    ArrayWriter innerWriter = listWriter.array();
+    // The long values within the inner array
+    ScalarWriter bigintWriter = innerWriter.scalar();
+
+    for (int i = 0; i < rows; i++) {
+      for (int k = 0; k < cols; k++) {
+        bigintWriter.setLong(colData[i][k]);
+      }
+      listWriter.save();
+    }
+  }
+
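+  /**
+   * Helper function to write a 1D TIMESTAMP column
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param timestamp The timestamp value to be written, in milliseconds since the epoch
+   */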
+  private void writeTimestampColumn(TupleWriter rowWriter, String name, long timestamp) {
+    Instant ts = new Instant(timestamp);
+    ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.TIMESTAMP);
+    colWriter.setTimestamp(ts);
+  }
+
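+  /**
+   * Helper function to write an array of epoch-millisecond values as a repeated TIMESTAMP column
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the outer list
+   * @param list the list of data
+   */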
+  private void writeTimestampListColumn(TupleWriter rowWriter, String name, long[] list) {
+    int index = rowWriter.tupleSchema().index(name);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.REPEATED);
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
+    for (long l : list) {
+      arrayWriter.setTimestamp(new Instant(l));
+    }
+  }
+
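+  /**
+   * Looks up the column with the given name in the tuple schema, adding a new OPTIONAL column
+   * of the given data type if it is not present, and returns its ScalarWriter.
+   *
+   * @param tupleWriter The tuple to which the column belongs
+   * @param fieldName The column name
+   * @param type The Drill data type of the column
+   * @return The ScalarWriter for the column
+   */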
+  private ScalarWriter getColWriter(TupleWriter tupleWriter, String fieldName, TypeProtos.MinorType type) {
+    int index = tupleWriter.tupleSchema().index(fieldName);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, type, TypeProtos.DataMode.OPTIONAL);
+      index = tupleWriter.addColumn(colSchema);
+    }
+    return tupleWriter.scalar(index);
+  }
+
+  /**
+   * This helper function gets the attributes for an HDF5 datapath.  These attributes are projected as a map in select * queries when the defaultPath is null.
+   * @param rowWriter the row to which the data will be written
+   * @param record the record for the attributes
+   */
+  private void writeAttributes(TupleWriter rowWriter, HDF5DrillMetadata record) {
+    Map<String, HDF5Attribute> attribs = getAttributes(record.getPath());
+    Iterator<Map.Entry<String, HDF5Attribute>> entries = attribs.entrySet().iterator();
+
+    int index = rowWriter.tupleSchema().index("attributes");
+    if (index == -1) {
+      index = rowWriter
+        .addColumn(SchemaBuilder.columnSchema("attributes", TypeProtos.MinorType.MAP, TypeProtos.DataMode.REQUIRED));
+    }
+    TupleWriter mapWriter = rowWriter.tuple(index);
+
+    while (entries.hasNext()) {
+      Map.Entry<String, HDF5Attribute> entry = entries.next();
+      String key = entry.getKey();
+
+      HDF5Attribute attrib = entry.getValue();
+      switch (attrib.getDataType()) {
+        case BIT:
+          boolean value = (Boolean) attrib.getValue();
+          writeBooleanColumn(mapWriter, key, value);
+          break;
+        case BIGINT:
+          writeLongColumn(mapWriter, key, (Long) attrib.getValue());
+          break;
+        case INT:
+          writeIntColumn(mapWriter, key, (Integer) attrib.getValue());
+          break;
+        case FLOAT8:
+          writeFloat8Column(mapWriter, key, (Double) attrib.getValue());
+          break;
+        case FLOAT4:
+          writeFloat4Column(mapWriter, key, (Float) attrib.getValue());
+          break;
+        case VARCHAR:
+          writeStringColumn(mapWriter, key, (String) attrib.getValue());
+          break;
+        case TIMESTAMP:
+          writeTimestampColumn(mapWriter, key, (Long) attrib.getValue());
+          break;
+        case GENERIC_OBJECT:
+          //This is the case for HDF5 enums
+          String enumText = attrib.getValue().toString();
+          writeStringColumn(mapWriter, key, enumText);
+          break;
+      }
+    }
+  }
+
+  /**
+   * This function processes the MAP data type which can be found in HDF5 files.
+   * It automatically flattens anything greater than 2 dimensions.
+   *
+   * @param path the HDF5 path to the compound data
+   * @param fieldNames The field names to be mapped
+   * @param reader the HDF5 reader for the data file
+   * @param rowWriter the rowWriter to write the data
+   */
+
+  private void getAndMapCompoundData(String path, List<String> fieldNames, IHDF5Reader reader, RowSetLoader rowWriter) {
+
+    final String COMPOUND_DATA_FIELD_NAME = "compound_data";
+    String resolvedPath = HDF5Utils.resolvePath(path, reader);
+    Class<?> dataClass = HDF5Utils.getDatasetClass(resolvedPath, reader);
+    if (dataClass == Map.class) {
+      Object[][] values = reader.compound().readArray(resolvedPath, Object[].class);
+      String currentFieldName;
+
+      if (fieldNames != null) {
+        HDF5CompoundMemberInformation[] infos = reader.compound().getDataSetInfo(resolvedPath);
+        for (HDF5CompoundMemberInformation info : infos) {
+          fieldNames.add(info.getName());
+        }
+      }
+
+      // Case for auto-flatten
+      if (readerConfig.defaultPath != null) {
+        for (int row = 0; row < values.length; row++) {
+          rowWriter.start();
+          for (int col = 0; col < values[row].length; col++) {
+
+            assert fieldNames != null;
+            currentFieldName = fieldNames.get(col);
+
+            if (values[row][col] instanceof Integer) {
+              writeIntColumn(rowWriter, currentFieldName, (Integer) values[row][col]);
+            } else if (values[row][col] instanceof Short) {
+              writeIntColumn(rowWriter, currentFieldName, ((Short) values[row][col]).intValue());
+            } else if (values[row][col] instanceof Byte) {
+              writeIntColumn(rowWriter, currentFieldName, ((Byte) values[row][col]).intValue());
+            } else if (values[row][col] instanceof Long) {
+              writeLongColumn(rowWriter, currentFieldName, (Long) values[row][col]);
+            } else if (values[row][col] instanceof Float) {
+              writeFloat4Column(rowWriter, currentFieldName, (Float) values[row][col]);
+            } else if (values[row][col] instanceof Double) {
+              writeFloat8Column(rowWriter, currentFieldName, (Double) values[row][col]);
+            } else if (values[row][col] instanceof BitSet || values[row][col] instanceof Boolean) {
+              writeBooleanColumn(rowWriter, currentFieldName, (Boolean) values[row][col]);
+            } else if (values[row][col] instanceof String) {
+              String stringValue = (String) values[row][col];
+              writeStringColumn(rowWriter, currentFieldName, stringValue);
+            }
+          }
+          rowWriter.save();
+        }
+      } else {
+        int index = 0;
+        if (fieldNames != null) {
+          HDF5CompoundMemberInformation[] infos = reader.compound().getDataSetInfo(resolvedPath);
+
+          SchemaBuilder innerSchema = new SchemaBuilder();
+          MapBuilder mapBuilder = innerSchema.addMap(COMPOUND_DATA_FIELD_NAME);
+
+          for (HDF5CompoundMemberInformation info : infos) {
+            fieldNames.add(info.getName());
+
+            switch (info.getType().tryGetJavaType().getSimpleName()) {
+              case "int":
+                mapBuilder.add(info.getName(), TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED);
+                break;
+              case "double":
+                mapBuilder.add(info.getName(), TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REPEATED);
+                break;
+              case "float":
+                mapBuilder.add(info.getName(), TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REPEATED);
+                break;
+              case "long":
+                mapBuilder.add(info.getName(), TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.REPEATED);
+                break;
+              case "String":
+                mapBuilder.add(info.getName(), TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REPEATED);
+                break;
+            }
+          }
+          TupleMetadata finalInnerSchema = mapBuilder.resumeSchema().buildSchema();
+
+          index = rowWriter.tupleSchema().index(COMPOUND_DATA_FIELD_NAME);
+          if (index == -1) {
+            index = rowWriter.addColumn(finalInnerSchema.column(COMPOUND_DATA_FIELD_NAME));
+          }
+        }
+        TupleWriter listWriter = rowWriter.column(index).tuple();
+
+        //Note: The [][] returns an array of [rows][cols]
+        for (int row = 0; row < values.length; row++) {
+          // Iterate over rows
+          for (int col = 0; col < values[row].length; col++) {
+            assert fieldNames != null;
+            currentFieldName = fieldNames.get(col);
+            ArrayWriter innerWriter = listWriter.array(currentFieldName);
+            if (values[row][col] instanceof Integer) {
+              innerWriter.scalar().setInt((Integer) values[row][col]);
+            } else if (values[row][col] instanceof Short) {
+              innerWriter.scalar().setInt((Short) values[row][col]);
+            } else if (values[row][col] instanceof Byte) {
+              innerWriter.scalar().setInt((Byte) values[row][col]);
+            } else if (values[row][col] instanceof Long) {
+              innerWriter.scalar().setLong((Long) values[row][col]);
+            } else if (values[row][col] instanceof Float) {
+              innerWriter.scalar().setDouble((Float) values[row][col]);
+            } else if (values[row][col] instanceof Double) {
+              innerWriter.scalar().setDouble((Double) values[row][col]);
+            } else if (values[row][col] instanceof BitSet || values[row][col] instanceof Boolean) {
+              innerWriter.scalar().setBoolean((Boolean) values[row][col]);
+            } else if (values[row][col] instanceof String) {
+              innerWriter.scalar().setString((String) values[row][col]);
+            }
+            if (col == values[row].length - 1) {
+              innerWriter.save();
+            }
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    if (hdf5Reader != null) {
+      hdf5Reader.close();
+      hdf5Reader = null;
+    }
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        logger.debug("Failed to close HDF5 Reader.");
+      }
+      reader = null;
+    }
+    if (inFile != null) {
+      if (!inFile.delete()) {
+        logger.warn("{} file not deleted.", inFile.getName());
+      }
+      inFile = null;
+    }
+  }
+}
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java
new file mode 100644
index 0000000..2ed43d3
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HDF5DrillMetadata {
+  private String path;
+
+  private String dataType;
+
+  private Map<String, HDF5Attribute> attributes;
+
+  public HDF5DrillMetadata() {
+    attributes = new HashMap<>();
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public String getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(String dataType) {
+    this.dataType = dataType;
+  }
+
+  public Map<String, HDF5Attribute> getAttributes() {
+    return attributes;
+  }
+
+  public void setAttributes(Map<String, HDF5Attribute> attribs) {
+    this.attributes = attribs;
+  }
+}
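As a rough illustration of how this holder is meant to be used, the hedged sketch below populates it the way the batch reader does while walking the file structure. The path and data type values are made up for the example, and it assumes the plugin classes above are on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: populate the metadata holder for a single HDF5 object.
public class HDF5MetadataExample {
  public static void main(String[] args) {
    HDF5DrillMetadata metadata = new HDF5DrillMetadata();
    metadata.setPath("/dset");        // hypothetical dataset path
    metadata.setDataType("DATASET");

    Map<String, HDF5Attribute> attributes = new HashMap<>();
    // Attribute objects normally come from HDF5Utils.getAttribute()
    metadata.setAttributes(attributes);

    System.out.println(metadata.getPath() + ": " + metadata.getDataType());
  }
}
```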
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatConfig.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatConfig.java
new file mode 100644
index 0000000..87d013d
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+@JsonTypeName(HDF5FormatPlugin.DEFAULT_NAME)
+public class HDF5FormatConfig implements FormatPluginConfig {
+
+  public List<String> extensions = Collections.singletonList("h5");
+
+  public String defaultPath;
+
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+  public String getDefaultPath() {
+    return defaultPath;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    HDF5FormatConfig other = (HDF5FormatConfig) obj;
+    return Objects.equals(extensions, other.getExtensions()) &&
+      Objects.equals(defaultPath, other.defaultPath);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(new Object[]{extensions, defaultPath});
+  }
+}
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
new file mode 100644
index 0000000..b696cdb
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.hdf5.HDF5BatchReader.HDF5ReaderConfig;
+
+import java.io.File;
+
+
+public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
+
+  public static final String DEFAULT_NAME = "hdf5";
+
+  private final DrillbitContext context;
+
+  public HDF5FormatPlugin(String name, DrillbitContext context,
+                          Configuration fsConf,
+                          StoragePluginConfig storageConfig,
+                          HDF5FormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+    this.context = context;
+  }
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, HDF5FormatConfig pluginConfig) {
+    EasyFormatConfig config = new EasyFormatConfig();
+    config.readable = true;
+    config.writable = false;
+    config.blockSplittable = false;
+    config.compressible = true;
+    config.supportsProjectPushdown = true;
+    config.extensions = pluginConfig.getExtensions();
+    config.fsConf = fsConf;
+    config.defaultName = DEFAULT_NAME;
+    config.readerOperatorType = UserBitShared.CoreOperatorType.HDF5_SUB_SCAN_VALUE;
+    config.useEnhancedScan = true;
+    return config;
+  }
+
+  @Override
+  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
+    FileScanBuilder builder = new FileScanBuilder();
+
+    builder.setReaderFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig)));
+    initScanBuilder(builder, scan);
+    builder.setNullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
+  }
+
+  public static class HDF5ReaderFactory extends FileScanFramework.FileReaderFactory {
+    private final HDF5ReaderConfig readerConfig;
+
+    HDF5ReaderFactory(HDF5ReaderConfig config) {
+      readerConfig = config;
+    }
+
+    @Override
+    public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
+      return new HDF5BatchReader(readerConfig);
+    }
+  }
+
+  /**
+   * First tries to get the Drill temporary directory value from the config option ${drill.tmp-dir},
+   * then checks the environment variable $DRILL_TMP_DIR.
+   * If the value is still missing, generates a directory using {@link Files#createTempDir()}.
+   *
+   * @return drill temporary directory path
+   */
+  protected File getTmpDir() {
+    DrillConfig config = context.getConfig();
+    String drillTempDir;
+    if (config.hasPath(ExecConstants.DRILL_TMP_DIR)) {
+      drillTempDir = config.getString(ExecConstants.DRILL_TMP_DIR);
+    } else {
+      drillTempDir = System.getenv("DRILL_TMP_DIR");
+    }
+
+    if (drillTempDir == null) {
+      return Files.createTempDir();
+    }
+
+    return new File(drillTempDir);
+  }
+}
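The temp-directory lookup in getTmpDir() is a simple fallback chain: configured value first, then the environment, then a generated directory. The standalone sketch below shows the same order of precedence with plain JDK calls; the property name is a stand-in for illustration only, not Drill's actual drill.tmp-dir configuration mechanism.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Illustrative fallback chain: configured value, then environment variable,
// then a freshly created temporary directory.
public class TmpDirExample {
  public static void main(String[] args) throws IOException {
    String configured = System.getProperty("example.tmp-dir");  // stand-in for ${drill.tmp-dir}
    String fromEnv = System.getenv("DRILL_TMP_DIR");

    File tmpDir;
    if (configured != null) {
      tmpDir = new File(configured);
    } else if (fromEnv != null) {
      tmpDir = new File(fromEnv);
    } else {
      tmpDir = Files.createTempDirectory("drill-hdf5-").toFile();
    }
    System.out.println("Using temporary directory: " + tmpDir.getAbsolutePath());
  }
}
```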
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5Utils.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5Utils.java
new file mode 100644
index 0000000..3401ce8
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5Utils.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import ch.systemsx.cisd.hdf5.HDF5DataClass;
+import ch.systemsx.cisd.hdf5.HDF5EnumerationValue;
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import ch.systemsx.cisd.hdf5.HDF5DataSetInformation;
+import ch.systemsx.cisd.hdf5.HDF5DataTypeInformation;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class HDF5Utils {
+  private static final Logger logger = LoggerFactory.getLogger(HDF5Utils.class);
+
+  private static final boolean CASE_SENSITIVE = false;
+
+  /*
+   * This regex is used to extract the final part of an HDF5 path, which is the name of the data field or column.
+   * While these look like file paths, they are fully contained within HDF5. This regex would extract part3 from:
+   * /part1/part2/part3
+   */
+  private static final Pattern PATH_PATTERN = Pattern.compile("/*.*/(.+?)$");
+
+  /**
+   * This function returns an HDF5Attribute object for use when Drill maps the attributes.
+   *
+   * @param pathName The path to retrieve attributes from
+   * @param key The key for the specific attribute you are retrieving
+   * @param reader The IHDF5 reader object for the file you are querying
+   * @return HDF5Attribute The attribute from the path with the key that was requested.
+   */
+  public static HDF5Attribute getAttribute(String pathName, String key, IHDF5Reader reader) {
+    if (pathName.equals("")) {
+      pathName = "/";
+    }
+
+    if (!reader.exists(pathName)) {
+      return null;
+    }
+
+    if (key.equals("dimensions")) {
+      HDF5DataSetInformation datasetInfo = reader.object().getDataSetInformation(pathName);
+      long[] dimensions = datasetInfo.getDimensions();
+      ArrayUtils.reverse(dimensions);
+      return new HDF5Attribute(MinorType.LIST, "dimensions", dimensions);
+    }
+
+    if (key.equals("dataType")) {
+      HDF5DataSetInformation datasetInfo = reader.object().getDataSetInformation(pathName);
+      return new HDF5Attribute(getDataType(datasetInfo), "DataType", datasetInfo.getTypeInformation().getDataClass());
+    }
+
+    if (!reader.object().hasAttribute(pathName, key)) {
+      return null;
+    }
+
+    HDF5DataTypeInformation attributeInfo = reader.object().getAttributeInformation(pathName, key);
+    Class<?> type = attributeInfo.tryGetJavaType();
+    if (type.isAssignableFrom(long[].class)) {
+      if (attributeInfo.isSigned()) {
+        return new HDF5Attribute(MinorType.BIGINT, key, reader.int64().getAttr(pathName, key));
+      } else {
+        return new HDF5Attribute(MinorType.BIGINT, key, reader.uint64().getAttr(pathName, key));
+      }
+    } else if (type.isAssignableFrom(int[].class)) {
+      if (attributeInfo.isSigned()) {
+        return new HDF5Attribute(MinorType.INT, key, reader.int32().getAttr(pathName, key));
+      } else {
+        return new HDF5Attribute(MinorType.INT, key, reader.uint32().getAttr(pathName, key));
+      }
+    } else if (type.isAssignableFrom(short[].class)) {
+      if (attributeInfo.isSigned()) {
+        return new HDF5Attribute(MinorType.INT, key, reader.int16().getAttr(pathName, key));
+      } else {
+        return new HDF5Attribute(MinorType.INT, key, reader.uint16().getAttr(pathName, key));
+      }
+    } else if (type.isAssignableFrom(byte[].class)) {
+      if (attributeInfo.isSigned()) {
+        return new HDF5Attribute(MinorType.INT, key, reader.int8().getAttr(pathName, key));
+      } else {
+        return new HDF5Attribute(MinorType.INT, key, reader.uint8().getAttr(pathName, key));
+      }
+    } else if (type.isAssignableFrom(double[].class)) {
+      return new HDF5Attribute(MinorType.FLOAT8, key, reader.float64().getAttr(pathName, key));
+    } else if (type.isAssignableFrom(float[].class)) {
+      return new HDF5Attribute(MinorType.FLOAT8, key, reader.float32().getAttr(pathName, key));
+    } else if (type.isAssignableFrom(String[].class)) {
+      return new HDF5Attribute(MinorType.VARCHAR, key, reader.string().getAttr(pathName, key));
+    } else if (type.isAssignableFrom(long.class)) {
+      if (attributeInfo.isSigned()) {
+        return new HDF5Attribute(MinorType.BIGINT, key, reader.int64().getAttr(pathName, key));
+      } else {
+        return new HDF5Attribute(MinorType.BIGINT, key, reader.uint64().getAttr(pathName, key));
+      }
+    } else if (type.isAssignableFrom(int.class)) {
+      if (attributeInfo.isSigned()) {
+        return new HDF5Attribute(MinorType.INT, key, reader.int32().getAttr(pathName, key));
+      } else {
+        return new HDF5Attribute(MinorType.INT, key, reader.uint32().getAttr(pathName, key));
+      }
+    } else if (type.isAssignableFrom(short.class)) {
+      if (attributeInfo.isSigned()) {
+        return new HDF5Attribute(MinorType.INT, key, reader.int16().getAttr(pathName, key));
+      } else {
+        return new HDF5Attribute(MinorType.INT, key, reader.uint16().getAttr(pathName, key));
+      }
+    } else if (type.isAssignableFrom(byte.class)) {
+      if (attributeInfo.isSigned()) {
+        return new HDF5Attribute(MinorType.INT, key, reader.int8().getAttr(pathName, key));
+      } else {
+        return new HDF5Attribute(MinorType.INT, key, reader.uint8().getAttr(pathName, key));
+      }
+    } else if (type.isAssignableFrom(double.class)) {
+      return new HDF5Attribute(MinorType.FLOAT8, key, reader.float64().getAttr(pathName, key));
+    } else if (type.isAssignableFrom(float.class)) {
+      return new HDF5Attribute(MinorType.FLOAT4, key, reader.float32().getAttr(pathName, key));
+    } else if (type.isAssignableFrom(String.class)) {
+      return new HDF5Attribute(MinorType.VARCHAR, key, reader.string().getAttr(pathName, key));
+    } else if (type.isAssignableFrom(boolean.class)) {
+      return new HDF5Attribute(MinorType.BIT, key, reader.bool().getAttr(pathName, key));
+    } else if (type.isAssignableFrom(HDF5EnumerationValue.class)) {
+      // Convert HDF5 Enum to String
+      return new HDF5Attribute(MinorType.GENERIC_OBJECT, key, reader.enumeration().getAttr(pathName, key));
+    } else if (type.isAssignableFrom(BitSet.class)) {
+      return new HDF5Attribute(MinorType.BIT, key, reader.bool().getAttr(pathName, key));
+    }
+
+    logger.warn("Reading attributes of type {} not yet implemented.", attributeInfo);
+    return null;
+  }
+
+  /**
+   * This function returns the Drill data type of a given HDF5 dataset.
+   * @param datasetInfo The input data set.
+   * @return MinorType The Drill data type of the dataset in question
+   */
+  public static MinorType getDataType(HDF5DataSetInformation datasetInfo) {
+
+    HDF5DataTypeInformation typeInfo = datasetInfo.getTypeInformation();
+    Class<?> type = typeInfo.tryGetJavaType();
+    String name = typeInfo.getDataClass().name();
+    if (type == null) {
+      logger.warn("Datasets of type {} not implemented.", typeInfo);
+      //Fall back to string
+      if(name.equalsIgnoreCase("OTHER")) {
+        return MinorType.GENERIC_OBJECT;
+      } else {
+        return MinorType.VARCHAR;
+      }
+    } else if (type.isAssignableFrom(long.class)) {
+      return MinorType.BIGINT;
+    } else if (type.isAssignableFrom(short.class)) {
+      return MinorType.SMALLINT;
+    } else if (type.isAssignableFrom(byte.class)) {
+      return MinorType.TINYINT;
+    } else if (type.isAssignableFrom(int.class)) {
+      return MinorType.INT;
+    } else if (type.isAssignableFrom(double.class) || type.isAssignableFrom(float.class)) {
+      return MinorType.FLOAT8;
+    } else if (type.isAssignableFrom(String.class)) {
+      return MinorType.VARCHAR;
+    } else if (type.isAssignableFrom(java.util.Date.class)) {
+      return MinorType.TIMESTAMP;
+    } else if (type.isAssignableFrom(boolean.class) || type.isAssignableFrom(BitSet.class)) {
+      return MinorType.BIT;
+    } else if (type.isAssignableFrom(Map.class)) {
+      return MinorType.MAP;
+    } else if (type.isAssignableFrom(Enum.class)) {
+      return MinorType.GENERIC_OBJECT;
+    }
+    return MinorType.GENERIC_OBJECT;
+  }
+
+  /**
+   * This function gets the Java type of a given dataset.
+   * @param path The path of the dataset
+   * @param reader The HDF5 reader
+   * @return The Java class of the dataset's data type
+   */
+  public static Class<?> getDatasetClass(String path, IHDF5Reader reader) {
+    HDF5DataSetInformation info = reader.getDataSetInformation(resolvePath(path, reader));
+    return info.getTypeInformation().tryGetJavaType();
+  }
+
+  /**
+   * This function resolves HDF5 path references and, if necessary, matches paths case-insensitively.
+   * @param path The path, or path reference, to resolve
+   * @param reader The HDF5 reader object for the file
+   * @return The resolved path, or null if the path cannot be resolved
+   */
+  public static String resolvePath(String path, IHDF5Reader reader) {
+    if (reader.exists(path)) {
+      // Resolve references, if any.
+      if (reader.object().isDataSet(path)) {
+        HDF5DataClass dataClass = reader.getDataSetInformation(path).getTypeInformation().getDataClass();
+        if (dataClass.toString().equals("REFERENCE")) {
+          return reader.reference().read(path);
+        }
+      }
+      return path;
+    } else if (!CASE_SENSITIVE) {
+      // Look for a match with a different case.
+      String[] pathParts = path.split("/");
+      String resolvedPath = "/";
+      for (int i = 1; i < pathParts.length; i++) {
+        String testPath = (resolvedPath.endsWith("/")) ? resolvedPath + pathParts[i] : resolvedPath + "/" + pathParts[i];
+        if (reader.exists(testPath)) {
+          resolvedPath = testPath;
+        } else if (reader.isGroup(resolvedPath)) {
+          List<String> children = reader.getGroupMembers(resolvedPath);
+          for (String child : children) {
+            if (child.equalsIgnoreCase(pathParts[i])) {
+              resolvedPath = resolvedPath + "/" + child;
+            }
+          }
+        }
+      }
+      if (path.equalsIgnoreCase(resolvedPath)) {
+        return resolvedPath;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * This helper function returns the name of an HDF5 record from a data path.
+   *
+   * @param path Path to HDF5 data
+   * @return String name of data
+   */
+  public static String getNameFromPath(String path) {
+    if (path == null) {
+      return null;
+    }
+    // Now create matcher object.
+    Matcher m = PATH_PATTERN.matcher(path);
+    if (m.find()) {
+      return m.group(1);
+    } else {
+      return "";
+    }
+  }
+}
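To make the path handling above concrete, here is a small, self-contained sketch of the last-segment extraction performed by getNameFromPath(). The pattern is copied from PATH_PATTERN; the class name and sample path are only for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch of extracting the field name from an HDF5 path.
public class PathNameExample {
  private static final Pattern PATH_PATTERN = Pattern.compile("/*.*/(.+?)$");

  public static void main(String[] args) {
    Matcher m = PATH_PATTERN.matcher("/part1/part2/part3");
    if (m.find()) {
      System.out.println(m.group(1));  // prints "part3"
    }
  }
}
```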
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5DataWriter.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5DataWriter.java
new file mode 100644
index 0000000..e86ece1
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5DataWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5.writers;
+
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class HDF5DataWriter {
+  protected final RowSetLoader columnWriter;
+
+  protected final IHDF5Reader reader;
+
+  protected final String datapath;
+
+  protected String fieldName;
+
+  protected int colCount;
+
+  protected int counter;
+
+  protected Object[][] compoundData;
+
+  public HDF5DataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath) {
+    this.reader = reader;
+    this.columnWriter = columnWriter;
+    this.datapath = datapath;
+  }
+
+  public HDF5DataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath, String fieldName, int colCount) {
+    this.reader = reader;
+    this.columnWriter = columnWriter;
+    this.datapath = datapath;
+    this.fieldName = fieldName;
+    this.colCount = colCount;
+  }
+
+  public boolean write() {
+    return false;
+  }
+
+  public boolean hasNext() {
+    return false;
+  }
+
+  public int currentRowCount() {
+    return counter;
+  }
+
+  public List<Object> getColumn(int columnIndex) {
+    List<Object> result = new ArrayList<>();
+    for (Object[] compoundDatum : compoundData) {
+      result.add(compoundDatum[columnIndex]);
+    }
+    return result;
+  }
+
+  public abstract int getDataSize();
+
+  public boolean isCompound() {
+    return false;
+  }
+
+  protected static ScalarWriter makeWriter(TupleWriter tupleWriter, String name, TypeProtos.MinorType type, TypeProtos.DataMode mode) {
+    ColumnMetadata colSchema = MetadataUtils.newScalar(name, type, mode);
+    int index = tupleWriter.addColumn(colSchema);
+    return tupleWriter.scalar(index);
+  }
+
+}
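For orientation, the hedged sketch below shows the calling pattern this base class is built around: a reader keeps asking a concrete writer for the next value until the data is exhausted. The writer class and hard-coded values here are hypothetical; the real drivers are the batch reader and the concrete writers in this package.

```java
// Illustrative only: a trivial in-memory writer following the
// hasNext()/write()/getDataSize() contract of HDF5DataWriter.
class HypotheticalWriter {
  private final int[] data = {10, 20, 30};
  private int counter;

  boolean hasNext() {
    return counter < data.length;
  }

  boolean write() {
    if (counter >= data.length) {
      return false;
    }
    // A real writer pushes the value into a Drill ScalarWriter here.
    System.out.println("writing " + data[counter++]);
    return true;
  }

  int getDataSize() {
    return data.length;
  }
}

public class WriterLoopExample {
  public static void main(String[] args) {
    HypotheticalWriter writer = new HypotheticalWriter();
    while (writer.hasNext()) {  // the batch reader drives this loop
      writer.write();
    }
  }
}
```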
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5DoubleDataWriter.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5DoubleDataWriter.java
new file mode 100644
index 0000000..5fcfa7c
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5DoubleDataWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5.writers;
+
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.hdf5.HDF5Utils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+import java.util.List;
+
+public class HDF5DoubleDataWriter extends HDF5DataWriter {
+
+  private final double[] data;
+
+  private final ScalarWriter rowWriter;
+
+  // This constructor is used when the data is a 1D column.  The column is inferred from the datapath
+  public HDF5DoubleDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath) {
+    super(reader, columnWriter, datapath);
+    data = reader.readDoubleArray(datapath);
+
+    fieldName = HDF5Utils.getNameFromPath(datapath);
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  // This constructor is used when the data is part of a 2D array.  In this case the column name is provided in the constructor
+  public HDF5DoubleDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath, String fieldName, int currentColumn) {
+    super(reader, columnWriter, datapath, fieldName, currentColumn);
+    // Get dimensions
+    long[] dimensions = reader.object().getDataSetInformation(datapath).getDimensions();
+    double[][] tempData;
+    if (dimensions.length == 2) {
+      tempData = transpose(reader.readDoubleMatrix(datapath));
+    } else {
+      tempData = transpose(reader.float64().readMDArray(datapath).toMatrix());
+    }
+    data = tempData[currentColumn];
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL);
+  }
+
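+  // This constructor is used for compound data types.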
+  public HDF5DoubleDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String fieldName, List<Double> tempListData) {
+    super(reader, columnWriter, null);
+    this.fieldName = fieldName;
+    data = new double[tempListData.size()];
+    for (int i = 0; i < tempListData.size(); i++) {
+      data[i] = tempListData.get(i);
+    }
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL);
+  }
+
+
+  public boolean write() {
+    if (counter >= data.length) {
+      return false;
+    } else {
+      rowWriter.setDouble(data[counter++]);
+      return true;
+    }
+  }
+
+  public boolean hasNext() {
+    return counter < data.length;
+  }
+
+  /**
+   * Transposes the input matrix by flipping it over its diagonal, swapping the row and
+   * column indices to produce a new matrix.
+   * @param matrix The input matrix to be transposed
+   * @return The transposed matrix.
+   */
+  private double[][] transpose(double[][] matrix) {
+    if (matrix == null || matrix.length == 0) {
+      return matrix;
+    }
+    int width = matrix.length;
+    int height = matrix[0].length;
+
+    double[][] transposedMatrix = new double[height][width];
+
+    for (int x = 0; x < width; x++) {
+      for (int y = 0; y < height; y++) {
+        transposedMatrix[y][x] = matrix[x][y];
+      }
+    }
+    return transposedMatrix;
+  }
+
+  public int getDataSize() {
+    return data.length;
+  }
+}
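The transpose step above exists because HDF5 matrices are read as [rows][cols] while each writer needs one contiguous column. A minimal worked example, independent of the HDF5 libraries:

```java
import java.util.Arrays;

// A 2x3 matrix of [rows][cols] becomes a 3x2 matrix of [cols][rows].
public class TransposeExample {
  public static void main(String[] args) {
    double[][] matrix = {
        {1.0, 2.0, 3.0},
        {4.0, 5.0, 6.0}
    };
    double[][] transposed = new double[matrix[0].length][matrix.length];
    for (int x = 0; x < matrix.length; x++) {
      for (int y = 0; y < matrix[0].length; y++) {
        transposed[y][x] = matrix[x][y];
      }
    }
    // Prints [[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]]
    System.out.println(Arrays.deepToString(transposed));
  }
}
```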
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5EnumDataWriter.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5EnumDataWriter.java
new file mode 100644
index 0000000..7942c75
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5EnumDataWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5.writers;
+
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.hdf5.HDF5Utils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+public class HDF5EnumDataWriter extends HDF5DataWriter {
+
+  private final String[] data;
+
+  private final ScalarWriter rowWriter;
+
+  // This constructor is used when the data is a 1D column.  The column is inferred from the datapath
+  public HDF5EnumDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath) {
+    super(reader, columnWriter, datapath);
+    data = reader.readEnumArrayAsString(datapath);
+
+    fieldName = HDF5Utils.getNameFromPath(datapath);
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  public boolean write() {
+    if (counter >= data.length) {
+      return false;
+    } else {
+      rowWriter.setString(data[counter++]);
+      return true;
+    }
+  }
+
+  @Override
+  public int getDataSize() { return data.length; }
+
+  public boolean hasNext() {
+    return counter < data.length;
+  }
+}
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5FloatDataWriter.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5FloatDataWriter.java
new file mode 100644
index 0000000..69ec164
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5FloatDataWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5.writers;
+
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.hdf5.HDF5Utils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+import java.util.List;
+
+public class HDF5FloatDataWriter extends HDF5DataWriter {
+
+  private final float[] data;
+
+  private final ScalarWriter rowWriter;
+
+  // This constructor is used when the data is a 1D column.  The column is inferred from the datapath
+  public HDF5FloatDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath) {
+    super(reader, columnWriter, datapath);
+    data = reader.readFloatArray(datapath);
+    fieldName = HDF5Utils.getNameFromPath(datapath);
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  // This constructor is used when the data is part of a 2D array.  In this case the column name is provided in the constructor
+  public HDF5FloatDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath, String fieldName, int currentColumn) {
+    super(reader, columnWriter, datapath, fieldName, currentColumn);
+    // Get dimensions
+    long[] dimensions = reader.object().getDataSetInformation(datapath).getDimensions();
+    float[][] tempData;
+    if (dimensions.length == 2) {
+      tempData = transpose(reader.readFloatMatrix(datapath));
+    } else {
+      tempData = transpose(reader.float32().readMDArray(datapath).toMatrix());
+    }
+    data = tempData[currentColumn];
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL);
+  }
+
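+  // This constructor is used for compound data types.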
+  public HDF5FloatDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String fieldName, List<Float> tempListData) {
+    super(reader, columnWriter, null);
+    this.fieldName = fieldName;
+    data = new float[tempListData.size()];
+    for (int i = 0; i < tempListData.size(); i++) {
+      data[i] = tempListData.get(i);
+    }
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL);
+  }
+
+
+  public boolean write() {
+    if (counter >= data.length) {
+      return false;
+    } else {
+      rowWriter.setDouble(data[counter++]);
+      return true;
+    }
+  }
+
+  public boolean hasNext() {
+    return counter < data.length;
+  }
+
+  /**
+   * Transposes the input matrix by flipping it over its diagonal, swapping the row and
+   * column indices to produce a new matrix.
+   * @param matrix The input matrix to be transposed
+   * @return The transposed matrix.
+   */
+  private float[][] transpose(float[][] matrix) {
+    if (matrix == null || matrix.length == 0) {
+      return matrix;
+    }
+    int width = matrix.length;
+    int height = matrix[0].length;
+
+    float[][] transposedMatrix = new float[height][width];
+
+    for (int x = 0; x < width; x++) {
+      for (int y = 0; y < height; y++) {
+        transposedMatrix[y][x] = matrix[x][y];
+      }
+    }
+    return transposedMatrix;
+  }
+
+  @Override
+  public int getDataSize() {
+    return data.length;
+  }
+}
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5IntDataWriter.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5IntDataWriter.java
new file mode 100644
index 0000000..221acb8
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5IntDataWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5.writers;
+
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.hdf5.HDF5Utils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import java.util.List;
+
+public class HDF5IntDataWriter extends HDF5DataWriter {
+
+  private final int[] data;
+
+  private final ScalarWriter rowWriter;
+
+  // This constructor is used when the data is a 1D column.  The column is inferred from the datapath
+  public HDF5IntDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath) {
+    super(reader, columnWriter, datapath);
+    data = reader.readIntArray(datapath);
+
+    fieldName = HDF5Utils.getNameFromPath(datapath);
+
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  // This constructor is used when the data is part of a 2D array.  In this case the column name is provided in the constructor
+  public HDF5IntDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath, String fieldName, int currentColumn) {
+    super(reader, columnWriter, datapath, fieldName, currentColumn);
+    // Get dimensions
+    long[] dimensions = reader.object().getDataSetInformation(datapath).getDimensions();
+    int[][] tempData;
+    if (dimensions.length == 2) {
+      tempData = transpose(reader.readIntMatrix(datapath));
+    } else {
+      tempData = transpose(reader.int32().readMDArray(datapath).toMatrix());
+    }
+    data = tempData[currentColumn];
+
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  // This constructor is used for compound data types.
+  public HDF5IntDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String fieldName, List<Integer> tempListData) {
+    super(reader, columnWriter, null);
+    this.fieldName = fieldName;
+    data = new int[tempListData.size()];
+    for (int i = 0; i < tempListData.size(); i++) {
+      data[i] = tempListData.get(i);
+    }
+
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL);
+  }
+
+
+  public boolean write() {
+    if (counter >= data.length) {
+      return false;
+    } else {
+      rowWriter.setInt(data[counter++]);
+      return true;
+    }
+  }
+
+  public boolean hasNext() {
+    return counter < data.length;
+  }
+
+  /**
+   * Transposes the input matrix by flipping it over its diagonal, swapping the row and
+   * column indices to produce a new matrix.
+   * @param matrix The input matrix to be transposed
+   * @return The transposed matrix.
+   */
+  private int[][] transpose(int[][] matrix) {
+    if (matrix == null || matrix.length == 0) {
+      return matrix;
+    }
+    int width = matrix.length;
+    int height = matrix[0].length;
+
+    int[][] transposedMatrix = new int[height][width];
+
+    for (int x = 0; x < width; x++) {
+      for (int y = 0; y < height; y++) {
+        transposedMatrix[y][x] = matrix[x][y];
+      }
+    }
+    return transposedMatrix;
+  }
+
+  public int getDataSize() {
+    return data.length;
+  }
+}
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5LongDataWriter.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5LongDataWriter.java
new file mode 100644
index 0000000..fe9b05a
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5LongDataWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5.writers;
+
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.hdf5.HDF5Utils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+import java.util.List;
+
+public class HDF5LongDataWriter extends HDF5DataWriter {
+
+  private final long[] data;
+
+  private final ScalarWriter rowWriter;
+
+  // This constructor is used when the data is a 1D column.  The column is inferred from the datapath
+  public HDF5LongDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath) {
+    super(reader, columnWriter, datapath);
+    data = reader.readLongArray(datapath);
+
+    fieldName = HDF5Utils.getNameFromPath(datapath);
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL);
+
+  }
+
+  // This constructor is used when the data is part of a 2D array.  In this case the column name is provided in the constructor
+  public HDF5LongDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath, String fieldName, int currentColumn) {
+    super(reader, columnWriter, datapath, fieldName, currentColumn);
+    // Get dimensions
+    long[] dimensions = reader.object().getDataSetInformation(datapath).getDimensions();
+    long[][] tempData;
+    if (dimensions.length == 2) {
+      tempData = transpose(reader.readLongMatrix(datapath));
+    } else {
+      tempData = transpose(reader.int64().readMDArray(datapath).toMatrix());
+    }
+    data = tempData[currentColumn];
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL);
+  }
+
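+  // This constructor is used for compound data types.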
+  public HDF5LongDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String fieldName, List<Long> tempListData) {
+    super(reader, columnWriter, null);
+    this.fieldName = fieldName;
+    data = new long[tempListData.size()];
+    for (int i = 0; i < tempListData.size(); i++) {
+      data[i] = tempListData.get(i);
+    }
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  public boolean write() {
+    if (counter >= data.length) {
+      return false;
+    } else {
+      rowWriter.setLong(data[counter++]);
+      return true;
+    }
+  }
+
+  public boolean hasNext() {
+    return counter < data.length;
+  }
+
+  /**
+   * Transposes the input matrix by flipping it over its diagonal, swapping the row and
+   * column indices to produce a new matrix.
+   * @param matrix The input matrix to be transposed
+   * @return The transposed matrix.
+   */
+  private long[][] transpose(long[][] matrix) {
+    if (matrix == null || matrix.length == 0) {
+      return matrix;
+    }
+    int width = matrix.length;
+    int height = matrix[0].length;
+
+    long[][] transposedMatrix = new long[height][width];
+
+    for (int x = 0; x < width; x++) {
+      for (int y = 0; y < height; y++) {
+        transposedMatrix[y][x] = matrix[x][y];
+      }
+    }
+    return transposedMatrix;
+  }
+
+  public int getDataSize() {
+    return data.length;
+  }
+}
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5MapDataWriter.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5MapDataWriter.java
new file mode 100644
index 0000000..1d34597
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5MapDataWriter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5.writers;
+
+import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation;
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HDF5MapDataWriter extends HDF5DataWriter {
+  private static final Logger logger = LoggerFactory.getLogger(HDF5MapDataWriter.class);
+
+  private static final String UNSAFE_SPACE_SEPARATOR = " ";
+
+  private static final String SAFE_SPACE_SEPARATOR = "_";
+
+  private final List<HDF5DataWriter> dataWriters;
+
+  private List<String> fieldNames;
+
+
+  public HDF5MapDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath) {
+    super(reader, columnWriter, datapath);
+    fieldNames = new ArrayList<>();
+
+    compoundData = reader.compound().readArray(datapath, Object[].class);
+    dataWriters = new ArrayList<>();
+    fieldNames = getFieldNames();
+    try {
+      getDataWriters();
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError()
+        .message("Error writing Compound Field: %s", e.getMessage())
+        .build(logger);
+    }
+  }
+
+  public boolean write() {
+    if (!hasNext()) {
+      return false;
+    } else {
+      // Loop through the data writers and write each column
+      columnWriter.start();
+      for (HDF5DataWriter dataWriter : dataWriters) {
+        dataWriter.write();
+      }
+      columnWriter.save();
+      counter++;
+      return true;
+    }
+  }
+
+  public boolean hasNext() {
+    return !dataWriters.isEmpty() && counter < dataWriters.get(0).getDataSize();
+  }
+
+  private List<String> getFieldNames() {
+    List<String> names = new ArrayList<>();
+
+    HDF5CompoundMemberInformation[] infos = reader.compound().getDataSetInfo(datapath);
+    for (HDF5CompoundMemberInformation info : infos) {
+      names.add(info.getName().replace(UNSAFE_SPACE_SEPARATOR, SAFE_SPACE_SEPARATOR));
+    }
+    return names;
+  }
+
+  /**
+   * This function populates the list of data writers. Since each column of an HDF5 compound dataset has a
+   * fixed type, it is sufficient to inspect the first row, iterate over its columns to determine the data
+   * types, and build the schema accordingly.
+   */
+  private void getDataWriters() {
+    List listData;
+
+    for (int col = 0; col < compoundData[0].length; col++) {
+      Object currentColumn = compoundData[0][col];
+      String dataType = currentColumn.getClass().getSimpleName();
+      listData = getColumn(col);
+
+      switch (dataType) {
+        case "Byte":
+        case "Short":
+        case "Integer":
+          dataWriters.add(new HDF5IntDataWriter(reader, columnWriter, fieldNames.get(col), listData));
+          break;
+        case "Long":
+          dataWriters.add(new HDF5LongDataWriter(reader, columnWriter, fieldNames.get(col), listData));
+          break;
+        case "Double":
+          dataWriters.add(new HDF5DoubleDataWriter(reader, columnWriter, fieldNames.get(col), listData));
+          break;
+        case "Float":
+          dataWriters.add(new HDF5FloatDataWriter(reader, columnWriter, fieldNames.get(col), listData));
+          break;
+        case "String":
+          dataWriters.add(new HDF5StringDataWriter(reader, columnWriter, fieldNames.get(col), listData));
+          break;
+        default:
+          // Log unknown data type
+          logger.warn("Drill cannot process data type {} in compound fields.", dataType);
+          break;
+      }
+    }
+  }
+
+  /**
+   * This function returns true if the data writer is a compound writer, false if not.
+   * @return boolean true if the data writer is a compound writer, false if not.
+   */
+  public boolean isCompound() {
+    return true;
+  }
+
+  @Override
+  public int getDataSize() {
+    return dataWriters.size();
+  }
+}
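To summarize the compound handling above: the dataset arrives as Object[rows][cols], each column is sliced into a list via getColumn(), and the Drill type is chosen from the runtime class of the value in the first row. A standalone, hedged sketch of that slicing and dispatch (the data values are invented for the example):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: slice compound rows into columns and pick a type per column.
public class CompoundDispatchExample {
  public static void main(String[] args) {
    Object[][] compoundData = {
        {1, 2.5, "a"},
        {2, 3.5, "b"}
    };
    for (int col = 0; col < compoundData[0].length; col++) {
      List<Object> column = new ArrayList<>();
      for (Object[] row : compoundData) {
        column.add(row[col]);
      }
      String dataType = compoundData[0][col].getClass().getSimpleName();
      // HDF5MapDataWriter switches on dataType here to create an
      // HDF5IntDataWriter, HDF5DoubleDataWriter, HDF5StringDataWriter, etc.
      System.out.println(dataType + " column: " + column);
    }
  }
}
```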
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5StringDataWriter.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5StringDataWriter.java
new file mode 100644
index 0000000..6414c24
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5StringDataWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5.writers;
+
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.hdf5.HDF5Utils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class HDF5StringDataWriter extends HDF5DataWriter {
+
+  private String[] data;
+
+  private final List<String> listData;
+
+  private final ScalarWriter rowWriter;
+
+  // This constructor is used when the data is a 1D column.  The column is inferred from the datapath
+  public HDF5StringDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath) {
+    super(reader, columnWriter, datapath);
+    data = reader.readStringArray(datapath);
+    listData = Arrays.asList(data);
+    fieldName = HDF5Utils.getNameFromPath(datapath);
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+  }
+
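+  // This constructor is used for compound data types.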
+  public HDF5StringDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String fieldName, List<String> data) {
+    super(reader, columnWriter, null);
+    this.fieldName = fieldName;
+    this.listData = data;
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  public boolean write() {
+    if (counter >= listData.size()) {
+      return false;
+    } else {
+      rowWriter.setString(listData.get(counter++));
+      return true;
+    }
+  }
+
+  public boolean hasNext() {
+    return counter < listData.size();
+  }
+
+  @Override
+  public int getDataSize() {
+    return listData.size();
+  }
+}
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5TimestampDataWriter.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5TimestampDataWriter.java
new file mode 100644
index 0000000..c342221
--- /dev/null
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/HDF5TimestampDataWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5.writers;
+
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.hdf5.HDF5Utils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.joda.time.Instant;
+
+public class HDF5TimestampDataWriter extends HDF5DataWriter {
+
+  private final long[] data;
+
+  private final ScalarWriter rowWriter;
+
+  // This constructor is used when the data is a 1D column.  The column is inferred from the datapath
+  public HDF5TimestampDataWriter(IHDF5Reader reader, RowSetLoader columnWriter, String datapath) {
+    super(reader, columnWriter, datapath);
+    data = reader.time().readTimeStampArray(datapath);
+
+    fieldName = HDF5Utils.getNameFromPath(datapath);
+    rowWriter = makeWriter(columnWriter, fieldName, TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  public boolean write() {
+    if (counter >= data.length) {
+      return false;
+    } else {
+      rowWriter.setTimestamp(new Instant(data[counter++]));
+      return true;
+    }
+  }
+
+  public boolean hasNext() {
+    return counter < data.length;
+  }
+
+  @Override
+  public int getDataSize() {
+    return data.length;
+  }
+
+  // Datasets with more than one dimension are not supported for timestamps.
+}
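HDF5 timestamps are read as epoch milliseconds and handed to Drill as Joda-Time Instants; the sketch below shows the same conversion in isolation. The epoch value is arbitrary, and joda-time is assumed to be on the classpath, as it is for Drill.

```java
import org.joda.time.Instant;

// The same epoch-milliseconds-to-Instant conversion the writer performs.
public class TimestampExample {
  public static void main(String[] args) {
    long epochMillis = 1_565_000_000_000L;   // arbitrary example value
    Instant instant = new Instant(epochMillis);
    System.out.println(instant);             // prints the ISO-8601 instant, e.g. 2019-08-05T10:13:20.000Z
  }
}
```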
diff --git a/contrib/format-hdf5/src/main/resources/bootstrap-format-plugins.json b/contrib/format-hdf5/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..99f74c3
--- /dev/null
+++ b/contrib/format-hdf5/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,26 @@
+{
+  "storage":{
+    "dfs": {
+      "type": "file",
+      "formats": {
+        "hdf5": {
+          "type": "hdf5",
+          "extensions": [
+            "h5"
+          ]
+        }
+      }
+    },
+    "s3": {
+      "type": "file",
+      "formats": {
+        "hdf5": {
+          "type": "hdf5",
+          "extensions": [
+            "h5"
+          ]
+        }
+      }
+    }
+  }
+}
diff --git a/contrib/format-hdf5/src/main/resources/drill-module.conf b/contrib/format-hdf5/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..2bb6136
--- /dev/null
+++ b/contrib/format-hdf5/src/main/resources/drill-module.conf
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//  This file tells Drill to consider this module when scanning the class path.
+//  This file can also include any supplementary configuration information.
+//  This file is in HOCON format; see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning.packages += "org.apache.drill.exec.store.hdf5"
diff --git a/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
new file mode 100644
index 0000000..ee11747
--- /dev/null
+++ b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
@@ -0,0 +1,938 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.store.dfs.ZipCodec;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@Category(RowSetTests.class)
+public class TestHDF5Format extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+
+    HDF5FormatConfig formatConfig = new HDF5FormatConfig();
+    cluster.defineFormat("dfs", "hdf5", formatConfig);
+    dirTestWatcher.copyResourceToRoot(Paths.get("hdf5/"));
+  }
+
+  @Test
+  public void testExplicitQuery() throws Exception {
+    String sql = "SELECT path, data_type, file_name FROM dfs.`hdf5/dset.h5`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("path", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("data_type", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("file_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("/dset", "DATASET", "dset.h5")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testStarQuery() throws Exception {
+    List<Integer> t1 = Arrays.asList(1, 2, 3, 4, 5, 6);
+    List<Integer> t2 = Arrays.asList(7, 8, 9, 10, 11, 12);
+    List<Integer> t3 = Arrays.asList(13, 14, 15, 16, 17, 18);
+    List<Integer> t4 = Arrays.asList(19, 20, 21, 22, 23, 24);
+    List<List<Integer>> finalList = new ArrayList<>();
+    finalList.add(t1);
+    finalList.add(t2);
+    finalList.add(t3);
+    finalList.add(t4);
+
+    testBuilder()
+      .sqlQuery("SELECT * FROM dfs.`hdf5/dset.h5`")
+      .unOrdered()
+      .baselineColumns("path", "data_type", "file_name", "int_data")
+      .baselineValues("/dset", "DATASET", "dset.h5", finalList)
+      .go();
+  }
+
+  @Test
+  public void testSimpleExplicitQuery() throws Exception {
+    List<Integer> t1 = Arrays.asList(1, 2, 3, 4, 5, 6);
+    List<Integer> t2 = Arrays.asList(7, 8, 9, 10, 11, 12);
+    List<Integer> t3 = Arrays.asList(13, 14, 15, 16, 17, 18);
+    List<Integer> t4 = Arrays.asList(19, 20, 21, 22, 23, 24);
+    List<List<Integer>> finalList = new ArrayList<>();
+    finalList.add(t1);
+    finalList.add(t2);
+    finalList.add(t3);
+    finalList.add(t4);
+
+    testBuilder()
+      .sqlQuery("SELECT path, data_type, file_name, int_data FROM dfs.`hdf5/dset.h5`")
+      .ordered()
+      .baselineColumns("path", "data_type", "file_name", "int_data")
+      .baselineValues("/dset", "DATASET", "dset.h5", finalList)
+      .go();
+  }
+
+  @Test
+  public void testFlattenColumnQuery() throws RpcException {
+    String sql = "SELECT data[0] AS col1,\n" +
+            "data[1] as col2,\n" +
+            "data[2] as col3\n" +
+            "FROM \n" +
+            "(\n" +
+            "SELECT FLATTEN(double_data) AS data \n" +
+            "FROM dfs.`hdf5/browsing.h5` WHERE path='/groupB/dmat'\n" +
+            ")";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("col1", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL)
+      .add("col2", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL)
+      .add("col3", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1.1, 2.2, 3.3)
+      .addRow(4.4, 5.5, 6.6)
+      .addRow(7.7, 8.8, 9.9)
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterWithNonProjectedFieldQuery() throws Exception {
+    String sql = "SELECT `path` FROM dfs.`hdf5/browsing.h5` WHERE data_type='DATASET'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("path", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("/groupA/date")
+      .addRow("/groupA/string")
+      .addRow("/groupB/dmat")
+      .addRow("/groupB/inarr")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFloat32ScalarQuery() throws Exception {
+    String sql = "SELECT flatten(float32) AS float_col\n" +
+            "FROM dfs.`hdf5/scalar.h5`\n" +
+            "WHERE path='/datatype/float32'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("float_col", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-3.4028234663852886E38)
+      .addRow(1.0)
+      .addRow(2.0)
+      .addRow(3.0)
+      .addRow(4.0)
+      .addRow(5.0)
+      .addRow(6.0)
+      .addRow(7.0)
+      .addRow(8.0)
+      .addRow(3.4028234663852886E38)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFlattenFloat32ScalarQuery() throws Exception {
+    String sql = "SELECT * FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/datatype/float32'))";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("float32", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-3.4028234663852886E38)
+      .addRow(1.0)
+      .addRow(2.0)
+      .addRow(3.0)
+      .addRow(4.0)
+      .addRow(5.0)
+      .addRow(6.0)
+      .addRow(7.0)
+      .addRow(8.0)
+      .addRow(3.4028234663852886E38)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFloat64ScalarQuery() throws Exception {
+    String sql = "SELECT flatten(float64) AS float_col\n" +
+            "FROM dfs.`hdf5/scalar.h5`\n" +
+            "WHERE path='/datatype/float64'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("float_col", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-1.7976931348623157E308)
+      .addRow(1.0)
+      .addRow(2.0)
+      .addRow(3.0)
+      .addRow(4.0)
+      .addRow(5.0)
+      .addRow(6.0)
+      .addRow(7.0)
+      .addRow(8.0)
+      .addRow(1.7976931348623157E308)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFlattenFloat64ScalarQuery() throws Exception {
+    String sql = "SELECT * FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/datatype/float64'))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("float64", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-1.7976931348623157E308)
+      .addRow(1.0)
+      .addRow(2.0)
+      .addRow(3.0)
+      .addRow(4.0)
+      .addRow(5.0)
+      .addRow(6.0)
+      .addRow(7.0)
+      .addRow(8.0)
+      .addRow(1.7976931348623157E308)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testInt32ScalarQuery() throws Exception {
+    String sql = "SELECT flatten(int32) AS int_col\n" +
+            "FROM dfs.`hdf5/scalar.h5`\n" +
+            "WHERE path='/datatype/int32'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("int_col", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648)
+      .addRow(1)
+      .addRow(2)
+      .addRow(3)
+      .addRow(4)
+      .addRow(5)
+      .addRow(6)
+      .addRow(7)
+      .addRow(8)
+      .addRow(2147483647)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFlattenInt32ScalarQuery() throws Exception {
+    String sql = "SELECT * FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/datatype/int32'))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("int32", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648)
+      .addRow(1)
+      .addRow(2)
+      .addRow(3)
+      .addRow(4)
+      .addRow(5)
+      .addRow(6)
+      .addRow(7)
+      .addRow(8)
+      .addRow(2147483647)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testInt64ScalarQuery() throws Exception {
+    String sql = "SELECT flatten(int64) AS long_col\n" +
+            "FROM dfs.`hdf5/scalar.h5`\n" +
+            "WHERE path='/datatype/int64'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("long_col", TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-9223372036854775808L)
+      .addRow(1L)
+      .addRow(2L)
+      .addRow(3L)
+      .addRow(4L)
+      .addRow(5L)
+      .addRow(6L)
+      .addRow(7L)
+      .addRow(8L)
+      .addRow(9223372036854775807L)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+
+  }
+
+  @Test
+  public void testFlattenInt64ScalarQuery() throws Exception {
+    String sql = "SELECT * FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/datatype/int64'))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("int64", TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-9223372036854775808L)
+      .addRow(1L)
+      .addRow(2L)
+      .addRow(3L)
+      .addRow(4L)
+      .addRow(5L)
+      .addRow(6L)
+      .addRow(7L)
+      .addRow(8L)
+      .addRow(9223372036854775807L)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testStringScalarQuery() throws Exception {
+    String sql = "SELECT flatten(s10) AS string_col\n" +
+            "FROM dfs.`hdf5/scalar.h5`\n" +
+            "WHERE path='/datatype/s10'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("string_col", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("a         ")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("abcdefghij")
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFlattenStringScalarQuery() throws Exception {
+    String sql = "SELECT * FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/datatype/s10'))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("s10", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("a         ")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("abcdefghij")
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+
+  @Test
+  public void testUnicodeScalarQuery() throws Exception {
+    String sql = "SELECT flatten(unicode) AS string_col\n" +
+            "FROM dfs.`hdf5/scalar.h5`\n" +
+            "WHERE path='/datatype/unicode'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("string_col", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("a")
+      .addRow("Ελληνικά")
+      .addRow("日本語")
+      .addRow("العربية")
+      .addRow("экземпляр")
+      .addRow("סקרן")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("abcdefghij")
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testUnicodeFlattenScalarQuery() throws Exception {
+    String sql = "SELECT * FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/datatype/unicode'))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("unicode", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("a")
+      .addRow("Ελληνικά")
+      .addRow("日本語")
+      .addRow("العربية")
+      .addRow("экземпляр")
+      .addRow("סקרן")
+      .addRow("")
+      .addRow("")
+      .addRow("")
+      .addRow("abcdefghij")
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+
+  @Test
+  public void test1DScalarQuery() throws Exception {
+    String sql = "SELECT int_col FROM (SELECT FLATTEN(`1D`) AS int_col\n" +
+            "FROM dfs.`hdf5/scalar.h5`\n" +
+            "WHERE path='/nd/1D') WHERE int_col < 5";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("int_col", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648)
+      .addRow(1)
+      .addRow(2)
+      .addRow(3)
+      .addRow(4)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void test1DFlattenScalarQuery() throws Exception {
+    String sql = "SELECT * FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/nd/1D')) WHERE `1D` < 5";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("1D", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648)
+      .addRow(1)
+      .addRow(2)
+      .addRow(3)
+      .addRow(4)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+
+  @Test
+  public void test2DFlattenScalarQuery() throws Exception {
+    String sql = "SELECT int_col_0, int_col_1 FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/nd/2D'))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("int_col_0", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648, 1)
+      .addRow(10, 11)
+      .addRow(20, 21)
+      .addRow(30, 31)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void test2DScalarQuery() throws Exception {
+    String sql = "SELECT int_data[0] AS col1,\n" +
+      "int_data[1] AS col2\n" +
+      "FROM\n" +
+      "(\n" +
+      "SELECT flatten(int_data) AS int_data\n" +
+      "FROM dfs.`hdf5/scalar.h5`\n" +
+      "WHERE path='/nd/2D'\n" +
+      ") AS t1";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("col1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("col2", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648, 1)
+      .addRow(10, 11)
+      .addRow(20, 21)
+      .addRow(30, 31)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+
+  @Test
+  public void test3DScalarQuery() throws Exception {
+    String sql = "SELECT int_data[0] AS col1,\n" +
+      "int_data[1] AS col2\n" +
+      "FROM\n" +
+      "(\n" +
+      "SELECT flatten(int_data) AS int_data\n" +
+      "FROM dfs.`hdf5/scalar.h5`\n" +
+      "WHERE path='/nd/3D'\n" +
+      ") AS t1";
+
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("col1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("col2", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648, 1)
+      .addRow(2, 3)
+      .addRow(4, 5)
+      .addRow(6, 7)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void test3DFlattenScalarQuery() throws Exception {
+    String sql = "SELECT int_col_0, int_col_1 FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/nd/3D'))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("int_col_0", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648, 1)
+      .addRow(2, 3)
+      .addRow(4, 5)
+      .addRow(6, 7)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void test4DScalarQuery() throws Exception {
+    String sql = "SELECT int_data[0] AS col1,\n" +
+            "int_data[1] AS col2\n" +
+            "FROM\n" +
+            "(\n" +
+            "SELECT flatten(int_data) AS int_data\n" +
+            "FROM dfs.`hdf5/scalar.h5`\n" +
+            "WHERE path='/nd/4D'\n" +
+            ") AS t1";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("col1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("col2", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648, 1)
+      .addRow(2, 3)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void test4DFlattenScalarQuery() throws Exception {
+    String sql = "SELECT int_col_0, int_col_1 FROM table(dfs.`hdf5/scalar.h5` (type => 'hdf5', defaultPath => '/nd/4D')) WHERE int_col_0 <= 2";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("int_col_0", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(-2147483648, 1)
+      .addRow(2, 3)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testNonScalarIntQuery() throws Exception {
+    String sql = "SELECT field_1 FROM( SELECT flatten(t1.compound_data.`field 1`) as field_1\n" +
+            "FROM dfs.`hdf5/non-scalar.h5` AS t1) WHERE field_1 < 5";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("field_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(0)
+      .addRow(1)
+      .addRow(2)
+      .addRow(3)
+      .addRow(4)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+  @Test
+  public void testNonScalarFloatQuery() throws Exception {
+    String sql = "SELECT field_2 FROM (SELECT flatten(t1.compound_data.`field 2`) as field_2\n" +
+            "FROM dfs.`hdf5/non-scalar.h5` AS t1) WHERE field_2 < 5.0";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("field_2", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(0.0)
+      .addRow(1.0)
+      .addRow(2.0)
+      .addRow(3.0)
+      .addRow(4.0)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testNonScalarStringQuery() throws Exception {
+    String sql = "SELECT field_3 FROM (SELECT flatten(t1.compound_data.`field 3`) as field_3\n" +
+            "FROM dfs.`hdf5/non-scalar.h5` AS t1) WHERE CAST(field_3 AS INTEGER) < 5";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("field_3", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("0")
+      .addRow("1")
+      .addRow("2")
+      .addRow("3")
+      .addRow("4")
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testAttributes() throws Exception {
+    String sql = "SELECT path, file_name\n" +
+            "FROM dfs.`hdf5/browsing.h5` AS t1 WHERE t1.attributes.`important` = false";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("path", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("file_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("/groupB", "browsing.h5")
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testStarProjectDatasetQuery() throws Exception {
+    String sql = "SELECT * \n"+
+      "FROM \n" +
+      "table(dfs.`hdf5/dset.h5` (type => 'hdf5', defaultPath => '/dset'))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("int_col_0", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_2", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_3", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_4", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_5", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, 2, 3, 4, 5, 6)
+      .addRow(7, 8, 9, 10, 11, 12)
+      .addRow(13, 14, 15, 16, 17, 18)
+      .addRow(19, 20, 21, 22, 23, 24)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testExplicitProjectDatasetQuery() throws Exception {
+    String sql = "SELECT int_col_0, int_col_1, int_col_2, int_col_3, int_col_4\n"+
+      "FROM \n" +
+      "table(dfs.`hdf5/dset.h5` (type => 'hdf5', defaultPath => '/dset'))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("int_col_0", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_2", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_3", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("int_col_4", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, 2, 3, 4, 5)
+      .addRow(7, 8, 9, 10, 11)
+      .addRow(13, 14, 15, 16, 17)
+      .addRow(19, 20, 21, 22, 23)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testCompoundStarQuery() throws Exception {
+
+    String sql = "SELECT * FROM table(dfs.`hdf5/non-scalar.h5` (type => 'hdf5', defaultPath => '/compound')) WHERE field_1 < 5";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("field_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("field_2", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL)
+      .add("field_3", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(0, 0.0, "0")
+      .addRow(1, 1.0, "1")
+      .addRow(2, 2.0, "2")
+      .addRow(3, 3.0, "3")
+      .addRow(4, 4.0, "4")
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testCompoundExplicitQuery() throws Exception {
+
+    String sql = "SELECT `field_1`, `field_3` FROM table(dfs.`hdf5/non-scalar.h5` (type => 'hdf5', defaultPath => '/compound')) WHERE field_1 < 5";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("field_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("field_3", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(0, "0")
+      .addRow(1, "1")
+      .addRow(2, "2")
+      .addRow(3, "3")
+      .addRow(4, "4")
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testCompoundExplicitQuery2() throws Exception {
+
+    String sql = "SELECT `field_1` FROM table(dfs.`hdf5/non-scalar.h5` (type => 'hdf5', defaultPath => '/compound')) WHERE field_1 < 5";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("field_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(0)
+      .addRow(1)
+      .addRow(2)
+      .addRow(3)
+      .addRow(4)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "SELECT COUNT(*) FROM dfs.`hdf5/dset.h5`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals("Counts should match",1L, cnt);
+  }
+
+  @Test
+  public void testExplicitQueryWithCompressedFile() throws Exception {
+    generateCompressedFile("hdf5/dset.h5", "zip", "hdf5/dset.h5.zip" );
+
+    String sql = "SELECT path, data_type, file_name FROM dfs.`hdf5/dset.h5.zip`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("path", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("data_type", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("file_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("/dset", "DATASET", "dset.h5.zip")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testInlineSchema() throws Exception {
+
+    String sql = "SELECT * FROM table(dfs.`hdf5/non-scalar.h5` (type => 'hdf5', defaultPath => '/compound', schema => 'inline=(field_1 int not null, field_2 double not null, " +
+      "field_3 varchar not null, fixed_field int not null default `20`)')) WHERE field_1 < 5";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    // Verify that the returned data used the schema.
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("field_1", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .add("field_2", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+      .add("field_3", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .add("fixed_field", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(0, 0.0, "0", 20)
+      .addRow(1, 1.0, "1", 20)
+      .addRow(2, 2.0, "2", 20)
+      .addRow(3, 3.0, "3", 20)
+      .addRow(4, 4.0, "4", 20)
+      .build();
+
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  /**
+   * Generates a compressed file for testing
+   * @param fileName the input file to be compressed
+   * @param codecName the CODEC to be used for compression
+   * @param outFileName the output file name
+   * @throws IOException if the input file cannot be found or another IO error occurs
+   */
+  private void generateCompressedFile(String fileName, String codecName, String outFileName) throws IOException {
+    FileSystem fs = ExecTest.getLocalFileSystem();
+    Configuration conf = fs.getConf();
+    conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, ZipCodec.class.getCanonicalName());
+    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+
+    CompressionCodec codec = factory.getCodecByName(codecName);
+    assertNotNull(codecName + " is not found", codec);
+
+    Path outFile = new Path(dirTestWatcher.getRootDir().getAbsolutePath(), outFileName);
+    Path inFile = new Path(dirTestWatcher.getRootDir().getAbsolutePath(), fileName);
+
+    try (InputStream inputStream = new FileInputStream(inFile.toUri().toString());
+         OutputStream outputStream = codec.createOutputStream(fs.create(outFile))) {
+      IOUtils.copyBytes(inputStream, outputStream, fs.getConf(), false);
+    }
+  }
+}
diff --git a/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Utils.java b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Utils.java
new file mode 100644
index 0000000..6e09b28
--- /dev/null
+++ b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Utils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import org.apache.drill.test.BaseTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestHDF5Utils extends BaseTest {
+
+  @Test
+  public void testGetNameFromPath() {
+    String path1 = "/group1";
+    assertEquals("group1", HDF5Utils.getNameFromPath(path1));
+  }
+
+  @Test
+  public void testMultiplePath() {
+    String path2 = "/group1/group2/group3";
+    assertEquals("group3", HDF5Utils.getNameFromPath(path2));
+  }
+
+  @Test
+  public void testEmptyPath() {
+    String emptyPath = "";
+    assertEquals("", HDF5Utils.getNameFromPath(emptyPath));
+  }
+
+  @Test
+  public void testNullPath() {
+    String nullPath = null;
+    assertNull(HDF5Utils.getNameFromPath(nullPath));
+  }
+
+  @Test
+  public void testRootPath() {
+    String rootPath = "/";
+    assertEquals("", HDF5Utils.getNameFromPath(rootPath));
+  }
+}
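The tests above pin down the contract of `HDF5Utils.getNameFromPath()`: the last path segment for a normal path, an empty string for an empty or root path, and null for null input. A minimal sketch that satisfies exactly these cases is shown below for orientation only; it is an assumption about the behavior, not the implementation shipped in this patch.

```
// Minimal sketch, assuming only the behavior exercised by TestHDF5Utils;
// the real HDF5Utils.getNameFromPath() in this patch may be implemented differently.
public static String getNameFromPathSketch(String path) {
  if (path == null) {
    return null;          // null input yields null
  }
  if (path.isEmpty() || "/".equals(path)) {
    return "";            // an empty path and the root group have no name
  }
  int lastSlash = path.lastIndexOf('/');
  return lastSlash < 0 ? path : path.substring(lastSlash + 1);  // last segment
}
```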
diff --git a/contrib/format-hdf5/src/test/resources/hdf5/browsing.h5 b/contrib/format-hdf5/src/test/resources/hdf5/browsing.h5
new file mode 100644
index 0000000..59af932
--- /dev/null
+++ b/contrib/format-hdf5/src/test/resources/hdf5/browsing.h5
Binary files differ
diff --git a/contrib/format-hdf5/src/test/resources/hdf5/dset.h5 b/contrib/format-hdf5/src/test/resources/hdf5/dset.h5
new file mode 100644
index 0000000..f10b959
--- /dev/null
+++ b/contrib/format-hdf5/src/test/resources/hdf5/dset.h5
Binary files differ
diff --git a/contrib/format-hdf5/src/test/resources/hdf5/non-scalar.h5 b/contrib/format-hdf5/src/test/resources/hdf5/non-scalar.h5
new file mode 100644
index 0000000..8886ca0
--- /dev/null
+++ b/contrib/format-hdf5/src/test/resources/hdf5/non-scalar.h5
Binary files differ
diff --git a/contrib/format-hdf5/src/test/resources/hdf5/scalar.h5 b/contrib/format-hdf5/src/test/resources/hdf5/scalar.h5
new file mode 100644
index 0000000..2a8429a
--- /dev/null
+++ b/contrib/format-hdf5/src/test/resources/hdf5/scalar.h5
Binary files differ
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index 5a51bb8..5f2186f 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -1034,7 +1034,7 @@
       "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000"
       "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014"
       "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022"
-      "\032\n\026CANCELLATION_REQUESTED\020\006*\321\n\n\020CoreOper"
+      "\032\n\026CANCELLATION_REQUESTED\020\006*\344\n\n\020CoreOper"
       "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST"
       "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020"
       "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH"
@@ -1066,16 +1066,17 @@
       "N\0207\022\022\n\016RUNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\0209"
       "\022\023\n\017SYSLOG_SUB_SCAN\020:\022\030\n\024STATISTICS_AGGR"
       "EGATE\020;\022\020\n\014UNPIVOT_MAPS\020<\022\024\n\020STATISTICS_"
-      "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\022\n\016EXCEL_SUB_"
-      "SCAN\020@\022\020\n\014SHP_SUB_SCAN\020A\022\024\n\020METADATA_HAN"
-      "DLER\020B\022\027\n\023METADATA_CONTROLLER\020C*g\n\nSaslS"
-      "tatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022"
-      "\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022"
-      "\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.exe"
-      "c.protoB\rUserBitSharedH\001"
+      "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S"
+      "CAN\020\?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA"
+      "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO"
+      "NTROLLER\020C*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN"
+      "\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002"
+      "\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033o"
+      "rg.apache.drill.exec.protoB\rUserBitShare"
+      "dH\001"
   };
   ::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
-      descriptor, 5744);
+      descriptor, 5763);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "UserBitShared.proto", &protobuf_RegisterTypes);
   ::protobuf_Types_2eproto::AddDescriptors();
@@ -1317,6 +1318,7 @@
     case 60:
     case 61:
     case 62:
+    case 63:
     case 64:
     case 65:
     case 66:
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index aba2e75..0aa41cf 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -354,6 +354,7 @@
   UNPIVOT_MAPS = 60,
   STATISTICS_MERGE = 61,
   LTSV_SUB_SCAN = 62,
+  HDF5_SUB_SCAN = 63,
   EXCEL_SUB_SCAN = 64,
   SHP_SUB_SCAN = 65,
   METADATA_HANDLER = 66,
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 93a8553..2478cf3 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -45,6 +45,7 @@
     <module>format-ltsv</module>
     <module>format-excel</module>
     <module>format-esri</module>
+    <module>format-hdf5</module>
     <module>storage-hive</module>
     <module>storage-mongo</module>
     <module>storage-jdbc</module>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index b6e3484..e20abd5 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -334,6 +334,11 @@
         </dependency>
         <dependency>
           <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-format-hdf5</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
           <artifactId>drill-format-ltsv</artifactId>
           <version>${project.version}</version>
         </dependency>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 5dabc89..2eb50fd 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -43,6 +43,7 @@
         <include>org.apache.drill.contrib:drill-format-mapr:jar</include>
         <include>org.apache.drill.contrib:drill-format-syslog:jar</include>
         <include>org.apache.drill.contrib:drill-format-esri:jar</include>
+        <include>org.apache.drill.contrib:drill-format-hdf5:jar</include>
         <include>org.apache.drill.contrib:drill-format-ltsv:jar</include>
         <include>org.apache.drill.contrib:drill-format-excel:jar</include>
         <include>org.apache.drill.contrib:drill-jdbc-storage:jar</include>
diff --git a/pom.xml b/pom.xml
index 0ca1200..adc50dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -188,6 +188,12 @@
     </repository>
 
     <repository>
+      <id>jhdf5-repo</id>
+      <name>ImageJ Repository</name>
+      <url>https://maven.scijava.org/content/repositories/public/</url>
+    </repository>
+
+    <repository>
       <id>jitpack.io</id>
       <url>https://jitpack.io</url>
     </repository>
@@ -330,6 +336,7 @@
             <!-- Types log1, log2, sqllog and sqllog2, ssdlog are used to test he logRegex format plugin. -->
             <exclude>**/*.log1</exclude>
             <exclude>**/*.log2</exclude>
+            <exclude>**/*.h5</exclude>
             <exclude>**/*.sqllog</exclude>
             <exclude>**/*.sqllog2</exclude>
             <exclude>**/*.syslog</exclude>
@@ -615,6 +622,7 @@
               <exclude>**/*.woff2</exclude>
               <exclude>**/*.ks</exclude>
               <exclude>**/*.pcap</exclude>
+              <exclude>**/*.h5</exclude>
               <exclude>**/*.syslog</exclude>
               <exclude>**/*.xls</exclude>
               <exclude>**/*.xlsx</exclude>
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 25d5f89..2583595 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -648,6 +648,10 @@
      */
     LTSV_SUB_SCAN(62),
     /**
+     * <code>HDF5_SUB_SCAN = 63;</code>
+     */
+    HDF5_SUB_SCAN(63),
+    /**
      * <code>EXCEL_SUB_SCAN = 64;</code>
      */
     EXCEL_SUB_SCAN(64),
@@ -918,6 +922,10 @@
      */
     public static final int LTSV_SUB_SCAN_VALUE = 62;
     /**
+     * <code>HDF5_SUB_SCAN = 63;</code>
+     */
+    public static final int HDF5_SUB_SCAN_VALUE = 63;
+    /**
      * <code>EXCEL_SUB_SCAN = 64;</code>
      */
     public static final int EXCEL_SUB_SCAN_VALUE = 64;
@@ -1012,6 +1020,7 @@
         case 60: return UNPIVOT_MAPS;
         case 61: return STATISTICS_MERGE;
         case 62: return LTSV_SUB_SCAN;
+        case 63: return HDF5_SUB_SCAN;
         case 64: return EXCEL_SUB_SCAN;
         case 65: return SHP_SUB_SCAN;
         case 66: return METADATA_HANDLER;
@@ -27915,7 +27924,7 @@
       "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" +
       "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" +
       "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" +
-      "\032\n\026CANCELLATION_REQUESTED\020\006*\321\n\n\020CoreOper" +
+      "\032\n\026CANCELLATION_REQUESTED\020\006*\344\n\n\020CoreOper" +
       "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" +
       "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" +
       "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" +
@@ -27947,13 +27956,14 @@
       "N\0207\022\022\n\016RUNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\0209" +
       "\022\023\n\017SYSLOG_SUB_SCAN\020:\022\030\n\024STATISTICS_AGGR" +
       "EGATE\020;\022\020\n\014UNPIVOT_MAPS\020<\022\024\n\020STATISTICS_" +
-      "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\022\n\016EXCEL_SUB_" +
-      "SCAN\020@\022\020\n\014SHP_SUB_SCAN\020A\022\024\n\020METADATA_HAN" +
-      "DLER\020B\022\027\n\023METADATA_CONTROLLER\020C*g\n\nSaslS" +
-      "tatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022" +
-      "\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022" +
-      "\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.exe" +
-      "c.protoB\rUserBitSharedH\001"
+      "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S" +
+      "CAN\020?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA" +
+      "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO" +
+      "NTROLLER\020C*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN" +
+      "\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002" +
+      "\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033o" +
+      "rg.apache.drill.exec.protoB\rUserBitShare" +
+      "dH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new com.google.protobuf.Descriptors.FileDescriptor.    InternalDescriptorAssigner() {
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 64eeec2..b31837e 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -356,6 +356,7 @@
   UNPIVOT_MAPS = 60;
   STATISTICS_MERGE = 61;
   LTSV_SUB_SCAN = 62;
+  HDF5_SUB_SCAN = 63;
   EXCEL_SUB_SCAN = 64;
   SHP_SUB_SCAN = 65;
   METADATA_HANDLER = 66;