// blob: f7204e8f2804f9dfbad6c76f7c8e6fb089e7d343 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.hops.recompile;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.wink.json4j.JSONObject;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.CompilerConfig.ConfigType;
import org.apache.sysml.hops.DataGenOp;
import org.apache.sysml.hops.DataOp;
import org.apache.sysml.hops.FunctionOp;
import org.apache.sysml.hops.FunctionOp.FunctionType;
import org.apache.sysml.hops.Hop;
import org.apache.sysml.hops.Hop.DataGenMethod;
import org.apache.sysml.hops.Hop.DataOpTypes;
import org.apache.sysml.hops.Hop.FileFormatTypes;
import org.apache.sysml.hops.Hop.OpOp1;
import org.apache.sysml.hops.Hop.VisitStatus;
import org.apache.sysml.hops.HopsException;
import org.apache.sysml.hops.IndexingOp;
import org.apache.sysml.hops.LiteralOp;
import org.apache.sysml.hops.MemoTable;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.hops.ReorgOp;
import org.apache.sysml.hops.UnaryOp;
import org.apache.sysml.hops.rewrite.HopRewriteUtils;
import org.apache.sysml.hops.rewrite.ProgramRewriter;
import org.apache.sysml.lops.CSVReBlock;
import org.apache.sysml.lops.DataGen;
import org.apache.sysml.lops.Lop;
import org.apache.sysml.lops.LopProperties.ExecType;
import org.apache.sysml.lops.LopsException;
import org.apache.sysml.lops.ReBlock;
import org.apache.sysml.lops.compile.Dag;
import org.apache.sysml.parser.DMLProgram;
import org.apache.sysml.parser.DataExpression;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.parser.ForStatementBlock;
import org.apache.sysml.parser.IfStatementBlock;
import org.apache.sysml.parser.Statement;
import org.apache.sysml.parser.StatementBlock;
import org.apache.sysml.parser.WhileStatementBlock;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
import org.apache.sysml.runtime.controlprogram.FunctionProgramBlock;
import org.apache.sysml.runtime.controlprogram.IfProgramBlock;
import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysml.runtime.controlprogram.ProgramBlock;
import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.opt.OptTreeConverter;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
import org.apache.sysml.runtime.instructions.cp.IntObject;
import org.apache.sysml.runtime.instructions.cp.ScalarObject;
import org.apache.sysml.runtime.instructions.mr.RandInstruction;
import org.apache.sysml.runtime.instructions.mr.SeqInstruction;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.utils.Explain;
import org.apache.sysml.utils.Explain.ExplainType;
import org.apache.sysml.utils.JSONHelper;
/**
* Dynamic recompilation of hop dags to runtime instructions, which includes the
* following substeps:
*
* (1) deep copy hop dag, (2) refresh matrix characteristics, (3) apply
* dynamic rewrites, (4) refresh memory estimates, (5) construct lops (incl
* operator selection), and (6) generate runtime program (incl piggybacking).
*
*
*/
public class Recompiler
{
/** Logger for this class; used for debug traces and EXPLAIN RECOMPILE output. */
private static final Log LOG = LogFactory.getLog(Recompiler.class.getName());
//Max threshold for in-memory reblock of text input [in bytes] (1 GB)
//reason: single-threaded text read at 20MB/s, 1GB input -> 50s (should exploit parallelism)
//note that we scale this threshold up by the degree of available parallelism
private static final long CP_REBLOCK_THRESHOLD_SIZE = (long)1024*1024*1024;
//256 MB [in bytes]; NOTE(review): the use site is outside this section -- presumably the
//in-memory CSV reblock threshold when dimensions are unknown; confirm against the caller
private static final long CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE = (long)256*1024*1024;
//1 GB [in bytes]; NOTE(review): the use site is outside this section -- presumably the
//in-memory transform threshold when dimensions are unknown; confirm against the caller
private static final long CP_TRANSFORM_UNKNOWN_THRESHOLD_SIZE = (long)1024*1024*1024;
/**
 * Local reused rewriter for dynamic rewrites during recompile. It is held in a
 * ThreadLocal, so each thread lazily creates its own ProgramRewriter on first access
 * and reuses it afterwards.
 *
 * NOTE(review): the previous comment at this position also mentioned a "local DML
 * configuration for thread-local config updates"; no such field is declared here --
 * confirm that remark is stale.
 */
private static ThreadLocal<ProgramRewriter> _rewriter = new ThreadLocal<ProgramRewriter>() {
@Override protected ProgramRewriter initialValue() { return new ProgramRewriter(false, true); }
};
/**
 * Re-initializes the recompiler according to the current optimizer flags by
 * installing a freshly constructed rewriter for the calling thread.
 */
public static void reinitRecompiler() {
    ProgramRewriter freshRewriter = new ProgramRewriter(false, true);
    _rewriter.set(freshRewriter);
}
/**
 * A) Recompile basic program block hop DAG.
 *
 * Two modes are supported: in-place or via deep copy. Deep copy is the default and is
 * required in order to apply non-reversible rewrites. In-place is required in order to
 * modify the existing hops (e.g., for parfor pre-recompilation); in that mode existing
 * lops are cleared, and literal replacement as well as dynamic rewrites are skipped.
 *
 * @param sb statement block of the DAG; passed to instruction generation and used for explain output
 * @param hops root hops of the DAG; this list is also the monitor used for synchronization
 * @param vars variable map used for literal replacement and statistics update
 * @param status recompile status used to initialize the memo table and to receive extracted estimates
 * @param inplace true to work on the given hops, false to work on a deep copy
 * @param tid thread id; if non-zero, the instructions are deep-copied for that thread (parfor context)
 * @return the newly generated runtime instructions
 * @throws DMLRuntimeException
 * @throws HopsException
 * @throws LopsException
 * @throws IOException
 */
public static ArrayList<Instruction> recompileHopsDag( StatementBlock sb, ArrayList<Hop> hops, LocalVariableMap vars, RecompileStatus status, boolean inplace, long tid )
    throws DMLRuntimeException, HopsException, LopsException, IOException
{
    ArrayList<Instruction> instructions = null;
    //working set of roots: the given hops (in-place) or their deep copy
    ArrayList<Hop> roots = hops;

    //synchronize on the original (shared) list because we do temporary changes in
    //shared hops/lops; deep copies of most dags allow for concurrent recompile
    synchronized( hops )
    {
        LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " +
            OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB");

        if( inplace ) {
            //in-place: only drop the lops of the existing dag
            Hop.resetVisitStatus(roots);
            for( Hop root : roots ) {
                rClearLops( root );
            }
        }
        else {
            //deep copy (for non-reversible rewrites), then replace scalar reads with literals
            roots = deepCopyHopsDag(roots);
            Hop.resetVisitStatus(roots);
            for( Hop root : roots ) {
                rReplaceLiterals( root, vars );
            }
        }

        //update statistics (matrix characteristics) from the variable map
        Hop.resetVisitStatus(roots);
        for( Hop root : roots ) {
            rUpdateStatistics( root, vars );
        }

        //dynamic rewrites are only applied on the copy
        if( !inplace ) {
            _rewriter.get().rewriteHopDAGs( roots, null );
        }

        //refresh memory estimates based on the updated stats:
        //init memo table with propagated worst-case estimates beforehand,
        //extract worst-case estimates from the memo table afterwards
        Hop.resetVisitStatus(roots);
        MemoTable memoTable = new MemoTable();
        memoTable.init(roots, status);
        Hop.resetVisitStatus(roots);
        for( Hop root : roots ) {
            root.refreshMemEstimates(memoTable);
        }
        memoTable.extract(roots, status);

        //lop construction for all roots into a single lop dag
        Dag<Lop> lopDag = new Dag<Lop>();
        for( Hop root : roots ) {
            Lop rootLop = root.constructLops();
            rootLop.addToDag(lopDag);
        }

        //runtime instruction generation (incl piggybacking)
        instructions = lopDag.getJobs(sb, ConfigurationManager.getDMLConfig());
    }

    //thread ids are only replaced in parfor context (tid != 0)
    if( tid != 0 ) {
        instructions = ProgramConverter.createDeepCopyInstructionSet(instructions, tid, -1, null, null, null, false, false);
    }

    //optional explain output of recompiled hops / instructions
    if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_HOPS ) {
        LOG.info("EXPLAIN RECOMPILE \nGENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+"):\n" +
            Explain.explainHops(roots, 1));
    }
    else if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_RUNTIME ) {
        LOG.info("EXPLAIN RECOMPILE \nGENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+"):\n" +
            Explain.explain(instructions, 1));
    }

    return instructions;
}
/**
 * B) Recompile predicate hop DAG (single root):
 *
 * Note: This overloaded method is required for predicate instructions because
 * they have only a single hops DAG and we need to synchronize on the original
 * (shared) hops object. Hence, we cannot create any wrapper arraylist for each
 * recompilation - this would result in race conditions for concurrent recompilation
 * in a parfor body.
 *
 * Note: no statement block is passed because for predicate dags we do not have
 * separate live variable analysis information.
 *
 * @param hops root hop of the predicate DAG; also the monitor used for synchronization
 * @param vars variable map used for literal replacement and statistics update
 * @param status recompile status used to initialize the memo table
 * @param inplace true to work on the given hop, false to work on a deep copy
 * @param tid thread id; if non-zero, the instructions are deep-copied for that thread (parfor context)
 * @return the newly generated runtime instructions
 * @throws DMLRuntimeException
 * @throws HopsException
 * @throws LopsException
 * @throws IOException
 */
