| /* Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.dag.impl; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.BitSet; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import javax.annotation.Nullable; |
| |
| import com.google.protobuf.CodedInputStream; |
| import com.google.protobuf.CodedOutputStream; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.lang.exception.ExceptionUtils; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.service.ServiceOperations; |
| import org.apache.hadoop.service.ServiceStateException; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.state.InvalidStateTransitonException; |
| import org.apache.hadoop.yarn.state.MultipleArcTransition; |
| import org.apache.hadoop.yarn.state.SingleArcTransition; |
| import org.apache.hadoop.yarn.state.StateMachine; |
| import org.apache.hadoop.yarn.state.StateMachineFactory; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.tez.client.TezClientUtils; |
| import org.apache.tez.common.ATSConstants; |
| import org.apache.tez.common.GuavaShim; |
| import org.apache.tez.common.ProgressHelper; |
| import org.apache.tez.common.ReflectionUtils; |
| import org.apache.tez.common.TezUtilsInternal; |
| import org.apache.tez.common.counters.AggregateTezCounters; |
| import org.apache.tez.common.counters.LimitExceededException; |
| import org.apache.tez.common.counters.TezCounters; |
| import org.apache.tez.common.io.NonSyncByteArrayOutputStream; |
| import org.apache.tez.dag.api.DagTypeConverters; |
| import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; |
| import org.apache.tez.dag.api.EdgeProperty; |
| import org.apache.tez.dag.api.EdgeProperty.DataMovementType; |
| import org.apache.tez.dag.api.InputDescriptor; |
| import org.apache.tez.dag.api.InputInitializerDescriptor; |
| import org.apache.tez.dag.api.OutputCommitterDescriptor; |
| import org.apache.tez.dag.api.OutputDescriptor; |
| import org.apache.tez.dag.api.ProcessorDescriptor; |
| import org.apache.tez.dag.api.RootInputLeafOutput; |
| import org.apache.tez.dag.api.Scope; |
| import org.apache.tez.dag.api.TaskLocationHint; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezConstants; |
| import org.apache.tez.dag.api.TezException; |
| import org.apache.tez.dag.api.TezUncheckedException; |
| import org.apache.tez.dag.api.UserPayload; |
| import org.apache.tez.dag.api.VertexLocationHint; |
| import org.apache.tez.dag.api.VertexManagerPlugin; |
| import org.apache.tez.dag.api.VertexManagerPluginContext; |
| import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; |
| import org.apache.tez.dag.api.VertexManagerPluginDescriptor; |
| import org.apache.tez.dag.api.client.ProgressBuilder; |
| import org.apache.tez.dag.api.client.StatusGetOpts; |
| import org.apache.tez.dag.api.client.VertexStatus; |
| import org.apache.tez.dag.api.client.VertexStatusBuilder; |
| import org.apache.tez.dag.api.event.VertexStateUpdate; |
| import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated; |
| import org.apache.tez.dag.api.oldrecords.TaskState; |
| import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; |
| import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; |
| import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.ContainerContext; |
| import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData; |
| import org.apache.tez.dag.app.TaskAttemptEventInfo; |
| import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; |
| import org.apache.tez.dag.app.TaskHeartbeatHandler; |
| import org.apache.tez.dag.app.dag.DAG; |
| import org.apache.tez.dag.app.dag.RootInputInitializerManager; |
| import org.apache.tez.dag.app.dag.StateChangeNotifier; |
| import org.apache.tez.dag.app.dag.Task; |
| import org.apache.tez.dag.app.dag.TaskAttempt; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; |
| import org.apache.tez.dag.app.dag.TaskTerminationCause; |
| import org.apache.tez.dag.app.dag.Vertex; |
| import org.apache.tez.dag.app.dag.VertexState; |
| import org.apache.tez.dag.app.dag.VertexTerminationCause; |
| import org.apache.tez.dag.app.dag.event.CallableEvent; |
| import org.apache.tez.dag.app.dag.event.DAGEvent; |
| import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate; |
| import org.apache.tez.dag.app.dag.event.DAGEventType; |
| import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted; |
| import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning; |
| import org.apache.tez.dag.app.dag.event.SpeculatorEvent; |
| import org.apache.tez.dag.app.dag.event.TaskEvent; |
| import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; |
| import org.apache.tez.dag.app.dag.event.TaskEventTermination; |
| import org.apache.tez.dag.app.dag.event.TaskEventType; |
| import org.apache.tez.dag.app.dag.event.VertexShuffleDataDeletion; |
| import org.apache.tez.dag.app.dag.event.VertexEvent; |
| import org.apache.tez.dag.app.dag.event.VertexEventCommitCompleted; |
| import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation; |
| import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError; |
| import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized; |
| import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex; |
| import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed; |
| import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized; |
| import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; |
| import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted; |
| import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted; |
| import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted; |
| import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; |
| import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; |
| import org.apache.tez.dag.app.dag.event.VertexEventTermination; |
| import org.apache.tez.dag.app.dag.event.VertexEventType; |
| import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; |
| import org.apache.tez.dag.app.dag.impl.Edge.PendingEventRouteMetadata; |
| import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator; |
| import org.apache.tez.dag.history.DAGHistoryEvent; |
| import org.apache.tez.dag.history.events.VertexCommitStartedEvent; |
| import org.apache.tez.dag.history.events.VertexFinishedEvent; |
| import org.apache.tez.dag.history.events.VertexInitializedEvent; |
| import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; |
| import org.apache.tez.dag.history.events.VertexStartedEvent; |
| import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; |
| import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; |
| import org.apache.tez.dag.records.TaskAttemptIdentifierImpl; |
| import org.apache.tez.dag.records.TaskAttemptTerminationCause; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezTaskID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.api.InputSpecUpdate; |
| import org.apache.tez.runtime.api.InputStatistics; |
| import org.apache.tez.runtime.api.OutputCommitter; |
| import org.apache.tez.runtime.api.OutputCommitterContext; |
| import org.apache.tez.runtime.api.OutputStatistics; |
| import org.apache.tez.runtime.api.TaskAttemptIdentifier; |
| import org.apache.tez.runtime.api.VertexStatistics; |
| import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; |
| import org.apache.tez.runtime.api.events.CustomProcessorEvent; |
| import org.apache.tez.runtime.api.events.DataMovementEvent; |
| import org.apache.tez.runtime.api.events.InputDataInformationEvent; |
| import org.apache.tez.runtime.api.events.InputFailedEvent; |
| import org.apache.tez.runtime.api.events.InputInitializerEvent; |
| import org.apache.tez.runtime.api.events.VertexManagerEvent; |
| import org.apache.tez.runtime.api.impl.EventMetaData; |
| import org.apache.tez.runtime.api.impl.EventType; |
| import org.apache.tez.runtime.api.impl.GroupInputSpec; |
| import org.apache.tez.runtime.api.impl.InputSpec; |
| import org.apache.tez.runtime.api.impl.OutputSpec; |
| import org.apache.tez.runtime.api.impl.TaskSpec; |
| import org.apache.tez.runtime.api.impl.TaskStatistics; |
| import org.apache.tez.runtime.api.impl.TezEvent; |
| import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; |
| import org.apache.tez.state.OnStateChangedCallback; |
| import org.apache.tez.state.StateMachineTez; |
| import org.apache.tez.util.StringInterner; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.tez.common.Preconditions; |
| import com.google.common.base.Strings; |
| import com.google.common.collect.HashMultiset; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Multiset; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| |
| /** Implementation of Vertex interface. Maintains the state machines of Vertex. |
| * The read and write calls use ReadWriteLock for concurrency. |
| */ |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandler<VertexEvent> { |
| |
| private static final String LINE_SEPARATOR = System |
| .getProperty("line.separator"); |
| |
  private static final Logger LOG = LoggerFactory.getLogger(VertexImpl.class);

  //final fields
  private final Clock clock;

  // Read/write halves of the vertex's ReadWriteLock; readers use readLock,
  // mutating operations take writeLock.
  private final Lock readLock;
  private final Lock writeLock;
  private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
  private final TaskHeartbeatHandler taskHeartbeatHandler;
  // Guards lazy copy-on-write handling of the tasks map (see lazyTasksCopyNeeded).
  private final Object tasksSyncHandle = new Object();

  private final EventHandler eventHandler;
  // TODO Metrics
  //private final MRAppMetrics metrics;
  private final AppContext appContext;
  private final DAG dag;
  // Recovery-time snapshot for this vertex; null-ness semantics depend on the
  // recovery path (set by RecoveryParser).
  private final VertexRecoveryData recoveryData;
  private boolean isVertexInitSkipped = false;
  // Events generated during init, kept so they can be replayed on recovery.
  private List<TezEvent> initGeneratedEvents = new ArrayList<TezEvent>();
  // set it to be true when setParallelism is called(used for recovery)
  private boolean setParallelismCalledFlag = false;

  // When true, the tasks map must be copied before the next mutation
  // (copy-on-write; coordinated via tasksSyncHandle).
  private boolean lazyTasksCopyNeeded = false;
  // must be a linked map for ordering
  volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>();
  // Guards fullCounters/cachedCounters aggregation.
  // NOTE(review): never reassigned here — could be declared final; confirm
  // against the rest of the file.
  private Object fullCountersLock = new Object();
  private TezCounters counters = new TezCounters();
  private TezCounters fullCounters = null;
  // Cached aggregate counters plus the time they were computed, to throttle
  // expensive recomputation.
  private TezCounters cachedCounters = null;
  private long cachedCountersTimestamp = 0;
  private Resource taskResource;

  // Merged/combined vertex level config
  private Configuration vertexConf;
  // Vertex specific configs only ( include the dag specific configs too )
  // Useful when trying to serialize only the diff from global configs
  @VisibleForTesting
  Configuration vertexOnlyConf;

  private final boolean isSpeculationEnabled;

  // Identifiers selecting which task-scheduler / container-launcher /
  // task-communicator plugin instance this vertex uses.
  @VisibleForTesting
  final int taskSchedulerIdentifier;
  @VisibleForTesting
  final int containerLauncherIdentifier;
  @VisibleForTesting
  final int taskCommunicatorIdentifier;

  final ServicePluginInfo servicePluginInfo;

  /*
   * For every upstream host (as map keys) contains every unique downstream hostnames that reported INPUT_READ_ERROR.
   * This map helps to decide if there is a problem with the host that produced the map outputs. There is an assumption
   * that if multiple downstream hosts report input errors for the same upstream host, then it's likely that the output
   * has to be blamed and needs to rerun.
   */
  private final Map<String, Set<String>> downstreamBlamingHosts = Maps.newHashMap();

  // Fraction of task failures tolerated before the vertex itself fails.
  private final float maxFailuresPercent;
  private boolean logSuccessDiagnostics = false;

  private final VertexConfigImpl vertexContextConfig;
  //fields initialized in init

  // Counts of upstream vertices observed in each lifecycle phase; used to
  // decide when this vertex may progress.
  @VisibleForTesting
  int numStartedSourceVertices = 0;
  @VisibleForTesting
  int numInitedSourceVertices = 0;
  @VisibleForTesting
  int numRecoveredSourceVertices = 0;

  private int distanceFromRoot = 0;

  private final List<String> diagnostics = new ArrayList<String>();

  protected final StateChangeNotifier stateChangeNotifier;

  //task/attempt related datastructures
  @VisibleForTesting
  int numSuccessSourceAttemptCompletions = 0;

  List<GroupInputSpec> groupInputSpecList;
  Set<String> sharedOutputs = Sets.newHashSet();

  // Shared, stateless transition singletons reused across all VertexImpl
  // instances in the state machine table below.
  private static final InternalErrorTransition
      INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
  private static final RouteEventTransition
      ROUTE_EVENT_TRANSITION = new RouteEventTransition();
  private static final TaskAttemptCompletedEventTransition
      TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
          new TaskAttemptCompletedEventTransition();
  private static final SourceTaskAttemptCompletedEventTransition
      SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
          new SourceTaskAttemptCompletedEventTransition();
  private static final CommitCompletedTransition
      COMMIT_COMPLETED_TRANSITION =
          new CommitCompletedTransition();
  private static final VertexStateChangedCallback STATE_CHANGED_CALLBACK =
      new VertexStateChangedCallback();

  // Initializer events buffered until the corresponding input initializer is
  // ready to consume them.
  @VisibleForTesting
  final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();

  // VertexManager events buffered until the vertex manager is initialized.
  @VisibleForTesting
  final List<VertexManagerEvent> pendingVmEvents = new LinkedList<>();

  // Speculation support (LegacySpeculator) started lazily as services.
  private final AtomicBoolean servicesInited;
  private LegacySpeculator speculator;
  private List<AbstractService> services;

  // In-flight output commit operations keyed by output name; futures are
  // completed/cancelled as commits finish or the vertex terminates.
  @VisibleForTesting
  Map<String, ListenableFuture<Void>> commitFutures = new ConcurrentHashMap<String, ListenableFuture<Void>>();
| |
  /**
   * Vertex lifecycle state machine table, shared by all VertexImpl instances.
   * Transitions are grouped by source state (NEW, INITIALIZING, INITED,
   * RUNNING, COMMITTING, TERMINATING, SUCCEEDED, FAILED, KILLED, ERROR); each
   * entry declares the permitted target state(s), the triggering
   * {@link VertexEventType} and the transition implementation. Terminal and
   * error states largely ignore late-arriving events.
   */
  protected static final
    StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
       stateMachineFactory
       = new StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
              (VertexState.NEW)

          // Transitions from NEW state
          .addTransition
              (VertexState.NEW,
              EnumSet.of(VertexState.NEW, VertexState.INITED,
                  VertexState.INITIALIZING, VertexState.FAILED, VertexState.KILLED),
              VertexEventType.V_INIT,
              new InitTransition())
          .addTransition(VertexState.NEW,
              EnumSet.of(VertexState.NEW),
              VertexEventType.V_NULL_EDGE_INITIALIZED,
              new NullEdgeInitializedTransition())
          .addTransition(VertexState.NEW,
              EnumSet.of(VertexState.NEW),
              VertexEventType.V_ROUTE_EVENT,
              ROUTE_EVENT_TRANSITION)
          .addTransition(VertexState.NEW,
              EnumSet.of(VertexState.NEW),
              VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
              SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
          .addTransition
              (VertexState.NEW,
              EnumSet.of(VertexState.NEW,
                  VertexState.SUCCEEDED, VertexState.FAILED,
                  VertexState.KILLED, VertexState.ERROR),
              VertexEventType.V_RECOVER,
              new RecoverTransition())
          .addTransition(VertexState.NEW, VertexState.NEW,
              VertexEventType.V_SOURCE_VERTEX_STARTED,
              new SourceVertexStartedTransition())
          .addTransition(VertexState.NEW, VertexState.KILLED,
              VertexEventType.V_TERMINATE,
              new TerminateNewVertexTransition())
          .addTransition(VertexState.NEW, VertexState.ERROR,
              VertexEventType.V_INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)

          // Transitions from INITIALIZING state
          .addTransition(VertexState.INITIALIZING,
              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
                  VertexState.FAILED),
              VertexEventType.V_ROOT_INPUT_INITIALIZED,
              new RootInputInitializedTransition())
          .addTransition(VertexState.INITIALIZING,
              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
                  VertexState.FAILED),
              VertexEventType.V_INPUT_DATA_INFORMATION,
              new InputDataInformationTransition())
          .addTransition(VertexState.INITIALIZING,
              EnumSet.of(VertexState.INITED, VertexState.FAILED),
              VertexEventType.V_READY_TO_INIT,
              new VertexInitializedTransition())
          .addTransition(VertexState.INITIALIZING,
              EnumSet.of(VertexState.FAILED),
              VertexEventType.V_ROOT_INPUT_FAILED,
              new RootInputInitFailedTransition())
          // START received before init completes is remembered and replayed.
          .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
              VertexEventType.V_START,
              new StartWhileInitializingTransition())
          .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
              VertexEventType.V_SOURCE_VERTEX_STARTED,
              new SourceVertexStartedTransition())
          .addTransition(VertexState.INITIALIZING,
              EnumSet.of(VertexState.INITIALIZING),
              VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
              SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
          .addTransition(VertexState.INITIALIZING,
              EnumSet.of(VertexState.INITIALIZING, VertexState.FAILED),
              VertexEventType.V_ROUTE_EVENT,
              ROUTE_EVENT_TRANSITION)
          .addTransition(VertexState.INITIALIZING, EnumSet.of(VertexState.FAILED),
              VertexEventType.V_MANAGER_USER_CODE_ERROR,
              new VertexManagerUserCodeErrorTransition())
          .addTransition(VertexState.INITIALIZING, VertexState.KILLED,
              VertexEventType.V_TERMINATE,
              new TerminateInitingVertexTransition())
          .addTransition(VertexState.INITIALIZING, VertexState.ERROR,
              VertexEventType.V_INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          .addTransition(VertexState.INITIALIZING,
              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
                  VertexState.FAILED),
              VertexEventType.V_NULL_EDGE_INITIALIZED,
              new NullEdgeInitializedTransition())

          // Transitions from INITED state
          // SOURCE_VERTEX_STARTED - for sources which determine parallelism,
          // they must complete before this vertex can start.
          .addTransition(VertexState.INITED,
              EnumSet.of(VertexState.FAILED),
              VertexEventType.V_ROOT_INPUT_FAILED,
              new RootInputInitFailedTransition())
          .addTransition(VertexState.INITED, VertexState.INITED,
              VertexEventType.V_SOURCE_VERTEX_STARTED,
              new SourceVertexStartedTransition())
          .addTransition(VertexState.INITED,
              EnumSet.of(VertexState.INITED),
              VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
              SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
          .addTransition(VertexState.INITED,
              EnumSet.of(VertexState.RUNNING, VertexState.INITED, VertexState.TERMINATING),
              VertexEventType.V_START,
              new StartTransition())
          .addTransition(VertexState.INITED,
              EnumSet.of(VertexState.INITED, VertexState.FAILED),
              VertexEventType.V_ROUTE_EVENT,
              ROUTE_EVENT_TRANSITION)
          .addTransition(VertexState.INITED, VertexState.KILLED,
              VertexEventType.V_TERMINATE,
              new TerminateInitedVertexTransition())
          .addTransition(VertexState.INITED, EnumSet.of(VertexState.FAILED),
              VertexEventType.V_MANAGER_USER_CODE_ERROR,
              new VertexManagerUserCodeErrorTransition())
          .addTransition(VertexState.INITED, VertexState.ERROR,
              VertexEventType.V_INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)

          // Transitions from RUNNING state
          .addTransition(VertexState.RUNNING,
              EnumSet.of(VertexState.TERMINATING),
              VertexEventType.V_ROOT_INPUT_FAILED,
              new RootInputInitFailedTransition())
          .addTransition(VertexState.RUNNING, VertexState.RUNNING,
              VertexEventType.V_TASK_ATTEMPT_COMPLETED,
              TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
          .addTransition(VertexState.RUNNING,
              EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING),
              VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
              SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
          // Task completion may finish the vertex (possibly via COMMITTING) or
          // fail it depending on counts and maxFailuresPercent.
          .addTransition
              (VertexState.RUNNING,
              EnumSet.of(VertexState.RUNNING,
                  VertexState.COMMITTING,
                  VertexState.SUCCEEDED, VertexState.TERMINATING, VertexState.FAILED,
                  VertexState.ERROR),
              VertexEventType.V_TASK_COMPLETED,
              new TaskCompletedTransition())
          .addTransition(VertexState.RUNNING, VertexState.TERMINATING,
              VertexEventType.V_TERMINATE,
              new VertexKilledTransition())
          .addTransition(VertexState.RUNNING, EnumSet.of(VertexState.TERMINATING),
              VertexEventType.V_MANAGER_USER_CODE_ERROR,
              new VertexManagerUserCodeErrorTransition())
          .addTransition(VertexState.RUNNING, VertexState.RUNNING,
              VertexEventType.V_TASK_RESCHEDULED,
              new TaskRescheduledTransition())
          .addTransition(VertexState.RUNNING,
              EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED,
                  VertexState.FAILED),
              VertexEventType.V_COMPLETED,
              new VertexNoTasksCompletedTransition())
          .addTransition(
              VertexState.RUNNING,
              VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          .addTransition(
              VertexState.RUNNING,
              EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING),
              VertexEventType.V_ROUTE_EVENT,
              ROUTE_EVENT_TRANSITION)

          // Transitions from COMMITTING state.
          .addTransition(
              VertexState.COMMITTING,
              EnumSet.of(VertexState.COMMITTING, VertexState.TERMINATING,
                  VertexState.SUCCEEDED, VertexState.FAILED),
              VertexEventType.V_COMMIT_COMPLETED,
              COMMIT_COMPLETED_TRANSITION)
          .addTransition(
              VertexState.COMMITTING,
              VertexState.TERMINATING,
              VertexEventType.V_TERMINATE,
              new VertexKilledWhileCommittingTransition())
          .addTransition(
              VertexState.COMMITTING,
              VertexState.ERROR,
              VertexEventType.V_INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          .addTransition(
              VertexState.COMMITTING,
              EnumSet.of(VertexState.COMMITTING, VertexState.TERMINATING),
              VertexEventType.V_ROUTE_EVENT,
              ROUTE_EVENT_TRANSITION)
          // A task rerun while committing invalidates the commit.
          .addTransition(
              VertexState.COMMITTING,
              VertexState.TERMINATING,
              VertexEventType.V_TASK_RESCHEDULED,
              new TaskRescheduledWhileCommittingTransition())
          .addTransition(VertexState.COMMITTING,
              EnumSet.of(VertexState.TERMINATING),
              VertexEventType.V_MANAGER_USER_CODE_ERROR,
              new VertexManagerUserCodeErrorTransition())

          // Transitions from TERMINATING state.
          .addTransition
              (VertexState.TERMINATING,
              EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED, VertexState.ERROR),
              VertexEventType.V_TASK_COMPLETED,
              new TaskCompletedTransition())
          .addTransition
              (VertexState.TERMINATING,
              EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED, VertexState.ERROR),
              VertexEventType.V_COMPLETED,
              new VertexNoTasksCompletedTransition())
          .addTransition(
              VertexState.TERMINATING,
              VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          .addTransition(
              VertexState.TERMINATING,
              EnumSet.of(VertexState.TERMINATING, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR),
              VertexEventType.V_COMMIT_COMPLETED,
              COMMIT_COMPLETED_TRANSITION)
          // Ignore-able events
          .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
              EnumSet.of(VertexEventType.V_TERMINATE,
                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
                  VertexEventType.V_ROOT_INPUT_FAILED,
                  VertexEventType.V_SOURCE_VERTEX_STARTED,
                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
                  VertexEventType.V_NULL_EDGE_INITIALIZED,
                  VertexEventType.V_ROUTE_EVENT,
                  VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                  VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                  VertexEventType.V_TASK_RESCHEDULED,
                  VertexEventType.V_DELETE_SHUFFLE_DATA))

          // Transitions from SUCCEEDED state
          .addTransition(
              VertexState.SUCCEEDED,
              VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // A rescheduled task after success can revert the vertex to RUNNING.
          .addTransition(VertexState.SUCCEEDED,
              EnumSet.of(VertexState.RUNNING, VertexState.FAILED),
              VertexEventType.V_TASK_RESCHEDULED,
              new TaskRescheduledAfterVertexSuccessTransition())

          .addTransition(
              VertexState.SUCCEEDED,
              EnumSet.of(VertexState.SUCCEEDED, VertexState.FAILED),
              // accumulate these in case we get restarted
              VertexEventType.V_ROUTE_EVENT,
              ROUTE_EVENT_TRANSITION)
          .addTransition(
              VertexState.SUCCEEDED,
              EnumSet.of(VertexState.FAILED, VertexState.ERROR),
              VertexEventType.V_TASK_COMPLETED,
              new TaskCompletedAfterVertexSuccessTransition())
          // Ignore-able events
          .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
              EnumSet.of(VertexEventType.V_TERMINATE,
                  VertexEventType.V_ROOT_INPUT_FAILED,
                  VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                  // after we are done reruns of source tasks should not affect
                  // us. These reruns may be triggered by other consumer vertices.
                  // We should have been in RUNNING state if we had triggered the
                  // reruns.
                  VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED))
          // NOTE(review): V_TASK_ATTEMPT_COMPLETED also appears in the
          // ignore-able EnumSet just above; this later, explicit registration
          // appears to be the effective one for (SUCCEEDED,
          // V_TASK_ATTEMPT_COMPLETED) — confirm against StateMachineFactory's
          // override semantics before relying on either.
          .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
              VertexEventType.V_TASK_ATTEMPT_COMPLETED,
              new TaskAttemptCompletedEventTransition())
          .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
              VertexEventType.V_DELETE_SHUFFLE_DATA,
              new VertexShuffleDeleteTransition())


          // Transitions from FAILED state
          .addTransition(
              VertexState.FAILED,
              VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(VertexState.FAILED, VertexState.FAILED,
              EnumSet.of(VertexEventType.V_TERMINATE,
                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
                  VertexEventType.V_ROOT_INPUT_FAILED,
                  VertexEventType.V_SOURCE_VERTEX_STARTED,
                  VertexEventType.V_TASK_RESCHEDULED,
                  VertexEventType.V_START,
                  VertexEventType.V_ROUTE_EVENT,
                  VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                  VertexEventType.V_TASK_COMPLETED,
                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
                  VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                  VertexEventType.V_NULL_EDGE_INITIALIZED,
                  VertexEventType.V_INPUT_DATA_INFORMATION,
                  VertexEventType.V_DELETE_SHUFFLE_DATA))

          // Transitions from KILLED state
          .addTransition(
              VertexState.KILLED,
              VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(VertexState.KILLED, VertexState.KILLED,
              EnumSet.of(VertexEventType.V_TERMINATE,
                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
                  VertexEventType.V_ROOT_INPUT_FAILED,
                  VertexEventType.V_INIT,
                  VertexEventType.V_SOURCE_VERTEX_STARTED,
                  VertexEventType.V_START,
                  VertexEventType.V_ROUTE_EVENT,
                  VertexEventType.V_TASK_RESCHEDULED,
                  VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                  VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                  VertexEventType.V_TASK_COMPLETED,
                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
                  VertexEventType.V_NULL_EDGE_INITIALIZED,
                  VertexEventType.V_INPUT_DATA_INFORMATION,
                  VertexEventType.V_DELETE_SHUFFLE_DATA))

          // No transitions from INTERNAL_ERROR state. Ignore all.
          .addTransition(
              VertexState.ERROR,
              VertexState.ERROR,
              EnumSet.of(VertexEventType.V_INIT,
                  VertexEventType.V_ROOT_INPUT_FAILED,
                  VertexEventType.V_SOURCE_VERTEX_STARTED,
                  VertexEventType.V_START,
                  VertexEventType.V_ROUTE_EVENT,
                  VertexEventType.V_TERMINATE,
                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
                  VertexEventType.V_TASK_COMPLETED,
                  VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                  VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                  VertexEventType.V_TASK_RESCHEDULED,
                  VertexEventType.V_INTERNAL_ERROR,
                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
                  VertexEventType.V_NULL_EDGE_INITIALIZED,
                  VertexEventType.V_INPUT_DATA_INFORMATION,
                  VertexEventType.V_DELETE_SHUFFLE_DATA))
          // create the topology tables
          .installTopology();
| |
| private void augmentStateMachine() { |
| stateMachine |
| .registerStateEnteredCallback(VertexState.SUCCEEDED, |
| STATE_CHANGED_CALLBACK) |
| .registerStateEnteredCallback(VertexState.FAILED, |
| STATE_CHANGED_CALLBACK) |
| .registerStateEnteredCallback(VertexState.KILLED, |
| STATE_CHANGED_CALLBACK) |
| .registerStateEnteredCallback(VertexState.RUNNING, |
| STATE_CHANGED_CALLBACK) |
| .registerStateEnteredCallback(VertexState.INITIALIZING, |
| STATE_CHANGED_CALLBACK);; |
| } |
| |
  private final StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl> stateMachine;

  //changing fields while the vertex is running
  // Current parallelism; set from the plan in the constructor and may be
  // changed later (e.g. on a setParallelism call).
  @VisibleForTesting
  int numTasks;
  @VisibleForTesting
  int completedTaskCount = 0;
  @VisibleForTesting
  int succeededTaskCount = 0;
  @VisibleForTesting
  int failedTaskCount = 0;
  @VisibleForTesting
  int killedTaskCount = 0;

  // Both failed and killed task attempt counts are incremented via direct calls
  // and not via state machine changes as they always increase. In no situation, does
  // the counter need to be reset or changed back as failed attempts never go back to succeeded.
  // Likewise for killed attempts.
  // The same cannot apply to succeeded task attempts if they are tracked as they might be
  // subsequently declared as failed.
  @VisibleForTesting
  AtomicInteger failedTaskAttemptCount = new AtomicInteger(0);
  @VisibleForTesting
  AtomicInteger killedTaskAttemptCount = new AtomicInteger(0);
  AtomicInteger rejectedTaskAttemptCount = new AtomicInteger(0);

  @VisibleForTesting
  long initTimeRequested; // Time at which INIT request was received.
  @VisibleForTesting
  long initedTime; // Time when entering state INITED
  @VisibleForTesting
  long startTimeRequested; // Time at which START request was received.
  @VisibleForTesting
  long startedTime; // Time when entering state STARTED
  @VisibleForTesting
  long finishTime;
  // Earliest task start time reported so far; -1 means no task has started.
  // Guarded by firstTaskStartTimeLock.
  long firstTaskStartTime = -1L;
  Object firstTaskStartTimeLock = new Object();
  // Cached progress value, updated by computeProgress()/getProgress().
  private float progress;

  private final TezVertexID vertexId; //runtime assigned id.
  private final VertexPlan vertexPlan;
  private boolean initWaitsForRootInitializers = false;

  private final String vertexName;
  private final ProcessorDescriptor processorDescriptor;

  private boolean vertexToBeReconfiguredByManager = false;
  final AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
  final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);

  private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false);

  // Edges to/from this vertex, keyed by the neighboring vertex.
  @VisibleForTesting
  Map<Vertex, Edge> sourceVertices;
  private Map<Vertex, Edge> targetVertices;
  private boolean cleanupShuffleDataAtVertexLevel;
  @VisibleForTesting
  VertexShuffleDataDeletionContext vShuffleDeletionContext;
  Set<Edge> uninitializedEdges = Sets.newHashSet();
  // using a linked hash map to conveniently map edge names to a contiguous index
  LinkedHashMap<String, Integer> ioIndices = Maps.newLinkedHashMap();

  private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
    rootInputDescriptors;
  private Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>
    additionalOutputs;
  private Map<String, OutputCommitter> outputCommitters;
  private Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
  private static final InputSpecUpdate DEFAULT_ROOT_INPUT_SPECS = InputSpecUpdate
      .getDefaultSinglePhysicalInputSpecUpdate();
  private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>();
  private Set<String> inputsWithInitializers;
  private int numInitializedInputs;
  @VisibleForTesting
  int numInitializerCompletionsHandled = 0;
  private boolean startSignalPending = false;
  // We may always store task events in the vertex for scalability
  List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
  private boolean tasksNotYetScheduled = true;
  // must be a random access structure

  // Buffer of events kept for on-demand routing; guarded by the
  // onDemandRouteEvents read/write locks below.
  private final List<EventInfo> onDemandRouteEvents = Lists.newArrayListWithCapacity(1000);
  // Do not send any events if attempt is failed due to INPUT_FAILED_EVENTS.
  private final Set<TezTaskAttemptID> failedTaskAttemptIDs = Sets.newHashSet();
  private final ReadWriteLock onDemandRouteEventsReadWriteLock = new ReentrantReadWriteLock();
  private final Lock onDemandRouteEventsReadLock = onDemandRouteEventsReadWriteLock.readLock();
  private final Lock onDemandRouteEventsWriteLock = onDemandRouteEventsReadWriteLock.writeLock();

  List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
  List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();


  private RootInputInitializerManager rootInputInitializerManager;

  VertexManager vertexManager;

  private final UserGroupInformation dagUgi;

  private AtomicBoolean committed = new AtomicBoolean(false);
  private AtomicBoolean aborted = new AtomicBoolean(false);
  private AtomicBoolean commitCanceled = new AtomicBoolean(false);
  private boolean commitVertexOutputs = false;

  private Map<String, VertexGroupInfo> dagVertexGroups;

  private TaskLocationHint taskLocationHints[];
  private Map<String, LocalResource> localResources;
  private final Map<String, String> environment;
  // Task-specific overrides; null when no task-specific launch options apply.
  private final Map<String, String> environmentTaskSpecific;
  private final String javaOptsTaskSpecific;
  private final String javaOpts;
  private final ContainerContext containerContext;
  private VertexTerminationCause terminationCause;

  private String logIdentifier;

  private VertexStats vertexStats = null;

  private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;

  // Statistics aggregated over tasks that have completed so far; rebuilt by
  // resetCompletedTaskStatsCache().
  @VisibleForTesting
  VertexStatisticsImpl completedTasksStatsCache;
