blob: 8e96ef9a2762619310848bbff487a53623459151 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.hops;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.hops.rewrite.HopRewriteUtils;
import org.apache.sysml.lops.Aggregate;
import org.apache.sysml.lops.Binary;
import org.apache.sysml.lops.DataPartition;
import org.apache.sysml.lops.Group;
import org.apache.sysml.hops.Hop.MultiThreadedHop;
import org.apache.sysml.lops.Lop;
import org.apache.sysml.lops.LopProperties.ExecType;
import org.apache.sysml.lops.LopsException;
import org.apache.sysml.lops.MMCJ;
import org.apache.sysml.lops.MMRJ;
import org.apache.sysml.lops.MMTSJ;
import org.apache.sysml.lops.MMCJ.MMCJType;
import org.apache.sysml.lops.MMTSJ.MMTSJType;
import org.apache.sysml.lops.MMZip;
import org.apache.sysml.lops.MapMult;
import org.apache.sysml.lops.MapMultChain;
import org.apache.sysml.lops.MapMultChain.ChainType;
import org.apache.sysml.lops.PMMJ;
import org.apache.sysml.lops.PMapMult;
import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
import org.apache.sysml.lops.Transform;
import org.apache.sysml.lops.Transform.OperationTypes;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.mapred.DistributedCacheInput;
import org.apache.sysml.runtime.matrix.mapred.MMCJMRReducerWithAggregator;
/* Aggregate binary operation: c_ij = outer_k( inner(a_ik, b_kj) ), e.g., matrix multiplication c_ij = Sum_k (a_ik * b_kj)
* Properties:
* Inner Symbol: *, -, +, ...
* Outer Symbol: +, min, max, ...
* 2 Operands
*
* Semantic: generate indices, align, cross-operate, generate indices, align, aggregate
*/
public class AggBinaryOp extends Hop implements MultiThreadedHop
{
//multiplier for map-side matrix mult memory estimates (usage not visible here -- presumably scales the cached-input estimate; confirm)
public static final double MAPMULT_MEM_MULTIPLIER = 1.0;
//optional override of the matrix mult method selection; null means no override
//(presumably consulted by the optFindMMultMethod* selectors, which are not visible here -- confirm)
public static MMultMethod FORCED_MMULT_METHOD = null;
//physical matrix multiplication methods; applicable backends are given in parentheses
public enum MMultMethod {
CPMM, //cross-product matrix multiplication (mr/sp)
RMM, //replication matrix multiplication (mr/sp)
MAPMM_L, //map-side matrix-matrix multiplication using distributed cache, for left input (mr/sp)
MAPMM_R, //map-side matrix-matrix multiplication using distributed cache, for right input (mr/sp)
MAPMM_CHAIN, //map-side matrix-matrix-matrix multiplication using distributed cache, for right input (cp/mr/sp)
PMAPMM, //partitioned map-side matrix-matrix multiplication (sp)
PMM, //permutation matrix multiplication using distributed cache, for left input (mr/cp/sp)
TSMM, //transpose-self matrix multiplication (cp/mr/sp)
TSMM2, //transpose-self matrix multiplication, 2-pass w/o shuffle (sp)
ZIPMM, //zip matrix multiplication (sp)
MM //in-memory matrix multiplication (cp)
};
//aggregation type passed to the spark matrix mult lops (MapMult, MMCJ) via getSparkMMAggregationType
public enum SparkAggType{
NONE,
SINGLE_BLOCK,
MULTI_BLOCK,
}
//inner (cell-wise) and outer (aggregation) operations; matrix mult is MULT/SUM
private OpOp2 innerOp;
private AggOp outerOp;
//matrix mult method selected during lop construction (null until then)
private MMultMethod _method = null;
//hints set prior to operator selection
private boolean _hasLeftPMInput = false; //left input is permutation matrix
private int _maxNumThreads = -1; //-1 for unlimited
private AggBinaryOp() {
//default constructor for clone
}
public AggBinaryOp(String l, DataType dt, ValueType vt, OpOp2 innOp,
AggOp outOp, Hop in1, Hop in2) {
super(l, dt, vt);
innerOp = innOp;
outerOp = outOp;
getInput().add(0, in1);
getInput().add(1, in2);
in1.getParent().add(this);
in2.getParent().add(this);
//compute unknown dims and nnz
refreshSizeInformation();
}
/**
 * Flags whether the left input is a permutation matrix (hint used by operator selection).
 * 
 * @param flag true if the left input is a permutation matrix
 */
public void setHasLeftPMInput(boolean flag) {
	this._hasLeftPMInput = flag;
}

/**
 * @return true if the left input has been flagged as a permutation matrix
 */
public boolean hasLeftPMInput(){
	return this._hasLeftPMInput;
}

@Override
public void setMaxNumThreads( int k ) {
	this._maxNumThreads = k;
}

@Override
public int getMaxNumThreads() {
	return this._maxNumThreads;
}

/**
 * @return the matrix mult method chosen during lop construction, or null if none chosen yet
 */
public MMultMethod getMMultMethod(){
	return this._method;
}
/**
 * Constructs the lops of this aggregate binary hop. Currently, only matrix
 * multiplication (inner MULT, outer SUM) is supported. Operator selection
 * proceeds in three steps: (1) the execution type (CP, SPARK, or MR),
 * (2) special patterns (tsmm, mmchain), and (3) the exec-type-specific
 * matrix mult method, which is recorded in _method.
 * 
 * NOTE: overestimated mem in case of transpose-identity matmult, but 3/2 at worst
 * and existing mem estimate advantageous in terms of consistency hops/lops,
 * and some special cases internally materialize the transpose for better cache locality
 * 
 * @return the constructed lop (or the previously constructed one)
 * @throws HopsException if this is not a matrix multiply, or if the selected
 *         exec type or matrix mult method is not supported
 * @throws LopsException
 */
@Override
public Lop constructLops() 
	throws HopsException, LopsException 
{
	//return already created lops
	if( getLops() != null )
		return getLops();
	
	//construct matrix mult lops (currently only supported aggbinary)
	if ( !isMatrixMultiply() )
		throw new HopsException(this.printErrorLocation() + "Invalid operation in AggBinary Hop, aggBin(" + innerOp + "," + outerOp + ") while constructing lops.");
	
	Hop input1 = getInput().get(0);
	Hop input2 = getInput().get(1);
	
	//matrix mult operation selection part 1 (CP vs MR vs Spark)
	ExecType et = optFindExecType();
	
	//matrix mult operation selection part 2 (specific pattern)
	MMTSJType mmtsj = checkTransposeSelf(); //determine tsmm pattern
	ChainType chain = checkMapMultChain(); //determine mmchain pattern
	
	if( et == ExecType.CP ) 
	{
		//matrix mult operation selection part 3 (CP type)
		_method = optFindMMultMethodCP ( input1.getDim1(), input1.getDim2(), 
			input2.getDim1(), input2.getDim2(), mmtsj, chain, _hasLeftPMInput );
		
		//dispatch CP lops construction 
		switch( _method ){
			case TSMM: 
				constructCPLopsTSMM( mmtsj );
				break;
			case MAPMM_CHAIN:
				constructCPLopsMMChain( chain );
				break;
			case PMM:
				constructCPLopsPMM();
				break;
			case MM:
				constructCPLopsMM();
				break;
			default:
				throw new HopsException(this.printErrorLocation() + "Invalid Matrix Mult Method (" + _method + ") while constructing CP lops.");
		}
	}
	else if( et == ExecType.SPARK ) 
	{
		//matrix mult operation selection part 3 (SPARK type)
		boolean tmmRewrite = input1 instanceof ReorgOp && ((ReorgOp)input1).getOp()==ReOrgOp.TRANSPOSE;
		_method = optFindMMultMethodSpark ( 
			input1.getDim1(), input1.getDim2(), input1.getRowsInBlock(), input1.getColsInBlock(), input1.getNnz(),
			input2.getDim1(), input2.getDim2(), input2.getRowsInBlock(), input2.getColsInBlock(), input2.getNnz(),
			mmtsj, chain, _hasLeftPMInput, tmmRewrite );
		
		//dispatch SPARK lops construction 
		switch( _method )
		{
			case TSMM:
			case TSMM2:
				constructSparkLopsTSMM( mmtsj, _method==MMultMethod.TSMM2 );
				break;
			case MAPMM_L:
			case MAPMM_R:
				constructSparkLopsMapMM( _method );
				break;
			case MAPMM_CHAIN:
				constructSparkLopsMapMMChain( chain );
				break;
			case PMAPMM:
				constructSparkLopsPMapMM();
				break;
			case CPMM:	
				constructSparkLopsCPMM();
				break;
			case RMM:	
				constructSparkLopsRMM();
				break;
			case PMM:
				constructSparkLopsPMM(); 
				break;
			case ZIPMM:
				constructSparkLopsZIPMM(); 
				break;
			default:
				throw new HopsException(this.printErrorLocation() + "Invalid Matrix Mult Method (" + _method + ") while constructing SPARK lops.");	
		}
	}
	else if( et == ExecType.MR ) 
	{
		//matrix mult operation selection part 3 (MR type)
		_method = optFindMMultMethodMR ( 
			input1.getDim1(), input1.getDim2(), input1.getRowsInBlock(), input1.getColsInBlock(), input1.getNnz(),
			input2.getDim1(), input2.getDim2(), input2.getRowsInBlock(), input2.getColsInBlock(), input2.getNnz(),
			mmtsj, chain, _hasLeftPMInput);
		
		//dispatch MR lops construction
		switch( _method ) {
			case MAPMM_L:
			case MAPMM_R: 	
				constructMRLopsMapMM( _method ); 
				break;
			case MAPMM_CHAIN:	
				constructMRLopsMapMMChain( chain ); 
				break;		
			case CPMM:
				constructMRLopsCPMM(); 
				break;			
			case RMM:			
				constructMRLopsRMM();
				break;
			case TSMM:
				constructMRLopsTSMM( mmtsj ); 
				break;
			case PMM:
				constructMRLopsPMM(); 
				break;			
			default:
				throw new HopsException(this.printErrorLocation() + "Invalid Matrix Mult Method (" + _method + ") while constructing MR lops.");
		}
	}
	else 
	{
		//no lops exist for other exec types; fail here instead of
		//silently returning null lops to the caller
		throw new HopsException(this.printErrorLocation() + "Invalid execution type (" + et + ") while constructing matrix mult lops.");
	}
	
	//add reblock/checkpoint lops if necessary
	constructAndSetLopsDataFlowProperties();
	
	return getLops();
}
@Override
public String getOpString() {
	//"ba" = binary aggregate, named consistently with the runtime,
	//followed by the outer and inner operation symbols
	return "ba(" + HopsAgg2String.get(outerOp) + HopsOpOp2String.get(innerOp) + ")";
}
/**
 * Logs this hop (incl. inner/outer operation) and, recursively, its inputs at
 * debug level, and marks this hop as visited. Does nothing if debug logging is disabled.
 * 
 * @throws HopsException
 */
@Override
public void printMe() throws HopsException {
	if( !LOG.isDebugEnabled() )
		return;
	
	if( getVisited() != VisitStatus.DONE ) {
		super.printMe();
		LOG.debug(" InnerOperation: " + innerOp);
		LOG.debug(" OuterOperation: " + outerOp);
		for( Hop in : getInput() )
			in.printMe();
	}
	setVisited(VisitStatus.DONE);
}
@Override
public void computeMemEstimate(MemoTable memo) 
{
	//start from the default estimate and correct it for the smaller tsmm requirements
	super.computeMemEstimate(memo);
	
	//tsmm left only requires X but not t(X), while tsmm right might need to
	//transpose X if sparse. As a heuristic, the correction is skipped for column
	//vectors, since most other vector operations require memory for at least two
	//vectors (consistency prevents parfor opt anomalies w/ small degree of par)
	Hop right = getInput().get(1);
	boolean tsmmLeft = checkTransposeSelf().isLeft();
	if( tsmmLeft && right.dimsKnown() && right.getDim2() > 1 )
		_memEstimate -= getInput().get(0)._outputMemEstimate;
}
@Override
protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
{
	//The output is always estimated as dense (sparsity 1.0), independent of nnz:
	// * transpose-self uses the same estimate as a normal matrix multiplication, which
	//   decouples the TSMM-vs-MM decision and covers TSMM cases that internally
	//   materialize the transpose for efficiency.
	// * all matrix multiplications internally use dense outputs; potential final
	//   dense/sparse transformations are covered by the intermediate mem estimate.
	//A sparsity-aware variant (via OptimizerUtils.getMatMultSparsity) is intentionally
	//not used, to account for the dense intermediate without unnecessary overestimation.
	final double denseSparsity = 1.0;
	return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, denseSparsity);
}
@Override
protected double computeIntermediateMemEstimate( long dim1, long dim2, long nnz )
{
	//outputs with dim2 < 2 (e.g., column vectors) stay dense and need no extra memory;
	//otherwise reserve memory for a potential final dense-sparse transformation,
	//assuming the worst-case sparse representation
	if( dim2 < 2 )
		return 0;
	return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, MatrixBlock.SPARSITY_TURN_POINT);
}
@Override
protected long[] inferOutputCharacteristics( MemoTable memo )
{
	MatrixCharacteristics[] mc = memo.getAllInputStats(getInput());
	MatrixCharacteristics left = mc[0];
	MatrixCharacteristics right = mc[1];
	
	//output dims require nrow(left) and ncol(right)
	if( !(left.rowsKnown() && right.colsKnown()) )
		return null;
	
	long rows = left.getRows();
	long cols = right.getCols();
	//input sparsity, defaulting to dense if nnz unknown/zero
	double spLeft = (left.getNonZeros() > 0) ? 
		OptimizerUtils.getSparsity(left.getRows(), left.getCols(), left.getNonZeros()) : 1.0;
	double spRight = (right.getNonZeros() > 0) ? 
		OptimizerUtils.getSparsity(right.getRows(), right.getCols(), right.getNonZeros()) : 1.0;
	double spOut = OptimizerUtils.getMatMultSparsity(spLeft, spRight, rows, left.getCols(), cols, true);
	
	return new long[] { rows, cols, (long) (rows * cols * spOut) };
}
/**
 * @return true if this aggregate binary is a matrix multiplication, i.e., sum(mult)
 */