public static ArrayList<Instruction> recompileHopsDag( Hop hops, LocalVariableMap vars, RecompileStatus status, boolean inplace, long tid )
    throws DMLRuntimeException, HopsException, LopsException, IOException
{
    ArrayList<Instruction> instructions = null;
    //working root: the given hop (in-place) or its deep copy
    Hop root = hops;

    //synchronize on the original (shared) hop because we do temporary changes in shared hops/lops
    synchronized( hops )
    {
        LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " +
            OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB");

        if( inplace ) {
            //in-place: only drop the lops of the existing dag
            root.resetVisitStatus();
            rClearLops( root );
        }
        else {
            //deep copy (for non-reversible rewrites; this also clears existing lops
            //in the created dag), then replace scalar reads with literals
            root = deepCopyHopsDag(root);
            root.resetVisitStatus();
            rReplaceLiterals( root, vars );
        }

        //update statistics (matrix characteristics) from the variable map
        root.resetVisitStatus();
        rUpdateStatistics( root, vars );

        //dynamic rewrites are only applied on the copy
        if( !inplace ) {
            _rewriter.get().rewriteHopDAG( root, null );
        }

        //refresh memory estimates (based on updated stats)
        MemoTable memoTable = new MemoTable();
        root.resetVisitStatus();
        memoTable.init(root, status);
        root.resetVisitStatus();
        root.refreshMemEstimates(memoTable);

        //lop construction
        Dag<Lop> lopDag = new Dag<Lop>();
        Lop rootLop = root.constructLops();
        rootLop.addToDag(lopDag);

        //runtime instruction generation (incl piggybacking)
        instructions = lopDag.getJobs(null, ConfigurationManager.getDMLConfig());
    }

    //thread ids are only replaced in parfor context (tid != 0)
    if( tid != 0 ) {
        instructions = ProgramConverter.createDeepCopyInstructionSet(instructions, tid, -1, null, null, null, false, false);
    }

    //optional explain output of recompiled hops / instructions
    if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_HOPS ) {
        LOG.info("EXPLAIN RECOMPILE \nPRED (line "+root.getBeginLine()+"):\n" + Explain.explain(root,1));
    }
    else if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_RUNTIME ) {
        LOG.info("EXPLAIN RECOMPILE \nPRED (line "+root.getBeginLine()+"):\n" + Explain.explain(instructions,1));
    }

    return instructions;
}
/**
 * C) Recompile basic program block hop DAG with a forced execution type.
 *
 * This always happens 'inplace', without statistics updates, and
 * without dynamic rewrites.
 *
 * @param sb statement block of the DAG; passed to instruction generation
 * @param hops root hops of the DAG; this list is also the monitor used for synchronization
 * @param tid thread id; if non-zero, the instructions are deep-copied for that thread (parfor context)
 * @param et execution type that is set on the hops before lop construction
 * @return the newly generated runtime instructions
 * @throws DMLRuntimeException
 * @throws HopsException
 * @throws LopsException
 * @throws IOException
 */
public static ArrayList<Instruction> recompileHopsDag2Forced( StatementBlock sb, ArrayList<Hop> hops, long tid, ExecType et )
    throws DMLRuntimeException, HopsException, LopsException, IOException
{
    ArrayList<Instruction> instructions = null;

    //synchronization is needed because we do temporary changes in shared hops/lops
    synchronized( hops )
    {
        LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " +
            OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB");

        //drop the lops of the existing dag
        Hop.resetVisitStatus(hops);
        for( Hop root : hops ) {
            rClearLops( root );
        }

        //apply the requested exec type
        Hop.resetVisitStatus(hops);
        for( Hop root : hops ) {
            rSetExecType( root, et );
        }
        Hop.resetVisitStatus(hops);

        //lop construction for all roots into a single lop dag
        Dag<Lop> lopDag = new Dag<Lop>();
        for( Hop root : hops ) {
            Lop rootLop = root.constructLops();
            rootLop.addToDag(lopDag);
        }

        //runtime instruction generation (incl piggybacking)
        instructions = lopDag.getJobs(sb, ConfigurationManager.getDMLConfig());
    }

    //thread ids are only replaced in parfor context (tid != 0)
    if( tid != 0 ) {
        instructions = ProgramConverter.createDeepCopyInstructionSet(instructions, tid, -1, null, null, null, false, false);
    }

    return instructions;
}
/**
 * D) Recompile predicate hop DAG (single root) with a forced execution type.
 *
 * This always happens 'inplace', without statistics updates, and
 * without dynamic rewrites.
 *
 * @param hops root hop of the predicate DAG; also the monitor used for synchronization
 * @param tid thread id; if non-zero, the instructions are deep-copied for that thread (parfor context)
 * @param et execution type that is set on the hops before lop construction
 * @return the newly generated runtime instructions
 * @throws DMLRuntimeException
 * @throws HopsException
 * @throws LopsException
 * @throws IOException
 */
public static ArrayList<Instruction> recompileHopsDag2Forced( Hop hops, long tid, ExecType et )
    throws DMLRuntimeException, HopsException, LopsException, IOException
{
    ArrayList<Instruction> instructions = null;

    //synchronization is needed because we do temporary changes in shared hops/lops
    synchronized( hops )
    {
        LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " +
            OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB");

        //drop the lops of the existing dag
        hops.resetVisitStatus();
        rClearLops( hops );

        //apply the requested exec type
        hops.resetVisitStatus();
        rSetExecType( hops, et );
        hops.resetVisitStatus();

        //lop construction
        Dag<Lop> lopDag = new Dag<Lop>();
        Lop rootLop = hops.constructLops();
        rootLop.addToDag(lopDag);

        //runtime instruction generation (incl piggybacking)
        instructions = lopDag.getJobs(null, ConfigurationManager.getDMLConfig());
    }

    //thread ids are only replaced in parfor context (tid != 0)
    if( tid != 0 ) {
        instructions = ProgramConverter.createDeepCopyInstructionSet(instructions, tid, -1, null, null, null, false, false);
    }

    return instructions;
}
/**
 * Regenerates lops and runtime instructions for a basic program block hop DAG.
 * The existing lops are cleared first; no statistics update and no rewrites are applied.
 *
 * @param sb statement block of the DAG; passed to instruction generation and used for explain output
 * @param hops root hops of the DAG; this list is also the monitor used for synchronization
 * @return the newly generated runtime instructions
 * @throws HopsException
 * @throws LopsException
 * @throws DMLRuntimeException
 * @throws IOException
 */
public static ArrayList<Instruction> recompileHopsDagInstructions( StatementBlock sb, ArrayList<Hop> hops )
    throws HopsException, LopsException, DMLRuntimeException, IOException
{
    ArrayList<Instruction> instructions = null;

    //synchronization is needed because we do temporary changes in shared hops/lops
    synchronized( hops )
    {
        LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " +
            OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB");

        //drop the lops of the existing dag
        Hop.resetVisitStatus(hops);
        for( Hop root : hops ) {
            rClearLops( root );
        }

        //lop construction for all roots into a single lop dag
        Dag<Lop> lopDag = new Dag<Lop>();
        for( Hop root : hops ) {
            Lop rootLop = root.constructLops();
            rootLop.addToDag(lopDag);
        }

        //runtime instruction generation (incl piggybacking)
        instructions = lopDag.getJobs(sb, ConfigurationManager.getDMLConfig());
    }

    //optional explain output of recompiled hops / instructions
    if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_HOPS ) {
        LOG.info("EXPLAIN RECOMPILE \nGENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+"):\n" +
            Explain.explainHops(hops, 1));
    }
    else if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_RUNTIME ) {
        LOG.info("EXPLAIN RECOMPILE \nGENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+"):\n" +
            Explain.explain(instructions, 1));
    }

    return instructions;
}
/**
 * Regenerates lops and runtime instructions for a predicate hop DAG (single root).
 * The existing lops are cleared first; no statistics update and no rewrites are applied.
 *
 * @param hops root hop of the predicate DAG; also the monitor used for synchronization
 * @return the newly generated runtime instructions
 * @throws DMLRuntimeException
 * @throws HopsException
 * @throws LopsException
 * @throws IOException
 */
public static ArrayList<Instruction> recompileHopsDagInstructions( Hop hops )
    throws DMLRuntimeException, HopsException, LopsException, IOException
{
    ArrayList<Instruction> instructions = null;

    //synchronization is needed because we do temporary changes in shared hops/lops
    synchronized( hops )
    {
        LOG.debug ("\n**************** Optimizer (Recompile) *************\nMemory Budget = " +
            OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " MB");

        //drop the lops of the existing dag
        hops.resetVisitStatus();
        rClearLops( hops );

        //lop construction
        Dag<Lop> lopDag = new Dag<Lop>();
        Lop rootLop = hops.constructLops();
        rootLop.addToDag(lopDag);

        //runtime instruction generation (incl piggybacking)
        instructions = lopDag.getJobs(null, ConfigurationManager.getDMLConfig());
    }

    //optional explain output of recompiled hops / instructions
    if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_HOPS ) {
        LOG.info("EXPLAIN RECOMPILE \nPRED (line "+hops.getBeginLine()+"):\n" + Explain.explain(hops,1));
    }
    else if( DMLScript.EXPLAIN == ExplainType.RECOMPILE_RUNTIME ) {
        LOG.info("EXPLAIN RECOMPILE \nPRED (line "+hops.getBeginLine()+"):\n" + Explain.explain(instructions,1));
    }

    return instructions;
}
/**
 * Recompiles all given program blocks (recursively) while holding the monitor of
 * the given list. A single fresh recompile status is shared across all blocks.
 *
 * @param pbs program blocks to recompile; this list is also the monitor used for synchronization
 * @param vars variable map passed to the recursive recompilation
 * @param tid thread id passed to the recursive recompilation
 * @param resetRecompile flag passed to the recursive recompilation (controls resetting of recompilation flags)
 * @throws DMLRuntimeException if any exception occurs during recompilation (attached as cause)
 */
public static void recompileProgramBlockHierarchy( ArrayList<ProgramBlock> pbs, LocalVariableMap vars, long tid, boolean resetRecompile )
    throws DMLRuntimeException
{
    try {
        RecompileStatus sharedStatus = new RecompileStatus();
        synchronized( pbs ) {
            for( ProgramBlock block : pbs ) {
                rRecompileProgramBlock(block, vars, sharedStatus, tid, resetRecompile);
            }
        }
    }
    catch( Exception cause ) {
        throw new DMLRuntimeException("Unable to recompile program block hierarchy.", cause);
    }
}
/**
 * Method to recompile program block hierarchy to a forced execution type. This affects also
 * referenced functions and chains of functions. Use et==null in order to release the forced
 * exec type.
 *
 * @param pbs program blocks to recompile; this list is also the monitor used for synchronization
 * @param tid thread id passed to the recursive recompilation
 * @param fnStack function name set passed to the recursive recompilation
 *        (presumably tracks already handled functions -- confirm against rRecompileProgramBlock2Forced)
 * @param et forced execution type, or null to release the forced exec type
 * @throws DMLRuntimeException if any exception occurs during recompilation (attached as cause)
 */
public static void recompileProgramBlockHierarchy2Forced( ArrayList<ProgramBlock> pbs, long tid, HashSet<String> fnStack, ExecType et )
    throws DMLRuntimeException
{
    try
    {
        synchronized( pbs )
        {
            for( ProgramBlock pb : pbs )
                rRecompileProgramBlock2Forced(pb, tid, fnStack, et);
        }
    }
    catch(Exception ex)
    {
        //report the actual target; the exec type is a parameter (possibly null), not always CP
        String target = (et != null) ? "forced exec type " + et : "unforced exec type";
        throw new DMLRuntimeException("Unable to recompile program block hierarchy to " + target + ".", ex);
    }
}
/**
 * This method does NO full program block recompile (no stats update, no rewrites, no recursion) but
 * only regenerates lops and instructions. The primary use case is recompilation after hop configuration
 * changes, which allows to preserve statistics (e.g., propagated worst case stats from other program blocks)
 * and gives better performance for recompiling individual program blocks.
 *
 * For while/if blocks only the predicate is regenerated, for for/parfor blocks the
 * from/to/increment predicates, and for all other blocks the block's own instructions.
 * Missing statement blocks or hops are silently skipped.
 *
 * @param pb program block whose instructions are regenerated
 * @throws IOException
 * @throws DMLRuntimeException
 * @throws LopsException
 * @throws HopsException
 */
