blob: 71a81b0496e7dd7545d0fbaf70eec54d080cb774 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.AMRMClient;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
/**
* An ApplicationMaster for executing shell commands on a set of launched
* containers using the YARN framework.
*
* <p>
* This class is meant to act as an example on how to write yarn-based
* application masters.
* </p>
*
* <p>
* The ApplicationMaster is started on a container by the
* <code>ResourceManager</code>'s launcher. The first thing that the
* <code>ApplicationMaster</code> needs to do is to connect and register itself
* with the <code>ResourceManager</code>. The registration sets up information
* within the <code>ResourceManager</code> regarding what host:port the
* ApplicationMaster is listening on to provide any form of functionality to a
* client as well as a tracking url that a client can use to keep track of
* status/job history if needed.
* </p>
*
* <p>
* The <code>ApplicationMaster</code> needs to send a heartbeat to the
* <code>ResourceManager</code> at regular intervals to inform the
* <code>ResourceManager</code> that it is up and alive. The
* {@link AMRMProtocol#allocate} to the <code>ResourceManager</code> from the
* <code>ApplicationMaster</code> acts as a heartbeat.
* </p>
*
* <p>
* For the actual handling of the job, the <code>ApplicationMaster</code> has to
* request the <code>ResourceManager</code> via {@link AllocateRequest} for the
* required no. of containers using {@link ResourceRequest} with the necessary
* resource specifications such as node location, computational
* (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
* responds with an {@link AllocateResponse} that informs the
* <code>ApplicationMaster</code> of the set of newly allocated containers,
* completed containers as well as current state of available resources.
* </p>
*
* <p>
* For each allocated container, the <code>ApplicationMaster</code> can then set
* up the necessary launch context via {@link ContainerLaunchContext} to specify
* the allocated container id, local resources required by the executable, the
* environment to be setup for the executable, commands to execute, etc. and
* submit a {@link StartContainerRequest} to the {@link ContainerManager} to
* launch and execute the defined commands on the given allocated container.
* </p>
*
* <p>
* The <code>ApplicationMaster</code> can monitor the launched container by
* either querying the <code>ResourceManager</code> using
* {@link AMRMProtocol#allocate} to get updates on completed containers or via
* the {@link ContainerManager} by querying for the status of the allocated
* container's {@link ContainerId}.
* </p>
*
* <p>
* After the job has been completed, the <code>ApplicationMaster</code> has to
* send a {@link FinishApplicationMasterRequest} to the
* <code>ResourceManager</code> to inform it that the
* <code>ApplicationMaster</code> has completed.
* </p>
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ApplicationMaster {
private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
// Configuration (a YarnConfiguration created in the constructor)
private Configuration conf;
// YARN RPC factory; used to create the ContainerManager proxy when launching
// containers
private YarnRPC rpc;
// Handle to communicate with the Resource Manager; created and started in
// run()
private AMRMClient resourceManager;
// Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID;
// TODO
// Status updates for clients are yet to be implemented. The three values
// below are passed to the RM at registration with their defaults.
// Hostname of the container
private String appMasterHostname = "";
// Port on which the app master listens for status updates from clients
private int appMasterRpcPort = 0;
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
// App Master configuration
// No. of containers to run shell command on (option num_containers)
private int numTotalContainers = 1;
// Memory to request for the container on which the shell command will run
// (option container_memory, described there as MB); adjusted in run() to lie
// within the min/max capability reported by the RM
private int containerMemory = 10;
// Priority of the request (option priority)
private int requestPriority;
// Flag set by run() once the completed container count equals
// numTotalContainers
private boolean appDone = false;
// Counter for completed containers ( complete denotes successful or failed ).
// Updated by run() and by the container launch threads.
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Count of containers the RM has allocated to us so far
private AtomicInteger numAllocatedContainers = new AtomicInteger();
// Count of failed containers. Updated by run() and by the container launch
// threads.
private AtomicInteger numFailedContainers = new AtomicInteger();
// Count of containers already requested from the RM.
// Needed because, once requested, we should not request containers again;
// more are requested only if this count drops below numTotalContainers.
private AtomicInteger numRequestedContainers = new AtomicInteger();
// Shell command to be executed
private String shellCommand = "";
// Args to be passed to the shell command
private String shellArgs = "";
// Env variables to be setup for the shell command
private Map<String, String> shellEnv = new HashMap<String, String>();
// Location of shell script in the fs ( obtained from info set in env );
// empty when no script was supplied
private String shellScriptPath = "";
// Timestamp needed for creating a local resource
private long shellScriptPathTimestamp = 0;
// File length needed for local resource
private long shellScriptPathLen = 0;
// Hardcoded name under which the shell script is registered in the launched
// container's local resources and referenced on its command line
private final String ExecShellStringPath = "ExecShellScript.sh";
// Threads started to launch containers; joined at the end of run()
private List<Thread> launchThreads = new ArrayList<Thread>();
/**
 * Entry point: builds an {@link ApplicationMaster}, initializes it from the
 * command line and runs it, then terminates the JVM.
 *
 * <p>
 * Exit codes: 0 when only usage was requested or the run succeeded, 1 when
 * any {@link Throwable} escaped initialization or the run, 2 when the run
 * completed but reported failure.
 * </p>
 *
 * @param args Command line args
 */
