blob: 4e0d55b10df302e9a7dda17566e83850d3d6d56f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app2;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerAllocator;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl2;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler2;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app2.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
import org.apache.hadoop.mapreduce.v2.app2.job.Task;
import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.app2.local.LocalContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app2.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app2.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
import org.apache.hadoop.mapreduce.v2.app2.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app2.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app2.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanerImpl;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of the Job interface.
* All state changes happen via the Job interface. Each event
* results in a Finite State Transition in Job.
*
* MR AppMaster is a composition of loosely coupled services. The services
* interact with each other via events. The components resemble the
* Actors model: each component acts on the events it receives and sends out
* events to other components.
* This keeps it highly concurrent with no or minimal synchronization needs.
*
* The events are dispatched by a central Dispatch mechanism. All components
* register with the Dispatcher.
*
* Information is shared across the different components using AppContext.
*/
@SuppressWarnings("rawtypes")
public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
/**
* Priority of the MRAppMaster shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
// Time source. Set by the constructor; init() replaces it with the recovery
// service's clock when recovery is active.
private Clock clock;
// Clock reading taken in the constructor.
private final long startTime;
// Submit time handed to the constructor; forwarded to the job in createJob().
private final long appSubmitTime;
// Read from MRJobConfig.JOB_NAME during init().
private String appName;
private final ApplicationAttemptId appAttemptID;
private final ContainerId containerID;
// Host and ports passed to the constructor; recorded in this attempt's AMInfo
// as the node manager host / port / http port.
private final String nmHost;
private final int nmPort;
private final int nmHttpPort;
// Container and node maps, created in init() and exposed through AppContext.
private AMContainerMap containers;
private AMNodeMap nodes;
protected final MRAppMetrics metrics;
// Both are filled from the recovery service in start() when in recovery;
// amInfos additionally always receives an entry for the current attempt.
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private List<AMInfo> amInfos;
private AppContext context;
// Either a freshly created dispatcher or the recovery service's dispatcher
// (decided in init()).
private Dispatcher dispatcher;
private ClientService clientService;
// Only assigned when init() decides to run recovery.
private Recovery recoveryServ;
private ContainerLauncher containerLauncher;
private TaskCleaner taskCleaner;
// Only created when map or reduce speculation is enabled in the configuration.
private Speculator speculator;
private ContainerHeartbeatHandler containerHeartbeatHandler;
private TaskHeartbeatHandler taskHeartbeatHandler;
private TaskAttemptListener taskAttemptListener;
// Shared with the task attempt listener and with the job created in createJob().
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private JobId jobId;
// True when init() determines that the new-API mapper/reducer is configured.
private boolean newApiCommitter;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private EventHandler<JobHistoryEvent> jobHistoryEventHandler;
private AbstractService stagingDirCleanerService;
// Set to true in init() when the recovery service is created.
private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private ContainerRequestor containerRequestor;
private ContainerAllocator amScheduler;
// The single job run by this AM; created in start().
private Job job;
private Credentials fsTokens = new Credentials(); // Filled during init
private UserGroupInformation currentUser; // Will be setup during init
/**
* Creates an application master that uses a {@link SystemClock} as its time
* source; otherwise identical to the constructor that takes a clock.
*
* @param applicationAttemptId the attempt this AM instance represents
* @param containerId the container id recorded for this AM attempt
* @param nmHost node manager host recorded for this AM attempt
* @param nmPort node manager port recorded for this AM attempt
* @param nmHttpPort node manager http port recorded for this AM attempt
* @param appSubmitTime the application submit time passed on to the job
*/
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime);
}
/**
 * Creates an application master with an explicit time source.
 *
 * The start time is sampled from {@code clock} here, at construction.
 *
 * @param applicationAttemptId the attempt this AM instance represents
 * @param containerId the container id recorded for this AM attempt
 * @param nmHost node manager host recorded for this AM attempt
 * @param nmPort node manager port recorded for this AM attempt
 * @param nmHttpPort node manager http port recorded for this AM attempt
 * @param clock the clock used to obtain the start time
 * @param appSubmitTime the application submit time passed on to the job
 */
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
    ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
    Clock clock, long appSubmitTime) {
  super(MRAppMaster.class.getName());

  // Identity of this attempt.
  this.appAttemptID = applicationAttemptId;
  this.containerID = containerId;

  // Node manager coordinates.
  this.nmHost = nmHost;
  this.nmPort = nmPort;
  this.nmHttpPort = nmHttpPort;

  // Timing.
  this.appSubmitTime = appSubmitTime;
  this.clock = clock;
  this.startTime = clock.getTime();

  this.metrics = MRAppMetrics.create();
  LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
/**
* Initializes the application master: loads security tokens, builds the
* shared {@link AppContext}, selects the output committer, decides whether
* recovery is used, and creates and registers every child service and event
* handler before delegating to {@code super.init(conf)}.
*
* The order in which services are added is significant; see the comments on
* the staging-directory cleaner and the job history handler near the end.
*
* @param conf the job configuration; this method also writes to it (the
* dispatcher exit-on-error flag and the application attempt id)
*/
@Override
public void init(final Configuration conf) {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
downloadTokensAndSetupUGI(conf);
context = new RunningAppContext(conf);
// Job name is the same as the app name until we support DAG of jobs
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
newApiCommitter = false;
jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
appAttemptID.getApplicationId().getId());
// The new-API committer is used when the stage that produces the final
// output (reducers if there are any, otherwise mappers) uses the new API.
int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if ((numReduceTasks > 0 &&
conf.getBoolean("mapred.reducer.new-api", false)) ||
(numReduceTasks == 0 &&
conf.getBoolean("mapred.mapper.new-api", false))) {
newApiCommitter = true;
LOG.info("Using mapred newApiCommitter.");
}
committer = createOutputCommitter(conf);
// Recovery needs all three: enabled in config, supported by the committer,
// and this must not be the first attempt.
boolean recoveryEnabled = conf.getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
boolean recoverySupportedByCommitter = committer.isRecoverySupported();
if (recoveryEnabled && recoverySupportedByCommitter
&& appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
recoveryServ = createRecoveryService(context);
addIfService(recoveryServ);
// In recovery the dispatcher and clock are supplied by the recovery service.
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
inRecovery = true;
} else {
LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
dispatcher = createDispatcher();
addIfService(dispatcher);
}
taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
addIfService(taskHeartbeatHandler);
containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
addIfService(containerHeartbeatHandler);
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context,
taskHeartbeatHandler, containerHeartbeatHandler);
addIfService(taskAttemptListener);
containers = new AMContainerMap(containerHeartbeatHandler,
taskAttemptListener, context);
addIfService(containers);
dispatcher.register(AMContainerEventType.class, containers);
nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
addIfService(nodes);
dispatcher.register(AMNodeEventType.class, nodes);
//service to do the task cleanup
taskCleaner = createTaskCleaner(context);
addIfService(taskCleaner);
//service to handle requests from JobClient
clientService = createClientService(context);
addIfService(clientService);
//service to log job history events
// (registered with the dispatcher here, but only added as a service at the
// very end of this method)
jobHistoryEventHandler = createJobHistoryHandler(context);
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
jobHistoryEventHandler);
this.jobEventDispatcher = new JobEventDispatcher();
//register the event dispatchers
dispatcher.register(JobEventType.class, jobEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
//optional service to speculate on task attempts' progress
speculator = createSpeculator(conf, context);
addIfService(speculator);
}
// The speculator event dispatcher is registered regardless of whether a
// speculator was created above.
speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
dispatcher.register(Speculator.EventType.class,
speculatorEventDispatcher);
// TODO XXX: Rename to NMComm
// corresponding service to launch allocated containers via NodeManager
// containerLauncher = createNMCommunicator(context);
containerLauncher = createContainerLauncher(context);
addIfService(containerLauncher);
dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerRequestor = createContainerRequestor(clientService, context);
addIfService(containerRequestor);
dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
amScheduler = createAMScheduler(containerRequestor, context);
addIfService(amScheduler);
dispatcher.register(AMSchedulerEventType.class, amScheduler);
// Add the staging directory cleaner before the history server but after
// the container allocator so the staging directory is cleaned after
// the history has been flushed but before unregistering with the RM.
this.stagingDirCleanerService = createStagingDirCleaningService();
addService(stagingDirCleanerService);
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.
// Note: Even though JobHistoryEventHandler is started last, if any
// component creates a JobHistoryEvent in the meanwhile, it will just be
// queued inside the JobHistoryEventHandler
addIfService(this.jobHistoryEventHandler);
super.init(conf);
} // end of init()
/**
 * Creates the dispatcher used when the AM is not running in recovery.
 *
 * @return a new {@link AsyncDispatcher}
 */