| static class EventInfo { |
| final TezEvent tezEvent; |
| final Edge eventEdge; |
| final int eventTaskIndex; |
| boolean isObsolete = false; |
| EventInfo(TezEvent tezEvent, Edge eventEdge, int eventTaskIndex) { |
| this.tezEvent = tezEvent; |
| this.eventEdge = eventEdge; |
| this.eventTaskIndex = eventTaskIndex; |
| } |
| } |
| |
  // Final aggregated statistics; presumably populated when the vertex
  // completes (populated outside this chunk) - confirm against the caller.
  private VertexStatisticsImpl finalStatistics;
| |
| |
  /**
   * Adapter exposing the runtime {@code IOStatistics} through the public
   * {@link InputStatistics}/{@link OutputStatistics} API interfaces. The
   * overrides simply delegate to the superclass - presumably needed to widen
   * accessor visibility for the interfaces; confirm against the superclass.
   */
  static class IOStatisticsImpl extends org.apache.tez.runtime.api.impl.IOStatistics
      implements InputStatistics, OutputStatistics {

    @Override
    public long getDataSize() {
      return super.getDataSize();
    }

    @Override
    public long getItemsProcessed() {
      return super.getItemsProcessed();
    }
  }
| |
| class VertexStatisticsImpl implements VertexStatistics { |
| final Map<String, IOStatisticsImpl> ioStats; |
| final BitSet taskSet; |
| |
| public VertexStatisticsImpl() { |
| ioStats = Maps.newHashMapWithExpectedSize(ioIndices.size()); |
| taskSet = new BitSet(); |
| for (String name : getIOIndices().keySet()) { |
| ioStats.put(name, new IOStatisticsImpl()); |
| } |
| } |
| |
| public IOStatisticsImpl getIOStatistics(String ioName) { |
| return ioStats.get(ioName); |
| } |
| |
| void mergeFrom(TaskStatistics taskStats) { |
| if (taskStats == null) { |
| return; |
| } |
| |
| for (Map.Entry<String, org.apache.tez.runtime.api.impl.IOStatistics> entry : taskStats |
| .getIOStatistics().entrySet()) { |
| String ioName = entry.getKey(); |
| IOStatisticsImpl myIOStat = ioStats.get(ioName); |
| Preconditions.checkState(myIOStat != null, "Unexpected IO name: " + ioName |
| + " for vertex:" + getLogIdentifier()); |
| myIOStat.mergeFrom(entry.getValue()); |
| } |
| } |
| |
| @Override |
| public InputStatistics getInputStatistics(String inputName) { |
| return getIOStatistics(inputName); |
| } |
| |
| @Override |
| public OutputStatistics getOutputStatistics(String outputName) { |
| return getIOStatistics(outputName); |
| } |
| |
| void addTask(TezTaskID taskID) { |
| taskSet.set(taskID.getId()); |
| } |
| |
| boolean containsTask(TezTaskID taskID) { |
| return taskSet.get(taskID.getId()); |
| } |
| } |
| |
| void resetCompletedTaskStatsCache(boolean recompute) { |
| completedTasksStatsCache = new VertexStatisticsImpl(); |
| if (recompute) { |
| for (Task t : getTasks().values()) { |
| if (t.getState() == TaskState.SUCCEEDED) { |
| completedTasksStatsCache.mergeFrom(((TaskImpl) t).getStatistics()); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void initServices() { |
| if (servicesInited.get()) { |
| LOG.debug("Skipping Initing services for vertex because already" |
| + " Initialized, name={}", this.vertexName); |
| return; |
| } |
| writeLock.lock(); |
| try { |
| List<AbstractService> servicesToAdd = new ArrayList<>(); |
| if (isSpeculationEnabled()) { |
| // Initialize the speculator |
| LOG.debug("Initing service vertex speculator, name={}", this.vertexName); |
| speculator = new LegacySpeculator(vertexConf, getAppContext(), this); |
| speculator.init(vertexConf); |
| servicesToAdd.add(speculator); |
| } |
| services = Collections.synchronizedList(servicesToAdd); |
| servicesInited.set(true); |
| } finally { |
| writeLock.unlock(); |
| } |
| LOG.debug("Initing service vertex, name={}", this.vertexName); |
| } |
| |
| @Override |
| public void startServices() { |
| writeLock.lock(); |
| try { |
| if (!servicesInited.get()) { |
| initServices(); |
| } |
| for (AbstractService srvc : services) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("starting service : " + srvc.getName() |
| + ", for vertex: " + getName()); |
| } |
| srvc.start(); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
  /**
   * Stops all vertex services: each service is stopped quietly under the
   * write lock, then (outside the lock) each successfully-signalled service
   * is waited on, and finally the first stop failure, if any, is rethrown.
   * NOTE(review): only the FIRST failing service is excluded from the wait
   * list - a service that fails after firstException is set still lands in
   * stoppedServices via the else branch; confirm this is intended.
   */
  @Override
  public void stopServices() {
    Exception firstException = null;
    List<AbstractService> stoppedServices = new ArrayList<>();
    writeLock.lock();
    try {
      if (servicesInited.get()) {
        for (AbstractService srvc : services) {
          LOG.debug("Stopping service : {}", srvc);
          Exception ex = ServiceOperations.stopQuietly(srvc);
          if (ex != null && firstException == null) {
            LOG.warn(String.format(
                "Failed to stop service=(%s) for vertex name=(%s)",
                srvc.getName(), getName()), ex);
            firstException = ex;
          } else {
            stoppedServices.add(srvc);
          }
        }
        services.clear();
      }
      servicesInited.set(false);
    } finally {
      writeLock.unlock();
    }
    // wait for services to stop (outside the lock, with a 60s cap each)
    for (AbstractService srvc : stoppedServices) {
      srvc.waitForServiceToStop(60000L);
    }
    // After stopping all services, rethrow the first exception raised
    if (firstException != null) {
      throw ServiceStateException.convert(firstException);
    }
  }
| |
  /**
   * Builds a vertex from its DAG plan: overlays vertex-scoped configuration
   * on the DAG configuration, resolves task resources/environment/java opts
   * (including optional task-specific launch options), selects the service
   * plugins (task scheduler, container launcher, task communicator) and
   * finally constructs the vertex state machine.
   */
  public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
      String vertexName, Configuration dagConf, EventHandler eventHandler,
      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock,
      TaskHeartbeatHandler thh, boolean commitVertexOutputs,
      AppContext appContext, VertexLocationHint vertexLocationHint,
      Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
      StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) {
    this.vertexId = vertexId;
    this.vertexPlan = vertexPlan;
    this.vertexName = StringInterner.intern(vertexName);
    this.vertexConf = new Configuration(dagConf);
    this.vertexOnlyConf = new Configuration(dagOnlyConf);
    // Overlay vertex-scoped configuration from the plan on both config views.
    if (vertexPlan.hasVertexConf()) {
      ConfigurationProto confProto = vertexPlan.getVertexConf();
      for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
        TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX);
        vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
        vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue());
      }
    }
    this.vertexContextConfig = new VertexConfigImpl(vertexConf);

    this.clock = clock;
    this.appContext = appContext;
    this.commitVertexOutputs = commitVertexOutputs;
    this.logIdentifier = this.getVertexId() + " [" + this.getName() + "]";

    this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
    this.taskHeartbeatHandler = thh;
    this.eventHandler = eventHandler;
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    this.readLock = readWriteLock.readLock();
    this.writeLock = readWriteLock.writeLock();

    if (LOG.isDebugEnabled()) {
      logLocationHints(this.vertexName, vertexLocationHint);
    }
    setTaskLocationHints(vertexLocationHint);

    this.dagUgi = appContext.getCurrentDAG().getDagUGI();
    this.dag = appContext.getCurrentDAG();

    // Task resources, processor descriptor, local resources and environment
    // all come from the plan's task config; DAG-level local resources are
    // merged in on top.
    this.taskResource = DagTypeConverters
        .createResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
    this.processorDescriptor = DagTypeConverters
        .convertProcessorDescriptorFromDAGPlan(vertexPlan
            .getProcessorDescriptor());
    this.localResources = DagTypeConverters
        .createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig()
            .getLocalResourceList());
    this.localResources.putAll(dag.getLocalResources());
    this.environment = DagTypeConverters
        .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig()
            .getEnvironmentSettingList());
    this.taskSpecificLaunchCmdOpts = taskSpecificLaunchCmdOption;
    this.recoveryData = appContext.getDAGRecoveryData() == null ?
        null : appContext.getDAGRecoveryData().getVertexRecoveryData(vertexId);
    // Set up log properties, including task specific log properties.
    String javaOptsWithoutLoggerMods =
        vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
    String logString = vertexConf.get(TezConfiguration.TEZ_TASK_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT);
    String [] taskLogParams = TezClientUtils.parseLogParams(logString);
    this.javaOpts = TezClientUtils.maybeAddDefaultLoggingJavaOpts(taskLogParams[0], javaOptsWithoutLoggerMods);

    // Tasks singled out for task-specific launch options get their own
    // java opts and a private copy of the environment.
    if (taskSpecificLaunchCmdOpts.hasModifiedLogProperties()) {
      String [] taskLogParamsTaskSpecific = taskSpecificLaunchCmdOption.getTaskSpecificLogParams();
      this.javaOptsTaskSpecific = TezClientUtils
          .maybeAddDefaultLoggingJavaOpts(taskLogParamsTaskSpecific[0], javaOptsWithoutLoggerMods);

      environmentTaskSpecific = new HashMap<String, String>(this.environment.size());
      environmentTaskSpecific.putAll(environment);
      if (taskLogParamsTaskSpecific.length == 2 && !Strings.isNullOrEmpty(taskLogParamsTaskSpecific[1])) {
        TezClientUtils.addLogParamsToEnv(environmentTaskSpecific, taskLogParamsTaskSpecific);
      }
    } else {
      this.javaOptsTaskSpecific = null;
      this.environmentTaskSpecific = null;
    }

    // env for tasks which don't have task-specific configuration. Has to be set up later to
    // optionally allow copying this for specific tasks
    TezClientUtils.addLogParamsToEnv(this.environment, taskLogParams);


    this.containerContext = new ContainerContext(this.localResources,
        appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
    LOG.info("Default container context for " + logIdentifier + "=" + containerContext + ", Default Resources=" + this.taskResource);

    if (vertexPlan.getInputsCount() > 0) {
      setAdditionalInputs(vertexPlan.getInputsList());
    }
    if (vertexPlan.getOutputsCount() > 0) {
      setAdditionalOutputs(vertexPlan.getOutputsList());
    }
    this.stateChangeNotifier = entityStatusTracker;

    // Setup the initial parallelism early. This may be changed after
    // initialization or on a setParallelism call.
    this.numTasks = vertexPlan.getTaskConfig().getNumTasks();
    // Not sending the notifier a parallelism update since this is the initial parallelism

    this.dagVertexGroups = dagVertexGroups;

    isSpeculationEnabled =
        vertexConf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
            TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
    servicesInited = new AtomicBoolean(false);
    initServices();

    maxFailuresPercent = vertexConf.getFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT,
        TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT_DEFAULT);

    // This "this leak" is okay because the retained pointer is in an
    // instance variable.

    boolean isLocal = vertexConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);

    // Resolve which service plugins this vertex uses: start from the mode
    // default (uber for local/in-AM execution, YARN otherwise), then apply
    // the DAG- or vertex-level execution context overrides.
    String tezDefaultComponentName =
        isLocal ? TezConstants.getTezUberServicePluginName() :
            TezConstants.getTezYarnServicePluginName();

    org.apache.tez.dag.api.Vertex.VertexExecutionContext execContext = dag.getDefaultExecutionContext();
    if (vertexPlan.hasExecutionContext()) {
      execContext = DagTypeConverters.convertFromProto(vertexPlan.getExecutionContext());
      LOG.info("Using ExecutionContext from Vertex for Vertex {}", vertexName);
    } else if (execContext != null) {
      LOG.info("Using ExecutionContext from DAG for Vertex {}", vertexName);
    }
    if (execContext != null) {
      if (execContext.shouldExecuteInAm()) {
        tezDefaultComponentName = TezConstants.getTezUberServicePluginName();
      }
    }

    String taskSchedulerName = tezDefaultComponentName;
    String containerLauncherName = tezDefaultComponentName;
    String taskCommName = tezDefaultComponentName;

    if (execContext != null) {
      if (execContext.getTaskSchedulerName() != null) {
        taskSchedulerName = execContext.getTaskSchedulerName();
      }
      if (execContext.getContainerLauncherName() != null) {
        containerLauncherName = execContext.getContainerLauncherName();
      }
      if (execContext.getTaskCommName() != null) {
        taskCommName = execContext.getTaskCommName();
      }
    }

    // Map plugin names to identifiers; failures are logged with the
    // offending name before being rethrown.
    try {
      taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
    } catch (Exception e) {
      LOG.error("Failed to get index for taskScheduler: " + taskSchedulerName);
      throw e;
    }
    try {
      taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
    } catch (Exception e) {
      LOG.error("Failed to get index for taskCommunicator: " + taskCommName);
      throw e;
    }
    try {
      containerLauncherIdentifier =
          appContext.getContainerLauncherIdentifier(containerLauncherName);
    } catch (Exception e) {
      LOG.error("Failed to get index for containerLauncher: " + containerLauncherName);
      throw e;
    }
    this.servicePluginInfo = new ServicePluginInfo()
        .setContainerLauncherName(
            appContext.getContainerLauncherName(this.containerLauncherIdentifier))
        .setTaskSchedulerName(appContext.getTaskSchedulerName(this.taskSchedulerIdentifier))
        .setTaskCommunicatorName(appContext.getTaskCommunicatorName(this.taskCommunicatorIdentifier))
        .setContainerLauncherClassName(
            appContext.getContainerLauncherClassName(this.containerLauncherIdentifier))
        .setTaskSchedulerClassName(
            appContext.getTaskSchedulerClassName(this.taskSchedulerIdentifier))
        .setTaskCommunicatorClassName(
            appContext.getTaskCommunicatorClassName(this.taskCommunicatorIdentifier));

    StringBuilder sb = new StringBuilder();
    sb.append("Running vertex: ").append(logIdentifier).append(" : ")
        .append("TaskScheduler=").append(taskSchedulerIdentifier).append(":").append(taskSchedulerName)
        .append(", ContainerLauncher=").append(containerLauncherIdentifier).append(":").append(containerLauncherName)
        .append(", TaskCommunicator=").append(taskCommunicatorIdentifier).append(":").append(taskCommName);
    LOG.info(sb.toString());
    cleanupShuffleDataAtVertexLevel = vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
        TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0 &&
        ShuffleUtils.isTezShuffleHandler(vertexConf);
    stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>(
        stateMachineFactory.make(this), this);
    augmentStateMachine();
  }
