DRILL-7578: HDF5 Metadata Queries Fail with Large Files
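This change adds dataset metadata columns (`data_size`, `element_count`, `dataset_data_type`, and `dimensions`) to HDF5 star queries and limits how much dataset data is projected in the metadata preview, so that metadata queries no longer fail on large files. As a quick illustration (using the sample file from the unit tests), the new columns can be queried directly:

```
SELECT path, data_size, element_count, dataset_data_type, dimensions
FROM dfs.`hdf5/dset.h5`;
```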
diff --git a/contrib/format-hdf5/README.md b/contrib/format-hdf5/README.md
index 6f6dcdd..d80c836 100644
--- a/contrib/format-hdf5/README.md
+++ b/contrib/format-hdf5/README.md
@@ -26,11 +26,11 @@
 Since HDF5 can be viewed as a file system within a file, a single file can contain many datasets. For instance, if you have a simple HDF5 file, a star query will produce the following result:
 ```
 apache drill> select * from dfs.test.`dset.h5`;
-+-------+-----------+-----------+--------------------------------------------------------------------------+
-| path  | data_type | file_name |                                 int_data                                 |
-+-------+-----------+-----------+--------------------------------------------------------------------------+
-| /dset | DATASET   | dset.h5   | [[1,2,3,4,5,6],[7,8,9,10,11,12],[13,14,15,16,17,18],[19,20,21,22,23,24]] |
-+-------+-----------+-----------+--------------------------------------------------------------------------+
++-------+-----------+-----------+-----------+---------------+--------------+------------------+-------------------+------------+--------------------------------------------------------------------------+
+| path  | data_type | file_name | data_size | element_count | is_timestamp | is_time_duration | dataset_data_type | dimensions |                                 int_data                                 |
++-------+-----------+-----------+-----------+---------------+--------------+------------------+-------------------+------------+--------------------------------------------------------------------------+
+| /dset | DATASET   | dset.h5   | 96        | 24            | false        | false            | INTEGER           | [4, 6]     | [[1,2,3,4,5,6],[7,8,9,10,11,12],[13,14,15,16,17,18],[19,20,21,22,23,24]] |
++-------+-----------+-----------+-----------+---------------+--------------+------------------+-------------------+------------+--------------------------------------------------------------------------+
 ```
 The actual data in this file is mapped to a column called int_data. In order to effectively access the data, you should use Drill's `FLATTEN()` function on the `int_data` column, which produces the following result.
 
@@ -69,6 +69,8 @@
  
  ** Note: Once you have determined which data set you are querying, it is advisable to use this method to query HDF5 data. **
  
+ ** Note: Datasets larger than 16MB will be truncated in the metadata view. **
+ 
  You can set the `defaultPath` variable in either the plugin configuration, or at query time using the `table()` function as shown in the example below:
  
  ```
@@ -123,12 +125,14 @@
 +---------+-----------+-------------+
 ```
 
-### Known Limitations
+### Limitations
 There are several limitations with the HDF5 format plugin in Drill.
 * Drill cannot read unsigned 64 bit integers. When the plugin encounters this data type, it will write an INFO message to the log.
 * While Drill can read compressed HDF5 files, Drill cannot read individual compressed fields within an HDF5 file.
 * HDF5 files can contain nested data sets of up to `n` dimensions. Since Drill works best with two dimensional data, datasets with more than two dimensions are reduced to 2
  dimensions.
+ * HDF5 has a `COMPOUND` data type. At present, Drill supports reading `COMPOUND` data types that contain multiple datasets, but it does not support `COMPOUND` fields
+  with multidimensional columns. Drill will ignore multidimensional columns within `COMPOUND` fields.
  
  [1]: https://en.wikipedia.org/wiki/Hierarchical_Data_Format
  [2]: https://www.hdfgroup.org
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
index 277d4dd..2ea26d3 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
@@ -45,6 +45,7 @@
 import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
 import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
 import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
@@ -61,6 +62,7 @@
 import java.io.InputStreamReader;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.BitSet;
 import java.util.Iterator;
@@ -92,6 +94,18 @@
 
   private static final String LONG_COLUMN_NAME = "long_data";
 
+  private static final String DATA_SIZE_COLUMN_NAME = "data_size";
+
+  private static final String ELEMENT_COUNT_NAME = "element_count";
+
+  private static final String DATASET_DATA_TYPE_NAME = "dataset_data_type";
+
+  private static final String DIMENSIONS_FIELD_NAME = "dimensions";
+
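+  // Maximum number of rows (or, for one-dimensional datasets, elements) to project in a metadata (star) query preview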
+  private static final int PREVIEW_ROW_LIMIT = 20;
+
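+  // Datasets larger than this size (in bytes) trigger a warning that the metadata preview will be truncated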
+  private static final int MAX_DATASET_SIZE = ValueVector.MAX_BUFFER_SIZE;
+
   private final HDF5ReaderConfig readerConfig;
 
   private final List<HDF5DataWriter> dataWriters;
@@ -114,6 +128,14 @@
 
   private ScalarWriter fileNameWriter;
 
+  private ScalarWriter dataSizeWriter;
+
+  private ScalarWriter elementCountWriter;
+
+  private ScalarWriter datasetTypeWriter;
+
+  private ScalarWriter dimensionsWriter;
+
   private long[] dimensions;
 
   public static class HDF5ReaderConfig {
@@ -161,7 +183,11 @@
       SchemaBuilder builder = new SchemaBuilder()
         .addNullable(PATH_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
         .addNullable(DATA_TYPE_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
-        .addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR);
+        .addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(DATA_SIZE_COLUMN_NAME, TypeProtos.MinorType.BIGINT)
+        .addNullable(ELEMENT_COUNT_NAME, TypeProtos.MinorType.BIGINT)
+        .addNullable(DATASET_DATA_TYPE_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(DIMENSIONS_FIELD_NAME, TypeProtos.MinorType.VARCHAR);
 
       negotiator.setTableSchema(builder.buildSchema(), false);
 
@@ -191,6 +217,10 @@
       pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
       dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
       fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
+      dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
+      elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
+      datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
+      dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
     }
     return true;
   }
@@ -389,6 +419,15 @@
     }
 
     if (metadataRow.getDataType().equalsIgnoreCase("DATASET")) {
+
+      HDF5DataSetInformation dsInfo = hdf5Reader.object().getDataSetInformation(metadataRow.getPath());
+
+      // Project Dataset Metadata
+      dataSizeWriter.setLong(dsInfo.getSize());
+      elementCountWriter.setLong(dsInfo.getNumberOfElements());
+      datasetTypeWriter.setString(dsInfo.getTypeInformation().getDataClass().name());
+      dimensionsWriter.setString(Arrays.toString(dsInfo.getDimensions()));
+
       projectDataset(rowWriter, metadataRow.getPath());
     }
     rowWriter.save();
@@ -458,7 +497,7 @@
 
   /**
    * This function writes one row of data in a metadata query. The number of dimensions here is n+1. So if the actual dataset is a 1D column, it will be written as a list.
-   * This is function is only called in metadata queries as the schema is not known in advance.
+   * This function is only called in metadata queries as the schema is not known in advance. If the data size is greater than 16MB, the dataset will be truncated in the metadata view.
    *
    * @param rowWriter The rowWriter to which the data will be written
    * @param datapath The datapath from which the data will be read
@@ -467,6 +506,12 @@
     String fieldName = HDF5Utils.getNameFromPath(datapath);
     IHDF5Reader reader = hdf5Reader;
     HDF5DataSetInformation dsInfo = reader.object().getDataSetInformation(datapath);
+
+    // If the dataset is larger than 16MB, warn that the data will be truncated in the metadata view
+    if (dsInfo.getSize() > MAX_DATASET_SIZE) {
+      logger.warn("Dataset {} is larger than 16MB. Data will be truncated in the metadata view.", datapath);
+    }
+
     long[] dimensions = dsInfo.getDimensions();
     //Case for single dimensional data
     if (dimensions.length <= 1) {
@@ -628,8 +673,9 @@
     }
 
     ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
-    for (int value : list) {
-      arrayWriter.setInt(value);
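+    // Limit the metadata preview to at most PREVIEW_ROW_LIMIT elements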
+    int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
+    for (int i = 0; i < maxElements; i++) {
+      arrayWriter.setInt(list[i]);
     }
   }
 
@@ -671,7 +717,8 @@
     // The strings within the inner array
     ScalarWriter intWriter = innerWriter.scalar();
 
-    for (int i = 0; i < rows; i++) {
+    int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+    for (int i = 0; i < maxElements; i++) {
       for (int k = 0; k < cols; k++) {
         intWriter.setInt(colData[i][k]);
       }
@@ -692,8 +739,9 @@
     }
 
     ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
-    for (long l : list) {
-      arrayWriter.setLong(l);
+    int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
+    for (int i = 0; i < maxElements; i++) {
+      arrayWriter.setLong(list[i]);
     }
   }
 
@@ -710,8 +758,9 @@
     }
 
     ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
-    for (String s : list) {
-      arrayWriter.setString(s);
+    int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
+    for (int i = 0; i < maxElements; i++) {
+      arrayWriter.setString(list[i]);
     }
   }
 
@@ -728,8 +777,9 @@
     }
 
     ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
-    for (double v : list) {
-      arrayWriter.setDouble(v);
+    int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
+    for (int i = 0; i < maxElements; i++) {
+      arrayWriter.setDouble(list[i]);
     }
   }
 
@@ -746,8 +796,9 @@
     }
 
     ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
-    for (float v : list) {
-      arrayWriter.setDouble(v);
+    int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
+    for (int i = 0; i < maxElements; i++) {
+      arrayWriter.setDouble(list[i]);
     }
   }
 
@@ -788,7 +839,8 @@
     ArrayWriter innerWriter = listWriter.array();
     // The strings within the inner array
     ScalarWriter floatWriter = innerWriter.scalar();
-    for (int i = 0; i < rows; i++) {
+    int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+    for (int i = 0; i < maxElements; i++) {
       for (int k = 0; k < cols; k++) {
         floatWriter.setDouble(colData[i][k]);
       }
@@ -834,7 +886,8 @@
     // The strings within the inner array
     ScalarWriter floatWriter = innerWriter.scalar();
 
-    for (int i = 0; i < rows; i++) {
+    int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+    for (int i = 0; i < maxElements; i++) {
       for (int k = 0; k < cols; k++) {
         floatWriter.setDouble(colData[i][k]);
       }
@@ -880,7 +933,8 @@
     // The strings within the inner array
     ScalarWriter bigintWriter = innerWriter.scalar();
 
-    for (int i = 0; i < rows; i++) {
+    int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+    for (int i = 0; i < maxElements; i++) {
       for (int k = 0; k < cols; k++) {
         bigintWriter.setLong(colData[i][k]);
       }
@@ -1032,11 +1091,11 @@
 
           SchemaBuilder innerSchema = new SchemaBuilder();
           MapBuilder mapBuilder = innerSchema.addMap(COMPOUND_DATA_FIELD_NAME);
-
           for (HDF5CompoundMemberInformation info : infos) {
             fieldNames.add(info.getName());
+            String compoundColumnDataType = info.getType().tryGetJavaType().getSimpleName();
 
-            switch (info.getType().tryGetJavaType().getSimpleName()) {
+            switch (compoundColumnDataType) {
               case "int":
                 mapBuilder.add(info.getName(), TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED);
                 break;
@@ -1069,26 +1128,32 @@
           for (int col = 0; col < values[row].length; col++) {
             assert fieldNames != null;
             currentFieldName = fieldNames.get(col);
-            ArrayWriter innerWriter = listWriter.array(currentFieldName);
-            if (values[row][col] instanceof Integer) {
-              innerWriter.scalar().setInt((Integer) values[row][col]);
-            } else if (values[row][col] instanceof Short) {
-              innerWriter.scalar().setInt((Short) values[row][col]);
-            } else if (values[row][col] instanceof Byte) {
-              innerWriter.scalar().setInt((Byte) values[row][col]);
-            } else if (values[row][col] instanceof Long) {
-              innerWriter.scalar().setLong((Long) values[row][col]);
-            } else if (values[row][col] instanceof Float) {
-              innerWriter.scalar().setDouble((Float) values[row][col]);
-            } else if (values[row][col] instanceof Double) {
-              innerWriter.scalar().setDouble((Double) values[row][col]);
-            } else if (values[row][col] instanceof BitSet || values[row][col] instanceof Boolean) {
-              innerWriter.scalar().setBoolean((Boolean) values[row][col]);
-            } else if (values[row][col] instanceof String) {
-              innerWriter.scalar().setString((String) values[row][col]);
-            }
-            if (col == values[row].length) {
-              innerWriter.save();
+            try {
+              ArrayWriter innerWriter = listWriter.array(currentFieldName);
+              if (values[row][col] instanceof Integer) {
+                innerWriter.scalar().setInt((Integer) values[row][col]);
+              } else if (values[row][col] instanceof Short) {
+                innerWriter.scalar().setInt((Short) values[row][col]);
+              } else if (values[row][col] instanceof Byte) {
+                innerWriter.scalar().setInt((Byte) values[row][col]);
+              } else if (values[row][col] instanceof Long) {
+                innerWriter.scalar().setLong((Long) values[row][col]);
+              } else if (values[row][col] instanceof Float) {
+                innerWriter.scalar().setDouble((Float) values[row][col]);
+              } else if (values[row][col] instanceof Double) {
+                innerWriter.scalar().setDouble((Double) values[row][col]);
+              } else if (values[row][col] instanceof BitSet || values[row][col] instanceof Boolean) {
+                innerWriter.scalar().setBoolean((Boolean) values[row][col]);
+              } else if (values[row][col] instanceof String) {
+                innerWriter.scalar().setString((String) values[row][col]);
+              } else {
+                logger.warn("Skipping {}/{} due to unsupported data type.", resolvedPath, currentFieldName);
+              }
+              if (col == values[row].length) {
+                innerWriter.save();
+              }
+            } catch (TupleWriter.UndefinedColumnException e) {
+              logger.warn("Drill does not support maps and lists in HDF5 Compound fields. Skipping: {}/{}", resolvedPath, currentFieldName);
             }
           }
         }
diff --git a/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
index ee11747..39d778e 100644
--- a/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
+++ b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
@@ -99,8 +99,8 @@
     testBuilder()
       .sqlQuery("SELECT * FROM dfs.`hdf5/dset.h5`")
       .unOrdered()
-      .baselineColumns("path", "data_type", "file_name", "int_data")
-      .baselineValues("/dset", "DATASET", "dset.h5", finalList)
+      .baselineColumns("path", "data_type", "file_name", "data_size", "element_count", "dataset_data_type", "dimensions", "int_data")
+      .baselineValues("/dset", "DATASET", "dset.h5", 96L, 24L, "INTEGER", "[4, 6]", finalList)
       .go();
   }
 
@@ -118,7 +118,7 @@
 
     testBuilder()
       .sqlQuery("SELECT path, data_type, file_name, int_data FROM dfs.`hdf5/dset.h5`")
-      .ordered()
+      .unOrdered()
       .baselineColumns("path", "data_type", "file_name", "int_data")
       .baselineValues("/dset", "DATASET", "dset.h5", finalList)
       .go();