public static void recompileProgramBlockInstructions(ProgramBlock pb)
    throws HopsException, LopsException, DMLRuntimeException, IOException
{
    //while: predicate instructions only
    if( pb instanceof WhileProgramBlock ) {
        WhileProgramBlock whileBlock = (WhileProgramBlock) pb;
        WhileStatementBlock whileSb = (WhileStatementBlock) pb.getStatementBlock();
        if( whileSb != null && whileSb.getPredicateHops() != null ) {
            whileBlock.setPredicate(recompileHopsDagInstructions(whileSb.getPredicateHops()));
        }
        return;
    }

    //if: predicate instructions only
    if( pb instanceof IfProgramBlock ) {
        IfProgramBlock ifBlock = (IfProgramBlock) pb;
        IfStatementBlock ifSb = (IfStatementBlock) pb.getStatementBlock();
        if( ifSb != null && ifSb.getPredicateHops() != null ) {
            ifBlock.setPredicate(recompileHopsDagInstructions(ifSb.getPredicateHops()));
        }
        return;
    }

    //for/parfor: from, to, and increment predicate instructions
    if( pb instanceof ForProgramBlock ) {
        ForProgramBlock forBlock = (ForProgramBlock) pb;
        ForStatementBlock forSb = (ForStatementBlock) pb.getStatementBlock();
        if( forSb != null ) {
            if( forSb.getFromHops() != null ) {
                forBlock.setFromInstructions(recompileHopsDagInstructions(forSb.getFromHops()));
            }
            if( forSb.getToHops() != null ) {
                forBlock.setToInstructions(recompileHopsDagInstructions(forSb.getToHops()));
            }
            if( forSb.getIncrementHops() != null ) {
                forBlock.setIncrementInstructions(recompileHopsDagInstructions(forSb.getIncrementHops()));
            }
        }
        return;
    }

    //last-level program block: its own instructions
    StatementBlock lastLevelSb = pb.getStatementBlock();
    if( lastLevelSb != null && lastLevelSb.get_hops() != null ) {
        pb.setInstructions(recompileHopsDagInstructions(lastLevelSb, lastLevelSb.get_hops()));
    }
}
/**
 * Checks whether any of the given hop DAG roots requires recompilation.
 * The check stops at the first root that requires it.
 *
 * @param hops root hops of the DAG (may be null); this list is also the monitor used for synchronization
 * @return true if at least one root requires recompilation; false otherwise or if hops is null
 */
public static boolean requiresRecompilation( ArrayList<Hop> hops )
{
    if( hops == null ) {
        return false;
    }

    synchronized( hops )
    {
        Hop.resetVisitStatus(hops);
        for( Hop root : hops ) {
            if( rRequiresRecompile(root) ) {
                return true; // early abort
            }
        }
    }
    return false;
}
/**
 * Checks whether the hop DAG under the given single root requires recompilation.
 *
 * @param hop root hop of the DAG (may be null); also the monitor used for synchronization
 * @return true if the DAG requires recompilation; false otherwise or if hop is null
 */
public static boolean requiresRecompilation( Hop hop )
{
    if( hop == null ) {
        return false;
    }

    synchronized( hop )
    {
        hop.resetVisitStatus();
        return rRequiresRecompile(hop);
    }
}
/**
 * Deep copy of hops dags for parallel recompilation.
 *
 * @param hops root hops of the DAGs to copy
 * @return list with one copied root per input root, in the same order
 * @throws HopsException wrapping any exception raised while copying
 */
public static ArrayList<Hop> deepCopyHopsDag( ArrayList<Hop> hops )
    throws HopsException
{
    try {
        ArrayList<Hop> copiedRoots = new ArrayList<Hop>();
        //one memo table (orig ID -> new clone) over all independent DAGs in order to
        //account for shared transient reads (otherwise more instructions generated)
        HashMap<Long, Hop> clonesById = new HashMap<Long, Hop>();
        for( Hop root : hops ) {
            Hop copiedRoot = rDeepCopyHopsDag(root, clonesById);
            copiedRoots.add(copiedRoot);
        }
        return copiedRoots;
    }
    catch( Exception cause ) {
        throw new HopsException(cause);
    }
}
/**
 * Deep copy of a single-root hops dag for parallel recompilation.
 *
 * @param hops root hop of the DAG to copy
 * @return the copied root
 * @throws HopsException wrapping any exception raised while copying
 */
public static Hop deepCopyHopsDag( Hop hops )
    throws HopsException
{
    try {
        //memo table: orig ID -> new clone
        HashMap<Long, Hop> clonesById = new HashMap<Long, Hop>();
        return rDeepCopyHopsDag(hops, clonesById);
    }
    catch( Exception cause ) {
        throw new HopsException(cause);
    }
}
/**
 * Recursively clones the DAG below the given hop, reusing clones that were
 * already created for the same hop ID.
 *
 * @param hops hop to copy
 * @param memo memo table from original hop ID to its clone; extended by this call
 * @return the clone of the given hop (newly created or taken from the memo table)
 * @throws CloneNotSupportedException
 */
private static Hop rDeepCopyHopsDag( Hop hops, HashMap<Long,Hop> memo )
    throws CloneNotSupportedException
{
    //reuse the existing clone if this hop was copied before
    Hop known = memo.get(hops.getHopID());
    if( known != null ) {
        return known;
    }

    Hop copy = (Hop) hops.clone();

    //phase 1: copy all children first
    ArrayList<Hop> copiedInputs = new ArrayList<Hop>();
    for( Hop child : hops.getInput() ) {
        copiedInputs.add(rDeepCopyHopsDag(child, memo));
    }

    //phase 2: wire up the input/parent references of the copies
    for( Hop copiedChild : copiedInputs ) {
        copy.getInput().add(copiedChild);
        copiedChild.getParent().add(copy);
    }

    memo.put(hops.getHopID(), copy);
    return copy;
}
/**
 * Appends the child-thread suffix for the given id to the function names of
 * all function ops reachable from the given roots (see rUpdateFunctionNames).
 *
 * @param hops root hops of the DAG
 * @param pid id that is appended to the function names
 */
public static void updateFunctionNames(ArrayList<Hop> hops, long pid)
{
    Hop.resetVisitStatus(hops);
    for( Hop root : hops ) {
        rUpdateFunctionNames( root, pid );
    }
}
/**
 * Recursively renames function ops below the given hop by appending
 * ProgramConverter.CP_CHILD_THREAD and the given id to their function name.
 * Function ops of type MULTIRETURN_BUILTIN are left unchanged. Hops already
 * marked as visited are skipped; each processed hop is marked as visited.
 *
 * @param hop current hop
 * @param pid id that is appended to the function names
 */
public static void rUpdateFunctionNames( Hop hop, long pid )
{
    if( hop.getVisited() == VisitStatus.DONE ) {
        return;
    }

    //rename function ops, except multi-return builtins
    if( hop instanceof FunctionOp ) {
        FunctionOp functionOp = (FunctionOp) hop;
        if( functionOp.getFunctionType() != FunctionType.MULTIRETURN_BUILTIN ) {
            functionOp.setFunctionName( functionOp.getFunctionName() +
                ProgramConverter.CP_CHILD_THREAD + pid);
        }
    }

    //descend into the inputs
    if( hop.getInput() != null ) {
        for( Hop child : hop.getInput() ) {
            rUpdateFunctionNames(child, pid);
        }
    }

    hop.setVisited(VisitStatus.DONE);
}
//////////////////////////////
// private helper functions //
//////////////////////////////
/**
 * Recursively recompiles a single program block and its child blocks, propagating
 * statistics through the given variable map and recompile status (both are modified).
 *
 * While and for/parfor blocks: predicates and body are recompiled once; if the body
 * changed the size information of updated variables, those are reset to unknown and
 * predicates and body are recompiled a second time. If blocks: both branches are
 * recompiled on separate state and reconciled afterwards. Function program blocks are
 * skipped. All other blocks are recompiled in-place from their hop DAG.
 *
 * @param pb program block to recompile
 * @param vars variable map used for and updated by the recompilation
 * @param status recompile status used for and updated by the recompilation
 * @param tid thread id passed on to the hop DAG recompilation
 * @param resetRecompile if true (and the other conditions below hold), recompilation flags of last-level blocks are reset
 * @throws IOException
 * @throws LopsException
 * @throws DMLRuntimeException
 * @throws HopsException
 */