| |
  /** @return the effective (DAG overlaid with vertex-scoped) configuration. */
  @Override
  public Configuration getConf() {
    return vertexConf;
  }

  /** @return index of the task scheduler plugin this vertex uses. */
  @Override
  public int getTaskSchedulerIdentifier() {
    return this.taskSchedulerIdentifier;
  }

  /** @return index of the container launcher plugin this vertex uses. */
  @Override
  public int getContainerLauncherIdentifier() {
    return this.containerLauncherIdentifier;
  }

  /** @return index of the task communicator plugin this vertex uses. */
  @Override
  public int getTaskCommunicatorIdentifier() {
    return this.taskCommunicatorIdentifier;
  }

  /** @return names/classes of the service plugins resolved for this vertex. */
  @Override
  public ServicePluginInfo getServicePluginInfo() {
    return servicePluginInfo;
  }

  /** @return whether speculative execution is enabled for this vertex. */
  @Override
  public boolean isSpeculationEnabled() {
    return isSpeculationEnabled;
  }
| |
  /** @return the vertex state machine (exposed for subclasses/tests). */
  protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
    return stateMachine;
  }

  @Override
  public TezVertexID getVertexId() {
    return vertexId;
  }

  @Override
  public VertexPlan getVertexPlan() {
    return vertexPlan;
  }

  @Override
  public int getDistanceFromRoot() {
    return distanceFromRoot;
  }

  /** @return mapping of input/output edge names to their contiguous indices. */
  @Override
  public LinkedHashMap<String, Integer> getIOIndices() {
    return ioIndices;
  }

  @Override
  public String getName() {
    return vertexName;
  }

  EventHandler getEventHandler() {
    return this.eventHandler;
  }
| |
  /** @return the task with the given id, or null if unknown (read-locked). */
  @Override
  public Task getTask(TezTaskID taskID) {
    readLock.lock();
    try {
      return tasks.get(taskID);
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public Task getTask(int taskIndex) {
    return getTask(TezTaskID.getInstance(this.vertexId, taskIndex));
  }

  /** @return the current total task count (parallelism) of this vertex. */
  @Override
  public int getTotalTasks() {
    readLock.lock();
    try {
      return numTasks;
    } finally {
      readLock.unlock();
    }
  }
| |
  /** @return tasks finished in any way: succeeded + failed + killed. */
  @Override
  public int getCompletedTasks() {
    readLock.lock();
    try {
      return succeededTaskCount + failedTaskCount + killedTaskCount;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public int getSucceededTasks() {
    readLock.lock();
    try {
      return succeededTaskCount;
    } finally {
      readLock.unlock();
    }
  }
| |
| @Override |
| public int getRunningTasks() { |
| readLock.lock(); |
| try { |
| int num=0; |
| for (Task task : tasks.values()) { |
| if(task.getState() == TaskState.RUNNING) |
| num++; |
| } |
| return num; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
  /**
   * Returns the aggregate counters for this vertex. In a terminal state the
   * finalized counters are (lazily constructed and) returned; otherwise the
   * vertex-level counters are freshly aggregated with every task's counters.
   */
  @Override
  public TezCounters getAllCounters() {

    readLock.lock();

    try {
      if (inTerminalState()) {
        this.mayBeConstructFinalFullCounters();
        return fullCounters;
      }

      TezCounters counters = new TezCounters();
      counters.aggrAllCounters(this.counters);
      return aggrTaskCounters(counters, tasks.values());

    } finally {
      readLock.unlock();
    }
  }
| |
| @Override |
| public TezCounters getCachedCounters() { |
| readLock.lock(); |
| |
| try { |
| // FIXME a better lightweight approach for counters is needed |
| if (fullCounters == null && cachedCounters != null |
| && ((cachedCountersTimestamp+10000) > System.currentTimeMillis())) { |
| LOG.info("Asked for counters" |
| + ", cachedCountersTimestamp=" + cachedCountersTimestamp |
| + ", currentTime=" + System.currentTimeMillis()); |
| return cachedCounters; |
| } |
| |
| cachedCountersTimestamp = System.currentTimeMillis(); |
| if (inTerminalState()) { |
| this.mayBeConstructFinalFullCounters(); |
| return fullCounters; |
| } |
| |
| TezCounters counters = new TezCounters(); |
| counters.aggrAllCounters(this.counters); |
| cachedCounters = aggrTaskCounters(counters, tasks.values()); |
| return cachedCounters; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
  /** Merges the given counters into this vertex's own counters. */
  @Override
  public void addCounters(final TezCounters tezCounters) {
    counters.aggrAllCounters(tezCounters);
  }

  /** @return the configured per-vertex cap on concurrently running tasks. */
  @Override
  public int getMaxTaskConcurrency() {
    return vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY,
        TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT);
  }
| |
  /**
   * Returns task-level stats. In a terminal state the finalized stats are
   * (lazily constructed and) returned; otherwise a fresh VertexStats is
   * computed from the current task reports.
   */
  public VertexStats getVertexStats() {

    readLock.lock();
    try {
      if (inTerminalState()) {
        this.mayBeConstructFinalFullCounters();
        return this.vertexStats;
      }

      VertexStats stats = new VertexStats();
      return updateVertexStats(stats, tasks.values());

    } finally {
      readLock.unlock();
    }
  }
| |
  /** @return time at which the vertex entered the INITED state. */
  @Override
  public long getInitTime() {
    readLock.lock();
    try {
      return initedTime;
    } finally {
      readLock.unlock();
    }
  }

  /** @return time at which the vertex entered the STARTED state. */
  @Override
  public long getStartTime() {
    readLock.lock();
    try {
      return startedTime;
    } finally {
      readLock.unlock();
    }
  }

  /** @return time at which the vertex finished. */
  @Override
  public long getFinishTime() {
    readLock.lock();
    try {
      return finishTime;
    } finally {
      readLock.unlock();
    }
  }
| |
| @Override |
| public void reportTaskStartTime(long taskStartTime) { |
| synchronized (firstTaskStartTimeLock) { |
| if (firstTaskStartTime < 0 || taskStartTime < firstTaskStartTime) { |
| firstTaskStartTime = taskStartTime; |
| } |
| } |
| } |
| |
| @Override |
| public long getFirstTaskStartTime() { |
| return firstTaskStartTime; |
| } |
| |
  /**
   * @return finish time of the last task to finish, or -1 if the vertex has
   * not yet reached a terminal state (stats are only finalized then).
   */
  @Override
  public long getLastTaskFinishTime() {
    readLock.lock();
    try {
      if (inTerminalState()) {
        mayBeConstructFinalFullCounters();
        return vertexStats.getLastTaskFinishTime();
      } else {
        return -1;
      }
    } finally {
      readLock.unlock();
    }
  }
| |
  /** @return the typed configuration view built over this vertex's conf. */
  @Override
  public VertexConfig getVertexConfig() {
    return vertexContextConfig;
  }
| |
| boolean inTerminalState() { |
| VertexState state = getInternalState(); |
| if (state == VertexState.ERROR || state == VertexState.FAILED |
| || state == VertexState.KILLED || state == VertexState.SUCCEEDED) { |
| return true; |
| } |
| return false; |
| } |
| |
| public static TezCounters aggrTaskCounters( |
| TezCounters counters, Collection<Task> tasks) { |
| for (Task task : tasks) { |
| counters.aggrAllCounters(task.getCounters()); |
| } |
| return counters; |
| } |
| |
| public static VertexStats updateVertexStats( |
| VertexStats stats, Collection<Task> tasks) { |
| for (Task task : tasks) { |
| stats.updateStats(task.getReport()); |
| } |
| return stats; |
| } |
| |
| |
  /**
   * Returns the vertex diagnostics.
   * NOTE(review): this exposes the internal mutable list; callers are
   * presumably expected not to modify it - consider an unmodifiable wrapper.
   */
  @Override
  public List<String> getDiagnostics() {
    readLock.lock();
    try {
      return diagnostics;
    } finally {
      readLock.unlock();
    }
  }
| |
  /**
   * Returns vertex progress derived from its state: 0 before running and in
   * terminal/terminating failure states, the task-averaged value while
   * RUNNING, and 1 once committing/succeeded.
   * NOTE(review): the cached {@code progress} field is written while holding
   * only the read lock - racy, but presumably tolerable for a progress value.
   */
  @Override
  public float getProgress() {
    this.readLock.lock();
    try {
      final VertexState state = this.getState();
      switch (state) {
        case NEW:
        case INITED:
        case INITIALIZING:
          progress = 0.0f;
          break;
        case RUNNING:
          computeProgress();
          break;
        case KILLED:
        case ERROR:
        case FAILED:
        case TERMINATING:
          progress = 0.0f;
          break;
        case COMMITTING:
        case SUCCEEDED:
          progress = 1.0f;
          break;
        default:
          // unknown, do not change progress
          break;
      }
      return progress;
    } finally {
      this.readLock.unlock();
    }
  }
| |
| @Override |
| public float getCompletedTaskProgress() { |
| this.readLock.lock(); |
| try { |
| int totalTasks = getTotalTasks(); |
| if (totalTasks < 0) { |
| return 0.0f; |
| } |
| if (totalTasks == 0) { |
| VertexState state = getStateMachine().getCurrentState(); |
| if (state == VertexState.ERROR || state == VertexState.FAILED |
| || state == VertexState.KILLED || state == VertexState.SUCCEEDED) { |
| return 1.0f; |
| } else { |
| return 0.0f; |
| } |
| } |
| return ((float)this.succeededTaskCount/totalTasks); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
  /** Builds a snapshot of task and attempt counts for progress reporting. */
  @Override
  public ProgressBuilder getVertexProgress() {
    this.readLock.lock();
    try {
      ProgressBuilder progress = new ProgressBuilder();
      progress.setTotalTaskCount(numTasks);
      progress.setSucceededTaskCount(succeededTaskCount);
      // Once terminal, nothing can still be running; skip the task scan.
      if (inTerminalState()) {
        progress.setRunningTaskCount(0);
      } else {
        progress.setRunningTaskCount(getRunningTasks());
      }
      progress.setFailedTaskCount(failedTaskCount);
      progress.setKilledTaskCount(killedTaskCount);
      progress.setFailedTaskAttemptCount(failedTaskAttemptCount.get());
      progress.setKilledTaskAttemptCount(killedTaskAttemptCount.get());
      progress.setRejectedTaskAttemptCount(rejectedTaskAttemptCount.get());
      return progress;
    } finally {
      this.readLock.unlock();
    }
  }
| |
  /**
   * Builds the vertex status for client reporting; counters are included
   * only when requested via {@code statusOptions} since aggregating them is
   * comparatively expensive.
   */
  @Override
  public VertexStatusBuilder getVertexStatus(
      Set<StatusGetOpts> statusOptions) {
    this.readLock.lock();
    try {
      VertexStatusBuilder status = new VertexStatusBuilder();
      status.setId(getVertexId());
      status.setState(getInternalState());
      status.setDiagnostics(diagnostics);
      status.setProgress(getVertexProgress());
      if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
        status.setVertexCounters(getAllCounters());
      }
      return status;
    } finally {
      this.readLock.unlock();
    }
  }
| |
| @Override |
| public TaskLocationHint getTaskLocationHint(TezTaskID taskId) { |
| this.readLock.lock(); |
| try { |
| if (taskLocationHints == null || |
| taskLocationHints.length <= taskId.getId()) { |
| return null; |
| } |
| return taskLocationHints[taskId.getId()]; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
  // Test hook: exposes the buffer of events held for on-demand routing.
  @VisibleForTesting
  List<EventInfo> getOnDemandRouteEvents() {
    return onDemandRouteEvents;
  }
| |
| /** |
| * Updates the progress value in the vertex. |
| * This should be called only when the vertex is running state. |
| * No need to acquire the lock since this is nested inside |
| * {@link #getProgress() getProgress} method. |
| */ |
| private void computeProgress() { |
| |
| float accProg = 0.0f; |
| int tasksCount = this.tasks.size(); |
| for (Task task : this.tasks.values()) { |
| float taskProg = task.getProgress(); |
| if (LOG.isDebugEnabled()) { |
| if (!ProgressHelper.isProgressWithinRange(taskProg)) { |
| LOG.debug("progress update: vertex={}, task={} incorrect; range={}", |
| getName(), task.getTaskID(), taskProg); |
| } |
| } |
| accProg += ProgressHelper.processProgress(taskProg); |
| } |
| // tasksCount is 0, do not reset the current progress. |
| if (tasksCount > 0) { |
| // force the progress to be below within the range |
| progress = ProgressHelper.processProgress(accProg / tasksCount); |
| } |
| } |
| |
  /**
   * Returns an unmodifiable view of the task map and flags that a lazy copy
   * is needed - presumably so a later mutation replaces rather than alters
   * the map backing this view; confirm against the mutation path.
   */
  @Override
  public Map<TezTaskID, Task> getTasks() {
    synchronized (tasksSyncHandle) {
      lazyTasksCopyNeeded = true;
      return Collections.unmodifiableMap(tasks);
    }
  }
| |
  /** @return the current state-machine state, read under the read lock. */
  @Override
  public VertexState getState() {
    readLock.lock();
    try {
      return getStateMachine().getCurrentState();
    } finally {
      readLock.unlock();
    }
  }
| |
| /** |
| * Set the terminationCause if it had not yet been set. |
| * |
| * @param trigger The trigger |
| * @return true if setting the value succeeded. |
| */ |
| boolean trySetTerminationCause(VertexTerminationCause trigger) { |
| if(terminationCause == null){ |
| terminationCause = trigger; |
| return true; |
| } |
| return false; |
| } |
| |
  /** @return the first recorded cause of vertex termination, or null. */
  @Override
  public VertexTerminationCause getTerminationCause(){
    readLock.lock();
    try {
      return terminationCause;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public AppContext getAppContext() {
    return this.appContext;
  }

  /** @return "vertex_id [vertexName]", used as a stable log prefix. */
  @Override
  public String getLogIdentifier() {
    return this.logIdentifier;
  }
| |
  // These attempt counters only ever increase, so they are updated directly
  // rather than through state-machine transitions (see the field comments).
  @Override
  public void incrementFailedTaskAttemptCount() {
    this.failedTaskAttemptCount.incrementAndGet();
  }

  @Override
  public void incrementKilledTaskAttemptCount() {
    this.killedTaskAttemptCount.incrementAndGet();
  }

  @Override
  public void incrementRejectedTaskAttemptCount() {
    this.rejectedTaskAttemptCount.incrementAndGet();
  }

  @Override
  public int getFailedTaskAttemptCount() {
    return this.failedTaskAttemptCount.get();
  }

  @Override
  public int getKilledTaskAttemptCount() {
    return this.killedTaskAttemptCount.get();
  }

  @Override
  public int getRejectedTaskAttemptCount() {
    return this.rejectedTaskAttemptCount.get();
  }
| |
| private void setTaskLocationHints(VertexLocationHint vertexLocationHint) { |
| if (vertexLocationHint != null && |
| vertexLocationHint.getTaskLocationHints() != null && |
| !vertexLocationHint.getTaskLocationHints().isEmpty()) { |
| List<TaskLocationHint> locHints = vertexLocationHint.getTaskLocationHints(); |
| // TODO TEZ-2246 hints size must match num tasks |
| taskLocationHints = locHints.toArray(new TaskLocationHint[locHints.size()]); |
| } |
| } |
| |
| @Override |
| public void scheduleSpeculativeTask(TezTaskID taskId) { |
| readLock.lock(); |
| try { |
| Preconditions.checkState(taskId.getId() < numTasks); |
| eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
  // Notifies every incoming edge that event routing is about to begin, so
  // each edge can finalize its routing setup before events flow.
  void setupEdgeRouting() throws AMUserCodeException {
    for (Edge e : sourceVertices.values()) {
      e.routingToBegin();
    }
  }
| |
  /**
   * Transitions the vertex out of "no tasks scheduled yet" mode: finalizes
   * edge routing, then flushes any task events buffered while the routing
   * mode was still undecided. A user-code failure while routing the pending
   * events triggers vertex termination instead of propagating the exception.
   */
  private void unsetTasksNotYetScheduled() throws AMUserCodeException {
    if (tasksNotYetScheduled) {
      setupEdgeRouting();
      // change state under lock
      writeLock.lock();
      try {
        tasksNotYetScheduled = false;
        // only now can we be sure of the edge manager type. so until now
        // we will accumulate pending tasks in case legacy routing gets used.
        // this is only needed to support mixed mode routing. Else for
        // on demand routing events can be directly added to taskEvents when
        // they arrive in handleRoutedEvents instead of first caching them in
        // pendingTaskEvents. When legacy routing is removed then pendingTaskEvents
        // can be removed.
        if (!pendingTaskEvents.isEmpty()) {
          LOG.info("Routing pending task events for vertex: " + logIdentifier);
          try {
            handleRoutedTezEvents(pendingTaskEvents, true);
          } catch (AMUserCodeException e) {
            String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier;
            LOG.error(msg, e);
            addDiagnostic(msg + ", " + e.getMessage() + ", "
                + ExceptionUtils.getStackTrace(e.getCause()));
            eventHandler.handle(new VertexEventTermination(vertexId,
                VertexTerminationCause.AM_USERCODE_FAILURE));
            return;
          }
          pendingTaskEvents.clear();
        }
      } finally {
        writeLock.unlock();
      }
    }
  }
| |
| TaskSpec createRemoteTaskSpec(int taskIndex) throws AMUserCodeException { |
| return TaskSpec.createBaseTaskSpec(getDAG().getName(), |
| getName(), getTotalTasks(), getProcessorDescriptor(), |
| getInputSpecList(taskIndex), getOutputSpecList(taskIndex), |
| getGroupInputSpecList(), vertexOnlyConf); |
| } |
| |
  /**
   * Schedules the requested tasks: validates indexes and records location
   * hints under the write lock, then emits a TaskEventScheduleTask per task
   * outside the lock. An AMUserCodeException from spec creation fails the
   * vertex and is rethrown unchecked to stop the invoking vertex manager.
   */
  @Override
  public void scheduleTasks(List<ScheduleTaskRequest> tasksToSchedule) {
    try {
      // first scheduling call flushes buffered events; see unsetTasksNotYetScheduled
      unsetTasksNotYetScheduled();
      // update state under write lock
      writeLock.lock();
      try {
        for (ScheduleTaskRequest task : tasksToSchedule) {
          if (numTasks <= task.getTaskIndex()) {
            throw new TezUncheckedException(
                "Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier);
          }
          TaskLocationHint locationHint = task.getTaskLocationHint();
          if (locationHint != null) {
            // lazily allocate the hints array on first hinted task
            if (taskLocationHints == null) {
              taskLocationHints = new TaskLocationHint[numTasks];
            }
            taskLocationHints[task.getTaskIndex()] = locationHint;
          }
        }
      } finally {
        writeLock.unlock();
      }

      /**
       * read lock is not needed here. For e.g after starting task
       * scheduling on the vertex, it would not change numTasks. Rest of
       * the methods creating remote task specs have their
       * own locking mechanisms. Ref: TEZ-3297
       */
      for (ScheduleTaskRequest task : tasksToSchedule) {
        TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex());
        TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
        // a task with recovery data is resumed rather than started fresh
        boolean fromRecovery = recoveryData == null ? false : recoveryData.getTaskRecoveryData(taskId) != null;
        eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
            getTaskLocationHint(taskId), fromRecovery));
      }
    } catch (AMUserCodeException e) {
      String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
      LOG.error(msg, e);
      // send event to fail the vertex
      eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e));
      // throw an unchecked exception to stop the vertex manager that invoked this.
      throw new TezUncheckedException(e);
    }
  }
| |
  /**
   * Reconfigures parallelism, location hints and source edge properties.
   * Delegates with no root input spec updates; fromVertexManager=true.
   */
  @Override
  public void reconfigureVertex(int parallelism,
      @Nullable VertexLocationHint locationHint,
      @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException {
    setParallelismWrapper(parallelism, locationHint, sourceEdgeProperties, null, true);
  }
| |
  /**
   * Reconfigures parallelism, location hints and root input specs.
   * NOTE(review): unlike the sibling overloads this goes through
   * setParallelism (which converts edge managers) rather than calling
   * setParallelismWrapper directly; with a null edge-manager map the effect
   * appears equivalent — confirm before unifying.
   */
  @Override
  public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate,
      int parallelism,
      @Nullable VertexLocationHint locationHint) throws AMUserCodeException {
    setParallelism(parallelism, locationHint, null, rootInputSpecUpdate, true);
  }
| |
  /**
   * Reconfigures parallelism, location hints, source edge properties and
   * root input specs in one call; fromVertexManager=true.
   */
  @Override
  public void reconfigureVertex(int parallelism,
      @Nullable VertexLocationHint locationHint,
      @Nullable Map<String, EdgeProperty> sourceEdgeProperties,
      @Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate) throws AMUserCodeException {
    setParallelismWrapper(parallelism, locationHint, sourceEdgeProperties, rootInputSpecUpdate, true);
  }
| |
  /**
   * Legacy entry point that accepts EdgeManagerPluginDescriptors; converts
   * each into an EdgeProperty (preserving the existing edge's data source,
   * scheduling type and endpoints) and delegates to setParallelismWrapper.
   */
  @Override
  public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
      Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
      Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager)
      throws AMUserCodeException {
    // temporarily support conversion of edge manager to edge property
    Map<String, EdgeProperty> sourceEdgeProperties = Maps.newHashMap();
    // read lock: edge properties are read while building the conversion map
    readLock.lock();
    try {
      if (sourceEdgeManagers != null && !sourceEdgeManagers.isEmpty()) {
        for (Edge e : sourceVertices.values()) {
          EdgeManagerPluginDescriptor newEdge = sourceEdgeManagers.get(e.getSourceVertexName());
          EdgeProperty oldEdge = e.getEdgeProperty();
          if (newEdge != null) {
            // keep everything from the old edge except the manager descriptor
            sourceEdgeProperties.put(
                e.getSourceVertexName(),
                EdgeProperty.create(newEdge, oldEdge.getDataSourceType(),
                    oldEdge.getSchedulingType(), oldEdge.getEdgeSource(),
                    oldEdge.getEdgeDestination()));
          }
        }
      }
    } finally {
      readLock.unlock();
    }
    setParallelismWrapper(parallelism, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates,
        fromVertexManager);
  }
| |
  /**
   * Core (re)configuration routine, run under the write lock.
   * Two regimes:
   *  - numTasks == -1 (uninitialized): sets the initial parallelism, applies
   *    edge/root-input updates, creates tasks and may fire V_READY_TO_INIT.
   *  - numTasks >= 0 (reconfiguration): buffers edge events, grows/shrinks
   *    the task set, then applies new edge properties and stops buffering.
   * Must be called before any task is scheduled; throws TezUncheckedException
   * otherwise. May run user plugin code via Edge.setEdgeProperty.
   */
  private void setParallelismWrapper(int parallelism, VertexLocationHint vertexLocationHint,
      Map<String, EdgeProperty> sourceEdgeProperties,
      Map<String, InputSpecUpdate> rootInputSpecUpdates,
      boolean fromVertexManager) throws AMUserCodeException {
    Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " + parallelism
        + " for vertex: " + logIdentifier);
    writeLock.lock();
    // record that the API was used; consulted when logging the configuration-done event
    this.setParallelismCalledFlag = true;
    try {
      // disallow changing things after a vertex has started
      if (!tasksNotYetScheduled) {
        String msg = "setParallelism cannot be called after scheduling tasks. Vertex: "
            + getLogIdentifier();
        LOG.info(msg);
        throw new TezUncheckedException(msg);
      }

      if (fromVertexManager && canInitVertex()) {
        // vertex is fully defined. setParallelism has been called. VertexManager should have
        // informed us about this. Otherwise we would have notified listeners that we are fully
        // defined before we are actually fully defined
        Preconditions
            .checkState(
                vertexToBeReconfiguredByManager,
                "Vertex is fully configured but still"
                    + " the reconfiguration API has been called. VertexManager must notify the framework using "
                    + " context.vertexReconfigurationPlanned() before re-configuring the vertex."
                    + " vertexId=" + logIdentifier);
      }

      // Input initializer/Vertex Manager/1-1 split expected to set parallelism.
      if (numTasks == -1) {
        if (getState() != VertexState.INITIALIZING) {
          throw new TezUncheckedException(
              "Vertex state is not Initializing. Value: " + getState()
                  + " for vertex: " + logIdentifier);
        }

        if(sourceEdgeProperties != null) {
          for(Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) {
            LOG.info("Replacing edge manager for source:"
                + entry.getKey() + " destination: " + getLogIdentifier());
            Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
            Edge edge = sourceVertices.get(sourceVertex);
            try {
              // may run user plugin code
              edge.setEdgeProperty(entry.getValue());
            } catch (Exception e) {
              throw new TezUncheckedException("Fail to update EdgeProperty for Edge,"
                  + "sourceVertex:" + edge.getSourceVertexName()
                  + "destinationVertex:" + edge.getDestinationVertexName(), e);
            }
          }
        }
        if (rootInputSpecUpdates != null) {
          LOG.info("Got updated RootInputsSpecs: " + rootInputSpecUpdates.toString());
          // Sanity check for correct number of updates.
          for (Entry<String, InputSpecUpdate> rootInputSpecUpdateEntry : rootInputSpecUpdates
              .entrySet()) {
            Preconditions
                .checkState(
                    rootInputSpecUpdateEntry.getValue().isForAllWorkUnits()
                        || (rootInputSpecUpdateEntry.getValue().getAllNumPhysicalInputs() != null && rootInputSpecUpdateEntry
                        .getValue().getAllNumPhysicalInputs().size() == parallelism),
                    "Not enough input spec updates for root input named "
                        + rootInputSpecUpdateEntry.getKey());
          }
          this.rootInputSpecs.putAll(rootInputSpecUpdates);
        }
        int oldNumTasks = numTasks;
        this.numTasks = parallelism;
        // notify listeners before the tasks are actually created
        stateChangeNotifier.stateChanged(vertexId,
            new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
        this.createTasks();
        setVertexLocationHint(vertexLocationHint);
        LOG.info("Vertex " + getLogIdentifier() +
            " parallelism set to " + parallelism);
        if (canInitVertex()) {
          getEventHandler().handle(new VertexEvent(getVertexId(), VertexEventType.V_READY_TO_INIT));
        }
      } else {
        // This is an artificial restriction since there's no way of knowing whether a VertexManager
        // will attempt to update root input specs. When parallelism has not been initialized, the
        // Vertex will not be in started state so it's safe to update the specifications.
        // TODO TEZ-937 - add e mechanism to query vertex managers, or for VMs to indicate readines
        // for a vertex to start.
        Preconditions.checkState(rootInputSpecUpdates == null,
            "Root Input specs can only be updated when the vertex is configured with -1 tasks");

        int oldNumTasks = numTasks;

        // start buffering incoming events so that we can re-route existing events
        for (Edge edge : sourceVertices.values()) {
          edge.startEventBuffering();
        }

        if (parallelism == numTasks) {
          LOG.info("setParallelism same as current value: " + parallelism +
              " for vertex: " + logIdentifier);
          Preconditions.checkArgument(sourceEdgeProperties != null,
              "Source edge managers or RootInputSpecs must be set when not changing parallelism");
        } else {
          // old per-task hints are invalid once the task count changes
          LOG.info("Resetting vertex location hints due to change in parallelism for vertex: "
              + logIdentifier);
          vertexLocationHint = null;

          if (parallelism > numTasks) {
            addTasks((parallelism));
          } else if (parallelism < numTasks) {
            removeTasks(parallelism);
          }
        }
        Preconditions.checkState(this.numTasks == parallelism, getLogIdentifier());

        // set new vertex location hints
        setVertexLocationHint(vertexLocationHint);
        LOG.info("Vertex " + getLogIdentifier() + " parallelism set to " + parallelism + " from "
            + oldNumTasks);

        // notify listeners
        stateChangeNotifier.stateChanged(vertexId,
            new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
        assert tasks.size() == numTasks;

        // set new edge managers
        if(sourceEdgeProperties != null) {
          for(Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) {
            LOG.info("Replacing edge manager for source:"
                + entry.getKey() + " destination: " + getLogIdentifier());
            Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
            Edge edge = sourceVertices.get(sourceVertex);
            try {
              // may run user plugin code
              edge.setEdgeProperty(entry.getValue());
            } catch (Exception e) {
              throw new TezUncheckedException(e);
            }
          }
        }

        // stop buffering events
        for (Edge edge : sourceVertices.values()) {
          edge.stopEventBuffering();
        }
      }

    } finally {
      writeLock.unlock();
    }
  }
| |
| @Override |
| public void setVertexLocationHint(VertexLocationHint vertexLocationHint) { |
| writeLock.lock(); |
| try { |
| if (LOG.isDebugEnabled()) { |
| logLocationHints(this.vertexName, vertexLocationHint); |
| } |
| setTaskLocationHints(vertexLocationHint); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
  /**
   * Called by a VertexManager to announce an upcoming reconfiguration, so the
   * framework delays the "fully configured" notification. Must be invoked
   * before the VM is initialized and before the configured event is sent.
   */
  @Override
  public void vertexReconfigurationPlanned() {
    writeLock.lock();
    try {
      Preconditions.checkState(!vmIsInitialized.get(),
          "context.vertexReconfigurationPlanned() cannot be called after initialize()");
      Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() "
          + " cannot be invoked after the vertex has been configured.");
      this.vertexToBeReconfiguredByManager = true;
    } finally {
      writeLock.unlock();
    }
  }
| |
  /**
   * Counterpart of vertexReconfigurationPlanned(): the VertexManager signals
   * that reconfiguration is complete. Sends the configured event if the
   * vertex can now init; otherwise the vertex must still be INITIALIZING.
   */
  @Override
  public void doneReconfiguringVertex() {
    writeLock.lock();
    try {
      Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
          + "invoked only after vertexReconfigurationPlanned() is invoked");
      this.vertexToBeReconfiguredByManager = false;
      if (canInitVertex()) {
        maybeSendConfiguredEvent();
      } else {
        Preconditions.checkState(getInternalState() == VertexState.INITIALIZING, "Vertex: "
            + getLogIdentifier());
      }

    } finally {
      writeLock.unlock();
    }
  }
| |
| @Override |
| /** |
| * The only entry point to change the Vertex. |
| */ |
| public void handle(VertexEvent event) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Processing VertexEvent " + event.getVertexID() |
| + " of type " + event.getType() + " while in state " |
| + getInternalState() + ". Event: " + event); |
| } |
| try { |
| writeLock.lock(); |
| VertexState oldState = getInternalState(); |
| try { |
| getStateMachine().doTransition(event.getType(), event); |
| } catch (InvalidStateTransitonException e) { |
| String message = "Invalid event " + event.getType() + |
| " on vertex " + this.vertexName + |
| " with vertexId " + this.vertexId + |
| " at current state " + oldState; |
| LOG.error("Can't handle " + message, e); |
| addDiagnostic(message); |
| eventHandler.handle(new VertexEvent(this.vertexId, |
| VertexEventType.V_INTERNAL_ERROR)); |
| } catch (RuntimeException e) { |
| String message = "Uncaught Exception when handling event " + event.getType() + |
| " on vertex " + this.vertexName + |
| " with vertexId " + this.vertexId + |
| " at current state " + oldState; |
| LOG.error(message, e); |
| addDiagnostic(message); |
| if (!internalErrorTriggered.getAndSet(true)) { |
| eventHandler.handle(new VertexEvent(this.vertexId, |
| VertexEventType.V_INTERNAL_ERROR)); |
| } |
| } |
| |
| if (oldState != getInternalState()) { |
| LOG.info(logIdentifier + " transitioned from " + oldState + " to " |
| + getInternalState() + " due to event " |
| + event.getType()); |
| } |
| } |
| |
| finally { |
| writeLock.unlock(); |
| } |
| } |
| |
  /** Returns the raw state-machine state, read under the read lock. */
  private VertexState getInternalState() {
    readLock.lock();
    try {
      return getStateMachine().getCurrentState();
    } finally {
      readLock.unlock();
    }
  }
| |
  // helpful in testing
  /**
   * Registers a task with this vertex, applying copy-on-write if a snapshot
   * of the task map was handed out (lazyTasksCopyNeeded).
   */
  protected void addTask(Task task) {
    synchronized (tasksSyncHandle) {
      if (lazyTasksCopyNeeded) {
        // replace the map so readers of the old snapshot are unaffected
        LinkedHashMap<TezTaskID, Task> newTasks = new LinkedHashMap<TezTaskID, Task>();
        newTasks.putAll(tasks);
        tasks = newTasks;
        lazyTasksCopyNeeded = false;
      }
    }
    // NOTE(review): the put happens outside the synchronized block —
    // presumably callers are serialized by the vertex write lock; confirm.
    tasks.put(task.getTaskID(), task);
    // TODO Metrics
    //metrics.waitingTask(task);
  }
| |
  /** Stamps the vertex finish time from the AM clock (callers guard on finishTime == 0). */
  void setFinishTime() {
    finishTime = clock.getTime();
  }
| |
| void logJobHistoryVertexInitializedEvent() { |
| if (recoveryData == null || !recoveryData.shouldSkipInit()) { |
| VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName, |
| initTimeRequested, initedTime, numTasks, |
| getProcessorName(), getAdditionalInputs(), initGeneratedEvents, |
| servicePluginInfo); |
| this.appContext.getHistoryHandler().handle( |
| new DAGHistoryEvent(getDAGId(), initEvt)); |
| } |
| } |
| |
| void logJobHistoryVertexStartedEvent() { |
| if (recoveryData == null |
| || !recoveryData.isVertexStarted()) { |
| VertexStartedEvent startEvt = new VertexStartedEvent(vertexId, |
| startTimeRequested, startedTime); |
| this.appContext.getHistoryHandler().handle( |
| new DAGHistoryEvent(getDAGId(), startEvt)); |
| } |
| } |
| |
| void logVertexConfigurationDoneEvent() { |
| if (recoveryData == null || !recoveryData.shouldSkipInit()) { |
| Map<String, EdgeProperty> sourceEdgeProperties = new HashMap<String, EdgeProperty>(); |
| for (Map.Entry<Vertex, Edge> entry : this.sourceVertices.entrySet()) { |
| sourceEdgeProperties.put(entry.getKey().getName(), entry.getValue().getEdgeProperty()); |
| } |
| VertexConfigurationDoneEvent reconfigureDoneEvent = |
| new VertexConfigurationDoneEvent(vertexId, clock.getTime(), |
| numTasks, taskLocationHints == null ? null : VertexLocationHint.create(Lists.newArrayList(taskLocationHints)), |
| sourceEdgeProperties, rootInputSpecs, setParallelismCalledFlag); |
| this.appContext.getHistoryHandler().handle( |
| new DAGHistoryEvent(getDAGId(), reconfigureDoneEvent)); |
| } |
| } |
| |
| void logJobHistoryVertexFinishedEvent() throws IOException { |
| if (recoveryData == null |
| || !recoveryData.isVertexSucceeded()) { |
| logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, |
| logSuccessDiagnostics ? StringUtils.join(getDiagnostics(), LINE_SEPARATOR) : "", |
| constructFinalFullcounters()); |
| } |
| } |
| |
| void logJobHistoryVertexFailedEvent(VertexState state) throws IOException { |
| if (recoveryData == null |
| || !recoveryData.isVertexFinished()) { |
| TezCounters counters = null; |
| try { |
| counters = constructFinalFullcounters(); |
| } catch (LimitExceededException e) { |
| // Ignore as failed vertex |
| addDiagnostic("Counters limit exceeded: " + e.getMessage()); |
| } |
| |
| logJobHistoryVertexCompletedHelper(state, clock.getTime(), |
| StringUtils.join(getDiagnostics(), LINE_SEPARATOR), counters); |
| } |
| } |
| |
| private void logJobHistoryVertexCompletedHelper(VertexState finalState, long finishTime, |
| String diagnostics, TezCounters counters) throws IOException { |
| Map<String, Integer> taskStats = new HashMap<String, Integer>(); |
| taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, completedTaskCount); |
| taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount); |
| taskStats.put(ATSConstants.NUM_FAILED_TASKS, failedTaskCount); |
| taskStats.put(ATSConstants.NUM_KILLED_TASKS, killedTaskCount); |
| taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, failedTaskAttemptCount.get()); |
| taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS, killedTaskAttemptCount.get()); |
| |
| VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks, |
| initTimeRequested, initedTime, startTimeRequested, startedTime, finishTime, finalState, |
| diagnostics, counters, getVertexStats(), taskStats, servicePluginInfo); |
| this.appContext.getHistoryHandler().handleCriticalEvent( |
| new DAGHistoryEvent(getDAGId(), finishEvt)); |
| } |
| |
  /**
   * Commits non-shared vertex outputs asynchronously, or finishes the vertex
   * directly when there is nothing to commit.
   * Returns SUCCEEDED if no commits are needed (or recovery shows the vertex
   * was already committed), COMMITTING while commit futures are in flight,
   * or FAILED if the commit-start event cannot be persisted for recovery.
   */
  private static VertexState commitOrFinish(final VertexImpl vertex) {
    // commit only once. Dont commit shared outputs
    if (vertex.outputCommitters != null
        && !vertex.outputCommitters.isEmpty()) {
      if (vertex.recoveryData != null
          && vertex.recoveryData.isVertexCommitted()) {
        LOG.info("Vertex was already committed as per recovery"
            + " data, vertex=" + vertex.logIdentifier);
        return vertex.finished(VertexState.SUCCEEDED);
      }
      boolean firstCommit = true;
      for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
        final OutputCommitter committer = entry.getValue();
        final String outputName = entry.getKey();
        if (vertex.sharedOutputs.contains(outputName)) {
          // dont commit shared committers. Will be committed by the DAG
          continue;
        }
        if (firstCommit) {
          LOG.info("Invoking committer commit for vertex, vertexId="
              + vertex.logIdentifier);
          // Log commit start event on first actual commit
          try {
            // must be durably recorded before any output is committed,
            // so recovery knows a commit may have partially happened
            vertex.appContext.getHistoryHandler().handleCriticalEvent(
                new DAGHistoryEvent(vertex.getDAGId(),
                    new VertexCommitStartedEvent(vertex.vertexId,
                        vertex.clock.getTime())));
          } catch (IOException e) {
            LOG.error("Failed to persist commit start event to recovery, vertex="
                + vertex.logIdentifier, e);
            vertex.trySetTerminationCause(VertexTerminationCause.RECOVERY_ERROR);
            return vertex.finished(VertexState.FAILED);
          }
          firstCommit = false;
        }
        // each output commits on the shared executor; completion is reported
        // back through the VertexCommitCallback
        VertexCommitCallback commitCallback = new VertexCommitCallback(vertex, outputName);
        CallableEvent commitCallableEvent = new CallableEvent(commitCallback) {
          @Override
          public Void call() throws Exception {
            try {
              TezUtilsInternal.setHadoopCallerContext(vertex.appContext.getHadoopShim(),
                  vertex.vertexId);
              // commit runs as the DAG user
              vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                  LOG.info("Invoking committer commit for output=" + outputName
                      + ", vertexId=" + vertex.logIdentifier);
                  committer.commitOutput();
                  return null;
                }
              });
            } finally {
              vertex.appContext.getHadoopShim().clearHadoopCallerContext();
            }
            return null;
          }
        };
        ListenableFuture<Void> commitFuture =
            vertex.getAppContext().getExecService().submit(commitCallableEvent);
        Futures.addCallback(commitFuture, commitCallableEvent.getCallback(), GuavaShim.directExecutor());
        vertex.commitFutures.put(outputName, commitFuture);
      }
    }
    if (vertex.commitFutures.isEmpty()) {
      return vertex.finished(VertexState.SUCCEEDED);
    } else {
      return VertexState.COMMITTING;
    }
  }
