blob: cbf067d682fad643c6035e4c1649d424fe2373f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.controlprogram.parfor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.hops.Hop;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaDataFormat;
import org.apache.sysds.runtime.util.HDFSTool;
/**
* This is the base class for all data partitioners.
*
*/
public abstract class DataPartitioner
{
	protected static final Log LOG = LogFactory.getLog(DataPartitioner.class.getName());

	//note: the following value has been empirically determined but might change in the future,
	//MatrixBlockDSM.SPARCITY_TURN_POINT (with 0.4) was too high because we create 3-4 values per nnz and
	//have some computation overhead for binary cell.
	//(not read in this class; presumably consumed by subclasses)
	protected static final double SPARSITY_CELL_THRESHOLD = 0.1d;
	//(not read in this class; presumably consumed by subclasses)
	protected static final String NAME_SUFFIX = "_dp";

	//instance variables
	protected PDataPartitionFormat _format = null; //may be changed by createPartitionedMatrixObject
	protected int _n = -1; //blocksize if applicable
	protected boolean _allowBinarycell = true; //cleared via disableBinaryCell()

	/**
	 * Initializes the partitioner with the requested partition format.
	 *
	 * @param dpf data partition format
	 * @param n blocksize if applicable
	 */
	protected DataPartitioner( PDataPartitionFormat dpf, int n ) {
		_format = dpf;
		_n = n;
	}

	/**
	 * Creates a partitioned matrix object for the given file name without
	 * forcing the configured format (delegates with {@code force=false}).
	 *
	 * @param in input matrix object
	 * @param fnameNew file name of the partitioned output
	 * @return partitioned matrix object, or {@code in} if no partitioning was applied
	 */
	public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew ) {
		return createPartitionedMatrixObject(in, fnameNew, false);
	}

	/**
	 * Creates a new output matrix object with the value type of the input and
	 * the given file name, and delegates the partitioning to
	 * {@link #createPartitionedMatrixObject(MatrixObject, MatrixObject, boolean)}.
	 *
	 * @param in input matrix object
	 * @param fnameNew file name of the partitioned output
	 * @param force if false, try to optimize
	 * @return partitioned matrix object, or {@code in} if no partitioning was applied
	 */
	public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew, boolean force ) {
		MatrixObject out = new MatrixObject(in.getValueType(), fnameNew);
		return createPartitionedMatrixObject(in, out, force);
	}

	/**
	 * Creates a partitioned matrix object based on the given input matrix object,
	 * according to the specified split format. The input matrix can be in-memory
	 * or still on HDFS and the partitioned output matrix is written to HDFS. The
	 * created matrix object can be used transparently for obtaining the full matrix
	 * or reading 1 or multiple partitions based on given index ranges.
	 * <p>
	 * The input is returned unchanged if the format is {@code NONE}, or if
	 * {@code force} is false and both dimensions are below {@code Hop.CPThreshold}.
	 * If {@code force} is false, this call may also change the format of this
	 * partitioner from row-/column-wise to the respective block-wise format.
	 *
	 * @param in input matrix object
	 * @param out output matrix object
	 * @param force if false, try to optimize
	 * @return partitioned matrix object
	 * @throws DMLRuntimeException if the cleanup of an existing output file fails
	 */
	public MatrixObject createPartitionedMatrixObject( MatrixObject in, MatrixObject out, boolean force ) {
		//check for naive partitioning
		if( _format == PDataPartitionFormat.NONE )
			return in;

		//analyze input matrix object
		MetaDataFormat meta = (MetaDataFormat)in.getMetaData();
		DataCharacteristics dc = meta.getDataCharacteristics();
		FileFormat fmt = meta.getFileFormat();
		long rows = dc.getRows();
		long cols = dc.getCols();
		int blen = dc.getBlocksize();
		long nonZeros = dc.getNonZeros();

		if( !force ) //try to optimize, if format not forced
		{
			//check lower bound of useful data partitioning
			if( rows < Hop.CPThreshold && cols < Hop.CPThreshold ) //or matrix already fits in mem
			{
				return in;
			}

			//check for changing to blockwise representations
			if( _format == PDataPartitionFormat.ROW_WISE && cols < Hop.CPThreshold )
			{
				LOG.debug("Changing format from "+PDataPartitionFormat.ROW_WISE+" to "+PDataPartitionFormat.ROW_BLOCK_WISE+".");
				_format = PDataPartitionFormat.ROW_BLOCK_WISE;
			}
			if( _format == PDataPartitionFormat.COLUMN_WISE && rows < Hop.CPThreshold )
			{
				//log the format that is actually assigned below
				LOG.debug("Changing format from "+PDataPartitionFormat.COLUMN_WISE+" to "+PDataPartitionFormat.COLUMN_BLOCK_WISE+".");
				_format = PDataPartitionFormat.COLUMN_BLOCK_WISE;
			}
		}

		//prepare filenames and cleanup if required
		String fnameNew = out.getFileName();
		try{
			HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
		}
		catch(Exception ex){
			throw new DMLRuntimeException( ex );
		}

		//core partitioning (depending on subclass)
		partitionMatrix( in, fnameNew, fmt, rows, cols, blen );

		//create output matrix object (same dimensions, blocksize, nnz, and file format as the input)
		out.setPartitioned( _format, _n );
		MatrixCharacteristics mcNew = new MatrixCharacteristics(rows, cols, blen);
		mcNew.setNonZeros( nonZeros );
		MetaDataFormat metaNew = new MetaDataFormat(mcNew, fmt);
		out.setMetaData(metaNew);

		return out;
	}

	/**
	 * Clears the allow-binary-cell flag of this partitioner. The flag is not
	 * read in this class; presumably it is evaluated by subclasses.
	 */
	public void disableBinaryCell()
	{
		_allowBinarycell = false;
	}

	/**
	 * Core partitioning, implemented by subclasses. Invoked by
	 * {@link #createPartitionedMatrixObject(MatrixObject, MatrixObject, boolean)}
	 * after an already existing output file has been deleted.
	 *
	 * @param in input matrix object
	 * @param fnameNew file name of the partitioned output
	 * @param fmt file format taken from the input meta data
	 * @param rlen number of rows of the input
	 * @param clen number of columns of the input
	 * @param blen blocksize of the input
	 */
	protected abstract void partitionMatrix( MatrixObject in, String fnameNew, FileFormat fmt, long rlen, long clen, int blen );

	/**
	 * Creates a matrix block that can be reused for single-row or single-column
	 * partitions: a {@code 1 x cols} block for {@code ROW_WISE} and a
	 * {@code rows x 1} block for {@code COLUMN_WISE}.
	 *
	 * @param dpf data partition format
	 * @param rows number of rows (used for {@code COLUMN_WISE})
	 * @param cols number of columns (used for {@code ROW_WISE})
	 * @return reuse matrix block, or null for all other formats
	 */
	public static MatrixBlock createReuseMatrixBlock( PDataPartitionFormat dpf, int rows, int cols ) {
		MatrixBlock tmp = null;
		switch( dpf ) {
			case ROW_WISE:
				//default assumption sparse, but reset per input block anyway
				//(last argument is presumably the estimated number of non-zeros, i.e., 10% of the row)
				tmp = new MatrixBlock(1, cols, true, (int)(cols*0.1));
				break;
			case COLUMN_WISE:
				//default dense because single column always below SKINNY_MATRIX_TURN_POINT
				tmp = new MatrixBlock(rows, 1, false);
				break;
			default:
				//do nothing
		}
		return tmp;
	}
}