| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.dag.impl; |
| |
| import java.io.IOException; |
| import java.net.URL; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.lang.exception.ExceptionUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.state.InvalidStateTransitonException; |
| import org.apache.hadoop.yarn.state.MultipleArcTransition; |
| import org.apache.hadoop.yarn.state.SingleArcTransition; |
| import org.apache.hadoop.yarn.state.StateMachine; |
| import org.apache.hadoop.yarn.state.StateMachineFactory; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.tez.common.ATSConstants; |
| import org.apache.tez.common.ReflectionUtils; |
| import org.apache.tez.common.counters.TezCounters; |
| import org.apache.tez.dag.api.DagTypeConverters; |
| import org.apache.tez.dag.api.EdgeProperty; |
| import org.apache.tez.dag.api.InputDescriptor; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezUncheckedException; |
| import org.apache.tez.dag.api.VertexLocationHint; |
| import org.apache.tez.dag.api.client.DAGStatus; |
| import org.apache.tez.dag.api.client.DAGStatusBuilder; |
| import org.apache.tez.dag.api.client.ProgressBuilder; |
| import org.apache.tez.dag.api.client.StatusGetOpts; |
| import org.apache.tez.dag.api.client.VertexStatus; |
| import org.apache.tez.dag.api.client.VertexStatusBuilder; |
| import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; |
| import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo; |
| import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.TaskAttemptListener; |
| import org.apache.tez.dag.app.TaskHeartbeatHandler; |
| import org.apache.tez.dag.app.dag.DAG; |
| import org.apache.tez.dag.app.dag.DAGReport; |
| import org.apache.tez.dag.app.dag.DAGScheduler; |
| import org.apache.tez.dag.app.dag.DAGState; |
| import org.apache.tez.dag.app.dag.DAGTerminationCause; |
| import org.apache.tez.dag.app.dag.StateChangeNotifier; |
| import org.apache.tez.dag.app.dag.Vertex; |
| import org.apache.tez.dag.app.dag.VertexState; |
| import org.apache.tez.dag.app.dag.VertexTerminationCause; |
| import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished; |
| import org.apache.tez.dag.app.dag.event.DAGEvent; |
| import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; |
| import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate; |
| import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent; |
| import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; |
| import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned; |
| import org.apache.tez.dag.app.dag.event.DAGEventStartDag; |
| import org.apache.tez.dag.app.dag.event.DAGEventType; |
| import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted; |
| import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning; |
| import org.apache.tez.dag.app.dag.event.VertexEvent; |
| import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex; |
| import org.apache.tez.dag.app.dag.event.VertexEventTermination; |
| import org.apache.tez.dag.app.dag.event.VertexEventType; |
| import org.apache.tez.common.security.ACLManager; |
| import org.apache.tez.dag.history.DAGHistoryEvent; |
| import org.apache.tez.dag.history.HistoryEvent; |
| import org.apache.tez.dag.history.events.DAGCommitStartedEvent; |
| import org.apache.tez.dag.history.events.DAGFinishedEvent; |
| import org.apache.tez.dag.history.events.DAGInitializedEvent; |
| import org.apache.tez.dag.history.events.DAGStartedEvent; |
| import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; |
| import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezTaskID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; |
| import org.apache.tez.dag.utils.RelocalizationUtils; |
| import org.apache.tez.dag.utils.TezBuilderUtils; |
| import org.apache.tez.runtime.api.OutputCommitter; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
/**
 * Implementation of the DAG interface. Maintains the state machine of the DAG.
 * Read and write calls are guarded by a ReadWriteLock for concurrency.
 */
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, |
| EventHandler<DAGEvent> { |
| |
// Class-wide logger.
| private static final Log LOG = LogFactory.getLog(DAGImpl.class); |
// Value of the "line.separator" system property, read once at class load.
| private static final String LINE_SEPARATOR = System |
| .getProperty("line.separator"); |
| |
| //final fields |
| private final TezDAGID dagId; |
| private final Clock clock; |
| |
| // TODO Recovery |
| //private final List<AMInfo> amInfos; |
// Read and write halves of the single ReentrantReadWriteLock created in the constructor.
| private final Lock readLock; |
| private final Lock writeLock; |
// Name taken from the DAG plan in the constructor.
| private final String dagName; |
| private final TaskAttemptListener taskAttemptListener; |
| private final TaskHeartbeatHandler taskHeartbeatHandler; |
// Monitor held by getVertices() while it wraps the vertex map.
| private final Object tasksSyncHandle = new Object(); |
| |
// commitOrAbortOutputs() sets committedOrAborted on its first call; later calls
// short-circuit and return allOutputsCommitted.
| private volatile boolean committedOrAborted = false; |
| private volatile boolean allOutputsCommitted = false; |
// When false, commitOrAbortOutputs() treats successful outputs as already committed
// and skips the end-of-DAG commit pass.
| boolean commitAllOutputsOnSuccess = true; |
| |
| @VisibleForTesting |
| DAGScheduler dagScheduler; |
| |
// Raw-typed handler that vertex events are sent to (see initializeVerticesAndStart()).
| private final EventHandler eventHandler; |
| // TODO Metrics |
| //private final MRAppMetrics metrics; |
// Application user name passed to the constructor; used to create the remote-user UGI.
| private final String userName; |
| private final AppContext appContext; |
// Identity under which output commit actions are executed (getDagUGI().doAs(...)).
| private final UserGroupInformation dagUGI; |
| private final ACLManager aclManager; |
| private final StateChangeNotifier entityUpdateTracker; |
| |
// Vertices of this DAG keyed by vertex id.
| volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>(); |
| @VisibleForTesting |
| Map<String, Edge> edges = new HashMap<String, Edge>(); |
// DAG-level counters; getAllCounters() adds these to the per-vertex counters.
| private TezCounters dagCounters = new TezCounters(); |
// NOTE(review): presumably guards construction of fullCounters — confirm against
// mayBeConstructFinalFullCounters(), which is not visible here.
| private Object fullCountersLock = new Object(); |
// Returned by getAllCounters() once the DAG is in a terminal state; also restored
// from a DAGFinishedEvent in restoreFromEvent().
| @VisibleForTesting |
| TezCounters fullCounters = null; |
| private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>(); |
| |
| public final Configuration conf; |
| private final DAGPlan jobPlan; |
| |
// Built from the plan's local resource list in the constructor.
| Map<String, LocalResource> localResources; |
| |
// Diagnostic messages exposed through getDiagnostics() and getDAGStatus().
| private final List<String> diagnostics = new ArrayList<String>(); |
| |
| // Recovery related flags |
// Both flags are set by restoreFromEvent() when the matching history event is replayed.
| boolean recoveryInitEventSeen = false; |
| boolean recoveryStartEventSeen = false; |
| |
| private TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption; |
| |
// Transition instances reused by several states in stateMachineFactory.
| private static final DiagnosticsUpdateTransition |
| DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); |
| private static final InternalErrorTransition |
| INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); |
| private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION = |
| new CounterUpdateTransition(); |
| private static final DAGSchedulerUpdateTransition |
| DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition(); |
| |
// Transition table shared by all DAGImpl instances; each instance obtains its own
// state machine through stateMachineFactory.make(this) in the constructor.
// The initial state is NEW. Overloads taking an EnumSet of target states register a
// multi-arc transition whose hook picks the target; overloads taking an EnumSet of
// event types and no hook register events that leave the state unchanged.
| protected static final |
| StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent> |
| stateMachineFactory |
| = new StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent> |
| (DAGState.NEW) |
| |
| // Transitions from NEW state |
| .addTransition(DAGState.NEW, DAGState.NEW, |
| DAGEventType.DAG_DIAGNOSTIC_UPDATE, |
| DIAGNOSTIC_UPDATE_TRANSITION) |
// Recovery may land the DAG in any of the listed states.
| .addTransition(DAGState.NEW, |
| EnumSet.of(DAGState.NEW, DAGState.INITED, DAGState.RUNNING, |
| DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED, |
| DAGState.ERROR, DAGState.TERMINATING), |
| DAGEventType.DAG_RECOVER, |
| new RecoverTransition()) |
| .addTransition(DAGState.NEW, DAGState.NEW, |
| DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) |
| .addTransition |
| (DAGState.NEW, |
| EnumSet.of(DAGState.INITED, DAGState.FAILED), |
| DAGEventType.DAG_INIT, |
| new InitTransition()) |
| .addTransition(DAGState.NEW, DAGState.KILLED, |
| DAGEventType.DAG_KILL, |
| new KillNewJobTransition()) |
| .addTransition(DAGState.NEW, DAGState.ERROR, |
| DAGEventType.INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| |
| // Transitions from INITED state |
| .addTransition(DAGState.INITED, DAGState.INITED, |
| DAGEventType.DAG_DIAGNOSTIC_UPDATE, |
| DIAGNOSTIC_UPDATE_TRANSITION) |
| .addTransition(DAGState.INITED, DAGState.INITED, |
| DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) |
| .addTransition(DAGState.INITED, DAGState.RUNNING, |
| DAGEventType.DAG_START, |
| new StartTransition()) |
| .addTransition(DAGState.INITED, DAGState.KILLED, |
| DAGEventType.DAG_KILL, |
| new KillInitedJobTransition()) |
| .addTransition(DAGState.INITED, DAGState.ERROR, |
| DAGEventType.INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| |
| // Transitions from RUNNING state |
| .addTransition |
| (DAGState.RUNNING, |
| EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.TERMINATING,DAGState.FAILED), |
| DAGEventType.DAG_VERTEX_COMPLETED, |
| new VertexCompletedTransition()) |
| .addTransition(DAGState.RUNNING, EnumSet.of(DAGState.RUNNING, DAGState.TERMINATING), |
| DAGEventType.DAG_VERTEX_RERUNNING, |
| new VertexReRunningTransition()) |
// A kill while RUNNING moves to TERMINATING, not straight to KILLED.
| .addTransition(DAGState.RUNNING, DAGState.TERMINATING, |
| DAGEventType.DAG_KILL, new DAGKilledTransition()) |
| .addTransition(DAGState.RUNNING, DAGState.RUNNING, |
| DAGEventType.DAG_DIAGNOSTIC_UPDATE, |
| DIAGNOSTIC_UPDATE_TRANSITION) |
| .addTransition(DAGState.RUNNING, DAGState.RUNNING, |
| DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) |
| .addTransition(DAGState.RUNNING, DAGState.RUNNING, |
| DAGEventType.DAG_SCHEDULER_UPDATE, |
| DAG_SCHEDULER_UPDATE_TRANSITION) |
| .addTransition( |
| DAGState.RUNNING, |
| DAGState.ERROR, DAGEventType.INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| |
| // Transitions from TERMINATING state. |
| .addTransition |
| (DAGState.TERMINATING, |
| EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED, |
| DAGState.ERROR), |
| DAGEventType.DAG_VERTEX_COMPLETED, |
| new VertexCompletedTransition()) |
| .addTransition(DAGState.TERMINATING, DAGState.TERMINATING, |
| DAGEventType.DAG_DIAGNOSTIC_UPDATE, |
| DIAGNOSTIC_UPDATE_TRANSITION) |
| .addTransition(DAGState.TERMINATING, DAGState.TERMINATING, |
| DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) |
| .addTransition( |
| DAGState.TERMINATING, |
| DAGState.ERROR, DAGEventType.INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| |
| // Ignore-able events |
| .addTransition(DAGState.TERMINATING, DAGState.TERMINATING, |
| EnumSet.of(DAGEventType.DAG_KILL, |
| DAGEventType.DAG_VERTEX_RERUNNING, |
| DAGEventType.DAG_SCHEDULER_UPDATE)) |
| |
| // Transitions from SUCCEEDED state |
| .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED, |
| DAGEventType.DAG_DIAGNOSTIC_UPDATE, |
| DIAGNOSTIC_UPDATE_TRANSITION) |
| .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED, |
| DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) |
| .addTransition( |
| DAGState.SUCCEEDED, |
| DAGState.ERROR, DAGEventType.INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| // Ignore-able events |
| .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED, |
| EnumSet.of(DAGEventType.DAG_KILL, |
| DAGEventType.DAG_SCHEDULER_UPDATE, |
| DAGEventType.DAG_VERTEX_COMPLETED)) |
| |
| // Transitions from FAILED state |
| .addTransition(DAGState.FAILED, DAGState.FAILED, |
| DAGEventType.DAG_DIAGNOSTIC_UPDATE, |
| DIAGNOSTIC_UPDATE_TRANSITION) |
| .addTransition(DAGState.FAILED, DAGState.FAILED, |
| DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) |
| .addTransition( |
| DAGState.FAILED, |
| DAGState.ERROR, DAGEventType.INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| // Ignore-able events |
| .addTransition(DAGState.FAILED, DAGState.FAILED, |
| EnumSet.of(DAGEventType.DAG_KILL, |
| DAGEventType.DAG_START, |
| DAGEventType.DAG_VERTEX_RERUNNING, |
| DAGEventType.DAG_SCHEDULER_UPDATE, |
| DAGEventType.DAG_VERTEX_COMPLETED)) |
| |
| // Transitions from KILLED state |
| .addTransition(DAGState.KILLED, DAGState.KILLED, |
| DAGEventType.DAG_DIAGNOSTIC_UPDATE, |
| DIAGNOSTIC_UPDATE_TRANSITION) |
| .addTransition(DAGState.KILLED, DAGState.KILLED, |
| DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) |
| .addTransition( |
| DAGState.KILLED, |
| DAGState.ERROR, DAGEventType.INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| // Ignore-able events |
| .addTransition(DAGState.KILLED, DAGState.KILLED, |
| EnumSet.of(DAGEventType.DAG_KILL, |
| DAGEventType.DAG_START, |
| DAGEventType.DAG_VERTEX_RERUNNING, |
| DAGEventType.DAG_SCHEDULER_UPDATE, |
| DAGEventType.DAG_VERTEX_COMPLETED)) |
| |
| // No transitions from INTERNAL_ERROR state. Ignore all. |
| .addTransition( |
| DAGState.ERROR, |
| DAGState.ERROR, |
| EnumSet.of( |
| DAGEventType.DAG_KILL, |
| DAGEventType.DAG_INIT, |
| DAGEventType.DAG_START, |
| DAGEventType.DAG_VERTEX_COMPLETED, |
| DAGEventType.DAG_VERTEX_RERUNNING, |
| DAGEventType.DAG_SCHEDULER_UPDATE, |
| DAGEventType.DAG_DIAGNOSTIC_UPDATE, |
| DAGEventType.INTERNAL_ERROR, |
| DAGEventType.DAG_COUNTER_UPDATE)) |
// NOTE(review): DAG_COUNTER_UPDATE is already in the ignore set just above and is
// registered a second time here with COUNTER_UPDATE_TRANSITION. Presumably the later
// registration wins — confirm against StateMachineFactory and drop the redundant entry.
| .addTransition(DAGState.ERROR, DAGState.ERROR, |
| DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) |
| // create the topology tables |
| .installTopology(); |
| |
// Per-instance state machine, built from stateMachineFactory in the constructor.
| private final StateMachine<DAGState, DAGEventType, DAGEvent> stateMachine; |
| |
| //changing fields while the job is running |
| @VisibleForTesting |
| int numCompletedVertices = 0; |
| private int numVertices; |
| private int numSuccessfulVertices = 0; |
| private int numFailedVertices = 0; |
| private int numKilledVertices = 0; |
| private boolean isUber = false; |
| private DAGTerminationCause terminationCause; |
// Credentials handed to the constructor; may be null, in which case the current
// user's UGI is used for dagUGI.
| private Credentials credentials; |
| |
// startTime and finishTime are also restored by restoreFromEvent() from the
// DAGStartedEvent and DAGFinishedEvent respectively.
| @VisibleForTesting |
| long initTime; |
| @VisibleForTesting |
| long startTime; |
| @VisibleForTesting |
| long finishTime; |
| |
// Vertex groups; commitOrAbortOutputs() iterates the values to commit shared outputs.
| Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap(); |
// NOTE(review): key is presumably a member vertex name mapped to the groups it
// belongs to — confirm where this map is populated.
| Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap(); |
// State reconstructed so far by restoreFromEvent(); starts at NEW.
| private DAGState recoveredState = DAGState.NEW; |
// Set when a DAG_COMMIT_STARTED event is replayed, cleared by DAG_FINISHED.
| @VisibleForTesting |
| boolean recoveryCommitInProgress = false; |
// Group name -> false after a commit-started event, true after a commit-finished event.
| Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>(); |
| |
| static class VertexGroupInfo { |
| String groupName; |
| Set<String> groupMembers; |
| Set<String> outputs; |
| Map<String, InputDescriptor> edgeMergedInputs; |
| int successfulMembers; |
| boolean committed; |
| VertexGroupInfo(PlanVertexGroupInfo groupInfo) { |
| groupName = groupInfo.getGroupName(); |
| groupMembers = Sets.newHashSet(groupInfo.getGroupMembersList()); |
| edgeMergedInputs = Maps.newHashMapWithExpectedSize(groupInfo.getEdgeMergedInputsCount()); |
| for (PlanGroupInputEdgeInfo edgInfo : groupInfo.getEdgeMergedInputsList()) { |
| edgeMergedInputs.put(edgInfo.getDestVertexName(), |
| DagTypeConverters.convertInputDescriptorFromDAGPlan(edgInfo.getMergedInput())); |
| } |
| outputs = Sets.newHashSet(groupInfo.getOutputsList()); |
| successfulMembers = 0; |
| committed = false; |
| } |
| } |
| |
| public DAGImpl(TezDAGID dagId, |
| Configuration conf, |
| DAGPlan jobPlan, |
| EventHandler eventHandler, |
| TaskAttemptListener taskAttemptListener, |
| Credentials dagCredentials, |
| Clock clock, |
| String appUserName, |
| TaskHeartbeatHandler thh, |
| AppContext appContext) { |
| this.dagId = dagId; |
| this.jobPlan = jobPlan; |
| this.conf = conf; |
| this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>"; |
| |
| this.userName = appUserName; |
| this.clock = clock; |
| this.appContext = appContext; |
| |
| this.taskAttemptListener = taskAttemptListener; |
| this.taskHeartbeatHandler = thh; |
| this.eventHandler = eventHandler; |
| ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); |
| this.readLock = readWriteLock.readLock(); |
| this.writeLock = readWriteLock.writeLock(); |
| |
| this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(jobPlan |
| .getLocalResourceList()); |
| |
| this.credentials = dagCredentials; |
| if (this.credentials == null) { |
| try { |
| dagUGI = UserGroupInformation.getCurrentUser(); |
| } catch (IOException e) { |
| throw new TezUncheckedException("Failed to set UGI for dag based on currentUser", e); |
| } |
| } else { |
| dagUGI = UserGroupInformation.createRemoteUser(this.userName); |
| dagUGI.addCredentials(this.credentials); |
| } |
| |
| this.aclManager = new ACLManager(appContext.getAMACLManager(), dagUGI.getShortUserName(), |
| this.conf); |
| |
| this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(conf); |
| // This "this leak" is okay because the retained pointer is in an |
| // instance variable. |
| stateMachine = stateMachineFactory.make(this); |
| this.entityUpdateTracker = new StateChangeNotifier(this); |
| } |
| |
| protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() { |
| return stateMachine; |
| } |
| |
| @Override |
| public TezDAGID getID() { |
| return dagId; |
| } |
| |
| @Override |
| public Map<String, LocalResource> getLocalResources() { |
| return localResources; |
| } |
| |
| // TODO maybe removed after TEZ-74 |
| @Override |
| public Configuration getConf() { |
| return conf; |
| } |
| |
| @Override |
| public DAGPlan getJobPlan() { |
| return jobPlan; |
| } |
| |
| EventHandler getEventHandler() { |
| return this.eventHandler; |
| } |
| |
| @Override |
| public Vertex getVertex(TezVertexID vertexID) { |
| readLock.lock(); |
| try { |
| return vertices.get(vertexID); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public boolean isUber() { |
| return isUber; |
| } |
| |
| @Override |
| public Credentials getCredentials() { |
| return this.credentials; |
| } |
| |
| @Override |
| public UserGroupInformation getDagUGI() { |
| return this.dagUGI; |
| } |
| |
| @Override |
| public DAGState restoreFromEvent(HistoryEvent historyEvent) { |
| switch (historyEvent.getEventType()) { |
| case DAG_INITIALIZED: |
| recoveredState = initializeDAG((DAGInitializedEvent) historyEvent); |
| recoveryInitEventSeen = true; |
| return recoveredState; |
| case DAG_STARTED: |
| if (!recoveryInitEventSeen) { |
| throw new RuntimeException("Started Event seen but" |
| + " no Init Event was encountered earlier"); |
| } |
| recoveryStartEventSeen = true; |
| this.startTime = ((DAGStartedEvent) historyEvent).getStartTime(); |
| recoveredState = DAGState.RUNNING; |
| return recoveredState; |
| case DAG_COMMIT_STARTED: |
| recoveryCommitInProgress = true; |
| return recoveredState; |
| case VERTEX_GROUP_COMMIT_STARTED: |
| VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent = |
| (VertexGroupCommitStartedEvent) historyEvent; |
| recoveredGroupCommits.put( |
| vertexGroupCommitStartedEvent.getVertexGroupName(), false); |
| return recoveredState; |
| case VERTEX_GROUP_COMMIT_FINISHED: |
| VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent = |
| (VertexGroupCommitFinishedEvent) historyEvent; |
| recoveredGroupCommits.put( |
| vertexGroupCommitFinishedEvent.getVertexGroupName(), true); |
| return recoveredState; |
| case DAG_FINISHED: |
| recoveryCommitInProgress = false; |
| DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent; |
| this.finishTime = finishedEvent.getFinishTime(); |
| recoveredState = finishedEvent.getState(); |
| this.fullCounters = finishedEvent.getTezCounters(); |
| return recoveredState; |
| default: |
| throw new RuntimeException("Unexpected event received for restoring" |
| + " state, eventType=" + historyEvent.getEventType()); |
| } |
| } |
| |
| @Override |
| public ACLManager getACLManager() { |
| return this.aclManager; |
| } |
| |
| @Override |
| public Map<String, TezVertexID> getVertexNameIDMapping() { |
| this.readLock.lock(); |
| try { |
| Map<String, TezVertexID> idNameMap = new HashMap<String, TezVertexID>(); |
| for (Vertex v : getVertices().values()) { |
| idNameMap.put(v.getName(), v.getVertexId()); |
| } |
| return idNameMap; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public TezCounters getAllCounters() { |
| |
| readLock.lock(); |
| |
| try { |
| DAGState state = getInternalState(); |
| if (state == DAGState.ERROR || state == DAGState.FAILED |
| || state == DAGState.KILLED || state == DAGState.SUCCEEDED) { |
| this.mayBeConstructFinalFullCounters(); |
| return fullCounters; |
| } |
| |
| TezCounters counters = new TezCounters(); |
| counters.incrAllCounters(dagCounters); |
| return incrTaskCounters(counters, vertices.values()); |
| |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public static TezCounters incrTaskCounters( |
| TezCounters counters, Collection<Vertex> vertices) { |
| for (Vertex vertex : vertices) { |
| counters.incrAllCounters(vertex.getAllCounters()); |
| } |
| return counters; |
| } |
| |
| @Override |
| public List<String> getDiagnostics() { |
| readLock.lock(); |
| try { |
| return diagnostics; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public DAGReport getReport() { |
| readLock.lock(); |
| try { |
| StringBuilder diagsb = new StringBuilder(); |
| for (String s : getDiagnostics()) { |
| diagsb.append(s).append("\n"); |
| } |
| |
| if (getInternalState() == DAGState.NEW) { |
| /* |
| return MRBuilderUtils.newJobReport(dagId, dagName, username, state, |
| appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f, |
| cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); |
| */ |
| // TODO |
| return TezBuilderUtils.newDAGReport(); |
| } |
| |
| // TODO |
| return TezBuilderUtils.newDAGReport(); |
| /* |
| return MRBuilderUtils.newJobReport(dagId, dagName, username, state, |
| appSubmitTime, startTime, finishTime, setupProgress, |
| this.mapProgress, this.reduceProgress, |
| cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); |
| */ |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public float getProgress() { |
| this.readLock.lock(); |
| try { |
| float progress = 0.0f; |
| for (Vertex v : getVertices().values()) { |
| progress += v.getProgress(); |
| } |
| return progress / getTotalVertices(); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Map<TezVertexID, Vertex> getVertices() { |
| synchronized (tasksSyncHandle) { |
| return Collections.unmodifiableMap(vertices); |
| } |
| } |
| |
| @Override |
| public DAGState getState() { |
| readLock.lock(); |
| try { |
| return getStateMachine().getCurrentState(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| // monitoring apis |
| @Override |
| public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions) { |
| DAGStatusBuilder status = new DAGStatusBuilder(); |
| int totalTaskCount = 0; |
| int totalSucceededTaskCount = 0; |
| int totalRunningTaskCount = 0; |
| int totalFailedTaskCount = 0; |
| int totalKilledTaskCount = 0; |
| int totalFailedTaskAttemptCount = 0; |
| int totalKilledTaskAttemptCount = 0; |
| readLock.lock(); |
| try { |
| for(Map.Entry<String, Vertex> entry : vertexMap.entrySet()) { |
| ProgressBuilder progress = entry.getValue().getVertexProgress(); |
| status.addVertexProgress(entry.getKey(), progress); |
| totalTaskCount += progress.getTotalTaskCount(); |
| totalSucceededTaskCount += progress.getSucceededTaskCount(); |
| totalRunningTaskCount += progress.getRunningTaskCount(); |
| totalFailedTaskCount += progress.getFailedTaskCount(); |
| totalKilledTaskCount += progress.getKilledTaskCount(); |
| totalFailedTaskAttemptCount += progress.getFailedTaskAttemptCount(); |
| totalKilledTaskAttemptCount += progress.getKilledTaskAttemptCount(); |
| } |
| ProgressBuilder dagProgress = new ProgressBuilder(); |
| dagProgress.setTotalTaskCount(totalTaskCount); |
| dagProgress.setSucceededTaskCount(totalSucceededTaskCount); |
| dagProgress.setRunningTaskCount(totalRunningTaskCount); |
| dagProgress.setFailedTaskCount(totalFailedTaskCount); |
| dagProgress.setKilledTaskCount(totalKilledTaskCount); |
| dagProgress.setFailedTaskAttemptCount(totalFailedTaskAttemptCount); |
| dagProgress.setKilledTaskAttemptCount(totalKilledTaskAttemptCount); |
| status.setState(getState()); |
| status.setDiagnostics(diagnostics); |
| status.setDAGProgress(dagProgress); |
| if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) { |
| status.setDAGCounters(getAllCounters()); |
| } |
| return status; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private ProgressBuilder getDAGProgress() { |
| int totalTaskCount = 0; |
| int totalSucceededTaskCount = 0; |
| int totalRunningTaskCount = 0; |
| int totalFailedTaskCount = 0; |
| int totalKilledTaskCount = 0; |
| int totalFailedTaskAttemptCount = 0; |
| int totalKilledTaskAttemptCount = 0; |
| readLock.lock(); |
| try { |
| for(Map.Entry<String, Vertex> entry : vertexMap.entrySet()) { |
| ProgressBuilder progress = entry.getValue().getVertexProgress(); |
| totalTaskCount += progress.getTotalTaskCount(); |
| totalSucceededTaskCount += progress.getSucceededTaskCount(); |
| totalRunningTaskCount += progress.getRunningTaskCount(); |
| totalFailedTaskCount += progress.getFailedTaskCount(); |
| totalKilledTaskCount += progress.getKilledTaskCount(); |
| totalFailedTaskAttemptCount += progress.getFailedTaskAttemptCount(); |
| totalKilledTaskAttemptCount += progress.getKilledTaskAttemptCount(); |
| } |
| ProgressBuilder dagProgress = new ProgressBuilder(); |
| dagProgress.setTotalTaskCount(totalTaskCount); |
| dagProgress.setSucceededTaskCount(totalSucceededTaskCount); |
| dagProgress.setRunningTaskCount(totalRunningTaskCount); |
| dagProgress.setFailedTaskCount(totalFailedTaskCount); |
| dagProgress.setKilledTaskCount(totalKilledTaskCount); |
| dagProgress.setFailedTaskAttemptCount(totalFailedTaskAttemptCount); |
| dagProgress.setKilledTaskAttemptCount(totalKilledTaskAttemptCount); |
| return dagProgress; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public VertexStatusBuilder getVertexStatus(String vertexName, |
| Set<StatusGetOpts> statusOptions) { |
| Vertex vertex = vertexMap.get(vertexName); |
| if(vertex == null) { |
| return null; |
| } |
| return vertex.getVertexStatus(statusOptions); |
| } |
| |
| public TaskAttemptImpl getTaskAttempt(TezTaskAttemptID taId) { |
| return (TaskAttemptImpl) getVertex(taId.getTaskID().getVertexID()).getTask(taId.getTaskID()) |
| .getAttempt(taId); |
| } |
| |
| public TaskImpl getTask(TezTaskID tId) { |
| return (TaskImpl) getVertex(tId.getVertexID()).getTask(tId); |
| } |
| |
| protected void initializeVerticesAndStart() { |
| for (Vertex v : vertices.values()) { |
| if (v.getInputVerticesCount() == 0) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Initing root vertex " + v.getLogIdentifier()); |
| } |
| eventHandler.handle(new VertexEvent(v.getVertexId(), |
| VertexEventType.V_INIT)); |
| } |
| } |
| for (Vertex v : vertices.values()) { |
| if (v.getInputVerticesCount() == 0) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Starting root vertex " + v.getLogIdentifier()); |
| } |
| eventHandler.handle(new VertexEvent(v.getVertexId(), |
| VertexEventType.V_START)); |
| } |
| } |
| } |
| |
| private boolean commitOutput(String outputName, OutputCommitter outputCommitter) { |
| final OutputCommitter committer = outputCommitter; |
| try { |
| getDagUGI().doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| committer.commitOutput(); |
| return null; |
| } |
| }); |
| return true; |
| } catch (Exception e) { |
| LOG.info("Exception in committing output: " + outputName, e); |
| } |
| return false; |
| } |
| |
| private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) { |
| if (this.committedOrAborted) { |
| LOG.info("Ignoring multiple output commit/abort"); |
| return this.allOutputsCommitted; |
| } |
| LOG.info("Calling DAG commit/abort for dag: " + getID()); |
| this.committedOrAborted = true; |
| |
| boolean successfulOutputsAlreadyCommitted = !commitAllOutputsOnSuccess; |
| boolean failedWhileCommitting = false; |
| if (dagSucceeded && !successfulOutputsAlreadyCommitted) { |
| // commit all shared outputs |
| try { |
| appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), |
| new DAGCommitStartedEvent(getID(), clock.getTime()))); |
| } catch (IOException e) { |
| LOG.error("Failed to send commit event to history/recovery handler", e); |
| trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE); |
| return false; |
| } |
| for (VertexGroupInfo groupInfo : vertexGroups.values()) { |
| if (failedWhileCommitting) { |
| break; |
| } |
| if (!groupInfo.outputs.isEmpty()) { |
| groupInfo.committed = true; |
| Vertex v = getVertex(groupInfo.groupMembers.iterator().next()); |
| for (String outputName : groupInfo.outputs) { |
| OutputCommitter committer = v.getOutputCommitters().get(outputName); |
| LOG.info("Committing output: " + outputName + " for group: " + groupInfo.groupName); |
| if (!commitOutput(outputName, committer)) { |
| failedWhileCommitting = true; |
| break; |
| } |
| } |
| } |
| } |
| // commit all other outputs |
| // we come here for successful dag completion and when outputs need to be |
| // committed at the end for all or none visibility |
| for (Vertex vertex : vertices.values()) { |
| if (failedWhileCommitting) { |
| break; |
| } |
| if (vertex.getOutputCommitters() == null) { |
| LOG.info("No output committers for vertex: " + vertex.getLogIdentifier()); |
| continue; |
| } |
| Map<String, OutputCommitter> outputCommitters = |
| new HashMap<String, OutputCommitter>(vertex.getOutputCommitters()); |
| Set<String> sharedOutputs = vertex.getSharedOutputs(); |
| // remove shared outputs |
| if (sharedOutputs != null) { |
| Iterator<Map.Entry<String, OutputCommitter>> iter = outputCommitters |
| .entrySet().iterator(); |
| while (iter.hasNext()) { |
| if (sharedOutputs.contains(iter.next().getKey())) { |
| iter.remove(); |
| } |
| } |
| } |
| if (outputCommitters.isEmpty()) { |
| LOG.info("No exclusive output committers for vertex: " + vertex.getLogIdentifier()); |
| continue; |
| } |
| for (Map.Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) { |
| LOG.info("Committing output: " + entry.getKey() + " for vertex: " |
| + vertex.getLogIdentifier()); |
| if (vertex.getState() != VertexState.SUCCEEDED) { |
| throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() + |
| " not in SUCCEEDED state. State= " + vertex.getState()); |
| } |
| if (!commitOutput(entry.getKey(), entry.getValue())) { |
| failedWhileCommitting = true; |
| break; |
| } |
| } |
| } |
| } |
| |
| if (failedWhileCommitting) { |
| LOG.info("DAG: " + getID() + " failed while committing"); |
| } |
| |
| if (!dagSucceeded || failedWhileCommitting) { |
| // come here because dag failed or |
| // dag succeeded and all or none semantics were on and a commit failed |
| for (Vertex vertex : vertices.values()) { |
| Map<String, OutputCommitter> outputCommitters = vertex |
| .getOutputCommitters(); |
| if (outputCommitters == null || outputCommitters.isEmpty()) { |
| LOG.info("No output committers for vertex: " + vertex.getLogIdentifier()); |
| continue; |
| } |
| for (Map.Entry<String, OutputCommitter> entry : outputCommitters |
| .entrySet()) { |
| final OutputCommitter committer = entry.getValue(); |
| if (commitAllOutputsOnSuccess // commit all outputs on success |
| || vertex.getState() != VertexState.SUCCEEDED // never commit unsuccessful outputs |
| ) { |
| LOG.info("Aborting output: " + entry.getKey() + " for vertex: " |
| + vertex.getLogIdentifier()); |
| try { |
| getDagUGI().doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| committer.abortOutput(VertexStatus.State.FAILED); |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| LOG.info("Exception in aborting output: " + entry.getKey() |
| + " for vertex: " + vertex.getLogIdentifier(), e); |
| } |
| } |
| // else successful outputs have already been committed |
| } |
| } |
| } |
| allOutputsCommitted = !failedWhileCommitting; |
| return allOutputsCommitted; |
| } |
| |
| @Override |
| /** |
| * The only entry point to change the DAG. |
| */ |
| public void handle(DAGEvent event) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Processing DAGEvent " + event.getDAGId() + " of type " |
| + event.getType() + " while in state " + getInternalState() |
| + ". Event: " + event); |
| } |
| try { |
| writeLock.lock(); |
| DAGState oldState = getInternalState(); |
| try { |
| getStateMachine().doTransition(event.getType(), event); |
| } catch (InvalidStateTransitonException e) { |
| LOG.error("Can't handle this event at current state", e); |
| addDiagnostic("Invalid event " + event.getType() + |
| " on Job " + this.dagId); |
| eventHandler.handle(new DAGEvent(this.dagId, |
| DAGEventType.INTERNAL_ERROR)); |
| } |
| //notify the eventhandler of state change |
| if (oldState != getInternalState()) { |
| LOG.info(dagId + " transitioned from " + oldState + " to " |
| + getInternalState()); |
| } |
| } |
| |
| finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Private |
| public DAGState getInternalState() { |
| readLock.lock(); |
| try { |
| return getStateMachine().getCurrentState(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| void setFinishTime() { |
| finishTime = clock.getTime(); |
| } |
| |
| private Map<String, Integer> constructTaskStats(ProgressBuilder progressBuilder) { |
| Map<String, Integer> taskStats = new HashMap<String, Integer>(); |
| taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, progressBuilder.getTotalTaskCount()); |
| taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, progressBuilder.getSucceededTaskCount()); |
| taskStats.put(ATSConstants.NUM_FAILED_TASKS, progressBuilder.getFailedTaskCount()); |
| taskStats.put(ATSConstants.NUM_KILLED_TASKS, progressBuilder.getKilledTaskCount()); |
| taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, |
| progressBuilder.getFailedTaskAttemptCount()); |
| taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS, |
| progressBuilder.getKilledTaskAttemptCount()); |
| return taskStats; |
| } |
| |
| void logJobHistoryFinishedEvent() throws IOException { |
| this.setFinishTime(); |
| Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); |
| DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, |
| finishTime, DAGState.SUCCEEDED, "", getAllCounters(), |
| this.userName, this.dagName, taskStats); |
| this.appContext.getHistoryHandler().handleCriticalEvent( |
| new DAGHistoryEvent(dagId, finishEvt)); |
| } |
| |
| void logJobHistoryInitedEvent() { |
| DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId, |
| this.initTime, this.userName, this.dagName, this.getVertexNameIDMapping()); |
| this.appContext.getHistoryHandler().handle( |
| new DAGHistoryEvent(dagId, initEvt)); |
| } |
| |
| void logJobHistoryStartedEvent() { |
| DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId, |
| this.startTime, this.userName, this.dagName); |
| this.appContext.getHistoryHandler().handle( |
| new DAGHistoryEvent(dagId, startEvt)); |
| } |
| |
| void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException { |
| Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); |
| DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, |
| clock.getTime(), state, |
| StringUtils.join(getDiagnostics(), LINE_SEPARATOR), |
| getAllCounters(), this.userName, this.dagName, taskStats); |
| this.appContext.getHistoryHandler().handleCriticalEvent( |
| new DAGHistoryEvent(dagId, finishEvt)); |
| } |
| |
| static DAGState checkDAGForCompletion(DAGImpl dag) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Checking dag completion" |
| + ", numCompletedVertices=" + dag.numCompletedVertices |
| + ", numSuccessfulVertices=" + dag.numSuccessfulVertices |
| + ", numFailedVertices=" + dag.numFailedVertices |
| + ", numKilledVertices=" + dag.numKilledVertices |
| + ", numVertices=" + dag.numVertices |
| + ", terminationCause=" + dag.terminationCause); |
| } |
| |
| // log in case of accounting error. |
| if (dag.numCompletedVertices > dag.numVertices) { |
| LOG.error("vertex completion accounting issue: numCompletedVertices > numVertices" |
| + ", numCompletedVertices=" + dag.numCompletedVertices |
| + ", numVertices=" + dag.numVertices |
| ); |
| } |
| |
| if (dag.numCompletedVertices == dag.numVertices) { |
| dag.setFinishTime(); |
| //Only succeed if vertices complete successfully and no terminationCause is registered. |
| if(dag.numSuccessfulVertices == dag.numVertices && dag.terminationCause == null) { |
| return dag.finished(DAGState.SUCCEEDED); |
| } |
| if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){ |
| String diagnosticMsg = "DAG killed due to user-initiated kill." + |
| " failedVertices:" + dag.numFailedVertices + |
| " killedVertices:" + dag.numKilledVertices; |
| LOG.info(diagnosticMsg); |
| dag.addDiagnostic(diagnosticMsg); |
| return dag.finished(DAGState.KILLED); |
| } |
| if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){ |
| String diagnosticMsg = "DAG failed due to vertex failure." + |
| " failedVertices:" + dag.numFailedVertices + |
| " killedVertices:" + dag.numKilledVertices; |
| LOG.info(diagnosticMsg); |
| dag.addDiagnostic(diagnosticMsg); |
| return dag.finished(DAGState.FAILED); |
| } |
| if(dag.terminationCause == DAGTerminationCause.COMMIT_FAILURE ){ |
| String diagnosticMsg = "DAG failed due to commit failure." + |
| " failedVertices:" + dag.numFailedVertices + |
| " killedVertices:" + dag.numKilledVertices; |
| LOG.info(diagnosticMsg); |
| dag.addDiagnostic(diagnosticMsg); |
| return dag.finished(DAGState.FAILED); |
| } |
| if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){ |
| String diagnosticMsg = "DAG failed due to failure in recovery handling." + |
| " failedVertices:" + dag.numFailedVertices + |
| " killedVertices:" + dag.numKilledVertices; |
| LOG.info(diagnosticMsg); |
| dag.addDiagnostic(diagnosticMsg); |
| return dag.finished(DAGState.FAILED); |
| } |
| |
| // catch all |
| String diagnosticMsg = "All vertices complete, but cannot determine final state of DAG" |
| + ", numCompletedVertices=" + dag.numCompletedVertices |
| + ", numSuccessfulVertices=" + dag.numSuccessfulVertices |
| + ", numFailedVertices=" + dag.numFailedVertices |
| + ", numKilledVertices=" + dag.numKilledVertices |
| + ", numVertices=" + dag.numVertices |
| + ", terminationCause=" + dag.terminationCause; |
| LOG.error(diagnosticMsg); |
| dag.addDiagnostic(diagnosticMsg); |
| return dag.finished(DAGState.ERROR); |
| } |
| |
| //return the current state, Job not finished yet |
| return dag.getInternalState(); |
| } |
| |
| private synchronized DAGState finished(DAGState finalState) { |
| // TODO Metrics |
| /* |
| if (getInternalState() == DAGState.RUNNING) { |
| metrics.endRunningJob(this); |
| } |
| */ |
| // TODO Metrics |
| /* |
| switch (finalState) { |
| case KILLED: |
| metrics.killedJob(this); |
| break; |
| case FAILED: |
| metrics.failedJob(this); |
| break; |
| case SUCCEEDED: |
| metrics.completedJob(this); |
| } |
| */ |
| |
| if (finishTime == 0) { |
| setFinishTime(); |
| } |
| |
| boolean allOutputsCommitted = commitOrAbortOutputs(finalState == DAGState.SUCCEEDED); |
| |
| if (finalState == DAGState.SUCCEEDED && !allOutputsCommitted) { |
| finalState = DAGState.FAILED; |
| trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE); |
| } |
| |
| boolean recoveryError = false; |
| try { |
| if (finalState == DAGState.SUCCEEDED) { |
| logJobHistoryFinishedEvent(); |
| } else { |
| logJobHistoryUnsuccesfulEvent(finalState); |
| } |
| } catch (IOException e) { |
| LOG.warn("Failed to persist recovery event for DAG completion" |
| + ", dagId=" + dagId |
| + ", finalState=" + finalState); |
| recoveryError = true; |
| } |
| |
| if (recoveryError) { |
| eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), DAGState.ERROR)); |
| } else { |
| eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState)); |
| } |
| |
| LOG.info("DAG: " + getID() + " finished with state: " + finalState); |
| return finalState; |
| } |
| |
| private DAGStatus.State getDAGStatusFromState(DAGState finalState) { |
| switch (finalState) { |
| case NEW: |
| return DAGStatus.State.INITING; |
| case INITED: |
| return DAGStatus.State.INITING; |
| case RUNNING: |
| return DAGStatus.State.RUNNING; |
| case SUCCEEDED: |
| return DAGStatus.State.SUCCEEDED; |
| case FAILED: |
| return DAGStatus.State.FAILED; |
| case KILLED: |
| return DAGStatus.State.KILLED; |
| case ERROR: |
| return DAGStatus.State.ERROR; |
| case TERMINATING: |
| return DAGStatus.State.KILLED; |
| default: |
| throw new TezUncheckedException("Unknown DAGState: " + finalState); |
| } |
| } |
| |
| @Override |
| public String getUserName() { |
| return userName; |
| } |
| |
| @Override |
| public String getName() { |
| return dagName; |
| } |
| |
| @Override |
| public int getTotalVertices() { |
| readLock.lock(); |
| try { |
| return numVertices; |
| } finally { |
| readLock.unlock(); |
| } |
| |
| } |
| |
| @Override |
| public int getSuccessfulVertices() { |
| readLock.lock(); |
| try { |
| return numSuccessfulVertices; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Set the terminationCause if it had not yet been set. |
| * |
| * @param trigger The trigger |
| * @return true if setting the value succeeded. |
| */ |
| boolean trySetTerminationCause(DAGTerminationCause trigger) { |
| if(terminationCause == null){ |
| terminationCause = trigger; |
| return true; |
| } |
| return false; |
| } |
| |
| DAGTerminationCause getTerminationCause() { |
| readLock.lock(); |
| try { |
| return terminationCause; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public DAGState initializeDAG() { |
| return initializeDAG(null); |
| } |
| |
| DAGState initializeDAG(DAGInitializedEvent event) { |
| if (event != null) { |
| initTime = event.getInitTime(); |
| } else { |
| initTime = clock.getTime(); |
| } |
| |
| commitAllOutputsOnSuccess = conf.getBoolean( |
| TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, |
| TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT); |
| |
| // If we have no vertices, fail the dag |
| numVertices = getJobPlan().getVertexCount(); |
| if (numVertices == 0) { |
| addDiagnostic("No vertices for dag"); |
| trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES); |
| if (event != null) { |
| return DAGState.FAILED; |
| } |
| return finished(DAGState.FAILED); |
| } |
| |
| if (jobPlan.getVertexGroupsCount() > 0) { |
| for (PlanVertexGroupInfo groupInfo : jobPlan.getVertexGroupsList()) { |
| vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo)); |
| } |
| for (VertexGroupInfo groupInfo : vertexGroups.values()) { |
| for (String vertexName : groupInfo.groupMembers) { |
| List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertexName); |
| if (groupList == null) { |
| groupList = Lists.newLinkedList(); |
| vertexGroupInfo.put(vertexName, groupList); |
| } |
| groupList.add(groupInfo); |
| } |
| } |
| } |
| |
| // create the vertices` |
| for (int i=0; i < numVertices; ++i) { |
| String vertexName = getJobPlan().getVertex(i).getName(); |
| VertexImpl v = createVertex(this, vertexName, i); |
| addVertex(v); |
| } |
| |
| createDAGEdges(this); |
| Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList()); |
| |
| // setup the dag |
| for (Vertex v : vertices.values()) { |
| parseVertexEdges(this, edgePlans, v); |
| } |
| |
| // Initialize the edges, now that the payload and vertices have been set. |
| for (Edge e : edges.values()) { |
| try { |
| e.initialize(); |
| } catch (AMUserCodeException ex) { |
| String msg = "Exception in " + ex.getSource(); |
| LOG.error(msg, ex); |
| addDiagnostic(msg + ", " + ex.getMessage() + ", " |
| + ExceptionUtils.getStackTrace(ex.getCause())); |
| finished(DAGState.FAILED); |
| return DAGState.FAILED; |
| } |
| } |
| |
| assignDAGScheduler(this); |
| |
| for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) { |
| String groupName = entry.getKey(); |
| VertexGroupInfo groupInfo = entry.getValue(); |
| if (!groupInfo.outputs.isEmpty()) { |
| // shared outputs |
| for (String vertexName : groupInfo.groupMembers) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Setting shared outputs for group: " + groupName + |
| " on vertex: " + vertexName); |
| } |
| Vertex v = getVertex(vertexName); |
| v.addSharedOutputs(groupInfo.outputs); |
| } |
| } |
| } |
| return DAGState.INITED; |
| } |
| |
| private void createDAGEdges(DAGImpl dag) { |
| for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) { |
| EdgeProperty edgeProperty = DagTypeConverters |
| .createEdgePropertyMapFromDAGPlan(edgePlan); |
| |
| // edge manager may be also set via API when using custom edge type |
| dag.edges.put(edgePlan.getId(), |
| new Edge(edgeProperty, dag.getEventHandler())); |
| } |
| } |
| |
| private static void assignDAGScheduler(DAGImpl dag) { |
| String dagSchedulerClassName = dag.conf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, |
| TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT); |
| LOG.info("Using DAG Scheduler: " + dagSchedulerClassName); |
| dag.dagScheduler = ReflectionUtils.createClazzInstance(dagSchedulerClassName, new Class<?>[] { |
| DAG.class, EventHandler.class}, new Object[] {dag, dag.eventHandler}); |
| } |
| |
| private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) { |
| TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId); |
| |
| VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId); |
| VertexLocationHint vertexLocationHint = DagTypeConverters |
| .convertFromDAGPlan(vertexPlan.getTaskLocationHintList()); |
| |
| VertexImpl v = new VertexImpl( |
| vertexId, vertexPlan, vertexName, dag.conf, |
| dag.eventHandler, dag.taskAttemptListener, |
| dag.clock, dag.taskHeartbeatHandler, |
| !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint, |
| dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker); |
| return v; |
| } |
| |
| // hooks up this VertexImpl to input and output EdgeProperties |
| private static void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) { |
| VertexPlan vertexPlan = vertex.getVertexPlan(); |
| |
| Map<Vertex, Edge> inVertices = |
| new HashMap<Vertex, Edge>(); |
| |
| Map<Vertex, Edge> outVertices = |
| new HashMap<Vertex, Edge>(); |
| |
| for(String inEdgeId : vertexPlan.getInEdgeIdList()){ |
| EdgePlan edgePlan = edgePlans.get(inEdgeId); |
| Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName()); |
| Edge edge = dag.edges.get(inEdgeId); |
| edge.setSourceVertex(inVertex); |
| edge.setDestinationVertex(vertex); |
| inVertices.put(inVertex, edge); |
| } |
| |
| for(String outEdgeId : vertexPlan.getOutEdgeIdList()){ |
| EdgePlan edgePlan = edgePlans.get(outEdgeId); |
| Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName()); |
| Edge edge = dag.edges.get(outEdgeId); |
| edge.setSourceVertex(vertex); |
| edge.setDestinationVertex(outVertex); |
| outVertices.put(outVertex, edge); |
| } |
| |
| vertex.setInputVertices(inVertices); |
| vertex.setOutputVertices(outVertices); |
| } |
| |
| private static class RecoverTransition |
| implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> { |
| |
| @Override |
| public DAGState transition(DAGImpl dag, DAGEvent dagEvent) { |
| DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent; |
| if (recoverEvent.hasDesiredState()) { |
| // DAG completed or final end state known |
| dag.recoveredState = recoverEvent.getDesiredState(); |
| } |
| if (recoverEvent.getAdditionalUrlsForClasspath() != null) { |
| LOG.info("Added additional resources : [" + recoverEvent.getAdditionalUrlsForClasspath() |
| + "] to classpath"); |
| RelocalizationUtils.addUrlsToClassPath(recoverEvent.getAdditionalUrlsForClasspath()); |
| } |
| |
| switch (dag.recoveredState) { |
| case NEW: |
| // send DAG an Init and start events |
| dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT)); |
| dag.eventHandler.handle(new DAGEventStartDag(dag.getID(), null)); |
| return DAGState.NEW; |
| case INITED: |
| // DAG inited but not started |
| // This implies vertices need to be sent init event |
| // Root vertices need to be sent start event |
| // The vertices may already have been sent these events but the |
| // DAG start may not have been persisted |
| for (Vertex v : dag.vertices.values()) { |
| if (v.getInputVerticesCount() == 0) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sending Running Recovery event to root vertex " |
| + v.getLogIdentifier()); |
| } |
| dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), |
| VertexState.RUNNING)); |
| } |
| } |
| return DAGState.RUNNING; |
| case RUNNING: |
| // if commit is in progress, DAG should fail as commits are not |
| // recoverable |
| boolean groupCommitInProgress = false; |
| if (!dag.recoveredGroupCommits.isEmpty()) { |
| for (Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) { |
| if (!entry.getValue().booleanValue()) { |
| LOG.info("Found a pending Vertex Group commit" |
| + ", vertexGroup=" + entry.getKey()); |
| groupCommitInProgress = true; |
| break; |
| } |
| } |
| } |
| |
| if (groupCommitInProgress || dag.recoveryCommitInProgress) { |
| // Fail the DAG as we have not seen a commit completion |
| dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE); |
| dag.setFinishTime(); |
| // Recover all other data for all vertices |
| // send recover event to all vertices with a final end state |
| for (Vertex v : dag.vertices.values()) { |
| VertexState desiredState = VertexState.SUCCEEDED; |
| if (dag.recoveredState.equals(DAGState.KILLED)) { |
| desiredState = VertexState.KILLED; |
| } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains( |
| dag.recoveredState)) { |
| desiredState = VertexState.FAILED; |
| } |
| dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), |
| desiredState)); |
| } |
| DAGState endState = DAGState.FAILED; |
| try { |
| dag.logJobHistoryUnsuccesfulEvent(endState); |
| } catch (IOException e) { |
| LOG.warn("Failed to persist recovery event for DAG completion" |
| + ", dagId=" + dag.dagId |
| + ", finalState=" + endState); |
| } |
| dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(), |
| endState)); |
| return endState; |
| } |
| |
| for (Vertex v : dag.vertices.values()) { |
| if (v.getInputVerticesCount() == 0) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sending Running Recovery event to root vertex " |
| + v.getLogIdentifier()); |
| } |
| dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), |
| VertexState.RUNNING)); |
| } |
| } |
| return DAGState.RUNNING; |
| case SUCCEEDED: |
| case ERROR: |
| case FAILED: |
| case KILLED: |
| // Completed |
| |
| // Recover all other data for all vertices |
| // send recover event to all vertices with a final end state |
| for (Vertex v : dag.vertices.values()) { |
| VertexState desiredState = VertexState.SUCCEEDED; |
| if (dag.recoveredState.equals(DAGState.KILLED)) { |
| desiredState = VertexState.KILLED; |
| } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains( |
| dag.recoveredState)) { |
| desiredState = VertexState.FAILED; |
| } |
| dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), |
| desiredState)); |
| } |
| |
| // Let us inform AM of completion |
| dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(), |
| dag.recoveredState)); |
| |
| LOG.info("Recovered DAG: " + dag.getID() + " finished with state: " |
| + dag.recoveredState); |
| return dag.recoveredState; |
| default: |
| // Error state |
| LOG.warn("Trying to recover DAG, failed to recover" |
| + " from non-handled state" + dag.recoveredState); |
| // Tell AM ERROR so that it can shutdown |
| dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(), |
| DAGState.ERROR)); |
| return DAGState.FAILED; |
| } |
| } |
| |
| } |
| |
| private static class InitTransition |
| implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> { |
| |
| /** |
| * Note that this transition method is called directly (and synchronously) |
| * by MRAppMaster's init() method (i.e., no RPC, no thread-switching; |
| * just plain sequential call within AM context), so we can trigger |
| * modifications in AM state from here (at least, if AM is written that |
| * way; MR version is). |
| */ |
| @Override |
| public DAGState transition(DAGImpl dag, DAGEvent event) { |
| // TODO Metrics |
| //dag.metrics.submittedJob(dag); |
| //dag.metrics.preparingJob(dag); |
| |
| DAGState state = dag.initializeDAG(); |
| if (state != DAGState.INITED) { |
| return state; |
| } |
| |
| // TODO Metrics |
| //dag.metrics.endPreparingJob(dag); |
| dag.logJobHistoryInitedEvent(); |
| return DAGState.INITED; |
| |
| |
| } |
| |
| } // end of InitTransition |
| |
| public static class StartTransition |
| implements SingleArcTransition<DAGImpl, DAGEvent> { |
| /** |
| * This transition executes in the event-dispatcher thread, though it's |
| * triggered in MRAppMaster's startJobs() method. |
| */ |
| @Override |
| public void transition(DAGImpl dag, DAGEvent event) { |
| DAGEventStartDag startEvent = (DAGEventStartDag) event; |
| dag.startTime = dag.clock.getTime(); |
| dag.logJobHistoryStartedEvent(); |
| List<URL> additionalUrlsForClasspath = startEvent.getAdditionalUrlsForClasspath(); |
| if (additionalUrlsForClasspath != null) { |
| LOG.info("Added additional resources : [" + additionalUrlsForClasspath + "] to classpath"); |
| RelocalizationUtils.addUrlsToClassPath(additionalUrlsForClasspath); |
| } |
| // TODO Metrics |
| //job.metrics.runningJob(job); |
| |
| // Start all vertices with no incoming edges when job starts |
| dag.initializeVerticesAndStart(); |
| } |
| } |
| |
| // use LinkedHashMap to ensure the vertex order (TEZ-1065) |
| LinkedHashMap<String, Vertex> vertexMap = new LinkedHashMap<String, Vertex>(); |
| void addVertex(Vertex v) { |
| vertices.put(v.getVertexId(), v); |
| vertexMap.put(v.getName(), v); |
| } |
| |
| @Override |
| public Vertex getVertex(String vertexName) { |
| return vertexMap.get(vertexName); |
| } |
| |
| private void mayBeConstructFinalFullCounters() { |
| // Calculating full-counters. This should happen only once for the job. |
| synchronized (this.fullCountersLock) { |
| if (this.fullCounters != null) { |
| // Already constructed. Just return. |
| return; |
| } |
| this.constructFinalFullcounters(); |
| } |
| } |
| |
| @Private |
| public void constructFinalFullcounters() { |
| this.fullCounters = new TezCounters(); |
| this.fullCounters.incrAllCounters(dagCounters); |
| for (Vertex v : this.vertices.values()) { |
| this.fullCounters.incrAllCounters(v.getAllCounters()); |
| } |
| } |
| |
| /** |
| * Set the terminationCause and send a kill-message to all vertices. |
| * The vertex-kill messages are only sent once. |
| */ |
| void enactKill(DAGTerminationCause dagTerminationCause, |
| VertexTerminationCause vertexTerminationCause) { |
| |
| if(trySetTerminationCause(dagTerminationCause)){ |
| for (Vertex v : vertices.values()) { |
| eventHandler.handle( |
| new VertexEventTermination(v.getVertexId(), vertexTerminationCause) |
| ); |
| } |
| } |
| } |
| |
| // Task-start has been moved out of InitTransition, so this arc simply |
| // hardcodes 0 for both map and reduce finished tasks. |
| private static class KillNewJobTransition |
| implements SingleArcTransition<DAGImpl, DAGEvent> { |
| |
| @Override |
| public void transition(DAGImpl dag, DAGEvent dagEvent) { |
| dag.setFinishTime(); |
| dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL); |
| dag.finished(DAGState.KILLED); |
| } |
| |
| } |
| |
| private static class KillInitedJobTransition |
| implements SingleArcTransition<DAGImpl, DAGEvent> { |
| |
| @Override |
| public void transition(DAGImpl dag, DAGEvent dagEvent) { |
| dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL); |
| dag.addDiagnostic("Job received Kill in INITED state."); |
| dag.finished(DAGState.KILLED); |
| } |
| |
| } |
| |
| private static class DAGKilledTransition |
| implements SingleArcTransition<DAGImpl, DAGEvent> { |
| @Override |
| public void transition(DAGImpl job, DAGEvent event) { |
| job.addDiagnostic("Job received Kill while in RUNNING state."); |
| job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL); |
| // TODO Metrics |
| //job.metrics.endRunningJob(job); |
| } |
| |
| |
| } |
| |
| private static class VertexCompletedTransition implements |
| MultipleArcTransition<DAGImpl, DAGEvent, DAGState> { |
| |
| @Override |
| public DAGState transition(DAGImpl job, DAGEvent event) { |
| boolean forceTransitionToKillWait = false; |
| |
| DAGEventVertexCompleted vertexEvent = (DAGEventVertexCompleted) event; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received a vertex completion event" |
| + ", vertexId=" + vertexEvent.getVertexId() |
| + ", vertexState=" + vertexEvent.getVertexState()); |
| } |
| Vertex vertex = job.vertices.get(vertexEvent.getVertexId()); |
| job.numCompletedVertices++; |
| if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) { |
| if (!job.reRunningVertices.contains(vertex.getVertexId())) { |
| // vertex succeeded for the first time |
| job.dagScheduler.vertexCompleted(vertex); |
| } |
| forceTransitionToKillWait = !(job.vertexSucceeded(vertex)); |
| } |
| else if (vertexEvent.getVertexState() == VertexState.FAILED) { |
| job.enactKill( |
| DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE); |
| job.vertexFailed(vertex); |
| forceTransitionToKillWait = true; |
| } |
| else if (vertexEvent.getVertexState() == VertexState.KILLED) { |
| job.vertexKilled(vertex); |
| forceTransitionToKillWait = true; |
| } |
| |
| job.reRunningVertices.remove(vertex.getVertexId()); |
| |
| LOG.info("Vertex " + vertex.getLogIdentifier() + " completed." |
| + ", numCompletedVertices=" + job.numCompletedVertices |
| + ", numSuccessfulVertices=" + job.numSuccessfulVertices |
| + ", numFailedVertices=" + job.numFailedVertices |
| + ", numKilledVertices=" + job.numKilledVertices |
| + ", numVertices=" + job.numVertices); |
| |
| // if the job has not finished but a failure/kill occurred, then force the transition to KILL_WAIT. |
| DAGState state = checkDAGForCompletion(job); |
| if(state == DAGState.RUNNING && forceTransitionToKillWait){ |
| return DAGState.TERMINATING; |
| } |
| else { |
| return state; |
| } |
| } |
| |
| } |
| |
| private static class VertexReRunningTransition implements |
| MultipleArcTransition<DAGImpl, DAGEvent, DAGState> { |
| |
| @Override |
| public DAGState transition(DAGImpl job, DAGEvent event) { |
| DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning) event; |
| Vertex vertex = job.vertices.get(vertexEvent.getVertexId()); |
| job.numCompletedVertices--; |
| boolean failed = job.vertexReRunning(vertex); |
| |
| |
| LOG.info("Vertex " + vertex.getLogIdentifier() + " re-running." |
| + ", numCompletedVertices=" + job.numCompletedVertices |
| + ", numSuccessfulVertices=" + job.numSuccessfulVertices |
| + ", numFailedVertices=" + job.numFailedVertices |
| + ", numKilledVertices=" + job.numKilledVertices |
| + ", numVertices=" + job.numVertices); |
| |
| if (failed) { |
| return DAGState.TERMINATING; |
| } |
| return DAGState.RUNNING; |
| } |
| } |
| |
| private boolean vertexSucceeded(Vertex vertex) { |
| numSuccessfulVertices++; |
| boolean failedCommit = false; |
| boolean recoveryFailed = false; |
| if (!commitAllOutputsOnSuccess) { |
| // committing successful outputs immediately. check for shared outputs |
| List<VertexGroupInfo> groupsList = vertexGroupInfo.get(vertex.getName()); |
| if (groupsList != null) { |
| List<VertexGroupInfo> commitList = Lists.newArrayListWithCapacity(groupsList |
| .size()); |
| for (VertexGroupInfo groupInfo : groupsList) { |
| groupInfo.successfulMembers++; |
| if (groupInfo.groupMembers.size() == groupInfo.successfulMembers |
| && !groupInfo.outputs.isEmpty()) { |
| // group has outputs and all vertex members are done |
| LOG.info("All members of group: " + groupInfo.groupName |
| + " are succeeded. Commiting outputs"); |
| commitList.add(groupInfo); |
| } |
| } |
| for (VertexGroupInfo groupInfo : commitList) { |
| if (recoveredGroupCommits.containsKey(groupInfo.groupName)) { |
| LOG.info("VertexGroup was already committed as per recovery" |
| + " data, groupName=" + groupInfo.groupName); |
| continue; |
| } |
| groupInfo.committed = true; |
| Vertex v = getVertex(groupInfo.groupMembers.iterator().next()); |
| try { |
| appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), |
| new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName, |
| clock.getTime()))); |
| } catch (IOException e) { |
| LOG.error("Failed to send commit recovery event to handler", e); |
| recoveryFailed = true; |
| failedCommit = true; |
| } |
| if (!failedCommit) { |
| for (String outputName : groupInfo.outputs) { |
| OutputCommitter committer = v.getOutputCommitters().get(outputName); |
| LOG.info("Committing output: " + outputName); |
| if (!commitOutput(outputName, committer)) { |
| // using same logic as vertex level commit. stop after first failure. |
| failedCommit = true; |
| break; |
| } |
| } |
| } |
| if (failedCommit) { |
| break; |
| } |
| try { |
| appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), |
| new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName, |
| clock.getTime()))); |
| } catch (IOException e) { |
| LOG.error("Failed to send commit recovery event to handler", e); |
| recoveryFailed = true; |
| failedCommit = true; |
| } |
| } |
| } |
| } |
| |
| if (failedCommit) { |
| LOG.info("Aborting job due to failure in commit."); |
| if (!recoveryFailed) { |
| enactKill(DAGTerminationCause.COMMIT_FAILURE, |
| VertexTerminationCause.COMMIT_FAILURE); |
| } else { |
| LOG.info("Recovery failure occurred during commit"); |
| enactKill(DAGTerminationCause.RECOVERY_FAILURE, |
| VertexTerminationCause.COMMIT_FAILURE); |
| } |
| } |
| |
| return !failedCommit; |
| } |
| |
| private boolean vertexReRunning(Vertex vertex) { |
| reRunningVertices.add(vertex.getVertexId()); |
| numSuccessfulVertices--; |
| addDiagnostic("Vertex re-running" |
| + ", vertexName=" + vertex.getName() |
| + ", vertexId=" + vertex.getVertexId()); |
| |
| if (!commitAllOutputsOnSuccess) { |
| // partial output may already have been committed. fail if so |
| List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertex.getName()); |
| if (groupList != null) { |
| for (VertexGroupInfo groupInfo : groupList) { |
| if (groupInfo.committed) { |
| LOG.info("Aborting job as committed vertex: " |
| + vertex.getLogIdentifier() + " is re-running"); |
| enactKill(DAGTerminationCause.COMMIT_FAILURE, |
| VertexTerminationCause.COMMIT_FAILURE); |
| return true; |
| } |
| } |
| } |
| } |
| return false; |
| } |
| |
| private void vertexFailed(Vertex vertex) { |
| numFailedVertices++; |
| addDiagnostic("Vertex failed" |
| + ", vertexName=" + vertex.getName() |
| + ", vertexId=" + vertex.getVertexId() |
| + ", diagnostics=" + vertex.getDiagnostics()); |
| // TODO: Metrics |
| //job.metrics.failedTask(task); |
| } |
| |
| private void vertexKilled(Vertex vertex) { |
| numKilledVertices++; |
| addDiagnostic("Vertex killed" |
| + ", vertexName=" + vertex.getName() |
| + ", vertexId=" + vertex.getVertexId() |
| + ", diagnostics=" + vertex.getDiagnostics()); |
| // TODO: Metrics |
| //job.metrics.killedTask(task); |
| } |
| |
| private void addDiagnostic(String diag) { |
| diagnostics.add(diag); |
| } |
| |
| private static class DiagnosticsUpdateTransition implements |
| SingleArcTransition<DAGImpl, DAGEvent> { |
| @Override |
| public void transition(DAGImpl job, DAGEvent event) { |
| job.addDiagnostic(((DAGEventDiagnosticsUpdate) event) |
| .getDiagnosticUpdate()); |
| } |
| } |
| |
| private static class CounterUpdateTransition implements |
| SingleArcTransition<DAGImpl, DAGEvent> { |
| @Override |
| public void transition(DAGImpl job, DAGEvent event) { |
| DAGEventCounterUpdate jce = (DAGEventCounterUpdate) event; |
| for (DAGEventCounterUpdate.CounterIncrementalUpdate ci : jce |
| .getCounterUpdates()) { |
| job.dagCounters.findCounter(ci.getCounterKey()).increment( |
| ci.getIncrementValue()); |
| } |
| } |
| } |
| |
| private static class DAGSchedulerUpdateTransition implements |
| SingleArcTransition<DAGImpl, DAGEvent> { |
| @Override |
| public void transition(DAGImpl dag, DAGEvent event) { |
| DAGEventSchedulerUpdate sEvent = (DAGEventSchedulerUpdate) event; |
| switch(sEvent.getUpdateType()) { |
| case TA_SCHEDULE: |
| dag.dagScheduler.scheduleTask(sEvent); |
| break; |
| case TA_SCHEDULED: |
| DAGEventSchedulerUpdateTAAssigned taEvent = |
| (DAGEventSchedulerUpdateTAAssigned) sEvent; |
| dag.dagScheduler.taskScheduled(taEvent); |
| break; |
| case TA_SUCCEEDED: |
| dag.dagScheduler.taskSucceeded(sEvent); |
| break; |
| default: |
| throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:" |
| + sEvent.getUpdateType()); |
| } |
| } |
| } |
| |
| private static class InternalErrorTransition implements |
| SingleArcTransition<DAGImpl, DAGEvent> { |
| @Override |
| public void transition(DAGImpl job, DAGEvent event) { |
| //TODO Is this JH event required. |
| LOG.info(job.getID() + " terminating due to internal error"); |
| // terminate all vertices |
| job.enactKill(DAGTerminationCause.INTERNAL_ERROR, |
| VertexTerminationCause.INTERNAL_ERROR); |
| job.setFinishTime(); |
| job.finished(DAGState.ERROR); |
| } |
| } |
| |
| @Override |
| public boolean isComplete() { |
| readLock.lock(); |
| try { |
| DAGState state = getState(); |
| if (state.equals(DAGState.SUCCEEDED) |
| || state.equals(DAGState.FAILED) |
| || state.equals(DAGState.KILLED) |
| || state.equals(DAGState.ERROR)) { |
| return true; |
| } |
| return false; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| } |