protected Dispatcher createDispatcher() {
  Dispatcher asyncDispatcher = new AsyncDispatcher();
  return asyncDispatcher;
}
/**
 * Instantiates the output committer for the job.
 *
 * With the old API the committer class is read from
 * {@code mapred.output.committer.class} (defaulting to
 * {@link FileOutputCommitter}). With the new API the committer is obtained
 * from the job's output format, using a task attempt context built for
 * attempt 0 of map task 0.
 *
 * @param conf the job configuration
 * @return the committer instance
 * @throws YarnException wrapping any failure while creating the new-API
 *         output format or committer
 */
private OutputCommitter createOutputCommitter(Configuration conf) {
  LOG.info("OutputCommitter set in config "
      + conf.get("mapred.output.committer.class"));

  OutputCommitter result;
  if (!newApiCommitter) {
    // Old API: the committer class comes straight from the configuration.
    result = ReflectionUtils.newInstance(conf.getClass(
        "mapred.output.committer.class", FileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class), conf);
  } else {
    // New API: ask the configured OutputFormat for its committer.
    TaskId mapTaskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId firstAttemptId =
        MRBuilderUtils.newTaskAttemptId(mapTaskId, 0);
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
        TypeConverter.fromYarn(firstAttemptId));
    try {
      OutputFormat outputFormat = ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), conf);
      result = outputFormat.getOutputCommitter(taskContext);
    } catch (Exception e) {
      throw new YarnException(e);
    }
  }

  LOG.info("OutputCommitter is " + result.getClass().getName());
  return result;
}
/**
 * Tells whether the job's files must be kept, i.e. whether a keep-task-files
 * pattern is configured or failed task files are to be kept.
 *
 * @param conf the job configuration
 * @return {@code true} if either setting asks for files to be kept
 */
protected boolean keepJobFiles(JobConf conf) {
  if (conf.getKeepTaskFilesPattern() != null) {
    return true;
  }
  return conf.getKeepFailedTaskFiles();
}
/**
 * Obtains the file system for this job from the given configuration.
 *
 * @param conf the conf object
 * @return the result of {@code FileSystem.get(conf)}
 * @throws IOException if the file system cannot be obtained
 */
protected FileSystem getFileSystem(Configuration conf) throws IOException {
  FileSystem jobFileSystem = FileSystem.get(conf);
  return jobFileSystem;
}
/**
 * Deletes the job's staging directory unless the configuration asks for job
 * files to be kept.
 *
 * Failures while deleting are logged and not rethrown; only a failure to
 * obtain the file system propagates to the caller.
 *
 * @throws IOException if the file system cannot be obtained
 */