private static void rRecompileProgramBlock( ProgramBlock pb, LocalVariableMap vars, RecompileStatus status, long tid, boolean resetRecompile )
throws HopsException, DMLRuntimeException, LopsException, IOException
{
if (pb instanceof WhileProgramBlock)
{
WhileProgramBlock wpb = (WhileProgramBlock)pb;
WhileStatementBlock wsb = (WhileStatementBlock) wpb.getStatementBlock();
//recompile predicate
recompileWhilePredicate(wpb, wsb, vars, status, tid, resetRecompile);
//remove updated scalars because in loop
removeUpdatedScalars(vars, wsb);
//copy vars for later compare
LocalVariableMap oldVars = (LocalVariableMap) vars.clone();
RecompileStatus oldStatus = (RecompileStatus) status.clone();
//first pass over the loop body
for (ProgramBlock pb2 : wpb.getChildBlocks())
rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile);
//note: non-short-circuit '|' so that both vars and status are always reconciled
if( reconcileUpdatedCallVarsLoops(oldVars, vars, wsb)
| reconcileUpdatedCallVarsLoops(oldStatus, status, wsb) ) {
//second pass with unknowns if required
recompileWhilePredicate(wpb, wsb, vars, status, tid, resetRecompile);
for (ProgramBlock pb2 : wpb.getChildBlocks())
rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile);
}
removeUpdatedScalars(vars, wsb);
}
else if (pb instanceof IfProgramBlock)
{
IfProgramBlock ipb = (IfProgramBlock)pb;
IfStatementBlock isb = (IfStatementBlock)ipb.getStatementBlock();
//recompile predicate
recompileIfPredicate(ipb, isb, vars, status, tid, resetRecompile);
//copy vars for later compare
//(the else branch runs on its own clones; the if branch runs on vars/status directly)
LocalVariableMap oldVars = (LocalVariableMap) vars.clone();
LocalVariableMap varsElse = (LocalVariableMap) vars.clone();
RecompileStatus oldStatus = (RecompileStatus)status.clone();
RecompileStatus statusElse = (RecompileStatus)status.clone();
for( ProgramBlock pb2 : ipb.getChildBlocksIfBody() )
rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile);
for( ProgramBlock pb2 : ipb.getChildBlocksElseBody() )
rRecompileProgramBlock(pb2, varsElse, statusElse, tid, resetRecompile);
//merge the outcome of both branches into vars/status (the if-branch state)
reconcileUpdatedCallVarsIf(oldVars, vars, varsElse, isb);
reconcileUpdatedCallVarsIf(oldStatus, status, statusElse, isb);
removeUpdatedScalars(vars, ipb.getStatementBlock());
}
else if (pb instanceof ForProgramBlock) //includes ParFORProgramBlock
{
ForProgramBlock fpb = (ForProgramBlock)pb;
ForStatementBlock fsb = (ForStatementBlock) fpb.getStatementBlock();
//recompile predicates
recompileForPredicates(fpb, fsb, vars, status, tid, resetRecompile);
//remove updated scalars because in loop
removeUpdatedScalars(vars, fpb.getStatementBlock());
//copy vars for later compare
LocalVariableMap oldVars = (LocalVariableMap) vars.clone();
RecompileStatus oldStatus = (RecompileStatus) status.clone();
//first pass over the loop body
for( ProgramBlock pb2 : fpb.getChildBlocks() )
rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile);
//note: non-short-circuit '|' so that both vars and status are always reconciled
if( reconcileUpdatedCallVarsLoops(oldVars, vars, fsb)
| reconcileUpdatedCallVarsLoops(oldStatus, status, fsb)) {
//second pass with unknowns if required
recompileForPredicates(fpb, fsb, vars, status, tid, resetRecompile);
for( ProgramBlock pb2 : fpb.getChildBlocks() )
rRecompileProgramBlock(pb2, vars, status, tid, resetRecompile);
}
removeUpdatedScalars(vars, fpb.getStatementBlock());
}
else if ( pb instanceof FunctionProgramBlock ) //includes ExternalFunctionProgramBlock and ExternalFunctionProgramBlockCP
{
//do nothing
}
else
{
//last-level program block: recompile its hop DAG in-place (inplace=true)
StatementBlock sb = pb.getStatementBlock();
ArrayList<Instruction> tmp = pb.getInstructions();
if( sb != null //recompile all for stats propagation and recompile flags
//&& Recompiler.requiresRecompilation( sb.get_hops() )
/*&& !Recompiler.containsNonRecompileInstructions(tmp)*/ )
{
tmp = Recompiler.recompileHopsDag(sb, sb.get_hops(), vars, status, true, tid);
pb.setInstructions( tmp );
//propagate stats across hops (should be executed on clone of vars)
Recompiler.extractDAGOutputStatistics(sb.get_hops(), vars);
//reset recompilation flags (w/ special handling functions)
//only if globally enabled, requested by the caller, and the DAG has no root function op
if( ParForProgramBlock.RESET_RECOMPILATION_FLAGs
&& !containsRootFunctionOp(sb.get_hops())
&& resetRecompile )
{
Hop.resetRecompilationFlag(sb.get_hops(), ExecType.CP);
sb.updateRecompilationFlag();
}
}
}
}
/**
 * Compares the matrix characteristics of all variables updated in a loop body before
 * and after recompiling the body. For every matrix whose rows, columns, or non-zeros
 * changed, the entry in callVars is replaced by a new output matrix in which each
 * changed property is set to unknown (-1) and each unchanged property is kept.
 *
 * @param oldCallVars variable map snapshot taken before recompiling the body
 * @param callVars variable map after recompiling the body; modified in place
 * @param sb statement block of the loop, providing the set of updated variables
 * @return true if at least one matrix changed (i.e., a second recompile pass is required)
 */
public static boolean reconcileUpdatedCallVarsLoops( LocalVariableMap oldCallVars, LocalVariableMap callVars, StatementBlock sb )
{
    boolean anyChange = false;

    for( String name : sb.variablesUpdated().getVariableNames() )
    {
        Data before = oldCallVars.get(name);
        Data after = callVars.get(name);

        //only matrices present on both sides are reconciled
        if( !(before instanceof MatrixObject) || !(after instanceof MatrixObject) ) {
            continue;
        }

        MatrixCharacteristics mcBefore = ((MatrixObject) before).getMatrixCharacteristics();
        MatrixCharacteristics mcAfter = ((MatrixObject) after).getMatrixCharacteristics();

        boolean rowsChanged = mcBefore.getRows() != mcAfter.getRows();
        boolean colsChanged = mcBefore.getCols() != mcAfter.getCols();
        boolean nnzChanged = mcBefore.getNonZeros() != mcAfter.getNonZeros();
        if( !rowsChanged && !colsChanged && !nnzChanged ) {
            continue;
        }

        //any change in the body requires a recompile; changed properties become unknown
        anyChange = true;
        long rows = rowsChanged ? -1 : mcAfter.getRows();
        long cols = colsChanged ? -1 : mcAfter.getCols();
        long nnz = nnzChanged ? -1 : mcAfter.getNonZeros();

        callVars.put(name, createOutputMatrix(rows, cols, nnz));
    }

    return anyChange;
}
/**
 * Compares the transient-write statistics of all variables updated in a loop body before
 * and after recompiling the body. For every entry whose rows, columns, or non-zeros
 * changed, the entry in callStatus is replaced by new matrix characteristics in which
 * each changed property is set to unknown (-1) and each unchanged property is kept.
 *
 * @param oldCallStatus recompile status snapshot taken before recompiling the body
 * @param callStatus recompile status after recompiling the body; its statistics are modified in place
 * @param sb statement block of the loop, providing the set of updated variables
 * @return true if at least one entry changed (i.e., a second recompile pass is required)
 */
public static boolean reconcileUpdatedCallVarsLoops( RecompileStatus oldCallStatus, RecompileStatus callStatus, StatementBlock sb )
{
    boolean anyChange = false;

    for( String name : sb.variablesUpdated().getVariableNames() )
    {
        MatrixCharacteristics mcBefore = oldCallStatus.getTWriteStats().get(name);
        MatrixCharacteristics mcAfter = callStatus.getTWriteStats().get(name);

        //only entries present on both sides are reconciled
        if( mcBefore == null || mcAfter == null ) {
            continue;
        }

        boolean rowsChanged = mcBefore.getRows() != mcAfter.getRows();
        boolean colsChanged = mcBefore.getCols() != mcAfter.getCols();
        boolean nnzChanged = mcBefore.getNonZeros() != mcAfter.getNonZeros();
        if( !rowsChanged && !colsChanged && !nnzChanged ) {
            continue;
        }

        //any change in the body requires a recompile; changed properties become unknown
        anyChange = true;
        long rows = rowsChanged ? -1 : mcAfter.getRows();
        long cols = colsChanged ? -1 : mcAfter.getCols();
        long nnz = nnzChanged ? -1 : mcAfter.getNonZeros();

        MatrixCharacteristics reconciled = new MatrixCharacteristics(rows, cols, -1, -1, nnz);
        callStatus.getTWriteStats().put(name, reconciled);
    }

    return anyChange;
}
/**
 * Reconciles the variable maps of the if and else branch for all variables updated in
 * an if statement block. If a variable exists after both branches, the two branch
 * results are compared; otherwise the original variable is compared with the result of
 * the branch that has it. For matrices whose rows, columns, or non-zeros differ, the
 * entry in callVarsIf is replaced by a new output matrix in which each differing
 * property is set to unknown (-1) and each equal property is kept.
 *
 * @param oldCallVars variable map snapshot taken before recompiling the branches
 * @param callVarsIf variable map after the if branch; modified in place and returned
 * @param callVarsElse variable map after the else branch
 * @param sb if statement block, providing the set of updated variables
 * @return callVarsIf, holding the reconciled variables
 */
public static LocalVariableMap reconcileUpdatedCallVarsIf( LocalVariableMap oldCallVars, LocalVariableMap callVarsIf, LocalVariableMap callVarsElse, StatementBlock sb )
{
    for( String name : sb.variablesUpdated().getVariableNames() )
    {
        Data original = oldCallVars.get(name);
        Data fromIf = callVarsIf.get(name);
        Data fromElse = callVarsElse.get(name);

        //select the pair to compare
        Data left;
        Data right;
        if( fromIf != null && fromElse != null ) {
            //both branches exist
            left = fromIf;
            right = fromElse;
        }
        else {
            //only one branch has the variable: compare it against the original
            left = original;
            right = (fromIf != null) ? fromIf : fromElse;
        }

        //compare size information of matrices only (note: by definition both sides are
        //of the same type because we do not allow data type changes)
        if( !(left instanceof MatrixObject) || !(right instanceof MatrixObject) ) {
            continue;
        }

        MatrixCharacteristics mcLeft = ((MatrixObject) left).getMatrixCharacteristics();
        MatrixCharacteristics mcRight = ((MatrixObject) right).getMatrixCharacteristics();

        boolean rowsDiffer = mcLeft.getRows() != mcRight.getRows();
        boolean colsDiffer = mcLeft.getCols() != mcRight.getCols();
        boolean nnzDiffer = mcLeft.getNonZeros() != mcRight.getNonZeros();
        if( !rowsDiffer && !colsDiffer && !nnzDiffer ) {
            continue;
        }

        //differing properties become unknown
        long rows = rowsDiffer ? -1 : mcRight.getRows();
        long cols = colsDiffer ? -1 : mcRight.getCols();
        long nnz = nnzDiffer ? -1 : mcRight.getNonZeros();

        callVarsIf.put(name, createOutputMatrix(rows, cols, nnz));
    }

    return callVarsIf;
}
/**
 * Reconciles the transient-write statistics of the if-branch recompile status
 * with the alternative outcome for every variable updated in the given
 * statement block. If both branches hold statistics, these are compared;
 * otherwise the statistics of {@code oldStatus} are compared against the single
 * branch entry. If rows, columns, or non-zeros differ, the if-branch entry is
 * replaced by new characteristics holding, per property, the maximum of both
 * values if both are non-negative, and -1 otherwise.
 *
 * @param oldStatus recompile status used as fallback when a branch has no entry
 * @param callStatusIf recompile status of the if branch (updated in place)
 * @param callStatusElse recompile status of the else branch
 * @param sb statement block providing the set of updated variables
 * @return the updated {@code callStatusIf}
 */
public static RecompileStatus reconcileUpdatedCallVarsIf( RecompileStatus oldStatus, RecompileStatus callStatusIf, RecompileStatus callStatusElse, StatementBlock sb )
{
	for( String varname : sb.variablesUpdated().getVariableNames() )
	{
		MatrixCharacteristics before = oldStatus.getTWriteStats().get(varname);
		MatrixCharacteristics afterIf = callStatusIf.getTWriteStats().get(varname);
		MatrixCharacteristics afterElse = callStatusElse.getTWriteStats().get(varname);

		//select the pair to compare: (if, else) if both exist,
		//otherwise (original, single existing branch)
		boolean bothBranches = (afterIf!=null && afterElse!=null);
		MatrixCharacteristics mcOld = bothBranches ? afterIf : before;
		MatrixCharacteristics mc = (afterIf!=null && afterElse==null) ? afterIf : afterElse;
		if( mcOld == null || mc == null )
			continue;

		boolean unchanged = mcOld.getRows() == mc.getRows()
			&& mcOld.getCols() == mc.getCols()
			&& mcOld.getNonZeros() == mc.getNonZeros();
		if( unchanged )
			continue;

		MatrixCharacteristics mcNew = new MatrixCharacteristics(
			reconcileKnownMax(mcOld.getRows(), mc.getRows()),
			reconcileKnownMax(mcOld.getCols(), mc.getCols()),
			-1, -1,
			reconcileKnownMax(mcOld.getNonZeros(), mc.getNonZeros()));
		callStatusIf.getTWriteStats().put(varname, mcNew);
	}

	return callStatusIf;
}

