| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.controlprogram.parfor.opt; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.sysds.api.DMLScript; |
| import org.apache.sysds.common.Types.FileFormat; |
| import org.apache.sysds.common.Types.OpOpN; |
| import org.apache.sysds.conf.ConfigurationManager; |
| import org.apache.sysds.hops.AggBinaryOp; |
| import org.apache.sysds.hops.AggBinaryOp.MMultMethod; |
| import org.apache.sysds.hops.DataOp; |
| import org.apache.sysds.hops.FunctionOp; |
| import org.apache.sysds.hops.Hop; |
| import org.apache.sysds.hops.IndexingOp; |
| import org.apache.sysds.hops.LeftIndexingOp; |
| import org.apache.sysds.hops.LiteralOp; |
| import org.apache.sysds.hops.MemoTable; |
| import org.apache.sysds.hops.MultiThreadedHop; |
| import org.apache.sysds.hops.OptimizerUtils; |
| import org.apache.sysds.hops.recompile.Recompiler; |
| import org.apache.sysds.hops.rewrite.HopRewriteUtils; |
| import org.apache.sysds.hops.rewrite.ProgramRewriteStatus; |
| import org.apache.sysds.hops.rewrite.ProgramRewriter; |
| import org.apache.sysds.hops.rewrite.RewriteInjectSparkLoopCheckpointing; |
| import org.apache.sysds.lops.LopProperties; |
| import org.apache.sysds.parser.DMLProgram; |
| import org.apache.sysds.parser.FunctionStatementBlock; |
| import org.apache.sysds.parser.ParForStatement; |
| import org.apache.sysds.parser.ParForStatementBlock; |
| import org.apache.sysds.parser.ParForStatementBlock.ResultVar; |
| import org.apache.sysds.parser.StatementBlock; |
| import org.apache.sysds.runtime.DMLRuntimeException; |
| import org.apache.sysds.runtime.controlprogram.BasicProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.ForProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.LocalVariableMap; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitioner; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PExecMode; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.POptMode; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PResultMerge; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionFormat; |
| import org.apache.sysds.runtime.controlprogram.Program; |
| import org.apache.sysds.runtime.controlprogram.ProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; |
| import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; |
| import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; |
| import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; |
| import org.apache.sysds.runtime.controlprogram.paramserv.ParamservUtils; |
| import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalFile; |
| import org.apache.sysds.runtime.controlprogram.parfor.opt.CostEstimator.ExcludeType; |
| import org.apache.sysds.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure; |
| import org.apache.sysds.runtime.controlprogram.parfor.opt.OptNode.ExecType; |
| import org.apache.sysds.runtime.controlprogram.parfor.opt.OptNode.NodeType; |
| import org.apache.sysds.runtime.controlprogram.parfor.opt.OptNode.ParamType; |
| import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; |
| import org.apache.sysds.runtime.data.SparseRowVector; |
| import org.apache.sysds.runtime.instructions.Instruction; |
| import org.apache.sysds.runtime.instructions.cp.Data; |
| import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction; |
| import org.apache.sysds.runtime.instructions.gpu.context.GPUContextPool; |
| import org.apache.sysds.runtime.instructions.spark.data.RDDObject; |
| import org.apache.sysds.runtime.io.IOUtilFunctions; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysds.runtime.meta.DataCharacteristics; |
| import org.apache.sysds.runtime.meta.MetaDataFormat; |
| import org.apache.sysds.runtime.util.ProgramConverter; |
| import org.apache.sysds.utils.NativeHelper; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Rule-Based ParFor Optimizer (time: O(n)): |
| * |
| * Applied rule-based rewrites |
| * - 1) rewrite set data partitioner (incl. recompile RIX) |
| * - 2) rewrite remove unnecessary compare matrix |
| * - 3) rewrite result partitioning (incl. recompile LIX) |
| * - 4) rewrite set execution strategy |
| * - 5) rewrite set operations exec type (incl. recompile) |
| * - 6) rewrite use data colocation |
| * - 7) rewrite set partition replication factor |
| * - 8) rewrite set export replication factor |
| * - 9) rewrite use nested parallelism |
| * - 10) rewrite set degree of parallelism |
| * - 11) rewrite set task partitioner |
| * - 12) rewrite set fused data partitioning and execution |
| * - 13) rewrite transpose vector operations (for sparse) |
| * - 14) rewrite set in-place result indexing |
| * - 15) rewrite disable caching (prevent sparse serialization) |
| * - 16) rewrite enable runtime piggybacking |
| * - 17) rewrite inject spark loop checkpointing |
| * - 18) rewrite inject spark repartition (for zipmm) |
| * - 19) rewrite set spark eager rdd caching |
| * - 20) rewrite set result merge |
| * - 21) rewrite set recompile memory budget |
| * - 22) rewrite remove recursive parfor |
| * - 23) rewrite remove unnecessary parfor |
| * |
| * TODO fuse also result merge into fused data partitioning and execute |
| * (for writing the result directly from execute we need to partition |
| * columns/rows according to blocksize -> rewrite (only applicable if |
| * numCols/blocksize>numreducers)+custom MR partitioner) |
| * |
| * |
| * TODO take remote memory into account in data/result partitioning rewrites (smaller/larger) |
| * TODO memory estimates with shared reads |
| * TODO memory estimates of result merge into plan tree |
| * TODO blockwise partitioning |
| * |
| */ |
| public class OptimizerRuleBased extends Optimizer { |
	private static final Log LOG = LogFactory.getLog(OptimizerRuleBased.class.getName());

	//tuning thresholds for the rule-based rewrites
	public static final double PROB_SIZE_THRESHOLD_REMOTE = 100; //wrt # top-level iterations (min)
	public static final double PROB_SIZE_THRESHOLD_PARTITIONING = 2; //wrt # top-level iterations (min)
	public static final double PROB_SIZE_THRESHOLD_MB = 256*1024*1024; //wrt overall memory consumption (min)
	public static final int MAX_REPLICATION_FACTOR_PARTITIONING = 5;
	public static final int MAX_REPLICATION_FACTOR_EXPORT = 7;
	public static final boolean ALLOW_REMOTE_NESTED_PARALLELISM = false;
	public static final String FUNCTION_UNFOLD_NAMEPREFIX = "__unfold_";

	//over-provisioning factors applied to the raw degree of parallelism
	public static final double PAR_K_FACTOR = OptimizationWrapper.PAR_FACTOR_INFRASTRUCTURE;
	public static final double PAR_K_MR_FACTOR = 1.0 * OptimizationWrapper.PAR_FACTOR_INFRASTRUCTURE;

	//problem and infrastructure properties
	//(initialized by analyzeProblemAndInfrastructure; -1 means not yet analyzed)
	protected long _N = -1; //problemsize (# top-level parfor iterations)
	protected long _Nmax = -1; //max problemsize (including subproblems)
	protected int _lk = -1; //local par
	protected int _lkmaxCP = -1; //local max par (if only CP inst)
	protected int _lkmaxMR = -1; //local max par (if also MR inst)
	protected int _rnk = -1; //remote num nodes
	protected int _rk = -1; //remote par (mappers)
	protected int _rk2 = -1; //remote par (reducers)
	protected int _rkmax = -1; //remote max par (mappers)
	protected int _rkmax2 = -1; //remote max par (reducers)
	protected double _lm = -1; //local memory constraint
	protected double _rm = -1; //remote memory constraint (mappers)
	protected double _rm2 = -1; //remote memory constraint (reducers)

	//cost estimator set once per optimize() invocation
	protected CostEstimator _cost = null;
| |
	@Override
	public CostModelType getCostModelType() {
		//rule-based optimization relies on static memory estimates only
		return CostModelType.STATIC_MEM_METRIC;
	}
| |
| |
	@Override
	public PlanInputType getPlanInputType() {
		//operates on the abstract plan representation (OptTree over program blocks)
		return PlanInputType.ABSTRACT_PLAN;
	}
| |
	@Override
	public POptMode getOptMode() {
		//identifies this optimizer in logs and configuration
		return POptMode.RULEBASED;
	}
| |
	/**
	 * Main optimization procedure.
	 *
	 * Transformation-based heuristic (rule-based) optimization
	 * (no use of sb, direct change of pb). Applies the rewrites 1-23
	 * documented on the class in a fixed order; after the execution
	 * strategy decision (rewrite 4), remote and local plans receive
	 * different subsets of the exec-type-specific rewrites.
	 *
	 * @param sb parfor statement block (unused here, kept for the Optimizer contract)
	 * @param pb parfor program block, modified in place by the rewrites
	 * @param plan abstract plan (OptTree) over the parfor body
	 * @param est cost estimator used for all memory estimates
	 * @param ec execution context providing the live variable map
	 * @return always true (early abort for empty parfor bodies also returns true)
	 */
	@Override
	public boolean optimize(ParForStatementBlock sb, ParForProgramBlock pb, OptTree plan, CostEstimator est, ExecutionContext ec)
	{
		LOG.debug("--- "+getOptMode()+" OPTIMIZER -------");

		OptNode pn = plan.getRoot();

		//early abort for empty parfor body
		if( pn.isLeaf() )
			return true;

		//ANALYZE infrastructure properties (initializes _N, _lk, _rk, _lm, _rm, ...)
		analyzeProblemAndInfrastructure( pn );

		_cost = est;

		//debug and warnings output
		if( LOG.isDebugEnabled() ) {
			LOG.debug(getOptMode()+" OPT: Optimize w/ max_mem="+toMB(_lm)+"/"+toMB(_rm)+"/"+toMB(_rm2)+", max_k="+_lk+"/"+_rk+"/"+_rk2+")." );
			if( OptimizerUtils.isSparkExecutionMode() )
				LOG.debug(getOptMode()+" OPT: Optimize w/ "+SparkExecutionContext.getSparkClusterConfig().toString());
			if( _rnk <= 0 || _rk <= 0 )
				LOG.warn(getOptMode()+" OPT: Optimize for inactive cluster (num_nodes="+_rnk+", num_map_slots="+_rk+")." );
		}

		//ESTIMATE memory consumption
		pn.setSerialParFor(); //for basic mem consumption
		double M0a = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn);
		LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0a) );

		//OPTIMIZE PARFOR PLAN

		// rewrite 1: data partitioning (incl. log. recompile RIX and flag opt nodes)
		HashMap<String, PartitionFormat> partitionedMatrices = new HashMap<>();
		rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, OptimizerUtils.getLocalMemBudget(), false );
		double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate

		// rewrite 2: remove unnecessary compare matrix (before result partitioning)
		rewriteRemoveUnnecessaryCompareMatrix(pn, ec);

		// rewrite 3: rewrite result partitioning (incl. log/phy recompile LIX)
		boolean flagLIX = rewriteSetResultPartitioning( pn, M0b, ec.getVariables() );
		double M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
		LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec) M="+toMB(M1) );

		//determine memory consumption for what-if: all-cp or partitioned
		double M2 = pn.isCPOnly() ? M1 :
			_cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
		LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec, all CP) M="+toMB(M2) );
		double M3 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true);
		LOG.debug(getOptMode()+" OPT: estimated new mem (cond partitioning) M="+toMB(M3) );

		// rewrite 4: execution strategy (decides CP vs remote for the entire parfor)
		boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );

		//exec-type-specific rewrites
		if( pn.getExecType() == getRemoteExecType() )
		{
			if( M1 > _rm && M3 <= _rm ) {
				// rewrite 1: data partitioning (apply conditional partitioning)
				rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, M3, false );
				M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
			}

			if( flagRecompMR ){
				//rewrite 5: set operations exec type
				rewriteSetOperationsExecType( pn, flagRecompMR );
				M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
			}

			// rewrite 6: data colocation
			rewriteDataColocation( pn, ec.getVariables() );

			// rewrite 7: rewrite set partition replication factor
			rewriteSetPartitionReplicationFactor( pn, partitionedMatrices, ec.getVariables() );

			// rewrite 8: rewrite set export replication factor
			rewriteSetExportReplicationFactor( pn, ec.getVariables() );

			// rewrite 10: determine parallelism
			rewriteSetDegreeOfParallelism( pn, _cost, ec.getVariables(), M1, false );

			// rewrite 11: task partitioning
			rewriteSetTaskPartitioner( pn, false, flagLIX );

			// rewrite 12: fused data partitioning and execution
			rewriteSetFusedDataPartitioningExecution(pn, M1, flagLIX, partitionedMatrices, ec.getVariables());

			// rewrite 14: set in-place result indexing
			HashSet<ResultVar> inplaceResultVars = new HashSet<>();
			rewriteSetInPlaceResultIndexing(pn, _cost, ec.getVariables(), inplaceResultVars, ec);
		}
		else //if( pn.getExecType() == ExecType.CP )
		{
			// rewrite 10: determine parallelism
			rewriteSetDegreeOfParallelism( pn, _cost, ec.getVariables(), M1, false );

			// rewrite 11: task partitioning
			rewriteSetTaskPartitioner( pn, false, false ); //flagLIX always false

			// rewrite 14: set in-place result indexing
			HashSet<ResultVar> inplaceResultVars = new HashSet<>();
			rewriteSetInPlaceResultIndexing(pn, _cost, ec.getVariables(), inplaceResultVars, ec);

			//rewrite 17: checkpoint injection for parfor loop body
			rewriteInjectSparkLoopCheckpointing( pn );

			//rewrite 18: repartition read-only inputs for zipmm
			rewriteInjectSparkRepartition( pn, ec.getVariables() );

			//rewrite 19: eager caching for checkpoint rdds
			rewriteSetSparkEagerRDDCaching( pn, ec.getVariables() );
		}

		// rewrite 20: set result merge
		rewriteSetResultMerge( pn, ec.getVariables(), true );

		// rewrite 21: set local recompile memory budget
		rewriteSetRecompileMemoryBudget( pn );

		///////
		//Final rewrites for cleanup / minor improvements

		// rewrite 22: parfor (in recursive functions) to for
		rewriteRemoveRecursiveParFor( pn, ec.getVariables() );

		// rewrite 23: parfor (par=1) to for
		rewriteRemoveUnnecessaryParFor( pn );

		//info optimization result
		_numTotalPlans = -1; //_numEvaluatedPlans maintained in rewrites;
		return true;
	}