| |
| private static String constructCheckTasksForCompletionLog(VertexImpl vertex) { |
| String logLine = vertex.logIdentifier |
| + ", tasks=" + vertex.numTasks |
| + ", failed=" + vertex.failedTaskCount |
| + ", killed=" + vertex.killedTaskCount |
| + ", success=" + vertex.succeededTaskCount |
| + ", completed=" + vertex.completedTaskCount |
| + ", commits=" + vertex.commitFutures.size() |
| + ", err=" + vertex.terminationCause; |
| return logLine; |
| } |
| |
| // triggered by task_complete |
| static VertexState checkTasksForCompletion(final VertexImpl vertex) { |
| // this log helps quickly count the completion count for a vertex. |
| // grepping and counting for attempts and handling re-tries is time consuming |
| LOG.info("Task Completion: " + constructCheckTasksForCompletionLog(vertex)); |
| //check for vertex failure first |
| if (vertex.completedTaskCount > vertex.tasks.size()) { |
| LOG.error("task completion accounting issue: completedTaskCount > nTasks:" |
| + constructCheckTasksForCompletionLog(vertex)); |
| } |
| |
| if (vertex.completedTaskCount == vertex.tasks.size()) { |
| // finished - gather stats |
| vertex.finalStatistics = vertex.constructStatistics(); |
| |
| //Only succeed if tasks complete successfully and no terminationCause is registered or if failures are below configured threshold. |
| boolean vertexSucceeded = vertex.succeededTaskCount == vertex.numTasks; |
| boolean vertexFailuresBelowThreshold = (vertex.succeededTaskCount + vertex.failedTaskCount == vertex.numTasks) |
| && (vertex.failedTaskCount * 100 <= vertex.maxFailuresPercent * vertex.numTasks); |
| |
| if((vertexSucceeded || vertexFailuresBelowThreshold) && vertex.terminationCause == null) { |
| if(vertexSucceeded) { |
| LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier); |
| if (vertex.cleanupShuffleDataAtVertexLevel) { |
| |
| for (Vertex v : vertex.vShuffleDeletionContext.getAncestors()) { |
| vertex.eventHandler.handle(new VertexShuffleDataDeletion(vertex, v)); |
| } |
| } |
| } else { |
| LOG.info("All tasks in the vertex " + vertex.logIdentifier + " have completed and the percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent); |
| vertex.addDiagnostic("Vertex succeeded as percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent); |
| vertex.logSuccessDiagnostics = true; |
| for (Task task : vertex.tasks.values()) { |
| if (!task.getState().equals(TaskState.FAILED)) { |
| continue; |
| } |
| // Find the last attempt and mark that as successful |
| Iterator<TezTaskAttemptID> attempts = task.getAttempts().keySet().iterator(); |
| TezTaskAttemptID lastAttempt = null; |
| while (attempts.hasNext()) { |
| TezTaskAttemptID attempt = attempts.next(); |
| if (lastAttempt == null || attempt.getId() > lastAttempt.getId()) { |
| lastAttempt = attempt; |
| } |
| } |
| LOG.info("Succeeding failed task attempt:" + lastAttempt); |
| for (Map.Entry<Vertex, Edge> vertexEdge : vertex.targetVertices.entrySet()) { |
| Vertex destVertex = vertexEdge.getKey(); |
| Edge edge = vertexEdge.getValue(); |
| try { |
| List<TezEvent> tezEvents = edge.generateEmptyEventsForAttempt(lastAttempt); |
| |
| // Downstream vertices need to receive a SUCCEEDED completion event for each failed task to ensure num bipartite count is correct |
| VertexEventTaskAttemptCompleted completionEvent = new VertexEventTaskAttemptCompleted(lastAttempt, TaskAttemptStateInternal.SUCCEEDED); |
| |
| // Notify all target vertices |
| vertex.eventHandler.handle(new VertexEventSourceTaskAttemptCompleted(destVertex.getVertexId(), completionEvent)); |
| vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex.getVertexId(), tezEvents)); |
| } catch (Exception e) { |
| throw new TezUncheckedException(e); |
| } |
| } |
| } |
| } |
| |
| if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) { |
| // start commit if there're commits or just finish if no commits |
| return commitOrFinish(vertex); |
| } else { |
| // just finish because no vertex committing needed |
| return vertex.finished(VertexState.SUCCEEDED); |
| } |
| } |
| return finishWithTerminationCause(vertex); |
| } |
| |
| //return the current state, Vertex not finished yet |
| return vertex.getInternalState(); |
| } |
| |
  //triggered by commit_complete
  /**
   * Decides the vertex outcome after a commit future completes.
   * With no terminationCause the vertex moves to SUCCEEDED once all commit
   * futures have drained; with a terminationCause set it keeps TERMINATING
   * while commits are still outstanding and finishes with that cause once
   * none remain.
   */
  static VertexState checkCommitsForCompletion(final VertexImpl vertex) {
    LOG.info("Commits completion: "
        + constructCheckTasksForCompletionLog(vertex));
    // a null terminationCause means commits are succeeding; otherwise it records the failure
    if (vertex.terminationCause == null) {
      Preconditions.checkState(vertex.getState() == VertexState.COMMITTING,
          "Vertex should be in COMMITTING state, but in " + vertex.getState()
          + ", vertex:" + vertex.getLogIdentifier());
      if (vertex.commitFutures.isEmpty()) {
        // move from COMMITTING to SUCCEEDED
        return vertex.finished(VertexState.SUCCEEDED);
      } else {
        return VertexState.COMMITTING;
      }
    } else {
      if (!vertex.commitFutures.isEmpty()) {
        // pending commits are running
        return VertexState.TERMINATING;
      } else {
        // no commits still pending: finish using the recorded termination cause
        return finishWithTerminationCause(vertex);
      }
    }
  }
| |
| private static VertexState finishWithTerminationCause(VertexImpl vertex) { |
| Preconditions.checkArgument(vertex.getTerminationCause() != null, "TerminationCause is not set"); |
| String diagnosticMsg = "Vertex did not succeed due to " + vertex.getTerminationCause() |
| + ", failedTasks:" + vertex.failedTaskCount |
| + " killedTasks:" + vertex.killedTaskCount; |
| LOG.info(diagnosticMsg); |
| vertex.addDiagnostic(diagnosticMsg); |
| return vertex.finished(vertex.getTerminationCause().getFinishedState()); |
| } |
| |
| /** |
| * Set the terminationCause and send a kill-message to all tasks. |
| * The task-kill messages are only sent once. |
| */ |
| void tryEnactKill(VertexTerminationCause trigger, |
| TaskTerminationCause taskterminationCause) { |
| // In most cases the dag is shutting down due to some error |
| TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN; |
| if (taskterminationCause == TaskTerminationCause.DAG_KILL) { |
| errCause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT; |
| } |
| if(trySetTerminationCause(trigger)){ |
| String msg = "Killing tasks in vertex: " + logIdentifier + " due to trigger: " + trigger; |
| LOG.info(msg); |
| for (Task task : tasks.values()) { |
| eventHandler.handle( // attempt was terminated because the vertex is shutting down |
| new TaskEventTermination(task.getTaskID(), errCause, msg)); |
| } |
| } |
| } |
| |
  /**
   * Drives the vertex into a terminal state: stamps the finish time, records
   * the termination cause, shuts down the input initializer manager, then
   * per final state aborts outputs / notifies the DAG / stops services and
   * persists the completion history event.
   * For SUCCEEDED, counter-limit or recovery-write failures downgrade the
   * returned state to FAILED.
   *
   * @param finalState terminal state to enter
   * @param termCause optional cause to record (first setter wins)
   * @param diag optional extra diagnostic text
   * @return the state actually entered (may differ from finalState on SUCCEEDED failures)
   */
  VertexState finished(VertexState finalState,
      VertexTerminationCause termCause, String diag) {
    if (finishTime == 0) setFinishTime();
    if (termCause != null) {
      trySetTerminationCause(termCause);
    }
    if (rootInputInitializerManager != null) {
      rootInputInitializerManager.shutdown();
      rootInputInitializerManager = null;
    }

    switch (finalState) {
      case ERROR:
        addDiagnostic("Vertex: " + logIdentifier + " error due to:" + terminationCause);
        if (!StringUtils.isEmpty(diag)) {
          addDiagnostic(diag);
        }
        abortVertex(VertexStatus.State.valueOf(finalState.name()));
        // internal errors are escalated to the DAG
        eventHandler.handle(new DAGEvent(getDAGId(),
            DAGEventType.INTERNAL_ERROR));
        if (LOG.isDebugEnabled()) {
          LOG.debug("stopping services attached to the succeeded Vertex,"
              + "name=" + getName());
        }
        stopServices();
        try {
          logJobHistoryVertexFailedEvent(finalState);
        } catch (IOException e) {
          LOG.error("Failed to send vertex finished event to recovery", e);
        }
        break;
      case KILLED:
      case FAILED:
        addDiagnostic("Vertex " + logIdentifier + " killed/failed due to:" + terminationCause);
        if (!StringUtils.isEmpty(diag)) {
          addDiagnostic(diag);
        }
        abortVertex(VertexStatus.State.valueOf(finalState.name()));
        eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
            finalState, terminationCause));
        if (LOG.isDebugEnabled()) {
          LOG.debug("stopping services attached to the succeeded Vertex,"
              + "name=" + getName());
        }
        stopServices();
        try {
          logJobHistoryVertexFailedEvent(finalState);
        } catch (IOException e) {
          LOG.error("Failed to send vertex finished event to recovery", e);
        }
        break;
      case SUCCEEDED:
        try {
          try {
            logJobHistoryVertexFinishedEvent();
            eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
                finalState));
            // Stop related services
            if (LOG.isDebugEnabled()) {
              LOG.debug("stopping services attached to the succeeded Vertex,"
                  + "name=" + getName());
            }
            stopServices();
          } catch (LimitExceededException e) {
            // counters blew the configured limit: demote success to failure
            LOG.error("Counter limits exceeded for vertex: " + getLogIdentifier(), e);
            finalState = VertexState.FAILED;
            addDiagnostic("Counters limit exceeded: " + e.getMessage());
            trySetTerminationCause(VertexTerminationCause.COUNTER_LIMITS_EXCEEDED);
            logJobHistoryVertexFailedEvent(finalState);
            eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
                finalState));
          }
        } catch (IOException e) {
          // could not persist completion for recovery: demote to failure
          LOG.error("Failed to send vertex finished event to recovery", e);
          finalState = VertexState.FAILED;
          trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
          eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
              finalState));
        }
        break;
      default:
        // Stop related services
        if (LOG.isDebugEnabled()) {
          LOG.debug("stopping services attached with Unexpected State,"
              + "name=" + getName());
        }
        stopServices();
        throw new TezUncheckedException("Unexpected VertexState: " + finalState);
    }
    return finalState;
  }