/**
 * Returns the larger of the two values if both are non-negative, and -1 otherwise.
 *
 * @param v1 first value
 * @param v2 second value
 * @return maximum of both values, or -1 if any of them is negative
 */
private static long reconcileKnownMax( long v1, long v2 )
{
	return (v1>=0 && v2>=0) ? Math.max(v1, v2) : -1;
}
/**
 * Checks whether any of the given root hops is a function operator.
 *
 * @param hops list of root hops
 * @return true if at least one element is an instance of {@code FunctionOp}
 */
private static boolean containsRootFunctionOp( ArrayList<Hop> hops )
{
	for( Hop root : hops ) {
		if( root instanceof FunctionOp )
			return true;
	}
	return false;
}
/**
 * Creates a matrix object of value type double without file name, whose meta
 * data carries the given dimensions and number of non-zeros together with the
 * configured block size for both row and column blocks.
 *
 * @param dim1 number of rows
 * @param dim2 number of columns
 * @param nnz number of non-zeros
 * @return new matrix object with the described meta data
 */
private static MatrixObject createOutputMatrix( long dim1, long dim2, long nnz )
{
	MatrixObject result = new MatrixObject(ValueType.DOUBLE, null);
	MatrixCharacteristics stats = new MatrixCharacteristics(dim1, dim2,
		ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), nnz);
	result.setMetaData(new MatrixFormatMetaData(stats, null, null));

	return result;
}
//helper functions for predicate recompile
/**
 * Recompiles the predicate of an if program block from the predicate hops of
 * its statement block and installs the resulting instructions. No-op if the
 * statement block or its predicate hops are missing.
 *
 * @param ipb if program block receiving the recompiled predicate
 * @param isb if statement block providing the predicate hops
 * @param vars variable map passed to the hop dag recompilation
 * @param status recompile status passed to the hop dag recompilation
 * @param tid id passed to the hop dag recompilation
 * @param resetRecompile if true (and globally enabled), recompilation flags are reset
 * @throws DMLRuntimeException
 * @throws HopsException
 * @throws LopsException
 * @throws IOException
 */
private static void recompileIfPredicate( IfProgramBlock ipb, IfStatementBlock isb, LocalVariableMap vars, RecompileStatus status, long tid, boolean resetRecompile )
	throws DMLRuntimeException, HopsException, LopsException, IOException
{
	if( isb == null )
		return;
	Hop predRoot = isb.getPredicateHops();
	if( predRoot == null )
		return;

	//recompile and install predicate instructions
	ipb.setPredicate( recompileHopsDag(predRoot, vars, status, true, tid) );

	//optionally reset recompilation flags
	if( resetRecompile && ParForProgramBlock.RESET_RECOMPILATION_FLAGs ) {
		Hop.resetRecompilationFlag(predRoot, ExecType.CP);
		isb.updatePredicateRecompilationFlag();
	}

	//update predicate vars (potentially after constant folding, e.g., in parfor)
	if( predRoot instanceof LiteralOp ) {
		LiteralOp literal = (LiteralOp) predRoot;
		ipb.setPredicateResultVar(literal.getName().toLowerCase());
	}
}
/**
 * Recompiles the predicate of a while program block from the predicate hops of
 * its statement block and installs the resulting instructions. No-op if the
 * statement block or its predicate hops are missing.
 *
 * @param wpb while program block receiving the recompiled predicate
 * @param wsb while statement block providing the predicate hops
 * @param vars variable map passed to the hop dag recompilation
 * @param status recompile status passed to the hop dag recompilation
 * @param tid id passed to the hop dag recompilation
 * @param resetRecompile if true (and globally enabled), recompilation flags are reset
 * @throws DMLRuntimeException
 * @throws HopsException
 * @throws LopsException
 * @throws IOException
 */
private static void recompileWhilePredicate( WhileProgramBlock wpb, WhileStatementBlock wsb, LocalVariableMap vars, RecompileStatus status, long tid, boolean resetRecompile )
	throws DMLRuntimeException, HopsException, LopsException, IOException
{
	if( wsb == null )
		return;
	Hop predRoot = wsb.getPredicateHops();
	if( predRoot == null )
		return;

	//recompile and install predicate instructions
	wpb.setPredicate( recompileHopsDag(predRoot, vars, status, true, tid) );

	//optionally reset recompilation flags
	if( resetRecompile && ParForProgramBlock.RESET_RECOMPILATION_FLAGs ) {
		Hop.resetRecompilationFlag(predRoot, ExecType.CP);
		wsb.updatePredicateRecompilationFlag();
	}

	//update predicate vars (potentially after constant folding, e.g., in parfor)
	if( predRoot instanceof LiteralOp ) {
		LiteralOp literal = (LiteralOp) predRoot;
		wpb.setPredicateResultVar(literal.getName().toLowerCase());
	}
}
/**
 * Recompiles the from, to, and increment predicates of a for program block from
 * the respective hops of its statement block and installs the resulting
 * instructions. Afterwards, the iterable predicate variables at positions 1-3
 * are replaced by literal names wherever the respective hop is a literal.
 * No-op if the statement block is missing.
 *
 * @param fpb for program block receiving the recompiled predicates
 * @param fsb for statement block providing the predicate hops
 * @param vars variable map passed to the hop dag recompilation
 * @param status recompile status passed to the hop dag recompilation
 * @param tid id passed to the hop dag recompilation
 * @param resetRecompile if true (and globally enabled), recompilation flags are reset
 * @throws DMLRuntimeException
 * @throws HopsException
 * @throws LopsException
 * @throws IOException
 */
private static void recompileForPredicates( ForProgramBlock fpb, ForStatementBlock fsb, LocalVariableMap vars, RecompileStatus status, long tid, boolean resetRecompile )
	throws DMLRuntimeException, HopsException, LopsException, IOException
{
	if( fsb == null )
		return;

	Hop fromHops = fsb.getFromHops();
	Hop toHops = fsb.getToHops();
	Hop incrHops = fsb.getIncrementHops();
	boolean resetFlags = ParForProgramBlock.RESET_RECOMPILATION_FLAGs && resetRecompile;

	//recompile each existing predicate (with optional reset of recompilation flags)
	if( fromHops != null ) {
		fpb.setFromInstructions( recompileHopsDag(fromHops, vars, status, true, tid) );
		if( resetFlags )
			Hop.resetRecompilationFlag(fromHops, ExecType.CP);
	}
	if( toHops != null ) {
		fpb.setToInstructions( recompileHopsDag(toHops, vars, status, true, tid) );
		if( resetFlags )
			Hop.resetRecompilationFlag(toHops, ExecType.CP);
	}
	if( incrHops != null ) {
		fpb.setIncrementInstructions( recompileHopsDag(incrHops, vars, status, true, tid) );
		if( resetFlags )
			Hop.resetRecompilationFlag(incrHops, ExecType.CP);
	}
	if( resetFlags )
		fsb.updatePredicateRecompilationFlags();

	//update predicate vars (potentially after constant folding, e.g., in parfor)
	String[] itervars = fpb.getIterablePredicateVars();
	if( fromHops instanceof LiteralOp )
		itervars[1] = ((LiteralOp) fromHops).getName();
	if( toHops instanceof LiteralOp )
		itervars[2] = ((LiteralOp) toHops).getName();
	if( incrHops instanceof LiteralOp )
		itervars[3] = ((LiteralOp) incrHops).getName();
}
/**
 * Recursively recompiles a program block and its children with a forced
 * execution type. Predicates of while/if/for blocks are recompiled unless the
 * forced type is CP and the current predicate instructions contain no MR job
 * instruction; generic blocks with a statement block are always recompiled.
 * Functions called from generic blocks are recompiled once per function key,
 * tracked via the given function stack.
 *
 * @param pb program block to recompile
 * @param tid id passed to the forced hop dag recompilation
 * @param fnStack keys of functions already handled (extended in place)
 * @param et forced execution type
 * @throws HopsException
 * @throws DMLRuntimeException
 * @throws LopsException
 * @throws IOException
 */
