// blob: b4dbae42fb03ed6833c3ab035993c5c43d23ac78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.hops;
import java.util.HashMap;
import java.util.Map.Entry;
import org.apache.sysml.conf.CompilerConfig.ConfigType;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.lops.Data;
import org.apache.sysml.lops.Lop;
import org.apache.sysml.lops.LopProperties.ExecType;
import org.apache.sysml.lops.LopsException;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.util.LocalFileUtils;
public class DataOp extends Hop
{
// Kind of data operation this hop represents; this class handles TRANSIENTREAD,
// PERSISTENTREAD, PERSISTENTWRITE, TRANSIENTWRITE and FUNCTIONOUTPUT.
private DataOpTypes _dataop;
// File name associated with the operation; stays null unless provided via a
// constructor or setFileName().
private String _fileName = null;
//read dataop properties
// Input format; defaults to TEXT and is switched to BINARY by the constructors
// for transient reads/writes (and function outputs).
private FileFormatTypes _inFormat = FileFormatTypes.TEXT;
// Block sizes of the input, -1 until set via setInputBlockSizes(); they are
// passed to the lop output parameters for persistent reads.
private long _inRowsInBlock = -1;
private long _inColsInBlock = -1;
// Whether reads may be marked for recompilation in optFindExecType();
// cleared by disableRecompileRead().
private boolean _recompileRead = true;
/**
* List of "named" input parameters. They are maintained as a hashmap:
* parameter names (String) are mapped to indices (Integer) into the getInput()
* arraylist.
*
* i.e., getInput().get(_paramIndexMap.get(parameterName)) refers to the Hop
* that is associated with parameterName.
*/
private HashMap<String, Integer> _paramIndexMap = new HashMap<String, Integer>();
/**
 * Creates an empty instance. Only used by {@link #clone()}, which fills in
 * all attributes afterwards.
 */
private DataOp() {
    super();
}
/**
 * READ operation for a matrix with explicitly given dimensions.
 * This constructor does not support any expression in parameters.
 *
 * @param l            passed to the Hop constructor (presumably the hop name)
 * @param dt           data type of the hop
 * @param vt           value type of the hop
 * @param dop          data operation type
 * @param fname        file name of the data
 * @param dim1         first dimension (see setDim1)
 * @param dim2         second dimension (see setDim2)
 * @param nnz          number of non-zeros
 * @param rowsPerBlock rows per block
 * @param colsPerBlock columns per block
 */
public DataOp(String l, DataType dt, ValueType vt, DataOpTypes dop,
        String fname, long dim1, long dim2, long nnz, long rowsPerBlock, long colsPerBlock) {
    super(l, dt, vt);
    _dataop = dop;
    _fileName = fname;

    //size information
    setDim1(dim1);
    setDim2(dim2);
    setNnz(nnz);
    setRowsInBlock(rowsPerBlock);
    setColsInBlock(colsPerBlock);

    //transient reads are always in binary format
    if( DataOpTypes.TRANSIENTREAD == dop ) {
        setInputFormatType(FileFormatTypes.BINARY);
    }
}
/**
 * READ operation for a matrix with explicitly given dimensions and update type.
 * Delegates to the constructor without update type and then applies
 * {@code update} via setUpdateType.
 */
public DataOp(String l, DataType dt, ValueType vt, DataOpTypes dop,
        String fname, long dim1, long dim2, long nnz,
        UpdateType update, long rowsPerBlock, long colsPerBlock)
{
    this(l, dt, vt, dop, fname, dim1, dim2, nnz, rowsPerBlock, colsPerBlock);
    setUpdateType(update);
}
/**
 * READ operation for Matrix.
 * This constructor supports expressions in parameters: every entry of
 * {@code inputParameters} becomes an input hop of this operator and its
 * position in getInput() is recorded under the parameter name.
 *
 * @param l               passed to the Hop constructor (presumably the hop name)
 * @param dt              data type of the hop
 * @param vt              value type of the hop
 * @param dop             data operation type
 * @param inputParameters named parameter hops; may be null, which is treated
 *                        like an empty map (consistent with the WRITE
 *                        constructor that takes a parameter map)
 */
public DataOp(String l, DataType dt, ValueType vt,
        DataOpTypes dop, HashMap<String, Hop> inputParameters) {
    super(l, dt, vt);
    _dataop = dop;

    if( inputParameters != null ) {
        //inputs are appended in map iteration order; index mirrors the position
        int index = 0;
        for( Entry<String, Hop> e : inputParameters.entrySet() ) {
            Hop input = e.getValue();
            getInput().add(input);
            input.getParent().add(this);
            _paramIndexMap.put(e.getKey(), index);
            index++;
        }
    }

    //transient reads are always in binary format
    if( dop == DataOpTypes.TRANSIENTREAD ) {
        setInputFormatType(FileFormatTypes.BINARY);
    }
}
// WRITE operation
// This constructor does not support any expression in parameters
public DataOp(String l, DataType dt, ValueType vt, Hop in,
DataOpTypes dop, String fname) {
super(l, dt, vt);
_dataop = dop;
getInput().add(0, in);
in.getParent().add(this);
_fileName = fname;
if (dop == DataOpTypes.TRANSIENTWRITE || dop == DataOpTypes.FUNCTIONOUTPUT )
setInputFormatType(FileFormatTypes.BINARY);
}
// CHECKPOINT operation
// This constructor does not support any expression in parameters
public DataOp(String l, DataType dt, ValueType vt, Hop in,
LiteralOp level, DataOpTypes dop, String fname) {
super(l, dt, vt);
_dataop = dop;
getInput().add(0, in);
getInput().add(1, level);
in.getParent().add(this);
level.getParent().add(this);
_fileName = fname;
if (dop == DataOpTypes.TRANSIENTWRITE || dop == DataOpTypes.FUNCTIONOUTPUT )
setInputFormatType(FileFormatTypes.BINARY);
}
/**
 * WRITE operation for Matrix.
 * This constructor supports expressions in parameters: the data input is
 * stored at position 0 and every parameter hop is appended behind it, with
 * its position recorded under the parameter name.
 *
 * @param in              hop producing the data to write
 * @param inputParameters named parameter hops, may be null
 */
public DataOp(String l, DataType dt, ValueType vt,
        DataOpTypes dop, Hop in, HashMap<String, Hop> inputParameters)
{
    super(l, dt, vt);
    _dataop = dop;

    //position 0 is reserved for the data input
    getInput().add(0, in);
    in.getParent().add(this);

    //named parameters start at position 1
    int pos = 1;
    if( inputParameters != null ) {
        for( Entry<String, Hop> param : inputParameters.entrySet() ) {
            Hop paramHop = param.getValue();
            getInput().add(paramHop);
            paramHop.getParent().add(this);
            _paramIndexMap.put(param.getKey(), pos++);
        }
    }

    //transient writes are always in binary format
    if( DataOpTypes.TRANSIENTWRITE == dop ) {
        setInputFormatType(FileFormatTypes.BINARY);
    }
}
/**
 * @return the data operation type of this hop
 */
public DataOpTypes getDataOpType() {
    return this._dataop;
}
/**
 * Replaces the data operation type of this hop.
 *
 * @param type the new data operation type
 */
public void setDataOpType( DataOpTypes type ) {
    this._dataop = type;
}
/**
 * Sets all output characteristics of this hop in one call.
 *
 * @param dim1         first dimension
 * @param dim2         second dimension
 * @param nnz          number of non-zeros
 * @param update       update type
 * @param rowsPerBlock rows per block
 * @param colsPerBlock columns per block
 */
public void setOutputParams(long dim1, long dim2, long nnz, UpdateType update,
        long rowsPerBlock, long colsPerBlock)
{
    //dimensions and sparsity
    setDim1(dim1);
    setDim2(dim2);
    setNnz(nnz);

    //update type
    setUpdateType(update);

    //blocking
    setRowsInBlock(rowsPerBlock);
    setColsInBlock(colsPerBlock);
}
/**
 * Sets the file name associated with this data operation.
 *
 * @param fn the file name
 */
public void setFileName(String fn)
{
    this._fileName = fn;
}
/**
 * @return the file name associated with this data operation, or null if
 *         none has been set
 */
public String getFileName()
{
    return this._fileName;
}
/**
 * Looks up the position of a named parameter within getInput().
 *
 * @param name the parameter name
 * @return the index of the associated input hop
 * @throws NullPointerException if no parameter with this name is registered
 *         (the missing map entry is unboxed)
 */
public int getParameterIndex(String name) {
    Integer position = _paramIndexMap.get(name);
    //auto-unboxing; fails for unknown names
    return position;
}
/**
* Constructs the Data lop for this hop, or returns the lop created by an
* earlier call. Lops of all named parameter inputs are constructed first;
* for the write-like operation types (PERSISTENTWRITE, TRANSIENTWRITE,
* FUNCTIONOUTPUT) the lop of input 0 is constructed as well and passed on as
* the input of the new Data lop.
*
* @return the lop stored for this hop after construction
* @throws HopsException declared by optFindExecType / input lop construction
* @throws LopsException if the operation type is not one of the five handled
*         types, or if lop construction fails
*/
@Override
public Lop constructLops()
throws HopsException, LopsException
{
//return already created lops
if( getLops() != null )
return getLops();
//exec type is only applied below for PERSISTENTWRITE and FUNCTIONOUTPUT
ExecType et = optFindExecType();
Lop l = null;
// construct lops for all input parameters
HashMap<String, Lop> inputLops = new HashMap<String, Lop>();
for (Entry<String, Integer> cur : _paramIndexMap.entrySet()) {
inputLops.put(cur.getKey(), getInput().get(cur.getValue())
.constructLops());
}
// Create the lop
// NOTE(review): the boolean argument of Data is true for the transient types and
// FUNCTIONOUTPUT and false for the persistent types - presumably an "is transient"
// flag; confirm against the Data constructor.
switch(_dataop)
{
case TRANSIENTREAD:
l = new Data(HopsData2Lops.get(_dataop), null, inputLops, getName(), null,
getDataType(), getValueType(), true, getInputFormatType());
setOutputDimensions(l);
break;
case PERSISTENTREAD:
l = new Data(HopsData2Lops.get(_dataop), null, inputLops, getName(), null,
getDataType(), getValueType(), false, getInputFormatType());
//persistent reads use the input block sizes instead of setOutputDimensions
l.getOutputParameters().setDimensions(getDim1(), getDim2(), _inRowsInBlock, _inColsInBlock, getNnz(), getUpdateType());
break;
case PERSISTENTWRITE:
l = new Data(HopsData2Lops.get(_dataop), getInput().get(0).constructLops(), inputLops, getName(), null,
getDataType(), getValueType(), false, getInputFormatType());
((Data)l).setExecType(et);
setOutputDimensions(l);
break;
case TRANSIENTWRITE:
l = new Data(HopsData2Lops.get(_dataop), getInput().get(0).constructLops(), inputLops, getName(), null,
getDataType(), getValueType(), true, getInputFormatType());
setOutputDimensions(l);
break;
case FUNCTIONOUTPUT:
l = new Data(HopsData2Lops.get(_dataop), getInput().get(0).constructLops(), inputLops, getName(), null,
getDataType(), getValueType(), true, getInputFormatType());
((Data)l).setExecType(et);
setOutputDimensions(l);
break;
default:
throw new LopsException("Invalid operation type for Data LOP: " + _dataop);
}
setLineNumbers(l);
setLops(l);
//add reblock/checkpoint lops if necessary
constructAndSetLopsDataFlowProperties();
//return via getLops() since the call above may have replaced the stored lop
return getLops();
}
/**
 * Sets the file format of the input.
 *
 * @param ft the input file format
 */
public void setInputFormatType(FileFormatTypes ft)
{
    this._inFormat = ft;
}
/**
 * @return the file format of the input (TEXT unless changed)
 */
public FileFormatTypes getInputFormatType()
{
    return this._inFormat;
}
/**
 * Sets both input block sizes at once.
 *
 * @param brlen number of rows per input block
 * @param bclen number of columns per input block
 */
public void setInputBlockSizes( long brlen, long bclen )
{
    //delegate to the individual setters
    setInputRowsInBlock( brlen );
    setInputColsInBlock( bclen );
}
/**
 * Sets the number of rows per input block.
 *
 * @param brlen number of rows per input block
 */
public void setInputRowsInBlock( long brlen )
{
    this._inRowsInBlock = brlen;
}
/**
 * @return the number of rows per input block (-1 unless set)
 */
public long getInputRowsInBlock()
{
    return this._inRowsInBlock;
}
/**
 * Sets the number of columns per input block.
 *
 * @param bclen number of columns per input block
 */
public void setInputColsInBlock( long bclen )
{
    this._inColsInBlock = bclen;
}
/**
 * @return the number of columns per input block (-1 unless set)
 */
public long getInputColsInBlock()
{
    return this._inColsInBlock;
}
/**
 * @return true if this hop is a persistent or transient read
 */
public boolean isRead() {
    final DataOpTypes type = _dataop;
    if( type == DataOpTypes.PERSISTENTREAD ) {
        return true;
    }
    return type == DataOpTypes.TRANSIENTREAD;
}
/**
 * @return true if this hop is a persistent or transient write
 */
public boolean isWrite() {
    final DataOpTypes type = _dataop;
    if( type == DataOpTypes.PERSISTENTWRITE ) {
        return true;
    }
    return type == DataOpTypes.TRANSIENTWRITE;
}
/**
 * @return true if this hop is a persistent read or a persistent write
 */
public boolean isPersistentReadWrite() {
    final DataOpTypes type = _dataop;
    if( type == DataOpTypes.PERSISTENTREAD ) {
        return true;
    }
    return type == DataOpTypes.PERSISTENTWRITE;
}
/**
 * Builds the operator string: the string registered for the operation type
 * in HopsData2String, a single space, and the hop name.
 */
@Override
public String getOpString() {
    return HopsData2String.get(_dataop) + " " + getName();
}
/**
 * Logs this hop and, recursively, all of its inputs at debug level.
 * Nothing happens (and the visit status stays untouched) unless debug
 * logging is enabled; otherwise the hop is marked as visited afterwards.
 *
 * @throws HopsException declared by the printMe calls of the super class and inputs
 */
public void printMe() throws HopsException {
    if( !LOG.isDebugEnabled() ) {
        return;
    }

    //print only hops that have not been visited yet
    if( getVisited() != VisitStatus.DONE ) {
        super.printMe();
        LOG.debug(" DataOp: " + _dataop);
        if( _fileName != null ) {
            LOG.debug(" file: " + _fileName);
        }
        LOG.debug(" format: " + getInputFormatType());
        for( Hop input : getInput() ) {
            input.printMe();
        }
    }

    setVisited(VisitStatus.DONE);
}
/**
 * Data operations do not allow all execution types.
 *
 * @return always false
 */
@Override
public boolean allowsAllExecTypes() {
    return false;
}
/**
 * Estimates the output memory of this hop. Scalars get a fixed size per
 * value type; matrices/frames are estimated from dimensions and sparsity
 * for reads only, since write nodes need no output estimate (just input).
 *
 * @param dim1 first dimension
 * @param dim2 second dimension
 * @param nnz  number of non-zeros
 * @return the memory estimate, 0 for writes and unknown scalar value types
 */
@Override
protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
{
    //SCALAR: fixed size depending on the value type
    if( getDataType() == DataType.SCALAR ) {
        switch( getValueType() ) {
            case INT:
                return OptimizerUtils.INT_SIZE;
            case DOUBLE:
                return OptimizerUtils.DOUBLE_SIZE;
            case BOOLEAN:
                return OptimizerUtils.BOOLEAN_SIZE;
            case STRING:
                // by default, it estimates the size of string[100]
                return 100 * OptimizerUtils.CHAR_SIZE;
            case OBJECT:
                return OptimizerUtils.DEFAULT_SIZE;
            default:
                return 0;
        }
    }

    //MATRIX / FRAME: only reads produce an output estimate
    boolean read = ( _dataop == DataOpTypes.PERSISTENTREAD
        || _dataop == DataOpTypes.TRANSIENTREAD );
    if( !read ) {
        return 0;
    }
    double sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz);
    return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);
}
/**
 * The intermediate memory of a data operation is independent of the data
 * characteristics and always the buffer size of LocalFileUtils.
 *
 * @return LocalFileUtils.BUFFER_SIZE
 */
