/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.dag.app.dag.impl;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.node.AMNodeEventType;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestTaskImpl {
  // Logger used by the tests to emit their "--- START: ... ---" markers.
  private static final Logger LOG = LoggerFactory.getLogger(TestTaskImpl.class);

  // Source of task indices; pre-incremented by getNewTaskID().
  private int taskCounter = 0;
  // Partition value handed to the MockTaskImpl constructor.
  private final int partition = 1;

  // Collaborators and settings handed to MockTaskImpl or to the schedule
  // events; all of them are (re)created in setup().
  private Configuration conf;
  private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
  private TaskHeartbeatHandler taskHeartbeatHandler;
  private Credentials credentials;
  private Clock clock;
  private TaskLocationHint locationHint;

  // Identifiers, application context and container-related mocks built in setup().
  private ApplicationId appId;
  private TezDAGID dagId;
  private TezVertexID vertexId;
  private AppContext appContext;
  private Resource taskResource;
  private Map<String, LocalResource> localResources;
  private Map<String, String> environment;
  private String javaOpts;
  private boolean leafVertex;
  private ContainerContext containerContext;
  private ContainerId mockContainerId;
  private Container mockContainer;
  private AMContainer mockAMContainer;
  private NodeId mockNodeId;
  private HistoryEventHandler mockHistoryHandler;

  // The task under test, plus the mocked spec and vertex created alongside it.
  private MockTaskImpl mockTask;
  private TaskSpec mockTaskSpec;
  private Vertex mockVertex;
  
  /**
   * Event sink that records every event it is handed, in arrival order, so the
   * tests can inspect what was dispatched to it.
   */
  @SuppressWarnings("rawtypes")
  class TestEventHandler implements EventHandler<Event> {
    /** Events received so far, oldest first. */
    List<Event> events = new ArrayList<Event>();

    /** Appends the received event to {@link #events}. */
    @Override
    public void handle(Event received) {
      this.events.add(received);
    }
  }
  private TestEventHandler eventHandler;

  /**
   * Rebuilds the whole fixture before each test: a configuration allowing 4
   * failed attempts per task, mocked AM collaborators, fresh ids, the container
   * context, and a new {@link MockTaskImpl} wired to a recording
   * {@link TestEventHandler}.
   */
  @Before
  public void setup() {
    conf = new Configuration();
    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 4);
    taskCommunicatorManagerInterface = mock(TaskCommunicatorManagerInterface.class);
    taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
    credentials = new Credentials();
    clock = new SystemClock();
    locationHint = TaskLocationHint.createTaskLocationHint(null, null);

    appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
    dagId = TezDAGID.getInstance(appId, 1);
    vertexId = TezVertexID.getInstance(dagId, 1);

    // Deep stubs are needed so the chained
    // appContext.getAllContainers().get(...) call can be stubbed below.
    appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
    when(appContext.getDAGRecoveryData()).thenReturn(null);
    appContext.setDAGRecoveryData(null);

    // Container-related mocks; the AM container resolves to mockContainer,
    // which in turn exposes the mocked container id and node id.
    mockContainerId = mock(ContainerId.class);
    mockContainer = mock(Container.class);
    mockAMContainer = mock(AMContainer.class);
    when(mockAMContainer.getContainer()).thenReturn(mockContainer);
    when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:1234");
    mockNodeId = mock(NodeId.class);
    mockHistoryHandler = mock(HistoryEventHandler.class);
    when(mockContainer.getId()).thenReturn(mockContainerId);
    when(mockContainer.getNodeId()).thenReturn(mockNodeId);
    when(appContext.getAllContainers().get(mockContainerId)).thenReturn(mockAMContainer);
    when(appContext.getHistoryHandler()).thenReturn(mockHistoryHandler);

    taskResource = Resource.newInstance(1024, 1);
    localResources = new HashMap<String, LocalResource>();
    environment = new HashMap<String, String>();
    javaOpts = "";
    leafVertex = false;
    containerContext = new ContainerContext(localResources, credentials,
        environment, javaOpts);

    // The vertex handed to the task only needs to expose its configuration.
    Vertex vertex = mock(Vertex.class);
    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
    eventHandler = new TestEventHandler();

    mockTask = new MockTaskImpl(vertexId, partition,
        eventHandler, conf, taskCommunicatorManagerInterface, clock,
        taskHeartbeatHandler, appContext, leafVertex,
        taskResource, containerContext, vertex);
    mockTaskSpec = mock(TaskSpec.class);

    // Separate vertex mock kept in a field; it additionally reports the
    // service plugin info with the YARN container launcher name.
    mockVertex = mock(Vertex.class);
    ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
      .setContainerLauncherName(TezConstants.getTezYarnServicePluginName());
    when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
    when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(conf));
  }

  /** Returns a task id in the test vertex using the next value of {@code taskCounter}. */
  private TezTaskID getNewTaskID() {
    taskCounter += 1;
    return TezTaskID.getInstance(vertexId, taskCounter);
  }

  /**
   * Sends a schedule event to the task and expects it to end up in
   * {@link TaskState#SCHEDULED} with the mocked spec and location hint recorded.
   */
  private void scheduleTaskAttempt(TezTaskID taskId) {
    // Same event and the same three assertions as the two-argument overload.
    scheduleTaskAttempt(taskId, TaskState.SCHEDULED);
  }

  /**
   * Sends a schedule event to the task, then checks the resulting task state
   * against {@code expectedState} and that the mocked spec and location hint
   * were recorded on the task.
   */
  private void scheduleTaskAttempt(TezTaskID taskId, TaskState expectedState) {
    TaskEventScheduleTask scheduleEvent =
        new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint, false);
    mockTask.handle(scheduleEvent);

    Assert.assertEquals(expectedState, mockTask.getState());
    Assert.assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec());
    Assert.assertEquals(locationHint, mockTask.getTaskLocationHint());
  }

  /**
   * Registers the same data-movement {@link TezEvent} instance with the task
   * {@code numTezEvents} times. {@code taskId} is not used by the body.
   */
  private void sendTezEventsToTask(TezTaskID taskId, int numTezEvents) {
    EventMetaData emptyMetaData = new EventMetaData();
    DataMovementEvent payload = DataMovementEvent.create(null);
    TezEvent sharedEvent = new TezEvent(payload, emptyMetaData);

    int remaining = numTezEvents;
    while (remaining > 0) {
      mockTask.registerTezEvent(sharedEvent);
      remaining--;
    }
  }

  /**
   * Sends a TERMINATED_AT_SHUTDOWN termination event to the task and expects
   * its internal state to become {@link TaskStateInternal#KILL_WAIT}.
   */
  private void killTask(TezTaskID taskId) {
    TaskEventTermination shutdownTermination = new TaskEventTermination(
        taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null);
    mockTask.handle(shutdownTermination);
    assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
  }

  /**
   * Sends a TERMINATED_AT_SHUTDOWN termination event to the task and expects
   * {@link TaskStateInternal#KILL_WAIT}.
   */
  private void failTask(TezTaskID taskId) {
    // NOTE(review): despite the name, this performs exactly the same steps as
    // killTask(); confirm whether a distinct "fail" path was intended.
    killTask(taskId);
  }

  /** Builds a TA-killed event for the attempt with no causal event. */
  private TaskEventTAKilled createTaskTAKilledEvent(TezTaskAttemptID taskAttemptId) {
    final TezAbstractEvent noCausalEvent = null;
    return createTaskTAKilledEvent(taskAttemptId, noCausalEvent);
  }

  /** Builds a TA-killed event for the attempt carrying the given causal event. */
  private TaskEventTAKilled createTaskTAKilledEvent(TezTaskAttemptID taskAttemptId,
                                                    TezAbstractEvent causalEvent) {
    TaskEventTAKilled killedEvent = new TaskEventTAKilled(taskAttemptId, causalEvent);
    return killedEvent;
  }

  /** Builds a NON_FATAL TA-failed event for the attempt with no causal event. */
  private TaskEventTAFailed createTaskTAFailedEvent(TezTaskAttemptID taskAttemptId) {
    final TezAbstractEvent noCausalEvent = null;
    return createTaskTAFailedEvent(taskAttemptId, TaskFailureType.NON_FATAL, noCausalEvent);
  }

  /** Builds a TA-failed event with the given failure type and causal event. */
  private TaskEventTAFailed createTaskTAFailedEvent(TezTaskAttemptID taskAttemptId,
                                                    TaskFailureType taskFailureType,
                                                    TezAbstractEvent causalEvent) {
    TaskEventTAFailed failedEvent =
        new TaskEventTAFailed(taskAttemptId, taskFailureType, causalEvent);
    return failedEvent;
  }

  /** Builds a TA-launched event for the attempt. */
  private TaskEventTALaunched createTaskTALauncherEvent(TezTaskAttemptID taskAttemptId) {
    TaskEventTALaunched launchedEvent = new TaskEventTALaunched(taskAttemptId);
    return launchedEvent;
  }

  /** Builds a TA-succeeded event for the attempt. */
  private TaskEventTASucceeded createTaskTASucceededEvent(TezTaskAttemptID taskAttemptId) {
    TaskEventTASucceeded succeededEvent = new TaskEventTASucceeded(taskAttemptId);
    return succeededEvent;
  }

  /** Builds a T_ADD_SPEC_ATTEMPT task event addressed to the attempt's task. */
  private TaskEvent createTaskTAAddSpecAttempt(TezTaskAttemptID taskAttemptId) {
    TezTaskID owningTaskId = taskAttemptId.getTaskID();
    return new TaskEvent(owningTaskId, TaskEventType.T_ADD_SPEC_ATTEMPT);
  }

  /**
   * Reports the attempt as killed and expects the task to stay in
   * {@link TaskState#SCHEDULED}.
   */
  private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) {
    TaskEventTAKilled killedEvent = createTaskTAKilledEvent(attemptId);
    mockTask.handle(killedEvent);
    assertEquals(TaskState.SCHEDULED, mockTask.getState());
  }

  /**
   * Reports the attempt as launched and expects the task to be in
   * {@link TaskState#RUNNING} afterwards.
   */
  private void launchTaskAttempt(TezTaskAttemptID attemptId) {
    TaskEventTALaunched launchedEvent = createTaskTALauncherEvent(attemptId);
    mockTask.handle(launchedEvent);
    assertEquals(TaskState.RUNNING, mockTask.getState());
  }

  /** Sets the progress value {@code p} on the given mock attempt. */
  private void updateAttemptProgress(final MockTaskAttemptImpl attempt, final float p) {
    attempt.setProgress(p);
  }

  /** Sets the state {@code s} on the given mock attempt. */
  private void updateAttemptState(final MockTaskAttemptImpl attempt, final TaskAttemptState s) {
    attempt.setState(s);
  }

  /**
   * Reports the attempt as killed, expects the task to remain in
   * {@link TaskState#RUNNING}, and verifies that the vertex's killed-attempt
   * counter was incremented exactly once.
   */
  private void killRunningTaskAttempt(TezTaskAttemptID attemptId) {
    TaskEventTAKilled killedEvent = createTaskTAKilledEvent(attemptId);
    mockTask.handle(killedEvent);
    assertEquals(TaskState.RUNNING, mockTask.getState());

    Vertex owningVertex = mockTask.getVertex();
    verify(owningVertex, times(1)).incrementKilledTaskAttemptCount();
  }

  /** Fails the attempt and also checks that the task is still RUNNING afterwards. */
  private void failRunningTaskAttempt(TezTaskAttemptID attemptId) {
    final boolean checkStillRunning = true;
    failRunningTaskAttempt(attemptId, checkStillRunning);
  }

  /**
   * Reports the attempt as failed (non-fatal) and checks that the task's
   * failed-attempt count grew by one and that the vertex counter was
   * incremented that many times in total.
   *
   * @param attemptId attempt to report as failed
   * @param verifyState when true, also assert the task is still in RUNNING
   */
  private void failRunningTaskAttempt(TezTaskAttemptID attemptId, boolean verifyState) {
    final int failuresBefore = mockTask.failedAttempts;
    final int failuresExpected = failuresBefore + 1;

    mockTask.handle(createTaskTAFailedEvent(attemptId));
    if (verifyState) {
      assertEquals(TaskState.RUNNING, mockTask.getState());
    }

    Assert.assertEquals(failuresExpected, mockTask.failedAttempts);
    verify(mockTask.getVertex(), times(failuresExpected)).incrementFailedTaskAttemptCount();
  }

  /**
   * Asserts that the task reports {@link TaskState#NEW}.
   */
  private void assertTaskNewState() {
    Assert.assertEquals(TaskState.NEW, mockTask.getState());
  }

  /**
   * Asserts that the task reports {@link TaskState#SCHEDULED}.
   */
  private void assertTaskScheduledState() {
    Assert.assertEquals(TaskState.SCHEDULED, mockTask.getState());
  }

  /**
   * Asserts that the task reports {@link TaskState#RUNNING}.
   */
  private void assertTaskRunningState() {
    Assert.assertEquals(TaskState.RUNNING, mockTask.getState());
  }

  /**
   * Asserts that the task's internal state is
   * {@link org.apache.tez.dag.app.dag.TaskStateInternal#KILL_WAIT}.
   */
  private void assertTaskKillWaitState() {
    Assert.assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
  }

  /**
   * Asserts that the task reports {@link TaskState#SUCCEEDED}.
   */
  private void assertTaskSucceededState() {
    Assert.assertEquals(TaskState.SUCCEEDED, mockTask.getState());
  }

  /**
   * A freshly constructed task is in {@link TaskState#NEW} and has no attempts.
   */
  @Test(timeout = 5000)
  public void testInit() {
    LOG.info("--- START: testInit ---");
    assertTaskNewState();
    // Use a JUnit assertion rather than the `assert` keyword: the keyword is
    // skipped entirely when the JVM runs without -ea, which would make this
    // check a silent no-op.
    assertEquals(0, mockTask.getAttemptList().size());
  }

  /**
   * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}: scheduling a new task
   * is checked by the {@code scheduleTaskAttempt} helper.
   */
  @Test(timeout = 5000)
  public void testScheduleTask() {
    LOG.info("--- START: testScheduleTask ---");
    scheduleTaskAttempt(getNewTaskID());
  }

  /**
   * {@link TaskState#SCHEDULED}->{@link TaskStateInternal#KILL_WAIT}: terminating
   * a scheduled task leaves it waiting for its attempt to be killed.
   */
  @Test(timeout = 5000)
  public void testKillScheduledTask() {
    LOG.info("--- START: testKillScheduledTask ---");
    final TezTaskID scheduledTaskId = getNewTaskID();
    scheduleTaskAttempt(scheduledTaskId);
    killTask(scheduledTaskId);
  }
  
  /**
   * {@link TaskState#RUNNING}->{@link TaskState#KILLED}: a running task is
   * terminated, its attempt is reported killed, and the recorded events are
   * checked for V_TASK_COMPLETED.
   */
  @Test(timeout = 5000)
  public void testKillRunningTask() {
    LOG.info("--- START: testKillRunningTask ---");
    final TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);

    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);
    killTask(taskId);

    TezTaskAttemptID killedId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTAKilledEvent(killedId));

    Assert.assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
    verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
  }

  /**
   * Runs four schedule/launch/fail rounds (matching the limit of 4 failed
   * attempts configured in setup) and expects the task to end up in
   * {@link TaskStateInternal#FAILED} after the last one.
   */
  @Test(timeout = 5000)
  public void testTooManyFailedAtetmpts() {
    LOG.info("--- START: testTooManyFailedAttempts ---");
    final TezTaskID taskId = getNewTaskID();
    final int rounds = 4;

    for (int round = 1; round <= rounds; round++) {
      // Only the very first schedule event finds the task in NEW; later ones
      // arrive while the task is RUNNING.
      TaskState stateAfterSchedule = (round == 1) ? TaskState.SCHEDULED : TaskState.RUNNING;
      scheduleTaskAttempt(taskId, stateAfterSchedule);
      launchTaskAttempt(mockTask.getLastAttempt().getID());

      // The final failure is not followed by the RUNNING check; the task's
      // terminal state is asserted after the loop instead.
      boolean expectStillRunning = round < rounds;
      failRunningTaskAttempt(mockTask.getLastAttempt().getID(), expectStillRunning);
    }

    Assert.assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
    verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
  }

  /**
   * A single FATAL attempt failure is expected to fail the task immediately,
   * with one failed attempt counted.
   */
  @Test(timeout = 5000)
  public void testFailedAttemptWithFatalError() {
    LOG.info("--- START: testFailedAttemptWithFatalError ---");
    final TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId, TaskState.SCHEDULED);

    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);

    TezTaskAttemptID failedId = mockTask.getLastAttempt().getID();
    TaskEventTAFailed fatalFailure =
        createTaskTAFailedEvent(failedId, TaskFailureType.FATAL, null);
    mockTask.handle(fatalFailure);

    Assert.assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
    Assert.assertEquals(1, mockTask.failedAttempts);
    verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
  }

  /**
   * Kills one attempt of a running task (task stays RUNNING), then terminates
   * the task and reports the last attempt killed; the task is expected to end
   * in {@link TaskStateInternal#KILLED}.
   */
  @Test(timeout = 5000)
  public void testKillRunningTaskPreviousKilledAttempts() {
    LOG.info("--- START: testKillRunningTaskPreviousKilledAttempts ---");
    final TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);

    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);
    TezTaskAttemptID firstKilledId = mockTask.getLastAttempt().getID();
    killRunningTaskAttempt(firstKilledId);
    Assert.assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());

    killTask(taskId);
    TezTaskAttemptID lastKilledId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTAKilledEvent(lastKilledId));

    Assert.assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
    verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
  }

  /**
   * {@link TaskState#RUNNING}->{@link TaskState#KILLED}: the task is terminated
   * while running, and the attempt then reports success; the task is still
   * expected to end up KILLED.
   */
  @Test(timeout = 5000)
  public void testKillRunningTaskButAttemptSucceeds() {
    LOG.info("--- START: testKillRunningTaskButAttemptSucceeds ---");
    final TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);

    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);
    killTask(taskId);

    TezTaskAttemptID succeededId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTASucceededEvent(succeededId));
    Assert.assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
  }
  
  /**
   * {@link TaskState#RUNNING}->{@link TaskState#KILLED}: the task is terminated
   * while running, and the attempt then reports failure; the task is still
   * expected to end up KILLED.
   */
  @Test(timeout = 5000)
  public void testKillRunningTaskButAttemptFails() {
    LOG.info("--- START: testKillRunningTaskButAttemptFails ---");
    final TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);

    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);
    killTask(taskId);

    TezTaskAttemptID failedId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTAFailedEvent(failedId));
    Assert.assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
  }

  /**
   * Kill attempt, {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}:
   * the killed attempt must be recorded as the scheduling causal attempt of
   * the attempt that follows it.
   */
  @Test(timeout = 5000)
  public void testKillScheduledTaskAttempt() {
    LOG.info("--- START: testKillScheduledTaskAttempt ---");
    scheduleTaskAttempt(getNewTaskID());

    final TezTaskAttemptID killedAttemptId = mockTask.getLastAttempt().getID();
    TezTaskAttemptID attemptToKill = mockTask.getLastAttempt().getID();
    killScheduledTaskAttempt(attemptToKill);

    // The attempt that is now last should name the killed one as its cause.
    TezTaskAttemptID causalId = mockTask.getLastAttempt().getSchedulingCausalTA();
    assertEquals(killedAttemptId, causalId);
  }

  /**
   * Launch attempt, {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}.
   */
  @Test(timeout = 5000)
  public void testLaunchTaskAttempt() {
    LOG.info("--- START: testLaunchTaskAttempt ---");
    scheduleTaskAttempt(getNewTaskID());
    TezTaskAttemptID scheduledAttemptId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(scheduledAttemptId);
  }

  /**
   * Kill running attempt, {@link TaskState#RUNNING}->{@link TaskState#RUNNING}:
   * the killed attempt must be recorded as the scheduling causal attempt of
   * the attempt that follows it.
   */
  @Test(timeout = 5000)
  public void testKillRunningTaskAttempt() {
    LOG.info("--- START: testKillRunningTaskAttempt ---");
    scheduleTaskAttempt(getNewTaskID());

    final TezTaskAttemptID killedAttemptId = mockTask.getLastAttempt().getID();
    TezTaskAttemptID attemptToLaunch = mockTask.getLastAttempt().getID();
    launchTaskAttempt(attemptToLaunch);
    TezTaskAttemptID attemptToKill = mockTask.getLastAttempt().getID();
    killRunningTaskAttempt(attemptToKill);

    // The attempt that is now last should name the killed one as its cause.
    TezTaskAttemptID causalId = mockTask.getLastAttempt().getSchedulingCausalTA();
    assertEquals(killedAttemptId, causalId);
  }

  /**
   * Kill running attempt with a {@code ServiceBusyEvent} as the causal event,
   * {@link TaskState#RUNNING}->{@link TaskState#RUNNING}: the vertex must count
   * the attempt as rejected and not as killed.
   */
  @Test(timeout = 5000)
  public void testKillTaskAttemptServiceBusy() {
    LOG.info("--- START: testKillTaskAttemptServiceBusy ---");
    scheduleTaskAttempt(getNewTaskID());

    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);

    TezTaskAttemptID rejectedId = mockTask.getLastAttempt().getID();
    TaskEventTAKilled busyKill = createTaskTAKilledEvent(rejectedId, new ServiceBusyEvent());
    mockTask.handle(busyKill);

    assertEquals(TaskState.RUNNING, mockTask.getState());
    verify(mockTask.getVertex(), times(0)).incrementKilledTaskAttemptCount();
    verify(mockTask.getVertex(), times(1)).incrementRejectedTaskAttemptCount();
  }

  /**
   * {@link TaskState#KILLED}->{@link TaskState#KILLED}: a second killed-attempt
   * event delivered to an already killed task must leave it KILLED.
   */
  @Test(timeout = 5000)
  public void testKilledAttemptAtTaskKilled() {
    LOG.info("--- START: testKilledAttemptAtTaskKilled ---");
    final TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);

    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);
    killTask(taskId);

    TezTaskAttemptID killedId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTAKilledEvent(killedId));
    Assert.assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());

    // Deliver the kill for the same attempt once more. This is not expected
    // in practice; it only exercises handling of killed attempts while the
    // task is already KILLED.
    TezTaskAttemptID duplicateKilledId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTAKilledEvent(duplicateKilledId));
    Assert.assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
  }

  /**
   * {@link TaskState#FAILED}->{@link TaskState#FAILED}: a killed-attempt event
   * delivered to an already failed task must leave it FAILED.
   */
  @Test(timeout = 5000)
  public void testKilledAttemptAtTaskFailed() {
    LOG.info("--- START: testKilledAttemptAtTaskFailed ---");
    scheduleTaskAttempt(getNewTaskID());

    // Report failures for the current last attempt until the limit is reached.
    int failuresSent = 0;
    while (failuresSent < mockTask.maxFailedAttempts) {
      TezTaskAttemptID failedId = mockTask.getLastAttempt().getID();
      mockTask.handle(createTaskTAFailedEvent(failedId));
      failuresSent++;
    }
    Assert.assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());

    // A kill for an attempt arriving now must not change the state.
    TezTaskAttemptID killedId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTAKilledEvent(killedId));
    Assert.assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
  }



  /**
   * Ensures that registering further events with a task does not change the
   * contents of a list returned by an earlier getTaskAttemptTezEvents call.
   */
  @Test(timeout = 5000)
  public void testFetchedEventsModifyUnderlyingList() {
    final TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    sendTezEventsToTask(taskId, 2);

    final TezTaskAttemptID attemptID =
        mockTask.getAttemptList().iterator().next().getID();
    List<TezEvent> earlierFetch = mockTask.getTaskAttemptTezEvents(attemptID, 0, 100);
    Assert.assertEquals(2, earlierFetch.size());

    // Register more events: the previously fetched list must keep its size and
    // remain accessible without throwing.
    sendTezEventsToTask(taskId, 4);
    Assert.assertEquals(2, earlierFetch.size());

    // A new fetch sees all six events.
    List<TezEvent> laterFetch = mockTask.getTaskAttemptTezEvents(attemptID, 0, 100);
    Assert.assertEquals(6, laterFetch.size());
  }

  /**
   * Checks that the task's progress follows the progress set on its last
   * attempt, drops to 0 once that attempt is marked KILLED, and follows the
   * replacement attempt after the first one fails.
   *
   * All checks use JUnit assertions rather than the {@code assert} keyword,
   * which is skipped entirely when the JVM runs without {@code -ea}.
   */
  @Test(timeout = 5000)
  public void testTaskProgress() {
    LOG.info("--- START: testTaskProgress ---");
    // Delta of 0 keeps the float comparisons exact, as the original == checks were.
    final float exact = 0f;

    // launch task
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    float progress = 0f;
    assertEquals(progress, mockTask.getProgress(), exact);
    launchTaskAttempt(mockTask.getLastAttempt().getID());

    // update attempt1
    progress = 50f;
    updateAttemptProgress(mockTask.getLastAttempt(), progress);
    assertEquals(progress, mockTask.getProgress(), exact);
    progress = 100f;
    updateAttemptProgress(mockTask.getLastAttempt(), progress);
    assertEquals(progress, mockTask.getProgress(), exact);

    progress = 0f;
    // mark first attempt as killed
    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.KILLED);
    assertEquals(progress, mockTask.getProgress(), exact);

    // fail the first attempt; this should trigger a new attempt
    // as there are no successful attempts
    failRunningTaskAttempt(mockTask.getLastAttempt().getID());
    assertEquals(2, mockTask.getAttemptList().size());
    assertEquals(1, mockTask.failedAttempts);
    verify(mockTask.getVertex(), times(1)).incrementFailedTaskAttemptCount();

    assertEquals(0f, mockTask.getProgress(), exact);
    launchTaskAttempt(mockTask.getLastAttempt().getID());
    progress = 50f;
    updateAttemptProgress(mockTask.getLastAttempt(), progress);
    assertEquals(progress, mockTask.getProgress(), exact);
  }

  /**
   * The first attempt is allowed to commit and then fails; a second attempt is
   * created with the failed one as its causal attempt, the first attempt may
   * no longer commit, and the second one commits and succeeds.
   */
  @Test(timeout = 5000)
  public void testFailureDuringTaskAttemptCommit() {
    scheduleTaskAttempt(getNewTaskID());
    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);
    mockTask.getLastAttempt().setState(TaskAttemptState.RUNNING);
    boolean firstMayCommit = mockTask.canCommit(mockTask.getLastAttempt().getID());
    assertTrue("First attempt should commit", firstMayCommit);

    // Simulate an exception during the attempt's commit that makes the
    // attempt fail.
    final TezTaskAttemptID failedAttemptId = mockTask.getLastAttempt().getID();
    mockTask.getLastAttempt().setState(TaskAttemptState.FAILED);
    Assert.assertEquals(1, mockTask.getAttemptList().size());
    failRunningTaskAttempt(mockTask.getLastAttempt().getID());

    Assert.assertEquals(2, mockTask.getAttemptList().size());
    Assert.assertEquals(1, mockTask.failedAttempts);
    // The failed attempt must be the causal attempt of the new one.
    TezTaskAttemptID causalId = mockTask.getLastAttempt().getSchedulingCausalTA();
    assertEquals(failedAttemptId, causalId);

    boolean firstStillMayCommit = mockTask.canCommit(mockTask.getAttemptList().get(0).getID());
    assertFalse("First attempt should not commit", firstStillMayCommit);
    mockTask.getLastAttempt().setState(TaskAttemptState.RUNNING);
    boolean secondMayCommit = mockTask.canCommit(mockTask.getLastAttempt().getID());
    assertTrue("Second attempt should commit", secondMayCommit);

    mockTask.getLastAttempt().setState(TaskAttemptState.SUCCEEDED);
    TezTaskAttemptID succeededId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTASucceededEvent(succeededId));

    assertTaskSucceededState();
  }
  

  /**
   * While the task is still SCHEDULED (simulating an event backlog in which
   * the real attempt already finished and asks to commit), canCommit must
   * return false; once the attempt is launched and RUNNING it must return true.
   */
  @Test(timeout = 5000)
  public void testEventBacklogDuringTaskAttemptCommit() {
    scheduleTaskAttempt(getNewTaskID());
    Assert.assertEquals(TaskState.SCHEDULED, mockTask.getState());

    // Task is still in the scheduled state due to the event backlog, but the
    // real task is done and is calling canCommit.
    boolean mayCommitWhileScheduled = mockTask.canCommit(mockTask.getLastAttempt().getID());
    assertFalse("Commit should return false to make running task wait",
        mayCommitWhileScheduled);

    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);
    mockTask.getLastAttempt().setState(TaskAttemptState.RUNNING);
    boolean mayCommitWhileRunning = mockTask.canCommit(mockTask.getLastAttempt().getID());
    assertTrue("Task state in AM is running now. Can commit.",
        mayCommitWhileRunning);

    mockTask.getLastAttempt().setState(TaskAttemptState.SUCCEEDED);
    TezTaskAttemptID succeededId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTASucceededEvent(succeededId));

    assertTaskSucceededState();
  }


  /**
   * With a speculative second attempt running, the second attempt is the one
   * allowed to commit; after it fails, permission to commit moves to the first
   * attempt, which then succeeds.
   */
  @Test(timeout = 5000)
  public void testChangeCommitTaskAttempt() {
    scheduleTaskAttempt(getNewTaskID());
    TezTaskAttemptID firstLaunchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(firstLaunchedId);
    mockTask.getLastAttempt().setState(TaskAttemptState.RUNNING);
    final TezTaskAttemptID originalAttemptId = mockTask.getLastAttempt().getID();

    // Add a speculative task attempt and bring it to RUNNING as well.
    TaskEvent addSpeculative = createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID());
    mockTask.handle(addSpeculative);
    TezTaskAttemptID speculativeLaunchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(speculativeLaunchedId);
    mockTask.getLastAttempt().setState(TaskAttemptState.RUNNING);

    Assert.assertEquals(2, mockTask.getAttemptList().size());

    // The previously running attempt should be the causal TA of the
    // speculative attempt.
    TezTaskAttemptID causalId = mockTask.getLastAttempt().getSchedulingCausalTA();
    assertEquals(originalAttemptId, causalId);

    boolean secondMayCommit = mockTask.canCommit(mockTask.getAttemptList().get(1).getID());
    assertTrue("Second attempt should commit", secondMayCommit);
    boolean firstMayCommit = mockTask.canCommit(mockTask.getAttemptList().get(0).getID());
    assertFalse("First attempt should not commit", firstMayCommit);

    // Simulate an exception during the second attempt's commit that makes
    // that attempt fail.
    mockTask.getLastAttempt().setState(TaskAttemptState.FAILED);
    failRunningTaskAttempt(mockTask.getLastAttempt().getID());

    Assert.assertEquals(2, mockTask.getAttemptList().size());

    boolean secondMayCommitAfterFailure =
        mockTask.canCommit(mockTask.getAttemptList().get(1).getID());
    assertFalse("Second attempt should not commit", secondMayCommitAfterFailure);
    boolean firstMayCommitAfterFailure =
        mockTask.canCommit(mockTask.getAttemptList().get(0).getID());
    assertTrue("First attempt should commit", firstMayCommitAfterFailure);

    mockTask.getAttemptList().get(0).setState(TaskAttemptState.SUCCEEDED);
    TezTaskAttemptID succeededId = mockTask.getAttemptList().get(0).getID();
    mockTask.handle(createTaskTASucceededEvent(succeededId));

    assertTaskSucceededState();
  }
  
  /**
   * A task succeeds (notifier and history handler are checked), then its
   * attempt is failed retroactively with an output-failed causal event. The
   * task is expected back in SCHEDULED, with N_TA_ENDED as the first recorded
   * event, V_TASK_RESCHEDULED as the last, and a new attempt whose causal
   * attempt is the one named by the output-failed event.
   */
  @SuppressWarnings("rawtypes")
  @Test(timeout = 5000)
  public void testTaskSucceedAndRetroActiveFailure() {
    final TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);
    mockTask.getLastAttempt().setState(TaskAttemptState.RUNNING);

    TezTaskAttemptID succeededId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTASucceededEvent(succeededId));

    // The task should now have succeeded
    assertTaskSucceededState();
    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
        eq(mockTask.getLastAttempt().getID().getId()));

    // The single history event must be a TaskFinishedEvent whose finish time
    // equals the task's.
    ArgumentCaptor<DAGHistoryEvent> historyCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
    verify(mockHistoryHandler).handle(historyCaptor.capture());
    HistoryEvent capturedEvent = historyCaptor.getValue().getHistoryEvent();
    assertTrue(capturedEvent instanceof TaskFinishedEvent);
    TaskFinishedEvent finishedEvent = (TaskFinishedEvent) capturedEvent;
    assertEquals(finishedEvent.getFinishTime(), mockTask.getFinishTime());

    eventHandler.events.clear();

    // Now fail the attempt after it has succeeded
    final TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
    TezEvent mockTezEvent = mock(TezEvent.class);
    EventMetaData sourceInfo =
        new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
    when(mockTezEvent.getSourceInfo()).thenReturn(sourceInfo);
    TaskAttemptEventOutputFailed outputFailedEvent =
        new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
    TezTaskAttemptID retroFailedId = mockTask.getLastAttempt().getID();
    TaskEventTAFailed retroactiveFailure =
        createTaskTAFailedEvent(retroFailedId, TaskFailureType.NON_FATAL, outputFailedEvent);
    mockTask.handle(retroactiveFailure);

    // The task is expected back in the scheduled state
    assertTaskScheduledState();
    Event firstEvent = eventHandler.events.get(0);
    assertEquals(AMNodeEventType.N_TA_ENDED, firstEvent.getType());
    Event lastEvent = eventHandler.events.get(eventHandler.events.size() - 1);
    assertEquals(VertexEventType.V_TASK_RESCHEDULED, lastEvent.getType());

    // The reporter of the output read error should be the causal TA
    List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList();
    assertEquals(2, attempts.size());
    MockTaskAttemptImpl rescheduledAttempt = attempts.get(1);
    assertEquals(mockDestId, rescheduledAttempt.getSchedulingCausalTA());
  }

  /**
   * A task succeeds, then its attempt is reported killed retroactively; the
   * task is expected back in SCHEDULED and the first recorded event must be
   * V_TASK_RESCHEDULED.
   */
  @SuppressWarnings("rawtypes")
  @Test(timeout = 5000)
  public void testTaskSucceedAndRetroActiveKilled() {
    final TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
    launchTaskAttempt(launchedId);
    mockTask.getLastAttempt().setState(TaskAttemptState.RUNNING);

    TezTaskAttemptID succeededId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTASucceededEvent(succeededId));

    // The task should now have succeeded
    assertTaskSucceededState();
    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
        eq(mockTask.getLastAttempt().getID().getId()));

    eventHandler.events.clear();

    // Now kill the attempt after it has succeeded
    TezTaskAttemptID retroKilledId = mockTask.getLastAttempt().getID();
    mockTask.handle(createTaskTAKilledEvent(retroKilledId));

    // The task is expected back in the scheduled state
    assertTaskScheduledState();
    Event firstEvent = eventHandler.events.get(0);
    assertEquals(VertexEventType.V_TASK_RESCHEDULED, firstEvent.getType());
  }

  /**
   * Terminating a task that was never scheduled leaves exactly one diagnostic
   * naming the termination cause, and counts a finished event but no started event.
   */
  @Test(timeout = 5000)
  public void testDiagnostics_KillNew(){
    TezTaskID taskId = getNewTaskID();
    TaskAttemptTerminationCause cause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT;
    mockTask.handle(new TaskEventTermination(taskId, cause, null));

    assertEquals(1, mockTask.getDiagnostics().size());
    String onlyDiagnostic = mockTask.getDiagnostics().get(0);
    assertTrue(onlyDiagnostic.contains(cause.name()));

    // Counters maintained by the MockTaskImpl history hooks.
    assertEquals(0, mockTask.taskStartedEventLogged);
    assertEquals(1, mockTask.taskFinishedEventLogged);
  }
  
  /**
   * Terminating an already scheduled task leaves exactly one diagnostic that
   * names the termination cause.
   */
  @Test(timeout = 5000)
  public void testDiagnostics_Kill(){
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);

    TaskAttemptTerminationCause cause = TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN;
    mockTask.handle(new TaskEventTermination(taskId, cause, null));

    assertEquals(1, mockTask.getDiagnostics().size());
    String onlyDiagnostic = mockTask.getDiagnostics().get(0);
    assertTrue(onlyDiagnostic.contains(cause.name()));
  }

  /**
   * With TEZ_AM_TASK_MAX_FAILED_ATTEMPTS set to 1, the task is FAILED once the
   * original attempt fails and remains FAILED, without any new attempt, when the
   * speculative attempt fails afterwards.
   */
  @Test(timeout = 20000)
  public void testFailedThenSpeculativeFailed() {
    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
    Vertex singleFailureVertex = mock(Vertex.class);
    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(singleFailureVertex).getVertexConfig();
    mockTask = new MockTaskImpl(vertexId, partition, eventHandler, conf,
        taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, appContext,
        leafVertex, taskResource, containerContext, singleFailureVertex);

    // Bring the original attempt up to RUNNING.
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(originalAttempt.getID());
    updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

    // Request a speculative attempt and bring it up to RUNNING too.
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl speculativeAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(speculativeAttempt.getID());
    updateAttemptState(speculativeAttempt, TaskAttemptState.RUNNING);
    assertEquals(2, mockTask.getAttemptList().size());

    // Original attempt fails: the task is expected to be FAILED right away.
    updateAttemptState(originalAttempt, TaskAttemptState.FAILED);
    mockTask.handle(createTaskTAFailedEvent(originalAttempt.getID()));
    assertEquals(TaskState.FAILED, mockTask.getState());
    assertEquals(2, mockTask.getAttemptList().size());

    // Speculative attempt fails afterwards: nothing changes.
    updateAttemptState(speculativeAttempt, TaskAttemptState.FAILED);
    mockTask.handle(createTaskTAFailedEvent(speculativeAttempt.getID()));
    assertEquals(TaskState.FAILED, mockTask.getState());
    assertEquals(2, mockTask.getAttemptList().size());
  }

  /**
   * With TEZ_AM_TASK_MAX_FAILED_ATTEMPTS set to 1, a task that is FAILED because
   * its original attempt failed stays FAILED even if the speculative attempt
   * reports success afterwards.
   */
  @Test(timeout = 20000)
  public void testFailedThenSpeculativeSucceeded() {
    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
    Vertex singleFailureVertex = mock(Vertex.class);
    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(singleFailureVertex).getVertexConfig();
    mockTask = new MockTaskImpl(vertexId, partition, eventHandler, conf,
        taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, appContext,
        leafVertex, taskResource, containerContext, singleFailureVertex);

    // Bring the original attempt up to RUNNING.
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(originalAttempt.getID());
    updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

    // Request a speculative attempt and bring it up to RUNNING too.
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl speculativeAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(speculativeAttempt.getID());
    updateAttemptState(speculativeAttempt, TaskAttemptState.RUNNING);
    assertEquals(2, mockTask.getAttemptList().size());

    // Original attempt fails: the task is expected to be FAILED right away.
    updateAttemptState(originalAttempt, TaskAttemptState.FAILED);
    mockTask.handle(createTaskTAFailedEvent(originalAttempt.getID()));
    assertEquals(TaskState.FAILED, mockTask.getState());
    assertEquals(2, mockTask.getAttemptList().size());

    // A late success of the speculative attempt must not resurrect the task.
    updateAttemptState(speculativeAttempt, TaskAttemptState.SUCCEEDED);
    mockTask.handle(createTaskTASucceededEvent(speculativeAttempt.getID()));
    assertEquals(TaskState.FAILED, mockTask.getState());
    assertEquals(2, mockTask.getAttemptList().size());
  }

  /**
   * The original attempt is killed while the task is running, a speculative
   * attempt is then requested while the mocked node id is temporarily null, and
   * that speculative attempt finally succeeds. The task must end up SUCCEEDED
   * with three attempts in total.
   */
  @Test
  public void testKilledBeforeSpeculatedSucceeded() {
    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
    Vertex singleFailureVertex = mock(Vertex.class);
    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(singleFailureVertex).getVertexConfig();
    mockTask = new MockTaskImpl(vertexId, partition, eventHandler, conf,
        taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, appContext,
        leafVertex, taskResource, containerContext, singleFailureVertex);

    // Bring the original attempt up to RUNNING.
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(originalAttempt.getID());
    updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

    // Killing the original attempt leaves the task itself RUNNING.
    mockTask.handle(createTaskTAKilledEvent(originalAttempt.getID()));
    assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());

    // Temporarily blank out the shared node id: per the original author this is
    // what would provoke an NPE in the state machine transition when an attempt
    // was killed before speculation kicked in.
    NodeId savedNodeId = mockNodeId;
    mockNodeId = null;

    // Request the speculative attempt while the node id is null, then restore it.
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    mockNodeId = savedNodeId;
    MockTaskAttemptImpl speculativeAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(speculativeAttempt.getID());
    updateAttemptState(speculativeAttempt, TaskAttemptState.RUNNING);
    assertEquals(3, mockTask.getAttemptList().size());

    // Success of the speculative attempt completes the task.
    updateAttemptState(speculativeAttempt, TaskAttemptState.SUCCEEDED);
    mockTask.handle(createTaskTASucceededEvent(speculativeAttempt.getID()));
    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
    assertEquals(3, mockTask.getAttemptList().size());
  }

  /**
   * When the original attempt succeeds while a speculative one is running, the
   * task requests a kill of the speculative attempt; both the success and the
   * subsequent kill must produce a DAG scheduler update.
   */
  @Test(timeout = 20000)
  public void testKilledAttemptUpdatesDAGScheduler() {
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(originalAttempt.getID());
    updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

    // Start a speculative attempt alongside the original one.
    mockTask.handle(createTaskTAAddSpecAttempt(originalAttempt.getID()));
    MockTaskAttemptImpl speculativeAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(speculativeAttempt.getID());
    updateAttemptState(speculativeAttempt, TaskAttemptState.RUNNING);
    assertEquals(2, mockTask.getAttemptList().size());

    // The original attempt wins the race.
    eventHandler.events.clear();
    mockTask.handle(createTaskTASucceededEvent(originalAttempt.getID()));
    verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE,
        VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED);

    // Task is SUCCEEDED and the last emitted event asks for the loser to be killed.
    assertTaskSucceededState();
    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
        eq(originalAttempt.getID().getId()));
    @SuppressWarnings("rawtypes")
    Event lastEmitted = eventHandler.events.get(eventHandler.events.size()-1);
    assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, lastEmitted.getType());
    TaskAttemptEventKillRequest killRequest = (TaskAttemptEventKillRequest) lastEmitted;
    assertEquals(speculativeAttempt.getID(), killRequest.getTaskAttemptID());

    // The speculative attempt reports back as killed: the task stays SUCCEEDED
    // and the DAG scheduler is updated once more.
    eventHandler.events.clear();
    mockTask.handle(createTaskTAKilledEvent(speculativeAttempt.getID()));
    assertTaskSucceededState();
    verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE,
        VertexEventType.V_TASK_ATTEMPT_COMPLETED);
  }

  /**
   * After the original attempt succeeded and the speculative attempt was killed,
   * an output failure reported against the successful attempt must reschedule
   * the task with a third attempt whose scheduling cause is the reporting attempt.
   */
  @Test(timeout = 20000)
  public void testSpeculatedThenRetroactiveFailure() {
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(originalAttempt.getID());
    updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

    // Start a speculative attempt alongside the original one.
    mockTask.handle(createTaskTAAddSpecAttempt(originalAttempt.getID()));
    MockTaskAttemptImpl speculativeAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(speculativeAttempt.getID());
    updateAttemptState(speculativeAttempt, TaskAttemptState.RUNNING);
    assertEquals(2, mockTask.getAttemptList().size());

    // The original attempt wins the race.
    eventHandler.events.clear();
    mockTask.handle(createTaskTASucceededEvent(originalAttempt.getID()));

    // Task is SUCCEEDED and the last emitted event asks for the loser to be killed.
    assertTaskSucceededState();
    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
        eq(originalAttempt.getID().getId()));
    @SuppressWarnings("rawtypes")
    Event killEvent = eventHandler.events.get(eventHandler.events.size()-1);
    assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, killEvent.getType());
    TaskAttemptEventKillRequest killRequest = (TaskAttemptEventKillRequest) killEvent;
    assertEquals(speculativeAttempt.getID(), killRequest.getTaskAttemptID());

    // The speculative attempt reports back as killed; the task stays SUCCEEDED.
    mockTask.handle(createTaskTAKilledEvent(speculativeAttempt.getID()));
    assertTaskSucceededState();

    // Build an output-failed report whose source is a mocked attempt id.
    TezTaskAttemptID reporterId = mock(TezTaskAttemptID.class);
    TezEvent readErrorEvent = mock(TezEvent.class);
    EventMetaData sourceInfo =
        new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", reporterId);
    when(readErrorEvent.getSourceInfo()).thenReturn(sourceInfo);
    TaskAttemptEventOutputFailed outputFailure =
        new TaskAttemptEventOutputFailed(reporterId, readErrorEvent, 1);

    // Retroactively fail the attempt that had made the task succeed.
    eventHandler.events.clear();
    mockTask.handle(
        createTaskTAFailedEvent(originalAttempt.getID(), TaskFailureType.NON_FATAL, outputFailure));

    // The task is back in the scheduled state and announced the reschedule last.
    assertTaskScheduledState();
    @SuppressWarnings("rawtypes")
    Event rescheduleEvent = eventHandler.events.get(eventHandler.events.size()-1);
    Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, rescheduleEvent.getType());

    // A third attempt exists, and the attempt that reported the output read
    // error is recorded as its scheduling cause.
    List<MockTaskAttemptImpl> allAttempts = mockTask.getAttemptList();
    Assert.assertEquals(3, allAttempts.size());
    MockTaskAttemptImpl rescheduledAttempt = allAttempts.get(2);
    Assert.assertEquals(reporterId, rescheduledAttempt.getSchedulingCausalTA());
  }

  /**
   * A request to add a speculative attempt must be ignored once the original
   * attempt has already succeeded: the attempt list keeps its single entry.
   */
  @Test(timeout = 20000)
  public void testIgnoreSpeculationOnSuccessfulOriginalAttempt() {
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(originalAttempt.getID());

    // Simulate the original attempt finishing successfully.
    updateAttemptState(originalAttempt, TaskAttemptState.SUCCEEDED);
    TaskAttemptEvent doneEvent =
        new TaskAttemptEvent(originalAttempt.getID(), TaskAttemptEventType.TA_DONE);
    originalAttempt.handle(doneEvent);

    // Ask for speculation anyway; no additional attempt may be created.
    mockTask.handle(createTaskTAAddSpecAttempt(originalAttempt.getID()));
    MockTaskAttemptImpl latestAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(latestAttempt.getID());
    assertEquals(1, mockTask.getAttemptList().size());
  }

  /**
   * A request to add a speculative attempt must be ignored after canCommit has
   * been called for the original attempt: the attempt list keeps its single entry.
   */
  @Test(timeout = 20000)
  public void testIgnoreSpeculationAfterOriginalAttemptCommit() {
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(originalAttempt.getID());
    updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

    // Simulate the original attempt asking for permission to commit.
    mockTask.canCommit(originalAttempt.getID());

    // Ask for speculation anyway; no additional attempt may be created.
    mockTask.handle(createTaskTAAddSpecAttempt(originalAttempt.getID()));
    MockTaskAttemptImpl latestAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(latestAttempt.getID());
    assertEquals(1, mockTask.getAttemptList().size());
  }

  /**
   * Two attempts (original + speculative) both run to SUCCEEDED, the speculative
   * one being reported to the task first. A retroactive output failure of the
   * attempt that finished second must leave the task SUCCEEDED, whereas a
   * retroactive failure of the attempt that made the task succeed must put the
   * task back into the scheduled state.
   */
  @SuppressWarnings("rawtypes")
  @Test
  public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException {
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
    launchTaskAttempt(firstMockTaskAttempt.getID());
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
    launchTaskAttempt(secondMockTaskAttempt.getID());

    // Drive both attempts through schedule -> submitted.
    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));

    // Start both remotely and complete them, the second attempt always first.
    secondMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
    firstMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
    secondMockTaskAttempt.handle(
        new TaskAttemptEvent(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
        TaskAttemptEventType.TA_DONE));
    firstMockTaskAttempt.handle(
        new TaskAttemptEvent(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
        TaskAttemptEventType.TA_DONE));

    mockTask.handle(new TaskEventTASucceeded(secondMockTaskAttempt.getID()));
    mockTask.handle(new TaskEventTASucceeded(firstMockTaskAttempt.getID()));

    // assertEquals (expected first) reports the offending state on failure,
    // and one assertion per attempt shows which attempt was wrong.
    assertEquals("Attempts should have succeeded!",
        TaskAttemptStateInternal.SUCCEEDED, firstMockTaskAttempt.getInternalState());
    assertEquals("Attempts should have succeeded!",
        TaskAttemptStateInternal.SUCCEEDED, secondMockTaskAttempt.getInternalState());
    assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount());
    assertEquals("Task should have Succeeded!", TaskState.SUCCEEDED, mockTask.getState());
    //Failing the attempt that finished after the task was marked succeeded, should not schedule another attempt
    failAttempt(firstMockTaskAttempt, 0, 0);
    assertTaskSucceededState();
    //Failing the attempt that allowed the task to succeed, should schedule another attempt
    failAttempt(secondMockTaskAttempt, 1, 1);
    assertTaskScheduledState();
  }

  /**
   * With TEZ_AM_TASK_MAX_FAILED_ATTEMPTS set to 1 on the vertex config, both the
   * original and the speculative attempt are driven to FAILED; afterwards the
   * task must be FAILED with no uncompleted attempts left.
   */
  @SuppressWarnings("rawtypes")
  @Test
  public void testFailedAttemptStatus() throws InterruptedException {
    Configuration newConf = new Configuration(conf);
    newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
    Vertex vertex = mock(Vertex.class);
    doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
    mockTask = new MockTaskImpl(vertexId, partition,
      eventHandler, conf, taskCommunicatorManagerInterface, clock,
      taskHeartbeatHandler, appContext, leafVertex,
      taskResource, containerContext, vertex);
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
    launchTaskAttempt(firstMockTaskAttempt.getID());
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
    launchTaskAttempt(secondMockTaskAttempt.getID());

    // Drive both attempts through schedule -> submitted.
    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));

    // Start both remotely, then fail both with NO_PROGRESS (second attempt first).
    secondMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
    firstMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
    secondMockTaskAttempt.handle(
        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
        TaskAttemptEventType.TA_FAILED,TaskFailureType.NON_FATAL, "test",
        TaskAttemptTerminationCause.NO_PROGRESS));
    firstMockTaskAttempt.handle(
        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
        TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
        TaskAttemptTerminationCause.NO_PROGRESS));

    // Report container termination to both attempts, then the failures to the task.
    firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
        firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
    secondMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
        secondMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
    mockTask.handle(new TaskEventTAFailed(secondMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
        mock(TaskAttemptEvent.class)));
    mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
        mock(TaskAttemptEvent.class)));

    // assertEquals (expected first) reports the offending state on failure,
    // and one assertion per attempt shows which attempt was wrong.
    assertEquals("Attempts should have failed!",
        TaskAttemptStateInternal.FAILED, firstMockTaskAttempt.getInternalState());
    assertEquals("Attempts should have failed!",
        TaskAttemptStateInternal.FAILED, secondMockTaskAttempt.getInternalState());
    assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount());
    assertEquals("Task should have failed!", TaskState.FAILED, mockTask.getState());
  }

  /**
   * Task built with the leaf-vertex flag set to {@code true}: the speculative
   * attempt succeeds and the original attempt fails. A later output-failed
   * report and TA-failed event for the original attempt must leave the task
   * internally SUCCEEDED.
   */
  @SuppressWarnings("rawtypes")
  @Test (timeout = 10000L)
  public void testSucceededLeafTaskWithRetroFailures() throws InterruptedException {
    Configuration newConf = new Configuration(conf);
    newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
    Vertex vertex = mock(Vertex.class);
    doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
    mockTask = new MockTaskImpl(vertexId, partition,
        eventHandler, conf, taskCommunicatorManagerInterface, clock,
        taskHeartbeatHandler, appContext, true,
        taskResource, containerContext, vertex);
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
    launchTaskAttempt(firstMockTaskAttempt.getID());
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
    launchTaskAttempt(secondMockTaskAttempt.getID());

    // Drive both attempts through schedule -> submitted.
    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));

    // Start both remotely; the second attempt completes, the first one fails.
    secondMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
    firstMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
    secondMockTaskAttempt.handle(
        new TaskAttemptEvent(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
            TaskAttemptEventType.TA_DONE));
    firstMockTaskAttempt.handle(
        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
            TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
            TaskAttemptTerminationCause.CONTAINER_EXITED));

    mockTask.handle(new TaskEventTASucceeded(secondMockTaskAttempt.getID()));
    firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
        firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));

    // Retroactive output failure reported against the (already failed) first attempt.
    InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 0);
    TezTaskAttemptID mockDestId = firstMockTaskAttempt.getID();
    EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
    TezEvent tzEvent = new TezEvent(mockReEvent, meta);
    TaskAttemptEventOutputFailed outputFailedEvent =
        new TaskAttemptEventOutputFailed(mockDestId, tzEvent, 1);
    firstMockTaskAttempt.handle(outputFailedEvent);
    mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
        mock(TaskAttemptEvent.class)));
    // Expected value goes first so a failure message reads "expected SUCCEEDED but was ...".
    assertEquals(TaskStateInternal.SUCCEEDED, mockTask.getInternalState());
  }

  /**
   * Delivers an output-failed report (wrapping an input read error created with
   * the given index and sourced from a mocked attempt id) to {@code taskAttempt},
   * forwards the resulting NON_FATAL TA-failed event to {@code mockTask}, and
   * checks the task's uncompleted-attempt count afterwards.
   *
   * @param taskAttempt attempt that receives the output failure
   * @param index index passed to {@code InputReadErrorEvent.create}
   * @param expectedIncompleteAttempts uncompleted attempts expected after the failure
   */
  private void failAttempt(MockTaskAttemptImpl taskAttempt, int index, int expectedIncompleteAttempts) {
    InputReadErrorEvent readError = InputReadErrorEvent.create("", 0, index);
    TezTaskAttemptID reporterId = mock(TezTaskAttemptID.class);
    EventMetaData sourceInfo =
        new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", reporterId);
    TezEvent wrappedReadError = new TezEvent(readError, sourceInfo);
    TaskAttemptEventOutputFailed outputFailure =
        new TaskAttemptEventOutputFailed(reporterId, wrappedReadError, 1);

    // First the attempt learns about the failure, then the owning task.
    taskAttempt.handle(outputFailure);
    mockTask.handle(
        new TaskEventTAFailed(taskAttempt.getID(), TaskFailureType.NON_FATAL, outputFailure));

    int actualIncompleteAttempts = mockTask.getUncompletedAttemptsCount();
    assertEquals("Unexpected number of incomplete attempts!",
        expectedIncompleteAttempts, actualIncompleteAttempts);
  }

  /**
   * Drives both attempts of a task to FAILED (max failed attempts = 1 on the
   * vertex config) so that the task itself is FAILED, then sends an
   * add-speculative-attempt event and a launch event for the last attempt to
   * the failed task. The test passes when handling those late events does not
   * trip the internal-error hook of MockTaskImpl.
   */
  @Test (timeout = 30000)
  public void testFailedTaskTransitionWithLaunchedAttempt() throws InterruptedException {
    Configuration newConf = new Configuration(conf);
    newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
    Vertex vertex = mock(Vertex.class);
    doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
    mockTask = new MockTaskImpl(vertexId, partition,
        eventHandler, conf, taskCommunicatorManagerInterface, clock,
        taskHeartbeatHandler, appContext, leafVertex,
        taskResource, containerContext, vertex);
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(firstMockTaskAttempt.getID());
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(secondMockTaskAttempt.getID());

    // Drive both attempts through schedule -> submitted.
    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));

    // Start both remotely, then fail both with NO_PROGRESS (second attempt first).
    secondMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
    firstMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
    secondMockTaskAttempt.handle(
        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
            TaskAttemptEventType.TA_FAILED,TaskFailureType.NON_FATAL, "test",
            TaskAttemptTerminationCause.NO_PROGRESS));
    firstMockTaskAttempt.handle(
        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
            TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
            TaskAttemptTerminationCause.NO_PROGRESS));

    // Report container termination to both attempts, then the failures to the task.
    firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
        firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
    secondMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
        secondMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
    mockTask.handle(new TaskEventTAFailed(secondMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
        mock(TaskAttemptEvent.class)));
    mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
        mock(TaskAttemptEvent.class)));

    // assertEquals (expected first) reports the offending state on failure,
    // and one assertion per attempt shows which attempt was wrong.
    assertEquals("Attempts should have failed!",
        TaskAttemptStateInternal.FAILED, firstMockTaskAttempt.getInternalState());
    assertEquals("Attempts should have failed!",
        TaskAttemptStateInternal.FAILED, secondMockTaskAttempt.getInternalState());
    assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount());
    assertEquals("Task should have failed!", TaskState.FAILED, mockTask.getState());

    // Late events arriving at the already FAILED task.
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl thirdMockTaskAttempt = mockTask.getLastAttempt();
    mockTask.handle(createTaskTALauncherEvent(thirdMockTaskAttempt.getID()));
  }

  /**
   * Terminates a task with two running attempts, reports both attempts as
   * killed so that the task is internally KILLED, and then sends late
   * add-speculative-attempt, launch, succeeded and failed events to it. The test
   * passes when handling those late events does not trip the internal-error
   * hook of MockTaskImpl.
   */
  @Test (timeout = 30000)
  public void testKilledTaskTransitionWithLaunchedAttempt() throws InterruptedException {
    TezTaskID taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(firstMockTaskAttempt.getID());
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getLastAttempt();
    launchTaskAttempt(secondMockTaskAttempt.getID());

    // Drive both attempts through schedule -> submitted.
    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));

    // Start both remotely, then terminate the task as a whole.
    secondMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
    firstMockTaskAttempt.handle(
        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
    mockTask.handle(new TaskEventTermination(mockTask.getTaskId(),
        TaskAttemptTerminationCause.FRAMEWORK_ERROR, "test"));

    // Each attempt is killed and the kill is reported to the task, second attempt first.
    secondMockTaskAttempt.handle(
        new TaskAttemptEventAttemptKilled(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),"test",
            TaskAttemptTerminationCause.FRAMEWORK_ERROR));
    mockTask.handle(new TaskEventTAKilled(secondMockTaskAttempt.getID(),
        new TaskAttemptEvent(secondMockTaskAttempt.getID(), TaskAttemptEventType.TA_KILLED)));
    firstMockTaskAttempt.handle(
        new TaskAttemptEventAttemptKilled(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),"test",
            TaskAttemptTerminationCause.FRAMEWORK_ERROR));
    mockTask.handle(new TaskEventTAKilled(firstMockTaskAttempt.getID(),
        new TaskAttemptEvent(firstMockTaskAttempt.getID(), TaskAttemptEventType.TA_KILLED)));
    // The first attempt receives the kill event a second time.
    firstMockTaskAttempt.handle(
        new TaskAttemptEventAttemptKilled(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),"test",
            TaskAttemptTerminationCause.FRAMEWORK_ERROR));
    // Expected value goes first so a failure message reads "expected KILLED but was ...".
    assertEquals("Task should have been killed!", TaskStateInternal.KILLED, mockTask.getInternalState());

    // Late events arriving at the already KILLED task.
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl thirdMockTaskAttempt = mockTask.getLastAttempt();
    mockTask.handle(createTaskTALauncherEvent(thirdMockTaskAttempt.getID()));
    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
    MockTaskAttemptImpl fourthMockTaskAttempt = mockTask.getLastAttempt();
    mockTask.handle(createTaskTASucceededEvent(fourthMockTaskAttempt.getID()));
    MockTaskAttemptImpl fifthMockTaskAttempt = mockTask.getLastAttempt();
    mockTask.handle(createTaskTAFailedEvent(fifthMockTaskAttempt.getID()));
  }

  // TODO Add test to validate the correct commit attempt.


  /*
   * Asserts that every expected type occurs among the given events. Each event
   * can account for at most one expected entry, so a type listed twice needs two
   * matching events. Events of other types are tolerated.
   */
  private void verifyOutgoingEvents(List<Event> events,
                                    Enum<?>... expectedTypes) {
    List<Enum<?>> stillMissing = new LinkedList<Enum<?>>();
    for (int i = 0; i < expectedTypes.length; i++) {
      stillMissing.add(expectedTypes[i]);
    }

    for (Event emitted : events) {
      // Enum equality is identity, so this drops the first pending entry that
      // is the event's type and leaves the list untouched when none matches.
      stillMissing.remove(emitted.getType());
    }

    assertTrue("Did not find types : " + stillMissing
        + " in outgoing event list", stillMissing.isEmpty());
  }

  /**
   * TaskImpl used by these tests. It creates MockTaskAttemptImpl instances
   * instead of real attempts and remembers them in creation order, turns any
   * internal error into a test failure, returns the vertex handed to its
   * constructor, and counts how often the history-logging hooks are invoked.
   */
  @SuppressWarnings("rawtypes")
  private class MockTaskImpl extends TaskImpl {

    // Incremented by the history-logging hooks below; read directly by tests.
    public int taskStartedEventLogged = 0;
    public int taskFinishedEventLogged = 0;

    // Every attempt created through createAttempt, oldest first.
    private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
    private Vertex vertex;

    public MockTaskImpl(TezVertexID vertexId, int partition,
        EventHandler eventHandler, Configuration conf,
        TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock,
        TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex,
        Resource resource,
        ContainerContext containerContext, Vertex vertex) {
      // The state change notifier is always a fresh mock so tests can verify calls on it.
      super(vertexId, partition, eventHandler, conf, taskCommunicatorManagerInterface,
          clock, thh, appContext, leafVertex, resource,
          containerContext, mock(StateChangeNotifier.class), vertex);
      this.vertex = vertex;
    }

    /** Creates a mock attempt for the given attempt number and records it. */
    @Override
    protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(
          TezBuilderUtils.newTaskAttemptId(getTaskId(), attemptNumber),
          eventHandler, taskCommunicatorManagerInterface,
          conf, clock, taskHeartbeatHandler, appContext,
          true, taskResource, containerContext, schedCausalTA);
      taskAttempts.add(attempt);
      return attempt;
    }

    /** Runs the regular internal-error handling, then fails the running test. */
    @Override
    protected void internalError(TaskEventType type) {
      super.internalError(type);
      fail("Internal error: " + type);
    }

    /** Returns the most recently created attempt; throws if none was created yet. */
    MockTaskAttemptImpl getLastAttempt() {
      return taskAttempts.get(taskAttempts.size() - 1);
    }

    /** Returns the live list of created attempts (not a copy). */
    List<MockTaskAttemptImpl> getAttemptList() {
      return taskAttempts;
    }
    
    @Override
    public Vertex getVertex() {
      return vertex;
    }

    /** Only counts the call; nothing is forwarded to the superclass. */
    @Override
    protected void logJobHistoryTaskStartedEvent() {
      taskStartedEventLogged++;
    }

    /** Lets the superclass log the finished event, then counts the call. */
    @Override
    protected void logJobHistoryTaskFinishedEvent() {
      super.logJobHistoryTaskFinishedEvent();
      taskFinishedEventLogged++;
    }

    /** Counts a failed/killed completion as a finished event; nothing is forwarded. */
    @Override
    protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
      taskFinishedEventLogged++;
    }
  }

  /**
   * {@link TaskAttemptImpl} subclass whose progress and state are plain fields
   * that a test can set directly, and whose vertex, container id and node id
   * come from names resolved outside this class ({@code mockVertex},
   * {@code mockContainerId}, {@code mockNodeId} -- presumably fixtures of the
   * enclosing test).
   */
  @SuppressWarnings("rawtypes")
  public class MockTaskAttemptImpl extends TaskAttemptImpl {

    /** Value reported by {@link #getProgress()}; starts at 0. */
    private float progress = 0;
    /** Value reported by both state getters; starts at {@code NEW}. */
    private TaskAttemptState state = TaskAttemptState.NEW;

    /**
     * Passes the arguments through to the {@link TaskAttemptImpl} constructor,
     * adding a fixed {@code false} flag plus {@code mockTask},
     * {@code locationHint} and {@code mockTaskSpec}, which are not declared in
     * this class.
     */
    public MockTaskAttemptImpl(TezTaskAttemptID attemptId,
        EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf,
        Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
        boolean isRescheduled,
        Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
      super(attemptId, eventHandler, tal, conf, clock, thh, appContext,
          isRescheduled, resource, containerContext, false,
          mockTask, locationHint, mockTaskSpec, schedCausalTA);
    }

    /** Always answers with {@code mockVertex}. */
    @Override
    protected Vertex getVertex() {
      return mockVertex;
    }

    /** Returns whatever was last given to {@link #setProgress(float)}. */
    @Override
    public float getProgress() {
      return this.progress;
    }

    /** Sets the progress value this attempt will report. */
    public void setProgress(float progress) {
      this.progress = progress;
    }

    /** Sets the state this attempt will report from both state getters. */
    public void setState(TaskAttemptState state) {
      this.state = state;
    }

    /** Returns the state last given to {@link #setState(TaskAttemptState)}. */
    @Override
    public TaskAttemptState getState() {
      return this.state;
    }

    /** Same field read as {@link #getState()}. */
    @Override
    public TaskAttemptState getStateNoLock() {
      return this.state;
    }

    /** Always answers with {@code mockContainerId}. */
    @Override
    public ContainerId getAssignedContainerID() {
      return mockContainerId;
    }

    /** Always answers with {@code mockNodeId}. */
    @Override
    public NodeId getNodeId() {
      return mockNodeId;
    }
  }

  /**
   * Event of type {@code TaskAttemptEventType.TA_KILLED} that reports
   * {@code TaskAttemptTerminationCause.SERVICE_BUSY} as its termination cause.
   */
  public class ServiceBusyEvent
      extends TezAbstractEvent<TaskAttemptEventType>
      implements TaskAttemptEventTerminationCauseEvent {

    /** Creates the event; its type is fixed to {@code TA_KILLED}. */
    public ServiceBusyEvent() {
      super(TaskAttemptEventType.TA_KILLED);
    }

    /** Returns {@code SERVICE_BUSY} unconditionally. */
    @Override
    public TaskAttemptTerminationCause getTerminationCause() {
      final TaskAttemptTerminationCause cause = TaskAttemptTerminationCause.SERVICE_BUSY;
      return cause;
    }
  }
}