public boolean isMatrixMultiply() {
	boolean sumAgg = (outerOp == AggOp.SUM);
	boolean multInner = (innerOp == OpOp2.MULT);
	return multInner && sumAgg;
}
/**
 * Determines if this aggregate binary is a vector-vector outer product, i.e.,
 * a column vector (n x 1, n &gt; 1) times a row vector (1 x m, m &gt; 1), which
 * produces an n x m matrix (in contrast to the 1 x 1 inner product).
 * 
 * @return true if the inputs form an outer product
 */
private boolean isOuterProduct() {
	Hop in1 = getInput().get(0);
	Hop in2 = getInput().get(1);
	return in1.isVector() && in2.isVector()
		&& in1.getDim1() > 1 && in1.getDim2() == 1   //left: column vector
		&& in2.getDim1() == 1 && in2.getDim2() > 1;  //right: row vector
}
/**
 * @return always true, i.e., this hop does not restrict the exec type
 */
@Override
public boolean allowsAllExecTypes() {
	return true;
}
/**
 * Selects the execution type of this hop: the forced exec type if set, otherwise
 * by memory estimate (memory-based opt levels) or by dimension thresholds (other
 * opt levels). A CP decision is afterwards refined to SPARK if the left input is
 * computed in Spark, is not a transpose or DataOp, has this hop as its only parent,
 * and produces a larger output than this hop (reducing data transfer).
 * 
 * @return the selected exec type (also stored in _etype)
 * @throws HopsException
 */
@Override
protected ExecType optFindExecType() 
throws HopsException 
{
checkAndSetForcedPlatform();
//remote backend: SPARK in spark execution mode, MR otherwise
ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
if( _etypeForced != null ) 
{
_etype = _etypeForced;
}
else 
{
if ( OptimizerUtils.isMemoryBasedOptLevel() ) 
{
_etype = findExecTypeByMemEstimate();
}
// choose CP if the dimensions of both inputs are below Hops.CPThreshold 
// OR if it is vector-vector inner product
else if ( (getInput().get(0).areDimsBelowThreshold() && getInput().get(1).areDimsBelowThreshold())
|| (getInput().get(0).isVector() && getInput().get(1).isVector() && !isOuterProduct()) )
{
_etype = ExecType.CP;
}
else
{
_etype = REMOTE;
}
//check for valid CP dimensions and matrix size
checkAndSetInvalidCPDimsAndSize();
}
//spark-specific decision refinement (execute binary aggregate w/ left spark input and 
//single parent also in spark because it's likely cheap and reduces data transfer)
//NOTE: conditions are short-circuited, so the left input's optFindExecType() is
//only invoked if all preceding conditions hold
if( _etype == ExecType.CP && _etypeForced != ExecType.CP
&& !(getInput().get(0) instanceof ReorgOp && ((ReorgOp)getInput().get(0)).getOp()==ReOrgOp.TRANSPOSE)
&& !(getInput().get(0) instanceof DataOp) //input is not checkpoint
&& getInput().get(0).getParent().size()==1 //bagg is only parent
&& !getInput().get(0).areDimsBelowThreshold() 
&& getInput().get(0).optFindExecType() == ExecType.SPARK
&& getInput().get(0).getOutputMemEstimate()>getOutputMemEstimate() )
{
//pull binary aggregate into spark
_etype = ExecType.SPARK;
}
//mark for recompile (forever)
if( ConfigurationManager.isDynamicRecompilation() && !dimsKnown(true) && _etype==REMOTE ) {
setRequiresRecompile();
}
return _etype;
}
/**
 * TSMM: Determines if the transpose-self pattern applies to this aggbinary,
 * i.e., t(X)%*%X (LEFT) or X%*%t(X) (RIGHT).
 * 
 * @return the tsmm type, or MMTSJType.NONE if no pattern applies
 */
public MMTSJType checkTransposeSelf()
{
	Hop left = getInput().get(0);
	Hop right = getInput().get(1);
	
	//X%*%t(X) (checked first, as it takes precedence)
	boolean rightIsTransposeOfLeft = right instanceof ReorgOp 
		&& ((ReorgOp)right).getOp() == ReOrgOp.TRANSPOSE
		&& right.getInput().get(0) == left;
	if( rightIsTransposeOfLeft )
		return MMTSJType.RIGHT;
	
	//t(X)%*%X
	boolean leftIsTransposeOfRight = left instanceof ReorgOp 
		&& ((ReorgOp)left).getOp() == ReOrgOp.TRANSPOSE
		&& left.getInput().get(0) == right;
	if( leftIsTransposeOfRight )
		return MMTSJType.LEFT;
	
	return MMTSJType.NONE;
}
/**
 * MapMultChain: Determines if one of the mmchain patterns applies to this aggbinary:
 * XtwXv: t(X)%*%(w*(X%*%v)), XtXvy: t(X)%*%((X%*%v)-y), or XtXv: t(X)%*%(X%*%v).
 * 
 * @return the chain type, or ChainType.NONE if no pattern applies
 */
public ChainType checkMapMultChain()
{
	Hop left = getInput().get(0);
	Hop right = getInput().get(1);
	
	//all chain types require a transposed left input t(X)
	if( !(left instanceof ReorgOp && ((ReorgOp)left).getOp() == ReOrgOp.TRANSPOSE) )
		return ChainType.NONE;
	Hop X = left.getInput().get(0);
	
	if( right instanceof BinaryOp ) {
		//t(X)%*%(w*(X%*%v))
		if( ((BinaryOp)right).getOp() == OpOp2.MULT ) {
			Hop xv = right.getInput().get(1);
			boolean match = xv instanceof AggBinaryOp && xv.getInput().get(0) == X;
			return match ? ChainType.XtwXv : ChainType.NONE;
		}
		//t(X)%*%((X%*%v)-y)
		if( ((BinaryOp)right).getOp() == OpOp2.MINUS ) {
			Hop xv = right.getInput().get(0);
			Hop y = right.getInput().get(1);
			boolean match = xv instanceof AggBinaryOp && y.getDataType() == DataType.MATRIX
				&& xv.getInput().get(0) == X;
			return match ? ChainType.XtXvy : ChainType.NONE;
		}
		return ChainType.NONE;
	}
	
	//t(X)%*%(X%*%v)
	if( right instanceof AggBinaryOp && right.getInput().get(0) == X )
		return ChainType.XtXv;
	
	return ChainType.NONE;
}
//////////////////////////
// CP Lops generation
/////////////////////////
/**
 * Constructs the CP (or GPU, if the accelerator is enabled) lop for a
 * transpose-self matrix multiplication over the non-transposed input X.
 * 
 * @param mmtsj tsmm type (LEFT: t(X)%*%X, RIGHT: X%*%t(X))
 * @throws HopsException
 * @throws LopsException
 */
private void constructCPLopsTSMM( MMTSJType mmtsj ) 
	throws HopsException, LopsException
{
	//TODO: Fix me. Currently forcing the instruction to GPU if gpu flag is set
	//(memory-aware alternative, currently disabled:
	// DMLScript.USE_ACCELERATOR && (DMLScript.FORCE_ACCELERATOR || getMemEstimate() < OptimizerUtils.GPU_MEMORY_BUDGET))
	ExecType et = DMLScript.USE_ACCELERATOR ? ExecType.GPU : ExecType.CP;
	int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
	
	//tsmm only consumes X, i.e., the input that is not t(X)
	Hop X = getInput().get( mmtsj.isLeft() ? 1 : 0 );
	Lop tsmm = new MMTSJ(X.constructLops(), getDataType(), getValueType(), et, mmtsj, false, k);
	tsmm.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	setLineNumbers(tsmm);
	setLops(tsmm);
}
/**
 * Constructs the multi-threaded CP lop for a matrix mult chain, i.e.,
 * t(X)%*%(X%*%v), t(X)%*%(w*(X%*%v)), or t(X)%*%((X%*%v)-y).
 * 
 * @param chain chain type as determined by checkMapMultChain
 * @throws LopsException
 * @throws HopsException
 */
private void constructCPLopsMMChain( ChainType chain ) 
	throws LopsException, HopsException
{
	Hop hX = getInput().get(0).getInput().get(0); //X out of t(X)
	Hop in2 = getInput().get(1);
	
	MapMultChain mmchain;
	if( chain == ChainType.XtXv ) {
		Hop hv = in2.getInput().get(1); //v out of X%*%v
		mmchain = new MapMultChain( hX.constructLops(), hv.constructLops(), getDataType(), getValueType(), ExecType.CP);
	}
	else { //ChainType.XtwXv / ChainType.XtXvy
		//XtwXv: in2 = w*(X%*%v); XtXvy: in2 = (X%*%v)-y, where y is passed as the w operand
		boolean xtwxv = (chain == ChainType.XtwXv);
		Hop hw = in2.getInput().get( xtwxv ? 0 : 1 );
		Hop hv = in2.getInput().get( xtwxv ? 1 : 0 ).getInput().get(1);
		mmchain = new MapMultChain( hX.constructLops(), hv.constructLops(), hw.constructLops(), chain, getDataType(), getValueType(), ExecType.CP);
	}
	
	//degree of parallelism and basic lop properties
	mmchain.setNumThreads( OptimizerUtils.getConstrainedNumThreads(_maxNumThreads) );
	setOutputDimensions(mmchain);
	setLineNumbers(mmchain);
	setLops(mmchain);
}
/**
 * Constructs the multi-threaded CP lop for a permutation matrix multiplication.
 * 
 * NOTE: exists for consistency since removeEmtpy might be scheduled to MR
 * but matrix mult on small output might be scheduled to CP. Hence, we
 * need to handle directly passed selection vectors in CP as well.
 * 
 * @throws HopsException
 * @throws LopsException
 */
