blob: 1af6092d1ecf5775c10bd390eb0a568f3eecb28d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.node.AMNodeEventType;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestTaskImpl {
// Logger used by the test methods to mark the start of each test case.
private static final Logger LOG = LoggerFactory.getLogger(TestTaskImpl.class);
// Advanced by getNewTaskID() so that every requested task ID uses a new index.
private int taskCounter = 0;
// Passed as the partition argument when constructing MockTaskImpl.
private final int partition = 1;
// Everything below is (re)assigned in setup() before each test.
private Configuration conf;
private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
private TaskHeartbeatHandler taskHeartbeatHandler;
private Credentials credentials;
private Clock clock;
private TaskLocationHint locationHint;
private ApplicationId appId;
private TezDAGID dagId;
private TezVertexID vertexId;
private AppContext appContext;
private Resource taskResource;
private Map<String, LocalResource> localResources;
private Map<String, String> environment;
private String javaOpts;
private boolean leafVertex;
private ContainerContext containerContext;
// Mockito mocks for the container/node/history collaborators, wired together in setup().
private ContainerId mockContainerId;
private Container mockContainer;
private AMContainer mockAMContainer;
private NodeId mockNodeId;
private HistoryEventHandler mockHistoryHandler;
// The task instance exercised by the tests.
private MockTaskImpl mockTask;
private TaskSpec mockTaskSpec;
private Vertex mockVertex;
/**
 * Event handler that simply records every event it is given, in arrival
 * order, so tests can inspect what was dispatched.
 */
@SuppressWarnings("rawtypes")
class TestEventHandler implements EventHandler<Event> {
  /** Events received so far, oldest first. */
  List<Event> events = new ArrayList<Event>();

  @Override
  public void handle(Event received) {
    this.events.add(received);
  }
}
/** Recording handler passed to the task under test; recreated in setup(). */
private TestEventHandler eventHandler;
/**
 * Builds a fresh fixture before every test: configuration, mocked
 * collaborators, the {@link MockTaskImpl} under test and the mock vertex.
 */
@Before
public void setup() {
  conf = new Configuration();
  // Tests such as testTooManyFailedAtetmpts rely on this value being 4.
  conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 4);
  taskCommunicatorManagerInterface = mock(TaskCommunicatorManagerInterface.class);
  taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
  credentials = new Credentials();
  clock = new SystemClock();
  locationHint = TaskLocationHint.createTaskLocationHint(null, null);

  appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
  dagId = TezDAGID.getInstance(appId, 1);
  vertexId = TezVertexID.getInstance(dagId, 1);

  // Deep stubs allow the chained getAllContainers().get(..) call to be stubbed below.
  appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
  when(appContext.getDAGRecoveryData()).thenReturn(null);

  // Container-related mocks: AMContainer -> Container -> (ContainerId, NodeId).
  mockContainerId = mock(ContainerId.class);
  mockContainer = mock(Container.class);
  mockAMContainer = mock(AMContainer.class);
  when(mockAMContainer.getContainer()).thenReturn(mockContainer);
  when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:1234");
  mockNodeId = mock(NodeId.class);
  mockHistoryHandler = mock(HistoryEventHandler.class);
  when(mockContainer.getId()).thenReturn(mockContainerId);
  when(mockContainer.getNodeId()).thenReturn(mockNodeId);
  when(appContext.getAllContainers().get(mockContainerId)).thenReturn(mockAMContainer);
  when(appContext.getHistoryHandler()).thenReturn(mockHistoryHandler);

  taskResource = Resource.newInstance(1024, 1);
  localResources = new HashMap<String, LocalResource>();
  environment = new HashMap<String, String>();
  javaOpts = "";
  leafVertex = false;
  containerContext = new ContainerContext(localResources, credentials,
      environment, javaOpts);

  // The task is built against its own vertex mock, which only exposes the vertex config.
  Vertex vertex = mock(Vertex.class);
  doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
  eventHandler = new TestEventHandler();
  mockTask = new MockTaskImpl(vertexId, partition,
      eventHandler, conf, taskCommunicatorManagerInterface, clock,
      taskHeartbeatHandler, appContext, leafVertex,
      taskResource, containerContext, vertex);

  mockTaskSpec = mock(TaskSpec.class);
  mockVertex = mock(Vertex.class);
  ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
      .setContainerLauncherName(TezConstants.getTezYarnServicePluginName());
  when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
  when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(conf));
}
/**
 * Returns a task ID under {@code vertexId}, advancing {@code taskCounter}
 * first so that each call uses a new index.
 */
private TezTaskID getNewTaskID() {
  taskCounter += 1;
  return TezTaskID.getInstance(vertexId, taskCounter);
}
/**
 * Sends a schedule event to the task and expects it to end up in
 * {@link TaskState#SCHEDULED} with the base task spec and location hint recorded.
 */
private void scheduleTaskAttempt(TezTaskID taskId) {
  // Same event and same checks as the two-argument overload, with SCHEDULED expected.
  scheduleTaskAttempt(taskId, TaskState.SCHEDULED);
}
/**
 * Sends a schedule event to the task and checks the resulting state against
 * {@code expectedState}, as well as the recorded base task spec and location hint.
 */
private void scheduleTaskAttempt(TezTaskID taskId, TaskState expectedState) {
  final TaskEventScheduleTask scheduleEvent =
      new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint, false);
  mockTask.handle(scheduleEvent);

  final TaskState actualState = mockTask.getState();
  assertEquals(expectedState, actualState);
  assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec());
  assertEquals(locationHint, mockTask.getTaskLocationHint());
}
/**
 * Registers the same data-movement {@link TezEvent} instance with the task
 * {@code numTezEvents} times. The {@code taskId} argument is not used.
 */
private void sendTezEventsToTask(TezTaskID taskId, int numTezEvents) {
  final TezEvent sharedEvent =
      new TezEvent(DataMovementEvent.create(null), new EventMetaData());
  int remaining = numTezEvents;
  while (remaining > 0) {
    mockTask.registerTezEvent(sharedEvent);
    remaining--;
  }
}
/**
 * Sends a TERMINATED_AT_SHUTDOWN termination event to the task and expects the
 * internal state {@link TaskStateInternal#KILL_WAIT}.
 */
private void killTask(TezTaskID taskId) {
  final TaskEventTermination termination = new TaskEventTermination(
      taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null);
  mockTask.handle(termination);
  assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
}
/**
 * Despite its name, this sends exactly the same termination event and performs
 * the same KILL_WAIT check as {@code killTask}.
 */
private void failTask(TezTaskID taskId) {
  killTask(taskId);
}
/** Builds an attempt-killed event for {@code taskAttemptId} with no causal event. */
private TaskEventTAKilled createTaskTAKilledEvent(TezTaskAttemptID taskAttemptId) {
  final TezAbstractEvent noCause = null;
  return createTaskTAKilledEvent(taskAttemptId, noCause);
}
/** Builds an attempt-killed event for {@code taskAttemptId} carrying {@code causalEvent}. */
private TaskEventTAKilled createTaskTAKilledEvent(TezTaskAttemptID taskAttemptId,
    TezAbstractEvent causalEvent) {
  final TaskEventTAKilled killedEvent = new TaskEventTAKilled(taskAttemptId, causalEvent);
  return killedEvent;
}
/** Builds a NON_FATAL attempt-failed event for {@code taskAttemptId} with no causal event. */
private TaskEventTAFailed createTaskTAFailedEvent(TezTaskAttemptID taskAttemptId) {
  final TezAbstractEvent noCause = null;
  return createTaskTAFailedEvent(taskAttemptId, TaskFailureType.NON_FATAL, noCause);
}
/**
 * Builds an attempt-failed event for {@code taskAttemptId} with the given
 * failure type and causal event.
 */
private TaskEventTAFailed createTaskTAFailedEvent(TezTaskAttemptID taskAttemptId,
    TaskFailureType taskFailureType,
    TezAbstractEvent causalEvent) {
  final TaskEventTAFailed failedEvent =
      new TaskEventTAFailed(taskAttemptId, taskFailureType, causalEvent);
  return failedEvent;
}
/** Builds an attempt-launched event for {@code taskAttemptId}. */
private TaskEventTALaunched createTaskTALauncherEvent(TezTaskAttemptID taskAttemptId) {
  final TaskEventTALaunched launchedEvent = new TaskEventTALaunched(taskAttemptId);
  return launchedEvent;
}
/** Builds an attempt-succeeded event for {@code taskAttemptId}. */
private TaskEventTASucceeded createTaskTASucceededEvent(TezTaskAttemptID taskAttemptId) {
  final TaskEventTASucceeded succeededEvent = new TaskEventTASucceeded(taskAttemptId);
  return succeededEvent;
}
/**
 * Builds a T_ADD_SPEC_ATTEMPT event addressed to the task that owns
 * {@code taskAttemptId}.
 */