public void cleanupStagingDir() throws IOException {
  // Declared outside the try so the error message can report it.
  String stagingDir = null;
  FileSystem fs = getFileSystem(getConfig());
  try {
    if (keepJobFiles(new JobConf(getConfig()))) {
      return;
    }
    stagingDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
    if (stagingDir == null) {
      LOG.warn("Job Staging directory is null");
      return;
    }
    Path stagingPath = new Path(stagingDir);
    LOG.info("Deleting staging directory "
        + FileSystem.getDefaultUri(getConfig()) + " " + stagingDir);
    fs.delete(stagingPath, true);
  } catch (IOException io) {
    LOG.error("Failed to cleanup staging dir " + stagingDir, io);
  }
}
/**
 * Terminates the JVM with exit status 0. Kept as a separate overridable
 * method so that tests can avoid the real exit.
 */
protected void sysexit() {
  final int successStatus = 0;
  System.exit(successStatus);
}
/**
 * Job-finish handler that performs the AM shutdown on a dedicated thread.
 *
 * TaskAttempts are marked as completed before their container exits, so a
 * container may well not have "completed" by the time the job completes;
 * TaskAttempts may then not be in a final state-machine state and their
 * cleanup may not have happened. Because this handler is invoked on the
 * thread that handles all other async events, the wait and the shutdown are
 * moved to a separate thread.
 *
 * The shutdown thread polls until every container is COMPLETED. No timeout
 * is applied at present (see the TODO in the runnable).
 */
protected class JobFinishEventHandlerCR implements EventHandler<JobFinishEvent> {
  // TODO XXX: Modify TaskAttemptCleaner to empty it's queue while stopping.

  /** Starts the shutdown thread; does not block the caller. */
  public void handle(JobFinishEvent event) {
    LOG.info("Handling JobFinished Event");
    Thread shutdownThread =
        new Thread(new AMShutdownRunnable(), "AMShutdownThread");
    shutdownThread.start();
  }

  /** Sends the job-end notification if a notification URL is configured. */
  protected void maybeSendJobEndNotification() {
    if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) == null) {
      return;
    }
    try {
      LOG.info("Job end notification started for jobID : "
          + job.getReport().getJobId());
      JobEndNotifier endNotifier = new JobEndNotifier();
      endNotifier.setConf(getConfig());
      endNotifier.notify(job.getReport());
    } catch (InterruptedException ie) {
      LOG.warn("Job end notification interrupted for jobID : "
          + job.getReport().getJobId(), ie);
    }
  }

  /** Stops the AM's services, logging (not propagating) any failure. */
  protected void stopAllServices() {
    try {
      // Stopping the services also sends the final report to the
      // ResourceManager.
      LOG.info("Calling stop for all the services");
      stop();
    } catch (Throwable t) {
      LOG.warn("Graceful stop failed ", t);
    }
  }

  /** Logs the goodbye message and exits via {@code sysexit()}. */
  protected void exit() {
    LOG.info("Exiting MR AppMaster..GoodBye!");
    sysexit();
  }

  private void stopAM() {
    stopAllServices();
    exit();
  }

  /**
   * @return {@code true} when every container known to the context is in
   *         the COMPLETED state
   */
  protected boolean allContainersComplete() {
    for (AMContainer container : context.getAllContainers().values()) {
      if (container.getState() != AMContainerState.COMPLETED) {
        return false;
      }
    }
    return true;
  }

  /**
   * Placeholder that currently always reports completion.
   *
   * @return {@code true}
   */
  protected boolean allTaskAttemptsComplete() {
    // TODO XXX: Implement.
    // TaskAttempts will transition to their final state machine state only
    // after a container is complete and sends out a TA_TERMINATED event.
    return true;
  }

  /** Body of the shutdown thread: notify, wait for containers, stop. */
  private class AMShutdownRunnable implements Runnable {
    @Override
    public void run() {
      maybeSendJobEndNotification();
      // TODO XXX Add a timeout.
      LOG.info("Waiting for all containers and TaskAttempts to complete");
      if (job.isUber()) {
        LOG.info("Uberized job. Not waiting for all containers to finish");
      } else {
        // Poll every 100 ms; an interrupt ends the wait early.
        while (!(allContainersComplete() && allTaskAttemptsComplete())) {
          try {
            synchronized (this) {
              wait(100L);
            }
          } catch (InterruptedException e) {
            LOG.info("AM Shutdown Thread interrupted. Exiting");
            break;
          }
        }
        LOG.info("All Containers and TaskAttempts Complete. Stopping services");
      }
      stopAM();
      LOG.info("AM Shutdown Thread Completing");
    }
  }
}
/**
 * Default job-finish handler: sends the optional job-end notification,
 * pauses for five seconds, stops all services and then exits the process.
 *
 * All of this runs synchronously on the thread that delivers the event.
 */
private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
  @Override
  public void handle(JobFinishEvent event) {
    // job has finished
    // this is the only job, so shut down the Appmaster
    // note in a workflow scenario, this may lead to creation of a new
    // job (FIXME?)
    // Send job-end notification
    if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
      try {
        LOG.info("Job end notification started for jobID : "
            + job.getReport().getJobId());
        JobEndNotifier notifier = new JobEndNotifier();
        notifier.setConf(getConfig());
        notifier.notify(job.getReport());
      } catch (InterruptedException ie) {
        LOG.warn("Job end notification interrupted for jobID : "
            + job.getReport().getJobId(), ie);
      }
    }
    // TODO:currently just wait for some time so clients can know the
    // final states. Will be removed once RM come on.
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      // Report through the AM log rather than stderr, then carry on with
      // the shutdown; the interrupt only shortens the grace period.
      LOG.warn("Interrupted while waiting before stopping the AppMaster", e);
    }
    try {
      // Stop all services
      // This will also send the final report to the ResourceManager
      LOG.info("Calling stop for all the services");
      stop();
    } catch (Throwable t) {
      LOG.warn("Graceful stop failed ", t);
    }
    //Bring the process down by force.
    //Not needed after HADOOP-7140
    LOG.info("Exiting MR AppMaster..GoodBye!");
    sysexit();
  }
}
/**
 * Creates the handler that reacts to the job finish event. Registered with
 * the dispatcher by {@code createJob}.
 *
 * @return a new {@code JobFinishEventHandler}
 */
protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
  EventHandler<JobFinishEvent> finishHandler = new JobFinishEventHandler();
  return finishHandler;
}
/**
 * Creates the recovery service, handing it the current output committer.
 *
 * @param appContext the application context
 * @return a new {@link RecoveryService}
 */
protected Recovery createRecoveryService(AppContext appContext) {
  OutputCommitter currentCommitter = getCommitter();
  return new RecoveryService(appContext, currentCommitter);
}
/**
 * Creates the container requestor. The returned router picks the concrete
 * requestor (local or RM-backed) when it is started.
 *
 * @param clientService
 *          the MR Client Service.
 * @param appContext
 *          the application context.
 * @return a new {@code ContainerRequestorRouter}.
 */
protected ContainerRequestor createContainerRequestor(
    ClientService clientService, AppContext appContext) {
  ContainerRequestor router =
      new ContainerRequestorRouter(clientService, appContext);
  return router;
}
/**
 * Creates the AM scheduler. The returned router picks the concrete
 * allocator (local or RM-backed) when it is started.
 *
 * @param requestor
 *          The Container Requestor.
 * @param appContext
 *          the application context.
 * @return a new {@code AMSchedulerRouter}.
 */
protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
    AppContext appContext) {
  ContainerAllocator router = new AMSchedulerRouter(requestor, appContext);
  return router;
}
/**
 * Creates the single job run by this AM, stores it in the running context's
 * job map and registers the job-finish handler with the dispatcher. The job
 * is not started here.
 *
 * @param conf the job configuration
 * @return the newly created job
 */
protected Job createJob(Configuration conf) {
  Job createdJob = new JobImpl(
      jobId, appAttemptID, conf, dispatcher.getEventHandler(),
      taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
      completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
      currentUser.getUserName(), appSubmitTime, amInfos,
      taskHeartbeatHandler, context);

  // Make the job visible to the other components through the AppContext.
  RunningAppContext runningContext = (RunningAppContext) context;
  runningContext.jobs.put(createdJob.getID(), createdJob);

  EventHandler<JobFinishEvent> finishHandler = createJobFinishEventHandler();
  dispatcher.register(JobFinishEvent.Type.class, finishHandler);
  return createdJob;
} // end createJob()
/**
 * Obtains the tokens needed by the job and puts them in the UGI.
 *
 * The current user is always captured. Only when security is enabled are
 * the file-system tokens read from the localized tokens file under the job
 * submit directory, added to {@code fsTokens} and to the current user.
 *
 * @param conf the configuration used to read the token storage file
 * @throws YarnException wrapping any {@link IOException} raised while
 *         resolving the current user or reading the tokens
 */
protected void downloadTokensAndSetupUGI(Configuration conf) {
  try {
    this.currentUser = UserGroupInformation.getCurrentUser();
    if (UserGroupInformation.isSecurityEnabled()) {
      // Read the file-system tokens from the localized tokens-file.
      Path jobSubmitDir =
          FileContext.getLocalFSFileContext().makeQualified(
              new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
                  .getAbsolutePath()));
      Path jobTokenFile =
          new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
      fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
      LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
          + jobTokenFile);
      for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
        if (LOG.isDebugEnabled()) {
          // Leading space before "in" keeps the token kind from running
          // into the rest of the message.
          LOG.debug("Token of kind " + tk.getKind()
              + " in current ugi in the AppMaster for service "
              + tk.getService());
        }
        currentUser.addToken(tk); // For use by AppMaster itself.
      }
    }
  } catch (IOException e) {
    throw new YarnException(e);
  }
}
/**
 * Adds the object as a child service when it implements {@link Service};
 * any other object (including {@code null}) is ignored.
 *
 * @param object the candidate service
 */
protected void addIfService(Object object) {
  if (!(object instanceof Service)) {
    return;
  }
  addService((Service) object);
}
/**
 * Creates the handler for job history events.
 *
 * @param context the application context
 * @return a new {@link JobHistoryEventHandler2} built with this AM's start
 *         count
 */
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
    AppContext context) {
  int startCount = getStartCount();
  return new JobHistoryEventHandler2(context, startCount);
}
/**
 * Creates the service that cleans up the staging directory when stopped.
 *
 * @return a new {@code StagingDirCleaningService}
 */
protected AbstractService createStagingDirCleaningService() {
  AbstractService cleaner = new StagingDirCleaningService();
  return cleaner;
}
/**
 * Reflectively creates the speculator configured under
 * {@code MRJobConfig.MR_AM_JOB_SPECULATOR}, defaulting to
 * {@link DefaultSpeculator}. The class must expose a public
 * {@code (Configuration, AppContext)} constructor.
 *
 * @param conf the job configuration
 * @param context the application context
 * @return the new speculator
 * @throws YarnException wrapping any reflective failure
 */
