/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.controlprogram.caching;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.io.FileFormatProperties;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaData;
import org.apache.sysds.runtime.meta.MetaDataFormat;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.runtime.util.IndexRange;
/**
 * Represents a matrix in the control program. This class contains methods to
 * read matrices from HDFS and convert them to a specific format/representation.
 * It is also able to write matrices to HDFS in several formats/representations.
 * IMPORTANT: Preserve the one-to-one correspondence between {@link MatrixObject}
 * and {@link MatrixBlock} objects for caching purposes. Do not change a
 * {@link MatrixBlock} object without informing its {@link MatrixObject} object.
*
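* A minimal usage sketch (hypothetical file name; the metadata values are
* illustrative only):
* <pre>{@code
* MetaData md = new MetaDataFormat(
*     new MatrixCharacteristics(1000, 100, 1000, -1), FileFormat.BINARY);
* MatrixObject mo = new MatrixObject(ValueType.FP64, "hdfs:/tmp/X", md);
* MatrixBlock mb = mo.acquireRead(); //pin the block in memory
* //... read-only access to mb ...
* mo.release(); //unpin, the block becomes evictable again
* }</pre>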
*/
public class MatrixObject extends CacheableData<MatrixBlock>
{
private static final long serialVersionUID = 6374712373206495637L;
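/**
 * Update behavior of the underlying matrix block: COPY creates a new block
 * on update, while the INPLACE variants reuse the allocated block;
 * INPLACE_PINNED additionally keeps the block from being evicted (see
 * {@link #isBelowCachingThreshold()}). A minimal usage sketch:
 * <pre>{@code
 * mo.setUpdateType(UpdateType.INPLACE_PINNED); //e.g., for loop-carried updates
 * }</pre>
 */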
public enum UpdateType {
COPY,
INPLACE,
INPLACE_PINNED;
public boolean isInPlace() {
return (this != COPY);
}
}
//additional matrix-specific flags
private UpdateType _updateType = UpdateType.COPY;
private boolean _diag = false;
private boolean _markForLinCache = false;
//information relevant to partitioned matrices.
private boolean _partitioned = false; //indicates if obj partitioned
private PDataPartitionFormat _partitionFormat = null; //indicates how obj partitioned
private int _partitionSize = -1; //indicates n for BLOCKWISE_N
private String _partitionCacheName = null; //name of cache block
private MatrixBlock _partitionInMemory = null;
/**
* Constructor that takes the value type and the HDFS filename.
*
* @param vt value type
* @param file file name
*/
public MatrixObject (ValueType vt, String file) {
this(vt, file, null); //file: HDFS file path, no metadata
}
/**
* Constructor that takes the value type, HDFS filename and associated metadata.
*
* @param vt value type
* @param file file name
* @param mtd metadata
*/
public MatrixObject( ValueType vt, String file, MetaData mtd ) {
super (DataType.MATRIX, vt);
_metaData = mtd;
_hdfsFileName = file;
_cache = null;
_data = null;
}
/**
* Copy constructor that copies meta data but NO data.
*
* @param mo matrix object
*/
public MatrixObject( MatrixObject mo )
{
//base copy constructor
super(mo);
MetaDataFormat metaOld = (MetaDataFormat)mo.getMetaData();
_metaData = new MetaDataFormat(
new MatrixCharacteristics(metaOld.getDataCharacteristics()), metaOld.getFileFormat());
_updateType = mo._updateType;
_diag = mo._diag;
_partitioned = mo._partitioned;
_partitionFormat = mo._partitionFormat;
_partitionSize = mo._partitionSize;
_partitionCacheName = mo._partitionCacheName;
_markForLinCache = mo._markForLinCache;
}
public void setUpdateType(UpdateType flag) {
_updateType = flag;
}
public UpdateType getUpdateType() {
return _updateType;
}
public boolean isDiag() {
return _diag;
}
public void setDiag(boolean diag) {
_diag = diag;
}
public void setMarkForLinCache(boolean mark) {
_markForLinCache = mark;
}
public boolean isMarked() {
return _markForLinCache;
}
@Override
public void updateDataCharacteristics (DataCharacteristics dc) {
_metaData.getDataCharacteristics().set(dc);
}
/**
 * Makes the matrix metadata consistent with the in-memory matrix data,
 * i.e., updates dimensions and number of non-zeros from the cached block.
*/
@Override
public void refreshMetaData() {
//refresh only for existing data; we need to throw an exception here,
//otherwise the input/output format cannot be inferred
if( _data == null || _metaData == null )
throw new DMLRuntimeException("Cannot refresh meta data because there is no data or meta data.");
DataCharacteristics mc = _metaData.getDataCharacteristics();
mc.setDimension( _data.getNumRows(), _data.getNumColumns() );
mc.setNonZeros( _data.getNonZeros() );
}
public long getBlocksize() {
return getDataCharacteristics().getBlocksize();
}
public long getNnz() {
return getDataCharacteristics().getNonZeros();
}
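//note: sparsity is computed as nnz/(rows*cols); an unknown nnz (<0)
//conservatively yields 1.0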
public double getSparsity() {
return OptimizerUtils.getSparsity(getDataCharacteristics());
}
// *********************************************
// *** ***
// *** HIGH-LEVEL PUBLIC METHODS ***
// *** FOR PARTITIONED MATRIX ACCESS ***
// *** (all other methods still usable) ***
// *** ***
// *********************************************
public void setPartitioned( PDataPartitionFormat format, int n )
{
_partitioned = true;
_partitionFormat = format;
_partitionSize = n;
}
public void unsetPartitioned()
{
_partitioned = false;
_partitionFormat = null;
_partitionSize = -1;
}
public boolean isPartitioned()
{
return _partitioned;
}
public PDataPartitionFormat getPartitionFormat()
{
return _partitionFormat;
}
public int getPartitionSize()
{
return _partitionSize;
}
public synchronized void setInMemoryPartition(MatrixBlock block)
{
_partitionInMemory = block;
}
/**
* NOTE: for reading matrix partitions, we cache the read block with a soft
* reference (no eviction needed, as partitioning is only applied to read-only
* matrices). This caching applies to the block-wise partition formats, where
* one partition file serves many indexed reads; for row- and column-wise
* partitioning, every access reads its own partition file. Furthermore, as we
* reject the partitioning of vectors and support only full row or column
* indexing, no metadata (apart from the partition flag) is required.
*
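* A read sketch for a row-wise partitioned matrix (hypothetical values;
* ranges are 1-based and inclusive, and {@code ncols} stands for the
* number of columns):
* <pre>{@code
* mo.setPartitioned(PDataPartitionFormat.ROW_WISE, -1);
* MatrixBlock row7 = mo.readMatrixPartition(new IndexRange(7, 7, 1, ncols));
* }</pre>
*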
* @param pred index range
* @return matrix block
*/
public synchronized MatrixBlock readMatrixPartition( IndexRange pred ) {
if( LOG.isTraceEnabled() )
LOG.trace("Acquire partition "+hashCode()+" "+pred);
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
if( !_partitioned )
throw new DMLRuntimeException("MatrixObject not available for indexed read.");
//return static partition if set from outside of the program
if( _partitionInMemory != null )
return _partitionInMemory;
MatrixBlock mb = null;
try
{
boolean blockwise = (_partitionFormat==PDataPartitionFormat.ROW_BLOCK_WISE || _partitionFormat==PDataPartitionFormat.COLUMN_BLOCK_WISE);
//preparations for block wise access
MetaDataFormat iimd = (MetaDataFormat) _metaData;
DataCharacteristics mc = iimd.getDataCharacteristics();
int blen = mc.getBlocksize();
//get filename depending on format
String fname = getPartitionFileName( pred, blen );
//probe cache
if( blockwise && _partitionCacheName != null && _partitionCacheName.equals(fname) )
{
mb = _cache.get(); //try getting block from cache
}
if( mb == null ) //block not in cache
{
//get rows and cols
long rows = -1;
long cols = -1;
switch( _partitionFormat )
{
case ROW_WISE:
rows = 1;
cols = mc.getCols();
break;
case ROW_BLOCK_WISE:
rows = blen;
cols = mc.getCols();
break;
case ROW_BLOCK_WISE_N:
rows = _partitionSize;
cols = mc.getCols();
break;
case COLUMN_WISE:
rows = mc.getRows();
cols = 1;
break;
case COLUMN_BLOCK_WISE:
rows = mc.getRows();
cols = blen;
break;
case COLUMN_BLOCK_WISE_N:
rows = mc.getRows();
cols = _partitionSize;
break;
default:
throw new DMLRuntimeException("Unsupported partition format: "+_partitionFormat);
}
//read the requested partition from HDFS (or create an empty block if the file does not exist)
if( HDFSTool.existsFileOnHDFS(fname) )
mb = readBlobFromHDFS( fname, new long[]{rows, cols} );
else
{
mb = new MatrixBlock((int)rows, (int)cols, true);
LOG.warn("Reading empty matrix partition "+fname);
}
}
//post processing
if( blockwise )
{
//put block into cache
_partitionCacheName = fname;
_cache = new SoftReference<>(mb);
if( _partitionFormat == PDataPartitionFormat.ROW_BLOCK_WISE )
{
int rix = (int)((pred.rowStart-1)%blen);
mb = mb.slice(rix, rix, (int)(pred.colStart-1), (int)(pred.colEnd-1), new MatrixBlock());
}
if( _partitionFormat == PDataPartitionFormat.COLUMN_BLOCK_WISE )
{
int cix = (int)((pred.colStart-1)%blen);
mb = mb.slice((int)(pred.rowStart-1), (int)(pred.rowEnd-1), cix, cix, new MatrixBlock());
}
}
//NOTE: currently no special treatment of non-existing partitions necessary
// because empty blocks are written anyway
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
}
if( DMLScript.STATISTICS ){
long t1 = System.nanoTime();
CacheStatistics.incrementAcquireRTime(t1-t0);
}
return mb;
}
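/**
 * Constructs the partition file name for a given index range, following
 * the one-file-per-partition layout: e.g., ROW_WISE row 7 maps to
 * {@code <hdfsFileName>/7}, and ROW_BLOCK_WISE_N with n=1000 maps row 2500
 * to {@code <hdfsFileName>/3} (partition (2500-1)/1000+1 = 3).
 *
 * @param pred index range (1-based, inclusive)
 * @param blen block size
 * @return partition file name
 */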
public String getPartitionFileName( IndexRange pred, int blen )
{
if( !_partitioned )
throw new DMLRuntimeException("MatrixObject not available for indexed read.");
StringBuilder sb = new StringBuilder();
sb.append(_hdfsFileName);
switch( _partitionFormat )
{
case ROW_WISE:
sb.append(Lop.FILE_SEPARATOR);
sb.append(pred.rowStart);
break;
case ROW_BLOCK_WISE:
sb.append(Lop.FILE_SEPARATOR);
sb.append((pred.rowStart-1)/blen+1);
break;
case ROW_BLOCK_WISE_N:
sb.append(Lop.FILE_SEPARATOR);
sb.append((pred.rowStart-1)/_partitionSize+1);
break;
case COLUMN_WISE:
sb.append(Lop.FILE_SEPARATOR);
sb.append(pred.colStart);
break;
case COLUMN_BLOCK_WISE:
sb.append(Lop.FILE_SEPARATOR);
sb.append((pred.colStart-1)/blen+1);
break;
case COLUMN_BLOCK_WISE_N:
sb.append(Lop.FILE_SEPARATOR);
sb.append((pred.colStart-1)/_partitionSize+1);
break;
default:
throw new DMLRuntimeException("Unsupported partition format: "+_partitionFormat);
}
return sb.toString();
}
// *********************************************
// *** ***
// *** LOW-LEVEL PROTECTED METHODS ***
// *** EXTEND CACHEABLE DATA ***
// *** ONLY CALLED BY THE SUPERCLASS ***
// *** ***
// *********************************************
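//note: small blocks (below the caching threshold) and update-in-place
//pinned results bypass eviction into the lazy write buffer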
@Override
protected boolean isBelowCachingThreshold() {
return LazyWriteBuffer.getCacheBlockSize(_data) <= CACHING_THRESHOLD
|| getUpdateType() == UpdateType.INPLACE_PINNED;
}
@Override
protected MatrixBlock readBlobFromCache(String fname) throws IOException {
return (MatrixBlock)LazyWriteBuffer.readBlock(fname, true);
}
@Override
protected MatrixBlock readBlobFromHDFS(String fname, long[] dims)
throws IOException
{
long rlen = dims[0];
long clen = dims[1];
MetaDataFormat iimd = (MetaDataFormat) _metaData;
DataCharacteristics mc = iimd.getDataCharacteristics();
long begin = 0;
if( LOG.isTraceEnabled() ) {
LOG.trace("Reading matrix from HDFS... " + hashCode() + " Path: " + fname
+ ", dimensions: [" + mc.getRows() + ", " + mc.getCols() + ", " + mc.getNonZeros() + "]");
begin = System.currentTimeMillis();
}
//read matrix and maintain meta data
MatrixBlock newData = isFederated() ? acquireReadAndRelease() :
DataConverter.readMatrixFromHDFS(fname, iimd.getFileFormat(), rlen,
clen, mc.getBlocksize(), mc.getNonZeros(), getFileFormatProperties());
setHDFSFileExists(true);
//sanity check correct output
if( newData == null )
throw new IOException("Unable to load matrix from file: "+fname);
if( LOG.isTraceEnabled() )
LOG.trace("Reading Completed: " + (System.currentTimeMillis()-begin) + " msec.");
return newData;
}
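/**
 * Reads a matrix block from an RDD, either via direct collect (if the
 * result fits into the local memory budget) or, as a guarded collect,
 * by writing the RDD to HDFS and reading the file back.
 */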
@Override
protected MatrixBlock readBlobFromRDD(RDDObject rdd, MutableBoolean writeStatus)
throws IOException
{
//note: the read of a matrix block from an RDD might trigger
//lazy evaluation of pending transformations.
RDDObject lrdd = rdd;
//prepare return status (by default only collect)
writeStatus.setValue(false);
MetaDataFormat iimd = (MetaDataFormat) _metaData;
DataCharacteristics mc = iimd.getDataCharacteristics();
FileFormat fmt = iimd.getFileFormat();
MatrixBlock mb = null;
try
{
//prevent unnecessary collect through rdd checkpoint
if( rdd.allowsShortCircuitCollect() ) {
lrdd = (RDDObject)rdd.getLineageChilds().get(0);
}
//obtain matrix block from RDD
int rlen = (int)mc.getRows();
int clen = (int)mc.getCols();
int blen = mc.getBlocksize();
long nnz = mc.getNonZerosBound();
//guarded rdd collect
if( fmt == FileFormat.BINARY && //guarded collect not for binary cell
!OptimizerUtils.checkSparkCollectMemoryBudget(mc, getPinnedSize()+getBroadcastSize(), true) ) {
//write RDD to hdfs and read to prevent invalid collect mem consumption
//note: lazy, partition-at-a-time collect (toLocalIterator) was significantly slower
if( !HDFSTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwriting an existing file
long newnnz = SparkExecutionContext.writeMatrixRDDtoHDFS(lrdd, _hdfsFileName, iimd.getFileFormat());
_metaData.getDataCharacteristics().setNonZeros(newnnz);
rdd.setPending(false); //mark rdd as non-pending (for export)
rdd.setHDFSFile(true); //mark rdd as hdfs file (for restore)
writeStatus.setValue(true); //mark for no cache-write on read
//note: the hdfsFile flag is not entirely accurate because we still hold an
//RDD reference to the input, not to an RDD of the HDFS file, but the
//resulting behavior is correct
}
mb = readBlobFromHDFS(_hdfsFileName);
}
else {
//collect matrix block from binary cell RDD
mb = SparkExecutionContext.toMatrixBlock(lrdd, rlen, clen, blen, nnz);
}
}
catch(DMLRuntimeException ex) {
throw new IOException(ex);
}
//sanity check correct output
if( mb == null )
throw new IOException("Unable to load matrix from rdd.");
return mb;
}
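/**
 * Assembles a matrix block from the responses of all federated workers,
 * copying each partial result into its index range of the output block.
 */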
@Override
protected MatrixBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
throws IOException
{
// TODO sparse optimization
MatrixBlock ret = new MatrixBlock((int) dims[0], (int) dims[1], false);
List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = fedMap.requestFederatedData();
try {
for (Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
FederatedRange range = readResponse.getLeft();
FederatedResponse response = readResponse.getRight().get();
// copy the partial result into its target range of the output block
int[] beginDimsInt = range.getBeginDimsInt();
int[] endDimsInt = range.getEndDimsInt();
MatrixBlock partRes = (MatrixBlock) response.getData()[0];
ret.copy(beginDimsInt[0], endDimsInt[0] - 1,
beginDimsInt[1], endDimsInt[1] - 1, partRes, false);
ret.setNonZeros(ret.getNonZeros() + partRes.getNonZeros());
}
}
catch (Exception e) {
throw new DMLRuntimeException("Federated matrix read failed.", e);
}
return ret;
}
/**
* Writes the in-memory matrix to HDFS in the specified format.
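*
* A minimal caller sketch (hypothetical path; {@code exportData} on
* {@link CacheableData} is the public entry point that eventually
* dispatches to this method):
* <pre>{@code
* mo.exportData("hdfs:/tmp/X_out", "binary"); //or "csv", "text", ...
* }</pre>
*
* @param fname file name on HDFS
* @param ofmt output format, or null to use the format from metadata
* @param rep replication factor
* @param fprop file format properties (e.g., CSV separators)
* @throws IOException if the write fails
* @throws DMLRuntimeException on invalid configuration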
*/
@Override
protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
throws IOException, DMLRuntimeException
{
long begin = 0;
if( LOG.isTraceEnabled() ){
LOG.trace (" Writing matrix to HDFS... " + hashCode() + " Path: " + fname + ", Format: " +
(ofmt != null ? ofmt : "inferred from metadata"));
begin = System.currentTimeMillis();
}
MetaDataFormat iimd = (MetaDataFormat) _metaData;
if (_data != null)
{
// Get the dimension information from the metadata stored within MatrixObject
DataCharacteristics mc = iimd.getDataCharacteristics();
// Write the matrix to HDFS in requested format
FileFormat fmt = (ofmt != null ? FileFormat.safeValueOf(ofmt) : iimd.getFileFormat());
// when outputFormat is binaryblock, make sure that matrixCharacteristics has correct blocking dimensions
// note: this is only required in singlenode mode (due to the binarycell default)
if ( fmt == FileFormat.BINARY && DMLScript.getGlobalExecMode() == ExecMode.SINGLE_NODE
&& mc.getBlocksize() != ConfigurationManager.getBlocksize() )
{
DataConverter.writeMatrixToHDFS(_data, fname, fmt, new MatrixCharacteristics(mc.getRows(), mc.getCols(),
ConfigurationManager.getBlocksize(), mc.getNonZeros()), rep, fprop, _diag);
}
else {
DataConverter.writeMatrixToHDFS(_data, fname, fmt, mc, rep, fprop, _diag);
}
if( LOG.isTraceEnabled() )
LOG.trace("Writing matrix to HDFS ("+fname+") - COMPLETED... " + (System.currentTimeMillis()-begin) + " msec.");
}
else if( LOG.isTraceEnabled() ) {
LOG.trace ("Writing matrix to HDFS ("+fname+") - NOTHING TO WRITE (_data == null).");
}
if( DMLScript.STATISTICS )
CacheStatistics.incrementHDFSWrites();
}
@Override
protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String outputFormat)
throws IOException, DMLRuntimeException
{
//prepare output info
MetaDataFormat iimd = (MetaDataFormat) _metaData;
FileFormat fmt = (outputFormat != null ? FileFormat.safeValueOf(outputFormat) : iimd.getFileFormat());
//note: the write of an RDD to HDFS might trigger
//lazy evaluation of pending transformations.
long newnnz = SparkExecutionContext.writeMatrixRDDtoHDFS(rdd, fname, fmt);
_metaData.getDataCharacteristics().setNonZeros(newnnz);
}
}