blob: 77c120cea667cb630ec4da791e77cfa4b3606b71 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.s4.tools.yarn;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.s4.tools.DefineCluster;
import org.apache.s4.tools.Deploy;
import org.apache.s4.tools.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
/**
* Client for S4 application submission to YARN.
*
* <p>
* It connects to an existing YARN infrastructure (based on configuration files in HADOOP_CONF_DIR environment
* variables), then launches an S4 application master, passing all relevant configuration. The application master then
* asks resources from the resource manager and creates S4 nodes on those allocated resources, with the appropriate
* configuration.
* </p>
*
* <p>
* <u>Implementation notes from YARN example: </u>
* </p>
* <p>
* To submit an application, a client first needs to connect to the <code>ResourceManager</code> aka ApplicationsManager
* or ASM via the {@link ClientRMProtocol}. The {@link ClientRMProtocol} provides a way for the client to get access to
* cluster information and to request for a new {@link ApplicationId}.
* <p>
*
* <p>
* For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. The
* {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} and application
* name, user submitting the application, the priority assigned to the application and the queue to which this
* application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext} also defines the
* {@link ContainerLaunchContext} which describes the <code>Container</code> with which the {@link S4ApplicationMaster}
* is launched.
* </p>
*
* <p>
* The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
* {@link S4ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available and the
* environment to be set for the {@link S4ApplicationMaster} and the commands to be executed to run the
* {@link S4ApplicationMaster}.
* <p>
*
* <p>
* Using the {@link ApplicationSubmissionContext}, the client submits the application to the
* <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> for an
* {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client kills the
* application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>.
* </p>
*
*/
public class S4YarnClient extends YarnClientImpl {
private static final String HADOOP_CONF_DIR_ENV = System.getenv("HADOOP_CONF_DIR");
private static final ImmutableSet<String> YARN_CONF_FILES = ImmutableSet.of("core-site.xml", "hdfs-site.xml",
"yarn-site.xml", "mapred-site.xml");
private static Logger logger = LoggerFactory.getLogger(S4YarnClient.class);
// Configuration
private final Configuration conf;
S4CLIYarnArgs yarnArgs;
private int amMemory;
private final long clientStartTime = System.currentTimeMillis();
/**
* @param args
* Command line arguments
*/
public static void main(String[] args) {
S4CLIYarnArgs yarnArgs = new S4CLIYarnArgs();
logger.info("S4YarnClient args = " + Arrays.toString(args));
Tools.parseArgs(yarnArgs, args);
boolean result = false;
try {
YarnConfiguration yarnConfig = new YarnConfiguration();
if (Strings.isNullOrEmpty(HADOOP_CONF_DIR_ENV)) {
logger.error("You must define HADOOP_CONF_DIR environment variable");
System.exit(1);
}
File confDir = new File(HADOOP_CONF_DIR_ENV);
if (!(confDir.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return YARN_CONF_FILES.contains(pathname.getName());
}
}).length == 4)) {
logger.error("The {} directory must contain files [core,hdfs,yarn,mapred]-site.xml",
HADOOP_CONF_DIR_ENV);
System.exit(1);
}
for (String fileName : YARN_CONF_FILES) {
yarnConfig.addResource(new Path(new File(HADOOP_CONF_DIR_ENV, fileName).toURI()));
}
S4YarnClient client = new S4YarnClient(yarnArgs, yarnConfig);
result = client.run(false);
} catch (Throwable t) {
logger.error("Error running Client", t);
System.exit(1);
}
if (result) {
logger.info("Application completed successfully");
System.exit(0);
}
logger.error("Application failed to complete successfully");
System.exit(1);
}
public S4YarnClient(S4CLIYarnArgs yarnArgs, Configuration conf) throws Exception {
this.yarnArgs = yarnArgs;
this.conf = conf;
init(this.conf);
}
/**
* Main run function for the client
*
* @return true if application completed successfully
* @throws IOException
*/
public boolean run(boolean testMode) throws IOException {
logger.info("Running Client");
start();
YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
logger.info("Got Cluster metric info from ASM" + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
List<NodeReport> clusterNodeReports = super.getNodeReports();
logger.info("Got Cluster node info from ASM");
for (NodeReport node : clusterNodeReports) {
logger.info("Got node report from ASM for" + ", nodeId=" + node.getNodeId() + ", nodeAddress"
+ node.getHttpAddress() + ", nodeRackName" + node.getRackName() + ", nodeNumContainers"
+ node.getNumContainers() + ", nodeHealthStatus" + node.getNodeHealthStatus());
}
QueueInfo queueInfo = super.getQueueInfo(yarnArgs.queue);
logger.info("Queue info" + ", queueName=" + queueInfo.getQueueName() + ", queueCurrentCapacity="
+ queueInfo.getCurrentCapacity() + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+ ", queueApplicationCount=" + queueInfo.getApplications().size() + ", queueChildQueueCount="
+ queueInfo.getChildQueues().size());
List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();
for (QueueUserACLInfo aclInfo : listAclInfo) {
for (QueueACL userAcl : aclInfo.getUserAcls()) {
logger.info("User ACL Info for Queue" + ", queueName=" + aclInfo.getQueueName() + ", userAcl="
+ userAcl.name());
}
}
// Get a new application id
GetNewApplicationResponse newApp = super.getNewApplication();
ApplicationId appId = newApp.getApplicationId();
// TODO get min/max resource capabilities from RM and change memory ask if needed
// If we do not have min/max, we may not be able to correctly request
// the required resources from the RM for the app master
// Memory ask has to be a multiple of min and less than max.
// Dump out information about cluster capability as seen by the resource manager
int minMem = newApp.getMinimumResourceCapability().getMemory();
int maxMem = newApp.getMaximumResourceCapability().getMemory();
logger.info("Min mem capability of resources in this cluster " + minMem);
logger.info("Max mem capability of resources in this cluster " + maxMem);
// A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
// a multiple of the min value and cannot exceed the max.
// If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
amMemory = yarnArgs.masterMemory;
if (amMemory < minMem) {
logger.info("AM memory specified below min threshold of cluster. Using min value." + ", specified="
+ amMemory + ", min=" + minMem);
amMemory = minMem;
} else if (amMemory > maxMem) {
logger.info("AM memory specified above max threshold of cluster. Using max value." + ", specified="
+ amMemory + ", max=" + maxMem);
amMemory = maxMem;
}
// Create launch context for app master
logger.info("Setting up application submission context for ASM");
ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
// set the application id
appContext.setApplicationId(appId);
// set the application name
appContext.setApplicationName(yarnArgs.appName);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
if (!(new File(yarnArgs.s4Dir).isDirectory())) {
logger.error("Invalid s4 directory : " + yarnArgs.s4Dir);
System.exit(1);
}
// TODO avoid depending on source distribution paths
File[] classPathFiles = new File(yarnArgs.s4Dir + "/subprojects/s4-yarn/build/install/s4-yarn/lib").listFiles();
FileSystem fs = FileSystem.get(conf);
for (int i = 0; i < classPathFiles.length; i++) {
Path dest = copyToLocalResources(appId, fs, localResources, classPathFiles[i]);
logger.info("Copied classpath resource " + classPathFiles[i].getAbsolutePath() + " to "
+ dest.toUri().toString());
}
// Set local resource info into app master container launch context
amContainer.setLocalResources(localResources);
// Set the necessary security tokens as needed
// amContainer.setContainerTokens(containerToken);
// Set the env variables to be setup in the env where the application master will be run
logger.info("Set the environment for the application master");
Map<String, String> env = new HashMap<String, String>();
// For now setting all required classpaths including
// the classpath to "." for the application jar
StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
classPathEnv.append(':');
classPathEnv.append(c.trim());
}
classPathEnv.append(":./log4j.properties");
env.put("CLASSPATH", classPathEnv.toString());
amContainer.setEnvironment(env);
// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
// Set java executable command
logger.info("Setting up app master command");
// vargs.add("${JAVA_HOME}" + "/bin/java");
// TODO set java from JAVA_HOME
vargs.add("java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// vargs.add("-Xdebug");
// vargs.add("-Xrunjdwp:transport=dt_socket,address=8888,server=y");
// Set Application Master class name
vargs.add(S4ApplicationMaster.class.getName());
// Set params for Application Master
vargs.add(CommonS4YarnArgs.S4_NODE_MEMORY + " " + String.valueOf(yarnArgs.containerMemory));
vargs.add(CommonS4YarnArgs.NB_S4_NODES + " " + String.valueOf(yarnArgs.numContainers));
vargs.add(CommonS4YarnArgs.PRIORITY + " " + String.valueOf(yarnArgs.priority));
addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.S4_NODE_JVM_PARAMETERS, ",",
yarnArgs.extraS4NodeJVMParams);
addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.EXTRA_MODULES_CLASSES, ",",
yarnArgs.extraModulesClasses);
addListElementsToCommandLineBuffer(vargs, "-namedStringParameters", ",", yarnArgs.extraNamedParameters);
vargs.add("-c " + String.valueOf(yarnArgs.cluster));
vargs.add("-zk " + String.valueOf(yarnArgs.zkString));
if (testMode) {
vargs.add("-test");
}
if (yarnArgs.debug) {
vargs.add("--debug");
}
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
// Get final commmand
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
logger.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
amContainer.setCommands(commands);
// Set up resource type requirements
// For now, only memory is supported so we set memory requirements
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(amMemory);
amContainer.setResource(capability);
appContext.setAMContainerSpec(amContainer);
// Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide?
pri.setPriority(yarnArgs.priority);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(yarnArgs.queue);
// Set the user submitting this application
// TODO can it be empty?
appContext.setUser(yarnArgs.user);
// Define a new cluster for the application
String[] defineClusterArgs = { "-cluster=" + yarnArgs.cluster, "-nbTasks=" + yarnArgs.nbTasks,
"-flp=" + yarnArgs.flp, "-zk=" + yarnArgs.zkString };
DefineCluster.main(defineClusterArgs);
// Deply the application to the new cluster
String[] deployApplicationArgs = { "-s4r=" + yarnArgs.s4rPath, "-cluster=" + yarnArgs.cluster,
"-appName=" + yarnArgs.cluster, "-shutdown=false", "-zk=" + yarnArgs.zkString };
Deploy.main(deployApplicationArgs);
logger.info("Submitting application to ASM");
super.submitApplication(appContext);
// TODO
// Try submitting the same request again
// app submission failure?
// Monitor the application (TODO: optional?)
return monitorApplication(appId);
}
public static void addListElementsToCommandLineBuffer(Vector<CharSequence> vargs, String paramName,
String paramSeparator, List<String> yarnArg) {
if (yarnArg != null && !yarnArg.isEmpty()) {
StringBuilder sb = new StringBuilder();
if (!Strings.isNullOrEmpty(paramName)) {
sb.append(paramName + "=");
}
ListIterator<String> it = yarnArg.listIterator();
while (it.hasNext()) {
sb.append(it.next() + (it.hasNext() ? paramSeparator : ""));
}
vargs.add(sb.toString());
}
}
private Path copyToLocalResources(ApplicationId appId, FileSystem fs, Map<String, LocalResource> localResources,
File file) throws IOException {
Path src = new Path(file.getAbsolutePath());
// TODO use home directory + appId / appName?
Path dst = new Path(new Path(fs.getHomeDirectory(), "/app-" + appId.getId()), file.getName());
fs.copyFromLocalFile(false, true, src, dst);
FileStatus destStatus = fs.getFileStatus(dst);
LocalResource resource = Records.newRecord(LocalResource.class);
resource.setType(LocalResourceType.FILE);
resource.setVisibility(LocalResourceVisibility.APPLICATION);
resource.setResource(ConverterUtils.getYarnUrlFromPath(dst));
// Set timestamp and length of file so that the framework
// can do basic sanity checks for the local resource
// after it has been copied over to ensure it is the same
// resource the client intended to use with the application
resource.setTimestamp(destStatus.getModificationTime());
resource.setSize(destStatus.getLen());
localResources.put(file.getName(), resource);
return dst;
}
/**
* Monitor the submitted application for completion. Kill application if time expires.
*
* @param appId
* Application Id of application to be monitored
* @return true if application completed successfully
* @throws YarnRemoteException
*/
private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
while (true) {
// Check app status every 1 second.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.debug("Thread sleep in monitoring loop interrupted");
}
// Get application report for the appId we are interested in
ApplicationReport report = super.getApplicationReport(appId);
logger.info("Got application report from ASM for" + ", appId=" + appId.getId() + ", clientToken="
+ report.getClientToken() + ", appDiagnostics=" + report.getDiagnostics() + ", appMasterHost="
+ report.getHost() + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+ report.getRpcPort() + ", appStartTime=" + report.getStartTime() + ", yarnAppState="
+ report.getYarnApplicationState().toString() + ", distributedFinalState="
+ report.getFinalApplicationStatus().toString() + ", appTrackingUrl=" + report.getTrackingUrl()
+ ", appUser=" + report.getUser());
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
logger.info("Application has completed successfully. Breaking monitoring loop");
return true;
} else {
logger.info("Application did finished unsuccessfully." + " YarnState=" + state.toString()
+ ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
return false;
}
} else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) {
logger.info("Application did not finish." + " YarnState=" + state.toString() + ", DSFinalStatus="
+ dsStatus.toString() + ". Breaking monitoring loop");
return false;
}
if ((yarnArgs.timeout != -1) && (System.currentTimeMillis() > (clientStartTime + yarnArgs.timeout))) {
logger.info("Reached client specified timeout for application. Killing application");
forceKillApplication(appId);
return false;
}
}
}
/**
* Kill a submitted application by sending a call to the Applications Manager
*
* @param appId
* Application Id to be killed.
* @throws YarnRemoteException
*/
private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
super.killApplication(appId);
}
}