| |
	/**
	 * Initializes the problem-size fields (_N, _Nmax) from the plan root and
	 * the infrastructure fields (_lk, _rk, _lm, _rm, ...) from the cluster
	 * configuration (Spark config in distributed mode, JVM limits otherwise).
	 *
	 * @param pn root node of the parfor plan (provides NUM_ITERATIONS and max problem size)
	 */
	protected void analyzeProblemAndInfrastructure( OptNode pn )
	{
		_N = Long.parseLong(pn.getParam(ParamType.NUM_ITERATIONS));
		_Nmax = pn.getMaxProblemSize();
		_lk = InfrastructureAnalyzer.getLocalParallelism();
		_lkmaxCP = (int) Math.ceil( PAR_K_FACTOR * _lk );
		_lkmaxMR = (int) Math.ceil( PAR_K_MR_FACTOR * _lk );
		_lm = OptimizerUtils.getLocalMemBudget();

		//spark-specific cluster characteristics
		if( OptimizerUtils.isSparkExecutionMode() ) {
			//we get all required cluster characteristics from spark's configuration
			//to avoid invoking yarns cluster status
			_rnk = SparkExecutionContext.getNumExecutors();
			_rk = SparkExecutionContext.getDefaultParallelism(true);
			_rk2 = _rk; //equal map/reduce unless we find counter-examples
			//cores per executor, capped by the problem size (at least 1)
			int cores = SparkExecutionContext.getDefaultParallelism(true)
				/ SparkExecutionContext.getNumExecutors();
			int ccores = Math.max((int) Math.min(cores, _N), 1);
			//remote memory budget: broadcast budget shared among concurrently used cores
			_rm = SparkExecutionContext.getBroadcastMemoryBudget() / ccores;
			_rm2 = SparkExecutionContext.getBroadcastMemoryBudget() / ccores;
		}
		//single node
		else {
			_rnk = 1;
			_rk = InfrastructureAnalyzer.getLocalParallelism();
			_rk2 = InfrastructureAnalyzer.getLocalParallelism();
			_rm = InfrastructureAnalyzer.getLocalMaxMemory()/_rk;
			_rm2 = InfrastructureAnalyzer.getLocalMaxMemory()/_rk2;
		}

		//max remote parallelism incl the over-provisioning factor
		_rkmax = (int) Math.ceil( PAR_K_FACTOR * _rk );
		_rkmax2 = (int) Math.ceil( PAR_K_FACTOR * _rk2 );
	}
| |
	/**
	 * Returns the remote (distributed) execution type this optimizer targets;
	 * subclasses may override for other backends.
	 *
	 * @return the remote exec type (SPARK)
	 */
	protected ExecType getRemoteExecType() {
		return ExecType.SPARK;
	}
| |
| /////// |
| //REWRITE set data partitioner |
| /// |
| |
| protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String, PartitionFormat> partitionedMatrices, double thetaM, boolean constrained ) |
| { |
| if( n.getNodeType() != NodeType.PARFOR ) |
| LOG.warn(getOptMode()+" OPT: Data partitioner can only be set for a ParFor node."); |
| |
| boolean blockwise = false; |
| |
| //preparations |
| long id = n.getID(); |
| Object[] o = OptTreeConverter.getAbstractPlanMapping().getMappedProg(id); |
| ParForStatementBlock pfsb = (ParForStatementBlock) o[0]; |
| ParForProgramBlock pfpb = (ParForProgramBlock) o[1]; |
| |
| //search for candidates |
| boolean apply = false; |
| if( OptimizerUtils.isHybridExecutionMode() //only if we are allowed to recompile |
| && (_N >= PROB_SIZE_THRESHOLD_PARTITIONING || _Nmax >= PROB_SIZE_THRESHOLD_PARTITIONING) ) //only if beneficial wrt problem size |
| { |
| HashMap<String, PartitionFormat> cand2 = new HashMap<>(); |
| for( String c : pfsb.getReadOnlyParentMatrixVars() ) { |
| PartitionFormat dpf = pfsb.determineDataPartitionFormat( c ); |
| double mem = getMemoryEstimate(c, vars); |
| if( dpf != PartitionFormat.NONE |
| && dpf._dpf != PDataPartitionFormat.BLOCK_WISE_M_N |
| && (constrained || (mem > _lm/2 && mem > _rm/2)) |
| && vars.get(c) != null //robustness non-existing vars |
| && !vars.get(c).getDataType().isList() ) { |
| cand2.put( c, dpf ); |
| } |
| } |
| apply = rFindDataPartitioningCandidates(n, cand2, vars, thetaM); |
| if( apply ) |
| partitionedMatrices.putAll(cand2); |
| } |
| |
| PDataPartitioner REMOTE = PDataPartitioner.REMOTE_SPARK; |
| PDataPartitioner pdp = (apply)? REMOTE : PDataPartitioner.NONE; |
| //NOTE: since partitioning is only applied in case of MR index access, we assume a large |
| // matrix and hence always apply REMOTE_MR (the benefit for large matrices outweigths |
| // potentially unnecessary MR jobs for smaller matrices) |
| |
| // modify rtprog |
| pfpb.setDataPartitioner( pdp ); |
| // modify plan |
| n.addParam(ParamType.DATA_PARTITIONER, pdp.toString()); |
| |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'set data partitioner' - result="+pdp.toString()+ |
| " ("+Arrays.toString(partitionedMatrices.keySet().toArray())+")" ); |
| |
| return blockwise; |
| } |
| |
| protected boolean rFindDataPartitioningCandidates( OptNode n, HashMap<String, PartitionFormat> cand, LocalVariableMap vars, double thetaM ) |
| { |
| boolean ret = false; |
| |
| if( !n.isLeaf() ) { |
| for( OptNode cn : n.getChilds() ) |
| if( cn.getNodeType() != NodeType.FUNCCALL ) //prevent conflicts with aliases |
| ret |= rFindDataPartitioningCandidates( cn, cand, vars, thetaM ); |
| } |
| else if( n.getNodeType()== NodeType.HOP |
| && n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) ) |
| { |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| String inMatrix = h.getInput().get(0).getName(); |
| if( cand.containsKey(inMatrix) && h.getDataType().isMatrix() ) //Required: partitionable |
| { |
| PartitionFormat dpf = cand.get(inMatrix); |
| double mnew = getNewRIXMemoryEstimate( n, inMatrix, dpf, vars ); |
| //NOTE: for the moment, we do not partition according to the remote mem, because we can execute |
| //it even without partitioning in CP. However, advanced optimizers should reason about this |
| //double mold = h.getMemEstimate(); |
| if( n.getExecType() == getRemoteExecType() //Opt Condition: MR/Spark |
| || h.getMemEstimate() > thetaM ) //Opt Condition: mem estimate > constraint to force partitioning |
| { |
| //NOTE: subsequent rewrites will still use the MR mem estimate |
| //(guarded by subsequent operations that have at least the memory req of one partition) |
| n.setExecType(ExecType.CP); //partition ref only (see below) |
| n.addParam(ParamType.DATA_PARTITION_FORMAT, dpf.toString()); |
| h.setMemEstimate( mnew ); //CP vs CP_FILE in ProgramRecompiler bases on mem_estimate |
| ret = true; |
| } |
| //keep track of nodes that allow conditional data partitioning and their mem |
| else |
| { |
| n.addParam(ParamType.DATA_PARTITION_COND, String.valueOf(true)); |
| n.addParam(ParamType.DATA_PARTITION_COND_MEM, String.valueOf(mnew)); |
| } |
| } |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * TODO consolidate mem estimation with Indexing Hop |
| * |
| * NOTE: Using the dimensions without sparsity is a conservative worst-case consideration. |
| * |
| * @param n internal representation of a plan alternative for program blocks and instructions |
| * @param varName variable name |
| * @param dpf data partition format |
| * @param vars local variable map |
| * @return memory estimate |
| */ |
| protected double getNewRIXMemoryEstimate( OptNode n, String varName, PartitionFormat dpf, LocalVariableMap vars ) |
| { |
| double mem = -1; |
| |
| //not all intermediates need to be known or existing on optimize |
| Data dat = vars.get( varName ); |
| if( dat != null && dat instanceof MatrixObject ) |
| { |
| MatrixObject mo = (MatrixObject) dat; |
| |
| //those are worst-case (dense) estimates |
| switch( dpf._dpf ) |
| { |
| case COLUMN_WISE: |
| mem = OptimizerUtils.estimateSize(mo.getNumRows(), 1); |
| break; |
| case ROW_WISE: |
| mem = OptimizerUtils.estimateSize(1, mo.getNumColumns()); |
| break; |
| case COLUMN_BLOCK_WISE_N: |
| mem = OptimizerUtils.estimateSize(mo.getNumRows(), dpf._N); |
| break; |
| case ROW_BLOCK_WISE_N: |
| mem = OptimizerUtils.estimateSize(dpf._N, mo.getNumColumns()); |
| break; |
| default: |
| //do nothing |
| } |
| } |
| |
| return mem; |
| } |
| |
| protected double getMemoryEstimate(String varName, LocalVariableMap vars) { |
| Data dat = vars.get(varName); |
| return (dat instanceof MatrixObject) ? |
| OptimizerUtils.estimateSize(((MatrixObject)dat).getDataCharacteristics()) : |
| OptimizerUtils.DEFAULT_SIZE; |
| } |
| |
| protected static LopProperties.ExecType getRIXExecType( MatrixObject mo, PDataPartitionFormat dpf, boolean withSparsity ) |
| { |
| double mem = -1; |
| |
| long rlen = mo.getNumRows(); |
| long clen = mo.getNumColumns(); |
| long blen = mo.getBlocksize(); |
| long nnz = mo.getNnz(); |
| double lsparsity = ((double)nnz)/rlen/clen; |
| double sparsity = withSparsity ? lsparsity : 1.0; |
| |
| switch( dpf ) |
| { |
| case COLUMN_WISE: |
| mem = OptimizerUtils.estimateSizeExactSparsity(mo.getNumRows(), 1, sparsity); |
| break; |
| case COLUMN_BLOCK_WISE: |
| mem = OptimizerUtils.estimateSizeExactSparsity(mo.getNumRows(), blen, sparsity); |
| break; |
| case ROW_WISE: |
| mem = OptimizerUtils.estimateSizeExactSparsity(1, mo.getNumColumns(), sparsity); |
| break; |
| case ROW_BLOCK_WISE: |
| mem = OptimizerUtils.estimateSizeExactSparsity(blen, mo.getNumColumns(), sparsity); |
| break; |
| |
| default: |
| //do nothing |
| } |
| |
| if( mem < OptimizerUtils.getLocalMemBudget() ) |
| return LopProperties.ExecType.CP; |
| else |
| return LopProperties.ExecType.CP_FILE; |
| } |
| |
	/**
	 * Checks if a column-block partition of the given matrix fits into the
	 * local memory budget (CP), in which case binary-cell partitions are allowed.
	 *
	 * NOTE(review): the dpf parameter is currently unused; the check always
	 * probes COLUMN_BLOCK_WISE with the dense worst case — confirm intent.
	 *
	 * @param mo matrix object to be partitioned
	 * @param dpf requested partition format (currently ignored)
	 * @return true if a column block fits into CP memory
	 */
	public static boolean allowsBinaryCellPartitions( MatrixObject mo, PartitionFormat dpf ) {
		return (getRIXExecType(mo, PDataPartitionFormat.COLUMN_BLOCK_WISE, false)==LopProperties.ExecType.CP );
	}
| |
| /////// |
| //REWRITE set result partitioning |
| /// |
| |
| protected boolean rewriteSetResultPartitioning(OptNode n, double M, LocalVariableMap vars) { |
| //preparations |
| long id = n.getID(); |
| Object[] o = OptTreeConverter.getAbstractPlanMapping().getMappedProg(id); |
| ParForProgramBlock pfpb = (ParForProgramBlock) o[1]; |
| |
| //search for candidates |
| Collection<OptNode> cand = n.getNodeList(getRemoteExecType()); |
| |
| //determine if applicable |
| boolean apply = M < _rm //ops fit in remote memory budget |
| && !cand.isEmpty() //at least one MR |
| && isResultPartitionableAll(cand,pfpb.getResultVariables(), |
| vars, pfpb.getIterVar()); // check candidates |
| |
| //recompile LIX |
| if( apply ) |
| { |
| try { |
| for(OptNode lix : cand) |
| recompileLIX( lix, vars ); |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException("Unable to recompile LIX.", ex); |
| } |
| } |
| |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'set result partitioning' - result="+apply ); |
| |
| return apply; |
| } |
| |
| protected boolean isResultPartitionableAll( Collection<OptNode> nlist, ArrayList<ResultVar> resultVars, LocalVariableMap vars, String iterVarname ) { |
| boolean ret = true; |
| for( OptNode n : nlist ) { |
| ret &= isResultPartitionable(n, resultVars, vars, iterVarname); |
| if(!ret) //early abort |
| break; |
| } |
| return ret; |
| } |
| |
	/**
	 * Checks if a single left-indexing operation qualifies for result
	 * partitioning: it must write a parfor result variable with an
	 * iteration-variable access pattern (row-, col-, or cell-wise), the
	 * result must be empty (nnz==0), and the sparse per-task representation
	 * must fit into the remote memory budget.
	 *
	 * As a side effect, stores the derived task size in the plan node.
	 *
	 * @param n plan node of the left-indexing operation
	 * @param resultVars parfor result variables
	 * @param vars local variable map
	 * @param iterVarname parfor iteration variable name
	 * @return true if the node is result-partitionable
	 */
	protected boolean isResultPartitionable( OptNode n, ArrayList<ResultVar> resultVars, LocalVariableMap vars, String iterVarname )
	{
		boolean ret = true;

		//check left indexing operator
		String opStr = n.getParam(ParamType.OPSTRING);
		if( opStr==null || !opStr.equals(LeftIndexingOp.OPSTRING) )
			ret = false;

		Hop h = null;
		Hop base = null;

		if( ret ) {
			h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
			base = h.getInput().get(0);

			//check result variable
			if( !ResultVar.contains(resultVars, base.getName()) )
				ret = false;
		}

		//check access pattern, memory budget
		if( ret ) {
			int dpf = 0; //0=none, 1=rowwise, 2=colwise, 3=cellwise
			Hop inpRowL = h.getInput().get(2);
			Hop inpRowU = h.getInput().get(3);
			Hop inpColL = h.getInput().get(4);
			Hop inpColU = h.getInput().get(5);
			if( (inpRowL.getName().equals(iterVarname) && inpRowU.getName().equals(iterVarname)) )
				dpf = 1; //rowwise
			if( (inpColL.getName().equals(iterVarname) && inpColU.getName().equals(iterVarname)) )
				dpf = (dpf==0) ? 2 : 3; //colwise or cellwise

			if( dpf == 0 )
				ret = false;
			else
			{
				//check memory budget: result must still be empty
				MatrixObject mo = (MatrixObject)vars.get(base.getName());
				if( mo.getNnz() != 0 ) //-1 valid because result var known during opt
					ret = false;

				//Note: for memory estimation the common case is sparse since remote_mr and individual tasks;
				//and in the dense case, we would not benefit from result partitioning
				//NOTE(review): nnz argument here is dim1 (one nonzero per row assumption?) — confirm
				boolean sparse = MatrixBlock.evalSparseFormatInMemory(base.getDim1(), base.getDim2(),base.getDim1());

				if( sparse )
				{
					//custom memory estimatation in order to account for structural properties
					//e.g., for rowwise we know that we only pay one sparserow overhead per task
					double memSparseBlock = estimateSizeSparseRowBlock(base.getDim1());
					//NOTE(review): passes dim2 as both cols and nnz, i.e., the dense-row worst case
					double memSparseRow1 = estimateSizeSparseRow(base.getDim2(), base.getDim2());
					double memSparseRowMin = estimateSizeSparseRowMin(base.getDim2());

					double memTask1 = -1; //memory footprint of a single task
					int taskN = -1; //number of iterations per task within budget
					switch(dpf) {
						case 1: //rowwise
							//sparse block and one sparse row per task
							memTask1 = memSparseBlock + memSparseRow1;
							taskN = (int) ((_rm-memSparseBlock) / memSparseRow1);
							break;
						case 2: //colwise
							//sparse block, sparse row per row but shared over tasks
							memTask1 = memSparseBlock + memSparseRowMin * base.getDim1();
							taskN = estimateNumTasksSparseCol(_rm-memSparseBlock, base.getDim1());
							break;
						case 3: //cellwise
							//sparse block and one minimal sparse row per task
							memTask1 = memSparseBlock + memSparseRowMin;
							taskN = (int) ((_rm-memSparseBlock) / memSparseRowMin);
							break;
					}

					if( memTask1>_rm || memTask1<0 )
						ret = false;
					else
						n.addParam(ParamType.TASK_SIZE, String.valueOf(taskN));
				}
				else
				{
					//dense (no result partitioning possible)
					ret = false;
				}
			}
		}

		return ret;
	}
| |
| private static double estimateSizeSparseRowBlock( long rows ) { |
| //see MatrixBlock.estimateSizeSparseInMemory |
| return 44 + rows * 8; |
| } |
| |
| private static double estimateSizeSparseRow( long cols, long nnz ) { |
| //see MatrixBlock.estimateSizeSparseInMemory |
| long cnnz = Math.max(SparseRowVector.initialCapacity, Math.max(cols, nnz)); |
| return ( 116 + 12 * cnnz ); //sparse row |
| } |
| |
| private static double estimateSizeSparseRowMin( long cols ) { |
| //see MatrixBlock.estimateSizeSparseInMemory |
| long cnnz = Math.min(SparseRowVector.initialCapacity, cols); |
| return ( 116 + 12 * cnnz ); //sparse row |
| } |
| |
| private static int estimateNumTasksSparseCol( double budget, long rows ) { |
| //see MatrixBlock.estimateSizeSparseInMemory |
| double lbudget = budget - rows * 116; |
| return (int) Math.floor( lbudget / 12 ); |
| } |
| |
	/**
	 * Forces the given left-indexing hop to CP and recompiles its parent
	 * program block, while preserving the modified memory estimates of
	 * partitioned right-indexing ops in the same DAG.
	 *
	 * Note: the statement order is significant — the mem estimate of the LIX
	 * hop must be set last because recompilation overwrites it.
	 *
	 * @param n plan node of the left-indexing operation
	 * @param vars local variable map used for recompilation
	 */
	protected void recompileLIX( OptNode n, LocalVariableMap vars ) {
		Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());

		//set forced exec type
		h.setForcedExecType(LopProperties.ExecType.CP);
		n.setExecType(ExecType.CP);

		//recompile parent pb
		long pid = OptTreeConverter.getAbstractPlanMapping().getMappedParentID(n.getID());
		OptNode nParent = OptTreeConverter.getAbstractPlanMapping().getOptNode(pid);
		Object[] o = OptTreeConverter.getAbstractPlanMapping().getMappedProg(pid);
		StatementBlock sb = (StatementBlock) o[0];
		BasicProgramBlock pb = (BasicProgramBlock) o[1];

		//keep modified estimated of partitioned rix (in same dag as lix)
		HashMap<Hop, Double> estRix = getPartitionedRIXEstimates(nParent);

		//construct new instructions
		ArrayList<Instruction> newInst = Recompiler.recompileHopsDag(
			sb, sb.getHops(), vars, null, false, false, 0);
		pb.setInstructions( newInst );

		//reset all rix estimated (modified by recompile)
		resetPartitionRIXEstimates( estRix );

		//set new mem estimate (last, otherwise overwritten from recompile)
		h.setMemEstimate(_rm-1);
	}
