blob: 96f62a842eeb5861cb791812b30d53d4c0739813 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnRuntimeException;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezRemoteException;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.launcher.ContainerLauncher;
import org.apache.tez.dag.app.launcher.ContainerLauncherImpl;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.rm.node.AMNodeEventType;
import org.apache.tez.dag.app.rm.node.AMNodeMap;
import org.apache.tez.dag.app.taskclean.TaskCleaner;
import org.apache.tez.dag.app.taskclean.TaskCleanerImpl;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.avro.HistoryEventType;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
/**
* The Tez DAG Application Master.
* The state machine is encapsulated in the implementation of the DAG interface.
* All state changes happen via the DAG interface. Each event
* results in a finite state transition in the DAG.
*
* The DAG AppMaster is a composition of loosely coupled services. The services
* interact with each other via events. The components resemble the
* Actor model: each component acts on the events it receives and sends out
* events to other components.
* This keeps it highly concurrent with no or minimal synchronization needs.
*
* The events are dispatched by a central dispatch mechanism. All components
* register with the Dispatcher.
*
* Information is shared across the different components using AppContext.
*/
@SuppressWarnings("rawtypes")
public class DAGAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(DAGAppMaster.class);
/**
* Priority with which the DAGAppMaster shutdown hook is registered with the
* ShutdownHookManager (see main()).
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
// Time source; injected through the constructor.
private Clock clock;
// The DAG description this AM was launched to run.
private final DAGPlan dagPlan;
// Set in start(), after all services have been started.
private long dagsStartTime;
// Clock reading taken when this object was constructed.
private final long startTime;
private final long appSubmitTime;
// Taken from the DAG plan's name during init().
private String appName;
private final ApplicationAttemptId appAttemptID;
private final ContainerId containerID;
private final String nmHost;
private final int nmPort;
private final int nmHttpPort;
// Created and registered with the dispatcher in init().
private AMContainerMap containers;
private AMNodeMap nodes;
// TODO Metrics
//protected final MRAppMetrics metrics;
// TODO Recovery
//private Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun;
private AppContext context;
private TezConfiguration conf;
private Dispatcher dispatcher;
// TODO Recovery
//private Recovery recoveryServ;
private ContainerLauncher containerLauncher;
private TaskCleaner taskCleaner;
//private Speculator speculator;
private ContainerHeartbeatHandler containerHeartbeatHandler;
private TaskHeartbeatHandler taskHeartbeatHandler;
private TaskAttemptListener taskAttemptListener;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
// TODODAGAM Define DAGID
private TezDAGID dagId;
// private boolean newApiCommitter;
private DagEventDispatcher dagEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
//private SpeculatorEventDispatcher speculatorEventDispatcher;
private TaskSchedulerEventHandler taskSchedulerEventHandler;
private HistoryEventHandler historyEventHandler;
// Lifecycle state of this AM: NEW in the constructor, INITED in init(),
// RUNNING in start(), then a final value from setStateOnDAGCompletion(),
// the INTERNAL_ERROR handler or the shutdown hook.
private DAGAppMasterState state;
DAGClientServer clientRpcServer;
private DAGClientHandler clientHandler;
// The single DAG run by this AM; assigned in start(), null before that.
private DAG dag;
private Credentials fsTokens = new Credentials(); // Filled during init
private UserGroupInformation currentUser; // Will be setup during init
/**
* Creates an application master that uses a {@link SystemClock} as its time
* source; all other arguments are passed through to the full constructor.
*/
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime, DAGPlan dagPB) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime, dagPB);
}
/**
 * Creates an application master in the {@code NEW} state.
 *
 * @param applicationAttemptId the application attempt this AM represents
 * @param containerId          the container the AM is running in
 * @param nmHost               host of the node manager that launched the AM
 * @param nmPort               port of that node manager
 * @param nmHttpPort           HTTP port of that node manager
 * @param clock                time source; read once here to record the start time
 * @param appSubmitTime        time at which the application was submitted
 * @param dagPB                the plan of the DAG to run
 */
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
    ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
    Clock clock, long appSubmitTime, DAGPlan dagPB) {
  super(DAGAppMaster.class.getName());

  // Identity of this attempt and of the container/node it runs on.
  this.appAttemptID = applicationAttemptId;
  this.containerID = containerId;
  this.nmHost = nmHost;
  this.nmPort = nmPort;
  this.nmHttpPort = nmHttpPort;

  // Timing information.
  this.clock = clock;
  this.appSubmitTime = appSubmitTime;
  this.startTime = clock.getTime();

  // The work to run and the initial lifecycle state.
  this.dagPlan = dagPB;
  this.state = DAGAppMasterState.NEW;

  // TODO Metrics
  //this.metrics = MRAppMetrics.create();
  LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
/**
* Initializes the application master: sets up the user/credentials, builds
* the application context, creates every component, adds the ones that are
* services to this composite service and registers the event handlers with
* the central dispatcher. Finishes by delegating to the superclass init.
*
* @param tezConf the configuration; must actually be a TezConfiguration,
* otherwise the cast below fails
*/
@Override
public void init(final Configuration tezConf) {
this.state = DAGAppMasterState.INITED;
// Only checked when assertions are enabled; the cast on the next line
// throws a ClassCastException for any other Configuration type.
assert tezConf instanceof TezConfiguration;
this.conf = (TezConfiguration) tezConf;
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
downloadTokensAndSetupUGI(conf);
context = new RunningAppContext(conf);
// Job name is the same as the app name until we support DAG of jobs
// for an app later
appName = dagPlan.getName();
// A single DAG per application attempt, hence the fixed id of 1.
dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
clientHandler = new DAGClientHandler();
// TODO Committer.
// committer = createOutputCommitter(conf);
// NOTE(review): components are added in the order below; presumably the
// composite service inits/starts them in that order - confirm before reordering.
dispatcher = createDispatcher();
addIfService(dispatcher);
clientRpcServer = new DAGClientServer(clientHandler);
addIfService(clientRpcServer);
taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
addIfService(taskHeartbeatHandler);
containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
addIfService(containerHeartbeatHandler);
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context,
taskHeartbeatHandler, containerHeartbeatHandler);
addIfService(taskAttemptListener);
containers = new AMContainerMap(containerHeartbeatHandler,
taskAttemptListener, context);
addIfService(containers);
dispatcher.register(AMContainerEventType.class, containers);
nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
addIfService(nodes);
dispatcher.register(AMNodeEventType.class, nodes);
//service to do the task cleanup
taskCleaner = createTaskCleaner(context);
addIfService(taskCleaner);
this.dagEventDispatcher = new DagEventDispatcher();
this.vertexEventDispatcher = new VertexEventDispatcher();
//register the event dispatchers
dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
dispatcher.register(DAGEventType.class, dagEventDispatcher);
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
// TODO XXX: Rename to NMComm
// corresponding service to launch allocated containers via NodeManager
// containerLauncher = createNMCommunicator(context);
containerLauncher = createContainerLauncher(context);
addIfService(containerLauncher);
dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
clientRpcServer);
addIfService(taskSchedulerEventHandler);
dispatcher.register(AMSchedulerEventType.class,
taskSchedulerEventHandler);
historyEventHandler = new HistoryEventHandler(context);
addIfService(historyEventHandler);
dispatcher.register(HistoryEventType.class, historyEventHandler);
// Initializes every service added above.
super.init(conf);
} // end of init()
/**
* Creates the central event dispatcher used by this AM. Protected so that
* subclasses can supply a different Dispatcher implementation.
*
* @return a new AsyncDispatcher
*/
protected Dispatcher createDispatcher() {
return new AsyncDispatcher();
}
/**
* Returns the file system obtained from {@code FileSystem.get(conf)}.
* @param conf the configuration used to look up the file system
* @return the file system for the given configuration
* @throws IOException if the file system cannot be obtained
*/
protected FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(conf);
}
/**
* Terminates the JVM with exit status 0. Kept in its own overridable method
* so that tests can replace the exit call.
*/
protected void sysexit() {
System.exit(0);
}
/**
 * DAG-finish handler that performs the AM shutdown on a dedicated thread.
 *
 * TaskAttempts are marked as completed before their container exits, so a
 * container may well not have "completed" by the time the DAG completes. In
 * that case TaskAttempts may not yet be in a final state-machine state and
 * cleanup would not have happened. Because this handler runs on the thread
 * that processes all other async events, the wait for containers and the
 * actual shutdown are moved to a separate thread.
 *
 * The shutdown thread polls until all containers are COMPLETED; no timeout
 * is applied yet (see the TODO in {@link AMShutdownRunnable}).
 */
