blob: fbf5e9d40e65581cfcf03a675683d0e3acff81c0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.DAGReport;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.common.security.TokenCache;
import com.google.common.annotations.VisibleForTesting;
/** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
EventHandler<DAGEvent> {
  private static final Log LOG = LogFactory.getLog(DAGImpl.class);
  private static final String LINE_SEPARATOR = System
      .getProperty("line.separator");

  // final fields
  private final TezDAGID dagId;
  private final Clock clock;
  private final ApplicationACLsManager aclsManager;
  // TODO Recovery
  //private final List<AMInfo> amInfos;
  private final Lock readLock;   // guards reads of mutable DAG state
  private final Lock writeLock;  // serializes all state-machine transitions
  private final String dagName;
  private final TaskAttemptListener taskAttemptListener;
  private final TaskHeartbeatHandler taskHeartbeatHandler;
  private final Object tasksSyncHandle = new Object();

  @VisibleForTesting
  DAGScheduler dagScheduler;

  private final EventHandler eventHandler;
  // TODO Metrics
  //private final MRAppMetrics metrics;
  private final String userName;
  private final String queueName;
  private final AppContext appContext;

  // vertexId -> Vertex; populated during InitTransition via addVertex()
  volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
  // edgeId -> Edge; populated during InitTransition
  private Map<String, Edge> edges = new HashMap<String, Edge>();
  private TezCounters dagCounters = new TezCounters();
  private Object fullCountersLock = new Object();
  // Aggregate of dagCounters + all vertex counters, built lazily once the DAG
  // reaches a terminal state (see mayBeConstructFinalFullCounters()).
  private TezCounters fullCounters = null;

  public final Configuration conf;
  private final DAGPlan jobPlan;

  private final List<String> diagnostics = new ArrayList<String>();

  // Stateless transition singletons, shared across all DAGImpl instances.
  private static final DiagnosticsUpdateTransition
      DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
  private static final InternalErrorTransition
      INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
  private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
      new CounterUpdateTransition();
  private static final DAGSchedulerUpdateTransition
      DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition();
  // Declarative transition table shared by every DAGImpl; instance-specific
  // state machines are minted from it in the constructor via make(this).
  // Entries without a transition object simply absorb ("ignore") the event.
  protected static final
    StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
       stateMachineFactory
     = new StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
              (DAGState.NEW)

          // Transitions from NEW state
          .addTransition(DAGState.NEW, DAGState.NEW,
              DAGEventType.DAG_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(DAGState.NEW, DAGState.NEW,
              DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition
              (DAGState.NEW,
              EnumSet.of(DAGState.INITED, DAGState.FAILED),
              DAGEventType.DAG_INIT,
              new InitTransition())
          .addTransition(DAGState.NEW, DAGState.KILLED,
              DAGEventType.DAG_KILL,
              new KillNewJobTransition())
          .addTransition(DAGState.NEW, DAGState.ERROR,
              DAGEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)

          // Transitions from INITED state
          .addTransition(DAGState.INITED, DAGState.INITED,
              DAGEventType.DAG_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(DAGState.INITED, DAGState.INITED,
              DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(DAGState.INITED, DAGState.RUNNING,
              DAGEventType.DAG_START,
              new StartTransition())
          .addTransition(DAGState.INITED, DAGState.KILLED,
              DAGEventType.DAG_KILL,
              new KillInitedJobTransition())
          .addTransition(DAGState.INITED, DAGState.ERROR,
              DAGEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)

          // Transitions from RUNNING state
          // Vertex completion decides whether the DAG stays RUNNING or moves
          // to a (pre-)terminal state.
          .addTransition
              (DAGState.RUNNING,
              EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.TERMINATING,DAGState.FAILED),
              DAGEventType.DAG_VERTEX_COMPLETED,
              new VertexCompletedTransition())
          .addTransition(DAGState.RUNNING, DAGState.RUNNING,
              DAGEventType.DAG_VERTEX_RERUNNING,
              new VertexReRunningTransition())
          .addTransition(DAGState.RUNNING, DAGState.TERMINATING,
              DAGEventType.DAG_KILL, new DAGKilledTransition())
          .addTransition(DAGState.RUNNING, DAGState.RUNNING,
              DAGEventType.DAG_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(DAGState.RUNNING, DAGState.RUNNING,
              DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(DAGState.RUNNING, DAGState.RUNNING,
              DAGEventType.DAG_SCHEDULER_UPDATE,
              DAG_SCHEDULER_UPDATE_TRANSITION)
          .addTransition(
              DAGState.RUNNING,
              DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)

          // Transitions from TERMINATING state.
          // Remaining vertices are being killed; completion events drain in
          // until the DAG can settle as KILLED or FAILED.
          .addTransition
              (DAGState.TERMINATING,
              EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED),
              DAGEventType.DAG_VERTEX_COMPLETED,
              new VertexCompletedTransition())
          .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
              DAGEventType.DAG_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
              DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(
              DAGState.TERMINATING,
              DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)

          // Ignore-able events
          .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
              EnumSet.of(DAGEventType.DAG_KILL,
                  DAGEventType.DAG_VERTEX_RERUNNING,
                  DAGEventType.DAG_SCHEDULER_UPDATE))

          // Transitions from SUCCEEDED state
          .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
              DAGEventType.DAG_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
              DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(
              DAGState.SUCCEEDED,
              DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
              EnumSet.of(DAGEventType.DAG_KILL,
                  DAGEventType.DAG_VERTEX_COMPLETED))

          // Transitions from FAILED state
          .addTransition(DAGState.FAILED, DAGState.FAILED,
              DAGEventType.DAG_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(DAGState.FAILED, DAGState.FAILED,
              DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(
              DAGState.FAILED,
              DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(DAGState.FAILED, DAGState.FAILED,
              EnumSet.of(DAGEventType.DAG_KILL,
                  DAGEventType.DAG_VERTEX_RERUNNING,
                  DAGEventType.DAG_VERTEX_COMPLETED))

          // Transitions from KILLED state
          .addTransition(DAGState.KILLED, DAGState.KILLED,
              DAGEventType.DAG_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(DAGState.KILLED, DAGState.KILLED,
              DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(
              DAGState.KILLED,
              DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(DAGState.KILLED, DAGState.KILLED,
              EnumSet.of(DAGEventType.DAG_KILL,
                  DAGEventType.DAG_START,
                  DAGEventType.DAG_VERTEX_RERUNNING,
                  DAGEventType.DAG_SCHEDULER_UPDATE,
                  DAGEventType.DAG_VERTEX_COMPLETED))

          // No transitions from INTERNAL_ERROR state. Ignore all.
          .addTransition(
              DAGState.ERROR,
              DAGState.ERROR,
              EnumSet.of(DAGEventType.DAG_INIT,
                  DAGEventType.DAG_KILL,
                  DAGEventType.DAG_VERTEX_COMPLETED,
                  DAGEventType.DAG_DIAGNOSTIC_UPDATE,
                  DAGEventType.INTERNAL_ERROR))
          .addTransition(DAGState.ERROR, DAGState.ERROR,
              DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)

          // create the topology tables
          .installTopology();
  private final StateMachine<DAGState, DAGEventType, DAGEvent> stateMachine;

  // changing fields while the job is running
  // Vertex accounting counters; updated by completion transitions which run
  // under the write lock.
  private int numVertices;
  private int numCompletedVertices = 0;
  private int numSuccessfulVertices = 0;
  private int numFailedVertices = 0;
  private int numKilledVertices = 0;
  private boolean isUber = false;
  // First cause that drove the DAG toward a terminal state; write-once
  // semantics enforced by trySetTerminationCause().
  private DAGTerminationCause terminationCause;
  private Credentials credentials;
  private Token<JobTokenIdentifier> jobToken;
  private JobTokenSecretManager jobTokenSecretManager;

  // Timestamps (from the injected Clock) marking lifecycle milestones.
  private long initTime;
  private long startTime;
  private long finishTime;
  /**
   * Builds a DAG in the NEW state from its plan. Vertices and edges are not
   * created here; that happens in InitTransition when DAG_INIT is handled.
   *
   * @param dagId unique id of this DAG within the application
   * @param conf application configuration
   * @param jobPlan protobuf description of vertices, edges and resources
   * @param eventHandler central AM dispatcher used for all outgoing events
   * @param taskAttemptListener passed through to vertices for task umbilical
   * @param jobTokenSecretManager used in setup() to mint/register the job token
   * @param fsTokenCredentials filesystem credentials propagated to tasks
   * @param clock time source (injected for testability)
   * @param appUserName user that submitted the DAG (used for ACL checks)
   * @param thh heartbeat handler passed through to vertices
   * @param appContext shared AM context
   */
  public DAGImpl(TezDAGID dagId,
      Configuration conf,
      DAGPlan jobPlan,
      EventHandler eventHandler,
      TaskAttemptListener taskAttemptListener,
      JobTokenSecretManager jobTokenSecretManager,
      Credentials fsTokenCredentials, Clock clock,
      String appUserName,
      TaskHeartbeatHandler thh,
      AppContext appContext) {
    this.dagId = dagId;
    this.jobPlan = jobPlan;
    this.conf = conf;
    this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
    this.userName = appUserName;
    this.clock = clock;
    this.appContext = appContext;
    this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
    this.taskAttemptListener = taskAttemptListener;
    this.taskHeartbeatHandler = thh;
    this.eventHandler = eventHandler;
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    this.readLock = readWriteLock.readLock();
    this.writeLock = readWriteLock.writeLock();

    this.credentials = fsTokenCredentials;
    this.jobTokenSecretManager = jobTokenSecretManager;

    this.aclsManager = new ApplicationACLsManager(conf);

    // This "this leak" is okay because the retained pointer is in an
    // instance variable.
    stateMachine = stateMachineFactory.make(this);
  }
  /** Accessor for subclasses/tests; the instance-bound state machine. */
  protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() {
    return stateMachine;
  }
  /** @return the immutable DAG id; no locking needed. */
  @Override
  public TezDAGID getID() {
    return dagId;
  }
  // TODO maybe removed after TEZ-74
  /** @return the (shared, mutable) configuration this DAG was built with. */
  @Override
  public Configuration getConf() {
    return conf;
  }
  /** @return the protobuf DAG plan (immutable once submitted). */
  @Override
  public DAGPlan getJobPlan() {
    return jobPlan;
  }
  /** Package-private accessor used by Edge construction in InitTransition. */
  EventHandler getEventHandler() {
    return this.eventHandler;
  }
  /**
   * Delegates the ACL decision to the application-level ACL manager,
   * treating the DAG owner ({@code userName}) as the resource owner.
   */
  @Override
  public boolean checkAccess(UserGroupInformation callerUGI,
      ApplicationAccessType jobOperation) {
    return aclsManager.checkAccess(callerUGI, jobOperation, userName,
        this.dagId.getApplicationId());
  }
  /**
   * Looks up a vertex by id under the read lock.
   * @return the vertex, or null if the id is unknown
   */
  @Override
  public Vertex getVertex(TezVertexID vertexID) {
    readLock.lock();
    try {
      return vertices.get(vertexID);
    } finally {
      readLock.unlock();
    }
  }
  /** @return whether this DAG runs in uber (in-AM) mode; never set true here. */
  @Override
  public boolean isUber() {
    return isUber;
  }
  /**
   * Returns the aggregate counters for the DAG.
   * In a terminal state the frozen, cached full-counter set is returned
   * (built at most once); while running, a fresh aggregate of the DAG
   * counters plus every live vertex's counters is computed per call.
   */
  @Override
  public TezCounters getAllCounters() {
    readLock.lock();
    try {
      DAGState state = getInternalState();
      if (state == DAGState.ERROR || state == DAGState.FAILED
          || state == DAGState.KILLED || state == DAGState.SUCCEEDED) {
        this.mayBeConstructFinalFullCounters();
        return fullCounters;
      }

      // DAG still live: aggregate on the fly into a fresh object.
      TezCounters counters = new TezCounters();
      counters.incrAllCounters(dagCounters);
      return incrTaskCounters(counters, vertices.values());

    } finally {
      readLock.unlock();
    }
  }