private static void rRecompileProgramBlock2Forced( ProgramBlock pb, long tid, HashSet<String> fnStack, ExecType et )
	throws HopsException, DMLRuntimeException, LopsException, IOException
{
	if( pb instanceof WhileProgramBlock )
	{
		WhileProgramBlock wpb = (WhileProgramBlock) pb;
		WhileStatementBlock wsb = (WhileStatementBlock) wpb.getStatementBlock();

		//recompile predicate
		if( wsb!=null && (et!=ExecType.CP || OptTreeConverter.containsMRJobInstruction(wpb.getPredicate(), true, true)) )
			wpb.setPredicate( Recompiler.recompileHopsDag2Forced(wsb.getPredicateHops(), tid, et) );

		//recompile body
		for( ProgramBlock child : wpb.getChildBlocks() )
			rRecompileProgramBlock2Forced(child, tid, fnStack, et);
	}
	else if( pb instanceof IfProgramBlock )
	{
		IfProgramBlock ipb = (IfProgramBlock) pb;
		IfStatementBlock isb = (IfStatementBlock) ipb.getStatementBlock();

		//recompile predicate
		if( isb!=null && (et!=ExecType.CP || OptTreeConverter.containsMRJobInstruction(ipb.getPredicate(), true, true)) )
			ipb.setPredicate( Recompiler.recompileHopsDag2Forced(isb.getPredicateHops(), tid, et) );

		//recompile if and else body
		for( ProgramBlock child : ipb.getChildBlocksIfBody() )
			rRecompileProgramBlock2Forced(child, tid, fnStack, et);
		for( ProgramBlock child : ipb.getChildBlocksElseBody() )
			rRecompileProgramBlock2Forced(child, tid, fnStack, et);
	}
	else if( pb instanceof ForProgramBlock ) //includes ParFORProgramBlock
	{
		ForProgramBlock fpb = (ForProgramBlock) pb;
		ForStatementBlock fsb = (ForStatementBlock) fpb.getStatementBlock();

		//recompile from/to/increment predicates
		if( fsb!=null && (et!=ExecType.CP || OptTreeConverter.containsMRJobInstruction(fpb.getFromInstructions(), true, true)) )
			fpb.setFromInstructions( Recompiler.recompileHopsDag2Forced(fsb.getFromHops(), tid, et) );
		if( fsb!=null && (et!=ExecType.CP || OptTreeConverter.containsMRJobInstruction(fpb.getToInstructions(), true, true)) )
			fpb.setToInstructions( Recompiler.recompileHopsDag2Forced(fsb.getToHops(), tid, et) );
		if( fsb!=null && (et!=ExecType.CP || OptTreeConverter.containsMRJobInstruction(fpb.getIncrementInstructions(), true, true)) )
			fpb.setIncrementInstructions( Recompiler.recompileHopsDag2Forced(fsb.getIncrementHops(), tid, et) );

		//recompile body
		for( ProgramBlock child : fpb.getChildBlocks() )
			rRecompileProgramBlock2Forced(child, tid, fnStack, et);
	}
	else if( pb instanceof FunctionProgramBlock ) //includes ExternalFunctionProgramBlock and ExternalFunctionProgramBlockCP
	{
		FunctionProgramBlock funBlock = (FunctionProgramBlock) pb;
		for( ProgramBlock child : funBlock.getChildBlocks() )
			rRecompileProgramBlock2Forced(child, tid, fnStack, et);
	}
	else
	{
		//recompile hops dag to CP (note selective recompile 'if CP and no MR inst'
		//would be invalid with permutation matrix mult across multiple dags)
		StatementBlock sb = pb.getStatementBlock();
		if( sb != null )
			pb.setInstructions( Recompiler.recompileHopsDag2Forced(sb, sb.get_hops(), tid, et) );

		//recompile called functions
		if( !OptTreeConverter.containsFunctionCallInstruction(pb) )
			return;
		for( Instruction inst : pb.getInstructions() )
		{
			if( !(inst instanceof FunctionCallCPInstruction) )
				continue;

			FunctionCallCPInstruction fcall = (FunctionCallCPInstruction) inst;
			String fname = fcall.getFunctionName();
			String fnamespace = fcall.getNamespace();
			String fkey = DMLProgram.constructFunctionKey(fnamespace, fname);

			//memoization for multiple calls, recursion (add returns false if already present)
			if( fnStack.add(fkey) ) {
				FunctionProgramBlock callee = pb.getProgram().getFunctionProgramBlock(fnamespace, fname);
				rRecompileProgramBlock2Forced(callee, tid, fnStack, et); //recompile chains of functions
			}
		}
	}
}
/**
 * Removes from the given variable map all scalar entries whose names are
 * among the variables updated in the statement block. No-op for a null
 * statement block.
 *
 * @param callVars variable map to clean up (modified in place)
 * @param sb statement block providing the set of updated variables
 */
public static void removeUpdatedScalars( LocalVariableMap callVars, StatementBlock sb )
{
	if( sb == null )
		return;

	//remove updated scalar variables from constants
	for( String name : sb.variablesUpdated().getVariables().keySet() )
	{
		Data entry = callVars.get(name);
		boolean isScalar = (entry != null && entry.getDataType() == DataType.SCALAR);
		if( isScalar )
			callVars.remove(name);
	}
}
/**
 * Extracts output statistics for all given root hops into the variable map,
 * overwriting existing entries.
 *
 * @param hops list of root hops
 * @param vars variable map to update
 */
public static void extractDAGOutputStatistics(ArrayList<Hop> hops, LocalVariableMap vars)
{
	final boolean overwriteExisting = true;
	extractDAGOutputStatistics(hops, vars, overwriteExisting);
}
/**
 * Extracts output statistics for all given root hops into the variable map.
 *
 * @param hops list of root hops
 * @param vars variable map to update
 * @param overwrite flag forwarded to the per-hop extraction
 */
public static void extractDAGOutputStatistics(ArrayList<Hop> hops, LocalVariableMap vars, boolean overwrite)
{
	//process all hop roots in list order
	for( int i=0; i<hops.size(); i++ ) {
		extractDAGOutputStatistics(hops.get(i), vars, overwrite);
	}
}
/**
 * Extracts output statistics of a single root hop into the variable map,
 * overwriting an existing entry.
 *
 * @param hop root hop
 * @param vars variable map to update
 */
public static void extractDAGOutputStatistics(Hop hop, LocalVariableMap vars)
{
	final boolean overwriteExisting = true;
	extractDAGOutputStatistics(hop, vars, overwriteExisting);
}
/**
 * Extracts output statistics of a single root hop into the variable map.
 * Only transient write data ops are considered; all other hops are ignored.
 *
 * If the variable is not yet in the map or overwrite is requested, matrices
 * are registered as new matrix objects carrying the hop dimensions and nnz,
 * while scalars are registered only for literal assignments, assignments of
 * read variables present in the map, and nrow/ncol of inputs with positive
 * dimension; any other scalar assignment removes the variable from the map.
 *
 * If the variable exists and overwrite is not requested, an existing matrix
 * entry is updated in place only if its estimated size is smaller than the
 * estimated size of the hop output, and a scalar entry is replaced only for
 * literal assignments.
 *
 * @param hop root hop
 * @param vars variable map to update
 * @param overwrite if true, existing entries are handled like new entries
 */
public static void extractDAGOutputStatistics(Hop hop, LocalVariableMap vars, boolean overwrite)
{
if( hop instanceof DataOp && ((DataOp)hop).getDataOpType()==DataOpTypes.TRANSIENTWRITE ) //for all writes to symbol table
//&& hop.getDim1()>0 && hop.getDim2()>0 ) //matrix with known dims
{
String varName = hop.getName();
if( !vars.keySet().contains(varName) || overwrite ) //not existing so far, or overwrite requested
{
//extract matrix sizes for size propagation
if( hop.getDataType()==DataType.MATRIX )
{
MatrixObject mo = new MatrixObject(ValueType.DOUBLE, null);
MatrixCharacteristics mc = new MatrixCharacteristics(
hop.getDim1(), hop.getDim2(),
ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(),
hop.getNnz());
MatrixFormatMetaData meta = new MatrixFormatMetaData(mc,null,null);
mo.setMetaData(meta);
vars.put(varName, mo);
}
//extract scalar constants for second constant propagation
else if( hop.getDataType()==DataType.SCALAR )
{
//extract literal assignments
if( hop.getInput().size()==1 && hop.getInput().get(0) instanceof LiteralOp )
{
ScalarObject constant = HopRewriteUtils.getScalarObject((LiteralOp)hop.getInput().get(0));
if( constant!=null )
vars.put(varName, constant);
}
//extract constant variable assignments
else if( hop.getInput().size()==1 && hop.getInput().get(0) instanceof DataOp)
{
DataOp dop = (DataOp) hop.getInput().get(0);
String dopvarname = dop.getName();
if( dop.isRead() && vars.keySet().contains(dopvarname) )
{
//NOTE(review): cast assumes the read variable is registered as a scalar object - confirm
ScalarObject constant = (ScalarObject) vars.get(dopvarname);
vars.put(varName, constant); //no clone because constant
}
}
//extract ncol/nrow variable assignments
else if( hop.getInput().size()==1 && hop.getInput().get(0) instanceof UnaryOp
&& (((UnaryOp)hop.getInput().get(0)).getOp()==OpOp1.NROW ||
((UnaryOp)hop.getInput().get(0)).getOp()==OpOp1.NCOL) )
{
//only positive dimensions are registered; otherwise the map is left untouched
UnaryOp uop = (UnaryOp) hop.getInput().get(0);
if( uop.getOp()==OpOp1.NROW && uop.getInput().get(0).getDim1()>0 )
vars.put(varName, new IntObject(uop.getInput().get(0).getDim1()));
else if( uop.getOp()==OpOp1.NCOL && uop.getInput().get(0).getDim2()>0 )
vars.put(varName, new IntObject(uop.getInput().get(0).getDim2()));
}
//remove other updated scalars
else
{
//we need to remove other updated scalars in order to ensure result
//correctness of recompilation w/o being too conservative
vars.remove(varName);
}
}
}
else //already existing: take largest
{
Data dat = vars.get(varName);
if( dat instanceof MatrixObject )
{
MatrixObject mo = (MatrixObject)dat;
MatrixCharacteristics mc = mo.getMatrixCharacteristics();
//compare estimated size of the existing entry (sparsity nnz/rows/cols, or 1.0 for
//unknown nnz) against the estimated size for the hop output dimensions
if( OptimizerUtils.estimateSizeExactSparsity(mc.getRows(), mc.getCols(), (mc.getNonZeros()>=0)?((double)mc.getNonZeros())/mc.getRows()/mc.getCols():1.0)
< OptimizerUtils.estimateSize(hop.getDim1(), hop.getDim2()) )
{
//update statistics if necessary
mc.setDimension(hop.getDim1(), hop.getDim2());
mc.setNonZeros(hop.getNnz());
}
}
else //scalar (just overwrite)
{
if( hop.getInput().size()==1 && hop.getInput().get(0) instanceof LiteralOp )
{
ScalarObject constant = HopRewriteUtils.getScalarObject((LiteralOp)hop.getInput().get(0));
if( constant!=null )
vars.put(varName, constant);
}
}
}
}
}
/**
 * Recursively checks whether the given hop or any of its inputs requires
 * recompilation. Hops already marked as visited only report their own flag.
 * The traversal over inputs stops as soon as the result is true.
 *
 * NOTE: no need for update visit status due to early abort
 *
 * @param hop current hop
 * @return true if this hop or a traversed input requires recompilation
 */
private static boolean rRequiresRecompile( Hop hop )
{
	boolean required = hop.requiresRecompile();
	if( hop.getVisited() == VisitStatus.DONE )
		return required;

	if( hop.getInput() != null ) {
		for( Hop child : hop.getInput() ) {
			//child is always evaluated before the abort check
			boolean childRequired = rRequiresRecompile(child);
			required = required || childRequired;
			if( required )
				break; // early abort
		}
	}

	hop.setVisited(VisitStatus.DONE);
	return required;
}
/**
 * Clearing lops for a given hop includes to (1) remove the reference
 * to constructed lops and (2) clear the exec type (for consistency).
 *
 * The latter is important for advanced optimizers like parfor; otherwise subtle
 * side-effects of program recompilation and hop-lop rewrites are possible
 * (e.g., see indexingop hop-lop rewrite in combination with the parfor rewrite
 * set exec type that eventually might lead to unnecessary remote_parfor jobs).
 *
 * Literal hops keep their lops and only get the lop outputs cleared; inputs
 * are processed recursively for all non-literal hops.
 *
 * @param hop current hop
 */