public static void main(String[] args) {
  boolean succeeded = false;
  try {
    ApplicationMaster master = new ApplicationMaster();
    LOG.info("Initializing ApplicationMaster");
    if (!master.init(args)) {
      // init() asked us not to run (e.g. only help was requested)
      System.exit(0);
    }
    succeeded = master.run();
  } catch (Throwable t) {
    LOG.fatal("Error running ApplicationMaster", t);
    System.exit(1);
  }
  if (!succeeded) {
    LOG.info("Application Master failed. exiting");
    System.exit(2);
  }
  LOG.info("Application Master completed successfully. exiting");
  System.exit(0);
}
/**
 * Dumps the process environment and a listing of the current working
 * directory (<code>ls -al</code>) to both the log and stdout, for debugging.
 *
 * <p>
 * This is best-effort: failures to run or read the listing are logged and
 * otherwise ignored. If the thread is interrupted while waiting for the
 * listing process, the interrupt status is restored for the caller.
 * </p>
 */
private void dumpOutDebugInfo() {
  LOG.info("Dump debug output");
  for (Map.Entry<String, String> env : System.getenv().entrySet()) {
    String entry = "System env: key=" + env.getKey() + ", val="
        + env.getValue();
    LOG.info(entry);
    System.out.println(entry);
  }

  BufferedReader buf = null;
  try {
    Process pr = Runtime.getRuntime().exec("ls -al");
    buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
    // Drain the output BEFORE waiting for the process: a child that fills
    // the pipe buffer blocks until it is read, so waiting first can hang.
    String line;
    while ((line = buf.readLine()) != null) {
      LOG.info("System CWD content: " + line);
      System.out.println("System CWD content: " + line);
    }
    pr.waitFor();
  } catch (IOException e) {
    LOG.warn("Could not list contents of the current working directory", e);
  } catch (InterruptedException e) {
    // Preserve the interrupt for whoever owns this thread
    Thread.currentThread().interrupt();
    LOG.warn("Interrupted while listing the current working directory", e);
  } finally {
    if (buf != null) {
      try {
        buf.close();
      } catch (IOException e) {
        LOG.debug("Error closing directory listing stream", e);
      }
    }
  }
}
/**
 * Creates an application master backed by a fresh {@link YarnConfiguration}
 * and a {@link YarnRPC} instance created from that configuration.
 *
 * @throws Exception declared by the signature; callers must handle it
 */
public ApplicationMaster() throws Exception {
  // Set up the configuration first; the RPC layer is created from it
  Configuration yarnConf = new YarnConfiguration();
  this.conf = yarnConf;
  this.rpc = YarnRPC.create(yarnConf);
}
/**
 * Parses the command line options and the process environment and populates
 * this application master's settings from them.
 *
 * <p>
 * The application attempt id is taken from the
 * {@link ApplicationConstants#AM_APP_ATTEMPT_ID_ENV} environment variable if
 * present, otherwise derived from the container id in
 * {@link ApplicationConstants#AM_CONTAINER_ID_ENV}, and only when neither is
 * present from the <code>app_attempt_id</code> option (meant for testing).
 * </p>
 *
 * @param args Command line args
 * @return true if initialization succeeded and {@link #run()} should be
 *         invoked; false if only the usage text was requested
 * @throws ParseException if the command line cannot be parsed
 * @throws IOException declared by the signature for callers
 * @throws IllegalArgumentException if no args are given, the application
 *           attempt id cannot be determined, no shell command is given, the
 *           shell script details in the environment are invalid, a numeric
 *           option is not a number, or <code>num_containers</code> is not
 *           positive
 */
