DRILL-8188: Convert HDF5 format to EVF2 (#2515)
* Bump JHDF to latest version
---------
Co-authored-by: cgivre <cgivre@gmail.com>
diff --git a/contrib/format-hdf5/pom.xml b/contrib/format-hdf5/pom.xml
index bb0309a..b168162 100644
--- a/contrib/format-hdf5/pom.xml
+++ b/contrib/format-hdf5/pom.xml
@@ -29,7 +29,7 @@
<artifactId>drill-format-hdf5</artifactId>
<name>Drill : Contrib : Format : HDF5</name>
-
+
<dependencies>
<dependency>
<groupId>org.apache.drill.exec</groupId>
@@ -39,7 +39,7 @@
<dependency>
<groupId>io.jhdf</groupId>
<artifactId>jhdf</artifactId>
- <version>0.6.2</version>
+ <version>0.6.10</version>
</dependency>
<!-- Test dependencies -->
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
index 75c6f6d..67e2755 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
@@ -18,53 +18,8 @@
package org.apache.drill.exec.store.hdf5;
-import io.jhdf.HdfFile;
-import io.jhdf.api.Attribute;
-import io.jhdf.api.Dataset;
-import io.jhdf.api.Group;
-import io.jhdf.api.Node;
-import io.jhdf.exceptions.HdfException;
-import io.jhdf.links.SoftLink;
-import io.jhdf.object.datatype.CompoundDataType;
-import io.jhdf.object.datatype.CompoundDataType.CompoundDataMember;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.CustomErrorContext;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
-import org.apache.drill.exec.physical.resultSet.RowSetLoader;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.MapBuilder;
-import org.apache.drill.exec.record.metadata.MetadataUtils;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
-import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.store.hdf5.writers.WriterSpec;
-import org.apache.drill.exec.vector.accessor.ArrayWriter;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.TupleWriter;
-
-import org.apache.hadoop.mapred.FileSplit;
-import org.jvnet.libpam.impl.CLibrary.group;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -74,7 +29,50 @@
import java.util.List;
import java.util.Map;
-public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.WriterSpec;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.jhdf.HdfFile;
+import io.jhdf.api.Attribute;
+import io.jhdf.api.Dataset;
+import io.jhdf.api.Group;
+import io.jhdf.api.Node;
+import io.jhdf.exceptions.HdfException;
+import io.jhdf.links.SoftLink;
+import io.jhdf.object.datatype.CompoundDataType;
+import io.jhdf.object.datatype.CompoundDataType.CompoundDataMember;
+
+public class HDF5BatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(HDF5BatchReader.class);
private static final String PATH_COLUMN_NAME = "path";
@@ -119,49 +117,44 @@
private final List<HDF5DataWriter> dataWriters;
- private final int maxRecords;
+ private final String fileName;
- private String fileName;
+ private final FileDescrip file;
- private FileSplit split;
+ private final HdfFile hdfFile;
- private HdfFile hdfFile;
+ private final RowSetLoader rowWriter;
- private BufferedReader reader;
+ private final WriterSpec writerSpec;
- private RowSetLoader rowWriter;
+ private final Iterator<HDF5DrillMetadata> metadataIterator;
- private WriterSpec writerSpec;
+ private final ScalarWriter pathWriter;
- private Iterator<HDF5DrillMetadata> metadataIterator;
+ private final ScalarWriter dataTypeWriter;
- private ScalarWriter pathWriter;
+ private final ScalarWriter fileNameWriter;
- private ScalarWriter dataTypeWriter;
+ private final ScalarWriter linkWriter;
- private ScalarWriter fileNameWriter;
+ private final ScalarWriter dataSizeWriter;
- private ScalarWriter linkWriter;
+ private final ScalarWriter elementCountWriter;
- private ScalarWriter dataSizeWriter;
+ private final ScalarWriter datasetTypeWriter;
- private ScalarWriter elementCountWriter;
+ private final ScalarWriter dimensionsWriter;
- private ScalarWriter datasetTypeWriter;
+ private final CustomErrorContext errorContext;
- private ScalarWriter dimensionsWriter;
+ private final boolean showMetadataPreview;
- private CustomErrorContext errorContext;
-
- private boolean showMetadataPreview;
-
- private int[] dimensions;
+ private final int[] dimensions;
public static class HDF5ReaderConfig {
+
final HDF5FormatPlugin plugin;
-
final String defaultPath;
-
final HDF5FormatConfig formatConfig;
public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) {
@@ -171,107 +164,104 @@
}
}
- public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
- this.readerConfig = readerConfig;
- this.maxRecords = maxRecords;
- dataWriters = new ArrayList<>();
- this.showMetadataPreview = readerConfig.formatConfig.showPreview();
- }
-
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
- split = negotiator.split();
+ public HDF5BatchReader(HDF5ReaderConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
errorContext = negotiator.parentErrorContext();
+ file = negotiator.file();
+ readerConfig = config;
+ dataWriters = new ArrayList<>();
+ showMetadataPreview = readerConfig.formatConfig.showPreview();
+
// Since the HDF file reader uses a stream to actually read the file, the file name from the
// module is incorrect.
- fileName = split.getPath().getName();
- try {
- openFile(negotiator);
- } catch (IOException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to close input file: %s", split.getPath())
- .addContext(errorContext)
- .build(logger);
- }
+ fileName = file.split().getPath().getName();
- ResultSetLoader loader;
- if (readerConfig.defaultPath == null) {
- // Get file metadata
- List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
- metadataIterator = metadata.iterator();
-
- // Schema for Metadata query
- SchemaBuilder builder = new SchemaBuilder()
- .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
- .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
- .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
- .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
- .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
- .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
- .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
- .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
-
- negotiator.tableSchema(builder.buildSchema(), false);
-
- loader = negotiator.build();
- dimensions = new int[0];
- rowWriter = loader.writer();
-
- } else {
- // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
- // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
- Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
- dimensions = dataSet.getDimensions();
-
- loader = negotiator.build();
- rowWriter = loader.writer();
- writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(),
- negotiator.parentErrorContext());
- if (dimensions.length <= 1) {
- buildSchemaFor1DimensionalDataset(dataSet);
- } else if (dimensions.length == 2) {
- buildSchemaFor2DimensionalDataset(dataSet);
- } else {
- // Case for datasets of greater than 2D
- // These are automatically flattened
- buildSchemaFor2DimensionalDataset(dataSet);
+ { // Opens an HDF5 file
+ try (InputStream in = file.fileSystem().openPossiblyCompressedStream(file.split().getPath())) {
+ /*
+ * As a possible future improvement, the jhdf reader has the ability to read hdf5 files from
+ * a byte array or byte buffer. This implementation is better in that it does not require creating
+ * a temporary file which must be deleted later. However, it could result in memory issues in the
+ * event of large files.
+ */
+ hdfFile = HdfFile.fromInputStream(in);
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: %s", file.split().getPath())
+ .addContext(errorContext)
+ .build(logger);
}
}
- if (readerConfig.defaultPath == null) {
- pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
- dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
- fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
- dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
- linkWriter = rowWriter.scalar(IS_LINK_COLUMN_NAME);
- elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
- datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
- dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
- }
- return true;
- }
- /**
- * This function is called when the default path is set and the data set is a single dimension.
- * This function will create an array of one dataWriter of the
- * correct datatype
- * @param dataset The HDF5 dataset
- */
- private void buildSchemaFor1DimensionalDataset(Dataset dataset) {
- MinorType currentDataType = HDF5Utils.getDataType(dataset.getDataType());
+ { // Build the schema and initialize the writer
+ ResultSetLoader loader;
+ if (readerConfig.defaultPath == null) {
+ // Get file metadata
+ List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
+ metadataIterator = metadata.iterator();
- // Case for null or unknown data types:
- if (currentDataType == null) {
- logger.warn("Couldn't add {}", dataset.getJavaType().getName());
- return;
+ // Schema for Metadata query
+ SchemaBuilder builder = new SchemaBuilder()
+ .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
+ .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
+ .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
+ .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
+ .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
+
+ negotiator.tableSchema(builder.buildSchema(), false);
+
+ loader = negotiator.build();
+ rowWriter = loader.writer();
+
+ dimensions = null;
+ writerSpec = null;
+ pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
+ dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
+ fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
+ dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
+ linkWriter = rowWriter.scalar(IS_LINK_COLUMN_NAME);
+ elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
+ datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
+ dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
+ } else {
+ // This is the case when the default path is specified. Since the user is explicitly asking for a dataset,
+ // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns.
+ Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
+ dimensions = dataSet.getDimensions();
+
+ loader = negotiator.build();
+ rowWriter = loader.writer();
+ writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(), negotiator.parentErrorContext());
+ if (dimensions.length <= 1) {
+ buildSchemaFor1DimensionalDataset(dataSet);
+ } else if (dimensions.length == 2) {
+ buildSchemaFor2DimensionalDataset(dataSet);
+ } else {
+ // Case for datasets of greater than 2D
+ // These are automatically flattened
+ buildSchemaFor2DimensionalDataset(dataSet);
+ }
+ // Initialize the final fields
+ metadataIterator = null;
+ pathWriter = null;
+ dataTypeWriter = null;
+ fileNameWriter = null;
+ dataSizeWriter = null;
+ linkWriter = null;
+ elementCountWriter = null;
+ datasetTypeWriter = null;
+ dimensionsWriter = null;
+ }
}
- dataWriters.add(buildWriter(currentDataType));
}
private HDF5DataWriter buildWriter(MinorType dataType) {
switch (dataType) {
- /*case GENERIC_OBJECT:
- return new HDF5EnumDataWriter(hdfFile, writerSpec, readerConfig.defaultPath);*/
+ /* case GENERIC_OBJECT:
+ return new HDF5EnumDataWriter(hdfFile, writerSpec, readerConfig.defaultPath); */
case VARCHAR:
return new HDF5StringDataWriter(hdfFile, writerSpec, readerConfig.defaultPath);
case TIMESTAMP:
@@ -292,13 +282,29 @@
}
/**
+ * This function is called when the default path is set and the dataset is
+ * one-dimensional. It creates a single dataWriter of the correct datatype.
+ *
+ * @param dataset The HDF5 dataset
+ */
+ private void buildSchemaFor1DimensionalDataset(Dataset dataset) {
+ MinorType currentDataType = HDF5Utils.getDataType(dataset.getDataType());
+
+ // Case for null or unknown data types:
+ if (currentDataType == null) {
+ logger.warn("Couldn't add {}", dataset.getJavaType().getName());
+ return;
+ }
+ dataWriters.add(buildWriter(currentDataType));
+ }
+
+ /**
* Builds a Drill schema from a dataset with 2 or more dimensions. HDF5 only
* supports INT, LONG, DOUBLE and FLOAT for >2 data types so this function is
* not as inclusive as the 1D function. This function will build the schema
* by adding DataWriters to the dataWriters array.
*
- * @param dataset
- * The dataset which Drill will use to build a schema
+ * @param dataset The dataset which Drill will use to build a schema
*/
private void buildSchemaFor2DimensionalDataset(Dataset dataset) {
MinorType currentDataType = HDF5Utils.getDataType(dataset.getDataType());
@@ -333,43 +339,9 @@
}
}
}
- /**
- * Opens an HDF5 file.
- * @param negotiator The negotiator represents Drill's interface with the file system
- */
- private void openFile(FileSchemaNegotiator negotiator) throws IOException {
- InputStream in = null;
- try {
- /*
- * As a possible future improvement, the jhdf reader has the ability to read hdf5 files from
- * a byte array or byte buffer. This implementation is better in that it does not require creating
- * a temporary file which must be deleted later. However, it could result in memory issues in the
- * event of large files.
- */
-
- in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
- hdfFile = HdfFile.fromInputStream(in);
- } catch (Exception e) {
- if (in != null) {
- in.close();
- }
- throw UserException
- .dataReadError(e)
- .message("Failed to open input file: %s", split.getPath())
- .addContext(errorContext)
- .build(logger);
- }
- reader = new BufferedReader(new InputStreamReader(in));
- }
@Override
public boolean next() {
-
- // Limit pushdown
- if (rowWriter.limitReached(maxRecords)) {
- return false;
- }
-
while (!rowWriter.isFull()) {
if (readerConfig.defaultPath == null || readerConfig.defaultPath.isEmpty()) {
if (!metadataIterator.hasNext()){
@@ -395,7 +367,6 @@
int currentRowCount = 0;
HDF5DataWriter currentDataWriter;
rowWriter.start();
-
for (int i = 0; i < dimensions[1]; i++) {
currentDataWriter = dataWriters.get(i);
currentDataWriter.write();
@@ -410,8 +381,23 @@
return true;
}
+ @Override
+ public void close() {
+ AutoCloseables.closeSilently(hdfFile);
+ /*
+ * The current implementation of the HDF5 reader creates a temp file,
+ * which by default is deleted only when the application exits.
+ * The temporary file therefore needs to be removed when the batch reader is closed.
+ */
+ boolean result = hdfFile.getFile().delete();
+ if (!result) {
+ logger.warn("Failed to delete HDF5 temp file {}", hdfFile.getFile().getName());
+ }
+ }
+
/**
* Writes one row of HDF5 metadata.
+ *
* @param rowWriter The input rowWriter object
*/
private void projectMetadataRow(RowSetLoader rowWriter) {
@@ -423,7 +409,7 @@
fileNameWriter.setString(fileName);
linkWriter.setBoolean(metadataRow.isLink());
- //Write attributes if present
+ // Write attributes if present
if (metadataRow.getAttributes().size() > 0) {
writeAttributes(rowWriter, metadataRow);
}
@@ -448,9 +434,9 @@
* Gets the file metadata from a given HDF5 file. It will extract the file
* name and path, and add any information to the metadata List.
*
- * @param group A list of paths from which the metadata will be extracted
- * @param metadata The HDF5 metadata object from which the metadata will be extracted
- * @return A list of metadata from the given file paths
+ * @param group    The HDF5 group from which the metadata will be extracted
+ * @param metadata The list to which the extracted metadata will be added
+ * @return A list of metadata from the given file paths
*/
private List<HDF5DrillMetadata> getFileMetadata(Group group, List<HDF5DrillMetadata> metadata) {
Map<String, HDF5Attribute> attribs;
@@ -495,10 +481,10 @@
}
/**
- * Gets the attributes of a HDF5 dataset and returns them into a HashMap
+ * Gets the attributes of an HDF5 dataset and returns them in a HashMap.
*
* @param path The path for which you wish to retrieve attributes
- * @return Map The attributes for the given path. Empty Map if no attributes present
+ * @return The attributes for the given path; an empty Map if no attributes are present
*/
private Map<String, HDF5Attribute> getAttributes(String path) {
Map<String, Attribute> attributeList;
@@ -543,13 +529,11 @@
* Writes one row of data in a metadata query. The number of dimensions here
* is n+1. So if the actual dataset is a 1D column, it will be written as a list.
* This function is only called in metadata queries as the schema is not
- * known in advance. If the datasize is greater than 16MB, the function does
- * not project the dataset
+ * known in advance. If the data size is greater than 16MB, the function does
+ * not project the dataset.
*
- * @param rowWriter
- * The rowWriter to which the data will be written
- * @param datapath
- * The datapath from which the data will be read
+ * @param rowWriter The rowWriter to which the data will be written
+ * @param datapath The data path from which the data will be read
*/
private void projectDataset(RowSetLoader rowWriter, String datapath) {
@@ -562,7 +546,7 @@
}
int[] dimensions = dataset.getDimensions();
- //Case for single dimensional data
+ // Case for single dimensional data
if (dimensions.length == 1) {
MinorType currentDataType = HDF5Utils.getDataType(dataset.getDataType());
@@ -632,7 +616,7 @@
logger.warn("{} not implemented.", currentDataType.name());
}
} else if (dimensions.length == 2) {
- // Case for 2D data sets. These are projected as lists of lists or maps of maps
+ // Case for 2D data sets. These are projected as lists of lists or maps of maps
int cols = dimensions[1];
int rows = dimensions[0];
@@ -685,22 +669,23 @@
}
/**
- * Helper function to write a 1D boolean column
+ * Helper function to write a 1D boolean column.
*
* @param rowWriter The row to which the data will be written
- * @param name The column name
- * @param value The value to be written
+ * @param name The column name
+ * @param value The value to be written
*/
+ @SuppressWarnings("unused")
private void writeBooleanColumn(TupleWriter rowWriter, String name, int value) {
writeBooleanColumn(rowWriter, name, value != 0);
}
/**
- * Helper function to write a 1D boolean column
+ * Helper function to write a 1D boolean column.
*
* @param rowWriter The row to which the data will be written
- * @param name The column name
- * @param value The value to be written
+ * @param name The column name
+ * @param value The value to be written
*/
private void writeBooleanColumn(TupleWriter rowWriter, String name, boolean value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.BIT);
@@ -708,11 +693,11 @@
}
/**
- * Helper function to write a 1D short column
+ * Helper function to write a 1D short column.
*
* @param rowWriter The row to which the data will be written
- * @param name The column name
- * @param value The value to be written
+ * @param name The column name
+ * @param value The value to be written
*/
private void writeSmallIntColumn(TupleWriter rowWriter, String name, short value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, MinorType.SMALLINT);
@@ -720,10 +705,11 @@
}
/**
- * Helper function to write a 2D short list
- * @param rowWriter the row to which the data will be written
- * @param name the name of the outer list
- * @param list the list of data
+ * Helper function to write a 2D short list.
+ *
+ * @param rowWriter The row to which the data will be written
+ * @param name The name of the outer list
+ * @param list The list of data
*/
private void writeSmallIntColumn(TupleWriter rowWriter, String name, short[] list) {
int index = rowWriter.tupleSchema().index(name);
@@ -740,11 +726,11 @@
}
/**
- * Helper function to write a 1D byte column
+ * Helper function to write a 1D byte column.
*
* @param rowWriter The row to which the data will be written
- * @param name The column name
- * @param value The value to be written
+ * @param name The column name
+ * @param value The value to be written
*/
private void writeByteColumn(TupleWriter rowWriter, String name, byte value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, MinorType.TINYINT);
@@ -752,10 +738,11 @@
}
/**
- * Helper function to write a 2D byte list
- * @param rowWriter the row to which the data will be written
- * @param name the name of the outer list
- * @param list the list of data
+ * Helper function to write a 2D byte list.
+ *
+ * @param rowWriter The row to which the data will be written
+ * @param name The name of the outer list
+ * @param list The list of data
*/
private void writeByteListColumn(TupleWriter rowWriter, String name, byte[] list) {
int index = rowWriter.tupleSchema().index(name);
@@ -772,11 +759,11 @@
}
/**
- * Helper function to write a 1D int column
+ * Helper function to write a 1D integer column.
*
* @param rowWriter The row to which the data will be written
- * @param name The column name
- * @param value The value to be written
+ * @param name The column name
+ * @param value The value to be written
*/
private void writeIntColumn(TupleWriter rowWriter, String name, int value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.INT);
@@ -784,10 +771,11 @@
}
/**
- * Helper function to write a 2D int list
- * @param rowWriter the row to which the data will be written
- * @param name the name of the outer list
- * @param list the list of data
+ * Helper function to write a 2D integer list.
+ *
+ * @param rowWriter The row to which the data will be written
+ * @param name The name of the outer list
+ * @param list The list of data
*/
private void writeIntListColumn(TupleWriter rowWriter, String name, int[] list) {
int index = rowWriter.tupleSchema().index(name);
@@ -945,7 +933,7 @@
}
private void floatMatrixHelper(float[][] colData, int cols, int rows, RowSetLoader rowWriter) {
- // This is the case where a dataset is projected in a metadata query. The result should be a list of lists
+ // This is the case where a dataset is projected in a metadata query. The result should be a list of lists
TupleMetadata nestedSchema = new SchemaBuilder()
.addRepeatedList(FLOAT_COLUMN_NAME)
@@ -992,7 +980,7 @@
}
private void doubleMatrixHelper(double[][] colData, int cols, int rows, RowSetLoader rowWriter) {
- // This is the case where a dataset is projected in a metadata query. The result should be a list of lists
+ // This is the case where a dataset is projected in a metadata query. The result should be a list of lists
TupleMetadata nestedSchema = new SchemaBuilder()
.addRepeatedList(DOUBLE_COLUMN_NAME)
@@ -1040,7 +1028,7 @@
}
private void bigIntMatrixHelper(long[][] colData, int cols, int rows, RowSetLoader rowWriter) {
- // This is the case where a dataset is projected in a metadata query. The result should be a list of lists
+ // This is the case where a dataset is projected in a metadata query. The result should be a list of lists
TupleMetadata nestedSchema = new SchemaBuilder()
.addRepeatedList(LONG_COLUMN_NAME)
@@ -1099,13 +1087,11 @@
}
/**
- * Gets the attributes for an HDF5 datapath. These attributes are projected as
+ * Gets the attributes for an HDF5 data path. These attributes are projected as
* a map in select * queries when the defaultPath is null.
*
- * @param rowWriter
- * the row to which the data will be written
- * @param record
- * the record for the attributes
+ * @param rowWriter The row to which the data will be written
+ * @param record The record for the attributes
*/
private void writeAttributes(TupleWriter rowWriter, HDF5DrillMetadata record) {
Map<String, HDF5Attribute> attribs = getAttributes(record.getPath());
@@ -1152,7 +1138,7 @@
writeTimestampColumn(mapWriter, key, (Long) attrib.getValue());
break;
case GENERIC_OBJECT:
- //This is the case for HDF5 enums
+ // This is the case for HDF5 enums
String enumText = attrib.getValue().toString();
writeStringColumn(mapWriter, key, enumText);
break;
@@ -1166,9 +1152,9 @@
* Processes the MAP data type which can be found in HDF5 files.
* It automatically flattens anything greater than 2 dimensions.
*
- * @param path the HDF5 path tp the compound data
- * @param reader the HDF5 reader for the data file
- * @param rowWriter the rowWriter to write the data
+ * @param path The HDF5 path of the compound data
+ * @param reader The HDF5 reader for the data file
+ * @param rowWriter The rowWriter to write the data
*/
private void getAndMapCompoundData(String path, HdfFile reader, RowSetLoader rowWriter) {
@@ -1227,7 +1213,8 @@
String dataType = dataMember.getDataType().getJavaType().getName();
String fieldName = dataMember.getName();
int[] dataLength = reader.getDatasetByPath(path).getDimensions();
- Object rawData = ((LinkedHashMap<String, ?>)reader.getDatasetByPath(path).getData()).get(fieldName);
+ @SuppressWarnings("rawtypes")
+ Object rawData = ((LinkedHashMap) reader.getDatasetByPath(path).getData()).get(fieldName);
ArrayWriter innerWriter = listWriter.array(fieldName);
for (int i = 0; i < dataLength[0]; i++) {
@@ -1267,21 +1254,4 @@
}
}
}
-
- @Override
- public void close() {
- AutoCloseables.closeSilently(hdfFile);
- /*
- * The current implementation of the HDF5 reader creates a temp file which needs to be removed
- * when the batch reader is closed. A possible future functionality might be to use the
- */
- boolean result = hdfFile.getFile().delete();
- if (!result) {
- logger.warn("Failed to delete HDF5 temp file {}", hdfFile.getFile().getName());
- }
- hdfFile = null;
-
- AutoCloseables.closeSilently(reader);
- reader = null;
- }
}
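
For reference, the EVF2 reader contract this conversion targets can be sketched as follows. This is a hypothetical illustration based only on the signatures visible in this diff; `ExampleBatchReader` and its body are not part of the change.

```java
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;

// Hypothetical skeleton: in EVF2 there is no open() method. Schema
// negotiation and file opening happen in the constructor, which is why
// most of HDF5BatchReader's fields could become final above.
public class ExampleBatchReader implements ManagedReader {
  private final RowSetLoader rowWriter;

  public ExampleBatchReader(FileSchemaNegotiator negotiator) {
    rowWriter = negotiator.build().writer(); // negotiate schema, get the row writer
  }

  @Override
  public boolean next() {
    // Write rows via rowWriter until the batch is full or the source is
    // exhausted; return false when nothing remains. This sketch has no
    // data source, so it reports end-of-data immediately.
    return false;
  }

  @Override
  public void close() {
    // Release resources acquired in the constructor.
  }
}
```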
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
index d56ffa4..0a51e60 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
@@ -21,23 +21,21 @@
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
-import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.hadoop.conf.Configuration;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.store.hdf5.HDF5BatchReader.HDF5ReaderConfig;
-
+import org.apache.hadoop.conf.Configuration;
public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
public static final String DEFAULT_NAME = "hdf5";
-
public HDF5FormatPlugin(String name, DrillbitContext context,
Configuration fsConf,
StoragePluginConfig storageConfig,
@@ -51,38 +49,33 @@
.writable(false)
.blockSplittable(false)
.compressible(true)
- .supportsProjectPushdown(true)
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
- .defaultName(DEFAULT_NAME)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
+ .supportsProjectPushdown(true)
+ .defaultName(DEFAULT_NAME)
.build();
}
- @Override
- protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
- FileScanBuilder builder = new FileScanBuilder();
-
- builder.setReaderFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig), scan.getMaxRecords()));
- initScanBuilder(builder, scan);
- builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
- return builder;
- }
-
- public static class HDF5ReaderFactory extends FileScanFramework.FileReaderFactory {
+ public static class HDF5ReaderFactory extends FileReaderFactory {
private final HDF5ReaderConfig readerConfig;
- private final int maxRecords;
+ private final EasySubScan scan;
-
- HDF5ReaderFactory(HDF5ReaderConfig config, int maxRecords) {
+ HDF5ReaderFactory(HDF5ReaderConfig config, EasySubScan scan) {
readerConfig = config;
- this.maxRecords = maxRecords;
+ this.scan = scan;
}
@Override
- public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
- return new HDF5BatchReader(readerConfig, maxRecords);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+ return new HDF5BatchReader(readerConfig, scan, negotiator);
}
}
+
+ @Override
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
+ builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+ builder.readerFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig), scan));
+ }
}
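
The plugin side follows the same V2 pattern: the `frameworkBuilder(...)` override is replaced by `configureScan(...)`, and the factory's `newReader(...)` now receives the per-file negotiator. A condensed, hypothetical sketch of that wiring (only the Drill `scan.v3` types shown in this diff are real; it reuses `ExampleBatchReader` from the sketch above):

```java
// Hypothetical wiring inside an EasyFormatPlugin subclass: EVF2 invokes
// newReader() once per file, passing that file's schema negotiator.
@Override
protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
  builder.readerFactory(new FileReaderFactory() {
    @Override
    public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
      return new ExampleBatchReader(negotiator);
    }
  });
}
```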
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
index 7b14c41..3dd864a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
@@ -158,7 +158,7 @@
assertTrue(scan.buildSchema());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
+ .build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
@@ -211,7 +211,7 @@
assertTrue(scan.buildSchema());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
+ .build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
@@ -254,13 +254,13 @@
.add("a", MinorType.INT)
.add("d", MinorType.BIGINT)
.add("e", MinorType.BIGINT)
- .buildSchema();
+ .buildSchema();
// Initial schema
assertTrue(scan.buildSchema());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
+ .build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
@@ -303,13 +303,13 @@
TupleMetadata expectedSchema = new SchemaBuilder()
.add("d", MinorType.BIGINT)
.add("e", MinorType.BIGINT)
- .buildSchema();
+ .buildSchema();
// Initial schema
assertTrue(scan.buildSchema());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
+ .build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
index 56ffa97..09f62fc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
@@ -19,7 +19,9 @@
import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
-
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.listIntArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.listLongArray;
import java.util.Arrays;
import java.util.Collections;
@@ -324,4 +326,31 @@
RowSetUtilities.verify(fixture.wrap(input), fixture.wrap(output));
}
+
+ @Test
+ public void testListArray() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .addRepeatedList("int_list")
+ .addArray(MinorType.INT)
+ .resumeSchema()
+ .addRepeatedList("long_list")
+ .addArray(MinorType.BIGINT)
+ .resumeSchema()
+ .buildSchema();
+
+ final VectorContainer input = fixture.rowSetBuilder(schema)
+ .addRow(listIntArray(intArray(1, 2, 3)), listLongArray(new long[] { 1L, 2L, 3L }))
+ .addRow(listIntArray(intArray(10, 20, 30)), listLongArray(new long[] { 10L, 20L, 30L }))
+ .build()
+ .container();
+
+ final OutputBatchBuilder builder = new OutputBatchBuilder(schema,
+ Collections.singletonList(new BatchSource(schema, input)),
+ fixture.allocator());
+
+ builder.load(input.getRecordCount());
+ VectorContainer output = builder.outputContainer();
+
+ RowSetUtilities.verify(fixture.wrap(input), fixture.wrap(output));
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index 5a2a465..d03ba1b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -276,6 +276,14 @@
return elements;
}
+ public static int[][] listIntArray(int[]... elements) {
+ return elements;
+ }
+
+ public static long[][] listLongArray(long[]... elements) {
+ return elements;
+ }
+
public static Object[] singleList(Object element) {
return new Object[] { element };
}
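
The two new helpers mirror the existing varargs builders in this class (`mapArray`, `intArray`, ...) so repeated-list cells can be written inline, as in `testListArray` above. A small usage sketch, assuming a row-set schema with repeated-list INT and BIGINT columns:

```java
// Each helper returns its varargs as a 2-D array, which the row-set
// builder treats as a single repeated-list (list of lists) cell value.
int[][] intCell = RowSetUtilities.listIntArray(RowSetUtilities.intArray(1, 2, 3));
long[][] longCell = RowSetUtilities.listLongArray(new long[] { 1L, 2L, 3L });
```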
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
index 002fdde..614452b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
@@ -253,7 +253,7 @@
variants[i].events().saveRow();
}
}
- }
+ }
@Override
public void preRollover() {
@@ -323,4 +323,3 @@
@Override
public boolean isProjected() { return unionMemberShim.isProjected(); }
}
-