blob: 007774f539edc64e29e15946fadec5a4d931cdd7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
public class TaskAttemptImpl implements TaskAttempt,
EventHandler<TaskAttemptEvent> {
// TODO Ensure MAPREDUCE-4457 is factored in. Also MAPREDUCE-4068.
// TODO Consider TAL registration in the TaskAttempt instead of the container.
// Class-wide logger.
private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
// Platform line separator; used to join diagnostic messages into one string.
private static final String LINE_SEPARATOR = System
.getProperty("line.separator");
// Returned by getCounters() when no counters have been reported yet.
static final TezCounters EMPTY_COUNTERS = new TezCounters();
protected final Configuration conf;
// Raw-typed handler that this attempt sends its outgoing events to.
@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
private final TezTaskAttemptID attemptId;
private final Clock clock;
// Starts as UNKNOWN_ERROR; restoreFromEvent() overwrites it from a finished history event.
private TaskAttemptTerminationCause terminationCause = TaskAttemptTerminationCause.UNKNOWN_ERROR;
private final List<String> diagnostics = new ArrayList<String>();
// Read and write halves of one ReentrantReadWriteLock created in the constructor.
private final Lock readLock;
private final Lock writeLock;
protected final AppContext appContext;
private final TaskHeartbeatHandler taskHeartbeatHandler;
// A value of 0 means "not set" for both timestamps (see setFinishTime()).
private long launchTime = 0;
private long finishTime = 0;
// Reported as node manager host / HTTP port by getReport(); not assigned in the
// part of the file reviewed here -- TODO confirm where they are populated.
private String trackerName;
private int httpPort;
// TODO Can these be replaced by the container object TEZ-1037
private Container container;
private ContainerId containerId;
private NodeId containerNodeId;
private String nodeHttpAddress;
private String nodeRackName;
// Latest status of the attempt; package-visible so tests can reach it.
@VisibleForTesting
TaskAttemptStatus reportedStatus;
private DAGCounter localityCounter;
// Used to store locality information (hosts and racks) for this attempt.
// NOTE(review): the original comment was truncated; presumably filled in when the
// attempt is scheduled -- confirm against the scheduling transition.
Set<String> taskHosts = new HashSet<String>();
Set<String> taskRacks = new HashSet<String>();
// Distinct attempts that reported a failure; presumably consumers reporting that
// they could not read this attempt's output -- verify against OutputReportedFailedTransition.
private Set<TezTaskAttemptID> uniquefailedOutputReports =
new HashSet<TezTaskAttemptID>();
// NOTE(review): presumably the fraction of failure reports tolerated before the
// attempt is failed; the consuming code is outside this view -- confirm.
private static final double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = 0.25;
protected final boolean isRescheduled;
private final Resource taskResource;
private final ContainerContext containerContext;
private final boolean leafVertex;
// Shared helper instances handed to the terminate-style transitions in the state machine table.
protected static final FailedTransitionHelper FAILED_HELPER =
new FailedTransitionHelper();
protected static final KilledTransitionHelper KILLED_HELPER =
new KilledTransitionHelper();
// Shared transition used for TA_DIAGNOSTICS_UPDATE in every state.
private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION =
new DiagnosticInformationUpdater();
// NOTE(review): not referenced by the transition table visible in this file section.
private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
TERMINATED_AFTER_SUCCESS_HELPER = new TerminatedAfterSuccessHelper(KILLED_HELPER);
// Shared transition used for TA_STATUS_UPDATE while RUNNING.
private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
STATUS_UPDATER = new StatusUpdaterTransition();
// Per-attempt state machine instance, created from stateMachineFactory in the constructor.
private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
// Transition table shared by all task attempts. Each addTransition() call names the
// source state, the possible target state(s), the triggering event type(s) and,
// optionally, the transition hook to run. Entries without a hook keep the attempt in
// the same state, i.e. the listed events are accepted and ignored. The initial state
// is NEW. Events with no entry for the current state are reported as an internal
// error by handle().
private static StateMachineFactory
<TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachineFactory
= new StateMachineFactory
<TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
(TaskAttemptStateInternal.NEW)
// ---- Transitions from NEW ----
.addTransition(TaskAttemptStateInternal.NEW,
EnumSet.of(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED),
TaskAttemptEventType.TA_SCHEDULE, new ScheduleTaskattemptTransition())
.addTransition(TaskAttemptStateInternal.NEW,
TaskAttemptStateInternal.NEW,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.NEW,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL_REQUEST,
new TerminateTransition(KILLED_HELPER))
// Recovery may leave the attempt in NEW or move it directly to a later state.
.addTransition(TaskAttemptStateInternal.NEW,
EnumSet.of(TaskAttemptStateInternal.NEW,
TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.FAILED,
TaskAttemptStateInternal.SUCCEEDED),
TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
// ---- Transitions from START_WAIT ----
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.START_WAIT,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_KILL_REQUEST,
new TerminatedBeforeRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_NODE_FAILED,
new NodeFailedBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
new ContainerTerminatingBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
new ContainerCompletedBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
// ---- Transitions from RUNNING ----
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Optional, may not come in for all tasks.
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
new SucceededTransition())
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_FAILED,
new TerminatedWhileRunningTransition(FAILED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_TIMED_OUT,
new TerminatedWhileRunningTransition(FAILED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_KILL_REQUEST,
new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_NODE_FAILED,
new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
new TerminatedWhileRunningTransition(FAILED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
new ContainerCompletedWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
// An output-failure report either leaves the attempt RUNNING or starts failing it;
// the hook decides which.
.addTransition(
TaskAttemptStateInternal.RUNNING,
EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptStateInternal.RUNNING),
TaskAttemptEventType.TA_OUTPUT_FAILED,
new OutputReportedFailedTransition())
// ---- Transitions from KILL_IN_PROGRESS ----
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
new ContainerCompletedWhileTerminating())
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Events accepted and ignored while KILL_IN_PROGRESS.
.addTransition(
TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptStateInternal.KILL_IN_PROGRESS,
EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_OUTPUT_FAILED))
// ---- Transitions from FAIL_IN_PROGRESS ----
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
new ContainerCompletedWhileTerminating())
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Events accepted and ignored while FAIL_IN_PROGRESS.
.addTransition(
TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptStateInternal.FAIL_IN_PROGRESS,
EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_OUTPUT_FAILED))
// ---- Transitions from KILLED ----
.addTransition(TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Events accepted and ignored once KILLED.
.addTransition(
TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.KILLED,
EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
TaskAttemptEventType.TA_SCHEDULE,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
TaskAttemptEventType.TA_OUTPUT_FAILED))
// ---- Transitions from FAILED ----
.addTransition(TaskAttemptStateInternal.FAILED,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Events accepted and ignored once FAILED.
.addTransition(
TaskAttemptStateInternal.FAILED,
TaskAttemptStateInternal.FAILED,
EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
TaskAttemptEventType.TA_SCHEDULE,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
TaskAttemptEventType.TA_OUTPUT_FAILED))
// ---- Transitions from SUCCEEDED ----
// How will duplicate history events be handled ?
// TODO Maybe consider not failing REDUCE tasks in this case. Also,
// MAP_TASKS in case there's only one phase in the job.
.addTransition(TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// A kill request or node failure after success may either be ignored or turn the
// attempt into KILLED; the hook decides which.
.addTransition(
TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.SUCCEEDED),
TaskAttemptEventType.TA_KILL_REQUEST,
new TerminatedAfterSuccessTransition())
.addTransition(
TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.SUCCEEDED),
TaskAttemptEventType.TA_NODE_FAILED,
new TerminatedAfterSuccessTransition())
// An output-failure report after success may either be ignored or fail the attempt.
.addTransition(
TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptStateInternal.FAILED,
TaskAttemptStateInternal.SUCCEEDED),
TaskAttemptEventType.TA_OUTPUT_FAILED,
new OutputReportedFailedTransition())
// Events accepted and ignored once SUCCEEDED.
.addTransition(
TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM))
.installTopology();
// Externally visible state rebuilt by restoreFromEvent() from history events; NEW until
// a history event has been replayed.
private TaskAttemptState recoveredState = TaskAttemptState.NEW;
// Set when a TASK_ATTEMPT_STARTED history event is replayed; restoreFromEvent() rejects
// a TASK_ATTEMPT_FINISHED event that arrives before it.
private boolean recoveryStartEventSeen = false;
@SuppressWarnings("rawtypes")
public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
this.eventHandler = eventHandler;
//Reported status
this.conf = conf;
this.clock = clock;
this.taskHeartbeatHandler = taskHeartbeatHandler;
this.appContext = appContext;
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
initTaskAttemptStatus(reportedStatus);
RackResolver.init(conf);
this.stateMachine = stateMachineFactory.make(this);
this.isRescheduled = isRescheduled;
this.taskResource = resource;
this.containerContext = containerContext;
this.leafVertex = leafVertex;
}
/** Returns the id of this task attempt, fixed at construction time. */
@Override
public TezTaskAttemptID getID() {
  return this.attemptId;
}
/** Returns the id of the task this attempt belongs to, taken from the attempt id. */
@Override
public TezTaskID getTaskID() {
  final TezTaskID owningTask = this.attemptId.getTaskID();
  return owningTask;
}
/** Returns the id of the vertex that owns this attempt's task. */
@Override
public TezVertexID getVertexID() {
  final TezTaskID owningTask = attemptId.getTaskID();
  return owningTask.getVertexID();
}
/** Returns the id of the DAG this attempt is part of, resolved through the vertex id. */
@Override
public TezDAGID getDAGID() {
  final TezVertexID owningVertex = getVertexID();
  return owningVertex.getDAGId();
}
/**
 * Assembles the {@link TaskSpec} describing this attempt from its vertex: DAG name,
 * vertex name, total task count, processor descriptor and the input, output and
 * group-input specs for this task's index.
 *
 * @return a new task spec for this attempt
 * @throws AMUserCodeException propagated from the vertex lookups
 */
TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
  final Vertex owner = getVertex();
  final ProcessorDescriptor processor = owner.getProcessorDescriptor();
  final int taskIndex = getTaskID().getId();
  final TaskSpec spec = new TaskSpec(
      getID(),
      owner.getDAG().getName(),
      owner.getName(),
      owner.getTotalTasks(),
      processor,
      owner.getInputSpecList(taskIndex),
      owner.getOutputSpecList(taskIndex),
      owner.getGroupInputSpecList(taskIndex));
  return spec;
}
/**
 * Builds a snapshot report of this attempt while holding the read lock.
 *
 * <p>The state in the report comes from the local state machine (via
 * {@link #getState()}), not from {@code reportedStatus}. Shuffle finish time, phase
 * and state string are not populated. The node manager port is only set when a node
 * id is known.
 *
 * @return a newly created report record
 */
@Override
public TaskAttemptReport getReport() {
  final TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
  readLock.lock();
  try {
    report.setTaskAttemptId(attemptId);
    // Use the LOCAL state of the attempt; do NOT take it from reportedStatus.
    report.setTaskAttemptState(getState());
    report.setProgress(reportedStatus.progress);
    report.setStartTime(launchTime);
    report.setFinishTime(finishTime);

    final String joinedDiagnostics = StringUtils.join(getDiagnostics(), LINE_SEPARATOR);
    report.setDiagnosticInfo(joinedDiagnostics);

    report.setCounters(getCounters());
    report.setContainerId(this.getAssignedContainerID());
    report.setNodeManagerHost(trackerName);
    report.setNodeManagerHttpPort(httpPort);

    final NodeId assignedNode = this.containerNodeId;
    if (assignedNode != null) {
      report.setNodeManagerPort(assignedNode.getPort());
    }
  } finally {
    readLock.unlock();
  }
  return report;
}
/**
 * Returns a copy of the diagnostic messages collected so far, taken under the read
 * lock. Changes to the returned list do not affect this attempt.
 */
@Override
public List<String> getDiagnostics() {
  readLock.lock();
  try {
    final List<String> snapshot = new ArrayList<String>(diagnostics);
    return snapshot;
  } finally {
    readLock.unlock();
  }
}
/**
 * Returns the recorded termination cause of this attempt.
 *
 * <p>The field is mutable and not volatile, so it is read under the read lock like
 * the other accessors of transition-mutated state. State transitions run under the
 * write lock in {@link #handle(TaskAttemptEvent)}, so this makes their updates
 * visible to callers on other threads. The lock is reentrant, so calling this while
 * already holding the read or write lock on the same thread is safe.
 *
 * @return the termination cause; {@code UNKNOWN_ERROR} until another cause is recorded
 */
@Override
public TaskAttemptTerminationCause getTerminationCause() {
  readLock.lock();
  try {
    return terminationCause;
  } finally {
    readLock.unlock();
  }
}
/**
 * Returns the counters from the latest reported status, or {@code EMPTY_COUNTERS}
 * when none have been reported. The locality counter is pushed into the reported
 * status before the counters are read.
 */
@Override
public TezCounters getCounters() {
  readLock.lock();
  try {
    // NOTE(review): this setter call presumably mutates reportedStatus while only the
    // read lock is held -- confirm this is safe for concurrent readers.
    reportedStatus.setLocalityCounter(this.localityCounter);
    final TezCounters reported = reportedStatus.counters;
    return (reported != null) ? reported : EMPTY_COUNTERS;
  } finally {
    readLock.unlock();
  }
}
/** Returns the progress value from the latest reported status, read under the read lock. */
@Override
public float getProgress() {
  final float latestProgress;
  readLock.lock();
  try {
    latestProgress = reportedStatus.progress;
  } finally {
    readLock.unlock();
  }
  return latestProgress;
}
/** Returns the externally visible state of this attempt, read under the read lock. */
@Override
public TaskAttemptState getState() {
  final TaskAttemptState external;
  readLock.lock();
  try {
    external = getStateNoLock();
  } finally {
    readLock.unlock();
  }
  return external;
}
/**
 * Returns the externally visible state without taking any lock: the state machine's
 * current internal state mapped through {@code getExternalState}.
 */
@Override
public TaskAttemptState getStateNoLock() {
  final TaskAttemptStateInternal internal = stateMachine.getCurrentState();
  return getExternalState(internal);
}
/**
 * Tells whether this attempt has finished or is finishing: true for SUCCEEDED,
 * FAILED, FAIL_IN_PROGRESS, KILLED and KILL_IN_PROGRESS, false for every other
 * internal state.
 */
