/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.io;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.io.IOUtilFunctions.CountRowsTask;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.UtilFunctions;
/**
 * Parallel version of ReaderTextCSV.java. To summarize, we do two passes:
 * a first pass to compute per-split row counts and offsets, and a second
 * pass for the actual read. We accordingly create count and read tasks and
 * use fixed-size thread pools to execute them. If the target matrix is
 * dense, the inserts are done lock-free. In contrast to the parallel
 * textcell read, we also do lock-free inserts if the matrix is sparse,
 * because splits contain row-partitioned lines and hence there is no
 * danger of lost updates. Note that no sorting of sparse rows is required
 * either, because the data arrives in sorted order per row.
*
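 * A minimal usage sketch (the file name is hypothetical and the no-arg
 * FileFormatPropertiesCSV constructor with its defaults is an assumption;
 * only readMatrixFromHDFS is taken from this class):
 * <pre>{@code
 * FileFormatPropertiesCSV props = new FileFormatPropertiesCSV();
 * MatrixReader reader = new ReaderTextCSVParallel(props);
 * // rlen/clen/estnnz of -1: dimensions and sparsity unknown up front
 * MatrixBlock mb = reader.readMatrixFromHDFS("hdfs:/tmp/X.csv", -1, -1, 1000, -1);
 * }</pre>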
*/
public class ReaderTextCSVParallel extends MatrixReader
{
private FileFormatPropertiesCSV _props = null;
private int _numThreads = 1;
private SplitOffsetInfos _offsets = null;
public ReaderTextCSVParallel(FileFormatPropertiesCSV props) {
_numThreads = OptimizerUtils.getParallelTextReadParallelism();
_props = props;
}
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
int blen, long estnnz)
throws IOException, DMLRuntimeException
{
// prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(fname);
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, _numThreads);
splits = IOUtilFunctions.sortInputSplits(splits);
// check existence and non-empty file
checkValidInputFile(fs, path);
// allocate output matrix block
// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, job,
_props.hasHeader(), _props.getDelim(), rlen, clen, estnnz);
rlen = ret.getNumRows();
clen = ret.getNumColumns();
// Second Read Pass (read, parse strings, append to matrix block)
readCSVMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen,
_props.hasHeader(), _props.getDelim(), _props.isFill(),
_props.getFillValue(), _props.getNAStrings());
//post-processing (representation-specific, change of sparse/dense block representation)
// - no sorting required for CSV because it is read in sorted order per row
// - nnz explicitly maintained in parallel for the individual splits
ret.examSparsity();
// sanity check for parallel row count (since determined internally)
if (rlen >= 0 && rlen != ret.getNumRows())
throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());
return ret;
}
@Override
public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz)
throws IOException, DMLRuntimeException
{
//not implemented yet, fall back to sequential reader
return new ReaderTextCSV(_props)
.readMatrixFromInputStream(is, rlen, clen, blen, estnnz);
}
private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job,
MatrixBlock dest, long rlen, long clen, int blen,
boolean hasHeader, String delim, boolean fill, double fillValue, HashSet<String> naStrings)
throws IOException
{
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
ExecutorService pool = CommonThreadPool.get(_numThreads);
try
{
// create read tasks for all splits
ArrayList<CSVReadTask> tasks = new ArrayList<>();
int splitCount = 0;
for (InputSplit split : splits) {
tasks.add( new CSVReadTask(split, _offsets, informat, job, dest,
rlen, clen, hasHeader, delim, fill, fillValue, splitCount++, naStrings) );
}
pool.invokeAll(tasks);
pool.shutdown();
// check return codes and aggregate nnz
long lnnz = 0;
for (CSVReadTask rt : tasks) {
lnnz += rt.getPartialNnz();
if (!rt.getReturnCode()) {
Exception err = rt.getException();
throw new IOException("Read task for csv input failed: "+ err.toString(), err);
}
}
dest.setNonZeros(lnnz);
}
catch (Exception e) {
throw new IOException("Threadpool issue, while parallel read.", e);
}
}
private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path,
JobConf job, boolean hasHeader, String delim, long rlen, long clen, long estnnz)
throws IOException, DMLRuntimeException
{
int nrow = 0;
int ncol = 0;
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
// determine the number of columns from the first row
// (a header row, if present, has the same number of columns as the data)
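// e.g., the line "1.0,2.0,3.0" with delim "," yields two delimiter
// matches and hence ncol = 3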
LongWritable key = new LongWritable();
Text oneLine = new Text();
RecordReader<LongWritable, Text> reader = informat
.getRecordReader(splits[0], job, Reporter.NULL);
try {
if (reader.next(key, oneLine)) {
String cellStr = oneLine.toString().trim();
ncol = StringUtils.countMatches(cellStr, delim) + 1;
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
// count rows in parallel per split
try
{
ExecutorService pool = CommonThreadPool.get(_numThreads);
ArrayList<CountRowsTask> tasks = new ArrayList<>();
for (InputSplit split : splits) {
tasks.add(new CountRowsTask(split, informat, job, hasHeader));
hasHeader = false;
}
List<Future<Long>> ret = pool.invokeAll(tasks);
pool.shutdown();
// collect row counts for offset computation
// (Future.get() rethrows the exception of any failed task, failing fast)
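// Illustrative example: three splits counting 100, 80, and 120 rows get
// offsets 0, 100, and 180 and lengths 100, 80, and 120, so every read
// task later writes into a disjoint row range of the target block.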
_offsets = new SplitOffsetInfos(tasks.size());
for (int i = 0; i < ret.size(); i++) {
	int lnrow = (int) ret.get(i).get().longValue(); //get() rethrows task errors
	_offsets.setOffsetPerSplit(i, nrow);
	_offsets.setLengthPerSplit(i, lnrow);
	nrow = nrow + lnrow;
}
}
catch (Exception e) {
throw new IOException("Threadpool Error " + e.getMessage(), e);
}
//robustness for wrong dimensions which are already compiled into the plan
if( (rlen != -1 && nrow != rlen) || (clen != -1 && ncol != clen) ) {
String msg = "Read matrix dimensions differ from meta data: ["+nrow+"x"+ncol+"] vs. ["+rlen+"x"+clen+"].";
if( rlen < nrow || clen < ncol ) {
//a) specified matrix dimensions too small
throw new DMLRuntimeException(msg);
}
else {
//b) specified matrix dimensions too large -> padding and warning
LOG.warn(msg);
nrow = (int) rlen;
ncol = (int) clen;
}
}
// allocate target matrix block based on given size;
// need to allocate sparse as well since lock-free insert into target
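// e.g., if estnnz is unknown (<0), a 1,000 x 100 input is conservatively
// estimated as fully dense below, i.e., 100,000 non-zeros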
long estnnz2 = (estnnz < 0) ? (long)nrow * ncol : estnnz;
return createOutputMatrixBlock(nrow, ncol, nrow, estnnz2, true, true);
}
private static class SplitOffsetInfos {
	// offset & length info per split
	private int[] offsetPerSplit = null;
	private int[] lengthPerSplit = null;
	public SplitOffsetInfos(int numSplits) {
		lengthPerSplit = new int[numSplits];
		offsetPerSplit = new int[numSplits];
	}
	public int getLengthPerSplit(int split) {
		return lengthPerSplit[split];
	}
	public void setLengthPerSplit(int split, int r) {
		lengthPerSplit[split] = r;
	}
	public int getOffsetPerSplit(int split) {
		return offsetPerSplit[split];
	}
	public void setOffsetPerSplit(int split, int o) {
		offsetPerSplit[split] = o;
	}
}
private static class CSVReadTask implements Callable<Object>
{
private InputSplit _split = null;
private SplitOffsetInfos _splitoffsets = null;
private boolean _sparse = false;
private TextInputFormat _informat = null;
private JobConf _job = null;
private MatrixBlock _dest = null;
private long _rlen = -1;
private long _clen = -1;
private boolean _isFirstSplit = false;
private boolean _hasHeader = false;
private boolean _fill = false;
private double _fillValue = 0;
private String _delim = null;
private int _splitCount = 0;
private boolean _rc = true;
private Exception _exception = null;
private long _nnz;
private HashSet<String> _naStrings;
public CSVReadTask(InputSplit split, SplitOffsetInfos offsets,
TextInputFormat informat, JobConf job, MatrixBlock dest,
long rlen, long clen, boolean hasHeader, String delim,
boolean fill, double fillValue, int splitCount, HashSet<String> naStrings)
{
_split = split;
_splitoffsets = offsets;
_sparse = dest.isInSparseFormat();
_informat = informat;
_job = job;
_dest = dest;
_rlen = rlen;
_clen = clen;
_isFirstSplit = (splitCount == 0);
_hasHeader = hasHeader;
_fill = fill;
_fillValue = fillValue;
_delim = delim;
_rc = true;
_splitCount = splitCount;
_naStrings = naStrings;
}
public boolean getReturnCode() {
return _rc;
}
public Exception getException() {
return _exception;
}
public long getPartialNnz() {
return _nnz;
}
@Override
public Object call()
throws Exception
{
int row = 0;
int col = 0;
double cellValue = 0;
long lnnz = 0;
try
{
RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
LongWritable key = new LongWritable();
Text value = new Text();
// skip the header line
if (_isFirstSplit && _hasHeader) {
reader.next(key, value);
}
boolean noFillEmpty = false;
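// each task starts at its split's precomputed row offset; since the row
// ranges of splits are disjoint, the appends/sets below need no locking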
row = _splitoffsets.getOffsetPerSplit(_splitCount);
try {
if (_sparse) // SPARSE<-value
{
while (reader.next(key, value)) // foreach line
{
String cellStr = value.toString().trim();
String[] parts = IOUtilFunctions.split(cellStr, _delim);
col = 0;
for (String part : parts) // foreach cell
{
part = part.trim();
if (part.isEmpty()) {
noFillEmpty |= !_fill;
cellValue = _fillValue;
}
else {
cellValue = UtilFunctions.parseToDouble(part,_naStrings);
}
if( cellValue != 0 ) {
_dest.appendValue(row, col, cellValue);
lnnz++;
}
col++;
}
// sanity checks (number of columns, fill values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _fill, noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _clen);
row++;
}
}
else // DENSE<-value
{
DenseBlock a = _dest.getDenseBlock();
while (reader.next(key, value)) { // foreach line
String cellStr = value.toString().trim();
String[] parts = IOUtilFunctions.split(cellStr, _delim);
col = 0;
for (String part : parts) { // foreach cell
part = part.trim();
if (part.isEmpty()) {
noFillEmpty |= !_fill;
cellValue = _fillValue;
}
else {
cellValue = UtilFunctions.parseToDouble(part,_naStrings);
}
if( cellValue != 0 ) {
a.set(row, col, cellValue);
lnnz++;
}
col++;
}
// sanity checks (number of columns, fill values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _fill, noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _clen);
row++;
}
}
// sanity checks (number of rows)
if (row != (_splitoffsets.getOffsetPerSplit(_splitCount)
	+ _splitoffsets.getLengthPerSplit(_splitCount)) )
{
	throw new IOException("Incorrect number of rows ("+ row +") found in delimited file ("
		+ (_splitoffsets.getOffsetPerSplit(_splitCount)
		+ _splitoffsets.getLengthPerSplit(_splitCount)) + "): " + value);
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
catch (Exception ex) {
// central error handling (return code, message)
_rc = false;
_exception = ex;
// post-mortem error handling and bounds checking
if (row < 0 || row + 1 > _rlen || col < 0 || col + 1 > _clen) {
String errMsg = "CSV cell [" + (row + 1) + "," + (col + 1)+ "] " +
"out of overall matrix range [1:" + _rlen+ ",1:" + _clen + "]. " + ex.getMessage();
throw new IOException(errMsg, _exception);
}
else {
String errMsg = "Unable to read matrix in text CSV format. "+ ex.getMessage();
throw new IOException(errMsg, _exception);
}
}
//post processing
_nnz = lnnz;
return null;
}
}
}