blob: b9e06256a82d0e11380807740c48b894edf38cc0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.Records;
/**
 * The UnmanagedAMLauncher is a simple client that launches an unmanaged AM. An
 * unmanaged AM is an AM that is not launched and managed by the RM. The client
 * creates a new application on the RM and negotiates a new attempt id. Then it
 * waits for the RM app state to reach YarnApplicationState.ACCEPTED, after
 * which it spawns the AM in another process and passes it the attempt id via
 * the env variable ApplicationConstants.AM_APP_ATTEMPT_ID_ENV. The AM can be
 * in any language. The AM can register with the RM using the attempt id and
 * proceed as normal. The client redirects the AM's stdout and stderr to its
 * own stdout and stderr and waits for the AM process to exit. Then it waits
 * for the RM to report app completion.
 */
public class UnmanagedAMLauncher {
  private static final Log LOG = LogFactory.getLog(UnmanagedAMLauncher.class);

  // Base configuration; wrapped in a YarnConfiguration in init()
  private final Configuration conf;
  // Handle to talk to the Resource Manager/Applications Manager
  private YarnClientImpl rmClient;
  // Application master specific info to register a new Application with RM/ASM
  private String appName = "";
  // App master priority
  private int amPriority = 0;
  // Queue for App master
  private String amQueue = "";
  // cmd to start AM
  private String amCmd = null;
  // set the classpath explicitly
  private String classpath = null;

  /**
   * Command line entry point. Exits with status 0 when only usage was
   * requested or the application completed successfully, with status 2 when
   * the application ended unsuccessfully, and with status 1 when any
   * exception or error was raised while initializing or running the client.
   *
   * @param args
   *          Command line arguments
   */
  public static void main(String[] args) {
    try {
      UnmanagedAMLauncher client = new UnmanagedAMLauncher();
      LOG.info("Initializing Client");
      boolean doRun = client.init(args);
      if (!doRun) {
        System.exit(0);
      }
      // run() reports whether the application succeeded; propagate a failure
      // to the calling shell instead of always exiting with status 0.
      boolean succeeded = client.run();
      if (!succeeded) {
        LOG.error("Application failed to complete successfully");
        System.exit(2);
      }
    } catch (Throwable t) {
      LOG.fatal("Error running Client", t);
      System.exit(1);
    }
  }

  /**
   * Creates a launcher that uses the given configuration. The RM client is
   * not created here; it is created in {@link #init(String[])}.
   *
   * @param conf
   *          configuration later wrapped in a YarnConfiguration
   * @throws Exception
   *           declared for callers; nothing in this constructor throws it
   */
  public UnmanagedAMLauncher(Configuration conf) throws Exception {
    this.conf = conf;
  }

  /**
   * Creates a launcher backed by a freshly constructed {@link Configuration}.
   *
   * @throws Exception
   *           declared for callers; see the one-argument constructor
   */
  public UnmanagedAMLauncher() throws Exception {
    this(new Configuration());
  }

  /**
   * Prints the command line help for the given options.
   *
   * @param opts
   *          the options to describe
   */
  private void printUsage(Options opts) {
    new HelpFormatter().printHelp("Client", opts);
  }

  /**
   * Parses the command line, stores the resulting settings and creates and
   * initializes the RM client.
   *
   * @param args
   *          command line arguments
   * @return false if only the usage text was requested (nothing should be
   *         run), true if the client is ready for {@link #run()}
   * @throws ParseException
   *           if the arguments cannot be parsed
   * @throws IllegalArgumentException
   *           if no arguments were given, if the "cmd" option is missing, or
   *           (as NumberFormatException) if "priority" is not an integer
   */
  public boolean init(String[] args) throws ParseException {
    Options opts = new Options();
    opts.addOption("appname", true,
        "Application Name. Default value - UnmanagedAM");
    opts.addOption("priority", true, "Application Priority. Default 0");
    opts.addOption("queue", true,
        "RM Queue in which this application is to be submitted");
    // NOTE: "master_memory" is accepted so that command lines passing it
    // still parse, but its value is not read anywhere in this class.
    opts.addOption("master_memory", true,
        "Amount of memory in MB to be requested to run the application master");
    opts.addOption("cmd", true, "command to start unmanaged AM (required)");
    opts.addOption("classpath", true, "additional classpath");
    opts.addOption("help", false, "Print usage");
    CommandLine cliParser = new GnuParser().parse(opts, args);

    if (args.length == 0) {
      printUsage(opts);
      throw new IllegalArgumentException(
          "No args specified for client to initialize");
    }

    if (cliParser.hasOption("help")) {
      printUsage(opts);
      return false;
    }

    appName = cliParser.getOptionValue("appname", "UnmanagedAM");
    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
    amQueue = cliParser.getOptionValue("queue", "default");
    classpath = cliParser.getOptionValue("classpath", null);

    amCmd = cliParser.getOptionValue("cmd");
    if (amCmd == null) {
      printUsage(opts);
      throw new IllegalArgumentException(
          "No cmd specified for application master");
    }

    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    rmClient = new YarnClientImpl();
    rmClient.init(yarnConf);

    return true;
  }