@Override
public boolean isFinished() {
  readLock.lock();
  try {
    final TaskAttemptStateInternal current = getInternalState();
    return current == TaskAttemptStateInternal.SUCCEEDED
        || current == TaskAttemptStateInternal.FAILED
        || current == TaskAttemptStateInternal.FAIL_IN_PROGRESS
        || current == TaskAttemptStateInternal.KILLED
        || current == TaskAttemptStateInternal.KILL_IN_PROGRESS;
  } finally {
    readLock.unlock();
  }
}
/** Returns the id of the container recorded for this attempt, read under the read lock. */
@Override
public ContainerId getAssignedContainerID() {
  final ContainerId assignedId;
  readLock.lock();
  try {
    assignedId = this.containerId;
  } finally {
    readLock.unlock();
  }
  return assignedId;
}
/** Returns the container object recorded for this attempt, read under the read lock. */
@Override
public Container getAssignedContainer() {
  final Container assigned;
  readLock.lock();
  try {
    assigned = this.container;
  } finally {
    readLock.unlock();
  }
  return assigned;
}
/**
 * Returns the string form of the node id recorded for this attempt's container.
 *
 * <p>If no node id has been recorded yet this returns {@code null} instead of
 * throwing a {@link NullPointerException}. That matches the null guard on the same
 * field in {@link #getReport()} and the "otherwise null" contract of the sibling
 * node accessors.
 *
 * @return {@code containerNodeId.toString()}, or {@code null} when no node id is set
 */
@Override
public String getAssignedContainerMgrAddress() {
  readLock.lock();
  try {
    final NodeId assignedNode = containerNodeId;
    return (assignedNode == null) ? null : assignedNode.toString();
  } finally {
    readLock.unlock();
  }
}
/** Returns the node id recorded for this attempt's container, read under the read lock. */
@Override
public NodeId getNodeId() {
  final NodeId assignedNode;
  readLock.lock();
  try {
    assignedNode = this.containerNodeId;
  } finally {
    readLock.unlock();
  }
  return assignedNode;
}
/**
 * If a container is assigned, returns the node's HTTP address; otherwise null.
 */
@Override
public String getNodeHttpAddress() {
  final String httpAddress;
  readLock.lock();
  try {
    httpAddress = this.nodeHttpAddress;
  } finally {
    readLock.unlock();
  }
  return httpAddress;
}
/**
 * If a container is assigned, returns the node's rack name; otherwise null.
 */
@Override
public String getNodeRackName() {
  final String rack;
  readLock.lock();
  try {
    rack = nodeRackName;
  } finally {
    readLock.unlock();
  }
  return rack;
}
/** Returns the recorded launch time, read under the read lock; 0 means not set. */
@Override
public long getLaunchTime() {
  final long launchedAt;
  readLock.lock();
  try {
    launchedAt = this.launchTime;
  } finally {
    readLock.unlock();
  }
  return launchedAt;
}
/** Returns the recorded finish time, read under the read lock; 0 means not set. */
@Override
public long getFinishTime() {
  final long finishedAt;
  readLock.lock();
  try {
    finishedAt = this.finishTime;
  } finally {
    readLock.unlock();
  }
  return finishedAt;
}
/**
 * Looks up the task this attempt belongs to: current DAG, then the vertex named by
 * the attempt id, then the task named by the attempt id.
 */
@Override
public Task getTask() {
  final Vertex owningVertex =
      appContext.getCurrentDAG().getVertex(attemptId.getTaskID().getVertexID());
  return owningVertex.getTask(attemptId.getTaskID());
}
/** Looks up, in the current DAG, the vertex that owns this attempt's task. */
Vertex getVertex() {
  final TezVertexID owningVertexId = attemptId.getTaskID().getVertexID();
  return appContext.getCurrentDAG().getVertex(owningVertexId);
}
/**
 * Feeds an event into this attempt's state machine while holding the write lock.
 *
 * <p>If the state machine rejects the event for the current state, the error is
 * logged and two events are sent to the event handler: a diagnostics update for the
 * DAG and a DAG {@code INTERNAL_ERROR} event. Any resulting state change is logged
 * at info level.
 *
 * @param event the task attempt event to process
 */
@SuppressWarnings("unchecked")
@Override
public void handle(TaskAttemptEvent event) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing TaskAttemptEvent " + event.getTaskAttemptID()
        + " of type " + event.getType() + " while in state "
        + getInternalState() + ". Event: " + event);
  }

  writeLock.lock();
  try {
    final TaskAttemptStateInternal stateBefore = getInternalState();

    try {
      stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      LOG.error("Can't handle this event at current state for "
          + this.attemptId, e);
      // Tell the DAG what went wrong, then raise an internal error on it.
      final TezDAGID dagId = this.attemptId.getTaskID().getVertexID().getDAGId();
      final String reason = "Invalid event " + event.getType()
          + " on TaskAttempt " + this.attemptId;
      eventHandler.handle(new DAGEventDiagnosticsUpdate(dagId, reason));
      eventHandler.handle(new DAGEvent(dagId, DAGEventType.INTERNAL_ERROR));
    }

    final TaskAttemptStateInternal stateAfter = getInternalState();
    if (stateBefore != stateAfter) {
      LOG.info(attemptId + " TaskAttempt Transitioned from "
          + stateBefore + " to "
          + stateAfter + " due to event "
          + event.getType());
    }
  } finally {
    writeLock.unlock();
  }
}
/** Returns the state machine's current internal state, read under the read lock. */
@VisibleForTesting
public TaskAttemptStateInternal getInternalState() {
  final TaskAttemptStateInternal current;
  readLock.lock();
  try {
    current = stateMachine.getCurrentState();
  } finally {
    readLock.unlock();
  }
  return current;
}
/**
 * Maps an internal state machine state to the externally visible state.
 *
 * <p>NEW and START_WAIT map to STARTING; the two "in progress" termination states
 * map to their final counterparts (FAILED, KILLED); RUNNING and SUCCEEDED map to
 * themselves.
 *
 * @param smState internal state to convert
 * @return the matching external state
 * @throws TezUncheckedException if the internal state has no external mapping
 */
private static TaskAttemptState getExternalState(
    TaskAttemptStateInternal smState) {
  final TaskAttemptState external;
  switch (smState) {
  case SUCCEEDED:
    external = TaskAttemptState.SUCCEEDED;
    break;
  case RUNNING:
    external = TaskAttemptState.RUNNING;
    break;
  case START_WAIT:
  case NEW:
    external = TaskAttemptState.STARTING;
    break;
  case KILL_IN_PROGRESS:
  case KILLED:
    external = TaskAttemptState.KILLED;
    break;
  case FAIL_IN_PROGRESS:
  case FAILED:
    external = TaskAttemptState.FAILED;
    break;
  default:
    throw new TezUncheckedException("Attempt to convert invalid "
        + "stateMachineTaskAttemptState to externalTaskAttemptState: "
        + smState);
  }
  return external;
}
/**
 * Replays one history event to rebuild this attempt's recovered state.
 *
 * <p>A TASK_ATTEMPT_STARTED event restores the launch time and container id and
 * marks the attempt as RUNNING. A TASK_ATTEMPT_FINISHED event restores finish time,
 * counters, final state, termination cause and diagnostics, and is only accepted
 * after a started event. In both cases a DAG counter update event is sent.
 *
 * @param historyEvent the history event to replay
 * @return the recovered state after applying the event
 * @throws RuntimeException if a finished event arrives before any started event, or
 *         if the event type is not one of the two handled types
 */
