blob: 09304751be24ee971ce69cc0b53e19e0d1962e4c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.yarn;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.CompilerConfig;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.hops.HopsException;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.hops.OptimizerUtils.OptimizationLevel;
import org.apache.sysml.lops.Lop;
import org.apache.sysml.lops.LopsException;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.Program;
import org.apache.sysml.runtime.controlprogram.ProgramBlock;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.yarn.ropt.ResourceConfig;
import org.apache.sysml.yarn.ropt.ResourceOptimizer;
import org.apache.sysml.yarn.ropt.YarnClusterConfig;
public class DMLAppMasterUtils
{
private static ResourceConfig _rc = null;
private static HashMap<ProgramBlock, Long> _rcMap = null;
/**
*
* @param conf
* @param appId
* @return
*/
public static String constructHDFSWorkingDir(DMLConfig conf, ApplicationId appId)
{
StringBuilder sb = new StringBuilder();
sb.append( conf.getTextValue(DMLConfig.SCRATCH_SPACE) );
sb.append( Lop.FILE_SEPARATOR );
sb.append( appId );
sb.append( Lop.FILE_SEPARATOR );
return sb.toString();
}
/**
*
* @param conf
* @throws DMLRuntimeException
*/
public static void setupConfigRemoteMaxMemory(DMLConfig conf)
throws DMLRuntimeException
{
//set remote max memory (if in yarn appmaster context)
if( DMLScript.isActiveAM() ){
//set optimization level (for awareness of resource optimization)
CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(conf);
ConfigurationManager.setGlobalConfig(cconf);
if( isResourceOptimizerEnabled() )
{
//handle optimized memory (mr memory budget per program block)
//ensure cluster has been analyzed
InfrastructureAnalyzer.getRemoteMaxMemoryMap();
String memStr = conf.getTextValue(DMLConfig.YARN_MAPREDUCEMEM);
ResourceConfig rc = ResourceConfig.deserialize(memStr);
_rc = rc; //keep resource config for later program mapping
}
else
{
//handle user configuration
if( conf.getIntValue(DMLConfig.YARN_MAPREDUCEMEM)>0 )
{
//ensure cluster has been analyzed
InfrastructureAnalyzer.getRemoteMaxMemoryMap();
//set max map and reduce memory (to be used by the compiler)
//see GMR and parfor EMR and DPEMR for runtime configuration
long mem = ((long)conf.getIntValue(DMLConfig.YARN_MAPREDUCEMEM)) * 1024 * 1024;
InfrastructureAnalyzer.setRemoteMaxMemoryMap(mem);
InfrastructureAnalyzer.setRemoteMaxMemoryReduce(mem);
}
}
}
}
/**
*
* @param prog
* @throws DMLRuntimeException
* @throws HopsException
* @throws LopsException
* @throws IOException
*/
public static void setupProgramMappingRemoteMaxMemory(Program prog)
throws DMLRuntimeException, HopsException, LopsException, IOException
{
if( DMLScript.isActiveAM() && isResourceOptimizerEnabled() )
{
ArrayList<ProgramBlock> pbProg = getRuntimeProgramBlocks( prog );
ArrayList<ProgramBlock> B = ResourceOptimizer.compileProgram( pbProg, _rc );
_rcMap = new HashMap<ProgramBlock, Long>();
for( int i=0; i<B.size(); i++ ){
_rcMap.put(B.get(i), _rc.getMRResources(i));
}
}
}
/**
*
* @param sb
*/
public static void setupProgramBlockRemoteMaxMemory(ProgramBlock pb)
{
if( DMLScript.isActiveAM() && isResourceOptimizerEnabled() )
{
if( _rcMap != null && _rcMap.containsKey(pb) ){
//set max map and reduce memory (to be used by the compiler)
long mem = _rcMap.get(pb);
InfrastructureAnalyzer.setRemoteMaxMemoryMap(mem);
InfrastructureAnalyzer.setRemoteMaxMemoryReduce(mem);
OptimizerUtils.resetDefaultSize();
}
}
}
/**
*
* @param job
* @param conf
*/
public static void setupMRJobRemoteMaxMemory(JobConf job, DMLConfig conf)
{
if( DMLScript.isActiveAM() && conf.getBooleanValue(DMLConfig.YARN_APPMASTER) )
{
int memMB = -1;
//obtain the current configuation (optimized or user-specified)
if( isResourceOptimizerEnabled() )
memMB = (int)(InfrastructureAnalyzer.getRemoteMaxMemoryMap() / (1024*1024));
else
memMB = conf.getIntValue(DMLConfig.YARN_MAPREDUCEMEM);
//set the memory configuration into the job conf
if( memMB > 0 ){ //ignored if negative
String memOpts = "-Xmx"+memMB+"m -Xms"+memMB+"m -Xmn"+(int)(memMB/10)+"m";
//set mapper heapsizes
job.set( MRConfigurationNames.MR_MAP_JAVA_OPTS, memOpts );
job.set( MRConfigurationNames.MR_MAP_MEMORY_MB, String.valueOf(DMLYarnClient.computeMemoryAllocation(memMB)) );
//set reducer heapsizes
job.set( MRConfigurationNames.MR_REDUCE_JAVA_OPTS, memOpts );
job.set( MRConfigurationNames.MR_REDUCE_MEMORY_MB, String.valueOf(DMLYarnClient.computeMemoryAllocation(memMB)) );
}
}
}
/**
*
* @return
*/
public static boolean isResourceOptimizerEnabled()
{
return ( DMLYarnClientProxy.RESOURCE_OPTIMIZER
|| OptimizerUtils.isOptLevel(OptimizationLevel.O3_LOCAL_RESOURCE_TIME_MEMORY) );
}
/**
*
* @param args
* @return
* @throws DMLException
*/
protected static ArrayList<ProgramBlock> getRuntimeProgramBlocks(Program prog)
throws DMLRuntimeException
{
//construct single list of all program blocks including functions
ArrayList<ProgramBlock> ret = new ArrayList<ProgramBlock>();
ret.addAll(prog.getProgramBlocks());
ret.addAll(prog.getFunctionProgramBlocks().values());
return ret;
}
/**
*
* @param cc
*/
protected static void setupRemoteParallelTasks( YarnClusterConfig cc )
{
int pmap = (int) cc.getNumCores();
int preduce = (int) cc.getNumCores()/2;
InfrastructureAnalyzer.setRemoteParallelMapTasks(pmap);
InfrastructureAnalyzer.setRemoteParallelReduceTasks(preduce);
}
}