| |
| protected HashMap<Hop, Double> getPartitionedRIXEstimates(OptNode parent) |
| { |
| HashMap<Hop, Double> estimates = new HashMap<>(); |
| for( OptNode n : parent.getChilds() ) |
| if( n.getParam(ParamType.DATA_PARTITION_FORMAT) != null ) |
| { |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| estimates.put( h, h.getMemEstimate() ); |
| } |
| return estimates; |
| } |
| |
| protected void resetPartitionRIXEstimates( HashMap<Hop, Double> estimates ) |
| { |
| for( Entry<Hop, Double> e : estimates.entrySet() ) |
| { |
| Hop h = e.getKey(); |
| double val = e.getValue(); |
| h.setMemEstimate(val); |
| } |
| } |
| |
| |
| /////// |
| //REWRITE set execution strategy |
| /// |
| |
/**
 * Decides on the parfor execution strategy (local CP vs remote Spark) based on
 * the given memory estimates and cluster characteristics, and sets the chosen
 * execution mode on both the opt tree node and the runtime parfor program block.
 * 
 * @param n parfor opt tree node
 * @param M0 memory estimate used for the large-problem check (see isLargeProblem)
 * @param M memory estimate of the parfor body with current exec types
 * @param M2 memory estimate with all operations forced to CP
 * @param M3 memory estimate assuming partitioned reads -- NOTE(review): confirm
 *   exact semantics of M0/M/M2/M3 at the caller
 * @param flagLIX true if the left-indexing result partitioning rewrite applies
 * @return true if recompilation wrt the remote memory budget is required
 */
protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, double M3, boolean flagLIX) {
	boolean isCPOnly = n.isCPOnly();
	boolean isCPOnlyPossible = isCPOnly || isCPOnlyPossible(n, _rm);
	
	String datapartitioner = n.getParam(ParamType.DATA_PARTITIONER);
	ExecType REMOTE = getRemoteExecType();
	PDataPartitioner REMOTE_DP = PDataPartitioner.REMOTE_SPARK;
	
	//deciding on the execution strategy
	if( ConfigurationManager.isParallelParFor() //allowed remote parfor execution
		&& ( (isCPOnly && M <= _rm ) //Required: all inst already in cp and fit in remote mem
		||(isCPOnly && M3 <= _rm ) //Required: all inst already in cp and fit partitioned in remote mem
		||(isCPOnlyPossible && M2 <= _rm)) ) //Required: all inst forced to cp fit in remote mem
	{
		//at this point all required conditions for REMOTE_MR given, now its an opt decision
		int cpk = (int) Math.min( _lk, Math.floor( _lm / M ) ); //estimated local exploited par
		
		//MR if local par cannot be exploited due to mem constraints (this implies that we work on large data)
		//(the factor of 2 is to account for hyper-threading and in order prevent too eager remote parfor)
		if( 2*cpk < _lk && 2*cpk < _N && 2*cpk < _rk ) //incl conditional partitioning
		{
			n.setExecType( REMOTE ); //remote parfor
		}
		//MR if problem is large enough and remote parallelism is larger than local
		else if( _lk < _N && _lk < _rk && M <= _rm && isLargeProblem(n, M0) )
		{
			n.setExecType( REMOTE ); //remote parfor
		}
		//MR if MR operations in local, but CP only in remote (less overall MR jobs)
		else if( !isCPOnly && isCPOnlyPossible )
		{
			n.setExecType( REMOTE ); //remote parfor
		}
		//MR if necessary for LIX rewrite (LIX true iff cp only and rm valid)
		else if( flagLIX )
		{
			n.setExecType( REMOTE ); //remote parfor
		}
		//MR if remote data partitioning, because data will be distributed on all nodes
		else if( datapartitioner!=null && datapartitioner.equals(REMOTE_DP.toString())
			&& !InfrastructureAnalyzer.isLocalMode())
		{
			n.setExecType( REMOTE ); //remote parfor
		}
		//otherwise CP
		else
		{
			n.setExecType( ExecType.CP ); //local parfor
		}
	}
	else //mr instructions in body, or rm too small
	{
		n.setExecType( ExecType.CP ); //local parfor
	}
	
	//actual programblock modification
	long id = n.getID();
	ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
		.getAbstractPlanMapping().getMappedProg(id)[1];
	
	PExecMode mode = n.getExecType().toParForExecMode();
	pfpb.setExecMode( mode );
	
	//decide if recompilation according to remote mem budget necessary
	//(only needed if the body still contains non-CP operations)
	boolean requiresRecompile = (mode == PExecMode.REMOTE_SPARK && !isCPOnly);
	
	_numEvaluatedPlans++;
	LOG.debug(getOptMode()+" OPT: rewrite 'set execution strategy' - result="+mode+" (recompile="+requiresRecompile+")" );
	
	return requiresRecompile;
}
| |
| protected boolean isLargeProblem(OptNode pn, double M) |
| { |
| //TODO get a proper time estimate based to capture compute-intensive scenarios |
| |
| //rule-based decision based on number of outer iterations or maximum number of |
| //inner iterations (w/ appropriately scaled minimum data size threshold); |
| boolean isCtxCreated = OptimizerUtils.isSparkExecutionMode() |
| && SparkExecutionContext.isSparkContextCreated(); |
| return (_N >= PROB_SIZE_THRESHOLD_REMOTE && M > PROB_SIZE_THRESHOLD_MB) |
| || (_Nmax >= 10 * PROB_SIZE_THRESHOLD_REMOTE |
| && M > PROB_SIZE_THRESHOLD_MB/(isCtxCreated?10:1)); |
| } |
| |
| protected boolean isCPOnlyPossible( OptNode n, double memBudget ) { |
| ExecType et = n.getExecType(); |
| boolean ret = ( et == ExecType.CP); |
| |
| if( n.isLeaf() && et == getRemoteExecType() ) |
| { |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop( n.getID() ); |
| if( h.getForcedExecType()!=LopProperties.ExecType.SPARK |
| && h.hasValidCPDimsAndSize() ) //integer dims |
| { |
| double mem = _cost.getLeafNodeEstimate(TestMeasure.MEMORY_USAGE, n, LopProperties.ExecType.CP); |
| if( mem <= memBudget ) |
| ret = true; |
| } |
| } |
| |
| if( !n.isLeaf() ) |
| for( OptNode c : n.getChilds() ) |
| { |
| if( !ret ) break; //early abort if already false |
| ret &= isCPOnlyPossible(c, memBudget); |
| } |
| return ret; |
| } |
| |
| |
| /////// |
| //REWRITE set operations exec type |
| /// |
| |
/**
 * Forces the execution type of all hop operations in the parfor body to CP
 * and recompiles the child program blocks accordingly.
 * 
 * @param pn parfor opt tree node
 * @param recompile if true, warn when no operation actually required an
 *   exec-type change
 */