  /**
   * Starts the AM command as a child process and blocks until it has exited
   * and its output has been copied to this process's stdout and stderr.
   * <p>
   * The child receives a copy of this process's environment, with the
   * configured extra classpath appended to (or set as) CLASSPATH and with
   * ApplicationConstants.AM_APP_ATTEMPT_ID_ENV set to the attempt id.
   * <p>
   * If the calling thread is interrupted while waiting, the interruption is
   * logged and the thread's interrupt status is set again before returning.
   * The child process is always destroyed and its output readers are always
   * closed before this method returns.
   *
   * @param attemptId
   *          attempt id handed to the AM through its environment
   * @throws IOException
   *           if the AM process cannot be started
   */
  public void launchAM(ApplicationAttemptId attemptId) throws IOException {
    ArrayList<String> envAMList = new ArrayList<String>();
    boolean setClasspath = false;
    for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
      String key = entry.getKey();
      String value = entry.getValue();
      if (key.equals("CLASSPATH")) {
        setClasspath = true;
        if (classpath != null) {
          value = value + File.pathSeparator + classpath;
        }
      }
      envAMList.add(key + "=" + value);
    }

    if (!setClasspath && classpath != null) {
      envAMList.add("CLASSPATH=" + classpath);
    }

    envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId);

    String[] envAM = envAMList.toArray(new String[envAMList.size()]);
    Process amProc = Runtime.getRuntime().exec(amCmd, envAM);

    BufferedReader errReader =
        new BufferedReader(new InputStreamReader(amProc.getErrorStream()));
    BufferedReader inReader =
        new BufferedReader(new InputStreamReader(amProc.getInputStream()));

    // Drain both streams concurrently so the child cannot block on a full
    // output buffer while this thread waits for it to exit.
    Thread errThread =
        new StreamPump(errReader, true, "Error reading the error stream");
    Thread outThread =
        new StreamPump(inReader, false, "Error reading the out stream");

    boolean interrupted = false;
    try {
      errThread.start();
      outThread.start();

      // wait for the process to finish and check the exit code
      try {
        int exitCode = amProc.waitFor();
        LOG.info("AM process exited with value: " + exitCode);
      } catch (InterruptedException e) {
        interrupted = true;
        LOG.warn("Interrupted while waiting for the AM process to exit", e);
      }

      // make sure that the reader threads have copied everything
      try {
        errThread.join();
        outThread.join();
      } catch (InterruptedException ie) {
        interrupted = true;
        LOG.info("ShellExecutor: Interrupted while reading the error/out stream",
            ie);
      }
    } finally {
      // Destroy the process before closing the readers: if a reader thread is
      // still blocked on the child's output, ending the child normally lets
      // it reach end-of-stream instead of holding the reader indefinitely.
      amProc.destroy();
      closeReader(errReader);
      closeReader(inReader);
      if (interrupted) {
        // Hand the interruption back to the caller instead of losing it.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Closes a reader, logging rather than propagating a failure so that the
   * remaining cleanup still runs.
   *
   * @param reader
   *          the reader to close
   */
  private static void closeReader(BufferedReader reader) {
    try {
      reader.close();
    } catch (IOException ioe) {
      LOG.warn("Error while closing the error/out stream", ioe);
    }
  }

  /**
   * Thread that copies lines from a reader to this process's stderr or stdout
   * until end-of-stream is reached or the thread is interrupted.
   */
  private static final class StreamPump extends Thread {
    private final BufferedReader reader;
    private final boolean toStdErr;
    private final String failureMessage;

    /**
     * @param reader
     *          source of the lines to copy
     * @param toStdErr
     *          true to print to System.err, false to print to System.out
     * @param failureMessage
     *          warning logged if reading fails
     */
    StreamPump(BufferedReader reader, boolean toStdErr, String failureMessage) {
      this.reader = reader;
      this.toStdErr = toStdErr;
      this.failureMessage = failureMessage;
    }

    @Override
    public void run() {
      try {
        String line = reader.readLine();
        while ((line != null) && !isInterrupted()) {
          if (toStdErr) {
            System.err.println(line);
          } else {
            System.out.println(line);
          }
          line = reader.readLine();
        }
      } catch (IOException ioe) {
        LOG.warn(failureMessage, ioe);
      }
    }
  }

  /**
   * Submits an unmanaged application to the RM, launches the AM process once
   * the application is ACCEPTED, and then waits for the application to reach
   * KILLED, FAILED or FINISHED. The RM client is started on entry and stopped
   * on exit, whether or not an exception is thrown.
   * {@link #init(String[])} must have returned true before this is called.
   *
   * @return true if the application ended in state FINISHED with final
   *         status SUCCEEDED, false otherwise
   * @throws IOException
   *           if talking to the RM or starting the AM process fails
   */
  public boolean run() throws IOException {
    LOG.info("Starting Client");

    // Connect to ResourceManager
    rmClient.start();
    try {
      // Get a new application id
      GetNewApplicationResponse newApp = rmClient.getNewApplication();
      ApplicationId appId = newApp.getApplicationId();

      // Create launch context for app master
      LOG.info("Setting up application submission context for ASM");
      ApplicationSubmissionContext appContext = Records
          .newRecord(ApplicationSubmissionContext.class);

      // set the application id
      appContext.setApplicationId(appId);
      // set the application name
      appContext.setApplicationName(appName);

      // Set the priority for the application master
      Priority pri = Records.newRecord(Priority.class);
      pri.setPriority(amPriority);
      appContext.setPriority(pri);

      // Set the queue to which this application is to be submitted in the RM
      appContext.setQueue(amQueue);

      // Set up the container launch context for the application master
      ContainerLaunchContext amContainer = Records
          .newRecord(ContainerLaunchContext.class);
      appContext.setAMContainerSpec(amContainer);

      // unmanaged AM
      appContext.setUnmanagedAM(true);
      LOG.info("Setting unmanaged AM");

      // Submit the application to the applications manager
      LOG.info("Submitting application to ASM");
      rmClient.submitApplication(appContext);

      // Monitor the application to wait for launch state
      ApplicationReport appReport = monitorApplication(appId,
          EnumSet.of(YarnApplicationState.ACCEPTED));
      ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
      LOG.info("Launching application with id: " + attemptId);

      // launch AM
      launchAM(attemptId);

      // Monitor the application for end state
      appReport = monitorApplication(appId, EnumSet.of(
          YarnApplicationState.KILLED, YarnApplicationState.FAILED,
          YarnApplicationState.FINISHED));
      YarnApplicationState appState = appReport.getYarnApplicationState();
      FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();

      LOG.info("App ended with state: " + appState
          + " and status: " + appStatus);

      boolean success = YarnApplicationState.FINISHED == appState
          && FinalApplicationStatus.SUCCEEDED == appStatus;
      if (success) {
        LOG.info("Application has completed successfully.");
      } else {
        // Plain concatenation (no explicit toString()) so that a missing
        // status is logged as "null" instead of raising an exception here.
        LOG.info("Application did finished unsuccessfully." + " YarnState="
            + appState + ", FinalStatus=" + appStatus);
      }
      return success;
    } finally {
      rmClient.stop();
    }
  }

  /**
   * Polls the RM once per second until the application reaches one of the
   * given states, logging every report received. There is no timeout: the
   * method returns only when a matching state is seen or the RM call fails.
   * <p>
   * An interruption of the polling sleep does not stop the polling; it is
   * remembered, and the thread's interrupt status is set again when this
   * method exits.
   *
   * @param appId
   *          Application Id of application to be monitored
   * @param finalState
   *          the states that end the wait
   * @return the first report whose state is contained in finalState
   * @throws YarnRemoteException
   *           if the application report cannot be obtained from the RM
   */
  private ApplicationReport monitorApplication(ApplicationId appId,
      Set<YarnApplicationState> finalState) throws YarnRemoteException {
    boolean interrupted = false;
    try {
      while (true) {
        // Check app status every 1 second.
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          // Do not re-interrupt here: that would make every following sleep
          // fail immediately and turn this loop into a busy poll of the RM.
          interrupted = true;
          LOG.debug("Thread sleep in monitoring loop interrupted");
        }

        // Get application report for the appId we are interested in
        ApplicationReport report = rmClient.getApplicationReport(appId);
        YarnApplicationState state = report.getYarnApplicationState();

        LOG.info("Got application report from ASM for" + ", appId="
            + appId.getId() + ", appAttemptId="
            + report.getCurrentApplicationAttemptId() + ", clientToken="
            + report.getClientToken() + ", appDiagnostics="
            + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
            + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
            + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
            + ", yarnAppState=" + state
            + ", distributedFinalState="
            + report.getFinalApplicationStatus() + ", appTrackingUrl="
            + report.getTrackingUrl() + ", appUser=" + report.getUser());

        if (finalState.contains(state)) {
          return report;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}