private void constructCPLopsPMM() 
	throws HopsException, LopsException
{
	Hop pm = getInput().get(0);
	Hop rhs = getInput().get(1);
	
	//temporary nrow(pm) hop, evaluated in CP
	Hop nrowHop = HopRewriteUtils.createValueHop(pm, true); //NROW
	HopRewriteUtils.setOutputBlocksizes(nrowHop, 0, 0);
	nrowHop.setForcedExecType(ExecType.CP);
	HopRewriteUtils.copyLineNumbers(this, nrowHop);
	Lop nrowLop = nrowHop.constructLops();
	
	PMMJ pmm = new PMMJ(pm.constructLops(), rhs.constructLops(), nrowLop, getDataType(), getValueType(), false, false, ExecType.CP);
	pmm.setNumThreads( OptimizerUtils.getConstrainedNumThreads(_maxNumThreads) );
	pmm.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	setLineNumbers(pmm);
	setLops(pmm);
	
	//detach the temporary nrow hop from the hop DAG again
	HopRewriteUtils.removeChildReference(pm, nrowHop);
}
/**
 * Constructs the lop for an in-memory matrix multiplication. If the accelerator is
 * enabled (and either forced or the estimate fits the GPU budget), a GPU matmult is
 * used that consumes transposed inputs via flags; otherwise a multi-threaded CP matmult,
 * potentially via the left-transpose rewrite.
 * 
 * @throws HopsException
 * @throws LopsException
 */
private void constructCPLopsMM() 
	throws HopsException, LopsException
{
	Lop mm;
	boolean useGPU = DMLScript.USE_ACCELERATOR 
		&& (DMLScript.FORCE_ACCELERATOR || getMemEstimate() < OptimizerUtils.GPU_MEMORY_BUDGET);
	
	if( useGPU ) {
		Hop h1 = getInput().get(0);
		Hop h2 = getInput().get(1);
		//pass transposed inputs untransposed, indicating the transpose via flags
		boolean transLeft = h1 instanceof ReorgOp && ((ReorgOp)h1).getOp()==ReOrgOp.TRANSPOSE;
		boolean transRight = h2 instanceof ReorgOp && ((ReorgOp)h2).getOp()==ReOrgOp.TRANSPOSE;
		Lop left = (transLeft ? h1.getInput().get(0) : h1).constructLops();
		Lop right = (transRight ? h2.getInput().get(0) : h2).constructLops();
		mm = new Binary(left, right, 
			Binary.OperationTypes.MATMULT, getDataType(), getValueType(), ExecType.GPU, transLeft, transRight);
		setOutputDimensions(mm);
		setNnz(-1);
	}
	else {
		if( isLeftTransposeRewriteApplicable(true, false) ) {
			mm = constructCPLopsMMWithLeftTransposeRewrite();
		}
		else {
			int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
			mm = new Binary(getInput().get(0).constructLops(), getInput().get(1).constructLops(), 
				Binary.OperationTypes.MATMULT, getDataType(), getValueType(), ExecType.CP, k);
		}
		setOutputDimensions(mm);
	}
	
	setLineNumbers(mm);
	setLops(mm);
}
/**
 * Constructs t(X)%*%Y as t(t(Y)%*%X) in CP, which avoids materializing t(X).
 * The output dimensions of the returned (final transpose) lop are set by the caller.
 * 
 * @return the lop of the final result transpose
 * @throws HopsException
 * @throws LopsException
 */
private Lop constructCPLopsMMWithLeftTransposeRewrite() 
	throws HopsException, LopsException
{
	Hop X = getInput().get(0).getInput().get(0); //guaranteed to exists
	Hop Y = getInput().get(1);
	int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
	
	//t(Y); if Y's lop already is a transpose, reuse its input instead of double-transposing
	Lop lY = Y.constructLops();
	boolean yIsTranspose = lY instanceof Transform 
		&& ((Transform)lY).getOperationType()==OperationTypes.Transpose;
	Lop tY = yIsTranspose ? lY.getInputs().get(0)
		: new Transform(lY, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP, k);
	tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
	setLineNumbers(tY);
	
	//t(Y)%*%X
	Lop mm = new Binary(tY, X.constructLops(), Binary.OperationTypes.MATMULT, getDataType(), getValueType(), ExecType.CP, k);
	mm.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	setLineNumbers(mm);
	
	//t(t(Y)%*%X)
	return new Transform(mm, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP, k);
}
//////////////////////////
// Spark Lops generation
/////////////////////////
/**
 * Constructs the Spark lop for a transpose-self matrix multiplication over
 * the non-transposed input X.
 * 
 * @param mmtsj tsmm type (LEFT: t(X)%*%X, RIGHT: X%*%t(X))
 * @param multiPass true for the 2-pass tsmm w/o shuffle (TSMM2)
 * @throws HopsException
 * @throws LopsException
 */
private void constructSparkLopsTSMM(MMTSJType mmtsj, boolean multiPass) 
	throws HopsException, LopsException
{
	Lop lX = getInput().get( mmtsj.isLeft() ? 1 : 0 ).constructLops();
	MMTSJ tsmm = new MMTSJ(lX, getDataType(), getValueType(), ExecType.SPARK, mmtsj, multiPass);
	setOutputDimensions(tsmm);
	setLineNumbers(tsmm);
	setLops(tsmm);
}
/**
 * Constructs the Spark lop for a map-side matrix multiplication, potentially
 * via the left-transpose rewrite.
 * 
 * @param method MAPMM_L (left input cached) or MAPMM_R (right input cached)
 * @throws LopsException
 * @throws HopsException
 */
private void constructSparkLopsMapMM(MMultMethod method) 
	throws LopsException, HopsException
{
	Lop mapmm;
	if( isLeftTransposeRewriteApplicable(false, false) ) {
		mapmm = constructSparkLopsMapMMWithLeftTransposeRewrite();
	}
	else {
		//explicit aggregation is not required if the entire matrix
		//multiplication can be performed in the mappers
		SparkAggType aggtype = getSparkMMAggregationType( requiresAggregation(method) );
		_outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this);
		boolean rightCached = (method == MMultMethod.MAPMM_R);
		
		mapmm = new MapMult( getInput().get(0).constructLops(), getInput().get(1).constructLops(), 
			getDataType(), getValueType(), rightCached, false, _outputEmptyBlocks, aggtype);
	}
	
	setOutputDimensions(mapmm);
	setLineNumbers(mapmm);
	setLops(mapmm);
}
/**
 * Constructs t(X)%*%Y as t(t(Y)%*%X), with both transposes in CP and the core
 * map-side matrix multiplication in Spark. The output dimensions of the returned
 * (final transpose) lop are set by the caller.
 * 
 * @return the lop of the final result transpose
 * @throws HopsException
 * @throws LopsException
 */