public static TezCounters incrTaskCounters(
TezCounters counters, Collection<Vertex> vertices) {
for (Vertex vertex : vertices) {
counters.incrAllCounters(vertex.getAllCounters());
}
return counters;
}
@Override
public List<String> getDiagnostics() {
readLock.lock();
try {
return diagnostics;
} finally {
readLock.unlock();
}
}
  /**
   * Builds a report of the DAG's current state. Currently a stub: the
   * MR-style report construction is commented out pending a Tez-native
   * report type, so both branches return an empty report.
   */
  @Override
  public DAGReport getReport() {
    readLock.lock();
    try {
      StringBuilder diagsb = new StringBuilder();
      for (String s : getDiagnostics()) {
        diagsb.append(s).append("\n");
      }

      if (getInternalState() == DAGState.NEW) {
        /*
        return MRBuilderUtils.newJobReport(dagId, dagName, username, state,
            appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
            cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
            */
        // TODO
        return TezBuilderUtils.newDAGReport();
      }

      // TODO
      return TezBuilderUtils.newDAGReport();
      /*
      return MRBuilderUtils.newJobReport(dagId, dagName, username, state,
          appSubmitTime, startTime, finishTime, setupProgress,
          this.mapProgress, this.reduceProgress,
          cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
          */
    } finally {
      readLock.unlock();
    }
  }