protected Speculator createSpeculator(Configuration conf, AppContext context) {
  // Same message for every reflective failure mode below.
  final String failureMessage =
      "Can't make a speculator -- check " + MRJobConfig.MR_AM_JOB_SPECULATOR;
  try {
    Class<? extends Speculator> configuredClass = conf.getClass(
        MRJobConfig.MR_AM_JOB_SPECULATOR,
        DefaultSpeculator.class,
        Speculator.class);
    Constructor<? extends Speculator> ctor =
        configuredClass.getConstructor(Configuration.class, AppContext.class);
    return ctor.newInstance(conf, context);
  } catch (InstantiationException ex) {
    LOG.error(failureMessage, ex);
    throw new YarnException(ex);
  } catch (IllegalAccessException ex) {
    LOG.error(failureMessage, ex);
    throw new YarnException(ex);
  } catch (InvocationTargetException ex) {
    LOG.error(failureMessage, ex);
    throw new YarnException(ex);
  } catch (NoSuchMethodException ex) {
    LOG.error(failureMessage, ex);
    throw new YarnException(ex);
  }
}
/**
 * Creates the task attempt listener.
 *
 * @param context the application context
 * @param thh the task heartbeat handler
 * @param chh the container heartbeat handler
 * @return a new {@link TaskAttemptListenerImpl2} sharing this AM's job
 *         token secret manager
 */
protected TaskAttemptListener createTaskAttemptListener(AppContext context,
    TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
  return new TaskAttemptListenerImpl2(context, thh, chh,
      jobTokenSecretManager);
}
/**
 * Creates the task heartbeat handler, sized by the task listener thread
 * count from the configuration.
 *
 * @param context the application context
 * @param conf the job configuration
 * @return a new {@link TaskHeartbeatHandler}
 */
protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
    Configuration conf) {
  int listenerThreadCount = conf.getInt(
      MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
      MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT);
  return new TaskHeartbeatHandler(context, listenerThreadCount);
}
/**
 * Creates the container heartbeat handler. It currently reuses the task
 * listener thread count setting.
 *
 * @param context the application context
 * @param conf the job configuration
 * @return a new {@link ContainerHeartbeatHandler}
 */
protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
    Configuration conf) {
  // TODO XXX: Define a CONTAINER_LISTENER_THREAD_COUNT
  int listenerThreadCount = conf.getInt(
      MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
      MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT);
  return new ContainerHeartbeatHandler(context, listenerThreadCount);
}
/**
 * Creates the service that performs task cleanup.
 *
 * @param context the application context
 * @return a new {@link TaskCleanerImpl}
 */
protected TaskCleaner createTaskCleaner(AppContext context) {
  TaskCleaner cleaner = new TaskCleanerImpl(context);
  return cleaner;
}
/**
 * Creates the service that launches allocated containers.
 *
 * @param context the application context
 * @return a new {@link ContainerLauncherImpl}
 */
protected ContainerLauncher createContainerLauncher(final AppContext context) {
  ContainerLauncher launcher = new ContainerLauncherImpl(context);
  return launcher;
}
//TODO:should have an interface for MRClientService
/**
 * Creates the service that handles client requests.
 *
 * @param context the application context
 * @return a new {@link MRClientService}
 */
protected ClientService createClientService(AppContext context) {
  ClientService service = new MRClientService(context);
  return service;
}
/** @return the application id taken from this AM's attempt id */
public ApplicationId getAppID() {
  return this.appAttemptID.getApplicationId();
}
/** @return the application attempt id given at construction */
public ApplicationAttemptId getAttemptID() {
  return this.appAttemptID;
}
/** @return the job id computed in {@code init}; {@code null} before that */
public JobId getJobId() {
  return this.jobId;
}
/** @return the output committer created in {@code init}; {@code null} before that */
public OutputCommitter getCommitter() {
  return this.committer;
}
/** @return whether {@code init} selected the new-API committer */
public boolean isNewApiCommitter() {
  return this.newApiCommitter;
}
/** @return the attempt number of this application attempt */
public int getStartCount() {
  return this.appAttemptID.getAttemptId();
}
/** @return the application context created in {@code init}; {@code null} before that */
public AppContext getContext() {
  return this.context;
}
/** @return the dispatcher chosen in {@code init}; {@code null} before that */
public Dispatcher getDispatcher() {
  return this.dispatcher;
}
/**
 * @return the completed tasks obtained from the recovery service in
 *         {@code start}; {@code null} when the AM is not in recovery
 */
public Map<TaskId, TaskInfo> getCompletedTaskFromPreviousRun() {
  return this.completedTasksFromPreviousRun;
}
/**
 * @return the internal list of AM infos (not a copy); {@code null} until
 *         {@code start} has populated it
 */
public List<AMInfo> getAllAMInfos() {
  return this.amInfos;
}
/** @return the container launcher created in {@code init}; {@code null} before that */
public ContainerLauncher getContainerLauncher() {
  return this.containerLauncher;
}
/** @return the task attempt listener created in {@code init}; {@code null} before that */
public TaskAttemptListener getTaskAttemptListener() {
  return this.taskAttemptListener;
}
/**
 * Container requestor that defers the choice of the real implementation to
 * {@code start()}: a {@link LocalContainerRequestor} for uber jobs, an
 * {@link RMContainerRequestor} otherwise. Every requestor call is forwarded
 * to that delegate.
 *
 * By the time life-cycle of this router starts, job-init would have already
 * happened.
 */
