| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapreduce.v2.app.job.impl; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.Task; |
| import org.apache.hadoop.mapred.TaskUmbilicalProtocol; |
| import org.apache.hadoop.mapreduce.Counter; |
| import org.apache.hadoop.mapreduce.Counters; |
| import org.apache.hadoop.mapreduce.TaskCounter; |
| import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; |
| import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; |
| import org.apache.hadoop.mapreduce.v2.api.records.Avataar; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskState; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskType; |
| import org.apache.hadoop.mapreduce.v2.app.AppContext; |
| import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; |
| import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; |
| import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; |
| import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.event.Event; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.event.InlineDispatcher; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @SuppressWarnings("rawtypes") |
| public class TestTaskImpl { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestTaskImpl.class); |
| |
| private JobConf conf; |
| private TaskAttemptListener taskAttemptListener; |
| private Token<JobTokenIdentifier> jobToken; |
| private JobId jobId; |
| private Path remoteJobConfFile; |
| private Credentials credentials; |
| private Clock clock; |
| private MRAppMetrics metrics; |
| private TaskImpl mockTask; |
| private ApplicationId appId; |
| private TaskSplitMetaInfo taskSplitMetaInfo; |
| private String[] dataLocations = new String[0]; |
| private AppContext appContext; |
| |
| private int startCount = 0; |
| private int taskCounter = 0; |
| private final int partition = 1; |
| |
| private InlineDispatcher dispatcher; |
| private MockTaskAttemptEventHandler taskAttemptEventHandler; |
| private List<MockTaskAttemptImpl> taskAttempts; |
| |
| private class MockTaskImpl extends TaskImpl { |
| |
| private int taskAttemptCounter = 0; |
| TaskType taskType; |
| |
| public MockTaskImpl(JobId jobId, int partition, |
| EventHandler eventHandler, Path remoteJobConfFile, JobConf conf, |
| TaskAttemptListener taskAttemptListener, |
| Token<JobTokenIdentifier> jobToken, |
| Credentials credentials, Clock clock, int startCount, |
| MRAppMetrics metrics, AppContext appContext, TaskType taskType) { |
| super(jobId, taskType , partition, eventHandler, |
| remoteJobConfFile, conf, taskAttemptListener, |
| jobToken, credentials, clock, |
| startCount, metrics, appContext); |
| this.taskType = taskType; |
| } |
| |
| @Override |
| public TaskType getType() { |
| return taskType; |
| } |
| |
| @Override |
| protected TaskAttemptImpl createAttempt() { |
| MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, |
| eventHandler, taskAttemptListener, remoteJobConfFile, partition, |
| conf, jobToken, credentials, clock, appContext, taskType); |
| taskAttempts.add(attempt); |
| return attempt; |
| } |
| |
| @Override |
| protected int getMaxAttempts() { |
| return 100; |
| } |
| |
| @Override |
| protected void internalError(TaskEventType type) { |
| super.internalError(type); |
| fail("Internal error: " + type); |
| } |
| } |
| |
| private class MockTaskAttemptImpl extends TaskAttemptImpl { |
| |
| private float progress = 0; |
| private TaskAttemptState state = TaskAttemptState.NEW; |
| boolean rescheduled = false; |
| boolean containerAssigned = false; |
| private TaskType taskType; |
| private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS; |
| |
| public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, |
| TaskAttemptListener taskAttemptListener, Path jobFile, int partition, |
| JobConf conf, Token<JobTokenIdentifier> jobToken, |
| Credentials credentials, Clock clock, |
| AppContext appContext, TaskType taskType) { |
| super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf, |
| dataLocations, jobToken, credentials, clock, appContext); |
| this.taskType = taskType; |
| } |
| |
| public void assignContainer() { |
| containerAssigned = true; |
| } |
| |
| @Override |
| boolean isContainerAssigned() { |
| return containerAssigned; |
| } |
| |
| public TaskAttemptId getAttemptId() { |
| return getID(); |
| } |
| |
| @Override |
| protected Task createRemoteTask() { |
| return new MockTask(taskType); |
| } |
| |
| public float getProgress() { |
| return progress ; |
| } |
| |
| public void setProgress(float progress) { |
| this.progress = progress; |
| } |
| |
| public void setState(TaskAttemptState state) { |
| this.state = state; |
| } |
| |
| @Override |
| public TaskAttemptState getState() { |
| return state; |
| } |
| |
| public boolean getRescheduled() { |
| return this.rescheduled; |
| } |
| |
| public void setRescheduled(boolean rescheduled) { |
| this.rescheduled = rescheduled; |
| } |
| |
| @Override |
| public Counters getCounters() { |
| return attemptCounters; |
| } |
| |
| public void setCounters(Counters counters) { |
| attemptCounters = counters; |
| } |
| } |
| |
| private class MockTask extends Task { |
| |
| private TaskType taskType; |
| MockTask(TaskType taskType) { |
| this.taskType = taskType; |
| } |
| |
| @Override |
| public void run(JobConf job, TaskUmbilicalProtocol umbilical) |
| throws IOException, ClassNotFoundException, InterruptedException { |
| return; |
| } |
| |
| @Override |
| public boolean isMapTask() { |
| return (taskType == TaskType.MAP); |
| } |
| |
| } |
| |
| @Before |
| @SuppressWarnings("unchecked") |
| public void setup() { |
| dispatcher = new InlineDispatcher(); |
| |
| ++startCount; |
| |
| conf = new JobConf(); |
| taskAttemptListener = mock(TaskAttemptListener.class); |
| jobToken = (Token<JobTokenIdentifier>) mock(Token.class); |
| remoteJobConfFile = mock(Path.class); |
| credentials = null; |
| clock = SystemClock.getInstance(); |
| metrics = mock(MRAppMetrics.class); |
| dataLocations = new String[1]; |
| |
| appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| |
| jobId = Records.newRecord(JobId.class); |
| jobId.setId(1); |
| jobId.setAppId(appId); |
| appContext = mock(AppContext.class); |
| |
| taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); |
| when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); |
| |
| taskAttempts = new ArrayList<MockTaskAttemptImpl>(); |
| |
| taskAttemptEventHandler = new MockTaskAttemptEventHandler(); |
| dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler); |
| } |
| |
| private MockTaskImpl createMockTask(TaskType taskType) { |
| return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), |
| remoteJobConfFile, conf, taskAttemptListener, jobToken, |
| credentials, clock, |
| startCount, metrics, appContext, taskType); |
| } |
| |
| @After |
| public void teardown() { |
| taskAttempts.clear(); |
| } |
| |
| private TaskId getNewTaskID() { |
| TaskId taskId = Records.newRecord(TaskId.class); |
| taskId.setId(++taskCounter); |
| taskId.setJobId(jobId); |
| taskId.setTaskType(mockTask.getType()); |
| return taskId; |
| } |
| |
| private void scheduleTaskAttempt(TaskId taskId) { |
| mockTask.handle(new TaskEvent(taskId, |
| TaskEventType.T_SCHEDULE)); |
| assertTaskScheduledState(); |
| assertTaskAttemptAvataar(Avataar.VIRGIN); |
| } |
| |
| private void killTask(TaskId taskId) { |
| mockTask.handle(new TaskEvent(taskId, |
| TaskEventType.T_KILL)); |
| assertTaskKillWaitState(); |
| } |
| |
| private void killScheduledTaskAttempt(TaskAttemptId attemptId) { |
| killScheduledTaskAttempt(attemptId, false); |
| } |
| |
| private void killScheduledTaskAttempt(TaskAttemptId attemptId, |
| boolean reschedule) { |
| mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule)); |
| assertTaskScheduledState(); |
| } |
| |
| private void launchTaskAttempt(TaskAttemptId attemptId) { |
| mockTask.handle(new TaskTAttemptEvent(attemptId, |
| TaskEventType.T_ATTEMPT_LAUNCHED)); |
| ((MockTaskAttemptImpl)(mockTask.getAttempt(attemptId))) |
| .assignContainer(); |
| assertTaskRunningState(); |
| } |
| |
| private void commitTaskAttempt(TaskAttemptId attemptId) { |
| mockTask.handle(new TaskTAttemptEvent(attemptId, |
| TaskEventType.T_ATTEMPT_COMMIT_PENDING)); |
| assertTaskRunningState(); |
| } |
| |
| private MockTaskAttemptImpl getLastAttempt() { |
| return taskAttempts.get(taskAttempts.size()-1); |
| } |
| |
| private void updateLastAttemptProgress(float p) { |
| getLastAttempt().setProgress(p); |
| } |
| |
| private void updateLastAttemptState(TaskAttemptState s) { |
| getLastAttempt().setState(s); |
| } |
| |
| private void killRunningTaskAttempt(TaskAttemptId attemptId) { |
| killRunningTaskAttempt(attemptId, false); |
| } |
| |
| private void killRunningTaskAttempt(TaskAttemptId attemptId, |
| boolean reschedule) { |
| mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule)); |
| assertTaskRunningState(); |
| } |
| |
| private void failRunningTaskAttempt(TaskAttemptId attemptId) { |
| mockTask.handle(new TaskTAttemptFailedEvent(attemptId)); |
| assertTaskRunningState(); |
| } |
| |
| /** |
| * {@link TaskState#NEW} |
| */ |
| private void assertTaskNewState() { |
| assertEquals(TaskState.NEW, mockTask.getState()); |
| } |
| |
| /** |
| * {@link TaskState#SCHEDULED} |
| */ |
| private void assertTaskScheduledState() { |
| assertEquals(TaskState.SCHEDULED, mockTask.getState()); |
| } |
| |
| /** |
| * {@link TaskState#RUNNING} |
| */ |
| private void assertTaskRunningState() { |
| assertEquals(TaskState.RUNNING, mockTask.getState()); |
| } |
| |
| /** |
| * {@link TaskState#KILL_WAIT} |
| */ |
| private void assertTaskKillWaitState() { |
| assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState()); |
| } |
| |
| /** |
| * {@link TaskState#SUCCEEDED} |
| */ |
| private void assertTaskSucceededState() { |
| assertEquals(TaskState.SUCCEEDED, mockTask.getState()); |
| } |
| |
| /** |
| * {@link Avataar} |
| */ |
| private void assertTaskAttemptAvataar(Avataar avataar) { |
| for (TaskAttempt taskAttempt : mockTask.getAttempts().values()) { |
| if (((TaskAttemptImpl) taskAttempt).getAvataar() == avataar) { |
| return; |
| } |
| } |
| fail("There is no " + (avataar == Avataar.VIRGIN ? "virgin" : "speculative") |
| + "task attempt"); |
| } |
| |
| @Test |
| public void testInit() { |
| LOG.info("--- START: testInit ---"); |
| mockTask = createMockTask(TaskType.MAP); |
| assertTaskNewState(); |
| assert(taskAttempts.size() == 0); |
| } |
| |
| @Test |
| /** |
| * {@link TaskState#NEW}->{@link TaskState#SCHEDULED} |
| */ |
| public void testScheduleTask() { |
| LOG.info("--- START: testScheduleTask ---"); |
| mockTask = createMockTask(TaskType.MAP); |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| } |
| |
| @Test |
| /** |
| * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT} |
| */ |
| public void testKillScheduledTask() { |
| LOG.info("--- START: testKillScheduledTask ---"); |
| mockTask = createMockTask(TaskType.MAP); |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| killTask(taskId); |
| } |
| |
| @Test |
| /** |
| * Kill attempt |
| * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED} |
| */ |
| public void testKillScheduledTaskAttempt() { |
| LOG.info("--- START: testKillScheduledTaskAttempt ---"); |
| mockTask = createMockTask(TaskType.MAP); |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| killScheduledTaskAttempt(getLastAttempt().getAttemptId(), true); |
| assertEquals(TaskAttemptEventType.TA_RESCHEDULE, |
| taskAttemptEventHandler.lastTaskAttemptEvent.getType()); |
| } |
| |
| @Test |
| /** |
| * Launch attempt |
| * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING} |
| */ |
| public void testLaunchTaskAttempt() { |
| LOG.info("--- START: testLaunchTaskAttempt ---"); |
| mockTask = createMockTask(TaskType.MAP); |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| } |
| |
| @Test |
| /** |
| * Kill running attempt |
| * {@link TaskState#RUNNING}->{@link TaskState#RUNNING} |
| */ |
| public void testKillRunningTaskAttempt() { |
| LOG.info("--- START: testKillRunningTaskAttempt ---"); |
| mockTask = createMockTask(TaskType.MAP); |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| killRunningTaskAttempt(getLastAttempt().getAttemptId(), true); |
| assertEquals(TaskAttemptEventType.TA_RESCHEDULE, |
| taskAttemptEventHandler.lastTaskAttemptEvent.getType()); |
| } |
| |
| @Test |
| public void testKillSuccessfulTask() { |
| LOG.info("--- START: testKillSuccesfulTask ---"); |
| mockTask = createMockTask(TaskType.MAP); |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| commitTaskAttempt(getLastAttempt().getAttemptId()); |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| assertTaskSucceededState(); |
| mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL)); |
| assertTaskSucceededState(); |
| } |
| |
| @Test |
| /** |
| * Kill map attempt for succeeded map task |
| * {@link TaskState#SUCCEEDED}->{@link TaskState#SCHEDULED} |
| */ |
| public void testKillAttemptForSuccessfulTask() { |
| LOG.info("--- START: testKillAttemptForSuccessfulTask ---"); |
| mockTask = createMockTask(TaskType.MAP); |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| commitTaskAttempt(getLastAttempt().getAttemptId()); |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| assertTaskSucceededState(); |
| mockTask.handle( |
| new TaskTAttemptKilledEvent(getLastAttempt().getAttemptId(), true)); |
| assertEquals(TaskAttemptEventType.TA_RESCHEDULE, |
| taskAttemptEventHandler.lastTaskAttemptEvent.getType()); |
| assertTaskScheduledState(); |
| } |
| |
| @Test |
| public void testTaskProgress() { |
| LOG.info("--- START: testTaskProgress ---"); |
| mockTask = createMockTask(TaskType.MAP); |
| |
| // launch task |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| float progress = 0f; |
| assert(mockTask.getProgress() == progress); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| |
| // update attempt1 |
| progress = 50f; |
| updateLastAttemptProgress(progress); |
| assert(mockTask.getProgress() == progress); |
| progress = 100f; |
| updateLastAttemptProgress(progress); |
| assert(mockTask.getProgress() == progress); |
| |
| progress = 0f; |
| // mark first attempt as killed |
| updateLastAttemptState(TaskAttemptState.KILLED); |
| assert(mockTask.getProgress() == progress); |
| |
| // kill first attempt |
| // should trigger a new attempt |
| // as no successful attempts |
| killRunningTaskAttempt(getLastAttempt().getAttemptId()); |
| assert(taskAttempts.size() == 2); |
| |
| assert(mockTask.getProgress() == 0f); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| progress = 50f; |
| updateLastAttemptProgress(progress); |
| assert(mockTask.getProgress() == progress); |
| |
| } |
| |
| |
| @Test |
| public void testKillDuringTaskAttemptCommit() { |
| mockTask = createMockTask(TaskType.REDUCE); |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| updateLastAttemptState(TaskAttemptState.COMMIT_PENDING); |
| commitTaskAttempt(getLastAttempt().getAttemptId()); |
| |
| TaskAttemptId commitAttempt = getLastAttempt().getAttemptId(); |
| updateLastAttemptState(TaskAttemptState.KILLED); |
| killRunningTaskAttempt(commitAttempt); |
| |
| assertFalse(mockTask.canCommit(commitAttempt)); |
| } |
| |
| @Test |
| public void testFailureDuringTaskAttemptCommit() { |
| mockTask = createMockTask(TaskType.MAP); |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| updateLastAttemptState(TaskAttemptState.COMMIT_PENDING); |
| commitTaskAttempt(getLastAttempt().getAttemptId()); |
| |
| // During the task attempt commit there is an exception which causes |
| // the attempt to fail |
| updateLastAttemptState(TaskAttemptState.FAILED); |
| failRunningTaskAttempt(getLastAttempt().getAttemptId()); |
| |
| assertEquals(2, taskAttempts.size()); |
| updateLastAttemptState(TaskAttemptState.SUCCEEDED); |
| commitTaskAttempt(getLastAttempt().getAttemptId()); |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| |
| assertFalse("First attempt should not commit", |
| mockTask.canCommit(taskAttempts.get(0).getAttemptId())); |
| assertTrue("Second attempt should commit", |
| mockTask.canCommit(getLastAttempt().getAttemptId())); |
| |
| assertTaskSucceededState(); |
| } |
| |
| private void runSpeculativeTaskAttemptSucceeds( |
| TaskEventType firstAttemptFinishEvent) { |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| updateLastAttemptState(TaskAttemptState.RUNNING); |
| |
| // Add a speculative task attempt that succeeds |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| commitTaskAttempt(getLastAttempt().getAttemptId()); |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| |
| // The task should now have succeeded |
| assertTaskSucceededState(); |
| |
| // Now complete the first task attempt, after the second has succeeded |
| if (firstAttemptFinishEvent.equals(TaskEventType.T_ATTEMPT_FAILED)) { |
| mockTask.handle(new TaskTAttemptFailedEvent(taskAttempts |
| .get(0).getAttemptId())); |
| } else { |
| mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), |
| firstAttemptFinishEvent)); |
| } |
| |
| // The task should still be in the succeeded state |
| assertTaskSucceededState(); |
| |
| // The task should contain speculative a task attempt |
| assertTaskAttemptAvataar(Avataar.SPECULATIVE); |
| } |
| |
| @Test |
| public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { |
| mockTask = createMockTask(TaskType.MAP); |
| runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED); |
| } |
| |
| @Test |
| public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { |
| mockTask = createMockTask(TaskType.REDUCE); |
| runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED); |
| } |
| |
| @Test |
| public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() { |
| mockTask = createMockTask(TaskType.MAP); |
| runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED); |
| } |
| |
| @Test |
| public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() { |
| mockTask = createMockTask(TaskType.REDUCE); |
| runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED); |
| } |
| |
| @Test |
| public void testMultipleTaskAttemptsSucceed() { |
| mockTask = createMockTask(TaskType.MAP); |
| runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED); |
| } |
| |
| @Test |
| public void testCommitAfterSucceeds() { |
| mockTask = createMockTask(TaskType.REDUCE); |
| runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING); |
| } |
| |
| @Test |
| public void testSpeculativeMapFetchFailure() { |
| // Setup a scenario where speculative task wins, first attempt killed |
| mockTask = createMockTask(TaskType.MAP); |
| runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED); |
| assertEquals(2, taskAttempts.size()); |
| |
| // speculative attempt retroactively fails from fetch failures |
| mockTask.handle(new TaskTAttemptFailedEvent( |
| taskAttempts.get(1).getAttemptId())); |
| |
| assertTaskScheduledState(); |
| assertEquals(3, taskAttempts.size()); |
| } |
| |
| @Test |
| public void testSpeculativeMapMultipleSucceedFetchFailure() { |
| // Setup a scenario where speculative task wins, first attempt succeeds |
| mockTask = createMockTask(TaskType.MAP); |
| runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED); |
| assertEquals(2, taskAttempts.size()); |
| |
| // speculative attempt retroactively fails from fetch failures |
| mockTask.handle(new TaskTAttemptFailedEvent( |
| taskAttempts.get(1).getAttemptId())); |
| |
| assertTaskScheduledState(); |
| assertEquals(3, taskAttempts.size()); |
| } |
| |
| @Test |
| public void testSpeculativeMapFailedFetchFailure() { |
| // Setup a scenario where speculative task wins, first attempt succeeds |
| mockTask = createMockTask(TaskType.MAP); |
| runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED); |
| assertEquals(2, taskAttempts.size()); |
| |
| // speculative attempt retroactively fails from fetch failures |
| mockTask.handle(new TaskTAttemptFailedEvent( |
| taskAttempts.get(1).getAttemptId())); |
| |
| assertTaskScheduledState(); |
| assertEquals(3, taskAttempts.size()); |
| } |
| |
| @Test |
| public void testFailedTransitions() { |
| mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), |
| remoteJobConfFile, conf, taskAttemptListener, jobToken, |
| credentials, clock, startCount, metrics, appContext, TaskType.MAP) { |
| @Override |
| protected int getMaxAttempts() { |
| return 1; |
| } |
| }; |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| |
| // add three more speculative attempts |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| assertEquals(4, taskAttempts.size()); |
| |
| // have the first attempt fail, verify task failed due to no retries |
| MockTaskAttemptImpl taskAttempt = taskAttempts.get(0); |
| taskAttempt.setState(TaskAttemptState.FAILED); |
| mockTask.handle(new TaskTAttemptFailedEvent( |
| taskAttempt.getAttemptId())); |
| assertEquals(TaskState.FAILED, mockTask.getState()); |
| |
| // verify task can no longer be killed |
| mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL)); |
| assertEquals(TaskState.FAILED, mockTask.getState()); |
| |
| // verify speculative doesn't launch new tasks |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ATTEMPT_LAUNCHED)); |
| assertEquals(TaskState.FAILED, mockTask.getState()); |
| assertEquals(4, taskAttempts.size()); |
| |
| // verify attempt events from active tasks don't knock task out of FAILED |
| taskAttempt = taskAttempts.get(1); |
| taskAttempt.setState(TaskAttemptState.COMMIT_PENDING); |
| mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), |
| TaskEventType.T_ATTEMPT_COMMIT_PENDING)); |
| assertEquals(TaskState.FAILED, mockTask.getState()); |
| taskAttempt.setState(TaskAttemptState.FAILED); |
| mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId())); |
| assertEquals(TaskState.FAILED, mockTask.getState()); |
| taskAttempt = taskAttempts.get(2); |
| taskAttempt.setState(TaskAttemptState.SUCCEEDED); |
| mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| assertEquals(TaskState.FAILED, mockTask.getState()); |
| taskAttempt = taskAttempts.get(3); |
| taskAttempt.setState(TaskAttemptState.KILLED); |
| mockTask.handle(new TaskTAttemptKilledEvent(taskAttempt.getAttemptId(), |
| false)); |
| assertEquals(TaskState.FAILED, mockTask.getState()); |
| } |
| |
| private class PartialAttemptEventHandler implements EventHandler { |
| |
| @Override |
| public void handle(Event event) { |
| if (event instanceof TaskAttemptEvent) |
| if (event.getType() == TaskAttemptEventType.TA_RESCHEDULE) { |
| TaskAttempt attempt = mockTask.getAttempt(((TaskAttemptEvent) event).getTaskAttemptID()); |
| ((MockTaskAttemptImpl)attempt).setRescheduled(true); |
| } |
| } |
| } |
| |
| @Test |
| public void testFailedTransitionWithHangingSpeculativeMap() { |
| mockTask = new MockTaskImpl(jobId, partition, new PartialAttemptEventHandler(), |
| remoteJobConfFile, conf, taskAttemptListener, jobToken, |
| credentials, clock, startCount, metrics, appContext, TaskType.MAP) { |
| @Override |
| protected int getMaxAttempts() { |
| return 4; |
| } |
| }; |
| |
| // start a new task, schedule and launch a new attempt |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| |
| // add a speculative attempt(#2), but not launch it |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| |
| // have the first attempt(#1) fail, verify task still running since the |
| // max attempts is 4 |
| MockTaskAttemptImpl taskAttempt = taskAttempts.get(0); |
| taskAttempt.setState(TaskAttemptState.FAILED); |
| mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId())); |
| assertEquals(TaskState.RUNNING, mockTask.getState()); |
| |
| // verify a new attempt(#3) added because the speculative attempt(#2) |
| // is hanging |
| assertEquals(3, taskAttempts.size()); |
| |
| // verify the speculative attempt(#2) is not a rescheduled attempt |
| assertEquals(false, taskAttempts.get(1).getRescheduled()); |
| |
| // verify the third attempt is a rescheduled attempt |
| assertEquals(true, taskAttempts.get(2).getRescheduled()); |
| |
| // now launch the latest attempt(#3) and set the internal state to running |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| |
| // have the speculative attempt(#2) fail, verify task still since it |
| // hasn't reach the max attempts which is 4 |
| MockTaskAttemptImpl taskAttempt1 = taskAttempts.get(1); |
| taskAttempt1.setState(TaskAttemptState.FAILED); |
| mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt1.getAttemptId())); |
| assertEquals(TaskState.RUNNING, mockTask.getState()); |
| |
| // verify there's no new attempt added because of the running attempt(#3) |
| assertEquals(3, taskAttempts.size()); |
| } |
| |
| @Test |
| public void testCountersWithSpeculation() { |
| mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), |
| remoteJobConfFile, conf, taskAttemptListener, jobToken, |
| credentials, clock, startCount, metrics, appContext, TaskType.MAP) { |
| @Override |
| protected int getMaxAttempts() { |
| return 1; |
| } |
| }; |
| TaskId taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| updateLastAttemptState(TaskAttemptState.RUNNING); |
| MockTaskAttemptImpl baseAttempt = getLastAttempt(); |
| |
| // add a speculative attempt |
| mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), |
| TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| launchTaskAttempt(getLastAttempt().getAttemptId()); |
| updateLastAttemptState(TaskAttemptState.RUNNING); |
| MockTaskAttemptImpl specAttempt = getLastAttempt(); |
| assertEquals(2, taskAttempts.size()); |
| |
| Counters specAttemptCounters = new Counters(); |
| Counter cpuCounter = specAttemptCounters.findCounter( |
| TaskCounter.CPU_MILLISECONDS); |
| cpuCounter.setValue(1000); |
| specAttempt.setCounters(specAttemptCounters); |
| |
| // have the spec attempt succeed but second attempt at 1.0 progress as well |
| commitTaskAttempt(specAttempt.getAttemptId()); |
| specAttempt.setProgress(1.0f); |
| specAttempt.setState(TaskAttemptState.SUCCEEDED); |
| mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(), |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| assertEquals(TaskState.SUCCEEDED, mockTask.getState()); |
| baseAttempt.setProgress(1.0f); |
| |
| Counters taskCounters = mockTask.getCounters(); |
| assertEquals("wrong counters for task", specAttemptCounters, taskCounters); |
| } |
| |
| public static class MockTaskAttemptEventHandler implements EventHandler { |
| public TaskAttemptEvent lastTaskAttemptEvent; |
| @Override |
| public void handle(Event event) { |
| if (event instanceof TaskAttemptEvent) { |
| lastTaskAttemptEvent = (TaskAttemptEvent)event; |
| } |
| } |
| }; |
| } |