@Override
public TaskAttemptState restoreFromEvent(HistoryEvent historyEvent) {
  switch (historyEvent.getEventType()) {
  case TASK_ATTEMPT_STARTED: {
    final TaskAttemptStartedEvent started = (TaskAttemptStartedEvent) historyEvent;
    this.launchTime = started.getStartTime();
    this.recoveryStartEventSeen = true;
    this.recoveredState = TaskAttemptState.RUNNING;
    this.containerId = started.getContainerId();
    sendEvent(createDAGCounterUpdateEventTALaunched(this));
    return this.recoveredState;
  }
  case TASK_ATTEMPT_FINISHED: {
    if (!this.recoveryStartEventSeen) {
      throw new RuntimeException("Finished Event seen but"
          + " no Started Event was encountered earlier");
    }
    final TaskAttemptFinishedEvent finished = (TaskAttemptFinishedEvent) historyEvent;
    this.finishTime = finished.getFinishTime();
    this.reportedStatus.counters = finished.getCounters();
    this.reportedStatus.progress = 1f;
    this.reportedStatus.state = finished.getState();
    // Fall back to UNKNOWN_ERROR when the event carries no termination cause.
    if (finished.getTaskAttemptError() != null) {
      this.terminationCause = finished.getTaskAttemptError();
    } else {
      this.terminationCause = TaskAttemptTerminationCause.UNKNOWN_ERROR;
    }
    this.diagnostics.add(finished.getDiagnostics());
    this.recoveredState = finished.getState();
    sendEvent(createDAGCounterUpdateEventTAFinished(this, finished.getState()));
    return this.recoveredState;
  }
  default:
    throw new RuntimeException("Unexpected event received for restoring"
        + " state, eventType=" + historyEvent.getEventType());
  }
}
/** Hands the given event to this attempt's (raw-typed) event handler. */
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
  eventHandler.handle(event);
}
// always called in write lock
/**
 * Records the finish time from the clock, but only when a launch time has been set
 * and no finish time has been recorded yet.
 */
private void setFinishTime() {
  final boolean neverLaunched = (launchTime == 0);
  final boolean alreadyFinished = (finishTime != 0);
  if (neverLaunched || alreadyFinished) {
    return;
  }
  finishTime = clock.getTime();
}
// TODO Merge some of these JobCounter events.
/**
 * Creates a DAG counter update that adds one to {@code TOTAL_LAUNCHED_TASKS} for the
 * DAG of the given attempt.
 *
 * @param ta attempt whose DAG id the event is addressed to
 * @return the counter update event
 */
private static DAGEventCounterUpdate createDAGCounterUpdateEventTALaunched(
    TaskAttemptImpl ta) {
  final DAGEventCounterUpdate launchedUpdate = new DAGEventCounterUpdate(ta.getDAGID());
  launchedUpdate.addCounterUpdate(DAGCounter.TOTAL_LAUNCHED_TASKS, 1);
  return launchedUpdate;
}
/**
 * Creates a DAG counter update for a finished attempt. It adds one to the failed,
 * killed or succeeded task counter depending on {@code taState}; for any other
 * state the returned event carries no counter update.
 *
 * @param taskAttempt attempt whose DAG id the event is addressed to
 * @param taState final external state of the attempt
 * @return the counter update event
 */
private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
    TaskAttemptImpl taskAttempt, TaskAttemptState taState) {
  final DAGEventCounterUpdate finishedUpdate =
      new DAGEventCounterUpdate(taskAttempt.getDAGID());

  // Pick the counter that matches the final state, if any.
  final DAGCounter counter;
  if (taState == TaskAttemptState.FAILED) {
    counter = DAGCounter.NUM_FAILED_TASKS;
  } else if (taState == TaskAttemptState.KILLED) {
    counter = DAGCounter.NUM_KILLED_TASKS;
  } else if (taState == TaskAttemptState.SUCCEEDED) {
    counter = DAGCounter.NUM_SUCCEEDED_TASKS;
  } else {
    counter = null;
  }

  if (counter != null) {
    finishedUpdate.addCounterUpdate(counter, 1);
  }
  return finishedUpdate;
}
// private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
// int slotMemoryReq =
// taskAttempt.taskResource.getMemory();
//
// int minSlotMemSize =
// taskAttempt.appContext.getClusterInfo().getMinContainerCapability()
// .getMemory();
//
// int simSlotsRequired =
// minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) slotMemoryReq
// / minSlotMemSize);
//
// long slotMillisIncrement =
// simSlotsRequired
// * (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
// return slotMillisIncrement;
// }
// TODO: JobHistory
// TODO Change to return a JobHistoryEvent.
/*
private static
TaskAttemptUnsuccessfulCompletionEvent
createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
TaskAttemptStateInternal attemptState) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
.getTaskType()), attemptState.toString(),
taskAttempt.finishTime,
taskAttempt.containerNodeId == null ? "UNKNOWN"
: taskAttempt.containerNodeId.getHost(),
taskAttempt.containerNodeId == null ? -1
: taskAttempt.containerNodeId.getPort(),
taskAttempt.nodeRackName == null ? "UNKNOWN"
: taskAttempt.nodeRackName,
StringUtils.join(
taskAttempt.getDiagnostics(), LINE_SEPARATOR), taskAttempt
.getProgressSplitBlock().burst());
return tauce;
}
// TODO Incorporate MAPREDUCE-4838
private JobHistoryEvent createTaskAttemptStartedEvent() {
TaskAttemptStartedEvent tase = new TaskAttemptStartedEvent(
TypeConverter.fromYarn(attemptId), TypeConverter.fromYarn(taskId
.getTaskType()), launchTime, trackerName, httpPort, shufflePort,
containerId, "", "");
return new JobHistoryEvent(jobId, tase);
}
*/
// private WrappedProgressSplitsBlock getProgressSplitBlock() {
// return null;
// // TODO
// /*
// readLock.lock();
// try {
// if (progressSplitBlock == null) {
// progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
// MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
// MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
// }
// return progressSplitBlock;
// } finally {
// readLock.unlock();
// }
// */
// }
/**
 * Currently a no-op: the whole body is commented out. The disabled code below shows
 * the earlier logic, which extended per-attempt progress split blocks with wallclock
 * time, CPU time and virtual/physical memory figures taken from the reported counters.
 */
private void updateProgressSplits() {
// double newProgress = reportedStatus.progress;
// newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
// TezCounters counters = reportedStatus.counters;
// if (counters == null)
// return;
//
// WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
// if (splitsBlock != null) {
// long now = clock.getTime();
// long start = getLaunchTime();
//
// if (start == 0)
// return;
//
// if (start != 0 && now - start <= Integer.MAX_VALUE) {
// splitsBlock.getProgressWallclockTime().extend(newProgress,
// (int) (now - start));
// }
//
// TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
// if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
// splitsBlock.getProgressCPUTime().extend(newProgress,
// (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
// }
//
// TezCounter virtualBytes = counters
// .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
// if (virtualBytes != null) {
// splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
// (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
// }
//
// TezCounter physicalBytes = counters
// .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
// if (physicalBytes != null) {
// splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
// (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
// }
// }
}
/**
 * Currently a no-op: the whole body is commented out. The disabled code below built
 * a task attempt context and sent a task cleanup event for this attempt.
 */
