| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.hops; |
| |
| import java.util.HashMap; |
| import java.util.Map.Entry; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.sysds.common.Types.DataType; |
| import org.apache.sysds.common.Types.FileFormat; |
| import org.apache.sysds.common.Types.OpOpData; |
| import org.apache.sysds.common.Types.ValueType; |
| import org.apache.sysds.conf.CompilerConfig.ConfigType; |
| import org.apache.sysds.conf.ConfigurationManager; |
| import org.apache.sysds.lops.Data; |
| import org.apache.sysds.lops.Federated; |
| import org.apache.sysds.lops.Lop; |
| import org.apache.sysds.lops.LopProperties.ExecType; |
| import org.apache.sysds.lops.LopsException; |
| import org.apache.sysds.lops.Sql; |
| import org.apache.sysds.parser.DataExpression; |
| import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; |
| import org.apache.sysds.runtime.meta.DataCharacteristics; |
| import org.apache.sysds.runtime.util.LocalFileUtils; |
| |
| /** |
| * A DataOp can be either a persistent read/write or transient read/write - writes will always have at least one input, |
| * but all types can have parameters (e.g., for csv literals of delimiter, header, etc). |
| */ |
| public class DataOp extends Hop { |
| private static final Log LOG = LogFactory.getLog(DataOp.class.getName()); |
| private OpOpData _op; |
| private String _fileName = null; |
| |
| //read dataop properties |
| private FileFormat _inFormat = FileFormat.TEXT; |
| private long _inBlocksize = -1; |
| |
| private boolean _recompileRead = true; |
| |
| /** |
| * List of "named" input parameters. They are maintained as a hashmap: |
| * parameter names (String) are mapped as indices (Integer) into getInput() |
| * arraylist. |
| * |
| * i.e., getInput().get(_paramIndexMap.get(parameterName)) refers to the Hop |
| * that is associated with parameterName. |
| */ |
| private HashMap<String, Integer> _paramIndexMap = new HashMap<>(); |
| |
/**
 * No-argument constructor used by {@link #clone()}, which assigns all
 * attributes after construction.
 */
private DataOp() {
	// intentionally empty: clone() populates the instance
}
| |
/**
 * READ operation for a matrix with explicitly given dimensions.
 * This constructor does not support any expression in parameters.
 *
 * @param l     label forwarded to the Hop super constructor (presumably the hop name; confirm against Hop)
 * @param dt    data type
 * @param vt    value type
 * @param dop   data operator type
 * @param fname file name
 * @param dim1  dimension 1
 * @param dim2  dimension 2
 * @param nnz   number of non-zeros
 * @param blen  rows/cols per block
 */
public DataOp(String l, DataType dt, ValueType vt, OpOpData dop,
	String fname, long dim1, long dim2, long nnz, int blen) {
	super(l, dt, vt);
	_op = dop;
	_fileName = fname;

	// size information of the data being read
	setDim1(dim1);
	setDim2(dim2);
	setBlocksize(blen);
	setNnz(nnz);

	// transient reads are switched from the TEXT default to BINARY
	if( OpOpData.TRANSIENTREAD == dop ) {
		setInputFormatType(FileFormat.BINARY);
	}
}
| |
/**
 * READ operation for a matrix with explicitly given dimensions and an
 * update type. Delegates to the dimension-based constructor and then
 * records the update type.
 *
 * @param l      label forwarded to the Hop super constructor (presumably the hop name; confirm against Hop)
 * @param dt     data type
 * @param vt     value type
 * @param dop    data operator type
 * @param fname  file name
 * @param dim1   dimension 1
 * @param dim2   dimension 2
 * @param nnz    number of non-zeros
 * @param update update type passed to setUpdateType
 * @param blen   rows/cols per block
 */
public DataOp(String l, DataType dt, ValueType vt, OpOpData dop,
	String fname, long dim1, long dim2, long nnz, UpdateType update, int blen)
{
	this(l, dt, vt, dop, fname, dim1, dim2, nnz, blen);
	setUpdateType(update);
}
| |
| /** |
| * READ operation for Matrix / SQL operation for tensor |
| * This constructor supports expressions in parameters |
| * |
| * @param l ? |
| * @param dt data type |
| * @param vt value type |
| * @param dop data operator type |
| * @param params input parameters |
| */ |
| public DataOp(String l, DataType dt, ValueType vt, |
| OpOpData dop, HashMap<String, Hop> params) { |
| super(l, dt, vt); |
| _op = dop; |
| |
| int index = 0; |
| for( Entry<String, Hop> e : params.entrySet() ) { |
| String s = e.getKey(); |
| Hop input = e.getValue(); |
| getInput().add(input); |
| if (LOG.isDebugEnabled()){ |
| LOG.debug(String.format("%15s - %s",s,input)); |
| } |
| input.getParent().add(this); |
| |
| _paramIndexMap.put(s, index); |
| index++; |
| } |
| if (dop == OpOpData.TRANSIENTREAD ){ |
| setInputFormatType(FileFormat.BINARY); |
| } |
| |
| if( params.containsKey(DataExpression.READROWPARAM) ) |
| setDim1(((LiteralOp)params.get(DataExpression.READROWPARAM)).getLongValue()); |
| if( params.containsKey(DataExpression.READCOLPARAM) ) |
| setDim2(((LiteralOp)params.get(DataExpression.READCOLPARAM)).getLongValue()); |
| if( params.containsKey(DataExpression.READNNZPARAM) ) |
| setNnz(((LiteralOp)params.get(DataExpression.READNNZPARAM)).getLongValue()); |
| } |
| |
| // WRITE operation |
| // This constructor does not support any expression in parameters |
| public DataOp(String l, DataType dt, ValueType vt, Hop in, |
| OpOpData dop, String fname) { |
| super(l, dt, vt); |
| _op = dop; |
| getInput().add(0, in); |
| in.getParent().add(this); |
| _fileName = fname; |
| |
| if (dop == OpOpData.TRANSIENTWRITE || dop == OpOpData.FUNCTIONOUTPUT ) |
| setInputFormatType(FileFormat.BINARY); |
| } |
| |
| /** |
| * WRITE operation for Matrix |
| * This constructor supports expression in parameters |
| * |
| * @param l ? |
| * @param dt data type |
| * @param vt value type |
| * @param dop data operator type |
| * @param in high-level operator |
| * @param inputParameters input parameters |
| */ |
| public DataOp(String l, DataType dt, ValueType vt, |
| OpOpData dop, Hop in, HashMap<String, Hop> inputParameters) { |
| super(l, dt, vt); |
| _op = dop; |
| |
| getInput().add(0, in); |
| in.getParent().add(this); |
| |
| if (inputParameters != null){ |
| int index = 1; |
| for( Entry<String, Hop> e : inputParameters.entrySet() ) |
| { |
| String s = e.getKey(); |
| Hop input = e.getValue(); |
| getInput().add(input); |
| input.getParent().add(this); |
| |
| _paramIndexMap.put(s, index); |
| index++; |
| } |
| |
| } |
| |
| if (dop == OpOpData.TRANSIENTWRITE) |
| setInputFormatType(FileFormat.BINARY); |
| } |
| |
| /** Check for N (READ) or N+1 (WRITE) inputs. */ |
| @Override |
| public void checkArity() { |
| int sz = _input.size(); |
| int pz = _paramIndexMap.size(); |
| switch (_op) { |
| case PERSISTENTREAD: |
| case TRANSIENTREAD: |
| case SQLREAD: |
| HopsException.check(sz == pz, this, |
| "in %s operator type has %d inputs and %d parameters", _op.name(), sz, pz); |
| break; |
| case PERSISTENTWRITE: |
| case TRANSIENTWRITE: |
| case FUNCTIONOUTPUT: |
| HopsException.check(sz == pz + 1, this, |
| "in %s operator type has %d inputs and %d parameters (expect 1 more input for write operator type)", |
| _op.name(), sz, pz); |
| break; |
| |
| case FEDERATED: |
| //TODO |
| default: |
| //do nothing |
| } |
| } |
| |
| public OpOpData getOp() { |
| return _op; |
| } |
| |
| public void setDataOpType(OpOpData type) { |
| _op = type; |
| } |
| |
| public void setOutputParams(long dim1, long dim2, long nnz, UpdateType update, int blen) { |
| setDim1(dim1); |
| setDim2(dim2); |
| setNnz(nnz); |
| setUpdateType(update); |
| setBlocksize(blen); |
| } |
| |
| public void setFileName(String fn) { |
| _fileName = fn; |
| } |
| |
| public String getFileName() { |
| return _fileName; |
| } |
| |
| public int getParameterIndex(String name) { |
| return _paramIndexMap.get(name); |
| } |
| |
| @Override |
| public boolean isGPUEnabled() { |
| return false; |
| } |
| |
| @Override |
| public Lop constructLops() |
| { |
| //return already created lops |
| if( getLops() != null ) |
| return getLops(); |
| |
| ExecType et = optFindExecType(); |
| Lop l = null; |
| |
| // construct lops for all input parameters |
| HashMap<String, Lop> inputLops = new HashMap<>(); |
| for (Entry<String, Integer> cur : _paramIndexMap.entrySet()) { |
| inputLops.put(cur.getKey(), getInput().get(cur.getValue()) |
| .constructLops()); |
| } |
| |
| // Create the lop |
| switch(_op) |
| { |
| case TRANSIENTREAD: |
| l = new Data(_op, null, inputLops, getName(), null, |
| getDataType(), getValueType(), getInputFormatType()); |
| setOutputDimensions(l); |
| break; |
| |
| case PERSISTENTREAD: |
| l = new Data(_op, null, inputLops, getName(), null, |
| getDataType(), getValueType(), getInputFormatType()); |
| l.getOutputParameters().setDimensions(getDim1(), getDim2(), _inBlocksize, getNnz(), getUpdateType()); |
| break; |
| |
| case PERSISTENTWRITE: |
| case FUNCTIONOUTPUT: |
| l = new Data(_op, getInput().get(0).constructLops(), inputLops, getName(), null, |
| getDataType(), getValueType(), getInputFormatType()); |
| ((Data)l).setExecType(et); |
| setOutputDimensions(l); |
| break; |
| |
| case TRANSIENTWRITE: |
| l = new Data(_op, getInput().get(0).constructLops(), inputLops, getName(), null, |
| getDataType(), getValueType(), getInputFormatType()); |
| setOutputDimensions(l); |
| break; |
| |
| case SQLREAD: |
| l = new Sql(inputLops, getDataType(), getValueType()); |
| break; |
| |
| case FEDERATED: |
| l = new Federated(inputLops, getDataType(), getValueType()); |
| break; |
| |
| default: |
| throw new LopsException("Invalid operation type for Data LOP: " + _op); |
| } |
| |
| setLineNumbers(l); |
| setPrivacy(l); |
| setLops(l); |
| |
| //add reblock/checkpoint lops if necessary |
| constructAndSetLopsDataFlowProperties(); |
| |
| return getLops(); |
| |
| } |
| |
| public void setInputFormatType(FileFormat ft) { |
| _inFormat = ft; |
| } |
| |
| public FileFormat getInputFormatType() { |
| return _inFormat; |
| } |
| |
| public void setInputBlocksize(long blen){ |
| _inBlocksize = blen; |
| } |
| |
| public long getInputBlocksize(){ |
| return _inBlocksize; |
| } |
| |
| public boolean isRead() { |
| return( _op == OpOpData.PERSISTENTREAD || _op == OpOpData.TRANSIENTREAD ); |
| } |
| |
| public boolean isWrite() { |
| return( _op == OpOpData.PERSISTENTWRITE || _op == OpOpData.TRANSIENTWRITE ); |
| } |
| |
| public boolean isPersistentReadWrite() { |
| return( _op == OpOpData.PERSISTENTREAD || _op == OpOpData.PERSISTENTWRITE ); |
| } |
| |
| @Override |
| public String getOpString() { |
| String s = new String(""); |
| s += _op.toString(); |
| s += " "+getName(); |
| return s; |
| } |
| |
| @Override |
| public boolean allowsAllExecTypes() |
| { |
| return false; |
| } |
| |
| @Override |
| protected double computeOutputMemEstimate( long dim1, long dim2, long nnz ) |
| { |
| double ret = 0; |
| |
| if ( getDataType() == DataType.SCALAR ) |
| { |
| switch( getValueType() ) |
| { |
| case INT64: |
| ret = OptimizerUtils.INT_SIZE; break; |
| case FP64: |
| ret = OptimizerUtils.DOUBLE_SIZE; break; |
| case BOOLEAN: |
| ret = OptimizerUtils.BOOLEAN_SIZE; break; |
| case STRING: |
| // by default, it estimates the size of string[100] |
| ret = 100 * OptimizerUtils.CHAR_SIZE; break; |
| case UNKNOWN: |
| ret = OptimizerUtils.DEFAULT_SIZE; break; |
| default: |
| ret = 0; |
| } |
| } |
| else //MATRIX / FRAME |
| { |
| if( _op == OpOpData.PERSISTENTREAD |
| || _op == OpOpData.TRANSIENTREAD ) |
| { |
| double sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz); |
| ret = OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity); |
| } |
| // output memory estimate is not required for "write" nodes (just input) |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| protected double computeIntermediateMemEstimate( long dim1, long dim2, long nnz ) { |
| return LocalFileUtils.BUFFER_SIZE; |
| } |
| |
| @Override |
| protected DataCharacteristics inferOutputCharacteristics( MemoTable memo ) { |
| DataCharacteristics ret = null; |
| if( _op == OpOpData.PERSISTENTWRITE || _op == OpOpData.TRANSIENTWRITE ) { |
| DataCharacteristics tmp = memo.getAllInputStats(getInput().get(0)); |
| if( tmp.dimsKnown() ) |
| ret = tmp; |
| } |
| else if( _op == OpOpData.TRANSIENTREAD ) { |
| //prepare statistics, passed from cross-dag transient writes |
| DataCharacteristics tmp = memo.getAllInputStats(this); |
| if( tmp.dimsKnown() ) |
| ret = tmp; |
| } |
| return ret; |
| } |
| |
| @Override |
| protected ExecType optFindExecType() |
| { |
| //MB: find exec type has two meanings here: (1) for write it means the actual |
| //exec type, while (2) for read it affects the recompilation decision as needed |
| //for example for sum(X) where the memory consumption is solely determined by the DataOp |
| |
| ExecType letype = (OptimizerUtils.isMemoryBasedOptLevel()) ? findExecTypeByMemEstimate() : null; |
| |
| //NOTE: independent of etype executed in MR (piggybacked) if input to persistent write is MR |
| if( _op == OpOpData.PERSISTENTWRITE || _op == OpOpData.TRANSIENTWRITE ) |
| { |
| checkAndSetForcedPlatform(); |
| |
| //additional check for write only |
| if( getDataType()==DataType.SCALAR ) |
| _etypeForced = ExecType.CP; |
| |
| if( _etypeForced != null ) |
| { |
| _etype = _etypeForced; |
| } |
| else |
| { |
| if ( OptimizerUtils.isMemoryBasedOptLevel() ) |
| { |
| _etype = letype; |
| } |
| else if ( getInput().get(0).areDimsBelowThreshold() ) |
| { |
| _etype = ExecType.CP; |
| } |
| else |
| { |
| _etype = ExecType.SPARK; |
| } |
| |
| //check for valid CP dimensions and matrix size |
| checkAndSetInvalidCPDimsAndSize(); |
| } |
| |
| //mark for recompile (forever) |
| setRequiresRecompileIfNecessary(); |
| } |
| else //READ |
| { |
| //mark for recompile (forever) |
| if( ConfigurationManager.isDynamicRecompilation() && !dimsKnown(true) && letype==ExecType.SPARK |
| && (_recompileRead || _requiresCheckpoint) ) |
| { |
| setRequiresRecompile(); |
| } |
| |
| _etype = letype; |
| } |
| |
| return _etype; |
| } |
| |
| @Override |
| public void refreshSizeInformation() |
| { |
| if( _op == OpOpData.PERSISTENTWRITE || _op == OpOpData.TRANSIENTWRITE ) |
| { |
| Hop input1 = getInput().get(0); |
| setDim1(input1.getDim1()); |
| setDim2(input1.getDim2()); |
| setNnz(input1.getNnz()); |
| } |
| else //READ |
| { |
| //do nothing; dimensions updated via set output params |
| } |
| } |
| |
| |
| /** |
| * Explicitly disables recompilation of transient reads, this additional information |
| * is required because requiresRecompile is set in a top-down manner, hence any value |
| * set from a consuming operating would be overwritten by opFindExecType. |
| */ |
| public void disableRecompileRead() |
| { |
| _recompileRead = false; |
| } |
| |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public Object clone() throws CloneNotSupportedException |
| { |
| DataOp ret = new DataOp(); |
| |
| //copy generic attributes |
| ret.clone(this, false); |
| |
| //copy specific attributes |
| ret._op = _op; |
| ret._fileName = _fileName; |
| ret._inFormat = _inFormat; |
| ret._inBlocksize = _inBlocksize; |
| ret._recompileRead = _recompileRead; |
| ret._paramIndexMap = (HashMap<String, Integer>) _paramIndexMap.clone(); |
| //note: no deep cp of params since read-only |
| |
| return ret; |
| } |
| |
| @Override |
| public boolean compare( Hop that ) |
| { |
| if( !(that instanceof DataOp) ) |
| return false; |
| |
| //common subexpression elimination for redundant persistent reads, in order |
| //to avoid unnecessary read and reblocks as well as to prevent specific anomalies, e.g., |
| //with multiple piggybacked csvreblock of the same input w/ unknown input sizes |
| |
| DataOp that2 = (DataOp)that; |
| boolean ret = ( OptimizerUtils.ALLOW_COMMON_SUBEXPRESSION_ELIMINATION |
| && ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_CSE_PERSISTENT_READS) |
| && _op == that2._op |
| && _op == OpOpData.PERSISTENTREAD |
| && _fileName.equals(that2._fileName) |
| && _inFormat == that2._inFormat |
| && _inBlocksize == that2._inBlocksize |
| && _paramIndexMap!=null && that2._paramIndexMap!=null ); |
| |
| //above conditions also ensure consistency with regard to |
| //(1) checkpointing, (2) reblock and (3) recompile. |
| |
| if( ret ) { |
| for( Entry<String,Integer> e : _paramIndexMap.entrySet() ) { |
| String key1 = e.getKey(); |
| int pos1 = e.getValue(); |
| int pos2 = that2._paramIndexMap.get(key1); |
| ret &= ( that2.getInput().get(pos2)!=null |
| && getInput().get(pos1) == that2.getInput().get(pos2) ); |
| } |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Remove an input from the list of inputs and from the parameter index map. |
| * Parameter index map values higher than the index of the removed input |
| * will be decremented by one. |
| * |
| * @param inputName The name of the input to remove |
| */ |
| public void removeInput(String inputName) { |
| int inputIndex = getParameterIndex(inputName); |
| Hop tmp = _input.remove(inputIndex); |
| tmp._parent.remove(this); |
| _paramIndexMap.remove(inputName); |
| for (Entry<String, Integer> entry : _paramIndexMap.entrySet()) { |
| if (entry.getValue() > inputIndex) { |
| _paramIndexMap.put(entry.getKey(), (entry.getValue() - 1)); |
| } |
| } |
| } |
| |
| } |