protected void rewriteSetOperationsExecType(OptNode pn, boolean recompile) {
	//set exec type in internal opt tree
	int count = setOperationExecType(pn, ExecType.CP);
	
	//recompile program (actual programblock modification)
	//NOTE(review): the forced recompile below runs unconditionally; the
	//'recompile' flag only guards the warning -- confirm this is intended
	if( recompile && count<=0 )
		LOG.warn("OPT: Forced set operations exec type 'CP', but no operation requires recompile.");
	ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
		.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
	HashSet<String> fnStack = new HashSet<>();
	Recompiler.recompileProgramBlockHierarchy2Forced(pfpb.getChildBlocks(), 0, fnStack, LopProperties.ExecType.CP);
	
	//debug output
	LOG.debug(getOptMode()+" OPT: rewrite 'set operation exec type CP' - result="+count);
}
| |
| protected int setOperationExecType( OptNode n, ExecType et ) |
| { |
| int count = 0; |
| |
| //set operation exec type to CP, count num recompiles |
| if( n.getExecType()!=ExecType.CP && n.getNodeType()==NodeType.HOP ) { |
| n.setExecType( ExecType.CP ); |
| count = 1; |
| } |
| |
| //recursively set exec type of childs |
| if( !n.isLeaf() ) |
| for( OptNode c : n.getChilds() ) |
| count += setOperationExecType(c, et); |
| |
| return count; |
| } |
| |
| /////// |
| //REWRITE enable data colocation |
| /// |
| |
| /** |
| * NOTE: if MAX_REPLICATION_FACTOR_PARTITIONING is set larger than 10, co-location may |
| * throw warnings per split since this exceeds "max block locations" |
| * |
| * @param n internal representation of a plan alternative for program blocks and instructions |
| * @param vars local variable map |
| */ |
| protected void rewriteDataColocation( OptNode n, LocalVariableMap vars ) { |
| // data colocation is beneficial if we have dp=REMOTE_MR, etype=REMOTE_MR |
| // and there is at least one direct col-/row-wise access with the index variable |
| // on the partitioned matrix |
| boolean apply = false; |
| String varname = null; |
| ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter |
| .getAbstractPlanMapping().getMappedProg(n.getID())[1]; |
| |
| //modify the runtime plan (apply true if at least one candidate) |
| if( apply ) |
| pfpb.enableColocatedPartitionedMatrix( varname ); |
| |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'enable data colocation' - result="+apply+((apply)?" ("+varname+")":"") ); |
| } |
| |
| protected void rFindDataColocationCandidates( OptNode n, HashSet<String> cand, String iterVarname ) { |
| if( !n.isLeaf() ) |
| { |
| for( OptNode cn : n.getChilds() ) |
| rFindDataColocationCandidates( cn, cand, iterVarname ); |
| } |
| else if( n.getNodeType()== NodeType.HOP |
| && n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) |
| && n.getParam(ParamType.DATA_PARTITION_FORMAT) != null ) |
| { |
| PartitionFormat dpf = PartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT)); |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| String inMatrix = h.getInput().get(0).getName(); |
| String indexAccess = null; |
| switch( dpf._dpf ) |
| { |
| case ROW_WISE: //input 1 and 2 eq |
| if( h.getInput().get(1) instanceof DataOp ) |
| indexAccess = h.getInput().get(1).getName(); |
| break; |
| case COLUMN_WISE: //input 3 and 4 eq |
| if( h.getInput().get(3) instanceof DataOp ) |
| indexAccess = h.getInput().get(3).getName(); |
| break; |
| default: |
| //do nothing |
| } |
| |
| if( indexAccess != null && indexAccess.equals(iterVarname) ) |
| cand.add( inMatrix ); |
| } |
| } |
| |
| |
| /////// |
| //REWRITE set partition replication factor |
| /// |
| |
| /** |
| * Increasing the partition replication factor is beneficial if partitions are |
| * read multiple times (e.g., in nested loops) because partitioning (done once) |
| * gets slightly slower but there is a higher probability for local access |
| * |
| * NOTE: this rewrite requires 'set data partitioner' to be executed in order to |
| * leverage the partitioning information in the plan tree. |
| * |
| * @param n internal representation of a plan alternative for program blocks and instructions |
| * @param partitionedMatrices map of data partition formats |
| * @param vars local variable map |
| */ |
| protected void rewriteSetPartitionReplicationFactor( OptNode n, HashMap<String, PartitionFormat> partitionedMatrices, LocalVariableMap vars ) |
| { |
| boolean apply = false; |
| double sizeReplicated = 0; |
| int replication = ParForProgramBlock.WRITE_REPLICATION_FACTOR; |
| |
| ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter |
| .getAbstractPlanMapping().getMappedProg(n.getID())[1]; |
| |
| if(((n.getExecType()==ExecType.SPARK && n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_SPARK.name()))) |
| && n.hasNestedParallelism(false) |
| && n.hasNestedPartitionReads(false) ) |
| { |
| apply = true; |
| |
| //account for problem and cluster constraints |
| replication = (int)Math.min( _N, _rnk ); |
| |
| //account for internal max constraint (note hadoop will warn if max > 10) |
| replication = Math.min( replication, MAX_REPLICATION_FACTOR_PARTITIONING ); |
| |
| //account for remaining hdfs capacity |
| try( FileSystem fs = IOUtilFunctions.getFileSystem(ConfigurationManager.getCachedJobConf()) ) { |
| long hdfsCapacityRemain = fs.getStatus().getRemaining(); |
| long sizeInputs = 0; //sum of all input sizes (w/o replication) |
| for( String var : partitionedMatrices.keySet() ) { |
| MatrixObject mo = (MatrixObject)vars.get(var); |
| Path fname = new Path(mo.getFileName()); |
| if( fs.exists( fname ) ) //non-existing (e.g., CP) -> small file |
| sizeInputs += fs.getContentSummary(fname).getLength(); |
| } |
| replication = (int) Math.min(replication, Math.floor(0.9*hdfsCapacityRemain/sizeInputs)); |
| |
| //ensure at least replication 1 |
| replication = Math.max( replication, ParForProgramBlock.WRITE_REPLICATION_FACTOR); |
| sizeReplicated = replication * sizeInputs; |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException("Failed to analyze remaining hdfs capacity.", ex); |
| } |
| } |
| |
| //modify the runtime plan |
| if( apply ) |
| pfpb.setPartitionReplicationFactor( replication ); |
| |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'set partition replication factor' - result="+apply+ |
| ((apply)?" ("+replication+", "+toMB(sizeReplicated)+")":"") ); |
| } |
| |
| /////// |
| //REWRITE set export replication factor |
| /// |
| |
| /** |
| * Increasing the export replication factor is beneficial for remote execution |
| * because each task will read the full input data set. This only applies to |
| * matrices that are created as in-memory objects before parfor execution. |
| * |
| * NOTE: this rewrite requires 'set execution strategy' to be executed. |
| * |
| * @param n internal representation of a plan alternative for program blocks and instructions |
| * @param vars local variable map |
| */ |
| protected void rewriteSetExportReplicationFactor( OptNode n, LocalVariableMap vars ) |
| { |
| boolean apply = false; |
| int replication = -1; |
| |
| ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter |
| .getAbstractPlanMapping().getMappedProg(n.getID())[1]; |
| |
| //decide on the replication factor |
| if( n.getExecType()==getRemoteExecType() ) |
| { |
| apply = true; |
| |
| //account for problem and cluster constraints |
| replication = (int)Math.min( _N, _rnk ); |
| |
| //account for internal max constraint (note hadoop will warn if max > 10) |
| replication = Math.min( replication, MAX_REPLICATION_FACTOR_EXPORT ); |
| } |
| |
| //modify the runtime plan |
| if( apply ) |
| pfpb.setExportReplicationFactor( replication ); |
| |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'set export replication factor' - result="+apply+((apply)?" ("+replication+")":"") ); |
| } |
| |
| /** |
| * Calculates the maximum memory needed in a CP only Parfor |
| * based on the {@link Hop#computeMemEstimate(MemoTable)} } function |
| * called recursively for the "children" of the parfor {@link OptNode}. |
| * |
| * @param n the parfor {@link OptNode} |
| * @return the maximum memory needed for any operation inside a parfor in CP execution mode |
| */ |
| protected double getMaxCPOnlyBudget(OptNode n) { |
| ExecType et = n.getExecType(); |
| double ret = 0; |
| |
| if (n.isLeaf() && et != getRemoteExecType()) { |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| if ( h.getForcedExecType() != LopProperties.ExecType.SPARK) { |
| double mem = _cost.getLeafNodeEstimate(TestMeasure.MEMORY_USAGE, n, LopProperties.ExecType.CP); |
| if (mem >= OptimizerUtils.DEFAULT_SIZE) { |
| // memory estimate for worst case scenario. |
| // optimistically ignoring this |
| } else { |
| ret = Math.max(ret, mem); |
| } |
| } |
| } |
| |
| if (!n.isLeaf()) { |
| for (OptNode c : n.getChilds()) { |
| ret = Math.max(ret, getMaxCPOnlyBudget(c)); |
| } |
| } |
| return ret; |
| } |
| |
| /////// |
| //REWRITE set degree of parallelism |
| /// |
| |
/**
 * Decides the parfor degree of parallelism and distributes the remaining
 * parallelism to nested parfor loops and multi-threaded operations, subject
 * to local/remote memory budgets and problem-size constraints.
 * 
 * @param n parfor opt tree node
 * @param cost cost estimator for partial memory estimates
 * @param vars local variable map
 * @param M memory estimate of the parfor body
 * @param flagNested true if nested remote parallelism is enabled
 */
protected void rewriteSetDegreeOfParallelism(OptNode n, CostEstimator cost, LocalVariableMap vars, double M, boolean flagNested)
{
	ExecType type = n.getExecType();
	long id = n.getID();
	
	//special handling for different exec models (CP, MR, MR nested)
	Object[] map = OptTreeConverter.getAbstractPlanMapping().getMappedProg(id);
	ParForStatementBlock pfsb = (ParForStatementBlock)map[0];
	ParForProgramBlock pfpb = (ParForProgramBlock)map[1];
	
	if( type == ExecType.CP )
	{
		//determine local max parallelism constraint
		int kMax = ConfigurationManager.isParallelParFor() ?
			(n.isCPOnly() ? _lkmaxCP : _lkmaxMR) : 1;
		
		//compute memory budgets and partial estimates for handling shared reads
		//(for spark w/ non-CP body only half the local memory is budgeted)
		double mem = (OptimizerUtils.isSparkExecutionMode() && !n.isCPOnly()) ? _lm/2 : _lm;
		double sharedM = 0, nonSharedM = M;
		if( computeMaxK(M, M, 0, mem) < kMax ) { //account for shared read if necessary
			//shared: size of all read-only parent matrices (counted once across workers);
			//non-shared: body estimate excluding those shared reads
			sharedM = pfsb.getReadOnlyParentMatrixVars().stream().map(s -> vars.get(s))
				.filter(d -> d instanceof MatrixObject).mapToDouble(mo -> OptimizerUtils
				.estimateSize(((MatrixObject)mo).getDataCharacteristics())).sum();
			nonSharedM = cost.getEstimate(TestMeasure.MEMORY_USAGE, n, true,
				pfsb.getReadOnlyParentMatrixVars(), ExcludeType.SHARED_READ);
		}
		
		//ensure local memory constraint (for spark more conservative in order to
		//prevent unnecessary guarded collect)
		kMax = Math.min( kMax, computeMaxK(M, nonSharedM, sharedM, mem) );
		kMax = Math.max( kMax, 1);
		
		//constrain max parfor parallelism by problem size
		int parforK = (int)((_N<kMax)? _N : kMax);
		
		// if gpu mode is enabled, the amount of parallelism is set to
		// the smaller of the number of iterations and the number of GPUs
		// otherwise it default to the number of CPU cores and the
		// operations are run in CP mode
		//FIXME rework for nested parfor parallelism and body w/o gpu ops
		if (DMLScript.USE_ACCELERATOR) {
			long perGPUBudget = GPUContextPool.initialGPUMemBudget();
			double maxMemUsage = getMaxCPOnlyBudget(n);
			if (maxMemUsage < perGPUBudget){
				parforK = GPUContextPool.getDeviceCount();
				parforK = Math.min(parforK, (int)_N);
				LOG.debug("Setting degree of parallelism + [" + parforK + "] for GPU; per GPU budget :[" +
					perGPUBudget + "], parfor budget :[" + maxMemUsage + "], max parallelism per GPU : [" +
					parforK + "]");
			}
		}
		
		//set parfor degree of parallelism
		pfpb.setDegreeOfParallelism(parforK);
		n.setK(parforK);
		
		//distribute remaining parallelism to nested parfors and operations
		int remainParforK = getRemainingParallelismParFor(kMax, parforK);
		int remainOpsK = getRemainingParallelismOps(_lkmaxCP, parforK);
		rAssignRemainingParallelism( n, remainParforK, remainOpsK );
	}
	else // ExecType.MR/ExecType.SPARK
	{
		int kMax = -1;
		if( flagNested ) {
			//determine remote max parallelism constraint
			pfpb.setDegreeOfParallelism( _rnk ); //guaranteed <= _N (see nested)
			n.setK( _rnk );
			kMax = _rkmax / _rnk; //per node (CP only inside)
		}
		else { //not nested (default)
			//determine remote max parallelism constraint
			int tmpK = (int)((_N<_rk)? _N : _rk);
			pfpb.setDegreeOfParallelism(tmpK);
			n.setK(tmpK);
			kMax = _rkmax / tmpK; //per node (CP only inside)
		}
		
		//ensure remote memory constraint
		kMax = Math.min( kMax, (int)Math.floor( _rm / M ) ); //guaranteed >= 1 (see exec strategy)
		if( kMax < 1 )
			kMax = 1;
		
		//disable nested parallelism, if required
		if( !ALLOW_REMOTE_NESTED_PARALLELISM )
			kMax = 1;
		
		//distribute remaining parallelism and recompile parallel instructions
		rAssignRemainingParallelism( n, kMax, 1 );
	}
	
	_numEvaluatedPlans++;
	LOG.debug(getOptMode()+" OPT: rewrite 'set degree of parallelism' - result=(see EXPLAIN)" );
}
| |
| private static int computeMaxK(double M, double memNonShared, double memShared, double memBudget) { |
| //note: we compute max K for both w/o and w/ shared reads and take the max, because |
| //the latter might reduce the degree of parallelism if shared reads don't dominate |
| int k1 = (int)Math.floor(memBudget / M); |
| int k2 = (int)Math.floor(memBudget-memShared / memNonShared); |
| return Math.max(k1, k2); |
| } |
| |
/**
 * Recursively distributes the remaining parallelism to nested parfor program
 * blocks and multi-threaded hop operations, and recompiles statement blocks
 * whose operator parallelism constraints changed.
 * 
 * @param n current opt tree node
 * @param parforK remaining parallelism available for nested parfor loops
 * @param opsK remaining parallelism available for multi-threaded operations
 */