@Override
protected double computeIntermediateMemEstimate( long dim1, long dim2, long nnz ) {
    return LocalFileUtils.BUFFER_SIZE;
}
/**
 * Infers rows, columns and non-zeros of the output from the memo table.
 * Writes take the statistics of their data input; transient reads take
 * the statistics stored for this hop itself (passed from cross-dag
 * transient writes). All other operation types yield no result.
 *
 * @param memo memo table holding the input statistics
 * @return {rows, cols, nnz} if the dimensions are known, otherwise null
 */
@Override
protected long[] inferOutputCharacteristics( MemoTable memo )
{
    //determine the hop whose statistics describe the output
    Hop statsSource;
    if( _dataop == DataOpTypes.PERSISTENTWRITE
        || _dataop == DataOpTypes.TRANSIENTWRITE ) {
        statsSource = getInput().get(0);
    }
    else if( _dataop == DataOpTypes.TRANSIENTREAD ) {
        statsSource = this;
    }
    else {
        return null;
    }

    MatrixCharacteristics stats = memo.getAllInputStats(statsSource);
    if( !stats.dimsKnown() ) {
        return null;
    }
    return new long[]{ stats.getRows(), stats.getCols(), stats.getNonZeros() };
}
/**
* Determines and stores the execution type (_etype) of this hop and, where
* applicable, marks the hop for recompilation.
*
* For PERSISTENTWRITE / TRANSIENTWRITE the result is the forced type if one
* is set, otherwise the memory-based, threshold-based or remote choice. All
* other operation types take the read branch and get the memory-based
* estimate, which is null when the optimization level is not memory-based.
*
* @return the execution type stored in _etype (may be null for reads)
* @throws HopsException declared by the called helper methods
*/
@Override
protected ExecType optFindExecType()
throws HopsException
{
//MB: find exec type has two meanings here: (1) for write it means the actual
//exec type, while (2) for read it affects the recompilation decision as needed
//for example for sum(X) where the memory consumption is solely determined by the DataOp
ExecType letype = (OptimizerUtils.isMemoryBasedOptLevel()) ? findExecTypeByMemEstimate() : null;
//remote backend: SPARK in spark execution mode, MR otherwise
ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
//NOTE: independent of etype executed in MR (piggybacked) if input to persistent write is MR
if( _dataop == DataOpTypes.PERSISTENTWRITE || _dataop == DataOpTypes.TRANSIENTWRITE )
{
checkAndSetForcedPlatform();
//additional check for write only
//(scalars, and frames when the remote backend is MR, are always forced to CP)
if( getDataType()==DataType.SCALAR || (getDataType()==DataType.FRAME && REMOTE==ExecType.MR) )
_etypeForced = ExecType.CP;
if( _etypeForced != null )
{
_etype = _etypeForced;
}
else
{
if ( OptimizerUtils.isMemoryBasedOptLevel() )
{
_etype = letype;
}
else if ( getInput().get(0).areDimsBelowThreshold() )
{
_etype = ExecType.CP;
}
else
{
_etype = REMOTE;
}
//check for valid CP dimensions and matrix size
checkAndSetInvalidCPDimsAndSize();
}
//mark for recompile (forever)
if( ConfigurationManager.isDynamicRecompilation() && !dimsKnown(true) && _etype==REMOTE ) {
setRequiresRecompile();
}
}
else //READ
{
//NOTE(review): every non-write type, including FUNCTIONOUTPUT, ends up in this branch
//mark for recompile (forever)
//(only if recompilation of reads was not disabled, or a checkpoint is required)
if( ConfigurationManager.isDynamicRecompilation() && !dimsKnown(true) && letype==REMOTE
&& (_recompileRead || _requiresCheckpoint) )
{
setRequiresRecompile();
}
_etype = letype;
}
return _etype;
}
/**
 * Refreshes dimensions and non-zeros of write operations from their data
 * input. Reads are left untouched because their dimensions are updated
 * via set output params.
 */