public static void rClearLops( Hop hop )
{
	if( hop.getVisited() == VisitStatus.DONE )
		return;

	//clear all relevant lops to allow for recompilation
	if( !(hop instanceof LiteralOp) ) //GENERAL CASE
	{
		hop.resetExecType(); //remove exec type
		hop.setLops(null); //clear lops
		if( hop.getInput() != null ) {
			for( Hop child : hop.getInput() )
				rClearLops(child);
		}
	}
	else if( hop.getLops() != null )
	{
		//for literal ops, we just clear parents because always constant
		hop.getLops().getOutputs().clear();
	}

	hop.setVisited(VisitStatus.DONE);
}
/**
 * Recursively updates the size information of a hop dag according to the
 * entries of the given variable map. Inputs are processed before the hop itself.
 *
 * Data ops other than persistent reads take dimensions (and nnz for matrices)
 * from the variable map; persistent reads with unknown dimensions try to read
 * their meta data file; data generators, reshape, and indexing operators
 * re-evaluate their size expressions. All other hops (and hops whose size
 * expression did not newly become known) refresh their size information
 * from their inputs.
 *
 * @param hop current hop
 * @param vars variable map providing current statistics and scalar values
 * @throws DMLRuntimeException if an unexpected data generation method is
 *         encountered or reading the meta data file fails
 */
public static void rUpdateStatistics( Hop hop, LocalVariableMap vars )
	throws DMLRuntimeException
{
	if( hop.getVisited() == VisitStatus.DONE )
		return;

	//recursively process children
	if( hop.getInput() != null )
		for( Hop c : hop.getInput() )
			rUpdateStatistics(c, vars);

	boolean updatedSizeExpr = false;

	//update statistics for transient reads according to current statistics
	//(with awareness not to override persistent reads to an existing name)
	if( hop instanceof DataOp
		&& ((DataOp)hop).getDataOpType() != DataOpTypes.PERSISTENTREAD )
	{
		DataOp d = (DataOp) hop;
		String varName = d.getName();
		if( vars.keySet().contains( varName ) )
		{
			Data dat = vars.get(varName);
			if( dat instanceof MatrixObject ) {
				MatrixObject mo = (MatrixObject) dat;
				d.setDim1(mo.getNumRows());
				d.setDim2(mo.getNumColumns());
				d.setNnz(mo.getNnz());
			}
			else if( dat instanceof FrameObject ) {
				FrameObject fo = (FrameObject) dat;
				d.setDim1(fo.getNumRows());
				d.setDim2(fo.getNumColumns());
			}
		}
	}
	//special case for persistent reads with unknown size (read-after-write)
	else if( hop instanceof DataOp
		&& ((DataOp)hop).getDataOpType() == DataOpTypes.PERSISTENTREAD
		&& !hop.dimsKnown() && ((DataOp)hop).getInputFormatType()!=FileFormatTypes.CSV
		&& !ConfigurationManager.getCompilerConfigFlag(ConfigType.IGNORE_READ_WRITE_METADATA) )
	{
		//update hop with read meta data
		DataOp dop = (DataOp) hop;
		tryReadMetaDataFileMatrixCharacteristics(dop);
	}
	//update size expression for rand/seq according to symbol table entries
	else if ( hop instanceof DataGenOp )
	{
		DataGenOp d = (DataGenOp) hop;
		HashMap<String,Integer> params = d.getParamIndexMap();
		if ( d.getOp() == DataGenMethod.RAND || d.getOp()==DataGenMethod.SINIT
			|| d.getOp() == DataGenMethod.SAMPLE )
		{
			boolean initUnknown = !d.dimsKnown();
			int ix1 = params.get(DataExpression.RAND_ROWS);
			int ix2 = params.get(DataExpression.RAND_COLS);
			//update rows/cols by evaluating simple expression of literals, nrow, ncol, scalars, binaryops
			d.refreshRowsParameterInformation(d.getInput().get(ix1), vars);
			d.refreshColsParameterInformation(d.getInput().get(ix2), vars);
			updatedSizeExpr = initUnknown & d.dimsKnown();
		}
		else if ( d.getOp() == DataGenMethod.SEQ )
		{
			boolean initUnknown = !d.dimsKnown();
			int ix1 = params.get(Statement.SEQ_FROM);
			int ix2 = params.get(Statement.SEQ_TO);
			int ix3 = params.get(Statement.SEQ_INCR);
			double from = d.computeBoundsInformation(d.getInput().get(ix1), vars);
			double to = d.computeBoundsInformation(d.getInput().get(ix2), vars);
			double incr = d.computeBoundsInformation(d.getInput().get(ix3), vars);

			//special case increment: only an increment of 1 is flipped to -1 for
			//descending ranges; any other known increment is kept as is, and an
			//unknown increment stays unknown (previously every increment was
			//overwritten with +/-1, which produced wrong dimensions, e.g., for
			//seq(1,10,2) or for explicit negative increments)
			if ( from!=Double.MAX_VALUE && to!=Double.MAX_VALUE
				&& from >= to && incr==1 ) {
				incr = -1.0;
			}

			if ( from!=Double.MAX_VALUE && to!=Double.MAX_VALUE && incr!=Double.MAX_VALUE ) {
				d.setDim1( 1 + (long)Math.floor((to-from)/incr) );
				d.setDim2( 1 );
				d.setIncrementValue( incr );
			}
			updatedSizeExpr = initUnknown & d.dimsKnown();
		}
		else {
			throw new DMLRuntimeException("Unexpected data generation method: " + d.getOp());
		}
	}
	//update size expression for reshape according to symbol table entries
	else if ( hop instanceof ReorgOp
		&& ((ReorgOp)(hop)).getOp()==Hop.ReOrgOp.RESHAPE )
	{
		ReorgOp d = (ReorgOp) hop;
		boolean initUnknown = !d.dimsKnown();
		d.refreshRowsParameterInformation(d.getInput().get(1), vars);
		d.refreshColsParameterInformation(d.getInput().get(2), vars);
		updatedSizeExpr = initUnknown & d.dimsKnown();
	}
	//update size expression for indexing according to symbol table entries
	else if( hop instanceof IndexingOp )
	{
		IndexingOp iop = (IndexingOp)hop;
		Hop input2 = iop.getInput().get(1); //inpRowL
		Hop input3 = iop.getInput().get(2); //inpRowU
		Hop input4 = iop.getInput().get(3); //inpColL
		Hop input5 = iop.getInput().get(4); //inpColU
		boolean initUnknown = !iop.dimsKnown();
		double rl = iop.computeBoundsInformation(input2, vars);
		double ru = iop.computeBoundsInformation(input3, vars);
		double cl = iop.computeBoundsInformation(input4, vars);
		double cu = iop.computeBoundsInformation(input5, vars);
		//Double.MAX_VALUE is used as marker for unknown bounds
		if( rl!=Double.MAX_VALUE && ru!=Double.MAX_VALUE )
			iop.setDim1( (long)(ru-rl+1) );
		if( cl!=Double.MAX_VALUE && cu!=Double.MAX_VALUE )
			iop.setDim2( (long)(cu-cl+1) );
		updatedSizeExpr = initUnknown & iop.dimsKnown();
	}

	//propagate statistics along inner nodes of DAG,
	//without overwriting inferred size expressions
	if( !updatedSizeExpr ) {
		hop.refreshSizeInformation();
	}

	hop.setVisited(VisitStatus.DONE);
}
/**
 * Public interface to the package-local literal replacement; delegates
 * directly to {@code LiteralReplacement.rReplaceLiterals} with the same
 * arguments.
 *
 * @param hop hop passed on to the literal replacement
 * @param vars variable map passed on to the literal replacement
 * @throws DMLRuntimeException if thrown by the literal replacement
 */
public static void rReplaceLiterals( Hop hop, LocalVariableMap vars )
throws DMLRuntimeException
{
//public interface
LiteralReplacement.rReplaceLiterals(hop, vars);
}
/**
 * Recursively sets the given execution type as forced execution type on the
 * hop and all of its inputs, skipping hops already marked as visited.
 *
 * @param hop current hop
 * @param etype execution type to force
 */
public static void rSetExecType( Hop hop, ExecType etype )
{
	if( hop.getVisited() != VisitStatus.DONE )
	{
		//force exec type of current hop, then of all inputs
		hop.setForcedExecType(etype);
		if( hop.getInput() != null ) {
			for( Hop child : hop.getInput() )
				rSetExecType(child, etype);
		}

		hop.setVisited(VisitStatus.DONE);
	}
}
/**
 * Returns true iff (1) all instructions are reblock instructions, (2) no
 * reblock suppresses the output of empty blocks, (3) all individual reblock
 * operations fit in the current memory budget, and (4) outside of local mode,
 * no clean text/binarycell input file exceeds the in-memory reblock size
 * threshold.
 *
 * @param inst MR job instruction to check
 * @param inputs input matrix objects of the job
 * @return true if the job qualifies for an in-memory (CP) reblock
 * @throws DMLRuntimeException
 * @throws IOException
 */
public static boolean checkCPReblock(MRJobInstruction inst, MatrixObject[] inputs)
	throws DMLRuntimeException, IOException
{
	boolean localMode = InfrastructureAnalyzer.isLocalMode();

	//check only shuffle inst
	String rdInst = inst.getIv_randInstructions();
	String rrInst = inst.getIv_recordReaderInstructions();
	String mapInst = inst.getIv_instructionsInMapper();
	String aggInst = inst.getIv_aggInstructions();
	String otherInst = inst.getIv_otherInstructions();
	if( (rdInst != null && rdInst.length()>0)
		|| (rrInst != null && rrInst.length()>0)
		|| (mapInst != null && mapInst.length()>0)
		|| (aggInst != null && aggInst.length()>0)
		|| (otherInst != null && otherInst.length()>0) )
	{
		return false;
	}

	//check only reblock inst
	String[] shuffleParts = inst.getIv_shuffleInstructions().split( Lop.INSTRUCTION_DELIMITOR );
	for( String rblk : shuffleParts ) {
		String opcode = InstructionUtils.getOpCode(rblk);
		if( !opcode.equals(ReBlock.OPCODE) && !opcode.equals(CSVReBlock.OPCODE) )
			return false;
	}

	//check output empty blocks (for outputEmptyBlocks=false, a CP reblock can be
	//counter-productive because any export from CP would reintroduce the empty blocks)
	for( String rblk : shuffleParts ) {
		if( InstructionUtils.getOpCode(rblk).equals(ReBlock.OPCODE)
			&& rblk.endsWith("false") ) //no output of empty blocks
		{
			return false;
		}
	}

	//check recompile memory budget
	for( MatrixObject mo : inputs )
	{
		long rows = mo.getNumRows();
		long cols = mo.getNumColumns();

		// If the dimensions are unknown then reblock can not be recompiled into CP
		// Note: unknown dimensions at this point can only happen for CSV files.
		// however, we do a conservative check with the CSV filesize
		if( rows == -1 || cols == -1 )
		{
			long size = MapReduceTool.getFilesizeOnHDFS(new Path(mo.getFileName()));
			if( size > CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE || CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE > OptimizerUtils.getLocalMemBudget() )
				return false;
		}
		//default case (known dimensions)
		else
		{
			long nnz = mo.getNnz();
			double sp = OptimizerUtils.getSparsity(rows, cols, nnz);
			double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sp);
			if( !OptimizerUtils.isValidCPDimensions(rows, cols)
				|| !OptimizerUtils.isValidCPMatrixSize(rows, cols, sp)
				|| mem >= OptimizerUtils.getLocalMemBudget() )
			{
				return false;
			}
		}
	}

	//check in-memory reblock size threshold (prevent long single-threaded text read)
	//NOTE: this does not apply to local mode because there text read single-threaded as well
	if( !localMode )
	{
		for( MatrixObject mo : inputs )
		{
			MatrixFormatMetaData iimd = (MatrixFormatMetaData) mo.getMetaData();
			boolean cellFormat = iimd.getInputInfo()==InputInfo.TextCellInputInfo
				|| iimd.getInputInfo()==InputInfo.MatrixMarketInputInfo
				|| iimd.getInputInfo()==InputInfo.CSVInputInfo
				|| iimd.getInputInfo()==InputInfo.BinaryCellInputInfo;
			if( cellFormat && !mo.isDirty() )
			{
				//get file size on hdfs (as indicator for estimated read time)
				long fileSize = MapReduceTool.getFilesizeOnHDFS(new Path(mo.getFileName()));
				//compute cp reblock size threshold based on available parallelism
				long cpThreshold = CP_REBLOCK_THRESHOLD_SIZE *
					OptimizerUtils.getParallelTextReadParallelism();
				if( fileSize > cpThreshold )
					return false;
			}
		}
	}

	return true;
}
/**
 * CP Reblock check for spark instructions; in contrast to MR, we can not
 * rely on the input file sizes because inputs might be passed via rdds.
 * The decision is based on the matrix characteristics of the input variable.
 *
 * @param ec execution context providing the variable and its characteristics
 * @param varin name of the input variable
 * @return true if the input qualifies for an in-memory (CP) reblock
 * @throws DMLRuntimeException
 */