private void sendTaskAttemptCleanupEvent() {
// TaskAttemptContext taContext =
// new TaskAttemptContextImpl(this.conf,
// TezMRTypeConverter.fromTez(this.attemptId));
// sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
}
/** Asks the owning vertex for the location hint of this attempt's task. */
@VisibleForTesting
protected TaskLocationHint getTaskLocationHint() {
  final Vertex owningVertex = getVertex();
  return owningVertex.getTaskLocationHint(getTaskID());
}
/**
 * Delegates to {@code TaskAttemptImplHelpers.resolveHosts}.
 *
 * @param src host names to resolve
 * @return whatever the helper returns for {@code src}
 */
protected String[] resolveHosts(String[] src) {
  final String[] resolved = TaskAttemptImplHelpers.resolveHosts(src);
  return resolved;
}
/**
 * Sends a {@link TaskAttemptStartedEvent} for this attempt to the history handler.
 *
 * <p>The in-progress log URL is always built from the node HTTP address, the
 * container id and the user. The completed-logs URL stays empty unless log
 * aggregation is enabled and a log server URL is configured.
 */
protected void logJobHistoryAttemptStarted() {
  final String containerIdStr = containerId.toString();

  final String inProgressLogsUrl = nodeHttpAddress
      + "/" + "node/containerlogs"
      + "/" + containerIdStr
      + "/" + this.appContext.getUser();

  String completedLogsUrl = "";
  final boolean aggregationEnabled = conf.getBoolean(
      YarnConfiguration.LOG_AGGREGATION_ENABLED,
      YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
  if (aggregationEnabled) {
    final String logServerUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
    if (logServerUrl != null) {
      // Context segment: "v_<vertex name>_<attempt id>".
      final String contextStr = "v_" + getTask().getVertex().getName()
          + "_" + this.attemptId.toString();
      completedLogsUrl = logServerUrl
          + "/" + containerNodeId.toString()
          + "/" + containerIdStr
          + "/" + contextStr
          + "/" + this.appContext.getUser();
    }
  }

  final TaskAttemptStartedEvent startedEvent = new TaskAttemptStartedEvent(
      attemptId, getTask().getVertex().getName(),
      launchTime, containerId, containerNodeId,
      inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
  this.appContext.getHistoryHandler().handle(
      new DAGHistoryEvent(getDAGID(), startedEvent));
}
/**
 * Sends a {@link TaskAttemptFinishedEvent} with state SUCCEEDED, no termination
 * cause and empty diagnostics to the history handler. Nothing is sent when the
 * attempt never started (launch time is 0).
 *
 * <p>NOTE(review): {@code state} is not used; the event always records SUCCEEDED.
 * Presumably this is only called on the success path -- confirm against callers.
 *
 * @param state internal state of the attempt (currently ignored)
 */
protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) {
  // Log finished events only if an attempt started.
  if (getLaunchTime() != 0) {
    final TaskAttemptFinishedEvent finishedEvent = new TaskAttemptFinishedEvent(
        attemptId, getTask().getVertex().getName(), getLaunchTime(),
        getFinishTime(), TaskAttemptState.SUCCEEDED, null,
        "", getCounters());
    // FIXME how do we store information regd completion events
    this.appContext.getHistoryHandler().handle(
        new DAGHistoryEvent(getDAGID(), finishedEvent));
  }
}
/**
 * Hands a {@link TaskAttemptFinishedEvent} describing an unsuccessful end of this
 * attempt to the history handler.
 *
 * <p>The finish time recorded is the current clock time, and the diagnostics are
 * joined into a single string using {@code LINE_SEPARATOR}.
 *
 * @param state the external state to record for the attempt
 */
protected void logJobHistoryAttemptUnsuccesfulCompletion(
    TaskAttemptState state) {
  final String vertexName = getTask().getVertex().getName();
  final TaskAttemptFinishedEvent unsuccessful = new TaskAttemptFinishedEvent(
      attemptId, vertexName, getLaunchTime(),
      clock.getTime(), state,
      terminationCause,
      StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
      getCounters());
  // FIXME how do we store information regd completion events
  this.appContext.getHistoryHandler().handle(
      new DAGHistoryEvent(getDAGID(), unsuccessful));
}
//////////////////////////////////////////////////////////////////////////////
// Start of Transition Classes //
//////////////////////////////////////////////////////////////////////////////
/**
 * Handles the schedule event for an attempt: builds the remote task spec, works out
 * the host/rack locality of the task, and sends a launch request to the scheduler.
 *
 * <p>Returns {@code FAILED} when building the task spec throws an
 * {@link AMUserCodeException} (after running a failure {@link TerminateTransition}),
 * otherwise {@code START_WAIT}.
 */
protected static class ScheduleTaskattemptTransition implements
    MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {

  @Override
  public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    final TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;

    // TODO Creating the remote task here may not be required in case of
    // recovery.
    final TaskSpec remoteTaskSpec;
    try {
      remoteTaskSpec = ta.createRemoteTaskSpec();
      LOG.info("remoteTaskSpec:" + remoteTaskSpec);
    } catch (AMUserCodeException e) {
      final String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
      LOG.error(msg, e);
      final String diag =
          msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
      final TaskAttemptEventAttemptFailed failedEvent = new TaskAttemptEventAttemptFailed(
          ta.getID(), TaskAttemptEventType.TA_FAILED, diag,
          TaskAttemptTerminationCause.APPLICATION_ERROR);
      new TerminateTransition(FAILED_HELPER).transition(ta, failedEvent);
      return TaskAttemptStateInternal.FAILED;
    }

    // Node/rack locality is computed even for re-scheduled attempts, because it is
    // stored on the attempt (taskHosts / taskRacks) regardless of what is requested.
    final Set<String> rackNames = new HashSet<String>();
    String[] resolvedHosts = new String[0];
    final TaskLocationHint hint = ta.getTaskLocationHint();
    if (hint != null) {
      if (hint.getRacks() != null) {
        rackNames.addAll(hint.getRacks());
      }
      if (hint.getHosts() != null) {
        for (String hostName : hint.getHosts()) {
          rackNames.add(RackResolver.resolve(hostName).getNetworkLocation());
        }
        final String[] rawHosts =
            hint.getHosts().toArray(new String[hint.getHosts().size()]);
        resolvedHosts = ta.resolveHosts(rawHosts);
      }
    }
    ta.taskHosts.addAll(Arrays.asList(resolvedHosts));
    ta.taskRacks = rackNames;

    // Hosts / racks are only requested from the scheduler for first-time scheduling.
    final TaskLocationHint requestedHint = ta.isRescheduled ? null : hint;

    if (LOG.isDebugEnabled()) {
      LOG.debug("Asking for container launch with taskAttemptContext: "
          + remoteTaskSpec);
    }

    // Re-scheduled attempts use the high limit; others use the midpoint of the range.
    final int priority = ta.isRescheduled
        ? scheduleEvent.getPriorityHighLimit()
        : (scheduleEvent.getPriorityHighLimit() + scheduleEvent.getPriorityLowLimit()) / 2;

    // Send out a launch request to the scheduler.
    ta.sendEvent(new AMSchedulerEventTALaunchRequest(
        ta.attemptId, ta.taskResource, remoteTaskSpec, ta, requestedHint,
        priority, ta.containerContext));
    return TaskAttemptStateInternal.START_WAIT;
  }
}
/**
 * Appends the diagnostic text carried by a
 * {@link TaskAttemptEventDiagnosticsUpdate} to the attempt's diagnostics,
 * logging it at debug level first.
 */
