| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.dag.impl; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.mock; |
| |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.event.InlineDispatcher; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; |
| import org.apache.tez.dag.api.oldrecords.TaskAttemptState; |
| import org.apache.tez.dag.api.oldrecords.TaskState; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.ContainerContext; |
| import org.apache.tez.dag.app.TaskAttemptListener; |
| import org.apache.tez.dag.app.TaskHeartbeatHandler; |
| import org.apache.tez.dag.app.dag.TaskStateInternal; |
| import org.apache.tez.dag.app.dag.TaskTerminationCause; |
| import org.apache.tez.dag.app.dag.Vertex; |
| import org.apache.tez.dag.app.dag.event.TaskEvent; |
| import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; |
| import org.apache.tez.dag.app.dag.event.TaskEventTermination; |
| import org.apache.tez.dag.app.dag.event.TaskEventType; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezTaskID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class TestTaskImpl { |
| |
| private static final Log LOG = LogFactory.getLog(TestTaskImpl.class); |
| |
| private int taskCounter = 0; |
| private final int partition = 1; |
| |
| private InlineDispatcher dispatcher; |
| |
| private Configuration conf; |
| private TaskAttemptListener taskAttemptListener; |
| private TaskHeartbeatHandler taskHeartbeatHandler; |
| private Credentials credentials; |
| private Clock clock; |
| private TaskLocationHint locationHint; |
| |
| private ApplicationId appId; |
| private TezDAGID dagId; |
| private TezVertexID vertexId; |
| private AppContext appContext; |
| private Resource taskResource; |
| private Map<String, LocalResource> localResources; |
| private Map<String, String> environment; |
| private String javaOpts; |
| private boolean leafVertex; |
| private ContainerContext containerContext; |
| |
| private MockTaskImpl mockTask; |
| |
| @Before |
| public void setup() { |
| dispatcher = new InlineDispatcher(); |
| conf = new Configuration(); |
| taskAttemptListener = mock(TaskAttemptListener.class); |
| taskHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| credentials = null; |
| clock = new SystemClock(); |
| locationHint = new TaskLocationHint(null, null); |
| |
| appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| dagId = new TezDAGID(appId, 1); |
| vertexId = new TezVertexID(dagId, 1); |
| appContext = mock(AppContext.class); |
| taskResource = Resource.newInstance(1024, 1); |
| localResources = new HashMap<String, LocalResource>(); |
| environment = new HashMap<String, String>(); |
| javaOpts = ""; |
| leafVertex = false; |
| containerContext = new ContainerContext(localResources, credentials, |
| environment, javaOpts); |
| Vertex vertex = mock(Vertex.class); |
| |
| mockTask = new MockTaskImpl(vertexId, partition, |
| dispatcher.getEventHandler(), conf, taskAttemptListener, clock, |
| taskHeartbeatHandler, appContext, leafVertex, locationHint, |
| taskResource, containerContext, vertex); |
| } |
| |
| private TezTaskID getNewTaskID() { |
| TezTaskID taskID = new TezTaskID(vertexId, ++taskCounter); |
| return taskID; |
| } |
| |
| private void scheduleTaskAttempt(TezTaskID taskId) { |
| mockTask.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE)); |
| assertTaskScheduledState(); |
| } |
| |
| private void killTask(TezTaskID taskId) { |
| mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL)); |
| assertTaskKillWaitState(); |
| } |
| |
| private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) { |
| mockTask.handle(new TaskEventTAUpdate(attemptId, |
| TaskEventType.T_ATTEMPT_KILLED)); |
| assertTaskScheduledState(); |
| } |
| |
| private void launchTaskAttempt(TezTaskAttemptID attemptId) { |
| mockTask.handle(new TaskEventTAUpdate(attemptId, |
| TaskEventType.T_ATTEMPT_LAUNCHED)); |
| assertTaskRunningState(); |
| } |
| |
| private void updateAttemptProgress(MockTaskAttemptImpl attempt, float p) { |
| attempt.setProgress(p); |
| } |
| |
| private void updateAttemptState(MockTaskAttemptImpl attempt, |
| TaskAttemptState s) { |
| attempt.setState(s); |
| } |
| |
| private void killRunningTaskAttempt(TezTaskAttemptID attemptId) { |
| mockTask.handle(new TaskEventTAUpdate(attemptId, |
| TaskEventType.T_ATTEMPT_KILLED)); |
| assertTaskRunningState(); |
| } |
| |
| private void failRunningTaskAttempt(TezTaskAttemptID attemptId) { |
| mockTask.handle(new TaskEventTAUpdate(attemptId, |
| TaskEventType.T_ATTEMPT_FAILED)); |
| assertTaskRunningState(); |
| } |
| |
| /** |
| * {@link TaskState#NEW} |
| */ |
| private void assertTaskNewState() { |
| assertEquals(TaskState.NEW, mockTask.getState()); |
| } |
| |
| /** |
| * {@link TaskState#SCHEDULED} |
| */ |
| private void assertTaskScheduledState() { |
| assertEquals(TaskState.SCHEDULED, mockTask.getState()); |
| } |
| |
| /** |
| * {@link TaskState#RUNNING} |
| */ |
| private void assertTaskRunningState() { |
| assertEquals(TaskState.RUNNING, mockTask.getState()); |
| } |
| |
| /** |
| * {@link TaskState#TERMINATING} |
| */ |
| private void assertTaskKillWaitState() { |
| assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState()); |
| } |
| |
| /** |
| * {@link TaskState#SUCCEEDED} |
| */ |
| private void assertTaskSucceededState() { |
| assertEquals(TaskState.SUCCEEDED, mockTask.getState()); |
| } |
| |
| @Test |
| public void testInit() { |
| LOG.info("--- START: testInit ---"); |
| assertTaskNewState(); |
| assert (mockTask.getAttemptList().size() == 0); |
| } |
| |
| @Test |
| /** |
| * {@link TaskState#NEW}->{@link TaskState#SCHEDULED} |
| */ |
| public void testScheduleTask() { |
| LOG.info("--- START: testScheduleTask ---"); |
| TezTaskID taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| } |
| |
| @Test |
| /** |
| * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT} |
| */ |
| public void testKillScheduledTask() { |
| LOG.info("--- START: testKillScheduledTask ---"); |
| TezTaskID taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| killTask(taskId); |
| } |
| |
| @Test |
| /** |
| * Kill attempt |
| * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED} |
| */ |
| public void testKillScheduledTaskAttempt() { |
| LOG.info("--- START: testKillScheduledTaskAttempt ---"); |
| TezTaskID taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| killScheduledTaskAttempt(mockTask.getLastAttempt().getID()); |
| } |
| |
| @Test |
| /** |
| * Launch attempt |
| * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING} |
| */ |
| public void testLaunchTaskAttempt() { |
| LOG.info("--- START: testLaunchTaskAttempt ---"); |
| TezTaskID taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(mockTask.getLastAttempt().getID()); |
| } |
| |
| @Test |
| /** |
| * Kill running attempt |
| * {@link TaskState#RUNNING}->{@link TaskState#RUNNING} |
| */ |
| public void testKillRunningTaskAttempt() { |
| LOG.info("--- START: testKillRunningTaskAttempt ---"); |
| TezTaskID taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(mockTask.getLastAttempt().getID()); |
| killRunningTaskAttempt(mockTask.getLastAttempt().getID()); |
| } |
| |
| @Test |
| public void testTaskProgress() { |
| LOG.info("--- START: testTaskProgress ---"); |
| |
| // launch task |
| TezTaskID taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| float progress = 0f; |
| assert (mockTask.getProgress() == progress); |
| launchTaskAttempt(mockTask.getLastAttempt().getID()); |
| |
| // update attempt1 |
| progress = 50f; |
| updateAttemptProgress(mockTask.getLastAttempt(), progress); |
| assert (mockTask.getProgress() == progress); |
| progress = 100f; |
| updateAttemptProgress(mockTask.getLastAttempt(), progress); |
| assert (mockTask.getProgress() == progress); |
| |
| progress = 0f; |
| // mark first attempt as killed |
| updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.KILLED); |
| assert (mockTask.getProgress() == progress); |
| |
| // kill first attempt |
| // should trigger a new attempt |
| // as no successful attempts |
| killRunningTaskAttempt(mockTask.getLastAttempt().getID()); |
| assert (mockTask.getAttemptList().size() == 2); |
| |
| assert (mockTask.getProgress() == 0f); |
| launchTaskAttempt(mockTask.getLastAttempt().getID()); |
| progress = 50f; |
| updateAttemptProgress(mockTask.getLastAttempt(), progress); |
| assert (mockTask.getProgress() == progress); |
| } |
| |
| @Test |
| public void testFailureDuringTaskAttemptCommit() { |
| TezTaskID taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(mockTask.getLastAttempt().getID()); |
| updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); |
| assertTrue("First attempt should commit", |
| mockTask.canCommit(mockTask.getLastAttempt().getID())); |
| |
| // During the task attempt commit there is an exception which causes |
| // the attempt to fail |
| updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED); |
| failRunningTaskAttempt(mockTask.getLastAttempt().getID()); |
| |
| assertEquals(2, mockTask.getAttemptList().size()); |
| |
| assertFalse("First attempt should not commit", |
| mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); |
| updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); |
| assertTrue("Second attempt should commit", |
| mockTask.canCommit(mockTask.getLastAttempt().getID())); |
| |
| updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED); |
| mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| |
| assertTaskSucceededState(); |
| } |
| |
| |
| @Test |
| public void testChangeCommitTaskAttempt() { |
| TezTaskID taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(mockTask.getLastAttempt().getID()); |
| updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); |
| |
| // Add a speculative task attempt that succeeds |
| mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), |
| TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| launchTaskAttempt(mockTask.getLastAttempt().getID()); |
| updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); |
| |
| assertTrue("Second attempt should commit", |
| mockTask.canCommit(mockTask.getAttemptList().get(1).getID())); |
| assertFalse("First attempt should not commit", |
| mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); |
| |
| // During the task attempt commit there is an exception which causes |
| // the second attempt to fail |
| updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED); |
| failRunningTaskAttempt(mockTask.getLastAttempt().getID()); |
| |
| assertEquals(2, mockTask.getAttemptList().size()); |
| |
| assertFalse("Second attempt should not commit", |
| mockTask.canCommit(mockTask.getAttemptList().get(1).getID())); |
| assertTrue("First attempt should commit", |
| mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); |
| |
| updateAttemptState(mockTask.getAttemptList().get(0), TaskAttemptState.SUCCEEDED); |
| mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0).getID(), |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| |
| assertTaskSucceededState(); |
| } |
| |
| @Test |
| public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { |
| TezTaskID taskId = getNewTaskID(); |
| scheduleTaskAttempt(taskId); |
| launchTaskAttempt(mockTask.getLastAttempt().getID()); |
| updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); |
| |
| // Add a speculative task attempt that succeeds |
| mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), |
| TaskEventType.T_ADD_SPEC_ATTEMPT)); |
| launchTaskAttempt(mockTask.getLastAttempt().getID()); |
| mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| |
| // The task should now have succeeded |
| assertTaskSucceededState(); |
| |
| // Now fail the first task attempt, after the second has succeeded |
| mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0) |
| .getID(), TaskEventType.T_ATTEMPT_FAILED)); |
| |
| // The task should still be in the succeeded state |
| assertTaskSucceededState(); |
| |
| } |
| |
| // TODO Add test to validate the correct commit attempt. |
| |
| @SuppressWarnings("rawtypes") |
| private class MockTaskImpl extends TaskImpl { |
| |
| private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>(); |
| private Vertex vertex; |
| |
| public MockTaskImpl(TezVertexID vertexId, int partition, |
| EventHandler eventHandler, Configuration conf, |
| TaskAttemptListener taskAttemptListener, Clock clock, |
| TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex, |
| TaskLocationHint locationHint, Resource resource, |
| ContainerContext containerContext, Vertex vertex) { |
| super(vertexId, partition, eventHandler, conf, taskAttemptListener, |
| clock, thh, appContext, leafVertex, locationHint, resource, |
| containerContext); |
| this.vertex = vertex; |
| } |
| |
| @Override |
| protected TaskAttemptImpl createAttempt(int attemptNumber) { |
| MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(), |
| attemptNumber, eventHandler, taskAttemptListener, |
| conf, clock, taskHeartbeatHandler, appContext, |
| locationHint, true, taskResource, containerContext); |
| taskAttempts.add(attempt); |
| return attempt; |
| } |
| |
| @Override |
| protected void internalError(TaskEventType type) { |
| super.internalError(type); |
| fail("Internal error: " + type); |
| } |
| |
| MockTaskAttemptImpl getLastAttempt() { |
| return taskAttempts.get(taskAttempts.size() - 1); |
| } |
| |
| List<MockTaskAttemptImpl> getAttemptList() { |
| return taskAttempts; |
| } |
| |
| @Override |
| public Vertex getVertex() { |
| return vertex; |
| } |
| |
| protected void logJobHistoryTaskStartedEvent() { |
| } |
| |
| protected void logJobHistoryTaskFinishedEvent() { |
| } |
| |
| protected void logJobHistoryTaskFailedEvent(TaskState finalState) { |
| } |
| } |
| |
| @SuppressWarnings("rawtypes") |
| public class MockTaskAttemptImpl extends TaskAttemptImpl { |
| |
| private float progress = 0; |
| private TaskAttemptState state = TaskAttemptState.NEW; |
| |
| public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, |
| EventHandler eventHandler, TaskAttemptListener tal, Configuration conf, |
| Clock clock, TaskHeartbeatHandler thh, AppContext appContext, |
| TaskLocationHint locationHing, boolean isRescheduled, |
| Resource resource, ContainerContext containerContext) { |
| super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh, |
| appContext, locationHing, isRescheduled, resource, containerContext); |
| } |
| |
| @Override |
| public float getProgress() { |
| return progress; |
| } |
| |
| public void setProgress(float progress) { |
| this.progress = progress; |
| } |
| |
| public void setState(TaskAttemptState state) { |
| this.state = state; |
| } |
| |
| @Override |
| public TaskAttemptState getState() { |
| return state; |
| } |
| |
| @Override |
| public TaskAttemptState getStateNoLock() { |
| return state; |
| } |
| } |
| |
| } |