public boolean init(String[] args) throws ParseException, IOException {
  Options opts = new Options();
  opts.addOption("app_attempt_id", true,
      "App Attempt ID. Not to be used unless for testing purposes");
  opts.addOption("shell_command", true,
      "Shell command to be executed by the Application Master");
  opts.addOption("shell_script", true,
      "Location of the shell script to be executed");
  opts.addOption("shell_args", true, "Command line args for the shell script");
  opts.addOption("shell_env", true,
      "Environment for shell script. Specified as env_key=env_val pairs");
  opts.addOption("container_memory", true,
      "Amount of memory in MB to be requested to run the shell command");
  opts.addOption("num_containers", true,
      "No. of containers on which the shell command needs to be executed");
  opts.addOption("priority", true, "Application Priority. Default 0");
  opts.addOption("debug", false, "Dump out debug information");
  opts.addOption("help", false, "Print usage");
  CommandLine cliParser = new GnuParser().parse(opts, args);

  if (args.length == 0) {
    printUsage(opts);
    throw new IllegalArgumentException(
        "No args specified for application master to initialize");
  }
  if (cliParser.hasOption("help")) {
    printUsage(opts);
    return false;
  }
  if (cliParser.hasOption("debug")) {
    dumpOutDebugInfo();
  }

  // Work out which application attempt this master belongs to
  Map<String, String> envs = System.getenv();
  if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
    appAttemptID = ConverterUtils.toApplicationAttemptId(envs
        .get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
  } else if (envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
    ContainerId containerId = ConverterUtils.toContainerId(envs
        .get(ApplicationConstants.AM_CONTAINER_ID_ENV));
    appAttemptID = containerId.getApplicationAttemptId();
  } else if (cliParser.hasOption("app_attempt_id")) {
    String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
    appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
  } else {
    throw new IllegalArgumentException(
        "Application Attempt Id not set in the environment");
  }
  LOG.info("Application master for app" + ", appId="
      + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
      + appAttemptID.getApplicationId().getClusterTimestamp()
      + ", attemptId=" + appAttemptID.getAttemptId());

  if (!cliParser.hasOption("shell_command")) {
    throw new IllegalArgumentException(
        "No shell command specified to be executed by application master");
  }
  shellCommand = cliParser.getOptionValue("shell_command");
  if (cliParser.hasOption("shell_args")) {
    shellArgs = cliParser.getOptionValue("shell_args");
  }

  // Each shell_env value is "key=val"; a bare "key" or "key=" maps to ""
  if (cliParser.hasOption("shell_env")) {
    for (String env : cliParser.getOptionValues("shell_env")) {
      env = env.trim();
      int index = env.indexOf('=');
      if (index == -1) {
        shellEnv.put(env, "");
        continue;
      }
      String key = env.substring(0, index);
      String val = "";
      if (index < (env.length() - 1)) {
        val = env.substring(index + 1);
      }
      shellEnv.put(key, val);
    }
  }

  // Shell script details are handed over through the environment
  if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
    shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
      shellScriptPathTimestamp = Long.parseLong(envs
          .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
    }
    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
      shellScriptPathLen = Long.parseLong(envs
          .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
    }
    // A script path is only usable as a local resource together with a
    // valid timestamp and length
    if (!shellScriptPath.isEmpty()
        && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
      LOG.error("Illegal values in env for shell script path" + ", path="
          + shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
          + shellScriptPathTimestamp);
      throw new IllegalArgumentException(
          "Illegal values in env for shell script path");
    }
  }

  containerMemory = Integer.parseInt(cliParser.getOptionValue(
      "container_memory", "10"));
  numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
      "num_containers", "1"));
  // Fail fast: with no containers to run, run() would skip its loop and
  // report success without executing anything
  if (numTotalContainers <= 0) {
    throw new IllegalArgumentException(
        "Cannot run distributed shell with no containers, num_containers="
            + numTotalContainers);
  }
  requestPriority = Integer.parseInt(cliParser
      .getOptionValue("priority", "0"));
  return true;
}
/**
 * Prints the usage text for the application master.
 *
 * @param opts Command line option definitions to describe
 */