private TaskEvent createTaskTAAddSpecAttempt(TezTaskAttemptID taskAttemptId) {
  final TezTaskID owningTask = taskAttemptId.getTaskID();
  return new TaskEvent(owningTask, TaskEventType.T_ADD_SPEC_ATTEMPT);
}
/**
 * Reports {@code attemptId} as killed and expects the task to be in
 * {@link TaskState#SCHEDULED} afterwards.
 */
private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) {
  final TaskEventTAKilled killedEvent = createTaskTAKilledEvent(attemptId);
  mockTask.handle(killedEvent);
  assertEquals(TaskState.SCHEDULED, mockTask.getState());
}
/**
 * Reports {@code attemptId} as launched and expects the task to be in
 * {@link TaskState#RUNNING} afterwards.
 */
private void launchTaskAttempt(TezTaskAttemptID attemptId) {
  final TaskEventTALaunched launchedEvent = createTaskTALauncherEvent(attemptId);
  mockTask.handle(launchedEvent);
  assertEquals(TaskState.RUNNING, mockTask.getState());
}
/** Sets the progress value on the given mock attempt directly. */
private void updateAttemptProgress(MockTaskAttemptImpl attempt, float p) {
  final float newProgress = p;
  attempt.setProgress(newProgress);
}
/** Sets the state on the given mock attempt directly, without sending any event. */
private void updateAttemptState(MockTaskAttemptImpl attempt, TaskAttemptState s) {
  final TaskAttemptState newState = s;
  attempt.setState(newState);
}
/**
 * Reports {@code attemptId} as killed, expects the task to remain in
 * {@link TaskState#RUNNING}, and verifies the vertex's killed-attempt counter
 * was incremented exactly once.
 */
private void killRunningTaskAttempt(TezTaskAttemptID attemptId) {
  final TaskEventTAKilled killedEvent = createTaskTAKilledEvent(attemptId);
  mockTask.handle(killedEvent);
  assertEquals(TaskState.RUNNING, mockTask.getState());
  verify(mockTask.getVertex(), times(1)).incrementKilledTaskAttemptCount();
}
/**
 * Fails {@code attemptId} and additionally checks that the task is still
 * {@link TaskState#RUNNING} afterwards.
 */
private void failRunningTaskAttempt(TezTaskAttemptID attemptId) {
  final boolean expectStillRunning = true;
  failRunningTaskAttempt(attemptId, expectStillRunning);
}
/**
 * Reports {@code attemptId} as failed (non-fatal) and checks that both the
 * task's failed-attempt counter and the vertex's failed-attempt counter have
 * advanced by one.
 *
 * @param attemptId attempt to fail
 * @param verifyState when true, also assert the task is still RUNNING
 */
private void failRunningTaskAttempt(TezTaskAttemptID attemptId, boolean verifyState) {
  // Capture the expected count before the event is handled.
  final int expectedFailures = mockTask.failedAttempts + 1;
  mockTask.handle(createTaskTAFailedEvent(attemptId));
  if (verifyState) {
    assertEquals(TaskState.RUNNING, mockTask.getState());
  }
  Assert.assertEquals(expectedFailures, mockTask.failedAttempts);
  verify(mockTask.getVertex(), times(expectedFailures)).incrementFailedTaskAttemptCount();
}
/**
 * Asserts that the task is currently in {@link TaskState#NEW}.
 */
private void assertTaskNewState() {
  final TaskState current = mockTask.getState();
  assertEquals(TaskState.NEW, current);
}
/**
 * Asserts that the task is currently in {@link TaskState#SCHEDULED}.
 */
private void assertTaskScheduledState() {
  final TaskState current = mockTask.getState();
  assertEquals(TaskState.SCHEDULED, current);
}
/**
 * Asserts that the task is currently in {@link TaskState#RUNNING}.
 */
private void assertTaskRunningState() {
  final TaskState current = mockTask.getState();
  assertEquals(TaskState.RUNNING, current);
}
/**
 * Asserts that the task's internal state is
 * {@link org.apache.tez.dag.app.dag.TaskStateInternal#KILL_WAIT}.
 */
private void assertTaskKillWaitState() {
  final TaskStateInternal current = mockTask.getInternalState();
  assertEquals(TaskStateInternal.KILL_WAIT, current);
}
/**
 * Asserts that the task is currently in {@link TaskState#SUCCEEDED}.
 */
private void assertTaskSucceededState() {
  final TaskState current = mockTask.getState();
  assertEquals(TaskState.SUCCEEDED, current);
}
/**
 * A freshly constructed task is in {@link TaskState#NEW} and has no attempts.
 */
@Test(timeout = 5000)
public void testInit() {
  LOG.info("--- START: testInit ---");
  assertTaskNewState();
  // Use a JUnit assertion: the `assert` keyword is skipped when the JVM runs
  // without -ea, which would silently disable this check.
  assertEquals(0, mockTask.getAttemptList().size());
}
/**
 * Scheduling a new task:
 * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}
 */
@Test(timeout = 5000)
public void testScheduleTask() {
  LOG.info("--- START: testScheduleTask ---");
  // The helper performs the state assertions.
  scheduleTaskAttempt(getNewTaskID());
}
/**
 * Terminating a scheduled task:
 * {@link TaskState#SCHEDULED}->{@link TaskStateInternal#KILL_WAIT}
 */