private final class ContainerRequestorRouter extends AbstractService
    implements ContainerRequestor {
  private final ClientService clientService;
  private final AppContext context;
  // Chosen in start(); null until then.
  private ContainerRequestor real;

  public ContainerRequestorRouter(ClientService clientService,
      AppContext appContext) {
    super(ContainerRequestorRouter.class.getName());
    this.context = appContext;
    this.clientService = clientService;
  }

  /** Picks the delegate, then initializes and starts it before this router. */
  @Override
  public void start() {
    if (!job.isUber()) {
      real = new RMContainerRequestor(clientService, context);
    } else {
      real = new LocalContainerRequestor(clientService, context);
    }
    Service delegateService = (Service) this.real;
    delegateService.init(getConfig());
    delegateService.start();
    super.start();
  }

  /** Stops the delegate if one was created, then this router. */
  @Override
  public void stop() {
    if (real != null) {
      Service delegateService = (Service) real;
      delegateService.stop();
    }
    super.stop();
  }

  @Override
  public void handle(RMCommunicatorEvent event) {
    real.handle(event);
  }

  @Override
  public Resource getAvailableResources() {
    Resource available = real.getAvailableResources();
    return available;
  }

  @Override
  public void addContainerReq(ContainerRequest req) {
    real.addContainerReq(req);
  }

  @Override
  public void decContainerReq(ContainerRequest req) {
    real.decContainerReq(req);
  }

  /** Forwards the flag to the delegate, which is cast to RMCommunicator. */
  public void setSignalled(boolean isSignalled) {
    RMCommunicator communicator = (RMCommunicator) real;
    communicator.setSignalled(isSignalled);
  }
}
/**
 * AM scheduler that defers the choice of the real allocator to
 * {@code start()}: a {@link LocalContainerAllocator} for uber jobs, an
 * {@link RMContainerAllocator} otherwise. Scheduler events are forwarded to
 * that delegate.
 *
 * By the time life-cycle of this router starts, job-init would have already
 * happened.
 */
private final class AMSchedulerRouter extends AbstractService
    implements ContainerAllocator {
  private final ContainerRequestor requestor;
  private final AppContext context;
  // Chosen in start(); null until then.
  private ContainerAllocator containerAllocator;

  AMSchedulerRouter(ContainerRequestor requestor,
      AppContext context) {
    super(AMSchedulerRouter.class.getName());
    this.requestor = requestor;
    this.context = context;
  }

  /** Picks the delegate, then initializes and starts it before this router. */
  @Override
  public synchronized void start() {
    if (job.isUber()) {
      // NOTE(review): the requestor is cast to RMCommunicator here; confirm
      // that the requestor passed to this router satisfies that type.
      this.containerAllocator = new LocalContainerAllocator(this.context,
          jobId, nmHost, nmPort, nmHttpPort, containerID,
          (TaskUmbilicalProtocol) taskAttemptListener, taskAttemptListener,
          (RMCommunicator) this.requestor);
    } else {
      this.containerAllocator = new RMContainerAllocator(this.requestor,
          this.context);
    }
    ((Service) this.containerAllocator).init(getConfig());
    ((Service) this.containerAllocator).start();
    super.start();
  }

  /**
   * Stops the delegate if one was created, then always stops this router.
   * The router's own stop is not skipped when there is no delegate, which
   * matches {@code ContainerRequestorRouter.stop()}.
   */
  @Override
  public synchronized void stop() {
    if (containerAllocator != null) {
      ((Service) this.containerAllocator).stop();
    }
    super.stop();
  }

  @Override
  public void handle(AMSchedulerEvent event) {
    this.containerAllocator.handle(event);
  }
}
/** @return the task heartbeat handler created in {@code init}; {@code null} before that */
public TaskHeartbeatHandler getTaskHeartbeatHandler() {
  return this.taskHeartbeatHandler;
}
private final class StagingDirCleaningService extends AbstractService {
StagingDirCleaningService() {
super(StagingDirCleaningService.class.getName());
}
@Override
public synchronized void stop() {
try {
cleanupStagingDir();
} catch (IOException io) {
LOG.error("Failed to cleanup staging dir: ", io);
}
super.stop();
}
}
/**
 * {@link AppContext} implementation that exposes the enclosing
 * MRAppMaster's state (attempt id, clock, dispatcher, container and node
 * maps) together with its own job map and cluster info.
 */
private class RunningAppContext implements AppContext {
  private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
  private final Configuration conf;
  private final ClusterInfo clusterInfo = new ClusterInfo();

  public RunningAppContext(Configuration config) {
    this.conf = config;
  }

  @Override
  public ApplicationAttemptId getApplicationAttemptId() {
    return MRAppMaster.this.appAttemptID;
  }

  @Override
  public ApplicationId getApplicationID() {
    return MRAppMaster.this.appAttemptID.getApplicationId();
  }

  @Override
  public String getApplicationName() {
    return MRAppMaster.this.appName;
  }

  @Override
  public long getStartTime() {
    return MRAppMaster.this.startTime;
  }

  @Override
  public Job getJob(JobId jobID) {
    return this.jobs.get(jobID);
  }

  /** Returns the live job map itself, not a copy. */
  @Override
  public Map<JobId, Job> getAllJobs() {
    return this.jobs;
  }

  @Override
  public EventHandler getEventHandler() {
    return MRAppMaster.this.dispatcher.getEventHandler();
  }

  /** Returns the user name stored under {@code MRJobConfig.USER_NAME}. */
  @Override
  public CharSequence getUser() {
    String configuredUser = conf.get(MRJobConfig.USER_NAME);
    return configuredUser;
  }

  @Override
  public Clock getClock() {
    return MRAppMaster.this.clock;
  }

  @Override
  public ClusterInfo getClusterInfo() {
    return clusterInfo;
  }

  @Override
  public AMContainerMap getAllContainers() {
    return MRAppMaster.this.containers;
  }

  @Override
  public AMNodeMap getAllNodes() {
    return MRAppMaster.this.nodes;
  }

  /**
   * Returns the application ACLs from the container requestor.
   *
   * @throws YarnException if the AM is not in the STARTED state
   */
  @Override
  public Map<ApplicationAccessType, String> getApplicationACLs() {
    if (getServiceState() == STATE.STARTED) {
      RMCommunicator communicator = (RMCommunicator) containerRequestor;
      return communicator.getApplicationAcls();
    }
    throw new YarnException(
        "Cannot get ApplicationACLs before all services have started");
  }
}
/**
 * Starts the application master. In order: picks up recovery state (if
 * recovering), records an AMInfo for this attempt, creates the job, emits an
 * AM-started history event per known AM, initializes metrics, synchronously
 * initializes the job, applies the uber/non-uber decision, starts all
 * component services and finally kicks off the job.
 */