protected class JobFinishEventHandlerCR
    implements EventHandler<DAGFinishEvent> {

  /** Records the final AM state and hands the shutdown off to a new thread. */
  public void handle(DAGFinishEvent event) {
    setStateOnDAGCompletion();
    LOG.info("Handling JobFinished Event");
    Thread shutdownThread =
        new Thread(new AMShutdownRunnable(), "AMShutdownThread");
    shutdownThread.start();
  }

  // TODO Job End Notification: port MR's maybeSendJobEndNotification()
  // (JobEndNotifier driven by MRJobConfig.MR_JOB_END_NOTIFICATION_URL).

  /** Stops all services of the AM; any failure is logged and swallowed. */
  protected void stopAllServices() {
    try {
      // Stopping the services also sends the final report to the
      // ResourceManager.
      LOG.info("Calling stop for all the services");
      stop();
    } catch (Throwable failure) {
      LOG.warn("Graceful stop failed ", failure);
    }
  }

  /** Logs the goodbye message and exits through {@code sysexit()}. */
  protected void exit() {
    LOG.info("Exiting MR AppMaster..GoodBye!");
    sysexit();
  }

  /** Stops the services, then exits. */
  private void stopAM() {
    stopAllServices();
    exit();
  }

  /**
   * @return true when every container known to the context is in the
   *         COMPLETED state (vacuously true when there are none)
   */
  protected boolean allContainersComplete() {
    for (AMContainer candidate : context.getAllContainers().values()) {
      if (candidate.getState() == AMContainerState.COMPLETED) {
        continue;
      }
      return false;
    }
    return true;
  }

  /**
   * @return always true for now
   */
  protected boolean allTaskAttemptsComplete() {
    // TODO XXX: Implement.
    // TaskAttempts will transition to their final state machine state only
    // after a container is complete and sends out a TA_TERMINATED event.
    return true;
  }

  /** Waits for containers/attempts to finish (non-uber DAGs), then stops the AM. */
  private class AMShutdownRunnable implements Runnable {
    @Override
    public void run() {
      // TODO Job End Notification
      //maybeSendJobEndNotification();
      // TODO XXX Add a timeout.
      LOG.info("Waiting for all containers and TaskAttempts to complete");
      if (dag.isUber()) {
        LOG.info("Uberized job. Not waiting for all containers to finish");
      } else {
        // Poll every 100ms; an interrupt ends the wait early but the
        // shutdown below still proceeds.
        while (!(allContainersComplete() && allTaskAttemptsComplete())) {
          try {
            synchronized (this) {
              this.wait(100L);
            }
          } catch (InterruptedException interrupted) {
            LOG.info("AM Shutdown Thread interrupted. Exiting");
            break;
          }
        }
        LOG.info("All Containers and TaskAttempts Complete. Stopping services");
      }
      stopAM();
      LOG.info("AM Shutdown Thread Completing");
    }
  }
}
/**
 * Processes an event addressed to the application master itself.
 * An INTERNAL_ERROR moves the AM to the ERROR state and forwards an
 * INTERNAL_ERROR event to the running DAG; any other type is only logged.
 *
 * @param event the application master event to process
 */