@Override
public float getProgress() {
this.readLock.lock();
try {
float progress = 0.0f;
for (Vertex v : getVertices().values()) {
progress += v.getProgress();
}
return progress / getTotalVertices();
} finally {
this.readLock.unlock();
}
}
  /**
   * Returns an unmodifiable view of the vertex map.
   * NOTE(review): this synchronizes on tasksSyncHandle while every other
   * accessor in this class uses readLock/writeLock — the two mechanisms do
   * not exclude each other; confirm whether this is intentional.
   */
  @Override
  public Map<TezVertexID, Vertex> getVertices() {
    synchronized (tasksSyncHandle) {
      return Collections.unmodifiableMap(vertices);
    }
  }
  /** @return the current state-machine state, read under the read lock. */
  @Override
  public DAGState getState() {
    readLock.lock();
    try {
      return getStateMachine().getCurrentState();
    } finally {
      readLock.unlock();
    }
  }
  // monitoring apis
  /**
   * Builds a client-facing status snapshot: per-vertex progress plus a
   * DAG-level progress row summing the task counts of every vertex.
   * All reads happen under the read lock so the snapshot is consistent.
   */
  @Override
  public DAGStatusBuilder getDAGStatus() {
    DAGStatusBuilder status = new DAGStatusBuilder();
    int totalTaskCount = 0;
    int totalSucceededTaskCount = 0;
    int totalRunningTaskCount = 0;
    int totalFailedTaskCount = 0;
    int totalKilledTaskCount = 0;
    readLock.lock();
    try {
      for(Map.Entry<String, Vertex> entry : vertexMap.entrySet()) {
        ProgressBuilder progress = entry.getValue().getVertexProgress();
        status.addVertexProgress(entry.getKey(), progress);
        totalTaskCount += progress.getTotalTaskCount();
        totalSucceededTaskCount += progress.getSucceededTaskCount();
        totalRunningTaskCount += progress.getRunningTaskCount();
        totalFailedTaskCount += progress.getFailedTaskCount();
        totalKilledTaskCount += progress.getKilledTaskCount();
      }
      ProgressBuilder dagProgress = new ProgressBuilder();
      dagProgress.setTotalTaskCount(totalTaskCount);
      dagProgress.setSucceededTaskCount(totalSucceededTaskCount);
      dagProgress.setRunningTaskCount(totalRunningTaskCount);
      dagProgress.setFailedTaskCount(totalFailedTaskCount);
      dagProgress.setKilledTaskCount(totalKilledTaskCount);
      status.setState(getState());
      status.setDiagnostics(diagnostics);
      status.setDAGProgress(dagProgress);
      return status;
    } finally {
      readLock.unlock();
    }
  }