private Lop constructSparkLopsMapMMWithLeftTransposeRewrite() 
	throws HopsException, LopsException
{
	Hop X = getInput().get(0).getInput().get(0); //guaranteed to exists
	Hop Y = getInput().get(1);
	
	//t(Y) in CP
	Lop tY = new Transform(Y.constructLops(), OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
	tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
	setLineNumbers(tY);
	
	//t(Y)%*%X in spark
	SparkAggType aggtype = getSparkMMAggregationType( requiresAggregation(MMultMethod.MAPMM_R) );
	_outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this);
	Lop mm = new MapMult( tY, X.constructLops(), getDataType(), getValueType(), 
		false, false, _outputEmptyBlocks, aggtype);
	mm.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	setLineNumbers(mm);
	
	//t(t(Y)%*%X) in CP
	return new Transform(mm, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
}
/**
 * Constructs the Spark lop for a matrix mult chain, i.e.,
 * t(X)%*%(X%*%v), t(X)%*%(w*(X%*%v)), or t(X)%*%((X%*%v)-y).
 * 
 * @param chain chain type as determined by checkMapMultChain
 * @throws LopsException
 * @throws HopsException
 */
private void constructSparkLopsMapMMChain(ChainType chain) 
	throws LopsException, HopsException
{
	Hop hX = getInput().get(0).getInput().get(0); //X out of t(X)
	Hop in2 = getInput().get(1);
	
	MapMultChain mmchain;
	if( chain != ChainType.XtXv ) {
		//XtwXv: in2 = w*(X%*%v); XtXvy: in2 = (X%*%v)-y, where y is passed as the w operand
		boolean xtwxv = (chain == ChainType.XtwXv);
		Hop hw = in2.getInput().get( xtwxv ? 0 : 1 );
		Hop hv = in2.getInput().get( xtwxv ? 1 : 0 ).getInput().get(1);
		mmchain = new MapMultChain( hX.constructLops(), hv.constructLops(), hw.constructLops(), chain, getDataType(), getValueType(), ExecType.SPARK);
	}
	else {
		//in2 = X%*%v
		Hop hv = in2.getInput().get(1);
		mmchain = new MapMultChain( hX.constructLops(), hv.constructLops(), getDataType(), getValueType(), ExecType.SPARK);
	}
	
	setOutputDimensions(mmchain);
	setLineNumbers(mmchain);
	setLops(mmchain);
}
/**
 * Constructs the Spark lop for a partitioned map-side matrix multiplication.
 * 
 * @throws LopsException
 * @throws HopsException
 */
private void constructSparkLopsPMapMM() 
	throws LopsException, HopsException
{
	Lop left = getInput().get(0).constructLops();
	Lop right = getInput().get(1).constructLops();
	PMapMult pmapmm = new PMapMult(left, right, getDataType(), getValueType());
	
	setOutputDimensions(pmapmm);
	setLineNumbers(pmapmm);
	setLops(pmapmm);
}
/**
 * Constructs the Spark lop for a cross-product matrix multiplication,
 * potentially via the left-transpose rewrite.
 * 
 * @throws HopsException
 * @throws LopsException
 */
private void constructSparkLopsCPMM() 
	throws HopsException, LopsException
{
	if( isLeftTransposeRewriteApplicable(false, false) ) {
		setLops( constructSparkLopsCPMMWithLeftTransposeRewrite() );
		return;
	}
	
	//cpmm always aggregates partial results
	SparkAggType aggtype = getSparkMMAggregationType(true);
	Lop left = getInput().get(0).constructLops();
	Lop right = getInput().get(1).constructLops();
	Lop cpmm = new MMCJ(left, right, getDataType(), getValueType(), aggtype, ExecType.SPARK);
	
	setOutputDimensions(cpmm);
	setLineNumbers(cpmm);
	setLops(cpmm);
}
/**
 * Constructs t(X)%*%Y as t(t(Y)%*%X), with both transposes in CP and the core
 * matrix multiplication as Spark cpmm. Output dimensions and line numbers of
 * all created lops, including the returned final transpose, are set here.
 * 
 * @return the lop of the final result transpose
 * @throws HopsException
 * @throws LopsException
 */
private Lop constructSparkLopsCPMMWithLeftTransposeRewrite() 
	throws HopsException, LopsException
{
	SparkAggType aggtype = getSparkMMAggregationType(true);
	
	Hop X = getInput().get(0).getInput().get(0); //guaranteed to exists
	Hop Y = getInput().get(1);
	
	//right vector transpose CP
	Lop tY = new Transform(Y.constructLops(), OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
	tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
	setLineNumbers(tY);
	
	//matrix multiply t(Y)%*%X: ncol(Y) x ncol(X), i.e., the transpose of this hop's
	//output shape (consistent with the CP and mapmm left-transpose rewrites)
	MMCJ mmcj = new MMCJ(tY, X.constructLops(), getDataType(), getValueType(), aggtype, ExecType.SPARK);
	mmcj.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	setLineNumbers(mmcj);
	
	//result transpose CP: ncol(X) x ncol(Y)
	Lop out = new Transform(mmcj, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
	out.getOutputParameters().setDimensions(X.getDim2(), Y.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	setLineNumbers(out);
	
	return out;
}
/**
 * Constructs the Spark lop for a replication-based matrix multiplication.
 * 
 * @throws LopsException
 * @throws HopsException
 */
private void constructSparkLopsRMM() 
	throws LopsException, HopsException
{
	Lop left = getInput().get(0).constructLops();
	Lop right = getInput().get(1).constructLops();
	Lop rmm = new MMRJ(left, right, getDataType(), getValueType(), ExecType.SPARK);
	
	setOutputDimensions(rmm);
	setLineNumbers(rmm);
	setLops(rmm);
}
/**
 * Constructs the Spark lops for a permutation matrix multiplication pm%*%Y.
 * PMM has two potential modes: (a) w/ full permutation matrix input, which is
 * first condensed (via temporary spark hops) into a vector of target row
 * positions, and (b) w/ already condensed input vector of target row positions.
 * All temporary hops are detached from the hop DAG again afterwards.
 * 
 * @throws HopsException
 * @throws LopsException
 */
private void constructSparkLopsPMM() 
	throws HopsException, LopsException
{
	//PMM has two potential modes (a) w/ full permutation matrix input, and 
	//(b) w/ already condensed input vector of target row positions.
	
	Hop pmInput = getInput().get(0);
	Hop rightInput = getInput().get(1);
	
	long brlen = pmInput.getRowsInBlock();
	long bclen = pmInput.getColsInBlock();
	
	Lop lpmInput = pmInput.constructLops();
	Hop nrow = null;
	
	//exec type of the max(v) aggregate in mode (b): CP if the vector fits into the
	//local memory budget, otherwise SPARK (the remote backend in spark execution mode,
	//consistent with the spark-forced temporary hops of mode (a))
	double mestPM = OptimizerUtils.estimateSize(pmInput.getDim1(), 1);
	ExecType etVect = (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.SPARK:ExecType.CP;
	
	//a) full permutation matrix input (potentially without empty block materialized)
	if( pmInput.getDim2() != 1 ) //not a vector
	{
		//compute condensed permutation matrix vector input			
		//v = rowMaxIndex(t(pm)) * rowMax(t(pm)) 
		ReorgOp transpose = HopRewriteUtils.createTranspose(pmInput);
		transpose.setForcedExecType(ExecType.SPARK);
		HopRewriteUtils.copyLineNumbers(this, transpose);	
		
		AggUnaryOp agg1 = new AggUnaryOp("tmp2a", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAXINDEX, Direction.Row, transpose);
		HopRewriteUtils.setOutputBlocksizes(agg1, brlen, bclen);
		agg1.refreshSizeInformation();
		agg1.setForcedExecType(ExecType.SPARK);
		HopRewriteUtils.copyLineNumbers(this, agg1);
		
		AggUnaryOp agg2 = new AggUnaryOp("tmp2b", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAX, Direction.Row, transpose);
		HopRewriteUtils.setOutputBlocksizes(agg2, brlen, bclen);
		agg2.refreshSizeInformation();
		agg2.setForcedExecType(ExecType.SPARK);
		HopRewriteUtils.copyLineNumbers(this, agg2);
		
		BinaryOp mult = new BinaryOp("tmp3", DataType.MATRIX, ValueType.DOUBLE, OpOp2.MULT, agg1, agg2);
		HopRewriteUtils.setOutputBlocksizes(mult, brlen, bclen); 
		mult.refreshSizeInformation();
		mult.setForcedExecType(ExecType.SPARK);
		HopRewriteUtils.copyLineNumbers(this, mult);
		
		//compute NROW target via nrow(m)
		nrow = HopRewriteUtils.createValueHop(pmInput, true);
		HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
		nrow.setForcedExecType(ExecType.CP);
		HopRewriteUtils.copyLineNumbers(this, nrow);
		
		lpmInput = mult.constructLops();
		
		//detach the temporary t(pm) subgraph from the hop DAG
		HopRewriteUtils.removeChildReference(pmInput, transpose);
	}
	else //input vector
	{
		//compute NROW target via max(v)
		nrow = HopRewriteUtils.createAggUnaryOp(pmInput, AggOp.MAX, Direction.RowCol); 
		HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
		nrow.setForcedExecType(etVect);
		HopRewriteUtils.copyLineNumbers(this, nrow);
	}
	
	//b) condensed permutation matrix vector input (target rows)
	_outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this);
	PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), nrow.constructLops(), 
		getDataType(), getValueType(), false, _outputEmptyBlocks, ExecType.SPARK);
	setOutputDimensions(pmm);
	setLineNumbers(pmm);
	setLops(pmm);
	
	//detach the temporary nrow hop from the hop DAG
	HopRewriteUtils.removeChildReference(pmInput, nrow);		
}
/**
 * Constructs the Spark lop for a zip matrix multiplication t(X)%*%y.
 * 
 * @throws HopsException
 * @throws LopsException
 */
private void constructSparkLopsZIPMM() 
	throws HopsException, LopsException
{
	//zipmm applies to t(X)%*%y if ncol(X)<=blocksize and it prevents
	//unnecessary reshuffling by keeping the original indexes (and partitioning) 
	//joining the datasets, and internally doing the necessary transpose operations
	Hop X = getInput().get(0).getInput().get(0); //x out of t(X)
	Hop y = getInput().get(1); //y
	
	//the left-transpose rewrite is used if X has at least as many cells as y
	long cellsX = X.getDim1() * X.getDim2();
	long cellsY = y.getDim1() * y.getDim2();
	boolean tRewrite = cellsX >= cellsY;
	
	Lop zipmm = new MMZip(X.constructLops(), y.constructLops(), getDataType(), getValueType(), tRewrite, ExecType.SPARK);
	setOutputDimensions(zipmm);
	setLineNumbers(zipmm);
	setLops(zipmm);
}
//////////////////////////
// MR Lops generation
/////////////////////////
/**
 * Constructs the MR lops for a map-side matrix multiplication. One input is shipped
 * via the distributed cache: the left input for MAPMM_L, the right input otherwise.
 * For MAPMM_R, the left-transpose rewrite t(X)%*%Y -> t(t(Y)%*%X) is used if it is
 * applicable. Otherwise the cached input is optionally partitioned upfront. A
 * group/aggregate is appended if the mappers produce partial results.
 * 
 * @param method the selected map-side method (MAPMM_L or MAPMM_R)
 * @throws HopsException if constructing the input lops fails
 * @throws LopsException if constructing the lops fails
 */
private void constructMRLopsMapMM(MMultMethod method) 
	throws HopsException, LopsException
{
	if( method == MMultMethod.MAPMM_R && isLeftTransposeRewriteApplicable(false, true) )
	{
		setLops( constructMRLopsMapMMWithLeftTransposeRewrite() );
	}
	else //GENERAL CASE
	{	
		// If number of columns is smaller than block size then explicit aggregation is not required.
		// i.e., entire matrix multiplication can be performed in the mappers.
		boolean needAgg = requiresAggregation(method); 
		boolean needPart = requiresPartitioning(method, false);
		_outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this); 
		
		//pre partitioning 
		Lop leftInput = getInput().get(0).constructLops(); 
		Lop rightInput = getInput().get(1).constructLops();
		if( needPart ) {
			if( (method==MMultMethod.MAPMM_L) ) //left in distributed cache
			{
				Hop input = getInput().get(0);
				//partition in CP if the input fits into the local memory budget, otherwise in MR
				ExecType etPart = (OptimizerUtils.estimateSizeExactSparsity(input.getDim1(), input.getDim2(), OptimizerUtils.getSparsity(input.getDim1(), input.getDim2(), input.getNnz())) 
						< OptimizerUtils.getLocalMemBudget()) ? ExecType.CP : ExecType.MR; //operator selection
				leftInput = new DataPartition(input.constructLops(), DataType.MATRIX, ValueType.DOUBLE, etPart, PDataPartitionFormat.COLUMN_BLOCK_WISE_N);
				leftInput.getOutputParameters().setDimensions(input.getDim1(), input.getDim2(), getRowsInBlock(), getColsInBlock(), input.getNnz());
				setLineNumbers(leftInput);
			}
			else //right side in distributed cache
			{
				Hop input = getInput().get(1);
				//partition in CP if the input fits into the local memory budget, otherwise in MR
				ExecType etPart = (OptimizerUtils.estimateSizeExactSparsity(input.getDim1(), input.getDim2(), OptimizerUtils.getSparsity(input.getDim1(), input.getDim2(), input.getNnz())) 
						< OptimizerUtils.getLocalMemBudget()) ? ExecType.CP : ExecType.MR; //operator selection
				rightInput = new DataPartition(input.constructLops(), DataType.MATRIX, ValueType.DOUBLE, etPart, PDataPartitionFormat.ROW_BLOCK_WISE_N);
				rightInput.getOutputParameters().setDimensions(input.getDim1(), input.getDim2(), getRowsInBlock(), getColsInBlock(), input.getNnz());
				setLineNumbers(rightInput);
			}
		}					
		
		//core matrix mult
		MapMult mapmult = new MapMult( leftInput, rightInput, getDataType(), getValueType(), 
				                (method==MMultMethod.MAPMM_R), needPart, _outputEmptyBlocks);
		mapmult.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
		setLineNumbers(mapmult);
		
		//post aggregation
		if (needAgg) {
			Group grp = new Group(mapmult, Group.OperationTypes.Sort, getDataType(), getValueType());
			Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
			
			grp.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
			agg1.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
			//the group lop also carries line numbers, consistent with the other MR lop constructors
			setLineNumbers(grp);
			setLineNumbers(agg1);
			
			// aggregation uses kahanSum but the inputs do not have correction values
			agg1.setupCorrectionLocation(CorrectionLocationType.NONE);  
			
			setLops(agg1);
		}
		else {
			setLops(mapmult);
		}
	}	
} 
/**
 * Constructs the MR lops for a map-side t(X)%*%Y via the rewrite t(t(Y)%*%X).
 * Y is transposed in CP, t(Y) is put into the distributed cache (optionally
 * partitioned) and multiplied with X. Partial results are aggregated if X spans
 * multiple row blocks, and the result is transposed back in CP.
 * 
 * @return the root lop of the rewritten plan (the final CP transpose)
 * @throws HopsException if constructing the input lops fails
 * @throws LopsException if constructing the lops fails
 */
private Lop constructMRLopsMapMMWithLeftTransposeRewrite() 
	throws HopsException, LopsException
{
	Hop X = getInput().get(0).getInput().get(0); //guaranteed to exist
	Hop Y = getInput().get(1);
	
	//right vector transpose CP
	Lop tY = new Transform(Y.constructLops(), OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
	tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
	setLineNumbers(tY);
	
	//matrix mult
	
	// If number of columns is smaller than block size then explicit aggregation is not required.
	// i.e., entire matrix multiplication can be performed in the mappers.
	boolean needAgg = ( X.getDim1() <= 0 || X.getDim1() > X.getRowsInBlock() ); 
	boolean needPart = requiresPartitioning(MMultMethod.MAPMM_R, true); //R disregarding transpose rewrite
	
	//pre partitioning
	Lop dcinput = null;
	if( needPart ) {
		//partition in CP if t(Y) fits into the local memory budget, otherwise in MR
		ExecType etPart = (OptimizerUtils.estimateSizeExactSparsity(Y.getDim2(), Y.getDim1(), OptimizerUtils.getSparsity(Y.getDim2(), Y.getDim1(), Y.getNnz())) 
				          < OptimizerUtils.getLocalMemBudget()) ? ExecType.CP : ExecType.MR; //operator selection
		dcinput = new DataPartition(tY, DataType.MATRIX, ValueType.DOUBLE, etPart, PDataPartitionFormat.COLUMN_BLOCK_WISE_N);
		dcinput.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
		setLineNumbers(dcinput);
	}
	else
		dcinput = tY;
	
	MapMult mapmult = new MapMult(dcinput, X.constructLops(), getDataType(), getValueType(), false, needPart, false);
	mapmult.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	setLineNumbers(mapmult);
	
	//post aggregation 
	Lop mult = null;
	if( needAgg ) {
		Group grp = new Group(mapmult, Group.OperationTypes.Sort, getDataType(), getValueType());
		grp.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
		setLineNumbers(grp);
		
		Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
		agg1.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
		setLineNumbers(agg1);
		// aggregation uses kahanSum but the inputs do not have correction values
		agg1.setupCorrectionLocation(CorrectionLocationType.NONE);
		mult = agg1;
	}
	else
		mult = mapmult;
	
	//result transpose CP 
	Lop out = new Transform(mult, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
	out.getOutputParameters().setDimensions(X.getDim2(), Y.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	//the root lop also carries line numbers, like all intermediate lops above
	setLineNumbers(out);
	
	return out;
}
/**
 * Constructs the MR lops for a map-side matrix multiplication chain
 * (XtXv, XtwXv or XtXvy), followed by an MR group/aggregation.
 * The vector v is never partitioned because it always is a single block.
 * For XtwXv/XtXvy, w is partitioned if its dimensions are unknown or it
 * exceeds the distributed cache partition size.
 * 
 * @param chainType the detected matrix multiplication chain pattern
 * @throws HopsException if constructing the input lops fails
 * @throws LopsException if constructing the lops fails
 */
private void constructMRLopsMapMMChain( ChainType chainType ) 
	throws HopsException, LopsException
{
	Lop mapmult = null; 
	
	if( chainType == ChainType.XtXv )
	{
		//v never needs partitioning because always single block
		Hop hX = getInput().get(0).getInput().get(0);
		Hop hv = getInput().get(1).getInput().get(1);
		
		//core matrix mult
		mapmult = new MapMultChain( hX.constructLops(), hv.constructLops(), getDataType(), getValueType(), ExecType.MR);
		mapmult.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
		setLineNumbers(mapmult);
	}
	else //ChainType.XtwXv / ChainType.XtXvy
	{
		//v never needs partitioning because always single block
		//(positions of w and X%*%v within the second input depend on the chain type)
		int wix = (chainType == ChainType.XtwXv) ? 0 : 1;
		int vix = (chainType == ChainType.XtwXv) ? 1 : 0;
		Hop hX = getInput().get(0).getInput().get(0);
		Hop hw = getInput().get(1).getInput().get(wix);
		Hop hv = getInput().get(1).getInput().get(vix).getInput().get(1);
		
		double mestW = OptimizerUtils.estimateSize(hw.getDim1(), hw.getDim2());
		boolean needPart = !hw.dimsKnown() || hw.getDim1() * hw.getDim2() > DistributedCacheInput.PARTITION_SIZE;
		Lop X = hX.constructLops(), v = hv.constructLops(), w = null;
		if( needPart ){ //requires partitioning
			w = new DataPartition(hw.constructLops(), DataType.MATRIX, ValueType.DOUBLE, (mestW>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
			w.getOutputParameters().setDimensions(hw.getDim1(), hw.getDim2(), getRowsInBlock(), getColsInBlock(), hw.getNnz());
			setLineNumbers(w);	
		}
		else
			w = hw.constructLops();
		
		//core matrix mult
		mapmult = new MapMultChain( X, v, w, chainType, getDataType(), getValueType(), ExecType.MR);
		mapmult.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
		setLineNumbers(mapmult);
	}
	
	//post aggregation
	Group grp = new Group(mapmult, Group.OperationTypes.Sort, getDataType(), getValueType());
	grp.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	//the group lop also carries line numbers, consistent with the other MR lop constructors
	setLineNumbers(grp);
	Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
	agg1.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	agg1.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum 
	setLineNumbers(agg1);
	 
	setLops(agg1);
} 
/**
 * Constructs the MR lops for cross-product based matrix multiplication (CPMM).
 * The plan is an MMCJ join, followed by a group and an MR aggregation of the
 * partial results. If the left-transpose rewrite t(X)%*%Y -> t(t(Y)%*%X) applies,
 * the rewritten plan is used instead.
 * 
 * @throws HopsException if constructing the input lops fails
 * @throws LopsException if constructing the lops fails
 */
private void constructMRLopsCPMM() 
	throws HopsException, LopsException
{
	//rewritten plan with CP transposes of Y and the result
	if( isLeftTransposeRewriteApplicable(false, false) ) {
		setLops( constructMRLopsCPMMWithLeftTransposeRewrite() );
		return;
	}
	
	//general case: mmcj -> group -> aggregate
	Hop left = getInput().get(0);
	Hop right = getInput().get(1);
	MMCJType aggType = getMMCJAggregationType(left, right);
	Lop leftLop = left.constructLops();
	Lop rightLop = right.constructLops();
	
	MMCJ join = new MMCJ(leftLop, rightLop, getDataType(), getValueType(), aggType, ExecType.MR);
	setOutputDimensions(join);
	setLineNumbers(join);
	
	Group group = new Group(join, Group.OperationTypes.Sort, getDataType(), getValueType());
	setOutputDimensions(group);
	setLineNumbers(group);
	
	Aggregate agg = new Aggregate(group, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
	setOutputDimensions(agg);
	setLineNumbers(agg);
	//kahanSum-based aggregation, but the partial results carry no correction values
	agg.setupCorrectionLocation(CorrectionLocationType.NONE);
	setLops(agg);
}
/**
 * Constructs the MR lops for a cross-product based t(X)%*%Y via the rewrite
 * t(t(Y)%*%X). Y is transposed in CP, joined with X via MMCJ, and the partial
 * results are grouped and aggregated in MR. The result is then transposed back in CP.
 * 
 * @return the root lop of the rewritten plan (the final CP transpose)
 * @throws HopsException if constructing the input lops fails
 * @throws LopsException if constructing the lops fails
 */
private Lop constructMRLopsCPMMWithLeftTransposeRewrite() 
	throws HopsException, LopsException
{
	Hop X = getInput().get(0).getInput().get(0); //guaranteed to exist
	Hop Y = getInput().get(1);
	
	//right vector transpose CP
	Lop tY = new Transform(Y.constructLops(), OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
	tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
	setLineNumbers(tY);
	
	//matrix multiply
	MMCJType type = getMMCJAggregationType(X, Y);
	MMCJ mmcj = new MMCJ(tY, X.constructLops(), getDataType(), getValueType(), type, ExecType.MR);
	setOutputDimensions(mmcj);
	setLineNumbers(mmcj);
	
	Group grp = new Group(mmcj, Group.OperationTypes.Sort, getDataType(), getValueType());
	setOutputDimensions(grp);
	setLineNumbers(grp);
	
	Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
	setOutputDimensions(agg1);
	setLineNumbers(agg1);
	
	// aggregation uses kahanSum but the inputs do not have correction values
	agg1.setupCorrectionLocation(CorrectionLocationType.NONE);
	
	//result transpose CP 
	Lop out = new Transform(agg1, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
	out.getOutputParameters().setDimensions(X.getDim2(), Y.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	//the root lop also carries line numbers, like all intermediate lops above
	setLineNumbers(out);
	
	return out;
}
/**
 * Constructs the MR lop for replication-based matrix multiplication (RMM).
 * The lop directly joins both inputs via an MMRJ lop, and no separate
 * aggregation is appended here.
 * 
 * @throws HopsException if constructing the input lops fails
 * @throws LopsException if constructing the lop fails
 */
private void constructMRLopsRMM() 
	throws HopsException, LopsException
{
	Lop leftLop = getInput().get(0).constructLops();
	Lop rightLop = getInput().get(1).constructLops();
	
	MMRJ rmm = new MMRJ(leftLop, rightLop, getDataType(), getValueType(), ExecType.MR);
	setOutputDimensions(rmm);
	setLineNumbers(rmm);
	setLops(rmm);
}
/**
 * Constructs the MR lops for a transpose-self matrix multiplication
 * (t(X)%*%X or X%*%t(X)), followed by an MR aggregation of partial results.
 * 
 * @param mmtsj the transpose-self type; for a left type X is taken from the
 *        second input, otherwise from the first input
 * @throws HopsException if constructing the input lop fails
 * @throws LopsException if constructing the lops fails
 */
private void constructMRLopsTSMM(MMTSJType mmtsj) 
	throws HopsException, LopsException
{
	Hop x = mmtsj.isLeft() ? getInput().get(1) : getInput().get(0);
	
	MMTSJ tsmm = new MMTSJ(x.constructLops(), getDataType(), getValueType(), ExecType.MR, mmtsj);
	tsmm.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	setLineNumbers(tsmm);
	
	//kahanSum-based aggregation, but the partial results carry no correction values
	Aggregate agg = new Aggregate(tsmm, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
	agg.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
	agg.setupCorrectionLocation(CorrectionLocationType.NONE);
	setLineNumbers(agg);
	
	setLops(agg);
}
/**
 * Constructs the MR lops for a permutation matrix multiplication (PMM) pm %*% X.
 *
 * Two input modes are supported:
 * (a) a full permutation matrix, which is first condensed into a vector of target
 *     row positions via rowMaxIndex(t(pm)) * rowMax(t(pm)) using temporary MR hops;
 * (b) an already condensed vector of target row positions.
 * The number of target rows is passed to PMMJ as a separate lop: nrow(pm) in case (a)
 * and max(v) in case (b). The condensed vector is partitioned if its dimensions are
 * unknown or it exceeds the distributed cache partition size. The PMMJ output is
 * aggregated in MR.
 *
 * @throws HopsException if constructing the input or temporary hop lops fails
 * @throws LopsException if constructing the lops fails
 */
private void constructMRLopsPMM()
throws HopsException, LopsException
{
//PMM has two potential modes (a) w/ full permutation matrix input, and
//(b) w/ already condensed input vector of target row positions.
Hop pmInput = getInput().get(0);
Hop rightInput = getInput().get(1);
//block sizes of the pm input, reused for the temporary hops in mode (a)
long brlen = pmInput.getRowsInBlock();
long bclen = pmInput.getColsInBlock();
Lop lpmInput = pmInput.constructLops();
Hop nrow = null;
//size of a nrow(pm) x 1 vector, which decides between CP and MR for the vector operations below
double mestPM = OptimizerUtils.estimateSize(pmInput.getDim1(), 1);
ExecType etVect = (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP;
//a) full permutation matrix input (potentially without empty block materialized)
if( pmInput.getDim2() != 1 ) //not a vector
{
//compute condensed permutation matrix vector input
//v = rowMaxIndex(t(pm)) * rowMax(t(pm))
ReorgOp transpose = HopRewriteUtils.createTranspose(pmInput);
transpose.setForcedExecType(ExecType.MR);
HopRewriteUtils.copyLineNumbers(this, transpose);
AggUnaryOp agg1 = new AggUnaryOp("tmp2a", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAXINDEX, Direction.Row, transpose);
HopRewriteUtils.setOutputBlocksizes(agg1, brlen, bclen);
agg1.refreshSizeInformation();
agg1.setForcedExecType(ExecType.MR);
HopRewriteUtils.copyLineNumbers(this, agg1);
AggUnaryOp agg2 = new AggUnaryOp("tmp2b", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAX, Direction.Row, transpose);
HopRewriteUtils.setOutputBlocksizes(agg2, brlen, bclen);
agg2.refreshSizeInformation();
agg2.setForcedExecType(ExecType.MR);
HopRewriteUtils.copyLineNumbers(this, agg2);
BinaryOp mult = new BinaryOp("tmp3", DataType.MATRIX, ValueType.DOUBLE, OpOp2.MULT, agg1, agg2);
HopRewriteUtils.setOutputBlocksizes(mult, brlen, bclen);
mult.refreshSizeInformation();
mult.setForcedExecType(ExecType.MR);
HopRewriteUtils.copyLineNumbers(this, mult);
//compute NROW target via nrow(m)
nrow = HopRewriteUtils.createValueHop(pmInput, true);
HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
nrow.setForcedExecType(ExecType.CP);
HopRewriteUtils.copyLineNumbers(this, nrow);
//use the condensed vector instead of the full permutation matrix as PMM input
lpmInput = mult.constructLops();
//NOTE(review): presumably detaches the temporary transpose hop from pmInput's parent list
//now that its lops are constructed; confirm against HopRewriteUtils
HopRewriteUtils.removeChildReference(pmInput, transpose);
}
else //input vector
{
//compute NROW target via max(v)
nrow = HopRewriteUtils.createAggUnaryOp(pmInput, AggOp.MAX, Direction.RowCol);
HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
nrow.setForcedExecType(etVect);
HopRewriteUtils.copyLineNumbers(this, nrow);
}
//b) condensed permutation matrix vector input (target rows)
//partition the vector if its size is unknown or exceeds the distributed cache partition size
boolean needPart = !pmInput.dimsKnown() || pmInput.getDim1() > DistributedCacheInput.PARTITION_SIZE;
if( needPart ){ //requires partitioning
lpmInput = new DataPartition(lpmInput, DataType.MATRIX, ValueType.DOUBLE, etVect, PDataPartitionFormat.ROW_BLOCK_WISE_N);
lpmInput.getOutputParameters().setDimensions(pmInput.getDim1(), 1, getRowsInBlock(), getColsInBlock(), pmInput.getDim1());
setLineNumbers(lpmInput);
}
_outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this);
PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), nrow.constructLops(), getDataType(), getValueType(), needPart, _outputEmptyBlocks, ExecType.MR);
pmm.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
setLineNumbers(pmm);
Aggregate aggregate = new Aggregate(pmm, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
aggregate.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
aggregate.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum but the inputs do not have correction values
setLineNumbers(aggregate);
setLops(aggregate);
//NOTE(review): presumably detaches the temporary nrow hop from pmInput's parent list; confirm
HopRewriteUtils.removeChildReference(pmInput, nrow);
}
/**
* Determines if the rewrite t(X)%*%Y -> t(t(Y)%*%X) is applicable
* and cost effective. Whenever X is a wide matrix and Y is a vector
* this has huge impact, because the transpose of X would dominate
* the entire operation costs.
*
* The rewrite is never applied on the forced HADOOP or SPARK runtime platforms,
* because it introduces additional CP transpose operations. It further requires
* the first input to be a transpose, all involved dimensions to be known, and
* m*cd > cd*n + m*n (for X: cd x m, Y: cd x n). Both new transposes must fit
* twice into the local memory budget.
*
* @param CP true if the matrix multiplication itself runs in CP. In that case, the
*        memory of the rewritten operation is also checked against the local budget.
*        If the rewrite applies, that memory is stored as this hop's memory estimate.
* @param checkMemMR (MR only) if true, additionally require that the dense size of Y
*        (cd x n) fits into the remote map task memory budget
* @return true if the rewrite should be applied
*/
private boolean isLeftTransposeRewriteApplicable(boolean CP, boolean checkMemMR)
{
//check for forced MR or Spark execution modes, which prevent the introduction of
//additional CP operations and hence the rewrite application
if( DMLScript.rtplatform == RUNTIME_PLATFORM.HADOOP //not hybrid_mr
|| DMLScript.rtplatform == RUNTIME_PLATFORM.SPARK ) //not hybrid_spark
{
return false;
}
boolean ret = false;
Hop h1 = getInput().get(0);
Hop h2 = getInput().get(1);
//check for known dimensions and cost for t(X) vs t(v) + t(tvX)
//(for both CP/MR, we explicitly check that new transposes fit in memory,
//even a binary aggregate in CP does not imply that both transposes can be executed in CP)
if( CP ) //in-memory ba
{
if( h1 instanceof ReorgOp && ((ReorgOp)h1).getOp()==ReOrgOp.TRANSPOSE )
{
//h1 = t(X) is m x cd, h2 = Y is cd x n
long m = h1.getDim1();
long cd = h1.getDim2();
long n = h2.getDim2();
//check for known dimensions (necessary condition for subsequent checks)
ret = (m>0 && cd>0 && n>0);
//check operation memory with changed transpose (this is important if we have
//e.g., t(X) %*% v, where X is sparse and tX fits in memory but X does not
//(note: the &= below is non-short-circuit, so all estimates are always computed)
double memX = h1.getInput().get(0).getOutputMemEstimate();
double memtv = OptimizerUtils.estimateSizeExactSparsity(n, cd, 1.0);
double memtXv = OptimizerUtils.estimateSizeExactSparsity(n, m, 1.0);
double newMemEstimate = memtv + memX + memtXv;
ret &= ( newMemEstimate < OptimizerUtils.getLocalMemBudget() );
//check for cost benefit of t(X) vs t(v) + t(tvX) and memory of additional transpose ops
ret &= ( m*cd > (cd*n + m*n) &&
2 * OptimizerUtils.estimateSizeExactSparsity(cd, n, 1.0) < OptimizerUtils.getLocalMemBudget() &&
2 * OptimizerUtils.estimateSizeExactSparsity(m, n, 1.0) < OptimizerUtils.getLocalMemBudget() );
//update operation memory estimate (e.g., for parfor optimizer)
if( ret )
_memEstimate = newMemEstimate;
}
}
else //MR
{
if( h1 instanceof ReorgOp && ((ReorgOp)h1).getOp()==ReOrgOp.TRANSPOSE )
{
long m = h1.getDim1();
long cd = h1.getDim2();
long n = h2.getDim2();
//note: output size constraint for mapmult already checked by optfindmmultmethod
if( m>0 && cd>0 && n>0 && (m*cd > (cd*n + m*n)) &&
2 * OptimizerUtils.estimateSizeExactSparsity(cd, n, 1.0) < OptimizerUtils.getLocalMemBudget() &&
2 * OptimizerUtils.estimateSizeExactSparsity(m, n, 1.0) < OptimizerUtils.getLocalMemBudget() &&
(!checkMemMR || OptimizerUtils.estimateSizeExactSparsity(cd, n, 1.0) < OptimizerUtils.getRemoteMemBudgetMap(true)) )
{
ret = true;
}
}
}
return ret;
}
/**
 * Chooses the aggregation type of the MMCJ lop. The quickpath without aggregation
 * (NO_AGG) is chosen if the reducer-side aggregation buffer likely has to spill and
 * the smaller block fits into the minimal cache size, so it is guaranteed not to spill.
 * 
 * @param X left input of the matrix multiplication
 * @param Y right input of the matrix multiplication
 * @return NO_AGG for the quickpath, AGG otherwise (including unknown output dimensions)
 */
private MMCJType getMMCJAggregationType(Hop X, Hop Y)
{
	//memory of one column block strip of X and one row block strip of Y
	double blockSizeX = OptimizerUtils.estimateSize(X.getDim1(), Math.min(X.getDim2(), X.getColsInBlock()));
	double blockSizeY = OptimizerUtils.estimateSize(Math.min(Y.getDim1(), Y.getRowsInBlock()), Y.getDim2());
	
	//the aggregation buffer likely spills if twice the output exceeds the reduce budget
	boolean bufferLikelySpills = dimsKnown()
		&& 2*OptimizerUtils.estimateSize(getDim1(), getDim2()) > OptimizerUtils.getRemoteMemBudgetReduce();
	if( !bufferLikelySpills )
		return MMCJType.AGG;
	
	boolean fitsMinCache = blockSizeX < MMCJMRReducerWithAggregator.MIN_CACHE_SIZE
		|| blockSizeY < MMCJMRReducerWithAggregator.MIN_CACHE_SIZE;
	return fitsMinCache ? MMCJType.NO_AGG : MMCJType.AGG;
}
/**
 * Determines the Spark aggregation type of a matrix multiplication.
 * 
 * @param agg true if partial results need to be aggregated
 * @return NONE if no aggregation is required, SINGLE_BLOCK if the output is known
 *         to fit into a single block, and MULTI_BLOCK otherwise
 */
private SparkAggType getSparkMMAggregationType( boolean agg )
{
	if( agg ) {
		boolean singleBlockOutput = dimsKnown()
			&& getDim1() <= getRowsInBlock() 
			&& getDim2() <= getColsInBlock();
		return singleBlockOutput ? SparkAggType.SINGLE_BLOCK : SparkAggType.MULTI_BLOCK;
	}
	return SparkAggType.NONE;
}
/**
 * Determines if a map-side matrix multiplication produces partial results that
 * require a subsequent aggregation. Aggregation is dropped only for MAPMM_R with a
 * left input of known, single column block width, or for MAPMM_L with a right input
 * of known, single row block height. Every other case conservatively requires
 * aggregation for plan correctness.
 * 
 * @param method the map-side multiplication method
 * @return true if an aggregation is required
 */
private boolean requiresAggregation(MMultMethod method) 
{
	if( method == MMultMethod.MAPMM_R ) {
		//right side cached: no aggregation if left has just one column block
		Hop left = getInput().get(0);
		long ncolLeft = left.getDim2();
		boolean singleColBlock = ncolLeft > 0 && ncolLeft <= left.getColsInBlock();
		return !singleColBlock;
	}
	
	if( method == MMultMethod.MAPMM_L ) {
		//left side cached: no aggregation if right has just one row block
		Hop right = getInput().get(1);
		long nrowRight = right.getDim1();
		boolean singleRowBlock = nrowRight > 0 && nrowRight <= right.getRowsInBlock();
		return !singleRowBlock;
	}
	
	//worst-case assumption (for plan correctness)
	return true;
}
/**
 * Determines if the input shipped via the distributed cache (right input for MAPMM_R,
 * left input for MAPMM_L) needs to be partitioned. It does if its dimensions are
 * unknown or its number of cells exceeds the distributed cache partition size.
 * All other methods conservatively report that partitioning is required.
 * 
 * Note: a more conservative variant, which skips partitioning whenever this would
 * force a transpose of the other input into a separate job, is currently disabled.
 * 
 * @param method the map-side multiplication method
 * @param rewrite true if called for the left-transpose rewrite (currently unused)
 * @return true if partitioning is required
 */
private boolean requiresPartitioning(MMultMethod method, boolean rewrite) 
{
	Hop cached = null;
	if( method == MMultMethod.MAPMM_R )
		cached = getInput().get(1);
	else if( method == MMultMethod.MAPMM_L )
		cached = getInput().get(0);
	
	//worst-case for other methods or unknown input sizes
	if( cached == null || !cached.dimsKnown() )
		return true;
	
	return cached.getDim1() * cached.getDim2() > DistributedCacheInput.PARTITION_SIZE;
}
/**
 * Estimates the memory footprint of a MapMult operation, depending on which input is
 * put into the distributed cache. This function is called by <code>optFindMMultMethod()</code>
 * to decide the execution strategy, and by piggybacking to decide the number of
 * map-side instructions to put into a single GMR job.
 * 
 * The estimate is the partitioned size of the cached input, plus one block of the
 * streamed input, plus the output generated per streamed block. For permutation matrix
 * multiplication (pmm), it is the partitioned m1 plus one m2 block plus at most two
 * output blocks.
 * 
 * @param cachedInputIndex 1 if the left input (m1) is cached, otherwise the right input (m2)
 * @param pmm true for permutation matrix multiplication
 * @return the estimated memory footprint
 */
public static double getMapmmMemEstimate(long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz,
		long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz, int cachedInputIndex, boolean pmm) 
{
	//NOTE: be aware of output size because one input block might generate many output blocks
	if( pmm ) {
		//one input block -> at most two output blocks (in + 2*out)
		double pmSizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz);
		double rhsBlockSize = OptimizerUtils.estimateSize(Math.min(m2_rows, m2_rpb), Math.min(m2_cols, m2_cpb));
		return pmSizeP + 3*rhsBlockSize;
	}
	
	if( cachedInputIndex == 1 ) {
		//left input (m1) in cache, m2 streamed
		double cachedSizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz);
		double streamedBlockSize = OptimizerUtils.estimateSize(Math.min(m2_rows, m2_rpb), Math.min(m2_cols, m2_cpb));
		double outPerBlock = OptimizerUtils.estimateSize(m1_rows, Math.min(m2_cols, m2_cpb));
		return cachedSizeP + streamedBlockSize + outPerBlock;
	}
	
	//right input (m2) in cache, m1 streamed
	double streamedBlockSize = OptimizerUtils.estimateSize(Math.min(m1_rows, m1_rpb), Math.min(m1_cols, m1_cpb));
	double cachedSizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz);
	double outPerBlock = OptimizerUtils.estimateSize(Math.min(m1_rows, m1_rpb), m2_cols);
	return streamedBlockSize + cachedSizeP + outPerBlock;
}
/**
* Chooses the MR matrix multiplication method. The candidates are checked in this
* order: a forced method, TSMM (single-block transpose-self result), MAPMM_CHAIN,
* PMM, MAPMM_L/MAPMM_R (one input fits into the remote map budget), CPMM for
* unknown dimensions, and finally CPMM vs RMM based on cost estimates.
*
* More details on the cost-model used: refer ICDE 2011 paper.
*
* @param m1_rows number of rows of the left input (-1 if unknown)
* @param m1_cols number of columns of the left input (-1 if unknown)
* @param m2_rows number of rows of the right input (-1 if unknown)
* @param m2_cols number of columns of the right input (-1 if unknown)
* @param mmtsj detected transpose-self pattern
* @param chainType detected matrix multiplication chain pattern
* @param leftPMInput true if the left input is a permutation matrix
* @return the selected matrix multiplication method
*/
private static MMultMethod optFindMMultMethodMR ( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz,
long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
MMTSJType mmtsj, ChainType chainType, boolean leftPMInput )
{
//remote map task budget, scaled by MAPMULT_MEM_MULTIPLIER
double memBudget = MAPMULT_MEM_MULTIPLIER * OptimizerUtils.getRemoteMemBudgetMap(true);
// Step 0: check for forced mmultmethod
if( FORCED_MMULT_METHOD !=null )
return FORCED_MMULT_METHOD;
// Step 1: check TSMM
// If transpose self pattern and result is single block:
// use specialized TSMM method (always better than generic jobs)
if( ( mmtsj == MMTSJType.LEFT && m2_cols>=0 && m2_cols <= m2_cpb )
|| ( mmtsj == MMTSJType.RIGHT && m1_rows>=0 && m1_rows <= m1_rpb ) )
{
return MMultMethod.TSMM;
}
// Step 2: check MapMultChain
// If mapmultchain pattern and result is a single block:
// use specialized mapmult method
if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES )
{
//matmultchain if dim2(X)<=blocksize and all vectors fit in mappers
//(X: m1_cols x m1_rows, v: m1_rows x m2_cols, w: m1_cols x m2_cols)
//NOTE: generalization possible: m2_cols>=0 && m2_cols<=m2_cpb
if( chainType!=ChainType.NONE && m1_rows>=0 && m1_rows<= m1_rpb && m2_cols==1 )
{
if( chainType==ChainType.XtXv && m1_rows>=0 && m2_cols>=0
&& OptimizerUtils.estimateSize(m1_rows, m2_cols ) < memBudget )
{
return MMultMethod.MAPMM_CHAIN;
}
else if( (chainType==ChainType.XtwXv || chainType==ChainType.XtXvy )
&& m1_rows>=0 && m2_cols>=0 && m1_cols>=0
&& OptimizerUtils.estimateSize(m1_rows, m2_cols )
+ OptimizerUtils.estimateSize(m1_cols, m2_cols) < memBudget )
{
return MMultMethod.MAPMM_CHAIN;
}
}
}
// Step 3: check for PMM (permutation matrix needs to fit into mapper memory)
// (needs to be checked before mapmult for consistency with removeEmpty compilation)
//footprintPM1: pm given as a vector with m1_rows rows;
//footprintPM2: vector condensed from a full pm, which has m2_rows (= ncol(pm)) rows
double footprintPM1 = getMapmmMemEstimate(m1_rows, 1, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, true);
double footprintPM2 = getMapmmMemEstimate(m2_rows, 1, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, true);
if( (footprintPM1 < memBudget && m1_rows>=0 || footprintPM2 < memBudget && m2_rows>=0 )
&& leftPMInput )
{
return MMultMethod.PMM;
}
// Step 4: check MapMult
// If the size of one input is small, choose a method that uses distributed cache
// (with awareness of output size because one input block might generate many output blocks)
//memory estimates for local partitioning (mb -> partitioned mb)
double m1SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz); //m1 partitioned
double m2SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz); //m2 partitioned
//memory estimates for remote execution (broadcast and outputs)
double footprint1 = getMapmmMemEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, false);
double footprint2 = getMapmmMemEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 2, false);
if ( (footprint1 < memBudget && m1_rows>=0 && m1_cols>=0)
|| (footprint2 < memBudget && m2_rows>=0 && m2_cols>=0) )
{
//apply map mult if one side fits in remote task memory
//(if so pick smaller input for distributed cache)
//NOTE(review): the choice below compares partitioned sizes only and does not re-check
//that the footprint of the chosen side is the one that fits into memBudget
if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0)
return MMultMethod.MAPMM_L;
else
return MMultMethod.MAPMM_R;
}
// Step 5: check for unknowns
// If the dimensions are unknown at compilation time, simply assume
// the worst-case scenario and produce the most robust plan -- which is CPMM
if ( m1_rows == -1 || m1_cols == -1 || m2_rows == -1 || m2_cols == -1 )
return MMultMethod.CPMM;
// Step 6: Decide CPMM vs RMM based on io costs
//estimate shuffle costs weighted by parallelism
double rmm_costs = getRMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
double cpmm_costs = getCPMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
//final mmult method decision
if ( cpmm_costs < rmm_costs )
return MMultMethod.CPMM;
else
return MMultMethod.RMM;
}
/**
 * Chooses the in-memory (CP) matrix multiplication method. The patterns are checked
 * in order of specialization: TSMM, then MAPMM_CHAIN, then PMM, and otherwise the
 * general-purpose MM.
 * 
 * @param m1_rows number of rows of the left input
 * @param m1_cols number of columns of the left input
 * @param m2_rows number of rows of the right input
 * @param m2_cols number of columns of the right input
 * @param mmtsj detected transpose-self pattern
 * @param chainType detected matrix multiplication chain pattern
 * @param leftPM true if the left input is a permutation matrix
 * @return the selected matrix multiplication method
 */
private static MMultMethod optFindMMultMethodCP( long m1_rows, long m1_cols, long m2_rows, long m2_cols, MMTSJType mmtsj, ChainType chainType, boolean leftPM ) 
{
	MMultMethod method;
	if( mmtsj != MMTSJType.NONE ) {
		//transpose-self pattern
		method = MMultMethod.TSMM;
	}
	else if( chainType != ChainType.NONE && OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES && m2_cols==1 ) {
		//matrix multiplication chain with a vector
		method = MMultMethod.MAPMM_CHAIN;
	}
	else if( leftPM && m1_cols==1 && m2_rows!=1 ) {
		//left input is a condensed permutation vector
		method = MMultMethod.PMM;
	}
	else {
		method = MMultMethod.MM;
	}
	return method;
}
/**
* Chooses the Spark matrix multiplication method. The candidates are checked in this
* order: a forced method, TSMM (single-block result), MAPMM_CHAIN, PMM, broadcast-based
* MAPMM_L/MAPMM_R, TSMM2, CPMM for unknown dimensions, ZIPMM, and finally CPMM vs RMM
* based on cost estimates.
*
* Side effect: resets _spBroadcastMemEstimate to 0 and sets it to the local (CP) memory
* needed to create the broadcast for MAPMM_CHAIN (XtwXv/XtXvy), PMM and MAPMM_L/MAPMM_R.
* It remains 0 for all other methods.
*
* @param m1_rows number of rows of the left input (-1 if unknown)
* @param m1_cols number of columns of the left input (-1 if unknown)
* @param m1_rpb rows per block of the left input
* @param m1_cpb columns per block of the left input
* @param m2_rows number of rows of the right input (-1 if unknown)
* @param m2_cols number of columns of the right input (-1 if unknown)
* @param m2_rpb rows per block of the right input
* @param m2_cpb columns per block of the right input
* @param mmtsj detected transpose-self pattern
* @param chainType detected matrix multiplication chain pattern
* @param leftPMInput true if the left input is a permutation matrix
* @param tmmRewrite true if the t(X)%*%y -> t(t(y)%*%X) pattern applies (enables ZIPMM)
* @return the selected matrix multiplication method
*/
private MMultMethod optFindMMultMethodSpark( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz,
long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite )
{
//Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp,
//and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer
//required because the max_int byte buffer constraint has been fixed in Spark 1.4
double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget();
double memBudgetLocal = OptimizerUtils.getLocalMemBudget();
//reset spark broadcast memory information (for concurrent parfor jobs, awareness of additional
//cp memory requirements on spark rdd operations with broadcasts)
_spBroadcastMemEstimate = 0;
// Step 0: check for forced mmultmethod
if( FORCED_MMULT_METHOD !=null )
return FORCED_MMULT_METHOD;
// Step 1: check TSMM
// If transpose self pattern and result is single block:
// use specialized TSMM method (always better than generic jobs)
if( ( mmtsj == MMTSJType.LEFT && m2_cols>=0 && m2_cols <= m2_cpb )
|| ( mmtsj == MMTSJType.RIGHT && m1_rows>=0 && m1_rows <= m1_rpb ) )
{
return MMultMethod.TSMM;
}
// Step 2: check MapMMChain
// If mapmultchain pattern and result is a single block:
// use specialized mapmult method
if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES )
{
//matmultchain if dim2(X)<=blocksize and all vectors fit in mappers
//(X: m1_cols x m1_rows, v: m1_rows x m2_cols, w: m1_cols x m2_cols)
//NOTE: generalization possible: m2_cols>=0 && m2_cols<=m2_cpb
if( chainType!=ChainType.NONE && m1_rows >=0 && m1_rows <= m1_rpb && m2_cols==1 )
{
if( chainType==ChainType.XtXv && m1_rows>=0 && m2_cols>=0
&& OptimizerUtils.estimateSize(m1_rows, m2_cols ) < memBudgetExec )
{
return MMultMethod.MAPMM_CHAIN;
}
else if( (chainType==ChainType.XtwXv || chainType==ChainType.XtXvy )
&& m1_rows>=0 && m2_cols>=0 && m1_cols>=0
&& OptimizerUtils.estimateSize(m1_rows, m2_cols)
+ OptimizerUtils.estimateSize(m1_cols, m2_cols) < memBudgetExec
&& 2*(OptimizerUtils.estimateSize(m1_rows, m2_cols)
+ OptimizerUtils.estimateSize(m1_cols, m2_cols)) < memBudgetLocal )
{
//broadcasts of v and w need to fit twice into local memory
_spBroadcastMemEstimate = 2*(OptimizerUtils.estimateSize(m1_rows, m2_cols)
+ OptimizerUtils.estimateSize(m1_cols, m2_cols));
return MMultMethod.MAPMM_CHAIN;
}
}
}
// Step 3: check for PMM (permutation matrix needs to fit into mapper memory)
// (needs to be checked before mapmult for consistency with removeEmpty compilation)
//footprintPM1: pm given as a vector with m1_rows rows;
//footprintPM2: vector condensed from a full pm, which has m2_rows (= ncol(pm)) rows
double footprintPM1 = getMapmmMemEstimate(m1_rows, 1, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, true);
double footprintPM2 = getMapmmMemEstimate(m2_rows, 1, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, true);
if( (footprintPM1 < memBudgetExec && m1_rows>=0 || footprintPM2 < memBudgetExec && m2_rows>=0)
&& 2*OptimizerUtils.estimateSize(m1_rows, 1) < memBudgetLocal
&& leftPMInput )
{
_spBroadcastMemEstimate = 2*OptimizerUtils.estimateSize(m1_rows, 1);
return MMultMethod.PMM;
}
// Step 4: check MapMM
// If the size of one input is small, choose a method that uses broadcast variables to prevent shuffle
//memory estimates for local partitioning (mb -> partitioned mb)
double m1Size = OptimizerUtils.estimateSizeExactSparsity(m1_rows, m1_cols, m1_nnz); //m1 single block
double m2Size = OptimizerUtils.estimateSizeExactSparsity(m2_rows, m2_cols, m2_nnz); //m2 single block
double m1SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz); //m1 partitioned
double m2SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz); //m2 partitioned
//memory estimates for remote execution (broadcast and outputs)
double footprint1 = getMapmmMemEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, false);
double footprint2 = getMapmmMemEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 2, false);
if ( (footprint1 < memBudgetExec && m1Size+m1SizeP < memBudgetLocal && m1_rows>=0 && m1_cols>=0)
|| (footprint2 < memBudgetExec && m2Size+m2SizeP < memBudgetLocal && m2_rows>=0 && m2_cols>=0) )
{
//apply map mult if one side fits in remote task memory
//(if so pick smaller input for distributed cache)
//(the broadcast estimate covers the single-block input plus its partitioned copy in CP)
if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0) {
_spBroadcastMemEstimate = m1Size+m1SizeP;
return MMultMethod.MAPMM_L;
}
else {
_spBroadcastMemEstimate = m2Size+m2SizeP;
return MMultMethod.MAPMM_R;
}
}
// Step 5: check for TSMM2 (2 pass w/o shuffle, preferred over CPMM/RMM)
//(the broadcast is the input without its first column/row block, estimated as dense)
if( mmtsj != MMTSJType.NONE && m1_rows >=0 && m1_cols>=0
&& m2_rows >= 0 && m2_cols>=0 )
{
double mSize = (mmtsj == MMTSJType.LEFT) ?
OptimizerUtils.estimateSizeExactSparsity(m2_rows, m2_cols-m2_cpb, 1.0) :
OptimizerUtils.estimateSizeExactSparsity(m1_rows-m1_rpb, m1_cols, 1.0);
double mSizeP = (mmtsj == MMTSJType.LEFT) ?
OptimizerUtils.estimatePartitionedSizeExactSparsity(m2_rows, m2_cols-m2_cpb, m2_rpb, m2_cpb, 1.0) :
OptimizerUtils.estimatePartitionedSizeExactSparsity(m1_rows-m1_rpb, m1_cols, m1_rpb, m1_cpb, 1.0);
if( mSizeP < memBudgetExec && mSize+mSizeP < memBudgetLocal
&& ((mmtsj == MMTSJType.LEFT) ? m2_cols<=2*m2_cpb : m1_rows<=2*m1_rpb) //4 output blocks
&& mSizeP < 2L*1024*1024*1024) { //2GB limitation as single broadcast
return MMultMethod.TSMM2;
}
}
// Step 6: check for unknowns
// If the dimensions are unknown at compilation time, simply assume
// the worst-case scenario and produce the most robust plan -- which is CPMM
if ( m1_rows == -1 || m1_cols == -1 || m2_rows == -1 || m2_cols == -1 )
return MMultMethod.CPMM;
// Step 7: check for ZIPMM
// If t(X)%*%y -> t(t(y)%*%X) rewrite and ncol(X)<=blocksize
if( tmmRewrite && m1_rows >= 0 && m1_rows <= m1_rpb //blocksize constraint left
&& m2_cols >= 0 && m2_cols <= m2_cpb ) //blocksize constraint right
{
return MMultMethod.ZIPMM;
}
// Step 8: Decide CPMM vs RMM based on io costs
//estimate shuffle costs weighted by parallelism
//TODO currently we reuse the mr estimates, these need to be fine-tuned for our spark operators
double rmm_costs = getRMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
double cpmm_costs = getCPMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
//final mmult method decision
if ( cpmm_costs < rmm_costs )
return MMultMethod.CPMM;
else
return MMultMethod.RMM;
}
/**
 * Estimates the relative cost of a replication-based matrix multiply (RMM) of
 * m1 (m1_rows x m1_cols) and m2 (m2_rows x m2_cols).
 * 
 * The cost is (shuffle volume + read/write volume) divided by the number of
 * reducers that can be used in parallel. Volumes are measured in cells
 * (rows*cols) and ignore sparsity. The value is therefore only meaningful for
 * comparison against other estimates such as {@code getCPMMCostEstimate}.
 * 
 * @param m1_rows number of rows of the left input
 * @param m1_cols number of columns of the left input
 * @param m1_rpb rows per block of the left input
 * @param m1_cpb columns per block of the left input (unused here, kept for signature symmetry)
 * @param m2_rows number of rows of the right input
 * @param m2_cols number of columns of the right input
 * @param m2_rpb rows per block of the right input (unused here, kept for signature symmetry)
 * @param m2_cpb columns per block of the right input
 * @return the relative RMM cost estimate
 */