protected static class DiagnosticInformationUpdater implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    final String diagnosticInfo =
        ((TaskAttemptEventDiagnosticsUpdate) event).getDiagnosticInfo();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Diagnostics update for " + ta.attemptId + ": "
          + diagnosticInfo);
    }
    ta.addDiagnosticInfo(diagnosticInfo);
  }
}
/**
 * Common termination handling for an attempt. The supplied helper decides which
 * external state and which task event type are reported.
 *
 * <p>In order, the transition: stamps the finish time, records diagnostics and the
 * termination cause carried by the event (if any), sends the DAG counter update,
 * writes the unsuccessful-completion history record when the attempt had been
 * launched, and finally notifies the task.
 */
protected static class TerminateTransition implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {

  TerminatedTransitionHelper helper;

  public TerminateTransition(TerminatedTransitionHelper helper) {
    this.helper = helper;
  }

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    ta.setFinishTime();

    if (event instanceof DiagnosableEvent) {
      final DiagnosableEvent diagnosable = (DiagnosableEvent) event;
      ta.addDiagnosticInfo(diagnosable.getDiagnosticInfo());
    }

    // this should catch at test time if any new events are missing the error cause
    assert event instanceof TaskAttemptEventTerminationCauseEvent;
    if (event instanceof TaskAttemptEventTerminationCauseEvent) {
      final TaskAttemptEventTerminationCauseEvent causeEvent =
          (TaskAttemptEventTerminationCauseEvent) event;
      ta.trySetTerminationCause(causeEvent.getTerminationCause());
    }

    ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
        helper.getTaskAttemptState()));

    if (ta.getLaunchTime() == 0) {
      // No start event was written, so no finish event is written either.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not generating HistoryFinish event since start event not "
            + "generated for taskAttempt: " + ta.getID());
      }
    } else {
      // TODO For cases like this, recovery goes for a toss, since the the
      // attempt will not exist in the history file.
      ta.logJobHistoryAttemptUnsuccesfulCompletion(helper.getTaskAttemptState());
    }

    // Send out events to the Task - indicating TaskAttemptTermination(F/K)
    ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper.getTaskEventType()));
  }
}
/**
 * Handles a {@link TaskAttemptEventStartedRemotely} event: records the container
 * and node details on the attempt, stamps the launch time, writes the start
 * history record, determines the locality counter, notifies the task, and
 * registers the attempt with the task heartbeat handler.
 */
protected static class StartedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent;
// NOTE(review): the lookup result is dereferenced without a null check; this
// assumes the container id from the event is always present in the app context.
Container container = ta.appContext.getAllContainers()
.get(event.getContainerId()).getContainer();
// Capture container / node identity on the attempt; later steps in this
// method (history logging, locality computation) read these fields.
ta.container = container;
ta.containerId = event.getContainerId();
ta.containerNodeId = container.getNodeId();
ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
ta.nodeRackName = StringInterner.weakIntern(RackResolver.resolve(ta.containerNodeId.getHost())
.getNetworkLocation());
ta.launchTime = ta.clock.getTime();
// TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = NetUtils
.createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
ta.trackerName = StringInterner.weakIntern(nodeHttpInetAddr.getHostName());
ta.httpPort = nodeHttpInetAddr.getPort();
ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta));
LOG.info("TaskAttempt: [" + ta.attemptId + "] started."
+ " Is using containerId: [" + ta.containerId + "]" + " on NM: ["
+ ta.containerNodeId + "]");
// JobHistoryEvent
ta.logJobHistoryAttemptStarted();
// TODO Remove after HDFS-5098
// Compute LOCALITY counter for this task.
// Host match takes precedence over rack match; taskHosts / taskRacks are
// populated by the schedule transition.
if (ta.taskHosts.contains(ta.containerNodeId.getHost())) {
ta.localityCounter = DAGCounter.DATA_LOCAL_TASKS;
} else if (ta.taskRacks.contains(ta.nodeRackName)) {
ta.localityCounter = DAGCounter.RACK_LOCAL_TASKS;
} else {
// Not computing this if the task does not have locality information.
if (ta.getTaskLocationHint() != null) {
ta.localityCounter = DAGCounter.OTHER_LOCAL_TASKS;
}
}
// Inform the Task
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
TaskEventType.T_ATTEMPT_LAUNCHED));
// The RUNNING status update is only sent to the vertex when speculation is enabled.
if (ta.isSpeculationEnabled()) {
ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING,
ta.launchTime, true));
}
ta.taskHeartbeatHandler.register(ta.attemptId);
}
}
/**
 * Reads the speculation switch from the configuration.
 *
 * @return the value of {@code TezConfiguration.TEZ_AM_SPECULATION_ENABLED}, falling
 *         back to {@code TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT}
 */
private boolean isSpeculationEnabled() {
  final boolean speculationEnabled = conf.getBoolean(
      TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
      TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
  return speculationEnabled;
}
/**
 * Termination transition that, after the common {@link TerminateTransition}
 * handling, also tells the scheduler that the attempt has ended.
 *
 * <p>Subclasses can suppress the scheduler notification by overriding
 * {@link #sendSchedulerEvent()}.
 */
protected static class TerminatedBeforeRunningTransition extends
    TerminateTransition {

  public TerminatedBeforeRunningTransition(
      TerminatedTransitionHelper helper) {
    super(helper);
  }

  /** @return {@code true} if the scheduler should be informed; this default always is. */
  protected boolean sendSchedulerEvent() {
    return true;
  }

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    super.transition(ta, event);
    if (!sendSchedulerEvent()) {
      return;
    }
    // Inform the scheduler
    ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
        helper.getTaskAttemptState()));
  }
}
/**
 * {@link TerminatedBeforeRunningTransition} preconfigured with
 * {@code KILLED_HELPER}. Judging by its name, it is meant for node failures that
 * occur before the attempt is running.
 */
protected static class NodeFailedBeforeRunningTransition extends
TerminatedBeforeRunningTransition {
public NodeFailedBeforeRunningTransition() {
super(KILLED_HELPER);
}
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
// No additional work: delegates entirely to the parent transition.
super.transition(ta, event);
}
}
/**
 * {@link TerminatedBeforeRunningTransition} preconfigured with
 * {@code FAILED_HELPER}. Judging by its name, it is meant for a container that
 * starts terminating before the attempt is running.
 */
protected static class ContainerTerminatingBeforeRunningTransition extends
TerminatedBeforeRunningTransition {
public ContainerTerminatingBeforeRunningTransition() {
super(FAILED_HELPER);
}
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
// No additional work: delegates entirely to the parent transition.
super.transition(ta, event);
}
}
/**
 * {@link TerminatedBeforeRunningTransition} that additionally requests attempt
 * cleanup once the parent handling is done. Uses {@code FAILED_HELPER} unless
 * another helper is supplied.
 */