@Override
public VertexStatusBuilder getVertexStatus(String vertexName) {
Vertex vertex = vertexMap.get(vertexName);
if(vertex == null) {
return null;
}
return vertex.getVertexStatus();
}
protected void startRootVertices() {
for (Vertex v : vertices.values()) {
if (v.getInputVerticesCount() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Starting root vertex " + v.getName());
}
eventHandler.handle(new VertexEvent(v.getVertexId(),
VertexEventType.V_START));
}
}
}
  /** Sends V_INIT to every vertex; called from StartTransition. */
  protected void initializeVertices() {
    for (Vertex v : vertices.values()) {
      eventHandler.handle(new VertexEvent(v.getVertexId(),
          VertexEventType.V_INIT));
    }
  }
@Override
/**
* The only entry point to change the DAG.
*/
public void handle(DAGEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing DAGEvent " + event.getDAGId() + " of type "
+ event.getType() + " while in state " + getInternalState()
+ ". Event: " + event);
}
try {
writeLock.lock();
DAGState oldState = getInternalState();
try {
getStateMachine().doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle this event at current state", e);
addDiagnostic("Invalid event " + event.getType() +
" on Job " + this.dagId);
eventHandler.handle(new DAGEvent(this.dagId,
DAGEventType.INTERNAL_ERROR));
}
//notify the eventhandler of state change
if (oldState != getInternalState()) {
LOG.info(dagId + " transitioned from " + oldState + " to "
+ getInternalState());
}
}
finally {
writeLock.unlock();
}
}
  /**
   * Same as getState(); kept separate and @Private so internal callers are
   * distinguishable from the public monitoring API.
   */
  @Private
  public DAGState getInternalState() {
    readLock.lock();
    try {
      return getStateMachine().getCurrentState();
    } finally {
      readLock.unlock();
    }
  }
  /** Stamps the finish time from the injected clock (unconditionally). */
  void setFinishTime() {
    finishTime = clock.getTime();
  }
  /** Emits the SUCCEEDED history event; also (re)stamps the finish time. */
  void logJobHistoryFinishedEvent() {
    this.setFinishTime();
    DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
        finishTime, DAGStatus.State.SUCCEEDED, "", getAllCounters());
    this.eventHandler.handle(
        new DAGHistoryEvent(finishEvt));
  }
  /**
   * Emits the DAG-started history event carrying init and start times.
   * NOTE(review): despite the "Inited" name this emits a DAGStartedEvent and
   * is called from StartTransition — confirm the intended naming.
   */
  void logJobHistoryInitedEvent() {
    // FIXME should we have more information in this event?
    // numVertices, etc?
    DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId,
        this.initTime, this.startTime);
    this.eventHandler.handle(
        new DAGHistoryEvent(startEvt));
  }
  /**
   * Emits a terminal history event for a KILLED/FAILED DAG, folding all
   * diagnostics into one line-separated string.
   * @param state terminal state to record (KILLED or FAILED)
   */
  void logJobHistoryUnsuccesfulEvent(DAGStatus.State state) {
    DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
        clock.getTime(), state,
        StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
        getAllCounters());
    this.eventHandler.handle(
        new DAGHistoryEvent(finishEvt));
  }
  /**
   * Decides the DAG's next state once vertex accounting has been updated.
   * If not all vertices have completed, the current state is returned
   * unchanged; otherwise the terminal state is picked from the vertex
   * outcomes and the recorded terminationCause.
   *
   * @param dag the DAG being evaluated (caller holds the write lock via the
   *            state-machine transition)
   * @return the state the DAG should be in after this check
   */
  static DAGState checkJobForCompletion(DAGImpl dag) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Checking dag completion"
          + ", numCompletedVertices=" + dag.numCompletedVertices
          + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
          + ", numFailedVertices=" + dag.numFailedVertices
          + ", numKilledVertices=" + dag.numKilledVertices
          + ", numVertices=" + dag.numVertices
          + ", terminationCause=" + dag.terminationCause);
    }

    // log in case of accounting error.
    if (dag.numCompletedVertices > dag.numVertices) {
      LOG.error("vertex completion accounting issue: numCompletedVertices > numVertices"
          + ", numCompletedVertices=" + dag.numCompletedVertices
          + ", numVertices=" + dag.numVertices
          );
    }

    if (dag.numCompletedVertices == dag.numVertices) {
      // Only succeed if vertices complete successfully and no
      // terminationCause is registered.
      if(dag.numSuccessfulVertices == dag.numVertices && dag.terminationCause == null) {
        dag.setFinishTime();
        dag.logJobHistoryFinishedEvent();
        return dag.finished(DAGState.SUCCEEDED);
      }
      else if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){
        dag.setFinishTime();
        String diagnosticMsg = "DAG killed due to user-initiated kill." +
            " failedVertices:" + dag.numFailedVertices +
            " killedVertices:" + dag.numKilledVertices;
        LOG.info(diagnosticMsg);
        dag.addDiagnostic(diagnosticMsg);
        dag.abortJob(DAGStatus.State.KILLED);
        return dag.finished(DAGState.KILLED);
      }
      // Note: this branch is not chained with 'else if' above, but the two
      // preceding branches both return, so control only reaches here when
      // neither applied.
      if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){
        dag.setFinishTime();
        String diagnosticMsg = "DAG failed due to vertex failure." +
            " failedVertices:" + dag.numFailedVertices +
            " killedVertices:" + dag.numKilledVertices;
        LOG.info(diagnosticMsg);
        dag.addDiagnostic(diagnosticMsg);
        dag.abortJob(DAGStatus.State.FAILED);
        return dag.finished(DAGState.FAILED);
      }
      else {
        // should never get here.
        throw new TezUncheckedException("All vertices complete, but cannot determine final state of DAG"
            + ", numCompletedVertices=" + dag.numCompletedVertices
            + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
            + ", numFailedVertices=" + dag.numFailedVertices
            + ", numKilledVertices=" + dag.numKilledVertices
            + ", numVertices=" + dag.numVertices
            + ", terminationCause=" + dag.terminationCause);
      }
    }

    // return the current state, Job not finished yet
    return dag.getInternalState();
  }
  /**
   * Common tail for every terminal transition: ensures a finish time is set
   * (without clobbering one already stamped) and notifies the AM that the
   * DAG is done. Returns the final state so callers can pass it straight
   * back to the state machine.
   */
  DAGState finished(DAGState finalState) {
    // TODO Metrics
    /*
    if (getInternalState() == DAGState.RUNNING) {
      metrics.endRunningJob(this);
    }
    */

    if (finishTime == 0) setFinishTime();

    eventHandler.handle(new DAGAppMasterEventDAGFinished(getID()));

    // TODO Metrics
    /*
    switch (finalState) {
      case KILLED:
        metrics.killedJob(this);
        break;
      case FAILED:
        metrics.failedJob(this);
        break;
      case SUCCEEDED:
        metrics.completedJob(this);
    }
    */
    return finalState;
  }
  /** @return the user who submitted this DAG (immutable). */
  @Override
  public String getUserName() {
    return userName;
  }

  /** @return the queue name from configuration ("default" when unset). */
  @Override
  public String getQueueName() {
    return queueName;
  }

  /** @return the DAG name from the plan, or a placeholder when missing. */
  @Override
  public String getName() {
    return dagName;
  }
  /** @return total vertex count (0 until DAG_INIT populates it). */
  @Override
  public int getTotalVertices() {
    readLock.lock();
    try {
      return numVertices;
    } finally {
      readLock.unlock();
    }
  }

  /** @return number of vertices that have completed successfully so far. */
  @Override
  public int getSuccessfulVertices() {
    readLock.lock();
    try {
      return numSuccessfulVertices;
    } finally {
      readLock.unlock();
    }
  }
  /**
   * Set the terminationCause if it had not yet been set (write-once; the
   * first registered cause wins). Callers invoke this from state-machine
   * transitions, i.e. while holding the write lock.
   *
   * @param trigger the cause to record
   * @return true if setting the value succeeded, false if a cause was
   *         already recorded
   */
  boolean trySetTerminationCause(DAGTerminationCause trigger) {
    if(terminationCause == null){
      terminationCause = trigger;
      return true;
    }
    return false;
  }
  /** @return the recorded termination cause, or null while none is set. */
  DAGTerminationCause getTerminationCause() {
    readLock.lock();
    try {
      return terminationCause;
    } finally {
      readLock.unlock();
    }
  }
  /*
   * (non-Javadoc)
   * @see org.apache.hadoop.mapreduce.v2.app2.job.Job#getJobACLs()
   */
  /** Not implemented yet; always returns null. */
  @Override
  public Map<ApplicationAccessType, String> getJobACLs() {
    // TODO ApplicationACLs
    return null;
  }
