| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.dag.impl; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Mockito.RETURNS_DEEP_STUBS; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.event.DrainDispatcher; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.tez.common.counters.TezCounters; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.client.VertexStatus.State; |
| import org.apache.tez.dag.api.oldrecords.TaskAttemptState; |
| import org.apache.tez.dag.api.oldrecords.TaskState; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.ContainerContext; |
| import org.apache.tez.dag.app.TaskAttemptListener; |
| import org.apache.tez.dag.app.TaskHeartbeatHandler; |
| import org.apache.tez.dag.app.dag.StateChangeNotifier; |
| import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; |
| import org.apache.tez.dag.app.dag.TaskStateInternal; |
| import org.apache.tez.dag.app.dag.Vertex; |
| import org.apache.tez.dag.app.dag.event.DAGEventType; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; |
| import org.apache.tez.dag.app.dag.event.TaskEvent; |
| import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; |
| import org.apache.tez.dag.app.dag.event.TaskEventType; |
| import org.apache.tez.dag.app.dag.event.VertexEventType; |
| import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; |
| import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; |
| import org.apache.tez.dag.history.events.TaskFinishedEvent; |
| import org.apache.tez.dag.history.events.TaskStartedEvent; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezTaskID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.apache.tez.runtime.api.OutputCommitter; |
| import org.apache.tez.runtime.api.OutputCommitterContext; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import com.google.common.collect.Lists; |
| |
| public class TestTaskRecovery { |
| |
| private TaskImpl task; |
| private DrainDispatcher dispatcher; |
| |
| private int taskAttemptCounter = 0; |
| |
| private Configuration conf = new Configuration(); |
| private AppContext mockAppContext; |
| private ApplicationId appId = ApplicationId.newInstance( |
| System.currentTimeMillis(), 1); |
| private TezDAGID dagId = TezDAGID.getInstance(appId, 1); |
| private TezVertexID vertexId = TezVertexID.getInstance(dagId, 1); |
| private Vertex vertex; |
| private String vertexName = "v1"; |
| private long taskScheduledTime = 100L; |
| private long taskStartTime = taskScheduledTime + 100L; |
| private long taskFinishTime = taskStartTime + 100L; |
| private TaskAttemptEventHandler taEventHandler = |
| new TaskAttemptEventHandler(); |
| |
| private class TaskEventHandler implements EventHandler<TaskEvent> { |
| @Override |
| public void handle(TaskEvent event) { |
| task.handle(event); |
| } |
| } |
| |
| private class TaskAttemptEventHandler implements |
| EventHandler<TaskAttemptEvent> { |
| |
| private List<TaskAttemptEvent> events = Lists.newArrayList(); |
| |
| @Override |
| public void handle(TaskAttemptEvent event) { |
| events.add(event); |
| ((TaskAttemptImpl) task.getAttempt(event.getTaskAttemptID())) |
| .handle(event); |
| } |
| |
| public List<TaskAttemptEvent> getEvents() { |
| return events; |
| } |
| } |
| |
| private class TestOutputCommitter extends OutputCommitter { |
| |
| boolean recoverySupported = false; |
| boolean throwExceptionWhenRecovery = false; |
| |
| public TestOutputCommitter(OutputCommitterContext committerContext, |
| boolean recoverySupported, boolean throwExceptionWhenRecovery) { |
| super(committerContext); |
| this.recoverySupported = recoverySupported; |
| this.throwExceptionWhenRecovery = throwExceptionWhenRecovery; |
| } |
| |
| @Override |
| public void recoverTask(int taskIndex, int previousDAGAttempt) |
| throws Exception { |
| if (throwExceptionWhenRecovery) { |
| throw new Exception("fail recovery Task"); |
| } |
| } |
| |
| @Override |
| public boolean isTaskRecoverySupported() { |
| return recoverySupported; |
| } |
| |
| @Override |
| public void initialize() throws Exception { |
| |
| } |
| |
| @Override |
| public void setupOutput() throws Exception { |
| |
| } |
| |
| @Override |
| public void commitOutput() throws Exception { |
| |
| } |
| |
| @Override |
| public void abortOutput(State finalState) throws Exception { |
| |
| } |
| |
| } |
| |
| @Before |
| public void setUp() { |
| dispatcher = new DrainDispatcher(); |
| dispatcher.register(DAGEventType.class, mock(EventHandler.class)); |
| dispatcher.register(VertexEventType.class, mock(EventHandler.class)); |
| dispatcher.register(TaskEventType.class, new TaskEventHandler()); |
| dispatcher.register(TaskAttemptEventType.class, taEventHandler); |
| dispatcher.init(new Configuration()); |
| dispatcher.start(); |
| |
| vertex = mock(Vertex.class, RETURNS_DEEP_STUBS); |
| when(vertex.getProcessorDescriptor().getClassName()).thenReturn(""); |
| |
| mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); |
| when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class))) |
| .thenReturn(vertex); |
| |
| task = |
| new TaskImpl(vertexId, 0, dispatcher.getEventHandler(), |
| new Configuration(), mock(TaskAttemptListener.class), |
| new SystemClock(), mock(TaskHeartbeatHandler.class), |
| mockAppContext, false, Resource.newInstance(1, 1), |
| mock(ContainerContext.class), mock(StateChangeNotifier.class)); |
| |
| Map<String, OutputCommitter> committers = |
| new HashMap<String, OutputCommitter>(); |
| committers.put("out1", new TestOutputCommitter( |
| mock(OutputCommitterContext.class), true, false)); |
| when(task.getVertex().getOutputCommitters()).thenReturn(committers); |
| } |
| |
| private void restoreFromTaskStartEvent() { |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskStartedEvent(task.getTaskId(), |
| vertexName, taskScheduledTime, taskStartTime)); |
| assertEquals(TaskState.SCHEDULED, recoveredState); |
| assertEquals(0, task.getFinishedAttemptsCount()); |
| assertEquals(taskScheduledTime, task.scheduledTime); |
| assertEquals(0, task.getAttempts().size()); |
| } |
| |
| private void restoreFromFirstTaskAttemptStartEvent(TezTaskAttemptID taId) { |
| long taStartTime = taskStartTime + 100L; |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, |
| taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); |
| assertEquals(TaskState.RUNNING, recoveredState); |
| assertEquals(0, task.getFinishedAttemptsCount()); |
| assertEquals(taskScheduledTime, task.scheduledTime); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(TaskAttemptStateInternal.NEW, |
| ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); |
| assertEquals(1, task.getUncompletedAttemptsCount()); |
| } |
| |
| /** |
| * New -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_New() { |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.NEW, task.getInternalState()); |
| } |
| |
| /** |
| * -> restoreFromTaskFinishEvent ( no TaskStartEvent ) |
| */ |
| @Test |
| public void testRecovery_NoStartEvent() { |
| try { |
| task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName, |
| taskStartTime, taskFinishTime, null, TaskState.SUCCEEDED, "", |
| new TezCounters())); |
| fail("Should fail due to no TaskStartEvent before TaskFinishEvent"); |
| } catch (Throwable e) { |
| assertTrue(e.getMessage().contains( |
| "Finished Event seen but" |
| + " no Started Event was encountered earlier")); |
| } |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_Started() { |
| restoreFromTaskStartEvent(); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| // new task attempt is scheduled |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(0, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(null, task.successfulAttempt); |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * RecoverTranstion |
| */ |
| @Test |
| public void testRecovery_OneTAStarted() { |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| // wait for the second task attempt is scheduled |
| dispatcher.await(); |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| // taskAttempt_1 is recovered to KILLED, and new task attempt is scheduled |
| assertEquals(2, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(null, task.successfulAttempt); |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_OneTAStarted_SUCCEEDED() { |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| |
| long taStartTime = taskStartTime + 100L; |
| long taFinishTime = taStartTime + 100L; |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", |
| new TezCounters())); |
| assertEquals(TaskState.SUCCEEDED, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(taId, task.successfulAttempt); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState()); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(taId, task.successfulAttempt); |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * restoreFromTaskAttemptFinishedEvent (FAILED) -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_OneTAStarted_FAILED() { |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| |
| long taStartTime = taskStartTime + 100L; |
| long taFinishTime = taStartTime + 100L; |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.FAILED, "", |
| new TezCounters())); |
| assertEquals(TaskState.RUNNING, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(1, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| // new task attempt is scheduled |
| assertEquals(2, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(1, task.failedAttempts); |
| assertEquals(1, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * restoreFromTaskAttemptFinishedEvent (KILLED) -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_OneTAStarted_KILLED() { |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| |
| long taStartTime = taskStartTime + 100L; |
| long taFinishTime = taStartTime + 100L; |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.KILLED, "", |
| new TezCounters())); |
| assertEquals(TaskState.RUNNING, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| // new task attempt is scheduled |
| assertEquals(2, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(1, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> |
| * restoreFromTaskFinishedEvent -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_OneTAStarted_SUCCEEDED_Finished() { |
| |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| |
| long taStartTime = taskStartTime + 100L; |
| long taFinishTime = taStartTime + 100L; |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", |
| new TezCounters())); |
| assertEquals(TaskState.SUCCEEDED, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(taId, task.successfulAttempt); |
| |
| recoveredState = |
| task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), |
| vertexName, taskStartTime, taskFinishTime, taId, |
| TaskState.SUCCEEDED, "", new TezCounters())); |
| assertEquals(TaskState.SUCCEEDED, recoveredState); |
| assertEquals(taId, task.successfulAttempt); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState()); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(taId, task.successfulAttempt); |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> |
| * restoreFromTaskAttemptFinishedEvent (Failed due to output_failure) |
| * restoreFromTaskFinishedEvent -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_OneTAStarted_SUCCEEDED_FAILED() { |
| |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| |
| long taStartTime = taskStartTime + 100L; |
| long taFinishTime = taStartTime + 100L; |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", |
| new TezCounters())); |
| assertEquals(TaskState.SUCCEEDED, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(taId, task.successfulAttempt); |
| |
| // it is possible for TaskAttempt transit from SUCCEEDED to FAILURE due to output failure. |
| recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.FAILED, "", |
| new TezCounters())); |
| assertEquals(TaskState.RUNNING, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(1, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| assertEquals(2, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(1, task.failedAttempts); |
| assertEquals(1, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> |
| * restoreFromTaskAttemptFinishedEvent (KILLED due to node failed ) |
| * restoreFromTaskFinishedEvent -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_OneTAStarted_SUCCEEDED_KILLED() { |
| |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| |
| long taStartTime = taskStartTime + 100L; |
| long taFinishTime = taStartTime + 100L; |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", |
| new TezCounters())); |
| assertEquals(TaskState.SUCCEEDED, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(taId, task.successfulAttempt); |
| |
| // it is possible for TaskAttempt transit from SUCCEEDED to KILLED due to node failure. |
| recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.KILLED, "", |
| new TezCounters())); |
| assertEquals(TaskState.RUNNING, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| assertEquals(2, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(1, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_Commit_Failed_Recovery_Not_Supported() { |
| Map<String, OutputCommitter> committers = |
| new HashMap<String, OutputCommitter>(); |
| committers.put("out1", new TestOutputCommitter( |
| mock(OutputCommitterContext.class), false, false)); |
| when(task.getVertex().getOutputCommitters()).thenReturn(committers); |
| |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| |
| // restoreFromTaskAttemptFinishedEvent (SUCCEEDED) |
| long taStartTime = taskStartTime + 100L; |
| long taFinishTime = taStartTime + 100L; |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", |
| new TezCounters())); |
| assertEquals(TaskState.SUCCEEDED, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(taId, task.successfulAttempt); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| // new task attempt is scheduled |
| assertEquals(2, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(1, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_Commit_Failed_recover_fail() { |
| Map<String, OutputCommitter> committers = |
| new HashMap<String, OutputCommitter>(); |
| committers.put("out1", new TestOutputCommitter( |
| mock(OutputCommitterContext.class), true, true)); |
| when(task.getVertex().getOutputCommitters()).thenReturn(committers); |
| |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| |
| // restoreFromTaskAttemptFinishedEvent (SUCCEEDED) |
| long taStartTime = taskStartTime + 100L; |
| long taFinishTime = taStartTime + 100L; |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", |
| new TezCounters())); |
| assertEquals(TaskState.SUCCEEDED, recoveredState); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(taId, task.successfulAttempt); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| // new task attempt is scheduled |
| assertEquals(2, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(1, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| } |
| |
| @Test |
| public void testRecovery_WithDesired_SUCCEEDED() { |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.SUCCEEDED, |
| false)); |
| assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState()); |
| // no TA_Recovery event sent |
| assertEquals(0, taEventHandler.getEvents().size()); |
| } |
| |
| @Test |
| public void testRecovery_WithDesired_FAILED() { |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.FAILED, |
| false)); |
| assertEquals(TaskStateInternal.FAILED, task.getInternalState()); |
| // no TA_Recovery event sent |
| assertEquals(0, taEventHandler.getEvents().size()); |
| } |
| |
| @Test |
| public void testRecovery_WithDesired_KILLED() { |
| restoreFromTaskStartEvent(); |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| restoreFromFirstTaskAttemptStartEvent(taId); |
| task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.KILLED, |
| false)); |
| assertEquals(TaskStateInternal.KILLED, task.getInternalState()); |
| // no TA_Recovery event sent |
| assertEquals(0, taEventHandler.getEvents().size()); |
| |
| } |
| |
| /** |
| * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> |
| * restoreFromTaskAttemptFinishedEvent (KILLED) -> RecoverTransition |
| */ |
| @Test |
| public void testRecovery_OneTAStarted_Killed() { |
| restoreFromTaskStartEvent(); |
| |
| long taStartTime = taskStartTime + 100L; |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, |
| taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); |
| assertEquals(TaskState.RUNNING, recoveredState); |
| assertEquals(TaskAttemptStateInternal.NEW, |
| ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(0, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(1, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| |
| long taFinishTime = taStartTime + 100L; |
| recoveredState = |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, |
| taStartTime, taFinishTime, TaskAttemptState.KILLED, "", |
| new TezCounters())); |
| assertEquals(TaskState.RUNNING, recoveredState); |
| assertEquals(TaskAttemptStateInternal.NEW, |
| ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); |
| assertEquals(1, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(0, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| // wait for Task send TA_RECOVER to TA and TA complete the RecoverTransition |
| dispatcher.await(); |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| assertEquals(TaskAttemptStateInternal.KILLED, |
| ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); |
| // new task attempt is scheduled |
| assertEquals(2, task.getAttempts().size()); |
| assertEquals(1, task.getFinishedAttemptsCount()); |
| assertEquals(0, task.failedAttempts); |
| assertEquals(1, task.getUncompletedAttemptsCount()); |
| assertEquals(null, task.successfulAttempt); |
| } |
| |
| /** |
| * n = maxFailedAttempts, in the previous AM attempt, n task attempts are |
| * killed. When recovering, it should continue to be in running state and |
| * schedule a new task attempt. |
| */ |
| @Test |
| public void testTaskRecovery_MultipleAttempts1() { |
| int maxFailedAttempts = |
| conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, |
| TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); |
| restoreFromTaskStartEvent(); |
| |
| for (int i = 0; i < maxFailedAttempts; ++i) { |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, |
| mock(ContainerId.class), mock(NodeId.class), "", "", "")); |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, |
| 0, TaskAttemptState.KILLED, "", null)); |
| } |
| assertEquals(maxFailedAttempts, task.getAttempts().size()); |
| assertEquals(0, task.failedAttempts); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| // if the previous task attempt is killed, it should not been take into |
| // account when checking whether exceed the max attempts |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| // schedule a new task attempt |
| assertEquals(maxFailedAttempts + 1, task.getAttempts().size()); |
| } |
| |
| /** |
| * n = maxFailedAttempts, in the previous AM attempt, n task attempts are |
| * failed. When recovering, it should transit to failed because # of |
| * failed_attempt is exceeded. |
| */ |
| @Test |
| public void testTaskRecovery_MultipleAttempts2() { |
| int maxFailedAttempts = |
| conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, |
| TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); |
| restoreFromTaskStartEvent(); |
| |
| for (int i = 0; i < maxFailedAttempts; ++i) { |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, |
| mock(ContainerId.class), mock(NodeId.class), "", "", "")); |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, |
| 0, TaskAttemptState.FAILED, "", null)); |
| } |
| assertEquals(maxFailedAttempts, task.getAttempts().size()); |
| assertEquals(maxFailedAttempts, task.failedAttempts); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| // it should transit to failed because of the failed task attempt in the |
| // last application attempt. |
| assertEquals(TaskStateInternal.FAILED, task.getInternalState()); |
| assertEquals(maxFailedAttempts, task.getAttempts().size()); |
| } |
| |
| /** |
| * n = maxFailedAttempts, in the previous AM attempt, n-1 task attempts are |
| * killed. And last task attempt is still in running state. When recovering, |
| * the last attempt should transit to killed and task is still in running |
| * state and new task attempt is scheduled. |
| */ |
| @Test |
| public void testTaskRecovery_MultipleAttempts3() throws InterruptedException { |
| int maxFailedAttempts = |
| conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, |
| TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); |
| restoreFromTaskStartEvent(); |
| |
| for (int i = 0; i < maxFailedAttempts - 1; ++i) { |
| TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); |
| task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, |
| mock(ContainerId.class), mock(NodeId.class), "", "", "")); |
| task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, |
| 0, TaskAttemptState.FAILED, "", null)); |
| } |
| assertEquals(maxFailedAttempts - 1, task.getAttempts().size()); |
| assertEquals(maxFailedAttempts - 1, task.failedAttempts); |
| |
| TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId()); |
| TaskState recoveredState = |
| task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId, |
| vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "")); |
| |
| assertEquals(TaskState.RUNNING, recoveredState); |
| assertEquals(TaskAttemptStateInternal.NEW, |
| ((TaskAttemptImpl) task.getAttempt(newTaskAttemptId)) |
| .getInternalState()); |
| assertEquals(maxFailedAttempts, task.getAttempts().size()); |
| |
| task.handle(new TaskEventRecoverTask(task.getTaskId())); |
| // wait until task attempt receive the Recover event from task |
| dispatcher.await(); |
| |
| assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); |
| assertEquals(TaskAttemptStateInternal.KILLED, |
| ((TaskAttemptImpl) (task.getAttempt(newTaskAttemptId))) |
| .getInternalState()); |
| assertEquals(maxFailedAttempts - 1, task.failedAttempts); |
| |
| // new task attempt is added |
| assertEquals(maxFailedAttempts + 1, task.getAttempts().size()); |
| } |
| |
| private TezTaskAttemptID getNewTaskAttemptID(TezTaskID taskId) { |
| return TezTaskAttemptID.getInstance(taskId, taskAttemptCounter++); |
| } |
| |
| } |