blob: 701ba76acd8da207509728295695ea231058be55 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.yarn.ropt;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysml.hops.DataOp;
import org.apache.sysml.hops.Hop;
import org.apache.sysml.hops.Hop.DataOpTypes;
import org.apache.sysml.hops.HopsException;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.hops.cost.CostEstimationWrapper;
import org.apache.sysml.hops.recompile.Recompiler;
import org.apache.sysml.lops.LopsException;
import org.apache.sysml.lops.LopProperties.ExecType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.parser.ForStatementBlock;
import org.apache.sysml.parser.IfStatementBlock;
import org.apache.sysml.parser.StatementBlock;
import org.apache.sysml.parser.WhileStatementBlock;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
import org.apache.sysml.runtime.controlprogram.FunctionProgramBlock;
import org.apache.sysml.runtime.controlprogram.IfProgramBlock;
import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
import org.apache.sysml.runtime.controlprogram.Program;
import org.apache.sysml.runtime.controlprogram.ProgramBlock;
import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysml.runtime.controlprogram.parfor.opt.OptTreeConverter;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.MatrixDimensionsMetaData;
import org.apache.sysml.yarn.DMLYarnClient;
import org.apache.sysml.yarn.ropt.YarnOptimizerUtils.GridEnumType;
/**
* TODO parallel version with exposed numThreads parameter
*
*/
public class ResourceOptimizer
{
private static final Log LOG = LogFactory.getLog(ResourceOptimizer.class);
//internal configuration parameters
public static final long MIN_CP_BUDGET = 512*1024*1024; //512MB
public static final boolean INCLUDE_PREDICATES = true;
public static final boolean PRUNING_SMALL = true;
public static final boolean PRUNING_UNKNOWN = true;
public static final boolean COSTS_MAX_PARALLELISM = true;
public static final boolean COST_INDIVIDUAL_BLOCKS = true;
private static long _cntCompilePB = 0;
private static long _cntCostPB = 0;
/**
*
* @param prog
* @param cc
* @param cptype
* @param mrtype
* @return
* @throws DMLRuntimeException
*/
public synchronized static ResourceConfig optimizeResourceConfig( ArrayList<ProgramBlock> prog, YarnClusterConfig cc, GridEnumType cptype, GridEnumType mrtype )
throws DMLRuntimeException
{
ResourceConfig ROpt = null;
try
{
//init statistics and counters
Timing time = new Timing(true);
initStatistics();
//get constraints (yarn-specific: force higher min to limit degree of parallelism)
long max = (long)(YarnOptimizerUtils.toB(cc.getMaxAllocationMB()) / DMLYarnClient.MEM_FACTOR);
long minCP = (long) Math.max(YarnOptimizerUtils.toB(cc.getMinAllocationMB()) / DMLYarnClient.MEM_FACTOR, MIN_CP_BUDGET);
long minMR = YarnOptimizerUtils.computeMinContraint(minCP, max, cc.getAvgNumCores());
//enumerate grid points for given types (refers to jvm max heap)
ArrayList<Long> SRc = enumerateGridPoints(prog, minCP, max, cptype);
ArrayList<Long> SRm = enumerateGridPoints(prog, minMR, max, mrtype);
//init resource config and global costs
ROpt = new ResourceConfig(prog, minMR);
double costOpt = Double.MAX_VALUE;
for( Long rc : SRc ) //enumerate CP memory rc
{
//baseline compile and pruning
ArrayList<ProgramBlock> B = compileProgram(prog, null, rc, minMR); //unrolled Bp
ArrayList<ProgramBlock> Bp = pruneProgramBlocks( B );
LOG.debug("Enum (rc="+rc+"): |B|="+B.size()+", |Bp|="+Bp.size());
//init local memo table [resource, cost]
double[][] memo = initLocalMemoTable( Bp, minMR );
for( int i=0; i<Bp.size(); i++ ) //for all relevant blocks
{
ProgramBlock pb = Bp.get(i);
for( Long rm : SRm ) //for each MR memory
{
//recompile program block
recompileProgramBlock(pb, rc, rm);
//local costing and memo table maintenance (cost entire program to account for
//in-memory status of variables and loops)
double lcost = getProgramCosts( pb );
if( lcost < memo[i][1] ) { //accept new local opt
memo[i][0] = rm;
memo[i][1] = lcost;
//LOG.debug("Enum (rc="+rc+"): found new local opt w/ cost="+lcost);
}
//LOG.debug("Enum (rc="+rc+", rm="+rm+"): lcost="+lcost+", mincost="+memo[i][1]);
}
}
//global costing
double[][] gmemo = initGlobalMemoTable(B, Bp, memo, minMR);
recompileProgramBlocks(B, rc, gmemo);
double gcost = getProgramCosts(B.get(0).getProgram());
if( gcost < costOpt ){ //accept new global opt
ROpt.setCPResource(rc.longValue());
ROpt.setMRResources(B, gmemo);
costOpt = gcost;
LOG.debug("Enum (rc="+rc+"): found new opt w/ cost="+gcost);
}
}
//print optimization summary
LOG.info("Optimization summary:");
LOG.info("-- optimal plan (rc, rm): "+YarnOptimizerUtils.toMB(ROpt.getCPResource())+"MB, "+YarnOptimizerUtils.toMB(ROpt.getMaxMRResource())+"MB");
LOG.info("-- costs of optimal plan: "+costOpt);
LOG.info("-- # of block compiles: "+_cntCompilePB);
LOG.info("-- # of block costings: "+_cntCostPB);
LOG.info("-- optimization time: "+String.format("%.3f", (double)time.stop()/1000)+" sec.");
LOG.info("-- optimal plan details: "+ROpt.serialize());
}
catch(Exception ex)
{
throw new DMLRuntimeException(ex);
}
return ROpt;
}
/**
*
* @param prog
* @param B
* @param rc
* @return
* @throws IOException
* @throws LopsException
* @throws HopsException
* @throws DMLRuntimeException
*/
public static ArrayList<ProgramBlock> compileProgram( ArrayList<ProgramBlock> prog, ResourceConfig rc )
throws DMLRuntimeException, HopsException, LopsException, IOException
{
//recompile program block hierarchy to list of blocks and apply optimized resource configuration
ArrayList<ProgramBlock> B = compileProgram(prog, null, rc.getCPResource(), rc.getMaxMRResource());
ResourceOptimizer.recompileProgramBlocks(B, rc.getCPResource(), rc.getMRResourcesMemo());
return B;
}
/**
*
* @param prog
* @param B
* @return
* @throws IOException
* @throws LopsException
* @throws HopsException
* @throws DMLRuntimeException
*/
private static ArrayList<ProgramBlock> compileProgram( ArrayList<ProgramBlock> prog, ArrayList<ProgramBlock> B, double cp, double mr )
throws DMLRuntimeException, HopsException, LopsException, IOException
{
if( B == null ) //init
{
B = new ArrayList<ProgramBlock>();
InfrastructureAnalyzer.setLocalMaxMemory( (long)cp );
InfrastructureAnalyzer.setRemoteMaxMemoryMap( (long)mr );
InfrastructureAnalyzer.setRemoteMaxMemoryReduce( (long)mr );
OptimizerUtils.resetDefaultSize(); //dependent on cp, mr
}
for( ProgramBlock pb : prog )
compileProgram( pb, B, cp, mr );
return B;
}
/**
*
* @param pb
* @param Bp
* @return
* @throws IOException
* @throws LopsException
* @throws HopsException
* @throws DMLRuntimeException
*/
private static ArrayList<ProgramBlock> compileProgram( ProgramBlock pb, ArrayList<ProgramBlock> B, double cp, double mr )
throws DMLRuntimeException, HopsException, LopsException, IOException
{
if (pb instanceof FunctionProgramBlock)
{
FunctionProgramBlock fpb = (FunctionProgramBlock)pb;
compileProgram(fpb.getChildBlocks(), B, cp, mr);
}
else if (pb instanceof WhileProgramBlock)
{
WhileProgramBlock wpb = (WhileProgramBlock)pb;
WhileStatementBlock sb = (WhileStatementBlock) pb.getStatementBlock();
if( INCLUDE_PREDICATES && sb!=null && sb.getPredicateHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getPredicateHops(), new LocalVariableMap(), null, false, 0);
wpb.setPredicate( inst );
B.add(wpb);
_cntCompilePB ++;
}
compileProgram(wpb.getChildBlocks(), B, cp, mr);
}
else if (pb instanceof IfProgramBlock)
{
IfProgramBlock ipb = (IfProgramBlock)pb;
IfStatementBlock sb = (IfStatementBlock) ipb.getStatementBlock();
if( INCLUDE_PREDICATES && sb!=null && sb.getPredicateHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getPredicateHops(), new LocalVariableMap(), null, false, 0);
ipb.setPredicate( inst );
B.add(ipb);
_cntCompilePB ++;
}
compileProgram(ipb.getChildBlocksIfBody(), B, cp, mr);
compileProgram(ipb.getChildBlocksElseBody(), B, cp, mr);
}
else if (pb instanceof ForProgramBlock) //incl parfor
{
ForProgramBlock fpb = (ForProgramBlock)pb;
ForStatementBlock sb = (ForStatementBlock) fpb.getStatementBlock();
if( INCLUDE_PREDICATES && sb!=null ){
if( sb.getFromHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getFromHops(), new LocalVariableMap(), null, false, 0);
fpb.setFromInstructions( inst );
}
if( sb.getToHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getToHops(), new LocalVariableMap(), null, false, 0);
fpb.setToInstructions( inst );
}
if( sb.getIncrementHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getIncrementHops(), new LocalVariableMap(), null, false, 0);
fpb.setIncrementInstructions( inst );
}
B.add(fpb);
_cntCompilePB ++;
}
compileProgram(fpb.getChildBlocks(), B, cp, mr);
}
else
{
StatementBlock sb = pb.getStatementBlock();
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb, sb.get_hops(),
new LocalVariableMap(), null, false, 0);
pb.setInstructions( inst );
B.add(pb);
_cntCompilePB ++;
}
return B;
}
/**
*
* @param pbs
* @param cp
* @param memo
* @throws DMLRuntimeException
* @throws HopsException
* @throws LopsException
* @throws IOException
*/
private static void recompileProgramBlocks( ArrayList<ProgramBlock> pbs, long cp, double[][] memo )
throws DMLRuntimeException, HopsException, LopsException, IOException
{
for( int i=0; i<pbs.size(); i++ )
{
ProgramBlock pb = pbs.get(i);
long mr = (long)memo[i][0];
recompileProgramBlock(pb, cp, mr);
}
}
/**
*
* @param pb
* @param cp
* @param mr
* @throws DMLRuntimeException
* @throws HopsException
* @throws LopsException
* @throws IOException
*/
private static void recompileProgramBlock( ProgramBlock pb, long cp, long mr )
throws DMLRuntimeException, HopsException, LopsException, IOException
{
//init compiler memory budget
InfrastructureAnalyzer.setLocalMaxMemory( cp );
InfrastructureAnalyzer.setRemoteMaxMemoryMap( mr );
InfrastructureAnalyzer.setRemoteMaxMemoryReduce( mr );
OptimizerUtils.resetDefaultSize(); //dependent on cp, mr
//recompile instructions (incl predicates)
if (pb instanceof WhileProgramBlock)
{
WhileProgramBlock wpb = (WhileProgramBlock)pb;
WhileStatementBlock sb = (WhileStatementBlock) pb.getStatementBlock();
if( INCLUDE_PREDICATES && sb!=null && sb.getPredicateHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getPredicateHops(), new LocalVariableMap(), null, false, 0);
inst = annotateMRJobInstructions(inst, cp, mr);
wpb.setPredicate( inst );
}
}
else if (pb instanceof IfProgramBlock)
{
IfProgramBlock ipb = (IfProgramBlock)pb;
IfStatementBlock sb = (IfStatementBlock) ipb.getStatementBlock();
if( INCLUDE_PREDICATES && sb!=null && sb.getPredicateHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getPredicateHops(), new LocalVariableMap(), null, false, 0);
inst = annotateMRJobInstructions(inst, cp, mr);
ipb.setPredicate( inst );
}
}
else if (pb instanceof ForProgramBlock) //incl parfor
{
ForProgramBlock fpb = (ForProgramBlock)pb;
ForStatementBlock sb = (ForStatementBlock) fpb.getStatementBlock();
if( INCLUDE_PREDICATES && sb!=null ){
if( sb.getFromHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getFromHops(), new LocalVariableMap(), null, false, 0);
inst = annotateMRJobInstructions(inst, cp, mr);
fpb.setFromInstructions( inst );
}
if( sb.getToHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getToHops(), new LocalVariableMap(), null, false, 0);
inst = annotateMRJobInstructions(inst, cp, mr);
fpb.setToInstructions( inst );
}
if( sb.getIncrementHops()!=null ){
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb.getIncrementHops(), new LocalVariableMap(), null, false, 0);
inst = annotateMRJobInstructions(inst, cp, mr);
fpb.setIncrementInstructions( inst );
}
}
}
else //last-level program blocks
{
StatementBlock sb = pb.getStatementBlock();
ArrayList<Instruction> inst = Recompiler.recompileHopsDag(sb, sb.get_hops(),
new LocalVariableMap(), null, false, 0);
inst = annotateMRJobInstructions(inst, cp, mr);
pb.setInstructions( inst );
}
_cntCompilePB ++;
}
/**
*
* @param inst
* @return
* @throws DMLRuntimeException
*/
private static ArrayList<Instruction> annotateMRJobInstructions( ArrayList<Instruction> inst, long cp, long mr )
throws DMLRuntimeException
{
//check for empty instruction lists (e.g., predicates)
if( inst == null || !COSTS_MAX_PARALLELISM )
return inst;
try
{
for( int i=0; i<inst.size(); i++ )
{
Instruction linst = inst.get(i);
if( linst instanceof MRJobInstruction ){
//copy mr job instruction
MRJobResourceInstruction newlinst = new MRJobResourceInstruction((MRJobInstruction)linst);
//compute and annotate
long maxMemPerNode = (long)YarnClusterAnalyzer.getMaxAllocationBytes();
long nNodes = YarnClusterAnalyzer.getNumNodes();
long totalMem = nNodes * maxMemPerNode;
long maxMRTasks = (long)(totalMem - DMLYarnClient.computeMemoryAllocation(cp))
/ (long)DMLYarnClient.computeMemoryAllocation(mr);
newlinst.setMaxMRTasks( maxMRTasks );
//write enhanced instruction back
inst.set(i, newlinst);
}
}
}
catch(Exception ex)
{
throw new DMLRuntimeException(ex);
}
return inst;
}
/**
*
* @param pb
* @return
* @throws DMLRuntimeException
* @throws HopsException
*/
private static double getProgramCosts( ProgramBlock pb )
throws DMLRuntimeException, HopsException
{
double val = 0;
if( COST_INDIVIDUAL_BLOCKS ) {
LocalVariableMap vars = new LocalVariableMap();
collectReadVariables(pb.getStatementBlock().get_hops(), vars);
ExecutionContext ec = ExecutionContextFactory.createContext(false, null);
ec.setVariables(vars);
val = CostEstimationWrapper.getTimeEstimate(pb, ec, false);
}
else{
//we need to cost the entire program in order to take in-memory status into account
ExecutionContext ec = ExecutionContextFactory.createContext();
val = CostEstimationWrapper.getTimeEstimate(pb.getProgram(), ec);
}
_cntCostPB ++;
return val;
}
/**
*
* @param prog
* @throws DMLRuntimeException
*/
private static double getProgramCosts( Program prog )
throws DMLRuntimeException
{
//we need to cost the entire program in order to take in-memory status into account
ExecutionContext ec = ExecutionContextFactory.createContext();
double val = CostEstimationWrapper.getTimeEstimate(prog, ec);
_cntCostPB ++;
return val;
}
/**
*
* @param hops
* @param vars
*/
private static void collectReadVariables( ArrayList<Hop> hops, LocalVariableMap vars )
{
if( hops!=null ) {
Hop.resetVisitStatus(hops);
for( Hop hop : hops )
collectReadVariables(hop, vars);
}
}
/**
*
* @param hop
* @param vars
*/
private static void collectReadVariables( Hop hop, LocalVariableMap vars )
{
if( hop == null )
return;
//process childs
for(Hop hi : hop.getInput())
collectReadVariables( hi, vars );
//investigate hop exec type and known dimensions
if( hop instanceof DataOp && hop.getDataType()==DataType.MATRIX
&& (((DataOp)hop).getDataOpType()==DataOpTypes.TRANSIENTREAD
|| ((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD) )
{
String varname = hop.getName();
MatrixCharacteristics mc = new MatrixCharacteristics(hop.getDim1(), hop.getDim2(),
(int)hop.getRowsInBlock(), (int)hop.getColsInBlock(), hop.getNnz());
MatrixDimensionsMetaData md = new MatrixDimensionsMetaData(mc);
MatrixObject mo = new MatrixObject(ValueType.DOUBLE, "/tmp", md);
vars.put(varname, mo);
}
hop.setVisited(Hop.VisitStatus.DONE);
}
/**
*
* @param B
* @return
* @throws HopsException
*/
private static ArrayList<ProgramBlock> pruneProgramBlocks( ArrayList<ProgramBlock> B )
throws HopsException
{
//prune all program blocks w/o mr instructions (mr budget does not matter)
if( PRUNING_SMALL ){
ArrayList<ProgramBlock> Bp = new ArrayList<ProgramBlock>();
for( ProgramBlock pb : B )
if( OptTreeConverter.containsMRJobInstruction(pb.getInstructions(), false, true) )
Bp.add( pb );
B = Bp;
}
//prune all program blocks, where all mr hops are due to unknowns
if( PRUNING_UNKNOWN ){
ArrayList<ProgramBlock> Bp = new ArrayList<ProgramBlock>();
for( ProgramBlock pb : B )
if( !pruneHasOnlyUnknownMR(pb) )
Bp.add( pb );
B = Bp;
}
return B;
}
/**
*
* @param pb
* @return
* @throws HopsException
*/
private static boolean pruneHasOnlyUnknownMR( ProgramBlock pb )
throws HopsException
{
if (pb instanceof WhileProgramBlock)
{
WhileStatementBlock sb = (WhileStatementBlock) pb.getStatementBlock();
sb.getPredicateHops().resetVisitStatus();
return pruneHasOnlyUnknownMR(sb.getPredicateHops());
}
else if (pb instanceof IfProgramBlock)
{
IfStatementBlock sb = (IfStatementBlock) pb.getStatementBlock();
sb.getPredicateHops().resetVisitStatus();
return pruneHasOnlyUnknownMR(sb.getPredicateHops());
}
else if (pb instanceof ForProgramBlock) //incl parfor
{
ForStatementBlock sb = (ForStatementBlock) pb.getStatementBlock();
sb.getFromHops().resetVisitStatus();
sb.getToHops().resetVisitStatus();
sb.getIncrementHops().resetVisitStatus();
return pruneHasOnlyUnknownMR(sb.getFromHops())
&& pruneHasOnlyUnknownMR(sb.getToHops())
&& pruneHasOnlyUnknownMR(sb.getIncrementHops());
}
else //last-level program blocks
{
StatementBlock sb = pb.getStatementBlock();
return pruneHasOnlyUnknownMR(sb.get_hops());
}
}
/**
*
* @param sb
* @return
* @throws HopsException
*/
private static boolean pruneHasOnlyUnknownMR( ArrayList<Hop> hops )
throws HopsException
{
boolean ret = false;
if( hops!=null ){
ret = true;
Hop.resetVisitStatus(hops);
for( Hop hop : hops )
ret &= pruneHasOnlyUnknownMR(hop);
}
return ret;
}
/**
*
* @param hop
* @return
*/
private static boolean pruneHasOnlyUnknownMR( Hop hop )
{
if( hop == null || hop.getVisited() == Hop.VisitStatus.DONE )
return true;
boolean ret = true;
//process childs
for(Hop hi : hop.getInput())
ret &= pruneHasOnlyUnknownMR( hi );
//investigate hop exec type and known dimensions
if( hop.getExecType()==ExecType.MR ) {
boolean lret = false;
//1) operator output dimensions unknown
lret |= !hop.dimsKnown();
//2) operator output dimensions known but inputs unknown
//(use cases for e.g. AggUnary with scalar output, Binary with one known input)
for(Hop hi : hop.getInput())
lret |= !hi.dimsKnown();
ret &= lret;
}
hop.setVisited(Hop.VisitStatus.DONE);
return ret;
}
/**
*
* @param prog
* @param cc
* @param type
* @return
* @throws DMLRuntimeException
* @throws HopsException
*/
private static ArrayList<Long> enumerateGridPoints( ArrayList<ProgramBlock> prog, long min, long max, GridEnumType type )
throws DMLRuntimeException, HopsException
{
//create enumerator
GridEnumeration ge = null;
switch( type ){
case EQUI_GRID:
ge = new GridEnumerationEqui(prog, min, max); break;
case EXP_GRID:
ge = new GridEnumerationExp(prog, min, max); break;
case MEM_EQUI_GRID:
ge = new GridEnumerationMemory(prog, min, max); break;
case HYBRID_MEM_EXP_GRID:
ge = new GridEnumerationHybrid(prog, min, max); break;
default:
throw new DMLRuntimeException("Unsupported grid enumeration type: "+type);
}
//generate points
ArrayList<Long> ret = ge.enumerateGridPoints();
LOG.debug("Gen: min="+YarnOptimizerUtils.toMB(min)+", max="+YarnOptimizerUtils.toMB(max)+", npoints="+ret.size());
return ret;
}
/**
*
* @param Bp
* @param min
* @return
* @throws DMLRuntimeException
*/
private static double[][] initLocalMemoTable( ArrayList<ProgramBlock> Bp, double min )
throws DMLRuntimeException
{
//allocate memo structure
int len = Bp.size();
double[][] memo = new double[len][2];
//init with min resource and current costs
for( int i=0; i<len; i++ )
{
ProgramBlock pb = Bp.get(i);
ExecutionContext ec = ExecutionContextFactory.createContext();
memo[i][0] = min;
memo[i][1] = CostEstimationWrapper.getTimeEstimate(pb.getProgram(), ec);
}
return memo;
}
/**
*
* @param B
* @param Bp
* @param lmemo
* @param min
* @return
*/
private static double[][] initGlobalMemoTable( ArrayList<ProgramBlock> B, ArrayList<ProgramBlock> Bp, double[][] lmemo, double min )
{
//allocate memo structure
int len = B.size();
int lenp = Bp.size(); //lenp<=len
double[][] memo = new double[len][2];
//init with min resources
for( int i=0; i<len; i++ ) {
memo[i][0] = min;
memo[i][1] = -1;
}
//overwrite existing values
int j = 0;
for( int i=0; i<len && j<lenp; i++ )
{
ProgramBlock pb = B.get(i);
if( pb != Bp.get(j) )
continue;
//map local memo entry
memo[i][0] = lmemo[j][0];
memo[i][1] = -1;
j++;
}
return memo;
}
/**
*
*/
public static void initStatistics()
{
_cntCompilePB = 0;
_cntCostPB = 0;
}
////////
// old code
public static long jvmToPhy(long jvm, boolean mrRealRun) {
long ret = (long) Math.ceil((double)jvm * DMLYarnClient.MEM_FACTOR);
if (mrRealRun) {
long lowerBound = (long)YarnClusterAnalyzer.getMinMRContarinerPhyMB() * 1024 * 1024;
if (ret < lowerBound)
return lowerBound;
}
return ret;
}
public static long budgetToJvm(double budget) {
return (long) Math.ceil(budget / OptimizerUtils.MEM_UTIL_FACTOR);
}
public static double phyToBudget(long physical) throws IOException {
return (double)physical / DMLYarnClient.MEM_FACTOR * OptimizerUtils.MEM_UTIL_FACTOR;
}
}