// TODO Recovery
/*
@Override
public List<AMInfo> getAMInfos() {
return amInfos;
}
*/
  /**
   * Handles DAG_INIT: builds vertices and edges from the plan, wires them
   * together, mints the job token, and picks a DAG scheduler. Returns INITED
   * on success, FAILED on an empty plan or setup error.
   */
  public static class InitTransition
      implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {

    /**
     * Note that this transition method is called directly (and synchronously)
     * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
     * just plain sequential call within AM context), so we can trigger
     * modifications in AM state from here (at least, if AM is written that
     * way; MR version is).
     */
    @Override
    public DAGState transition(DAGImpl dag, DAGEvent event) {
      // TODO Metrics
      //dag.metrics.submittedJob(dag);
      //dag.metrics.preparingJob(dag);
      try {
        setup(dag);

        // If we have no vertices, fail the dag
        dag.numVertices = dag.getJobPlan().getVertexCount();
        if (dag.numVertices == 0) {
          dag.addDiagnostic("No vertices for dag");
          dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
          dag.abortJob(DAGStatus.State.FAILED);
          return dag.finished(DAGState.FAILED);
        }

        checkTaskLimits();

        // create the vertices
        for (int i=0; i < dag.numVertices; ++i) {
          String vertexName = dag.getJobPlan().getVertex(i).getName();
          VertexImpl v = createVertex(dag, vertexName, i);
          dag.addVertex(v);
        }

        createDAGEdges(dag);
        Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());

        // setup the dag: connect each vertex to its in/out edges
        for (Vertex v : dag.vertices.values()) {
          parseVertexEdges(dag, edgePlans, v);
        }

        assignDAGScheduler(dag);

        // TODO Metrics
        //dag.metrics.endPreparingJob(dag);
        return DAGState.INITED;

      } catch (IOException e) {
        LOG.warn("Job init failed", e);
        dag.addDiagnostic("Job init failed : "
            + StringUtils.stringifyException(e));
        dag.trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
        dag.abortJob(DAGStatus.State.FAILED);
        // TODO Metrics
        //dag.metrics.endPreparingJob(dag);
        return dag.finished(DAGState.FAILED);
      }
    }

    /** Creates an Edge object per EdgePlan, keyed by the plan's edge id. */
    private void createDAGEdges(DAGImpl dag) {
      for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
        EdgeProperty edgeProperty = DagTypeConverters
            .createEdgePropertyMapFromDAGPlan(edgePlan);

        // edge manager may be also set via API when using custom edge type
        dag.edges.put(edgePlan.getId(),
            new Edge(edgeProperty, dag.getEventHandler()));
      }
    }

    /**
     * Picks the DAG scheduler: natural order when aggressive scheduling is
     * configured, MRR when the DAG is a simple chain of SCATTER_GATHER
     * edges (at most one input and one output per vertex), natural order
     * otherwise.
     */
    private void assignDAGScheduler(DAGImpl dag) {
      if (dag.conf.getBoolean(TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING,
          TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT)) {
        LOG.info("Using Natural order dag scheduler due to aggressive scheduling");
        dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
      } else {
        boolean isMRR = true;
        for (Vertex vertex : dag.vertices.values()) {
          Map<Vertex, Edge> outVertices = vertex.getOutputVertices();
          Map<Vertex, Edge> inVertices = vertex.getInputVertices();
          if (!(outVertices == null || outVertices.isEmpty() || (outVertices
              .size() == 1 && outVertices.values().iterator().next().getEdgeProperty()
              .getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER))) {
            // more than 1 output OR single output is not bipartite
            isMRR = false;
            break;
          }
          if (!(inVertices == null || inVertices.isEmpty() || (inVertices
              .size() == 1 && inVertices.values().iterator().next().getEdgeProperty()
              .getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER))) {
            // more than 1 input OR single input is not bipartite
            isMRR = false;
            break;
          }
        }
        if (isMRR) {
          LOG.info("Using MRR dag scheduler");
          dag.dagScheduler = new DAGSchedulerMRR(
              dag,
              dag.eventHandler,
              dag.appContext.getTaskScheduler(),
              dag.conf
                  .getFloat(
                      TezConfiguration.TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION,
                      TezConfiguration.TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION_DEFAULT));
        } else {
          LOG.info("Using Natural order dag scheduler");
          dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
        }
      }
    }

    /** Instantiates vertex number {@code vId} from the plan. */
    private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
      TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);

      VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
      VertexLocationHint vertexLocationHint = DagTypeConverters
          .convertFromDAGPlan(vertexPlan.getTaskLocationHintList());

      return new VertexImpl(
          vertexId, vertexPlan, vertexName, dag.conf,
          dag.eventHandler, dag.taskAttemptListener,
          dag.credentials, dag.clock,
          dag.taskHeartbeatHandler, dag.appContext,
          vertexLocationHint);
    }

    // hooks up this VertexImpl to input and output EdgeProperties
    private void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
      VertexPlan vertexPlan = vertex.getVertexPlan();

      Map<Vertex, Edge> inVertices =
          new HashMap<Vertex, Edge>();

      Map<Vertex, Edge> outVertices =
          new HashMap<Vertex, Edge>();

      for(String inEdgeId : vertexPlan.getInEdgeIdList()){
        EdgePlan edgePlan = edgePlans.get(inEdgeId);
        Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
        Edge edge = dag.edges.get(inEdgeId);
        edge.setSourceVertex(inVertex);
        edge.setDestinationVertex(vertex);
        inVertices.put(inVertex, edge);
      }

      for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
        EdgePlan edgePlan = edgePlans.get(outEdgeId);
        Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
        Edge edge = dag.edges.get(outEdgeId);
        edge.setSourceVertex(vertex);
        edge.setDestinationVertex(outVertex);
        outVertices.put(outVertex, edge);
      }

      vertex.setInputVertices(inVertices);
      vertex.setOutputVertices(outVertices);
    }

    /**
     * Records init time and mints the job token used by TaskAttemptListener
     * to authenticate containers; the token is registered with the secret
     * manager and stored in the job credentials.
     */
    protected void setup(DAGImpl job) throws IOException {
      job.initTime = job.clock.getTime();
      String dagIdString = job.dagId.toString().replace("application", "job");

      // Prepare the TaskAttemptListener server for authentication of Containers
      // TaskAttemptListener gets the information via jobTokenSecretManager.
      JobTokenIdentifier identifier =
          new JobTokenIdentifier(new Text(dagIdString));
      job.jobToken =
          new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
      job.jobToken.setService(identifier.getJobId());

      // Add it to the jobTokenSecretManager so that TaskAttemptListener server
      // can authenticate containers(tasks)
      job.jobTokenSecretManager.addTokenForJob(dagIdString, job.jobToken);
      LOG.info("Adding job token for " + dagIdString
          + " to jobTokenSecretManager");

      // Populate the jobToken into job credentials.
      TokenCache.setJobToken(job.jobToken, job.credentials);
    }

    /**
     * If the number of tasks are greater than the configured value
     * throw an exception that will fail job initialization
     */
    private void checkTaskLimits() {
      // no code, for now
    }

  } // end of InitTransition
  /**
   * Handles DAG_START: records the start time, initializes every vertex,
   * logs the started history event, and kicks off the root vertices.
   */
  public static class StartTransition
      implements SingleArcTransition<DAGImpl, DAGEvent> {
    /**
     * This transition executes in the event-dispatcher thread, though it's
     * triggered in MRAppMaster's startJobs() method.
     */
    @Override
    public void transition(DAGImpl job, DAGEvent event) {
      job.startTime = job.clock.getTime();
      job.initializeVertices();
      job.logJobHistoryInitedEvent();
      // TODO Metrics
      //job.metrics.runningJob(job);

      // Start all vertices with no incoming edges when job starts
      job.startRootVertices();
    }
  }
  /**
   * Aborts the DAG: currently only records the unsuccessful history event.
   * @param abortState the terminal state to record (KILLED or FAILED)
   */
  private void abortJob(DAGStatus.State abortState) {
    // TODO: DAG Committer
    logJobHistoryUnsuccesfulEvent(abortState);
  }
  // Secondary index: vertex name -> Vertex (kept in sync with 'vertices').
  Map<String, Vertex> vertexMap = new HashMap<String, Vertex>();

  /** Registers a vertex in both the id-keyed and name-keyed maps. */
  void addVertex(Vertex v) {
    vertices.put(v.getVertexId(), v);
    vertexMap.put(v.getName(), v);
  }
  /** @return the vertex with the given name, or null if unknown. */
  @Override
  public Vertex getVertex(String vertexName) {
    return vertexMap.get(vertexName);
  }
  /**
   * Builds the frozen full-counter aggregate exactly once; subsequent calls
   * are no-ops. Guarded by its own lock so concurrent terminal-state readers
   * do not build it twice.
   */
  private void mayBeConstructFinalFullCounters() {
    // Calculating full-counters. This should happen only once for the job.
    synchronized (this.fullCountersLock) {
      if (this.fullCounters != null) {
        // Already constructed. Just return.
        return;
      }
      this.constructFinalFullcounters();
    }
  }
  /**
   * Aggregates dagCounters plus every vertex's counters into fullCounters.
   * Normally reached only via mayBeConstructFinalFullCounters(), which
   * provides the once-only guarantee.
   */
  @Private
  public void constructFinalFullcounters() {
    this.fullCounters = new TezCounters();
    this.fullCounters.incrAllCounters(dagCounters);
    for (Vertex v : this.vertices.values()) {
      this.fullCounters.incrAllCounters(v.getAllCounters());
    }
  }
  /**
   * Set the terminationCause and send a kill-message to all vertices.
   * The vertex-kill messages are only sent once: if a cause was already
   * recorded, this call is a no-op.
   *
   * @param dagTerminationCause the trigger that is causing the DAG to
   *        transition to KILLED/FAILED
   * @param vertexTerminationCause the cause to attach to each vertex's
   *        termination event
   */
  void enactKill(DAGTerminationCause dagTerminationCause, VertexTerminationCause vertexTerminationCause) {
    if(trySetTerminationCause(dagTerminationCause)){
      for (Vertex v : vertices.values()) {
        eventHandler.handle(
            new VertexEventTermination(v.getVertexId(), vertexTerminationCause)
        );
      }
    }
  }
