/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.hdf5;
import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation;
import ch.systemsx.cisd.hdf5.HDF5DataSetInformation;
import ch.systemsx.cisd.hdf5.HDF5FactoryProvider;
import ch.systemsx.cisd.hdf5.HDF5LinkInformation;
import ch.systemsx.cisd.hdf5.IHDF5Factory;
import ch.systemsx.cisd.hdf5.IHDF5Reader;
import org.apache.commons.io.IOUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MapBuilder;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
import org.apache.drill.exec.store.hdf5.writers.HDF5EnumDataWriter;
import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.hadoop.mapred.FileSplit;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
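/**
* Batch reader for HDF5 files. The reader operates in one of two modes, depending on whether the format
* configuration supplies a {@code defaultPath}:
* <ul>
* <li>When {@code defaultPath} is null, the reader emits one metadata row per object in the file
* (path, data_type, file_name, data_size, element_count, dataset_data_type, dimensions), with dataset
* contents projected as nested lists for preview.</li>
* <li>When {@code defaultPath} points to a dataset, the reader projects that dataset as regular Drill
* columns, auto-flattening datasets with two or more dimensions into one column per HDF5 column.</li>
* </ul>
* Because the underlying HDF5 library cannot read from an input stream, the input is first copied to a
* temporary local file.
* <p>A metadata query might look like the following (the table path is illustrative):</p>
* <pre>{@code SELECT path, data_type, dimensions FROM dfs.`hdf5/sample.h5`}</pre>
*/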
public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(HDF5BatchReader.class);
private static final String PATH_COLUMN_NAME = "path";
private static final String DATA_TYPE_COLUMN_NAME = "data_type";
private static final String FILE_NAME_COLUMN_NAME = "file_name";
private static final String INT_COLUMN_PREFIX = "int_col_";
private static final String LONG_COLUMN_PREFIX = "long_col_";
private static final String FLOAT_COLUMN_PREFIX = "float_col_";
private static final String DOUBLE_COLUMN_PREFIX = "double_col_";
private static final String INT_COLUMN_NAME = "int_data";
private static final String FLOAT_COLUMN_NAME = "float_data";
private static final String DOUBLE_COLUMN_NAME = "double_data";
private static final String LONG_COLUMN_NAME = "long_data";
private static final String DATA_SIZE_COLUMN_NAME = "data_size";
private static final String ELEMENT_COUNT_NAME = "element_count";
private static final String DATASET_DATA_TYPE_NAME = "dataset_data_type";
private static final String DIMENSIONS_FIELD_NAME = "dimensions";
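// Limits for dataset previews in metadata queries: at most PREVIEW_ROW_LIMIT elements are written per list,
// and datasets larger than MAX_DATASET_SIZE are flagged as truncated.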
private static final int PREVIEW_ROW_LIMIT = 20;
private static final int MAX_DATASET_SIZE = ValueVector.MAX_BUFFER_SIZE;
private final HDF5ReaderConfig readerConfig;
private final List<HDF5DataWriter> dataWriters;
private FileSplit split;
private IHDF5Reader hdf5Reader;
private File inFile;
private BufferedReader reader;
private RowSetLoader rowWriter;
private Iterator<HDF5DrillMetadata> metadataIterator;
private ScalarWriter pathWriter;
private ScalarWriter dataTypeWriter;
private ScalarWriter fileNameWriter;
private ScalarWriter dataSizeWriter;
private ScalarWriter elementCountWriter;
private ScalarWriter datasetTypeWriter;
private ScalarWriter dimensionsWriter;
private long[] dimensions;
public static class HDF5ReaderConfig {
final HDF5FormatPlugin plugin;
final String defaultPath;
final HDF5FormatConfig formatConfig;
final File tempDirectory;
public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) {
this.plugin = plugin;
this.formatConfig = formatConfig;
defaultPath = formatConfig.getDefaultPath();
tempDirectory = plugin.getTmpDir();
}
}
public HDF5BatchReader(HDF5ReaderConfig readerConfig) {
this.readerConfig = readerConfig;
dataWriters = new ArrayList<>();
}
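/**
* Opens the HDF5 file and negotiates the schema: a fixed metadata schema when no default path is set, or a
* schema derived from the dataset's dimensions and data type when a default path is supplied.
*/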
@Override
public boolean open(FileSchemaNegotiator negotiator) {
split = negotiator.split();
try {
openFile(negotiator);
} catch (IOException e) {
throw UserException
.dataReadError(e)
.message("Failed to close input file: %s", split.getPath())
.message(e.getMessage())
.build(logger);
}
ResultSetLoader loader;
if (readerConfig.defaultPath == null) {
// Get file metadata
List<HDF5DrillMetadata> metadata = getFileMetadata(hdf5Reader.object().getGroupMemberInformation("/", true), new ArrayList<>());
metadataIterator = metadata.iterator();
// Schema for Metadata query
SchemaBuilder builder = new SchemaBuilder()
.addNullable(PATH_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
.addNullable(DATA_TYPE_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
.addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
.addNullable(DATA_SIZE_COLUMN_NAME, TypeProtos.MinorType.BIGINT )
.addNullable(ELEMENT_COUNT_NAME, TypeProtos.MinorType.BIGINT)
.addNullable(DATASET_DATA_TYPE_NAME, TypeProtos.MinorType.VARCHAR)
.addNullable(DIMENSIONS_FIELD_NAME, TypeProtos.MinorType.VARCHAR);
negotiator.setTableSchema(builder.buildSchema(), false);
loader = negotiator.build();
dimensions = new long[0];
rowWriter = loader.writer();
} else {
// This is the case when the default path is specified. Since the user is explicitly asking for a dataset,
// Drill can obtain the schema by reading the dataset's datatypes and mapping them to columns.
HDF5DataSetInformation dsInfo = hdf5Reader.object().getDataSetInformation(readerConfig.defaultPath);
dimensions = dsInfo.getDimensions();
loader = negotiator.build();
rowWriter = loader.writer();
if (dimensions.length <= 1) {
buildSchemaFor1DimensionalDataset(dsInfo);
} else if (dimensions.length == 2) {
buildSchemaFor2DimensionalDataset(dsInfo);
} else {
// Case for datasets of greater than 2D
// These are automatically flattened
buildSchemaFor2DimensionalDataset(dsInfo);
}
}
if (readerConfig.defaultPath == null) {
pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
}
return true;
}
/**
* This function is called when the default path is set and the dataset has a single dimension.
* It adds a single dataWriter of the correct datatype to the dataWriters list.
* @param dsInfo The HDF5 dataset information
*/
private void buildSchemaFor1DimensionalDataset(HDF5DataSetInformation dsInfo) {
TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
// Case for null or unknown data types:
if (currentDataType == null) {
logger.warn("Couldn't add {}", dsInfo.getTypeInformation().tryGetJavaType().toGenericString());
return;
}
switch (currentDataType) {
case GENERIC_OBJECT:
dataWriters.add(new HDF5EnumDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
break;
case VARCHAR:
dataWriters.add(new HDF5StringDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
break;
case TIMESTAMP:
dataWriters.add(new HDF5TimestampDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
break;
case INT:
dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
break;
case BIGINT:
dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
break;
case FLOAT8:
dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
break;
case FLOAT4:
dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
break;
case MAP:
dataWriters.add(new HDF5MapDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
break;
}
}
/**
* This function builds a Drill schema from a dataset with two or more dimensions. Only INT, LONG, DOUBLE and FLOAT
* are supported for these datasets, so this function is not as inclusive as the 1D version. The schema is built by
* adding one DataWriter per column to the dataWriters list.
* @param dsInfo The dataset which Drill will use to build a schema
*/
private void buildSchemaFor2DimensionalDataset(HDF5DataSetInformation dsInfo) {
TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
// Case for null or unknown data types:
if (currentDataType == null) {
logger.warn("Couldn't add {}", dsInfo.getTypeInformation().tryGetJavaType().toGenericString());
return;
}
long cols = dimensions[1];
String tempFieldName;
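// Each matrix column becomes its own Drill column named <type prefix> + index, e.g. int_col_0, int_col_1, ...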
for (int i = 0; i < cols; i++) {
switch (currentDataType) {
case INT:
tempFieldName = INT_COLUMN_PREFIX + i;
dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i));
break;
case BIGINT:
tempFieldName = LONG_COLUMN_PREFIX + i;
dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i));
break;
case FLOAT8:
tempFieldName = DOUBLE_COLUMN_PREFIX + i;
dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i));
break;
case FLOAT4:
tempFieldName = FLOAT_COLUMN_PREFIX + i;
dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i));
break;
}
}
}
/**
* This function contains the logic to open an HDF5 file.
* @param negotiator The negotiator represents Drill's interface with the file system
*/
private void openFile(FileSchemaNegotiator negotiator) throws IOException {
InputStream in = null;
try {
in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
IHDF5Factory factory = HDF5FactoryProvider.get();
inFile = convertInputStreamToFile(in);
hdf5Reader = factory.openForReading(inFile);
} catch (Exception e) {
if (in != null) {
in.close();
}
throw UserException
.dataReadError(e)
.message("Failed to open input file: %s", split.getPath())
.build(logger);
}
reader = new BufferedReader(new InputStreamReader(in));
}
/**
* This function converts the Drill InputStream into a File object for the HDF5 library. It exists due to a known
* limitation in the HDF5 library, which cannot read HDF5 data directly from an input stream. A future
* release of the library will support this.
*
* @param stream The input stream to be converted to a File
* @return File The file which was converted from an InputStream
*/
private File convertInputStreamToFile(InputStream stream) {
File tmpDir = readerConfig.tempDirectory;
String tempFileName = tmpDir.getPath() + "/~" + split.getPath().getName();
File targetFile = new File(tempFileName);
try {
java.nio.file.Files.copy(stream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
if (targetFile.exists()) {
if (!targetFile.delete()) {
logger.warn("{} not deleted.", targetFile.getName());
}
}
throw UserException
.dataWriteError(e)
.message("Failed to create temp HDF5 file: %s", split.getPath())
.addContext(e.getMessage())
.build(logger);
}
IOUtils.closeQuietly(stream);
return targetFile;
}
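/**
* Reads rows until the batch is full or the source is exhausted. Depending on the reader mode, each row is
* either one HDF5 object's metadata (no default path), one record of a 1D or compound dataset, or one row of
* an auto-flattened multidimensional dataset.
* @return true if the batch filled and more rows may remain, false once the data is exhausted
*/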
@Override
public boolean next() {
while (!rowWriter.isFull()) {
if (readerConfig.defaultPath == null || readerConfig.defaultPath.isEmpty()) {
if (!metadataIterator.hasNext()){
return false;
}
projectMetadataRow(rowWriter);
} else if (dimensions.length <= 1 && dataWriters.get(0).isCompound()) {
// Case for compound data types
if (!dataWriters.get(0).hasNext()) {
return false;
}
dataWriters.get(0).write();
} else if (dimensions.length <= 1) {
// Case for one-dimensional, non-compound datasets
if (!dataWriters.get(0).hasNext()) {
return false;
}
rowWriter.start();
dataWriters.get(0).write();
rowWriter.save();
} else {
int currentRowCount = 0;
HDF5DataWriter currentDataWriter;
rowWriter.start();
for (int i = 0; i < dimensions[1]; i++) {
currentDataWriter = dataWriters.get(i);
currentDataWriter.write();
currentRowCount = currentDataWriter.currentRowCount();
}
rowWriter.save();
if (currentRowCount >= dimensions[0]) {
return false;
}
}
}
return true;
}
/**
* This function writes one row of HDF5 metadata.
* @param rowWriter The input rowWriter object
*/
private void projectMetadataRow(RowSetLoader rowWriter) {
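// convertInputStreamToFile() prefixes the temp file name with '~'; strip it so the original file name is reported.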
String realFileName = inFile.getName().replace("~", "");
HDF5DrillMetadata metadataRow = metadataIterator.next();
rowWriter.start();
pathWriter.setString(metadataRow.getPath());
dataTypeWriter.setString(metadataRow.getDataType());
fileNameWriter.setString(realFileName);
//Write attributes if present
if (metadataRow.getAttributes().size() > 0) {
writeAttributes(rowWriter, metadataRow);
}
if (metadataRow.getDataType().equalsIgnoreCase("DATASET")) {
HDF5DataSetInformation dsInfo = hdf5Reader.object().getDataSetInformation(metadataRow.getPath());
// Project Dataset Metadata
dataSizeWriter.setLong(dsInfo.getSize());
elementCountWriter.setLong(dsInfo.getNumberOfElements());
datasetTypeWriter.setString(dsInfo.getTypeInformation().getDataClass().name());
dimensionsWriter.setString(Arrays.toString(dsInfo.getDimensions()));
projectDataset(rowWriter, metadataRow.getPath());
}
rowWriter.save();
}
/**
* This function gets the file metadata from a given HDF5 file. It extracts the path and data type of each member,
* adds them to the metadata list, and recurses into groups.
* @param members A list of link information objects from which the metadata will be extracted
* @param metadata The list to which the extracted metadata entries are added
* @return A list of metadata from the given file paths
*/
private List<HDF5DrillMetadata> getFileMetadata(List<HDF5LinkInformation> members, List<HDF5DrillMetadata> metadata) {
for (HDF5LinkInformation info : members) {
HDF5DrillMetadata metadataRow = new HDF5DrillMetadata();
metadataRow.setPath(info.getPath());
metadataRow.setDataType(info.getType().toString());
switch (info.getType()) {
case DATASET:
metadataRow.setAttributes(getAttributes(info.getPath()));
metadata.add(metadataRow);
break;
case SOFT_LINK:
// Soft links cannot have attributes
metadata.add(metadataRow);
break;
case GROUP:
metadataRow.setAttributes(getAttributes(info.getPath()));
metadata.add(metadataRow);
metadata = getFileMetadata(hdf5Reader.object().getGroupMemberInformation(info.getPath(), true), metadata);
break;
default:
logger.warn("Unknown data type: {}", info.getType());
}
}
return metadata;
}
/**
* Gets the attributes of an HDF5 dataset and returns them in a HashMap.
*
* @param path The path for which you wish to retrieve attributes
* @return Map The attributes for the given path. Empty Map if no attributes present
*/
private Map<String, HDF5Attribute> getAttributes(String path) {
Map<String, HDF5Attribute> attributes = new HashMap<>();
long attrCount = hdf5Reader.object().getObjectInformation(path).getNumberOfAttributes();
if (attrCount > 0) {
List<String> attrNames = hdf5Reader.object().getAllAttributeNames(path);
for (String name : attrNames) {
try {
HDF5Attribute attribute = HDF5Utils.getAttribute(path, name, hdf5Reader);
if (attribute == null) {
continue;
}
attributes.put(attribute.getKey(), attribute);
} catch (Exception e) {
logger.warn("Couldn't add attribute: {} - {}", path, name);
}
}
}
return attributes;
}
/**
* This function writes one row of data in a metadata query. The number of dimensions here is n+1, so a 1D dataset is written as a list.
* This function is only called in metadata queries, as the schema is not known in advance. If the data size is greater than 16MB, the dataset is truncated in the metadata view.
*
* @param rowWriter The rowWriter to which the data will be written
* @param datapath The datapath from which the data will be read
*/
private void projectDataset(RowSetLoader rowWriter, String datapath) {
String fieldName = HDF5Utils.getNameFromPath(datapath);
IHDF5Reader reader = hdf5Reader;
HDF5DataSetInformation dsInfo = reader.object().getDataSetInformation(datapath);
// If the dataset is larger than 16MB, do not project the dataset
if (dsInfo.getSize() > MAX_DATASET_SIZE) {
logger.warn("Dataset {} is greater than 16MB. Data will be truncated in Metadata view.", datapath);
}
long[] dimensions = dsInfo.getDimensions();
//Case for single dimensional data
if (dimensions.length <= 1) {
TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
assert currentDataType != null;
switch (currentDataType) {
case GENERIC_OBJECT:
logger.warn("Couldn't read {}", datapath );
break;
case VARCHAR:
String[] data = hdf5Reader.readStringArray(datapath);
writeStringListColumn(rowWriter, fieldName, data);
break;
case TIMESTAMP:
long[] longList = hdf5Reader.time().readTimeStampArray(datapath);
writeTimestampListColumn(rowWriter, fieldName, longList);
break;
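// Unsigned integers wider than 4 bytes are promoted to BIGINT; smaller unsigned values are not projected here.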
case INT:
if (!dsInfo.getTypeInformation().isSigned()) {
if (dsInfo.getTypeInformation().getElementSize() > 4) {
longList = hdf5Reader.uint64().readArray(datapath);
writeLongListColumn(rowWriter, fieldName, longList);
}
} else {
int[] intList = hdf5Reader.readIntArray(datapath);
writeIntListColumn(rowWriter, fieldName, intList);
}
break;
case FLOAT4:
float[] tempFloatList = hdf5Reader.readFloatArray(datapath);
writeFloat4ListColumn(rowWriter, fieldName, tempFloatList);
break;
case FLOAT8:
double[] tempDoubleList = hdf5Reader.readDoubleArray(datapath);
writeFloat8ListColumn(rowWriter, fieldName, tempDoubleList);
break;
case BIGINT:
if (!dsInfo.getTypeInformation().isSigned()) {
logger.warn("Drill does not support unsigned 64bit integers.");
break;
}
long[] tempBigIntList = hdf5Reader.readLongArray(datapath);
writeLongListColumn(rowWriter, fieldName, tempBigIntList);
break;
case MAP:
try {
getAndMapCompoundData(datapath, new ArrayList<>(), hdf5Reader, rowWriter);
} catch (Exception e) {
throw UserException
.dataReadError()
.message("Error writing Compound Field: ")
.addContext(e.getMessage())
.build(logger);
}
break;
default:
// Case for data types that cannot be read
logger.warn("{} not implemented yet.", dsInfo.getTypeInformation().tryGetJavaType());
}
} else if (dimensions.length == 2) {
// Case for 2D datasets. These are projected as lists of lists, or auto-flattened into columns when a default path is set.
long cols = dimensions[1];
long rows = dimensions[0];
switch (HDF5Utils.getDataType(dsInfo)) {
case INT:
int[][] colData = hdf5Reader.readIntMatrix(datapath);
mapIntMatrixField(colData, (int) cols, (int) rows, rowWriter);
break;
case FLOAT4:
float[][] floatData = hdf5Reader.readFloatMatrix(datapath);
mapFloatMatrixField(floatData, (int) cols, (int) rows, rowWriter);
break;
case FLOAT8:
double[][] doubleData = hdf5Reader.readDoubleMatrix(datapath);
mapDoubleMatrixField(doubleData, (int) cols, (int) rows, rowWriter);
break;
case BIGINT:
long[][] longData = hdf5Reader.readLongMatrix(datapath);
mapBigIntMatrixField(longData, (int) cols, (int) rows, rowWriter);
break;
default:
logger.warn("{} not implemented.", HDF5Utils.getDataType(dsInfo));
}
} else {
// Case for data sets with dimensions > 2
long cols = dimensions[1];
long rows = dimensions[0];
switch (HDF5Utils.getDataType(dsInfo)) {
case INT:
int[][] colData = hdf5Reader.int32().readMDArray(datapath).toMatrix();
mapIntMatrixField(colData, (int) cols, (int) rows, rowWriter);
break;
case FLOAT4:
float[][] floatData = hdf5Reader.float32().readMDArray(datapath).toMatrix();
mapFloatMatrixField(floatData, (int) cols, (int) rows, rowWriter);
break;
case FLOAT8:
double[][] doubleData = hdf5Reader.float64().readMDArray(datapath).toMatrix();
mapDoubleMatrixField(doubleData, (int) cols, (int) rows, rowWriter);
break;
case BIGINT:
long[][] longData = hdf5Reader.int64().readMDArray(datapath).toMatrix();
mapBigIntMatrixField(longData, (int) cols, (int) rows, rowWriter);
break;
default:
logger.warn("{} not implemented.", HDF5Utils.getDataType(dsInfo));
}
}
}
/**
* Helper function to write a 1D boolean column
*
* @param rowWriter The row to which the data will be written
* @param name The column name
* @param value The value to be written
*/
private void writeBooleanColumn(TupleWriter rowWriter, String name, int value) {
writeBooleanColumn(rowWriter, name, value != 0);
}
/**
* Helper function to write a 1D boolean column
*
* @param rowWriter The row to which the data will be written
* @param name The column name
* @param value The value to be written
*/
private void writeBooleanColumn(TupleWriter rowWriter, String name, boolean value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.BIT);
colWriter.setBoolean(value);
}
/**
* Helper function to write a 1D int column
*
* @param rowWriter The row to which the data will be written
* @param name The column name
* @param value The value to be written
*/
private void writeIntColumn(TupleWriter rowWriter, String name, int value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.INT);
colWriter.setInt(value);
}
/**
* Helper function to write a 2D int list
* @param rowWriter the row to which the data will be written
* @param name the name of the outer list
* @param list the list of data
*/
private void writeIntListColumn(TupleWriter rowWriter, String name, int[] list) {
int index = rowWriter.tupleSchema().index(name);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED);
index = rowWriter.addColumn(colSchema);
}
ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
for (int i = 0; i < maxElements; i++) {
arrayWriter.setInt(list[i]);
}
}
private void mapIntMatrixField(int[][] colData, int cols, int rows, RowSetLoader rowWriter) {
// If the default path is not null, auto-flatten the data.
// The end result is that a 2D array gets mapped to Drill columns.
if (readerConfig.defaultPath != null) {
for (int i = 0; i < rows; i++) {
rowWriter.start();
for (int k = 0; k < cols; k++) {
String tempColumnName = INT_COLUMN_PREFIX + k;
writeIntColumn(rowWriter, tempColumnName, colData[i][k]);
}
rowWriter.save();
}
} else {
intMatrixHelper(colData, cols, rows, rowWriter);
}
}
private void intMatrixHelper(int[][] colData, int cols, int rows, RowSetLoader rowWriter) {
// This is the case where a dataset is projected in a metadata query. The result should be a list of lists
TupleMetadata nestedSchema = new SchemaBuilder()
.addRepeatedList(INT_COLUMN_NAME)
.addArray(TypeProtos.MinorType.INT)
.resumeSchema()
.buildSchema();
int index = rowWriter.tupleSchema().index(INT_COLUMN_NAME);
if (index == -1) {
index = rowWriter.addColumn(nestedSchema.column(INT_COLUMN_NAME));
}
// The outer array
ArrayWriter listWriter = rowWriter.column(index).array();
// The inner array
ArrayWriter innerWriter = listWriter.array();
// The integer values within the inner array
ScalarWriter intWriter = innerWriter.scalar();
int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
for (int i = 0; i < maxElements; i++) {
for (int k = 0; k < cols; k++) {
intWriter.setInt(colData[i][k]);
}
listWriter.save();
}
}
private void writeLongColumn(TupleWriter rowWriter, String name, long value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.BIGINT);
colWriter.setLong(value);
}
private void writeLongListColumn(TupleWriter rowWriter, String name, long[] list) {
int index = rowWriter.tupleSchema().index(name);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.REPEATED);
index = rowWriter.addColumn(colSchema);
}
ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
for (int i = 0; i < maxElements; i++) {
arrayWriter.setLong(list[i]);
}
}
private void writeStringColumn(TupleWriter rowWriter, String name, String value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.VARCHAR);
colWriter.setString(value);
}
private void writeStringListColumn(TupleWriter rowWriter, String name, String[] list) {
int index = rowWriter.tupleSchema().index(name);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REPEATED);
index = rowWriter.addColumn(colSchema);
}
ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
for (int i = 0; i < maxElements; i++) {
arrayWriter.setString(list[i]);
}
}
private void writeFloat8Column(TupleWriter rowWriter, String name, double value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.FLOAT8);
colWriter.setDouble(value);
}
private void writeFloat8ListColumn(TupleWriter rowWriter, String name, double[] list) {
int index = rowWriter.tupleSchema().index(name);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REPEATED);
index = rowWriter.addColumn(colSchema);
}
ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
for (int i = 0; i < maxElements; i++) {
arrayWriter.setDouble(list[i]);
}
}
private void writeFloat4Column(TupleWriter rowWriter, String name, float value) {
ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.FLOAT4);
colWriter.setDouble(value);
}
private void writeFloat4ListColumn(TupleWriter rowWriter, String name, float[] list) {
int index = rowWriter.tupleSchema().index(name);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REPEATED);
index = rowWriter.addColumn(colSchema);
}
ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
int maxElements = Math.min(list.length, PREVIEW_ROW_LIMIT);
for (int i = 0; i < maxElements; i++) {
arrayWriter.setDouble(list[i]);
}
}
private void mapFloatMatrixField(float[][] colData, int cols, int rows, RowSetLoader rowWriter) {
// If the default path is not null, auto-flatten the data.
// The end result is that a 2D array gets mapped to Drill columns.
if (readerConfig.defaultPath != null) {
for (int i = 0; i < rows; i++) {
rowWriter.start();
for (int k = 0; k < cols; k++) {
String tempColumnName = FLOAT_COLUMN_PREFIX + k;
writeFloat4Column(rowWriter, tempColumnName, colData[i][k]);
}
rowWriter.save();
}
} else {
floatMatrixHelper(colData, cols, rows, rowWriter);
}
}
private void floatMatrixHelper(float[][] colData, int cols, int rows, RowSetLoader rowWriter) {
// This is the case where a dataset is projected in a metadata query. The result should be a list of lists
TupleMetadata nestedSchema = new SchemaBuilder()
.addRepeatedList(FLOAT_COLUMN_NAME)
.addArray(TypeProtos.MinorType.FLOAT4)
.resumeSchema()
.buildSchema();
int index = rowWriter.tupleSchema().index(FLOAT_COLUMN_NAME);
if (index == -1) {
index = rowWriter.addColumn(nestedSchema.column(FLOAT_COLUMN_NAME));
}
// The outer array
ArrayWriter listWriter = rowWriter.column(index).array();
// The inner array
ArrayWriter innerWriter = listWriter.array();
// The float values within the inner array
ScalarWriter floatWriter = innerWriter.scalar();
int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
for (int i = 0; i < maxElements; i++) {
for (int k = 0; k < cols; k++) {
floatWriter.setDouble(colData[i][k]);
}
listWriter.save();
}
}
private void mapDoubleMatrixField(double[][] colData, int cols, int rows, RowSetLoader rowWriter) {
// If the default path is not null, auto-flatten the data.
// The end result is that a 2D array gets mapped to Drill columns.
if (readerConfig.defaultPath != null) {
for (int i = 0; i < rows; i++) {
rowWriter.start();
for (int k = 0; k < cols; k++) {
String tempColumnName = DOUBLE_COLUMN_PREFIX + k;
writeFloat8Column(rowWriter, tempColumnName, colData[i][k]);
}
rowWriter.save();
}
} else {
doubleMatrixHelper(colData, cols, rows, rowWriter);
}
}
private void doubleMatrixHelper(double[][] colData, int cols, int rows, RowSetLoader rowWriter) {
// This is the case where a dataset is projected in a metadata query. The result should be a list of lists
TupleMetadata nestedSchema = new SchemaBuilder()
.addRepeatedList(DOUBLE_COLUMN_NAME)
.addArray(TypeProtos.MinorType.FLOAT8)
.resumeSchema()
.buildSchema();
int index = rowWriter.tupleSchema().index(DOUBLE_COLUMN_NAME);
if (index == -1) {
index = rowWriter.addColumn(nestedSchema.column(DOUBLE_COLUMN_NAME));
}
// The outer array
ArrayWriter listWriter = rowWriter.column(index).array();
// The inner array
ArrayWriter innerWriter = listWriter.array();
// The double values within the inner array
ScalarWriter floatWriter = innerWriter.scalar();
int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
for (int i = 0; i < maxElements; i++) {
for (int k = 0; k < cols; k++) {
floatWriter.setDouble(colData[i][k]);
}
listWriter.save();
}
}
private void mapBigIntMatrixField(long[][] colData, int cols, int rows, RowSetLoader rowWriter) {
// If the default path is not null, auto-flatten the data.
// The end result is that a 2D array gets mapped to Drill columns.
if (readerConfig.defaultPath != null) {
for (int i = 0; i < rows; i++) {
rowWriter.start();
for (int k = 0; k < cols; k++) {
String tempColumnName = LONG_COLUMN_PREFIX + k;
writeLongColumn(rowWriter, tempColumnName, colData[i][k]);
}
rowWriter.save();
}
} else {
bigIntMatrixHelper(colData, cols, rows, rowWriter);
}
}
private void bigIntMatrixHelper(long[][] colData, int cols, int rows, RowSetLoader rowWriter) {
// This is the case where a dataset is projected in a metadata query. The result should be a list of lists
TupleMetadata nestedSchema = new SchemaBuilder()
.addRepeatedList(LONG_COLUMN_NAME)
.addArray(TypeProtos.MinorType.BIGINT)
.resumeSchema()
.buildSchema();
int index = rowWriter.tupleSchema().index(LONG_COLUMN_NAME);
if (index == -1) {
index = rowWriter.addColumn(nestedSchema.column(LONG_COLUMN_NAME));
}
// The outer array
ArrayWriter listWriter = rowWriter.column(index).array();
// The inner array
ArrayWriter innerWriter = listWriter.array();
// The bigint values within the inner array
ScalarWriter bigintWriter = innerWriter.scalar();
int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
for (int i = 0; i < maxElements; i++) {
for (int k = 0; k < cols; k++) {
bigintWriter.setLong(colData[i][k]);
}
listWriter.save();
}
}
private void writeTimestampColumn(TupleWriter rowWriter, String name, long timestamp) {
Instant ts = new Instant(timestamp);
ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.TIMESTAMP);
colWriter.setTimestamp(ts);
}
private void writeTimestampListColumn(TupleWriter rowWriter, String name, long[] list) {
int index = rowWriter.tupleSchema().index(name);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.REPEATED);
index = rowWriter.addColumn(colSchema);
}
ScalarWriter arrayWriter = rowWriter.column(index).array().scalar();
for (long l : list) {
arrayWriter.setTimestamp(new Instant(l));
}
}
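/**
* Returns a ScalarWriter for the named column, adding the column to the schema as an OPTIONAL field of the
* given type if it is not present yet.
*/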
private ScalarWriter getColWriter(TupleWriter tupleWriter, String fieldName, TypeProtos.MinorType type) {
int index = tupleWriter.tupleSchema().index(fieldName);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, type, TypeProtos.DataMode.OPTIONAL);
index = tupleWriter.addColumn(colSchema);
}
return tupleWriter.scalar(index);
}
/**
* This helper function gets the attributes for an HDF5 datapath. These attributes are projected as a map in select * queries when the defaultPath is null.
* @param rowWriter the row to which the data will be written
* @param record the metadata record whose attributes will be written
*/
private void writeAttributes(TupleWriter rowWriter, HDF5DrillMetadata record) {
Map<String, HDF5Attribute> attribs = getAttributes(record.getPath());
Iterator<Map.Entry<String, HDF5Attribute>> entries = attribs.entrySet().iterator();
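// All attributes for the object are written into a single REQUIRED map column named "attributes".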
int index = rowWriter.tupleSchema().index("attributes");
if (index == -1) {
index = rowWriter
.addColumn(SchemaBuilder.columnSchema("attributes", TypeProtos.MinorType.MAP, TypeProtos.DataMode.REQUIRED));
}
TupleWriter mapWriter = rowWriter.tuple(index);
while (entries.hasNext()) {
Map.Entry<String, HDF5Attribute> entry = entries.next();
String key = entry.getKey();
HDF5Attribute attrib = entry.getValue();
switch (attrib.getDataType()) {
case BIT:
boolean value = (Boolean) attrib.getValue();
writeBooleanColumn(mapWriter, key, value);
break;
case BIGINT:
writeLongColumn(mapWriter, key, (Long) attrib.getValue());
break;
case INT:
writeIntColumn(mapWriter, key, (Integer) attrib.getValue());
break;
case FLOAT8:
writeFloat8Column(mapWriter, key, (Double) attrib.getValue());
break;
case FLOAT4:
writeFloat4Column(mapWriter, key, (Float) attrib.getValue());
break;
case VARCHAR:
writeStringColumn(mapWriter, key, (String) attrib.getValue());
break;
case TIMESTAMP:
writeTimestampColumn(mapWriter, key, (Long) attrib.getValue());
break;
case GENERIC_OBJECT:
//This is the case for HDF5 enums
String enumText = attrib.getValue().toString();
writeStringColumn(mapWriter, key, enumText);
break;
}
}
}
/**
* This function processes the MAP data type which can be found in HDF5 files.
* It automatically flattens anything greater than 2 dimensions.
*
* @param path the HDF5 path to the compound data
* @param fieldNames The field names to be mapped
* @param reader the HDF5 reader for the data file
* @param rowWriter the rowWriter to write the data
*/
private void getAndMapCompoundData(String path, List<String> fieldNames, IHDF5Reader reader, RowSetLoader rowWriter) {
final String COMPOUND_DATA_FIELD_NAME = "compound_data";
String resolvedPath = HDF5Utils.resolvePath(path, reader);
Class<?> dataClass = HDF5Utils.getDatasetClass(resolvedPath, reader);
if (dataClass == Map.class) {
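// Read the compound dataset as an Object matrix of [rows][members] and collect the member (field) names.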
Object[][] values = reader.compound().readArray(resolvedPath, Object[].class);
String currentFieldName;
if (fieldNames != null) {
HDF5CompoundMemberInformation[] infos = reader.compound().getDataSetInfo(resolvedPath);
for (HDF5CompoundMemberInformation info : infos) {
fieldNames.add(info.getName());
}
}
// Case for auto-flatten
if (readerConfig.defaultPath != null) {
for (int row = 0; row < values.length; row++) {
rowWriter.start();
for (int col = 0; col < values[row].length; col++) {
assert fieldNames != null;
currentFieldName = fieldNames.get(col);
if (values[row][col] instanceof Integer) {
writeIntColumn(rowWriter, currentFieldName, (Integer) values[row][col]);
} else if (values[row][col] instanceof Short) {
writeIntColumn(rowWriter, currentFieldName, ((Short) values[row][col]).intValue());
} else if (values[row][col] instanceof Byte) {
writeIntColumn(rowWriter, currentFieldName, ((Byte) values[row][col]).intValue());
} else if (values[row][col] instanceof Long) {
writeLongColumn(rowWriter, currentFieldName, (Long) values[row][col]);
} else if (values[row][col] instanceof Float) {
writeFloat4Column(rowWriter, currentFieldName, (Float) values[row][col]);
} else if (values[row][col] instanceof Double) {
writeFloat8Column(rowWriter, currentFieldName, (Double) values[row][col]);
} else if (values[row][col] instanceof Boolean) {
writeBooleanColumn(rowWriter, currentFieldName, (Boolean) values[row][col]);
} else if (values[row][col] instanceof BitSet) {
// Boolean compound members may arrive as a BitSet; use bit 0 as the value.
writeBooleanColumn(rowWriter, currentFieldName, ((BitSet) values[row][col]).get(0));
} else if (values[row][col] instanceof String) {
String stringValue = (String) values[row][col];
writeStringColumn(rowWriter, currentFieldName, stringValue);
}
}
rowWriter.save();
}
} else {
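// Metadata-query case: project the compound dataset as a single map column ("compound_data") of repeated values.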
int index = 0;
if (fieldNames != null) {
HDF5CompoundMemberInformation[] infos = reader.compound().getDataSetInfo(resolvedPath);
SchemaBuilder innerSchema = new SchemaBuilder();
MapBuilder mapBuilder = innerSchema.addMap(COMPOUND_DATA_FIELD_NAME);
for (HDF5CompoundMemberInformation info : infos) {
fieldNames.add(info.getName());
String compoundColumnDataType = info.getType().tryGetJavaType().getSimpleName();
switch (compoundColumnDataType) {
case "int":
mapBuilder.add(info.getName(), TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED);
break;
case "double":
mapBuilder.add(info.getName(), TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REPEATED);
break;
case "float":
mapBuilder.add(info.getName(), TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REPEATED);
break;
case "long":
mapBuilder.add(info.getName(), TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.REPEATED);
break;
case "String":
mapBuilder.add(info.getName(), TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REPEATED);
break;
}
}
TupleMetadata finalInnerSchema = mapBuilder.resumeSchema().buildSchema();
index = rowWriter.tupleSchema().index(COMPOUND_DATA_FIELD_NAME);
if (index == -1) {
index = rowWriter.addColumn(finalInnerSchema.column(COMPOUND_DATA_FIELD_NAME));
}
}
TupleWriter listWriter = rowWriter.column(index).tuple();
//Note: The [][] returns an array of [rows][cols]
for (int row = 0; row < values.length; row++) {
// Iterate over rows
for (int col = 0; col < values[row].length; col++) {
assert fieldNames != null;
currentFieldName = fieldNames.get(col);
try {
ArrayWriter innerWriter = listWriter.array(currentFieldName);
if (values[row][col] instanceof Integer) {
innerWriter.scalar().setInt((Integer) values[row][col]);
} else if (values[row][col] instanceof Short) {
innerWriter.scalar().setInt((Short) values[row][col]);
} else if (values[row][col] instanceof Byte) {
innerWriter.scalar().setInt((Byte) values[row][col]);
} else if (values[row][col] instanceof Long) {
innerWriter.scalar().setLong((Long) values[row][col]);
} else if (values[row][col] instanceof Float) {
innerWriter.scalar().setDouble((Float) values[row][col]);
} else if (values[row][col] instanceof Double) {
innerWriter.scalar().setDouble((Double) values[row][col]);
} else if (values[row][col] instanceof Boolean) {
innerWriter.scalar().setBoolean((Boolean) values[row][col]);
} else if (values[row][col] instanceof BitSet) {
// Boolean compound members may arrive as a BitSet; use bit 0 as the value.
innerWriter.scalar().setBoolean(((BitSet) values[row][col]).get(0));
} else if (values[row][col] instanceof String) {
innerWriter.scalar().setString((String) values[row][col]);
} else {
logger.warn("Skipping {}/{} due to unsupported data type.", resolvedPath, currentFieldName);
}
if (col == values[row].length) {
innerWriter.save();
}
} catch (TupleWriter.UndefinedColumnException e) {
logger.warn("Drill does not support maps and lists in HDF5 Compound fields. Skipping: {}/{}", resolvedPath, currentFieldName);
}
}
}
}
}
}
@Override
public void close() {
if (hdf5Reader != null) {
hdf5Reader.close();
hdf5Reader = null;
}
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.debug("Failed to close HDF5 Reader.");
}
reader = null;
}
if (inFile != null) {
if (!inFile.delete()) {
logger.warn("{} file not deleted.", inFile.getName());
}
inFile = null;
}
}
}