private void printUsage(Options opts) {
  HelpFormatter formatter = new HelpFormatter();
  formatter.printHelp("ApplicationMaster", opts);
}
/**
* Main run function for the application master.
*
* <p>
* Starts the ResourceManager client, registers this application master,
* then loops (sleeping 1000 ms per iteration) asking the RM for containers,
* launching each allocated container on its own thread and accounting for
* completed containers, until numTotalContainers containers have completed.
* Afterwards it waits up to 10 seconds for each launch thread, unregisters
* from the RM and, in all cases, stops the ResourceManager client.
* </p>
*
* @return true if no container was counted as failed, false otherwise
* @throws YarnRemoteException propagated from the calls made to the
* ResourceManager (e.g. via sendContainerAskToRM)
*/
public boolean run() throws YarnRemoteException {
LOG.info("Starting ApplicationMaster");
// Connect to ResourceManager
resourceManager = new AMRMClientImpl(appAttemptID);
resourceManager.init(conf);
resourceManager.start();
try {
// Setup local RPC Server to accept status requests directly from clients
// TODO need to setup a protocol for client to be able to communicate to
// the RPC server
// TODO use the rpc port info to register with the RM for the client to
// send requests to this app master
// Register self with ResourceManager
RegisterApplicationMasterResponse response = resourceManager
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
// Dump out information about cluster capability as seen by the
// resource manager
int minMem = response.getMinimumResourceCapability().getMemory();
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Min mem capabililty of resources in this cluster " + minMem);
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
// A resource ask has to be at least the minimum capability of the
// cluster, has to be a multiple of the min value and cannot exceed the
// max. If it is not an exact multiple of min, the RM will allocate to
// the nearest multiple of min.
// Here the requested memory is only clamped into [minMem, maxMem].
if (containerMemory < minMem) {
LOG.info("Container memory specified below min threshold of cluster."
+ " Using min value." + ", specified=" + containerMemory + ", min="
+ minMem);
containerMemory = minMem;
} else if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max="
+ maxMem);
containerMemory = maxMem;
}
// Setup heartbeat emitter
// TODO poll RM every now and then with an empty request to let RM know
// that we are alive
// The heartbeat interval after which an AM is timed out by the RM is
// defined by a config setting:
// RM_AM_EXPIRY_INTERVAL_MS with default defined by
// DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
// The allocate calls to the RM count as heartbeats so, for now,
// this additional heartbeat emitter is not required.
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
int loopCounter = -1;
while (numCompletedContainers.get() < numTotalContainers && !appDone) {
loopCounter++;
// log current state
LOG.info("Current application state: loop=" + loopCounter
+ ", appDone=" + appDone + ", total=" + numTotalContainers
+ ", requested=" + numRequestedContainers + ", completed="
+ numCompletedContainers + ", failed=" + numFailedContainers
+ ", currentAllocated=" + numAllocatedContainers);
// Sleep before each loop when asking RM for containers
// to avoid flooding RM with spurious requests when it
// need not have any available containers
// Sleeping for 1000 ms.
// An interrupt during the sleep is only logged; the loop carries on.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.info("Sleep interrupted " + e.getMessage());
}
// No. of containers to request
// For the first loop, askCount will be equal to total containers needed.
// After that it is 0, unless a completed container reported exit status
// -100 below: that path decrements numRequestedContainers, so the
// lost container is asked for again on the next iteration.
int askCount = numTotalContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
if (askCount > 0) {
ContainerRequest containerAsk = setupContainerAskForRM(askCount);
resourceManager.addContainerRequest(containerAsk);
}
// Send the request to RM (this call is made on every iteration, even
// when askCount is 0)
LOG.info("Asking RM for containers" + ", askCount=" + askCount);
AMResponse amResp = sendContainerAskToRM();
// Retrieve list of allocated containers from the response
List<Container> allocatedContainers = amResp.getAllocatedContainers();
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LOG.info("Launching shell command on a new container."
+ ", containerId=" + allocatedContainer.getId()
+ ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ ":" + allocatedContainer.getNodeId().getPort()
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ ", containerState" + allocatedContainer.getState()
+ ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory());
// + ", containerToken"
// +allocatedContainer.getContainerToken().getIdentifier().toString());
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
allocatedContainer);
Thread launchThread = new Thread(runnableLaunchContainer);
// launch and start the container on a separate thread to keep
// the main thread unblocked
// as all containers may not be allocated at one go.
launchThreads.add(launchThread);
launchThread.start();
}
// Check what the current available resources in the cluster are
// TODO should we do anything if the available resources are not enough?
Resource availableResources = amResp.getAvailableResources();
LOG.info("Current available resources in the cluster "
+ availableResources);
// Check the completed containers
List<ContainerStatus> completedContainers = amResp
.getCompletedContainersStatuses();
LOG.info("Got response from RM for container ask, completedCnt="
+ completedContainers.size());
for (ContainerStatus containerStatus : completedContainers) {
LOG.info("Got container status for containerID="
+ containerStatus.getContainerId() + ", state="
+ containerStatus.getState() + ", exitStatus="
+ containerStatus.getExitStatus() + ", diagnostics="
+ containerStatus.getDiagnostics());
// non complete containers should not be here
// (only checked when the JVM runs with assertions enabled)
assert (containerStatus.getState() == ContainerState.COMPLETE);
// increment counters for completed/failed containers
int exitStatus = containerStatus.getExitStatus();
if (0 != exitStatus) {
// container failed
if (-100 != exitStatus) {
// shell script failed
// counts as completed
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
} else {
// exit status -100: something else bad happened
// NOTE(review): presumably the YARN "container aborted/lost" exit
// status - confirm against the YARN API constants
// app job did not complete for some reason
// we should re-try as the container was lost for some reason
numAllocatedContainers.decrementAndGet();
numRequestedContainers.decrementAndGet();
// we do not need to release the container as it would be done
// by the RM/CM.
}
} else {
// nothing to do
// container completed successfully
numCompletedContainers.incrementAndGet();
LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId());
}
}
// The completed/failed counters can also be incremented by a launch
// thread (LaunchContainerRunnable does so on an invalid script path)
if (numCompletedContainers.get() == numTotalContainers) {
appDone = true;
}
LOG.info("Current application state: loop=" + loopCounter
+ ", appDone=" + appDone + ", total=" + numTotalContainers
+ ", requested=" + numRequestedContainers + ", completed="
+ numCompletedContainers + ", failed=" + numFailedContainers
+ ", currentAllocated=" + numAllocatedContainers);
// TODO
// Add a timeout handling layer
// for misbehaving shell commands
}
// Join all launched threads, waiting at most 10 seconds for each
// needed for when we time out
// and we need to release containers
for (Thread launchThread : launchThreads) {
try {
launchThread.join(10000);
} catch (InterruptedException e) {
LOG.info("Exception thrown in thread join: " + e.getMessage());
e.printStackTrace();
}
}
// When the application completes, it should send a finish application
// signal to the RM
LOG.info("Application completed. Signalling finish to RM");
FinalApplicationStatus appStatus;
String appMessage = null;
boolean isSuccess = true;
// The application succeeds only if no container was counted as failed;
// otherwise a diagnostics message with the counters is sent to the RM
if (numFailedContainers.get() == 0) {
appStatus = FinalApplicationStatus.SUCCEEDED;
} else {
appStatus = FinalApplicationStatus.FAILED;
appMessage = "Diagnostics." + ", total=" + numTotalContainers
+ ", completed=" + numCompletedContainers.get() + ", allocated="
+ numAllocatedContainers.get() + ", failed="
+ numFailedContainers.get();
isSuccess = false;
}
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
return isSuccess;
} finally {
// Always stop the RM client, including when an exception propagates
resourceManager.stop();
}
}
/**
 * Thread to connect to the {@link ContainerManager} and launch the container
 * that will execute the shell command.
 */
