/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.controlprogram.parfor.stat;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.runtime.util.UtilFunctions;
/**
* Central place for analyzing and obtaining static infrastructure properties
* such as memory and number of logical processors.
*/
public class InfrastructureAnalyzer
{
public static final long DEFAULT_JVM_SIZE = 512 * 1024 * 1024;
public static final String MR_FRAMEWORK_NAME = "mapreduce.framework.name"; // mapred-default.xml
public static final String MR_JOBTRACKER_ADDRESS = "mapreduce.jobtracker.address"; // mapred-default.xml
//static local master node properties
private static int _localPar = -1;
private static long _localJVMMaxMem = -1; //mutable
private static long _fLocalJVMMaxMem = -1; //immutable
//static hadoop cluster properties
private static int _remotePar = -1;
private static int _remoteParMap = -1;
private static int _remoteParReduce = -1;
private static boolean _localJT = false;
private static long _blocksize = -1;
//static initialization, called for each JVM (on each node)
static {
//analyze local node properties
analyzeLocalMachine();
//analyze remote Hadoop cluster properties
//analyzeHadoopCluster(); //note: due to overhead - analyze on-demand
}
///////
//methods for obtaining parallelism properties
/**
* Gets the number of logical processors of the current node,
* including hyper-threading if enabled.
*
* @return number of local processors of the current node
*/
public static int getLocalParallelism() {
return _localPar;
}
/**
* Gets the number of cluster nodes (number of tasktrackers). If multiple tasktrackers
* are started per node, each tasktracker is viewed as an individual node.
*
* @return number of cluster nodes
*/
public static int getRemoteParallelNodes() {
if( _remotePar == -1 )
analyzeHadoopCluster();
return _remotePar;
}
/**
* Gets the number of remote parallel map slots.
*
* @return number of remote parallel map tasks
*/
public static int getRemoteParallelMapTasks() {
if( _remotePar == -1 )
analyzeHadoopCluster();
return _remoteParMap;
}
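/**
* Sets the number of remote parallel map slots.
*
* @param pmap number of remote parallel map tasks
*/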
public static void setRemoteParallelMapTasks(int pmap) {
_remoteParMap = pmap;
}
/**
* Gets the total number of remote parallel reduce slots.
*
* @return number of remote parallel reduce tasks
*/
public static int getRemoteParallelReduceTasks() {
if( _remotePar == -1 )
analyzeHadoopCluster();
return _remoteParReduce;
}
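/**
* Sets the number of remote parallel reduce slots.
*
* @param preduce number of remote parallel reduce tasks
*/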
public static void setRemoteParallelReduceTasks(int preduce) {
_remoteParReduce = preduce;
}
///////
//methods for obtaining memory properties
/**
* Gets the maximum memory [in bytes] of the current JVM.
*
* @return maximum memory of the current JVM
*/
public static long getLocalMaxMemory() {
return _localJVMMaxMem;
}
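/**
* Sets the maximum memory [in bytes] of the current JVM. Note that this
* modifies only the mutable budget, not the immutable baseline used by
* getLocalMaxMemoryFraction().
*
* @param localMem maximum memory in bytes
*/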
public static void setLocalMaxMemory( long localMem ) {
_localJVMMaxMem = localMem;
}
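/**
* Gets the ratio of the current (possibly modified) local memory budget
* to the original JVM maximum memory.
*
* @return fraction of current to original local memory
*/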
public static double getLocalMaxMemoryFraction() {
//since parfor modifies _localJVMMaxMem, some internal primitives
//need access to the current fraction of total local memory
return (double)_localJVMMaxMem / _fLocalJVMMaxMem;
}
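/**
* Indicates if the runtime operates against a local JobTracker, probed
* lazily from the cached Hadoop configuration.
*
* @return true if running in Hadoop local mode
*/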
public static boolean isLocalMode() {
if( _remotePar == -1 )
analyzeHadoopConfiguration();
return _localJT;
}
public static boolean isLocalMode(JobConf job) {
// Due to a bug in HDP related to fetching the "mode" of execution within mappers,
// we explicitly probe the relevant properties instead of relying on results from
// analyzeHadoopCluster().
String jobTracker = job.get(MR_JOBTRACKER_ADDRESS, "local");
String framework = job.get(MR_FRAMEWORK_NAME, "local");
boolean isYarnEnabled = "yarn".equals(framework); //framework defaults to "local", never null
return "local".equals(jobTracker) && !isYarnEnabled;
}
///////
//methods for obtaining constraints or respective defaults
/**
* Gets the maximum local parallelism constraint.
*
* @return maximum local parallelism constraint
*/
public static int getCkMaxCP() {
//default value (if not specified)
return getLocalParallelism();
}
/**
* Gets the maximum remote parallelism constraint.
*
* @return maximum remote parallelism constraint
*/
public static int getCkMaxMR() {
if( OptimizerUtils.isSparkExecutionMode() )
return SparkExecutionContext.getDefaultParallelism(true);
else
return getRemoteParallelMapTasks();
}
/**
* Gets the maximum memory constraint [in bytes].
*
* @return maximum memory constraint
*/
public static long getCmMax() {
//default value (if not specified)
//TODO spark remote map task budget?
return getLocalMaxMemory();
}
/**
* Gets the HDFS blocksize of the used cluster in bytes.
*
* @return HDFS block size
*/
public static long getHDFSBlockSize() {
if( _blocksize == -1 )
analyzeHadoopConfiguration();
return _blocksize;
}
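/**
* Extracts the maximum heap size in bytes from a JVM options string by
* scanning for the last -Xmx argument; falls back to DEFAULT_JVM_SIZE
* if no such argument is found or if parsing fails.
*
* Illustrative usage (assuming UtilFunctions.parseMemorySize accepts the 'g' suffix):
* <pre>{@code
* long mem = InfrastructureAnalyzer.extractMaxMemoryOpt("-server -Xmx2g");
* // mem == 2L * 1024 * 1024 * 1024
* }</pre>
*
* @param javaOpts JVM options string (space-separated arguments)
* @return maximum heap size in bytes
*/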
public static long extractMaxMemoryOpt(String javaOpts)
{
long ret = -1; //mem in bytes
try
{
StringTokenizer st = new StringTokenizer( javaOpts, " " );
while( st.hasMoreTokens() ) {
String arg = st.nextToken();
if( !arg.startsWith("-Xmx") ) //search for max mem
continue;
//cut off "-Xmx" parameter
arg = arg.substring(4);
//parse number and unit
ret = UtilFunctions.parseMemorySize(arg);
}
if( ret < 0 ) { // no argument found
ret = DEFAULT_JVM_SIZE;
}
}
catch(Exception ex) {
//if anything breaks during parsing (e.g., because args not specified correctly)
ret = DEFAULT_JVM_SIZE;
}
return ret;
}
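/**
* Rewrites an existing -Xmx token of the JVM options stored under the
* given configuration key to the given memory budget (rounded down to MB);
* all other tokens are preserved.
*
* @param job job configuration to modify
* @param key configuration key holding the JVM options
* @param bytes new maximum heap size in bytes
*/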
public static void setMaxMemoryOpt(JobConf job, String key, long bytes) {
String javaOptsOld = job.get( key );
String javaOptsNew = null;
String[] tokens = javaOptsOld.split(" "); //also handles a single token without ' '
StringBuilder sb = new StringBuilder();
for( String arg : tokens ) {
if( arg.startsWith("-Xmx") ) { //search for max mem
sb.append("-Xmx");
sb.append( (bytes/(1024*1024)) );
sb.append("M");
}
else
sb.append(arg);
sb.append(" ");
}
javaOptsNew = sb.toString().trim();
job.set(key, javaOptsNew);
}
/**
* Gets the fraction of running map/reduce tasks to existing
* map/reduce task slots.
*
* NOTE: on YARN the number of slots is a spurious indicator
* because containers are purely scheduled based on memory.
*
* @param mapOnly if true, only look at map tasks
* @return cluster utilization (current / capacity)
* @throws IOException if IOException occurs
*/
public static double getClusterUtilization(boolean mapOnly)
throws IOException
{
//in local mode, the cluster utilization is always 0.0
JobConf job = ConfigurationManager.getCachedJobConf();
JobClient client = new JobClient(job);
ClusterStatus stat = client.getClusterStatus();
client.close(); //close the client; the status snapshot remains valid
double ret = 0.0;
if( stat != null ) { //if in cluster mode
if( mapOnly ) {
int capacity = stat.getMaxMapTasks();
int current = stat.getMapTasks();
ret = ((double)current) / capacity;
}
else {
int capacity = stat.getMaxMapTasks() + stat.getMaxReduceTasks();
int current = stat.getMapTasks() + stat.getReduceTasks();
ret = ((double)current) / capacity;
}
}
return ret;
}
///////
//internal methods for analysis
/**
* Analyzes properties of local machine and JVM.
*/
private static void analyzeLocalMachine() {
//step 1: basic parallelism and memory
_localPar = Runtime.getRuntime().availableProcessors();
_localJVMMaxMem = Runtime.getRuntime().maxMemory();
_fLocalJVMMaxMem = _localJVMMaxMem;
}
/**
* Analyzes properties of hadoop cluster and configuration.
*/
private static void analyzeHadoopCluster() {
try {
JobConf job = ConfigurationManager.getCachedJobConf();
JobClient client = new JobClient(job);
ClusterStatus stat = client.getClusterStatus();
client.close(); //close the client; the status snapshot remains valid
if( stat != null ) { //if in cluster mode
//analyze cluster status
_remotePar = stat.getTaskTrackers();
_remoteParMap = stat.getMaxMapTasks();
_remoteParReduce = stat.getMaxReduceTasks();
//analyze pure configuration properties
analyzeHadoopConfiguration();
}
}
catch (IOException e) {
throw new RuntimeException("Unable to analyze infrastructure.",e);
}
}
/**
* Analyzes only the properties of the hadoop configuration in order to
* avoid the expensive cluster status call.
*/
private static void analyzeHadoopConfiguration() {
JobConf job = ConfigurationManager.getCachedJobConf();
//HDFS blocksize (default: 128MB)
String blocksize = job.get(HDFSTool.DFS_BLOCKSIZE, "134217728");
_blocksize = Long.parseLong(blocksize);
//analyze if local mode (internally requires yarn_enabled)
_localJT = analyzeLocalMode(job);
}
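/**
* Determines if the given job configuration points to a local JobTracker.
*
* @param job job configuration
* @return true if the jobtracker address is "local"
*/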
private static boolean analyzeLocalMode(JobConf job) {
//analyze if local mode (if yarn is enabled, we always assume cluster mode
//in order to work around configuration issues on >=Hadoop 2.6)
String jobTracker = job.get(MR_JOBTRACKER_ADDRESS, "local");
return "local".equals(jobTracker);
}
}