private void handle(DAGAppMasterEvent event) {
  switch (event.getType()) {
    case INTERNAL_ERROR: {
      state = DAGAppMasterState.ERROR;
      DAGEvent dagError =
          new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR);
      sendEvent(dagError);
      break;
    }
    default: {
      LOG.warn("No handler for event type: " + event.getType());
    }
  }
}
/**
* Dispatcher-facing adapter: forwards DAGAppMasterEvents to the private
* handle(DAGAppMasterEvent) method of the enclosing DAGAppMaster.
*/
private class DAGAppMasterEventHandler implements
EventHandler<DAGAppMasterEvent> {
@Override
public void handle(DAGAppMasterEvent event) {
DAGAppMaster.this.handle(event);
}
}
/**
 * Default DAG-finish handler: records the final AM state, waits a short
 * grace period, stops all services and exits the process.
 *
 * The DAG is the only unit of work of this AM, so its completion shuts the
 * AppMaster down. (In a workflow scenario this might instead lead to the
 * creation of a new DAG - FIXME?)
 */
private class DAGFinishEventHandler implements EventHandler<DAGFinishEvent> {
  @Override
  public void handle(DAGFinishEvent event) {
    // TODO Job End Notification: port MR's JobEndNotifier
    // (driven by MRJobConfig.MR_JOB_END_NOTIFICATION_URL).

    // TODO: currently just wait for some time so clients can learn the
    // final state. Will be removed once RM come on.
    setStateOnDAGCompletion();
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      // Report through the logger rather than stderr. The interrupt status
      // is deliberately not restored: stop() below must be allowed to run,
      // and the process exits right after.
      LOG.warn("Interrupted while waiting before AM shutdown", e);
    }
    try {
      // Stop all services.
      // This will also send the final report to the ResourceManager.
      LOG.info("Calling stop for all the services");
      stop();
    } catch (Throwable t) {
      LOG.warn("Graceful stop failed ", t);
    }
    // Bring the process down by force.
    // Not needed after HADOOP-7140
    LOG.info("Exiting MR AppMaster..GoodBye!");
    sysexit();
  }
}
/**
* Creates the event handler that handles the DAG finish event. Protected so
* that subclasses can substitute a different shutdown behaviour.
* @return the DAG finish event handler.
*/
protected EventHandler<DAGFinishEvent> createDAGFinishEventHandler() {
return new DAGFinishEventHandler();
}
/**
* Create the recovery service.
* @return an instance of the recovery service.
*/
// TODO Recovery
/*
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryService(appContext, getCommitter());
}
*/
/**
* Create the RMContainerRequestor.
*
* @param clientService
* the MR Client Service.
* @param appContext
* the application context.
* @return an instance of the RMContainerRequestor.
*/
// protected ContainerRequestor createContainerRequestor(
// ClientService clientService, AppContext appContext) {
// return new ContainerRequestorRouter(clientService, appContext);
// }
/**
 * Builds the single DAG run by this AM, publishes it to the application
 * context and registers the handler for its finish event. The DAG is not
 * started here.
 *
 * @param dagPB the plan describing the DAG
 * @return the newly created DAG
 */