protected void rAssignRemainingParallelism(OptNode n, int parforK, int opsK)
{
	ArrayList<OptNode> childs = n.getChilds();
	if( childs != null )
	{
		boolean recompileSB = false;
		for( OptNode c : childs )
		{
			//NOTE: we cannot shortcut with c.setSerialParFor() on par=1 because
			//this would miss to recompile multi-threaded hop operations
			
			if( c.getNodeType() == NodeType.PARFOR )
			{
				//constrain max parfor parallelism by problem size
				int tmpN = Integer.parseInt(c.getParam(ParamType.NUM_ITERATIONS));
				int tmpK = (tmpN<parforK)? tmpN : parforK;
				
				//set parfor degree of parallelism (opt tree and runtime plan)
				long id = c.getID();
				c.setK(tmpK);
				ParForProgramBlock pfpb = (ParForProgramBlock)
					OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(id);
				pfpb.setDegreeOfParallelism(tmpK);
				
				//distribute remaining parallelism
				int remainParforK = getRemainingParallelismParFor(parforK, tmpK);
				int remainOpsK = getRemainingParallelismOps(opsK, tmpK);
				rAssignRemainingParallelism(c, remainParforK, remainOpsK);
			}
			else if( c.getNodeType() == NodeType.HOP )
			{
				//set degree of parallelism for multi-threaded leaf nodes
				Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(c.getID());
				if( ConfigurationManager.isParallelMatrixOperations()
					&& h instanceof MultiThreadedHop
					&& ((MultiThreadedHop)h).isMultiThreadedOpType() )
				{
					MultiThreadedHop mhop = (MultiThreadedHop) h;
					mhop.setMaxNumThreads(opsK); //set max constraint in hop
					c.setK(opsK); //set optnode k (for explain)
					//need to recompile SB, if changed constraint
					recompileSB = true;
				}
				//for all other multi-threaded hops set k=1 to simply debugging
				else if( h instanceof MultiThreadedHop ) {
					MultiThreadedHop mhop = (MultiThreadedHop) h;
					mhop.setMaxNumThreads(1); //set max constraint in hop
					c.setK(1); //set optnode k (for explain)
				}
				
				//if parfor contains eval call, make unoptimized functions single-threaded
				if( HopRewriteUtils.isNary(h, OpOpN.EVAL) ) {
					ProgramBlock pb = OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
					pb.getProgram().getFunctionProgramBlocks(false)
						.forEach((fname, fvalue) -> ParamservUtils.recompileProgramBlocks(1, fvalue.getChildBlocks()));
				}
			}
			else
				//other node types (e.g., generic blocks): pass parallelism through
				rAssignRemainingParallelism(c, parforK, opsK);
		}
		
		//recompile statement block if required
		if( recompileSB ) {
			try {
				//guaranteed to be a last-level block (see hop change)
				ProgramBlock pb = OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
				Recompiler.recompileProgramBlockInstructions(pb);
			}
			catch(Exception ex){
				throw new DMLRuntimeException(ex);
			}
		}
	}
}
| |
| protected static int getRemainingParallelismParFor(int parforK, int tmpK) { |
| //compute max remaining parfor parallelism k such that k * tmpK <= parforK |
| return (int)Math.ceil((double)(parforK-tmpK+1) / tmpK); |
| } |
| |
| protected static int getRemainingParallelismOps(int opsK, int tmpK) { |
| //compute max remaining operations parallelism k with slight over-provisioning |
| //such that k * tmpK <= 1.5 * opsK; note that if parfor already exploits the |
| //maximum parallelism, this will not introduce any over-provisioning. |
| //(when running with native BLAS/DNN libraries, we disable over-provisioning |
| //to avoid internal SIGFPE and allocation buffer issues w/ MKL and OpenBlas) |
| return NativeHelper.isNativeLibraryLoaded() ? |
| (int) Math.max(opsK / tmpK, 1) : |
| (int) Math.max(Math.round((double)opsK / tmpK), 1); |
| } |
| |
| /////// |
| //REWRITE set task partitioner |
| /// |
| |
| protected void rewriteSetTaskPartitioner(OptNode pn, boolean flagNested, boolean flagLIX) |
| { |
| //assertions (warnings of corrupt optimizer decisions) |
| if( pn.getNodeType() != NodeType.PARFOR ) |
| LOG.warn(getOptMode()+" OPT: Task partitioner can only be set for a ParFor node."); |
| if( flagNested && flagLIX ) |
| LOG.warn(getOptMode()+" OPT: Task partitioner decision has conflicting input from rewrites 'nested parallelism' and 'result partitioning'."); |
| |
| //set task partitioner |
| if( flagNested ) |
| { |
| setTaskPartitioner( pn, PTaskPartitioner.STATIC ); |
| setTaskPartitioner( pn.getChilds().get(0), PTaskPartitioner.FACTORING ); |
| } |
| else if( flagLIX ) |
| { |
| setTaskPartitioner( pn, PTaskPartitioner.FACTORING_CMAX ); |
| } |
| else if( pn.getExecType()==ExecType.SPARK && pn.hasOnlySimpleChilds() ) |
| { |
| //for simple body programs without loops, branches, or function calls, we don't |
| //expect much load imbalance and hence use static partitioning in order to |
| //(1) reduce task latency, (2) prevent repeated read (w/o jvm reuse), and (3) |
| //preaggregate results (less write / less read by result merge) |
| setTaskPartitioner( pn, PTaskPartitioner.STATIC ); |
| } |
| else if( _N/4 >= pn.getK() ) //to prevent imbalance due to ceiling |
| { |
| setTaskPartitioner( pn, PTaskPartitioner.FACTORING ); |
| } |
| else |
| { |
| setTaskPartitioner( pn, PTaskPartitioner.NAIVE ); |
| } |
| } |
| |
| protected void setTaskPartitioner( OptNode n, PTaskPartitioner partitioner ) |
| { |
| long id = n.getID(); |
| |
| // modify rtprog |
| ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter |
| .getAbstractPlanMapping().getMappedProgramBlock(id); |
| pfpb.setTaskPartitioner(partitioner); |
| |
| // modify plan |
| n.addParam(ParamType.TASK_PARTITIONER, partitioner.toString()); |
| |
| //handle specific case of LIX recompile |
| boolean flagLIX = (partitioner == PTaskPartitioner.FACTORING_CMAX); |
| if( flagLIX ) |
| { |
| long maxc = n.getMaxC( _N ); |
| pfpb.setTaskSize( maxc ); //used as constraint |
| pfpb.disableJVMReuse(); |
| n.addParam(ParamType.TASK_SIZE, String.valueOf(maxc)); |
| } |
| |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'set task partitioner' - result="+partitioner+((flagLIX) ? ","+n.getParam(ParamType.TASK_SIZE) : "") ); |
| } |
| |
| /////// |
| //REWRITE set fused data partitioning / execution |
| /// |
| |
| /** |
| * This dedicated execution mode can only be applied if all of the |
| * following conditions are true: |
| * - Only cp instructions in the parfor body |
| * - Only one partitioned input |
| * - number of iterations is equal to number of partitions (nrow/ncol) |
| * - partitioned matrix access via plain iteration variables (no composed expressions) |
| * (this ensures that each partition is exactly read once) |
| * - no left indexing (since by default static task partitioning) |
| * |
| * Furthermore, it should be only chosen if we already decided for remote partitioning |
| * and otherwise would create a large number of partition files. |
| * |
| * NOTE: We already respect the reducer memory budget for plan correctness. However, |
| * we miss optimization potential if the reducer budget is larger than the mapper budget |
| * (if we were not able to select REMOTE_MR as execution strategy wrt mapper budget) |
| * TODO modify 'set exec strategy' and related rewrites for conditional data partitioning. |
| * |
| * @param pn internal representation of a plan alternative for program blocks and instructions |
| * @param M ? |
| * @param flagLIX ? |
| * @param partitionedMatrices map of data partition formats |
| * @param vars local variable map |
| */ |
| protected void rewriteSetFusedDataPartitioningExecution(OptNode pn, double M, boolean flagLIX, HashMap<String, PartitionFormat> partitionedMatrices, LocalVariableMap vars) |
| { |
| //assertions (warnings of corrupt optimizer decisions) |
| if( pn.getNodeType() != NodeType.PARFOR ) |
| LOG.warn(getOptMode()+" OPT: Fused data partitioning and execution is only applicable for a ParFor node."); |
| |
| boolean apply = false; |
| String partitioner = pn.getParam(ParamType.DATA_PARTITIONER); |
| PDataPartitioner REMOTE_DP = PDataPartitioner.REMOTE_SPARK; |
| PExecMode REMOTE_DPE = PExecMode.REMOTE_SPARK_DP; |
| |
| //precondition: rewrite only invoked if exec type MR |
| // (this also implies that the body is CP only) |
| |
| // try to merge MR data partitioning and MR exec |
| if( pn.getExecType()==ExecType.SPARK //MR/SP EXEC and CP body |
| && partitioner!=null && partitioner.equals(REMOTE_DP.toString()) //MR/SP partitioning |
| && partitionedMatrices.size()==1 ) //only one partitioned matrix |
| { |
| ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter |
| .getAbstractPlanMapping().getMappedProg(pn.getID())[1]; |
| |
| //partitioned matrix |
| String moVarname = partitionedMatrices.keySet().iterator().next(); |
| PartitionFormat moDpf = partitionedMatrices.get(moVarname); |
| MatrixObject mo = (MatrixObject)vars.get(moVarname); |
| |
| if( rIsAccessByIterationVariable(pn, moVarname, pfpb.getIterVar()) && |
| ((moDpf==PartitionFormat.ROW_WISE && mo.getNumRows()==_N ) || |
| (moDpf==PartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N) || |
| (moDpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE_N && mo.getNumRows()<=_N*moDpf._N)|| |
| (moDpf._dpf==PDataPartitionFormat.COLUMN_BLOCK_WISE_N && mo.getNumColumns()<=_N*moDpf._N)) ) |
| { |
| int k = (int)Math.min(_N,_rk2); |
| |
| pn.addParam(ParamType.DATA_PARTITIONER, REMOTE_DPE.toString()+"(fused)"); |
| pn.setK( k ); |
| |
| pfpb.setExecMode(REMOTE_DPE); //set fused exec type |
| pfpb.setDataPartitioner(PDataPartitioner.NONE); |
| pfpb.enableColocatedPartitionedMatrix( moVarname ); |
| pfpb.setDegreeOfParallelism(k); |
| |
| apply = true; |
| } |
| } |
| |
| LOG.debug(getOptMode()+" OPT: rewrite 'set fused data partitioning and execution' - result="+apply ); |
| } |
| |
| protected boolean rIsAccessByIterationVariable( OptNode n, String varName, String iterVarname ) |
| { |
| boolean ret = true; |
| |
| if( !n.isLeaf() ) |
| { |
| for( OptNode cn : n.getChilds() ) |
| rIsAccessByIterationVariable( cn, varName, iterVarname ); |
| } |
| else if( n.getNodeType()== NodeType.HOP |
| && n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) |
| && n.getParam(ParamType.DATA_PARTITION_FORMAT) != null ) |
| { |
| PartitionFormat dpf = PartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT)); |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| String inMatrix = h.getInput().get(0).getName(); |
| String indexAccess = null; |
| switch( dpf._dpf ) |
| { |
| case ROW_WISE: //input 1 and 2 eq |
| if( h.getInput().get(1) instanceof DataOp ) |
| indexAccess = h.getInput().get(1).getName(); |
| break; |
| case ROW_BLOCK_WISE_N: //input 1 and 2 have same slope and var |
| indexAccess = rGetVarFromExpression(h.getInput().get(1)); |
| break; |
| case COLUMN_WISE: //input 3 and 4 eq |
| if( h.getInput().get(3) instanceof DataOp ) |
| indexAccess = h.getInput().get(3).getName(); |
| break; |
| case COLUMN_BLOCK_WISE_N: //input 3 and 4 have same slope and var |
| indexAccess = rGetVarFromExpression(h.getInput().get(3)); |
| break; |
| |
| default: |
| //do nothing |
| } |
| |
| ret &= ( (inMatrix!=null && inMatrix.equals(varName)) |
| && (indexAccess!=null && indexAccess.equals(iterVarname))); |
| } |
| |
| return ret; |
| } |
| |
| private static String rGetVarFromExpression(Hop current) { |
| String var = null; |
| for( Hop c : current.getInput() ) { |
| var = rGetVarFromExpression(c); |
| if( var != null ) |
| return var; |
| } |
| return (current instanceof DataOp) ? |
| current.getName() : null; |
| } |
| |
| /////// |
| //REWRITE transpose sparse vector operations |
| /// |
| |
| protected boolean rIsTransposeSafePartition( OptNode n, String varName ) |
| { |
| boolean ret = true; |
| |
| if( !n.isLeaf() ) |
| { |
| for( OptNode cn : n.getChilds() ) |
| rIsTransposeSafePartition( cn, varName ); |
| } |
| else if( n.getNodeType()== NodeType.HOP |
| && n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) |
| && n.getParam(ParamType.DATA_PARTITION_FORMAT) != null ) |
| { |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| |
| String inMatrix = h.getInput().get(0).getName(); |
| if( inMatrix.equals(varName) ) |
| { |
| //check that all parents are transpose-safe operations |
| //(even a transient write would not be safe due to indirection into other DAGs) |
| ArrayList<Hop> parent = h.getParent(); |
| for( Hop p : parent ) |
| ret &= p.isTransposeSafe(); |
| } |
| } |
| |
| return ret; |
| } |
| |
| |
| /////// |
| //REWRITE set in-place result indexing |
| /// |
| |
/**
 * Enables in-place (pinned) result updates for all parfor result variables if
 * all left indexing on them is in-place safe and the pinned variables jointly
 * fit into the remote (spark) or local (CP, scaled by parallelism) memory budget.
 * 
 * @param pn parfor opt tree node
 * @param cost cost estimator for partial memory estimates
 * @param vars local variable map
 * @param inPlaceResultVars output: result variables enabled for in-place update
 * @param ec execution context (not referenced in this method)
 */