@Test(timeout = 5000)
public void testKillScheduledTask() {
  LOG.info("--- START: testKillScheduledTask ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  // killTask asserts the KILL_WAIT internal state.
  killTask(id);
}
/**
 * Terminating a running task and then reporting its attempt as killed:
 * {@link TaskState#RUNNING}->{@link TaskState#KILLED}
 */
@Test(timeout = 5000)
public void testKillRunningTask() {
  LOG.info("--- START: testKillRunningTask ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  final TezTaskAttemptID launchedId = mockTask.getLastAttempt().getID();
  launchTaskAttempt(launchedId);
  killTask(id);
  final TezTaskAttemptID killedId = mockTask.getLastAttempt().getID();
  mockTask.handle(createTaskTAKilledEvent(killedId));
  assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
  verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
}
/**
 * Fails four consecutive attempts and expects the task to end up FAILED and a
 * V_TASK_COMPLETED event to be among the outgoing events.
 */
@Test(timeout = 5000)
public void testTooManyFailedAtetmpts() {
  LOG.info("--- START: testTooManyFailedAttempts ---");
  final TezTaskID taskId = getNewTaskID();
  // Same value as the max-failed-attempts setting applied in setup().
  final int totalRounds = 4;
  for (int round = 1; round <= totalRounds; round++) {
    final boolean firstRound = round == 1;
    final boolean lastRound = round == totalRounds;
    // Only the first schedule event sees the task outside RUNNING.
    scheduleTaskAttempt(taskId, firstRound ? TaskState.SCHEDULED : TaskState.RUNNING);
    launchTaskAttempt(mockTask.getLastAttempt().getID());
    // The RUNNING check is skipped for the final failure; FAILED is asserted below.
    failRunningTaskAttempt(mockTask.getLastAttempt().getID(), !lastRound);
  }
  assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
  verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
}
/**
 * A single attempt failure of type FATAL moves the task straight to FAILED
 * with one failed attempt recorded.
 */
@Test(timeout = 5000)
public void testFailedAttemptWithFatalError() {
  LOG.info("--- START: testFailedAttemptWithFatalError ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id, TaskState.SCHEDULED);
  launchTaskAttempt(mockTask.getLastAttempt().getID());

  final TezTaskAttemptID failingId = mockTask.getLastAttempt().getID();
  final TaskEventTAFailed fatalFailure =
      createTaskTAFailedEvent(failingId, TaskFailureType.FATAL, null);
  mockTask.handle(fatalFailure);

  assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
  assertEquals(1, mockTask.failedAttempts);
  verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
}
/**
 * Kills one attempt while the task keeps running, then terminates the task and
 * reports the latest attempt as killed; the task must end up KILLED.
 */
@Test(timeout = 5000)
public void testKillRunningTaskPreviousKilledAttempts() {
  LOG.info("--- START: testKillRunningTaskPreviousKilledAttempts ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  killRunningTaskAttempt(mockTask.getLastAttempt().getID());
  assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());

  killTask(id);
  final TezTaskAttemptID latestId = mockTask.getLastAttempt().getID();
  mockTask.handle(createTaskTAKilledEvent(latestId));
  assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
  verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
}
/**
 * The task is terminated while running, but its attempt then reports success;
 * the task must still end up killed:
 * {@link TaskState#RUNNING}->{@link TaskState#KILLED}
 */
@Test(timeout = 5000)
public void testKillRunningTaskButAttemptSucceeds() {
  LOG.info("--- START: testKillRunningTaskButAttemptSucceeds ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  killTask(id);
  final TezTaskAttemptID succeededId = mockTask.getLastAttempt().getID();
  mockTask.handle(createTaskTASucceededEvent(succeededId));
  assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
}
/**
 * The task is terminated while running, and its attempt then reports failure;
 * the task must end up killed:
 * {@link TaskState#RUNNING}->{@link TaskState#KILLED}
 */
@Test(timeout = 5000)
public void testKillRunningTaskButAttemptFails() {
  LOG.info("--- START: testKillRunningTaskButAttemptFails ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  killTask(id);
  final TezTaskAttemptID failedId = mockTask.getLastAttempt().getID();
  mockTask.handle(createTaskTAFailedEvent(failedId));
  assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
}
/**
 * Kill an attempt of a scheduled task:
 * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}
 */
@Test(timeout = 5000)
public void testKillScheduledTaskAttempt() {
  LOG.info("--- START: testKillScheduledTaskAttempt ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  final TezTaskAttemptID killedId = mockTask.getLastAttempt().getID();
  killScheduledTaskAttempt(killedId);
  // The killed attempt must be recorded as the causal TA of the attempt that follows it.
  final MockTaskAttemptImpl followUp = mockTask.getLastAttempt();
  Assert.assertEquals(killedId, followUp.getSchedulingCausalTA());
}
/**
 * Launch an attempt of a scheduled task:
 * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}
 */
@Test(timeout = 5000)
public void testLaunchTaskAttempt() {
  LOG.info("--- START: testLaunchTaskAttempt ---");
  scheduleTaskAttempt(getNewTaskID());
  final TezTaskAttemptID toLaunch = mockTask.getLastAttempt().getID();
  // launchTaskAttempt asserts the RUNNING state.
  launchTaskAttempt(toLaunch);
}
/**
 * Kill the running attempt of a running task:
 * {@link TaskState#RUNNING}->{@link TaskState#RUNNING}
 */
@Test(timeout = 5000)
public void testKillRunningTaskAttempt() {
  LOG.info("--- START: testKillRunningTaskAttempt ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  final TezTaskAttemptID firstId = mockTask.getLastAttempt().getID();
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  killRunningTaskAttempt(mockTask.getLastAttempt().getID());
  // The killed attempt must be recorded as the causal TA of the attempt that follows it.
  final MockTaskAttemptImpl followUp = mockTask.getLastAttempt();
  Assert.assertEquals(firstId, followUp.getSchedulingCausalTA());
}
/**
 * Kill the running attempt with a {@code ServiceBusyEvent} as the causal event:
 * {@link TaskState#RUNNING}->{@link TaskState#RUNNING}
 * The vertex's rejected-attempt counter is incremented instead of the
 * killed-attempt counter.
 */
@Test(timeout = 5000)
public void testKillTaskAttemptServiceBusy() {
  LOG.info("--- START: testKillTaskAttemptServiceBusy ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  launchTaskAttempt(mockTask.getLastAttempt().getID());

  final TezTaskAttemptID rejectedId = mockTask.getLastAttempt().getID();
  final TaskEventTAKilled busyKill =
      createTaskTAKilledEvent(rejectedId, new ServiceBusyEvent());
  mockTask.handle(busyKill);

  assertTaskRunningState();
  verify(mockTask.getVertex(), times(0)).incrementKilledTaskAttemptCount();
  verify(mockTask.getVertex(), times(1)).incrementRejectedTaskAttemptCount();
}
/**
 * A further attempt-killed event on an already killed task leaves it killed:
 * {@link TaskState#KILLED}->{@link TaskState#KILLED}
 */
@Test(timeout = 5000)
public void testKilledAttemptAtTaskKilled() {
  LOG.info("--- START: testKilledAttemptAtTaskKilled ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  killTask(id);

  final TezTaskAttemptID firstKillId = mockTask.getLastAttempt().getID();
  mockTask.handle(createTaskTAKilledEvent(firstKillId));
  assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());

  // Send a duplicate kill for the same attempt. This does not happen in
  // practice; it simulates handling of killed attempts in the killed state.
  final TezTaskAttemptID duplicateKillId = mockTask.getLastAttempt().getID();
  mockTask.handle(createTaskTAKilledEvent(duplicateKillId));
  assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
}
/**
 * An attempt-killed event on an already failed task leaves it failed:
 * {@link TaskState#FAILED}->{@link TaskState#FAILED}
 */
@Test(timeout = 5000)
public void testKilledAttemptAtTaskFailed() {
  LOG.info("--- START: testKilledAttemptAtTaskFailed ---");
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);

  // Fail the latest attempt until the failure limit is reached.
  int failuresSent = 0;
  while (failuresSent < mockTask.maxFailedAttempts) {
    mockTask.handle(createTaskTAFailedEvent(mockTask.getLastAttempt().getID()));
    failuresSent++;
  }
  assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());

  // Send kill for an attempt
  final TezTaskAttemptID lateKillId = mockTask.getLastAttempt().getID();
  mockTask.handle(createTaskTAKilledEvent(lateKillId));
  assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
}
/**
 * Ensures that registering further events with a task does not affect a list
 * previously returned by getTaskAttemptTezEvents.
 */
@Test(timeout = 5000)
public void testFetchedEventsModifyUnderlyingList() {
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  sendTezEventsToTask(id, 2);

  final TezTaskAttemptID firstAttemptId =
      mockTask.getAttemptList().iterator().next().getID();
  final List<TezEvent> earlierFetch = mockTask.getTaskAttemptTezEvents(firstAttemptId, 0, 100);
  assertEquals(2, earlierFetch.size());

  // Add more events: the earlier result must keep its size and stay
  // accessible without exceptions.
  sendTezEventsToTask(id, 4);
  assertEquals(2, earlierFetch.size());

  final List<TezEvent> laterFetch = mockTask.getTaskAttemptTezEvents(firstAttemptId, 0, 100);
  assertEquals(6, laterFetch.size());
}
/**
 * Checks that task progress matches the value set on the latest attempt, is 0
 * once that attempt is marked KILLED, and follows the replacement attempt
 * after the first attempt has failed.
 *
 * All checks use JUnit assertions rather than the {@code assert} keyword, which
 * is skipped when the JVM runs without -ea.
 */
@Test(timeout = 5000)
public void testTaskProgress() {
  LOG.info("--- START: testTaskProgress ---");
  // launch task
  TezTaskID taskId = getNewTaskID();
  scheduleTaskAttempt(taskId);
  float progress = 0f;
  // Delta 0: the original checks compared with ==, so require exact equality.
  assertEquals(progress, mockTask.getProgress(), 0f);
  launchTaskAttempt(mockTask.getLastAttempt().getID());

  // update attempt1
  progress = 50f;
  updateAttemptProgress(mockTask.getLastAttempt(), progress);
  assertEquals(progress, mockTask.getProgress(), 0f);
  progress = 100f;
  updateAttemptProgress(mockTask.getLastAttempt(), progress);
  assertEquals(progress, mockTask.getProgress(), 0f);

  progress = 0f;
  // mark first attempt as killed
  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.KILLED);
  assertEquals(progress, mockTask.getProgress(), 0f);

  // fail the first attempt; as there is no successful attempt this should
  // trigger a new attempt
  failRunningTaskAttempt(mockTask.getLastAttempt().getID());
  assertEquals(2, mockTask.getAttemptList().size());
  assertEquals(1, mockTask.failedAttempts);
  verify(mockTask.getVertex(), times(1)).incrementFailedTaskAttemptCount();
  assertEquals(0f, mockTask.getProgress(), 0f);

  launchTaskAttempt(mockTask.getLastAttempt().getID());
  progress = 50f;
  updateAttemptProgress(mockTask.getLastAttempt(), progress);
  assertEquals(progress, mockTask.getProgress(), 0f);
}
/**
 * The first attempt is allowed to commit but then fails; a second attempt is
 * created, is allowed to commit, and its success completes the task.
 */
@Test(timeout = 5000)
public void testFailureDuringTaskAttemptCommit() {
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
  final boolean firstMayCommit = mockTask.canCommit(mockTask.getLastAttempt().getID());
  assertTrue("First attempt should commit", firstMayCommit);

  // During the task attempt commit there is an exception which causes
  // the attempt to fail
  final TezTaskAttemptID failedId = mockTask.getLastAttempt().getID();
  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED);
  assertEquals(1, mockTask.getAttemptList().size());
  failRunningTaskAttempt(mockTask.getLastAttempt().getID());
  assertEquals(2, mockTask.getAttemptList().size());
  assertEquals(1, mockTask.failedAttempts);

  // last failed attempt should be the causal TA
  Assert.assertEquals(failedId, mockTask.getLastAttempt().getSchedulingCausalTA());
  final boolean firstMayStillCommit =
      mockTask.canCommit(mockTask.getAttemptList().get(0).getID());
  assertFalse("First attempt should not commit", firstMayStillCommit);

  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
  final boolean secondMayCommit = mockTask.canCommit(mockTask.getLastAttempt().getID());
  assertTrue("Second attempt should commit", secondMayCommit);

  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
  mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID()));
  assertTaskSucceededState();
}
/**
 * canCommit must return false while the task is still SCHEDULED and true once
 * the task is RUNNING with a running attempt.
 */
@Test(timeout = 5000)
public void testEventBacklogDuringTaskAttemptCommit() {
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  assertEquals(TaskState.SCHEDULED, mockTask.getState());

  // Simulate the task still being in scheduled state due to an event backlog
  // while the real task is done and calling canCommit.
  final boolean commitWhileScheduled = mockTask.canCommit(mockTask.getLastAttempt().getID());
  assertFalse("Commit should return false to make running task wait", commitWhileScheduled);

  launchTaskAttempt(mockTask.getLastAttempt().getID());
  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
  final boolean commitWhileRunning = mockTask.canCommit(mockTask.getLastAttempt().getID());
  assertTrue("Task state in AM is running now. Can commit.", commitWhileRunning);

  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
  mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID()));
  assertTaskSucceededState();
}
/**
 * With a speculative attempt running, the second attempt is the one allowed to
 * commit; after it fails, the first attempt is allowed to commit and its
 * success completes the task.
 */
