// blob: cda62a4154a3da8846e620c50a99845ea273b402 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.querymaster;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.ResourceProtos.TaskCompletionReport;
import org.apache.tajo.ResourceProtos.ShuffleFileOutput;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.querymaster.Task.PullHost;
import org.apache.tajo.util.TUtil;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * A single execution attempt of a {@link Task}, driven by a state machine over
 * {@link TaskAttemptState} that reacts to {@link TaskAttemptEvent}s.
 *
 * <p>Events enter through {@link #handle(TaskAttemptEvent)}, which runs the state machine while
 * holding the write lock; {@link #getState()} reads the current state under the read lock. The
 * expire counter is guarded separately by the instance monitor ({@code synchronized} accessors).
 * The remaining mutable fields (progress, statistics, worker connection info) are written from
 * transitions and read through plain getters without additional locking.</p>
 */
public class TaskAttempt implements EventHandler<TaskAttemptEvent> {

  private static final Log LOG = LogFactory.getLog(TaskAttempt.class);

  /** Initial value of the expire counter (units are not visible here; presumably ms -- confirm). */
  private final static int EXPIRE_TIME = 15000;

  private final TaskAttemptId id;
  private final Task task;
  /** Sink for every event this attempt emits (scheduler, task, stage and local-task events). */
  final EventHandler eventHandler;
  /** Worker this attempt was assigned to; {@code null} until assigned and after an assign-cancel. */
  private WorkerConnectionInfo workerConnectionInfo;
  /** Remaining expire budget; guarded by the instance monitor. */
  private int expire;

  private final Lock readLock;
  private final Lock writeLock;

  private final List<String> diagnostics = new ArrayList<String>();
  private final TaskAttemptScheduleContext scheduleContext;

  private float progress;
  private CatalogProtos.TableStatsProto inputStats;
  private CatalogProtos.TableStatsProto resultStats;
  private Set<PartitionDescProto> partitions;

  protected static final StateMachineFactory
      <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
      stateMachineFactory = new StateMachineFactory
      <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
      (TaskAttemptState.TA_NEW)

      // Transitions from TA_NEW state
      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
          TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
          TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
          TaskAttemptEventType.TA_KILL,
          new TaskKilledCompleteTransition())

      // Transitions from TA_UNASSIGNED state
      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
          TaskAttemptEventType.TA_ASSIGNED,
          new LaunchTransition())
      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
          TaskAttemptEventType.TA_KILL,
          new KillUnassignedTaskTransition())

      // Transitions from TA_ASSIGNED state
      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
          TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
      // NOTE(review): (TA_ASSIGNED, TA_KILL) is registered twice below with different target
      // states. Which registration is effective depends on StateMachineFactory -- confirm that
      // the intended target is the one that wins.
      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
          TaskAttemptEventType.TA_KILL,
          new KillTaskTransition())
      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
          TaskAttemptEventType.TA_KILL,
          new KillTaskTransition())
      .addTransition(TaskAttemptState.TA_ASSIGNED,
          EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
          TaskAttemptEventType.TA_DONE, new SucceededTransition())
      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_UNASSIGNED,
          TaskAttemptEventType.TA_ASSIGN_CANCEL, new CancelTransition())

      // Transitions from TA_RUNNING state
      .addTransition(TaskAttemptState.TA_RUNNING,
          EnumSet.of(TaskAttemptState.TA_RUNNING),
          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
          TaskAttemptEventType.TA_KILL,
          new KillTaskTransition())
      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
          TaskAttemptEventType.TA_DONE, new SucceededTransition())
      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())

      // Transitions from TA_KILL_WAIT state
      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
          TaskAttemptEventType.TA_LOCAL_KILLED,
          new TaskKilledCompleteTransition())
      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
          TaskAttemptEventType.TA_ASSIGNED,
          new KillTaskTransition())
      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
          TaskAttemptEventType.TA_SCHEDULE_CANCELED,
          new TaskKilledCompleteTransition())
      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
          TaskAttemptEventType.TA_DONE,
          new TaskKilledCompleteTransition())
      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
          TaskAttemptEventType.TA_FATAL_ERROR)
      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
          EnumSet.of(
              TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
              TaskAttemptEventType.TA_UPDATE))

      // Transitions from TA_SUCCEEDED state
      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
          TaskAttemptEventType.TA_UPDATE)
      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
          TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
      // Ignore-able transitions
      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
          TaskAttemptEventType.TA_KILL)

      // Transitions from TA_KILLED state
      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
      // Ignore-able transitions
      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
          EnumSet.of(
              TaskAttemptEventType.TA_UPDATE))
      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
          EnumSet.of(
              TaskAttemptEventType.TA_LOCAL_KILLED,
              TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_ASSIGNED,
              TaskAttemptEventType.TA_DONE),
          new TaskKilledCompleteTransition())

      // Transitions from TA_FAILED state
      .addTransition(TaskAttemptState.TA_FAILED, TaskAttemptState.TA_FAILED,
          TaskAttemptEventType.TA_KILL)

      .installTopology();

  private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
      stateMachine;

  /**
   * Creates an attempt in the state machine's initial state ({@code TA_NEW}).
   *
   * @param scheduleContext context forwarded to the scheduler with schedule/cancel requests
   * @param id identifier of this attempt
   * @param task the task this attempt belongs to
   * @param eventHandler handler that receives every event emitted by this attempt
   */
  public TaskAttempt(final TaskAttemptScheduleContext scheduleContext,
                     final TaskAttemptId id, final Task task,
                     final EventHandler eventHandler) {
    this.scheduleContext = scheduleContext;
    this.id = id;
    this.expire = TaskAttempt.EXPIRE_TIME;
    this.task = task;
    this.eventHandler = eventHandler;

    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    this.readLock = readWriteLock.readLock();
    this.writeLock = readWriteLock.writeLock();

    stateMachine = stateMachineFactory.make(this);
    this.partitions = TUtil.newHashSet();
  }

  /**
   * Returns the current state of the state machine, read under the read lock.
   *
   * @return the current attempt state
   */
  public TaskAttemptState getState() {
    readLock.lock();
    try {
      return stateMachine.getCurrentState();
    } finally {
      readLock.unlock();
    }
  }

  /** @return the identifier of this attempt */
  public TaskAttemptId getId() {
    return this.id;
  }

  /** @return whether the owning task reports itself as a leaf task */
  public boolean isLeafTask() {
    return this.task.isLeafTask();
  }

  /** @return the task this attempt belongs to */
  public Task getTask() {
    return this.task;
  }

  /**
   * @return the worker this attempt is assigned to, or {@code null} if it has not been assigned
   *         (or the assignment was cancelled)
   */
  public WorkerConnectionInfo getWorkerConnectionInfo() {
    return this.workerConnectionInfo;
  }

  /**
   * Overwrites the expire counter.
   *
   * @param expire the new counter value
   */
  public synchronized void setExpireTime(int expire) {
    this.expire = expire;
  }

  /**
   * Decreases the expire counter.
   *
   * @param period the amount to subtract from the counter
   */
  public synchronized void updateExpireTime(int period) {
    this.setExpireTime(this.expire - period);
  }

  /** Restores the expire counter to its initial value ({@code EXPIRE_TIME}). */
  public synchronized void resetExpireTime() {
    this.setExpireTime(TaskAttempt.EXPIRE_TIME);
  }

  /**
   * Returns the current expire counter.
   *
   * <p>Synchronized on the same monitor as the writers so that a reader on another thread is
   * guaranteed to observe the latest update.</p>
   *
   * @return the remaining expire budget
   */
  public synchronized int getLeftTime() {
    return this.expire;
  }

  /** @return the most recently recorded progress value */
  public float getProgress() {
    return progress;
  }

  /**
   * @return a new {@link TableStats} built from the recorded input statistics, or {@code null}
   *         if none have been recorded
   */
  public TableStats getInputStats() {
    if (inputStats == null) {
      return null;
    }
    return new TableStats(inputStats);
  }

  /**
   * @return a new {@link TableStats} built from the recorded result statistics, or {@code null}
   *         if none have been recorded
   */
  public TableStats getResultStats() {
    if (resultStats == null) {
      return null;
    }
    return new TableStats(resultStats);
  }

  /** @return the live (not copied) set of partitions recorded for this attempt */
  public Set<PartitionDescProto> getPartitions() {
    return partitions;
  }

  /**
   * Adds partition descriptors to the set recorded for this attempt.
   *
   * @param partitions descriptors to add
   */
  public void addPartitions(List<PartitionDescProto> partitions) {
    this.partitions.addAll(partitions);
  }

  /**
   * Applies a completion report: marks progress as complete, hands shuffle outputs and the derived
   * intermediate entries to the task, and records input/result statistics when the report has them.
   *
   * @param report completion report received for this attempt
   */
  private void fillTaskStatistics(TaskCompletionReport report) {
    this.progress = 1.0f;

    List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();

    if (report.getShuffleFileOutputsCount() > 0) {
      this.getTask().setShuffleFileOutputs(report.getShuffleFileOutputsList());

      // Every shuffle output of this attempt is pulled from the worker the attempt ran on.
      PullHost host = new PullHost(getWorkerConnectionInfo().getHost(),
          getWorkerConnectionInfo().getPullServerPort());
      for (ShuffleFileOutput output : report.getShuffleFileOutputsList()) {
        IntermediateEntry entry = new IntermediateEntry(getId().getTaskId().getId(),
            getId().getId(), output.getPartId(), host, output.getVolume());
        intermediateEntries.add(entry);
      }
    }
    // Set even when empty, so the task always receives a (possibly empty) list.
    this.getTask().setIntermediateData(intermediateEntries);

    if (report.hasInputStats()) {
      this.inputStats = report.getInputStats();
    }
    if (report.hasResultStats()) {
      this.resultStats = report.getResultStats();
      this.getTask().setStats(new TableStats(resultStats));
    }
  }

  /** Asks the scheduler (via the event handler) to schedule this attempt. */
  private static class TaskAttemptScheduleTransition implements
      SingleArcTransition<TaskAttempt, TaskAttemptEvent> {

    @Override
    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
      taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
          EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(),
          taskAttempt.scheduleContext, taskAttempt));
    }
  }

  /** Asks the scheduler (via the event handler) to cancel scheduling of a not-yet-assigned attempt. */
  private static class KillUnassignedTaskTransition implements
      SingleArcTransition<TaskAttempt, TaskAttemptEvent> {

    @Override
    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
      taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
          EventType.T_SCHEDULE_CANCEL, taskAttempt.getTask().getId().getExecutionBlockId(),
          taskAttempt.scheduleContext, taskAttempt));
    }
  }

  /** Records the assigned worker and emits {@code T_ATTEMPT_LAUNCHED}. */
  private static class LaunchTransition
      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {

    /**
     * @throws IllegalArgumentException if {@code event} is not a {@link TaskAttemptAssignedEvent}
     */
    @Override
    public void transition(TaskAttempt taskAttempt,
                           TaskAttemptEvent event) {
      if (!(event instanceof TaskAttemptAssignedEvent)) {
        throw new IllegalArgumentException("event should be a TaskAttemptAssignedEvent type.");
      }
      TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
      taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
      taskAttempt.eventHandler.handle(
          new TaskTAttemptEvent(taskAttempt.getId(),
              TaskEventType.T_ATTEMPT_LAUNCHED));
    }
  }

  /** Forgets the assigned worker when an assignment is cancelled. */
  private static class CancelTransition
      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {

    @Override
    public void transition(TaskAttempt taskAttempt,
                           TaskAttemptEvent event) {
      taskAttempt.workerConnectionInfo = null;
    }
  }

  /**
   * Notifies the owning task that this attempt was killed. The event is delivered by calling
   * {@code Task.handle} directly rather than through the event handler.
   */
  private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {

    @Override
    public void transition(TaskAttempt taskAttempt,
                           TaskAttemptEvent event) {
      taskAttempt.getTask().handle(new TaskEvent(taskAttempt.getId().getTaskId(),
          TaskEventType.T_ATTEMPT_KILLED));
      LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
    }
  }

  /** Copies progress and statistics from a status update; always moves to {@code TA_RUNNING}. */
  private static class StatusUpdateTransition
      implements MultipleArcTransition<TaskAttempt, TaskAttemptEvent, TaskAttemptState> {

    /**
     * @return always {@link TaskAttemptState#TA_RUNNING}
     * @throws IllegalArgumentException if {@code event} is not a
     *         {@link TaskAttemptStatusUpdateEvent}
     */
    @Override
    public TaskAttemptState transition(TaskAttempt taskAttempt,
                                       TaskAttemptEvent event) {
      if (!(event instanceof TaskAttemptStatusUpdateEvent)) {
        throw new IllegalArgumentException("event should be a TaskAttemptStatusUpdateEvent type.");
      }
      TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;

      taskAttempt.progress = updateEvent.getStatus().getProgress();
      taskAttempt.inputStats = updateEvent.getStatus().getInputStats();
      taskAttempt.resultStats = updateEvent.getStatus().getResultStats();

      return TaskAttemptState.TA_RUNNING;
    }
  }

  /**
   * Appends a diagnostic message, ignoring {@code null} and empty strings.
   *
   * @param diag message to record
   */
  private void addDiagnosticInfo(String diag) {
    if (diag != null && !diag.equals("")) {
      diagnostics.add(diag);
    }
  }

  /** No-op: a repeated assignment notification for an already assigned attempt is ignored. */
  private static class AlreadyAssignedTransition
      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{

    @Override
    public void transition(TaskAttempt taskAttempt,
                           TaskAttemptEvent taskAttemptEvent) {
    }
  }

  /** No-op: a repeated completion notification for an already succeeded attempt is ignored. */
  private static class AlreadyDoneTransition
      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{

    @Override
    public void transition(TaskAttempt taskAttempt,
                           TaskAttemptEvent taskAttemptEvent) {
    }
  }

  /**
   * Applies the completion report and emits {@code T_ATTEMPT_SUCCEEDED}. If applying the report
   * throws, a {@link TaskFatalErrorEvent} is emitted instead and the stack trace is recorded as a
   * diagnostic.
   */
  private static class SucceededTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {

    /**
     * @throws IllegalArgumentException if {@code event} is not a {@link TaskCompletionEvent}
     */
    @Override
    public void transition(TaskAttempt taskAttempt,
                           TaskAttemptEvent event) {
      if (!(event instanceof TaskCompletionEvent)) {
        throw new IllegalArgumentException("event should be a TaskCompletionEvent type.");
      }
      TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();

      try {
        if (report.getPartitionsCount() > 0) {
          taskAttempt.addPartitions(report.getPartitionsList());
        }

        taskAttempt.fillTaskStatistics(report);
        taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
      } catch (Throwable t) {
        taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
        taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
      }
    }
  }

  /** Sends a {@code KILL} request for this attempt to the worker it is running on. */
  private static class KillTaskTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {

    /**
     * @throws IllegalStateException if no worker is known for the attempt
     */
    @Override
    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
      WorkerConnectionInfo worker = taskAttempt.getWorkerConnectionInfo();

      // This transition is also registered for TA_KILL_WAIT + TA_ASSIGNED. TA_KILL_WAIT can be
      // entered from TA_UNASSIGNED, in which case LaunchTransition never ran and the attempt has
      // no recorded worker; the worker to contact is then the one carried by the assignment event.
      if (worker == null && event instanceof TaskAttemptAssignedEvent) {
        worker = ((TaskAttemptAssignedEvent) event).getWorkerConnectionInfo();
      }
      if (worker == null) {
        throw new IllegalStateException(
            "Cannot send KILL for " + taskAttempt.getId() + ": no worker is assigned");
      }

      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(),
          worker.getId(),
          LocalTaskEventType.KILL));
    }
  }

  /** Emits {@code T_ATTEMPT_FAILED}, records the error message as a diagnostic and logs it. */
  private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{

    /**
     * @throws IllegalArgumentException if {@code event} is not a {@link TaskFatalErrorEvent}
     */
    @Override
    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
      if (!(event instanceof TaskFatalErrorEvent)) {
        throw new IllegalArgumentException("event should be a TaskFatalErrorEvent type.");
      }
      TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
      LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
          + " >> " + errorEvent.errorMessage());
    }
  }

  /**
   * Feeds an event to the state machine while holding the write lock.
   *
   * <p>If the event is not valid in the current state, the error is logged and the owning stage
   * is notified with a diagnostics update followed by {@code SQ_INTERNAL_ERROR}; the exception is
   * not propagated to the caller.</p>
   *
   * @param event the event to process
   */
  @Override
  public void handle(TaskAttemptEvent event) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
    }

    // Acquire outside the try block: if lock() itself failed, the finally clause must not call
    // unlock() on a lock this thread does not hold.
    writeLock.lock();
    try {
      TaskAttemptState oldState = getState();
      try {
        stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitonException e) {
        LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
            + ", eventType:" + event.getType().name()
            + ", oldState:" + oldState.name()
            + ", nextState:" + getState().name()
            , e);
        eventHandler.handle(
            new StageDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
                "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
        eventHandler.handle(
            new StageEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
                StageEventType.SQ_INTERNAL_ERROR));
      }

      // Trace state changes when debug logging is enabled.
      if (LOG.isDebugEnabled()) {
        if (oldState != getState()) {
          LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
              + getState());
        }
      }
    } finally {
      writeLock.unlock();
    }
  }
}