private class LaunchContainerRunnable implements Runnable {

  // Allocated container
  private final Container container;
  // Handle to communicate with ContainerManager; set by connectToCM()
  private ContainerManager cm;

  /**
   * @param lcontainer Allocated container
   */
  public LaunchContainerRunnable(Container lcontainer) {
    this.container = lcontainer;
  }

  /**
   * Helper function to connect to CM. Creates a {@link ContainerManager}
   * proxy for the host:port of the node the container was allocated on.
   */
  private void connectToCM() {
    LOG.debug("Connecting to ContainerManager for containerid="
        + container.getId());
    String cmIpPortStr = container.getNodeId().getHost() + ":"
        + container.getNodeId().getPort();
    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
    LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
    this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class,
        cmAddress, conf));
  }

  /**
   * Connects to CM, sets up container launch context
   * for shell command and eventually dispatches the container
   * start request to the CM.
   *
   * <p>
   * If the configured shell script path is not a valid URI the container is
   * counted as completed and failed and no start request is sent. A failure
   * of the start request itself is only logged.
   * </p>
   */
  @Override
  public void run() {
    // Connect to ContainerManager
    connectToCM();

    LOG.info("Setting up container launch container for containerid="
        + container.getId());
    ContainerLaunchContext ctx = Records
        .newRecord(ContainerLaunchContext.class);
    ctx.setContainerId(container.getId());
    ctx.setResource(container.getResource());

    String jobUserName = System.getenv(ApplicationConstants.Environment.USER
        .name());
    ctx.setUser(jobUserName);
    LOG.info("Setting user in ContainerLaunchContext to: " + jobUserName);

    // Set the environment
    ctx.setEnvironment(shellEnv);

    // Set the local resources.
    // The container for the eventual shell commands needs its own local
    // resources too: if a shell script is specified, it has to be copied
    // and made available to the container.
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    boolean hasShellScript = !shellScriptPath.isEmpty();
    if (hasShellScript) {
      LocalResource shellRsrc = Records.newRecord(LocalResource.class);
      shellRsrc.setType(LocalResourceType.FILE);
      shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
      try {
        shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
            shellScriptPath)));
      } catch (URISyntaxException e) {
        LOG.error("Error when trying to use shell script path specified"
            + " in env, path=" + shellScriptPath, e);
        // A failure scenario on bad input such as invalid shell script path.
        // We know we cannot continue launching the container,
        // so we should release it.
        // TODO
        numCompletedContainers.incrementAndGet();
        numFailedContainers.incrementAndGet();
        return;
      }
      shellRsrc.setTimestamp(shellScriptPathTimestamp);
      shellRsrc.setSize(shellScriptPathLen);
      localResources.put(ExecShellStringPath, shellRsrc);
    }
    ctx.setLocalResources(localResources);

    // Set the necessary command to execute on the allocated container:
    // executable, optional script name, args, then log redirection
    List<CharSequence> vargs = new ArrayList<CharSequence>(5);
    vargs.add(shellCommand);
    if (hasShellScript) {
      vargs.add(ExecShellStringPath);
    }
    vargs.add(shellArgs);
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

    // Get final command: every part followed by a single space
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());
    ctx.setCommands(commands);

    StartContainerRequest startReq = Records
        .newRecord(StartContainerRequest.class);
    startReq.setContainerLaunchContext(ctx);
    try {
      cm.startContainer(startReq);
    } catch (YarnRemoteException e) {
      LOG.error("Start container failed for :" + ", containerId="
          + container.getId(), e);
      // TODO do we need to release this container?
    }

    // The container status is not queried from the CM here: the shell
    // scripts are short lived and we rely on the completed-container
    // statuses reported by the RM to detect the outcome.
  }
}
/**
 * Builds the container request that will be handed to the RM client.
 *
 * <p>
 * The request asks for {@code numContainers} containers with
 * {@code containerMemory} memory at priority {@code requestPriority}.
 * </p>
 *
 * @param numContainers Containers to ask for from RM
 * @return the {@link ContainerRequest} describing the ask
 */
private ContainerRequest setupContainerAskForRM(int numContainers) {
  // Resource requirements: for now only memory is supported
  Resource memoryAsk = Records.newRecord(Resource.class);
  memoryAsk.setMemory(containerMemory);

  // Priority for the request
  // TODO - what is the range for priority? how to decide?
  Priority askPriority = Records.newRecord(Priority.class);
  askPriority.setPriority(requestPriority);

  // No placement constraints are passed (both null): any host will do for
  // the distributed shell app
  ContainerRequest ask = new ContainerRequest(memoryAsk, null, null,
      askPriority, numContainers);
  LOG.info("Requested container ask: " + ask.toString());
  return ask;
}
/**
 * Sends the pending container asks to the RM through an allocate call,
 * reporting progress as the fraction of completed containers out of
 * {@code numTotalContainers}.
 *
 * @return Response from RM to AM with allocated containers
 * @throws YarnRemoteException
 */
private AMResponse sendContainerAskToRM() throws YarnRemoteException {
  int completed = numCompletedContainers.get();
  float progress = (float) completed / numTotalContainers;
  LOG.info("Sending request to RM for containers" + ", progress="
      + progress);
  return resourceManager.allocate(progress).getAMResponse();
}
}