| |
  /** Convenience overload: finish with no extra termination cause or diagnostic. */
  VertexState finished(VertexState finalState) {
    return finished(finalState, null, null);
  }
| |
| |
  /**
   * Instantiates, initializes and sets up an OutputCommitter for each
   * additional output that declares a committer class. Each committer is
   * created and initialized as the DAG user (doAs). Outputs with no
   * controller descriptor are skipped.
   */
  private void initializeCommitters() throws Exception {
    if (!this.additionalOutputSpecs.isEmpty()) {
      LOG.info("Setting up committers for vertex " + logIdentifier + ", numAdditionalOutputs=" +
          additionalOutputs.size());
      for (Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry:
          additionalOutputs.entrySet()) {
        final String outputName = entry.getKey();
        final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue();
        if (od.getControllerDescriptor() == null
            || od.getControllerDescriptor().getClassName() == null) {
          // no committer configured for this output
          LOG.debug("Ignoring committer as none specified for output={}, vertexId={}",
              outputName, logIdentifier);
          continue;
        }
        LOG.info("Instantiating committer for output=" + outputName
            + ", vertex=" + logIdentifier
            + ", committerClass=" + od.getControllerDescriptor().getClassName());

        // run user committer code under the DAG user's credentials
        dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            OutputCommitterContext outputCommitterContext =
                new OutputCommitterContextImpl(appContext.getApplicationID(),
                    appContext.getApplicationAttemptId().getAttemptId(),
                    appContext.getCurrentDAG().getName(),
                    vertexName,
                    od,
                    vertexId.getId());
            OutputCommitter outputCommitter = ReflectionUtils
                .createClazzInstance(od.getControllerDescriptor().getClassName(),
                    new Class[]{OutputCommitterContext.class},
                    new Object[]{outputCommitterContext});
            LOG.debug("Invoking committer init for output={}, vertex={}", outputName, logIdentifier);

            try {
              TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), vertexId);
              outputCommitter.initialize();
              outputCommitters.put(outputName, outputCommitter);
              LOG.debug("Invoking committer setup for output={}, vertex={}", outputName, logIdentifier);
              outputCommitter.setupOutput();
            } finally {
              appContext.getHadoopShim().clearHadoopCallerContext();
            }

            return null;
          }
        });
      }
    }
  }
| |
  /**
   * Performs the common initialization work for this vertex: restores
   * init-time state when recovering, starts vertex services, and (unless the
   * vertex already finished in a prior attempt) initializes output committers.
   *
   * @return true on success; false if committer initialization failed, in
   *         which case the vertex has already been transitioned to FAILED
   */
  private boolean initializeVertex() {
    // Don't need to initialize committer if vertex is fully completed
    if (recoveryData != null && recoveryData.shouldSkipInit()) {
      // Do other necessary recovery here
      initedTime = recoveryData.getVertexInitedEvent().getInitedTime();
      // Re-route any events generated during init in the previous attempt so
      // downstream consumers see them again.
      List<TezEvent> initGeneratedEvents = recoveryData.getVertexInitedEvent().getInitGeneratedEvents();
      if (initGeneratedEvents != null && !initGeneratedEvents.isEmpty()) {
        eventHandler.handle(new VertexEventRouteEvent(getVertexId(), initGeneratedEvents));
      }
      // reset rootInputDescriptor because it may be changed during input initialization.
      this.rootInputDescriptors = recoveryData.getVertexInitedEvent().getAdditionalInputs();
    } else {
      initedTime = clock.getTime();
    }
    // set the vertex services to be initialized.
    initServices();
    // Only initialize committer when it is in non-recovery mode or vertex is not recovered to completed
    // state in recovery mode
    if (recoveryData == null || recoveryData.getVertexFinishedEvent() == null) {
      try {
        initializeCommitters();
      } catch (Exception e) {
        // Committer failure is an init failure: record diagnostics and fail the vertex.
        LOG.warn("Vertex Committer init failed, vertex=" + logIdentifier, e);
        addDiagnostic("Vertex init failed : "
            + ExceptionUtils.getStackTrace(e));
        trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
        finished(VertexState.FAILED);
        return false;
      }
    }

    logJobHistoryVertexInitializedEvent();
    return true;
  }
| |
  /**
   * If the number of tasks are greater than the configured value
   * throw an exception that will fail job initialization
   */
  private void checkTaskLimits() {
    // no code, for now
    // Intentionally a no-op placeholder: no task-count limit is enforced yet.
  }
| |
| @VisibleForTesting |
| ContainerContext getContainerContext(int taskIdx) { |
| if (taskSpecificLaunchCmdOpts.addTaskSpecificLaunchCmdOption(vertexName, taskIdx)) { |
| |
| String jvmOpts = javaOptsTaskSpecific != null ? javaOptsTaskSpecific : javaOpts; |
| |
| if (taskSpecificLaunchCmdOpts.hasModifiedTaskLaunchOpts()) { |
| jvmOpts = taskSpecificLaunchCmdOpts.getTaskSpecificOption(jvmOpts, vertexName, taskIdx); |
| } |
| |
| ContainerContext context = new ContainerContext(this.localResources, |
| appContext.getCurrentDAG().getCredentials(), |
| this.environmentTaskSpecific != null ? this.environmentTaskSpecific : this.environment, |
| jvmOpts); |
| return context; |
| } else { |
| return this.containerContext; |
| } |
| } |
| |
| private TaskImpl createTask(int taskIndex) { |
| ContainerContext conContext = getContainerContext(taskIndex); |
| return new TaskImpl(this.getVertexId(), taskIndex, |
| this.eventHandler, |
| vertexConf, |
| this.taskCommunicatorManagerInterface, |
| this.clock, |
| this.taskHeartbeatHandler, |
| this.appContext, |
| (this.targetVertices != null ? |
| this.targetVertices.isEmpty() : true), |
| this.taskResource, |
| conContext, |
| this.stateChangeNotifier, |
| this); |
| } |
| |
| private void createTasks() { |
| for (int i=0; i < this.numTasks; ++i) { |
| TaskImpl task = createTask(i); |
| this.addTask(task); |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Created task for vertex " + logIdentifier + ": " + |
| task.getTaskID()); |
| } |
| } |
| } |
| |
| private void addTasks(int newNumTasks) { |
| Preconditions.checkArgument(newNumTasks > this.numTasks, getLogIdentifier()); |
| int initialNumTasks = this.numTasks; |
| for (int i = initialNumTasks; i < newNumTasks; ++i) { |
| TaskImpl task = createTask(i); |
| this.addTask(task); |
| this.numTasks++; |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Created task for vertex " + logIdentifier + ": " + |
| task.getTaskID()); |
| } |
| } |
| } |
| |
  /**
   * Shrinks the task set to {@code newNumTasks} by dropping tasks from the end
   * of the insertion-ordered task map. Every task must still be in the NEW
   * state: parallelism may only be reduced before anything has started.
   *
   * @param newNumTasks new total task count; must be strictly less than the
   *                    current task count
   * @throws TezUncheckedException if any task has left the NEW state
   */
  private void removeTasks(int newNumTasks) {
    Preconditions.checkArgument(newNumTasks < this.numTasks, getLogIdentifier());
    // assign to local variable of LinkedHashMap to make sure that changing
    // type of task causes compile error. We depend on LinkedHashMap for order
    LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
    Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
        .iterator();
    // remove tasks from the end to maintain index numbers
    int i = 0;
    while (iter.hasNext()) {
      i++;
      Map.Entry<TezTaskID, Task> entry = iter.next();
      Task task = entry.getValue();
      // Every task is checked (not only the removed ones) so a partially
      // started vertex can never be resized.
      if (task.getState() != TaskState.NEW) {
        String msg = "All tasks must be in initial state when changing parallelism"
            + " for vertex: " + getLogIdentifier();
        LOG.warn(msg);
        throw new TezUncheckedException(msg);
      }
      // The first newNumTasks entries (iteration order == index order) survive.
      if (i <= newNumTasks) {
        continue;
      }
      LOG.debug("Removing task: {}", entry.getKey());
      iter.remove();
      this.numTasks--;
    }
  }
| |
| |
  /**
   * One-time vertex setup run from the InitTransition: wires up vertex-group
   * merged inputs, discovers root inputs that need initializers, validates the
   * configured task count, and installs and initializes the VertexManager.
   *
   * @return VertexState.INITED on success, VertexState.FAILED otherwise
   */
  private VertexState setupVertex() {

    this.initTimeRequested = clock.getTime();

    // VertexManager needs to be setup before attempting to Initialize any
    // Inputs - since events generated by them will be routed to the
    // VertexManager for handling.
    if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
      List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
      for (VertexGroupInfo groupInfo : dagVertexGroups.values()) {
        // Only groups that feed a merged input into this vertex are relevant.
        if (groupInfo.edgeMergedInputs.containsKey(getName())) {
          InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(getName());
          groupSpecList.add(new GroupInputSpec(groupInfo.groupName,
              Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
        }
      }
      if (!groupSpecList.isEmpty()) {
        groupInputSpecList = groupSpecList;
      }
    }

    // Check if any inputs need initializers
    if (rootInputDescriptors != null) {
      LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
          + rootInputDescriptors);
      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input
          : rootInputDescriptors.values()) {
        if (input.getControllerDescriptor() != null &&
            input.getControllerDescriptor().getClassName() != null) {
          // Lazily created: most vertices have no initializers at all.
          if (inputsWithInitializers == null) {
            inputsWithInitializers = Sets.newHashSet();
          }
          inputsWithInitializers.add(input.getName());
          LOG.info("Starting root input initializer for input: "
              + input.getName() + ", with class: ["
              + input.getControllerDescriptor().getClassName() + "]");
        }
      }
    }

    // Detect whether any incoming edge is scatter-gather (shuffle).
    boolean hasBipartite = false;
    if (sourceVertices != null) {
      for (Edge edge : sourceVertices.values()) {
        if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
          hasBipartite = true;
          break;
        }
      }
    }

    // Combining a root input initializer with a shuffle input is unsupported.
    if (hasBipartite && inputsWithInitializers != null) {
      LOG.error("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
      return finished(VertexState.FAILED);
    }

    // -1 means parallelism will be determined later (initializer/VM/1-1 source).
    numTasks = getVertexPlan().getTaskConfig().getNumTasks();
    if (!(numTasks == -1 || numTasks >= 0)) {
      // NOTE(review): this path returns FAILED directly rather than via
      // finished(...) like the other failure paths here — confirm intended.
      addDiagnostic("Invalid task count for vertex"
          + ", numTasks=" + numTasks);
      trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
      return VertexState.FAILED;
    }

    checkTaskLimits();
    // set VertexManager as the last step. Because in recovery case, we may need to restore
    // some info from last the AM attempt and skip the initialization step. Otherwise numTasks may be
    // reset to -1 after the restore.
    try {
      assignVertexManager();
    } catch (TezException e1) {
      String msg = "Fail to create VertexManager, " + ExceptionUtils.getStackTrace(e1);
      LOG.error(msg);
      return finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
    }

    try {
      vertexManager.initialize();
      vmIsInitialized.set(true);
      // Deliver any VM events that arrived before the manager was ready.
      if (!pendingVmEvents.isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processing: " + pendingVmEvents.size() + " pending VMEvents for Vertex: " +
              logIdentifier);
        }
        for (VertexManagerEvent vmEvent : pendingVmEvents) {
          vertexManager.onVertexManagerEventReceived(vmEvent);
        }
        pendingVmEvents.clear();
      }
    } catch (AMUserCodeException e) {
      // Failures inside user VertexManager code fail the vertex.
      String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
      LOG.error(msg, e);
      finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE,
          msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
      return VertexState.FAILED;
    }
    return VertexState.INITED;
  }
| |
| private boolean isVertexInitSkippedInParentVertices() { |
| for (Map.Entry<Vertex, Edge> entry : sourceVertices.entrySet()) { |
| if(!(((VertexImpl) entry.getKey()).isVertexInitSkipped())) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private void assignVertexManager() throws TezException { |
| // condition for skip initializing stage |
| // - VertexInputInitializerEvent is seen |
| // - VertexReconfigureDoneEvent is seen |
| // - Reason to check whether VertexManager has complete its responsibility |
| // VertexManager actually is involved in the InputInitializer (InputInitializer generate events |
| // and send them to VertexManager which do some processing and send back to Vertex), so that means |
| // Input initializer will affect on the VertexManager and we couldn't skip the initializing step if |
| // VertexManager has not completed its responsibility. |
| // - Why using VertexReconfigureDoneEvent |
| // - VertexReconfigureDoneEvent represent the case that user use API reconfigureVertex |
| // VertexReconfigureDoneEvent will be logged |
| // - TaskStartEvent is seen in that vertex or setVertexParallelism is called |
| // - All the parent vertices have skipped initializing stage while recovering |
| if (recoveryData != null && recoveryData.shouldSkipInit() |
| && (recoveryData.isVertexTasksStarted() || |
| recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled()) |
| && isVertexInitSkippedInParentVertices()) { |
| // Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt |
| VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent(); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("VertexManager reconfiguration is done in the last AM Attempt" |
| + ", use NoOpVertexManager to replace it, vertexId=" + logIdentifier); |
| LOG.info("VertexReconfigureDoneEvent=" + reconfigureDoneEvent); |
| } |
| NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream(); |
| try { |
| CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out); |
| reconfigureDoneEvent.toProtoStream(codedOutputStream); |
| codedOutputStream.flush(); |
| } catch (IOException e) { |
| throw new TezUncheckedException("Unable to deserialize VertexReconfigureDoneEvent"); |
| } |
| this.vertexManager = new VertexManager( |
| VertexManagerPluginDescriptor.create(NoOpVertexManager.class.getName()) |
| .setUserPayload(UserPayload.create(ByteBuffer.wrap(out.toByteArray()))), |
| dagUgi, this, appContext, stateChangeNotifier); |
| isVertexInitSkipped = true; |
| return; |
| } |
| |
| boolean hasBipartite = false; |
| boolean hasOneToOne = false; |
| boolean hasCustom = false; |
| if (sourceVertices != null) { |
| for (Edge edge : sourceVertices.values()) { |
| switch(edge.getEdgeProperty().getDataMovementType()) { |
| case SCATTER_GATHER: |
| hasBipartite = true; |
| break; |
| case ONE_TO_ONE: |
| hasOneToOne = true; |
| break; |
| case BROADCAST: |
| break; |
| case CUSTOM: |
| hasCustom = true; |
| break; |
| default: |
| throw new TezUncheckedException("Unknown data movement type: " + |
| edge.getEdgeProperty().getDataMovementType()); |
| } |
| } |
| } |
| |
| boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin(); |
| |
| if (hasUserVertexManager) { |
| VertexManagerPluginDescriptor pluginDesc = DagTypeConverters |
| .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan |
| .getVertexManagerPlugin()); |
| LOG.info("Setting user vertex manager plugin: " |
| + pluginDesc.getClassName() + " on vertex: " + getLogIdentifier()); |
| vertexManager = new VertexManager(pluginDesc, dagUgi, this, appContext, stateChangeNotifier); |
| } else { |
| // Intended order of picking a vertex manager |
| // If there is an InputInitializer then we use the RootInputVertexManager. May be fixed by TEZ-703 |
| // If there is a custom edge we fall back to default ImmediateStartVertexManager |
| // If there is a one to one edge then we use the InputReadyVertexManager |
| // If there is a scatter-gather edge then we use the ShuffleVertexManager |
| // Else we use the default ImmediateStartVertexManager |
| if (inputsWithInitializers != null) { |
| LOG.info("Setting vertexManager to RootInputVertexManager for " |
| + logIdentifier); |
| vertexManager = new VertexManager(RootInputVertexManager |
| .createConfigBuilder(vertexConf).build(), |
| dagUgi, this, appContext, stateChangeNotifier); |
| } else if (hasOneToOne && !hasCustom) { |
| LOG.info("Setting vertexManager to InputReadyVertexManager for " |
| + logIdentifier); |
| vertexManager = new VertexManager( |
| VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()), |
| dagUgi, this, appContext, stateChangeNotifier); |
| } else if (hasBipartite && !hasCustom) { |
| LOG.info("Setting vertexManager to ShuffleVertexManager for " |
| + logIdentifier); |
| // shuffle vertex manager needs a conf payload |
| vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(vertexConf).build(), |
| dagUgi, this, appContext, stateChangeNotifier); |
| } else { |
| // schedule all tasks upon vertex start. Default behavior. |
| LOG.info("Setting vertexManager to ImmediateStartVertexManager for " |
| + logIdentifier); |
| vertexManager = new VertexManager( |
| VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()), |
| dagUgi, this, appContext, stateChangeNotifier); |
| } |
| } |
| } |
| |
| private static List<TaskAttemptIdentifier> getTaskAttemptIdentifiers(DAG dag, |
| List<TezTaskAttemptID> taIds) { |
| List<TaskAttemptIdentifier> attempts = new ArrayList<TaskAttemptIdentifier>(taIds.size()); |
| String dagName = dag.getName(); |
| for (TezTaskAttemptID taId : taIds) { |
| String vertexName = dag.getVertex(taId.getVertexID()).getName(); |
| attempts.add(getTaskAttemptIdentifier(dagName, vertexName, taId)); |
| } |
| return attempts; |
| } |
| |
  /** Wraps the given names and attempt id into a TaskAttemptIdentifierImpl. */
  private static TaskAttemptIdentifier getTaskAttemptIdentifier(String dagName, String vertexName,
      TezTaskAttemptID taId) {
    return new TaskAttemptIdentifierImpl(dagName, vertexName, taId);
  }
| |
  /**
   * Handles notification that an edge connected to this vertex has had its
   * edge manager initialized. Removes the edge from the uninitialized set and,
   * if the vertex is INITIALIZING and otherwise ready, triggers vertex
   * initialization.
   */
  public static class NullEdgeInitializedTransition implements
      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {

    @Override
    public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
      VertexEventNullEdgeInitialized event = (VertexEventNullEdgeInitialized) vertexEvent;
      Edge edge = event.getEdge();
      Vertex otherVertex = event.getVertex();
      Preconditions.checkState(
          vertex.getState() == VertexState.NEW
              || vertex.getState() == VertexState.INITIALIZING,
          "Unexpected state " + vertex.getState() + " for vertex: "
              + vertex.logIdentifier);
      // NOTE(review): this OR-chain passes whenever either map is null or
      // contains the other vertex; presumably it means "connected on some
      // side" — confirm the intended predicate.
      Preconditions.checkState(
          (vertex.sourceVertices == null || vertex.sourceVertices.containsKey(otherVertex) ||
          vertex.targetVertices == null || vertex.targetVertices.containsKey(otherVertex)),
          "Not connected to vertex " + otherVertex.getLogIdentifier() + " from vertex: " + vertex.logIdentifier);
      LOG.info("Edge initialized for connection to vertex " + otherVertex.getLogIdentifier() +
          " at vertex : " + vertex.logIdentifier);
      vertex.uninitializedEdges.remove(edge);
      if(vertex.getState() == VertexState.INITIALIZING && vertex.canInitVertex()) {
        // Vertex in Initialing state and can init. Do init.
        return VertexInitializedTransition.doTransition(vertex);
      }
      // Vertex is either New (waiting for sources to init) or its not ready to init (failed)
      return vertex.getState();
    }
  }
| |
  /**
   * Applies a terminal state handed down by the DAG during recovery: task
   * counters are set to match the desired terminal state and the vertex is
   * finished directly without running any tasks.
   */
  public static class RecoverTransition implements
      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {

    @Override
    public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vertexEvent;
      // with desired state, for the cases that DAG is completed
      VertexState desiredState = recoverEvent.getDesiredState();
      switch (desiredState) {
      case SUCCEEDED:
        // All tasks are treated as having completed successfully.
        vertex.succeededTaskCount = vertex.numTasks;
        vertex.completedTaskCount = vertex.numTasks;
        break;
      case KILLED:
        vertex.killedTaskCount = vertex.numTasks;
        break;
      case FAILED:
      case ERROR:
        vertex.failedTaskCount = vertex.numTasks;
        break;
      default:
        // Any other desired state is unexpected; finish the vertex as ERROR.
        LOG.info("Unhandled desired state provided by DAG"
            + ", vertex=" + vertex.logIdentifier
            + ", state=" + desiredState);
        return vertex.finished(VertexState.ERROR);
      }

      LOG.info("DAG informed vertices of its final completed state"
          + ", vertex=" + vertex.logIdentifier
          + ", desiredState=" + desiredState);
      return vertex.finished(recoverEvent.getDesiredState());
    }
  }