protected static class ContainerCompletedBeforeRunningTransition extends
    TerminatedBeforeRunningTransition {

  public ContainerCompletedBeforeRunningTransition() {
    this(FAILED_HELPER);
  }

  public ContainerCompletedBeforeRunningTransition(TerminatedTransitionHelper helper) {
    super(helper);
  }

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    super.transition(ta, event);
    ta.sendTaskAttemptCleanupEvent();
  }
}
/**
 * Applies a status update to the attempt: copies the current state, the reported
 * progress and the reported counters into {@code reportedStatus}, refreshes the
 * progress splits, and forwards a status update to the vertex when speculation is
 * enabled.
 */
protected static class StatusUpdaterTransition implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    final TaskAttemptEventStatusUpdate updateEvent = (TaskAttemptEventStatusUpdate) event;
    final TaskStatusUpdateEvent reported = updateEvent.getStatusEvent();

    ta.reportedStatus.state = ta.getState();
    ta.reportedStatus.progress = reported.getProgress();
    ta.reportedStatus.counters = reported.getCounters();
    ta.updateProgressSplits();

    if (!ta.isSpeculationEnabled()) {
      return;
    }
    ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, ta.getState(),
        ta.clock.getTime()));
  }
}
/**
 * Handles successful completion of an attempt: stamps the finish time, writes the
 * finish history record, then notifies (in this order) the DAG counters, the
 * scheduler and the task, unregisters from the heartbeat handler and marks the
 * reported status as succeeded with full progress.
 */
protected static class SucceededTransition implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    final TaskAttemptState succeeded = TaskAttemptState.SUCCEEDED;

    ta.setFinishTime();
    // Send out history event.
    ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
    ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta, succeeded));
    // Inform the Scheduler.
    ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, succeeded));
    // Inform the task.
    ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
        TaskEventType.T_ATTEMPT_SUCCEEDED));
    // Unregister from the TaskHeartbeatHandler.
    ta.taskHeartbeatHandler.unregister(ta.attemptId);

    ta.reportedStatus.state = succeeded;
    ta.reportedStatus.progress = 1.0f;

    if (!ta.isSpeculationEnabled()) {
      return;
    }
    ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, succeeded,
        ta.clock.getTime()));
    // TODO maybe. For reuse ... Stacking pulls for a reduce task, even if the
    // TA finishes independently. // Will likely be the Job's responsibility.
  }
}
/**
 * Termination handling for an attempt that is running. On top of the parent
 * behaviour it unregisters the attempt from the heartbeat handler, copies the
 * helper's external state into {@code reportedStatus}, and forwards a status
 * update to the vertex when speculation is enabled.
 */
protected static class TerminatedWhileRunningTransition extends
    TerminatedBeforeRunningTransition {

  public TerminatedWhileRunningTransition(
      TerminatedTransitionHelper helper) {
    super(helper);
  }

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    super.transition(ta, event);
    ta.taskHeartbeatHandler.unregister(ta.attemptId);
    // FAILED or KILLED
    ta.reportedStatus.state = helper.getTaskAttemptState();
    if (!ta.isSpeculationEnabled()) {
      return;
    }
    ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId,
        helper.getTaskAttemptState(), ta.clock.getTime()));
  }
}
/**
 * Termination handling for the case where the attempt's container completes
 * while the attempt is running. Uses {@code FAILED_HELPER} unless another helper
 * is supplied.
 *
 * <p>Extends {@link TerminatedWhileRunningTransition} so that the attempt is
 * unregistered from the task heartbeat handler, {@code reportedStatus.state} is
 * updated, and (when speculation is enabled) the vertex receives a status update,
 * exactly as for any other termination of a running attempt. Afterwards attempt
 * cleanup is requested.
 */
protected static class ContainerCompletedWhileRunningTransition extends
    TerminatedWhileRunningTransition {

  public ContainerCompletedWhileRunningTransition() {
    super(FAILED_HELPER);
  }

  public ContainerCompletedWhileRunningTransition(TerminatedTransitionHelper helper) {
    super(helper);
  }

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    // Parent chain: common termination, scheduler notification, heartbeat
    // unregistration and reported-status update.
    super.transition(ta, event);
    ta.sendTaskAttemptCleanupEvent();
  }
}
/**
 * Handles a {@link TaskAttemptEventContainerTerminated} event by requesting
 * attempt cleanup and then appending the event's diagnostic text to the attempt.
 */
protected static class ContainerCompletedWhileTerminating implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    ta.sendTaskAttemptCleanupEvent();
    ta.addDiagnosticInfo(
        ((TaskAttemptEventContainerTerminated) event).getDiagnosticInfo());
  }
}
/**
 * Termination handling for an attempt that had already succeeded. It behaves like
 * {@link TerminatedBeforeRunningTransition} but never notifies the scheduler, and
 * requests attempt cleanup at the end.
 */
protected static class TerminatedAfterSuccessHelper extends
    TerminatedBeforeRunningTransition {

  public TerminatedAfterSuccessHelper(TerminatedTransitionHelper helper) {
    super(helper);
  }

  @Override
  protected boolean sendSchedulerEvent() {
    // since the success transition would have sent the event
    // there is no need to send it again
    return false;
  }

  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    super.transition(ta, event);
    ta.sendTaskAttemptCleanupEvent();
  }
}
/**
 * Chooses the internal state of a recovered attempt from its
 * {@code recoveredState}.
 *
 * <p>Attempts recovered as {@code NEW} or {@code RUNNING} are treated as killed,
 * and both the task and the DAG counters are notified. Attempts recovered in a
 * completed state keep that state without any notification. Any other recovered
 * state results in a {@link RuntimeException}.
 */
protected static class RecoverTransition implements
    MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {

  @Override
  public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
      TaskAttemptEvent taskAttemptEvent) {
    switch (taskAttempt.recoveredState) {
      case NEW:
      case RUNNING:
        // FIXME once running containers can be recovered, this
        // should be handled differently
        // TODO abort taskattempt
        taskAttempt.sendEvent(new TaskEventTAUpdate(taskAttempt.attemptId,
            TaskEventType.T_ATTEMPT_KILLED));
        taskAttempt.sendEvent(createDAGCounterUpdateEventTAFinished(taskAttempt,
            getExternalState(TaskAttemptStateInternal.KILLED)));
        return TaskAttemptStateInternal.KILLED;
      // For the completed states below the Task is not informed, as it already
      // knows about completed attempts.
      case SUCCEEDED:
        return TaskAttemptStateInternal.SUCCEEDED;
      case FAILED:
        return TaskAttemptStateInternal.FAILED;
      case KILLED:
        return TaskAttemptStateInternal.KILLED;
      default:
        throw new RuntimeException("Failed to recover from non-handled state"
            + ", taskAttemptId=" + taskAttempt.getID()
            + ", state=" + taskAttempt.recoveredState);
    }
  }
}
/**
 * Handles a termination event arriving after the attempt has succeeded.
 *
 * <p>For an attempt of a leaf vertex nothing happens and the state stays
 * {@code SUCCEEDED}. Otherwise the consumers are told the input failed, the shared
 * {@code TERMINATED_AFTER_SUCCESS_HELPER} transition runs, and the attempt moves to
 * {@code KILLED}.
 */