@Override
public void refreshSizeInformation()
{
    boolean write = ( _dataop == DataOpTypes.PERSISTENTWRITE
        || _dataop == DataOpTypes.TRANSIENTWRITE );
    if( !write ) {
        //READ: do nothing; dimensions updated via set output params
        return;
    }

    //WRITE: propagate the size of the data input
    Hop dataInput = getInput().get(0);
    setDim1(dataInput.getDim1());
    setDim2(dataInput.getDim2());
    setNnz(dataInput.getNnz());
}
/**
 * Explicitly disables recompilation of transient reads. This additional
 * information is required because requiresRecompile is set in a top-down
 * manner, hence any value set from a consuming operator would be
 * overwritten by optFindExecType.
 */
public void disableRecompileRead() {
    this._recompileRead = false;
}
/**
 * Creates a copy of this hop. Generic hop attributes are copied via
 * {@code clone(this, false)}; the DataOp-specific attributes are copied
 * field by field, with a shallow copy of the parameter index map.
 *
 * @return the cloned DataOp
 * @throws CloneNotSupportedException declared by the clone contract
 */
@Override
public Object clone() throws CloneNotSupportedException
{
    DataOp copy = new DataOp();

    //copy generic attributes
    copy.clone(this, false);

    //copy specific attributes
    //note: no deep cp of params since read-only
    @SuppressWarnings("unchecked")
    HashMap<String, Integer> paramsCopy = (HashMap<String, Integer>) _paramIndexMap.clone();
    copy._paramIndexMap = paramsCopy;
    copy._recompileRead = _recompileRead;
    copy._inColsInBlock = _inColsInBlock;
    copy._inRowsInBlock = _inRowsInBlock;
    copy._inFormat = _inFormat;
    copy._fileName = _fileName;
    copy._dataop = _dataop;

    return copy;
}
/**
 * Checks whether {@code that} is a redundant persistent read of the same
 * data, so that both hops can be merged by common subexpression elimination.
 *
 * Common subexpression elimination for redundant persistent reads avoids
 * unnecessary reads and reblocks and prevents specific anomalies, e.g.,
 * with multiple piggybacked csvreblock of the same input w/ unknown input sizes.
 *
 * Two hops match only if CSE for persistent reads is enabled, both are
 * persistent reads of the same (non-null) file with equal format and block
 * sizes, and both have the same set of named parameters bound to the very
 * same input hops. A missing file name, a parameter that exists on only
 * one side, or a different number of parameters yields false instead of
 * an exception or an asymmetric result.
 *
 * @param that the hop to compare with
 * @return true if both hops are equivalent persistent reads
 */