@SuppressWarnings("unchecked")
@Override
public void start() {
  // When recovering, take completed tasks and the AM history from the
  // recovery service.
  if (inRecovery) {
    completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
    amInfos = recoveryServ.getAMInfos();
  }

  // Append the AMInfo for the current AppMaster, creating the list first
  // when there is no history yet.
  if (amInfos == null) {
    amInfos = new LinkedList<AMInfo>();
  }
  amInfos.add(MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID,
      nmHost, nmPort, nmHttpPort));

  // Create the job itself.
  job = createJob(getConfig());

  // Send out an MR AM inited event for this AM and all previous AMs.
  for (AMInfo amRecord : amInfos) {
    dispatcher.getEventHandler().handle(
        new JobHistoryEvent(job.getID(),
            new AMStartedEvent(amRecord.getAppAttemptId(),
                amRecord.getStartTime(), amRecord.getContainerId(),
                amRecord.getNodeManagerHost(),
                amRecord.getNodeManagerPort(),
                amRecord.getNodeManagerHttpPort())));
  }

  // Metrics system init is really init & start; doing it here is more
  // test friendly.
  DefaultMetricsSystem.initialize("MRAppMaster");

  // Send JOB_INIT directly to the job event dispatcher rather than through
  // the dispatcher queue: the call is synchronous, so job-init is complete
  // when it returns. This does NOT trigger job execution.
  jobEventDispatcher.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));

  // JobImpl's InitTransition has run, so the "uber-decision" (MR-1220) has
  // been made and can be queried from the job.
  if (!job.isUber()) {
    // Speculator init is sent only for non-uber jobs. It is not processed
    // yet because the dispatcher has not been started.
    dispatcher.getEventHandler().handle(
        new SpeculatorEvent(job.getID(), clock.getTime()));
    LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
        + "job " + job.getID() + ".");
  } else {
    speculatorEventDispatcher.disableSpeculation();
    LOG.info("MRAppMaster uberizing job " + job.getID()
        + " in local container (\"uber-AM\") on node "
        + nmHost + ":" + nmPort + ".");
  }

  // Start all the components.
  super.start();

  // All components have started, start the job.
  startJobs();
}
/**
 * Triggers execution of the job by sending it a JOB_START event through the
 * dispatcher. Subclasses may override this to instantiate multiple jobs and
 * create a workflow.
 *
 * TODO: Rework the design to actually support this. Currently much of the
 * job set-up is done by the application master itself (see start()) to
 * support uberization (MR-1220). In a typical workflow, one presumably would
 * want to uberize only a subset of the jobs (the "small" ones), which is
 * awkward with the current design.
 */
@SuppressWarnings("unchecked")
protected void startJobs() {
  // Dispatching the job-start event is what gets the job running.
  dispatcher.getEventHandler().handle(
      new JobEvent(job.getID(), JobEventType.JOB_START));
}
/**
 * Routes a {@link JobEvent} to the job it names, looked up through the
 * application context.
 */
private class JobEventDispatcher implements EventHandler<JobEvent> {
  /**
   * @param event the job event; its job id selects the receiving job
   */
  @SuppressWarnings("unchecked")
  @Override
  public void handle(JobEvent event) {
    EventHandler<JobEvent> targetJob =
        (EventHandler<JobEvent>) context.getJob(event.getJobId());
    targetJob.handle(event);
  }
}
/**
 * Routes a {@link TaskEvent} to the task it names, found via the owning job
 * in the application context.
 */
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
  /**
   * @param event the task event; its task id selects the job and the task
   */
  @SuppressWarnings("unchecked")
  @Override
  public void handle(TaskEvent event) {
    Job owningJob = context.getJob(event.getTaskID().getJobId());
    EventHandler<TaskEvent> targetTask =
        (EventHandler<TaskEvent>) owningJob.getTask(event.getTaskID());
    targetTask.handle(event);
  }
}
/**
 * Routes a {@link TaskAttemptEvent} to the task attempt it names, resolved
 * job -> task -> attempt through the application context.
 */
private class TaskAttemptEventDispatcher
    implements EventHandler<TaskAttemptEvent> {
  /**
   * @param event the attempt event; its attempt id selects the receiver
   */
  @SuppressWarnings("unchecked")
  @Override
  public void handle(TaskAttemptEvent event) {
    TaskId owningTaskId = event.getTaskAttemptID().getTaskId();
    TaskAttempt targetAttempt = context.getJob(owningTaskId.getJobId())
        .getTask(owningTaskId)
        .getAttempt(event.getTaskAttemptID());
    ((EventHandler<TaskAttemptEvent>) targetAttempt).handle(event);
  }
}
/**
 * Forwards {@link SpeculatorEvent}s to the speculator, but only when
 * speculation has not been disabled and the configuration enables
 * speculative execution for the kind of task the event refers to.
 */