protected DAG createDAG(DAGPlan dagPB) {
  final EventHandler eventHandler = dispatcher.getEventHandler();
  final String userName = currentUser.getShortUserName();

  // TODO Recovery: pass completedTasksFromPreviousRun
  // TODO Metrics: pass metrics
  // Not yet passed: committer, newApiCommitter, amInfos
  final DAG createdDag = new DAGImpl(dagId, appAttemptID, conf, dagPB,
      eventHandler, taskAttemptListener, jobTokenSecretManager, fsTokens,
      clock, userName, appSubmitTime, taskHeartbeatHandler, context);

  ((RunningAppContext) context).setDAG(createdDag);

  final EventHandler<DAGFinishEvent> finishHandler =
      createDAGFinishEventHandler();
  dispatcher.register(DAGFinishEvent.Type.class, finishHandler);
  return createdDag;
} // end createDag()
/**
 * Records the current user and, when security is enabled, reads the
 * localized tokens file and adds every token to both {@code fsTokens} and
 * the current user's UGI.
 *
 * @param conf configuration used when reading the token storage file
 * @throws YarnRuntimeException wrapping any IOException raised while
 *         resolving the user or reading the tokens
 */
protected void downloadTokensAndSetupUGI(TezConfiguration conf) {
  // TODO remove - TEZ-71
  try {
    this.currentUser = UserGroupInformation.getCurrentUser();
    if (UserGroupInformation.isSecurityEnabled()) {
      // Read the file-system tokens from the localized tokens-file.
      Path jobSubmitDir =
          FileContext.getLocalFSFileContext().makeQualified(
              new Path(new File(TezConfiguration.JOB_SUBMIT_DIR)
                  .getAbsolutePath()));
      Path jobTokenFile =
          new Path(jobSubmitDir, TezConfiguration.APPLICATION_TOKENS_FILE);
      fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
      LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
          + jobTokenFile);
      for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
        if (LOG.isDebugEnabled()) {
          // Leading space before "in" keeps the token kind from running
          // into the rest of the message.
          LOG.debug("Token of kind " + tk.getKind()
              + " in current ugi in the AppMaster for service "
              + tk.getService());
        }
        currentUser.addToken(tk); // For use by AppMaster itself.
      }
    }
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
}
/**
 * Adds the given object to this composite service when it is a
 * {@link Service}; anything else (including null) is ignored.
 *
 * @param object the component to add if it is a service
 */
protected void addIfService(Object object) {
  if (!(object instanceof Service)) {
    return;
  }
  Service service = (Service) object;
  addService(service);
}
/**
 * Creates the listener that serves task attempt requests.
 *
 * @param context the application context
 * @param thh     the task heartbeat handler handed to the listener
 * @param chh     the container heartbeat handler handed to the listener
 * @return a new TaskAttemptListenerImpTezDag sharing this AM's job token
 *         secret manager
 */
protected TaskAttemptListener createTaskAttemptListener(AppContext context,
    TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
  return new TaskAttemptListenerImpTezDag(
      context, thh, chh, jobTokenSecretManager);
}
/**
 * Creates the task heartbeat handler.
 *
 * @param context the application context
 * @param conf    configuration supplying
 *                {@code DAG_AM_TASK_LISTENER_THREAD_COUNT} (with its default)
 * @return a new TaskHeartbeatHandler built with that configured value
 */
protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
    TezConfiguration conf) {
  final int configuredCount = conf.getInt(
      TezConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT,
      TezConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT);
  return new TaskHeartbeatHandler(context, configuredCount);
}
/**
 * Creates the container heartbeat handler.
 *
 * @param context the application context
 * @param conf    configuration supplying
 *                {@code DAG_AM_CONTAINER_LISTENER_THREAD_COUNT} (with its default)
 * @return a new ContainerHeartbeatHandler built with that configured value
 */
protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
    TezConfiguration conf) {
  final int configuredCount = conf.getInt(
      TezConfiguration.DAG_AM_CONTAINER_LISTENER_THREAD_COUNT,
      TezConfiguration.DAG_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT);
  return new ContainerHeartbeatHandler(context, configuredCount);
}
/**
* Creates the component that handles task cleanup events. Protected so that
* subclasses can supply a different TaskCleaner.
*
* @param context the application context
* @return a new TaskCleanerImpl
*/
protected TaskCleaner createTaskCleaner(AppContext context) {
return new TaskCleanerImpl(context);
}
/**
* Creates the component that launches allocated containers. Protected so
* that subclasses can supply a different ContainerLauncher.
*
* @param context the application context
* @return a new ContainerLauncherImpl
*/
protected ContainerLauncher
createContainerLauncher(final AppContext context) {
return new ContainerLauncherImpl(context);
}
//TODO:should have an interface for MRClientService
/*
protected ClientService createClientService(AppContext context) {
return new MRClientService(context);
}
*/
/** @return the application id taken from this AM's attempt id */
public ApplicationId getAppID() {
return appAttemptID.getApplicationId();
}
/** @return the application attempt id this AM was created for */
public ApplicationAttemptId getAttemptID() {
return appAttemptID;
}
/** @return the id of the DAG; assigned in init(), null before that */
public TezDAGID getDAGId() {
return dagId;
}
/** @return the attempt number of this application attempt */
public int getStartCount() {
return appAttemptID.getAttemptId();
}
/** @return the application context; created in init(), null before that */
public AppContext getContext() {
return context;
}
/** @return the central dispatcher; created in init(), null before that */
public Dispatcher getDispatcher() {
return dispatcher;
}
/** @return the container launcher; created in init(), null before that */
public ContainerLauncher getContainerLauncher() {
return containerLauncher;
}
/** @return the task attempt listener; created in init(), null before that */
public TaskAttemptListener getTaskAttemptListener() {
return taskAttemptListener;
}
/** @return the id of the container this AM runs in */
public ContainerId getAppContainerId() {
return containerID;
}
/** @return the node manager host passed to the constructor */
public String getAppNMHost() {
return nmHost;
}
/** @return the node manager port passed to the constructor */
public int getAppNMPort() {
return nmPort;
}
/** @return the node manager HTTP port passed to the constructor */
public int getAppNMHttpPort() {
return nmHttpPort;
}
/** @return the current lifecycle state of this AM */
public DAGAppMasterState getState() {
return state;
}
/**
* @return the diagnostics reported by the DAG. The DAG is only assigned in
* start(), so calling this earlier fails with a NullPointerException.
*/
public List<String> getDiagnostics() {
return dag.getDiagnostics();
}
/**
* @return the progress reported by the DAG. The DAG is only assigned in
* start(), so calling this earlier fails with a NullPointerException.
*/
public float getProgress() {
return dag.getProgress();
}
/**
 * Derives the final state of the application master from the final state
 * of the DAG. Only applies while the AM is still RUNNING; a state that was
 * already changed (for example to ERROR by an internal error, or to KILLED
 * by the shutdown hook) is left untouched.
 *
 * Every case now ends with an explicit {@code break}. Previously the
 * KILLED and ERROR cases fell through to the default branch, so a killed
 * DAG was always reported as ERROR. A killed DAG now maps to the AM's
 * KILLED state, matching what the shutdown hook records for a kill signal.
 */
void setStateOnDAGCompletion() {
  DAGAppMasterState oldState = state;
  if (state == DAGAppMasterState.RUNNING) {
    switch (dag.getState()) {
    case SUCCEEDED:
      state = DAGAppMasterState.SUCCEEDED;
      break;
    case FAILED:
      state = DAGAppMasterState.FAILED;
      break;
    case KILLED:
      state = DAGAppMasterState.KILLED;
      break;
    case ERROR:
      state = DAGAppMasterState.ERROR;
      break;
    default:
      // Any non-terminal or unexpected DAG state at completion time is
      // treated as an error.
      state = DAGAppMasterState.ERROR;
      break;
    }
  }
  LOG.info("On DAG completion. Old state: " + oldState + " new state: " + state);
}
/**
 * Serves client requests about the DAG run by this application master.
 * Only the single DAG held by the enclosing DAGAppMaster is known; any
 * other id is rejected.
 */
class DAGClientHandler implements DAGClient {

  /** @return a one-element list holding the id of the AM's DAG */
  @Override
  public List<String> getAllDAGs() throws TezRemoteException {
    final String onlyDagId = dag.getID().toString();
    return Collections.singletonList(onlyDagId);
  }

  /** @return the status of the DAG identified by {@code dagIdStr} */
  @Override
  public DAGStatus getDAGStatus(String dagIdStr)
      throws IOException, TezRemoteException {
    final DAG requestedDag = getDAG(dagIdStr);
    return requestedDag.getDAGStatus();
  }

  /**
   * @return the status of the named vertex of the identified DAG
   * @throws TezRemoteException if the DAG id is bad/unknown or the DAG
   *         reports no status for that vertex name
   */
  @Override
  public VertexStatus getVertexStatus(String dagIdStr, String vertexName)
      throws IOException, TezRemoteException {
    final VertexStatus vertexStatus =
        getDAG(dagIdStr).getVertexStatus(vertexName);
    if (vertexStatus != null) {
      return vertexStatus;
    }
    throw new TezRemoteException("Unknown vertexName: " + vertexName);
  }

