/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.yarn;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.parser.ParseException;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLScriptException;
import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysml.runtime.util.MapReduceTool;
/**
* NOTES:
* * Security: By default, submitted applications are run as user 'yarn'.
* In order to allow for security and relative filenames on hdfs (/user/<username>/.),
* we can configure the LinuxContainerExecutor in yarn-site.xml, which runs the
* application as the user who submits the application.
* * SystemML.jar file dependency: We need to submit the SystemML.jar along with the
* application. Unfortunately, hadoop jar unpacks the jar such that we don't get a handle
* to the original jar filename. We currently parse the constant IBM_JAVA_COMMAND_LINE
* to get the jar filename. For robustness, we fall back to repackaging the unzipped files
* to a jar if this constant does not exist.
*
*/
public class DMLYarnClient
{
private static final Log LOG = LogFactory.getLog(DMLYarnClient.class);
//Internal configuration parameters
// environment variable to obtain the original jar filename
public static final String JARFILE_ENV_CONST = "IBM_JAVA_COMMAND_LINE";
// environment variable to obtain default jvm arguments
public static final String JVMOPTS_ENV_CONST = "HADOOP_OPTS";
// environment variable to obtain mapred home (for robustness only)
public static final String MAPRED_HOME_ENV_CONST = "HADOOP_MAPRED_HOME";
public static final String HADOOP_HOME_ENV_CONST = "HADOOP_HOME";
// default of 1 core since YARN scheduler does not take the number of cores into account yet
public static final int NUM_CORES = 1;
// factor for computing the virtual memory to request based on the given max heap size
// (if the absolute overhead is larger than a threshold, we use this threshold as a constant overhead)
public static final double MEM_FACTOR = 1.5;
public static final int MAX_MEM_OVERHEAD = 2*1024; //2GB
// default application state report interval (in milliseconds)
public static final int APP_STATE_INTERVAL = 200;
// default application master name
public static final String APPMASTER_NAME = "SystemML-AM";
// default dml script file name for hdfs script serialization
public static final String DML_SCRIPT_NAME = "script.dml";
// default dml config file name for hdfs config serialization
public static final String DML_CONFIG_NAME = "config.xml";
// default SystemML jar file name for hdfs jar copy
public static final String DML_JAR_NAME = "SystemML.jar";
// default dml stop message file name for hdfs message serialization
public static final String DML_STOPMSG_NAME = "stop_msg.txt";
private String _dmlScript = null;
private DMLConfig _dmlConfig = null;
private String[] _args = null;
//hdfs file names of local resources
private String _hdfsJarFile = null;
private String _hdfsDMLScript = null;
private String _hdfsDMLConfig = null;
/**
* Protected because it is only supposed to be accessed via a proxy in the same package.
* This is to ensure robustness in case of missing yarn libraries.
*
* @param dmlScriptStr dml script as a string
* @param conf dml configuration
* @param args command line arguments passed to the dml script
*/
protected DMLYarnClient(String dmlScriptStr, DMLConfig conf, String[] args)
{
_dmlScript = dmlScriptStr;
_dmlConfig = conf;
_args = args;
}
/**
* Method to launch the dml yarn app master and execute the given dml script
* with the given configuration and jar file.
*
* NOTE: on launching the yarn app master, we do not explicitly probe if we
* are running on a yarn or MR1 cluster. In case of MR1, the class
* YarnConfiguration is not found and a ClassNotFoundException is raised. In case of
* any exception we fall back to running CP directly in the client process.
*
* @return true if the dml program was successfully executed as yarn app master
* @throws IOException if an IO error occurs
* @throws DMLScriptException if a script-level stop call is propagated from the app master
*/
protected boolean launchDMLYarnAppmaster()
throws IOException, DMLScriptException
{
boolean ret = false;
String hdfsWD = null;
try
{
Timing time = new Timing(true);
// load yarn configuration
YarnConfiguration yconf = new YarnConfiguration();
// create yarn client
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yconf);
yarnClient.start();
// create application and get the ApplicationID
YarnClientApplication app = yarnClient.createApplication();
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
LOG.debug("Created application (applicationID: "+appId+")");
// prepare hdfs working directory via ApplicationID
// copy script, config, jar file to hdfs
hdfsWD = DMLAppMasterUtils.constructHDFSWorkingDir(_dmlConfig, appId);
copyResourcesToHdfsWorkingDir(yconf, hdfsWD);
//construct the application master command line
String command = constructAMCommand(_args, _dmlConfig);
LOG.debug("Constructed application master command: \n"+command);
// set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
amContainer.setCommands( Collections.singletonList(command) );
amContainer.setLocalResources( constructLocalResourceMap(yconf) );
amContainer.setEnvironment( constructEnvironmentMap(yconf) );
// Set up resource type requirements for ApplicationMaster
int memHeap = _dmlConfig.getIntValue(DMLConfig.YARN_APPMASTERMEM);
int memAlloc = (int) computeMemoryAllocation(memHeap);
Resource capability = Records.newRecord(Resource.class);
capability.setMemory( memAlloc );
capability.setVirtualCores( NUM_CORES );
LOG.debug("Requested application resources: memory="+memAlloc+", vcores="+NUM_CORES);
// Finally, set-up ApplicationSubmissionContext for the application
String qname = _dmlConfig.getTextValue(DMLConfig.YARN_APPQUEUE);
appContext.setApplicationName(APPMASTER_NAME); // application name
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
appContext.setQueue(qname); // queue
LOG.debug("Configured application meta data: name="+APPMASTER_NAME+", queue="+qname);
// submit application (non-blocking)
yarnClient.submitApplication(appContext);
// Check application status periodically (and output web ui address)
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
LOG.info("Application tracking-URL: "+appReport.getTrackingUrl());
YarnApplicationState appState = appReport.getYarnApplicationState();
YarnApplicationState oldState = appState;
LOG.info("Application state: " + appState);
while( appState != YarnApplicationState.FINISHED
&& appState != YarnApplicationState.KILLED
&& appState != YarnApplicationState.FAILED )
{
Thread.sleep(APP_STATE_INTERVAL); //wait for 200ms
appReport = yarnClient.getApplicationReport(appId);
appState = appReport.getYarnApplicationState();
if( appState != oldState ) {
oldState = appState;
LOG.info("Application state: " + appState);
}
}
//check final status (failed or succeeded)
FinalApplicationStatus finalState = appReport.getFinalApplicationStatus();
LOG.info("Application final status: " + finalState);
//show application and total runtime
double appRuntime = (double)(appReport.getFinishTime() - appReport.getStartTime()) / 1000;
LOG.info( "Application runtime: " + appRuntime + " sec." );
LOG.info( "Total runtime: " + String.format("%.3f", time.stop()/1000) + " sec.");
//raise script-level error in case of failed final status
if( finalState != FinalApplicationStatus.SUCCEEDED )
{
//propagate script-level stop call message
String stop_msg = readMessageFromHDFSWorkingDir(_dmlConfig, yconf, appId);
if( stop_msg != null )
throw new DMLScriptException(stop_msg);
//generic failure message
throw new DMLRuntimeException("DML yarn app master finished with final status: "+finalState+".");
}
ret = true;
}
catch(DMLScriptException ex) {
//rethrow DMLScriptException to propagate stop call
throw ex;
}
catch(Exception ex) {
LOG.error("Failed to run DML yarn app master.", ex);
ret = false;
}
finally
{
//cleanup working directory
if( hdfsWD != null )
MapReduceTool.deleteFileIfExistOnHDFS(hdfsWD);
}
return ret;
}
/**
* Copies the dml script, dml config, and SystemML jar file into the hdfs working directory.
*
* @param yconf yarn configuration
* @param hdfsWD hdfs working directory of this application
* @throws ParseException
* @throws IOException
* @throws DMLRuntimeException
* @throws InterruptedException
*/
@SuppressWarnings("deprecation")
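//note: the deprecation suppression most likely covers Path.makeQualified(FileSystem),
//which is kept here for compatibility with older Hadoop APIs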
private void copyResourcesToHdfsWorkingDir( YarnConfiguration yconf, String hdfsWD )
throws ParseException, IOException, DMLRuntimeException, InterruptedException
{
FileSystem fs = FileSystem.get(yconf);
//create working directory
MapReduceTool.createDirIfNotExistOnHDFS(hdfsWD, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
//serialize the dml config to HDFS file
//NOTE: we do not modify and ship the absolute scratch space path of the current user
//because this might result in permission issues if the app master is run with a different user
//(runtime plan migration during resource reoptimizations now needs to use qualified names
//for shipping/reading intermediates) TODO modify resource reoptimizer on prototype integration.
Path confPath = new Path(hdfsWD, DML_CONFIG_NAME);
FSDataOutputStream fout = fs.create(confPath, true);
//_dmlConfig.makeQualifiedScratchSpacePath();
fout.writeBytes(_dmlConfig.serializeDMLConfig() + "\n");
fout.close();
_hdfsDMLConfig = confPath.makeQualified(fs).toString();
LOG.debug("DML config written to HDFS file: "+_hdfsDMLConfig+"");
//serialize the dml script to HDFS file
Path scriptPath = new Path(hdfsWD, DML_SCRIPT_NAME);
FSDataOutputStream fout2 = fs.create(scriptPath, true);
fout2.writeBytes(_dmlScript);
fout2.close();
_hdfsDMLScript = scriptPath.makeQualified(fs).toString();
LOG.debug("DML script written to HDFS file: "+_hdfsDMLScript+"");
// copy local jar file to HDFS (try to get the original jar filename)
String fname = getLocalJarFileNameFromEnvConst();
if( fname == null ){
//get location of unpacked jar classes and repackage (if required)
String lclassFile = DMLYarnClient.class.getProtectionDomain().getCodeSource().getLocation().getPath().toString();
File flclassFile = new File( lclassFile );
if( !flclassFile.isDirectory() ) //called w/ jar
fname = lclassFile;
else //called w/ unpacked jar (need to be repackaged)
fname = createJar(lclassFile);
}
Path srcPath = new Path(fname);
Path dstPath = new Path(hdfsWD, srcPath.getName());
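//copy with deleteSource=false and overwrite=true, i.e., the local jar file is kept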
FileUtil.copy(FileSystem.getLocal(yconf), srcPath, fs, dstPath, false, true, yconf);
_hdfsJarFile = dstPath.makeQualified(fs).toString();
LOG.debug("Jar file copied from local file: "+srcPath.toString()+" to HDFS file: "+dstPath.toString());
}
/**
* Obtains the local jar filename by parsing the environment variable IBM_JAVA_COMMAND_LINE.
*
* @return the local jar filename, or null if the environment variable does not exist
*/
private String getLocalJarFileNameFromEnvConst()
{
String fname = null;
try
{
//parse environment constants
Map<String, String> env = System.getenv();
if( env.containsKey(JARFILE_ENV_CONST) ){
String tmp = env.get(JARFILE_ENV_CONST);
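//the variable typically holds the full hadoop client command line, e.g.,
//'... org.apache.hadoop.util.RunJar SystemML.jar ...' (assumed format);
//we take the token that follows the RunJar class as the original jar filename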
String[] tmpargs = tmp.split(" ");
for( int i=0; i<tmpargs.length && fname==null; i++ )
if( tmpargs[i]!=null && tmpargs[i].endsWith("RunJar") )
fname = tmpargs[i+1];
}
}
catch(Exception ex)
{
LOG.warn("Failed to parse environment variables ("+ex.getMessage()+")");
fname = null; //indicate to use fallback strategy
}
//warn that we fall back to the alternative jar shipping method
if( LOG.isDebugEnabled() && fname == null ) {
LOG.warn("Failed to find jar file via environment variable '"+JARFILE_ENV_CONST+"', fallback to jar packaging.");
}
return fname;
}
/**
* This is our fallback strategy for obtaining the SystemML.jar that we need
* to submit as a resource for the yarn application. We repackage the unzipped
* jar into a temporary jar and later copy it to hdfs.
*
* @param dir directory of the unpacked jar classes
* @return absolute filename of the created jar
* @throws IOException
* @throws InterruptedException
*/
private String createJar( String dir )
throws IOException, InterruptedException
{
//construct jar command
String jarname = dir+"/"+DML_JAR_NAME;
File fdir = new File(dir);
File[] tmp = fdir.listFiles();
StringBuilder flist = new StringBuilder();
for( File ftmp : tmp ) {
flist.append(ftmp.getName());
flist.append(" ");
}
//get jdk home (the property 'java.home' gives the jre home of the parent jdk or of a standalone jre)
String javahome = System.getProperty("java.home");
File fjdkhome = new File(new File(javahome).getParent() + File.separator + "bin");
String jarPrefix = "";
if( fjdkhome.exists() ) { //exists if jdk
jarPrefix = fjdkhome.getAbsolutePath();
jarPrefix += File.separator;
}
if( jarPrefix.isEmpty() )
LOG.warn("Failed to find jdk home of running jre (java.home="+javahome+").");
//execute jar command
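//the resulting command has the form '<jdk-bin>/jar cf <dir>/SystemML.jar <file1> <file2> ...'
//and is executed with <dir> as its working directory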
String command = jarPrefix+"jar cf "+jarname+" "+flist.subSequence(0, flist.length()-1);
LOG.debug("Packaging jar of unzipped files: "+command);
Process child = Runtime.getRuntime().exec(command, null, fdir);
int c = 0;
while ((c = child.getInputStream().read()) != -1)
System.out.print((char) c);
while ((c = child.getErrorStream().read()) != -1)
System.err.print((char) c);
child.waitFor();
return jarname;
}
/**
* Constructs the java command used to launch the dml app master.
*
* @param args original command line arguments
* @param conf dml configuration
* @return app master launch command as a single string
*/
private String constructAMCommand( String[] args, DMLConfig conf )
{
//start command
StringBuilder command = new StringBuilder();
command.append(Environment.JAVA_HOME.$$() + "/bin/java");
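//JAVA_HOME.$$() yields the cross-platform variable expansion (e.g., {{JAVA_HOME}}),
//which is resolved by the node manager at container launch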
//add client jvm arguments (concatenation of HADOOP_CLIENT_OPTS and HADOOP_OPTS)
if( System.getenv().containsKey(JVMOPTS_ENV_CONST) ) {
String externalArgs = System.getenv(JVMOPTS_ENV_CONST);
//safe parsing and removal of redundant Xmx, Xms, Xmn arguments
if( externalArgs != null ) {
String[] parts = externalArgs.split(" ");
for( int i=0; i<parts.length; i++ )
if( !( parts[i].startsWith("-Xmx")
|| parts[i].startsWith("-Xms")
|| parts[i].startsWith("-Xmn") ) )
{
command.append(" ");
command.append(parts[i]);
}
}
}
//add jvm heap configuration (specify xmn for default gcpolicy:gencon)
int memHeap = conf.getIntValue(DMLConfig.YARN_APPMASTERMEM);
command.append(" -Xmx"+memHeap+"m");
command.append(" -Xms"+memHeap+"m");
command.append(" -Xmn"+(int)(memHeap/10)+"m");
command.append(' ');
command.append(DMLAppMaster.class.getName());
//add command line args (modify script and config file path)
for( int i=0; i<_args.length; i++ )
{
String arg = _args[i];
command.append(' ');
if( i>0 && _args[i-1].equals("-f") ){
command.append(_hdfsDMLScript);
command.append(" -config=" + _hdfsDMLConfig);
}
else if( _args[i].startsWith("-config") ){
//ignore because config added with -f
}
else
command.append(arg);
}
//setup stdout and stderr logs
command.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
command.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
return command.toString();
}
/**
* Constructs the local resource map for the app master container,
* currently consisting of the SystemML jar file only.
*
* @param yconf yarn configuration
* @return map of resource names to local resources
* @throws IOException
*/
private Map<String, LocalResource> constructLocalResourceMap(YarnConfiguration yconf)
throws IOException
{
Map<String, LocalResource> rMap = new HashMap<String, LocalResource>();
Path path = new Path(_hdfsJarFile);
LocalResource resource = Records.newRecord(LocalResource.class);
FileStatus jarStat = FileSystem.get(yconf).getFileStatus(path);
resource.setResource(ConverterUtils.getYarnUrlFromPath(path));
resource.setSize(jarStat.getLen());
resource.setTimestamp(jarStat.getModificationTime());
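//note: size and timestamp must match the file status of the jar on hdfs,
//otherwise the node manager rejects the resource during localization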
resource.setType(LocalResourceType.FILE);
resource.setVisibility(LocalResourceVisibility.PUBLIC);
rMap.put(DML_JAR_NAME, resource);
return rMap;
}
/**
* Constructs the app master environment, i.e., a classpath consisting of the
* yarn application classpath and the mapreduce client libraries.
*
* @param yconf yarn configuration
* @return environment variable map for the app master container
* @throws IOException
*/
private Map<String, String> constructEnvironmentMap(YarnConfiguration yconf)
throws IOException
{
Map<String, String> eMap = new HashMap<String, String>();
//setup default app master environment
StringBuilder classpath = new StringBuilder();
for (String value : yconf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
{
if( classpath.length() > 0 )
classpath.append(File.pathSeparator);
classpath.append( value.trim() );
}
//setup mapreduce appmaster environment (for robustness if not included in default environment)
//for example, by default HDP 2.2 did not include mapred client libraries in this configuration
//note: we cannot use MRConfigurationNames.MR_APPLICATION_CLASSPATH because it refers to HDFS and $PWD, which need to be set up
Map<String, String> env = System.getenv();
String mapred_home = null;
//get mapred home via alternative environment variables
if( env.containsKey(MAPRED_HOME_ENV_CONST) ) {
mapred_home = env.get(MAPRED_HOME_ENV_CONST);
}
else if ( env.containsKey(HADOOP_HOME_ENV_CONST) ){
String tmp = env.get(HADOOP_HOME_ENV_CONST);
mapred_home = tmp + File.separator + ".." + File.separator + "hadoop-mapreduce";
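//assumes the common layout where the mapreduce client libraries reside in a
//'hadoop-mapreduce' directory next to HADOOP_HOME (e.g., /usr/lib/hadoop and /usr/lib/hadoop-mapreduce)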
}
//concatenate mapred home libs to classpath
if( mapred_home != null ) {
if( classpath.length() > 0 )
classpath.append( File.pathSeparator );
classpath.append( mapred_home + File.separator + "*" );
classpath.append( File.pathSeparator );
classpath.append( mapred_home + File.separator + "lib" + File.separator + "*" );
}
eMap.put(Environment.CLASSPATH.name(), classpath.toString());
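//MRApps.setClasspath adds the mapreduce framework classpath entries to the given environment map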
MRApps.setClasspath(eMap, yconf);
LOG.debug("Constructed environment classpath: "+classpath.toString());
return eMap;
}
/**
* Reads the script-level stop message from the hdfs working directory, if it exists.
*
* @param conf dml configuration
* @param yconf yarn configuration
* @param appId application id used to construct the hdfs working directory
* @return the stop message, or null if no message was written
*/
private String readMessageFromHDFSWorkingDir(DMLConfig conf, YarnConfiguration yconf, ApplicationId appId)
{
String ret = null;
//construct working directory (consistent with client)
String hdfsWD = DMLAppMasterUtils.constructHDFSWorkingDir(conf, appId);
Path msgPath = new Path(hdfsWD, DMLYarnClient.DML_STOPMSG_NAME);
//read the stop message from hdfs if it exists
try {
FileSystem fs = FileSystem.get(yconf);
if( fs.exists(msgPath) )
{
FSDataInputStream fin = fs.open(msgPath);
BufferedReader br = new BufferedReader(new InputStreamReader(fin));
ret = br.readLine();
fin.close();
LOG.debug("Stop message read from HDFS file "+msgPath+": "+ret );
}
}
catch(Exception ex) {
LOG.error("Failed to read stop message from HDFS file: "+msgPath, ex);
}
return ret;
}
/**
* Computes the container memory to request based on the given max heap size,
* adding a relative overhead of MEM_FACTOR capped at MAX_MEM_OVERHEAD.
*
* @param heapsize max jvm heap size in MB
* @return memory allocation to request in MB
*/
public static long computeMemoryAllocation( long heapsize )
{
long ret = heapsize;
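//worked example: a 2048MB heap yields a request of 3072MB (1.5x),
//whereas an 8192MB heap yields 8192MB + 2048MB = 10240MB (capped overhead)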
if( heapsize * MEM_FACTOR - heapsize < MAX_MEM_OVERHEAD )
ret = (long) (heapsize * MEM_FACTOR);
else
ret = heapsize + MAX_MEM_OVERHEAD;
return ret;
}
}