private static double getRMMCostEstimate( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, 
		long m2_rows, long m2_cols, long m2_rpb, long m2_cpb )
{
	long m1_nrb = (long) Math.ceil((double)m1_rows/m1_rpb); // number of row blocks in m1
	long m2_ncb = (long) Math.ceil((double)m2_cols/m2_cpb); // number of column blocks in m2
	
	// TODO: we must factor in the "sparsity"
	// Widen to double BEFORE multiplying: a long*long product of large
	// dimensions overflows silently and would yield negative/garbage costs.
	double m1_size = (double) m1_rows * m1_cols;
	double m2_size = (double) m2_rows * m2_cols;
	double result_size = (double) m1_rows * m2_cols;
	
	int numReducersRMM = OptimizerUtils.getNumReducers(true);
	
	// Estimate the cost of RMM
	// RMM phase 1: every m1 block is replicated to each column block of m2,
	// and every m2 block to each row block of m1
	double rmm_shuffle = (m2_ncb*m1_size) + (m1_nrb*m2_size);
	double rmm_io = m1_size + m2_size + result_size;
	double rmm_nred = Math.min( (double) m1_nrb * m2_ncb, //max used reducers 
			                    numReducersRMM);          //available reducers
	// guard against division by zero (NaN/Infinity costs) for empty inputs
	rmm_nred = Math.max(rmm_nred, 1);
	
	// RMM total costs
	double rmm_costs = (rmm_shuffle + rmm_io) / rmm_nred;
	
	// return total costs
	return rmm_costs;
}
/**
 * Estimates the relative cost of a cross-product-based matrix multiply (CPMM)
 * of m1 (m1_rows x m1_cols) and m2 (m2_rows x m2_cols).
 * 
 * The estimate has two phases, and each phase's cost is
 * (shuffle volume + read/write volume) divided by its number of usable reducers.
 * <ul>
 * <li>Phase 1 shuffles both inputs and produces one partial result per used
 * reducer, using up to one reducer per column block of m1.</li>
 * <li>Phase 2 shuffles and aggregates those partial results, using up to one
 * reducer per output block.</li>
 * </ul>
 * Volumes are measured in cells (rows*cols) and ignore sparsity. The value is
 * therefore only meaningful for comparison against other estimates such as
 * {@code getRMMCostEstimate}.
 * 
 * @param m1_rows number of rows of the left input
 * @param m1_cols number of columns of the left input
 * @param m1_rpb rows per block of the left input
 * @param m1_cpb columns per block of the left input
 * @param m2_rows number of rows of the right input
 * @param m2_cols number of columns of the right input
 * @param m2_rpb rows per block of the right input (unused here, kept for signature symmetry)
 * @param m2_cpb columns per block of the right input
 * @return the relative CPMM cost estimate
 */