  /**
   * Resolves a DAG id string to the AM's DAG.
   *
   * @throws TezRemoteException if the string does not parse to an id, or
   *         the id is not that of the AM's DAG
   */
  DAG getDAG(String dagIdStr) throws IOException, TezRemoteException {
    final TezDAGID requestedId = TezDAGID.fromString(dagIdStr);
    if (requestedId == null) {
      throw new TezRemoteException("Bad dagId: " + dagIdStr);
    }
    if (requestedId.equals(dag.getID())) {
      return dag;
    }
    throw new TezRemoteException("Unknown dagId: " + dagIdStr);
  }
}
/**
 * The AppContext handed to all components of this application master.
 * Most accessors expose fields of the enclosing DAGAppMaster; the DAG
 * reference is held here and guarded by a read/write lock.
 */
private class RunningAppContext implements AppContext {
  // Guarded by rwLock: written by setDAG(), read by getDAG(), getDAGID()
  // and getUser().
  private DAG dag;
  private final TezConfiguration conf;
  private final ClusterInfo clusterInfo = new ClusterInfo();
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Lock rLock = rwLock.readLock();
  private final Lock wLock = rwLock.writeLock();

  public RunningAppContext(TezConfiguration config) {
    this.conf = config;
  }

  @Override
  public DAGAppMaster getAppMaster() {
    return DAGAppMaster.this;
  }

  @Override
  public ApplicationAttemptId getApplicationAttemptId() {
    return appAttemptID;
  }

  @Override
  public ApplicationId getApplicationID() {
    return appAttemptID.getApplicationId();
  }

  @Override
  public String getApplicationName() {
    return appName;
  }

  @Override
  public long getStartTime() {
    return startTime;
  }

  /** @return the current DAG, or null if none has been set yet */
  @Override
  public DAG getDAG() {
    // Acquire outside the try block so that unlock() in the finally clause
    // only ever runs when the lock is actually held.
    rLock.lock();
    try {
      return dag;
    } finally {
      rLock.unlock();
    }
  }

  @Override
  public EventHandler getEventHandler() {
    return dispatcher.getEventHandler();
  }

  /** @return the user name of the current DAG; fails if no DAG is set */
  @Override
  public String getUser() {
    // Read the DAG through the locked accessor, like every other reader.
    return getDAG().getUserName();
  }

  @Override
  public Clock getClock() {
    return clock;
  }

  @Override
  public ClusterInfo getClusterInfo() {
    return this.clusterInfo;
  }

  @Override
  public AMContainerMap getAllContainers() {
    return containers;
  }

  @Override
  public AMNodeMap getAllNodes() {
    return nodes;
  }

  /**
   * @return the application ACLs held by the task scheduler
   * @throws YarnRuntimeException if the AM has not reached the STARTED
   *         service state yet
   */
  @Override
  public Map<ApplicationAccessType, String> getApplicationACLs() {
    if (getServiceState() != STATE.STARTED) {
      throw new YarnRuntimeException(
          "Cannot get ApplicationACLs before all services have started");
    }
    return taskSchedulerEventHandler.getApplicationAcls();
  }

  /** @return the id of the current DAG; fails if no DAG is set */
  @Override
  public TezDAGID getDAGID() {
    rLock.lock();
    try {
      return dag.getID();
    } finally {
      rLock.unlock();
    }
  }

  @Override
  public void setDAG(DAG dag) {
    wLock.lock();
    try {
      this.dag = dag;
    } finally {
      wLock.unlock();
    }
  }
}
/**
* Starts the application master: creates the DAG, initializes it
* synchronously, starts all services, emits the AM-started history event
* and finally triggers execution of the DAG.
*/
@Override
public void start() {
this.state = DAGAppMasterState.RUNNING;
// TODO Recovery
// Pull completedTasks etc from recovery
/*
if (inRecovery) {
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
amInfos = recoveryServ.getAMInfos();
}
*/
// /////////////////// Create the DAG itself.
dag = createDAG(dagPlan);
// End of creating the DAG.
// metrics system init is really init & start.
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
// create a DAG event for DAG initialization
DAGEvent initDagEvent = new DAGEvent(dag.getID(), DAGEventType.DAG_INIT);
// Send init to the DAG (this does NOT trigger DAG execution)
// This is a synchronous call, not an event through dispatcher. We want
// DAG init to be done completely here.
dagEventDispatcher.handle(initDagEvent);
//start all the components
super.start();
// Recorded only after every service has been started.
this.dagsStartTime = clock.getTime();
AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
startTime, dagsStartTime, appSubmitTime);
dispatcher.getEventHandler().handle(
new DAGHistoryEvent(this.dagId, startEvent));
// All components have started, start the DAG.
startDags();
}
/**
 * Kicks off execution of the DAG by sending it a {@code DAG_START} event
 * through the central dispatcher.
 *
 * This can be overridden to instantiate multiple DAGs and create a
 * workflow.
 *
 * TODO: Rework the design to actually support this; the current design
 * assumes exactly one DAG per application master.
 */
