blob: ea183fc16f7c759b14c6e2d61bc481ec03259a77 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.hops;
import java.util.HashMap;
import java.util.Map.Entry;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.hops.Hop.MultiThreadedHop;
import org.apache.sysml.hops.rewrite.HopRewriteUtils;
import org.apache.sysml.lops.Aggregate;
import org.apache.sysml.lops.AppendR;
import org.apache.sysml.lops.Data;
import org.apache.sysml.lops.DataPartition;
import org.apache.sysml.lops.Group;
import org.apache.sysml.lops.GroupedAggregate;
import org.apache.sysml.lops.GroupedAggregateM;
import org.apache.sysml.lops.Lop;
import org.apache.sysml.lops.LopProperties.ExecType;
import org.apache.sysml.lops.LopsException;
import org.apache.sysml.lops.OutputParameters.Format;
import org.apache.sysml.lops.PMMJ;
import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
import org.apache.sysml.lops.ParameterizedBuiltin;
import org.apache.sysml.lops.RepMat;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.parser.Statement;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.mapred.DistributedCacheInput;
import org.apache.sysml.runtime.util.UtilFunctions;
/**
* Defines the HOP for calling an internal function (with custom parameters) from a DML script.
*
*/
public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
{
//compiler flag: when disabled, MR removeEmpty falls back to ExecType.CP_FILE (see constructLops)
private static boolean COMPILE_PARALLEL_REMOVEEMPTY = true;
//compiler flag: when enabled, suppresses the broadcast-based spark removeEmpty variant
public static boolean FORCE_DIST_RM_EMPTY = false;

//operator type
private ParamBuiltinOp _op;

//degree of parallelism for multi-threaded operations
private int _maxNumThreads = -1; //-1 for unlimited

//removeEmpty hints
private boolean _outputEmptyBlocks = true;
private boolean _outputPermutationMatrix = false;
private boolean _bRmEmptyBC = false; //broadcast-based spark removeEmpty selected

/**
 * List of "named" input parameters. They are maintained as a hashmap:
 * parameter names (String) are mapped as indices (Integer) into getInput()
 * arraylist.
 *
 * i.e., getInput().get(_paramIndexMap.get(parameterName)) refers to the Hop
 * that is associated with parameterName.
 */
private HashMap<String, Integer> _paramIndexMap = new HashMap<String, Integer>();
private ParameterizedBuiltinOp() {
	//default constructor for clone
}
/**
* Creates a new HOP for a function call
*/
/**
 * Creates a new HOP for a parameterized builtin function call.
 *
 * @param l hop name
 * @param dt output data type
 * @param vt output value type
 * @param op parameterized builtin operation type
 * @param inputParameters named input parameters (parameter name to input hop)
 */
public ParameterizedBuiltinOp(String l, DataType dt, ValueType vt,
		ParamBuiltinOp op, HashMap<String, Hop> inputParameters) {
	super(l, dt, vt);
	_op = op;

	//register all named inputs: wire child/parent references and
	//remember each parameter's position in the input list
	int pos = 0;
	for( Entry<String, Hop> entry : inputParameters.entrySet() ) {
		Hop in = entry.getValue();
		getInput().add(in);
		in.getParent().add(this);
		_paramIndexMap.put(entry.getKey(), pos++);
	}

	//compute unknown dims and nnz
	refreshSizeInformation();
}
/** Returns the (mutable) mapping of parameter names to positions in getInput(). */
public HashMap<String, Integer> getParamIndexMap(){
	return _paramIndexMap;
}
/**
 * Returns the input hop registered under the given parameter name.
 *
 * @param val parameter name
 * @return the associated input hop, or null if no such parameter exists
 */
public Hop getInputParameter(String val){
	Integer index = getParamIndexMap().get(val);
	return (index == null) ? null : getInput().get(index);
}
@Override
public String getOpString() {
	//string form of the operator type (e.g., GROUPEDAGG, RMEMPTY)
	return String.valueOf(_op);
}
/** Returns the parameterized builtin operation type of this hop. */
public ParamBuiltinOp getOp() {
	return _op;
}
/** Hint for removeEmpty: whether the output should materialize empty blocks. */
public void setOutputEmptyBlocks(boolean flag)
{
	_outputEmptyBlocks = flag;
}
/** Hint for removeEmpty: whether the output is a permutation matrix (enables the selection-vector special case). */
public void setOutputPermutationMatrix(boolean flag)
{
	_outputPermutationMatrix = flag;
}
/** Returns the hop bound to the "target" parameter, or null if absent. */
public Hop getTargetHop() {
	return getInputParameter("target");
}
/** Sets the maximum degree of parallelism (-1 for unlimited). */
@Override
public void setMaxNumThreads( int k ) {
	_maxNumThreads = k;
}
/** Returns the maximum degree of parallelism (-1 for unlimited). */
@Override
public int getMaxNumThreads() {
	return _maxNumThreads;
}
/**
 * Constructs (and caches) the lops for this parameterized builtin.
 * First recursively constructs the lops of all named input parameters,
 * then dispatches lop construction on the operation type.
 */
@Override
public Lop constructLops()
	throws HopsException, LopsException
{
	//return already created lops
	if( getLops() != null )
		return getLops();

	// construct lops for all input parameters
	HashMap<String, Lop> inputlops = new HashMap<String, Lop>();
	for (Entry<String, Integer> cur : _paramIndexMap.entrySet()) {
		inputlops.put(cur.getKey(), getInput().get(cur.getValue())
			.constructLops());
	}

	switch( _op ) {
		case GROUPEDAGG: {
			ExecType et = optFindExecType();
			constructLopsGroupedAggregate(inputlops, et);
			break;
		}
		case RMEMPTY: {
			ExecType et = optFindExecType();
			//fall back to large-scale CP if parallel (MR) removeEmpty is disabled
			et = (et == ExecType.MR && !COMPILE_PARALLEL_REMOVEEMPTY ) ? ExecType.CP_FILE : et;
			constructLopsRemoveEmpty(inputlops, et);
			break;
		}
		case REXPAND: {
			ExecType et = optFindExecType();
			constructLopsRExpand(inputlops, et);
			break;
		}
		case TRANSFORM: {
			ExecType et = optFindExecType();
			ParameterizedBuiltin pbilop = new ParameterizedBuiltin(inputlops,
				HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et);
			setOutputDimensions(pbilop);
			setLineNumbers(pbilop);
			// output of transform is always in CSV format
			// to produce a blocked output, this lop must be
			// fed into CSV Reblock lop.
			pbilop.getOutputParameters().setFormat(Format.CSV);
			setLops(pbilop);
			break;
		}
		//ops compiled to a single parameterized-builtin lop
		case CDF:
		case INVCDF:
		case REPLACE:
		case TRANSFORMAPPLY:
		case TRANSFORMDECODE:
		case TRANSFORMMETA:
		case TOSTRING: {
			ExecType et = optFindExecType();
			ParameterizedBuiltin pbilop = new ParameterizedBuiltin(inputlops,
				HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et);
			setOutputDimensions(pbilop);
			setLineNumbers(pbilop);
			setLops(pbilop);
			break;
		}
		default:
			throw new HopsException("Unknown ParamBuiltinOp: "+_op);
	}

	//add reblock/checkpoint lops if necessary
	constructAndSetLopsDataFlowProperties();

	return getLops();
}
/**
 * Constructs the lops for a grouped aggregate with backend-specific physical
 * operator selection: MR (map-side grouped aggregate if the groups vector fits
 * the remote memory budget and the function is a literal "sum"; otherwise the
 * general shuffle-based grouped aggregate over a combined input), CP
 * (multi-threaded operator), and Spark (broadcast-based map grouped aggregate
 * or general grouped aggregate).
 *
 * Fix: the output dimensions of group2 (the sorted, replicated groups) were
 * previously set on group1, overwriting group1's correct target dimensions;
 * they are now set on group2.
 *
 * @param inputlops lops of all named input parameters
 * @param et selected execution type
 * @throws HopsException if hop-level construction fails
 * @throws LopsException if lop-level construction fails
 */
private void constructLopsGroupedAggregate(HashMap<String, Lop> inputlops, ExecType et)
	throws HopsException, LopsException
{
	//reset reblock requirement (see MR aggregate / construct lops)
	setRequiresReblock( false );

	//determine output dimensions from a literal number of groups, if available
	long outputDim1=-1, outputDim2=-1;
	Lop numGroups = inputlops.get(Statement.GAGG_NUM_GROUPS);
	if ( !dimsKnown() && numGroups != null && numGroups instanceof Data && ((Data)numGroups).isLiteral() ) {
		long ngroups = ((Data)numGroups).getLongValue();
		Lop input = inputlops.get(GroupedAggregate.COMBINEDINPUT);
		long inDim1 = input.getOutputParameters().getNumRows();
		long inDim2 = input.getOutputParameters().getNumCols();
		boolean rowwise = (inDim1==1 && inDim2 > 1 );
		if( rowwise ) { //vector
			outputDim1 = ngroups;
			outputDim2 = 1;
		}
		else { //vector or matrix
			outputDim1 = inDim2;
			outputDim2 = ngroups;
		}
	}

	//construct lops
	if ( et == ExecType.MR )
	{
		Lop grp_agg = null;

		// construct necessary lops: combineBinary/combineTertiary and groupedAgg
		boolean isWeighted = (_paramIndexMap.get(Statement.GAGG_WEIGHTS) != null);
		if (isWeighted)
		{
			//combine target, groups, and weights into a single input
			Lop append = BinaryOp.constructAppendLopChain(
					getInput().get(_paramIndexMap.get(Statement.GAGG_TARGET)),
					getInput().get(_paramIndexMap.get(Statement.GAGG_GROUPS)),
					getInput().get(_paramIndexMap.get(Statement.GAGG_WEIGHTS)),
					DataType.MATRIX, getValueType(), true,
					getInput().get(_paramIndexMap.get(Statement.GAGG_TARGET)));

			// add the combine lop to parameter list, with a new name "combinedinput"
			inputlops.put(GroupedAggregate.COMBINEDINPUT, append);
			inputlops.remove(Statement.GAGG_TARGET);
			inputlops.remove(Statement.GAGG_GROUPS);
			inputlops.remove(Statement.GAGG_WEIGHTS);

			grp_agg = new GroupedAggregate(inputlops, isWeighted, getDataType(), getValueType());
			grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, getRowsInBlock(), getColsInBlock(), -1);

			setRequiresReblock( true );
		}
		else
		{
			Hop target = getInput().get(_paramIndexMap.get(Statement.GAGG_TARGET));
			Hop groups = getInput().get(_paramIndexMap.get(Statement.GAGG_GROUPS));
			Lop append = null;

			//physical operator selection
			double groupsSizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(groups.getDim1(), groups.getDim2(), groups.getRowsInBlock(), groups.getColsInBlock(), groups.getNnz());

			//map-side grouped aggregate: groups fit in distributed cache,
			//aggregation function is a literal "sum", and ngroups is known
			if( groupsSizeP < OptimizerUtils.getRemoteMemBudgetMap(true)  //mapgroupedagg
				&& getInput().get(_paramIndexMap.get(Statement.GAGG_FN)) instanceof LiteralOp
				&& ((LiteralOp)getInput().get(_paramIndexMap.get(Statement.GAGG_FN))).getStringValue().equals("sum")
				&& inputlops.get(Statement.GAGG_NUM_GROUPS) != null )
			{
				//pre partitioning
				boolean needPart = (groups.dimsKnown() && groups.getDim1()*groups.getDim2() > DistributedCacheInput.PARTITION_SIZE);
				if( needPart ) {
					ExecType etPart = (OptimizerUtils.estimateSizeExactSparsity(groups.getDim1(), groups.getDim2(), 1.0)
						< OptimizerUtils.getLocalMemBudget()) ? ExecType.CP : ExecType.MR; //operator selection
					Lop dcinput = new DataPartition(groups.constructLops(), DataType.MATRIX, ValueType.DOUBLE, etPart, PDataPartitionFormat.ROW_BLOCK_WISE_N);
					dcinput.getOutputParameters().setDimensions(groups.getDim1(), groups.getDim2(), target.getRowsInBlock(), target.getColsInBlock(), groups.getNnz());
					setLineNumbers(dcinput);
					inputlops.put(Statement.GAGG_GROUPS, dcinput);
				}

				Lop grp_agg_m = new GroupedAggregateM(inputlops, getDataType(), getValueType(), needPart, ExecType.MR);
				grp_agg_m.getOutputParameters().setDimensions(outputDim1, outputDim2, target.getRowsInBlock(), target.getColsInBlock(), -1);
				setLineNumbers(grp_agg_m);

				//post aggregation of partial group results
				Group grp = new Group(grp_agg_m, Group.OperationTypes.Sort, getDataType(), getValueType());
				grp.getOutputParameters().setDimensions(outputDim1, outputDim2, target.getRowsInBlock(), target.getColsInBlock(), -1);
				setLineNumbers(grp);

				Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(AggOp.SUM), getDataType(), getValueType(), ExecType.MR);
				agg1.setupCorrectionLocation(CorrectionLocationType.NONE);
				agg1.getOutputParameters().setDimensions(outputDim1, outputDim2, target.getRowsInBlock(), target.getColsInBlock(), -1);
				grp_agg = agg1;

				//note: no reblock required
			}
			else //general case: groupedagg
			{
				if( target.getDim2()>=target.getColsInBlock()  // multi-column-block result matrix
					|| target.getDim2()<=0 )                   // unkown
				{
					long m1_dim1 = target.getDim1();
					long m1_dim2 = target.getDim2();
					long m2_dim1 = groups.getDim1();
					long m2_dim2 = groups.getDim2();
					long m3_dim1 = m1_dim1;
					long m3_dim2 = ((m1_dim2>0 && m2_dim2>0) ? (m1_dim2 + m2_dim2) : -1);
					long m3_nnz = (target.getNnz()>0 && groups.getNnz()>0) ? (target.getNnz() + groups.getNnz()) : -1;
					long brlen = target.getRowsInBlock();
					long bclen = target.getColsInBlock();

					//replicate groups to match the column blocks of the target
					Lop offset = createOffsetLop(target, true);
					Lop rep = new RepMat(groups.constructLops(), offset, true, groups.getDataType(), groups.getValueType());
					setOutputDimensions(rep);
					setLineNumbers(rep);

					Group group1 = new Group(target.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, target.getValueType());
					group1.getOutputParameters().setDimensions(m1_dim1, m1_dim2, brlen, bclen, target.getNnz());
					setLineNumbers(group1);

					Group group2 = new Group(rep, Group.OperationTypes.Sort, DataType.MATRIX, groups.getValueType());
					//fix: set dimensions on group2 (replicated groups), not group1
					group2.getOutputParameters().setDimensions(m2_dim1, m2_dim2, brlen, bclen, groups.getNnz());
					setLineNumbers(group2);

					append = new AppendR(group1, group2, DataType.MATRIX, ValueType.DOUBLE, true, ExecType.MR);
					append.getOutputParameters().setDimensions(m3_dim1, m3_dim2, brlen, bclen, m3_nnz);
					setLineNumbers(append);
				}
				else //single-column-block vector or matrix
				{
					append = BinaryOp.constructMRAppendLop(target, groups,
						DataType.MATRIX, getValueType(), true, target);
				}

				// add the combine lop to parameter list, with a new name "combinedinput"
				inputlops.put(GroupedAggregate.COMBINEDINPUT, append);
				inputlops.remove(Statement.GAGG_TARGET);
				inputlops.remove(Statement.GAGG_GROUPS);

				grp_agg = new GroupedAggregate(inputlops, isWeighted, getDataType(), getValueType());
				grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, getRowsInBlock(), getColsInBlock(), -1);

				setRequiresReblock( true );
			}
		}

		setLineNumbers(grp_agg);
		setLops(grp_agg);
	}
	else //CP/Spark
	{
		Lop grp_agg = null;

		if( et == ExecType.CP)
		{
			//multi-threaded CP grouped aggregate
			int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );
			grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et, k);
			grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, getRowsInBlock(), getColsInBlock(), -1);
		}
		else if(et == ExecType.SPARK)
		{
			//physical operator selection
			Hop groups = getInput().get(_paramIndexMap.get(Statement.GAGG_GROUPS));
			boolean broadcastGroups = (_paramIndexMap.get(Statement.GAGG_WEIGHTS) == null &&
				OptimizerUtils.checkSparkBroadcastMemoryBudget( groups.getDim1(), groups.getDim2(),
					groups.getRowsInBlock(), groups.getColsInBlock(), groups.getNnz()) );

			if( broadcastGroups //mapgroupedagg
				&& getInput().get(_paramIndexMap.get(Statement.GAGG_FN)) instanceof LiteralOp
				&& ((LiteralOp)getInput().get(_paramIndexMap.get(Statement.GAGG_FN))).getStringValue().equals("sum")
				&& inputlops.get(Statement.GAGG_NUM_GROUPS) != null )
			{
				Hop target = getInput().get(_paramIndexMap.get(Statement.GAGG_TARGET));

				grp_agg = new GroupedAggregateM(inputlops, getDataType(), getValueType(), true, ExecType.SPARK);
				grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, target.getRowsInBlock(), target.getColsInBlock(), -1);
				//no reblock required (directly output binary block)
			}
			else //groupedagg (w/ or w/o broadcast)
			{
				grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et, broadcastGroups);
				grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, -1, -1, -1);
				setRequiresReblock( true );
			}
		}

		setLineNumbers(grp_agg);
		setLops(grp_agg);
	}
}
/**
 * Constructs the lops for removeEmpty with backend-specific compilation:
 * CP/CP_FILE (single operator), MR (special compile for removeEmpty over a
 * diag input, otherwise a partial hop dag computing row/col offsets followed
 * by either broadcast-based PMM or a repartition-based gather), and Spark
 * (single operator with optional broadcast of the offsets).
 *
 * @param inputlops lops of all named input parameters
 * @param et execution type
 * @throws HopsException if hop-level construction fails
 * @throws LopsException if lop-level construction fails
 */
