| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysml.hops; |
| |
| import org.apache.sysml.conf.ConfigurationManager; |
| import org.apache.sysml.hops.Hop.MultiThreadedHop; |
| import org.apache.sysml.lops.Aggregate; |
| import org.apache.sysml.lops.DataPartition; |
| import org.apache.sysml.lops.Group; |
| import org.apache.sysml.lops.Lop; |
| import org.apache.sysml.lops.LopsException; |
| import org.apache.sysml.lops.RepMat; |
| import org.apache.sysml.lops.Transform; |
| import org.apache.sysml.lops.Unary; |
| import org.apache.sysml.lops.UnaryCP; |
| import org.apache.sysml.lops.LopProperties.ExecType; |
| import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType; |
| import org.apache.sysml.lops.WeightedCrossEntropy; |
| import org.apache.sysml.lops.WeightedCrossEntropyR; |
| import org.apache.sysml.lops.WeightedDivMM; |
| import org.apache.sysml.lops.WeightedCrossEntropy.WCeMMType; |
| import org.apache.sysml.lops.WeightedDivMM.WDivMMType; |
| import org.apache.sysml.lops.WeightedDivMMR; |
| import org.apache.sysml.lops.WeightedSigmoid; |
| import org.apache.sysml.lops.WeightedSigmoid.WSigmoidType; |
| import org.apache.sysml.lops.WeightedSigmoidR; |
| import org.apache.sysml.lops.WeightedSquaredLoss; |
| import org.apache.sysml.lops.WeightedSquaredLoss.WeightsType; |
| import org.apache.sysml.lops.WeightedSquaredLossR; |
| import org.apache.sysml.lops.WeightedUnaryMM; |
| import org.apache.sysml.lops.WeightedUnaryMM.WUMMType; |
| import org.apache.sysml.lops.WeightedUnaryMMR; |
| import org.apache.sysml.parser.Expression.DataType; |
| import org.apache.sysml.parser.Expression.ValueType; |
| import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; |
| import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; |
| import org.apache.sysml.runtime.matrix.MatrixCharacteristics; |
| import org.apache.sysml.runtime.matrix.mapred.DistributedCacheInput; |
| |
| /** |
 * Note: this hop should be called AggQuaternaryOp for consistency with AggUnaryOp and AggBinaryOp;
 * however, since a real QuaternaryOp does not exist yet, we can leave the name as is for now.
| */ |
| public class QuaternaryOp extends Hop implements MultiThreadedHop |
| { |
| |
| //config influencing mr operator selection (for testing purposes only) |
| public static boolean FORCE_REPLICATION = false; |
| |
| private OpOp4 _op = null; |
| private int _maxNumThreads = -1; //-1 for unlimited |
| |
| //wsloss-specific attributes |
| private boolean _postWeights = false; |
| |
| //wsigmoid-specific attributes |
| private boolean _logout = false; |
| private boolean _minusin = false; |
| |
| //wdivmm-specific attributes |
| private int _baseType = -1; |
| private boolean _mult = false; |
| private boolean _minus = false; |
| |
| //wumm-specific attributes |
| private boolean _umult = false; |
| private OpOp1 _uop = null; |
| private OpOp2 _sop = null; |
| |
| private QuaternaryOp() { |
| //default constructor for clone |
| } |
| |
| /** |
| * Constructor for wsloss. |
| * |
| * @param l |
| * @param dt |
| * @param vt |
| * @param o |
| * @param inX |
| * @param inU |
| * @param inV |
| * @param inW |
| * @param post |
| */ |
| public QuaternaryOp(String l, DataType dt, ValueType vt, Hop.OpOp4 o, |
| Hop inX, Hop inU, Hop inV, Hop inW, boolean post) |
| { |
| this(l, dt, vt, o, inX, inU, inV); |
| getInput().add(3, inW); |
| inW.getParent().add(this); |
| |
| _postWeights = post; |
| } |
| |
| /** |
| * Constructor for wsigmoid. |
| * |
| * @param l |
| * @param dt |
| * @param vt |
| * @param o |
| * @param inX |
| * @param inU |
| * @param inV |
| * @param logout |
| * @param minusin |
| */ |
| public QuaternaryOp(String l, DataType dt, ValueType vt, Hop.OpOp4 o, |
| Hop inX, Hop inU, Hop inV, boolean flag1, boolean flag2) |
| { |
| this(l, dt, vt, o, inX, inU, inV); |
| |
| _logout = flag1; |
| _minusin = flag2; |
| } |
| |
| /** |
| * |
| * @param l |
| * @param dt |
| * @param vt |
| * @param o |
| * @param inW |
| * @param inU |
| * @param inV |
| * @param baseType |
| * @param flag1 |
| * @param flag2 |
| */ |
| public QuaternaryOp(String l, DataType dt, ValueType vt, Hop.OpOp4 o, |
| Hop inX, Hop inU, Hop inV, Hop inW, int baseType, boolean flag1, boolean flag2) |
| { |
| this(l, dt, vt, o, inX, inU, inV); |
| if( inW != null ) { //four inputs |
| getInput().add(3, inW); |
| inW.getParent().add(this); |
| } |
| |
| _baseType = baseType; |
| _mult = flag1; |
| _minus = flag2; |
| } |
| |
	/**
	 * Constructor for wumm.
	 * 
	 * @param l hop name (label)
	 * @param dt data type
	 * @param vt value type
	 * @param o quaternary operation type
	 * @param inW data matrix W
	 * @param inU left factor U
	 * @param inV right factor V
	 * @param umult true for multiply (WUMMType.MULT), false for divide (WUMMType.DIV)
	 * @param uop unary operation, if any
	 * @param sop scalar operation, if any
	 */
	public QuaternaryOp(String l, DataType dt, ValueType vt, Hop.OpOp4 o,
| Hop inW, Hop inU, Hop inV, boolean umult, OpOp1 uop, OpOp2 sop) |
| { |
| this(l, dt, vt, o, inW, inU, inV); |
| |
| _umult = umult; |
| _uop = uop; |
| _sop = sop; |
| } |
| |
| /** |
| * |
| * @param l |
| * @param dt |
| * @param vt |
| * @param o |
| * @param inX |
| * @param inU |
| * @param inV |
| */ |
| public QuaternaryOp(String l, DataType dt, ValueType vt, Hop.OpOp4 o, Hop inX, Hop inU, Hop inV) |
| { |
| super(l, dt, vt); |
| _op = o; |
| getInput().add(0, inX); |
| getInput().add(1, inU); |
| getInput().add(2, inV); |
| inX.getParent().add(this); |
| inU.getParent().add(this); |
| inV.getParent().add(this); |
| } |
| |
| public OpOp4 getOp(){ |
| return _op; |
| } |
| |
| @Override |
| public void setMaxNumThreads( int k ) { |
| _maxNumThreads = k; |
| } |
| |
| @Override |
| public int getMaxNumThreads() { |
| return _maxNumThreads; |
| } |
| |
| @Override |
| public Lop constructLops() |
| throws HopsException, LopsException |
| { |
| //return already created lops |
| if( getLops() != null ) |
| return getLops(); |
| |
| try |
| { |
| ExecType et = optFindExecType(); |
| |
| switch( _op ) { |
| case WSLOSS: { |
| WeightsType wtype = checkWeightsType(); |
| |
| if( et == ExecType.CP ) |
| constructCPLopsWeightedSquaredLoss(wtype); |
| else if( et == ExecType.MR ) |
| constructMRLopsWeightedSquaredLoss(wtype); |
| else if( et == ExecType.SPARK ) |
| constructSparkLopsWeightedSquaredLoss(wtype); |
| else |
| throw new HopsException("Unsupported quaternaryop-wsloss exec type: "+et); |
| break; |
| } |
| |
| case WSIGMOID:{ |
| WSigmoidType wtype = checkWSigmoidType(); |
| |
| if( et == ExecType.CP ) |
| constructCPLopsWeightedSigmoid(wtype); |
| else if( et == ExecType.MR ) |
| constructMRLopsWeightedSigmoid(wtype); |
| else if( et == ExecType.SPARK ) |
| constructSparkLopsWeightedSigmoid(wtype); |
| else |
| throw new HopsException("Unsupported quaternaryop-wsigmoid exec type: "+et); |
| break; |
| } |
| |
| case WDIVMM:{ |
| WDivMMType wtype = checkWDivMMType(); |
| if( et == ExecType.CP ) |
| constructCPLopsWeightedDivMM(wtype); |
| else if( et == ExecType.MR ) |
| constructMRLopsWeightedDivMM(wtype); |
| else if( et == ExecType.SPARK ) |
| constructSparkLopsWeightedDivMM(wtype); |
| else |
| throw new HopsException("Unsupported quaternaryop-wdivmm exec type: "+et); |
| break; |
| } |
| |
| case WCEMM:{ |
| WCeMMType wtype = checkWCeMMType(); |
| |
| if( et == ExecType.CP ) |
| constructCPLopsWeightedCeMM(wtype); |
| else if( et == ExecType.MR ) |
| constructMRLopsWeightedCeMM(wtype); |
| else if( et == ExecType.SPARK ) |
| constructSparkLopsWeightedCeMM(wtype); |
| else |
| throw new HopsException("Unsupported quaternaryop-wcemm exec type: "+et); |
| break; |
| } |
| |
| case WUMM:{ |
| WUMMType wtype = _umult ? WUMMType.MULT : WUMMType.DIV; |
| |
| if( et == ExecType.CP ) |
| constructCPLopsWeightedUMM(wtype); |
| else if( et == ExecType.MR ) |
| constructMRLopsWeightedUMM(wtype); |
| else if( et == ExecType.SPARK ) |
| constructSparkLopsWeightedUMM(wtype); |
| else |
| throw new HopsException("Unsupported quaternaryop-wumm exec type: "+et); |
| break; |
| } |
| |
| default: |
| throw new HopsException(this.printErrorLocation() + "Unknown QuaternaryOp (" + _op + ") while constructing Lops"); |
| } |
| } |
| catch(LopsException e) { |
| throw new HopsException(this.printErrorLocation() + "error constructing lops for QuaternaryOp." , e); |
| } |
| |
| //add reblock/checkpoint lops if necessary |
| constructAndSetLopsDataFlowProperties(); |
| |
| return getLops(); |
| } |
| |
| @Override |
| public String getOpString() { |
		return "q(" + HopsOpOp4String.get(_op) + ")";
| } |
| |
| public void printMe() throws HopsException { |
| if (LOG.isDebugEnabled()){ |
| if (getVisited() != VisitStatus.DONE) { |
| super.printMe(); |
| LOG.debug(" Operation: " + _op); |
| for (Hop h : getInput()) { |
| h.printMe(); |
| } |
| } |
| setVisited(VisitStatus.DONE); |
| } |
| } |
| |
| @Override |
| public boolean allowsAllExecTypes() |
| { |
| return true; |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructCPLopsWeightedSquaredLoss(WeightsType wtype) |
| throws HopsException, LopsException |
| { |
| WeightedSquaredLoss wsloss = new WeightedSquaredLoss( |
| getInput().get(0).constructLops(), |
| getInput().get(1).constructLops(), |
| getInput().get(2).constructLops(), |
| getInput().get(3).constructLops(), |
| getDataType(), getValueType(), wtype, ExecType.CP); |
| |
| //set degree of parallelism |
| int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); |
| wsloss.setNumThreads(k); |
| |
| setOutputDimensions( wsloss ); |
| setLineNumbers( wsloss ); |
| setLops( wsloss ); |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructMRLopsWeightedSquaredLoss(WeightsType wtype) |
| throws HopsException, LopsException |
| { |
		//NOTE: the common case for wsloss is factors U/V with ranks in the tens to hundreds; the current runtime only
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted Squared Loss only if this constraint holds. |
| |
| Hop X = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| Hop W = getInput().get(3); |
| |
| //MR operator selection, part1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWsloss = (!wtype.hasFourInputs() && m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetMap(true)); |
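		//(map-side execution applies only if the weights type does not require a fourth input and both factors fit into the remote map memory budget)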
| |
| if( !FORCE_REPLICATION && isMapWsloss ) //broadcast |
| { |
| //partitioning of U |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| |
| //partitioning of V |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| |
| //map-side wsloss always with broadcast |
| Lop wsloss = new WeightedSquaredLoss( X.constructLops(), lU, lV, W.constructLops(), |
| DataType.MATRIX, ValueType.DOUBLE, wtype, ExecType.MR); |
| wsloss.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(wsloss); |
| |
| Group grp = new Group(wsloss, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grp.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(grp); |
| |
| Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(AggOp.SUM), DataType.MATRIX, ValueType.DOUBLE, ExecType.MR); |
| agg1.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum |
| agg1.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(agg1); |
| |
| UnaryCP unary1 = new UnaryCP(agg1, HopsOpOp1LopsUS.get(OpOp1.CAST_AS_SCALAR), getDataType(), getValueType()); |
| unary1.getOutputParameters().setDimensions(0, 0, 0, 0, -1); |
| setLineNumbers(unary1); |
| setLops(unary1); |
| } |
| else //general case |
| { |
| //MR operator selection part 2 |
| boolean cacheU = !FORCE_REPLICATION && (m1Size < OptimizerUtils.getRemoteMemBudgetReduce()); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < OptimizerUtils.getRemoteMemBudgetReduce()) |
| || (cacheU && m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetReduce())); |
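			//(prefer caching U in the distributed cache if it fits into the reduce memory budget; cache V only if it fits alone when U is not cached, or together with U when U is cached)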
| |
| Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(grpX); |
| |
| Lop grpW = W.constructLops(); |
| if( grpW.getDataType()==DataType.MATRIX ) { |
| grpW = new Group(W.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpW.getOutputParameters().setDimensions(W.getDim1(), W.getDim2(), W.getRowsInBlock(), W.getColsInBlock(), -1); |
| setLineNumbers(grpW); |
| } |
| |
| Lop lU = null; |
| if( cacheU ) { |
| //partitioning of U for read through distributed cache |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| } |
| else { |
| //replication of U for shuffle to target block |
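				//(roughly: each block of U is replicated once per column-block of X so that it meets the corresponding X/W blocks in the reduce phase)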
| Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates |
| lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), |
| U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| |
| Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); |
| setLineNumbers(grpU); |
| lU = grpU; |
| } |
| |
| Lop lV = null; |
| if( cacheV ) { |
| //partitioning of V for read through distributed cache |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| } |
| else { |
| //replication of t(V) for shuffle to target block |
| Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); |
| ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(ltV); |
| |
| Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates |
| lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); |
| lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| |
| Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); |
| setLineNumbers(grpV); |
| lV = grpV; |
| } |
| |
| //reduce-side wsloss w/ or without broadcast |
| Lop wsloss = new WeightedSquaredLossR( |
| grpX, lU, lV, grpW, DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR); |
| wsloss.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(wsloss); |
| |
| Group grp = new Group(wsloss, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grp.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(grp); |
| |
| Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(AggOp.SUM), DataType.MATRIX, ValueType.DOUBLE, ExecType.MR); |
| agg1.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum |
| agg1.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(agg1); |
| |
| UnaryCP unary1 = new UnaryCP(agg1, HopsOpOp1LopsUS.get(OpOp1.CAST_AS_SCALAR), getDataType(), getValueType()); |
| unary1.getOutputParameters().setDimensions(0, 0, 0, 0, -1); |
| setLineNumbers(unary1); |
| setLops(unary1); |
| } |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructSparkLopsWeightedSquaredLoss(WeightsType wtype) |
| throws HopsException, LopsException |
| { |
		//NOTE: the common case for wsloss is factors U/V with ranks in the tens to hundreds; the current runtime only
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted Squared Loss only if this constraint holds. |
| |
| //Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp, |
| //and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer |
| //required because the max_int byte buffer constraint has been fixed in Spark 1.4 |
| double memBudgetExec = SparkExecutionContext.getBroadcastMemoryBudget(); |
| double memBudgetLocal = OptimizerUtils.getLocalMemBudget(); |
| |
| Hop X = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| Hop W = getInput().get(3); |
| |
| //MR operator selection, part1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWsloss = (!wtype.hasFourInputs() && m1Size+m2Size < memBudgetExec |
| && 2*m1Size < memBudgetLocal && 2*m2Size < memBudgetLocal); |
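		//(map-side selection requires both factor broadcasts to fit into the executor broadcast budget and, because of the cp-side partitioning, twice into the local memory budget)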
| |
| if( !FORCE_REPLICATION && isMapWsloss ) //broadcast |
| { |
| //map-side wsloss always with broadcast |
| Lop wsloss = new WeightedSquaredLoss( X.constructLops(), U.constructLops(), V.constructLops(), W.constructLops(), |
| DataType.SCALAR, ValueType.DOUBLE, wtype, ExecType.SPARK); |
| setOutputDimensions(wsloss); |
| setLineNumbers(wsloss); |
| setLops(wsloss); |
| } |
| else //general case |
| { |
| //MR operator selection part 2 |
| boolean cacheU = !FORCE_REPLICATION && (m1Size < memBudgetExec && 2*m1Size < memBudgetLocal); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < memBudgetExec ) |
| || (cacheU && m1Size+m2Size < memBudgetExec)) && 2*m2Size < memBudgetLocal; |
| |
| //reduce-side wsloss w/ or without broadcast |
| Lop wsloss = new WeightedSquaredLossR( |
| X.constructLops(), U.constructLops(), V.constructLops(), W.constructLops(), |
| DataType.SCALAR, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.SPARK); |
| setOutputDimensions(wsloss); |
| setLineNumbers(wsloss); |
| setLops(wsloss); |
| } |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructCPLopsWeightedSigmoid(WSigmoidType wtype) |
| throws HopsException, LopsException |
| { |
| WeightedSigmoid wsig = new WeightedSigmoid( |
| getInput().get(0).constructLops(), |
| getInput().get(1).constructLops(), |
| getInput().get(2).constructLops(), |
| getDataType(), getValueType(), wtype, ExecType.CP); |
| |
| //set degree of parallelism |
| int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); |
| wsig.setNumThreads(k); |
| |
| setOutputDimensions( wsig ); |
| setLineNumbers( wsig ); |
| setLops( wsig ); |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructMRLopsWeightedSigmoid( WSigmoidType wtype ) |
| throws HopsException, LopsException |
| { |
		//NOTE: the common case for wsigmoid is factors U/V with ranks in the tens to hundreds; the current runtime only
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted Sigmoid only if this constraint holds. |
| |
| Hop X = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| |
| //MR operator selection, part1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWsig = (m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetMap(true)); |
| |
| if( !FORCE_REPLICATION && isMapWsig ) //broadcast |
| { |
| //partitioning of U |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| |
| //partitioning of V |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| |
| //map-side wsig always with broadcast |
| Lop wsigmoid = new WeightedSigmoid( X.constructLops(), lU, lV, |
| DataType.MATRIX, ValueType.DOUBLE, wtype, ExecType.MR); |
| setOutputDimensions(wsigmoid); |
| setLineNumbers(wsigmoid); |
| setLops( wsigmoid ); |
| |
| //in contrast to wsloss no aggregation required |
| } |
| else //general case |
| { |
| //MR operator selection part 2 |
| boolean cacheU = !FORCE_REPLICATION && (m1Size < OptimizerUtils.getRemoteMemBudgetReduce()); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < OptimizerUtils.getRemoteMemBudgetReduce()) |
| || (cacheU && m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetReduce())); |
| |
| Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz()); |
| setLineNumbers(grpX); |
| |
| Lop lU = null; |
| if( cacheU ) { |
| //partitioning of U for read through distributed cache |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| } |
| else { |
| //replication of U for shuffle to target block |
| Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates |
| lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), |
| U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| |
| Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); |
| setLineNumbers(grpU); |
| lU = grpU; |
| } |
| |
| Lop lV = null; |
| if( cacheV ) { |
| //partitioning of V for read through distributed cache |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| } |
| else { |
| //replication of t(V) for shuffle to target block |
| Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); |
| ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(ltV); |
| |
| Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates |
| lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); |
| lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| |
| Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); |
| setLineNumbers(grpV); |
| lV = grpV; |
| } |
| |
| //reduce-side wsig w/ or without broadcast |
| Lop wsigmoid = new WeightedSigmoidR( |
| grpX, lU, lV, DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR); |
| setOutputDimensions(wsigmoid); |
| setLineNumbers(wsigmoid); |
| setLops(wsigmoid); |
| |
| //in contrast to wsloss no aggregation required |
| } |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructSparkLopsWeightedSigmoid( WSigmoidType wtype ) |
| throws HopsException, LopsException |
| { |
		//NOTE: the common case for wsigmoid is factors U/V with ranks in the tens to hundreds; the current runtime only
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted Sigmoid only if this constraint holds. |
| |
| //Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp, |
| //and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer |
| //required because the max_int byte buffer constraint has been fixed in Spark 1.4 |
| double memBudgetExec = SparkExecutionContext.getBroadcastMemoryBudget(); |
| double memBudgetLocal = OptimizerUtils.getLocalMemBudget(); |
| |
| Hop X = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| |
| //MR operator selection, part1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWsig = (m1Size+m2Size < memBudgetExec |
| && 2*m1Size<memBudgetLocal && 2*m2Size<memBudgetLocal); |
| |
| if( !FORCE_REPLICATION && isMapWsig ) //broadcast |
| { |
| //map-side wsig always with broadcast |
| Lop wsigmoid = new WeightedSigmoid( X.constructLops(), U.constructLops(), V.constructLops(), |
| DataType.MATRIX, ValueType.DOUBLE, wtype, ExecType.SPARK); |
| setOutputDimensions(wsigmoid); |
| setLineNumbers(wsigmoid); |
| setLops( wsigmoid ); |
| } |
| else //general case |
| { |
| //MR operator selection part 2 |
| boolean cacheU = !FORCE_REPLICATION && (m1Size < memBudgetExec && 2*m1Size < memBudgetLocal); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < memBudgetExec ) |
| || (cacheU && m1Size+m2Size < memBudgetExec)) && 2*m2Size < memBudgetLocal; |
| |
| //reduce-side wsig w/ or without broadcast |
| Lop wsigmoid = new WeightedSigmoidR( |
| X.constructLops(), U.constructLops(), V.constructLops(), |
| DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.SPARK); |
| setOutputDimensions(wsigmoid); |
| setLineNumbers(wsigmoid); |
| setLops(wsigmoid); |
| } |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructCPLopsWeightedDivMM(WDivMMType wtype) |
| throws HopsException, LopsException |
| { |
| WeightedDivMM wdiv = new WeightedDivMM( |
| getInput().get(0).constructLops(), |
| getInput().get(1).constructLops(), |
| getInput().get(2).constructLops(), |
| getInput().get(3).constructLops(), |
| getDataType(), getValueType(), wtype, ExecType.CP); |
| |
| //set degree of parallelism |
| int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); |
| wdiv.setNumThreads(k); |
| |
| setOutputDimensions( wdiv ); |
| setLineNumbers( wdiv ); |
| setLops( wdiv ); |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructMRLopsWeightedDivMM( WDivMMType wtype ) |
| throws HopsException, LopsException |
| { |
		//NOTE: the common case for wdivmm is factors U/V with ranks in the tens to hundreds; the current runtime only
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted DivMM only if this constraint holds. |
| |
| Hop W = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| Hop X = getInput().get(3); |
| |
| //MR operator selection, part1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWdivmm = ((!wtype.hasFourInputs() || wtype.hasScalar()) && |
| m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetMap(true)); |
| |
| if( !FORCE_REPLICATION && isMapWdivmm ) //broadcast |
| { |
| //partitioning of U |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| |
| //partitioning of V |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| |
| //map-side wdivmm always with broadcast |
| Lop wdivmm = new WeightedDivMM( W.constructLops(), lU, lV, X.constructLops(), |
| DataType.MATRIX, ValueType.DOUBLE, wtype, ExecType.MR); |
| setOutputDimensions(wdivmm); |
| setLineNumbers(wdivmm); |
| setLops(wdivmm); |
| } |
| else //general case |
| { |
			//MR operator selection part 2 (for wdivmm, cacheU and cacheV can generally not both hold; otherwise the map-side wdivmm would have been selected)
| boolean cacheU = !FORCE_REPLICATION && (m1Size < OptimizerUtils.getRemoteMemBudgetReduce()); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < OptimizerUtils.getRemoteMemBudgetReduce()) |
| || (cacheU && m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetReduce())); |
| |
| Group grpW = new Group(W.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpW.getOutputParameters().setDimensions(W.getDim1(), W.getDim2(), W.getRowsInBlock(), W.getColsInBlock(), W.getNnz()); |
| setLineNumbers(grpW); |
| |
| Lop grpX = X.constructLops(); |
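			//group X only if it is a matrix (the fourth wdivmm input may also be a scalar, which needs no shuffle grouping)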
| if( wtype.hasFourInputs() && (X.getDataType() != DataType.SCALAR) ) |
| grpX = new Group(grpX, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz()); |
| setLineNumbers(grpX); |
| |
| Lop lU = null; |
| if( cacheU ) { |
| //partitioning of U for read through distributed cache |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| } |
| else { |
| //replication of U for shuffle to target block |
| Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates |
| lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), |
| U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| |
| Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); |
| setLineNumbers(grpU); |
| lU = grpU; |
| } |
| |
| Lop lV = null; |
| if( cacheV ) { |
| //partitioning of V for read through distributed cache |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| } |
| else { |
| //replication of t(V) for shuffle to target block |
| Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); |
| ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(ltV); |
| |
| Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates |
| lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); |
| lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| |
| Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); |
| setLineNumbers(grpV); |
| lV = grpV; |
| } |
| |
| //reduce-side wdivmm w/ or without broadcast |
| Lop wdivmm = new WeightedDivMMR( grpW, lU, lV, grpX, |
| DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR); |
| setOutputDimensions(wdivmm); |
| setLineNumbers(wdivmm); |
| setLops(wdivmm); |
| } |
| |
		//in contrast to wsloss/wsigmoid, wdivmm requires partial aggregation (for the final mm)
| Group grp = new Group(getLops(), Group.OperationTypes.Sort, getDataType(), getValueType()); |
| setOutputDimensions(grp); |
| setLineNumbers(grp); |
| |
| Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(AggOp.SUM), getDataType(), getValueType(), ExecType.MR); |
| // aggregation uses kahanSum but the inputs do not have correction values |
| agg1.setupCorrectionLocation(CorrectionLocationType.NONE); |
| setOutputDimensions(agg1); |
| setLineNumbers(agg1); |
| |
| setLops(agg1); |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructSparkLopsWeightedDivMM( WDivMMType wtype ) |
| throws HopsException, LopsException |
| { |
		//NOTE: the common case for wdivmm is factors U/V with ranks in the tens to hundreds; the current runtime only
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted DivMM only if this constraint holds. |
| |
| //Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp, |
| //and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer |
| //required because the max_int byte buffer constraint has been fixed in Spark 1.4 |
| double memBudgetExec = SparkExecutionContext.getBroadcastMemoryBudget(); |
| double memBudgetLocal = OptimizerUtils.getLocalMemBudget(); |
| |
| Hop W = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| Hop X = getInput().get(3); |
| |
| //MR operator selection, part1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWdivmm = ((!wtype.hasFourInputs() || wtype.hasScalar()) && m1Size+m2Size < memBudgetExec |
| && 2*m1Size<memBudgetLocal && 2*m2Size<memBudgetLocal); |
| |
| if( !FORCE_REPLICATION && isMapWdivmm ) //broadcast |
| { |
| //map-side wdivmm always with broadcast |
| Lop wdivmm = new WeightedDivMM( W.constructLops(), U.constructLops(), V.constructLops(), |
| X.constructLops(), DataType.MATRIX, ValueType.DOUBLE, wtype, ExecType.SPARK); |
| setOutputDimensions(wdivmm); |
| setLineNumbers(wdivmm); |
| setLops( wdivmm ); |
| } |
| else //general case |
| { |
| //MR operator selection part 2 |
| boolean cacheU = !FORCE_REPLICATION && (m1Size < memBudgetExec && 2*m1Size < memBudgetLocal); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < memBudgetExec ) |
| || (cacheU && m1Size+m2Size < memBudgetExec)) && 2*m2Size < memBudgetLocal; |
| |
| //reduce-side wdivmm w/ or without broadcast |
| Lop wdivmm = new WeightedDivMMR( |
| W.constructLops(), U.constructLops(), V.constructLops(), X.constructLops(), |
| DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.SPARK); |
| setOutputDimensions(wdivmm); |
| setLineNumbers(wdivmm); |
| setLops(wdivmm); |
| } |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructCPLopsWeightedCeMM(WCeMMType wtype) |
| throws HopsException, LopsException |
| { |
| WeightedCrossEntropy wcemm = new WeightedCrossEntropy( |
| getInput().get(0).constructLops(), |
| getInput().get(1).constructLops(), |
| getInput().get(2).constructLops(), |
| getInput().get(3).constructLops(), |
| getDataType(), getValueType(), wtype, ExecType.CP); |
| |
| //set degree of parallelism |
| int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); |
| wcemm.setNumThreads(k); |
| |
| setOutputDimensions( wcemm ); |
| setLineNumbers( wcemm ); |
| setLops( wcemm ); |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructMRLopsWeightedCeMM(WCeMMType wtype) |
| throws HopsException, LopsException |
| { |
		//NOTE: the common case for wcemm is factors U/V with ranks in the tens to hundreds; the current runtime only
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted Cross Entropy only if this constraint holds. |
| |
| Hop X = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| Hop eps = getInput().get(3); |
| |
| //MR operator selection, part1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWcemm = (m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetMap(true)); |
| |
| if( !FORCE_REPLICATION && isMapWcemm ) //broadcast |
| { |
| //partitioning of U |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| |
| //partitioning of V |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| |
| //map-side wcemm always with broadcast |
| Lop wcemm = new WeightedCrossEntropy( X.constructLops(), lU, lV, eps.constructLops(), |
| DataType.MATRIX, ValueType.DOUBLE, wtype, ExecType.MR); |
| wcemm.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(wcemm); |
| |
| Group grp = new Group(wcemm, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grp.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(grp); |
| |
| Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(AggOp.SUM), DataType.MATRIX, ValueType.DOUBLE, ExecType.MR); |
| agg1.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum |
| agg1.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(agg1); |
| |
| UnaryCP unary1 = new UnaryCP(agg1, HopsOpOp1LopsUS.get(OpOp1.CAST_AS_SCALAR), getDataType(), getValueType()); |
| unary1.getOutputParameters().setDimensions(0, 0, 0, 0, -1); |
| setLineNumbers(unary1); |
| setLops(unary1); |
| } |
| else //general case |
| { |
| //MR operator selection part 2 |
| boolean cacheU = !FORCE_REPLICATION && (m1Size < OptimizerUtils.getRemoteMemBudgetReduce()); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < OptimizerUtils.getRemoteMemBudgetReduce()) |
| || (cacheU && m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetReduce())); |
| |
| Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(grpX); |
| |
| Lop lU = null; |
| if( cacheU ) { |
| //partitioning of U for read through distributed cache |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| } |
| else { |
| //replication of U for shuffle to target block |
| Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates |
| lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), |
| U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| |
| Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); |
| setLineNumbers(grpU); |
| lU = grpU; |
| } |
| |
| Lop lV = null; |
| if( cacheV ) { |
| //partitioning of V for read through distributed cache |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| } |
| else { |
| //replication of t(V) for shuffle to target block |
| Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); |
| ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(ltV); |
| |
| Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates |
| lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); |
| lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| |
| Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); |
| setLineNumbers(grpV); |
| lV = grpV; |
| } |
| |
| //reduce-side wcemm w/ or without broadcast |
| Lop wcemm = new WeightedCrossEntropyR( grpX, lU, lV, eps.constructLops(), |
| DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR); |
| wcemm.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(wcemm); |
| |
| Group grp = new Group(wcemm, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grp.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(grp); |
| |
| Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(AggOp.SUM), DataType.MATRIX, ValueType.DOUBLE, ExecType.MR); |
| agg1.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum |
| agg1.getOutputParameters().setDimensions(1, 1, X.getRowsInBlock(), X.getColsInBlock(), -1); |
| setLineNumbers(agg1); |
| |
| UnaryCP unary1 = new UnaryCP(agg1, HopsOpOp1LopsUS.get(OpOp1.CAST_AS_SCALAR), getDataType(), getValueType()); |
| unary1.getOutputParameters().setDimensions(0, 0, 0, 0, -1); |
| setLineNumbers(unary1); |
| setLops(unary1); |
| } |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructSparkLopsWeightedCeMM(WCeMMType wtype) |
| throws HopsException, LopsException |
| { |
		//NOTE: the common case for wcemm is factors U/V with ranks in the tens to hundreds; the current runtime only
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted Cross Entropy only if this constraint holds. |
| |
| //Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp, |
| //and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer |
| //required because the max_int byte buffer constraint has been fixed in Spark 1.4 |
| double memBudgetExec = SparkExecutionContext.getBroadcastMemoryBudget(); |
| double memBudgetLocal = OptimizerUtils.getLocalMemBudget(); |
| |
| Hop X = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| Hop eps = getInput().get(3); |
| |
| //MR operator selection, part1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWcemm = (m1Size+m2Size < memBudgetExec |
| && 2*m1Size < memBudgetLocal && 2*m2Size < memBudgetLocal); |
| |
| if( !FORCE_REPLICATION && isMapWcemm ) //broadcast |
| { |
| //map-side wcemm always with broadcast |
| Lop wcemm = new WeightedCrossEntropy( X.constructLops(), U.constructLops(), V.constructLops(), eps.constructLops(), |
| DataType.SCALAR, ValueType.DOUBLE, wtype, ExecType.SPARK); |
| setOutputDimensions(wcemm); |
| setLineNumbers(wcemm); |
| setLops(wcemm); |
| } |
| else //general case |
| { |
| //MR operator selection part 2 |
| boolean cacheU = !FORCE_REPLICATION && (m1Size < memBudgetExec && 2*m1Size < memBudgetLocal); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < memBudgetExec ) |
| || (cacheU && m1Size+m2Size < memBudgetExec)) && 2*m2Size < memBudgetLocal; |
| |
| //reduce-side wcemm w/ or without broadcast |
| Lop wcemm = new WeightedCrossEntropyR( |
| X.constructLops(), U.constructLops(), V.constructLops(), eps.constructLops(), |
| DataType.SCALAR, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.SPARK); |
| setOutputDimensions(wcemm); |
| setLineNumbers(wcemm); |
| setLops(wcemm); |
| } |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructCPLopsWeightedUMM(WUMMType wtype) |
| throws HopsException, LopsException |
| { |
| Unary.OperationTypes uop = _uop!=null ? |
| HopsOpOp1LopsU.get(_uop) : _sop==OpOp2.POW ? |
| Unary.OperationTypes.POW2 : Unary.OperationTypes.MULTIPLY2; |
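		//(the wumm pattern carries either a unary op or a scalar op; a scalar POW maps to the fused POW2 (x^2) primitive, any other supported scalar op to MULTIPLY2 (x*2))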
| |
| WeightedUnaryMM wumm = new WeightedUnaryMM( |
| getInput().get(0).constructLops(), |
| getInput().get(1).constructLops(), |
| getInput().get(2).constructLops(), |
| getDataType(), getValueType(), wtype, uop, ExecType.CP); |
| |
| //set degree of parallelism |
| int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); |
| wumm.setNumThreads(k); |
| |
| setOutputDimensions( wumm ); |
| setLineNumbers( wumm ); |
| setLops( wumm ); |
| } |
| |
| /** |
| * |
| * @param wtype |
| * @throws HopsException |
| * @throws LopsException |
| */ |
| private void constructMRLopsWeightedUMM( WUMMType wtype ) |
| throws HopsException, LopsException |
| { |
		//NOTE: the common case for wumm is factors U/V with ranks in the tens to hundreds; the current runtime only
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted UnaryMM only if this constraint holds. |
| |
| Unary.OperationTypes uop = _uop!=null ? |
| HopsOpOp1LopsU.get(_uop) : _sop==OpOp2.POW ? |
| Unary.OperationTypes.POW2 : Unary.OperationTypes.MULTIPLY2; |
| |
| Hop X = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| |
| //MR operator selection, part1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWumm = (m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetMap(true)); |
| |
| if( !FORCE_REPLICATION && isMapWumm ) //broadcast |
| { |
| //partitioning of U |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| |
| //partitioning of V |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| Lop lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| |
| //map-side wumm always with broadcast |
| Lop wumm = new WeightedUnaryMM( X.constructLops(), lU, lV, |
| DataType.MATRIX, ValueType.DOUBLE, wtype, uop, ExecType.MR); |
| setOutputDimensions(wumm); |
| setLineNumbers(wumm); |
| setLops( wumm ); |
| |
| //in contrast to wsloss no aggregation required |
| } |
| else //general case |
| { |
| //MR operator selection part 2 |
| boolean cacheU = !FORCE_REPLICATION && (m1Size < OptimizerUtils.getRemoteMemBudgetReduce()); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < OptimizerUtils.getRemoteMemBudgetReduce()) |
| || (cacheU && m1Size+m2Size < OptimizerUtils.getRemoteMemBudgetReduce())); |
| |
| Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz()); |
| setLineNumbers(grpX); |
| |
| Lop lU = null; |
| if( cacheU ) { |
| //partitioning of U for read through distributed cache |
| boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lU = U.constructLops(); |
| if( needPartU ){ //requires partitioning |
| lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| } |
| } |
| else { |
| //replication of U for shuffle to target block |
| Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates |
| lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); |
| lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), |
| U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); |
| setLineNumbers(lU); |
| |
| Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); |
| setLineNumbers(grpU); |
| lU = grpU; |
| } |
| |
| Lop lV = null; |
| if( cacheV ) { |
| //partitioning of V for read through distributed cache |
| boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; |
| lV = V.constructLops(); |
| if( needPartV ){ //requires partitioning |
| lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); |
| lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| } |
| } |
| else { |
| //replication of t(V) for shuffle to target block |
| Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); |
| ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(ltV); |
| |
| Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates |
| lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); |
| lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), |
| V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); |
| setLineNumbers(lV); |
| |
| Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); |
| grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); |
| setLineNumbers(grpV); |
| lV = grpV; |
| } |
| |
| //reduce-side wumm w/ or without broadcast |
| Lop wumm = new WeightedUnaryMMR( |
| grpX, lU, lV, DataType.MATRIX, ValueType.DOUBLE, wtype, uop, cacheU, cacheV, ExecType.MR); |
| setOutputDimensions(wumm); |
| setLineNumbers(wumm); |
| setLops(wumm); |
| |
| //in contrast to wsloss no aggregation required |
| } |
| } |
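| |
| /** |
| * Illustrative sketch only (not referenced by the compiler): the heuristic used |
| * above for deciding which factor to read through the distributed cache on the |
| * reduce-side MR path. Parameter names are hypothetical stand-ins for the size |
| * estimates and the remote reduce memory budget computed in |
| * constructMRLopsWeightedUMM; see that method for the authoritative logic. |
| */ |
| private static boolean[] exampleSelectDistCacheInputs(double m1Size, double m2Size, double remoteBudget, boolean forceReplication) { |
| //cache U if it alone fits into the reduce-side memory budget |
| boolean cacheU = !forceReplication && (m1Size < remoteBudget); |
| //cache V if it fits alongside U (or alone, when U is not cached) |
| boolean cacheV = !forceReplication && ((!cacheU && m2Size < remoteBudget) |
| || (cacheU && m1Size + m2Size < remoteBudget)); |
| return new boolean[]{cacheU, cacheV}; |
| } |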
| |
| /** |
| * Constructs the Spark lops for a weighted unary matrix multiplication (wumm), |
| * selecting between a map-side operator with broadcast inputs and a reduce-side |
| * operator depending on the estimated sizes of U and V. |
| * |
| * @param wtype weighted unary matrix multiplication type |
| * @throws HopsException if the hop-level construction fails |
| * @throws LopsException if the lop-level construction fails |
| */ |
| private void constructSparkLopsWeightedUMM( WUMMType wtype ) |
| throws HopsException, LopsException |
| { |
| //NOTE: the common case for wumm is factors U/V with ranks in the 10s to 100s; the current runtime only |
| //supports single block outer products (U/V rank <= blocksize, i.e., 1000 by default); we enforce this |
| //by applying the hop rewrite for Weighted UnaryMM only if this constraint holds. |
| |
| Unary.OperationTypes uop = _uop!=null ? |
| HopsOpOp1LopsU.get(_uop) : _sop==OpOp2.POW ? |
| Unary.OperationTypes.POW2 : Unary.OperationTypes.MULTIPLY2; |
| |
| //Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp, |
| //and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer |
| //required because the max_int byte buffer constraint has been fixed in Spark 1.4 |
| double memBudgetExec = SparkExecutionContext.getBroadcastMemoryBudget(); |
| double memBudgetLocal = OptimizerUtils.getLocalMemBudget(); |
| |
| Hop X = getInput().get(0); |
| Hop U = getInput().get(1); |
| Hop V = getInput().get(2); |
| |
| //Spark operator selection, part 1 |
| double m1Size = OptimizerUtils.estimateSize(U.getDim1(), U.getDim2()); //size U |
| double m2Size = OptimizerUtils.estimateSize(V.getDim1(), V.getDim2()); //size V |
| boolean isMapWumm = (m1Size+m2Size < memBudgetExec |
| && 2*m1Size<memBudgetLocal && 2*m2Size<memBudgetLocal); |
| |
| if( !FORCE_REPLICATION && isMapWumm ) //broadcast |
| { |
| //map-side wumm always with broadcast |
| Lop wumm = new WeightedUnaryMM( X.constructLops(), U.constructLops(), V.constructLops(), |
| DataType.MATRIX, ValueType.DOUBLE, wtype, uop, ExecType.SPARK); |
| setOutputDimensions(wumm); |
| setLineNumbers(wumm); |
| setLops( wumm ); |
| } |
| else //general case |
| { |
| //Spark operator selection, part 2 |
| boolean cacheU = !FORCE_REPLICATION && (m1Size < memBudgetExec && 2*m1Size < memBudgetLocal); |
| boolean cacheV = !FORCE_REPLICATION && ((!cacheU && m2Size < memBudgetExec ) |
| || (cacheU && m1Size+m2Size < memBudgetExec)) && 2*m2Size < memBudgetLocal; |
| |
| //reduce-side wumm w/ or without broadcast |
| Lop wumm = new WeightedUnaryMMR( |
| X.constructLops(), U.constructLops(), V.constructLops(), |
| DataType.MATRIX, ValueType.DOUBLE, wtype, uop, cacheU, cacheV, ExecType.SPARK); |
| setOutputDimensions(wumm); |
| setLineNumbers(wumm); |
| setLops(wumm); |
| } |
| } |
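| |
| /** |
| * Illustrative sketch only (not referenced by the compiler): the map-side |
| * broadcast feasibility check used above. As noted in the method, a broadcast |
| * input must fit once into the executor broadcast budget and twice into the |
| * local memory budget because the input is partitioned in CP first. Parameter |
| * names are hypothetical stand-ins for the estimates computed above. |
| */ |
| private static boolean exampleFitsMapSideWumm(double m1Size, double m2Size, double memBudgetExec, double memBudgetLocal) { |
| return (m1Size + m2Size < memBudgetExec) //both broadcasts fit into the executor budget |
| && (2 * m1Size < memBudgetLocal) //U fits twice into the local budget |
| && (2 * m2Size < memBudgetLocal); //V fits twice into the local budget |
| } |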
| |
| /** |
| * Determines the wsloss weights type: if the 4th input is an actual weight |
| * matrix (not a literal), weighting is applied before (PRE) or after (POST) |
| * the squared error; for a literal 4th input with post-weighting, the |
| * non-zero variant (POST_NZ) is used; otherwise no weighting (NONE). |
| * |
| * @return weights type for weighted squared loss |
| */ |
| private WeightsType checkWeightsType() |
| { |
| WeightsType ret = WeightsType.NONE; |
| if( !(getInput().get(3) instanceof LiteralOp) ){ |
| if( _postWeights ) |
| ret = WeightsType.POST; |
| else |
| ret = WeightsType.PRE; |
| } |
| else if( _postWeights ){ |
| ret = WeightsType.POST_NZ; |
| } |
| |
| return ret; |
| } |
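| |
| /** |
| * Illustrative sketch only (not referenced by the compiler): the decision in |
| * checkWeightsType above, with a hypothetical boolean standing in for the |
| * "4th input is a literal" check. |
| */ |
| private static WeightsType exampleWeightsType(boolean weightsAreLiteral, boolean postWeights) { |
| if( !weightsAreLiteral ) //an actual weight matrix is present |
| return postWeights ? WeightsType.POST : WeightsType.PRE; |
| return postWeights ? WeightsType.POST_NZ : WeightsType.NONE; |
| } |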
| |
| /** |
| * Determines the wsigmoid type from the log-output and minus-input flags |
| * (BASIC, LOG, MINUS, or LOG_MINUS). |
| * |
| * @return weighted sigmoid type |
| */ |
| private WSigmoidType checkWSigmoidType() |
| { |
| |
| if( _logout && _minusin ) |
| return WSigmoidType.LOG_MINUS; |
| else if( _logout ) |
| return WSigmoidType.LOG; |
| else if( _minusin ) |
| return WSigmoidType.MINUS; |
| else |
| return WSigmoidType.BASIC; |
| } |
| |
| /** |
| * Determines the wdivmm type from the base type encoding (0=basic, 1=left, |
| * 2=right, 3=left w/ epsilon, 4=right w/ epsilon) and the mult/minus flags. |
| * |
| * @return weighted divide matrix multiplication type, or null for an unknown base type |
| */ |
| private WDivMMType checkWDivMMType() |
| { |
| switch( _baseType ) |
| { |
| case 0: //BASIC |
| return WDivMMType.MULT_BASIC; |
| case 1: //LEFT |
| if( getInput().get(3).getDataType()==DataType.MATRIX ) |
| return WDivMMType.MULT_MINUS_4_LEFT; |
| else if( _minus ) |
| return WDivMMType.MULT_MINUS_LEFT; |
| else |
| return _mult ? WDivMMType.MULT_LEFT : WDivMMType.DIV_LEFT; |
| case 2: //RIGHT |
| if( getInput().get(3).getDataType()==DataType.MATRIX ) |
| return WDivMMType.MULT_MINUS_4_RIGHT; |
| else if( _minus ) |
| return WDivMMType.MULT_MINUS_RIGHT; |
| else |
| return _mult ? WDivMMType.MULT_RIGHT : WDivMMType.DIV_RIGHT; |
| case 3: //LEFT w/EPS |
| return WDivMMType.DIV_LEFT_EPS; |
| case 4: //RIGHT w/EPS |
| return WDivMMType.DIV_RIGHT_EPS; |
| } |
| |
| return null; |
| } |
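| |
| /** |
| * Illustrative sketch only (not referenced by the compiler): the integer |
| * encoding of _baseType as interpreted by checkWDivMMType above. |
| */ |
| private static String exampleBaseTypeName(int baseType) { |
| switch( baseType ) { |
| case 0: return "BASIC"; |
| case 1: return "LEFT"; |
| case 2: return "RIGHT"; |
| case 3: return "LEFT_EPS"; |
| case 4: return "RIGHT_EPS"; |
| default: return "UNKNOWN"; |
| } |
| } |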
| |
| /** |
| * Determines the wcemm type: base type 1 selects the epsilon variant |
| * (BASIC_EPS), otherwise the basic weighted cross-entropy type is used. |
| * |
| * @return weighted cross-entropy type |
| */ |
| private WCeMMType checkWCeMMType() |
| { |
| return _baseType == 1 ? WCeMMType.BASIC_EPS : WCeMMType.BASIC; |
| } |
| |
| @Override |
| protected double computeOutputMemEstimate( long dim1, long dim2, long nnz ) |
| { |
| switch( _op ) { |
| case WSLOSS: //always scalar output |
| case WCEMM: |
| return OptimizerUtils.DOUBLE_SIZE; |
| |
| case WSIGMOID: |
| case WDIVMM: |
| case WUMM: |
| double sp = OptimizerUtils.getSparsity(dim1, dim2, nnz); |
| return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sp); |
| |
| default: |
| return 0; |
| } |
| } |
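| |
| /** |
| * Illustrative sketch only (not referenced by the compiler): a simplified |
| * back-of-envelope output size for the dense case, i.e., dim1*dim2 cells at |
| * 8 bytes per double value. The estimate above delegates to OptimizerUtils, |
| * which additionally accounts for sparsity and sparse-format overheads. |
| */ |
| private static double exampleDenseOutputSizeInBytes(long dim1, long dim2) { |
| return (double) dim1 * dim2 * 8; //8 bytes per dense double cell |
| } |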
| |
| @Override |
| protected double computeIntermediateMemEstimate( long dim1, long dim2, long nnz ) |
| { |
| //no intermediates |
| return 0; |
| } |
| |
| @Override |
| protected long[] inferOutputCharacteristics( MemoTable memo ) |
| { |
| long[] ret = null; |
| |
| switch( _op ) { |
| case WSLOSS: //always scalar output |
| case WCEMM: |
| ret = null; |
| break; |
| |
| case WSIGMOID: |
| case WUMM: { |
| MatrixCharacteristics mcW = memo.getAllInputStats(getInput().get(0)); |
| ret = new long[]{mcW.getRows(), mcW.getCols(), mcW.getNonZeros()}; |
| break; |
| } |
| |
| case WDIVMM: { |
| if( _baseType == 0 ){ //basic |
| MatrixCharacteristics mcW = memo.getAllInputStats(getInput().get(0)); |
| ret = new long[]{mcW.getRows(), mcW.getCols(), mcW.getNonZeros()}; |
| } |
| else if( _baseType == 1 || _baseType == 3 ) { //left (w/ transpose or w/ epsilon) |
| MatrixCharacteristics mcV = memo.getAllInputStats(getInput().get(2)); |
| ret = new long[]{mcV.getRows(), mcV.getCols(), -1}; |
| } |
| else { //right |
| MatrixCharacteristics mcU = memo.getAllInputStats(getInput().get(1)); |
| ret = new long[]{mcU.getRows(), mcU.getCols(), -1}; |
| } |
| break; |
| } |
| |
| default: |
| throw new RuntimeException("Output characteristics for operation (" + _op + ") cannot be inferred."); |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| protected ExecType optFindExecType() |
| throws HopsException |
| { |
| checkAndSetForcedPlatform(); |
| |
| ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR; |
| |
| if( _etypeForced != null ) |
| { |
| _etype = _etypeForced; |
| } |
| else |
| { |
| if ( OptimizerUtils.isMemoryBasedOptLevel() ) { |
| _etype = findExecTypeByMemEstimate(); |
| } |
| else if ( (getInput().get(0).areDimsBelowThreshold() |
| && getInput().get(1).areDimsBelowThreshold() |
| && getInput().get(2).areDimsBelowThreshold() |
| && getInput().get(3).areDimsBelowThreshold()) ) |
| _etype = ExecType.CP; |
| else |
| _etype = REMOTE; |
| |
| //check for valid CP dimensions and matrix size |
| checkAndSetInvalidCPDimsAndSize(); |
| } |
| |
| //mark for recompile (forever) |
| if( ConfigurationManager.isDynamicRecompilation() && !dimsKnown(true) && _etype==REMOTE ) |
| setRequiresRecompile(); |
| |
| return _etype; |
| } |
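| |
| /** |
| * Illustrative sketch only (not referenced by the compiler): the execution-type |
| * decision above in simplified form. The parameters are hypothetical stand-ins |
| * for the forced platform, the memory-based optimization level, the memory |
| * estimate fitting into CP, and all input dimensions being below the CP |
| * threshold. |
| */ |
| private static ExecType exampleChooseExecType(ExecType forced, boolean memoryBasedOpt, boolean fitsInCPMemory, boolean allDimsBelowThreshold, ExecType remote) { |
| if( forced != null ) //forced platform takes precedence |
| return forced; |
| if( memoryBasedOpt ) //memory-based optimizer levels decide by memory estimate |
| return fitsInCPMemory ? ExecType.CP : remote; |
| return allDimsBelowThreshold ? ExecType.CP : remote; //fallback: dimension threshold |
| } |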
| |
| @Override |
| public void refreshSizeInformation() |
| { |
| switch( _op ) { |
| case WSLOSS: |
| //do nothing: always scalar |
| break; |
| |
| case WSIGMOID: |
| case WUMM: { |
| Hop inW = getInput().get(0); |
| setDim1( inW.getDim1() ); |
| setDim2( inW.getDim2() ); |
| setNnz( inW.getNnz() ); |
| break; |
| } |
| |
| case WDIVMM: { |
| if( _baseType == 0 ) { //basic |
| Hop inW = getInput().get(0); |
| setDim1( inW.getDim1() ); |
| setDim2( inW.getDim2() ); |
| setNnz( inW.getNnz() ); |
| } |
| else if( _baseType == 1 || _baseType == 3 ){ //left (w/ transpose or w/ epsilon) |
| Hop inV = getInput().get(2); |
| setDim1( inV.getDim1() ); |
| setDim2( inV.getDim2() ); |
| } |
| else { //right |
| Hop inU = getInput().get(1); |
| setDim1( inU.getDim1() ); |
| setDim2( inU.getDim2() ); |
| } |
| break; |
| } |
| |
| default: |
| break; |
| } |
| } |
| |
| @Override |
| public Object clone() throws CloneNotSupportedException |
| { |
| QuaternaryOp ret = new QuaternaryOp(); |
| |
| //copy generic attributes |
| ret.clone(this, false); |
| |
| //copy specific attributes |
| ret._op = _op; |
| ret._postWeights = _postWeights; |
| ret._logout = _logout; |
| ret._minusin = _minusin; |
| ret._baseType = _baseType; |
| ret._mult = _mult; |
| ret._minus = _minus; |
| ret._umult = _umult; |
| ret._uop = _uop; |
| ret._sop = _sop; |
| ret._maxNumThreads = _maxNumThreads; |
| |
| return ret; |
| } |
| |
| @Override |
| public boolean compare( Hop that ) |
| { |
| if( !(that instanceof QuaternaryOp) ) |
| return false; |
| |
| QuaternaryOp that2 = (QuaternaryOp)that; |
| |
| //compare basic inputs and weights (always existing) |
| boolean ret = (_op == that2._op |
| && getInput().size() == that2.getInput().size() |
| && getInput().get(0) == that2.getInput().get(0) |
| && getInput().get(1) == that2.getInput().get(1) |
| && getInput().get(2) == that2.getInput().get(2) ); |
| |
| //compare the 4th input if present (equal input sizes ensured above) |
| if( ret && getInput().size()==4 ) |
| ret &= (getInput().get(3) == that2.getInput().get(3)); |
| |
| //compare specific parameters |
| ret &= _postWeights == that2._postWeights; |
| ret &= _logout == that2._logout; |
| ret &= _minusin == that2._minusin; |
| ret &= _baseType == that2._baseType; |
| ret &= _mult == that2._mult; |
| ret &= _minus == that2._minus; |
| ret &= _umult == that2._umult; |
| ret &= _uop == that2._uop; |
| ret &= _sop == that2._sop; |
| ret &= _maxNumThreads == that2._maxNumThreads; |
| |
| return ret; |
| } |
| } |