/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.yarn;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* The YARN Application Master for Giraph is launched when the GiraphYarnClient
* successfully requests an execution container from the Resource Manager. The
* Application Master is provided by Giraph to manage all requests for resources
* (worker nodes, memory, jar files, job configuration metadata, etc.) that
* Giraph will need to perform the job. When Giraph runs in a non-YARN context,
* the role of the Application Master is played by Hadoop when it launches our
* GraphMappers (worker/master task nodes) to run the job.
*/
public class GiraphApplicationMaster {
/** Logger */
private static final Logger LOG =
Logger.getLogger(GiraphApplicationMaster.class);
/** Exit code for YARN containers that were manually killed/aborted */
private static final int YARN_ABORT_EXIT_STATUS = -100;
/** Exit code for successfully run YARN containers */
private static final int YARN_SUCCESS_EXIT_STATUS = 0;
/** millis to sleep between heartbeats during long loops */
private static final int SLEEP_BETWEEN_HEARTBEATS_MSECS = 900;
/** A reusable map of resources already in HDFS that each task copies into
 * its local environment and uses to launch its GiraphYarnTask. Lazily
 * populated once and then reused for every container launch. */
private static Map<String, LocalResource> LOCAL_RESOURCES;
// TODO: status updates for clients are not yet implemented; the hostname,
// RPC port, and tracking URL fields below are placeholders for that.
/** Hostname of the container */
private String appMasterHostname = "";
/** Port on which the app master listens for status updates from clients*/
private int appMasterRpcPort = 0;
/** Tracking url to which app master publishes info for clients to monitor*/
private String appMasterTrackingUrl = "";
static {
// Pick up the conf XML file exported by the YarnClient and add it to the
// Configuration defaults; this resource must also be exported to the tasks.
Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
}
/** GiraphApplicationMaster's application attempt id */
private final ApplicationAttemptId appAttemptId;
/** GiraphApplicationMaster container id, retained for future use */
private final ContainerId containerId;
/** number of containers Giraph needs (conf.getMaxWorkers() + 1 master) */
private final int containersToLaunch;
/** MB of JVM heap per Giraph task container */
private final int heapPerContainer;
/** Giraph configuration for this job, transported here by YARN framework */
private final ImmutableClassesGiraphConfiguration giraphConf;
/** Yarn configuration for this job*/
private final YarnConfiguration yarnConf;
/** Completed Containers Counter */
private final AtomicInteger completedCount;
/** Failed Containers Counter */
private final AtomicInteger failedCount;
/** Number of containers allocated so far (target: maxWorkers + 1 from our conf) */
private final AtomicInteger allocatedCount;
/** Number of successfully completed containers in this job run. */
private final AtomicInteger successfulCount;
/** Last response id ACK'd for AllocateRequests and heartbeats */
private AtomicInteger lastResponseId;
/** Buffer to store all tokens */
private ByteBuffer allTokens;
/** Executor to attempt asynchronous launches of Giraph containers */
private ExecutorService executor;
/** Handle to communicate with the Resource Manager */
@SuppressWarnings("rawtypes")
private AMRMClientAsync amRMClient;
/** Handle to communicate with the Node Manager */
private NMClientAsync nmClientAsync;
/** Listen to process the response from the Node Manager */
private NMCallbackHandler containerListener;
/** Whether all containers have finished */
private volatile boolean done;
/**
* Construct the GiraphAppMaster, populate fields using env vars
* set up by YARN framework in this execution container.
* @param cId the ContainerId
* @param aId the ApplicationAttemptId
*/
protected GiraphApplicationMaster(ContainerId cId, ApplicationAttemptId aId)
throws IOException {
containerId = cId; // retained for future use
appAttemptId = aId;
lastResponseId = new AtomicInteger(0);
giraphConf =
new ImmutableClassesGiraphConfiguration(new GiraphConfiguration());
yarnConf = new YarnConfiguration(giraphConf);
completedCount = new AtomicInteger(0);
failedCount = new AtomicInteger(0);
allocatedCount = new AtomicInteger(0);
successfulCount = new AtomicInteger(0);
containersToLaunch = giraphConf.getMaxWorkers() + 1;
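// one launcher thread per container, so every allocated container can be
// started concurrently as the RM hands them out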
executor = Executors.newFixedThreadPool(containersToLaunch);
heapPerContainer = giraphConf.getYarnTaskHeapMb();
LOG.info("GiraphAM for ContainerId " + cId + " ApplicationAttemptId " +
aId);
}
/**
* Coordinates all requests for Giraph's worker/master task containers, and
* manages application liveness heartbeat, completion status, teardown, etc.
* @return true if all containers succeeded, false otherwise
*/
private boolean run() throws YarnException, IOException {
boolean success = false;
try {
getAllTokens();
registerRMCallBackHandler();
registerNMCallbackHandler();
registerAMToRM();
madeAllContainerRequestToRM();
LOG.info("Wait to finish ..");
while (!done) {
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
LOG.error("Interrupted while waiting for containers to finish", ex);
// TODO: decide whether to restore the interrupt flag or abort the wait loop
}
}
LOG.info("Done " + done);
} finally {
// if we get here w/o problems, the executor is already long finished.
if (null != executor && !executor.isTerminated()) {
LOG.info("Forcefully terminating executors with done =:" + done);
executor.shutdownNow(); // force kill, especially if got here by throw
}
success = finish();
}
return success;
}
/**
* Called when the application is done; stops the clients and unregisters
* this Application Master from the RM.
* @return true if all containers succeeded
*/
private boolean finish() {
// When the application completes, it should stop all running containers
LOG.info("Application completed. Stopping running containers");
nmClientAsync.stop();
// When the application completes, it should send a finish application
// signal to the RM
LOG.info("Application completed. Signalling finish to RM");
FinalApplicationStatus appStatus;
String appMessage = null;
boolean success = true;
if (failedCount.get() == 0 &&
completedCount.get() == containersToLaunch) {
appStatus = FinalApplicationStatus.SUCCEEDED;
} else {
appStatus = FinalApplicationStatus.FAILED;
appMessage = "Diagnostics." + ", total=" + containersToLaunch +
", completed=" + completedCount.get() + ", failed=" +
failedCount.get();
success = false;
}
try {
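// third argument is the new tracking URL to report to the RM; none is given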
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
LOG.error("Failed to unregister application", e);
}
amRMClient.stop();
return success;
}
/**
* Add a container request to the RM for each container this job needs.
*/
private void madeAllContainerRequestToRM() {
// Set up and send one container request to the RM per Giraph task.
// The async RM client keeps polling the RM until the full quota is
// allocated; tasks are then launched on each container as it arrives,
// and tracked to completion regardless of success or failure.
for (int i = 0; i < containersToLaunch; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
}
/**
* Setup the request that will be sent to the RM for the container ask.
*
* @return the setup ResourceRequest to be sent to RM
*/
private ContainerRequest setupContainerAskForRM() {
// Set up host requirements: no host or rack constraints are passed, so any
// node in the cluster can satisfy a Giraph task.
// Set the priority for the request.
Priority pri = Records.newRecord(Priority.class);
// TODO: what is the valid range for priority, and how should it be chosen?
pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);
// Set up resource type requirements
// For now, only memory is supported so we set memory requirements
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(heapPerContainer);
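// NOTE: the memory ask equals the task's JVM heap (-Xmx) size; no extra
// headroom for JVM overhead is added to the container request here.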
ContainerRequest request = new ContainerRequest(capability, null, null,
pri);
LOG.info("Requested container ask: " + request.toString());
return request;
}
/**
* Populate allTokens with the current user's credentials, minus the AM-RM
* token, so they can be shipped to the task containers.
* @throws IOException if the credentials cannot be serialized
*/
private void getAllTokens() throws IOException {
Credentials credentials = UserGroupInformation.getCurrentUser()
.getCredentials();
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
// Now remove the AM->RM token so that containers cannot access it.
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
Token<?> token = iter.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Token type :" + token.getKind());
}
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
iter.remove();
}
}
allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
/**
* Register the RM callback handler and start the async AM-RM client.
*/
private void registerRMCallBackHandler() {
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
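// the first argument is the AM-RM heartbeat interval in milliseconds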
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000,
allocListener);
amRMClient.init(yarnConf);
amRMClient.start();
}
/**
* Register the NM callback handler and start the async NM client.
*/
private void registerNMCallbackHandler() {
containerListener = new NMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(yarnConf);
nmClientAsync.start();
}
/**
* Register this Application Master with the Resource Manager.
* @return the RM's registration response
*/
private RegisterApplicationMasterResponse registerAMToRM()
throws YarnException {
// register Application Master with the YARN Resource Manager so we can
// begin requesting resources.
try {
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("SECURITY ENABLED ");
}
// TODO: provide actual call back details
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname
, appMasterRpcPort, appMasterTrackingUrl);
return response;
} catch (IOException ioe) {
throw new IllegalStateException(
"GiraphApplicationMaster failed to register with RM.", ioe);
}
}
/**
* For each container successfully allocated, attempt to set up and launch
* a Giraph worker/master task.
* @param allocatedContainers the containers we have currently allocated.
*/
private void startContainerLaunchingThreads(final List<Container>
allocatedContainers) {
for (Container allocatedContainer : allocatedContainers) {
LOG.info("Launching command on a new container." +
", containerId=" + allocatedContainer.getId() +
", containerNode=" + allocatedContainer.getNodeId().getHost() +
":" + allocatedContainer.getNodeId().getPort() +
", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() +
", containerResourceMemory=" +
allocatedContainer.getResource().getMemory());
// Launch and start the container on a separate thread to keep the main
// thread unblocked as all containers may not be allocated at one go.
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener);
executor.execute(runnableLaunchContainer);
}
}
/**
* Lazily compose the map of jar and file names to LocalResource records for
* inclusion in GiraphYarnTask container requests. Can re-use the same map
* as Giraph tasks need identical HDFS-based resources (jars etc.) to run.
* @return the resource map for a ContainerLaunchContext
*/
private synchronized Map<String, LocalResource> getTaskResourceMap() {
// Set the local resources: just send the copies already in HDFS
if (null == LOCAL_RESOURCES) {
LOCAL_RESOURCES = Maps.newHashMap();
try {
// if you have to update the giraphConf for export to tasks, do it now
updateGiraphConfForExport();
YarnUtils.addFsResourcesToMap(LOCAL_RESOURCES, giraphConf,
appAttemptId.getApplicationId());
} catch (IOException ioe) {
// fail fast, this container will never launch.
throw new IllegalStateException("Could not configure the container" +
"launch context for GiraphYarnTasks.", ioe);
}
}
// else, return the prepopulated copy to reuse for each GiraphYarnTask
return LOCAL_RESOURCES;
}
/**
* If you're going to make ANY CHANGES to your local GiraphConfiguration
* while running the GiraphApplicationMaster, put them here.
* This method republishes the GiraphConfiguration XML file in HDFS,
* replacing it with the copy you have modified locally in memory.
*/
private void updateGiraphConfForExport()
throws IOException {
// Giraph expects this MapReduce stuff
giraphConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
appAttemptId.getAttemptId());
// now republish the giraph-conf.xml in HDFS
YarnUtils.exportGiraphConfiguration(giraphConf,
appAttemptId.getApplicationId());
}
/**
* Application entry point
* @param args command-line args (set by GiraphYarnClient, if any)
*/
public static void main(final String[] args) {
boolean result = false;
LOG.info("Starting GiraphAM ");
String containerIdString = System.getenv().get(
Environment.CONTAINER_ID.name());
if (containerIdString == null) {
// container id should always be set in the env by the framework
throw new IllegalArgumentException("ContainerId not found in env vars.");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
try {
GiraphApplicationMaster giraphAppMaster =
new GiraphApplicationMaster(containerId, appAttemptId);
result = giraphAppMaster.run();
// CHECKSTYLE: stop IllegalCatch
} catch (Throwable t) {
// CHECKSTYLE: resume IllegalCatch
LOG.error("GiraphApplicationMaster caught a " +
"top-level exception in main.", t);
System.exit(1);
}
if (result) {
LOG.info("Giraph Application Master completed successfully. exiting");
System.exit(0);
} else {
LOG.info("Giraph Application Master failed. exiting");
System.exit(2);
}
}
/**
* Runnable that asks the Node Manager to launch the container
* that will house one of our Giraph worker (or master) tasks.
*/
private class LaunchContainerRunnable implements Runnable {
/** Allocated container */
private Container container;
/** NM listener */
private NMCallbackHandler containerListener;
/**
* Constructor.
* @param newGiraphTaskContainer Allocated container
* @param containerListener container listener.
*/
public LaunchContainerRunnable(final Container newGiraphTaskContainer,
NMCallbackHandler containerListener) {
this.container = newGiraphTaskContainer;
this.containerListener = containerListener;
}
/**
* Sets up the container launch context for the Giraph task and dispatches
* the container start request to the Node Manager.
*/
public void run() {
// Connect to ContainerManager
// configure the launcher for the Giraph task it will host
ContainerLaunchContext ctx = buildContainerLaunchContext();
// request CM to start this container as spec'd in ContainerLaunchContext
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
}
/**
* Boilerplate to set up the ContainerLaunchContext to tell the Container
* Manager how to launch our Giraph task in the execution container we have
* already allocated.
* @return a populated ContainerLaunchContext object.
*/
private ContainerLaunchContext buildContainerLaunchContext() {
LOG.info("Setting up container launch container for containerid=" +
container.getId());
ContainerLaunchContext launchContext = Records
.newRecord(ContainerLaunchContext.class);
// args inject the CLASSPATH, heap MB, and TaskAttemptID for launched task
final List<String> commands = generateShellExecCommand();
LOG.info("Conatain launch Commands :" + commands.get(0));
launchContext.setCommands(commands);
// Set up tokens for the container too. They are populated mainly so that
// NodeManagers can download any files from the distributed file system.
// The tokens are also useful, e.g., when the task runs a
// "hadoop dfs"-style command.
launchContext.setTokens(allTokens.slice());
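// slice() gives each launch context its own view (independent position and
// limit) over the shared token bytes, so one launch cannot disturb another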
// add user information to the job
String jobUserName = "ERROR_UNKNOWN_USER";
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.getCurrentUser();
jobUserName = ugi.getUserName();
} catch (IOException ioe) {
jobUserName =
System.getenv(ApplicationConstants.Environment.USER.name());
}
//launchContext.setUser(jobUserName);
LOG.info("Setting username in ContainerLaunchContext to: " + jobUserName);
// Set the environment variables to inject into remote task's container
buildEnvironment(launchContext);
// Set the local resources: just send the copies already in HDFS
launchContext.setLocalResources(getTaskResourceMap());
return launchContext;
}
/**
* Generates our command line string used to launch our Giraph tasks.
* @return the BASH shell commands to launch the job.
*/
private List<String> generateShellExecCommand() {
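// The four positional args (cluster timestamp, application id, container id,
// attempt id) identify this task; stdout/stderr are redirected into the
// container's YARN log dir.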
return ImmutableList.of("java " +
"-Xmx" + heapPerContainer + "M " +
"-Xms" + heapPerContainer + "M " +
"-cp .:${CLASSPATH} " +
"org.apache.giraph.yarn.GiraphYarnTask " +
appAttemptId.getApplicationId().getClusterTimestamp() + " " +
appAttemptId.getApplicationId().getId() + " " +
container.getId().getId() + " " +
appAttemptId.getAttemptId() + " " +
"1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
"/task-" + container.getId().getId() + "-stdout.log " +
"2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
"/task-" + container.getId().getId() + "-stderr.log "
);
}
/**
* Utility to populate the environment vars we wish to inject into the new
* container's env when the Giraph BSP task is executed.
* @param launchContext the launch context which will set our environment
* vars in the task's execution container.
*/
private void buildEnvironment(final ContainerLaunchContext launchContext) {
Map<String, String> classPathForEnv = Maps.<String, String>newHashMap();
// pick up the local classpath so when we instantiate a Configuration
// remotely, we also get the "mapred-site.xml" and "yarn-site.xml"
YarnUtils.addLocalClasspathToEnv(classPathForEnv, giraphConf);
// set this map of env vars into the launch context.
launchContext.setEnvironment(classPathForEnv);
}
}
/**
* CallbackHandler to process RM async calls
*/
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
@SuppressWarnings("unchecked")
@Override
public void onContainersCompleted(List<ContainerStatus>
completedContainers) {
LOG.info("Got response from RM for container ask, completedCnt=" +
completedContainers.size());
for (ContainerStatus containerStatus : completedContainers) {
LOG.info("Got container status for containerID=" +
containerStatus.getContainerId() + ", state=" +
containerStatus.getState() + ", exitStatus=" +
containerStatus.getExitStatus() + ", diagnostics=" +
containerStatus.getDiagnostics());
switch (containerStatus.getExitStatus()) {
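// 0 = task success; -100 = container killed/aborted by YARN (neither
// success nor failure); any other exit status counts as a task failure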
case YARN_SUCCESS_EXIT_STATUS:
successfulCount.incrementAndGet();
break;
case YARN_ABORT_EXIT_STATUS:
break; // not success or fail
default:
failedCount.incrementAndGet();
break;
}
completedCount.incrementAndGet();
}
if (completedCount.get() == containersToLaunch) {
done = true;
LOG.info("All container compeleted. done = " + done);
} else {
LOG.info("After completion of one conatiner. current status is:" +
" completedCount :" + completedCount.get() +
" containersToLaunch :" + containersToLaunch +
" successfulCount :" + successfulCount.get() +
" failedCount :" + failedCount.get());
}
}
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container ask, allocatedCnt=" +
allocatedContainers.size());
allocatedCount.addAndGet(allocatedContainers.size());
LOG.info("Total allocated # of container so far : " +
allocatedCount.get() +
" allocated out of " + containersToLaunch + " required.");
startContainerLaunchingThreads(allocatedContainers);
}
@Override
public void onShutdownRequest() {
done = true;
}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
}
@Override
public float getProgress() {
// set progress to deliver to RM on next heartbeat
float progress = (float) completedCount.get() /
containersToLaunch;
return progress;
}
@Override
public void onError(Throwable e) {
done = true;
amRMClient.stop();
}
}
/**
* CallbackHandler to process NM async calls
*/
private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
/** List of containers */
private ConcurrentMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>();
/**
* Add a container
* @param containerId id of container
* @param container container object
*/
public void addContainer(ContainerId containerId, Container container) {
containers.putIfAbsent(containerId, container);
}
@Override
public void onContainerStopped(ContainerId containerId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Succeeded to stop Container " + containerId);
}
containers.remove(containerId);
}
@Override
public void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus) {
if (LOG.isDebugEnabled()) {
LOG.debug("Container Status: id=" + containerId + ", status=" +
containerStatus);
}
}
@Override
public void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse) {
if (LOG.isDebugEnabled()) {
LOG.debug("Succeeded to start Container " + containerId);
}
Container container = containers.get(containerId);
if (container != null) {
nmClientAsync.getContainerStatusAsync(containerId,
container.getNodeId());
}
}
@Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to start Container " + containerId, t);
containers.remove(containerId);
}
@Override
public void onGetContainerStatusError(
ContainerId containerId, Throwable t) {
LOG.error("Failed to query the status of Container " + containerId, t);
}
@Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to stop Container " + containerId);
containers.remove(containerId);
}
}
}