private void constructLopsRemoveEmpty(HashMap<String, Lop> inputlops, ExecType et)
	throws HopsException, LopsException
{
	Hop targetHop = getInput().get(_paramIndexMap.get("target"));
	Hop marginHop = getInput().get(_paramIndexMap.get("margin"));
	Hop selectHop = (_paramIndexMap.get("select") != null) ? getInput().get(_paramIndexMap.get("select")):null;

	if( et == ExecType.CP || et == ExecType.CP_FILE )
	{
		//single CP operator
		ParameterizedBuiltin pbilop = new ParameterizedBuiltin(inputlops,HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et);
		setOutputDimensions(pbilop);
		setLineNumbers(pbilop);
		setLops(pbilop);

		/*DISABLED CP PMM (see for example, MDA Bivar test, requires size propagation on recompile)
		if( et == ExecType.CP && isTargetDiagInput() && marginHop instanceof LiteralOp
			&& ((LiteralOp)marginHop).getStringValue().equals("rows")
			&& _outputPermutationMatrix ) //SPECIAL CASE SELECTION VECTOR
		{
			//TODO this special case could be taken into account for memory estimates in order
			// to reduce the estimates for the input diag and subsequent matrix multiply

			//get input vector (without materializing diag())
			Hop input = targetHop.getInput().get(0);
			long brlen = input.getRowsInBlock();
			long bclen = input.getColsInBlock();
			MemoTable memo = new MemoTable();

			boolean isPPredInput = (input instanceof BinaryOp && ((BinaryOp)input).isPPredOperation());

			//step1: compute index vectors
			Hop ppred0 = input;
			if( !isPPredInput ) { //ppred only if required
				ppred0 = new BinaryOp("tmp1", DataType.MATRIX, ValueType.DOUBLE, OpOp2.NOTEQUAL, input, new LiteralOp("0",0));
				HopRewriteUtils.setOutputBlocksizes(ppred0, brlen, bclen);
				ppred0.refreshSizeInformation();
				ppred0.computeMemEstimate(memo); //select exec type
				HopRewriteUtils.copyLineNumbers(this, ppred0);
			}

			UnaryOp cumsum = new UnaryOp("tmp2", DataType.MATRIX, ValueType.DOUBLE, OpOp1.CUMSUM, ppred0);
			HopRewriteUtils.setOutputBlocksizes(cumsum, brlen, bclen);
			cumsum.refreshSizeInformation();
			cumsum.computeMemEstimate(memo); //select exec type
			HopRewriteUtils.copyLineNumbers(this, cumsum);

			BinaryOp sel = new BinaryOp("tmp3", DataType.MATRIX, ValueType.DOUBLE, OpOp2.MULT, ppred0, cumsum);
			HopRewriteUtils.setOutputBlocksizes(sel, brlen, bclen);
			sel.refreshSizeInformation();
			sel.computeMemEstimate(memo); //select exec type
			HopRewriteUtils.copyLineNumbers(this, sel);
			Lop loutput = sel.constructLops();

			//Step 4: cleanup hops (allow for garbage collection)
			HopRewriteUtils.removeChildReference(ppred0, input);

			setLops( loutput );
		}
		else //GENERAL CASE
		{
			ParameterizedBuiltin pbilop = new ParameterizedBuiltin( et, inputlops,
				HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType());

			pbilop.getOutputParameters().setDimensions(getDim1(),getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
			setLineNumbers(pbilop);
			setLops(pbilop);
		}
		*/
	}
	else if( et == ExecType.MR )
	{
		//special compile for mr removeEmpty-diag
		if( isTargetDiagInput() && marginHop instanceof LiteralOp
			&& ((LiteralOp)marginHop).getStringValue().equals("rows") )
		{
			//get input vector (without materializing diag())
			Hop input = targetHop.getInput().get(0);
			long brlen = input.getRowsInBlock();
			long bclen = input.getColsInBlock();
			MemoTable memo = new MemoTable();

			boolean isPPredInput = (input instanceof BinaryOp && ((BinaryOp)input).isPPredOperation());

			//step1: compute index vectors
			Hop ppred0 = input;
			if( !isPPredInput ) { //ppred only if required
				ppred0 = new BinaryOp("tmp1", DataType.MATRIX, ValueType.DOUBLE, OpOp2.NOTEQUAL, input, new LiteralOp(0));
				HopRewriteUtils.updateHopCharacteristics(ppred0, brlen, bclen, memo, this);
			}

			UnaryOp cumsum = new UnaryOp("tmp2", DataType.MATRIX, ValueType.DOUBLE, OpOp1.CUMSUM, ppred0);
			HopRewriteUtils.updateHopCharacteristics(cumsum, brlen, bclen, memo, this);

			Lop loutput = null;
			//operator selection: selection-vector special case only if the
			//permutation-matrix hint is set and the mapmm estimate fits the map budget
			double mest = AggBinaryOp.getMapmmMemEstimate(input.getDim1(), 1, brlen, bclen, -1, brlen, bclen, brlen, bclen, -1, 1, true);
			double mbudget = OptimizerUtils.getRemoteMemBudgetMap(true);
			if( _outputPermutationMatrix && mest < mbudget ) //SPECIAL CASE: SELECTION VECTOR
			{
				BinaryOp sel = new BinaryOp("tmp3", DataType.MATRIX, ValueType.DOUBLE, OpOp2.MULT, ppred0, cumsum);
				HopRewriteUtils.updateHopCharacteristics(sel, brlen, bclen, memo, this);
				loutput = sel.constructLops();
			}
			else //GENERAL CASE: GENERAL PERMUTATION MATRIX
			{
				//max ensures non-zero entries and at least one output row
				BinaryOp max = new BinaryOp("tmp3", DataType.MATRIX, ValueType.DOUBLE, OpOp2.MAX, cumsum, new LiteralOp(1));
				HopRewriteUtils.updateHopCharacteristics(max, brlen, bclen, memo, this);

				DataGenOp seq = HopRewriteUtils.createSeqDataGenOp(input);
				seq.setName("tmp4");
				HopRewriteUtils.updateHopCharacteristics(seq, brlen, bclen, memo, this);

				//step 2: compute removeEmpty(rows) output via table, seq guarantees right column dimension
				//note: weights always the input (even if isPPredInput) because input also includes 0s
				TernaryOp table = new TernaryOp("tmp5", DataType.MATRIX, ValueType.DOUBLE, OpOp3.CTABLE, max, seq, input);
				HopRewriteUtils.setOutputBlocksizes(table, brlen, bclen);
				table.refreshSizeInformation();
				table.setForcedExecType(ExecType.MR); //force MR
				HopRewriteUtils.copyLineNumbers(this, table);
				table.setDisjointInputs(true);
				table.setOutputEmptyBlocks(_outputEmptyBlocks);
				loutput = table.constructLops();

				HopRewriteUtils.removeChildReference(table, input);
			}

			//Step 4: cleanup hops (allow for garbage collection)
			HopRewriteUtils.removeChildReference(ppred0, input);

			setLops( loutput );
		}
		//default mr remove empty
		//note: inner et==MR check is always true here (outer branch already checked it)
		else if( et == ExecType.MR )
		{
			//TODO additional physical operator if offsets fit in memory

			if( !(marginHop instanceof LiteralOp) )
				throw new HopsException("Parameter 'margin' must be a literal argument.");

			Hop input = targetHop;
			long rlen = input.getDim1();
			long clen = input.getDim2();
			long brlen = input.getRowsInBlock();
			long bclen = input.getColsInBlock();
			long nnz = input.getNnz();
			boolean rmRows = ((LiteralOp)marginHop).getStringValue().equals("rows");

			//construct lops via new partial hop dag and subsequent lops construction
			//in order to reuse of operator selection decisions
			BinaryOp ppred0 = null;
			Hop emptyInd = null;

			if(selectHop == null) {
				//Step1: compute row/col non-empty indicators
				ppred0 = new BinaryOp("tmp1", DataType.MATRIX, ValueType.DOUBLE, OpOp2.NOTEQUAL, input, new LiteralOp(0));
				HopRewriteUtils.setOutputBlocksizes(ppred0, brlen, bclen);
				ppred0.refreshSizeInformation();
				ppred0.setForcedExecType(ExecType.MR); //always MR
				HopRewriteUtils.copyLineNumbers(this, ppred0);

				emptyInd = ppred0;
				//row/col-wise max aggregation only required for true matrices
				if( !((rmRows && clen == 1) || (!rmRows && rlen==1)) ){
					emptyInd = new AggUnaryOp("tmp2", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAX, rmRows?Direction.Row:Direction.Col, ppred0);
					HopRewriteUtils.setOutputBlocksizes(emptyInd, brlen, bclen);
					emptyInd.refreshSizeInformation();
					emptyInd.setForcedExecType(ExecType.MR); //always MR
					HopRewriteUtils.copyLineNumbers(this, emptyInd);
				}
			} else {
				//indicators provided via the "select" parameter
				emptyInd = selectHop;
				HopRewriteUtils.setOutputBlocksizes(emptyInd, brlen, bclen);
				emptyInd.refreshSizeInformation();
				emptyInd.setForcedExecType(ExecType.MR); //always MR
				HopRewriteUtils.copyLineNumbers(this, emptyInd);
			}

			//Step 2: compute row offsets for non-empty rows
			Hop cumsumInput = emptyInd;
			if( !rmRows ){
				cumsumInput = HopRewriteUtils.createTranspose(emptyInd);
				HopRewriteUtils.updateHopCharacteristics(cumsumInput, brlen, bclen, this);
			}

			UnaryOp cumsum = new UnaryOp("tmp3", DataType.MATRIX, ValueType.DOUBLE, OpOp1.CUMSUM, cumsumInput);
			HopRewriteUtils.updateHopCharacteristics(cumsum, brlen, bclen, this);

			Hop cumsumOutput = cumsum;
			if( !rmRows ){
				cumsumOutput = HopRewriteUtils.createTranspose(cumsum);
				HopRewriteUtils.updateHopCharacteristics(cumsumOutput, brlen, bclen, this);
			}

			Hop maxDim = new AggUnaryOp("tmp4", DataType.SCALAR, ValueType.DOUBLE, AggOp.MAX, Direction.RowCol, cumsumOutput); //alternative: right indexing
			HopRewriteUtils.updateHopCharacteristics(maxDim, brlen, bclen, this);

			BinaryOp offsets = new BinaryOp("tmp5", DataType.MATRIX, ValueType.DOUBLE, OpOp2.MULT, cumsumOutput, emptyInd);
			HopRewriteUtils.updateHopCharacteristics(offsets, brlen, bclen, this);

			//Step 3: gather non-empty rows/cols into final results
			Lop linput = input.constructLops();
			Lop loffset = offsets.constructLops();
			Lop lmaxdim = maxDim.constructLops();

			double mestPM = OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, 1, brlen, bclen, 1.0);
			Lop rmEmpty = null;

			//a) broadcast-based PMM (permutation matrix mult)
			if( rmRows && rlen > 0 && mestPM < OptimizerUtils.getRemoteMemBudgetMap() )
			{
				boolean needPart = !offsets.dimsKnown() || offsets.getDim1() > DistributedCacheInput.PARTITION_SIZE;
				if( needPart ){ //requires partitioning
					loffset = new DataPartition(loffset, DataType.MATRIX, ValueType.DOUBLE, (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
					loffset.getOutputParameters().setDimensions(rlen, 1, brlen, bclen, rlen);
					setLineNumbers(loffset);
				}

				rmEmpty = new PMMJ(loffset, linput, lmaxdim, getDataType(), getValueType(), needPart, true, ExecType.MR);
				setOutputDimensions(rmEmpty);
				setLineNumbers(rmEmpty);
			}
			//b) general case: repartition-based rmempty
			else
			{
				boolean requiresRep =   ((clen>bclen || clen<=0) &&  rmRows)
					|| ((rlen>brlen || rlen<=0) && !rmRows);

				if( requiresRep ) {
					Lop pos = createOffsetLop(input, rmRows); //ncol of left input (determines num replicates)
					loffset = new RepMat(loffset, pos, rmRows, DataType.MATRIX, ValueType.DOUBLE);
					loffset.getOutputParameters().setDimensions(rlen, clen, brlen, bclen, nnz);
					setLineNumbers(loffset);
				}

				Group group1 = new Group(linput, Group.OperationTypes.Sort, getDataType(), getValueType());
				setLineNumbers(group1);
				group1.getOutputParameters().setDimensions(rlen, clen, brlen, bclen, nnz);

				Group group2 = new Group( loffset, Group.OperationTypes.Sort, getDataType(), getValueType());
				setLineNumbers(group2);
				group2.getOutputParameters().setDimensions(rlen, clen, brlen, bclen, nnz);

				HashMap<String, Lop> inMap = new HashMap<String, Lop>();
				inMap.put("target", group1);
				inMap.put("offset", group2);
				inMap.put("maxdim", lmaxdim);
				inMap.put("margin", inputlops.get("margin"));

				rmEmpty = new ParameterizedBuiltin(inMap, HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et);
				setOutputDimensions(rmEmpty);
				setLineNumbers(rmEmpty);
			}

			//final aggregation of partial output blocks
			Group group3 = new Group( rmEmpty, Group.OperationTypes.Sort, getDataType(), getValueType());
			setLineNumbers(group3);
			group3.getOutputParameters().setDimensions(-1, -1, brlen, bclen, -1);

			Aggregate finalagg = new Aggregate(group3, Aggregate.OperationTypes.Sum, DataType.MATRIX, getValueType(), ExecType.MR);
			setOutputDimensions(finalagg);
			setLineNumbers(finalagg);

			//Step 4: cleanup hops (allow for garbage collection)
			if(selectHop == null)
				HopRewriteUtils.removeChildReference(ppred0, input);

			setLops(finalagg);
		}
	}
	else if( et == ExecType.SPARK )
	{
		if( !(marginHop instanceof LiteralOp) )
			throw new HopsException("Parameter 'margin' must be a literal argument.");

		Hop input = targetHop;
		long rlen = input.getDim1();
		long clen = input.getDim2();
		long brlen = input.getRowsInBlock();
		long bclen = input.getColsInBlock();
		boolean rmRows = ((LiteralOp)marginHop).getStringValue().equals("rows");

		//construct lops via new partial hop dag and subsequent lops construction
		//in order to reuse of operator selection decisions
		BinaryOp ppred0 = null;
		Hop emptyInd = null;

		if(selectHop == null) {
			//Step1: compute row/col non-empty indicators
			ppred0 = new BinaryOp("tmp1", DataType.MATRIX, ValueType.DOUBLE, OpOp2.NOTEQUAL, input, new LiteralOp(0));
			HopRewriteUtils.setOutputBlocksizes(ppred0, brlen, bclen);
			ppred0.refreshSizeInformation();
			ppred0.setForcedExecType(ExecType.SPARK); //always Spark
			HopRewriteUtils.copyLineNumbers(this, ppred0);

			emptyInd = ppred0;
			//row/col-wise max aggregation only required for true matrices
			if( !((rmRows && clen == 1) || (!rmRows && rlen==1)) ){
				emptyInd = new AggUnaryOp("tmp2", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAX, rmRows?Direction.Row:Direction.Col, ppred0);
				HopRewriteUtils.setOutputBlocksizes(emptyInd, brlen, bclen);
				emptyInd.refreshSizeInformation();
				emptyInd.setForcedExecType(ExecType.SPARK); //always Spark
				HopRewriteUtils.copyLineNumbers(this, emptyInd);
			}
		} else {
			//indicators provided via the "select" parameter
			emptyInd = selectHop;
			HopRewriteUtils.setOutputBlocksizes(emptyInd, brlen, bclen);
			emptyInd.refreshSizeInformation();
			emptyInd.setForcedExecType(ExecType.SPARK); //always Spark
			HopRewriteUtils.copyLineNumbers(this, emptyInd);
		}

		//Step 2: compute row offsets for non-empty rows
		Hop cumsumInput = emptyInd;
		if( !rmRows ){
			cumsumInput = HopRewriteUtils.createTranspose(emptyInd);
			HopRewriteUtils.updateHopCharacteristics(cumsumInput, brlen, bclen, this);
		}

		UnaryOp cumsum = new UnaryOp("tmp3", DataType.MATRIX, ValueType.DOUBLE, OpOp1.CUMSUM, cumsumInput);
		HopRewriteUtils.updateHopCharacteristics(cumsum, brlen, bclen, this);

		Hop cumsumOutput = cumsum;
		if( !rmRows ){
			cumsumOutput = HopRewriteUtils.createTranspose(cumsum);
			HopRewriteUtils.updateHopCharacteristics(cumsumOutput, brlen, bclen, this);
		}

		Hop maxDim = new AggUnaryOp("tmp4", DataType.SCALAR, ValueType.DOUBLE, AggOp.MAX, Direction.RowCol, cumsumOutput); //alternative: right indexing
		HopRewriteUtils.updateHopCharacteristics(maxDim, brlen, bclen, this);

		BinaryOp offsets = new BinaryOp("tmp5", DataType.MATRIX, ValueType.DOUBLE, OpOp2.MULT, cumsumOutput, emptyInd);
		HopRewriteUtils.updateHopCharacteristics(offsets, brlen, bclen, this);

		//Step 3: gather non-empty rows/cols into final results
		Lop linput = input.constructLops();
		Lop loffset = offsets.constructLops();
		Lop lmaxdim = maxDim.constructLops();

		HashMap<String, Lop> inMap = new HashMap<String, Lop>();
		inMap.put("target", linput);
		inMap.put("offset", loffset);
		inMap.put("maxdim", lmaxdim);
		inMap.put("margin", inputlops.get("margin"));

		//operator selection: broadcast-based removeEmpty unless forced distributed
		if ( !FORCE_DIST_RM_EMPTY && isRemoveEmptyBcSP())
			_bRmEmptyBC = true;

		ParameterizedBuiltin pbilop = new ParameterizedBuiltin( inMap, HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et, _bRmEmptyBC);
		setOutputDimensions(pbilop);
		setLineNumbers(pbilop);

		//Step 4: cleanup hops (allow for garbage collection)
		if(selectHop == null)
			HopRewriteUtils.removeChildReference(ppred0, input);

		setLops(pbilop);

		//NOTE: in contrast to mr, replication and aggregation handled instruction-local
	}
}
/**
 * Constructs the lops for rexpand. CP and Spark compile to a single
 * parameterized-builtin operator; MR additionally sorts and sum-aggregates
 * the partial output blocks.
 *
 * @param inputlops lops of all named input parameters
 * @param et execution type
 * @throws HopsException if hop-level construction fails
 * @throws LopsException if lop-level construction fails
 */
private void constructLopsRExpand(HashMap<String, Lop> inputlops, ExecType et)
	throws HopsException, LopsException
{
	if( et == ExecType.CP || et == ExecType.SPARK )
	{
		//single operator, directly producing the output
		ParameterizedBuiltin rexpand = new ParameterizedBuiltin(inputlops,
			HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et);
		setOutputDimensions(rexpand);
		setLineNumbers(rexpand);
		setLops(rexpand);
	}
	else if( et == ExecType.MR )
	{
		//map-side expansion followed by shuffle and sum-aggregation of partial blocks
		ParameterizedBuiltin rexpand = new ParameterizedBuiltin(inputlops,
			HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et);
		setOutputDimensions(rexpand);
		setLineNumbers(rexpand);

		Group sorted = new Group( rexpand, Group.OperationTypes.Sort, getDataType(), getValueType());
		setOutputDimensions(sorted);
		setLineNumbers(sorted);

		Aggregate finalagg = new Aggregate(sorted, Aggregate.OperationTypes.Sum, DataType.MATRIX, getValueType(), ExecType.MR);
		setOutputDimensions(finalagg);
		setLineNumbers(finalagg);
		setLops(finalagg);
	}
}
/** Prints this hop at debug level; the visited flag prevents re-printing shared subdags. */
@Override
public void printMe() throws HopsException {
	if( !LOG.isDebugEnabled() )
		return;
	if( getVisited() != VisitStatus.DONE ) {
		super.printMe();
		LOG.debug(" " + _op);
	}
	setVisited(VisitStatus.DONE);
}
/**
 * Computes the output memory estimate. For TOSTRING this is the expected
 * size of the produced string, derived from the input dims/nnz (capped by
 * literal "rows"/"cols" parameters) and the separator lengths; for all
 * other operations the standard dense/sparse matrix estimate applies.
 */
@Override
protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
{
	if (getOp() == ParamBuiltinOp.TOSTRING){
		// Conservative Assumptions about characteristics of digits
		final long AVERAGE_CHARS_PER_VALUE = 7;
		final long AVERAGE_CHARS_PER_INDEX = 4;

		// Default Values for toString
		long specifiedRows = 100;
		long specifiedCols = 100;
		boolean sparsePrint = false;
		String sep = " ";
		String linesep = "\n";

		//optional parameters overriding the defaults (only literals are considered)
		Hop rowsHop = getInputParameter("rows");
		Hop colsHop = getInputParameter("cols");
		Hop sparsePrintHOP = getInputParameter("sparse");
		Hop sepHop = getInputParameter("sep");
		Hop linesepHop = getInputParameter("linesep");

		//assumes getInput().get(0) is the matrix to be printed — TODO confirm parameter order
		long numNonZeroes = getInput().get(0).getNnz();
		if (numNonZeroes < 0)
			numNonZeroes = specifiedRows * specifiedCols;
		long numRows = getInput().get(0).getDim1();
		if (numRows < 0) 	// If number of rows is not known, set to default
			numRows = specifiedRows;
		long numCols = getInput().get(0).getDim2();
		if (numCols < 0)	// If number of columns is not known, set to default
			numCols = specifiedCols;

		// Assume Defaults : 100 * 100, sep = " ", linesep = "\n", sparse = false
		// String size in bytes is 36 + number_of_chars * 2
		final long DEFAULT_SIZE = 36 + 2 *
				(100 * 100 * AVERAGE_CHARS_PER_VALUE	// Length for digits
				+ 1 * 100 * 99 							// Length for separator chars
				+ 1* 100) ;								// Length for line separator chars

		try {
			//cap dims by literal "rows"/"cols" parameters, if given
			if (rowsHop != null && rowsHop instanceof LiteralOp) {
				specifiedRows = ((LiteralOp)rowsHop).getLongValue();
			}
			numRows = numRows < specifiedRows ? numRows : specifiedRows;
			if (colsHop != null && colsHop instanceof LiteralOp){
				specifiedCols = ((LiteralOp)colsHop).getLongValue();
			}
			numCols = numCols < specifiedCols ? numCols : specifiedCols;

			if (sparsePrintHOP != null && sparsePrintHOP instanceof LiteralOp){
				sparsePrint = ((LiteralOp)sparsePrintHOP).getBooleanValue();
			}
			if (sepHop != null && sepHop instanceof LiteralOp){
				sep = ((LiteralOp)sepHop).getStringValue();
			}
			if (linesepHop != null && linesepHop instanceof LiteralOp){
				linesep = ((LiteralOp)linesepHop).getStringValue();
			}

			long numberOfChars = -1;

			if (sparsePrint){
				//sparse format: one "row col value" line per non-zero
				numberOfChars = AVERAGE_CHARS_PER_VALUE * numNonZeroes		// Length for value digits
						+ AVERAGE_CHARS_PER_INDEX * 2L * numNonZeroes		// Length for row & column index
						+ sep.length() * 2L * numNonZeroes					// Length for separator chars
						+ linesep.length() * numNonZeroes;					// Length for line separator chars
			} else {
				//dense format: full numRows x numCols grid of values
				numberOfChars = AVERAGE_CHARS_PER_VALUE * numRows * numCols	// Length for digits
						+ sep.length() * numRows * (numCols - 1)			// Length for separator chars
						+ linesep.length() * numRows;						// Length for line separator chars
			}

			/**
			 * For JVM
			 * 8 + // object header used by the VM
			 * 8 + // 64-bit reference to char array (value)
			 * 8 + string.length() * 2 + // character array itself (object header + 16-bit chars)
			 * 4 + // offset integer
			 * 4 + // count integer
			 * 4 + // cached hash code
			 */
			return (36 + numberOfChars * 2);

		} catch (HopsException e){
			//fall back to the default 100x100 estimate on invalid literals
			LOG.warn("Invalid values when trying to compute dims1, dims2 & nnz", e);
			return DEFAULT_SIZE;
		}
	} else {
		double sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz);
		return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);
	}
}
@Override
protected double computeIntermediateMemEstimate( long dim1, long dim2, long nnz )
{
	//intermediate memory is only required for remove-empty (selection vectors)
	//and for rexpand in rows direction (cache-friendly intermediate buffers)
	double estimate = 0;

	switch( _op )
	{
		case RMEMPTY: {
			Hop margin = getInput().get(_paramIndexMap.get("margin"));
			boolean marginCols = (margin instanceof LiteralOp)
				&& "cols".equals(((LiteralOp)margin).getStringValue());
			if( marginCols ) {
				//selection vector: boolean array in the number of columns, plus
				//an int array of the same length as intermediate data structure
				//to make removeEmpty-cols a cache-friendly operation
				estimate += OptimizerUtils.BOOLEAN_SIZE * dim2;
				estimate += OptimizerUtils.INT_SIZE * dim2;
			}
			else {
				//selection vector: boolean array in the number of rows
				estimate += OptimizerUtils.BOOLEAN_SIZE * dim1;
			}
			break;
		}
		case REXPAND: {
			Hop dir = getInput().get(_paramIndexMap.get("dir"));
			if( "rows".equals(((LiteralOp)dir).getStringValue()) ) {
				//rexpand w/ rows direction buffers intermediate (value, index)
				//pairs to prevent performance issues due to random output row
				//access; bounded by blocksize configuration: at most 12MB
				estimate = (OptimizerUtils.DOUBLE_SIZE + OptimizerUtils.INT_SIZE)
					* Math.min(dim1, 1024*1024);
			}
			break;
		}
		default:
			//all other parameterized builtins: no intermediates
			break;
	}

	return estimate;
}
@Override
protected long[] inferOutputCharacteristics( MemoTable memo )
{
	//Infers {rows, cols, nnz} of the output from the memoized input statistics.
	//Notes: CDF, TOSTRING always known because scalar outputs; returns null if
	//the characteristics cannot be inferred for the given operation/inputs.
	long[] ret = null;

	Hop input = getTargetHop();
	MatrixCharacteristics mc = memo.getAllInputStats(input);

	if( _op == ParamBuiltinOp.GROUPEDAGG )
	{
		// Get the number of groups provided as part of aggregate() invocation, whenever available.
		if ( _paramIndexMap.get(Statement.GAGG_NUM_GROUPS) != null ) {
			Hop ngroups = getInput().get(_paramIndexMap.get(Statement.GAGG_NUM_GROUPS));
			if(ngroups != null && ngroups instanceof LiteralOp) {
				long m = HopRewriteUtils.getIntValueSafe((LiteralOp)ngroups);
				long n = (mc.getRows()==1)?1:mc.getCols();
				return new long[]{m, n, m};
			}
		}

		// Output dimensions are completely data dependent. In the worst case,
		// #groups = #rows in the grouping attribute (e.g., categorical attribute is an ID column, say EmployeeID).
		// In such a case, #rows in the output = #rows in the input. Also, output sparsity is
		// likely to be 1.0 (e.g., groupedAgg(groups=<a ID column>, fn="count"))
		long m = mc.getRows();
		long n = (mc.getRows()==1)?1:mc.getCols();
		if ( m >= 1 ) {
			ret = new long[]{m, n, m};
		}
	}
	else if( _op == ParamBuiltinOp.RMEMPTY )
	{
		// similar to groupedagg because in the worst-case ouputsize eq inputsize
		// #nnz is exactly the same as in the input but sparsity can be higher if dimensions.
		// change (denser output).
		if ( mc.dimsKnown() ) {
			//use the string literal directly (no redundant String allocation)
			String margin = "rows";
			Hop marginHop = getInput().get(_paramIndexMap.get("margin"));
			if( marginHop instanceof LiteralOp
				&& "cols".equals(((LiteralOp)marginHop).getStringValue()) )
				margin = "cols";

			//select vector (if given) bounds the data-dependent dimension by its #nnz
			MatrixCharacteristics mcSelect = null;
			if (_paramIndexMap.get("select") != null) {
				Hop select = getInput().get(_paramIndexMap.get("select"));
				mcSelect = memo.getAllInputStats(select);
			}

			long lDim1 = 0, lDim2 = 0;
			if( margin.equals("rows") ) {
				lDim1 = (mcSelect == null || !mcSelect.nnzKnown() ) ? mc.getRows(): mcSelect.getNonZeros();
				lDim2 = mc.getCols();
			} else {
				lDim1 = mc.getRows();
				lDim2 = (mcSelect == null || !mcSelect.nnzKnown() ) ? mc.getCols(): mcSelect.getNonZeros();
			}
			ret = new long[]{lDim1, lDim2, mc.getNonZeros()};
		}
	}
	else if( _op == ParamBuiltinOp.REPLACE )
	{
		// the worst-case estimate from the input directly propagates to the output
		// #nnz depends on the replacement pattern and value, same as input if non-zero
		if ( mc.dimsKnown() )
		{
			if( isNonZeroReplaceArguments() )
				ret = new long[]{mc.getRows(), mc.getCols(), mc.getNonZeros()};
			else
				ret = new long[]{mc.getRows(), mc.getCols(), -1};
		}
	}
	else if( _op == ParamBuiltinOp.REXPAND )
	{
		//dimensions are exactly known from input, sparsity unknown but upper bounded by nrow(v)
		//note: cannot infer exact sparsity due to missing cast for outer and potential cutoff for table
		//but very good sparsity estimate possible (number of non-zeros in input)
		Hop max = getInput().get(_paramIndexMap.get("max"));
		Hop dir = getInput().get(_paramIndexMap.get("dir"));
		double maxVal = HopRewriteUtils.getDoubleValueSafe((LiteralOp)max);
		String dirVal = ((LiteralOp)dir).getStringValue();
		if( mc.dimsKnown() ) {
			long lnnz = mc.nnzKnown() ? mc.getNonZeros() : mc.getRows();
			if( "cols".equals(dirVal) ) { //expand horizontally
				ret = new long[]{mc.getRows(), UtilFunctions.toLong(maxVal), lnnz};
			}
			else if( "rows".equals(dirVal) ){ //expand vertically
				ret = new long[]{UtilFunctions.toLong(maxVal), mc.getRows(), lnnz};
			}
		}
	}
	else if( _op == ParamBuiltinOp.TRANSFORMDECODE ) {
		if( mc.dimsKnown() ) {
			//rows: remain unchanged
			//cols: dummy coding might decrease never increase cols
			return new long[]{mc.getRows(), mc.getCols(), mc.getRows()*mc.getCols()};
		}
	}
	else if( _op == ParamBuiltinOp.TRANSFORMAPPLY ) {
		if( mc.dimsKnown() ) {
			//rows: omitting might decrease but never increase rows
			//cols: dummy coding and binning might increase cols but nnz stays constant
			return new long[]{mc.getRows(), mc.getCols(), mc.getRows()*mc.getCols()};
		}
	}

	return ret;
}
@Override
public boolean allowsAllExecTypes() {
	//execution types are restricted per operation in optFindExecType
	return false;
}
@Override
protected ExecType optFindExecType()
	throws HopsException
{
	checkAndSetForcedPlatform();

	ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;

	if( _etypeForced != null ) {
		_etype = _etypeForced;
	}
	else {
		//transform is always scheduled remotely; at runtime a cp transform
		//is triggered for small files (note: early return skips the checks below)
		if( _op == ParamBuiltinOp.TRANSFORM )
			return (_etype = REMOTE);

		if( OptimizerUtils.isMemoryBasedOptLevel() )
			_etype = findExecTypeByMemEstimate();
		else if( _op == ParamBuiltinOp.GROUPEDAGG
			&& this.getInput().get(0).areDimsBelowThreshold() )
			_etype = ExecType.CP;
		else
			_etype = REMOTE;

		//check for valid CP dimensions and matrix size
		checkAndSetInvalidCPDimsAndSize();
	}

	//force CP for in-memory only transform builtins
	//(explicit parentheses for readability; && binds tighter than ||)
	boolean forceCP =
		(_op == ParamBuiltinOp.TRANSFORMAPPLY && REMOTE == ExecType.MR)
		|| (_op == ParamBuiltinOp.TRANSFORMDECODE && REMOTE == ExecType.MR)
		|| _op == ParamBuiltinOp.TRANSFORMMETA
		|| _op == ParamBuiltinOp.TOSTRING
		|| _op == ParamBuiltinOp.CDF
		|| _op == ParamBuiltinOp.INVCDF;
	if( forceCP )
		_etype = ExecType.CP;

	//mark for recompile (forever) if dimensions unknown and remote execution
	if( ConfigurationManager.isDynamicRecompilation() && !dimsKnown(true) && _etype == REMOTE )
		setRequiresRecompile();

	return _etype;
}
@Override
public void refreshSizeInformation()
{
	//Updates dim1/dim2/nnz of this hop from the current input characteristics.
	//Dimensions that are data dependent (e.g., #groups, #non-empty rows) are
	//left untouched or set to -1 (unknown).
	switch( _op )
	{
		case CDF:
		case INVCDF:
			//do nothing; CDF is a scalar
			break;

		case GROUPEDAGG: {
			// output dimension dim1 is completely data dependent
			long ldim1 = -1;
			//if ngroups was provided as a literal, dim1 is known upfront
			if ( _paramIndexMap.get(Statement.GAGG_NUM_GROUPS) != null ) {
				Hop ngroups = getInput().get(_paramIndexMap.get(Statement.GAGG_NUM_GROUPS));
				if(ngroups != null && ngroups instanceof LiteralOp) {
					ldim1 = HopRewriteUtils.getIntValueSafe((LiteralOp)ngroups);
				}
			}

			//row-vector targets collapse to a single output column
			Hop target = getInput().get(_paramIndexMap.get(Statement.GAGG_TARGET));
			long ldim2 = (target.getDim1()==1)?1:target.getDim2();

			setDim1( ldim1 );
			setDim2( ldim2 );
			break;
		}
		case RMEMPTY: {
			//one output dimension dim1 or dim2 is completely data dependent
			//(the non-removed dimension carries over from the target)
			Hop target = getInput().get(_paramIndexMap.get("target"));
			Hop margin = getInput().get(_paramIndexMap.get("margin"));
			if( margin instanceof LiteralOp ) {
				LiteralOp lmargin = (LiteralOp)margin;
				if( "rows".equals(lmargin.getStringValue()) )
					setDim2( target.getDim2() );
				else if( "cols".equals(lmargin.getStringValue()) )
					setDim1( target.getDim1() );
			}
			//removeEmpty never changes the number of non-zeros
			setNnz( target.getNnz() );
			break;
		}
		case REPLACE: {
			//dimensions are exactly known from input, sparsity might increase/decrease if pattern/replacement 0
			Hop target = getInput().get(_paramIndexMap.get("target"));
			setDim1( target.getDim1() );
			setDim2( target.getDim2() );
			//nnz carries over only if neither pattern nor replacement is zero
			if( isNonZeroReplaceArguments() )
				setNnz( target.getNnz() );
			break;
		}
		case REXPAND: {
			//dimensions are exactly known from input, sparsity unknown but upper bounded by nrow(v)
			//note: cannot infer exact sparsity due to missing cast for outer and potential cutoff for table
			Hop target = getInput().get(_paramIndexMap.get("target"));
			Hop max = getInput().get(_paramIndexMap.get("max"));
			Hop dir = getInput().get(_paramIndexMap.get("dir"));
			double maxVal = HopRewriteUtils.getDoubleValueSafe((LiteralOp)max);
			String dirVal = ((LiteralOp)dir).getStringValue();

			if( "cols".equals(dirVal) ) { //expand horizontally
				setDim1(target.getDim1());
				setDim2(UtilFunctions.toLong(maxVal));
			}
			else if( "rows".equals(dirVal) ){ //expand vertically
				//note: dim2 intentionally set from the target's dim1
				//(the input is treated as a vector of length nrow(target))
				setDim1(UtilFunctions.toLong(maxVal));
				setDim2(target.getDim1());
			}

			break;
		}
		case TRANSFORMDECODE: {
			Hop target = getInput().get(_paramIndexMap.get("target"));
			//rows remain unchanged for recoding and dummy coding
			setDim1( target.getDim1() );
			//cols remain unchanged only if no dummy coding
			//TODO parse json spec
			break;
		}
		case TRANSFORMAPPLY: {
			//rows remain unchanged only if no omitting
			//cols remain unchanged of no dummy coding
			//TODO parse json spec
			break;
		}
		default:
			//do nothing
			break;
	}
}
@Override
@SuppressWarnings("unchecked")
public Object clone() throws CloneNotSupportedException
{
	ParameterizedBuiltinOp copy = new ParameterizedBuiltinOp();

	//copy generic hop attributes
	copy.clone(this, false);

	//copy operator-specific attributes; a shallow copy of the param
	//index map suffices because its entries are read-only
	copy._op = _op;
	copy._paramIndexMap = (HashMap<String, Integer>) _paramIndexMap.clone();
	copy._outputEmptyBlocks = _outputEmptyBlocks;
	copy._outputPermutationMatrix = _outputPermutationMatrix;

	return copy;
}
@Override
public boolean compare( Hop that )
{
	//Structural equality: same op, same flags, and identical inputs bound
	//to the same parameter names.
	if( !(that instanceof ParameterizedBuiltinOp) )
		return false;

	ParameterizedBuiltinOp that2 = (ParameterizedBuiltinOp)that;
	boolean ret = (_op == that2._op
		&& _paramIndexMap!=null && that2._paramIndexMap!=null
		&& _paramIndexMap.size() == that2._paramIndexMap.size()
		&& _outputEmptyBlocks == that2._outputEmptyBlocks
		&& _outputPermutationMatrix == that2._outputPermutationMatrix );

	if( ret )
	{
		for( Entry<String,Integer> e : _paramIndexMap.entrySet() )
		{
			String key1 = e.getKey();
			int pos1 = e.getValue();
			//note: equal map sizes do not imply equal key sets, so guard
			//against a missing key (previously an NPE on int unboxing)
			Integer pos2 = that2._paramIndexMap.get(key1);
			ret &= ( pos2 != null
				&& that2.getInput().get(pos2)!=null
				&& getInput().get(pos1) == that2.getInput().get(pos2) );
		}
	}

	return ret;
}
/**
 * Indicates whether this operation is transpose-safe, i.e., produces the
 * same result on a transposed input. Only grouped aggregates with fn="sum"
 * qualify; on any lookup failure we conservatively return false.
 *
 * @return true iff grouped aggregate with sum function
 */
