blob: 96e010cdc41f4f3ddb11ab01da2247a33049371e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.io;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseBlockMCSR;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.HDFSTool;
/**
 * Multi-threaded reader for matrices in binary-block format: spawns one read
 * task per sequence (part) file and merges the blocks into a shared output
 * {@link MatrixBlock}, synchronizing sparse appends at block-row or row
 * granularity as needed.
 */
public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
{
	//degree of parallelism for the read; an instance field on purpose
	//(a mutable static here would let concurrently constructed readers
	//overwrite each other's configured parallelism)
	private final int _numThreads;
	
	public ReaderBinaryBlockParallel( boolean localFS ) {
		super(localFS);
		_numThreads = OptimizerUtils.getParallelBinaryReadParallelism();
	}
	
	/**
	 * Reads a matrix in binary-block format from HDFS or the local file
	 * system, using one task per part file.
	 * 
	 * @param fname file name (single file or directory of part files)
	 * @param rlen number of rows of the overall matrix
	 * @param clen number of columns of the overall matrix
	 * @param blen block size (rows/columns per block)
	 * @param estnnz estimated number of non-zeros (negative if unknown)
	 * @return the read matrix, in dense or sparse representation
	 * @throws IOException if the input is missing/invalid or the read fails
	 * @throws DMLRuntimeException on runtime errors during block processing
	 */
	@Override
	public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz)
		throws IOException, DMLRuntimeException
	{
		//early abort for known empty matrices (e.g., remote parfor result vars)
		if( RETURN_EMPTY_NNZ0 && estnnz == 0 )
			return new MatrixBlock((int)rlen, (int)clen, true);
		
		//allocate output matrix block (incl block allocation for parallel)
		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, blen, estnnz, true, true);
		
		//prepare file access
		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
		Path path = new Path( (_localFS ? "file:///" : "") + fname);
		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
		
		//check existence and non-empty file
		checkValidInputFile(fs, path);
		
		//core read; block-row-level sync only pays off if every thread
		//processes multiple block rows (numThreads <= numBlocks)
		long numBlocks = (long)Math.ceil((double)rlen / blen);
		readBinaryBlockMatrixFromHDFS(path, job, fs, ret,
			rlen, clen, blen, _numThreads<=numBlocks, _numThreads);
		
		//finally check if change of sparse/dense block representation required
		if( !AGGREGATE_BLOCK_NNZ )
			ret.recomputeNonZeros();
		ret.examSparsity();
		
		return ret;
	}
	
	/**
	 * Core parallel read: creates one task per part file, runs them on a
	 * common thread pool, and aggregates the per-task non-zero counts.
	 * 
	 * @param syncBlock if true, lock entire block rows on sparse append;
	 *        otherwise lock individual sparse rows (finer granularity)
	 * @param numThreads degree of parallelism of the thread pool
	 */
	private static void readBinaryBlockMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest,
			long rlen, long clen, int blen, boolean syncBlock, int numThreads )
		throws IOException, DMLRuntimeException
	{
		//set up preferred custom serialization framework for binary block format
		if( HDFSTool.USE_BINARYBLOCK_SERIALIZATION )
			HDFSTool.addBinaryBlockSerializationFramework( job );
		
		ExecutorService pool = CommonThreadPool.get(numThreads);
		try {
			//create read tasks for all part files
			ArrayList<ReadFileTask> tasks = new ArrayList<>();
			for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
				tasks.add(new ReadFileTask(lpath, job, dest, rlen, clen, blen, syncBlock));
			
			//wait until all tasks have been executed
			List<Future<Long>> rt = pool.invokeAll(tasks);
			
			//check for exceptions and aggregate nnz
			long lnnz = 0;
			for( Future<Long> task : rt )
				lnnz += task.get();
			
			//post-processing (sparse appends require a final per-row sort)
			dest.setNonZeros( lnnz );
			if( dest.isInSparseFormat() && clen>blen )
				sortSparseRowsParallel(dest, rlen, numThreads, pool);
		}
		catch (Exception e) {
			throw new IOException("Failed parallel read of binary block input.", e);
		}
		finally {
			//ensure shutdown even on exceptions (avoids thread leaks)
			pool.shutdown();
		}
	}
	
	/**
	 * Read task for a single sequence (part) file; copies all contained
	 * blocks into the shared destination and returns the aggregate number
	 * of non-zeros read.
	 */
	private static class ReadFileTask implements Callable<Long>
	{
		private final Path _path;
		private final JobConf _job;
		private final MatrixBlock _dest;
		private final long _rlen, _clen;
		private final int _blen;
		private final boolean _syncBlocks;
		
		public ReadFileTask(Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int blen, boolean syncBlocks) {
			_path = path;
			_job = job;
			_dest = dest;
			_rlen = rlen;
			_clen = clen;
			_blen = blen;
			_syncBlocks = syncBlocks;
		}
		
		@Override
		public Long call() throws Exception 
		{
			boolean sparse = _dest.isInSparseFormat();
			MatrixIndexes key = new MatrixIndexes();
			MatrixBlock value = getReuseBlock(_blen, sparse);
			long lnnz = 0; //aggregate block nnz
			
			//directly read from sequence files (individual part files)
			SequenceFile.Reader reader = new SequenceFile
				.Reader(_job, SequenceFile.Reader.file(_path));
			
			try {
				//note: next(key, value) does not yet exploit the given serialization
				//classes; a record reader does but is generally slower.
				while( reader.next(key, value) ) {
					//empty block filter (skip entire block)
					if( value.isEmptyBlock(false) )
						continue;
					
					//1-based block indexes to 0-based cell offsets
					int row_offset = (int)(key.getRowIndex()-1)*_blen;
					int col_offset = (int)(key.getColumnIndex()-1)*_blen;
					int rows = value.getNumRows();
					int cols = value.getNumColumns();
					
					//bound check per block
					if( row_offset + rows < 0 || row_offset + rows > _rlen
						|| col_offset + cols<0 || col_offset + cols > _clen ) {
						throw new IOException("Matrix block ["+(row_offset+1)+":"
							+(row_offset+rows)+","+(col_offset+1)+":"+(col_offset+cols)+"] " +
							"out of overall matrix range [1:"+_rlen+",1:"+_clen+"].");
					}
					
					//copy block into the shared result
					if( sparse ) {
						//note: append requires final sort
						if( cols < _clen ) {
							//sparse requires lock, when matrix is wider than one block
							//(fine-grained locking of block rows instead of the entire matrix)
							//NOTE: fine-grained locking depends on MCSR SparseRow objects
							SparseBlock sblock = _dest.getSparseBlock();
							if( sblock instanceof SparseBlockMCSR && sblock.get(row_offset) != null ) {
								if( _syncBlocks ) {
									synchronized( sblock.get(row_offset) ) {
										_dest.appendToSparse(value, row_offset, col_offset);
									}
								}
								else {
									for( int i=0; i<rows; i++ )
										synchronized( sblock.get(row_offset+i) ) {
											_dest.appendRowToSparse(sblock, value, i, row_offset, col_offset, true);
										}
								}
							}
							else {
								synchronized( _dest ) {
									_dest.appendToSparse(value, row_offset, col_offset);
								}
							}
						}
						else { //quickpath (no synchronization: disjoint block rows)
							_dest.appendToSparse(value, row_offset, col_offset);
						}
					}
					else {
						_dest.copy( row_offset, row_offset+rows-1,
							col_offset, col_offset+cols-1, value, false );
					}
					
					//aggregate nnz
					lnnz += value.getNonZeros();
				}
			}
			finally {
				IOUtilFunctions.closeSilently(reader);
			}
			
			return lnnz;
		}
	}
}