protected void rewriteSetInPlaceResultIndexing(OptNode pn, CostEstimator cost, LocalVariableMap vars, HashSet<ResultVar> inPlaceResultVars, ExecutionContext ec)
{
	//assertions (warnings of corrupt optimizer decisions)
	if( pn.getNodeType() != NodeType.PARFOR )
		LOG.warn(getOptMode()+" OPT: Set in-place result update is only applicable for a ParFor node.");
	
	boolean apply = false;
	
	ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
		.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
	
	//note currently we decide for all result vars jointly, i.e.,
	//only if all fit pinned in remaining budget, we apply this rewrite.
	ArrayList<ResultVar> retVars = pfpb.getResultVariables();
	
	//basic correctness constraint
	double totalMem = -1;
	if( rHasOnlyInPlaceSafeLeftIndexing(pn, retVars) )
	{
		//compute total sum of pinned result variable memory
		double sum = computeTotalSizeResultVariables(retVars, vars, pfpb.getDegreeOfParallelism());
		
		//compute memory estimate without result indexing, and total sum per worker
		double M = cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true, retVars.stream()
			.map(var -> var._name).collect(Collectors.toList()), ExcludeType.RESULT_LIX);
		totalMem = M + sum;
		
		//result update in-place for MR/Spark (w/ remote memory constraint)
		if( (pfpb.getExecMode() == PExecMode.REMOTE_SPARK_DP || pfpb.getExecMode() == PExecMode.REMOTE_SPARK)
			&& totalMem < _rm )
		{
			apply = true;
		}
		//result update in-place for CP (w/ local memory constraint)
		else if( pfpb.getExecMode() == PExecMode.LOCAL
			&& totalMem * pfpb.getDegreeOfParallelism() < _lm
			&& pn.isCPOnly() ) //no forced mr/spark execution
		{
			apply = true;
		}
	}
	
	//modify result variable meta data, if rewrite applied
	if( apply )
	{
		//add result vars to result and set state
		//will be serialized and transfered via symbol table
		for( ResultVar var : retVars ){
			Data dat = vars.get(var._name);
			if( dat instanceof MatrixObject )
				((MatrixObject)dat).setUpdateType(UpdateType.INPLACE_PINNED);
		}
		inPlaceResultVars.addAll(retVars);
	}
	
	LOG.debug(getOptMode()+" OPT: rewrite 'set in-place result indexing' - result="+
		apply+" ("+Arrays.toString(inPlaceResultVars.toArray(new ResultVar[0]))+", M="+toMB(totalMem)+")" );
}
| |
| protected boolean rHasOnlyInPlaceSafeLeftIndexing( OptNode n, ArrayList<ResultVar> retVars ) |
| { |
| boolean ret = true; |
| if( !n.isLeaf() ) { |
| for( OptNode cn : n.getChilds() ) |
| ret &= rHasOnlyInPlaceSafeLeftIndexing( cn, retVars ); |
| } |
| else if( n.getNodeType()== NodeType.HOP) { |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| if( h instanceof LeftIndexingOp && ResultVar.contains(retVars, h.getInput().get(0).getName() ) |
| && !retVars.stream().anyMatch(rvar -> rvar._isAccum) ) |
| ret &= (h.getParent().size()==1 |
| && h.getParent().get(0).getName().equals(h.getInput().get(0).getName())); |
| } |
| return ret; |
| } |
| |
| private static double computeTotalSizeResultVariables(ArrayList<ResultVar> retVars, LocalVariableMap vars, int k) { |
| double sum = 1; |
| for( ResultVar var : retVars ) { |
| Data dat = vars.get(var._name); |
| if( !(dat instanceof MatrixObject) ) |
| continue; |
| MatrixObject mo = (MatrixObject)dat; |
| // every worker will consume memory for at most (max_nnz/k + in_nnz) |
| sum += (OptimizerUtils.estimateSizeExactSparsity(mo.getNumRows(), |
| mo.getNumColumns(), Math.min((1.0/k)+mo.getSparsity(), 1.0))); |
| } |
| return sum; |
| } |
| |
| /////// |
| //REWRITE disable CP caching |
| /// |
| |
| protected double rComputeSumMemoryIntermediates( OptNode n, HashSet<ResultVar> inplaceResultVars ) |
| { |
| double sum = 0; |
| |
| if( !n.isLeaf() ) { |
| for( OptNode cn : n.getChilds() ) |
| sum += rComputeSumMemoryIntermediates( cn, inplaceResultVars ); |
| } |
| else if( n.getNodeType()== NodeType.HOP ) |
| { |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| if( n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) |
| && n.getParam(ParamType.DATA_PARTITION_FORMAT) != null ) { |
| //set during partitioning rewrite |
| sum += h.getMemEstimate(); |
| } |
| else { |
| //base intermediate (worst-case w/ materialized intermediates) |
| sum += h.getOutputMemEstimate() |
| + h.getIntermediateMemEstimate(); |
| //inputs not represented in the planopttree (worst-case no CSE) |
| if( h.getInput() != null ) |
| for( Hop cn : h.getInput() ) |
| if( cn instanceof DataOp && ((DataOp)cn).isRead() //read data |
| && !ResultVar.contains(inplaceResultVars, cn.getName())) //except in-place result vars |
| sum += cn.getMemEstimate(); |
| } |
| } |
| |
| return sum; |
| } |
| |
| /////// |
| //REWRITE enable runtime piggybacking |
| /// |
| |
| // protected void rewriteEnableRuntimePiggybacking( OptNode n, LocalVariableMap vars, HashMap<String, PartitionFormat> partitionedMatrices ) |
| // { |
| // ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter |
| // .getAbstractPlanMapping().getMappedProg(n.getID())[1]; |
| // HashSet<String> sharedVars = new HashSet<>(); |
| // boolean apply = false; |
| // |
| // //enable runtime piggybacking if MR jobs on shared read-only data set |
| // if( OptimizerUtils.ALLOW_RUNTIME_PIGGYBACKING ) |
| // { |
| // //apply runtime piggybacking if hop in mr and shared input variable |
| // //(any input variabled which is not partitioned and is read only and applies) |
| // apply = rHasSharedMRInput(n, vars.keySet(), partitionedMatrices.keySet(), sharedVars) |
| // && n.getTotalK() > 1; //apply only if degree of parallelism > 1 |
| // } |
| // |
| // if( apply ) |
| // pfpb.setRuntimePiggybacking(apply); |
| // |
| // _numEvaluatedPlans++; |
| // LOG.debug(getOptMode()+" OPT: rewrite 'enable runtime piggybacking' - result=" |
| // +apply+" ("+Arrays.toString(sharedVars.toArray())+")" ); |
| // } |
| // |
| // protected boolean rHasSharedMRInput( OptNode n, Set<String> inputVars, Set<String> partitionedVars, HashSet<String> sharedVars ) |
| // { |
| // boolean ret = false; |
| // |
| // if( !n.isLeaf() ) |
| // { |
| // for( OptNode cn : n.getChilds() ) |
| // ret |= rHasSharedMRInput( cn, inputVars, partitionedVars, sharedVars ); |
| // } |
| // else if( n.getNodeType()== NodeType.HOP && n.getExecType()==ExecType.MR ) |
| // { |
| // Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| // for( Hop ch : h.getInput() ) |
| // { |
| // //note: we replaxed the contraint of non-partitioned inputs for additional |
| // //latecy hiding and scan sharing of partitions which are read multiple times |
| // |
| // if( ch instanceof DataOp && ch.getDataType() == DataType.MATRIX |
| // && inputVars.contains(ch.getName()) ) |
| // { |
| // ret = true; |
| // sharedVars.add(ch.getName()); |
| // } |
| // else if( HopRewriteUtils.isTransposeOperation(ch) |
| // && ch.getInput().get(0) instanceof DataOp && ch.getInput().get(0).getDataType() == DataType.MATRIX |
| // && inputVars.contains(ch.getInput().get(0).getName()) ) |
| // { |
| // ret = true; |
| // sharedVars.add(ch.getInput().get(0).getName()); |
| // } |
| // } |
| // } |
| // |
| // return ret; |
| // } |
| |
| |
| /////// |
| //REWRITE inject spark loop checkpointing |
| /// |
| |
/**
 * Injects Spark checkpoint (caching) instructions into the hop DAGs of the
 * parfor predicate and body via the RewriteInjectSparkLoopCheckpointing hop
 * rewrite. If any checkpoints were injected, the parfor child program blocks
 * are regenerated from the rewritten statement blocks.
 *
 * @param n plan opt node of the parfor
 */
protected void rewriteInjectSparkLoopCheckpointing(OptNode n)
{
	//get program blocks of root parfor
	Object[] progobj = OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID());
	ParForStatementBlock pfsb = (ParForStatementBlock)progobj[0];
	ParForStatement fs = (ParForStatement) pfsb.getStatement(0);
	ParForProgramBlock pfpb = (ParForProgramBlock)progobj[1];
	
	boolean applied = false;
	
	try
	{
		//apply hop rewrite inject spark checkpoints (but without context awareness)
		RewriteInjectSparkLoopCheckpointing rewrite = new RewriteInjectSparkLoopCheckpointing(false);
		ProgramRewriter rewriter = new ProgramRewriter(rewrite);
		ProgramRewriteStatus state = new ProgramRewriteStatus();
		rewriter.rRewriteStatementBlockHopDAGs( pfsb, state );
		fs.setBody(rewriter.rRewriteStatementBlocks(fs.getBody(), state, true));
		
		//recompile if additional checkpoints introduced
		if( state.getInjectedCheckpoints() ) {
			pfpb.setChildBlocks(ProgramRecompiler.generatePartitialRuntimeProgram(pfpb.getProgram(), fs.getBody()));
			applied = true;
		}
	}
	catch(Exception ex) {
		//wrap any rewrite/recompile failure as runtime exception (cause preserved)
		throw new DMLRuntimeException(ex);
	}
	
	LOG.debug(getOptMode()+" OPT: rewrite 'inject spark loop checkpointing' - result="+applied );
}
| |
| /////// |
| //REWRITE inject spark repartition for zipmm |
| /// |
| |
| protected void rewriteInjectSparkRepartition(OptNode n, LocalVariableMap vars) |
| { |
| //get program blocks of root parfor |
| Object[] progobj = OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID()); |
| ParForStatementBlock pfsb = (ParForStatementBlock)progobj[0]; |
| ParForProgramBlock pfpb = (ParForProgramBlock)progobj[1]; |
| ArrayList<String> ret = new ArrayList<>(); |
| |
| if( OptimizerUtils.isSparkExecutionMode() //spark exec mode |
| && n.getExecType() == ExecType.CP //local parfor |
| && _N > 1 ) //at least 2 iterations |
| { |
| //collect candidates from zipmm spark instructions |
| HashSet<String> cand = new HashSet<>(); |
| rCollectZipmmPartitioningCandidates(n, cand); |
| |
| //prune updated candidates |
| HashSet<String> probe = new HashSet<>(pfsb.getReadOnlyParentMatrixVars()); |
| for( String var : cand ) |
| if( probe.contains( var ) ) |
| ret.add( var ); |
| |
| //prune small candidates |
| ArrayList<String> tmp = new ArrayList<>(ret); |
| ret.clear(); |
| for( String var : tmp ) |
| if( vars.get(var) instanceof MatrixObject ) { |
| MatrixObject mo = (MatrixObject) vars.get(var); |
| double sp = OptimizerUtils.getSparsity(mo.getNumRows(), mo.getNumColumns(), mo.getNnz()); |
| double size = OptimizerUtils.estimateSizeExactSparsity(mo.getNumRows(), mo.getNumColumns(), sp); |
| if( size > OptimizerUtils.getLocalMemBudget() ) |
| ret.add(var); |
| } |
| |
| //apply rewrite to parfor pb |
| if( !ret.isEmpty() ) { |
| pfpb.setSparkRepartitionVariables(ret); |
| } |
| } |
| |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'inject spark input repartition' - result=" |
| +ret.size()+" ("+Arrays.toString(ret.toArray())+")" ); |
| } |
| |
| private void rCollectZipmmPartitioningCandidates( OptNode n, HashSet<String> cand ) |
| { |
| //collect zipmm inputs |
| if( n.getNodeType()==NodeType.HOP ) |
| { |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| if( h instanceof AggBinaryOp && (((AggBinaryOp)h).getMMultMethod()==MMultMethod.ZIPMM |
| ||((AggBinaryOp)h).getMMultMethod()==MMultMethod.CPMM) ) |
| { |
| //found zipmm or cpmm (unknowns) which might turn into zipmm |
| //check for dataop or dataops under transpose on both sides |
| for( Hop in : h.getInput() ) { |
| if( in instanceof DataOp ) |
| cand.add( in.getName() ); |
| else if( HopRewriteUtils.isTransposeOperation(in) |
| && in.getInput().get(0) instanceof DataOp ) |
| cand.add( in.getInput().get(0).getName() ); |
| } |
| } |
| } |
| |
| //recursively process childs |
| if( !n.isLeaf() ) |
| for( OptNode c : n.getChilds() ) |
| rCollectZipmmPartitioningCandidates(c, cand); |
| } |
| |
| /////// |
| //REWRITE set spark eager rdd caching |
| /// |
| |
| protected void rewriteSetSparkEagerRDDCaching(OptNode n, LocalVariableMap vars) |
| { |
| //get program blocks of root parfor |
| Object[] progobj = OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID()); |
| ParForStatementBlock pfsb = (ParForStatementBlock)progobj[0]; |
| ParForProgramBlock pfpb = (ParForProgramBlock)progobj[1]; |
| |
| ArrayList<String> ret = new ArrayList<>(); |
| |
| if( OptimizerUtils.isSparkExecutionMode() //spark exec mode |
| && n.getExecType() == ExecType.CP //local parfor |
| && _N > 1 ) //at least 2 iterations |
| { |
| Set<String> cand = pfsb.variablesRead().getVariableNames(); |
| Collection<String> rpVars = pfpb.getSparkRepartitionVariables(); |
| for( String var : cand) |
| { |
| Data dat = vars.get(var); |
| |
| if( dat!=null && dat instanceof MatrixObject |
| && ((MatrixObject)dat).getRDDHandle()!=null ) |
| { |
| MatrixObject mo = (MatrixObject)dat; |
| DataCharacteristics mc = mo.getDataCharacteristics(); |
| RDDObject rdd = mo.getRDDHandle(); |
| if( (rpVars==null || !rpVars.contains(var)) //not a repartition var |
| && rdd.rHasCheckpointRDDChilds() //is cached rdd |
| && _lm / n.getK() < //is out-of-core dataset |
| OptimizerUtils.estimateSizeExactSparsity(mc)) |
| { |
| ret.add(var); |
| } |
| } |
| } |
| |
| //apply rewrite to parfor pb |
| if( !ret.isEmpty() ) { |
| pfpb.setSparkEagerCacheVariables(ret); |
| } |
| } |
| |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'set spark eager rdd caching' - result=" |
| +ret.size()+" ("+Arrays.toString(ret.toArray())+")" ); |
| } |
| |
| /////// |
| //REWRITE remove compare matrix (for result merge, needs to be invoked before setting result merge) |
| /// |
| |
/**
 * Replaces result matrices with empty matrices when a compare matrix is
 * provably unnecessary for result merge: the loop fully overwrites the matrix
 * row- or column-wise via the iteration variable, never reads it via right
 * indexing, and has only simple (non-conditional) children. Must be invoked
 * before rewriteSetResultMerge. Cleaned variables are replaced in the
 * execution context; the originals are evicted from the buffer pool.
 *
 * @param n plan opt node of the parfor
 * @param ec execution context holding the result variables
 */