@Test(timeout = 5000)
public void testChangeCommitTaskAttempt() {
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
  final TezTaskAttemptID originalId = mockTask.getLastAttempt().getID();

  // Add a speculative task attempt that succeeds
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
  assertEquals(2, mockTask.getAttemptList().size());

  // previous running attempt should be the causal TA of this speculative attempt
  Assert.assertEquals(originalId, mockTask.getLastAttempt().getSchedulingCausalTA());
  final boolean secondMayCommit =
      mockTask.canCommit(mockTask.getAttemptList().get(1).getID());
  assertTrue("Second attempt should commit", secondMayCommit);
  final boolean firstMayCommit =
      mockTask.canCommit(mockTask.getAttemptList().get(0).getID());
  assertFalse("First attempt should not commit", firstMayCommit);

  // During the task attempt commit there is an exception which causes
  // the second attempt to fail
  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED);
  failRunningTaskAttempt(mockTask.getLastAttempt().getID());
  assertEquals(2, mockTask.getAttemptList().size());
  final boolean secondMayCommitAfterFailure =
      mockTask.canCommit(mockTask.getAttemptList().get(1).getID());
  assertFalse("Second attempt should not commit", secondMayCommitAfterFailure);
  final boolean firstMayCommitAfterFailure =
      mockTask.canCommit(mockTask.getAttemptList().get(0).getID());
  assertTrue("First attempt should commit", firstMayCommitAfterFailure);

  updateAttemptState(mockTask.getAttemptList().get(0), TaskAttemptState.SUCCEEDED);
  mockTask.handle(createTaskTASucceededEvent(mockTask.getAttemptList().get(0).getID()));
  assertTaskSucceededState();
}
/**
 * A succeeded task whose successful attempt is later reported as failed
 * (caused by an output-failed event) goes back to SCHEDULED, emits
 * N_TA_ENDED first and V_TASK_RESCHEDULED last, and creates a second attempt
 * whose causal TA is the attempt that reported the output failure.
 */
@SuppressWarnings("rawtypes")
@Test(timeout = 5000)
public void testTaskSucceedAndRetroActiveFailure() {
  TezTaskID taskId = getNewTaskID();
  scheduleTaskAttempt(taskId);
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
  mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID()));

  // The task should now have succeeded
  assertTaskSucceededState();
  verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
      eq(mockTask.getLastAttempt().getID().getId()));

  // The single history event handled so far must be the task-finished event.
  ArgumentCaptor<DAGHistoryEvent> argumentCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
  verify(mockHistoryHandler).handle(argumentCaptor.capture());
  DAGHistoryEvent dagHistoryEvent = argumentCaptor.getValue();
  HistoryEvent historyEvent = dagHistoryEvent.getHistoryEvent();
  assertTrue(historyEvent instanceof TaskFinishedEvent);
  TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) historyEvent;
  // Expected value first: the task's own finish time is the reference and the
  // logged event is what is being checked.
  assertEquals(mockTask.getFinishTime(), taskFinishedEvent.getFinishTime());

  eventHandler.events.clear();

  // Now fail the attempt after it has succeeded
  TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
  TezEvent mockTezEvent = mock(TezEvent.class);
  EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
  when(mockTezEvent.getSourceInfo()).thenReturn(meta);
  TaskAttemptEventOutputFailed outputFailedEvent =
      new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
  mockTask.handle(
      createTaskTAFailedEvent(mockTask.getLastAttempt().getID(), TaskFailureType.NON_FATAL,
          outputFailedEvent));

  // The task should be back in the scheduled state
  assertTaskScheduledState();
  Event event = eventHandler.events.get(0);
  Assert.assertEquals(AMNodeEventType.N_TA_ENDED, event.getType());
  event = eventHandler.events.get(eventHandler.events.size() - 1);
  Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());

  // the attempt that reported the output read error should be the causal TA
  List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList();
  Assert.assertEquals(2, attempts.size());
  MockTaskAttemptImpl newAttempt = attempts.get(1);
  Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
}
/**
 * A succeeded task whose successful attempt is later reported as killed goes
 * back to SCHEDULED and emits V_TASK_RESCHEDULED as its first event.
 */
@SuppressWarnings("rawtypes")
@Test(timeout = 5000)
public void testTaskSucceedAndRetroActiveKilled() {
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  launchTaskAttempt(mockTask.getLastAttempt().getID());
  updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
  final TezTaskAttemptID succeededId = mockTask.getLastAttempt().getID();
  mockTask.handle(createTaskTASucceededEvent(succeededId));

  // The task should now have succeeded
  assertTaskSucceededState();
  verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(id),
      eq(mockTask.getLastAttempt().getID().getId()));

  // Drop the events produced so far so that only the kill's events are inspected.
  eventHandler.events.clear();

  // Now kill the attempt after it has succeeded
  final TezTaskAttemptID killedId = mockTask.getLastAttempt().getID();
  mockTask.handle(createTaskTAKilledEvent(killedId));

  // The task should be back in the scheduled state
  assertTaskScheduledState();
  final Event firstEvent = eventHandler.events.get(0);
  Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, firstEvent.getType());
}
/**
 * Terminating a task that was never scheduled records one diagnostic naming
 * the termination cause, logs a finished event and no started event.
 */
@Test(timeout = 5000)
public void testDiagnostics_KillNew(){
  final TezTaskID id = getNewTaskID();
  final TaskAttemptTerminationCause cause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT;
  mockTask.handle(new TaskEventTermination(id, cause, null));

  assertEquals(1, mockTask.getDiagnostics().size());
  final boolean mentionsCause = mockTask.getDiagnostics().get(0).contains(cause.name());
  assertTrue(mentionsCause);
  assertEquals(0, mockTask.taskStartedEventLogged);
  assertEquals(1, mockTask.taskFinishedEventLogged);
}
/**
 * Terminating a scheduled task records exactly one diagnostic, which names the
 * termination cause.
 */
@Test(timeout = 5000)
public void testDiagnostics_Kill(){
  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  final TaskAttemptTerminationCause cause = TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN;
  mockTask.handle(new TaskEventTermination(id, cause, null));

  assertEquals(1, mockTask.getDiagnostics().size());
  final boolean mentionsCause = mockTask.getDiagnostics().get(0).contains(cause.name());
  assertTrue(mentionsCause);
}
/**
 * With the max-failed-attempts setting at 1, the first attempt's failure fails
 * the task; a later failure of the speculative attempt leaves the task FAILED
 * and does not create further attempts.
 */