| |
  /**
   * Handles V_INIT for a vertex. A vertex initializes only after all of its
   * source vertices have sent their init notifications; it then cascades
   * V_INIT to its target vertices so initialization flows topologically.
   */
  public static class InitTransition implements
      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {

    @Override
    public VertexState transition(VertexImpl vertex, VertexEvent event) {
      // recover from recovery data (NEW->FAILED/KILLED)
      if (vertex.recoveryData != null
          && !vertex.recoveryData.isVertexInited()
          && vertex.recoveryData.isVertexFinished()) {
        // The previous attempt finished this vertex without ever initing it;
        // replay that terminal state directly.
        VertexFinishedEvent finishedEvent = vertex.recoveryData.getVertexFinishedEvent();
        vertex.diagnostics.add(finishedEvent.getDiagnostics());
        return vertex.finished(finishedEvent.getState());
      }

      VertexState vertexState = VertexState.NEW;
      vertex.numInitedSourceVertices++;
      // Root vertices (no sources) init immediately; others wait until every
      // source vertex has reported in.
      if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
          (vertex.numInitedSourceVertices == vertex.sourceVertices.size())) {
        vertexState = handleInitEvent(vertex);
        if (vertexState != VertexState.FAILED) {
          // Cascade V_INIT to downstream vertices.
          if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
            for (Vertex target : vertex.targetVertices.keySet()) {
              vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
                  VertexEventType.V_INIT));
            }
          }
        }
      }
      return vertexState;
    }

    /**
     * Performs the initialization work and decides the resulting state:
     * FAILED on setup errors, INITIALIZING when parallelism or edges are still
     * pending, INITED when the vertex is fully defined.
     */
    private VertexState handleInitEvent(VertexImpl vertex) {
      VertexState state = vertex.setupVertex();
      if (state.equals(VertexState.FAILED)) {
        return state;
      }

      // TODO move before to handle NEW state
      // Track custom edges whose edge managers are not yet defined; the vertex
      // cannot leave INITIALIZING until these initialize.
      if (vertex.targetVertices != null) {
        for (Edge e : vertex.targetVertices.values()) {
          if (e.getEdgeManager() == null) {
            Preconditions
                .checkState(
                    e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
                    "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
            vertex.uninitializedEdges.add(e);
          }
        }
      }
      if (vertex.sourceVertices != null) {
        for (Edge e : vertex.sourceVertices.values()) {
          if (e.getEdgeManager() == null) {
            Preconditions
                .checkState(
                    e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
                    "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
            vertex.uninitializedEdges.add(e);
          }
        }
      }

      // Create tasks based on initial configuration, but don't start them yet.
      if (vertex.numTasks == -1) {
        // this block must always return VertexState.INITIALIZING
        LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
            + " to set #tasks for the vertex " + vertex.getLogIdentifier());

        if (vertex.inputsWithInitializers != null) {
          if (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit()) {
            LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
            try {
              vertex.setupInputInitializerManager();
            } catch (TezException e) {
              String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
              LOG.info(msg);
              return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
            }
          }
          return VertexState.INITIALIZING;
        } else {
          // Without initializers, parallelism must come from an uninited 1-1
          // source or a custom vertex manager; otherwise the DAG is mis-specified.
          boolean hasOneToOneUninitedSource = false;
          for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
            if (entry.getValue().getEdgeProperty().getDataMovementType() ==
                DataMovementType.ONE_TO_ONE) {
              if (entry.getKey().getTotalTasks() == -1) {
                hasOneToOneUninitedSource = true;
                break;
              }
            }
          }
          if (hasOneToOneUninitedSource) {
            LOG.info("Vertex will initialize from 1-1 sources. " + vertex.logIdentifier);
            return VertexState.INITIALIZING;
          }
          if (vertex.vertexPlan.hasVertexManagerPlugin()) {
            LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
            return VertexState.INITIALIZING;
          }
          throw new TezUncheckedException(vertex.getLogIdentifier() +
              " has -1 tasks but does not have input initializers, " +
              "1-1 uninited sources or custom vertex manager to set it at runtime");
        }
      } else {
        LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier);
        vertex.createTasks();
        // this block may return VertexState.INITIALIZING
        if (vertex.inputsWithInitializers != null &&
            (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) {
          LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
          try {
            vertex.setupInputInitializerManager();
          } catch (TezException e) {
            String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
            LOG.error(msg);
            return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
          }
          return VertexState.INITIALIZING;
        }
        if (!vertex.uninitializedEdges.isEmpty()) {
          LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier);
          return VertexState.INITIALIZING;
        }
        LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
        // vertex is completely configured. Send out notification now.
        vertex.maybeSendConfiguredEvent();
        boolean isInitialized = vertex.initializeVertex();
        if (isInitialized) {
          return VertexState.INITED;
        } else {
          return VertexState.FAILED;
        }
      }
    }
  } // end of InitTransition
| |
  /**
   * Factory for the RootInputInitializerManager; a method (rather than inline
   * construction) so tests can override it.
   * NOTE(review): the parameters are unused by this implementation —
   * presumably retained for overriding subclasses; confirm before removing.
   */
  @VisibleForTesting
  protected RootInputInitializerManager createRootInputInitializerManager(
      String dagName, String vertexName, TezVertexID vertexID,
      EventHandler eventHandler, int numTasks, int numNodes,
      Resource vertexTaskResource, Resource totalResource) {
    return new RootInputInitializerManager(this, appContext, this.dagUgi, this.stateChangeNotifier);
  }
| |
| private boolean initializeVertexInInitializingState() { |
| boolean isInitialized = initializeVertex(); |
| if (!isInitialized) { |
| // Don't bother starting if the vertex state is failed. |
| return false; |
| } |
| |
| return true; |
| } |
| |
| void startIfPossible() { |
| if (startSignalPending) { |
| // Trigger a start event to ensure route events are seen before |
| // a start event. |
| LOG.info("Triggering start event for vertex: " + logIdentifier + |
| " with distanceFromRoot: " + distanceFromRoot ); |
| eventHandler.handle(new VertexEvent(vertexId, |
| VertexEventType.V_START)); |
| } |
| } |
| |
| public static class VertexInitializedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| static VertexState doTransition(VertexImpl vertex) { |
| Preconditions.checkState(vertex.canInitVertex(), "Vertex: " + vertex.logIdentifier); |
| boolean isInitialized = vertex.initializeVertexInInitializingState(); |
| if (!isInitialized) { |
| return VertexState.FAILED; |
| } |
| |
| vertex.startIfPossible(); |
| return VertexState.INITED; |
| } |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| return doTransition(vertex); |
| } |
| } |
| |
  // present in most transitions so that the initializer thread can be shutdown properly
  /**
   * Forwards the events produced by one root input initializer to the
   * VertexManager and shuts the initializer manager down once every input
   * with an initializer has reported.
   */
  public static class RootInputInitializedTransition implements
      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {

    @Override
    public VertexState transition(VertexImpl vertex, VertexEvent event) {
      VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized) event;
      VertexState state = vertex.getState();
      if (state == VertexState.INITIALIZING) {
        try {
          // Hand the initializer's events to the VertexManager for processing.
          vertex.vertexManager.onRootVertexInitialized(liInitEvent.getInputName(), vertex
              .getAdditionalInputs().get(liInitEvent.getInputName()).getIODescriptor(),
              liInitEvent.getEvents());
        } catch (AMUserCodeException e) {
          String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
          LOG.error(msg, e);
          vertex.finished(VertexState.FAILED,
              VertexTerminationCause.AM_USERCODE_FAILURE, msg
                  + "," + ExceptionUtils.getStackTrace(e.getCause()));
          return VertexState.FAILED;
        }
      }

      vertex.numInitializedInputs++;
      if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
        // All inputs initialized, shutdown the initializer.
        vertex.rootInputInitializerManager.shutdown();
        vertex.rootInputInitializerManager = null;
      }

      // the return of these events from the VM will complete initialization and move into
      // INITED state if possible via InputDataInformationTransition

      return vertex.getState();
    }
  }
| |
  /**
   * Handles InputDataInformation events returned from the VertexManager after
   * initializer events were processed. Routes the payload events and, once all
   * initializers have completed and been handled, allows the vertex to finish
   * initialization.
   */
  public static class InputDataInformationTransition implements
      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {

    @Override
    public VertexState transition(VertexImpl vertex, VertexEvent event) {
      vertex.numInitializerCompletionsHandled++;
      VertexEventInputDataInformation iEvent = (VertexEventInputDataInformation) event;
      List<TezEvent> inputInfoEvents = iEvent.getEvents();
      try {
        if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
          // Remember init-generated events (persisted for recovery), then route them.
          vertex.initGeneratedEvents.addAll(inputInfoEvents);
          vertex.handleRoutedTezEvents(inputInfoEvents, false);
        }
      } catch (AMUserCodeException e) {
        String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
        LOG.error(msg, e);
        vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + ","
            + ExceptionUtils.getStackTrace(e.getCause()));
        return VertexState.FAILED;
      }

      // done. check if we need to do the initialization
      if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) {
        if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()
            && vertex.numInitializerCompletionsHandled == vertex.inputsWithInitializers.size()) {
          // set the wait flag to false if all initializers are done and InputDataInformation are received from VM
          vertex.initWaitsForRootInitializers = false;
        }
        // initialize vertex if possible and needed
        if (vertex.canInitVertex()) {
          Preconditions.checkState(vertex.numTasks >= 0,
              "Parallelism should have been set by now for vertex: " + vertex.logIdentifier);
          return VertexInitializedTransition.doTransition(vertex);
        }
      }
      return vertex.getState();
    }
  }
| |
| // Temporary to maintain topological order while starting vertices. Not useful |
| // since there's not much difference between the INIT and RUNNING states. |
| public static class SourceVertexStartedTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventSourceVertexStarted startEvent = |
| (VertexEventSourceVertexStarted) event; |
| int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1; |
| if(vertex.distanceFromRoot < distanceFromRoot) { |
| vertex.distanceFromRoot = distanceFromRoot; |
| } |
| vertex.numStartedSourceVertices++; |
| vertex.startTimeRequested = vertex.clock.getTime(); |
| LOG.info("Source vertex started: " + startEvent.getSourceVertexId() + |
| " for vertex: " + vertex.logIdentifier + " numStartedSources: " + |
| vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size()); |
| |
| if (vertex.numStartedSourceVertices < vertex.sourceVertices.size()) { |
| LOG.info("Cannot start vertex: " + vertex.logIdentifier + " numStartedSources: " |
| + vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size()); |
| return; |
| } |
| |
| // vertex meets external start dependency conditions. Save this signal in |
| // case we are not ready to start now and need to start later |
| vertex.startSignalPending = true; |
| |
| if (vertex.getState() != VertexState.INITED) { |
| // vertex itself is not ready to start. External dependencies have already |
| // notified us. |
| LOG.info("Cannot start vertex. Not in inited state. " |
| + vertex.logIdentifier + " . VertesState: " + vertex.getState() |
| + " numTasks: " + vertex.numTasks + " Num uninitialized edges: " |
| + vertex.uninitializedEdges.size()); |
| return; |
| } |
| |
| // vertex is inited and all dependencies are ready. Inited vertex means |
| // parallelism must be set already and edges defined |
| Preconditions.checkState( |
| (vertex.numTasks >= 0 && vertex.uninitializedEdges.isEmpty()), |
| "Cannot start vertex that is not completely defined. Vertex: " |
| + vertex.logIdentifier + " numTasks: " + vertex.numTasks); |
| |
| vertex.startIfPossible(); |
| } |
| } |
| |
| boolean canInitVertex() { |
| if (numTasks >= 0 && uninitializedEdges.isEmpty() && !initWaitsForRootInitializers) { |
| // vertex fully defined |
| return true; |
| } |
| LOG.info("Cannot init vertex: " + logIdentifier + " numTasks: " + numTasks |
| + " numUnitializedEdges: " + uninitializedEdges.size() |
| + " numInitializedInputs: " + numInitializedInputs |
| + " initWaitsForRootInitializers: " + initWaitsForRootInitializers); |
| return false; |
| } |
| |
  /**
   * Records a start signal received while the vertex is still initializing.
   * Only root vertices (no sources) can receive a direct start from the DAG in
   * this state; the actual start happens later via startIfPossible().
   */
  public static class StartWhileInitializingTransition implements
      SingleArcTransition<VertexImpl, VertexEvent> {

    @Override
    public void transition(VertexImpl vertex, VertexEvent event) {
      // vertex state machine does not start itself in the initializing state
      // this start event can only come directly from the DAG. That means this
      // is a top level vertex of the dag
      Preconditions.checkState(
          (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()),
          "Vertex: " + vertex.logIdentifier + " got invalid start event");
      vertex.startSignalPending = true;
      vertex.startTimeRequested = vertex.clock.getTime();
    }

  }
| |
  /**
   * Starts an INITED vertex. The start-request time is recorded here unless a
   * previously latched start signal already captured it.
   */
  public static class StartTransition implements
      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {

    @Override
    public VertexState transition(VertexImpl vertex, VertexEvent event) {
      Preconditions.checkState(vertex.getState() == VertexState.INITED,
          "Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier);
      // if the start signal is pending this event is a fake start event to trigger this transition
      if (!vertex.startSignalPending) {
        vertex.startTimeRequested = vertex.clock.getTime();
      }
      return vertex.startVertex();
    }
  }
| |
| private void maybeSendConfiguredEvent() { |
| // the vertex is fully configured by the time it starts. Always notify completely configured |
| // unless the vertex manager has told us that it is going to reconfigure it further |
| Preconditions.checkState(canInitVertex(), "Vertex: " + getLogIdentifier()); |
| if (!this.vertexToBeReconfiguredByManager) { |
| // this vertex will not be reconfigured by its manager |
| if (completelyConfiguredSent.compareAndSet(false, true)) { |
| stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName, |
| org.apache.tez.dag.api.event.VertexState.CONFIGURED)); |
| logVertexConfigurationDoneEvent(); |
| } |
| } |
| } |
| |
  /**
   * Moves an INITED vertex into the running state: records the start time
   * (recovered from history if available), notifies the vertex manager,
   * announces the vertex start to downstream vertices, and sends the
   * CONFIGURED notification if applicable.
   *
   * @return {@link VertexState#RUNNING} normally, or
   *         {@link VertexState#TERMINATING} if the vertex manager's
   *         onVertexStarted callback threw a user-code exception.
   */
  private VertexState startVertex() {
    Preconditions.checkState(getState() == VertexState.INITED,
        "Vertex must be inited " + logIdentifier);

    // On recovery, reuse the originally recorded start time instead of the
    // current clock so history stays consistent across AM restarts.
    if (recoveryData != null && recoveryData.isVertexStarted()) {
      VertexStartedEvent vertexStartedEvent = recoveryData.getVertexStartedEvent();
      this.startedTime = vertexStartedEvent.getStartTime();
    } else {
      this.startedTime = clock.getTime();
    }

    try {
      // Hand the manager any source task completions that arrived before start.
      vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag, pendingReportedSrcCompletions));
    } catch (AMUserCodeException e) {
      String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier;
      LOG.error(msg, e);
      addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
      tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
      return VertexState.TERMINATING;
    }
    // The buffered completions have been delivered to the manager above.
    pendingReportedSrcCompletions.clear();
    logJobHistoryVertexStartedEvent();

    // the vertex is fully configured by the time it starts. Always notify completely configured
    // unless the vertex manager has told us that it is going to reconfigure it further.
    // If the vertex was pre-configured then the event would have been sent out earlier. Calling again
    // would be a no-op. If the vertex was not fully configured and waiting for that to complete then
    // we would start immediately after that. Either parallelism updated (now) or IPO changed (future)
    // or vertex added (future). Simplify these cases by sending the event now automatically for the
    // user as if they had invoked the planned()/done() API's.
    maybeSendConfiguredEvent();

    // TODO: Metrics
    //job.metrics.runningJob(job);

    // default behavior is to start immediately. so send information about us
    // starting to downstream vertices. If the connections/structure of this
    // vertex is not fully defined yet then we could send this event later
    // when we are ready
    if (targetVertices != null) {
      for (Vertex targetVertex : targetVertices.keySet()) {
        eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex
            .getVertexId(), getVertexId(), distanceFromRoot));
      }
    }

    // If we have no tasks, just transition to vertex completed
    if (this.numTasks == 0) {
      eventHandler.handle(new VertexEvent(
          this.vertexId, VertexEventType.V_COMPLETED));
    }

    return VertexState.RUNNING;
  }
| |
  /**
   * Aborts the vertex: invokes abort on every output committer (best-effort,
   * under the DAG user's UGI), stamps the finish time if not already set, and
   * stops attached services. Idempotent — repeated aborts are ignored.
   *
   * @param finalState the terminal state passed to each committer's abortOutput
   */
  void abortVertex(final VertexStatus.State finalState) {
    // getAndSet makes the first caller win; later aborts are no-ops.
    if (this.aborted.getAndSet(true)) {
      LOG.info("Ignoring multiple aborts for vertex: " + logIdentifier);
      return;
    }

    if (outputCommitters != null) {
      LOG.info("Invoking committer abort for vertex, vertexId=" + logIdentifier);
      try {
        // Caller context is set for the duration of the committer calls and
        // cleared in the finally block below.
        TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), vertexId);
        // Run committer code as the DAG user, not the AM user.
        dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() {
            for (Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) {
              try {
                LOG.info("Invoking committer abort for output=" + entry.getKey() + ", vertexId="
                    + logIdentifier);
                entry.getValue().abortOutput(finalState);
              } catch (Exception e) {
                // Best-effort: a failing committer must not prevent the
                // remaining outputs from being aborted.
                LOG.warn("Could not abort committer for output=" + entry.getKey() + ", vertexId="
                    + logIdentifier, e);
              }
            }
            return null;
          }
        });
      } catch (Exception e) {
        throw new TezUncheckedException("Unknown error while attempting VertexCommitter(s) abort", e);
      } finally {
        appContext.getHadoopShim().clearHadoopCallerContext();
      }
    }
    // finishTime == 0 means it was never set; record it now.
    if (finishTime == 0) {
      setFinishTime();
    }
    // Stop related services
    if (LOG.isDebugEnabled()) {
      LOG.debug("stopping services attached to the aborted Vertex, name="
          + getName());
    }
    stopServices();
  }
| |
| private void mayBeConstructFinalFullCounters() { |
| // Calculating full-counters. This should happen only once for the vertex. |
| synchronized (this.fullCountersLock) { |
| // TODO this is broken after rerun |
| if (this.fullCounters != null) { |
| // Already constructed. Just return. |
| return; |
| } |
| this.fullCounters = this.constructFinalFullcounters(); |
| } |
| } |
| |
| private VertexStatisticsImpl constructStatistics() { |
| return completedTasksStatsCache; |
| } |
| |
| @Private |
| public TezCounters constructFinalFullcounters() { |
| AggregateTezCounters aggregateTezCounters = new AggregateTezCounters(); |
| aggregateTezCounters.aggrAllCounters(counters); |
| this.vertexStats = new VertexStats(); |
| |
| for (Task t : this.tasks.values()) { |
| vertexStats.updateStats(t.getReport()); |
| TezCounters counters = t.getCounters(); |
| aggregateTezCounters.aggrAllCounters(counters); |
| } |
| return aggregateTezCounters; |
| } |
| |
| private static class RootInputInitFailedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventRootInputFailed fe = (VertexEventRootInputFailed) event; |
| String msg = "Vertex Input: " + fe.getInputName() |
| + " initializer failed, vertex=" + vertex.getLogIdentifier(); |
| LOG.error(msg, fe.getError()); |
| if (vertex.getState() == VertexState.RUNNING) { |
| vertex.addDiagnostic(msg |
| + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause())); |
| vertex.tryEnactKill(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, |
| TaskTerminationCause.AM_USERCODE_FAILURE); |
| return VertexState.TERMINATING; |
| } else { |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg |
| + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause())); |
| return VertexState.FAILED; |
| } |
| } |
| } |
| |
| // Task-start has been moved out of InitTransition, so this arc simply |
| // hardcodes 0 for both map and reduce finished tasks. |
| private static class TerminateNewVertexTransition |
| implements SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTermination vet = (VertexEventTermination) event; |
| vertex.trySetTerminationCause(vet.getTerminationCause()); |
| vertex.setFinishTime(); |
| vertex.addDiagnostic("Vertex received Kill in NEW state."); |
| vertex.finished(VertexState.KILLED); |
| } |
| } |
| |
| private static class TerminateInitedVertexTransition |
| implements SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTermination vet = (VertexEventTermination) event; |
| vertex.trySetTerminationCause(vet.getTerminationCause()); |
| vertex.addDiagnostic("Vertex received Kill in INITED state."); |
| vertex.finished(VertexState.KILLED); |
| } |
| } |
| |
| private static class TerminateInitingVertexTransition extends TerminateInitedVertexTransition { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| super.transition(vertex, event); |
| } |
| } |
| |
  private static class VertexKilledTransition
      implements SingleArcTransition<VertexImpl, VertexEvent> {
    @Override
    public void transition(VertexImpl vertex, VertexEvent event) {
      // Kill arrived while the vertex is RUNNING. Map the termination cause of
      // the incoming event to the appropriate per-task termination cause and
      // enact the kill on all running tasks.
      vertex.addDiagnostic("Vertex received Kill while in RUNNING state.");
      VertexEventTermination vet = (VertexEventTermination) event;
      VertexTerminationCause trigger = vet.getTerminationCause();
      switch(trigger){
        case DAG_TERMINATED: vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL); break;
        case OWN_TASK_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break;
        // The following causes all fall through intentionally: each maps to
        // OTHER_VERTEX_FAILURE at the task level.
        case ROOT_INPUT_INIT_FAILURE:
        case COMMIT_FAILURE:
        case INVALID_NUM_OF_TASKS:
        case INIT_FAILURE:
        case INTERNAL_ERROR:
        case AM_USERCODE_FAILURE:
        case VERTEX_RERUN_IN_COMMITTING:
        case VERTEX_RERUN_AFTER_COMMIT:
        case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
        default://should not occur
          throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);
      }

      // TODO: Metrics
      //job.metrics.endRunningJob(job);
    }
  }
| |
| private static class VertexKilledWhileCommittingTransition |
| implements SingleArcTransition<VertexImpl, VertexEvent> { |
| |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTermination vet = (VertexEventTermination) event; |
| VertexTerminationCause trigger = vet.getTerminationCause(); |
| String msg = "Vertex received Kill while in COMMITTING state, terminationCause=" |
| + trigger +", vertex=" + vertex.logIdentifier; |
| LOG.info(msg); |
| vertex.addDiagnostic(msg); |
| vertex.trySetTerminationCause(trigger); |
| vertex.cancelCommits(); |
| } |
| } |
| |
| private static class VertexManagerUserCodeErrorTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventManagerUserCodeError errEvent = ((VertexEventManagerUserCodeError) event); |
| AMUserCodeException e = errEvent.getError(); |
| String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| |
| if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.COMMITTING) { |
| vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, |
| TaskTerminationCause.AM_USERCODE_FAILURE); |
| vertex.cancelCommits(); |
| return VertexState.TERMINATING; |
| } else { |
| vertex.finished(VertexState.FAILED, |
| VertexTerminationCause.AM_USERCODE_FAILURE, msg |
| + ", " + ExceptionUtils.getStackTrace(e.getCause())); |
| return VertexState.FAILED; |
| } |
| } |
| } |
| |
| /** |
| * Here, the Vertex is being told that one of it's source task-attempts |
| * completed. |
| */ |
| private static class SourceTaskAttemptCompletedEventTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTaskAttemptCompleted completionEvent = |
| ((VertexEventSourceTaskAttemptCompleted) event).getCompletionEvent(); |
| LOG.info("Source task attempt completed for vertex: " + vertex.getLogIdentifier() |
| + " attempt: " + completionEvent.getTaskAttemptId() |
| + " with state: " + completionEvent.getTaskAttemptState() |
| + " vertexState: " + vertex.getState()); |
| |
| |
| if (TaskAttemptStateInternal.SUCCEEDED.equals(completionEvent |
| .getTaskAttemptState())) { |
| vertex.numSuccessSourceAttemptCompletions++; |
| |
| if (vertex.getState() == VertexState.RUNNING) { |
| try { |
| // Inform the vertex manager about the source task completing. |
| TezTaskAttemptID taId = completionEvent.getTaskAttemptId(); |
| vertex.vertexManager.onSourceTaskCompleted( |
| getTaskAttemptIdentifier(vertex.dag.getName(), |
| vertex.dag.getVertex(taId.getVertexID()).getName(), |
| taId)); |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, |
| TaskTerminationCause.AM_USERCODE_FAILURE); |
| return VertexState.TERMINATING; |
| } |
| } else { |
| vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId()); |
| } |
| } |
| return vertex.getState(); |
| } |
| } |
| |
| private static class TaskAttemptCompletedEventTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTaskAttemptCompleted completionEvent = |
| ((VertexEventTaskAttemptCompleted) event); |
| |
| // If different tasks were connected to different destination vertices |
| // then this would need to be sent via the edges |
| // Notify all target vertices |
| if (vertex.targetVertices != null) { |
| for (Vertex targetVertex : vertex.targetVertices.keySet()) { |
| vertex.eventHandler.handle( |
| new VertexEventSourceTaskAttemptCompleted( |
| targetVertex.getVertexId(), completionEvent) |
| ); |
| } |
| } |
| } |
| } |
| |
  /**
   * Handles completion (success/failure/kill) of one of this vertex's own
   * tasks: updates the completion counters, folds successful task statistics
   * into the stats cache, enforces the allowed failure percentage, and checks
   * whether the vertex as a whole is now complete.
   */
  private static class TaskCompletedTransition implements
      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {

    @Override
    public VertexState transition(VertexImpl vertex, VertexEvent event) {
      // Lazily create the stats cache on first task completion.
      if (vertex.completedTasksStatsCache == null) {
        vertex.resetCompletedTaskStatsCache(false);
      }
      boolean forceTransitionToKillWait = false;
      vertex.completedTaskCount++;
      VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
      Task task = vertex.tasks.get(taskEvent.getTaskID());
      if (taskEvent.getState() == TaskState.SUCCEEDED) {
        taskSucceeded(vertex, task);
        // Merge each task's statistics into the cache at most once, even if
        // completion is reported again (e.g. after a rerun).
        if (!vertex.completedTasksStatsCache.containsTask(task.getTaskID())) {
          vertex.completedTasksStatsCache.addTask(task.getTaskID());
          vertex.completedTasksStatsCache.mergeFrom(((TaskImpl) task).getStatistics());
        }
      } else if (taskEvent.getState() == TaskState.FAILED) {
        taskFailed(vertex, task);
        // Fail the vertex once the failed-task percentage exceeds the
        // configured tolerance (cross-multiplied to avoid division).
        if (vertex.failedTaskCount * 100 > vertex.maxFailuresPercent * vertex.numTasks) {
          LOG.info("Failing vertex: " + vertex.logIdentifier +
              " because task failed: " + taskEvent.getTaskID());
          vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
          forceTransitionToKillWait = true;
        }
      } else if (taskEvent.getState() == TaskState.KILLED) {
        taskKilled(vertex, task);
      }

      // Compute the resulting vertex state; override RUNNING with TERMINATING
      // when the failure threshold was just crossed.
      VertexState state = VertexImpl.checkTasksForCompletion(vertex);
      if(state == VertexState.RUNNING && forceTransitionToKillWait){
        return VertexState.TERMINATING;
      }

      return state;
    }

    // Bumps the success counter for a completed task.
    private void taskSucceeded(VertexImpl vertex, Task task) {
      vertex.succeededTaskCount++;
      // TODO Metrics
      // job.metrics.completedTask(task);
    }

    // Bumps the failure counter and records the task's diagnostics.
    private void taskFailed(VertexImpl vertex, Task task) {
      vertex.failedTaskCount++;
      vertex.addDiagnostic("Task failed"
        + ", taskId=" + task.getTaskID()
        + ", diagnostics=" + task.getDiagnostics());
      // TODO Metrics
      //vertex.metrics.failedTask(task);
    }

    // Bumps the killed counter.
    private void taskKilled(VertexImpl vertex, Task task) {
      vertex.killedTaskCount++;
      // TODO Metrics
      //job.metrics.killedTask(task);
    }
  }