// Handles a kill received while the DAG is still in the NEW state: nothing
// has started yet, so just stamp the finish time, log the unsuccessful
// completion, record the termination cause, and finish as KILLED.
// (NOTE(review): the previous comment about map/reduce finished-task counts
// was inherited from MapReduce and did not apply here.)
private static class KillNewJobTransition
implements SingleArcTransition<DAGImpl, DAGEvent> {
  @Override
  public void transition(DAGImpl job, DAGEvent event) {
    job.setFinishTime();
    job.logJobHistoryUnsuccesfulEvent(DAGStatus.State.KILLED);
    job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
    job.finished(DAGState.KILLED);
  }
}
// Handles a kill received in the INITED state: vertices exist but have not
// started, so record the cause, abort (logging the history event), note a
// diagnostic, and finish as KILLED.
private static class KillInitedJobTransition
implements SingleArcTransition<DAGImpl, DAGEvent> {
  @Override
  public void transition(DAGImpl job, DAGEvent event) {
    job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
    job.abortJob(DAGStatus.State.KILLED);
    job.addDiagnostic("Job received Kill in INITED state.");
    job.finished(DAGState.KILLED);
  }
}
// Handles a kill received while RUNNING: propagates a kill to every vertex
// via enactKill; the DAG reaches its terminal state later, once the
// vertex-completion events come back.
private static class DAGKilledTransition
implements SingleArcTransition<DAGImpl, DAGEvent> {
  @Override
  public void transition(DAGImpl job, DAGEvent event) {
    job.addDiagnostic("Job received Kill while in RUNNING state.");
    job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL);
    // TODO Metrics
    //job.metrics.endRunningJob(job);
  }
}
/**
 * Handles a vertex reaching a terminal state. Updates the DAG's completion
 * book-keeping, and decides the DAG's next state: a vertex failure or kill
 * forces a still-running DAG into TERMINATING.
 */