@SuppressWarnings("unchecked")
protected void startDags() {
  // Build the start event first, then hand it to the dispatcher; handling
  // this event is what triggers the DAG execution.
  final TezDAGID idOfDag = dag.getID();
  final DAGEvent kickoff = new DAGEvent(idOfDag, DAGEventType.DAG_START);
  final EventHandler sink = dispatcher.getEventHandler();
  sink.handle(kickoff);
}
/**
 * Routes DAG events to the current DAG, which is expected to implement
 * {@code EventHandler<DAGEvent>}.
 */
private class DagEventDispatcher implements EventHandler<DAGEvent> {
  @SuppressWarnings("unchecked")
  @Override
  public void handle(DAGEvent event) {
    final EventHandler<DAGEvent> target =
        (EventHandler<DAGEvent>) context.getDAG();
    target.handle(event);
  }
}
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
@SuppressWarnings("unchecked")
@Override
public void handle(TaskEvent event) {
Task task =
context.getDAG().getVertex(event.getTaskID().getVertexID()).
getTask(event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
}
private class TaskAttemptEventDispatcher
implements EventHandler<TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void handle(TaskAttemptEvent event) {
DAG dag = context.getDAG();
Task task =
dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
getTask(event.getTaskAttemptID().getTaskID());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
}
/**
 * Routes vertex events to the vertex they address within the current DAG.
 */
private class VertexEventDispatcher
    implements EventHandler<VertexEvent> {
  @SuppressWarnings("unchecked")
  @Override
  public void handle(VertexEvent event) {
    final EventHandler<VertexEvent> target = (EventHandler<VertexEvent>)
        context.getDAG().getVertex(event.getVertexId());
    target.handle(event);
  }
}
/**
 * Verifies that a required input value is present.
 *
 * @param value the value to check
 * @param param the name of the parameter, used in the error message
 * @throws IOException if {@code value} is null; the message is also logged
 *         at error level
 */
private static void validateInputParam(String value, String param)
    throws IOException {
  if (value != null) {
    return;
  }
  final String problem = param + " is null";
  LOG.error(problem);
  throw new IOException(problem);
}
/**
 * Process entry point of the application master.
 *
 * Reads the container/node details from the environment, obtains the DAG
 * plan (either a predefined example selected with {@code -predefined}, or
 * the protobuf plan file named by
 * {@code TezConfiguration.DAG_AM_PLAN_PB_BINARY}), applies the plan's
 * settings to the configuration, registers the shutdown hook and then
 * initializes and starts the AM as the job user.
 *
 * Any failure is logged at fatal level and the process exits with status 1.
 *
 * @param args command line arguments; see {@code getCliOptions()}
 */
public static void main(String[] args) {
  try {
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    // TODO: Deprecated keys?
    //DeprecatedKeys.init();
    String containerIdStr =
        System.getenv(Environment.CONTAINER_ID.name());
    String nodeHostString = System.getenv(Environment.NM_HOST.name());
    String nodePortString = System.getenv(Environment.NM_PORT.name());
    String nodeHttpPortString =
        System.getenv(Environment.NM_HTTP_PORT.name());
    String appSubmitTimeStr =
        System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);

    // Fail with a message naming the missing variable instead of an
    // anonymous NullPointerException/NumberFormatException further down.
    // Only values that are parsed below are checked.
    validateInputParam(containerIdStr, Environment.CONTAINER_ID.name());
    validateInputParam(nodePortString, Environment.NM_PORT.name());
    validateInputParam(nodeHttpPortString, Environment.NM_HTTP_PORT.name());
    validateInputParam(appSubmitTimeStr,
        ApplicationConstants.APP_SUBMIT_TIME_ENV);

    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
    ApplicationAttemptId applicationAttemptId =
        containerId.getApplicationAttemptId();
    long appSubmitTime = Long.parseLong(appSubmitTimeStr);

    Options opts = getCliOptions();
    CommandLine cliParser = new GnuParser().parse(opts, args);

    // Default to running mr if nothing specified.
    // TODO change this once the client is ready.
    TezConfiguration conf = new TezConfiguration(new YarnConfiguration());
    DAGPlan dagPlan;
    if (cliParser.hasOption(OPT_PREDEFINED)) {
      LOG.info("Running with PreDefined configuration");
      String type = cliParser.getOptionValue(OPT_PREDEFINED, "mr");
      LOG.info("Running job type: " + type);
      if (type.equals("mr")) {
        dagPlan = MRRExampleHelper.createDAGConfigurationForMR();
      } else if (type.equals("mrr")) {
        dagPlan = MRRExampleHelper.createDAGConfigurationForMRR();
      } else {
        // Previously left the plan null, which surfaced later as a
        // NullPointerException with no hint of the cause.
        throw new IllegalArgumentException(
            "Unknown predefined job type: " + type
            + " (expected \"mr\" or \"mrr\")");
      }
    } else {
      // Read the protobuf DAG
      DAGPlan.Builder dagPlanBuilder = DAGPlan.newBuilder();
      FileInputStream dagPBBinaryStream = null;
      try {
        dagPBBinaryStream =
            new FileInputStream(TezConfiguration.DAG_AM_PLAN_PB_BINARY);
        dagPlanBuilder.mergeFrom(dagPBBinaryStream);
      } finally {
        if (dagPBBinaryStream != null) {
          dagPBBinaryStream.close();
        }
      }
      dagPlan = dagPlanBuilder.build();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Running a DAG with "
          + dagPlan.getVertexCount() + " vertices ");
      for (VertexPlan v : dagPlan.getVertexList()) {
        LOG.debug("DAG has vertex " + v.getName());
      }
    }

    String jobUserName = System
        .getenv(ApplicationConstants.Environment.USER.name());

    // Do not automatically close FileSystem objects so that in case of
    // SIGTERM I have a chance to write out the job history. I'll be closing
    // the objects myself.
    conf.setBoolean("fs.automatic.close", false);

    // Settings carried in the DAG plan override the loaded configuration.
    Map<String, String> config =
        DagTypeConverters.createSettingsMapFromDAGPlan(dagPlan.getJobSettingList());
    for (Entry<String, String> entry : config.entrySet()) {
      conf.set(entry.getKey(), entry.getValue());
    }

    DAGAppMaster appMaster =
        new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
            Integer.parseInt(nodePortString),
            Integer.parseInt(nodeHttpPortString), appSubmitTime, dagPlan);
    ShutdownHookManager.get().addShutdownHook(
        new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
    initAndStartAppMaster(appMaster, conf,
        jobUserName);
  } catch (Throwable t) {
    LOG.fatal("Error starting MRAppMaster", t);
    System.exit(1);
  }
}
/** Name of the command line option selecting a predefined example DAG. */
private static final String OPT_PREDEFINED = "predefined";

