| /* Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.dag.impl; |
| |
| import java.io.IOException; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import javax.annotation.Nullable; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.lang.exception.ExceptionUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.StringInterner; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.state.InvalidStateTransitonException; |
| import org.apache.hadoop.yarn.state.MultipleArcTransition; |
| import org.apache.hadoop.yarn.state.SingleArcTransition; |
| import org.apache.hadoop.yarn.state.StateMachine; |
| import org.apache.hadoop.yarn.state.StateMachineFactory; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.tez.common.ATSConstants; |
| import org.apache.tez.common.ReflectionUtils; |
| import org.apache.tez.common.counters.TezCounters; |
| import org.apache.tez.dag.api.DagTypeConverters; |
| import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; |
| import org.apache.tez.dag.api.EdgeProperty.DataMovementType; |
| import org.apache.tez.dag.api.InputDescriptor; |
| import org.apache.tez.dag.api.InputInitializerDescriptor; |
| import org.apache.tez.dag.api.OutputCommitterDescriptor; |
| import org.apache.tez.dag.api.OutputDescriptor; |
| import org.apache.tez.dag.api.ProcessorDescriptor; |
| import org.apache.tez.dag.api.RootInputLeafOutput; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezUncheckedException; |
| import org.apache.tez.dag.api.VertexLocationHint; |
| import org.apache.tez.dag.api.TaskLocationHint; |
| import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; |
| import org.apache.tez.dag.api.VertexManagerPluginDescriptor; |
| import org.apache.tez.dag.api.client.ProgressBuilder; |
| import org.apache.tez.dag.api.client.StatusGetOpts; |
| import org.apache.tez.dag.api.client.VertexStatus; |
| import org.apache.tez.dag.api.client.VertexStatus.State; |
| import org.apache.tez.dag.api.client.VertexStatusBuilder; |
| import org.apache.tez.dag.api.event.VertexStateUpdate; |
| import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated; |
| import org.apache.tez.dag.api.oldrecords.TaskState; |
| import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; |
| import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.ContainerContext; |
| import org.apache.tez.dag.app.TaskAttemptListener; |
| import org.apache.tez.dag.app.TaskHeartbeatHandler; |
| import org.apache.tez.dag.app.dag.DAG; |
| import org.apache.tez.dag.app.dag.RootInputInitializerManager; |
| import org.apache.tez.dag.app.dag.StateChangeNotifier; |
| import org.apache.tez.dag.app.dag.Task; |
| import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; |
| import org.apache.tez.dag.app.dag.TaskTerminationCause; |
| import org.apache.tez.dag.app.dag.Vertex; |
| import org.apache.tez.dag.app.dag.VertexState; |
| import org.apache.tez.dag.app.dag.VertexTerminationCause; |
| import org.apache.tez.dag.app.dag.event.DAGEvent; |
| import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate; |
| import org.apache.tez.dag.app.dag.event.DAGEventType; |
| import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted; |
| import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; |
| import org.apache.tez.dag.app.dag.event.TaskEvent; |
| import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; |
| import org.apache.tez.dag.app.dag.event.TaskEventTermination; |
| import org.apache.tez.dag.app.dag.event.TaskEventType; |
| import org.apache.tez.dag.app.dag.event.VertexEvent; |
| import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError; |
| import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized; |
| import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit; |
| import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex; |
| import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed; |
| import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized; |
| import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; |
| import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted; |
| import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered; |
| import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted; |
| import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted; |
| import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate; |
| import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; |
| import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; |
| import org.apache.tez.dag.app.dag.event.VertexEventTermination; |
| import org.apache.tez.dag.app.dag.event.VertexEventType; |
| import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; |
| import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator; |
| import org.apache.tez.dag.history.DAGHistoryEvent; |
| import org.apache.tez.dag.history.HistoryEvent; |
| import org.apache.tez.dag.history.events.VertexCommitStartedEvent; |
| import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; |
| import org.apache.tez.dag.history.events.VertexFinishedEvent; |
| import org.apache.tez.dag.history.events.VertexInitializedEvent; |
| import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; |
| import org.apache.tez.dag.history.events.VertexStartedEvent; |
| import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; |
| import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; |
| import org.apache.tez.dag.records.TaskAttemptTerminationCause; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezTaskID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; |
| import org.apache.tez.runtime.api.OutputCommitter; |
| import org.apache.tez.runtime.api.OutputCommitterContext; |
| import org.apache.tez.runtime.api.InputSpecUpdate; |
| import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; |
| import org.apache.tez.runtime.api.events.DataMovementEvent; |
| import org.apache.tez.runtime.api.events.InputFailedEvent; |
| import org.apache.tez.runtime.api.events.InputDataInformationEvent; |
| import org.apache.tez.runtime.api.events.InputInitializerEvent; |
| import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; |
| import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; |
| import org.apache.tez.runtime.api.events.VertexManagerEvent; |
| import org.apache.tez.runtime.api.impl.EventMetaData; |
| import org.apache.tez.runtime.api.impl.EventType; |
| import org.apache.tez.runtime.api.impl.GroupInputSpec; |
| import org.apache.tez.runtime.api.impl.InputSpec; |
| import org.apache.tez.runtime.api.impl.OutputSpec; |
| import org.apache.tez.runtime.api.impl.TezEvent; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.HashMultiset; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Multiset; |
| import com.google.common.collect.Sets; |
| |
| import org.apache.tez.state.OnStateChangedCallback; |
| import org.apache.tez.state.StateMachineTez; |
| |
| /** Implementation of Vertex interface. Maintains the state machines of Vertex. |
| * The read and write calls use ReadWriteLock for concurrency. |
| */ |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, |
| EventHandler<VertexEvent> { |
| |
| private static final String LINE_SEPARATOR = System |
| .getProperty("line.separator"); /* platform line separator, read once when the class is initialized */ |
| |
| private static final Log LOG = LogFactory.getLog(VertexImpl.class); /* commons-logging logger for this class */ |
| |
| // Final fields (each is assigned once, inline or during construction). |
| private final Clock clock; /* time source handed to the constructor */ |
| |
| |
| private final Lock readLock; /* read side of the ReentrantReadWriteLock created in the constructor */ |
| private final Lock writeLock; /* write side of that same ReentrantReadWriteLock */ |
| private final TaskAttemptListener taskAttemptListener; |
| private final TaskHeartbeatHandler taskHeartbeatHandler; |
| private final Object tasksSyncHandle = new Object(); /* NOTE(review): presumably the monitor guarding updates of 'tasks' -- confirm against its usages */ |
| |
| private final EventHandler eventHandler; /* raw type; rawtypes/unchecked warnings are suppressed at class level */ |
| // TODO Metrics |
| //private final MRAppMetrics metrics; |
| private final AppContext appContext; |
| |
| private boolean lazyTasksCopyNeeded = false; |
| // Must be a linked map so that tasks are iterated in insertion order. |
| volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>(); |
| private Object fullCountersLock = new Object(); /* NOTE(review): presumably guards fullCounters -- confirm */ |
| private TezCounters fullCounters = null; |
| private Resource taskResource; /* set in the constructor from the vertex plan's task config */ |
| |
| private Configuration conf; |
| |
| private final boolean isSpeculationEnabled; |
| |
| // Fields initialized in init. The counters below are package-private so tests can read them. |
| |
| @VisibleForTesting |
| int numStartedSourceVertices = 0; |
| @VisibleForTesting |
| int numInitedSourceVertices = 0; |
| @VisibleForTesting |
| int numRecoveredSourceVertices = 0; |
| |
| private int distanceFromRoot = 0; |
| |
| private final List<String> diagnostics = new ArrayList<String>(); /* diagnostic messages collected for this vertex */ |
| |
| protected final StateChangeNotifier stateChangeNotifier; |
| |
| // Task/attempt related data structures |
| @VisibleForTesting |
| int numSuccessSourceAttemptCompletions = 0; |
| |
| List<GroupInputSpec> groupInputSpecList; |
| Set<String> sharedOutputs = Sets.newHashSet(); |
| |
| private static final InternalErrorTransition |
| INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); /* used by the V_INTERNAL_ERROR arcs that move a vertex to ERROR */ |
| private static final RouteEventTransition |
| ROUTE_EVENT_TRANSITION = new RouteEventTransition(); /* one instance reused by the V_ROUTE_EVENT arcs of several states */ |
| private static final TaskAttemptCompletedEventTransition |
| TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = |
| new TaskAttemptCompletedEventTransition(); /* used by the RUNNING arc for V_TASK_ATTEMPT_COMPLETED */ |
| private static final TaskAttempStatusUpdateEventTransition |
| TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION = new TaskAttempStatusUpdateEventTransition(); |
| private static final SourceTaskAttemptCompletedEventTransition |
| SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = |
| new SourceTaskAttemptCompletedEventTransition(); /* reused by the V_SOURCE_TASK_ATTEMPT_COMPLETED arcs of several states */ |
| private static final VertexStateChangedCallback STATE_CHANGED_CALLBACK = |
| new VertexStateChangedCallback(); /* registered per state in augmentStateMachine(); these constants must stay textually before stateMachineFactory, whose static initializer reads them */ |
| |
| private VertexState recoveredState = VertexState.NEW; /* defaults to NEW */ |
| @VisibleForTesting |
| List<TezEvent> recoveredEvents = new ArrayList<TezEvent>(); |
| private boolean vertexAlreadyInitialized = false; |
| |
| @VisibleForTesting |
| final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>(); /* NOTE(review): presumably initializer events held until they can be delivered -- confirm against usages */ |
| |
| LegacySpeculator speculator; /* NOTE(review): presumably only set when isSpeculationEnabled is true -- confirm */ |
| |
| protected static final /* Transition table for the vertex state machine. Static, so one factory serves every VertexImpl; initial state is NEW. Arcs that list an EnumSet of post-states let the transition hook choose the resulting state. */ |
| StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent> |
| stateMachineFactory |
| = new StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent> |
| (VertexState.NEW) |
| |
| // Transitions from NEW state |
| .addTransition |
| (VertexState.NEW, |
| EnumSet.of(VertexState.NEW, VertexState.INITED, |
| VertexState.INITIALIZING, VertexState.FAILED), |
| VertexEventType.V_INIT, |
| new InitTransition()) |
| .addTransition(VertexState.NEW, |
| EnumSet.of(VertexState.NEW), |
| VertexEventType.V_NULL_EDGE_INITIALIZED, |
| new NullEdgeInitializedTransition()) |
| .addTransition(VertexState.NEW, |
| EnumSet.of(VertexState.NEW), |
| VertexEventType.V_ROUTE_EVENT, |
| ROUTE_EVENT_TRANSITION) |
| .addTransition(VertexState.NEW, |
| EnumSet.of(VertexState.NEW), |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, |
| SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) |
| .addTransition |
| (VertexState.NEW, |
| EnumSet.of(VertexState.NEW, VertexState.INITED, |
| VertexState.INITIALIZING, VertexState.RUNNING, |
| VertexState.SUCCEEDED, VertexState.FAILED, |
| VertexState.KILLED, VertexState.ERROR, |
| VertexState.RECOVERING), |
| VertexEventType.V_RECOVER, |
| new StartRecoverTransition()) |
| .addTransition |
| (VertexState.NEW, |
| EnumSet.of(VertexState.INITED, |
| VertexState.INITIALIZING, VertexState.RUNNING, |
| VertexState.SUCCEEDED, VertexState.FAILED, |
| VertexState.KILLED, VertexState.ERROR, |
| VertexState.RECOVERING), |
| VertexEventType.V_SOURCE_VERTEX_RECOVERED, |
| new RecoverTransition()) |
| .addTransition(VertexState.NEW, VertexState.NEW, |
| VertexEventType.V_SOURCE_VERTEX_STARTED, |
| new SourceVertexStartedTransition()) |
| .addTransition(VertexState.NEW, VertexState.KILLED, |
| VertexEventType.V_TERMINATE, |
| new TerminateNewVertexTransition()) |
| .addTransition(VertexState.NEW, VertexState.ERROR, |
| VertexEventType.V_INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| .addTransition /* Transitions from RECOVERING state start here */ |
| (VertexState.RECOVERING, |
| EnumSet.of(VertexState.NEW, VertexState.INITED, |
| VertexState.INITIALIZING, VertexState.RUNNING, |
| VertexState.SUCCEEDED, VertexState.FAILED, |
| VertexState.KILLED, VertexState.ERROR, |
| VertexState.RECOVERING), |
| VertexEventType.V_SOURCE_VERTEX_RECOVERED, |
| new RecoverTransition()) |
| .addTransition |
| (VertexState.RECOVERING, VertexState.RECOVERING, |
| EnumSet.of(VertexEventType.V_INIT, |
| VertexEventType.V_ROUTE_EVENT, |
| VertexEventType.V_SOURCE_VERTEX_STARTED, |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED), |
| new BufferDataRecoverTransition()) |
| .addTransition |
| (VertexState.RECOVERING, VertexState.RECOVERING, |
| VertexEventType.V_TERMINATE, |
| new TerminateDuringRecoverTransition()) |
| .addTransition |
| (VertexState.RECOVERING, EnumSet.of(VertexState.RECOVERING), |
| VertexEventType.V_MANAGER_USER_CODE_ERROR, |
| new VertexManagerUserCodeErrorTransition()) |
| |
| // Transitions from INITIALIZING state |
| .addTransition(VertexState.INITIALIZING, |
| EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, |
| VertexState.FAILED), |
| VertexEventType.V_ROOT_INPUT_INITIALIZED, |
| new RootInputInitializedTransition()) |
| .addTransition(VertexState.INITIALIZING, |
| EnumSet.of(VertexState.INITIALIZING), |
| VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, |
| new OneToOneSourceSplitTransition()) |
| .addTransition(VertexState.INITIALIZING, |
| EnumSet.of(VertexState.INITED, VertexState.FAILED), |
| VertexEventType.V_READY_TO_INIT, |
| new VertexInitializedTransition()) |
| .addTransition(VertexState.INITIALIZING, |
| EnumSet.of(VertexState.FAILED), |
| VertexEventType.V_ROOT_INPUT_FAILED, |
| new RootInputInitFailedTransition()) |
| .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING, |
| VertexEventType.V_START, |
| new StartWhileInitializingTransition()) |
| .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING, |
| VertexEventType.V_SOURCE_VERTEX_STARTED, |
| new SourceVertexStartedTransition()) |
| .addTransition(VertexState.INITIALIZING, |
| EnumSet.of(VertexState.INITIALIZING), |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, |
| SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) |
| .addTransition(VertexState.INITIALIZING, |
| EnumSet.of(VertexState.INITIALIZING, VertexState.FAILED), |
| VertexEventType.V_ROUTE_EVENT, |
| ROUTE_EVENT_TRANSITION) |
| .addTransition(VertexState.INITIALIZING, EnumSet.of(VertexState.FAILED), |
| VertexEventType.V_MANAGER_USER_CODE_ERROR, |
| new VertexManagerUserCodeErrorTransition()) |
| .addTransition(VertexState.INITIALIZING, VertexState.KILLED, |
| VertexEventType.V_TERMINATE, |
| new TerminateInitingVertexTransition()) |
| .addTransition(VertexState.INITIALIZING, VertexState.ERROR, |
| VertexEventType.V_INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| .addTransition(VertexState.INITIALIZING, |
| EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, |
| VertexState.FAILED), |
| VertexEventType.V_NULL_EDGE_INITIALIZED, |
| new NullEdgeInitializedTransition()) |
| |
| // Transitions from INITED state |
| // SOURCE_VERTEX_STARTED - for sources which determine parallelism, |
| // they must complete before this vertex can start. |
| .addTransition(VertexState.INITED, |
| EnumSet.of(VertexState.FAILED), |
| VertexEventType.V_ROOT_INPUT_FAILED, |
| new RootInputInitFailedTransition()) |
| .addTransition |
| (VertexState.INITED, |
| EnumSet.of(VertexState.INITED, VertexState.ERROR), |
| VertexEventType.V_INIT, |
| new IgnoreInitInInitedTransition()) |
| .addTransition(VertexState.INITED, VertexState.INITED, |
| VertexEventType.V_SOURCE_VERTEX_STARTED, |
| new SourceVertexStartedTransition()) |
| .addTransition(VertexState.INITED, |
| EnumSet.of(VertexState.INITED), |
| VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, |
| new OneToOneSourceSplitTransition()) |
| .addTransition(VertexState.INITED, |
| EnumSet.of(VertexState.INITED), |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, |
| SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) |
| .addTransition(VertexState.INITED, |
| EnumSet.of(VertexState.RUNNING, VertexState.INITED, VertexState.TERMINATING), |
| VertexEventType.V_START, |
| new StartTransition()) |
| .addTransition(VertexState.INITED, |
| EnumSet.of(VertexState.INITED, VertexState.FAILED), |
| VertexEventType.V_ROUTE_EVENT, |
| ROUTE_EVENT_TRANSITION) |
| .addTransition(VertexState.INITED, VertexState.KILLED, |
| VertexEventType.V_TERMINATE, |
| new TerminateInitedVertexTransition()) |
| .addTransition(VertexState.INITED, EnumSet.of(VertexState.FAILED), |
| VertexEventType.V_MANAGER_USER_CODE_ERROR, |
| new VertexManagerUserCodeErrorTransition()) |
| .addTransition(VertexState.INITED, VertexState.ERROR, |
| VertexEventType.V_INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| |
| // Transitions from RUNNING state |
| .addTransition(VertexState.RUNNING, |
| EnumSet.of(VertexState.TERMINATING), |
| VertexEventType.V_ROOT_INPUT_FAILED, |
| new RootInputInitFailedTransition()) |
| .addTransition(VertexState.RUNNING, VertexState.RUNNING, |
| VertexEventType.V_TASK_ATTEMPT_COMPLETED, |
| TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) |
| .addTransition(VertexState.RUNNING, |
| EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING), |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, |
| SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) |
| .addTransition |
| (VertexState.RUNNING, |
| EnumSet.of(VertexState.RUNNING, |
| VertexState.SUCCEEDED, VertexState.TERMINATING, VertexState.FAILED, |
| VertexState.ERROR), |
| VertexEventType.V_TASK_COMPLETED, |
| new TaskCompletedTransition()) |
| .addTransition(VertexState.RUNNING, |
| EnumSet.of(VertexState.RUNNING), |
| VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, |
| new OneToOneSourceSplitTransition()) |
| .addTransition(VertexState.RUNNING, VertexState.TERMINATING, |
| VertexEventType.V_TERMINATE, |
| new VertexKilledTransition()) |
| .addTransition(VertexState.RUNNING, EnumSet.of(VertexState.TERMINATING), |
| VertexEventType.V_MANAGER_USER_CODE_ERROR, |
| new VertexManagerUserCodeErrorTransition()) |
| .addTransition(VertexState.RUNNING, VertexState.RUNNING, |
| VertexEventType.V_TASK_RESCHEDULED, |
| new TaskRescheduledTransition()) |
| .addTransition(VertexState.RUNNING, |
| EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, |
| VertexState.FAILED), |
| VertexEventType.V_COMPLETED, |
| new VertexNoTasksCompletedTransition()) |
| .addTransition( |
| VertexState.RUNNING, |
| VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| .addTransition( |
| VertexState.RUNNING, |
| EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING), |
| VertexEventType.V_ROUTE_EVENT, |
| ROUTE_EVENT_TRANSITION) |
| .addTransition( |
| VertexState.RUNNING, |
| VertexState.RUNNING, VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, |
| TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION) |
| |
| // Transitions from TERMINATING state. |
| .addTransition |
| (VertexState.TERMINATING, |
| EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED), |
| VertexEventType.V_TASK_COMPLETED, |
| new TaskCompletedTransition()) |
| .addTransition( |
| VertexState.TERMINATING, |
| VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| // Ignore-able events: registered without a transition hook, the vertex stays in TERMINATING. |
| .addTransition(VertexState.TERMINATING, VertexState.TERMINATING, |
| EnumSet.of(VertexEventType.V_TERMINATE, |
| VertexEventType.V_MANAGER_USER_CODE_ERROR, |
| VertexEventType.V_ROOT_INPUT_FAILED, |
| VertexEventType.V_SOURCE_VERTEX_STARTED, |
| VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, |
| VertexEventType.V_ROOT_INPUT_INITIALIZED, |
| VertexEventType.V_NULL_EDGE_INITIALIZED, |
| VertexEventType.V_ROUTE_EVENT, |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, |
| VertexEventType.V_TASK_ATTEMPT_COMPLETED, |
| VertexEventType.V_TASK_RESCHEDULED)) |
| |
| // Transitions from SUCCEEDED state |
| .addTransition( |
| VertexState.SUCCEEDED, |
| VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| .addTransition(VertexState.SUCCEEDED, |
| EnumSet.of(VertexState.RUNNING, VertexState.FAILED), |
| VertexEventType.V_TASK_RESCHEDULED, |
| new TaskRescheduledAfterVertexSuccessTransition()) |
| |
| .addTransition( |
| VertexState.SUCCEEDED, |
| EnumSet.of(VertexState.SUCCEEDED, VertexState.FAILED), |
| // accumulate these in case we get restarted |
| VertexEventType.V_ROUTE_EVENT, |
| ROUTE_EVENT_TRANSITION) |
| .addTransition( |
| VertexState.SUCCEEDED, |
| EnumSet.of(VertexState.FAILED, VertexState.ERROR), |
| VertexEventType.V_TASK_COMPLETED, |
| new TaskCompletedAfterVertexSuccessTransition()) |
| // Ignore-able events. NOTE(review): V_TASK_ATTEMPT_COMPLETED is listed in this set but is registered again with a hook right after it; presumably the later registration takes effect -- confirm against StateMachineFactory. |
| .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED, |
| EnumSet.of(VertexEventType.V_TERMINATE, |
| VertexEventType.V_ROOT_INPUT_FAILED, |
| VertexEventType.V_TASK_ATTEMPT_COMPLETED, |
| VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, |
| // after we are done reruns of source tasks should not affect |
| // us. These reruns may be triggered by other consumer vertices. |
| // We should have been in RUNNING state if we had triggered the |
| // reruns. |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED)) |
| .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED, |
| VertexEventType.V_TASK_ATTEMPT_COMPLETED, |
| new TaskAttemptCompletedEventTransition()) /* a fresh instance, not the shared TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION constant */ |
| |
| |
| // Transitions from FAILED state |
| .addTransition( |
| VertexState.FAILED, |
| VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| // Ignore-able events: registered without a transition hook, the vertex stays in FAILED. |
| .addTransition(VertexState.FAILED, VertexState.FAILED, |
| EnumSet.of(VertexEventType.V_TERMINATE, |
| VertexEventType.V_MANAGER_USER_CODE_ERROR, |
| VertexEventType.V_ROOT_INPUT_FAILED, |
| VertexEventType.V_SOURCE_VERTEX_STARTED, |
| VertexEventType.V_TASK_RESCHEDULED, |
| VertexEventType.V_START, |
| VertexEventType.V_ROUTE_EVENT, |
| VertexEventType.V_TASK_ATTEMPT_COMPLETED, |
| VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, |
| VertexEventType.V_TASK_COMPLETED, |
| VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, |
| VertexEventType.V_ROOT_INPUT_INITIALIZED, |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, |
| VertexEventType.V_NULL_EDGE_INITIALIZED, |
| VertexEventType.V_ROOT_INPUT_FAILED, /* already listed earlier in this set; the duplicate has no effect in an EnumSet */ |
| VertexEventType.V_SOURCE_VERTEX_RECOVERED)) |
| |
| // Transitions from KILLED state |
| .addTransition( |
| VertexState.KILLED, |
| VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR, |
| INTERNAL_ERROR_TRANSITION) |
| // Ignore-able events: registered without a transition hook, the vertex stays in KILLED. |
| .addTransition(VertexState.KILLED, VertexState.KILLED, |
| EnumSet.of(VertexEventType.V_TERMINATE, |
| VertexEventType.V_MANAGER_USER_CODE_ERROR, |
| VertexEventType.V_ROOT_INPUT_FAILED, |
| VertexEventType.V_INIT, |
| VertexEventType.V_SOURCE_VERTEX_STARTED, |
| VertexEventType.V_START, |
| VertexEventType.V_ROUTE_EVENT, |
| VertexEventType.V_TASK_RESCHEDULED, |
| VertexEventType.V_TASK_ATTEMPT_COMPLETED, |
| VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, |
| VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, |
| VertexEventType.V_TASK_COMPLETED, |
| VertexEventType.V_ROOT_INPUT_INITIALIZED, |
| VertexEventType.V_NULL_EDGE_INITIALIZED, |
| VertexEventType.V_ROOT_INPUT_FAILED, /* already listed earlier in this set; the duplicate has no effect in an EnumSet */ |
| VertexEventType.V_SOURCE_VERTEX_RECOVERED)) |
| |
| // No transitions out of the ERROR state (the internal-error state). Ignore all listed events. |
| .addTransition( |
| VertexState.ERROR, |
| VertexState.ERROR, |
| EnumSet.of(VertexEventType.V_INIT, |
| VertexEventType.V_ROOT_INPUT_FAILED, |
| VertexEventType.V_SOURCE_VERTEX_STARTED, |
| VertexEventType.V_START, |
| VertexEventType.V_ROUTE_EVENT, |
| VertexEventType.V_TERMINATE, |
| VertexEventType.V_MANAGER_USER_CODE_ERROR, |
| VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, |
| VertexEventType.V_TASK_COMPLETED, |
| VertexEventType.V_TASK_ATTEMPT_COMPLETED, |
| VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, |
| VertexEventType.V_TASK_RESCHEDULED, |
| VertexEventType.V_INTERNAL_ERROR, |
| VertexEventType.V_ROOT_INPUT_INITIALIZED, |
| VertexEventType.V_NULL_EDGE_INITIALIZED, |
| VertexEventType.V_ROOT_INPUT_FAILED, /* already listed earlier in this set; the duplicate has no effect in an EnumSet */ |
| VertexEventType.V_SOURCE_VERTEX_RECOVERED)) |
| // create the topology tables |
| .installTopology(); |
| |
| private void augmentStateMachine() { |
| stateMachine |
| .registerStateEnteredCallback(VertexState.SUCCEEDED, |
| STATE_CHANGED_CALLBACK) |
| .registerStateEnteredCallback(VertexState.FAILED, |
| STATE_CHANGED_CALLBACK) |
| .registerStateEnteredCallback(VertexState.KILLED, |
| STATE_CHANGED_CALLBACK) |
| .registerStateEnteredCallback(VertexState.RUNNING, |
| STATE_CHANGED_CALLBACK); |
| } |
| |
| private final StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl> stateMachine; /* per-vertex state machine; augmentStateMachine() registers its state-entered callbacks */ |
| |
| // Fields that change while the vertex is running. Package-private so tests can read them. |
| @VisibleForTesting |
| int numTasks; |
| @VisibleForTesting |
| int completedTaskCount = 0; |
| @VisibleForTesting |
| int succeededTaskCount = 0; |
| @VisibleForTesting |
| int failedTaskCount = 0; |
| @VisibleForTesting |
| int killedTaskCount = 0; |
| |
| // Both failed and killed task attempt counts are incremented via direct calls |
| // and not via state machine changes, as they always increase. In no situation does |
| // the counter need to be reset or changed back, as failed attempts never go back to succeeded. |
| // Likewise for killed attempts. |
| // The same cannot apply to succeeded task attempts, if they are tracked, as they might be |
| // subsequently declared as failed. |
| @VisibleForTesting |
| AtomicInteger failedTaskAttemptCount = new AtomicInteger(0); |
| @VisibleForTesting |
| AtomicInteger killedTaskAttemptCount = new AtomicInteger(0); |
| |
| @VisibleForTesting |
| long initTimeRequested; // Time at which INIT request was received. |
| @VisibleForTesting |
| long initedTime; // Time when entering state INITED |
| @VisibleForTesting |
| long startTimeRequested; // Time at which START request was received. |
| @VisibleForTesting |
| long startedTime; // Time when entering state STARTED. NOTE(review): the transition table has no STARTED state; presumably this means RUNNING -- confirm. |
| @VisibleForTesting |
| long finishTime; |
| private float progress; |
| |
| private final TezVertexID vertexId; // Runtime-assigned id, passed to the constructor. |
| private final VertexPlan vertexPlan; /* plan the constructor reads task config and processor descriptor from */ |
| private boolean initWaitsForRootInitializers = false; |
| |
| private final String vertexName; /* weakly interned via StringInterner in the constructor */ |
| private final ProcessorDescriptor processorDescriptor; /* converted from the vertex plan in the constructor */ |
| |
| private boolean vertexToBeReconfiguredByManager = false; |
| AtomicBoolean vmIsInitialized = new AtomicBoolean(false); |
| AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false); |
| |
| @VisibleForTesting |
| Map<Vertex, Edge> sourceVertices; |
| private Map<Vertex, Edge> targetVertices; |
| Set<Edge> uninitializedEdges = Sets.newHashSet(); |
| |
| private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> |
| rootInputDescriptors; |
| private Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> |
| additionalOutputs; |
| private Map<String, OutputCommitter> outputCommitters; |
| private Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>(); |
| private static final InputSpecUpdate DEFAULT_ROOT_INPUT_SPECS = InputSpecUpdate |
| .getDefaultSinglePhysicalInputSpecUpdate(); /* NOTE(review): presumably the fallback when rootInputSpecs has no entry for an input -- confirm */ |
| private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>(); |
| private Set<String> inputsWithInitializers; |
| private int numInitializedInputs; |
| private boolean startSignalPending = false; |
| private boolean tasksNotYetScheduled = true; |
| // We may always store task events in the vertex for scalability |
| List<TezEvent> pendingTaskEvents = Lists.newLinkedList(); |
| List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>(); |
| List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList(); |
| |
| |
| private RootInputInitializerManager rootInputInitializerManager; |
| |
| VertexManager vertexManager; |
| |
| private final UserGroupInformation dagUgi; /* taken from appContext.getCurrentDAG().getDagUGI() in the constructor */ |
| |
| private boolean parallelismSet = false; |
| private TezVertexID originalOneToOneSplitSource = null; |
| |
| private AtomicBoolean committed = new AtomicBoolean(false); |
| private AtomicBoolean aborted = new AtomicBoolean(false); |
| private boolean commitVertexOutputs = false; /* overwritten by the constructor argument of the same name */ |
| |
| private Map<String, VertexGroupInfo> dagVertexGroups; |
| |
| private TaskLocationHint taskLocationHints[]; |
| private Map<String, LocalResource> localResources; /* built in the constructor from the plan's task-config local resource list */ |
| private Map<String, String> environment; /* built in the constructor from the plan's task-config environment settings */ |
| private final String javaOpts; /* the plan's task-config java opts, or null when the plan has none */ |
| private final ContainerContext containerContext; |
| private VertexTerminationCause terminationCause; |
| |
| private String logIdentifier; |
| @VisibleForTesting |
| boolean recoveryCommitInProgress = false; |
| private boolean summaryCompleteSeen = false; |
| @VisibleForTesting |
| boolean hasCommitter = false; |
| private boolean vertexCompleteSeen = false; |
| private Map<String,EdgeManagerPluginDescriptor> recoveredSourceEdgeManagers = null; |
| private Map<String, InputSpecUpdate> recoveredRootInputSpecUpdates = null; |
| |
| // Recovery-related flags |
| boolean recoveryInitEventSeen = false; |
| boolean recoveryStartEventSeen = false; |
| private VertexStats vertexStats = null; |
| |
| private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts; |
| |
| public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, |
| String vertexName, Configuration conf, EventHandler eventHandler, |
| TaskAttemptListener taskAttemptListener, Clock clock, |
| TaskHeartbeatHandler thh, boolean commitVertexOutputs, |
| AppContext appContext, VertexLocationHint vertexLocationHint, |
| Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption, |
| StateChangeNotifier entityStatusTracker) { |
| this.vertexId = vertexId; |
| this.vertexPlan = vertexPlan; |
| this.vertexName = StringInterner.weakIntern(vertexName); |
| this.conf = conf; |
| this.clock = clock; |
| this.appContext = appContext; |
| this.commitVertexOutputs = commitVertexOutputs; |
| |
| this.taskAttemptListener = taskAttemptListener; |
| this.taskHeartbeatHandler = thh; |
| this.eventHandler = eventHandler; |
| ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); |
| this.readLock = readWriteLock.readLock(); |
| this.writeLock = readWriteLock.writeLock(); |
| |
| if (LOG.isDebugEnabled()) { |
| logLocationHints(this.vertexName, vertexLocationHint); |
| } |
| setTaskLocationHints(vertexLocationHint); |
| |
| this.dagUgi = appContext.getCurrentDAG().getDagUGI(); |
| |
| this.taskResource = DagTypeConverters |
| .createResourceRequestFromTaskConfig(vertexPlan.getTaskConfig()); |
| this.processorDescriptor = DagTypeConverters |
| .convertProcessorDescriptorFromDAGPlan(vertexPlan |
| .getProcessorDescriptor()); |
| this.localResources = DagTypeConverters |
| .createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig() |
| .getLocalResourceList()); |
| this.environment = DagTypeConverters |
| .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig() |
| .getEnvironmentSettingList()); |
| this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan |
| .getTaskConfig().getJavaOpts() : null; |
| this.taskSpecificLaunchCmdOpts = taskSpecificLaunchCmdOption; |
| this.containerContext = new ContainerContext(this.localResources, |
| appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this); |
| |
| if (vertexPlan.getInputsCount() > 0) { |
| setAdditionalInputs(vertexPlan.getInputsList()); |
| } |
| if (vertexPlan.getOutputsCount() > 0) { |
| setAdditionalOutputs(vertexPlan.getOutputsList()); |
| } |
| this.stateChangeNotifier = entityStatusTracker; |
| |
| // Setup the initial parallelism early. This may be changed after |
| // initialization or on a setParallelism call. |
| this.numTasks = vertexPlan.getTaskConfig().getNumTasks(); |
| // Not sending the notifier a parallelism update since this is the initial parallelism |
| |
| this.dagVertexGroups = dagVertexGroups; |
| |
| isSpeculationEnabled = conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, |
| TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT); |
| |
| if (isSpeculationEnabled()) { |
| speculator = new LegacySpeculator(conf, getAppContext(), this); |
| } |
| |
| logIdentifier = this.getVertexId() + " [" + this.getName() + "]"; |
| // This "this leak" is okay because the retained pointer is in an |
| // instance variable. |
| |
| stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>( |
| stateMachineFactory.make(this), this); |
| augmentStateMachine(); |
| } |
| |
| private boolean isSpeculationEnabled() { |
| return isSpeculationEnabled; |
| } |
| |
| protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() { |
| return stateMachine; |
| } |
| |
| @Override |
| public TezVertexID getVertexId() { |
| return vertexId; |
| } |
| |
| @Override |
| public VertexPlan getVertexPlan() { |
| return vertexPlan; |
| } |
| |
| @Override |
| public int getDistanceFromRoot() { |
| return distanceFromRoot; |
| } |
| |
| @Override |
| public String getName() { |
| return vertexName; |
| } |
| |
| EventHandler getEventHandler() { |
| return this.eventHandler; |
| } |
| |
| @Override |
| public Task getTask(TezTaskID taskID) { |
| readLock.lock(); |
| try { |
| return tasks.get(taskID); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Task getTask(int taskIndex) { |
| return getTask(TezTaskID.getInstance(this.vertexId, taskIndex)); |
| } |
| |
| @Override |
| public int getTotalTasks() { |
| return numTasks; |
| } |
| |
| @Override |
| public int getCompletedTasks() { |
| readLock.lock(); |
| try { |
| return succeededTaskCount + failedTaskCount + killedTaskCount; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public int getSucceededTasks() { |
| readLock.lock(); |
| try { |
| return succeededTaskCount; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public int getRunningTasks() { |
| readLock.lock(); |
| try { |
| int num=0; |
| for (Task task : tasks.values()) { |
| if(task.getState() == TaskState.RUNNING) |
| num++; |
| } |
| return num; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public TezCounters getAllCounters() { |
| |
| readLock.lock(); |
| |
| try { |
| VertexState state = getInternalState(); |
| if (state == VertexState.ERROR || state == VertexState.FAILED |
| || state == VertexState.KILLED || state == VertexState.SUCCEEDED) { |
| this.mayBeConstructFinalFullCounters(); |
| return fullCounters; |
| } |
| |
| TezCounters counters = new TezCounters(); |
| return incrTaskCounters(counters, tasks.values()); |
| |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public VertexStats getVertexStats() { |
| |
| readLock.lock(); |
| try { |
| VertexState state = getInternalState(); |
| if (state == VertexState.ERROR || state == VertexState.FAILED |
| || state == VertexState.KILLED || state == VertexState.SUCCEEDED) { |
| this.mayBeConstructFinalFullCounters(); |
| return this.vertexStats; |
| } |
| |
| VertexStats stats = new VertexStats(); |
| return updateVertexStats(stats, tasks.values()); |
| |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| |
| public static TezCounters incrTaskCounters( |
| TezCounters counters, Collection<Task> tasks) { |
| for (Task task : tasks) { |
| counters.incrAllCounters(task.getCounters()); |
| } |
| return counters; |
| } |
| |
| public static VertexStats updateVertexStats( |
| VertexStats stats, Collection<Task> tasks) { |
| for (Task task : tasks) { |
| stats.updateStats(task.getReport()); |
| } |
| return stats; |
| } |
| |
| |
| @Override |
| public List<String> getDiagnostics() { |
| readLock.lock(); |
| try { |
| return diagnostics; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public float getProgress() { |
| this.readLock.lock(); |
| try { |
| computeProgress(); |
| return progress; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ProgressBuilder getVertexProgress() { |
| this.readLock.lock(); |
| try { |
| ProgressBuilder progress = new ProgressBuilder(); |
| progress.setTotalTaskCount(numTasks); |
| progress.setSucceededTaskCount(succeededTaskCount); |
| progress.setRunningTaskCount(getRunningTasks()); |
| progress.setFailedTaskCount(failedTaskCount); |
| progress.setKilledTaskCount(killedTaskCount); |
| progress.setFailedTaskAttemptCount(failedTaskAttemptCount.get()); |
| progress.setKilledTaskAttemptCount(killedTaskAttemptCount.get()); |
| return progress; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public VertexStatusBuilder getVertexStatus( |
| Set<StatusGetOpts> statusOptions) { |
| this.readLock.lock(); |
| try { |
| VertexStatusBuilder status = new VertexStatusBuilder(); |
| status.setState(getInternalState()); |
| status.setDiagnostics(diagnostics); |
| status.setProgress(getVertexProgress()); |
| if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) { |
| status.setVertexCounters(getAllCounters()); |
| } |
| return status; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public TaskLocationHint getTaskLocationHint(TezTaskID taskId) { |
| this.readLock.lock(); |
| try { |
| if (taskLocationHints == null || |
| taskLocationHints.length <= taskId.getId()) { |
| return null; |
| } |
| return taskLocationHints[taskId.getId()]; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| private void computeProgress() { |
| this.readLock.lock(); |
| try { |
| float progress = 0f; |
| for (Task task : this.tasks.values()) { |
| progress += (task.isFinished() ? 1f : task.getProgress()); |
| } |
| if (this.numTasks != 0) { |
| progress /= this.numTasks; |
| } |
| this.progress = progress; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Map<TezTaskID, Task> getTasks() { |
| synchronized (tasksSyncHandle) { |
| lazyTasksCopyNeeded = true; |
| return Collections.unmodifiableMap(tasks); |
| } |
| } |
| |
| @Override |
| public VertexState getState() { |
| readLock.lock(); |
| try { |
| return getStateMachine().getCurrentState(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Set the terminationCause if it had not yet been set. |
| * |
| * @param trigger The trigger |
| * @return true if setting the value succeeded. |
| */ |
| boolean trySetTerminationCause(VertexTerminationCause trigger) { |
| if(terminationCause == null){ |
| terminationCause = trigger; |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public VertexTerminationCause getTerminationCause(){ |
| readLock.lock(); |
| try { |
| return terminationCause; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public AppContext getAppContext() { |
| return this.appContext; |
| } |
| |
| private void handleParallelismUpdate(int newParallelism, |
| Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, |
| Map<String, InputSpecUpdate> rootInputSpecUpdates) { |
| LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks; |
| Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet() |
| .iterator(); |
| int i = 0; |
| while (iter.hasNext()) { |
| i++; |
| iter.next(); |
| if (i <= newParallelism) { |
| continue; |
| } |
| iter.remove(); |
| } |
| this.recoveredSourceEdgeManagers = sourceEdgeManagers; |
| this.recoveredRootInputSpecUpdates = rootInputSpecUpdates; |
| } |
| |
| @Override |
| public VertexState restoreFromEvent(HistoryEvent historyEvent) { |
| switch (historyEvent.getEventType()) { |
| case VERTEX_INITIALIZED: |
| recoveryInitEventSeen = true; |
| recoveredState = setupVertex((VertexInitializedEvent) historyEvent); |
| createTasks(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Recovered state for vertex after Init event" |
| + ", vertex=" + logIdentifier |
| + ", recoveredState=" + recoveredState); |
| } |
| return recoveredState; |
| case VERTEX_STARTED: |
| if (!recoveryInitEventSeen) { |
| throw new RuntimeException("Started Event seen but" |
| + " no Init Event was encountered earlier"); |
| } |
| recoveryStartEventSeen = true; |
| VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent; |
| startTimeRequested = startedEvent.getStartRequestedTime(); |
| startedTime = startedEvent.getStartTime(); |
| recoveredState = VertexState.RUNNING; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Recovered state for vertex after Started event" |
| + ", vertex=" + logIdentifier |
| + ", recoveredState=" + recoveredState); |
| } |
| return recoveredState; |
| case VERTEX_PARALLELISM_UPDATED: |
| VertexParallelismUpdatedEvent updatedEvent = |
| (VertexParallelismUpdatedEvent) historyEvent; |
| if (updatedEvent.getVertexLocationHint() != null) { |
| setTaskLocationHints(updatedEvent.getVertexLocationHint()); |
| } |
| int oldNumTasks = numTasks; |
| numTasks = updatedEvent.getNumTasks(); |
| stateChangeNotifier.stateChanged(vertexId, |
| new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); |
| handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers(), |
| updatedEvent.getRootInputSpecUpdates()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Recovered state for vertex after parallelism updated event" |
| + ", vertex=" + logIdentifier |
| + ", recoveredState=" + recoveredState); |
| } |
| return recoveredState; |
| case VERTEX_COMMIT_STARTED: |
| recoveryCommitInProgress = true; |
| hasCommitter = true; |
| return recoveredState; |
| case VERTEX_FINISHED: |
| VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent; |
| if (finishedEvent.isFromSummary()) { |
| summaryCompleteSeen = true; |
| } else { |
| vertexCompleteSeen = true; |
| } |
| numTasks = finishedEvent.getNumTasks(); |
| recoveryCommitInProgress = false; |
| recoveredState = finishedEvent.getState(); |
| diagnostics.add(finishedEvent.getDiagnostics()); |
| finishTime = finishedEvent.getFinishTime(); |
| // TODO counters ?? |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Recovered state for vertex after finished event" |
| + ", vertex=" + logIdentifier |
| + ", recoveredState=" + recoveredState); |
| } |
| return recoveredState; |
| case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: |
| VertexRecoverableEventsGeneratedEvent vEvent = |
| (VertexRecoverableEventsGeneratedEvent) historyEvent; |
| this.recoveredEvents.addAll(vEvent.getTezEvents()); |
| return recoveredState; |
| default: |
| throw new RuntimeException("Unexpected event received for restoring" |
| + " state, eventType=" + historyEvent.getEventType()); |
| |
| } |
| } |
| |
| @Override |
| public String getLogIdentifier() { |
| return this.logIdentifier; |
| } |
| |
| @Override |
| public void incrementFailedTaskAttemptCount() { |
| this.failedTaskAttemptCount.incrementAndGet(); |
| } |
| |
| @Override |
| public void incrementKilledTaskAttemptCount() { |
| this.killedTaskAttemptCount.incrementAndGet(); |
| } |
| |
| @Override |
| public int getFailedTaskAttemptCount() { |
| return this.failedTaskAttemptCount.get(); |
| } |
| |
| @Override |
| public int getKilledTaskAttemptCount() { |
| return this.killedTaskAttemptCount.get(); |
| } |
| |
| private void setTaskLocationHints(VertexLocationHint vertexLocationHint) { |
| if (vertexLocationHint != null && |
| vertexLocationHint.getTaskLocationHints() != null && |
| !vertexLocationHint.getTaskLocationHints().isEmpty()) { |
| List<TaskLocationHint> locHints = vertexLocationHint.getTaskLocationHints(); |
| taskLocationHints = locHints.toArray(new TaskLocationHint[locHints.size()]); |
| } |
| } |
| |
| @Override |
| public void scheduleSpeculativeTask(TezTaskID taskId) { |
| Preconditions.checkState(taskId.getId() < numTasks); |
| eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| } |
| |
| @Override |
| public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) { |
| writeLock.lock(); |
| try { |
| tasksNotYetScheduled = false; |
| if (!pendingTaskEvents.isEmpty()) { |
| LOG.info("Routing pending task events for vertex: " + logIdentifier); |
| try { |
| handleRoutedTezEvents(this, pendingTaskEvents, false); |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier; |
| LOG.error(msg, e); |
| addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause())); |
| eventHandler.handle(new VertexEventTermination(vertexId, VertexTerminationCause.AM_USERCODE_FAILURE)); |
| return; |
| } |
| pendingTaskEvents.clear(); |
| } |
| for (TaskWithLocationHint task : tasksToSchedule) { |
| if (numTasks <= task.getTaskIndex().intValue()) { |
| throw new TezUncheckedException( |
| "Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier); |
| } |
| TaskLocationHint locationHint = task.getTaskLocationHint(); |
| if (locationHint != null) { |
| if (taskLocationHints == null) { |
| taskLocationHints = new TaskLocationHint[numTasks]; |
| } |
| taskLocationHints[task.getTaskIndex().intValue()] = locationHint; |
| } |
| eventHandler.handle(new TaskEvent( |
| TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue()), |
| TaskEventType.T_SCHEDULE)); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, |
| Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, |
| Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager) |
| throws AMUserCodeException { |
| setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, |
| false, fromVertexManager); |
| } |
| |
| private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, |
| Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, |
| Map<String, InputSpecUpdate> rootInputSpecUpdates, |
| boolean recovering, boolean fromVertexManager) throws AMUserCodeException { |
| if (recovering) { |
| writeLock.lock(); |
| try { |
| if (sourceEdgeManagers != null) { |
| for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : |
| sourceEdgeManagers.entrySet()) { |
| LOG.info("Recovering edge manager for source:" |
| + entry.getKey() + " destination: " + getLogIdentifier()); |
| Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey()); |
| Edge edge = sourceVertices.get(sourceVertex); |
| try { |
| edge.setCustomEdgeManager(entry.getValue()); |
| } catch (Exception e) { |
| throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge," |
| + "sourceVertex:" + edge.getSourceVertexName() |
| + "destinationVertex:" + edge.getDestinationVertexName(), e); |
| } |
| } |
| } |
| |
| // Restore any rootInputSpecUpdates which may have been registered during a parallelism |
| // update. |
| if (rootInputSpecUpdates != null) { |
| LOG.info("Got updated RootInputsSpecs during recovery: " + rootInputSpecUpdates.toString()); |
| this.rootInputSpecs.putAll(rootInputSpecUpdates); |
| } |
| return; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " |
| + parallelism + " for vertex: " + logIdentifier); |
| setVertexLocationHint(vertexLocationHint); |
| writeLock.lock(); |
| |
| try { |
| if (parallelismSet == true) { |
| String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier; |
| LOG.info(msg); |
| throw new TezUncheckedException(msg); |
| } |
| |
| if (fromVertexManager && canInitVertex()) { |
| // vertex is fully defined. setParallelism has been called. VertexManager should have |
| // informed us about this. Otherwise we would have notified listeners that we are fully |
| // defined before we are actually fully defined |
| Preconditions.checkState(vertexToBeReconfiguredByManager, "Vertex is fully configured but still" |
| + " the reconfiguration API has been called. VertexManager must notify the framework using " |
| + " context.vertexReconfigurationPlanned() before re-configuring the vertex."); |
| } |
| |
| parallelismSet = true; |
| |
| // Input initializer/Vertex Manager/1-1 split expected to set parallelism. |
| if (numTasks == -1) { |
| if (getState() != VertexState.INITIALIZING) { |
| throw new TezUncheckedException( |
| "Vertex state is not Initializing. Value: " + getState() |
| + " for vertex: " + logIdentifier); |
| } |
| |
| if(sourceEdgeManagers != null) { |
| for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) { |
| LOG.info("Replacing edge manager for source:" |
| + entry.getKey() + " destination: " + getLogIdentifier()); |
| Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey()); |
| Edge edge = sourceVertices.get(sourceVertex); |
| try { |
| edge.setCustomEdgeManager(entry.getValue()); |
| } catch (Exception e) { |
| throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge," |
| + "sourceVertex:" + edge.getSourceVertexName() |
| + "destinationVertex:" + edge.getDestinationVertexName(), e); |
| } |
| } |
| } |
| if (rootInputSpecUpdates != null) { |
| LOG.info("Got updated RootInputsSpecs: " + rootInputSpecUpdates.toString()); |
| // Sanity check for correct number of updates. |
| for (Entry<String, InputSpecUpdate> rootInputSpecUpdateEntry : rootInputSpecUpdates |
| .entrySet()) { |
| Preconditions |
| .checkState( |
| rootInputSpecUpdateEntry.getValue().isForAllWorkUnits() |
| || (rootInputSpecUpdateEntry.getValue().getAllNumPhysicalInputs() != null && rootInputSpecUpdateEntry |
| .getValue().getAllNumPhysicalInputs().size() == parallelism), |
| "Not enough input spec updates for root input named " |
| + rootInputSpecUpdateEntry.getKey()); |
| } |
| this.rootInputSpecs.putAll(rootInputSpecUpdates); |
| } |
| int oldNumTasks = numTasks; |
| this.numTasks = parallelism; |
| stateChangeNotifier.stateChanged(vertexId, |
| new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); |
| this.createTasks(); |
| LOG.info("Vertex " + getLogIdentifier() + |
| " parallelism set to " + parallelism); |
| if (canInitVertex()) { |
| getEventHandler().handle(new VertexEvent(getVertexId(), VertexEventType.V_READY_TO_INIT)); |
| } |
| } else { |
| // This is an artificial restriction since there's no way of knowing whether a VertexManager |
| // will attempt to update root input specs. When parallelism has not been initialized, the |
| // Vertex will not be in started state so it's safe to update the specifications. |
| // TODO TEZ-937 - add e mechanism to query vertex managers, or for VMs to indicate readines |
| // for a vertex to start. |
| Preconditions.checkState(rootInputSpecUpdates == null, |
| "Root Input specs can only be updated when the vertex is configured with -1 tasks"); |
| if (parallelism >= numTasks) { |
| // not that hard to support perhaps. but checking right now since there |
| // is no use case for it and checking may catch other bugs. |
| String msg = "Increasing parallelism is not supported, vertexId=" + logIdentifier; |
| LOG.warn(msg); |
| throw new TezUncheckedException(msg); |
| } |
| if (parallelism == numTasks) { |
| LOG.info("setParallelism same as current value: " + parallelism + |
| " for vertex: " + logIdentifier); |
| Preconditions.checkArgument(sourceEdgeManagers != null, |
| "Source edge managers or RootInputSpecs must be set when not changing parallelism"); |
| } else { |
| LOG.info( |
| "Resetting vertex location hints due to change in parallelism for vertex: " + logIdentifier); |
| vertexLocationHint = null; |
| } |
| |
| // start buffering incoming events so that we can re-route existing events |
| for (Edge edge : sourceVertices.values()) { |
| edge.startEventBuffering(); |
| } |
| |
| // assign to local variable of LinkedHashMap to make sure that changing |
| // type of task causes compile error. We depend on LinkedHashMap for order |
| LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks; |
| Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet() |
| .iterator(); |
| int i = 0; |
| while (iter.hasNext()) { |
| i++; |
| Map.Entry<TezTaskID, Task> entry = iter.next(); |
| Task task = entry.getValue(); |
| if (task.getState() != TaskState.NEW) { |
| String msg = "All tasks must be in initial state when changing parallelism" |
| + " for vertex: " + getLogIdentifier(); |
| LOG.warn(msg); |
| throw new TezUncheckedException(msg); |
| } |
| if (i <= parallelism) { |
| continue; |
| } |
| LOG.info("Removing task: " + entry.getKey()); |
| iter.remove(); |
| } |
| LOG.info("Vertex " + logIdentifier + |
| " parallelism set to " + parallelism + " from " + numTasks); |
| int oldNumTasks = numTasks; |
| this.numTasks = parallelism; |
| stateChangeNotifier.stateChanged(vertexId, |
| new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); |
| assert tasks.size() == numTasks; |
| |
| // set new edge managers |
| if(sourceEdgeManagers != null) { |
| for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) { |
| LOG.info("Replacing edge manager for source:" |
| + entry.getKey() + " destination: " + getLogIdentifier()); |
| Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey()); |
| Edge edge = sourceVertices.get(sourceVertex); |
| try { |
| edge.setCustomEdgeManager(entry.getValue()); |
| } catch (Exception e) { |
| throw new TezUncheckedException(e); |
| } |
| } |
| } |
| |
| VertexParallelismUpdatedEvent parallelismUpdatedEvent = |
| new VertexParallelismUpdatedEvent(vertexId, numTasks, |
| vertexLocationHint, |
| sourceEdgeManagers, rootInputSpecUpdates, oldNumTasks); |
| appContext.getHistoryHandler().handle(new DAGHistoryEvent(getDAGId(), |
| parallelismUpdatedEvent)); |
| |
| // stop buffering events |
| for (Edge edge : sourceVertices.values()) { |
| edge.stopEventBuffering(); |
| } |
| } |
| |
| for (Map.Entry<Vertex, Edge> entry : targetVertices.entrySet()) { |
| Edge edge = entry.getValue(); |
| if (edge.getEdgeProperty().getDataMovementType() |
| == DataMovementType.ONE_TO_ONE) { |
| // inform these target vertices that we have changed parallelism |
| VertexEventOneToOneSourceSplit event = |
| new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(), |
| getVertexId(), |
| ((originalOneToOneSplitSource!=null) ? |
| originalOneToOneSplitSource : getVertexId()), |
| numTasks); |
| getEventHandler().handle(event); |
| } |
| } |
| |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void setVertexLocationHint(VertexLocationHint vertexLocationHint) { |
| writeLock.lock(); |
| try { |
| if (LOG.isDebugEnabled()) { |
| logLocationHints(this.vertexName, vertexLocationHint); |
| } |
| setTaskLocationHints(vertexLocationHint); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void vertexReconfigurationPlanned() { |
| vertexReconfigurationPlanned(false); |
| } |
| |
| public void vertexReconfigurationPlanned(boolean testOverride) { |
| writeLock.lock(); |
| try { |
| if (testOverride) { |
| Preconditions.checkState(vmIsInitialized.get() && completelyConfiguredSent.get(), |
| "test should override only failed cases"); |
| } else { |
| Preconditions.checkState(!vmIsInitialized.get(), |
| "context.vertexReconfigurationPlanned() cannot be called after initialize()"); |
| Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() " |
| + " cannot be invoked after the vertex has been configured."); |
| } |
| this.vertexToBeReconfiguredByManager = true; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void doneReconfiguringVertex() { |
| writeLock.lock(); |
| try { |
| Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be " |
| + "invoked only after vertexReconfigurationPlanned() is invoked"); |
| this.vertexToBeReconfiguredByManager = false; |
| if (completelyConfiguredSent.compareAndSet(false, true)) { |
| // vertex already started and at that time this event was not sent. Send now. |
| stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName, |
| org.apache.tez.dag.api.event.VertexState.CONFIGURED)); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| /** |
| * The only entry point to change the Vertex. |
| */ |
| public void handle(VertexEvent event) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Processing VertexEvent " + event.getVertexId() |
| + " of type " + event.getType() + " while in state " |
| + getInternalState() + ". Event: " + event); |
| } |
| try { |
| writeLock.lock(); |
| VertexState oldState = getInternalState(); |
| try { |
| getStateMachine().doTransition(event.getType(), event); |
| } catch (InvalidStateTransitonException e) { |
| String message = "Invalid event " + event.getType() + |
| " on vertex " + this.vertexName + |
| " with vertexId " + this.vertexId + |
| " at current state " + oldState; |
| LOG.error("Can't handle " + message, e); |
| addDiagnostic(message); |
| eventHandler.handle(new VertexEvent(this.vertexId, |
| VertexEventType.V_INTERNAL_ERROR)); |
| } |
| |
| if (oldState != getInternalState()) { |
| LOG.info(logIdentifier + " transitioned from " + oldState + " to " |
| + getInternalState() + " due to event " |
| + event.getType()); |
| } |
| } |
| |
| finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| private VertexState getInternalState() { |
| readLock.lock(); |
| try { |
| return getStateMachine().getCurrentState(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| //helpful in testing |
| protected void addTask(Task task) { |
| synchronized (tasksSyncHandle) { |
| if (lazyTasksCopyNeeded) { |
| LinkedHashMap<TezTaskID, Task> newTasks = new LinkedHashMap<TezTaskID, Task>(); |
| newTasks.putAll(tasks); |
| tasks = newTasks; |
| lazyTasksCopyNeeded = false; |
| } |
| } |
| tasks.put(task.getTaskId(), task); |
| // TODO Metrics |
| //metrics.waitingTask(task); |
| } |
| |
| void setFinishTime() { |
| finishTime = clock.getTime(); |
| } |
| |
| |
| void logJobHistoryVertexInitializedEvent() { |
| VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName, |
| initTimeRequested, initedTime, numTasks, |
| getProcessorName(), getAdditionalInputs()); |
| this.appContext.getHistoryHandler().handle( |
| new DAGHistoryEvent(getDAGId(), initEvt)); |
| } |
| |
| void logJobHistoryVertexStartedEvent() { |
| VertexStartedEvent startEvt = new VertexStartedEvent(vertexId, |
| startTimeRequested, startedTime); |
| this.appContext.getHistoryHandler().handle( |
| new DAGHistoryEvent(getDAGId(), startEvt)); |
| } |
| |
| void logJobHistoryVertexFinishedEvent() throws IOException { |
| this.setFinishTime(); |
| logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, ""); |
| } |
| |
| void logJobHistoryVertexFailedEvent(VertexState state) throws IOException { |
| logJobHistoryVertexCompletedHelper(state, clock.getTime(), |
| StringUtils.join(getDiagnostics(), LINE_SEPARATOR)); |
| } |
| |
| private void logJobHistoryVertexCompletedHelper(VertexState finalState, long finishTime, |
| String diagnostics) throws IOException { |
| Map<String, Integer> taskStats = new HashMap<String, Integer>(); |
| taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, completedTaskCount); |
| taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount); |
| taskStats.put(ATSConstants.NUM_FAILED_TASKS, failedTaskCount); |
| taskStats.put(ATSConstants.NUM_KILLED_TASKS, killedTaskCount); |
| taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, failedTaskAttemptCount.get()); |
| taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS, killedTaskAttemptCount.get()); |
| |
| VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks, initTimeRequested, |
| initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics, |
| getAllCounters(), getVertexStats(), taskStats); |
| this.appContext.getHistoryHandler().handleCriticalEvent( |
| new DAGHistoryEvent(getDAGId(), finishEvt)); |
| } |
| |
| static VertexState checkVertexForCompletion(final VertexImpl vertex) { |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Checking for vertex completion for " |
| + vertex.logIdentifier |
| + ", numTasks=" + vertex.numTasks |
| + ", failedTaskCount=" + vertex.failedTaskCount |
| + ", killedTaskCount=" + vertex.killedTaskCount |
| + ", successfulTaskCount=" + vertex.succeededTaskCount |
| + ", completedTaskCount=" + vertex.completedTaskCount |
| + ", terminationCause=" + vertex.terminationCause); |
| } |
| |
| //check for vertex failure first |
| if (vertex.completedTaskCount > vertex.tasks.size()) { |
| LOG.error("task completion accounting issue: completedTaskCount > nTasks:" |
| + " for vertex " + vertex.logIdentifier |
| + ", numTasks=" + vertex.numTasks |
| + ", failedTaskCount=" + vertex.failedTaskCount |
| + ", killedTaskCount=" + vertex.killedTaskCount |
| + ", successfulTaskCount=" + vertex.succeededTaskCount |
| + ", completedTaskCount=" + vertex.completedTaskCount |
| + ", terminationCause=" + vertex.terminationCause); |
| } |
| |
| if (vertex.completedTaskCount == vertex.tasks.size()) { |
| //Only succeed if tasks complete successfully and no terminationCause is registered. |
| if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) { |
| LOG.info("Vertex succeeded: " + vertex.logIdentifier); |
| try { |
| if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) { |
| // commit only once. Dont commit shared outputs |
| LOG.info("Invoking committer commit for vertex, vertexId=" |
| + vertex.logIdentifier); |
| if (vertex.outputCommitters != null |
| && !vertex.outputCommitters.isEmpty()) { |
| boolean firstCommit = true; |
| for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) { |
| final OutputCommitter committer = entry.getValue(); |
| final String outputName = entry.getKey(); |
| if (vertex.sharedOutputs.contains(outputName)) { |
| // dont commit shared committers. Will be committed by the DAG |
| continue; |
| } |
| if (firstCommit) { |
| // Log commit start event on first actual commit |
| try { |
| vertex.appContext.getHistoryHandler().handleCriticalEvent( |
| new DAGHistoryEvent(vertex.getDAGId(), |
| new VertexCommitStartedEvent(vertex.vertexId, |
| vertex.clock.getTime()))); |
| } catch (IOException e) { |
| LOG.error("Failed to persist commit start event to recovery, vertex=" |
| + vertex.logIdentifier, e); |
| vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR); |
| return vertex.finished(VertexState.FAILED); |
| } |
| } else { |
| firstCommit = false; |
| } |
| vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| LOG.info("Invoking committer commit for output=" + outputName |
| + ", vertexId=" + vertex.logIdentifier); |
| committer.commitOutput(); |
| return null; |
| } |
| }); |
| } |
| } |
| } |
| } catch (Exception e) { |
| LOG.error("Failed to do commit on vertex, vertexId=" |
| + vertex.logIdentifier, e); |
| vertex.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE); |
| return vertex.finished(VertexState.FAILED); |
| } |
| return vertex.finished(VertexState.SUCCEEDED); |
| } |
| else if(vertex.terminationCause == VertexTerminationCause.DAG_KILL ){ |
| vertex.setFinishTime(); |
| String diagnosticMsg = "Vertex killed due to user-initiated job kill. " |
| + "failedTasks:" |
| + vertex.failedTaskCount; |
| LOG.info(diagnosticMsg); |
| vertex.addDiagnostic(diagnosticMsg); |
| vertex.abortVertex(VertexStatus.State.KILLED); |
| return vertex.finished(VertexState.KILLED); |
| } |
| else if(vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE ){ |
| vertex.setFinishTime(); |
| String diagnosticMsg = "Vertex killed as other vertex failed. " |
| + "failedTasks:" |
| + vertex.failedTaskCount; |
| LOG.info(diagnosticMsg); |
| vertex.addDiagnostic(diagnosticMsg); |
| vertex.abortVertex(VertexStatus.State.KILLED); |
| return vertex.finished(VertexState.KILLED); |
| } |
| else if(vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE ){ |
| if(vertex.failedTaskCount == 0){ |
| LOG.error("task failure accounting error. terminationCause=TASK_FAILURE but vertex.failedTaskCount == 0"); |
| } |
| vertex.setFinishTime(); |
| String diagnosticMsg = "Vertex failed as one or more tasks failed. " |
| + "failedTasks:" |
| + vertex.failedTaskCount; |
| LOG.info(diagnosticMsg); |
| vertex.addDiagnostic(diagnosticMsg); |
| vertex.abortVertex(VertexStatus.State.FAILED); |
| return vertex.finished(VertexState.FAILED); |
| } |
| else if (vertex.terminationCause == VertexTerminationCause.INTERNAL_ERROR) { |
| vertex.setFinishTime(); |
| String diagnosticMsg = "Vertex failed/killed due to internal error. " |
| + "failedTasks:" |
| + vertex.failedTaskCount |
| + " killedTasks:" |
| + vertex.killedTaskCount; |
| LOG.info(diagnosticMsg); |
| vertex.abortVertex(State.FAILED); |
| return vertex.finished(VertexState.FAILED); |
| } |
| else if (vertex.terminationCause == VertexTerminationCause.AM_USERCODE_FAILURE) { |
| vertex.setFinishTime(); |
| String diagnosticMsg = "Vertex failed/killed due to VertexManagerPlugin/EdgeManagerPlugin failed. " |
| + "failedTasks:" |
| + vertex.failedTaskCount |
| + " killedTasks:" |
| + vertex.killedTaskCount; |
| LOG.info(diagnosticMsg); |
| vertex.abortVertex(State.FAILED); |
| return vertex.finished(VertexState.FAILED); |
| } |
| else if (vertex.terminationCause == VertexTerminationCause.ROOT_INPUT_INIT_FAILURE) { |
| vertex.setFinishTime(); |
| String diagnosticMsg = "Vertex failed/killed due to ROOT_INPUT_INIT_FAILURE failed. " |
| + "failedTasks:" |
| + vertex.failedTaskCount |
| + " killedTasks:" |
| + vertex.killedTaskCount; |
| LOG.info(diagnosticMsg); |
| vertex.abortVertex(State.FAILED); |
| return vertex.finished(VertexState.FAILED); |
| } |
| else { |
| //should never occur |
| throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier |
| + ", failedTaskCount=" + vertex.failedTaskCount |
| + ", killedTaskCount=" + vertex.killedTaskCount |
| + ", successfulTaskCount=" + vertex.succeededTaskCount |
| + ", completedTaskCount=" + vertex.completedTaskCount |
| + ", terminationCause=" + vertex.terminationCause); |
| } |
| } |
| |
| //return the current state, Vertex not finished yet |
| return vertex.getInternalState(); |
| } |
| |
| /** |
| * Set the terminationCause and send a kill-message to all tasks. |
| * The task-kill messages are only sent once. |
| */ |
| void tryEnactKill(VertexTerminationCause trigger, |
| TaskTerminationCause taskterminationCause) { |
| // In most cases the dag is shutting down due to some error |
| TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN; |
| if (taskterminationCause == TaskTerminationCause.DAG_KILL) { |
| errCause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT; |
| } |
| if(trySetTerminationCause(trigger)){ |
| String msg = "Killing tasks in vertex: " + logIdentifier + " due to trigger: " + trigger; |
| LOG.info(msg); |
| for (Task task : tasks.values()) { |
| eventHandler.handle( // attempt was terminated because the vertex is shutting down |
| new TaskEventTermination(task.getTaskId(), errCause, msg)); |
| } |
| } |
| } |
| |
| VertexState finished(VertexState finalState, |
| VertexTerminationCause terminationCause, String diag) { |
| if (finishTime == 0) setFinishTime(); |
| if (terminationCause != null) { |
| trySetTerminationCause(terminationCause); |
| } |
| if (rootInputInitializerManager != null) { |
| rootInputInitializerManager.shutdown(); |
| rootInputInitializerManager = null; |
| } |
| |
| switch (finalState) { |
| case ERROR: |
| addDiagnostic("Vertex: " + logIdentifier + " error due to:" + terminationCause); |
| if (!StringUtils.isEmpty(diag)) { |
| addDiagnostic(diag); |
| } |
| eventHandler.handle(new DAGEvent(getDAGId(), |
| DAGEventType.INTERNAL_ERROR)); |
| try { |
| logJobHistoryVertexFailedEvent(finalState); |
| } catch (IOException e) { |
| LOG.error("Failed to send vertex finished event to recovery", e); |
| } |
| break; |
| case KILLED: |
| case FAILED: |
| addDiagnostic("Vertex " + logIdentifier + " killed/failed due to:" + terminationCause); |
| if (!StringUtils.isEmpty(diag)) { |
| addDiagnostic(diag); |
| } |
| eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), |
| finalState, terminationCause)); |
| try { |
| logJobHistoryVertexFailedEvent(finalState); |
| } catch (IOException e) { |
| LOG.error("Failed to send vertex finished event to recovery", e); |
| } |
| break; |
| case SUCCEEDED: |
| try { |
| logJobHistoryVertexFinishedEvent(); |
| eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), |
| finalState)); |
| } catch (IOException e) { |
| LOG.error("Failed to send vertex finished event to recovery", e); |
| finalState = VertexState.FAILED; |
| this.terminationCause = VertexTerminationCause.INTERNAL_ERROR; |
| eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), |
| finalState)); |
| } |
| break; |
| default: |
| throw new TezUncheckedException("Unexpected VertexState: " + finalState); |
| } |
| return finalState; |
| } |
| |
| VertexState finished(VertexState finalState) { |
| return finished(finalState, null, null); |
| } |
| |
| |
| private void initializeCommitters() throws Exception { |
| if (!this.additionalOutputSpecs.isEmpty()) { |
| LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier); |
| for (Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry: |
| additionalOutputs.entrySet()) { |
| final String outputName = entry.getKey(); |
| final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue(); |
| if (od.getControllerDescriptor() == null |
| || od.getControllerDescriptor().getClassName() == null) { |
| LOG.info("Ignoring committer as none specified for output=" |
| + outputName |
| + ", vertexId=" + logIdentifier); |
| continue; |
| } |
| LOG.info("Instantiating committer for output=" + outputName |
| + ", vertexId=" + logIdentifier |
| + ", committerClass=" + od.getControllerDescriptor().getClassName()); |
| |
| dagUgi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| OutputCommitterContext outputCommitterContext = |
| new OutputCommitterContextImpl(appContext.getApplicationID(), |
| appContext.getApplicationAttemptId().getAttemptId(), |
| appContext.getCurrentDAG().getName(), |
| vertexName, |
| od, |
| vertexId.getId()); |
| OutputCommitter outputCommitter = ReflectionUtils |
| .createClazzInstance(od.getControllerDescriptor().getClassName(), |
| new Class[]{OutputCommitterContext.class}, |
| new Object[]{outputCommitterContext}); |
| LOG.info("Invoking committer init for output=" + outputName |
| + ", vertexId=" + logIdentifier); |
| outputCommitter.initialize(); |
| outputCommitters.put(outputName, outputCommitter); |
| LOG.info("Invoking committer setup for output=" + outputName |
| + ", vertexId=" + logIdentifier); |
| outputCommitter.setupOutput(); |
| return null; |
| } |
| }); |
| } |
| } |
| } |
| |
| private boolean initializeVertex() { |
| try { |
| initializeCommitters(); |
| } catch (Exception e) { |
| LOG.warn("Vertex Committer init failed, vertex=" + logIdentifier, e); |
| addDiagnostic("Vertex init failed : " |
| + ExceptionUtils.getStackTrace(e)); |
| trySetTerminationCause(VertexTerminationCause.INIT_FAILURE); |
| abortVertex(VertexStatus.State.FAILED); |
| finished(VertexState.FAILED); |
| return false; |
| } |
| |
| // TODO: Metrics |
| initedTime = clock.getTime(); |
| |
| logJobHistoryVertexInitializedEvent(); |
| return true; |
| } |
| |
| /** |
| * If the number of tasks are greater than the configured value |
| * throw an exception that will fail job initialization |
| */ |
| private void checkTaskLimits() { |
| // no code, for now |
| } |
| |
| @VisibleForTesting |
| ContainerContext getContainerContext(int taskIdx) { |
| if (taskSpecificLaunchCmdOpts.addTaskSpecificLaunchCmdOption(vertexName, taskIdx)) { |
| String jvmOpts = taskSpecificLaunchCmdOpts.getTaskSpecificOption(javaOpts, vertexName, taskIdx); |
| ContainerContext context = new ContainerContext(this.localResources, |
| appContext.getCurrentDAG().getCredentials(), this.environment, jvmOpts); |
| return context; |
| } else { |
| return this.containerContext; |
| } |
| } |
| |
| private void createTasks() { |
| for (int i=0; i < this.numTasks; ++i) { |
| ContainerContext conContext = getContainerContext(i); |
| TaskImpl task = |
| new TaskImpl(this.getVertexId(), i, |
| this.eventHandler, |
| conf, |
| this.taskAttemptListener, |
| this.clock, |
| this.taskHeartbeatHandler, |
| this.appContext, |
| (this.targetVertices != null ? |
| this.targetVertices.isEmpty() : true), |
| this.taskResource, |
| conContext, |
| this.stateChangeNotifier); |
| this.addTask(task); |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Created task for vertex " + logIdentifier + ": " + |
| task.getTaskId()); |
| } |
| } |
| |
| } |
| |
| private VertexState setupVertex() { |
| return setupVertex(null); |
| } |
| |
| private VertexState setupVertex(VertexInitializedEvent event) { |
| |
| if (event == null) { |
| initTimeRequested = clock.getTime(); |
| } else { |
| initTimeRequested = event.getInitRequestedTime(); |
| initedTime = event.getInitedTime(); |
| } |
| |
| // VertexManager needs to be setup before attempting to Initialize any |
| // Inputs - since events generated by them will be routed to the |
| // VertexManager for handling. |
| |
| if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) { |
| List<GroupInputSpec> groupSpecList = Lists.newLinkedList(); |
| for (VertexGroupInfo groupInfo : dagVertexGroups.values()) { |
| if (groupInfo.edgeMergedInputs.containsKey(getName())) { |
| InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(getName()); |
| groupSpecList.add(new GroupInputSpec(groupInfo.groupName, |
| Lists.newLinkedList(groupInfo.groupMembers), mergedInput)); |
| } |
| } |
| if (!groupSpecList.isEmpty()) { |
| groupInputSpecList = groupSpecList; |
| } |
| } |
| |
| // Check if any inputs need initializers |
| if (event != null) { |
| this.rootInputDescriptors = event.getAdditionalInputs(); |
| } else { |
| if (rootInputDescriptors != null) { |
| LOG.info("Root Inputs exist for Vertex: " + getName() + " : " |
| + rootInputDescriptors); |
| for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input |
| : rootInputDescriptors.values()) { |
| if (input.getControllerDescriptor() != null && |
| input.getControllerDescriptor().getClassName() != null) { |
| if (inputsWithInitializers == null) { |
| inputsWithInitializers = Sets.newHashSet(); |
| } |
| inputsWithInitializers.add(input.getName()); |
| LOG.info("Starting root input initializer for input: " |
| + input.getName() + ", with class: [" |
| + input.getControllerDescriptor().getClassName() + "]"); |
| } |
| } |
| } |
| } |
| |
| boolean hasBipartite = false; |
| if (sourceVertices != null) { |
| for (Edge edge : sourceVertices.values()) { |
| if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) { |
| hasBipartite = true; |
| break; |
| } |
| } |
| } |
| |
| if (hasBipartite && inputsWithInitializers != null) { |
| LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment"); |
| if (event != null) { |
| return VertexState.FAILED; |
| } else { |
| return finished(VertexState.FAILED); |
| } |
| } |
| |
| assignVertexManager(); |
| |
| try { |
| vertexManager.initialize(); |
| vmIsInitialized.set(true); |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier; |
| LOG.error(msg, e); |
| finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, |
| msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause())); |
| return VertexState.FAILED; |
| } |
| |
| // Setup tasks early if possible. If the VertexManager is not being used |
| // to set parallelism, sending events to Tasks is safe (and less confusing |
| // then relying on tasks to be created after TaskEvents are generated). |
| // For VertexManagers setting parallelism, the setParallelism call needs |
| // to be inline. |
| if (event != null) { |
| int oldNumTasks = numTasks; |
| numTasks = event.getNumTasks(); |
| stateChangeNotifier.stateChanged(vertexId, |
| new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); |
| } else { |
| numTasks = getVertexPlan().getTaskConfig().getNumTasks(); |
| // Not sending a parallelism update notification since this is from the original plan |
| } |
| |
| if (!(numTasks == -1 || numTasks >= 0)) { |
| addDiagnostic("Invalid task count for vertex" |
| + ", numTasks=" + numTasks); |
| trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS); |
| if (event != null) { |
| abortVertex(VertexStatus.State.FAILED); |
| return finished(VertexState.FAILED); |
| } else { |
| return VertexState.FAILED; |
| } |
| } |
| |
| checkTaskLimits(); |
| return VertexState.INITED; |
| } |
| |
| private void assignVertexManager() { |
| boolean hasBipartite = false; |
| boolean hasOneToOne = false; |
| boolean hasCustom = false; |
| if (sourceVertices != null) { |
| for (Edge edge : sourceVertices.values()) { |
| switch(edge.getEdgeProperty().getDataMovementType()) { |
| case SCATTER_GATHER: |
| hasBipartite = true; |
| break; |
| case ONE_TO_ONE: |
| hasOneToOne = true; |
| break; |
| case BROADCAST: |
| break; |
| case CUSTOM: |
| hasCustom = true; |
| break; |
| default: |
| throw new TezUncheckedException("Unknown data movement type: " + |
| edge.getEdgeProperty().getDataMovementType()); |
| } |
| } |
| } |
| |
| boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin(); |
| |
| if (hasUserVertexManager) { |
| VertexManagerPluginDescriptor pluginDesc = DagTypeConverters |
| .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan |
| .getVertexManagerPlugin()); |
| LOG.info("Setting user vertex manager plugin: " |
| + pluginDesc.getClassName() + " on vertex: " + getLogIdentifier()); |
| vertexManager = new VertexManager(pluginDesc, this, appContext, stateChangeNotifier); |
| } else { |
| // Intended order of picking a vertex manager |
| // If there is an InputInitializer then we use the RootInputVertexManager. May be fixed by TEZ-703 |
| // If there is a custom edge we fall back to default ImmediateStartVertexManager |
| // If there is a one to one edge then we use the InputReadyVertexManager |
| // If there is a scatter-gather edge then we use the ShuffleVertexManager |
| // Else we use the default ImmediateStartVertexManager |
| if (inputsWithInitializers != null) { |
| LOG.info("Setting vertexManager to RootInputVertexManager for " |
| + logIdentifier); |
| vertexManager = new VertexManager( |
| VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()), |
| this, appContext, stateChangeNotifier); |
| } else if (hasOneToOne && !hasCustom) { |
| LOG.info("Setting vertexManager to InputReadyVertexManager for " |
| + logIdentifier); |
| vertexManager = new VertexManager( |
| VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()), |
| this, appContext, stateChangeNotifier); |
| } else if (hasBipartite && !hasCustom) { |
| LOG.info("Setting vertexManager to ShuffleVertexManager for " |
| + logIdentifier); |
| // shuffle vertex manager needs a conf payload |
| vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(conf).build(), |
| this, appContext, stateChangeNotifier); |
| } else { |
| // schedule all tasks upon vertex start. Default behavior. |
| LOG.info("Setting vertexManager to ImmediateStartVertexManager for " |
| + logIdentifier); |
| vertexManager = new VertexManager( |
| VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()), |
| this, appContext, stateChangeNotifier); |
| } |
| } |
| } |
| |
| public static class StartRecoverTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) { |
| VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vertexEvent; |
| VertexState desiredState = recoverEvent.getDesiredState(); |
| |
| switch (desiredState) { |
| case RUNNING: |
| break; |
| case SUCCEEDED: |
| case KILLED: |
| case FAILED: |
| case ERROR: |
| switch (desiredState) { |
| case SUCCEEDED: |
| vertex.succeededTaskCount = vertex.numTasks; |
| vertex.completedTaskCount = vertex.numTasks; |
| break; |
| case KILLED: |
| vertex.killedTaskCount = vertex.numTasks; |
| break; |
| case FAILED: |
| case ERROR: |
| vertex.failedTaskCount = vertex.numTasks; |
| break; |
| } |
| if (vertex.tasks != null) { |
| TaskState taskState = TaskState.KILLED; |
| switch (desiredState) { |
| case SUCCEEDED: |
| taskState = TaskState.SUCCEEDED; |
| break; |
| case KILLED: |
| taskState = TaskState.KILLED; |
| break; |
| case FAILED: |
| case ERROR: |
| taskState = TaskState.FAILED; |
| break; |
| } |
| for (Task task : vertex.tasks.values()) { |
| vertex.eventHandler.handle( |
| new TaskEventRecoverTask(task.getTaskId(), |
| taskState, false)); |
| } |
| } |
| LOG.info("DAG informed Vertex of its final completed state" |
| + ", vertex=" + vertex.logIdentifier |
| + ", state=" + desiredState); |
| return desiredState; |
| default: |
| LOG.info("Unhandled desired state provided by DAG" |
| + ", vertex=" + vertex.logIdentifier |
| + ", state=" + desiredState); |
| vertex.finished(VertexState.ERROR); |
| } |
| |
| // recover from recover log, should recover to running |
| // desiredState must be RUNNING based on above code |
| VertexState endState; |
| switch (vertex.recoveredState) { |
| case NEW: |
| // Trigger init and start as desired state is RUNNING |
| // Drop all root events |
| Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator(); |
| while (iterator.hasNext()) { |
| if (iterator.next().getEventType().equals( |
| EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) { |
| iterator.remove(); |
| } |
| } |
| vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, |
| VertexEventType.V_INIT)); |
| vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, |
| VertexEventType.V_START)); |
| endState = VertexState.NEW; |
| break; |
| case INITED: |
| try { |
| vertex.initializeCommitters(); |
| } catch (Exception e) { |
| String msg = "Failed to initialize committers" |
| + ", vertex=" + vertex.logIdentifier + "," |
| + ExceptionUtils.getStackTrace(e); |
| LOG.error(msg); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.INIT_FAILURE, msg); |
| endState = VertexState.FAILED; |
| break; |
| } |
| |
| // Recover tasks |
| if (vertex.tasks != null) { |
| for (Task task : vertex.tasks.values()) { |
| vertex.eventHandler.handle( |
| new TaskEventRecoverTask(task.getTaskId())); |
| } |
| } |
| // Update tasks with their input payloads as needed |
| |
| vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, |
| VertexEventType.V_START)); |
| if (vertex.getInputVertices().isEmpty()) { |
| endState = VertexState.INITED; |
| } else { |
| endState = VertexState.RECOVERING; |
| } |
| break; |
| case RUNNING: |
| vertex.tasksNotYetScheduled = false; |
| try { |
| vertex.initializeCommitters(); |
| } catch (Exception e) { |
| String msg = "Failed to initialize committers" |
| + ", vertex=" + vertex.logIdentifier + "," |
| + ExceptionUtils.getStackTrace(e); |
| LOG.error(msg); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.INIT_FAILURE, msg); |
| endState = VertexState.FAILED; |
| break; |
| } |
| |
| // if commit in progress and desired state is not a succeeded one, |
| // move to failed |
| if (vertex.recoveryCommitInProgress) { |
| String msg = "Recovered vertex was in the middle of a commit" |
| + ", failing Vertex=" + vertex.logIdentifier; |
| LOG.warn(msg); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.COMMIT_FAILURE, msg); |
| endState = VertexState.FAILED; |
| break; |
| } |
| assert vertex.tasks.size() == vertex.numTasks; |
| if (vertex.tasks != null && vertex.numTasks != 0) { |
| for (Task task : vertex.tasks.values()) { |
| vertex.eventHandler.handle( |
| new TaskEventRecoverTask(task.getTaskId())); |
| } |
| try { |
| vertex.recoveryCodeSimulatingStart(); |
| endState = VertexState.RUNNING; |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, |
| msg + ", " + ExceptionUtils.getStackTrace(e.getCause())); |
| endState = VertexState.FAILED; |
| } |
| } else { |
| // why succeeded here |
| endState = VertexState.SUCCEEDED; |
| vertex.finished(endState); |
| } |
| break; |
| case SUCCEEDED: |
| case FAILED: |
| case KILLED: |
| if (vertex.recoveredState == VertexState.SUCCEEDED |
| && vertex.hasCommitter |
| && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) { |
| String msg = "Cannot recover vertex as all recovery events not" |
| + " found, vertex=" + vertex.logIdentifier |
| + ", hasCommitters=" + vertex.hasCommitter |
| + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen |
| + ", finalCompletionSeen=" + vertex.vertexCompleteSeen; |
| LOG.warn(msg); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.COMMIT_FAILURE, msg); |
| endState = VertexState.FAILED; |
| } else { |
| vertex.tasksNotYetScheduled = false; |
| // recover tasks |
| if (vertex.tasks != null && vertex.numTasks != 0) { |
| TaskState taskState = TaskState.KILLED; |
| switch (vertex.recoveredState) { |
| case SUCCEEDED: |
| taskState = TaskState.SUCCEEDED; |
| break; |
| case KILLED: |
| taskState = TaskState.KILLED; |
| break; |
| case FAILED: |
| taskState = TaskState.FAILED; |
| break; |
| } |
| for (Task task : vertex.tasks.values()) { |
| vertex.eventHandler.handle( |
| new TaskEventRecoverTask(task.getTaskId(), |
| taskState)); |
| } |
| try { |
| vertex.recoveryCodeSimulatingStart(); |
| endState = VertexState.RUNNING; |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, |
| msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| endState = VertexState.FAILED; |
| } |
| } else { |
| endState = vertex.recoveredState; |
| vertex.finished(endState); |
| } |
| } |
| break; |
| default: |
| LOG.warn("Invalid recoveredState found when trying to recover" |
| + " vertex" |
| + ", vertex=" + vertex.logIdentifier |
| + ", recoveredState=" + vertex.recoveredState); |
| vertex.finished(VertexState.ERROR); |
| endState = VertexState.ERROR; |
| break; |
| } |
| if (!endState.equals(VertexState.RECOVERING)) { |
| LOG.info("Recovered Vertex State" |
| + ", vertexId=" + vertex.logIdentifier |
| + ", state=" + endState |
| + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices |
| + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices |
| + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices |
| + ", recoveredEvents=" |
| + ( vertex.recoveredEvents == null ? "null" : vertex.recoveredEvents.size()) |
| + ", tasksIsNull=" + (vertex.tasks == null) |
| + ", numTasks=" + ( vertex.tasks == null ? "null" : vertex.tasks.size())); |
| for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) { |
| vertex.eventHandler.handle(new VertexEventSourceVertexRecovered( |
| entry.getKey().getVertexId(), |
| vertex.vertexId, endState, null, |
| vertex.getDistanceFromRoot())); |
| } |
| } |
| if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED) |
| .contains(endState)) { |
| // Send events downstream |
| vertex.routeRecoveredEvents(endState, vertex.recoveredEvents); |
| vertex.recoveredEvents.clear(); |
| } else { |
| // Ensure no recovered events |
| if (!vertex.recoveredEvents.isEmpty()) { |
| throw new RuntimeException("Invalid Vertex state" |
| + ", found non-zero recovered events in invalid state" |
| + ", vertex=" + vertex.logIdentifier |
| + ", recoveredState=" + endState |
| + ", recoveredEvents=" + vertex.recoveredEvents.size()); |
| } |
| } |
| return endState; |
| } |
| |
| } |
| |
| private void recoveryCodeSimulatingStart() throws AMUserCodeException { |
| vertexManager.onVertexStarted(pendingReportedSrcCompletions); |
| // This code is duplicated from startVertex() because recovery does not follow normal |
| // transitions. To be removed after recovery code is fixed. |
| maybeSendConfiguredEvent(); |
| } |
| |
| private void routeRecoveredEvents(VertexState vertexState, |
| List<TezEvent> tezEvents) { |
| for (TezEvent tezEvent : tezEvents) { |
| EventMetaData sourceMeta = tezEvent.getSourceInfo(); |
| TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID(); |
| if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) { |
| ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); |
| } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) { |
| ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); |
| } else if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) { |
| ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); |
| } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) { |
| if (vertexState == VertexState.RUNNING |
| || vertexState == VertexState.INITED) { |
| // Only routed if vertex is still running |
| eventHandler.handle(new VertexEventRouteEvent( |
| this.getVertexId(), Collections.singletonList(tezEvent), true)); |
| } |
| continue; |
| } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_INITIALIZER_EVENT) { |
| // The event has the relevant target information |
| InputInitializerEvent iiEvent = (InputInitializerEvent) tezEvent.getEvent(); |
| iiEvent.setSourceVertexName(vertexName); |
| eventHandler.handle(new VertexEventRouteEvent( |
| getDAG().getVertex(iiEvent.getTargetVertexName()).getVertexId(), |
| Collections.singletonList(tezEvent), true)); |
| continue; |
| } |
| |
| Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName()); |
| Edge destEdge = targetVertices.get(destVertex); |
| if (destEdge == null) { |
| throw new TezUncheckedException("Bad destination vertex: " + |
| sourceMeta.getEdgeVertexName() + " for event vertex: " + |
| getLogIdentifier()); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Routing recovered event" |
| + ", vertex=" + logIdentifier |
| + ", eventType=" + tezEvent.getEventType() |
| + ", sourceInfo=" + sourceMeta |
| + ", destinationVertex=" + destVertex.getLogIdentifier()); |
| } |
| eventHandler.handle(new VertexEventRouteEvent(destVertex |
| .getVertexId(), Collections.singletonList(tezEvent), true)); |
| } |
| } |
| |
| public static class TerminateDuringRecoverTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent vertexEvent) { |
| LOG.info("Received a terminate during recovering, setting recovered" |
| + " state to KILLED"); |
| vertex.recoveredState = VertexState.KILLED; |
| } |
| |
| } |
| |
| public static class NullEdgeInitializedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) { |
| VertexEventNullEdgeInitialized event = (VertexEventNullEdgeInitialized) vertexEvent; |
| Edge edge = event.getEdge(); |
| Vertex otherVertex = event.getVertex(); |
| Preconditions.checkState( |
| vertex.getState() == VertexState.NEW |
| || vertex.getState() == VertexState.INITIALIZING, |
| "Unexpected state " + vertex.getState() + " for vertex: " |
| + vertex.logIdentifier); |
| Preconditions.checkState( |
| (vertex.sourceVertices == null || vertex.sourceVertices.containsKey(otherVertex) || |
| vertex.targetVertices == null || vertex.targetVertices.containsKey(otherVertex)), |
| "Not connected to vertex " + otherVertex.getLogIdentifier() + " from vertex: " + vertex.logIdentifier); |
| LOG.info("Edge initialized for connection to vertex " + otherVertex.getLogIdentifier() + |
| " at vertex : " + vertex.logIdentifier); |
| vertex.uninitializedEdges.remove(edge); |
| if(vertex.getState() == VertexState.INITIALIZING && vertex.canInitVertex()) { |
| // Vertex in Initialing state and can init. Do init. |
| return VertexInitializedTransition.doTransition(vertex); |
| } |
| // Vertex is either New (waiting for sources to init) or its not ready to init (failed) |
| return vertex.getState(); |
| } |
| } |
| |
| public static class BufferDataRecoverTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent vertexEvent) { |
| LOG.info("Received upstream event while still recovering" |
| + ", vertexId=" + vertex.logIdentifier |
| + ", vertexEventType=" + vertexEvent.getType()); |
| if (vertexEvent.getType().equals(VertexEventType.V_ROUTE_EVENT)) { |
| VertexEventRouteEvent evt = (VertexEventRouteEvent) vertexEvent; |
| vertex.pendingRouteEvents.addAll(evt.getEvents()); |
| } else if (vertexEvent.getType().equals( |
| VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED)) { |
| VertexEventSourceTaskAttemptCompleted evt = |
| (VertexEventSourceTaskAttemptCompleted) vertexEvent; |
| vertex.pendingReportedSrcCompletions.add( |
| evt.getCompletionEvent().getTaskAttemptId()); |
| } else if (vertexEvent.getType().equals( |
| VertexEventType.V_SOURCE_VERTEX_STARTED)) { |
| VertexEventSourceVertexStarted startEvent = |
| (VertexEventSourceVertexStarted) vertexEvent; |
| int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1; |
| if(vertex.distanceFromRoot < distanceFromRoot) { |
| vertex.distanceFromRoot = distanceFromRoot; |
| } |
| ++vertex.numStartedSourceVertices; |
| } else if (vertexEvent.getType().equals(VertexEventType.V_INIT)) { |
| ++vertex.numInitedSourceVertices; |
| } |
| } |
| } |
| |
| |
| public static class RecoverTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) { |
| VertexEventSourceVertexRecovered sourceRecoveredEvent = |
| (VertexEventSourceVertexRecovered) vertexEvent; |
| // Use distance from root from Recovery events as upstream vertices may not |
| // send source vertex started event that is used to compute distance |
| int distanceFromRoot = sourceRecoveredEvent.getSourceDistanceFromRoot() + 1; |
| if(vertex.distanceFromRoot < distanceFromRoot) { |
| vertex.distanceFromRoot = distanceFromRoot; |
| } |
| |
| ++vertex.numRecoveredSourceVertices; |
| |
| switch (sourceRecoveredEvent.getSourceVertexState()) { |
| case NEW: |
| // Nothing to do |
| break; |
| case INITED: |
| ++vertex.numInitedSourceVertices; |
| break; |
| case RUNNING: |
| case SUCCEEDED: |
| ++vertex.numInitedSourceVertices; |
| ++vertex.numStartedSourceVertices; |
| if (sourceRecoveredEvent.getCompletedTaskAttempts() != null) { |
| vertex.pendingReportedSrcCompletions.addAll( |
| sourceRecoveredEvent.getCompletedTaskAttempts()); |
| } |
| break; |
| case FAILED: |
| case KILLED: |
| case ERROR: |
| // Nothing to do |
| // Recover as if source vertices have not inited/started |
| break; |
| default: |
| LOG.warn("Received invalid SourceVertexRecovered event" |
| + ", vertex=" + vertex.logIdentifier |
| + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID() |
| + ", sourceVertexState=" + sourceRecoveredEvent.getSourceVertexState()); |
| return vertex.finished(VertexState.ERROR); |
| } |
| |
| if (vertex.numRecoveredSourceVertices != |
| vertex.getInputVerticesCount()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Waiting for source vertices to recover" |
| + ", vertex=" + vertex.logIdentifier |
| + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices |
| + ", totalSourceVertices=" + vertex.getInputVerticesCount()); |
| } |
| return VertexState.RECOVERING; |
| } |
| |
| |
| // Complete recovery |
| VertexState endState = VertexState.NEW; |
| List<TezTaskAttemptID> completedTaskAttempts = Lists.newLinkedList(); |
| switch (vertex.recoveredState) { |
| case NEW: |
| // Drop all root events if not inited properly |
| Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator(); |
| while (iterator.hasNext()) { |
| if (iterator.next().getEventType().equals( |
| EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) { |
| iterator.remove(); |
| } |
| } |
| // Trigger init if all sources initialized |
| if (vertex.numInitedSourceVertices == vertex.getInputVerticesCount()) { |
| vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, |
| VertexEventType.V_INIT)); |
| } |
| if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) { |
| vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, |
| VertexEventType.V_START)); |
| } |
| endState = VertexState.NEW; |
| break; |
| case INITED: |
| vertex.vertexAlreadyInitialized = true; |
| try { |
| vertex.initializeCommitters(); |
| } catch (Exception e) { |
| String msg = "Failed to initialize committers, vertex=" |
| + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace(e); |
| LOG.error(msg); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.INIT_FAILURE, msg); |
| endState = VertexState.FAILED; |
| break; |
| } |
| boolean successSetParallelism ; |
| try { |
| vertex.setParallelism(0, |
| null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true, false); |
| successSetParallelism = true; |
| } catch (Exception e) { |
| successSetParallelism = false; |
| } |
| if (!successSetParallelism) { |
| String msg = "Failed to recover edge managers, vertex=" + vertex.logIdentifier; |
| LOG.error(msg); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.INIT_FAILURE, msg); |
| endState = VertexState.FAILED; |
| break; |
| } |
| // Recover tasks |
| if (vertex.tasks != null) { |
| for (Task task : vertex.tasks.values()) { |
| vertex.eventHandler.handle( |
| new TaskEventRecoverTask(task.getTaskId())); |
| } |
| } |
| if (vertex.numInitedSourceVertices != vertex.getInputVerticesCount()) { |
| LOG.info("Vertex already initialized but source vertices have not" |
| + " initialized" |
| + ", vertexId=" + vertex.logIdentifier |
| + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices); |
| } else { |
| if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) { |
| vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, |
| VertexEventType.V_START)); |
| } |
| } |
| endState = VertexState.INITED; |
| break; |
| case RUNNING: |
| vertex.tasksNotYetScheduled = false; |
| // if commit in progress and desired state is not a succeeded one, |
| // move to failed |
| if (vertex.recoveryCommitInProgress) { |
| LOG.info("Recovered vertex was in the middle of a commit" |
| + ", failing Vertex=" + vertex.logIdentifier); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.COMMIT_FAILURE, null); |
| endState = VertexState.FAILED; |
| break; |
| } |
| try { |
| vertex.initializeCommitters(); |
| } catch (Exception e) { |
| String msg = "Failed to initialize committers, vertex=" |
| + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace(e); |
| LOG.error(msg); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.INIT_FAILURE, msg); |
| endState = VertexState.FAILED; |
| break; |
| } |
| try { |
| vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, |
| vertex.recoveredRootInputSpecUpdates, true, false); |
| successSetParallelism = true; |
| } catch (Exception e) { |
| successSetParallelism = false; |
| } |
| if (!successSetParallelism) { |
| String msg = "Failed to recover edge managers for vertex:" + vertex.logIdentifier; |
| LOG.error(msg); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.INIT_FAILURE, msg); |
| endState = VertexState.FAILED; |
| break; |
| } |
| assert vertex.tasks.size() == vertex.numTasks; |
| if (vertex.tasks != null && vertex.numTasks != 0) { |
| for (Task task : vertex.tasks.values()) { |
| vertex.eventHandler.handle( |
| new TaskEventRecoverTask(task.getTaskId())); |
| } |
| try { |
| vertex.recoveryCodeSimulatingStart(); |
| endState = VertexState.RUNNING; |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, |
| msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| endState = VertexState.FAILED; |
| } |
| } else { |
| endState = VertexState.SUCCEEDED; |
| vertex.finished(endState); |
| } |
| break; |
| case SUCCEEDED: |
| case FAILED: |
| case KILLED: |
| vertex.tasksNotYetScheduled = false; |
| // recover tasks |
| assert vertex.tasks.size() == vertex.numTasks; |
| if (vertex.tasks != null && vertex.numTasks != 0) { |
| TaskState taskState = TaskState.KILLED; |
| switch (vertex.recoveredState) { |
| case SUCCEEDED: |
| taskState = TaskState.SUCCEEDED; |
| break; |
| case KILLED: |
| taskState = TaskState.KILLED; |
| break; |
| case FAILED: |
| taskState = TaskState.FAILED; |
| break; |
| } |
| for (Task task : vertex.tasks.values()) { |
| vertex.eventHandler.handle( |
| new TaskEventRecoverTask(task.getTaskId(), |
| taskState)); |
| } |
| // Wait for all tasks to recover and report back |
| try { |
| vertex.recoveryCodeSimulatingStart(); |
| endState = VertexState.RUNNING; |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, |
| msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| endState = VertexState.FAILED; |
| } |
| } else { |
| endState = vertex.recoveredState; |
| vertex.finished(endState); |
| } |
| break; |
| default: |
| LOG.warn("Invalid recoveredState found when trying to recover" |
| + " vertex, recoveredState=" + vertex.recoveredState); |
| vertex.finished(VertexState.ERROR); |
| endState = VertexState.ERROR; |
| break; |
| } |
| |
| LOG.info("Recovered Vertex State" |
| + ", vertexId=" + vertex.logIdentifier |
| + ", state=" + endState |
| + ", numInitedSourceVertices" + vertex.numInitedSourceVertices |
| + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices |
| + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices |
| + ", tasksIsNull=" + (vertex.tasks == null) |
| + ", numTasks=" + ( vertex.tasks == null ? 0 : vertex.tasks.size())); |
| |
| for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) { |
| vertex.eventHandler.handle(new VertexEventSourceVertexRecovered( |
| entry.getKey().getVertexId(), |
| vertex.vertexId, endState, completedTaskAttempts, |
| vertex.getDistanceFromRoot())); |
| } |
| if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED) |
| .contains(endState)) { |
| // Send events downstream |
| vertex.routeRecoveredEvents(endState, vertex.recoveredEvents); |
| vertex.recoveredEvents.clear(); |
| if (!vertex.pendingRouteEvents.isEmpty()) { |
| try { |
| handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false); |
| vertex.pendingRouteEvents.clear(); |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, |
| msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause())); |
| endState = VertexState.FAILED; |
| } |
| } |
| } else { |
| // Ensure no recovered events |
| if (!vertex.recoveredEvents.isEmpty()) { |
| throw new RuntimeException("Invalid Vertex state" |
| + ", found non-zero recovered events in invalid state" |
| + ", recoveredState=" + endState |
| + ", recoveredEvents=" + vertex.recoveredEvents.size()); |
| } |
| } |
| return endState; |
| } |
| |
| } |
| |
| public static class IgnoreInitInInitedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| LOG.info("Received event during INITED state" |
| + ", vertex=" + vertex.logIdentifier |
| + ", eventType=" + event.getType()); |
| if (!vertex.vertexAlreadyInitialized) { |
| LOG.error("Vertex not initialized but in INITED state" |
| + ", vertexId=" + vertex.logIdentifier); |
| return vertex.finished(VertexState.ERROR); |
| } else { |
| return VertexState.INITED; |
| } |
| } |
| } |
| |
| |
| |
| public static class InitTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexState vertexState = VertexState.NEW; |
| vertex.numInitedSourceVertices++; |
| // TODO fix this as part of TEZ-1008 |
| // Should have a different way to infer source vertices INITED |
| // as compared to a recovery triggered INIT |
| // In normal flow, upstream vertices send a V_INIT downstream to |
| // trigger an init of the downstream vertex. In case of recovery, |
| // upstream vertices may not send this event if they are already in a |
| // RUNNING or completed state. Hence, recovering vertices may send |
| // themselves a V_INIT to trigger a transition. Hence, the count may |
| // go one over. |
| if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() || |
| (vertex.numInitedSourceVertices == vertex.sourceVertices.size() |
| || vertex.numInitedSourceVertices == (vertex.sourceVertices.size()+1))) { |
| vertexState = handleInitEvent(vertex, event); |
| if (vertexState != VertexState.FAILED) { |
| if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) { |
| for (Vertex target : vertex.targetVertices.keySet()) { |
| vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(), |
| VertexEventType.V_INIT)); |
| } |
| } |
| } |
| } |
| return vertexState; |
| } |
| |
| private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) { |
| VertexState state = vertex.setupVertex(); |
| if (state.equals(VertexState.FAILED)) { |
| return state; |
| } |
| // TODO move before to handle NEW state |
| if (vertex.targetVertices != null) { |
| for (Edge e : vertex.targetVertices.values()) { |
| if (e.getEdgeManager() == null) { |
| Preconditions |
| .checkState( |
| e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM, |
| "Null edge manager allowed only for custom edge. " + vertex.logIdentifier); |
| vertex.uninitializedEdges.add(e); |
| } |
| } |
| } |
| if (vertex.sourceVertices != null) { |
| for (Edge e : vertex.sourceVertices.values()) { |
| if (e.getEdgeManager() == null) { |
| Preconditions |
| .checkState( |
| e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM, |
| "Null edge manager allowed only for custom edge. " + vertex.logIdentifier); |
| vertex.uninitializedEdges.add(e); |
| } |
| } |
| } |
| |
| // Create tasks based on initial configuration, but don't start them yet. |
| if (vertex.numTasks == -1) { |
| // this block must always return VertexState.INITIALIZING |
| LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split" |
| + " to set #tasks for the vertex " + vertex.getLogIdentifier()); |
| |
| if (vertex.inputsWithInitializers != null) { |
| LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier); |
| vertex.setupInputInitializerManager(); |
| return VertexState.INITIALIZING; |
| } else { |
| boolean hasOneToOneUninitedSource = false; |
| for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) { |
| if (entry.getValue().getEdgeProperty().getDataMovementType() == |
| DataMovementType.ONE_TO_ONE) { |
| if (entry.getKey().getTotalTasks() == -1) { |
| hasOneToOneUninitedSource = true; |
| break; |
| } |
| } |
| } |
| if (hasOneToOneUninitedSource) { |
| LOG.info("Vertex will initialize from 1-1 sources. " + vertex.logIdentifier); |
| return VertexState.INITIALIZING; |
| } |
| if (vertex.vertexPlan.hasVertexManagerPlugin()) { |
| LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier); |
| return VertexState.INITIALIZING; |
| } |
| throw new TezUncheckedException(vertex.getLogIdentifier() + |
| " has -1 tasks but does not have input initializers, " + |
| "1-1 uninited sources or custom vertex manager to set it at runtime"); |
| } |
| } else { |
| LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier); |
| vertex.createTasks(); |
| // this block may return VertexState.INITIALIZING |
| if (vertex.inputsWithInitializers != null) { |
| LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier); |
| vertex.setupInputInitializerManager(); |
| return VertexState.INITIALIZING; |
| } |
| if (!vertex.uninitializedEdges.isEmpty()) { |
| LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier); |
| return VertexState.INITIALIZING; |
| } |
| LOG.info("Directly initializing vertex: " + vertex.logIdentifier); |
| // vertex is completely configured. Send out notification now. |
| vertex.maybeSendConfiguredEvent(); |
| boolean isInitialized = vertex.initializeVertex(); |
| if (isInitialized) { |
| return VertexState.INITED; |
| } else { |
| return VertexState.FAILED; |
| } |
| } |
| } |
| } // end of InitTransition |
| |
| @VisibleForTesting |
| protected RootInputInitializerManager createRootInputInitializerManager( |
| String dagName, String vertexName, TezVertexID vertexID, |
| EventHandler eventHandler, int numTasks, int numNodes, |
| Resource vertexTaskResource, Resource totalResource) { |
| return new RootInputInitializerManager(this, appContext, this.dagUgi, this.stateChangeNotifier); |
| } |
| |
| private boolean initializeVertexInInitializingState() { |
| boolean isInitialized = initializeVertex(); |
| if (!isInitialized) { |
| // Don't bother starting if the vertex state is failed. |
| return false; |
| } |
| |
| return true; |
| } |
| |
| void startIfPossible() { |
| if (startSignalPending) { |
| // Trigger a start event to ensure route events are seen before |
| // a start event. |
| LOG.info("Triggering start event for vertex: " + logIdentifier + |
| " with distanceFromRoot: " + distanceFromRoot ); |
| eventHandler.handle(new VertexEvent(vertexId, |
| VertexEventType.V_START)); |
| } |
| } |
| |
| public static class VertexInitializedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| static VertexState doTransition(VertexImpl vertex) { |
| Preconditions.checkState(vertex.canInitVertex(), "Vertex: " + vertex.logIdentifier); |
| boolean isInitialized = vertex.initializeVertexInInitializingState(); |
| if (!isInitialized) { |
| return VertexState.FAILED; |
| } |
| |
| vertex.startIfPossible(); |
| return VertexState.INITED; |
| } |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| return doTransition(vertex); |
| } |
| } |
| |
| // present in most transitions so that the initializer thread can be shutdown properly |
| public static class RootInputInitializedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized) event; |
| VertexState state = vertex.getState(); |
| if (state == VertexState.INITIALIZING) { |
| try { |
| List<TezEvent> inputInfoEvents = |
| vertex.vertexManager.onRootVertexInitialized( |
| liInitEvent.getInputName(), |
| vertex.getAdditionalInputs().get(liInitEvent.getInputName()) |
| .getIODescriptor(), liInitEvent.getEvents()); |
| if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) { |
| VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false); |
| } |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.AM_USERCODE_FAILURE, msg |
| + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| return VertexState.FAILED; |
| } |
| } |
| |
| vertex.numInitializedInputs++; |
| if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) { |
| // All inputs initialized, shutdown the initializer. |
| vertex.rootInputInitializerManager.shutdown(); |
| vertex.rootInputInitializerManager = null; |
| } |
| |
| // done. check if we need to do the initialization |
| if (vertex.getState() == VertexState.INITIALIZING && |
| vertex.initWaitsForRootInitializers) { |
| if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) { |
| // set the wait flag to false if all initializers are done |
| vertex.initWaitsForRootInitializers = false; |
| } |
| // initialize vertex if possible and needed |
| if (vertex.canInitVertex()) { |
| Preconditions.checkState(vertex.numTasks >= 0, |
| "Parallelism should have been set by now for vertex: " + vertex.logIdentifier); |
| return VertexInitializedTransition.doTransition(vertex); |
| } |
| } |
| return vertex.getState(); |
| } |
| } |
| |
| public static class OneToOneSourceSplitTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventOneToOneSourceSplit splitEvent = |
| (VertexEventOneToOneSourceSplit)event; |
| TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource(); |
| |
| if (vertex.originalOneToOneSplitSource != null) { |
| VertexState state = vertex.getState(); |
| Preconditions |
| .checkState( |
| (state == VertexState.INITIALIZING |
| || state == VertexState.INITED || state == VertexState.RUNNING), |
| " Unexpected 1-1 split for vertex " + vertex.getLogIdentifier() |
| + " in state " + vertex.getState() + " . Split in vertex " |
| + originalSplitSource + " sent by vertex " |
| + splitEvent.getSenderVertex() + " numTasks " |
| + splitEvent.getNumTasks()); |
| if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) { |
| // ignore another split event that may have come from a different |
| // path in the DAG. We have already split because of that source |
| LOG.info("Ignoring split of vertex " + vertex.getLogIdentifier() + |
| " because of split in vertex " + originalSplitSource + |
| " sent by vertex " + splitEvent.getSenderVertex() + |
| " numTasks " + splitEvent.getNumTasks()); |
| return state; |
| } |
| // cannot split from multiple sources |
| throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() + |
| " asked to split by: " + originalSplitSource + |
| " but was already split by:" + vertex.originalOneToOneSplitSource); |
| } |
| |
| LOG.info("Splitting vertex " + vertex.getLogIdentifier() + |
| " because of split in vertex " + originalSplitSource + |
| " sent by vertex " + splitEvent.getSenderVertex() + |
| " numTasks " + splitEvent.getNumTasks()); |
| vertex.originalOneToOneSplitSource = originalSplitSource; |
| try { |
| vertex.setParallelism(splitEvent.getNumTasks(), null, null, null, false); |
| } catch (Exception e) { |
| // ingore this exception, should not happen |
| LOG.error("Unexpected exception, Just set Parallelims to a specified value, not involve EdgeManager," |
| + "exception should not happen here", e); |
| } |
| if (vertex.getState() == VertexState.RUNNING || |
| vertex.getState() == VertexState.INITED) { |
| return vertex.getState(); |
| } else { |
| Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING, |
| " Unexpected 1-1 split for vertex " + vertex.getLogIdentifier() + |
| " in state " + vertex.getState() + |
| " . Split in vertex " + originalSplitSource + |
| " sent by vertex " + splitEvent.getSenderVertex() + |
| " numTasks " + splitEvent.getNumTasks()); |
| return vertex.getState(); |
| } |
| } |
| } |
| |
| // Temporary to maintain topological order while starting vertices. Not useful |
| // since there's not much difference between the INIT and RUNNING states. |
| public static class SourceVertexStartedTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventSourceVertexStarted startEvent = |
| (VertexEventSourceVertexStarted) event; |
| int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1; |
| if(vertex.distanceFromRoot < distanceFromRoot) { |
| vertex.distanceFromRoot = distanceFromRoot; |
| } |
| vertex.numStartedSourceVertices++; |
| LOG.info("Source vertex started: " + startEvent.getSourceVertexId() + |
| " for vertex: " + vertex.logIdentifier + " numStartedSources: " + |
| vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size()); |
| |
| if (vertex.numStartedSourceVertices < vertex.sourceVertices.size()) { |
| LOG.info("Cannot start vertex: " + vertex.logIdentifier + " numStartedSources: " |
| + vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size()); |
| return; |
| } |
| |
| // vertex meets external start dependency conditions. Save this signal in |
| // case we are not ready to start now and need to start later |
| vertex.startSignalPending = true; |
| |
| if (vertex.getState() != VertexState.INITED) { |
| // vertex itself is not ready to start. External dependencies have already |
| // notified us. |
| LOG.info("Cannot start vertex. Not in inited state. " |
| + vertex.logIdentifier + " . VertesState: " + vertex.getState() |
| + " numTasks: " + vertex.numTasks + " Num uninitialized edges: " |
| + vertex.uninitializedEdges.size()); |
| return; |
| } |
| |
| // vertex is inited and all dependencies are ready. Inited vertex means |
| // parallelism must be set already and edges defined |
| Preconditions.checkState( |
| (vertex.numTasks >= 0 && vertex.uninitializedEdges.isEmpty()), |
| "Cannot start vertex that is not completely defined. Vertex: " |
| + vertex.logIdentifier + " numTasks: " + vertex.numTasks); |
| |
| vertex.startIfPossible(); |
| } |
| } |
| |
| boolean canInitVertex() { |
| if (numTasks >= 0 && uninitializedEdges.isEmpty() && !initWaitsForRootInitializers) { |
| // vertex fully defined |
| return true; |
| } |
| LOG.info("Cannot init vertex: " + logIdentifier + " numTasks: " + numTasks |
| + " numUnitializedEdges: " + uninitializedEdges.size() |
| + " numInitializedInputs: " + numInitializedInputs |
| + " initWaitsForRootInitializers: " + initWaitsForRootInitializers); |
| return false; |
| } |
| |
| public static class StartWhileInitializingTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| // vertex state machine does not start itself in the initializing state |
| // this start event can only come directly from the DAG. That means this |
| // is a top level vertex of the dag |
| Preconditions.checkState( |
| (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()), |
| "Vertex: " + vertex.logIdentifier + " got invalid start event"); |
| vertex.startTimeRequested = vertex.clock.getTime(); |
| vertex.startSignalPending = true; |
| } |
| |
| } |
| |
| public static class StartTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| Preconditions.checkState(vertex.getState() == VertexState.INITED, |
| "Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier); |
| vertex.startTimeRequested = vertex.clock.getTime(); |
| return vertex.startVertex(); |
| } |
| } |
| |
| private void maybeSendConfiguredEvent() { |
| // the vertex is fully configured by the time it starts. Always notify completely configured |
| // unless the vertex manager has told us that it is going to reconfigure it further |
| Preconditions.checkState(canInitVertex()); |
| if (!this.vertexToBeReconfiguredByManager) { |
| // this vertex will not be reconfigured by its manager |
| if (completelyConfiguredSent.compareAndSet(false, true)) { |
| stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName, |
| org.apache.tez.dag.api.event.VertexState.CONFIGURED)); |
| } |
| } |
| } |
| |
| private VertexState startVertex() { |
| // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! |
| // IMPORTANT - Until Recovery is fixed to use normal state transitions, if any code is added |
| // here then please check if it needs to be duplicated in recoveryCodeSimulatingStart(). |
| // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! |
| Preconditions.checkState(getState() == VertexState.INITED, |
| "Vertex must be inited " + logIdentifier); |
| |
| startedTime = clock.getTime(); |
| try { |
| vertexManager.onVertexStarted(pendingReportedSrcCompletions); |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier; |
| LOG.error(msg, e); |
| addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE); |
| return VertexState.TERMINATING; |
| } |
| pendingReportedSrcCompletions.clear(); |
| logJobHistoryVertexStartedEvent(); |
| |
| // the vertex is fully configured by the time it starts. Always notify completely configured |
| // unless the vertex manager has told us that it is going to reconfigure it further. |
| // If the vertex was pre-configured then the event would have been sent out earlier. Calling again |
| // would be a no-op. If the vertex was not fully configured and waiting for that to complete then |
| // we would start immediately after that. Either parallelism updated (now) or IPO changed (future) |
| // or vertex added (future). Simplify these cases by sending the event now automatically for the |
| // user as if they had invoked the planned()/done() API's. |
| maybeSendConfiguredEvent(); |
| |
| // TODO: Metrics |
| //job.metrics.runningJob(job); |
| |
| // default behavior is to start immediately. so send information about us |
| // starting to downstream vertices. If the connections/structure of this |
| // vertex is not fully defined yet then we could send this event later |
| // when we are ready |
| if (targetVertices != null) { |
| for (Vertex targetVertex : targetVertices.keySet()) { |
| eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex |
| .getVertexId(), getVertexId(), distanceFromRoot)); |
| } |
| } |
| |
| // If we have no tasks, just transition to vertex completed |
| if (this.numTasks == 0) { |
| eventHandler.handle(new VertexEvent( |
| this.vertexId, VertexEventType.V_COMPLETED)); |
| } |
| |
| return VertexState.RUNNING; |
| } |
| |
| private void abortVertex(final VertexStatus.State finalState) { |
| if (this.aborted.getAndSet(true)) { |
| LOG.info("Ignoring multiple aborts for vertex: " + logIdentifier); |
| return; |
| } |
| LOG.info("Invoking committer abort for vertex, vertexId=" + logIdentifier); |
| if (outputCommitters != null) { |
| try { |
| dagUgi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() { |
| for (Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) { |
| try { |
| LOG.info("Invoking committer abort for output=" + entry.getKey() + ", vertexId=" |
| + logIdentifier); |
| entry.getValue().abortOutput(finalState); |
| } catch (Exception e) { |
| LOG.warn("Could not abort committer for output=" + entry.getKey() + ", vertexId=" |
| + logIdentifier, e); |
| } |
| } |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| throw new TezUncheckedException("Unknown error while attempting VertexCommitter(s) abort", e); |
| } |
| } |
| if (finishTime == 0) { |
| setFinishTime(); |
| } |
| } |
| |
| private void mayBeConstructFinalFullCounters() { |
| // Calculating full-counters. This should happen only once for the vertex. |
| synchronized (this.fullCountersLock) { |
| if (this.fullCounters != null) { |
| // Already constructed. Just return. |
| return; |
| } |
| this.constructFinalFullcounters(); |
| } |
| } |
| |
| @Private |
| public void constructFinalFullcounters() { |
| this.fullCounters = new TezCounters(); |
| this.vertexStats = new VertexStats(); |
| |
| for (Task t : this.tasks.values()) { |
| vertexStats.updateStats(t.getReport()); |
| TezCounters counters = t.getCounters(); |
| this.fullCounters.incrAllCounters(counters); |
| } |
| } |
| |
| private static class RootInputInitFailedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventRootInputFailed fe = (VertexEventRootInputFailed) event; |
| String msg = "Vertex Input: " + fe.getInputName() |
| + " initializer failed, vertex=" + vertex.getLogIdentifier(); |
| LOG.error(msg, fe.getError()); |
| if (vertex.getState() == VertexState.RUNNING) { |
| vertex.addDiagnostic(msg |
| + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause())); |
| vertex.tryEnactKill(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, |
| TaskTerminationCause.AM_USERCODE_FAILURE); |
| return VertexState.TERMINATING; |
| } else { |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg |
| + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause())); |
| return VertexState.FAILED; |
| } |
| } |
| } |
| |
| // Task-start has been moved out of InitTransition, so this arc simply |
| // hardcodes 0 for both map and reduce finished tasks. |
| private static class TerminateNewVertexTransition |
| implements SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTermination vet = (VertexEventTermination) event; |
| vertex.trySetTerminationCause(vet.getTerminationCause()); |
| vertex.setFinishTime(); |
| vertex.addDiagnostic("Vertex received Kill in NEW state."); |
| vertex.finished(VertexState.KILLED); |
| } |
| } |
| |
| private static class TerminateInitedVertexTransition |
| implements SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTermination vet = (VertexEventTermination) event; |
| vertex.trySetTerminationCause(vet.getTerminationCause()); |
| vertex.abortVertex(VertexStatus.State.KILLED); |
| vertex.addDiagnostic("Vertex received Kill in INITED state."); |
| vertex.finished(VertexState.KILLED); |
| } |
| } |
| |
| private static class TerminateInitingVertexTransition extends TerminateInitedVertexTransition { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| super.transition(vertex, event); |
| } |
| } |
| |
| private static class VertexKilledTransition |
| implements SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| vertex.addDiagnostic("Vertex received Kill while in RUNNING state."); |
| VertexEventTermination vet = (VertexEventTermination) event; |
| VertexTerminationCause trigger = vet.getTerminationCause(); |
| switch(trigger){ |
| case DAG_KILL : vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL); break; |
| case OWN_TASK_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break; |
| case ROOT_INPUT_INIT_FAILURE: |
| case COMMIT_FAILURE: |
| case INVALID_NUM_OF_TASKS: |
| case INIT_FAILURE: |
| case INTERNAL_ERROR: |
| case AM_USERCODE_FAILURE: |
| case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break; |
| default://should not occur |
| throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger); |
| } |
| |
| // TODO: Metrics |
| //job.metrics.endRunningJob(job); |
| } |
| } |
| |
| private static class VertexManagerUserCodeErrorTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventManagerUserCodeError errEvent = ((VertexEventManagerUserCodeError) event); |
| AMUserCodeException e = errEvent.getError(); |
| String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| |
| if (vertex.getState() == VertexState.RECOVERING) { |
| LOG.info("Received a user code error during recovering, setting recovered" |
| + " state to FAILED"); |
| vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| vertex.terminationCause = VertexTerminationCause.AM_USERCODE_FAILURE; |
| vertex.recoveredState = VertexState.FAILED; |
| return VertexState.RECOVERING; |
| } else if (vertex.getState() == VertexState.RUNNING) { |
| vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, |
| TaskTerminationCause.AM_USERCODE_FAILURE); |
| return VertexState.TERMINATING; |
| } else { |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.AM_USERCODE_FAILURE, msg |
| + ", " + ExceptionUtils.getStackTrace(e.getCause())); |
| return VertexState.FAILED; |
| } |
| } |
| } |
| |
| /** |
| * Here, the Vertex is being told that one of it's source task-attempts |
| * completed. |
| */ |
| private static class SourceTaskAttemptCompletedEventTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTaskAttemptCompleted completionEvent = |
| ((VertexEventSourceTaskAttemptCompleted) event).getCompletionEvent(); |
| LOG.info("Source task attempt completed for vertex: " + vertex.getLogIdentifier() |
| + " attempt: " + completionEvent.getTaskAttemptId() |
| + " with state: " + completionEvent.getTaskAttemptState() |
| + " vertexState: " + vertex.getState()); |
| |
| |
| if (TaskAttemptStateInternal.SUCCEEDED.equals(completionEvent |
| .getTaskAttemptState())) { |
| vertex.numSuccessSourceAttemptCompletions++; |
| |
| if (vertex.getState() == VertexState.RUNNING) { |
| try { |
| // Inform the vertex manager about the source task completing. |
| vertex.vertexManager.onSourceTaskCompleted(completionEvent |
| .getTaskAttemptId().getTaskID()); |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, |
| TaskTerminationCause.AM_USERCODE_FAILURE); |
| return VertexState.TERMINATING; |
| } |
| } else { |
| vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId()); |
| } |
| } |
| return vertex.getState(); |
| } |
| } |
| |
| private static class TaskAttemptCompletedEventTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTaskAttemptCompleted completionEvent = |
| ((VertexEventTaskAttemptCompleted) event); |
| |
| // If different tasks were connected to different destination vertices |
| // then this would need to be sent via the edges |
| // Notify all target vertices |
| if (vertex.targetVertices != null) { |
| for (Vertex targetVertex : vertex.targetVertices.keySet()) { |
| vertex.eventHandler.handle( |
| new VertexEventSourceTaskAttemptCompleted( |
| targetVertex.getVertexId(), completionEvent) |
| ); |
| } |
| } |
| } |
| } |
| |
| private static class TaskAttempStatusUpdateEventTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTaskAttemptStatusUpdate updateEvent = |
| ((VertexEventTaskAttemptStatusUpdate) event); |
| if (vertex.isSpeculationEnabled()) { |
| if (updateEvent.hasJustStarted()) { |
| vertex.speculator.notifyAttemptStarted(updateEvent.getAttemptId(), |
| updateEvent.getTimestamp()); |
| } else { |
| vertex.speculator.notifyAttemptStatusUpdate(updateEvent.getAttemptId(), |
| updateEvent.getTaskAttemptState(), updateEvent.getTimestamp()); |
| } |
| } |
| } |
| } |
| private static class TaskCompletedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| boolean forceTransitionToKillWait = false; |
| vertex.completedTaskCount++; |
| LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : " |
| + vertex.completedTaskCount); |
| VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event; |
| Task task = vertex.tasks.get(taskEvent.getTaskID()); |
| if (taskEvent.getState() == TaskState.SUCCEEDED) { |
| taskSucceeded(vertex, task); |
| } else if (taskEvent.getState() == TaskState.FAILED) { |
| LOG.info("Failing vertex: " + vertex.logIdentifier + |
| " because task failed: " + taskEvent.getTaskID()); |
| vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE); |
| forceTransitionToKillWait = true; |
| taskFailed(vertex, task); |
| } else if (taskEvent.getState() == TaskState.KILLED) { |
| taskKilled(vertex, task); |
| } |
| |
| VertexState state = VertexImpl.checkVertexForCompletion(vertex); |
| if(state == VertexState.RUNNING && forceTransitionToKillWait){ |
| return VertexState.TERMINATING; |
| } |
| |
| return state; |
| } |
| |
| private void taskSucceeded(VertexImpl vertex, Task task) { |
| vertex.succeededTaskCount++; |
| // TODO Metrics |
| // job.metrics.completedTask(task); |
| } |
| |
| private void taskFailed(VertexImpl vertex, Task task) { |
| vertex.failedTaskCount++; |
| vertex.addDiagnostic("Task failed" |
| + ", taskId=" + task.getTaskId() |
| + ", diagnostics=" + task.getDiagnostics()); |
| // TODO Metrics |
| //vertex.metrics.failedTask(task); |
| } |
| |
| private void taskKilled(VertexImpl vertex, Task task) { |
| vertex.killedTaskCount++; |
| // TODO Metrics |
| //job.metrics.killedTask(task); |
| } |
| } |
| |
| private static class TaskRescheduledTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| //succeeded task is restarted back |
| vertex.completedTaskCount--; |
| vertex.succeededTaskCount--; |
| } |
| } |
| |
| private static class VertexNoTasksCompletedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| return VertexImpl.checkVertexForCompletion(vertex); |
| } |
| } |
| |
| private static class TaskCompletedAfterVertexSuccessTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTaskCompleted vEvent = (VertexEventTaskCompleted) event; |
| VertexState finalState; |
| VertexStatus.State finalStatus; |
| String diagnosticMsg; |
| if (vEvent.getState() == TaskState.FAILED) { |
| finalState = VertexState.FAILED; |
| finalStatus = VertexStatus.State.FAILED; |
| diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() + |
| " failed after vertex succeeded."; |
| } else { |
| finalState = VertexState.ERROR; |
| finalStatus = VertexStatus.State.ERROR; |
| diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() + |
| " completed with state " + vEvent.getState() + " after vertex succeeded."; |
| } |
| LOG.info(diagnosticMsg); |
| vertex.abortVertex(finalStatus); |
| vertex.finished(finalState, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg); |
| return finalState; |
| } |
| } |
| |
| private static class TaskRescheduledAfterVertexSuccessTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| if (vertex.outputCommitters == null // no committer |
| || vertex.outputCommitters.isEmpty() // no committer |
| || !vertex.commitVertexOutputs) { // committer does not commit on vertex success |
| LOG.info(vertex.getLogIdentifier() + " back to running due to rescheduling " |
| + ((VertexEventTaskReschedule)event).getTaskID()); |
| (new TaskRescheduledTransition()).transition(vertex, event); |
| // inform the DAG that we are re-running |
| vertex.eventHandler.handle(new DAGEventVertexReRunning(vertex.getVertexId())); |
| return VertexState.RUNNING; |
| } |
| |
| // terminate any running tasks |
| String diagnosticMsg = vertex.getLogIdentifier() + " failed due to post-commit rescheduling of " |
| + ((VertexEventTaskReschedule)event).getTaskID(); |
| LOG.info(diagnosticMsg); |
| vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, |
| TaskTerminationCause.OWN_TASK_FAILURE); |
| vertex.abortVertex(VertexStatus.State.FAILED); |
| vertex.finished(VertexState.FAILED, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg); |
| return VertexState.FAILED; |
| } |
| } |
| |
| private void addDiagnostic(String diag) { |
| diagnostics.add(diag); |
| } |
| |
| private static boolean isEventFromVertex(Vertex vertex, |
| EventMetaData sourceMeta) { |
| if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) { |
| return false; |
| } |
| return true; |
| } |
| |
| private static void checkEventSourceMetadata(Vertex vertex, |
| EventMetaData sourceMeta) { |
| assert isEventFromVertex(vertex, sourceMeta); |
| } |
| |
| // private static class RouteEventsWhileInitializingTransition implements |
| // SingleArcTransition<VertexImpl, VertexEvent> { |
| // |
| // @Override |
| // public void transition(VertexImpl vertex, VertexEvent event) { |
| // VertexEventRouteEvent re = (VertexEventRouteEvent) event; |
| // // Store the events for post-init routing, since INIT state is when |
| // // initial task parallelism will be set |
| // vertex.pendingRouteEvents.addAll(re.getEvents()); |
| // } |
| // } |
| |
| private static class RouteEventTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event; |
| boolean recovered = rEvent.isRecovered(); |
| List<TezEvent> tezEvents = rEvent.getEvents(); |
| try { |
| VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered); |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| if (vertex.getState() == VertexState.RUNNING) { |
| vertex.addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause())); |
| vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE); |
| return VertexState.TERMINATING; |
| } else { |
| vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, |
| msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| return VertexState.FAILED; |
| } |
| } |
| return vertex.getState(); |
| } |
| } |
| |
| private static void handleRoutedTezEvents(VertexImpl vertex, List<TezEvent> tezEvents, boolean recovered) throws AMUserCodeException { |
| if (vertex.getAppContext().isRecoveryEnabled() |
| && !recovered |
| && !tezEvents.isEmpty()) { |
| List<TezEvent> recoveryEvents = |
| Lists.newArrayList(); |
| for (TezEvent tezEvent : tezEvents) { |
| if (!isEventFromVertex(vertex, tezEvent.getSourceInfo())) { |
| continue; |
| } |
| if (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT) |
| || tezEvent.getEventType().equals(EventType.DATA_MOVEMENT_EVENT) |
| || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) |
| || tezEvent.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) { |
| recoveryEvents.add(tezEvent); |
| } |
| } |
| if (!recoveryEvents.isEmpty()) { |
| VertexRecoverableEventsGeneratedEvent historyEvent = |
| new VertexRecoverableEventsGeneratedEvent(vertex.vertexId, |
| recoveryEvents); |
| vertex.appContext.getHistoryHandler().handle( |
| new DAGHistoryEvent(vertex.getDAGId(), historyEvent)); |
| } |
| } |
| for(TezEvent tezEvent : tezEvents) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Vertex: " + vertex.getName() + " routing event: " |
| + tezEvent.getEventType() |
| + " Recovered:" + recovered); |
| } |
| EventMetaData sourceMeta = tezEvent.getSourceInfo(); |
| switch(tezEvent.getEventType()) { |
| case INPUT_FAILED_EVENT: |
| case DATA_MOVEMENT_EVENT: |
| case COMPOSITE_DATA_MOVEMENT_EVENT: |
| { |
| if (isEventFromVertex(vertex, sourceMeta)) { |
| // event from this vertex. send to destination vertex |
| TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID(); |
| if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) { |
| ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); |
| } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) { |
| ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); |
| } else { |
| ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); |
| } |
| Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName()); |
| Edge destEdge = vertex.targetVertices.get(destVertex); |
| if (destEdge == null) { |
| throw new TezUncheckedException("Bad destination vertex: " + |
| sourceMeta.getEdgeVertexName() + " for event vertex: " + |
| vertex.getLogIdentifier()); |
| } |
| vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex |
| .getVertexId(), Collections.singletonList(tezEvent))); |
| } else { |
| // event not from this vertex. must have come from source vertex. |
| // send to tasks |
| if (vertex.tasksNotYetScheduled) { |
| vertex.pendingTaskEvents.add(tezEvent); |
| } else { |
| Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex( |
| sourceMeta.getTaskVertexName())); |
| if (srcEdge == null) { |
| throw new TezUncheckedException("Bad source vertex: " + |
| sourceMeta.getTaskVertexName() + " for destination vertex: " + |
| vertex.getLogIdentifier()); |
| } |
| srcEdge.sendTezEventToDestinationTasks(tezEvent); |
| } |
| } |
| } |
| break; |
| case ROOT_INPUT_DATA_INFORMATION_EVENT: |
| if (vertex.tasksNotYetScheduled) { |
| vertex.pendingTaskEvents.add(tezEvent); |
| } else { |
| checkEventSourceMetadata(vertex, sourceMeta); |
| InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent |
| .getEvent(); |
| Task targetTask = vertex.getTask(riEvent.getTargetIndex()); |
| targetTask.registerTezEvent(tezEvent); |
| } |
| break; |
| case VERTEX_MANAGER_EVENT: |
| { |
| // VM events on task success only can be changed as part of TEZ-1532 |
| VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent(); |
| Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName()); |
| Preconditions.checkArgument(target != null, |
| "Event sent to unkown vertex: " + vmEvent.getTargetVertexName()); |
| if (target == vertex) { |
| vertex.vertexManager.onVertexManagerEventReceived(vmEvent); |
| } else { |
| checkEventSourceMetadata(vertex, sourceMeta); |
| vertex.eventHandler.handle(new VertexEventRouteEvent(target |
| .getVertexId(), Collections.singletonList(tezEvent))); |
| } |
| } |
| break; |
| case ROOT_INPUT_INITIALIZER_EVENT: |
| { |
| InputInitializerEvent riEvent = (InputInitializerEvent) tezEvent.getEvent(); |
| Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName()); |
| Preconditions.checkArgument(target != null, |
| "Event sent to unknown vertex: " + riEvent.getTargetVertexName()); |
| riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName()); |
| if (target == vertex) { |
| if (vertex.rootInputDescriptors == null || |
| !vertex.rootInputDescriptors.containsKey(riEvent.getTargetInputName())) { |
| throw new TezUncheckedException( |
| "InputInitializerEvent targeted at unknown initializer on vertex " + |
| vertex.logIdentifier + ", Event=" + riEvent); |
| } |
| if (vertex.getState() == VertexState.NEW) { |
| vertex.pendingInitializerEvents.add(tezEvent); |
| } else if (vertex.getState() == VertexState.INITIALIZING) { |
| vertex.rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent)); |
| } else { |
| // Currently, INITED and subsequent states means Initializer complete / failure |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in " + vertex.getLogIdentifier() + ", state=" + vertex.getState()); |
| } |
| } |
| } else { |
| checkEventSourceMetadata(vertex, sourceMeta); |
| vertex.eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(), |
| Collections.singletonList(tezEvent))); |
| } |
| } |
| break; |
| case INPUT_READ_ERROR_EVENT: |
| { |
| checkEventSourceMetadata(vertex, sourceMeta); |
| Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex( |
| sourceMeta.getEdgeVertexName())); |
| srcEdge.sendTezEventToSourceTasks(tezEvent); |
| } |
| break; |
| case TASK_STATUS_UPDATE_EVENT: |
| { |
| checkEventSourceMetadata(vertex, sourceMeta); |
| TaskStatusUpdateEvent sEvent = |
| (TaskStatusUpdateEvent) tezEvent.getEvent(); |
| vertex.getEventHandler().handle( |
| new TaskAttemptEventStatusUpdate(sourceMeta.getTaskAttemptID(), |
| sEvent)); |
| } |
| break; |
| case TASK_ATTEMPT_COMPLETED_EVENT: |
| { |
| checkEventSourceMetadata(vertex, sourceMeta); |
| vertex.getEventHandler().handle( |
| new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), |
| TaskAttemptEventType.TA_DONE)); |
| } |
| break; |
| case TASK_ATTEMPT_FAILED_EVENT: |
| { |
| checkEventSourceMetadata(vertex, sourceMeta); |
| TaskAttemptTerminationCause errCause = null; |
| switch (sourceMeta.getEventGenerator()) { |
| case INPUT: |
| errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR; |
| break; |
| case PROCESSOR: |
| errCause = TaskAttemptTerminationCause.APPLICATION_ERROR; |
| break; |
| case OUTPUT: |
| errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR; |
| break; |
| case SYSTEM: |
| errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR; |
| break; |
| default: |
| throw new TezUncheckedException("Unknown EventProducerConsumerType: " + |
| sourceMeta.getEventGenerator()); |
| } |
| TaskAttemptFailedEvent taskFailedEvent = |
| (TaskAttemptFailedEvent) tezEvent.getEvent(); |
| vertex.getEventHandler().handle( |
| new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(), |
| TaskAttemptEventType.TA_FAILED, |
| "Error: " + taskFailedEvent.getDiagnostics(), |
| errCause) |
| ); |
| } |
| break; |
| default: |
| throw new TezUncheckedException("Unhandled tez event type: " |
| + tezEvent.getEventType()); |
| } |
| } |
| } |
| |
| private static class InternalErrorTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| LOG.error("Invalid event " + event.getType() + " on Vertex " |
| + vertex.getLogIdentifier()); |
| vertex.eventHandler.handle(new DAGEventDiagnosticsUpdate( |
| vertex.getDAGId(), "Invalid event " + event.getType() |
| + " on Vertex " + vertex.getLogIdentifier())); |
| vertex.setFinishTime(); |
| vertex.finished(VertexState.ERROR); |
| } |
| } |
| |
| private void setupInputInitializerManager() { |
| rootInputInitializerManager = createRootInputInitializerManager( |
| getDAG().getName(), getName(), getVertexId(), |
| eventHandler, getTotalTasks(), |
| appContext.getTaskScheduler().getNumClusterNodes(), |
| getTaskResource(), |
| appContext.getTaskScheduler().getTotalResources()); |
| List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> |
| inputList = Lists.newArrayListWithCapacity(inputsWithInitializers.size()); |
| for (String inputName : inputsWithInitializers) { |
| inputList.add(rootInputDescriptors.get(inputName)); |
| } |
| LOG.info("Vertex will initialize via inputInitializers " |
| + logIdentifier + ". Starting root input initializers: " |
| + inputsWithInitializers.size()); |
| initWaitsForRootInitializers = true; |
| rootInputInitializerManager.runInputInitializers(inputList); |
| // Send pending rootInputInitializerEvents |
| rootInputInitializerManager.handleInitializerEvents(pendingInitializerEvents); |
| pendingInitializerEvents.clear(); |
| } |
| |
| private static class VertexStateChangedCallback |
| implements OnStateChangedCallback<VertexState, VertexImpl> { |
| |
| @Override |
| public void onStateChanged(VertexImpl vertex, VertexState vertexState) { |
| vertex.stateChangeNotifier.stateChanged(vertex.getVertexId(), |
| new VertexStateUpdate(vertex.getName(), convertInternalState( |
| vertexState, vertex.getVertexId()))); |
| } |
| |
| private org.apache.tez.dag.api.event.VertexState convertInternalState(VertexState vertexState, |
| TezVertexID vertexId) { |
| switch (vertexState) { |
| case RUNNING: |
| return org.apache.tez.dag.api.event.VertexState.RUNNING; |
| case SUCCEEDED: |
| return org.apache.tez.dag.api.event.VertexState.SUCCEEDED; |
| case FAILED: |
| return org.apache.tez.dag.api.event.VertexState.FAILED; |
| case KILLED: |
| return org.apache.tez.dag.api.event.VertexState.KILLED; |
| case NEW: |
| case INITIALIZING: |
| case INITED: |
| case ERROR: |
| case TERMINATING: |
| case RECOVERING: |
| default: |
| throw new TezUncheckedException( |
| "Not expecting state updates for state: " + vertexState + ", VertexID: " + vertexId); |
| } |
| } |
| } |
| |
| @Override |
| public void setInputVertices(Map<Vertex, Edge> inVertices) { |
| this.sourceVertices = inVertices; |
| } |
| |
| @Override |
| public void setOutputVertices(Map<Vertex, Edge> outVertices) { |
| this.targetVertices = outVertices; |
| } |
| |
| @Override |
| public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) { |
| this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size()); |
| for (RootInputLeafOutputProto input : inputs) { |
| |
| InputDescriptor id = DagTypeConverters |
| .convertInputDescriptorFromDAGPlan(input.getIODescriptor()); |
| |
| this.rootInputDescriptors |
| .put( |
| input.getName(), |
| new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>( |
| input.getName(), id, |
| input.hasControllerDescriptor() ? DagTypeConverters |
| .convertInputInitializerDescriptorFromDAGPlan(input |
| .getControllerDescriptor()) : null)); |
| this.rootInputSpecs.put(input.getName(), DEFAULT_ROOT_INPUT_SPECS); |
| } |
| } |
| |
| @Nullable |
| @Override |
| public Map<String, OutputCommitter> getOutputCommitters() { |
| return outputCommitters; |
| } |
| |
| @Nullable |
| @Private |
| @VisibleForTesting |
| public OutputCommitter getOutputCommitter(String outputName) { |
| if (this.outputCommitters != null) { |
| return outputCommitters.get(outputName); |
| } |
| return null; |
| } |
| |
| @Override |
| public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) { |
| LOG.info("setting additional outputs for vertex " + this.vertexName); |
| this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size()); |
| this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size()); |
| for (RootInputLeafOutputProto output : outputs) { |
| OutputDescriptor od = DagTypeConverters |
| .convertOutputDescriptorFromDAGPlan(output.getIODescriptor()); |
| |
| this.additionalOutputs |
| .put( |
| output.getName(), |
| new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>( |
| output.getName(), od, |
| output.hasControllerDescriptor() ? DagTypeConverters |
| .convertOutputCommitterDescriptorFromDAGPlan(output |
| .getControllerDescriptor()) : null)); |
| OutputSpec outputSpec = new OutputSpec(output.getName(), od, 0); |
| additionalOutputSpecs.add(outputSpec); |
| } |
| } |
| |
| @Nullable |
| @Override |
| public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> |
| getAdditionalInputs() { |
| return this.rootInputDescriptors; |
| } |
| |
| @Nullable |
| @Override |
| public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> |
| getAdditionalOutputs() { |
| return this.additionalOutputs; |
| } |
| |
| @Override |
| public int compareTo(Vertex other) { |
| return this.vertexId.compareTo(other.getVertexId()); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) { |
| return true; |
| } |
| if (obj == null) { |
| return false; |
| } |
| if (getClass() != obj.getClass()) { |
| return false; |
| } |
| Vertex other = (Vertex) obj; |
| return this.vertexId.equals(other.getVertexId()); |
| } |
| |
| @Override |
| public int hashCode() { |
| final int prime = 11239; |
| return prime + prime * this.vertexId.hashCode(); |
| } |
| |
| @Override |
| public Map<Vertex, Edge> getInputVertices() { |
| return Collections.unmodifiableMap(this.sourceVertices); |
| } |
| |
| @Override |
| public Map<Vertex, Edge> getOutputVertices() { |
| return Collections.unmodifiableMap(this.targetVertices); |
| } |
| |
| @Override |
| public int getInputVerticesCount() { |
| return this.sourceVertices.size(); |
| } |
| |
| @Override |
| public int getOutputVerticesCount() { |
| return this.targetVertices.size(); |
| } |
| |
| @Override |
| public ProcessorDescriptor getProcessorDescriptor() { |
| return processorDescriptor; |
| } |
| |
| @Override |
| public DAG getDAG() { |
| return appContext.getCurrentDAG(); |
| } |
| |
| private TezDAGID getDAGId() { |
| return getDAG().getID(); |
| } |
| |
| public Resource getTaskResource() { |
| return taskResource; |
| } |
| |
| @VisibleForTesting |
| String getProcessorName() { |
| return this.processorDescriptor.getClassName(); |
| } |
| |
| @VisibleForTesting |
| String getJavaOpts() { |
| return this.javaOpts; |
| } |
| |
| @VisibleForTesting |
| TaskLocationHint[] getTaskLocationHints() { |
| return taskLocationHints; |
| } |
| |
| // TODO Eventually remove synchronization. |
| @Override |
| public synchronized List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException { |
| List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount() |
| + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size())); |
| if (rootInputDescriptors != null) { |
| for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> |
| rootInputDescriptorEntry : rootInputDescriptors.entrySet()) { |
| inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(), |
| rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get( |
| rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex))); |
| } |
| } |
| for (Entry<Vertex, Edge> entry : this.getInputVertices().entrySet()) { |
| InputSpec inputSpec = entry.getValue().getDestinationSpec(taskIndex); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("For vertex : " + this.getLogIdentifier() |
| + ", Using InputSpec : " + inputSpec); |
| } |
| // TODO DAGAM This should be based on the edge type. |
| inputSpecList.add(inputSpec); |
| } |
| return inputSpecList; |
| } |
| |
| // TODO Eventually remove synchronization. |
| @Override |
| public synchronized List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException { |
| List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount() |
| + this.additionalOutputSpecs.size()); |
| outputSpecList.addAll(additionalOutputSpecs); |
| for (Entry<Vertex, Edge> entry : this.getOutputVertices().entrySet()) { |
| OutputSpec outputSpec = entry.getValue().getSourceSpec(taskIndex); |
| outputSpecList.add(outputSpec); |
| } |
| return outputSpecList; |
| } |
| |
| //TODO Eventually remove synchronization. |
| @Override |
| public synchronized List<GroupInputSpec> getGroupInputSpecList(int taskIndex) { |
| return groupInputSpecList; |
| } |
| |
| @Override |
| public synchronized void addSharedOutputs(Set<String> outputs) { |
| this.sharedOutputs.addAll(outputs); |
| } |
| |
| @Override |
| public synchronized Set<String> getSharedOutputs() { |
| return this.sharedOutputs; |
| } |
| |
| @VisibleForTesting |
| VertexManager getVertexManager() { |
| return this.vertexManager; |
| } |
| |
| private static void logLocationHints(String vertexName, |
| VertexLocationHint locationHint) { |
| if (locationHint == null) { |
| LOG.debug("No Vertex LocationHint specified for vertex=" + vertexName); |
| return; |
| } |
| Multiset<String> hosts = HashMultiset.create(); |
| Multiset<String> racks = HashMultiset.create(); |
| int counter = 0; |
| for (TaskLocationHint taskLocationHint : locationHint |
| .getTaskLocationHints()) { |
| StringBuilder sb = new StringBuilder(); |
| if (taskLocationHint.getHosts() == null) { |
| sb.append("No Hosts"); |
| } else { |
| sb.append("Hosts: "); |
| for (String host : taskLocationHint.getHosts()) { |
| hosts.add(host); |
| sb.append(host).append(", "); |
| } |
| } |
| if (taskLocationHint.getRacks() == null) { |
| sb.append("No Racks"); |
| } else { |
| sb.append("Racks: "); |
| for (String rack : taskLocationHint.getRacks()) { |
| racks.add(rack); |
| sb.append(rack).append(", "); |
| } |
| } |
| LOG.debug("Vertex: " + vertexName + ", Location: " |
| + counter + " : " + sb.toString()); |
| counter++; |
| } |
| |
| LOG.debug("Vertex: " + vertexName + ", Host Counts"); |
| for (Multiset.Entry<String> host : hosts.entrySet()) { |
| LOG.debug("Vertex: " + vertexName + ", host: " + host.toString()); |
| } |
| |
| LOG.debug("Vertex: " + vertexName + ", Rack Counts"); |
| for (Multiset.Entry<String> rack : racks.entrySet()) { |
| LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString()); |
| } |
| } |
| } |