public static boolean checkCPReblock(ExecutionContext ec, String varin)
	throws DMLRuntimeException
{
	CacheableData<?> obj = ec.getCacheableData(varin);
	MatrixCharacteristics mc = ec.getMatrixCharacteristics(varin);
	long rows = mc.getRows();
	long cols = mc.getCols();
	long nnz = mc.getNonZeros();

	//check valid cp reblock recompilation hook
	boolean validHook = ConfigurationManager.isDynamicRecompilation()
		&& OptimizerUtils.isHybridExecutionMode();
	if( !validHook )
		return false;

	//robustness for usage through mlcontext (key/values of input rdds are
	//not serializable for text; also bufferpool rdd read only supported for
	// binarycell and binaryblock)
	MatrixFormatMetaData iimd = (MatrixFormatMetaData) obj.getMetaData();
	if( obj.getRDDHandle() != null
		&& iimd.getInputInfo() != InputInfo.BinaryBlockInputInfo
		&& iimd.getInputInfo() != InputInfo.BinaryCellInputInfo )
	{
		return false;
	}

	//robustness unknown dimensions, e.g., for csv reblock
	if( rows <= 0 || cols <= 0 )
		return false;

	//check valid dimensions and memory requirements
	double sp = OptimizerUtils.getSparsity(rows, cols, nnz);
	double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sp);
	if( !OptimizerUtils.isValidCPDimensions(rows, cols)
		|| !OptimizerUtils.isValidCPMatrixSize(rows, cols, sp)
		|| mem >= OptimizerUtils.getLocalMemBudget() )
	{
		return false;
	}

	//check in-memory reblock size threshold (preference: distributed)
	long estFilesize = (long)(3.5 * mem); //conservative estimate
	long cpThreshold = CP_REBLOCK_THRESHOLD_SIZE *
		OptimizerUtils.getParallelTextReadParallelism();

	return (estFilesize < cpThreshold);
}
/**
 * Checks whether a transform job can be recompiled to CP based on the HDFS
 * file size of its first input: the size must neither exceed the transform
 * threshold nor, multiplied by four, the local memory budget. The decision
 * is logged at info level.
 *
 * @param inst MR job instruction (not inspected by this check)
 * @param inputs input matrix objects; only the first one is used
 * @return true if the job qualifies for recompilation to CP
 * @throws DMLRuntimeException
 * @throws IOException
 */
public static boolean checkCPTransform(MRJobInstruction inst, MatrixObject[] inputs)
	throws DMLRuntimeException, IOException
{
	// there can only be one input in TRANSFORM job
	MatrixObject input = inputs[0];
	long sizeOnHDFS = MapReduceTool.getFilesizeOnHDFS(new Path(input.getFileName()));

	// dimensions are not checked here, since the worst case dimensions
	// after transformations (with potential dummycoding) are typically unknown.
	boolean ret = !( sizeOnHDFS > CP_TRANSFORM_UNKNOWN_THRESHOLD_SIZE
		|| sizeOnHDFS*4 > OptimizerUtils.getLocalMemBudget() );

	LOG.info("checkCPTransform(): size = " + sizeOnHDFS + ", recompile to CP = " + ret);
	return ret;
}
/**
 * Checks whether a data generation job can be recompiled to CP: the job must
 * not contain shuffle, record reader, mapper, aggregate, or other
 * instructions, and every instruction of the updated rand instruction string
 * must be a rand or seq instruction whose output has valid CP dimensions and
 * fits in the local memory budget.
 *
 * @param inst MR job instruction to check
 * @param updatedRandInst delimited string of data generation instructions
 * @return true if the job qualifies for recompilation to CP
 * @throws DMLRuntimeException
 */
public static boolean checkCPDataGen( MRJobInstruction inst, String updatedRandInst )
	throws DMLRuntimeException
{
	//check only rand inst (no other instruction types allowed in the job)
	String shuffleInst = inst.getIv_shuffleInstructions();
	String rrInst = inst.getIv_recordReaderInstructions();
	String mapInst = inst.getIv_instructionsInMapper();
	String aggInst = inst.getIv_aggInstructions();
	String otherInst = inst.getIv_otherInstructions();
	if( (shuffleInst != null && shuffleInst.length()>0)
		|| (rrInst != null && rrInst.length()>0)
		|| (mapInst != null && mapInst.length()>0)
		|| (aggInst != null && aggInst.length()>0)
		|| (otherInst != null && otherInst.length()>0) )
	{
		return false;
	}

	for( String lrandStr : updatedRandInst.split( Lop.INSTRUCTION_DELIMITOR ) )
	{
		String opcode = InstructionUtils.getOpCode(lrandStr);
		long rows, cols;
		double sparsity;

		if( opcode.equals(DataGen.RAND_OPCODE) ) {
			RandInstruction rand = (RandInstruction) RandInstruction.parseInstruction(lrandStr);
			rows = rand.getRows();
			cols = rand.getCols();
			sparsity = rand.getSparsity();
		}
		else if( opcode.equals(DataGen.SEQ_OPCODE) ) {
			//(don't account for sparsity because always dense)
			SeqInstruction seq = (SeqInstruction) SeqInstruction.parseInstruction(lrandStr);
			rows = seq.getRows();
			cols = seq.getCols();
			sparsity = 1.0d;
		}
		else {
			return false;
		}

		//check recompile memory budget
		double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sparsity);
		if( !OptimizerUtils.isValidCPDimensions(rows, cols)
			|| !OptimizerUtils.isValidCPMatrixSize(rows, cols, sparsity)
			|| mem >= OptimizerUtils.getLocalMemBudget() )
		{
			return false;
		}
	}

	return true;
}
/**
 * Executes a reblock in memory by reading the matrix block of the input
 * variable and handing it over to the output variable.
 *
 * @param ec execution context holding both variables
 * @param varin name of the input matrix variable
 * @param varout name of the output matrix variable
 * @throws DMLRuntimeException
 */
public static void executeInMemoryMatrixReblock(ExecutionContext ec, String varin, String varout)
	throws DMLRuntimeException
{
	MatrixObject source = ec.getMatrixObject(varin);
	MatrixObject target = ec.getMatrixObject(varout);

	//read text input matrix (through buffer pool, matrix object carries all relevant
	//information including additional arguments for csv reblock)
	MatrixBlock block = source.acquireRead();

	//set output (incl update matrix characteristics)
	target.acquireModify( block );
	target.release();
	source.release();
}
/**
 * Executes a reblock in memory by reading the frame block of the input
 * variable and handing it over to the output variable.
 *
 * @param ec execution context holding both variables
 * @param varin name of the input frame variable
 * @param varout name of the output frame variable
 * @throws DMLRuntimeException
 */
public static void executeInMemoryFrameReblock(ExecutionContext ec, String varin, String varout)
	throws DMLRuntimeException
{
	FrameObject source = ec.getFrameObject(varin);
	FrameObject target = ec.getFrameObject(varout);

	//read text input frame (through buffer pool, frame object carries all relevant
	//information including additional arguments for csv reblock)
	FrameBlock block = source.acquireRead();

	//set output (incl update matrix characteristics)
	target.acquireModify( block );
	target.release();
	source.release();
}
/**
 * Updates the given data op from its meta data file, if that file exists:
 * data type, value type (except for frames), and both dimensions (0 for
 * data types other than matrix and frame) are taken from the parsed JSON
 * meta data. Nothing is changed if the file does not exist.
 *
 * @param dop data op to update; its file name determines the meta data file
 * @throws DMLRuntimeException wrapping any exception raised while locating,
 *         reading, or parsing the meta data file
 */
private static void tryReadMetaDataFileMatrixCharacteristics( DataOp dop )
	throws DMLRuntimeException
{
	try
	{
		//get meta data filename
		String mtdname = DataExpression.getMTDFileName(dop.getFileName());
		JobConf job = ConfigurationManager.getCachedJobConf();
		FileSystem fs = FileSystem.get(job);
		Path path = new Path(mtdname);
		if( !fs.exists(path) )
			return;

		BufferedReader reader = null;
		try
		{
			reader = new BufferedReader(new InputStreamReader(fs.open(path)));
			JSONObject mtd = JSONHelper.parse(reader);

			DataType dt = DataType.valueOf(String.valueOf(mtd.get(DataExpression.DATATYPEPARAM)).toUpperCase());
			boolean hasDims = (dt==DataType.MATRIX || dt==DataType.FRAME);

			dop.setDataType(dt);
			if( dt != DataType.FRAME )
				dop.setValueType(ValueType.valueOf(String.valueOf(mtd.get(DataExpression.VALUETYPEPARAM)).toUpperCase()));
			dop.setDim1(hasDims ? Long.parseLong(mtd.get(DataExpression.READROWPARAM).toString()) : 0);
			dop.setDim2(hasDims ? Long.parseLong(mtd.get(DataExpression.READCOLPARAM).toString()) : 0);
		}
		finally {
			if( reader != null )
				reader.close();
		}
	}
	catch(Exception ex)
	{
		throw new DMLRuntimeException(ex);
	}
}
}