| |
| private static class TaskRescheduledTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| //succeeded task is restarted back |
| vertex.completedTaskCount--; |
| vertex.succeededTaskCount--; |
| vertex.resetCompletedTaskStatsCache(true); |
| } |
| } |
| |
| private static class VertexNoTasksCompletedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| return VertexImpl.checkTasksForCompletion(vertex); |
| } |
| } |
| |
| private static class VertexShuffleDeleteTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| int incompleteChildrenVertices = vertex.vShuffleDeletionContext.getIncompleteChildrenVertices(); |
| incompleteChildrenVertices = incompleteChildrenVertices - 1; |
| vertex.vShuffleDeletionContext.setIncompleteChildrenVertices(incompleteChildrenVertices); |
| // check if all the child vertices are completed |
| if (incompleteChildrenVertices == 0) { |
| LOG.info("Vertex shuffle data deletion for vertex name: " + |
| vertex.getName() + " with vertex id: " + vertex.getVertexId()); |
| // Get nodes of all the task attempts in vertex |
| Set<NodeId> nodes = Sets.newHashSet(); |
| Map<TezTaskID, Task> tasksMap = vertex.getTasks(); |
| tasksMap.keySet().forEach(taskId -> { |
| Map<TezTaskAttemptID, TaskAttempt> taskAttemptMap = tasksMap.get(taskId).getAttempts(); |
| taskAttemptMap.keySet().forEach(attemptId -> { |
| nodes.add(taskAttemptMap.get(attemptId).getNodeId()); |
| }); |
| }); |
| vertex.appContext.getAppMaster().vertexComplete( |
| vertex.vertexId, nodes); |
| } else { |
| LOG.debug("The number of incomplete child vertex are {} for the vertex {}", |
| incompleteChildrenVertices, vertex.vertexId); |
| } |
| } |
| } |
| |
| private static class TaskCompletedAfterVertexSuccessTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventTaskCompleted vEvent = (VertexEventTaskCompleted) event; |
| VertexState finalState; |
| String diagnosticMsg; |
| if (vEvent.getState() == TaskState.FAILED) { |
| finalState = VertexState.FAILED; |
| diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() + |
| " failed after vertex succeeded."; |
| } else { |
| finalState = VertexState.ERROR; |
| diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() + |
| " completed with state " + vEvent.getState() + " after vertex succeeded."; |
| } |
| LOG.info(diagnosticMsg); |
| vertex.finished(finalState, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg); |
| return finalState; |
| } |
| } |
| |
| private static class TaskRescheduledWhileCommittingTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| // terminate any running tasks |
| String diagnosticMsg = vertex.getLogIdentifier() + " failed due to in-committing rescheduling of " |
| + ((VertexEventTaskReschedule)event).getTaskID(); |
| LOG.info(diagnosticMsg); |
| vertex.addDiagnostic(diagnosticMsg); |
| vertex.tryEnactKill(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING, |
| TaskTerminationCause.TASK_RESCHEDULE_IN_COMMITTING); |
| vertex.cancelCommits(); |
| } |
| } |
| |
| private static class TaskRescheduledAfterVertexSuccessTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| if (vertex.outputCommitters == null // no committer |
| || vertex.outputCommitters.isEmpty() // no committer |
| || !vertex.commitVertexOutputs) { // committer does not commit on vertex success |
| LOG.info(vertex.getLogIdentifier() + " back to running due to rescheduling " |
| + ((VertexEventTaskReschedule)event).getTaskID()); |
| (new TaskRescheduledTransition()).transition(vertex, event); |
| // inform the DAG that we are re-running |
| vertex.eventHandler.handle(new DAGEventVertexReRunning(vertex.getVertexId())); |
| // back to running. so reset final cached stats |
| vertex.finalStatistics = null; |
| return VertexState.RUNNING; |
| } |
| |
| // terminate any running tasks |
| String diagnosticMsg = vertex.getLogIdentifier() + " failed due to post-commit rescheduling of " |
| + ((VertexEventTaskReschedule)event).getTaskID(); |
| LOG.info(diagnosticMsg); |
| vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, |
| TaskTerminationCause.OWN_TASK_FAILURE); |
| vertex.finished(VertexState.FAILED, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg); |
| return VertexState.FAILED; |
| } |
| } |
| |
| private void commitCompleted(VertexEventCommitCompleted commitCompletedEvent) { |
| Preconditions.checkState(commitFutures.remove(commitCompletedEvent.getOutputName()) != null, |
| "Unknown commit:" + commitCompletedEvent.getOutputName() + ", vertex=" + logIdentifier); |
| if (commitCompletedEvent.isSucceeded()) { |
| LOG.info("Commit succeeded for output:" + commitCompletedEvent.getOutputName() |
| + ", vertexId=" + logIdentifier); |
| } else { |
| String diag = "Commit failed for output:" + commitCompletedEvent.getOutputName() |
| + ", vertexId=" + logIdentifier + ", " |
| + ExceptionUtils.getStackTrace(commitCompletedEvent.getException());; |
| LOG.info(diag); |
| addDiagnostic(diag); |
| trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE); |
| cancelCommits(); |
| } |
| } |
| |
| private static class CommitCompletedTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| vertex.commitCompleted((VertexEventCommitCompleted)event); |
| return checkCommitsForCompletion(vertex); |
| } |
| } |
| |
| private void cancelCommits() { |
| if (!this.commitCanceled.getAndSet(true)) { |
| for (Map.Entry<String, ListenableFuture<Void>> entry : commitFutures.entrySet()) { |
| LOG.info("Canceling commit of output:" + entry.getKey() + ", vertexId=" + logIdentifier); |
| entry.getValue().cancel(true); |
| } |
| } |
| } |
| |
| private void addDiagnostic(String diag) { |
| diagnostics.add(diag); |
| } |
| |
| private static boolean isEventFromVertex(Vertex vertex, |
| EventMetaData sourceMeta) { |
| if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) { |
| return false; |
| } |
| return true; |
| } |
| |
| private static void checkEventSourceMetadata(Vertex vertex, |
| EventMetaData sourceMeta) { |
| assert isEventFromVertex(vertex, sourceMeta); |
| } |
| |
| // private static class RouteEventsWhileInitializingTransition implements |
| // SingleArcTransition<VertexImpl, VertexEvent> { |
| // |
| // @Override |
| // public void transition(VertexImpl vertex, VertexEvent event) { |
| // VertexEventRouteEvent re = (VertexEventRouteEvent) event; |
| // // Store the events for post-init routing, since INIT state is when |
| // // initial task parallelism will be set |
| // vertex.pendingRouteEvents.addAll(re.getEvents()); |
| // } |
| // } |
| |
| private static class RouteEventTransition implements |
| MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { |
| @Override |
| public VertexState transition(VertexImpl vertex, VertexEvent event) { |
| VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event; |
| List<TezEvent> tezEvents = rEvent.getEvents(); |
| try { |
| vertex.handleRoutedTezEvents(tezEvents, false); |
| } catch (AMUserCodeException e) { |
| String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier(); |
| LOG.error(msg, e); |
| if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.COMMITTING) { |
| vertex.addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause())); |
| vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE); |
| vertex.cancelCommits(); |
| return VertexState.TERMINATING; |
| } else { |
| vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, |
| msg + "," + ExceptionUtils.getStackTrace(e.getCause())); |
| return VertexState.FAILED; |
| } |
| } |
| return vertex.getState(); |
| } |
| } |
| |
  /**
   * Returns the next window of events for a task attempt heartbeat: first any
   * pre-routed events stored on the task, then up to maxEvents on-demand
   * routed events expanded through the source edges.
   *
   * @param attemptID the requesting task attempt
   * @param fromEventId index into the on-demand routed event stream to resume from
   * @param preRoutedFromEventId index into the task's pre-routed event stream
   * @param maxEvents maximum number of events to return in this window
   * @return the events plus the next indices for both streams
   */
  @Override
  public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
      int fromEventId, int preRoutedFromEventId, int maxEvents) {
    Task task = getTask(attemptID.getTaskID());
    // Pre-routed events first; these were registered directly on the task.
    ArrayList<TezEvent> events = task.getTaskAttemptTezEvents(
        attemptID, preRoutedFromEventId, maxEvents);
    int nextPreRoutedFromEventId = preRoutedFromEventId + events.size();
    int nextFromEventId = fromEventId;
    // Read lock: on-demand route events may be appended concurrently.
    onDemandRouteEventsReadLock.lock();
    try {
      int currEventCount = onDemandRouteEvents.size();
      try {
        if (currEventCount > fromEventId) {
          // There are unrouted events for this attempt. Reuse the pre-routed
          // list when it is mutable; the shared EMPTY list must not be touched.
          if (events != TaskImpl.EMPTY_TASK_ATTEMPT_TEZ_EVENTS) {
            events.ensureCapacity(maxEvents);
          } else {
            events = Lists.newArrayListWithCapacity(maxEvents);
          }
          int numPreRoutedEvents = events.size();
          int taskIndex = attemptID.getTaskID().getId();
          Preconditions.checkState(taskIndex < tasks.size(), "Invalid task index for TA: " + attemptID
              + " vertex: " + getLogIdentifier());
          boolean isFirstEvent = true;
          boolean firstEventObsoleted = false;
          for (nextFromEventId = fromEventId; nextFromEventId < currEventCount; ++nextFromEventId) {
            boolean earlyExit = false;
            if (events.size() == maxEvents) {
              break;
            }
            EventInfo eventInfo = onDemandRouteEvents.get(nextFromEventId);
            if (eventInfo.isObsolete) {
              // ignore obsolete events
              firstEventObsoleted = true;
              continue;
            }
            TezEvent tezEvent = eventInfo.tezEvent;
            switch(tezEvent.getEventType()) {
            case INPUT_FAILED_EVENT:
            case DATA_MOVEMENT_EVENT:
            case COMPOSITE_DATA_MOVEMENT_EVENT:
              {
                // Source-edge events are expanded through the edge's routing
                // logic; one source event may fan out into several events.
                int srcTaskIndex = eventInfo.eventTaskIndex;
                Edge srcEdge = eventInfo.eventEdge;
                PendingEventRouteMetadata pendingRoute = null;
                if (isFirstEvent) {
                  // the first event is the one that can have pending routes because its expanded
                  // events had not been completely sent in the last round.
                  isFirstEvent = false;
                  pendingRoute = srcEdge.removePendingEvents(attemptID);
                  if (pendingRoute != null) {
                    // the first event must match the pending route event
                    // the only reason it may not match is if in between rounds that event got
                    // obsoleted
                    if(tezEvent != pendingRoute.getTezEvent()) {
                      Preconditions.checkState(firstEventObsoleted);
                      // pending routes can be ignored for obsoleted events
                      pendingRoute = null;
                    }
                  }
                }
                if (!srcEdge.maybeAddTezEventForDestinationTask(tezEvent, attemptID, srcTaskIndex,
                    events, maxEvents, pendingRoute)) {
                  // not enough space left for this iteration events.
                  // Exit and start from here next time
                  earlyExit = true;
                }
              }
              break;
            case ROOT_INPUT_DATA_INFORMATION_EVENT:
              {
                // Root input splits are targeted at a specific task index;
                // deliver only the ones addressed to this task.
                InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
                if (riEvent.getTargetIndex() == taskIndex) {
                  events.add(tezEvent);
                }
              }
              break;
            default:
              throw new TezUncheckedException("Unexpected event type for task: "
                  + tezEvent.getEventType());
            }
            if (earlyExit) {
              break;
            }
          }
          int numEventsSent = events.size() - numPreRoutedEvents;
          if (numEventsSent > 0) {
            StringBuilder builder = new StringBuilder();
            builder.append("Sending ").append(attemptID).append(" ")
                .append(numEventsSent)
                .append(" events [").append(fromEventId).append(",").append(nextFromEventId)
                .append(") total ").append(currEventCount).append(" ")
                .append(getLogIdentifier());
            LOG.info(builder.toString());
          }
        }
      } catch (AMUserCodeException e) {
        // Edge-manager user code failed: report the error asynchronously and
        // return an empty window so the attempt retries from fromEventId.
        String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
        LOG.error(msg, e);
        eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e));
        nextFromEventId = fromEventId;
        events.clear();
      }
    } finally {
      onDemandRouteEventsReadLock.unlock();
    }
    if (!events.isEmpty()) {
      for (int i=(events.size() - 1); i>=0; --i) {
        TezEvent lastEvent = events.get(i);
        // record the last event sent by the AM to the task
        EventType lastEventType = lastEvent.getEventType();
        // if the following changes then critical path logic/recording may need revision
        if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT ||
            lastEventType == EventType.COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT ||
            lastEventType == EventType.DATA_MOVEMENT_EVENT ||
            lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
          task.getAttempt(attemptID).setLastEventSent(lastEvent);
          break;
        }
      }
    }
    return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId);
  }
| |
| private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean isPendingEvents) throws AMUserCodeException { |
| for(TezEvent tezEvent : tezEvents) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Vertex: " + getLogIdentifier() + " routing event: " |
| + tezEvent.getEventType()); |
| } |
| EventMetaData sourceMeta = tezEvent.getSourceInfo(); |
| switch(tezEvent.getEventType()) { |
| case CUSTOM_PROCESSOR_EVENT: |
| { |
| // set version as app attempt id |
| ((CustomProcessorEvent) tezEvent.getEvent()).setVersion( |
| appContext.getApplicationAttemptId().getAttemptId()); |
| // route event to task |
| EventMetaData destinationMeta = tezEvent.getDestinationInfo(); |
| Task targetTask = getTask(destinationMeta.getTaskAttemptID().getTaskID()); |
| targetTask.registerTezEvent(tezEvent); |
| } |
| break; |
| case INPUT_FAILED_EVENT: |
| case DATA_MOVEMENT_EVENT: |
| case COMPOSITE_DATA_MOVEMENT_EVENT: |
| { |
| if (isEventFromVertex(this, sourceMeta)) { |
| // event from this vertex. send to destination vertex |
| TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID(); |
| if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) { |
| ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); |
| } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) { |
| ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); |
| } else { |
| ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); |
| } |
| Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName()); |
| Edge destEdge = targetVertices.get(destVertex); |
| if (destEdge == null) { |
| throw new TezUncheckedException("Bad destination vertex: " + |
| sourceMeta.getEdgeVertexName() + " for event vertex: " + |
| getLogIdentifier()); |
| } |
| eventHandler.handle(new VertexEventRouteEvent(destVertex |
| .getVertexId(), Collections.singletonList(tezEvent))); |
| } else { |
| if (tasksNotYetScheduled) { |
| // this is only needed to support mixed mode routing. Else for |
| // on demand routing events can be directly added to taskEvents |
| // when legacy routing is removed then pending task events can be |
| // removed. |
| pendingTaskEvents.add(tezEvent); |
| } else { |
| // event not from this vertex. must have come from source vertex. |
| int srcTaskIndex = sourceMeta.getTaskID().getId(); |
| Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName()); |
| Edge srcEdge = sourceVertices.get(edgeVertex); |
| if (srcEdge == null) { |
| throw new TezUncheckedException("Bad source vertex: " + |
| sourceMeta.getTaskVertexName() + " for destination vertex: " + |
| getLogIdentifier()); |
| } |
| if (srcEdge.hasOnDemandRouting()) { |
| processOnDemandEvent(tezEvent, srcEdge, srcTaskIndex); |
| } else { |
| // send to tasks |
| srcEdge.sendTezEventToDestinationTasks(tezEvent); |
| } |
| } |
| } |
| } |
| break; |
| case ROOT_INPUT_DATA_INFORMATION_EVENT: |
| { |
| checkEventSourceMetadata(this, sourceMeta); |
| if (tasksNotYetScheduled) { |
| // this is only needed to support mixed mode routing. Else for |
| // on demand routing events can be directly added to taskEvents |
| // when legacy routing is removed then pending task events can be |
| // removed. |
| pendingTaskEvents.add(tezEvent); |
| } else { |
| InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent(); |
| Task targetTask = getTask(riEvent.getTargetIndex()); |
| targetTask.registerTezEvent(tezEvent); |
| } |
| } |
| break; |
| case VERTEX_MANAGER_EVENT: |
| { |
| // VM events on task success only can be changed as part of TEZ-1532 |
| VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent(); |
| Vertex target = getDAG().getVertex(vmEvent.getTargetVertexName()); |
| Preconditions.checkArgument(target != null, |
| "Event sent to unkown vertex: " + vmEvent.getTargetVertexName()); |
| TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID(); |
| if (srcTaId.getVertexID().equals(vertexId)) { |
| // this is the producer tasks' vertex |
| vmEvent.setProducerAttemptIdentifier( |
| getTaskAttemptIdentifier(dag.getName(), getName(), srcTaId)); |
| } |
| if (target == this) { |
| if (!vmIsInitialized.get()) { |
| // The VM hasn't been setup yet, defer event consumption |
| pendingVmEvents.add(vmEvent); |
| } else { |
| vertexManager.onVertexManagerEventReceived(vmEvent); |
| } |
| } else { |
| checkEventSourceMetadata(this, sourceMeta); |
| eventHandler.handle(new VertexEventRouteEvent(target |
| .getVertexId(), Collections.singletonList(tezEvent))); |
| } |
| } |
| break; |
| case ROOT_INPUT_INITIALIZER_EVENT: |
| { |
| InputInitializerEvent riEvent = (InputInitializerEvent) tezEvent.getEvent(); |
| Vertex target = getDAG().getVertex(riEvent.getTargetVertexName()); |
| Preconditions.checkArgument(target != null, |
| "Event sent to unknown vertex: " + riEvent.getTargetVertexName()); |
| riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName()); |
| if (target == this) { |
| if (rootInputDescriptors == null || |
| !rootInputDescriptors.containsKey(riEvent.getTargetInputName())) { |
| throw new TezUncheckedException( |
| "InputInitializerEvent targeted at unknown initializer on vertex " + |
| logIdentifier + ", Event=" + riEvent); |
| } |
| if (getState() == VertexState.NEW) { |
| pendingInitializerEvents.add(tezEvent); |
| } else if (getState() == VertexState.INITIALIZING) { |
| rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent)); |
| } else { |
| // Currently, INITED and subsequent states means Initializer complete / failure |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in " |
| + getLogIdentifier() + ", state=" + getState()); |
| } |
| } |
| } else { |
| checkEventSourceMetadata(this, sourceMeta); |
| eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(), |
| Collections.singletonList(tezEvent))); |
| } |
| } |
| break; |
| case INPUT_READ_ERROR_EVENT: |
| { |
| checkEventSourceMetadata(this, sourceMeta); |
| Edge srcEdge = sourceVertices.get(this.getDAG().getVertex( |
| sourceMeta.getEdgeVertexName())); |
| srcEdge.sendTezEventToSourceTasks(tezEvent); |
| } |
| break; |
| default: |
| throw new TezUncheckedException("Unhandled tez event type: " |
| + tezEvent.getEventType()); |
| } |
| } |
| } |
| |
  /**
   * Queues an event for on-demand routing to destination tasks.
   *
   * Data movement events from a task attempt that has already been marked
   * failed (via a prior INPUT_FAILED_EVENT) are dropped. When an
   * INPUT_FAILED_EVENT arrives, all previously queued (composite) data
   * movement events from the same source edge + task attempt are marked
   * obsolete so they are never routed.
   *
   * All mutation of {@code onDemandRouteEvents} and
   * {@code failedTaskAttemptIDs} happens under the write lock.
   *
   * @param tezEvent     the event to route
   * @param srcEdge      the edge the event arrived on
   * @param srcTaskIndex index of the producing source task
   */
  private void processOnDemandEvent(TezEvent tezEvent, Edge srcEdge, int srcTaskIndex) {
    onDemandRouteEventsWriteLock.lock();
    try {
      if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT ||
          tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
        // Prevent any failed task (due to INPUT_FAILED_EVENT) sending events downstream. E.g LLAP
        if (failedTaskAttemptIDs.contains(tezEvent.getSourceInfo().getTaskAttemptID())) {
          return;
        }
      }
      // Queue the event for lazy routing; an INPUT_FAILED_EVENT is queued too,
      // and additionally obsoletes matching earlier events below.
      onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
      if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
        for (EventInfo eventInfo : onDemandRouteEvents) {
          if (eventInfo.eventEdge == srcEdge
              && eventInfo.tezEvent.getSourceInfo().getTaskAttemptID().equals(
                  tezEvent.getSourceInfo().getTaskAttemptID())
              && (eventInfo.tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT
                  || eventInfo.tezEvent
                      .getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
            // any earlier data movement events from the same source
            // edge+task
            // can be obsoleted by an input failed event from the
            // same source edge+task
            eventInfo.isObsolete = true;
            // remember the failed attempt so its future DME/CDMEs are dropped
            // at the top of this method
            failedTaskAttemptIDs.add(tezEvent.getSourceInfo().getTaskAttemptID());
          }
        }
      }
    } finally {
      onDemandRouteEventsWriteLock.unlock();
    }
  }