@Override
public boolean isTransposeSafe()
{
	try
	{
		if( _op == ParamBuiltinOp.GROUPEDAGG )
		{
			Hop fn = getInput().get(_paramIndexMap.get(Statement.GAGG_FN));
			return (fn instanceof LiteralOp)
				&& Statement.GAGG_FN_SUM.equals(((LiteralOp)fn).getStringValue());
		}
	}
	catch(Exception ex) {
		//silent false
		LOG.warn("Check for transpose-safeness failed, continue assuming false.", ex);
	}

	return false;
}
/**
 * Indicates whether this operation is a grouped aggregate with the count
 * aggregation function; on any lookup failure we conservatively return false.
 *
 * @return true iff grouped aggregate with count function
 */
public boolean isCountFunction()
{
	try
	{
		if( _op == ParamBuiltinOp.GROUPEDAGG )
		{
			Hop fn = getInput().get(_paramIndexMap.get(Statement.GAGG_FN));
			return (fn instanceof LiteralOp)
				&& Statement.GAGG_FN_COUNT.equals(((LiteralOp)fn).getStringValue());
		}
	}
	catch(Exception ex){
		//silent false
		LOG.warn("Check for count function failed, continue assuming false.", ex);
	}

	return false;
}
/**
 * Checks whether both the pattern and the replacement argument are non-zero
 * literals, in which case replace preserves the input's number of non-zeros.
 * Only applies to REPLACE.
 *
 * @return true iff pattern and replacement are literal and non-zero
 */
