/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.io;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.io.hdf5.H5;
import org.apache.sysds.runtime.io.hdf5.H5Constants;
import org.apache.sysds.runtime.io.hdf5.H5ContiguousDataset;
import org.apache.sysds.runtime.io.hdf5.H5RootObject;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
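
/**
 * Single-threaded reader for HDF5 matrices into a {@link MatrixBlock}.
 * Reads a contiguous, 2-dimensional double dataset from a single file or
 * a directory of part files, either from an HDFS path or an input stream.
 * <p>
 * Minimal usage sketch (the dataset name "X" and the constructor taking a
 * dataset name are examples, not prescribed by this class; negative
 * dimensions trigger an additional metadata pass to determine the size):
 * <pre>
 * ReaderHDF5 reader = new ReaderHDF5(new FileFormatPropertiesHDF5("X"));
 * MatrixBlock mb = reader.readMatrixFromHDFS("data.h5", -1, -1, 1000, -1);
 * </pre>
 */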
public class ReaderHDF5 extends MatrixReader {
	protected final FileFormatPropertiesHDF5 _props;

	public ReaderHDF5(FileFormatPropertiesHDF5 props) {
		_props = props;
	}
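
	/**
	 * Reads an HDF5 matrix from HDFS. If rlen or clen is negative, the
	 * output block is not pre-allocated; instead, the dimensions are
	 * determined via an additional pass over the file metadata.
	 */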
	@Override
	public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz)
		throws IOException, DMLRuntimeException
	{
		//allocate output matrix block
		MatrixBlock ret = null;
		if( rlen >= 0 && clen >= 0 ) //otherwise allocated on read
			ret = createOutputMatrixBlock(rlen, clen, (int) rlen, estnnz, true, false);

		//prepare file access
		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
		Path path = new Path(fname);
		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);

		//check existence and non-empty file
		checkValidInputFile(fs, path);

		//core read
		ret = readHDF5MatrixFromHDFS(path, job, fs, ret, rlen, clen, blen, _props.getDatasetName());

		//finally check if change of sparse/dense block representation required
		//(nnz explicitly maintained during read)
		ret.examSparsity();

		return ret;
	}
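
	/**
	 * Reads an HDF5 matrix from an already opened input stream. The buffer
	 * is sized for the static header plus the dense payload of
	 * rlen * clen doubles, so the dimensions must be known upfront.
	 */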
	@Override
	public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz)
		throws IOException, DMLRuntimeException
	{
		//allocate output matrix block
		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int) rlen, estnnz, true, false);

		//core read
		String datasetName = _props.getDatasetName();
		BufferedInputStream bis = new BufferedInputStream(is,
			(int) (H5Constants.STATIC_HEADER_SIZE + (clen * rlen * 8)));
		long lnnz = readMatrixFromHDF5(bis, datasetName, ret, 0, rlen, clen, blen);

		//finally check if change of sparse/dense block representation required
		ret.setNonZeros(lnnz);
		ret.examSparsity();

		return ret;
	}
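
	/**
	 * Reads a matrix from a single HDF5 file or from all part files of a
	 * directory (in alphanumeric order). If dest is null, the dimensions
	 * are first computed from the HDF5 metadata of all files.
	 */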
	private static MatrixBlock readHDF5MatrixFromHDFS(Path path, JobConf job,
		FileSystem fs, MatrixBlock dest, long rlen, long clen, int blen, String datasetName)
		throws IOException, DMLRuntimeException
	{
		//prepare file paths in alphanumeric order
		ArrayList<Path> files = new ArrayList<>();
		if( fs.getFileStatus(path).isDirectory() ) {
			for(FileStatus stat : fs.listStatus(path, IOUtilFunctions.hiddenFileFilter))
				files.add(stat.getPath());
			Collections.sort(files);
		}
		else
			files.add(path);

		//determine matrix size via additional pass if required
		if( dest == null ) {
			dest = computeHDF5Size(files, fs, datasetName);
			clen = dest.getNumColumns();
			rlen = dest.getNumRows();
		}

		//actual read of individual files
		long lnnz = 0;
		for(int fileNo = 0; fileNo < files.size(); fileNo++) {
			BufferedInputStream bis = new BufferedInputStream(fs.open(files.get(fileNo)),
				(int) (H5Constants.STATIC_HEADER_SIZE + (clen * rlen * 8)));
			lnnz += readMatrixFromHDF5(bis, datasetName, dest, 0, rlen, clen, blen);
		}

		//post processing
		dest.setNonZeros(lnnz);
		return dest;
	}
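
	/**
	 * Core read of a contiguous HDF5 dataset into the dense block of dest,
	 * starting at the given row offset; returns the number of non-zeros
	 * read. The stream is closed on return.
	 */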
	public static long readMatrixFromHDF5(BufferedInputStream bis, String datasetName, MatrixBlock dest,
		int row, long rlen, long clen, int blen)
	{
		bis.mark(0);
		long lnnz = 0;
		H5RootObject rootObject = H5.H5Fopen(bis);
		H5ContiguousDataset contiguousDataset = H5.H5Dopen(rootObject, datasetName);
		int[] dims = rootObject.getDimensions();
		int ncol = dims[1];

		//row-wise read into the dense output block, with nnz maintained
		DenseBlock denseBlock = dest.getDenseBlock();
		double[] data = new double[ncol];
		for(int i = row; i < rlen; i++) {
			H5.H5Dread(contiguousDataset, i, data);
			for(int j = 0; j < ncol; j++) {
				if( data[j] != 0 ) {
					denseBlock.set(i, j, data[j]);
					lnnz++;
				}
			}
		}
		IOUtilFunctions.closeSilently(bis);
		return lnnz;
	}
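
	/**
	 * Computes the output matrix size by opening each file, accumulating
	 * the dimensions of the named dataset, and allocating a matching
	 * output matrix block.
	 */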
	public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName)
		throws IOException, DMLRuntimeException
	{
		int nrow = 0;
		int ncol = 0;
		for(int fileNo = 0; fileNo < files.size(); fileNo++) {
			BufferedInputStream bis = new BufferedInputStream(fs.open(files.get(fileNo)));
			H5RootObject rootObject = H5.H5Fopen(bis);
			H5.H5Dopen(rootObject, datasetName);
			int[] dims = rootObject.getDimensions();
			nrow += dims[0];
			ncol += dims[1];
			IOUtilFunctions.closeSilently(bis);
		}

		//allocate target matrix block based on given size
		return createOutputMatrixBlock(nrow, ncol, nrow, (long) nrow * ncol, true, false);
	}
}