protected void rewriteRemoveUnnecessaryCompareMatrix( OptNode n, ExecutionContext ec )
{
	ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
		.getAbstractPlanMapping().getMappedProg(n.getID())[1];

	ArrayList<ResultVar> cleanedVars = new ArrayList<>();
	ArrayList<ResultVar> resultVars = pfpb.getResultVariables();
	String itervar = pfpb.getIterVar();

	for( ResultVar rvar : resultVars ) {
		Data dat = ec.getVariable(rvar._name);
		if( dat instanceof MatrixObject && ((MatrixObject)dat).getNnz()!=0 //subject to result merge with compare
			&& n.hasOnlySimpleChilds() //guaranteed no conditional indexing
			&& rContainsResultFullReplace(n, rvar._name, itervar, (MatrixObject)dat) //guaranteed full matrix replace
			&& !rIsReadInRightIndexing(n, rvar._name) //never read variable in loop body
			&& ((MatrixObject)dat).getNumRows()<=Integer.MAX_VALUE //int-addressable, required for MatrixBlock
			&& ((MatrixObject)dat).getNumColumns()<=Integer.MAX_VALUE )
		{
			//replace existing matrix object with empty matrix
			MatrixObject mo = (MatrixObject)dat;
			ec.cleanupCacheableData(mo);
			ec.setMatrixOutput(rvar._name, new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(),false));

			//keep track of cleaned result variables
			cleanedVars.add(rvar);
		}
	}

	_numEvaluatedPlans++;
	LOG.debug(getOptMode()+" OPT: rewrite 'remove unnecessary compare matrix' - result="+(!cleanedVars.isEmpty())
		+" ("+ProgramConverter.serializeResultVariables(cleanedVars)+")" );
}
| |
| protected boolean rContainsResultFullReplace( OptNode n, String resultVar, String iterVarname, MatrixObject mo ) { |
| boolean ret = false; |
| //process hop node |
| if( n.getNodeType()==NodeType.HOP ) |
| ret |= isResultFullReplace(n, resultVar, iterVarname, mo); |
| //process childs recursively |
| if( !n.isLeaf() ) |
| for( OptNode c : n.getChilds() ) |
| ret |= rContainsResultFullReplace(c, resultVar, iterVarname, mo); |
| return ret; |
| } |
| |
| protected boolean isResultFullReplace( OptNode n, String resultVar, String iterVarname, MatrixObject mo ) |
| { |
| //check left indexing operator |
| String opStr = n.getParam(ParamType.OPSTRING); |
| if( opStr==null || !opStr.equals(LeftIndexingOp.OPSTRING) ) |
| return false; |
| |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| Hop base = h.getInput().get(0); |
| |
| //check result variable |
| if( !resultVar.equals(base.getName()) ) |
| return false; |
| |
| //check access pattern, memory budget |
| Hop inpRowL = h.getInput().get(2); |
| Hop inpRowU = h.getInput().get(3); |
| Hop inpColL = h.getInput().get(4); |
| Hop inpColU = h.getInput().get(5); |
| //check for rowwise overwrite |
| if( (inpRowL.getName().equals(iterVarname) && inpRowU.getName().equals(iterVarname)) |
| && inpColL instanceof LiteralOp && HopRewriteUtils.getDoubleValueSafe((LiteralOp)inpColL)==1 |
| && inpColU instanceof LiteralOp && HopRewriteUtils.getDoubleValueSafe((LiteralOp)inpColU)==mo.getNumColumns() ) |
| { |
| return true; |
| } |
| |
| //check for colwise overwrite |
| if( (inpColL.getName().equals(iterVarname) && inpColU.getName().equals(iterVarname)) |
| && inpRowL instanceof LiteralOp && HopRewriteUtils.getDoubleValueSafe((LiteralOp)inpRowL)==1 |
| && inpRowU instanceof LiteralOp && HopRewriteUtils.getDoubleValueSafe((LiteralOp)inpRowU)==mo.getNumRows() ) |
| { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| protected boolean rIsReadInRightIndexing(OptNode n, String var) |
| { |
| //NOTE: This method checks if a given variables is used in right indexing |
| //expressions. This is sufficient for "remove unnecessary compare matrix" because |
| //we already checked for full replace, which is only valid if we dont access |
| //the entire matrix in any other operation. |
| boolean ret = false; |
| |
| if( n.getNodeType()==NodeType.HOP ) { |
| Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| if( h instanceof IndexingOp && h.getInput().get(0) instanceof DataOp |
| && h.getInput().get(0).getName().equals(var) ) |
| { |
| ret |= true; |
| } |
| } |
| |
| //process childs recursively |
| if( !n.isLeaf() ) |
| for( OptNode c : n.getChilds() ) |
| ret |= rIsReadInRightIndexing(c, var); |
| |
| return ret; |
| } |
| |
| /////// |
| //REWRITE set result merge |
| /// |
| |
/**
 * Decides and sets the result merge strategy (local in-memory, local
 * automatic, or remote spark) for the given parfor based on execution type,
 * result sizes, and left indexing characteristics; recursively applies the
 * decision to nested parfor nodes.
 *
 * @param n plan opt node of the parfor
 * @param vars local variable map
 * @param inLocal true if this parfor executes in the local (driver) context
 */
protected void rewriteSetResultMerge( OptNode n, LocalVariableMap vars, boolean inLocal ) {
	ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
		.getAbstractPlanMapping().getMappedProg(n.getID())[1];
	
	PResultMerge REMOTE = PResultMerge.REMOTE_SPARK;
	PResultMerge ret = null;
	
	//investigate details of current parfor node
	boolean flagRemoteParFOR = (n.getExecType() == getRemoteExecType());
	boolean flagLargeResult = hasLargeTotalResults( n, pfpb.getResultVariables(), vars, true );
	boolean flagRemoteLeftIndexing = hasResultMRLeftIndexing( n, pfpb.getResultVariables(), vars, true );
	boolean flagCellFormatWoCompare = determineFlagCellFormatWoCompare(pfpb.getResultVariables(), vars);
	boolean flagOnlyInMemResults = hasOnlyInMemoryResults(n, pfpb.getResultVariables(), vars );
	
	//optimality decision on result merge
	//remote, if remote exec and w/ compare (prevent huge transfer/merge costs)
	if( flagRemoteParFOR && flagLargeResult )
	{
		ret = REMOTE;
	}
	//local in-memory, if all results fit in memory
	else if( flagOnlyInMemResults )
	{
		ret = PResultMerge.LOCAL_MEM;
	}
	//remote, if result partitioning and copy not possible
	//NOTE: 'at least one' instead of 'all' condition of flagMRLeftIndexing because the
	//      benefit for large matrices outweighs potentially unnecessary remote jobs for smaller matrices
	else if( ( flagRemoteParFOR || flagRemoteLeftIndexing)
		&& !(flagCellFormatWoCompare && ResultMergeLocalFile.ALLOW_COPY_CELLFILES ) )
	{
		ret = REMOTE;
	}
	//local, otherwise (decide later if in mem or file-based)
	else
	{
		ret = PResultMerge.LOCAL_AUTOMATIC;
	}
	
	// modify rtprog
	pfpb.setResultMerge(ret);
	
	// modify plan
	n.addParam(ParamType.RESULT_MERGE, ret.toString());
	
	//recursively apply rewrite for nested parfor nodes
	if( n.getChilds() != null )
		rInvokeSetResultMerge(n.getChilds(), vars, inLocal && !flagRemoteParFOR);
	
	_numEvaluatedPlans++;
	LOG.debug(getOptMode()+" OPT: rewrite 'set result merge' - result="+ret );
}
| |
| protected boolean determineFlagCellFormatWoCompare( ArrayList<ResultVar> resultVars, LocalVariableMap vars ) |
| { |
| boolean ret = true; |
| |
| for( ResultVar rVar : resultVars ) |
| { |
| Data dat = vars.get(rVar._name); |
| if( dat == null || !(dat instanceof MatrixObject) ) { |
| ret = false; |
| break; |
| } |
| else { |
| MatrixObject mo = (MatrixObject)dat; |
| MetaDataFormat meta = (MetaDataFormat) mo.getMetaData(); |
| long nnz = meta.getDataCharacteristics().getNonZeros(); |
| if( meta.getFileFormat() == FileFormat.BINARY || nnz != 0 ) { |
| ret = false; |
| break; |
| } |
| } |
| } |
| |
| return ret; |
| } |
| |
/**
 * Recursively checks if the plan subtree contains a remote-executed left
 * indexing into a result variable; optionally requires the result to exceed
 * the broadcast memory budget (i.e., not mergeable in memory).
 *
 * @param n current plan opt node
 * @param resultVars parfor result variables
 * @param vars local variable map
 * @param checkSize if true, additionally require the result to be too large for in-memory merge
 * @return true if a qualifying remote left indexing exists
 */
protected boolean hasResultMRLeftIndexing( OptNode n, ArrayList<ResultVar> resultVars, LocalVariableMap vars, boolean checkSize )
{
	boolean ret = false;
	
	if( n.isLeaf() )
	{
		String opName = n.getParam(ParamType.OPSTRING);
		//check opstring and exec type
		if( opName != null && opName.equals(LeftIndexingOp.OPSTRING)
			&& n.getExecType() == getRemoteExecType() )
		{
			LeftIndexingOp hop = (LeftIndexingOp) OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
			//check against set of result variable names
			String varName = hop.getInput().get(0).getName();
			if( ResultVar.contains(resultVars, varName) )
			{
				ret = true;
				if( checkSize && vars.keySet().contains(varName) )
				{
					//dims of result vars must be known at this point in time
					MatrixObject mo = (MatrixObject) vars.get( hop.getInput().get(0).getName() );
					long rows = mo.getNumRows();
					long cols = mo.getNumColumns();
					//TODO rework memory handling for spark
					ret = !isInMemoryResultMerge(rows, cols, SparkExecutionContext.getBroadcastMemoryBudget());
				}
			}
		}
	}
	else
	{
		for( OptNode c : n.getChilds() )
			ret |= hasResultMRLeftIndexing(c, resultVars, vars, checkSize);
	}
	
	return ret;
}
| |
| /** |
| * Heuristically compute total result sizes, if larger than local mem budget assumed to be large. |
| * |
| * @param pn internal representation of a plan alternative for program blocks and instructions |
| * @param resultVars list of result variables |
| * @param vars local variable map |
| * @param checkSize ? |
| * @return true if result sizes larger than local memory budget |
| */ |
| protected boolean hasLargeTotalResults( OptNode pn, ArrayList<ResultVar> resultVars, LocalVariableMap vars, boolean checkSize ) |
| { |
| double totalSize = 0; |
| |
| //get num tasks according to task partitioning |
| PTaskPartitioner tp = PTaskPartitioner.valueOf(pn.getParam(ParamType.TASK_PARTITIONER)); |
| int k = pn.getK(); |
| long W = estimateNumTasks(tp, _N, k); |
| |
| for( ResultVar var : resultVars ) |
| { |
| //Potential unknowns: for local result var of child parfor (but we're only interested in top level) |
| //Potential scalars: for disabled dependency analysis and unbounded scoping |
| Data dat = vars.get( var._name ); |
| if( dat != null && dat instanceof MatrixObject ) |
| { |
| MatrixObject mo = (MatrixObject) dat; |
| |
| long rows = mo.getNumRows(); |
| long cols = mo.getNumColumns(); |
| long nnz = mo.getNnz(); |
| |
| if( nnz > 0 ) //w/ compare |
| { |
| totalSize += W * OptimizerUtils.estimateSizeExactSparsity(rows, cols, 1.0); |
| } |
| else //in total at most as dimensions (due to disjoint results) |
| { |
| totalSize += OptimizerUtils.estimateSizeExactSparsity(rows, cols, 1.0); |
| } |
| } |
| } |
| |
| return ( totalSize >= _lm ); //heuristic: large if >= local mem budget |
| } |
| |
| protected long estimateNumTasks( PTaskPartitioner tp, long N, int k ) |
| { |
| long W = -1; |
| |
| switch( tp ) |
| { |
| case NAIVE: |
| case FIXED: W = N; break; |
| case STATIC: W = N / k; break; |
| case FACTORING: |
| case FACTORING_CMIN: |
| case FACTORING_CMAX: W = k * (long)(Math.log(((double)N)/k)/Math.log(2.0)); break; |
| default: W = N; break; //N as worst case estimate |
| } |
| |
| return W; |
| } |
| |
| protected boolean hasOnlyInMemoryResults( OptNode n, ArrayList<ResultVar> resultVars, LocalVariableMap vars ) |
| { |
| boolean ret = true; |
| |
| if( n.isLeaf() ) |
| { |
| String opName = n.getParam(ParamType.OPSTRING); |
| //check opstring and exec type |
| if( opName.equals(LeftIndexingOp.OPSTRING) ) |
| { |
| LeftIndexingOp hop = (LeftIndexingOp) OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| //check agains set of varname |
| String varName = hop.getInput().get(0).getName(); |
| if( ResultVar.contains(resultVars, varName) && vars.keySet().contains(varName) ) { |
| Data dat = vars.get(hop.getInput().get(0).getName()); |
| //dims of result vars must be known at this point in time |
| if( dat instanceof MatrixObject ) { |
| MatrixObject mo = (MatrixObject) dat; |
| long rows = mo.getNumRows(); |
| long cols = mo.getNumColumns(); |
| double memBudget = OptimizerUtils.getLocalMemBudget(); |
| ret &= isInMemoryResultMerge(rows, cols, memBudget); |
| } |
| } |
| } |
| } |
| else { |
| for( OptNode c : n.getChilds() ) |
| ret &= hasOnlyInMemoryResults(c, resultVars, vars); |
| } |
| |
| return ret; |
| } |
| |
| protected void rInvokeSetResultMerge( Collection<OptNode> nodes, LocalVariableMap vars, boolean inLocal) { |
| for( OptNode n : nodes ) |
| if( n.getNodeType() == NodeType.PARFOR ) |
| { |
| rewriteSetResultMerge(n, vars, inLocal); |
| if( n.getExecType()==getRemoteExecType() ) |
| inLocal = false; |
| } |
| else if( n.getChilds()!=null ) |
| rInvokeSetResultMerge(n.getChilds(), vars, inLocal); |
| } |
| |
| public static boolean isInMemoryResultMerge( long rows, long cols, double memBudget ) |
| { |
| if( !ParForProgramBlock.USE_PARALLEL_RESULT_MERGE ) |
| { |
| //1/4 mem budget because: 2xout (incl sparse-dense change), 1xin, 1xcompare |
| return ( rows>=0 && cols>=0 && MatrixBlock.estimateSizeInMemory(rows, cols, 1.0) < memBudget/4 ); |
| } |
| else |
| return ( rows>=0 && cols>=0 && rows*cols < Math.pow(Hop.CPThreshold, 2) ); |
| } |
| |
| |
| /////// |
| //REWRITE set recompile memory budget |
| /// |
| |
| protected void rewriteSetRecompileMemoryBudget( OptNode n ) |
| { |
| double newLocalMem = _lm; |
| |
| //check et because recompilation only happens at the master node |
| if( n.getExecType() == ExecType.CP ) |
| { |
| //compute local recompile memory budget |
| int par = n.getTotalK(); |
| newLocalMem = _lm / par; |
| |
| //modify runtime plan |
| ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter |
| .getAbstractPlanMapping().getMappedProg(n.getID())[1]; |
| pfpb.setRecompileMemoryBudget( newLocalMem ); |
| } |
| |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'set recompile memory budget' - result="+toMB(newLocalMem) ); |
| } |
| |
| |
| /////// |
| //REWRITE remove recursive parfor |
| /// |
| |
/**
 * Replaces parfor program blocks inside recursive function contexts with
 * plain for loops (to prevent unbounded recursive parallelism), unfolding
 * the recursive function first if the current parfor itself is affected.
 *
 * @param n plan opt node of the root parfor
 * @param vars local variable map
 */
protected void rewriteRemoveRecursiveParFor(OptNode n, LocalVariableMap vars)
{
	int count = 0; //num removed parfor
	
	//find recursive parfor
	HashSet<ParForProgramBlock> recPBs = new HashSet<>();
	rFindRecursiveParFor( n, recPBs, false );

	if( !recPBs.isEmpty() )
	{
		//unfold if necessary
		try
		{
			ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
				.getAbstractPlanMapping().getMappedProg(n.getID())[1];
			if( recPBs.contains(pfpb) )
				rFindAndUnfoldRecursiveFunction(n, pfpb, recPBs, vars);
		}
		catch(Exception ex)
		{
			//wrap unfolding failure as runtime exception (cause preserved)
			throw new DMLRuntimeException(ex);
		}
		
		//remove recursive parfor (parfor to for)
		count = removeRecursiveParFor(n, recPBs);
	}
	
	_numEvaluatedPlans++;
	LOG.debug(getOptMode()+" OPT: rewrite 'remove recursive parfor' - result="+recPBs.size()+"/"+count );
}
| |
| protected void rFindRecursiveParFor( OptNode n, HashSet<ParForProgramBlock> cand, boolean recContext ) |
| { |
| //recursive invocation |
| if( !n.isLeaf() ) |
| for( OptNode c : n.getChilds() ) |
| { |
| if( c.getNodeType() == NodeType.FUNCCALL && c.isRecursive() ) |
| rFindRecursiveParFor(c, cand, true); |
| else |
| rFindRecursiveParFor(c, cand, recContext); |
| } |
| |
| //add candidate program blocks |
| if( recContext && n.getNodeType()==NodeType.PARFOR ) |
| { |
| ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter |
| .getAbstractPlanMapping().getMappedProg(n.getID())[1]; |
| cand.add(pfpb); |
| } |
| } |
| |
/**
 * Finds the recursive function call that (transitively) contains the given
 * parfor and unfolds it: the function is deep-copied under a new name, the
 * old subtree is relinked to the new function, the plan opt tree is recreated
 * for the copy, and the set of recursive parfor program blocks is updated to
 * refer to the copied blocks.
 *
 * @param n current plan opt node
 * @param parfor parfor program block whose enclosing recursive call is unfolded
 * @param recPBs set of recursive parfor program blocks (updated in place)
 * @param vars local variable map (for recreating the plan opt subtree)
 */
protected void rFindAndUnfoldRecursiveFunction( OptNode n, ParForProgramBlock parfor, HashSet<ParForProgramBlock> recPBs, LocalVariableMap vars )
{
	//unfold if found
	if( n.getNodeType() == NodeType.FUNCCALL && n.isRecursive())
	{
		boolean exists = rContainsNode(n, parfor);
		if( exists )
		{
			String fnameKey = n.getParam(ParamType.OPSTRING);
			String[] names = fnameKey.split(Program.KEY_DELIM);
			String fnamespace = names[0];
			String fname = names[1];
			String fnameNew = FUNCTION_UNFOLD_NAMEPREFIX + fname;
			
			//unfold function: deep copy under new name, registered in both
			//the runtime program and the DML program
			FunctionOp fop = (FunctionOp) OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
			Program prog = parfor.getProgram();
			DMLProgram dmlprog = parfor.getStatementBlock().getDMLProg();
			FunctionProgramBlock fpb = prog.getFunctionProgramBlock(fnamespace, fname);
			FunctionProgramBlock copyfpb = ProgramConverter.createDeepCopyFunctionProgramBlock(fpb, new HashSet<String>(), new HashSet<String>());
			prog.addFunctionProgramBlock(fnamespace, fnameNew, copyfpb);
			dmlprog.addFunctionStatementBlock(fnamespace, fnameNew, (FunctionStatementBlock)copyfpb.getStatementBlock());
			
			//replace function names in old subtree (link to new function)
			rReplaceFunctionNames(n, fname, fnameNew);
			
			//recreate sub opttree for the copied function body and swap it
			//into the parent's child list in place of the old node
			String fnameNewKey = fnamespace + Program.KEY_DELIM + fnameNew;
			OptNode nNew = new OptNode(NodeType.FUNCCALL);
			OptTreeConverter.getAbstractPlanMapping().putHopMapping(fop, nNew);
			nNew.setExecType(ExecType.CP);
			nNew.addParam(ParamType.OPSTRING, fnameNewKey);
			long parentID = OptTreeConverter.getAbstractPlanMapping().getMappedParentID(n.getID());
			OptTreeConverter.getAbstractPlanMapping().getOptNode(parentID).exchangeChild(n, nNew);
			HashSet<String> memo = new HashSet<>();
			memo.add(fnameKey); //required if functionop not shared (because not replaced yet)
			memo.add(fnameNewKey); //required if functionop shared (indirectly replaced)
			for( int i=0; i<copyfpb.getChildBlocks().size() /*&& i<len*/; i++ )
			{
				ProgramBlock lpb = copyfpb.getChildBlocks().get(i);
				StatementBlock lsb = lpb.getStatementBlock();
				nNew.addChild( OptTreeConverter.rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );
			}
			
			//compute delta for recPB set (use for removing parfor):
			//drop blocks of the old subtree, add blocks of the new subtree
			recPBs.removeAll( rGetAllParForPBs(n, new HashSet<ParForProgramBlock>()) );
			recPBs.addAll( rGetAllParForPBs(nNew, new HashSet<ParForProgramBlock>()) );
			
			//replace function names in new subtree (recursive link to new function)
			rReplaceFunctionNames(nNew, fname, fnameNew);
			
		}
		//else, we can return anyway because we will not find that parfor
		
		return;
	}
	
	//recursive invocation (only for non-recursive functions)
	if( !n.isLeaf() )
		for( OptNode c : n.getChilds() )
			rFindAndUnfoldRecursiveFunction(c, parfor, recPBs, vars);
}
| |
| protected boolean rContainsNode( OptNode n, ParForProgramBlock parfor ) |
| { |
| boolean ret = false; |
| |
| if( n.getNodeType() == NodeType.PARFOR ) { |
| ProgramBlock pfpb = OptTreeConverter |
| .getAbstractPlanMapping().getMappedProgramBlock(n.getID()); |
| ret = (parfor == pfpb); |
| } |
| |
| if( !ret && !n.isLeaf() ) |
| for( OptNode c : n.getChilds() ) { |
| ret |= rContainsNode(c, parfor); |
| if( ret ) break; //early abort |
| } |
| |
| return ret; |
| } |
| |
| protected HashSet<ParForProgramBlock> rGetAllParForPBs( OptNode n, HashSet<ParForProgramBlock> pbs ) |
| { |
| //collect parfor |
| if( n.getNodeType()==NodeType.PARFOR ) |
| { |
| ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter |
| .getAbstractPlanMapping().getMappedProgramBlock(n.getID()); |
| pbs.add(pfpb); |
| } |
| |
| //recursive invocation |
| if( !n.isLeaf() ) |
| for( OptNode c : n.getChilds() ) |
| rGetAllParForPBs(c, pbs); |
| |
| return pbs; |
| } |
| |
| protected void rReplaceFunctionNames( OptNode n, String oldName, String newName ) |
| { |
| if( n.getNodeType() == NodeType.FUNCCALL) |
| { |
| FunctionOp fop = (FunctionOp) OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); |
| |
| String[] names = n.getParam(ParamType.OPSTRING).split(Program.KEY_DELIM); |
| String fnamespace = names[0]; |
| String fname = names[1]; |
| |
| if( fname.equals(oldName) || fname.equals(newName) ) //newName if shared hop |
| { |
| //set opttree function name |
| n.addParam(ParamType.OPSTRING, DMLProgram.constructFunctionKey(fnamespace,newName)); |
| |
| //set instruction function name |
| long parentID = OptTreeConverter.getAbstractPlanMapping().getMappedParentID(n.getID()); |
| BasicProgramBlock pb = (BasicProgramBlock) OptTreeConverter |
| .getAbstractPlanMapping().getMappedProg(parentID)[1]; |
| |
| ArrayList<Instruction> instArr = pb.getInstructions(); |
| for( int i=0; i<instArr.size(); i++ ) { |
| Instruction inst = instArr.get(i); |
| if( inst instanceof FunctionCallCPInstruction ) { |
| FunctionCallCPInstruction fci = (FunctionCallCPInstruction) inst; |
| if( oldName.equals(fci.getFunctionName()) ) |
| instArr.set(i, FunctionCallCPInstruction.parseInstruction(fci.toString().replaceAll(oldName, newName))); |
| } |
| } |
| |
| //set hop name (for recompile) |
| if( fop.getFunctionName().equals(oldName) ) |
| fop.setFunctionName(newName); |
| } |
| } |
| |
| //recursive invocation |
| if( !n.isLeaf() ) |
| for( OptNode c : n.getChilds() ) |
| rReplaceFunctionNames(c, oldName, newName); |
| } |
| |
/**
 * Recursively replaces all parfor program blocks in the given set with plain
 * for program blocks (shallow copies), updating the plan opt tree and the
 * statement block links accordingly.
 *
 * @param n current plan opt node
 * @param recPBs parfor program blocks identified as recursive
 * @return number of replaced parfor program blocks
 */
protected int removeRecursiveParFor( OptNode n, HashSet<ParForProgramBlock> recPBs )
{
	int count = 0;
	
	if( !n.isLeaf() )
	{
		for( OptNode sub : n.getChilds() )
		{
			if( sub.getNodeType() == NodeType.PARFOR )
			{
				long id = sub.getID();
				Object[] progobj = OptTreeConverter.getAbstractPlanMapping().getMappedProg(id);
				ParForStatementBlock pfsb = (ParForStatementBlock)progobj[0];
				ParForProgramBlock pfpb = (ParForProgramBlock)progobj[1];
				
				if( recPBs.contains(pfpb) )
				{
					//create for pb as replacement
					Program prog = pfpb.getProgram();
					ForProgramBlock fpb = ProgramConverter.createShallowCopyForProgramBlock(pfpb, prog);
					
					//replace parfor with for, and update objectmapping
					OptTreeConverter.replaceProgramBlock(n, sub, pfpb, fpb, false);
					//update link to statement block
					fpb.setStatementBlock(pfsb);
					
					//update node (degree of parallelism 1 after conversion)
					sub.setNodeType(NodeType.FOR);
					sub.setK(1);
					
					count++;
				}
			}
			
			//continue search in the (possibly replaced) subtree
			count += removeRecursiveParFor(sub, recPBs);
		}
	}
	
	return count;
}
| |
| |
| /////// |
| //REWRITE remove unnecessary parfor |
| /// |
| |
| protected void rewriteRemoveUnnecessaryParFor(OptNode n) { |
| int count = removeUnnecessaryParFor( n ); |
| _numEvaluatedPlans++; |
| LOG.debug(getOptMode()+" OPT: rewrite 'remove unnecessary parfor' - result="+count ); |
| } |
| |
| protected int removeUnnecessaryParFor( OptNode n ) { |
| int count = 0; |
| |
| if( !n.isLeaf() ) |
| { |
| for( OptNode sub : n.getChilds() ) |
| { |
| if( sub.getNodeType() == NodeType.PARFOR && sub.getK() == 1 ) |
| { |
| long id = sub.getID(); |
| Object[] progobj = OptTreeConverter.getAbstractPlanMapping().getMappedProg(id); |
| ParForStatementBlock pfsb = (ParForStatementBlock)progobj[0]; |
| ParForProgramBlock pfpb = (ParForProgramBlock)progobj[1]; |
| |
| //create for pb as replacement |
| Program prog = pfpb.getProgram(); |
| ForProgramBlock fpb = ProgramConverter.createShallowCopyForProgramBlock(pfpb, prog); |
| |
| //replace parfor with for, and update objectmapping |
| OptTreeConverter.replaceProgramBlock(n, sub, pfpb, fpb, false); |
| //update link to statement block |
| fpb.setStatementBlock(pfsb); |
| |
| //update node |
| sub.setNodeType(NodeType.FOR); |
| sub.setK(1); |
| |
| count++; |
| } |
| |
| count += removeUnnecessaryParFor(sub); |
| } |
| } |
| |
| return count; |
| } |
| |
| //////////////////////// |
| // Helper methods // |
| //////////////////////// |
| |
/**
 * Formats a size in bytes as a megabyte string with "MB" suffix
 * (delegates the numeric conversion to OptimizerUtils).
 *
 * @param inB size in bytes
 * @return formatted megabyte string
 */
public static String toMB( double inB ) {
	return OptimizerUtils.toMB(inB) + "MB";
}
| } |