@Test(timeout = 20000)
public void testFailedThenSpeculativeFailed() {
  // Rebuild the task under test against a configuration with a limit of 1.
  conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
  final Vertex limitedVertex = mock(Vertex.class);
  doReturn(new VertexImpl.VertexConfigImpl(conf)).when(limitedVertex).getVertexConfig();
  mockTask = new MockTaskImpl(vertexId, partition,
      eventHandler, conf, taskCommunicatorManagerInterface, clock,
      taskHeartbeatHandler, appContext, leafVertex,
      taskResource, containerContext, limitedVertex);

  final TezTaskID id = getNewTaskID();
  scheduleTaskAttempt(id);
  final MockTaskAttemptImpl original = mockTask.getLastAttempt();
  launchTaskAttempt(original.getID());
  updateAttemptState(original, TaskAttemptState.RUNNING);

  // Add a speculative task attempt
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  final MockTaskAttemptImpl speculative = mockTask.getLastAttempt();
  launchTaskAttempt(speculative.getID());
  updateAttemptState(speculative, TaskAttemptState.RUNNING);
  assertEquals(2, mockTask.getAttemptList().size());

  // Fail the first attempt
  updateAttemptState(original, TaskAttemptState.FAILED);
  mockTask.handle(createTaskTAFailedEvent(original.getID()));
  assertEquals(TaskState.FAILED, mockTask.getState());
  assertEquals(2, mockTask.getAttemptList().size());

  // Now fail the speculative attempt
  updateAttemptState(speculative, TaskAttemptState.FAILED);
  mockTask.handle(createTaskTAFailedEvent(speculative.getID()));
  assertEquals(TaskState.FAILED, mockTask.getState());
  assertEquals(2, mockTask.getAttemptList().size());
}
/**
 * With the maximum number of failed attempts set to one, asserts that the task reports FAILED
 * once the original attempt fails and that a later success event from the speculative attempt
 * neither changes the reported state nor adds another attempt.
 */
@Test(timeout = 20000)
public void testFailedThenSpeculativeSucceeded() {
  // Rebuild the task under test against a vertex config that tolerates a single failure.
  conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
  Vertex singleFailureVertex = mock(Vertex.class);
  doReturn(new VertexImpl.VertexConfigImpl(conf)).when(singleFailureVertex).getVertexConfig();
  mockTask = new MockTaskImpl(vertexId, partition, eventHandler, conf,
      taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, appContext, leafVertex,
      taskResource, containerContext, singleFailureVertex);

  // Original attempt: scheduled, launched, marked running.
  TezTaskID scheduledTaskId = getNewTaskID();
  scheduleTaskAttempt(scheduledTaskId);
  MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(originalAttempt.getID());
  updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

  // Speculative attempt: requested, launched, marked running.
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  MockTaskAttemptImpl speculativeAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(speculativeAttempt.getID());
  updateAttemptState(speculativeAttempt, TaskAttemptState.RUNNING);
  assertEquals(2, mockTask.getAttemptList().size());

  // Failure of the original attempt.
  updateAttemptState(originalAttempt, TaskAttemptState.FAILED);
  mockTask.handle(createTaskTAFailedEvent(originalAttempt.getID()));
  assertEquals(TaskState.FAILED, mockTask.getState());
  assertEquals(2, mockTask.getAttemptList().size());

  // A late success of the speculative attempt must not change the reported state.
  updateAttemptState(speculativeAttempt, TaskAttemptState.SUCCEEDED);
  mockTask.handle(createTaskTASucceededEvent(speculativeAttempt.getID()));
  assertEquals(TaskState.FAILED, mockTask.getState());
  assertEquals(2, mockTask.getAttemptList().size());
}
/**
 * Simulates a speculation request that is processed after the running attempt was killed:
 * the add-speculative-attempt event is handled while the mock attempts report a null node id,
 * and the attempt obtained afterwards is driven to success. Asserts that the task ends up
 * SUCCEEDED and that three attempts exist.
 */
@Test
public void testKilledBeforeSpeculatedSucceeded() {
  // Rebuild the task under test against a vertex config that tolerates a single failure.
  conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
  Vertex singleFailureVertex = mock(Vertex.class);
  doReturn(new VertexImpl.VertexConfigImpl(conf)).when(singleFailureVertex).getVertexConfig();
  mockTask = new MockTaskImpl(vertexId, partition, eventHandler, conf,
      taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, appContext, leafVertex,
      taskResource, containerContext, singleFailureVertex);

  // Launch the original attempt, then report it killed; the task must stay RUNNING internally.
  TezTaskID scheduledTaskId = getNewTaskID();
  scheduleTaskAttempt(scheduledTaskId);
  MockTaskAttemptImpl killedAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(killedAttempt.getID());
  updateAttemptState(killedAttempt, TaskAttemptState.RUNNING);
  mockTask.handle(createTaskTAKilledEvent(killedAttempt.getID()));
  assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());

  // The node id returned by the mock attempts is manually blanked out while the speculation
  // request is handled; this is the condition that would induce an NPE in the state machine
  // transition for the killed-before-speculated scenario. It is restored right afterwards.
  NodeId savedNodeId = mockNodeId;
  mockNodeId = null;
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  mockNodeId = savedNodeId;

  MockTaskAttemptImpl speculativeAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(speculativeAttempt.getID());
  updateAttemptState(speculativeAttempt, TaskAttemptState.RUNNING);
  assertEquals(3, mockTask.getAttemptList().size());

  // Success of the speculative attempt completes the task without creating further attempts.
  updateAttemptState(speculativeAttempt, TaskAttemptState.SUCCEEDED);
  mockTask.handle(createTaskTASucceededEvent(speculativeAttempt.getID()));
  assertEquals(TaskState.SUCCEEDED, mockTask.getState());
  assertEquals(3, mockTask.getAttemptList().size());
}
/**
 * Checks the outgoing events around a speculative attempt that loses the race. When the
 * original attempt succeeds, a DAG scheduler update, a task-completed and an
 * attempt-completed event must be present and the last event must be a kill request for the
 * speculative attempt. When that attempt is then reported killed, a DAG scheduler update and
 * an attempt-completed event must be present again and the task must still be succeeded.
 */
@Test(timeout = 20000)
public void testKilledAttemptUpdatesDAGScheduler() {
  TezTaskID scheduledTaskId = getNewTaskID();
  scheduleTaskAttempt(scheduledTaskId);
  MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(originalAttempt.getID());
  updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

  // Request a speculative attempt alongside the running original.
  mockTask.handle(createTaskTAAddSpecAttempt(originalAttempt.getID()));
  MockTaskAttemptImpl speculativeAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(speculativeAttempt.getID());
  updateAttemptState(speculativeAttempt, TaskAttemptState.RUNNING);
  assertEquals(2, mockTask.getAttemptList().size());

  // Let the original attempt succeed; only events emitted from here on are inspected.
  eventHandler.events.clear();
  mockTask.handle(createTaskTASucceededEvent(originalAttempt.getID()));
  verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE,
      VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED);

  // The task is now succeeded and the notifier was told which attempt won.
  assertTaskSucceededState();
  verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(scheduledTaskId),
      eq(originalAttempt.getID().getId()));

  // The most recent outgoing event is the kill request aimed at the speculative attempt.
  @SuppressWarnings("rawtypes")
  Event killRequest = eventHandler.events.get(eventHandler.events.size() - 1);
  assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, killRequest.getType());
  assertEquals(speculativeAttempt.getID(),
      ((TaskAttemptEventKillRequest) killRequest).getTaskAttemptID());

  // Report the speculative attempt as killed and inspect the events this produces.
  eventHandler.events.clear();
  mockTask.handle(createTaskTAKilledEvent(speculativeAttempt.getID()));
  assertTaskSucceededState();
  verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE,
      VertexEventType.V_TASK_ATTEMPT_COMPLETED);
}
/**
 * After the original attempt has succeeded and the speculative attempt has been killed, fails
 * the original attempt retroactively with an output-failed cause. Asserts that the task goes
 * back to the scheduled state, that the last outgoing event is V_TASK_RESCHEDULED and that a
 * third attempt exists whose scheduling causal attempt is the id carried by the output-failed
 * event.
 */