private static double getCPMMCostEstimate( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, 
		long m2_rows, long m2_cols, long m2_rpb, long m2_cpb )
{
	long m1_nrb = (long) Math.ceil((double)m1_rows/m1_rpb); // number of row blocks in m1
	long m1_ncb = (long) Math.ceil((double)m1_cols/m1_cpb); // number of column blocks in m1
	long m2_ncb = (long) Math.ceil((double)m2_cols/m2_cpb); // number of column blocks in m2
	
	// TODO: we must factor in the "sparsity"
	// Widen to double BEFORE multiplying: a long*long product of large
	// dimensions overflows silently and would yield negative/garbage costs.
	double m1_size = (double) m1_rows * m1_cols;
	double m2_size = (double) m2_rows * m2_cols;
	double result_size = (double) m1_rows * m2_cols;
	
	int numReducersCPMM = OptimizerUtils.getNumReducers(false);
	
	// Estimate the cost of CPMM
	// CPMM phase 1
	double cpmm_shuffle1 = m1_size + m2_size;
	double cpmm_nred1 = Math.min( m1_ncb,            //max used reducers 
			                      numReducersCPMM);  //available reducers
	// guard against division by zero (NaN/Infinity costs) for empty inputs
	cpmm_nred1 = Math.max(cpmm_nred1, 1);
	double cpmm_io1 = m1_size + m2_size + cpmm_nred1 * result_size; 
	
	// CPMM phase 2
	double cpmm_shuffle2 = cpmm_nred1 * result_size;
	double cpmm_io2 = cpmm_nred1 * result_size + result_size;			
	double cpmm_nred2 = Math.min( (double) m1_nrb * m2_ncb, //max used reducers 
			                      numReducersCPMM);         //available reducers
	// guard against division by zero (NaN/Infinity costs) for empty inputs
	cpmm_nred2 = Math.max(cpmm_nred2, 1);
	
	// CPMM total costs
	double cpmm_costs =  (cpmm_shuffle1+cpmm_io1)/cpmm_nred1  //cpmm phase1
	                    +(cpmm_shuffle2+cpmm_io2)/cpmm_nred2; //cpmm phase2
	
	//return total costs
	return cpmm_costs;
}
/**
 * Re-derives the output dimensions from the current inputs.
 * For a matrix multiply the output has the row count of the left input and
 * the column count of the right input. Otherwise the dimensions are left as is.
 */