private static class VertexCompletedTransition implements
    MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {

  @Override
  public DAGState transition(DAGImpl job, DAGEvent event) {
    DAGEventVertexCompleted vertexEvent = (DAGEventVertexCompleted) event;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Received a vertex completion event"
          + ", vertexId=" + vertexEvent.getVertexId()
          + ", vertexState=" + vertexEvent.getVertexState());
    }

    Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
    job.numCompletedVertices++;

    boolean failedOrKilled = false;
    switch (vertexEvent.getVertexState()) {
    case SUCCEEDED:
      job.vertexSucceeded(vertex);
      job.dagScheduler.vertexCompleted(vertex);
      break;
    case FAILED:
      // One vertex failing dooms the DAG: kill all remaining vertices.
      job.enactKill(DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
      job.vertexFailed(vertex);
      failedOrKilled = true;
      break;
    case KILLED:
      job.vertexKilled(vertex);
      failedOrKilled = true;
      break;
    default:
      // Other vertex states: only the counters above are updated.
      break;
    }

    LOG.info("Vertex " + vertex.getVertexId() + " completed."
        + ", numCompletedVertices=" + job.numCompletedVertices
        + ", numSuccessfulVertices=" + job.numSuccessfulVertices
        + ", numFailedVertices=" + job.numFailedVertices
        + ", numKilledVertices=" + job.numKilledVertices
        + ", numVertices=" + job.numVertices);

    // if the job has not finished but a failure/kill occurred, then force
    // the transition to TERMINATING.
    DAGState state = checkJobForCompletion(job);
    return (state == DAGState.RUNNING && failedOrKilled)
        ? DAGState.TERMINATING
        : state;
  }
}
// Handles a previously-completed vertex going back to running: the
// completed-vertex count is decremented to undo the earlier completion,
// and vertexReRunning undoes the success accounting.
private static class VertexReRunningTransition implements
SingleArcTransition<DAGImpl, DAGEvent> {
  @Override
  public void transition(DAGImpl job, DAGEvent event) {
    DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning) event;
    Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
    job.numCompletedVertices--;
    job.vertexReRunning(vertex);

    LOG.info("Vertex " + vertex.getVertexId() + " re-running."
        + ", numCompletedVertices=" + job.numCompletedVertices
        + ", numSuccessfulVertices=" + job.numSuccessfulVertices
        + ", numFailedVertices=" + job.numFailedVertices
        + ", numKilledVertices=" + job.numKilledVertices
        + ", numVertices=" + job.numVertices);
  }
}
/** Accounts for a successful vertex completion. */
private void vertexSucceeded(Vertex vertex) {
  numSuccessfulVertices += 1;
  // TODO: Metrics
  //job.metrics.completedTask(task);
}
/** Undoes the success accounting for a vertex that is running again. */
private void vertexReRunning(Vertex vertex) {
  addDiagnostic("Vertex re-running " + vertex.getVertexId());
  numSuccessfulVertices -= 1;
  // TODO: Metrics
  //job.metrics.completedTask(task);
}
/** Accounts for a vertex failure and records a diagnostic. */
private void vertexFailed(Vertex vertex) {
  addDiagnostic("Vertex failed " + vertex.getVertexId());
  numFailedVertices += 1;
  // TODO: Metrics
  //job.metrics.failedTask(task);
}
/** Accounts for a killed vertex and records a diagnostic. */
private void vertexKilled(Vertex vertex) {
  addDiagnostic("Vertex killed " + vertex.getVertexId());
  numKilledVertices += 1;
  // TODO: Metrics
  //job.metrics.killedTask(task);
}
/** Appends a diagnostic message to the DAG's diagnostics list. */
private void addDiagnostic(String diag) {
  diagnostics.add(diag);
}
// Appends the diagnostic string carried by the event to the DAG.
private static class DiagnosticsUpdateTransition implements
    SingleArcTransition<DAGImpl, DAGEvent> {
  @Override
  public void transition(DAGImpl job, DAGEvent event) {
    DAGEventDiagnosticsUpdate diagEvent = (DAGEventDiagnosticsUpdate) event;
    job.addDiagnostic(diagEvent.getDiagnosticUpdate());
  }
}
// Applies each incremental counter update carried by the event to the
// DAG-level counters.
private static class CounterUpdateTransition implements
    SingleArcTransition<DAGImpl, DAGEvent> {
  @Override
  public void transition(DAGImpl job, DAGEvent event) {
    DAGEventCounterUpdate updateEvent = (DAGEventCounterUpdate) event;
    for (DAGEventCounterUpdate.CounterIncrementalUpdate update
        : updateEvent.getCounterUpdates()) {
      job.dagCounters.findCounter(update.getCounterKey())
          .increment(update.getIncrementValue());
    }
  }
}
// Routes scheduler-update events to the DAG's scheduler based on their
// update type; unknown types are a programming error.
private static class DAGSchedulerUpdateTransition implements
    SingleArcTransition<DAGImpl, DAGEvent> {
  @Override
  public void transition(DAGImpl dag, DAGEvent event) {
    DAGEventSchedulerUpdate update = (DAGEventSchedulerUpdate) event;
    switch (update.getUpdateType()) {
    case TA_SCHEDULE:
      dag.dagScheduler.scheduleTask(update);
      break;
    case TA_SCHEDULED:
      // TA_SCHEDULED events carry assignment details in a subclass.
      DAGEventSchedulerUpdateTAAssigned assignedEvent =
          (DAGEventSchedulerUpdateTAAssigned) update;
      dag.dagScheduler.taskScheduled(assignedEvent);
      break;
    case TA_SUCCEEDED:
      dag.dagScheduler.taskSucceeded(update);
      break;
    default:
      throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:"
          + update.getUpdateType());
    }
  }
}
// Handles an internal error: kills all vertices, stamps the finish time,
// logs the job as FAILED in history, and moves the DAG to ERROR.
private static class InternalErrorTransition implements
SingleArcTransition<DAGImpl, DAGEvent> {
  @Override
  public void transition(DAGImpl job, DAGEvent event) {
    //TODO Is this JH event required.
    LOG.info(job.getID() + " terminating due to internal error");
    // terminate all vertices
    job.enactKill(DAGTerminationCause.INTERNAL_ERROR,
        VertexTerminationCause.INTERNAL_ERROR);
    job.setFinishTime();
    job.logJobHistoryUnsuccesfulEvent(DAGStatus.State.FAILED);
    job.finished(DAGState.ERROR);
  }
}
/**
 * Returns true once the DAG has reached any terminal state
 * (SUCCEEDED, FAILED, KILLED, or ERROR). Reads state under the read lock.
 */
@Override
public boolean isComplete() {
  readLock.lock();
  try {
    DAGState state = getState();
    return state.equals(DAGState.SUCCEEDED)
        || state.equals(DAGState.FAILED)
        || state.equals(DAGState.KILLED)
        || state.equals(DAGState.ERROR);
  } finally {
    readLock.unlock();
  }
}
}