@Test(timeout = 20000)
public void testSpeculatedThenRetroactiveFailure() {
  TezTaskID scheduledTaskId = getNewTaskID();
  scheduleTaskAttempt(scheduledTaskId);
  MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(originalAttempt.getID());
  updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

  // Request a speculative attempt alongside the running original.
  mockTask.handle(createTaskTAAddSpecAttempt(originalAttempt.getID()));
  MockTaskAttemptImpl speculativeAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(speculativeAttempt.getID());
  updateAttemptState(speculativeAttempt, TaskAttemptState.RUNNING);
  assertEquals(2, mockTask.getAttemptList().size());

  // Let the original attempt succeed.
  eventHandler.events.clear();
  mockTask.handle(createTaskTASucceededEvent(originalAttempt.getID()));

  // The task is now succeeded and a kill request went out for the speculative attempt.
  assertTaskSucceededState();
  verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(scheduledTaskId),
      eq(originalAttempt.getID().getId()));
  @SuppressWarnings("rawtypes")
  Event killRequest = eventHandler.events.get(eventHandler.events.size() - 1);
  assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, killRequest.getType());
  assertEquals(speculativeAttempt.getID(),
      ((TaskAttemptEventKillRequest) killRequest).getTaskAttemptID());

  // Report the speculative attempt as killed; the task stays succeeded.
  mockTask.handle(createTaskTAKilledEvent(speculativeAttempt.getID()));
  assertTaskSucceededState();

  // Build the output-failed event that serves as the cause of the retroactive failure.
  TezTaskAttemptID reportingAttemptId = mock(TezTaskAttemptID.class);
  TezEvent sourceEvent = mock(TezEvent.class);
  EventMetaData sourceInfo =
      new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", reportingAttemptId);
  when(sourceEvent.getSourceInfo()).thenReturn(sourceInfo);
  TaskAttemptEventOutputFailed outputFailure =
      new TaskAttemptEventOutputFailed(reportingAttemptId, sourceEvent, 1);

  // Fail the attempt that had already succeeded.
  eventHandler.events.clear();
  mockTask.handle(
      createTaskTAFailedEvent(originalAttempt.getID(), TaskFailureType.NON_FATAL, outputFailure));

  // The task is expected back in the scheduled state, announced by a reschedule event.
  assertTaskScheduledState();
  @SuppressWarnings("rawtypes")
  Event rescheduleNotice = eventHandler.events.get(eventHandler.events.size() - 1);
  Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, rescheduleNotice.getType());

  // A new attempt must exist, caused by the attempt that reported the output read error.
  List<MockTaskAttemptImpl> allAttempts = mockTask.getAttemptList();
  Assert.assertEquals(3, allAttempts.size());
  MockTaskAttemptImpl rescheduledAttempt = allAttempts.get(2);
  Assert.assertEquals(reportingAttemptId, rescheduledAttempt.getSchedulingCausalTA());
}
/**
 * Asserts that a request to add a speculative attempt is ignored once the original attempt
 * has been marked succeeded and has handled TA_DONE: the task must still own one attempt.
 */
@Test(timeout = 20000)
public void testIgnoreSpeculationOnSuccessfulOriginalAttempt() {
  TezTaskID scheduledTaskId = getNewTaskID();
  scheduleTaskAttempt(scheduledTaskId);
  MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(originalAttempt.getID());

  // Mark the original attempt succeeded and deliver TA_DONE to it.
  updateAttemptState(originalAttempt, TaskAttemptState.SUCCEEDED);
  originalAttempt.handle(
      new TaskAttemptEvent(originalAttempt.getID(), TaskAttemptEventType.TA_DONE));

  // The speculation request must not result in a second attempt.
  mockTask.handle(createTaskTAAddSpecAttempt(originalAttempt.getID()));
  MockTaskAttemptImpl latestAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(latestAttempt.getID());
  assertEquals(1, mockTask.getAttemptList().size());
}
/**
 * Asserts that a request to add a speculative attempt is ignored after {@code canCommit} has
 * been invoked for the running original attempt: the task must still own one attempt.
 */
@Test(timeout = 20000)
public void testIgnoreSpeculationAfterOriginalAttemptCommit() {
  TezTaskID scheduledTaskId = getNewTaskID();
  scheduleTaskAttempt(scheduledTaskId);
  MockTaskAttemptImpl originalAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(originalAttempt.getID());
  updateAttemptState(originalAttempt, TaskAttemptState.RUNNING);

  // Simulate the original attempt asking for permission to commit.
  mockTask.canCommit(originalAttempt.getID());

  // The speculation request must not result in a second attempt.
  mockTask.handle(createTaskTAAddSpecAttempt(originalAttempt.getID()));
  MockTaskAttemptImpl latestAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(latestAttempt.getID());
  assertEquals(1, mockTask.getAttemptList().size());
}
/**
 * Drives an original and a speculative attempt to success, with the speculative attempt's
 * success reported to the task first, and then fails each of them retroactively through
 * {@code failAttempt}. Asserts that failing the attempt whose success arrived second leaves
 * the task succeeded, whereas failing the attempt whose success arrived first puts the task
 * back into the scheduled state with one uncompleted attempt.
 *
 * @throws InterruptedException declared by the original signature; nothing visible here
 *     throws it
 */
@SuppressWarnings("rawtypes")
@Test
public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException {
  TezTaskID taskId = getNewTaskID();
  scheduleTaskAttempt(taskId);
  MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
  launchTaskAttempt(firstMockTaskAttempt.getID());
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
  launchTaskAttempt(secondMockTaskAttempt.getID());

  // Walk both attempts through schedule -> submitted -> started remotely -> done.
  firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
  secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
  firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
  firstMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
  secondMockTaskAttempt.handle(
      new TaskAttemptEvent(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
          TaskAttemptEventType.TA_DONE));
  firstMockTaskAttempt.handle(
      new TaskAttemptEvent(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
          TaskAttemptEventType.TA_DONE));

  // The task hears about the second (speculative) attempt's success before the first one's.
  mockTask.handle(new TaskEventTASucceeded(secondMockTaskAttempt.getID()));
  mockTask.handle(new TaskEventTASucceeded(firstMockTaskAttempt.getID()));

  // assertEquals (rather than assertTrue on ==) so a failure reports the actual state.
  assertEquals("Attempts should have succeeded!",
      TaskAttemptStateInternal.SUCCEEDED, firstMockTaskAttempt.getInternalState());
  assertEquals("Attempts should have succeeded!",
      TaskAttemptStateInternal.SUCCEEDED, secondMockTaskAttempt.getInternalState());
  assertEquals("Task should have no uncompleted attempts!", 0,
      mockTask.getUncompletedAttemptsCount());
  assertEquals("Task should have Succeeded!", TaskState.SUCCEEDED, mockTask.getState());

  // Failing the attempt that finished after the task was marked succeeded should not
  // schedule another attempt.
  failAttempt(firstMockTaskAttempt, 0, 0);
  assertTaskSucceededState();

  // Failing the attempt that allowed the task to succeed should schedule another attempt.
  failAttempt(secondMockTaskAttempt, 1, 1);
  assertTaskScheduledState();
}
/**
 * With a vertex config limited to one failed attempt, starts an original and a speculative
 * attempt, fails both and reports both failures to the task. Asserts that both attempts end
 * in the internal FAILED state, that no uncompleted attempt remains and that the task
 * reports FAILED.
 *
 * @throws InterruptedException declared by the original signature; nothing visible here
 *     throws it
 */
@SuppressWarnings("rawtypes")
@Test
public void testFailedAttemptStatus() throws InterruptedException {
  // NOTE(review): the task itself receives `conf`; the one-failure limit set on `newConf`
  // reaches it only through the mocked vertex config - confirm this is intended.
  Configuration newConf = new Configuration(conf);
  newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
  Vertex vertex = mock(Vertex.class);
  doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
  mockTask = new MockTaskImpl(vertexId, partition,
      eventHandler, conf, taskCommunicatorManagerInterface, clock,
      taskHeartbeatHandler, appContext, leafVertex,
      taskResource, containerContext, vertex);

  TezTaskID taskId = getNewTaskID();
  scheduleTaskAttempt(taskId);
  MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
  launchTaskAttempt(firstMockTaskAttempt.getID());
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
  launchTaskAttempt(secondMockTaskAttempt.getID());

  // Walk both attempts through schedule -> submitted -> started remotely.
  firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
  secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
  firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
  firstMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));

  // Fail both attempts, then deliver the container-terminated events.
  secondMockTaskAttempt.handle(
      new TaskAttemptEventAttemptFailed(
          TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
          TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
          TaskAttemptTerminationCause.NO_PROGRESS));
  firstMockTaskAttempt.handle(
      new TaskAttemptEventAttemptFailed(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
          TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
          TaskAttemptTerminationCause.NO_PROGRESS));
  firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
      firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
  secondMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
      secondMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));

  // Report both failures to the task, speculative attempt first.
  mockTask.handle(new TaskEventTAFailed(secondMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
      mock(TaskAttemptEvent.class)));
  mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
      mock(TaskAttemptEvent.class)));

  // assertEquals (rather than assertTrue on ==) so a failure reports the actual state.
  assertEquals("Attempts should have failed!",
      TaskAttemptStateInternal.FAILED, firstMockTaskAttempt.getInternalState());
  assertEquals("Attempts should have failed!",
      TaskAttemptStateInternal.FAILED, secondMockTaskAttempt.getInternalState());
  assertEquals("Task should have no uncompleted attempts!", 0,
      mockTask.getUncompletedAttemptsCount());
  assertEquals("Task should have failed!", TaskState.FAILED, mockTask.getState());
}
/**
 * Builds a task with the leaf-vertex flag set to {@code true} and a vertex config limited to
 * one failed attempt. The speculative attempt completes and its success is reported to the
 * task, while the original attempt fails and additionally receives an output-failed event.
 * Asserts that the task's internal state is still SUCCEEDED after the original attempt's
 * failure has been reported to it.
 *
 * @throws InterruptedException declared by the original signature; nothing visible here
 *     throws it
 */