protected static class TerminatedAfterSuccessTransition implements
    MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {

  @Override
  public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptEvent event) {
    if (!attempt.leafVertex) {
      // TODO - TEZ-834. This assumes that the outputs were on that node
      attempt.sendInputFailedToConsumers();
      TaskAttemptImpl.TERMINATED_AFTER_SUCCESS_HELPER.transition(attempt, event);
      return TaskAttemptStateInternal.KILLED;
    }
    return TaskAttemptStateInternal.SUCCEEDED;
  }
}
/**
 * Handles a report from a consumer attempt that it could not read this attempt's
 * output.
 *
 * <p>Each distinct reporting attempt is recorded. While the fraction of distinct
 * reporters over the consumer task count stays at or below
 * {@code MAX_ALLOWED_OUTPUT_FAILURES_FRACTION}, the attempt keeps its current
 * internal state. Once the fraction exceeds the limit, consumers are told the
 * input failed and the attempt is failed: to {@code FAILED} if it had already
 * succeeded, otherwise to {@code FAIL_IN_PROGRESS}.
 *
 * <p>Throws {@link TezUncheckedException} when the version in the read-error
 * event does not match this attempt's id number.
 */
protected static class OutputReportedFailedTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@Override
public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
TaskAttemptEvent event) {
TaskAttemptEventOutputFailed outputFailedEvent =
(TaskAttemptEventOutputFailed) event;
TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
// The source of the wrapped event is the consumer attempt that reported the error.
TezTaskAttemptID failedDestTaId = tezEvent.getSourceInfo().getTaskAttemptID();
InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)tezEvent.getEvent();
int failedInputIndexOnDestTa = readErrorEvent.getIndex();
// Reject reports whose version does not match this attempt's id number.
if (readErrorEvent.getVersion() != attempt.getID().getId()) {
throw new TezUncheckedException(attempt.getID()
+ " incorrectly blamed for read error from " + failedDestTaId
+ " at inputIndex " + failedInputIndexOnDestTa + " version"
+ readErrorEvent.getVersion());
}
LOG.info(attempt.getID()
+ " blamed for read error from " + failedDestTaId
+ " at inputIndex " + failedInputIndexOnDestTa);
attempt.uniquefailedOutputReports.add(failedDestTaId);
// NOTE(review): this is float division, so a consumer task number of 0 would
// yield Infinity and fail the attempt rather than throw — confirm callers
// never pass 0.
float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
/ outputFailedEvent.getConsumerTaskNumber();
// If needed we can also use the absolute number of reported output errors
// If needed we can launch a background task without failing this task
// to generate a copy of the output just in case.
// If needed we can consider only running consumer tasks
if (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION) {
return attempt.getInternalState();
}
String message = attempt.getID() + " being failed for too many output errors";
LOG.info(message);
attempt.addDiagnosticInfo(message);
// send input failed event
attempt.sendInputFailedToConsumers();
// Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks.
// The termination path depends on whether the attempt had already succeeded.
if (attempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
(new TerminatedAfterSuccessHelper(FAILED_HELPER)).transition(
attempt, event);
return TaskAttemptStateInternal.FAILED;
} else {
(new TerminatedWhileRunningTransition(FAILED_HELPER)).transition(
attempt, event);
return TaskAttemptStateInternal.FAIL_IN_PROGRESS;
}
// TODO at some point. Nodes may be interested in FetchFailure info.
// Can be used to blacklist nodes.
}
}
/**
 * Sends one {@link InputFailedEvent} per output vertex of this attempt's vertex,
 * bundled into a single {@link VertexEventRouteEvent}. Does nothing when the
 * vertex has no output vertices.
 */
@VisibleForTesting
protected void sendInputFailedToConsumers() {
  final Vertex sourceVertex = getVertex();
  final Map<Vertex, Edge> outputEdges = sourceVertex.getOutputVertices();
  if (outputEdges == null || outputEdges.isEmpty()) {
    return;
  }

  final List<TezEvent> inputFailedEvents =
      Lists.newArrayListWithCapacity(outputEdges.size());
  for (Vertex consumerVertex : outputEdges.keySet()) {
    final InputFailedEvent inputFailed = new InputFailedEvent();
    final EventMetaData metaData = new EventMetaData(
        EventProducerConsumerType.SYSTEM,
        sourceVertex.getName(),
        consumerVertex.getName(),
        getID());
    inputFailedEvents.add(new TezEvent(inputFailed, metaData));
  }
  sendEvent(new VertexEventRouteEvent(sourceVertex.getVertexId(), inputFailedEvents));
}
/**
 * Records the termination cause, but only while it is still
 * {@code UNKNOWN_ERROR}, so that the first cause reported is the one kept.
 *
 * @param err the cause to record
 */
private void trySetTerminationCause(TaskAttemptTerminationCause err) {
  final boolean causeAlreadyRecorded =
      terminationCause != TaskAttemptTerminationCause.UNKNOWN_ERROR;
  if (causeAlreadyRecorded) {
    return;
  }
  terminationCause = err;
}
/**
 * Resets the given status object to its initial values: state {@code NEW} and
 * zero progress. No other field of the status is touched here.
 *
 * @param result the status object to initialise
 */
private void initTaskAttemptStatus(TaskAttemptStatus result) {
  result.state = TaskAttemptState.NEW;
  result.progress = 0.0f;
}
/**
 * Appends a diagnostic message to this attempt's diagnostics.
 *
 * @param diag the message; {@code null} and empty strings are ignored
 */
private void addDiagnosticInfo(String diag) {
  if (diag == null || diag.isEmpty()) {
    return;
  }
  diagnostics.add(diag);
}
/**
 * Supplies the outcome-specific values used by the termination transitions:
 * the internal state, the external state, and the event type sent to the task.
 */
protected interface TerminatedTransitionHelper {

  /** @return the internal attempt state for this outcome */
  TaskAttemptStateInternal getTaskAttemptStateInternal();

  /** @return the external attempt state for this outcome */
  TaskAttemptState getTaskAttemptState();

  /** @return the task event type used to notify the task of this outcome */
  TaskEventType getTaskEventType();
}
/**
 * {@link TerminatedTransitionHelper} for the failed outcome: reports
 * {@code FAILED} for both the internal and the external state, and
 * {@code T_ATTEMPT_FAILED} as the task event type.
 */
protected static class FailedTransitionHelper implements
    TerminatedTransitionHelper {

  @Override
  public TaskAttemptStateInternal getTaskAttemptStateInternal() {
    return TaskAttemptStateInternal.FAILED;
  }

  @Override
  public TaskAttemptState getTaskAttemptState() {
    return TaskAttemptState.FAILED;
  }

  @Override
  public TaskEventType getTaskEventType() {
    return TaskEventType.T_ATTEMPT_FAILED;
  }
}
/**
 * {@link TerminatedTransitionHelper} for the killed outcome: reports
 * {@code KILLED} for both the internal and the external state, and
 * {@code T_ATTEMPT_KILLED} as the task event type.
 */
protected static class KilledTransitionHelper implements
TerminatedTransitionHelper {
@Override
public TaskAttemptStateInternal getTaskAttemptStateInternal() {
return TaskAttemptStateInternal.KILLED;
}
@Override
public TaskAttemptState getTaskAttemptState() {
return TaskAttemptState.KILLED;
}
@Override
public TaskEventType getTaskEventType() {
return TaskEventType.T_ATTEMPT_KILLED;
}
}
/**
 * @return the string form of this attempt's id
 */
@Override
public String toString() {
  final TezTaskAttemptID id = getID();
  return id.toString();
}
}