| |
| private static class InternalErrorTransition implements |
| SingleArcTransition<VertexImpl, VertexEvent> { |
| @Override |
| public void transition(VertexImpl vertex, VertexEvent event) { |
| String msg = "Invalid event on Vertex " + vertex.getLogIdentifier(); |
| LOG.error(msg); |
| vertex.eventHandler.handle(new DAGEventDiagnosticsUpdate(vertex.getDAGId(), msg)); |
| vertex.setFinishTime(); |
| vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR); |
| vertex.cancelCommits(); |
| vertex.finished(VertexState.ERROR); |
| } |
| } |
| |
  /**
   * Creates the {@link RootInputInitializerManager} and kicks off the input
   * initializers for all root inputs that declared one. Any initializer
   * events that arrived before this point (buffered in
   * {@code pendingInitializerEvents}) are forwarded afterwards and the
   * buffer is cleared.
   *
   * Note: {@code initWaitsForRootInitializers} is set before
   * {@code runInputInitializers} so vertex init correctly waits for the
   * initializers started here.
   *
   * @throws TezException if running the input initializers fails
   */
  private void setupInputInitializerManager() throws TezException {
    rootInputInitializerManager = createRootInputInitializerManager(
        getDAG().getName(), getName(), getVertexId(),
        eventHandler, getTotalTasks(),
        appContext.getTaskScheduler().getNumClusterNodes(),
        getTaskResource(),
        appContext.getTaskScheduler().getTotalResources(taskSchedulerIdentifier));
    List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
        inputList = Lists.newArrayListWithCapacity(inputsWithInitializers.size());
    for (String inputName : inputsWithInitializers) {
      inputList.add(rootInputDescriptors.get(inputName));
    }
    LOG.info("Starting " + inputsWithInitializers.size() + " inputInitializers for vertex " +
        logIdentifier);
    initWaitsForRootInitializers = true;
    rootInputInitializerManager.runInputInitializers(inputList);
    // Send pending rootInputInitializerEvents
    rootInputInitializerManager.handleInitializerEvents(pendingInitializerEvents);
    pendingInitializerEvents.clear();
  }
| |
| private static class VertexStateChangedCallback |
| implements OnStateChangedCallback<VertexState, VertexImpl> { |
| |
| @Override |
| public void onStateChanged(VertexImpl vertex, VertexState vertexState) { |
| vertex.stateChangeNotifier.stateChanged(vertex.getVertexId(), |
| new VertexStateUpdate(vertex.getName(), convertInternalState( |
| vertexState, vertex.getVertexId()))); |
| } |
| |
| private org.apache.tez.dag.api.event.VertexState convertInternalState(VertexState vertexState, |
| TezVertexID vertexId) { |
| switch (vertexState) { |
| case RUNNING: |
| return org.apache.tez.dag.api.event.VertexState.RUNNING; |
| case SUCCEEDED: |
| return org.apache.tez.dag.api.event.VertexState.SUCCEEDED; |
| case FAILED: |
| return org.apache.tez.dag.api.event.VertexState.FAILED; |
| case KILLED: |
| return org.apache.tez.dag.api.event.VertexState.KILLED; |
| case INITIALIZING: |
| return org.apache.tez.dag.api.event.VertexState.INITIALIZING; |
| case NEW: |
| case INITED: |
| case ERROR: |
| case TERMINATING: |
| default: |
| throw new TezUncheckedException( |
| "Not expecting state updates for state: " + vertexState + ", VertexID: " + vertexId); |
| } |
| } |
| } |
| |
| private static class VertexCommitCallback implements FutureCallback<Void>{ |
| |
| private String outputName; |
| private VertexImpl vertex; |
| |
| public VertexCommitCallback(VertexImpl vertex, String outputName) { |
| this.vertex = vertex; |
| this.outputName = outputName; |
| } |
| |
| @Override |
| public void onSuccess(Void result) { |
| vertex.getEventHandler().handle( |
| new VertexEventCommitCompleted(vertex.vertexId, outputName, true, null)); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) { |
| vertex.getEventHandler().handle( |
| new VertexEventCommitCompleted(vertex.vertexId, outputName, false, t)); |
| } |
| |
| } |
| |
| @Override |
| public void setInputVertices(Map<Vertex, Edge> inVertices) { |
| writeLock.lock(); |
| try { |
| this.sourceVertices = inVertices; |
| for (Vertex vertex : sourceVertices.keySet()) { |
| addIO(vertex.getName()); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void setOutputVertices(Map<Vertex, Edge> outVertices) { |
| writeLock.lock(); |
| try { |
| this.targetVertices = outVertices; |
| for (Vertex vertex : targetVertices.keySet()) { |
| addIO(vertex.getName()); |
| } |
| } finally { |
| writeLock.unlock();; |
| } |
| } |
| |
| @Override |
| public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) { |
| LOG.info("Setting " + inputs.size() + " additional inputs for vertex" + this.logIdentifier); |
| this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size()); |
| for (RootInputLeafOutputProto input : inputs) { |
| addIO(input.getName()); |
| InputDescriptor id = DagTypeConverters |
| .convertInputDescriptorFromDAGPlan(input.getIODescriptor()); |
| |
| this.rootInputDescriptors |
| .put( |
| input.getName(), |
| new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>( |
| input.getName(), id, |
| input.hasControllerDescriptor() ? DagTypeConverters |
| .convertInputInitializerDescriptorFromDAGPlan(input |
| .getControllerDescriptor()) : null)); |
| this.rootInputSpecs.put(input.getName(), DEFAULT_ROOT_INPUT_SPECS); |
| } |
| } |
| |
| // not taking a lock by design. Speculator callbacks to the vertex will take locks if needed |
| @Override |
| public void handleSpeculatorEvent(SpeculatorEvent event) { |
| if (isSpeculationEnabled()) { |
| speculator.handle(event); |
| } |
| } |
| |
| @Nullable |
| @Override |
| public Map<String, OutputCommitter> getOutputCommitters() { |
| return outputCommitters; |
| } |
| |
| @Nullable |
| @Private |
| @VisibleForTesting |
| public OutputCommitter getOutputCommitter(String outputName) { |
| if (this.outputCommitters != null) { |
| return outputCommitters.get(outputName); |
| } |
| return null; |
| } |
| |
| @Override |
| public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) { |
| LOG.info("Setting " + outputs.size() + " additional outputs for vertex " + this.logIdentifier); |
| this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size()); |
| this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size()); |
| for (RootInputLeafOutputProto output : outputs) { |
| addIO(output.getName()); |
| OutputDescriptor od = DagTypeConverters |
| .convertOutputDescriptorFromDAGPlan(output.getIODescriptor()); |
| |
| this.additionalOutputs |
| .put( |
| output.getName(), |
| new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>( |
| output.getName(), od, |
| output.hasControllerDescriptor() ? DagTypeConverters |
| .convertOutputCommitterDescriptorFromDAGPlan(output |
| .getControllerDescriptor()) : null)); |
| OutputSpec outputSpec = new OutputSpec(output.getName(), od, 0); |
| additionalOutputSpecs.add(outputSpec); |
| } |
| } |
| |
| @Nullable |
| @Override |
| public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> |
| getAdditionalInputs() { |
| readLock.lock(); |
| try { |
| return this.rootInputDescriptors; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Nullable |
| @Override |
| public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> |
| getAdditionalOutputs() { |
| return this.additionalOutputs; |
| } |
| |
| @Override |
| public int compareTo(Vertex other) { |
| return this.vertexId.compareTo(other.getVertexId()); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) { |
| return true; |
| } |
| if (obj == null) { |
| return false; |
| } |
| if (getClass() != obj.getClass()) { |
| return false; |
| } |
| Vertex other = (Vertex) obj; |
| return this.vertexId.equals(other.getVertexId()); |
| } |
| |
| @Override |
| public int hashCode() { |
| final int prime = 11239; |
| return prime + prime * this.vertexId.hashCode(); |
| } |
| |
| @Override |
| public Map<Vertex, Edge> getInputVertices() { |
| return Collections.unmodifiableMap(this.sourceVertices); |
| } |
| |
| @Override |
| public Map<Vertex, Edge> getOutputVertices() { |
| return Collections.unmodifiableMap(this.targetVertices); |
| } |
| |
| @Override |
| public VertexStatistics getStatistics() { |
| readLock.lock(); |
| try { |
| if (inTerminalState()) { |
| Preconditions.checkState(this.finalStatistics != null); |
| return this.finalStatistics; |
| } |
| return constructStatistics(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public int getInputVerticesCount() { |
| return this.sourceVertices.size(); |
| } |
| |
| @Override |
| public int getOutputVerticesCount() { |
| return this.targetVertices.size(); |
| } |
| |
| @Override |
| public ProcessorDescriptor getProcessorDescriptor() { |
| return processorDescriptor; |
| } |
| |
| @Override |
| public DAG getDAG() { |
| return dag; |
| } |
| |
| private TezDAGID getDAGId() { |
| return getDAG().getID(); |
| } |
| |
| public Resource getTaskResource() { |
| readLock.lock(); |
| try { |
| return taskResource; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| void addIO(String name) { |
| ioIndices.put(StringInterner.intern(name), ioIndices.size()); |
| } |
| |
| @VisibleForTesting |
| String getProcessorName() { |
| return this.processorDescriptor.getClassName(); |
| } |
| |
| @VisibleForTesting |
| String getJavaOpts() { |
| return this.javaOpts; |
| } |
| |
| @VisibleForTesting |
| TaskLocationHint[] getTaskLocationHints() { |
| return taskLocationHints; |
| } |
| |
  /**
   * Builds the full list of {@link InputSpec}s for one task of this vertex:
   * first the root (additional) inputs, then one spec per upstream edge.
   *
   * Root-input specs are gathered under this vertex's read lock; edge specs
   * are fetched afterwards via {@code getDestinationSpecFor}, which locks the
   * upstream vertex. The two lock scopes are deliberately NOT nested — see
   * the locking discussion in {@code getOutputSpecList} (TEZ-3297).
   *
   * @param taskIndex index of the task the specs are for
   * @return input specs for the given task
   * @throws AMUserCodeException if user edge/plugin code fails while
   *         computing a spec
   */
  @Override
  public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
    // For locking strategy, please refer to getOutputSpecList()
    readLock.lock();
    List<InputSpec> inputSpecList = null;
    try {
      inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
          + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
      if (rootInputDescriptors != null) {
        for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
            rootInputDescriptorEntry : rootInputDescriptors.entrySet()) {
          inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(),
              rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get(
              rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
        }
      }
    } finally {
      readLock.unlock();
    }

    for(Vertex vertex : getInputVertices().keySet()) {
      /**
       * It is possible that setParallelism is in the middle of processing in target vertex with
       * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
       * get consistent view.
       * Refer TEZ-2251
       */
      InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
      // TODO DAGAM This should be based on the edge type.
      inputSpecList.add(inputSpec);
    }
    return inputSpecList;
  }
| |
  /**
   * Builds the full list of {@link OutputSpec}s for one task of this vertex:
   * first the additional (leaf) outputs, then one spec per downstream edge.
   *
   * @param taskIndex index of the task the specs are for
   * @return output specs for the given task
   * @throws AMUserCodeException if user edge/plugin code fails while
   *         computing a spec
   */
  @Override
  public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
    /**
     * Ref: TEZ-3297
     * Locking entire method could introduce a nested lock and
     * could lead to deadlock in corner cases. Example of deadlock with nested lock here:
     * 1. In thread#1, Downstream vertex is in the middle of processing setParallelism and gets
     * writeLock.
     * 2. In thread#2, currentVertex acquires read lock
     * 3. In thread#3, central dispatcher tries to process an event for current vertex,
     * so tries to acquire write lock.
     *
     * In further processing,
     * 4. In thread#1, it tries to acquire readLock on current vertex for setting edges. But
     * this would be blocked as #3 already requested for write lock
     * 5. In thread#2, getting readLock on downstream vertex would be blocked as writeLock
     * is held by thread#1.
     * 6. thread#3 is anyways blocked due to thread#2's read lock on current vertex.
     */

    List<OutputSpec> outputSpecList = null;
    // Scope 1: copy the additional output specs under this vertex's read lock.
    readLock.lock();
    try {
      outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
          + this.additionalOutputSpecs.size());
      outputSpecList.addAll(additionalOutputSpecs);
    } finally {
      readLock.unlock();
    }

    // Scope 2 (not nested in scope 1, per the deadlock note above): query each
    // downstream vertex under ITS read lock.
    for(Vertex vertex : targetVertices.keySet()) {
      /**
       * It is possible that setParallelism (which could change numTasks) is in the middle of
       * processing in target vertex with its write lock. So we need to get outputspec by
       * acquiring read lock in target vertex to get consistent view.
       * Refer TEZ-2251
       */
      OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
      outputSpecList.add(outputSpec);
    }
    return outputSpecList;
  }
| |
| private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws |
| AMUserCodeException { |
| readLock.lock(); |
| try { |
| Edge edge = sourceVertices.get(vertex); |
| Preconditions.checkState(edge != null, getLogIdentifier()); |
| return edge.getSourceSpec(taskIndex); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private InputSpec getDestinationSpecFor(VertexImpl vertex, int taskIndex) throws |
| AMUserCodeException { |
| readLock.lock(); |
| try { |
| Edge edge = targetVertices.get(vertex); |
| Preconditions.checkState(edge != null, getLogIdentifier()); |
| return edge.getDestinationSpec(taskIndex); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| |
| @Override |
| public List<GroupInputSpec> getGroupInputSpecList() { |
| readLock.lock(); |
| try { |
| return groupInputSpecList; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public synchronized void addSharedOutputs(Set<String> outputs) { |
| this.sharedOutputs.addAll(outputs); |
| } |
| |
| @Override |
| public synchronized Set<String> getSharedOutputs() { |
| return this.sharedOutputs; |
| } |
| |
| @VisibleForTesting |
| VertexManager getVertexManager() { |
| return this.vertexManager; |
| } |
| |
| public boolean isVertexInitSkipped() { |
| return isVertexInitSkipped; |
| } |
| |
| |
| private static void logLocationHints(String vertexName, |
| VertexLocationHint locationHint) { |
| if (locationHint == null) { |
| LOG.debug("No Vertex LocationHint specified for vertex=" + vertexName); |
| return; |
| } |
| Multiset<String> hosts = HashMultiset.create(); |
| Multiset<String> racks = HashMultiset.create(); |
| int counter = 0; |
| for (TaskLocationHint taskLocationHint : locationHint |
| .getTaskLocationHints()) { |
| StringBuilder sb = new StringBuilder(); |
| if (taskLocationHint.getHosts() == null) { |
| sb.append("No Hosts"); |
| } else { |
| sb.append("Hosts: "); |
| for (String host : taskLocationHint.getHosts()) { |
| hosts.add(host); |
| sb.append(host).append(", "); |
| } |
| } |
| if (taskLocationHint.getRacks() == null) { |
| sb.append("No Racks"); |
| } else { |
| sb.append("Racks: "); |
| for (String rack : taskLocationHint.getRacks()) { |
| racks.add(rack); |
| sb.append(rack).append(", "); |
| } |
| } |
| LOG.debug("Vertex: " + vertexName + ", Location: " |
| + counter + " : " + sb.toString()); |
| counter++; |
| } |
| |
| LOG.debug("Vertex: " + vertexName + ", Host Counts"); |
| for (Multiset.Entry<String> host : hosts.entrySet()) { |
| LOG.debug("Vertex: " + vertexName + ", host: " + host.toString()); |
| } |
| |
| LOG.debug("Vertex: " + vertexName + ", Rack Counts"); |
| for (Multiset.Entry<String> rack : racks.entrySet()) { |
| LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString()); |
| } |
| } |
| |
| /** |
| * This is for recovery when VertexReconfigureDoneEvent is seen. |
| */ |
  /**
   * This is for recovery when VertexReconfigureDoneEvent is seen.
   *
   * Replays a previously recorded {@link VertexConfigurationDoneEvent}
   * (deserialized from the plugin's user payload) instead of running the
   * original vertex manager, then schedules all tasks. If the recovered
   * vertex has numTasks == -1, the recorded setParallelism must be applied
   * while the vertex is still INITIALIZING, so the manager registers for the
   * INITIALIZING state update and applies it from onVertexStateUpdated.
   */
  public static class NoOpVertexManager extends VertexManagerPlugin {

    // The recorded reconfiguration to replay, decoded in initialize().
    private VertexConfigurationDoneEvent configurationDoneEvent;
    // True when reconfigure must happen during INITIALIZING (numTasks == -1).
    private boolean setParallelismInInitializing = false;

    public NoOpVertexManager(VertexManagerPluginContext context) {
      super(context);
    }

    @Override
    public void initialize() throws Exception {
      LOG.debug("initialize NoOpVertexManager");
      // Decode the recorded configuration-done event from the user payload.
      configurationDoneEvent = new VertexConfigurationDoneEvent();
      configurationDoneEvent.fromProtoStream(CodedInputStream.newInstance(getContext().getUserPayload().deepCopyAsArray()));
      String vertexName = getContext().getVertexName();
      if (getContext().getVertexNumTasks(vertexName) == -1) {
        Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), "SetParallelism must be called "
            + "when numTasks is -1");
        setParallelismInInitializing = true;
        getContext().registerForVertexStateUpdates(vertexName,
            Sets.newHashSet(org.apache.tez.dag.api.event.VertexState.INITIALIZING));
      }
      getContext().vertexReconfigurationPlanned();
    }

    @Override
    public void onVertexStarted(List<TaskAttemptIdentifier> completions)
        throws Exception {
      // apply the ReconfigureDoneEvent and then schedule all the tasks.
      if (LOG.isDebugEnabled()) {
        LOG.debug("onVertexStarted is invoked in NoOpVertexManager, vertex=" + getContext().getVertexName());
      }
      // If reconfigure already happened during INITIALIZING, don't repeat it.
      if (!setParallelismInInitializing && configurationDoneEvent.isSetParallelismCalled()) {
        reconfigureVertex();
      }
      getContext().doneReconfiguringVertex();
      int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
      LOG.debug("Schedule all the tasks, numTask={}", numTasks);
      List<ScheduleTaskRequest> tasks = new ArrayList<ScheduleTaskRequest>();
      for (int i=0;i<numTasks;++i) {
        tasks.add(ScheduleTaskRequest.create(i, null));
      }
      getContext().scheduleTasks(tasks);
    }

    @Override
    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt)
        throws Exception {
      // No-op: source completions need no handling during replay.
      if (LOG.isDebugEnabled()) {
        LOG.debug("onSourceTaskCompleted is invoked in NoOpVertexManager, vertex=" + getContext().getVertexName());
      }
    }

    @Override
    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
        throws Exception {
      // No-op: VM events are ignored; the recorded configuration is replayed instead.
      if (LOG.isDebugEnabled()) {
        LOG.debug("onVertexManagerEventReceived is invoked in NoOpVertexManager, vertex=" + getContext().getVertexName());
      }
    }

    @Override
    public void onRootVertexInitialized(String inputName,
        InputDescriptor inputDescriptor, List<Event> events) throws Exception {
      // No-op: root input results were already folded into the recorded event.
      if (LOG.isDebugEnabled()) {
        LOG.debug("onRootVertexInitialized is invoked in NoOpVertexManager, vertex=" + getContext().getVertexName());
      }
    }

    @Override
    public void onVertexStateUpdated(VertexStateUpdate stateUpdate)
        throws Exception {
      // Only registered for INITIALIZING (see initialize()); apply the
      // recorded setParallelism now, while it is still legal to do so.
      if (LOG.isDebugEnabled()) {
        LOG.debug("onVertexStateUpdated is invoked in NoOpVertexManager, vertex=" + getContext().getVertexName());
      }
      Preconditions.checkArgument(stateUpdate.getVertexState() ==
          org.apache.tez.dag.api.event.VertexState.INITIALIZING, "NoOpVertexManager get unexpected notification of "
          + " VertexStateUpdate:" + stateUpdate.getVertexState());
      reconfigureVertex();
    }

    // Replays the recorded parallelism/location/edge/root-input changes.
    private void reconfigureVertex() {
      getContext().reconfigureVertex(configurationDoneEvent.getNumTasks(),
          configurationDoneEvent.getVertexLocationHint(),
          configurationDoneEvent.getSourceEdgeProperties(),
          configurationDoneEvent.getRootInputSpecUpdates());
    }
  }
| |
| @Private |
| @VisibleForTesting |
| void setCounters(TezCounters counters) { |
| try { |
| writeLock.lock(); |
| this.fullCounters = counters; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @VisibleForTesting |
| static class VertexConfigImpl implements VertexConfig { |
| |
| private final int maxFailedTaskAttempts; |
| private final int maxTaskAttempts; |
| private final boolean taskRescheduleHigherPriority; |
| private final boolean taskRescheduleRelaxedLocality; |
| |
| /** |
| * See tez.task.max.allowed.output.failures.fraction. |
| */ |
| private final double maxAllowedOutputFailuresFraction; |
| /** |
| * See tez.task.max.allowed.output.failures. |
| */ |
| private final int maxAllowedOutputFailures; |
| /** |
| * See tez.am.max.allowed.time-sec.for-read-error. |
| */ |
| private final int maxAllowedTimeForTaskReadErrorSec; |
| /** |
| * See tez.am.max.allowed.downstream.host.failures.fraction. |
| */ |
| private final double maxAllowedDownstreamHostFailuresFraction; |
| |
| public VertexConfigImpl(Configuration conf) { |
| this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, |
| TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); |
| this.maxTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_ATTEMPTS, |
| TezConfiguration.TEZ_AM_TASK_MAX_ATTEMPTS_DEFAULT); |
| this.taskRescheduleHigherPriority = |
| conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY, |
| TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT); |
| this.taskRescheduleRelaxedLocality = |
| conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY, |
| TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY_DEFAULT); |
| |
| this.maxAllowedOutputFailures = conf.getInt(TezConfiguration |
| .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration |
| .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT); |
| |
| this.maxAllowedOutputFailuresFraction = conf.getDouble(TezConfiguration |
| .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration |
| .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT); |
| |
| this.maxAllowedTimeForTaskReadErrorSec = conf.getInt( |
| TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, |
| TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT); |
| |
| this.maxAllowedDownstreamHostFailuresFraction = conf.getDouble( |
| TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION, |
| TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION_DEFAULT); |
| } |
| |
| @Override |
| public int getMaxFailedTaskAttempts() { |
| return maxFailedTaskAttempts; |
| } |
| |
| @Override |
| public int getMaxTaskAttempts() { |
| return maxTaskAttempts; |
| } |
| |
| @Override |
| public boolean getTaskRescheduleHigherPriority() { |
| return taskRescheduleHigherPriority; |
| } |
| |
| @Override |
| public boolean getTaskRescheduleRelaxedLocality() { |
| return taskRescheduleRelaxedLocality; |
| } |
| |
| /** |
| * @return maxAllowedOutputFailures. |
| */ |
| @Override public int getMaxAllowedOutputFailures() { |
| return maxAllowedOutputFailures; |
| } |
| |
| /** |
| * @return maxAllowedOutputFailuresFraction. |
| */ |
| @Override public double getMaxAllowedOutputFailuresFraction() { |
| return maxAllowedOutputFailuresFraction; |
| } |
| |
| /** |
| * @return maxAllowedTimeForTaskReadErrorSec. |
| */ |
| @Override public int getMaxAllowedTimeForTaskReadErrorSec() { |
| return maxAllowedTimeForTaskReadErrorSec; |
| } |
| |
| /** |
| * @return maxAllowedDownstreamHostsReportingFetchFailure. |
| */ |
| @Override public double getMaxAllowedDownstreamHostFailuresFraction() { |
| return maxAllowedDownstreamHostFailuresFraction; |
| } |
| } |
| |
| @Override |
| public AbstractService getSpeculator() { return speculator; } |
| |
| @Override |
| public Map<String, Set<String>> getDownstreamBlamingHosts(){ |
| return downstreamBlamingHosts; |
| } |
| |
| /** |
| * Initialize context from vertex shuffle deletion. |
| * @param deletionHeight |
| */ |
| public void initShuffleDeletionContext(int deletionHeight) { |
| VertexShuffleDataDeletionContext vShuffleDeletionContext = new VertexShuffleDataDeletionContext(deletionHeight); |
| vShuffleDeletionContext.setSpannedVertices(this); |
| this.vShuffleDeletionContext = vShuffleDeletionContext; |
| } |
| } |