@SuppressWarnings("rawtypes")
@Test (timeout = 10000L)
public void testSucceededLeafTaskWithRetroFailures() throws InterruptedException {
  Configuration newConf = new Configuration(conf);
  newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
  Vertex vertex = mock(Vertex.class);
  doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
  // Unlike the sibling tests, the leaf-vertex argument is hard-wired to true here.
  mockTask = new MockTaskImpl(vertexId, partition,
      eventHandler, conf, taskCommunicatorManagerInterface, clock,
      taskHeartbeatHandler, appContext, true,
      taskResource, containerContext, vertex);

  TezTaskID taskId = getNewTaskID();
  scheduleTaskAttempt(taskId);
  MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
  launchTaskAttempt(firstMockTaskAttempt.getID());
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
  launchTaskAttempt(secondMockTaskAttempt.getID());

  // Walk both attempts through schedule -> submitted -> started remotely.
  firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
  secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
  firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
  firstMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));

  // The speculative attempt finishes; the original attempt fails.
  secondMockTaskAttempt.handle(
      new TaskAttemptEvent(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
          TaskAttemptEventType.TA_DONE));
  firstMockTaskAttempt.handle(
      new TaskAttemptEventAttemptFailed(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
          TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
          TaskAttemptTerminationCause.CONTAINER_EXITED));
  mockTask.handle(new TaskEventTASucceeded(secondMockTaskAttempt.getID()));
  firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
      firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));

  // Deliver an output-failed event to the already failed original attempt.
  InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 0);
  TezTaskAttemptID mockDestId = firstMockTaskAttempt.getID();
  EventMetaData meta =
      new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
  TezEvent tzEvent = new TezEvent(mockReEvent, meta);
  TaskAttemptEventOutputFailed outputFailedEvent =
      new TaskAttemptEventOutputFailed(mockDestId, tzEvent, 1);
  firstMockTaskAttempt.handle(outputFailedEvent);

  // Report the original attempt's failure to the task; it must remain succeeded.
  mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
      mock(TaskAttemptEvent.class)));
  // Expected value first, so a failure message names the right side as "expected".
  Assert.assertEquals(TaskStateInternal.SUCCEEDED, mockTask.getInternalState());
}
/**
 * Sends an output-failed event to {@code taskAttempt}, reports the resulting non-fatal
 * failure to {@code mockTask} and asserts the task's number of uncompleted attempts.
 *
 * @param taskAttempt the attempt to fail
 * @param index passed as the third argument of {@code InputReadErrorEvent.create}
 * @param expectedIncompleteAttempts the uncompleted-attempt count expected afterwards
 */
private void failAttempt(MockTaskAttemptImpl taskAttempt, int index, int expectedIncompleteAttempts) {
  // Assemble the read-error report, attributed to a mocked consumer attempt id.
  InputReadErrorEvent readError = InputReadErrorEvent.create("", 0, index);
  TezTaskAttemptID reporterId = mock(TezTaskAttemptID.class);
  EventMetaData sourceInfo =
      new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", reporterId);
  TaskAttemptEventOutputFailed outputFailure =
      new TaskAttemptEventOutputFailed(reporterId, new TezEvent(readError, sourceInfo), 1);

  // First the attempt learns about the failure, then the owning task.
  taskAttempt.handle(outputFailure);
  mockTask.handle(
      new TaskEventTAFailed(taskAttempt.getID(), TaskFailureType.NON_FATAL, outputFailure));

  assertEquals("Unexpected number of incomplete attempts!",
      expectedIncompleteAttempts, mockTask.getUncompletedAttemptsCount());
}
/**
 * With a vertex config limited to one failed attempt, fails both the original and the
 * speculative attempt so that the task reports FAILED, and then delivers an
 * add-speculative-attempt event and an attempt-launched event to the failed task. No
 * assertion follows those last two events; presumably the test passes as long as handling
 * them does not end in {@code MockTaskImpl.internalError}, which calls {@code fail}.
 *
 * @throws InterruptedException declared by the original signature; nothing visible here
 *     throws it
 */
@Test (timeout = 30000)
public void testFailedTaskTransitionWithLaunchedAttempt() throws InterruptedException {
  Configuration newConf = new Configuration(conf);
  newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
  Vertex vertex = mock(Vertex.class);
  doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
  mockTask = new MockTaskImpl(vertexId, partition,
      eventHandler, conf, taskCommunicatorManagerInterface, clock,
      taskHeartbeatHandler, appContext, leafVertex,
      taskResource, containerContext, vertex);

  TezTaskID taskId = getNewTaskID();
  scheduleTaskAttempt(taskId);
  MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(firstMockTaskAttempt.getID());
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(secondMockTaskAttempt.getID());

  // Walk both attempts through schedule -> submitted -> started remotely.
  firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
  secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
  firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
  firstMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));

  // Fail both attempts, then deliver the container-terminated events.
  secondMockTaskAttempt.handle(
      new TaskAttemptEventAttemptFailed(
          TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
          TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
          TaskAttemptTerminationCause.NO_PROGRESS));
  firstMockTaskAttempt.handle(
      new TaskAttemptEventAttemptFailed(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
          TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
          TaskAttemptTerminationCause.NO_PROGRESS));
  firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
      firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
  secondMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
      secondMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));

  // Report both failures to the task, speculative attempt first.
  mockTask.handle(new TaskEventTAFailed(secondMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
      mock(TaskAttemptEvent.class)));
  mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
      mock(TaskAttemptEvent.class)));

  // assertEquals (rather than assertTrue on ==) so a failure reports the actual state.
  assertEquals("Attempts should have failed!",
      TaskAttemptStateInternal.FAILED, firstMockTaskAttempt.getInternalState());
  assertEquals("Attempts should have failed!",
      TaskAttemptStateInternal.FAILED, secondMockTaskAttempt.getInternalState());
  assertEquals("Task should have no uncompleted attempts!", 0,
      mockTask.getUncompletedAttemptsCount());
  assertEquals("Task should have failed!", TaskState.FAILED, mockTask.getState());

  // Late events delivered to the already failed task.
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  MockTaskAttemptImpl thirdMockTaskAttempt = mockTask.getLastAttempt();
  mockTask.handle(createTaskTALauncherEvent(thirdMockTaskAttempt.getID()));
}
/**
 * Terminates a task that has an original and a speculative attempt started remotely, reports
 * both attempts as killed and asserts the task's internal state is KILLED. Afterwards it
 * delivers add-speculative-attempt, attempt-launched, attempt-succeeded and attempt-failed
 * events to the killed task. No assertion follows those late events; presumably the test
 * passes as long as handling them does not end in {@code MockTaskImpl.internalError}, which
 * calls {@code fail}.
 *
 * @throws InterruptedException declared by the original signature; nothing visible here
 *     throws it
 */
