/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
/**
* The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of the Job
* interface. All state changes happen via the Job interface; each event
* results in a finite state transition in the Job.
*
* The MR AppMaster is a composition of loosely coupled services. The
* services interact with each other via events. The components resemble
* the Actor model: each component acts on the events it receives and sends
* out events to other components.
* This keeps the AM highly concurrent, with no or minimal synchronization
* needs.
*
* The events are dispatched by a central Dispatcher mechanism. All
* components register with the Dispatcher.
*
* The information is shared across different components using AppContext.
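*
* For illustration, the registration/dispatch pattern used throughout this
* class looks roughly like the following minimal sketch (using the
* task-event types defined in this package):
* <pre>
*   dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
*   dispatcher.getEventHandler().handle(
*       new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
* </pre>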
*/
public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
private Clock clock;
private final long startTime = System.currentTimeMillis();
private String appName;
private final int startCount;
private final ApplicationId appID;
private final ApplicationAttemptId appAttemptID;
protected final MRAppMetrics metrics;
private Set<TaskId> completedTasksFromPreviousRun;
private AppContext context;
private Dispatcher dispatcher;
private ClientService clientService;
private ContainerAllocator containerAllocator;
private ContainerLauncher containerLauncher;
private TaskCleaner taskCleaner;
private Speculator speculator;
private TaskAttemptListener taskAttemptListener;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private Job job;
public MRAppMaster(ApplicationId applicationId, int startCount) {
this(applicationId, new SystemClock(), startCount);
}
public MRAppMaster(ApplicationId applicationId, Clock clock, int startCount) {
super(MRAppMaster.class.getName());
this.clock = clock;
this.appID = applicationId;
this.appAttemptID = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(ApplicationAttemptId.class);
this.appAttemptID.setApplicationId(appID);
this.appAttemptID.setAttemptId(startCount);
this.startCount = startCount;
this.metrics = MRAppMetrics.create();
LOG.info("Created MRAppMaster for application " + applicationId);
}
@Override
public void init(final Configuration conf) {
context = new RunningAppContext();
// Job name is the same as the app name until we support a DAG of jobs
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
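// On a restarted attempt (startCount > 1) with recovery enabled, the
// RecoveryService supplies the dispatcher and clock and replays the
// previous attempt's history so already-completed tasks are not re-run.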
if (conf.getBoolean(AMConstants.RECOVERY_ENABLE, false)
&& startCount > 1) {
LOG.info("Recovery is enabled. Will try to recover from previous life.");
Recovery recoveryServ = new RecoveryService(appID, clock, startCount);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
} else {
dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
}
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context);
addIfService(taskAttemptListener);
//service to do the task cleanup
taskCleaner = createTaskCleaner(context);
addIfService(taskCleaner);
//service to handle requests from JobClient
clientService = createClientService(context);
addIfService(clientService);
//service to log job history events
EventHandler<JobHistoryEvent> historyService =
createJobHistoryHandler(context);
addIfService(historyService);
JobEventDispatcher synchronousJobEventDispatcher = new JobEventDispatcher();
//register the event dispatchers
dispatcher.register(JobEventType.class, synchronousJobEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
historyService);
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
//optional service to speculate on task attempts' progress
speculator = createSpeculator(conf, context);
addIfService(speculator);
}
dispatcher.register(Speculator.EventType.class,
new SpeculatorEventDispatcher());
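// In secure mode, pick up the file-system tokens localized with the job
// so the AM itself can act on the user's behalf.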
Credentials fsTokens = new Credentials();
if (UserGroupInformation.isSecurityEnabled()) {
// Read the file-system tokens from the localized tokens-file.
try {
Path jobSubmitDir =
FileContext.getLocalFSFileContext().makeQualified(
new Path(new File(MRConstants.JOB_SUBMIT_DIR)
.getAbsolutePath()));
Path jobTokenFile =
new Path(jobSubmitDir, MRConstants.APPLICATION_TOKENS_FILE);
fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+ jobTokenFile);
UserGroupInformation currentUser =
UserGroupInformation.getCurrentUser();
for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
+ "in current ugi in the AppMaster for service "
+ tk.getService());
currentUser.addToken(tk); // For use by AppMaster itself.
}
} catch (IOException e) {
throw new YarnException(e);
}
}
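// Initialize all of the services registered above.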
super.init(conf);
//---- start of what used to be startJobs() code:
Configuration config = getConfig();
job = createJob(config, fsTokens);
// create a job event for job initialization
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
// send init to the job (this does NOT trigger job execution)
synchronousJobEventDispatcher.handle(initJobEvent);
// send init to the speculator. This won't start anything yet, since the
// dispatcher itself isn't started.
dispatcher.getEventHandler().handle(
new SpeculatorEvent(job.getID(), clock.getTime()));
// JobImpl's InitTransition is done (call above is synchronous), so the
// "uber-decision" (MR-1220) has been made. Query job and switch to
// ubermode if appropriate (by registering different container-allocator
// and container-launcher services/event-handlers).
if (job.isUber()) {
LOG.info("MRAppMaster uberizing job " + job.getID()
+ " in local container (\"uber-AM\").");
} else {
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator =
createContainerAllocator(clientService, context, job.isUber());
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
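// The allocator is created after super.init() has run, so the composite
// service will not initialize it automatically; do it by hand.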
if (containerAllocator instanceof Service) {
((Service) containerAllocator).init(config);
}
// corresponding service to launch allocated containers via NodeManager
containerLauncher = createContainerLauncher(context, job.isUber());
addIfService(containerLauncher);
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
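// Created after super.init() as well, so initialize it manually too.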
if (containerLauncher instanceof Service) {
((Service) containerLauncher).init(config);
}
} // end of init()
/**
* Create and initialize (but don't start) a single job. Also registers a
* handler that shuts the AppMaster down once the job finishes.
* @param fsTokens the file-system tokens localized for the job
*/
protected Job createJob(Configuration conf, Credentials fsTokens) {
// create single job
Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
completedTasksFromPreviousRun, metrics);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
new EventHandler<JobFinishEvent>() {
@Override
public void handle(JobFinishEvent event) {
// job has finished
// this is the only job, so shut down the Appmaster
// note in a workflow scenario, this may lead to creation of a new
// job (FIXME?)
// TODO: currently just wait for some time so clients can learn the
// final state. Will be removed once the RM comes on.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for clients to see the final state", e);
Thread.currentThread().interrupt();
}
LOG.info("Calling stop for all the services");
try {
stop();
} catch (Throwable t) {
LOG.warn("Graceful stop failed ", t);
}
// TODO: this is required because the RPC server does not shut down
// in spite of calling server.stop(). Bring the process down by force.
// Not needed after HADOOP-7140.
LOG.info("Exiting MR AppMaster. Goodbye!");
System.exit(0);
}
});
return newJob;
} // end createJob()
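/** Add the given object as a child service if it implements Service. */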
protected void addIfService(Object object) {
if (object instanceof Service) {
addService((Service) object);
}
}
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
getStartCount());
return eventHandler;
}
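/**
* Instantiate the speculator reflectively: the class named by
* AMConstants.SPECULATOR_CLASS (DefaultSpeculator by default) must expose
* a (Configuration, AppContext) constructor.
*/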
protected Speculator createSpeculator(Configuration conf, AppContext context) {
Class<? extends Speculator> speculatorClass;
try {
// "yarn.mapreduce.job.speculator.class"
speculatorClass = conf.getClass(AMConstants.SPECULATOR_CLASS,
DefaultSpeculator.class, Speculator.class);
Constructor<? extends Speculator> speculatorConstructor =
speculatorClass.getConstructor(Configuration.class, AppContext.class);
return speculatorConstructor.newInstance(conf, context);
} catch (InstantiationException | IllegalAccessException
| InvocationTargetException | NoSuchMethodException ex) {
LOG.error("Can't make a speculator -- check "
+ AMConstants.SPECULATOR_CLASS, ex);
throw new YarnException(ex);
}
}
protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
TaskAttemptListener lis =
new TaskAttemptListenerImpl(context, jobTokenSecretManager);
return lis;
}
protected TaskCleaner createTaskCleaner(AppContext context) {
return new TaskCleanerImpl(context);
}
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context, boolean isLocal) {
return isLocal
? new LocalContainerAllocator(clientService, context)
: new RMContainerAllocator(clientService, context);
}
protected ContainerLauncher createContainerLauncher(AppContext context,
boolean isLocal) {
return isLocal
? new LocalContainerLauncher(context,
(TaskUmbilicalProtocol) taskAttemptListener)
: new ContainerLauncherImpl(context);
}
// TODO: should have an interface for MRClientService
protected ClientService createClientService(AppContext context) {
return new MRClientService(context);
}
public ApplicationId getAppID() {
return appID;
}
public int getStartCount() {
return startCount;
}
public AppContext getContext() {
return context;
}
public Dispatcher getDispatcher() {
return dispatcher;
}
public Set<TaskId> getCompletedTaskFromPreviousRun() {
return completedTasksFromPreviousRun;
}
public ContainerAllocator getContainerAllocator() {
return containerAllocator;
}
public ContainerLauncher getContainerLauncher() {
return containerLauncher;
}
public TaskAttemptListener getTaskAttemptListener() {
return taskAttemptListener;
}
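/** The AppContext implementation through which the AM's components share state. */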
class RunningAppContext implements AppContext {
private Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
@Override
public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptID;
}
@Override
public ApplicationId getApplicationID() {
return appID;
}
@Override
public String getApplicationName() {
return appName;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public Job getJob(JobId jobID) {
return jobs.get(jobID);
}
@Override
public Map<JobId, Job> getAllJobs() {
return jobs;
}
@Override
public EventHandler getEventHandler() {
return dispatcher.getEventHandler();
}
@Override
public CharSequence getUser() {
return getConfig().get(MRJobConfig.USER_NAME);
}
@Override
public Clock getClock() {
return clock;
}
}
@Override
public void start() {
// The metrics system's init really does init & start; doing it here is
// more test-friendly.
DefaultMetricsSystem.initialize("MRAppMaster");
startJobs();
//start all the components
super.start();
}
/**
* This can be overridden to instantiate multiple jobs and create a
* workflow.
*
* TODO: Rework the design to actually support this. Currently much of the
* job stuff has been moved to init() above to support uberization (MR-1220).
* In a typical workflow, one presumably would want to uberize only a subset
* of the jobs (the "small" ones), which is awkward with the current design.
*/
protected void startJobs() {
// create a job-start event to get the ball rolling
JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
// send the job-start event; this triggers the job execution
dispatcher.getEventHandler().handle(startJobEvent);
}
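/**
* The typed dispatchers below route each event to the state machine that
* owns it; the Job, Task and TaskAttempt implementations are themselves
* EventHandlers.
*/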
private class JobEventDispatcher implements EventHandler<JobEvent> {
@Override
public void handle(JobEvent event) {
((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
}
}
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
@Override
public void handle(TaskEvent event) {
Task task = context.getJob(event.getTaskID().getJobId()).getTask(
event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
}
private class TaskAttemptEventDispatcher
implements EventHandler<TaskAttemptEvent> {
@Override
public void handle(TaskAttemptEvent event) {
Job job = context.getJob(event.getTaskAttemptID().getTaskId().getJobId());
Task task = job.getTask(event.getTaskAttemptID().getTaskId());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
}
private class SpeculatorEventDispatcher implements
EventHandler<SpeculatorEvent> {
@Override
public void handle(SpeculatorEvent event) {
if (getConfig().getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| getConfig().getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
// The speculator is enabled; direct the event to it.
speculator.handle(event);
}
}
}
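/**
* Entry point. Expects three command-line arguments: the application's
* cluster timestamp, the application id, and the attempt (fail) count
* used as the start count.
*/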
public static void main(String[] args) {
try {
ApplicationId applicationId = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(ApplicationId.class);
applicationId.setClusterTimestamp(Long.parseLong(args[0]));
applicationId.setId(Integer.parseInt(args[1]));
int failCount = Integer.parseInt(args[2]);
MRAppMaster appMaster = new MRAppMaster(applicationId, failCount);
YarnConfiguration conf = new YarnConfiguration(new JobConf());
conf.addResource(new Path(MRConstants.JOB_CONF_FILE));
conf.set(MRJobConfig.USER_NAME,
System.getProperty("user.name"));
UserGroupInformation.setConfiguration(conf);
appMaster.init(conf);
appMaster.start();
} catch (Throwable t) {
LOG.error("Caught throwable. Exiting:", t);
System.exit(1);
}
}
}