@Override
public void refreshSizeInformation()
{
	final Hop left = getInput().get(0);
	final Hop right = getInput().get(1);
	
	//dimensions are only propagated for matrix multiply
	if( !isMatrixMultiply() )
		return;
	
	setDim1( left.getDim1() );  //nrow(output) = nrow(left)
	setDim2( right.getDim2() ); //ncol(output) = ncol(right)
}
/**
 * Creates a copy of this hop. Generic hop attributes are copied via
 * {@code clone(this, false)}, followed by the aggregate-binary-specific
 * operator types, the left partitioned-matrix input flag and the thread limit.
 * 
 * @return the copied hop
 * @throws CloneNotSupportedException if the generic attribute copy fails
 */
@Override
public Object clone() throws CloneNotSupportedException 
{
	final AggBinaryOp copy = new AggBinaryOp();
	
	//generic hop attributes
	copy.clone(this, false);
	
	//aggbinary-specific attributes
	copy.innerOp         = this.innerOp;
	copy.outerOp         = this.outerOp;
	copy._hasLeftPMInput = this._hasLeftPMInput;
	copy._maxNumThreads  = this._maxNumThreads;
	
	return copy;
}
/**
 * Checks whether another hop is an equivalent aggregate binary operation.
 * It must have the same inner/outer operators, the identical (reference-equal)
 * two inputs, the same left partitioned-matrix input flag and the same thread limit.
 * 
 * @param that the hop to compare against
 * @return true if both hops are equivalent, false otherwise
 */
@Override
public boolean compare( Hop that )
{
	if( !(that instanceof AggBinaryOp) )
		return false;
	AggBinaryOp other = (AggBinaryOp) that;
	
	//operator types
	if( innerOp != other.innerOp || outerOp != other.outerOp )
		return false;
	
	//inputs are compared by reference
	if( getInput().get(0) != other.getInput().get(0) 
		|| getInput().get(1) != other.getInput().get(1) )
		return false;
	
	//execution configuration
	return _hasLeftPMInput == other._hasLeftPMInput
		&& _maxNumThreads == other._maxNumThreads;
}
}