@Test (timeout = 30000)
public void testKilledTaskTransitionWithLaunchedAttempt() throws InterruptedException {
  TezTaskID taskId = getNewTaskID();
  scheduleTaskAttempt(taskId);
  MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(firstMockTaskAttempt.getID());
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getLastAttempt();
  launchTaskAttempt(secondMockTaskAttempt.getID());

  // Walk both attempts through schedule -> submitted -> started remotely.
  firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
  secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
  firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
      TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
  secondMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
  firstMockTaskAttempt.handle(
      new TaskAttemptEventStartedRemotely(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));

  // Terminate the task, then kill each attempt and report each kill to the task.
  mockTask.handle(new TaskEventTermination(mockTask.getTaskId(),
      TaskAttemptTerminationCause.FRAMEWORK_ERROR, "test"));
  secondMockTaskAttempt.handle(
      new TaskAttemptEventAttemptKilled(
          TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), "test",
          TaskAttemptTerminationCause.FRAMEWORK_ERROR));
  mockTask.handle(new TaskEventTAKilled(secondMockTaskAttempt.getID(),
      new TaskAttemptEvent(secondMockTaskAttempt.getID(), TaskAttemptEventType.TA_KILLED)));
  firstMockTaskAttempt.handle(
      new TaskAttemptEventAttemptKilled(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), "test",
          TaskAttemptTerminationCause.FRAMEWORK_ERROR));
  mockTask.handle(new TaskEventTAKilled(firstMockTaskAttempt.getID(),
      new TaskAttemptEvent(firstMockTaskAttempt.getID(), TaskAttemptEventType.TA_KILLED)));
  // The first attempt receives the kill event a second time, as in the original sequence.
  firstMockTaskAttempt.handle(
      new TaskAttemptEventAttemptKilled(
          TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), "test",
          TaskAttemptTerminationCause.FRAMEWORK_ERROR));

  // Expected value first, so a failure message names the right side as "expected".
  assertEquals("Task should have been killed!",
      TaskStateInternal.KILLED, mockTask.getInternalState());

  // Late events delivered to the already killed task.
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  MockTaskAttemptImpl thirdMockTaskAttempt = mockTask.getLastAttempt();
  mockTask.handle(createTaskTALauncherEvent(thirdMockTaskAttempt.getID()));
  mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
  MockTaskAttemptImpl fourthMockTaskAttempt = mockTask.getLastAttempt();
  mockTask.handle(createTaskTASucceededEvent(fourthMockTaskAttempt.getID()));
  // NOTE(review): no attempt is visibly added between these two getLastAttempt() calls, so
  // "fifth" presumably refers to the same attempt as "fourth" - confirm this is intended.
  MockTaskAttemptImpl fifthMockTaskAttempt = mockTask.getLastAttempt();
  mockTask.handle(createTaskTAFailedEvent(fifthMockTaskAttempt.getID()));
}
// TODO Add test to validate the correct commit attempt.
/**
 * Asserts that every type in {@code expectedTypes} is matched by a distinct event in
 * {@code events}. Events of other types are tolerated, so this does not prove that the
 * expected types are the only ones emitted.
 *
 * @param events the outgoing events to inspect
 * @param expectedTypes the event types that must each be matched by one event
 */
private void verifyOutgoingEvents(List<Event> events,
    Enum<?>... expectedTypes) {
  // Expected types that have not been matched by an event yet.
  List<Enum<?>> unmatched = new LinkedList<Enum<?>>();
  for (int i = 0; i < expectedTypes.length; i++) {
    unmatched.add(expectedTypes[i]);
  }

  // Each event cancels at most one pending expectation: the first entry equal to its type
  // (enum equality is identity, so this matches an == scan over the list).
  for (Event outgoing : events) {
    unmatched.remove(outgoing.getType());
  }

  assertTrue("Did not find types : " + unmatched
      + " in outgoing event list", unmatched.isEmpty());
}
/**
 * {@link TaskImpl} subclass used by the tests in this file. It creates
 * {@link MockTaskAttemptImpl} attempts and remembers them in creation order, counts calls to
 * the job-history logging hooks instead of (or in addition to) the real logging, returns the
 * vertex it was constructed with, and turns any internal error into a test failure.
 */
@SuppressWarnings("rawtypes")
private class MockTaskImpl extends TaskImpl {
  /** Number of times {@link #logJobHistoryTaskStartedEvent()} has been invoked. */
  public int taskStartedEventLogged = 0;
  /** Number of times either the finished or the failed history hook has been invoked. */
  public int taskFinishedEventLogged = 0;
  /** Every attempt created through {@link #createAttempt}, in creation order. */
  private final List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
  /** The vertex handed to the constructor; returned by {@link #getVertex()}. */
  private final Vertex vertex;

  /**
   * Forwards all arguments to the {@link TaskImpl} constructor, supplying a Mockito mock as
   * the {@code StateChangeNotifier}, and keeps a reference to {@code vertex}.
   */
  public MockTaskImpl(TezVertexID vertexId, int partition,
      EventHandler eventHandler, Configuration conf,
      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock,
      TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex,
      Resource resource,
      ContainerContext containerContext, Vertex vertex) {
    super(vertexId, partition, eventHandler, conf, taskCommunicatorManagerInterface,
        clock, thh, appContext, leafVertex, resource,
        containerContext, mock(StateChangeNotifier.class), vertex);
    this.vertex = vertex;
  }

  /**
   * Creates a {@link MockTaskAttemptImpl} for the given attempt number (always flagged as
   * rescheduled) and records it so tests can retrieve it later.
   */
  @Override
  protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
    MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(
        TezBuilderUtils.newTaskAttemptId(getTaskId(), attemptNumber),
        eventHandler, taskCommunicatorManagerInterface,
        conf, clock, taskHeartbeatHandler, appContext,
        true, taskResource, containerContext, schedCausalTA);
    taskAttempts.add(attempt);
    return attempt;
  }

  /** Delegates to the superclass and then fails the running test. */
  @Override
  protected void internalError(TaskEventType type) {
    super.internalError(type);
    fail("Internal error: " + type);
  }

  /**
   * Returns the most recently created attempt.
   *
   * @throws IndexOutOfBoundsException if no attempt has been created yet
   */
  MockTaskAttemptImpl getLastAttempt() {
    return taskAttempts.get(taskAttempts.size() - 1);
  }

  /** Returns the live list of created attempts (not a copy). */
  List<MockTaskAttemptImpl> getAttemptList() {
    return taskAttempts;
  }

  @Override
  public Vertex getVertex() {
    return vertex;
  }

  /** Only counts the call; the superclass implementation is not invoked. */
  @Override
  protected void logJobHistoryTaskStartedEvent() {
    taskStartedEventLogged++;
  }

  /** Invokes the superclass implementation and counts the call. */
  @Override
  protected void logJobHistoryTaskFinishedEvent() {
    super.logJobHistoryTaskFinishedEvent();
    taskFinishedEventLogged++;
  }

  /** Counts the call as a finished event; the superclass implementation is not invoked. */
  @Override
  protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
    taskFinishedEventLogged++;
  }
}
/**
 * {@link TaskAttemptImpl} subclass whose externally visible state and progress are plain
 * fields that tests set directly, and whose vertex, container id and node id come from the
 * enclosing test's fixtures.
 */
@SuppressWarnings("rawtypes")
public class MockTaskAttemptImpl extends TaskAttemptImpl {
  /** State reported by {@link #getState()} and {@link #getStateNoLock()}. */
  private TaskAttemptState state = TaskAttemptState.NEW;
  /** Progress reported by {@link #getProgress()}. */
  private float progress = 0;

  /**
   * Forwards the arguments to the {@link TaskAttemptImpl} constructor, filling the remaining
   * parameters with {@code false} and the enclosing test's {@code mockTask},
   * {@code locationHint} and {@code mockTaskSpec}.
   */
  public MockTaskAttemptImpl(TezTaskAttemptID attemptId,
      EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf,
      Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
      boolean isRescheduled,
      Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
    super(attemptId, eventHandler, tal, conf, clock, thh,
        appContext, isRescheduled, resource, containerContext, false, mockTask,
        locationHint, mockTaskSpec, schedCausalTA);
  }

  /** Overrides the state that this attempt reports. */
  public void setState(TaskAttemptState state) {
    this.state = state;
  }

  @Override
  public TaskAttemptState getState() {
    return this.state;
  }

  @Override
  public TaskAttemptState getStateNoLock() {
    return this.state;
  }

  /** Overrides the progress that this attempt reports. */
  public void setProgress(float progress) {
    this.progress = progress;
  }

  @Override
  public float getProgress() {
    return this.progress;
  }

  /** Returns the enclosing test's {@code mockVertex}. */
  @Override
  protected Vertex getVertex() {
    return mockVertex;
  }

  /** Returns the enclosing test's {@code mockContainerId}. */
  @Override
  public ContainerId getAssignedContainerID() {
    return mockContainerId;
  }

  /** Returns the enclosing test's current {@code mockNodeId}, which tests may set to null. */
  @Override
  public NodeId getNodeId() {
    return mockNodeId;
  }
}
/**
 * Event of type {@code TA_KILLED} whose termination cause is always {@code SERVICE_BUSY}.
 */
public class ServiceBusyEvent
    extends TezAbstractEvent<TaskAttemptEventType>
    implements TaskAttemptEventTerminationCauseEvent {

  /** Creates the event with type {@link TaskAttemptEventType#TA_KILLED}. */
  public ServiceBusyEvent() {
    super(TaskAttemptEventType.TA_KILLED);
  }

  /** Returns {@link TaskAttemptTerminationCause#SERVICE_BUSY}. */
  @Override
  public TaskAttemptTerminationCause getTerminationCause() {
    return TaskAttemptTerminationCause.SERVICE_BUSY;
  }
}
}