| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysml.hops.recompile; |
| |
| import java.io.BufferedReader; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.wink.json4j.JSONObject; |
| import org.apache.sysml.api.DMLScript; |
| import org.apache.sysml.conf.ConfigurationManager; |
| import org.apache.sysml.conf.CompilerConfig.ConfigType; |
| import org.apache.sysml.hops.DataGenOp; |
| import org.apache.sysml.hops.DataOp; |
| import org.apache.sysml.hops.FunctionOp; |
| import org.apache.sysml.hops.FunctionOp.FunctionType; |
| import org.apache.sysml.hops.Hop; |
| import org.apache.sysml.hops.Hop.DataGenMethod; |
| import org.apache.sysml.hops.Hop.DataOpTypes; |
| import org.apache.sysml.hops.Hop.FileFormatTypes; |
| import org.apache.sysml.hops.Hop.OpOp1; |
| import org.apache.sysml.hops.Hop.VisitStatus; |
| import org.apache.sysml.hops.HopsException; |
| import org.apache.sysml.hops.IndexingOp; |
| import org.apache.sysml.hops.LiteralOp; |
| import org.apache.sysml.hops.MemoTable; |
| import org.apache.sysml.hops.OptimizerUtils; |
| import org.apache.sysml.hops.ReorgOp; |
| import org.apache.sysml.hops.UnaryOp; |
| import org.apache.sysml.hops.rewrite.HopRewriteUtils; |
| import org.apache.sysml.hops.rewrite.ProgramRewriter; |
| import org.apache.sysml.lops.CSVReBlock; |
| import org.apache.sysml.lops.DataGen; |
| import org.apache.sysml.lops.Lop; |
| import org.apache.sysml.lops.LopProperties.ExecType; |
| import org.apache.sysml.lops.LopsException; |
| import org.apache.sysml.lops.ReBlock; |
| import org.apache.sysml.lops.compile.Dag; |
| import org.apache.sysml.parser.DMLProgram; |
| import org.apache.sysml.parser.DataExpression; |
| import org.apache.sysml.parser.Expression.DataType; |
| import org.apache.sysml.parser.Expression.ValueType; |
| import org.apache.sysml.parser.ForStatementBlock; |
| import org.apache.sysml.parser.IfStatementBlock; |
| import org.apache.sysml.parser.Statement; |
| import org.apache.sysml.parser.StatementBlock; |
| import org.apache.sysml.parser.WhileStatementBlock; |
| import org.apache.sysml.runtime.DMLRuntimeException; |
| import org.apache.sysml.runtime.controlprogram.ForProgramBlock; |
| import org.apache.sysml.runtime.controlprogram.FunctionProgramBlock; |
| import org.apache.sysml.runtime.controlprogram.IfProgramBlock; |
| import org.apache.sysml.runtime.controlprogram.LocalVariableMap; |
| import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; |
| import org.apache.sysml.runtime.controlprogram.ProgramBlock; |
| import org.apache.sysml.runtime.controlprogram.WhileProgramBlock; |
| import org.apache.sysml.runtime.controlprogram.caching.CacheableData; |
| import org.apache.sysml.runtime.controlprogram.caching.FrameObject; |
| import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; |
| import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; |
| import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; |
| import org.apache.sysml.runtime.controlprogram.parfor.opt.OptTreeConverter; |
| import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; |
| import org.apache.sysml.runtime.instructions.Instruction; |
| import org.apache.sysml.runtime.instructions.InstructionUtils; |
| import org.apache.sysml.runtime.instructions.MRJobInstruction; |
| import org.apache.sysml.runtime.instructions.cp.Data; |
| import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction; |
| import org.apache.sysml.runtime.instructions.cp.IntObject; |
| import org.apache.sysml.runtime.instructions.cp.ScalarObject; |
| import org.apache.sysml.runtime.instructions.mr.RandInstruction; |
| import org.apache.sysml.runtime.instructions.mr.SeqInstruction; |
| import org.apache.sysml.runtime.matrix.MatrixCharacteristics; |
| import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; |
| import org.apache.sysml.runtime.matrix.data.FrameBlock; |
| import org.apache.sysml.runtime.matrix.data.InputInfo; |
| import org.apache.sysml.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysml.runtime.util.MapReduceTool; |
| import org.apache.sysml.utils.Explain; |
| import org.apache.sysml.utils.Explain.ExplainType; |
| import org.apache.sysml.utils.JSONHelper; |
| |
| /** |
| * Dynamic recompilation of hop dags to runtime instructions, which includes the |
| * following substeps: |
| * |
| * (1) deep copy hop dag, (2) refresh matrix characteristics, (3) apply |
| * dynamic rewrites, (4) refresh memory estimates, (5) construct lops (incl |
| * operator selection), and (6) generate runtime program (incl piggybacking). |
| * |
| * |
| */ |
| public class Recompiler |
| { |
| |
| private static final Log LOG = LogFactory.getLog(Recompiler.class.getName()); |
| |
| //Max threshold for in-memory reblock of text input [in bytes] |
| //reason: single-threaded text read at 20MB/s, 1GB input -> 50s (should exploit parallelism) |
| //note that we scale this threshold up by the degree of available parallelism |
| private static final long CP_REBLOCK_THRESHOLD_SIZE = (long)1024*1024*1024; |
| private static final long CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE = (long)256*1024*1024; |
| private static final long CP_TRANSFORM_UNKNOWN_THRESHOLD_SIZE = (long)1024*1024*1024; |
| |
| /** Local reused rewriter for dynamic rewrites during recompile */ |
| |
| /** Local DML configuration for thread-local config updates */ |
| private static ThreadLocal<ProgramRewriter> _rewriter = new ThreadLocal<ProgramRewriter>() { |
| @Override protected ProgramRewriter initialValue() { return new ProgramRewriter(false, true); } |
| }; |
| |
| /** |
| * Re-initializes the recompiler according to the current optimizer flags. |
| */ |
| public static void reinitRecompiler() { |
| _rewriter.set(new ProgramRewriter(false, true)); |
| } |
| |
| /** |
| * A) Recompile basic program block hop DAG. |
| * |
| * We support to basic types inplace or via deep copy. Deep copy is the default and is required |
| * in order to apply non-reversible rewrites. In-place is required in order to modify the existing |
| * hops (e.g., for parfor pre-recompilation). |
| * |
| * @param hops |
| * @param vars |
| * @return |
| * @throws DMLRuntimeException |
| * @throws HopsException |
| * @throws LopsException |
| * @throws IOException |
| */ |
| public static ArrayList<Instruction> recompileHopsDag( StatementBlock sb, ArrayList<Hop> hops, LocalVariableMap vars, RecompileStatus status, boolean inplace, long tid ) |
| throws DMLRuntimeException, HopsException, LopsException, IOException |
| { |
| ArrayList<Instruction> newInst = null; |
| |
| //need for synchronization as we do temp changes in shared hops/lops |
| //however, we create deep copies for most dags to allow for concurrent recompile |
| synchronized( hops ) |
| { |
| LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " + |
| OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB"); |
| |
| // prepare hops dag for recompile |
| if( !inplace ){ |
| // deep copy hop dag (for non-reversable rewrites) |
| hops = deepCopyHopsDag(hops); |
| } |
| else { |
| // clear existing lops |
| Hop.resetVisitStatus(hops); |
| for( Hop hopRoot : hops ) |
| rClearLops( hopRoot ); |
| } |
| |
| // replace scalar reads with literals |
| if( !inplace ) { |
| Hop.resetVisitStatus(hops); |
| for( Hop hopRoot : hops ) |
| rReplaceLiterals( hopRoot, vars ); |
| } |
| |
| // refresh matrix characteristics (update stats) |
| Hop.resetVisitStatus(hops); |
| for( Hop hopRoot : hops ) |
| rUpdateStatistics( hopRoot, vars ); |
| |
| // dynamic hop rewrites |
| if( !inplace ) |
| _rewriter.get().rewriteHopDAGs( hops, null ); |
| |
| // refresh memory estimates (based on updated stats, |
| // before: init memo table with propagated worst-case estimates, |
| // after: extract worst-case estimates from memo table |
| Hop.resetVisitStatus(hops); |
| MemoTable memo = new MemoTable(); |
| memo.init(hops, status); |
| Hop.resetVisitStatus(hops); |
| for( Hop hopRoot : hops ) |
| hopRoot.refreshMemEstimates(memo); |
| memo.extract(hops, status); |
| |
| // construct lops |
| Dag<Lop> dag = new Dag<Lop>(); |
| for( Hop hopRoot : hops ){ |
| Lop lops = hopRoot.constructLops(); |
| lops.addToDag(dag); |
| } |
| |
| // generate runtime instructions (incl piggybacking) |
| newInst = dag.getJobs(sb, ConfigurationManager.getDMLConfig()); |
| } |
| |
| // replace thread ids in new instructions |
| if( tid != 0 ) //only in parfor context |
| newInst = ProgramConverter.createDeepCopyInstructionSet(newInst, tid, -1, null, null, null, false, false); |
| |
| // explain recompiled hops / instructions |
| if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_HOPS ){ |
| LOG.info("EXPLAIN RECOMPILE \nGENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+"):\n" + |
| Explain.explainHops(hops, 1)); |
| } |
| if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_RUNTIME ){ |
| LOG.info("EXPLAIN RECOMPILE \nGENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+"):\n" + |
| Explain.explain(newInst, 1)); |
| } |
| |
| return newInst; |
| } |
| |
| /** |
| * B) Recompile predicate hop DAG (single root): |
| * |
| * Note: This overloaded method is required for predicate instructions because |
| * they have only a single hops DAG and we need to synchronize on the original |
| * (shared) hops object. Hence, we cannot create any wrapper arraylist for each |
| * recompilation - this would result in race conditions for concurrent recompilation |
| * in a parfor body. |
| * |
| * Note: no statementblock passed because for predicate dags we dont have separate live variable analysis information. |
| * |
| * @param hops |
| * @param vars |
| * @return |
| * @throws DMLRuntimeException |
| * @throws HopsException |
| * @throws LopsException |
| * @throws IOException |
| */ |
| public static ArrayList<Instruction> recompileHopsDag( Hop hops, LocalVariableMap vars, RecompileStatus status, boolean inplace, long tid ) |
| throws DMLRuntimeException, HopsException, LopsException, IOException |
| { |
| ArrayList<Instruction> newInst = null; |
| |
| //need for synchronization as we do temp changes in shared hops/lops |
| synchronized( hops ) |
| { |
| LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " + |
| OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB"); |
| |
| // prepare hops dag for recompile |
| if( !inplace ) { |
| // deep copy hop dag (for non-reversable rewrites) |
| //(this also clears existing lops in the created dag) |
| hops = deepCopyHopsDag(hops); |
| } |
| else { |
| // clear existing lops |
| hops.resetVisitStatus(); |
| rClearLops( hops ); |
| } |
| |
| // replace scalar reads with literals |
| if( !inplace ) { |
| hops.resetVisitStatus(); |
| rReplaceLiterals( hops, vars ); |
| } |
| |
| // refresh matrix characteristics (update stats) |
| hops.resetVisitStatus(); |
| rUpdateStatistics( hops, vars ); |
| |
| // dynamic hop rewrites |
| if( !inplace ) |
| _rewriter.get().rewriteHopDAG( hops, null ); |
| |
| // refresh memory estimates (based on updated stats) |
| MemoTable memo = new MemoTable(); |
| hops.resetVisitStatus(); |
| memo.init(hops, status); |
| hops.resetVisitStatus(); |
| hops.refreshMemEstimates(memo); |
| |
| // construct lops |
| Dag<Lop> dag = new Dag<Lop>(); |
| Lop lops = hops.constructLops(); |
| lops.addToDag(dag); |
| |
| // generate runtime instructions (incl piggybacking) |
| newInst = dag.getJobs(null, ConfigurationManager.getDMLConfig()); |
| } |
| |
| // replace thread ids in new instructions |
| if( tid != 0 ) //only in parfor context |
| newInst = ProgramConverter.createDeepCopyInstructionSet(newInst, tid, -1, null, null, null, false, false); |
| |
| // explain recompiled instructions |
| if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_HOPS ) |
| LOG.info("EXPLAIN RECOMPILE \nPRED (line "+hops.getBeginLine()+"):\n" + Explain.explain(hops,1)); |
| if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_RUNTIME ) |
| LOG.info("EXPLAIN RECOMPILE \nPRED (line "+hops.getBeginLine()+"):\n" + Explain.explain(newInst,1)); |
| |
| return newInst; |
| } |
| |
| /** |
| * C) Recompile basic program block hop DAG, but forced to CP. |
| * |
| * This happens always 'inplace', without statistics updates, and |
| * without dynamic rewrites. |
| * |
| * @param hops |
| * @param tid |
| * @return |
| * @throws DMLRuntimeException |
| * @throws HopsException |
| * @throws LopsException |
| * @throws IOException |
| */ |
| public static ArrayList<Instruction> recompileHopsDag2Forced( StatementBlock sb, ArrayList<Hop> hops, long tid, ExecType et ) |
| throws DMLRuntimeException, HopsException, LopsException, IOException |
| { |
| ArrayList<Instruction> newInst = null; |
| |
| //need for synchronization as we do temp changes in shared hops/lops |
| //however, we create deep copies for most dags to allow for concurrent recompile |
| synchronized( hops ) |
| { |
| LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " + |
| OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB"); |
| |
| // clear existing lops |
| Hop.resetVisitStatus(hops); |
| for( Hop hopRoot : hops ) |
| rClearLops( hopRoot ); |
| |
| // update exec type |
| Hop.resetVisitStatus(hops); |
| for( Hop hopRoot : hops ) |
| rSetExecType( hopRoot, et ); |
| Hop.resetVisitStatus(hops); |
| |
| // construct lops |
| Dag<Lop> dag = new Dag<Lop>(); |
| for( Hop hopRoot : hops ){ |
| Lop lops = hopRoot.constructLops(); |
| lops.addToDag(dag); |
| } |
| |
| // generate runtime instructions (incl piggybacking) |
| newInst = dag.getJobs(sb, ConfigurationManager.getDMLConfig()); |
| } |
| |
| // replace thread ids in new instructions |
| if( tid != 0 ) //only in parfor context |
| newInst = ProgramConverter.createDeepCopyInstructionSet(newInst, tid, -1, null, null, null, false, false); |
| |
| return newInst; |
| } |
| |
| /** |
| * D) Recompile predicate hop DAG (single root), but forced to CP. |
| * |
| * This happens always 'inplace', without statistics updates, and |
| * without dynamic rewrites. |
| * |
| * @param hops |
| * @param tid |
| * @param et |
| * @return |
| * @throws DMLRuntimeException |
| * @throws HopsException |
| * @throws LopsException |
| * @throws IOException |
| */ |
| public static ArrayList<Instruction> recompileHopsDag2Forced( Hop hops, long tid, ExecType et ) |
| throws DMLRuntimeException, HopsException, LopsException, IOException |
| { |
| ArrayList<Instruction> newInst = null; |
| |
| //need for synchronization as we do temp changes in shared hops/lops |
| synchronized( hops ) |
| { |
| LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " + |
| OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB"); |
| |
| // clear existing lops |
| hops.resetVisitStatus(); |
| rClearLops( hops ); |
| |
| // update exec type |
| hops.resetVisitStatus(); |
| rSetExecType( hops, et ); |
| hops.resetVisitStatus(); |
| |
| // construct lops |
| Dag<Lop> dag = new Dag<Lop>(); |
| Lop lops = hops.constructLops(); |
| lops.addToDag(dag); |
| |
| // generate runtime instructions (incl piggybacking) |
| newInst = dag.getJobs(null, ConfigurationManager.getDMLConfig()); |
| } |
| |
| // replace thread ids in new instructions |
| if( tid != 0 ) //only in parfor context |
| newInst = ProgramConverter.createDeepCopyInstructionSet(newInst, tid, -1, null, null, null, false, false); |
| |
| return newInst; |
| } |
| |
| /** |
| * |
| * @param sb |
| * @param hops |
| * @return |
| * @throws HopsException |
| * @throws LopsException |
| * @throws DMLRuntimeException |
| * @throws IOException |
| */ |
| public static ArrayList<Instruction> recompileHopsDagInstructions( StatementBlock sb, ArrayList<Hop> hops ) |
| throws HopsException, LopsException, DMLRuntimeException, IOException |
| { |
| ArrayList<Instruction> newInst = null; |
| |
| //need for synchronization as we do temp changes in shared hops/lops |
| //however, we create deep copies for most dags to allow for concurrent recompile |
| synchronized( hops ) |
| { |
| LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " + |
| OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB"); |
| |
| // clear existing lops |
| Hop.resetVisitStatus(hops); |
| for( Hop hopRoot : hops ) |
| rClearLops( hopRoot ); |
| |
| // construct lops |
| Dag<Lop> dag = new Dag<Lop>(); |
| for( Hop hopRoot : hops ){ |
| Lop lops = hopRoot.constructLops(); |
| lops.addToDag(dag); |
| } |
| |
| // generate runtime instructions (incl piggybacking) |
| newInst = dag.getJobs(sb, ConfigurationManager.getDMLConfig()); |
| } |
| |
| // explain recompiled hops / instructions |
| if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_HOPS ){ |
| LOG.info("EXPLAIN RECOMPILE \nGENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+"):\n" + |
| Explain.explainHops(hops, 1)); |
| } |
| if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_RUNTIME ){ |
| LOG.info("EXPLAIN RECOMPILE \nGENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+"):\n" + |
| Explain.explain(newInst, 1)); |
| } |
| |
| return newInst; |
| } |
| |
| /** |
| * |
| * @param hops |
| * @return |
| * @throws DMLRuntimeException |
| * @throws HopsException |
| * @throws LopsException |
| * @throws IOException |
| */ |
| public static ArrayList<Instruction> recompileHopsDagInstructions( Hop hops ) |
| throws DMLRuntimeException, HopsException, LopsException, IOException |
| { |
| ArrayList<Instruction> newInst = null; |
| |
| //need for synchronization as we do temp changes in shared hops/lops |
| synchronized( hops ) |
| { |
| LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " + |
| OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB"); |
| |
| // clear existing lops |
| hops.resetVisitStatus(); |
| rClearLops( hops ); |
| |
| // construct lops |
| Dag<Lop> dag = new Dag<Lop>(); |
| Lop lops = hops.constructLops(); |
| lops.addToDag(dag); |
| |
| // generate runtime instructions (incl piggybacking) |
| newInst = dag.getJobs(null, ConfigurationManager.getDMLConfig()); |
| } |
| |
| // explain recompiled instructions |
| if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_HOPS ) |
| LOG.info("EXPLAIN RECOMPILE \nPRED (line "+hops.getBeginLine()+"):\n" + Explain.explain(hops,1)); |
| if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_RUNTIME ) |
| LOG.info("EXPLAIN RECOMPILE \nPRED (line "+hops.getBeginLine()+"):\n" + Explain.explain(newInst,1)); |
| |
| return newInst; |
| } |
| |
| |
| /** |
| * |
| * @param pbs |
| * @param vars |
| * @param tid |
| * @throws DMLRuntimeException |
| */ |
| public static void recompileProgramBlockHierarchy( ArrayList<ProgramBlock> pbs, LocalVariableMap vars, long tid, boolean resetRecompile ) |
| throws DMLRuntimeException |
| { |
| try |
| { |
| RecompileStatus status = new RecompileStatus(); |
| |
| synchronized( pbs ) |
| { |
| for( ProgramBlock pb : pbs ) |
| rRecompileProgramBlock(pb, vars, status, tid, resetRecompile); |
| } |
| } |
| catch(Exception ex) |
| { |
| throw new DMLRuntimeException("Unable to recompile program block hierarchy.", ex); |
| } |
| } |
| |
| /** |
| * Method to recompile program block hierarchy to forced execution time. This affects also |
| * referenced functions and chains of functions. Use et==null in order to release the forced |
| * exec type. |
| * |
| * @param pbs |
| * @param tid |
| * @throws DMLRuntimeException |
| */ |
| public static void recompileProgramBlockHierarchy2Forced( ArrayList<ProgramBlock> pbs, long tid, HashSet<String> fnStack, ExecType et ) |
| throws DMLRuntimeException |
| { |
| try |
| { |
| synchronized( pbs ) |
| { |
| for( ProgramBlock pb : pbs ) |
| rRecompileProgramBlock2Forced(pb, tid, fnStack, et); |
| } |
| } |
| catch(Exception ex) |
| { |
| throw new DMLRuntimeException("Unable to recompile program block hierarchy to CP.", ex); |
| } |
| } |
| |
| /** |
| * This method does NO full program block recompile (no stats update, no rewrites, no recursion) but |
| * only regenerates lops and instructions. The primary use case is recompilation after are hop configuration |
| * changes which allows to preserve statistics (e.g., propagated worst case stats from other program blocks) |
| * and better performance for recompiling individual program blocks. |
| * |
| * @param pb |
| * @throws IOException |
| * @throws DMLRuntimeException |
| * @throws LopsException |
| * @throws HopsException |
| */ |
| public static void recompileProgramBlockInstructions(ProgramBlock pb) |
| throws HopsException, LopsException, DMLRuntimeException, IOException |
| { |
| if( pb instanceof WhileProgramBlock ) |
| { |
| //recompile while predicate instructions |
| WhileProgramBlock wpb = (WhileProgramBlock)pb; |
| WhileStatementBlock wsb = (WhileStatementBlock) pb.getStatementBlock(); |
| if( wsb!=null && wsb.getPredicateHops()!=null ) |
| wpb.setPredicate(recompileHopsDagInstructions(wsb.getPredicateHops())); |
| } |
| else if( pb instanceof IfProgramBlock ) |
| { |
| //recompile if predicate instructions |
| IfProgramBlock ipb = (IfProgramBlock)pb; |
| IfStatementBlock isb = (IfStatementBlock) pb.getStatementBlock(); |
| if( isb!=null && isb.getPredicateHops()!=null ) |
| ipb.setPredicate(recompileHopsDagInstructions(isb.getPredicateHops())); |
| } |
| else if( pb instanceof ForProgramBlock ) |
| { |
| //recompile for/parfor predicate instructions |
| ForProgramBlock fpb = (ForProgramBlock)pb; |
| ForStatementBlock fsb = (ForStatementBlock) pb.getStatementBlock(); |
| if( fsb!=null && fsb.getFromHops()!=null ) |
| fpb.setFromInstructions(recompileHopsDagInstructions(fsb.getFromHops())); |
| if( fsb!=null && fsb.getToHops()!=null ) |
| fpb.setToInstructions(recompileHopsDagInstructions(fsb.getToHops())); |
| if( fsb!=null && fsb.getIncrementHops()!=null ) |
| fpb.setIncrementInstructions(recompileHopsDagInstructions(fsb.getIncrementHops())); |
| } |
| else |
| { |
| //recompile last-level program block instructions |
| StatementBlock sb = pb.getStatementBlock(); |
| if( sb!=null && sb.get_hops()!=null ) { |
| pb.setInstructions(recompileHopsDagInstructions(sb, sb.get_hops())); |
| } |
| } |
| } |
| |
| /** |
| * |
| * @param hops |
| * @return |
| */ |
| public static boolean requiresRecompilation( ArrayList<Hop> hops ) |
| { |
| boolean ret = false; |
| |
| if( hops != null ) |
| { |
| synchronized( hops ) |
| { |
| Hop.resetVisitStatus(hops); |
| for( Hop hop : hops ) |
| { |
| ret |= rRequiresRecompile(hop); |
| if( ret ) break; // early abort |
| } |
| } |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * |
| * @param hops |
| * @return |
| */ |
| public static boolean requiresRecompilation( Hop hop ) |
| { |
| boolean ret = false; |
| |
| if( hop != null ) |
| { |
| synchronized( hop ) |
| { |
| hop.resetVisitStatus(); |
| ret = rRequiresRecompile(hop); |
| } |
| } |
| |
| return ret; |
| } |
| |
| |
| /** |
| * Deep copy of hops dags for parallel recompilation. |
| * |
| * @param hops |
| * @return |
| * @throws CloneNotSupportedException |
| */ |
| public static ArrayList<Hop> deepCopyHopsDag( ArrayList<Hop> hops ) |
| throws HopsException |
| { |
| ArrayList<Hop> ret = new ArrayList<Hop>(); |
| |
| try { |
| //note: need memo table over all independent DAGs in order to |
| //account for shared transient reads (otherwise more instructions generated) |
| HashMap<Long, Hop> memo = new HashMap<Long, Hop>(); //orig ID, new clone |
| for( Hop hopRoot : hops ) |
| ret.add(rDeepCopyHopsDag(hopRoot, memo)); |
| } |
| catch(Exception ex) |
| { |
| throw new HopsException(ex); |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Deep copy of hops dags for parallel recompilation. |
| * |
| * @param hops |
| * @return |
| * @throws CloneNotSupportedException |
| */ |
| public static Hop deepCopyHopsDag( Hop hops ) |
| throws HopsException |
| { |
| Hop ret = null; |
| |
| try { |
| HashMap<Long, Hop> memo = new HashMap<Long, Hop>(); //orig ID, new clone |
| ret = rDeepCopyHopsDag(hops, memo); |
| } |
| catch(Exception ex) |
| { |
| throw new HopsException(ex); |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * |
| * @param hops |
| * @param memo |
| * @return |
| * @throws CloneNotSupportedException |
| */ |
| private static Hop rDeepCopyHopsDag( Hop hops, HashMap<Long,Hop> memo ) |
| throws CloneNotSupportedException |
| { |
| Hop ret = memo.get(hops.getHopID()); |
| |
| //create clone if required |
| if( ret == null ) |
| { |
| ret = (Hop) hops.clone(); |
| ArrayList<Hop> tmp = new ArrayList<Hop>(); |
| |
| //create new childs |
| for( Hop in : hops.getInput() ) |
| { |
| Hop newIn = rDeepCopyHopsDag(in, memo); |
| tmp.add(newIn); |
| } |
| //modify references of childs |
| for( Hop in : tmp ) |
| { |
| ret.getInput().add(in); |
| in.getParent().add(ret); |
| } |
| |
| memo.put(hops.getHopID(), ret); |
| } |
| |
| return ret; |
| } |
| |
| |
| public static void updateFunctionNames(ArrayList<Hop> hops, long pid) |
| { |
| Hop.resetVisitStatus(hops); |
| for( Hop hopRoot : hops ) |
| rUpdateFunctionNames( hopRoot, pid ); |
| } |
| |
| public static void rUpdateFunctionNames( Hop hop, long pid ) |
| { |
| if( hop.getVisited() == VisitStatus.DONE ) |
| return; |
| |
| //update function names |
| if( hop instanceof FunctionOp && ((FunctionOp)hop).getFunctionType() != FunctionType.MULTIRETURN_BUILTIN) { |
| FunctionOp fop = (FunctionOp) hop; |
| fop.setFunctionName( fop.getFunctionName() + |
| ProgramConverter.CP_CHILD_THREAD + pid); |
| } |
| |
| if( hop.getInput() != null ) |
| for( Hop c : hop.getInput() ) |
| rUpdateFunctionNames(c, pid); |
| |
| hop.setVisited(VisitStatus.DONE); |
| } |
| |
| |
| ////////////////////////////// |
| // private helper functions // |
| ////////////////////////////// |
| |
| |
| /** |
| * |
| * @param pb |
| * @param vars |
| * @param tid |
| * @throws IOException |
| * @throws LopsException |
| * @throws DMLRuntimeException |
| * @throws HopsException |
| */ |
| private static void rRecompileProgramBlock( ProgramBlock pb, LocalVariableMap vars, RecompileStatus status, long tid, boolean resetRecompile ) |
| throws HopsException, DMLRuntimeException, LopsException, IOException |
| { |
| if (pb instanceof WhileProgramBlock) |
| { |
| WhileProgramBlock wpb = (WhileProgramBlock)pb; |
| WhileStatementBlock wsb = (WhileStatementBlock) wpb.getStatementBlock(); |
| //recompile predicate |
| recompileWhilePredicate(wpb, wsb, vars, status, tid, resetRecompile); |
| //remove updated scalars because in loop |
| removeUpdatedScalars(vars, wsb); |
| //copy vars for later compare |
| LocalVariableMap oldVars = (LocalVariableMap) vars.clone(); |
| RecompileStatus oldStatus = (RecompileStatus) status.clone(); |
| for (ProgramBlock pb2 : wpb.getChildBlocks()) |
| rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile); |
| if( reconcileUpdatedCallVarsLoops(oldVars, vars, wsb) |
| | reconcileUpdatedCallVarsLoops(oldStatus, status, wsb) ) { |
| //second pass with unknowns if required |
| recompileWhilePredicate(wpb, wsb, vars, status, tid, resetRecompile); |
| for (ProgramBlock pb2 : wpb.getChildBlocks()) |
| rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile); |
| } |
| removeUpdatedScalars(vars, wsb); |
| } |
| else if (pb instanceof IfProgramBlock) |
| { |
| IfProgramBlock ipb = (IfProgramBlock)pb; |
| IfStatementBlock isb = (IfStatementBlock)ipb.getStatementBlock(); |
| //recompile predicate |
| recompileIfPredicate(ipb, isb, vars, status, tid, resetRecompile); |
| //copy vars for later compare |
| LocalVariableMap oldVars = (LocalVariableMap) vars.clone(); |
| LocalVariableMap varsElse = (LocalVariableMap) vars.clone(); |
| RecompileStatus oldStatus = (RecompileStatus)status.clone(); |
| RecompileStatus statusElse = (RecompileStatus)status.clone(); |
| for( ProgramBlock pb2 : ipb.getChildBlocksIfBody() ) |
| rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile); |
| for( ProgramBlock pb2 : ipb.getChildBlocksElseBody() ) |
| rRecompileProgramBlock(pb2, varsElse, statusElse, tid, resetRecompile); |
| reconcileUpdatedCallVarsIf(oldVars, vars, varsElse, isb); |
| reconcileUpdatedCallVarsIf(oldStatus, status, statusElse, isb); |
| removeUpdatedScalars(vars, ipb.getStatementBlock()); |
| } |
| else if (pb instanceof ForProgramBlock) //includes ParFORProgramBlock |
| { |
| ForProgramBlock fpb = (ForProgramBlock)pb; |
| ForStatementBlock fsb = (ForStatementBlock) fpb.getStatementBlock(); |
| //recompile predicates |
| recompileForPredicates(fpb, fsb, vars, status, tid, resetRecompile); |
| //remove updated scalars because in loop |
| removeUpdatedScalars(vars, fpb.getStatementBlock()); |
| //copy vars for later compare |
| LocalVariableMap oldVars = (LocalVariableMap) vars.clone(); |
| RecompileStatus oldStatus = (RecompileStatus) status.clone(); |
| for( ProgramBlock pb2 : fpb.getChildBlocks() ) |
| rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile); |
| if( reconcileUpdatedCallVarsLoops(oldVars, vars, fsb) |
| | reconcileUpdatedCallVarsLoops(oldStatus, status, fsb)) { |
| //second pass with unknowns if required |
| recompileForPredicates(fpb, fsb, vars, status, tid, resetRecompile); |
| for( ProgramBlock pb2 : fpb.getChildBlocks() ) |
| rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile); |
| } |
| removeUpdatedScalars(vars, fpb.getStatementBlock()); |
| } |
| else if ( pb instanceof FunctionProgramBlock ) //includes ExternalFunctionProgramBlock and ExternalFunctionProgramBlockCP |
| { |
| //do nothing |
| } |
| else |
| { |
| StatementBlock sb = pb.getStatementBlock(); |
| ArrayList<Instruction> tmp = pb.getInstructions(); |
| |
| if( sb != null //recompile all for stats propagation and recompile flags |
| //&& Recompiler.requiresRecompilation( sb.get_hops() ) |
| /*&& !Recompiler.containsNonRecompileInstructions(tmp)*/ ) |
| { |
| tmp = Recompiler.recompileHopsDag(sb, sb.get_hops(), vars, status, true, tid); |
| pb.setInstructions( tmp ); |
| |
| //propagate stats across hops (should be executed on clone of vars) |
| Recompiler.extractDAGOutputStatistics(sb.get_hops(), vars); |
| |
| //reset recompilation flags (w/ special handling functions) |
| if( ParForProgramBlock.RESET_RECOMPILATION_FLAGs |
| && !containsRootFunctionOp(sb.get_hops()) |
| && resetRecompile ) |
| { |
| Hop.resetRecompilationFlag(sb.get_hops(), ExecType.CP); |
| sb.updateRecompilationFlag(); |
| } |
| } |
| |
| } |
| |
| } |
| |
| |
| /** |
| * |
| * @param oldCallVars |
| * @param callVars |
| * @param sb |
| * @return |
| */ |
| public static boolean reconcileUpdatedCallVarsLoops( LocalVariableMap oldCallVars, LocalVariableMap callVars, StatementBlock sb ) |
| { |
| boolean requiresRecompile = false; |
| |
| //handle matrices |
| for( String varname : sb.variablesUpdated().getVariableNames() ) |
| { |
| Data dat1 = oldCallVars.get(varname); |
| Data dat2 = callVars.get(varname); |
| if( dat1!=null && dat1 instanceof MatrixObject && dat2!=null && dat2 instanceof MatrixObject ) |
| { |
| MatrixObject moOld = (MatrixObject) dat1; |
| MatrixObject mo = (MatrixObject) dat2; |
| MatrixCharacteristics mcOld = moOld.getMatrixCharacteristics(); |
| MatrixCharacteristics mc = mo.getMatrixCharacteristics(); |
| |
| if( mcOld.getRows() != mc.getRows() |
| || mcOld.getCols() != mc.getCols() |
| || mcOld.getNonZeros() != mc.getNonZeros() ) |
| { |
| long ldim1 = mc.getRows(), ldim2 = mc.getCols(), lnnz = mc.getNonZeros(); |
| //handle row dimension change in body |
| if( mcOld.getRows() != mc.getRows() ) { |
| ldim1=-1; //unknown |
| requiresRecompile = true; |
| } |
| //handle column dimension change in body |
| if( mcOld.getCols() != mc.getCols() ) { |
| ldim2=-1; //unknown |
| requiresRecompile = true; |
| } |
| //handle sparsity change |
| if( mcOld.getNonZeros() != mc.getNonZeros() ) { |
| lnnz=-1; //unknown |
| requiresRecompile = true; |
| } |
| |
| MatrixObject moNew = createOutputMatrix(ldim1, ldim2, lnnz); |
| callVars.put(varname, moNew); |
| } |
| } |
| } |
| |
| return requiresRecompile; |
| } |
| |
| /** |
|  * Reconciles the transient-write statistics of variables updated by a loop |
|  * body. For every updated variable with statistics in both status objects, |
|  * each of rows, columns and non-zeros that differs is set to -1 (unknown) in |
|  * a new characteristics object (block sizes -1) that replaces the entry in |
|  * callStatus. |
|  * |
|  * @param oldCallStatus recompile status holding the statistics to compare against |
|  * @param callStatus current recompile status; changed entries are replaced in place |
|  * @param sb statement block that provides the set of updated variable names |
|  * @return true if at least one entry differed in rows, columns or non-zeros |
|  */ |
| public static boolean reconcileUpdatedCallVarsLoops( RecompileStatus oldCallStatus, RecompileStatus callStatus, StatementBlock sb ) |
| { |
|     boolean changed = false; |
| |
|     for( String name : sb.variablesUpdated().getVariableNames() ) |
|     { |
|         MatrixCharacteristics mcBefore = oldCallStatus.getTWriteStats().get(name); |
|         MatrixCharacteristics mcAfter = callStatus.getTWriteStats().get(name); |
| |
|         //statistics must be available on both sides to be comparable |
|         if( mcBefore == null || mcAfter == null ) |
|             continue; |
| |
|         boolean rowsDiffer = mcBefore.getRows() != mcAfter.getRows(); |
|         boolean colsDiffer = mcBefore.getCols() != mcAfter.getCols(); |
|         boolean nnzDiffer = mcBefore.getNonZeros() != mcAfter.getNonZeros(); |
| |
|         if( rowsDiffer || colsDiffer || nnzDiffer ) |
|         { |
|             //every changed statistic becomes unknown, unchanged ones are kept |
|             long rows = rowsDiffer ? -1 : mcAfter.getRows(); |
|             long cols = colsDiffer ? -1 : mcAfter.getCols(); |
|             long nnz = nnzDiffer ? -1 : mcAfter.getNonZeros(); |
| |
|             callStatus.getTWriteStats().put(name, new MatrixCharacteristics(rows, cols, -1, -1, nnz)); |
|             changed = true; |
|         } |
|     } |
| |
|     return changed; |
| } |
| |
| /** |
| * |
| * @param oldCallVars |
| * @param callVarsIf |
| * @param callVarsElse |
| * @param sb |
| * @return |
| */ |
| public static LocalVariableMap reconcileUpdatedCallVarsIf( LocalVariableMap oldCallVars, LocalVariableMap callVarsIf, LocalVariableMap callVarsElse, StatementBlock sb ) |
| { |
| for( String varname : sb.variablesUpdated().getVariableNames() ) |
| { |
| Data origVar = oldCallVars.get(varname); |
| Data ifVar = callVarsIf.get(varname); |
| Data elseVar = callVarsElse.get(varname); |
| Data dat1 = null, dat2 = null; |
| |
| if( ifVar!=null && elseVar!=null ){ // both branches exists |
| dat1 = ifVar; |
| dat2 = elseVar; |
| } |
| else if( ifVar!=null && elseVar==null ){ //only if |
| dat1 = origVar; |
| dat2 = ifVar; |
| } |
| else { //only else |
| dat1 = origVar; |
| dat2 = elseVar; |
| } |
| |
| //compare size and value information (note: by definition both dat1 and dat2 are of same type |
| //because we do not allow data type changes) |
| if( dat1 != null && dat1 instanceof MatrixObject && dat2!=null ) |
| { |
| //handle matrices |
| if( dat1 instanceof MatrixObject && dat2 instanceof MatrixObject ) |
| { |
| MatrixObject moOld = (MatrixObject) dat1; |
| MatrixObject mo = (MatrixObject) dat2; |
| MatrixCharacteristics mcOld = moOld.getMatrixCharacteristics(); |
| MatrixCharacteristics mc = mo.getMatrixCharacteristics(); |
| |
| if( mcOld.getRows() != mc.getRows() |
| || mcOld.getCols() != mc.getCols() |
| || mcOld.getNonZeros() != mc.getNonZeros() ) |
| { |
| long ldim1 =mc.getRows(), ldim2=mc.getCols(), lnnz=mc.getNonZeros(); |
| |
| //handle row dimension change |
| if( mcOld.getRows() != mc.getRows() ) { |
| ldim1 = -1; //unknown |
| } |
| if( mcOld.getCols() != mc.getCols() ) { |
| ldim2 = -1; //unknown |
| } |
| //handle sparsity change |
| if( mcOld.getNonZeros() != mc.getNonZeros() ) { |
| lnnz = -1; //unknown |
| } |
| |
| MatrixObject moNew = createOutputMatrix(ldim1, ldim2, lnnz); |
| callVarsIf.put(varname, moNew); |
| } |
| } |
| } |
| } |
| |
| return callVarsIf; |
| } |
| |
| /** |
| * |
| * @param oldStatus |
| * @param callStatusIf |
| * @param callStatusElse |
| * @param sb |
| * @return |
| */ |
| public static RecompileStatus reconcileUpdatedCallVarsIf( RecompileStatus oldStatus, RecompileStatus callStatusIf, RecompileStatus callStatusElse, StatementBlock sb ) |
| { |
| for( String varname : sb.variablesUpdated().getVariableNames() ) |
| { |
| MatrixCharacteristics origVar = oldStatus.getTWriteStats().get(varname); |
| MatrixCharacteristics ifVar = callStatusIf.getTWriteStats().get(varname); |
| MatrixCharacteristics elseVar = callStatusElse.getTWriteStats().get(varname); |
| MatrixCharacteristics dat1 = null, dat2 = null; |
| |
| if( ifVar!=null && elseVar!=null ){ // both branches exists |
| dat1 = ifVar; |
| dat2 = elseVar; |
| } |
| else if( ifVar!=null && elseVar==null ){ //only if |
| dat1 = origVar; |
| dat2 = ifVar; |
| } |
| else { //only else |
| dat1 = origVar; |
| dat2 = elseVar; |
| } |
| |
| //compare size and value information (note: by definition both dat1 and dat2 are of same type |
| //because we do not allow data type changes) |
| if( dat1 != null && dat2!=null ) |
| { |
| MatrixCharacteristics mcOld = dat1; |
| MatrixCharacteristics mc = dat2; |
| |
| if( mcOld.getRows() != mc.getRows() |
| || mcOld.getCols() != mc.getCols() |
| || mcOld.getNonZeros() != mc.getNonZeros() ) |
| { |
| long ldim1 = (mcOld.getRows()>=0 && mc.getRows()>=0) ? |
| Math.max( mcOld.getRows(), mc.getRows() ) : -1; |
| long ldim2 = (mcOld.getCols()>=0 && mc.getCols()>=0) ? |
| Math.max( mcOld.getCols(), mc.getCols() ) : -1; |
| long lnnz = (mcOld.getNonZeros()>=0 && mc.getNonZeros()>=0) ? |
| Math.max( mcOld.getNonZeros(), mc.getNonZeros() ) : -1; |
| |
| MatrixCharacteristics mcNew = new MatrixCharacteristics(ldim1, ldim2, -1, -1, lnnz); |
| callStatusIf.getTWriteStats().put(varname, mcNew); |
| } |
| } |
| } |
| |
| return callStatusIf; |
| } |
| |
| /** |
|  * Checks whether any of the given DAG roots is a function operator. |
|  * |
|  * @param hops list of DAG root hops |
|  * @return true if at least one root is an instance of FunctionOp |
|  */ |
| private static boolean containsRootFunctionOp( ArrayList<Hop> hops ) |
| { |
|     for( Hop root : hops ) { |
|         if( root instanceof FunctionOp ) |
|             return true; |
|     } |
|     return false; |
| } |
| |
| /** |
|  * Creates a placeholder matrix object (value type double, no file name) |
|  * whose meta data carries the given statistics together with the configured |
|  * block size for rows and columns. |
|  * |
|  * @param dim1 number of rows |
|  * @param dim2 number of columns |
|  * @param nnz number of non-zeros |
|  * @return new matrix object with the described meta data |
|  */ |
| private static MatrixObject createOutputMatrix( long dim1, long dim2, long nnz ) |
| { |
|     MatrixCharacteristics stats = new MatrixCharacteristics( |
|         dim1, dim2, |
|         ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), |
|         nnz); |
| |
|     MatrixObject placeholder = new MatrixObject(ValueType.DOUBLE, null); |
|     placeholder.setMetaData(new MatrixFormatMetaData(stats,null,null)); |
|     return placeholder; |
| } |
| |
| |
| //helper functions for predicate recompile |
| |
| /** |
|  * Recompiles the predicate of an if block and installs the new predicate |
|  * instructions. Nothing happens if the statement block or its predicate |
|  * hops are missing. |
|  * |
|  * @param ipb if program block that receives the recompiled predicate |
|  * @param isb if statement block that provides the predicate hops |
|  * @param vars symbol table passed to the recompilation |
|  * @param status recompile status passed to the recompilation |
|  * @param tid thread id passed to the recompilation |
|  * @param resetRecompile if true (and globally enabled), recompilation flags of the predicate are reset |
|  * @throws DMLRuntimeException |
|  * @throws HopsException |
|  * @throws LopsException |
|  * @throws IOException |
|  */ |
| private static void recompileIfPredicate( IfProgramBlock ipb, IfStatementBlock isb, LocalVariableMap vars, RecompileStatus status, long tid, boolean resetRecompile ) |
|     throws DMLRuntimeException, HopsException, LopsException, IOException |
| { |
|     if( isb == null ) |
|         return; |
| |
|     Hop predicate = isb.getPredicateHops(); |
|     if( predicate == null ) |
|         return; |
| |
|     ipb.setPredicate( recompileHopsDag(predicate, vars, status, true, tid) ); |
| |
|     if( ParForProgramBlock.RESET_RECOMPILATION_FLAGs && resetRecompile ) { |
|         Hop.resetRecompilationFlag(predicate, ExecType.CP); |
|         isb.updatePredicateRecompilationFlag(); |
|     } |
| |
|     //update predicate vars (potentially after constant folding, e.g., in parfor) |
|     if( predicate instanceof LiteralOp ) { |
|         String literalName = ((LiteralOp)predicate).getName(); |
|         ipb.setPredicateResultVar(literalName.toLowerCase()); |
|     } |
| } |
| |
| /** |
|  * Recompiles the predicate of a while block and installs the new predicate |
|  * instructions. Nothing happens if the statement block or its predicate |
|  * hops are missing. |
|  * |
|  * @param wpb while program block that receives the recompiled predicate |
|  * @param wsb while statement block that provides the predicate hops |
|  * @param vars symbol table passed to the recompilation |
|  * @param status recompile status passed to the recompilation |
|  * @param tid thread id passed to the recompilation |
|  * @param resetRecompile if true (and globally enabled), recompilation flags of the predicate are reset |
|  * @throws DMLRuntimeException |
|  * @throws HopsException |
|  * @throws LopsException |
|  * @throws IOException |
|  */ |
| private static void recompileWhilePredicate( WhileProgramBlock wpb, WhileStatementBlock wsb, LocalVariableMap vars, RecompileStatus status, long tid, boolean resetRecompile ) |
|     throws DMLRuntimeException, HopsException, LopsException, IOException |
| { |
|     if( wsb == null ) |
|         return; |
| |
|     Hop predicate = wsb.getPredicateHops(); |
|     if( predicate == null ) |
|         return; |
| |
|     wpb.setPredicate( recompileHopsDag(predicate, vars, status, true, tid) ); |
| |
|     if( ParForProgramBlock.RESET_RECOMPILATION_FLAGs && resetRecompile ) { |
|         Hop.resetRecompilationFlag(predicate, ExecType.CP); |
|         wsb.updatePredicateRecompilationFlag(); |
|     } |
| |
|     //update predicate vars (potentially after constant folding, e.g., in parfor) |
|     if( predicate instanceof LiteralOp ) { |
|         String literalName = ((LiteralOp)predicate).getName(); |
|         wpb.setPredicateResultVar(literalName.toLowerCase()); |
|     } |
| } |
| |
| /** |
|  * Recompiles the from, to and increment predicates of a for block and |
|  * installs the new instructions; missing predicate hops are skipped. |
|  * Afterwards, entries 1 to 3 of the iterable predicate variables are |
|  * replaced by the literal names of from, to and increment wherever the |
|  * respective hop is a literal. |
|  * |
|  * @param fpb for program block that receives the recompiled predicates |
|  * @param fsb for statement block that provides the predicate hops; nothing happens if null |
|  * @param vars symbol table passed to the recompilation |
|  * @param status recompile status passed to the recompilation |
|  * @param tid thread id passed to the recompilation |
|  * @param resetRecompile if true (and globally enabled), recompilation flags of the predicates are reset |
|  * @throws DMLRuntimeException |
|  * @throws HopsException |
|  * @throws LopsException |
|  * @throws IOException |
|  */ |
| private static void recompileForPredicates( ForProgramBlock fpb, ForStatementBlock fsb, LocalVariableMap vars, RecompileStatus status, long tid, boolean resetRecompile ) |
|     throws DMLRuntimeException, HopsException, LopsException, IOException |
| { |
|     if( fsb == null ) |
|         return; |
| |
|     Hop fromHops = fsb.getFromHops(); |
|     Hop toHops = fsb.getToHops(); |
|     Hop incrHops = fsb.getIncrementHops(); |
| |
|     //recompilation always happens, resetting of flags is optional |
|     boolean resetFlags = ParForProgramBlock.RESET_RECOMPILATION_FLAGs && resetRecompile; |
| |
|     if( fromHops != null ) { |
|         fpb.setFromInstructions(recompileHopsDag(fromHops, vars, status, true, tid)); |
|         if( resetFlags ) |
|             Hop.resetRecompilationFlag(fromHops,ExecType.CP); |
|     } |
|     if( toHops != null ) { |
|         fpb.setToInstructions(recompileHopsDag(toHops, vars, status, true, tid)); |
|         if( resetFlags ) |
|             Hop.resetRecompilationFlag(toHops,ExecType.CP); |
|     } |
|     if( incrHops != null ) { |
|         fpb.setIncrementInstructions(recompileHopsDag(incrHops, vars, status, true, tid)); |
|         if( resetFlags ) |
|             Hop.resetRecompilationFlag(incrHops,ExecType.CP); |
|     } |
|     if( resetFlags ) |
|         fsb.updatePredicateRecompilationFlags(); |
| |
|     //update predicate vars (potentially after constant folding, e.g., in parfor) |
|     String[] itervars = fpb.getIterablePredicateVars(); |
|     if( fromHops instanceof LiteralOp ) |
|         itervars[1] = ((LiteralOp)fromHops).getName(); |
|     if( toHops instanceof LiteralOp ) |
|         itervars[2] = ((LiteralOp)toHops).getName(); |
|     if( incrHops instanceof LiteralOp ) |
|         itervars[3] = ((LiteralOp)incrHops).getName(); |
| } |
| |
| /** |
| * Recursively recompiles the given program block and all of its child blocks |
| * via recompileHopsDag2Forced with the given execution type. |
| * |
| * For while, if and for blocks, a predicate is only recompiled if a statement |
| * block exists and it is not the case that et is CP while the current predicate |
| * instructions contain no MR job instruction. For function blocks, only the |
| * child blocks are processed. For all other blocks, the hops DAG is recompiled |
| * if a statement block exists, and every function called via a |
| * FunctionCallCPInstruction is recompiled as well, at most once per function |
| * key as tracked in fnStack. |
| * |
| * @param pb program block to recompile |
| * @param tid thread id passed through to the forced recompilation |
| * @param fnStack function keys already handled; extended with newly visited functions |
| * @param et execution type passed through to the forced recompilation |
| * @throws HopsException |
| * @throws DMLRuntimeException |
| * @throws LopsException |
| * @throws IOException |
| */ |
| private static void rRecompileProgramBlock2Forced( ProgramBlock pb, long tid, HashSet<String> fnStack, ExecType et ) |
| throws HopsException, DMLRuntimeException, LopsException, IOException |
| { |
| if (pb instanceof WhileProgramBlock) |
| { |
| WhileProgramBlock pbTmp = (WhileProgramBlock)pb; |
| WhileStatementBlock sbTmp = (WhileStatementBlock)pbTmp.getStatementBlock(); |
| //recompile predicate (skipped for CP if the predicate contains no MR job instruction) |
| if( sbTmp!=null && !(et==ExecType.CP && !OptTreeConverter.containsMRJobInstruction(pbTmp.getPredicate(), true, true)) ) |
| pbTmp.setPredicate( Recompiler.recompileHopsDag2Forced(sbTmp.getPredicateHops(), tid, et) ); |
| |
| //recompile body |
| for (ProgramBlock pb2 : pbTmp.getChildBlocks()) |
| rRecompileProgramBlock2Forced(pb2, tid, fnStack, et); |
| } |
| else if (pb instanceof IfProgramBlock) |
| { |
| IfProgramBlock pbTmp = (IfProgramBlock)pb; |
| IfStatementBlock sbTmp = (IfStatementBlock)pbTmp.getStatementBlock(); |
| //recompile predicate (skipped for CP if the predicate contains no MR job instruction) |
| if( sbTmp!=null &&!(et==ExecType.CP && !OptTreeConverter.containsMRJobInstruction(pbTmp.getPredicate(), true, true)) ) |
| pbTmp.setPredicate( Recompiler.recompileHopsDag2Forced(sbTmp.getPredicateHops(), tid, et) ); |
| //recompile body (if branch first, then else branch) |
| for( ProgramBlock pb2 : pbTmp.getChildBlocksIfBody() ) |
| rRecompileProgramBlock2Forced(pb2, tid, fnStack, et); |
| for( ProgramBlock pb2 : pbTmp.getChildBlocksElseBody() ) |
| rRecompileProgramBlock2Forced(pb2, tid, fnStack, et); |
| } |
| else if (pb instanceof ForProgramBlock) //includes ParFORProgramBlock |
| { |
| ForProgramBlock pbTmp = (ForProgramBlock)pb; |
| ForStatementBlock sbTmp = (ForStatementBlock) pbTmp.getStatementBlock(); |
| //recompile predicates from, to, increment (each checked independently with the same skip rule as above) |
| if( sbTmp!=null &&!(et==ExecType.CP && !OptTreeConverter.containsMRJobInstruction(pbTmp.getFromInstructions(), true, true)) ) |
| pbTmp.setFromInstructions( Recompiler.recompileHopsDag2Forced(sbTmp.getFromHops(), tid, et) ); |
| if( sbTmp!=null &&!(et==ExecType.CP && !OptTreeConverter.containsMRJobInstruction(pbTmp.getToInstructions(), true, true)) ) |
| pbTmp.setToInstructions( Recompiler.recompileHopsDag2Forced(sbTmp.getToHops(), tid, et) ); |
| if( sbTmp!=null &&!(et==ExecType.CP && !OptTreeConverter.containsMRJobInstruction(pbTmp.getIncrementInstructions(), true, true)) ) |
| pbTmp.setIncrementInstructions( Recompiler.recompileHopsDag2Forced(sbTmp.getIncrementHops(), tid, et) ); |
| //recompile body |
| for( ProgramBlock pb2 : pbTmp.getChildBlocks() ) |
| rRecompileProgramBlock2Forced(pb2, tid, fnStack, et); |
| } |
| else if ( pb instanceof FunctionProgramBlock )//includes ExternalFunctionProgramBlock and ExternalFunctionProgramBlockCP |
| { |
| FunctionProgramBlock tmp = (FunctionProgramBlock)pb; |
| for( ProgramBlock pb2 : tmp.getChildBlocks() ) |
| rRecompileProgramBlock2Forced(pb2, tid, fnStack, et); |
| } |
| else |
| { |
| StatementBlock sb = pb.getStatementBlock(); |
| |
| //recompile hops dag to the given exec type (note selective recompile 'if CP and no MR inst' |
| //would be invalid with permutation matrix mult across multiple dags) |
| if( sb != null ) { |
| ArrayList<Instruction> tmp = pb.getInstructions(); |
| tmp = Recompiler.recompileHopsDag2Forced(sb, sb.get_hops(), tid, et); |
| pb.setInstructions( tmp ); |
| } |
| |
| //recompile functions called from the (potentially just recompiled) instructions of this block |
| if( OptTreeConverter.containsFunctionCallInstruction(pb) ) |
| { |
| ArrayList<Instruction> tmp = pb.getInstructions(); |
| for( Instruction inst : tmp ) |
| if( inst instanceof FunctionCallCPInstruction ) |
| { |
| FunctionCallCPInstruction func = (FunctionCallCPInstruction)inst; |
| String fname = func.getFunctionName(); |
| String fnamespace = func.getNamespace(); |
| String fKey = DMLProgram.constructFunctionKey(fnamespace, fname); |
| |
| if( !fnStack.contains(fKey) ) //memoization for multiple calls, recursion |
| { |
| fnStack.add(fKey); |
| |
| FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fnamespace, fname); |
| rRecompileProgramBlock2Forced(fpb, tid, fnStack, et); //recompile chains of functions |
| } |
| } |
| } |
| } |
| |
| } |
| |
| /** |
| * |
| * @param callVars |
| * @param sb |
| */ |
| public static void removeUpdatedScalars( LocalVariableMap callVars, StatementBlock sb ) |
| { |
| if( sb != null ) |
| { |
| //remove update scalar variables from constants |
| for( String varname : sb.variablesUpdated().getVariables().keySet() ) |
| { |
| Data dat = callVars.get(varname); |
| if( dat != null && dat.getDataType() == DataType.SCALAR ) |
| { |
| callVars.remove(varname); |
| } |
| } |
| } |
| } |
| |
| /** |
|  * Extracts the output statistics of all given DAG roots into the symbol |
|  * table, overwriting existing entries (delegates with overwrite=true). |
|  * |
|  * @param hops list of DAG root hops |
|  * @param vars symbol table that receives the extracted statistics |
|  */ |
| public static void extractDAGOutputStatistics(ArrayList<Hop> hops, LocalVariableMap vars) |
| { |
|     final boolean overwriteExisting = true; |
|     extractDAGOutputStatistics(hops, vars, overwriteExisting); |
| } |
| |
| /** |
|  * Extracts the output statistics of all given DAG roots into the symbol |
|  * table by processing one root after another. |
|  * |
|  * @param hops list of DAG root hops |
|  * @param vars symbol table that receives the extracted statistics |
|  * @param overwrite passed through to the per-root extraction |
|  */ |
| public static void extractDAGOutputStatistics(ArrayList<Hop> hops, LocalVariableMap vars, boolean overwrite) |
| { |
|     for( Hop root : hops ) { |
|         extractDAGOutputStatistics(root, vars, overwrite); |
|     } |
| } |
| |
| /** |
|  * Extracts the output statistics of a single DAG root into the symbol |
|  * table, overwriting existing entries (delegates with overwrite=true). |
|  * |
|  * @param hop DAG root hop |
|  * @param vars symbol table that receives the extracted statistics |
|  */ |
| public static void extractDAGOutputStatistics(Hop hop, LocalVariableMap vars) |
| { |
|     final boolean overwriteExisting = true; |
|     extractDAGOutputStatistics(hop, vars, overwriteExisting); |
| } |
| |
| /** |
|  * Extracts the output statistics of a single DAG root into the symbol |
|  * table. Only transient writes are considered, all other hops are ignored. |
|  * |
|  * If the variable is not yet in vars or overwrite is set: matrices are |
|  * registered as placeholder objects with the dimensions and nnz of the hop; |
|  * scalars are registered if assigned from a literal, from a read of a |
|  * variable contained in vars, or from nrow/ncol of an input with positive |
|  * dimension; any other form of scalar assignment removes the variable. |
|  * |
|  * Otherwise (variable exists and overwrite is not set): for matrices, the |
|  * existing statistics are replaced by those of the hop if the size estimate |
|  * of the hop is larger; other entries are overwritten if assigned from a |
|  * literal. |
|  * |
|  * @param hop DAG root hop |
|  * @param vars symbol table that receives the extracted statistics |
|  * @param overwrite if true, existing entries are replaced instead of merged |
|  */ |
| public static void extractDAGOutputStatistics(Hop hop, LocalVariableMap vars, boolean overwrite) |
| { |
|     //for all writes to symbol table |
|     if( !(hop instanceof DataOp) || ((DataOp)hop).getDataOpType()!=DataOpTypes.TRANSIENTWRITE ) |
|         return; |
| |
|     String varName = hop.getName(); |
| |
|     if( !vars.keySet().contains(varName) || overwrite ) //not existing so far |
|     { |
|         if( hop.getDataType()==DataType.MATRIX ) |
|         { |
|             //extract matrix sizes for size propagation |
|             vars.put(varName, createOutputMatrix(hop.getDim1(), hop.getDim2(), hop.getNnz())); |
|         } |
|         else if( hop.getDataType()==DataType.SCALAR ) |
|         { |
|             //extract scalar constants for second constant propagation; |
|             //only single-input assignments are analyzed |
|             Hop in = (hop.getInput().size()==1) ? hop.getInput().get(0) : null; |
| |
|             if( in instanceof LiteralOp ) |
|             { |
|                 //literal assignment |
|                 ScalarObject constant = HopRewriteUtils.getScalarObject((LiteralOp)in); |
|                 if( constant!=null ) |
|                     vars.put(varName, constant); |
|             } |
|             else if( in instanceof DataOp ) |
|             { |
|                 //constant variable assignment |
|                 DataOp dop = (DataOp) in; |
|                 String dopvarname = dop.getName(); |
|                 if( dop.isRead() && vars.keySet().contains(dopvarname) ) |
|                 { |
|                     ScalarObject constant = (ScalarObject) vars.get(dopvarname); |
|                     vars.put(varName, constant); //no clone because constant |
|                 } |
|             } |
|             else if( in instanceof UnaryOp |
|                 && (((UnaryOp)in).getOp()==OpOp1.NROW || ((UnaryOp)in).getOp()==OpOp1.NCOL) ) |
|             { |
|                 //ncol/nrow variable assignment |
|                 UnaryOp uop = (UnaryOp) in; |
|                 if( uop.getOp()==OpOp1.NROW && uop.getInput().get(0).getDim1()>0 ) |
|                     vars.put(varName, new IntObject(uop.getInput().get(0).getDim1())); |
|                 else if( uop.getOp()==OpOp1.NCOL && uop.getInput().get(0).getDim2()>0 ) |
|                     vars.put(varName, new IntObject(uop.getInput().get(0).getDim2())); |
|             } |
|             else |
|             { |
|                 //we need to remove other updated scalars in order to ensure result |
|                 //correctness of recompilation w/o being too conservative |
|                 vars.remove(varName); |
|             } |
|         } |
|     } |
|     else //already existing: take largest |
|     { |
|         Data existing = vars.get(varName); |
|         if( existing instanceof MatrixObject ) |
|         { |
|             MatrixCharacteristics mc = ((MatrixObject)existing).getMatrixCharacteristics(); |
|             double sparsity = (mc.getNonZeros()>=0) ? |
|                 ((double)mc.getNonZeros())/mc.getRows()/mc.getCols() : 1.0; |
| |
|             //update statistics if the hop output is estimated to be larger |
|             if( OptimizerUtils.estimateSizeExactSparsity(mc.getRows(), mc.getCols(), sparsity) |
|                 < OptimizerUtils.estimateSize(hop.getDim1(), hop.getDim2()) ) |
|             { |
|                 mc.setDimension(hop.getDim1(), hop.getDim2()); |
|                 mc.setNonZeros(hop.getNnz()); |
|             } |
|         } |
|         else if( hop.getInput().size()==1 && hop.getInput().get(0) instanceof LiteralOp ) |
|         { |
|             //scalar (just overwrite) |
|             ScalarObject constant = HopRewriteUtils.getScalarObject((LiteralOp)hop.getInput().get(0)); |
|             if( constant!=null ) |
|                 vars.put(varName, constant); |
|         } |
|     } |
| } |
| |
| /** |
|  * Recursively determines whether the given hop or any of its inputs |
|  * requires recompilation. A hop already marked as DONE only contributes its |
|  * own flag. The traversal over the inputs stops as soon as the result is |
|  * true (early abort), after which the hop itself is marked as DONE. |
|  * |
|  * @param hop current hop of the DAG traversal |
|  * @return true if this hop or a visited input requires recompilation |
|  */ |
| private static boolean rRequiresRecompile( Hop hop ) |
| { |
|     boolean required = hop.requiresRecompile(); |
|     if( hop.getVisited() == VisitStatus.DONE ) |
|         return required; |
| |
|     if( hop.getInput() != null ) { |
|         for( Hop in : hop.getInput() ) |
|         { |
|             if( rRequiresRecompile(in) ) |
|                 required = true; |
|             if( required ) |
|                 break; // early abort |
|         } |
|     } |
| |
|     hop.setVisited(VisitStatus.DONE); |
|     return required; |
| } |
| |
| /** |
|  * Recursively clears the lops of the given hop DAG. For general hops this |
|  * (1) resets the exec type and (2) removes the reference to the constructed |
|  * lops before descending into the inputs. For literals, only the outputs of |
|  * existing lops are cleared because literals are always constant. |
|  * |
|  * Resetting the exec type is important for advanced optimizers like parfor; |
|  * otherwise subtle side effects of program recompilation and hop-lop |
|  * rewrites are possible (e.g., the indexing hop-lop rewrite in combination |
|  * with the parfor rewrite 'set exec type' might eventually lead to |
|  * unnecessary remote_parfor jobs). |
|  * |
|  * @param hop current hop of the DAG traversal; skipped if already marked as DONE |
|  */ |
| public static void rClearLops( Hop hop ) |
| { |
|     if( hop.getVisited() == VisitStatus.DONE ) |
|         return; |
| |
|     if( !(hop instanceof LiteralOp) ) |
|     { |
|         //general case: clear exec type and lops, then process inputs |
|         hop.resetExecType(); |
|         hop.setLops(null); |
|         if( hop.getInput() != null ) { |
|             for( Hop in : hop.getInput() ) |
|                 rClearLops(in); |
|         } |
|     } |
|     else if( hop.getLops() != null ) |
|     { |
|         //literals: just clear the parents of the lops |
|         hop.getLops().getOutputs().clear(); |
|     } |
| |
|     hop.setVisited(VisitStatus.DONE); |
| } |
| |
| /** |
|  * Recursively updates the size information of the given hop DAG bottom-up, |
|  * based on the entries of the symbol table: |
|  * (1) data ops other than persistent reads take dimensions (and nnz for |
|  * matrices) from a matrix or frame entry of the same name, |
|  * (2) persistent reads with unknown dimensions (non-CSV, metadata not |
|  * ignored) try to read the matrix characteristics from the metadata file, |
|  * (3) rand/sinit/sample, seq, reshape and indexing re-evaluate their size |
|  * expressions against the symbol table. |
|  * All hops whose size expression did not turn from unknown to known in this |
|  * call refresh their size information from their inputs. |
|  * |
|  * Bounds equal to Double.MAX_VALUE are treated as unknown in this method. |
|  * |
|  * @param hop current hop of the DAG traversal; skipped if already marked as DONE |
|  * @param vars symbol table with the current variables |
|  * @throws DMLRuntimeException if a data generation hop has an unexpected method |
|  */ |
| public static void rUpdateStatistics( Hop hop, LocalVariableMap vars ) |
|     throws DMLRuntimeException |
| { |
|     if( hop.getVisited() == VisitStatus.DONE ) |
|         return; |
| |
|     //recursively process children |
|     if( hop.getInput() != null ) |
|         for( Hop c : hop.getInput() ) |
|             rUpdateStatistics(c, vars); |
| |
|     boolean updatedSizeExpr = false; |
| |
|     //update statistics for transient reads according to current statistics |
|     //(with awareness not to override persistent reads to an existing name) |
|     if( hop instanceof DataOp |
|         && ((DataOp)hop).getDataOpType() != DataOpTypes.PERSISTENTREAD ) |
|     { |
|         DataOp d = (DataOp) hop; |
|         String varName = d.getName(); |
|         if( vars.keySet().contains( varName ) ) |
|         { |
|             Data dat = vars.get(varName); |
|             if( dat instanceof MatrixObject ) { |
|                 MatrixObject mo = (MatrixObject) dat; |
|                 d.setDim1(mo.getNumRows()); |
|                 d.setDim2(mo.getNumColumns()); |
|                 d.setNnz(mo.getNnz()); |
|             } |
|             else if( dat instanceof FrameObject ) { |
|                 FrameObject fo = (FrameObject) dat; |
|                 d.setDim1(fo.getNumRows()); |
|                 d.setDim2(fo.getNumColumns()); |
|             } |
|         } |
|     } |
|     //special case for persistent reads with unknown size (read-after-write) |
|     else if( hop instanceof DataOp |
|         && ((DataOp)hop).getDataOpType() == DataOpTypes.PERSISTENTREAD |
|         && !hop.dimsKnown() && ((DataOp)hop).getInputFormatType()!=FileFormatTypes.CSV |
|         && !ConfigurationManager.getCompilerConfigFlag(ConfigType.IGNORE_READ_WRITE_METADATA) ) |
|     { |
|         //update hop with read meta data |
|         DataOp dop = (DataOp) hop; |
|         tryReadMetaDataFileMatrixCharacteristics(dop); |
|     } |
|     //update size expression for rand/seq according to symbol table entries |
|     else if ( hop instanceof DataGenOp ) |
|     { |
|         DataGenOp d = (DataGenOp) hop; |
|         HashMap<String,Integer> params = d.getParamIndexMap(); |
|         if ( d.getOp() == DataGenMethod.RAND || d.getOp()==DataGenMethod.SINIT |
|             || d.getOp() == DataGenMethod.SAMPLE ) |
|         { |
|             boolean initUnknown = !d.dimsKnown(); |
|             int ix1 = params.get(DataExpression.RAND_ROWS); |
|             int ix2 = params.get(DataExpression.RAND_COLS); |
|             //update rows/cols by evaluating simple expression of literals, nrow, ncol, scalars, binaryops |
|             d.refreshRowsParameterInformation(d.getInput().get(ix1), vars); |
|             d.refreshColsParameterInformation(d.getInput().get(ix2), vars); |
|             updatedSizeExpr = initUnknown & d.dimsKnown(); |
|         } |
|         else if ( d.getOp() == DataGenMethod.SEQ ) |
|         { |
|             boolean initUnknown = !d.dimsKnown(); |
|             int ix1 = params.get(Statement.SEQ_FROM); |
|             int ix2 = params.get(Statement.SEQ_TO); |
|             int ix3 = params.get(Statement.SEQ_INCR); |
|             double from = d.computeBoundsInformation(d.getInput().get(ix1), vars); |
|             double to = d.computeBoundsInformation(d.getInput().get(ix2), vars); |
|             double incr = d.computeBoundsInformation(d.getInput().get(ix3), vars); |
| |
|             //special case increment: an increment of 1 on a non-ascending sequence |
|             //is flipped to -1; every other increment (explicit or unknown) must be |
|             //kept as is, otherwise the number of rows and the stored increment |
|             //would be computed for an increment of 1 instead of the actual one |
|             if ( from!=Double.MAX_VALUE && to!=Double.MAX_VALUE ) { |
|                 incr = ( from >= to && incr==1 ) ? -1.0 : incr; |
|             } |
| |
|             if ( from!=Double.MAX_VALUE && to!=Double.MAX_VALUE && incr!=Double.MAX_VALUE ) { |
|                 d.setDim1( 1 + (long)Math.floor((to-from)/incr) ); |
|                 d.setDim2( 1 ); |
|                 d.setIncrementValue( incr ); |
|             } |
|             updatedSizeExpr = initUnknown & d.dimsKnown(); |
|         } |
|         else { |
|             throw new DMLRuntimeException("Unexpected data generation method: " + d.getOp()); |
|         } |
|     } |
|     //update size expression for reshape according to symbol table entries |
|     else if ( hop instanceof ReorgOp |
|         && ((ReorgOp)(hop)).getOp()==Hop.ReOrgOp.RESHAPE ) |
|     { |
|         ReorgOp d = (ReorgOp) hop; |
|         boolean initUnknown = !d.dimsKnown(); |
|         d.refreshRowsParameterInformation(d.getInput().get(1), vars); |
|         d.refreshColsParameterInformation(d.getInput().get(2), vars); |
|         updatedSizeExpr = initUnknown & d.dimsKnown(); |
|     } |
|     //update size expression for indexing according to symbol table entries |
|     else if( hop instanceof IndexingOp ) |
|     { |
|         IndexingOp iop = (IndexingOp)hop; |
|         Hop input2 = iop.getInput().get(1); //inpRowL |
|         Hop input3 = iop.getInput().get(2); //inpRowU |
|         Hop input4 = iop.getInput().get(3); //inpColL |
|         Hop input5 = iop.getInput().get(4); //inpColU |
|         boolean initUnknown = !iop.dimsKnown(); |
|         double rl = iop.computeBoundsInformation(input2, vars); |
|         double ru = iop.computeBoundsInformation(input3, vars); |
|         double cl = iop.computeBoundsInformation(input4, vars); |
|         double cu = iop.computeBoundsInformation(input5, vars); |
|         //inclusive bounds: size is upper-lower+1 |
|         if( rl!=Double.MAX_VALUE && ru!=Double.MAX_VALUE ) |
|             iop.setDim1( (long)(ru-rl+1) ); |
|         if( cl!=Double.MAX_VALUE && cu!=Double.MAX_VALUE ) |
|             iop.setDim2( (long)(cu-cl+1) ); |
|         updatedSizeExpr = initUnknown & iop.dimsKnown(); |
|     } |
| |
|     //propagate statistics along inner nodes of DAG, |
|     //without overwriting inferred size expressions |
|     if( !updatedSizeExpr ) { |
|         hop.refreshSizeInformation(); |
|     } |
| |
|     hop.setVisited(VisitStatus.DONE); |
| } |
| |
| /** |
|  * Public entry point to the package-local literal replacement; simply |
|  * forwards to LiteralReplacement.rReplaceLiterals. |
|  * |
|  * @param hop hop passed to the literal replacement |
|  * @param vars symbol table passed to the literal replacement |
|  * @throws DMLRuntimeException |
|  */ |
| public static void rReplaceLiterals( Hop hop, LocalVariableMap vars ) |
|     throws DMLRuntimeException |
| { |
|     //delegate to package-local implementation |
|     LiteralReplacement.rReplaceLiterals(hop, vars); |
| } |
| |
| /** |
|  * Recursively sets the forced exec type on the given hop and all of its |
|  * inputs, and marks every processed hop as DONE. |
|  * |
|  * @param hop current hop of the DAG traversal; skipped if already marked as DONE |
|  * @param etype exec type to force |
|  */ |
| public static void rSetExecType( Hop hop, ExecType etype ) |
| { |
|     if( hop.getVisited() != VisitStatus.DONE ) |
|     { |
|         //force exec type of this hop before descending |
|         hop.setForcedExecType(etype); |
| |
|         if( hop.getInput() != null ) { |
|             for( Hop in : hop.getInput() ) |
|                 rSetExecType(in, etype); |
|         } |
| |
|         hop.setVisited(VisitStatus.DONE); |
|     } |
| } |
| |
| |
| /** |
|  * Returns true iff (1) the job consists of shuffle instructions only and all |
|  * of them are reblock or csv reblock instructions, (2) no reblock instruction |
|  * ends with "false" (no output of empty blocks), (3) every input fits the |
|  * local memory budget respectively the size threshold for unknown dimensions, |
|  * and (4) outside of local mode, every non-dirty text, matrix market, csv or |
|  * binary cell input does not exceed the in-memory reblock file size threshold. |
|  * |
|  * @param inst MR job instruction to check |
|  * @param inputs input matrices of the job |
|  * @return true if all checks pass |
|  * @throws DMLRuntimeException |
|  * @throws IOException |
|  */ |
| public static boolean checkCPReblock(MRJobInstruction inst, MatrixObject[] inputs) |
|     throws DMLRuntimeException, IOException |
| { |
|     boolean localMode = InfrastructureAnalyzer.isLocalMode(); |
| |
|     //check only shuffle inst |
|     String rdInst = inst.getIv_randInstructions(); |
|     String rrInst = inst.getIv_recordReaderInstructions(); |
|     String mapInst = inst.getIv_instructionsInMapper(); |
|     String aggInst = inst.getIv_aggInstructions(); |
|     String otherInst = inst.getIv_otherInstructions(); |
|     boolean hasNonShuffleInst = (rdInst != null && rdInst.length()>0) |
|         || (rrInst != null && rrInst.length()>0) |
|         || (mapInst != null && mapInst.length()>0) |
|         || (aggInst != null && aggInst.length()>0) |
|         || (otherInst != null && otherInst.length()>0); |
|     if( hasNonShuffleInst ) |
|         return false; |
| |
|     //check only reblock inst |
|     String[] opcodeParts = inst.getIv_shuffleInstructions().split( Lop.INSTRUCTION_DELIMITOR ); |
|     for( String rblk : opcodeParts ) { |
|         if( !InstructionUtils.getOpCode(rblk).equals(ReBlock.OPCODE) |
|             && !InstructionUtils.getOpCode(rblk).equals(CSVReBlock.OPCODE) ) |
|         { |
|             return false; |
|         } |
|     } |
| |
|     //check output empty blocks (for outputEmptyBlocks=false, a CP reblock can be |
|     //counter-productive because any export from CP would reintroduce the empty blocks) |
|     String[] emptyBlockParts = inst.getIv_shuffleInstructions().split( Lop.INSTRUCTION_DELIMITOR ); |
|     for( String rblk : emptyBlockParts ) { |
|         if( InstructionUtils.getOpCode(rblk).equals(ReBlock.OPCODE) |
|             && rblk.endsWith("false") ) //no output of empty blocks |
|         { |
|             return false; |
|         } |
|     } |
| |
|     //check recompile memory budget |
|     for( MatrixObject mo : inputs ) |
|     { |
|         long rows = mo.getNumRows(); |
|         long cols = mo.getNumColumns(); |
| |
|         if ( rows == -1 || cols == -1 ) |
|         { |
|             // If the dimensions are unknown then reblock can not be recompiled into CP |
|             // Note: unknown dimensions at this point can only happen for CSV files. |
|             // however, we do a conservative check with the CSV filesize |
|             Path path = new Path(mo.getFileName()); |
|             long size = MapReduceTool.getFilesizeOnHDFS(path); |
|             if( size > CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE || CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE > OptimizerUtils.getLocalMemBudget() ) |
|                 return false; |
|         } |
|         else |
|         { |
|             //default case (known dimensions) |
|             long nnz = mo.getNnz(); |
|             double sp = OptimizerUtils.getSparsity(rows, cols, nnz); |
|             double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sp); |
|             if( !OptimizerUtils.isValidCPDimensions(rows, cols) |
|                 || !OptimizerUtils.isValidCPMatrixSize(rows, cols, sp) |
|                 || mem >= OptimizerUtils.getLocalMemBudget() ) |
|             { |
|                 return false; |
|             } |
|         } |
|     } |
| |
|     //check in-memory reblock size threshold (prevent long single-threaded text read) |
|     //NOTE: this does not apply to local mode because there text read single-threaded as well |
|     if( !localMode ) |
|     { |
|         for( MatrixObject mo : inputs ) |
|         { |
|             MatrixFormatMetaData iimd = (MatrixFormatMetaData) mo.getMetaData(); |
|             boolean cellOrTextFormat = iimd.getInputInfo()==InputInfo.TextCellInputInfo |
|                 || iimd.getInputInfo()==InputInfo.MatrixMarketInputInfo |
|                 || iimd.getInputInfo()==InputInfo.CSVInputInfo |
|                 || iimd.getInputInfo()==InputInfo.BinaryCellInputInfo; |
|             if( !cellOrTextFormat || mo.isDirty() ) |
|                 continue; |
| |
|             //get file size on hdfs (as indicator for estimated read time) |
|             Path path = new Path(mo.getFileName()); |
|             long fileSize = MapReduceTool.getFilesizeOnHDFS(path); |
|             //compute cp reblock size threshold based on available parallelism |
|             long cpThreshold = CP_REBLOCK_THRESHOLD_SIZE * |
|                 OptimizerUtils.getParallelTextReadParallelism(); |
| |
|             if( fileSize > cpThreshold ) |
|                 return false; |
|         } |
|     } |
| |
|     return true; |
| } |
| |
| /** |
| * CP Reblock check for spark instructions; in contrast to MR, we can not |
| * rely on the input file sizes because inputs might be passed via rdds. |
| * |
| * @param mc |
| * @return |
| * @throws DMLRuntimeException |
| */ |
| public static boolean checkCPReblock(ExecutionContext ec, String varin) |
| throws DMLRuntimeException |
| { |
| CacheableData<?> obj = ec.getCacheableData(varin); |
| MatrixCharacteristics mc = ec.getMatrixCharacteristics(varin); |
| |
| long rows = mc.getRows(); |
| long cols = mc.getCols(); |
| long nnz = mc.getNonZeros(); |
| |
| //check valid cp reblock recompilation hook |
| if( !ConfigurationManager.isDynamicRecompilation() |
| || !OptimizerUtils.isHybridExecutionMode() ) |
| { |
| return false; |
| } |
| |
| //robustness for usage through mlcontext (key/values of input rdds are |
| //not serializable for text; also bufferpool rdd read only supported for |
| // binarycell and binaryblock) |
| MatrixFormatMetaData iimd = (MatrixFormatMetaData) obj.getMetaData(); |
| if( obj.getRDDHandle() != null |
| && iimd.getInputInfo() != InputInfo.BinaryBlockInputInfo |
| && iimd.getInputInfo() != InputInfo.BinaryCellInputInfo ) { |
| return false; |
| } |
| |
| //robustness unknown dimensions, e.g., for csv reblock |
| if( rows <= 0 || cols <= 0 ) { |
| return false; |
| } |
| |
| //check valid dimensions and memory requirements |
| double sp = OptimizerUtils.getSparsity(rows, cols, nnz); |
| double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sp); |
| if( !OptimizerUtils.isValidCPDimensions(rows, cols) |
| || !OptimizerUtils.isValidCPMatrixSize(rows, cols, sp) |
| || mem >= OptimizerUtils.getLocalMemBudget() ) |
| { |
| return false; |
| } |
| |
| //check in-memory reblock size threshold (preference: distributed) |
| long estFilesize = (long)(3.5 * mem); //conservative estimate |
| long cpThreshold = CP_REBLOCK_THRESHOLD_SIZE * |
| OptimizerUtils.getParallelTextReadParallelism(); |
| return (estFilesize < cpThreshold); |
| } |
| |
| /** |
| * |
| * @param inst |
| * @param inputs |
| * @return |
| * @throws DMLRuntimeException |
| * @throws IOException |
| */ |
| public static boolean checkCPTransform(MRJobInstruction inst, MatrixObject[] inputs) |
| throws DMLRuntimeException, IOException |
| { |
| boolean ret = true; |
| |
| MatrixObject input = inputs[0]; // there can only be one input in TRANSFORM job |
| |
| Path path = new Path(input.getFileName()); |
| long sizeOnHDFS = MapReduceTool.getFilesizeOnHDFS(path); |
| |
| // dimensions are not checked here, since the worst case dimensions |
| // after transformations (with potential dummycoding) are typically unknown. |
| |
| if( sizeOnHDFS > CP_TRANSFORM_UNKNOWN_THRESHOLD_SIZE |
| || sizeOnHDFS*4 > OptimizerUtils.getLocalMemBudget() ) |
| ret = false; |
| LOG.info("checkCPTransform(): size = " + sizeOnHDFS + ", recompile to CP = " + ret); |
| return ret; |
| } |
| |
| /** |
| * |
| * @param inst |
| * @param updatedRandInst |
| * @return |
| * @throws DMLRuntimeException |
| */ |
| public static boolean checkCPDataGen( MRJobInstruction inst, String updatedRandInst ) |
| throws DMLRuntimeException |
| { |
| boolean ret = true; |
| |
| //check only shuffle inst |
| String shuffleInst = inst.getIv_shuffleInstructions(); |
| String rrInst = inst.getIv_recordReaderInstructions(); |
| String mapInst = inst.getIv_instructionsInMapper(); |
| String aggInst = inst.getIv_aggInstructions(); |
| String otherInst = inst.getIv_otherInstructions(); |
| if( (shuffleInst != null && shuffleInst.length()>0) |
| || (rrInst != null && rrInst.length()>0) |
| || (mapInst != null && mapInst.length()>0) |
| || (aggInst != null && aggInst.length()>0) |
| || (otherInst != null && otherInst.length()>0) ) |
| { |
| ret = false; |
| } |
| |
| //check only rand inst |
| if( ret ) { |
| String[] instParts = updatedRandInst.split( Lop.INSTRUCTION_DELIMITOR ); |
| for( String lrandStr : instParts ) { |
| if( InstructionUtils.getOpCode(lrandStr).equals(DataGen.RAND_OPCODE) ) |
| { |
| //check recompile memory budget |
| RandInstruction lrandInst = (RandInstruction) RandInstruction.parseInstruction(lrandStr); |
| long rows = lrandInst.getRows(); |
| long cols = lrandInst.getCols(); |
| double sparsity = lrandInst.getSparsity(); |
| double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sparsity); |
| if( !OptimizerUtils.isValidCPDimensions(rows, cols) |
| || !OptimizerUtils.isValidCPMatrixSize(rows, cols, sparsity) |
| || mem >= OptimizerUtils.getLocalMemBudget() ) |
| { |
| ret = false; |
| break; |
| } |
| } |
| else if( InstructionUtils.getOpCode(lrandStr).equals(DataGen.SEQ_OPCODE) ) |
| { |
| //check recompile memory budget |
| //(don't account for sparsity because always dense) |
| SeqInstruction lrandInst = (SeqInstruction) SeqInstruction.parseInstruction(lrandStr); |
| long rows = lrandInst.getRows(); |
| long cols = lrandInst.getCols(); |
| double mem = MatrixBlock.estimateSizeInMemory(rows, cols, 1.0d); |
| if( !OptimizerUtils.isValidCPDimensions(rows, cols) |
| || !OptimizerUtils.isValidCPMatrixSize(rows, cols, 1.0d) |
| || mem >= OptimizerUtils.getLocalMemBudget() ) |
| { |
| ret = false; |
| break; |
| } |
| } |
| else |
| { |
| ret = false; |
| break; |
| } |
| } |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * |
| * @param in |
| * @param out |
| * @throws DMLRuntimeException |
| */ |
| public static void executeInMemoryMatrixReblock(ExecutionContext ec, String varin, String varout) |
| throws DMLRuntimeException |
| { |
| MatrixObject in = ec.getMatrixObject(varin); |
| MatrixObject out = ec.getMatrixObject(varout); |
| |
| //read text input matrix (through buffer pool, matrix object carries all relevant |
| //information including additional arguments for csv reblock) |
| MatrixBlock mb = in.acquireRead(); |
| |
| //set output (incl update matrix characteristics) |
| out.acquireModify( mb ); |
| out.release(); |
| in.release(); |
| } |
| |
| /** |
| * |
| * @param ec |
| * @param varin |
| * @param varout |
| * @throws DMLRuntimeException |
| */ |
| public static void executeInMemoryFrameReblock(ExecutionContext ec, String varin, String varout) |
| throws DMLRuntimeException |
| { |
| FrameObject in = ec.getFrameObject(varin); |
| FrameObject out = ec.getFrameObject(varout); |
| |
| //read text input frame (through buffer pool, frame object carries all relevant |
| //information including additional arguments for csv reblock) |
| FrameBlock fb = in.acquireRead(); |
| |
| //set output (incl update matrix characteristics) |
| out.acquireModify( fb ); |
| out.release(); |
| in.release(); |
| } |
| |
| /** |
| * |
| * @param fname |
| * @return |
| * @throws DMLRuntimeException |
| */ |
| private static void tryReadMetaDataFileMatrixCharacteristics( DataOp dop ) |
| throws DMLRuntimeException |
| { |
| try |
| { |
| //get meta data filename |
| String mtdname = DataExpression.getMTDFileName(dop.getFileName()); |
| |
| JobConf job = ConfigurationManager.getCachedJobConf(); |
| FileSystem fs = FileSystem.get(job); |
| Path path = new Path(mtdname); |
| if( fs.exists(path) ){ |
| BufferedReader br = null; |
| try |
| { |
| br = new BufferedReader(new InputStreamReader(fs.open(path))); |
| JSONObject mtd = JSONHelper.parse(br); |
| |
| DataType dt = DataType.valueOf(String.valueOf(mtd.get(DataExpression.DATATYPEPARAM)).toUpperCase()); |
| dop.setDataType(dt); |
| if(dt != DataType.FRAME) |
| dop.setValueType(ValueType.valueOf(String.valueOf(mtd.get(DataExpression.VALUETYPEPARAM)).toUpperCase())); |
| dop.setDim1((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READROWPARAM).toString()):0); |
| dop.setDim2((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READCOLPARAM).toString()):0); |
| } |
| finally { |
| if( br != null ) |
| br.close(); |
| } |
| } |
| } |
| catch(Exception ex) |
| { |
| throw new DMLRuntimeException(ex); |
| } |
| } |
| } |