@Override
public boolean compare( Hop that )
{
    if( !(that instanceof DataOp) )
        return false;

    DataOp that2 = (DataOp)that;
    boolean ret = ( OptimizerUtils.ALLOW_COMMON_SUBEXPRESSION_ELIMINATION
        && ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_CSE_PERSISTENT_READS)
        && _dataop == that2._dataop
        && _dataop == DataOpTypes.PERSISTENTREAD
        && _fileName != null //reads without file name are never merged
        && _fileName.equals(that2._fileName)
        && _inFormat == that2._inFormat
        && _inRowsInBlock == that2._inRowsInBlock
        && _inColsInBlock == that2._inColsInBlock
        && _paramIndexMap != null && that2._paramIndexMap != null
        //equal size + containment below implies equal parameter names
        && _paramIndexMap.size() == that2._paramIndexMap.size() );
    //above conditions also ensure consistency with regard to
    //(1) checkpointing, (2) reblock and (3) recompile.
    if( !ret )
        return false;

    //all named parameters must refer to identical input hops
    for( Entry<String,Integer> e : _paramIndexMap.entrySet() ) {
        Integer pos2 = that2._paramIndexMap.get(e.getKey());
        if( pos2 == null )
            return false; //parameter unknown to the other read
        int pos1 = e.getValue();
        Hop in1 = getInput().get(pos1);
        Hop in2 = that2.getInput().get(pos2.intValue());
        if( in2 == null || in1 != in2 )
            return false;
    }
    return true;
}
/**
 * Removes a named input both from the list of inputs and from the
 * parameter index map. All remaining parameters that were positioned
 * behind the removed input move up by one, so their indices are
 * decremented accordingly.
 *
 * @param inputName The name of the input to remove
 * @throws NullPointerException if no parameter with this name is registered
 *         (raised by the index lookup)
 */
public void removeInput(String inputName) {
    final int removedPos = getParameterIndex(inputName);

    //drop the input hop and its name binding
    _input.remove(removedPos);
    _paramIndexMap.remove(inputName);

    //shift the indices of all parameters behind the removed one
    for( Entry<String, Integer> binding : _paramIndexMap.entrySet() ) {
        int pos = binding.getValue();
        if( pos > removedPos ) {
            binding.setValue(pos - 1);
        }
    }
}
}