/**
 * Builds the command line options understood by {@code main}.
 *
 * @return options containing the single {@code predefined} option, which
 *         takes an argument
 */
private static Options getCliOptions() {
  Options opts = new Options();
  opts.addOption(OPT_PREDEFINED, true,
      "Whether to run the predefined MR/MRR jobs");
  return opts;
}
/**
 * Shutdown hook for the application master. It runs when a signal is
 * received AND during normal close of the JVM.
 */
static class DAGAppMasterShutdownHook implements Runnable {
  DAGAppMaster appMaster;

  DAGAppMasterShutdownHook(DAGAppMaster appMaster) {
    this.appMaster = appMaster;
  }

  public void run() {
    LOG.info("DAGAppMaster received a signal. Signaling TaskScheduler and "
        + "JobHistoryEventHandler.");

    // Tell the task scheduler that a SIGTERM has been received so that it
    // doesn't take too long in shutting down. Only possible once the AM's
    // services are started.
    final boolean servicesStarted =
        appMaster.getServiceState() == STATE.STARTED;
    if (servicesStarted) {
      appMaster.taskSchedulerEventHandler.setSignalled(true);
    }

    // An AM that has not reached a final state by now must have received
    // a KILL signal.
    final DAGAppMasterState current = appMaster.state;
    final boolean notFinal = current == DAGAppMasterState.NEW
        || current == DAGAppMasterState.INITED
        || current == DAGAppMasterState.RUNNING;
    if (notFinal) {
      appMaster.state = DAGAppMasterState.KILLED;
    }

    appMaster.stop();
  }
}
// TODO XXX Does this really need to be a YarnConfiguration ?
/**
 * Initializes and starts the given application master while running as
 * the job user.
 *
 * @param appMaster   the application master to bring up
 * @param conf        configuration installed into UserGroupInformation and
 *                    passed to {@code init}
 * @param jobUserName name of the remote user to run as
 * @throws IOException          if the privileged action fails
 * @throws InterruptedException if the privileged action is interrupted
 */
protected static void initAndStartAppMaster(final DAGAppMaster appMaster,
    final TezConfiguration conf, String jobUserName) throws IOException,
    InterruptedException {
  UserGroupInformation.setConfiguration(conf);
  final UserGroupInformation jobUserUgi =
      UserGroupInformation.createRemoteUser(jobUserName);

  final PrivilegedExceptionAction<Object> bringUp =
      new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          appMaster.init(conf);
          appMaster.start();
          return null;
        }
      };
  jobUserUgi.doAs(bringUp);
}
/**
* Hands the given event to the central dispatcher's event handler.
* The handler is a raw type, hence the unchecked suppression.
*
* @param event the event to dispatch
*/
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
dispatcher.getEventHandler().handle(event);
}
}