private boolean isNonZeroReplaceArguments()
{
	try
	{
		Hop pattern = getInput().get(_paramIndexMap.get("pattern"));
		Hop replace = getInput().get(_paramIndexMap.get("replacement"));
		//note: keep left-to-right evaluation (pattern checked before replacement)
		return pattern instanceof LiteralOp && ((LiteralOp)pattern).getDoubleValue()!=0d
			&& replace instanceof LiteralOp && ((LiteralOp)replace).getDoubleValue()!=0d;
	}
	catch(Exception ex)
	{
		LOG.warn(ex.getMessage());
		return false;
	}
}
/**
 * Checks whether the target is a diag over a column vector, which
 * guarantees diagV2M and hence implies the removal of empty rows.
 *
 * @return true iff target is diag of a column vector
 */
public boolean isTargetDiagInput()
{
	Hop target = getTargetHop();
	if( !(target instanceof ReorgOp) )
		return false;

	ReorgOp reorg = (ReorgOp) target;
	return reorg.getOp() == ReOrgOp.DIAG
		&& reorg.getInput().get(0).getDim2() == 1;
}
/**
 * Checks whether the broadcast variant of remove-empty is applicable in
 * Spark, i.e., whether the rhs fits into the broadcast memory budget.
 * Both cases (partitioned matrix, and sorted double array) require the
 * broadcast to fit twice into the local memory budget; the memory
 * constraint only needs to take the rhs into account because the output
 * is guaranteed to be an aggregate of &lt;=16KB.
 *
 * @return true iff the broadcast fits into the Spark memory budget
 */
private boolean isRemoveEmptyBcSP() // TODO find if 2 x size needed.
{
	Hop rhs = getInput().get(0);
	double bcSize = rhs.dimsKnown()
		? OptimizerUtils.estimateSize(rhs.getDim1(), 1) //dims known and estimate fits
		: rhs.getOutputMemEstimate();                   //dims unknown but worst-case estimate fits
	return OptimizerUtils.checkSparkBroadcastMemoryBudget(bcSize);
}
}