private class SpeculatorEventDispatcher implements
    EventHandler<SpeculatorEvent> {
  private final Configuration conf;
  private volatile boolean disabled;

  /**
   * @param config configuration read for the map/reduce speculation flags
   */
  public SpeculatorEventDispatcher(Configuration config) {
    this.conf = config;
  }

  /**
   * Hands the event to the speculator if speculation applies to it.
   *
   * The MAP and REDUCE speculative settings are independent: the event is
   * forwarded when speculation is on for maps and the task is a map, or when
   * it is on for reduces and the task is a reduce. An event without a task
   * id counts for both kinds.
   *
   * @param event the speculator event to route
   */
  @Override
  public void handle(SpeculatorEvent event) {
    if (disabled) {
      return;
    }

    // The event's TaskId will be null if the event type is JOB_CREATE or
    // ATTEMPT_STATUS_UPDATE; the task type is then left null as well.
    TaskId taskId = event.getTaskID();
    TaskType taskType = (taskId == null) ? null : taskId.getTaskType();

    boolean mapSpeculationOn =
        conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);
    boolean reduceSpeculationOn =
        conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

    boolean countsAsMap = taskType == null || taskType == TaskType.MAP;
    boolean countsAsReduce = taskType == null || taskType == TaskType.REDUCE;

    if ((mapSpeculationOn && countsAsMap)
        || (reduceSpeculationOn && countsAsReduce)) {
      // Speculator IS enabled, direct the event to there.
      speculator.handle(event);
    }
  }

  /** Makes every later call to {@link #handle} return without forwarding. */
  public void disableSpeculation() {
    disabled = true;
  }
}
/**
 * Checks that a required input value is present.
 *
 * @param value the value to check
 * @param param name of the parameter, used in the error message
 * @throws IOException if {@code value} is null; the same message is logged
 *           at error level first
 */
private static void validateInputParam(String value, String param)
    throws IOException {
  if (value != null) {
    return;
  }
  String problem = param + " is null";
  LOG.error(problem);
  throw new IOException(problem);
}
/**
 * Entry point of the MapReduce application master process.
 *
 * Reads the container id, node manager host and ports, application submit
 * time and job user name from the process environment, validates that all of
 * them are present, builds the {@code MRAppMaster}, registers its shutdown
 * hook, loads the job configuration and then initializes and starts the
 * application master as the job user. Any failure is logged as fatal and the
 * JVM exits with status 1.
 *
 * @param args command line arguments (not used)
 */
public static void main(String[] args) {
  try {
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    String containerIdStr =
        System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
    String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
    String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
    String nodeHttpPortString =
        System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
    String appSubmitTimeStr =
        System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
    String jobUserName = System
        .getenv(ApplicationConstants.Environment.USER.name());

    validateInputParam(containerIdStr,
        ApplicationConstants.AM_CONTAINER_ID_ENV);
    validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
    validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
    validateInputParam(nodeHttpPortString,
        ApplicationConstants.NM_HTTP_PORT_ENV);
    validateInputParam(appSubmitTimeStr,
        ApplicationConstants.APP_SUBMIT_TIME_ENV);
    // The user name is needed for the configuration and for the UGI below.
    // Check it here with the other inputs so that a missing value is
    // reported by name, before the app master and its shutdown hook exist.
    validateInputParam(jobUserName,
        ApplicationConstants.Environment.USER.name());

    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
    ApplicationAttemptId applicationAttemptId =
        containerId.getApplicationAttemptId();
    long appSubmitTime = Long.parseLong(appSubmitTimeStr);
    MRAppMaster appMaster =
        new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
            Integer.parseInt(nodePortString),
            Integer.parseInt(nodeHttpPortString), appSubmitTime);
    ShutdownHookManager.get().addShutdownHook(
        new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
    YarnConfiguration conf = new YarnConfiguration(new JobConf());
    conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
    conf.set(MRJobConfig.USER_NAME, jobUserName);
    // Do not automatically close FileSystem objects so that in case of
    // SIGTERM I have a chance to write out the job history. I'll be closing
    // the objects myself.
    conf.setBoolean("fs.automatic.close", false);
    initAndStartAppMaster(appMaster, conf, jobUserName);
  } catch (Throwable t) {
    LOG.fatal("Error starting MRAppMaster", t);
    System.exit(1);
  }
}
/**
 * The shutdown hook that runs when a signal is received AND during normal
 * close of the JVM. It tells the container requestor and the job history
 * event handler that a shutdown is in progress and then stops the
 * application master.
 */
static class MRAppMasterShutdownHook implements Runnable {
  MRAppMaster appMaster;

  /**
   * @param appMaster the application master to stop on shutdown
   */
  MRAppMasterShutdownHook(MRAppMaster appMaster) {
    this.appMaster = appMaster;
  }

  /**
   * Signals the components that exist and then stops the application master.
   */
  @Override
  public void run() {
    LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
        + "JobHistoryEventHandler.");
    // Notify the JHEH and RMCommunicator that a SIGTERM has been received so
    // that they don't take too long in shutting down.
    // main() registers this hook before the application master has been
    // initialized, so the requestor may still be missing when the JVM exits
    // early (for example after a failed init). Signal it only when it is
    // really there; otherwise the hook would die before reaching stop().
    if (appMaster.containerRequestor instanceof ContainerRequestorRouter) {
      ((ContainerRequestorRouter) appMaster.containerRequestor)
          .setSignalled(true);
    }
    if (appMaster.jobHistoryEventHandler != null) {
      ((JobHistoryEventHandler2) appMaster.jobHistoryEventHandler)
          .setSignalled(true);
    }
    appMaster.stop();
  }
}
/**
 * Initializes and starts the application master while running as the job
 * user.
 *
 * @param appMaster the application master to initialize and start
 * @param conf configuration installed into {@code UserGroupInformation} and
 *          passed to {@code appMaster.init}
 * @param jobUserName name of the remote user to run as
 * @throws IOException propagated from {@code doAs}
 * @throws InterruptedException propagated from {@code doAs}
 */
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final YarnConfiguration conf, String jobUserName) throws IOException,
    InterruptedException {
  UserGroupInformation.setConfiguration(conf);
  UserGroupInformation jobUserUgi =
      UserGroupInformation.createRemoteUser(jobUserName);

  PrivilegedExceptionAction<Object> bootAppMaster =
      new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          appMaster.init(conf);
          appMaster.start();
          return null;
        }
      };
  jobUserUgi.doAs(bootAppMaster);
}
}