| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.dag.impl; |
| |
| import org.apache.tez.dag.app.MockClock; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Matchers.anyInt; |
| import static org.mockito.Mockito.atLeast; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.RETURNS_DEEP_STUBS; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RawLocalFileSystem; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.event.Event; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.log4j.Level; |
| import org.apache.log4j.LogManager; |
| import org.apache.tez.common.MockDNSToSwitchMapping; |
| import org.apache.tez.common.counters.TezCounters; |
| import org.apache.tez.dag.api.TezConstants; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted; |
| import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; |
| import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; |
| import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; |
| import org.apache.tez.runtime.api.TaskFailureType; |
| import org.apache.tez.serviceplugins.api.TaskCommunicator; |
| import org.apache.tez.dag.api.TaskLocationHint; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.oldrecords.TaskAttemptState; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.ClusterInfo; |
| import org.apache.tez.dag.app.ContainerContext; |
| import org.apache.tez.dag.app.ContainerHeartbeatHandler; |
| import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; |
| import org.apache.tez.dag.app.TaskCommunicatorWrapper; |
| import org.apache.tez.dag.app.TaskHeartbeatHandler; |
| import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; |
| import org.apache.tez.dag.app.dag.DAG; |
| import org.apache.tez.dag.app.dag.Task; |
| import org.apache.tez.dag.app.dag.Vertex; |
| import org.apache.tez.dag.app.dag.event.DAGEvent; |
| import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; |
| import org.apache.tez.dag.app.dag.event.DAGEventType; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; |
| import org.apache.tez.dag.app.dag.event.TaskEvent; |
| import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; |
| import org.apache.tez.dag.app.dag.event.TaskEventType; |
| import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate; |
| import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; |
| import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; |
| import org.apache.tez.dag.app.rm.container.AMContainerMap; |
| import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; |
| import org.apache.tez.dag.history.DAGHistoryEvent; |
| import org.apache.tez.dag.history.HistoryEventHandler; |
| import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; |
| import org.apache.tez.dag.records.TaskAttemptTerminationCause; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezTaskID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.apache.tez.dag.utils.TezBuilderUtils; |
| import org.apache.tez.runtime.api.events.InputReadErrorEvent; |
| import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; |
| import org.apache.tez.runtime.api.impl.EventMetaData; |
| import org.apache.tez.runtime.api.impl.TezEvent; |
| import org.apache.tez.serviceplugins.api.ServicePluginException; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Mockito; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| public class TestTaskAttempt { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestTaskAttempt.class); |
| |
| static public class StubbedFS extends RawLocalFileSystem { |
| @Override |
| public FileStatus getFileStatus(Path f) throws IOException { |
| return new FileStatus(1, false, 1, 1, 1, f); |
| } |
| } |
| |
| // Mocks that setupTest() rebuilds before every test. |
| AppContext appCtx; |
| Vertex mockVertex; |
| Task mockTask; |
| |
| // Assigned by the individual tests that care about locality. |
| TaskLocationHint locationHint; |
| |
| // Created once per test-class instance. |
| TezConfiguration vertexConf = new TezConfiguration(); |
| ServicePluginInfo servicePluginInfo = |
|     new ServicePluginInfo().setContainerLauncherName(TezConstants.getTezYarnServicePluginName()); |
| |
| @BeforeClass |
| // Runs once for the whole class: calls MockDNSToSwitchMapping.initializeMockRackResolver(). |
| public static void setup() { |
|   MockDNSToSwitchMapping.initializeMockRackResolver(); |
| } |
| |
| @Before |
| // Rebuilds the mocked AppContext, Vertex and Task before each test, then sets the root log4j |
| // logger to DEBUG. |
| public void setupTest() { |
|   appCtx = mock(AppContext.class); |
|   doReturn(new Configuration()).when(appCtx).getAMConf(); |
|   doReturn(TezConstants.getTezYarnServicePluginName()) |
|       .when(appCtx).getContainerLauncherName(anyInt()); |
| |
|   // mockVertex is assigned by createMockVertex, so it must run before the task is wired up. |
|   createMockVertex(vertexConf); |
|   mockTask = mock(Task.class); |
|   doReturn(mockVertex).when(mockTask).getVertex(); |
| |
|   HistoryEventHandler historyHandler = mock(HistoryEventHandler.class); |
|   when(appCtx.getHistoryHandler()).thenReturn(historyHandler); |
| |
|   LogManager.getRootLogger().setLevel(Level.DEBUG); |
| } |
| |
| // Replaces mockVertex with a fresh mock that hands out the shared servicePluginInfo and a |
| // VertexConfigImpl wrapping the given conf. |
| private void createMockVertex(Configuration conf) { |
|   Vertex vertex = mock(Vertex.class); |
|   doReturn(servicePluginInfo).when(vertex).getServicePluginInfo(); |
|   doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig(); |
|   mockVertex = vertex; |
| } |
| |
| @Test(timeout = 5000) |
| public void testLocalityRequest() { |
| TaskAttemptImpl.ScheduleTaskattemptTransition sta = |
| new TaskAttemptImpl.ScheduleTaskattemptTransition(); |
| |
| EventHandler eventHandler = mock(EventHandler.class); |
| Set<String> hosts = new TreeSet<String>(); |
| hosts.add("host1"); |
| hosts.add("host2"); |
| hosts.add("host3"); |
| locationHint = TaskLocationHint.createTaskLocationHint(hosts, null); |
| |
| TezTaskID taskID = TezTaskID.getInstance( |
| TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), |
| mock(TaskHeartbeatHandler.class), appCtx, |
| false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); |
| |
| TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class); |
| |
| sta.transition(taImpl, sEvent); |
| |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(1)).handle(arg.capture()); |
| if (!(arg.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) { |
| fail("Second event not of type " |
| + AMSchedulerEventTALaunchRequest.class.getName()); |
| } |
| |
| // TODO Move the Rack request check to the client after TEZ-125 is fixed. |
| Set<String> requestedRacks = taImpl.taskRacks; |
| assertEquals(1, requestedRacks.size()); |
| assertEquals(3, taImpl.taskHosts.size()); |
| for (int i = 0; i < 3; i++) { |
| String host = ("host" + (i + 1)); |
| assertEquals(host, true, taImpl.taskHosts.contains(host)); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| // With TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY disabled and |
| // TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY enabled, a rescheduled attempt must be launched at |
| // the same priority as a first attempt and without a location hint. |
| public void testRetriesAtSamePriorityConfig() { |
| |
|   // Swap the vertex config returned by the mock for one carrying the two overrides. |
|   TezConfiguration samePriorityConf = new TezConfiguration(); |
|   samePriorityConf.setBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY, false); |
|   samePriorityConf.setBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY, true); |
|   when(mockVertex.getVertexConfig()) |
|       .thenReturn(new VertexImpl.VertexConfigImpl(samePriorityConf)); |
| |
|   // Single-host locality. |
|   Set<String> singleHost = new TreeSet<String>(); |
|   singleHost.add("host1"); |
|   locationHint = TaskLocationHint.createTaskLocationHint(singleHost, null); |
| |
|   TaskAttemptImpl.ScheduleTaskattemptTransition transition = |
|       new TaskAttemptImpl.ScheduleTaskattemptTransition(); |
| |
|   EventHandler handler = mock(EventHandler.class); |
|   TezTaskID taskID = TezTaskID.getInstance( |
|       TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); |
| |
|   // The two attempts differ only in the boolean passed after appCtx (false vs. true). |
|   TaskAttemptImpl firstAttempt = new MockTaskAttemptImpl(taskID, 1, handler, |
|       mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), |
|       mock(TaskHeartbeatHandler.class), appCtx, |
|       false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); |
|   TaskAttemptImpl retriedAttempt = new MockTaskAttemptImpl(taskID, 1, handler, |
|       mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), |
|       mock(TaskHeartbeatHandler.class), appCtx, |
|       true, Resource.newInstance(1024, 1), createFakeContainerContext(), false); |
| |
|   ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class); |
| |
|   TaskAttemptEventSchedule scheduleEvent = mock(TaskAttemptEventSchedule.class); |
|   when(scheduleEvent.getPriorityLowLimit()).thenReturn(3); |
|   when(scheduleEvent.getPriorityHighLimit()).thenReturn(1); |
| |
|   // First attempt: priority 2, locality preserved. |
|   transition.transition(firstAttempt, scheduleEvent); |
|   verify(handler, times(1)).handle(captor.capture()); |
|   AMSchedulerEventTALaunchRequest launch = |
|       (AMSchedulerEventTALaunchRequest) captor.getValue(); |
|   assertEquals(2, launch.getPriority()); |
|   assertEquals(1, launch.getLocationHint().getHosts().size()); |
|   Assert.assertTrue(launch.getLocationHint().getHosts().contains("host1")); |
| |
|   // Retried attempt: same priority, location hint dropped. |
|   transition.transition(retriedAttempt, scheduleEvent); |
|   verify(handler, times(2)).handle(captor.capture()); |
|   launch = (AMSchedulerEventTALaunchRequest) captor.getValue(); |
|   assertEquals(2, launch.getPriority()); |
|   Assert.assertNull(launch.getLocationHint()); |
| } |
| |
| @Test(timeout = 5000) |
| // Checks the launch priority derived from the schedule event's low/high limits, for a first |
| // attempt and for a rescheduled one, across three limit pairs. |
| public void testPriority() { |
|   TaskAttemptImpl.ScheduleTaskattemptTransition transition = |
|       new TaskAttemptImpl.ScheduleTaskattemptTransition(); |
| |
|   EventHandler handler = mock(EventHandler.class); |
|   TezTaskID taskID = TezTaskID.getInstance( |
|       TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); |
|   TaskAttemptImpl firstAttempt = new MockTaskAttemptImpl(taskID, 1, handler, |
|       mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), |
|       mock(TaskHeartbeatHandler.class), appCtx, |
|       false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); |
| |
|   TaskAttemptImpl rescheduledAttempt = new MockTaskAttemptImpl(taskID, 1, handler, |
|       mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), |
|       mock(TaskHeartbeatHandler.class), appCtx, |
|       true, Resource.newInstance(1024, 1), createFakeContainerContext(), false); |
| |
|   ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class); |
|   TaskAttemptEventSchedule scheduleEvent = mock(TaskAttemptEventSchedule.class); |
| |
|   // Each row: { low limit, high limit, expected first-attempt priority, |
|   //             expected rescheduled-attempt priority }. |
|   int[][] cases = { |
|       {3, 1, 2, 1}, |
|       {6, 4, 5, 4}, |
|       {5, 5, 5, 5}, |
|   }; |
| |
|   // The handler is shared, so the expected invocation count keeps growing across rows. |
|   int handledSoFar = 0; |
|   for (int[] row : cases) { |
|     when(scheduleEvent.getPriorityLowLimit()).thenReturn(row[0]); |
|     when(scheduleEvent.getPriorityHighLimit()).thenReturn(row[1]); |
| |
|     transition.transition(firstAttempt, scheduleEvent); |
|     handledSoFar++; |
|     verify(handler, times(handledSoFar)).handle(captor.capture()); |
|     AMSchedulerEventTALaunchRequest launch = |
|         (AMSchedulerEventTALaunchRequest) captor.getValue(); |
|     Assert.assertEquals(row[2], launch.getPriority()); |
| |
|     transition.transition(rescheduledAttempt, scheduleEvent); |
|     handledSoFar++; |
|     verify(handler, times(handledSoFar)).handle(captor.capture()); |
|     launch = (AMSchedulerEventTALaunchRequest) captor.getValue(); |
|     Assert.assertEquals(row[3], launch.getPriority()); |
|   } |
| } |
| |
| @Test(timeout = 5000) |
| // Tests that an attempt is made to resolve the localized hosts to racks. |
| // TODO Move to the client post TEZ-125. |
| public void testHostResolveAttempt() throws Exception { |
| TaskAttemptImpl.ScheduleTaskattemptTransition sta = |
| new TaskAttemptImpl.ScheduleTaskattemptTransition(); |
| |
| EventHandler eventHandler = mock(EventHandler.class); |
| String hosts[] = new String[] { "127.0.0.1", "host2", "host3" }; |
| Set<String> resolved = new TreeSet<String>( |
| Arrays.asList(new String[]{ "host1", "host2", "host3" })); |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new TreeSet<String>(Arrays.asList(hosts)), null); |
| |
| TezTaskID taskID = TezTaskID.getInstance( |
| TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| mock(TaskCommunicatorManagerInterface.class), new Configuration(), |
| new SystemClock(), mock(TaskHeartbeatHandler.class), |
| appCtx, false, Resource.newInstance(1024, |
| 1), createFakeContainerContext(), false); |
| |
| TaskAttemptImpl spyTa = spy(taImpl); |
| when(spyTa.resolveHosts(hosts)).thenReturn( |
| resolved.toArray(new String[3])); |
| |
| TaskAttemptEventSchedule mockTAEvent = mock(TaskAttemptEventSchedule.class); |
| |
| sta.transition(spyTa, mockTAEvent); |
| verify(spyTa).resolveHosts(hosts); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(1)).handle(arg.capture()); |
| if (!(arg.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) { |
| fail("Second Event not of type ContainerRequestEvent"); |
| } |
| Map<String, Boolean> expected = new HashMap<String, Boolean>(); |
| expected.put("host1", true); |
| expected.put("host2", true); |
| expected.put("host3", true); |
| Set<String> requestedHosts = spyTa.taskHosts; |
| for (String h : requestedHosts) { |
| expected.remove(h); |
| } |
| assertEquals(0, expected.size()); |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure the dag does not go into an error state if an attempt kill is |
| // received while STARTING |
| public void testLaunchFailedWhileKilling() throws Exception { |
|   ApplicationId appId = ApplicationId.newInstance(1, 2); |
|   TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
|   TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
|   TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
|   TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); |
| |
|   MockEventHandler eventHandler = new MockEventHandler(); |
|   TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
|   Configuration taskConf = new Configuration(); |
|   taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
|   taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
|   locationHint = TaskLocationHint.createTaskLocationHint( |
|       new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
|   Resource resource = Resource.newInstance(1024, 1); |
| |
|   AppContext mockAppContext = appCtx; |
|   doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo(); |
| |
|   TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
|       taListener, taskConf, new SystemClock(), |
|       mock(TaskHeartbeatHandler.class), mockAppContext, false, |
|       resource, createFakeContainerContext(), false); |
| |
|   taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
|   // At state STARTING. |
|   taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null, |
|       TaskAttemptTerminationCause.TERMINATED_BY_CLIENT)); |
|   assertEquals(TaskAttemptStateInternal.KILL_IN_PROGRESS, taImpl.getInternalState()); |
| |
|   // An event update arriving while the kill is in progress must not raise an internal error. |
|   taImpl.handle(new TaskAttemptEventTezEventUpdate(taImpl.getID(), Collections.EMPTY_LIST)); |
|   assertFalse( |
|       "InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in KILL_IN_PROGRESS state", |
|       eventHandler.internalError); |
| |
|   // At some KILLING state: a second kill request must be tolerated as well. |
|   taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null, |
|       TaskAttemptTerminationCause.TERMINATED_BY_CLIENT)); |
|   assertFalse(eventHandler.internalError); |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure ContainerTerminating and ContainerTerminated is handled correctly by |
| // the TaskAttempt |
| public void testContainerTerminationWhileRunning() throws Exception { |
|   ApplicationId appId = ApplicationId.newInstance(1, 2); |
|   ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
|       appId, 0); |
|   TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
|   TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
|   TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
|   MockEventHandler eventHandler = spy(new MockEventHandler()); |
|   TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
|   Configuration taskConf = new Configuration(); |
|   taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
|   taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
|   locationHint = TaskLocationHint.createTaskLocationHint( |
|       new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
|   Resource resource = Resource.newInstance(1024, 1); |
| |
|   NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
|   @SuppressWarnings("deprecation") |
|   ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
|   Container container = mock(Container.class); |
|   when(container.getId()).thenReturn(contId); |
|   when(container.getNodeId()).thenReturn(nid); |
|   when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
|   AMContainerMap containers = new AMContainerMap( |
|       mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
|       new ContainerContextMatcher(), appCtx); |
|   containers.addContainerIfNew(container, 0, 0, 0); |
| |
|   doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
|   doReturn(containers).when(appCtx).getAllContainers(); |
| |
|   TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
|   TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
|       taListener, taskConf, new SystemClock(), |
|       mockHeartbeatHandler, appCtx, false, |
|       resource, createFakeContainerContext(), false); |
|   TezTaskAttemptID taskAttemptID = taImpl.getID(); |
|   ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
|   // JUnit's assertEquals takes (message, expected, actual); the expected value goes first so |
|   // that failure output reads correctly. |
|   taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
|   taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
|   assertEquals("Task attempt is not in the STARTING state", |
|       TaskAttemptState.STARTING, taImpl.getState()); |
|   assertEquals("Task attempt internal state is not at SUBMITTED", |
|       TaskAttemptStateInternal.SUBMITTED, taImpl.getInternalState()); |
|   // At state STARTING. |
|   taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
|   assertEquals("Task attempt is not in the RUNNING state", |
|       TaskAttemptState.RUNNING, taImpl.getState()); |
|   verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
|   int expectedEventsAtRunning = 5; |
|   verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| |
|   taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID, |
|       "Terminating", TaskAttemptTerminationCause.APPLICATION_ERROR)); |
|   assertFalse( |
|       "InternalError occurred trying to handle TA_CONTAINER_TERMINATING", |
|       eventHandler.internalError); |
|   verify(mockHeartbeatHandler).unregister(taskAttemptID); |
|   assertEquals("Task attempt is not in the FAILED state", |
|       TaskAttemptState.FAILED, taImpl.getState()); |
| |
|   assertEquals(1, taImpl.getDiagnostics().size()); |
|   assertEquals("Terminating", taImpl.getDiagnostics().get(0)); |
|   assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause()); |
| |
|   int expectedEventsAfterTerminating = expectedEventsAtRunning + 3; |
|   arg = ArgumentCaptor.forClass(Event.class); |
|   verify(eventHandler, times(expectedEventsAfterTerminating)).handle(arg.capture()); |
| |
|   // Only the three events emitted by the TA_CONTAINER_TERMINATING handling are of interest. |
|   List<Event> terminatingEvents = arg.getAllValues().subList(expectedEventsAtRunning, |
|       expectedEventsAfterTerminating); |
|   Event event = verifyEventType(terminatingEvents, TaskEventTAFailed.class, 1); |
|   TaskEventTAFailed failedEvent = (TaskEventTAFailed) event; |
|   assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); |
|   verifyEventType(terminatingEvents, AMSchedulerEventTAEnded.class, 1); |
|   verifyEventType(terminatingEvents, DAGEventCounterUpdate.class, 1); |
| |
|   taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, |
|       "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED)); |
|   // verify unregister is not invoked again |
|   verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); |
|   // The follow-up ContainerTerminated must not produce any additional events. |
|   int expectedEventAfterTerminated = expectedEventsAfterTerminating + 0; |
|   arg = ArgumentCaptor.forClass(Event.class); |
|   verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); |
| |
|   assertEquals(2, taImpl.getDiagnostics().size()); |
|   assertEquals("Terminated", taImpl.getDiagnostics().get(1)); |
| |
|   // check that original error cause is retained |
|   assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause()); |
| } |
| |
| |
| @Test(timeout = 5000) |
| // Ensure ContainerTerminated is handled correctly by the TaskAttempt |
| public void testContainerTerminatedWhileRunning() throws Exception { |
|   ApplicationId appId = ApplicationId.newInstance(1, 2); |
|   ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
|       appId, 0); |
|   TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
|   TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
|   TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
|   MockEventHandler eventHandler = spy(new MockEventHandler()); |
|   TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
|   Configuration taskConf = new Configuration(); |
|   taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
|   taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
|   locationHint = TaskLocationHint.createTaskLocationHint( |
|       new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
|   Resource resource = Resource.newInstance(1024, 1); |
| |
|   NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
|   @SuppressWarnings("deprecation") |
|   ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
|   Container container = mock(Container.class); |
|   when(container.getId()).thenReturn(contId); |
|   when(container.getNodeId()).thenReturn(nid); |
|   when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
|   AMContainerMap containers = new AMContainerMap( |
|       mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
|       new ContainerContextMatcher(), appCtx); |
|   containers.addContainerIfNew(container, 0, 0, 0); |
| |
|   doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
|   doReturn(containers).when(appCtx).getAllContainers(); |
| |
|   TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
|   TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
|       taListener, taskConf, new SystemClock(), |
|       mockHeartbeatHandler, appCtx, false, |
|       resource, createFakeContainerContext(), false); |
|   TezTaskAttemptID taskAttemptID = taImpl.getID(); |
|   ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
|   taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
|   taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
|   taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
|   int expectedEventsAtRunning = 5; |
|   verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
|   // JUnit's assertEquals takes (message, expected, actual); the expected value goes first so |
|   // that failure output reads correctly. |
|   assertEquals("Task attempt is not in running state", |
|       TaskAttemptState.RUNNING, taImpl.getState()); |
|   verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
|   taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "Terminated", |
|       TaskAttemptTerminationCause.CONTAINER_EXITED)); |
|   assertFalse( |
|       "InternalError occurred trying to handle TA_CONTAINER_TERMINATED", |
|       eventHandler.internalError); |
|   verify(mockHeartbeatHandler).unregister(taskAttemptID); |
|   assertEquals("Terminated", taImpl.getDiagnostics().get(0)); |
|   assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, taImpl.getTerminationCause()); |
|   // TODO Ensure TA_TERMINATING after this is ignored. |
| |
|   int expectedEventsAfterTerminating = expectedEventsAtRunning + 3; |
|   arg = ArgumentCaptor.forClass(Event.class); |
|   verify(eventHandler, times(expectedEventsAfterTerminating)).handle(arg.capture()); |
| |
|   // Only the three events emitted by the TA_CONTAINER_TERMINATED handling are of interest. |
|   List<Event> terminatedEvents = arg.getAllValues().subList(expectedEventsAtRunning, |
|       expectedEventsAfterTerminating); |
|   Event event = verifyEventType(terminatedEvents, TaskEventTAFailed.class, 1); |
|   TaskEventTAFailed failedEvent = (TaskEventTAFailed) event; |
|   assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); |
|   verifyEventType(terminatedEvents, AMSchedulerEventTAEnded.class, 1); |
|   verifyEventType(terminatedEvents, DAGEventCounterUpdate.class, 1); |
| |
|   taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, |
|       "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED)); |
|   // verify unregister is not invoked again |
|   verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); |
|   // A duplicate ContainerTerminated must not produce any additional events. |
|   int expectedEventAfterTerminated = expectedEventsAfterTerminating + 0; |
|   arg = ArgumentCaptor.forClass(Event.class); |
|   verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure a ContainerTerminated event that arrives after the TaskAttempt has |
| // already succeeded is handled correctly |
| public void testContainerTerminatedAfterSuccess() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
| int expectedEventsAtRunning = 5; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID); |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| |
| int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); |
| |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); |
| |
| taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, |
| "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED)); |
| // verify unregister is not invoked again |
| verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); |
| int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); |
| |
| // Verify that the diagnostic message included in the Terminated event is not |
| // captured - TA already succeeded. Error cause is the default value. |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause()); |
| } |
| |
| @Test(timeout = 5000) |
| public void testLastDataEventRecording() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| |
| long ts1 = 1024; |
| long ts2 = 2048; |
| TezTaskAttemptID mockId1 = mock(TezTaskAttemptID.class); |
| TezTaskAttemptID mockId2 = mock(TezTaskAttemptID.class); |
| TezEvent mockTezEvent1 = mock(TezEvent.class, RETURNS_DEEP_STUBS); |
| when(mockTezEvent1.getEventReceivedTime()).thenReturn(ts1); |
| when(mockTezEvent1.getSourceInfo().getTaskAttemptID()).thenReturn(mockId1); |
| TezEvent mockTezEvent2 = mock(TezEvent.class, RETURNS_DEEP_STUBS); |
| when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2); |
| when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2); |
| TaskAttemptEventStatusUpdate statusEvent = |
| new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)); |
| |
| assertEquals(0, taImpl.lastDataEvents.size()); |
| taImpl.setLastEventSent(mockTezEvent1); |
| assertEquals(1, taImpl.lastDataEvents.size()); |
| assertEquals(ts1, taImpl.lastDataEvents.get(0).getTimestamp()); |
| assertEquals(mockId1, taImpl.lastDataEvents.get(0).getTaskAttemptId()); |
| taImpl.handle(statusEvent); |
| taImpl.setLastEventSent(mockTezEvent2); |
| assertEquals(1, taImpl.lastDataEvents.size()); |
| assertEquals(ts2, taImpl.lastDataEvents.get(0).getTimestamp()); |
| assertEquals(mockId2, taImpl.lastDataEvents.get(0).getTaskAttemptId()); // over-write earlier value |
| statusEvent.setReadErrorReported(true); |
| taImpl.handle(statusEvent); |
| taImpl.setLastEventSent(mockTezEvent1); |
| assertEquals(2, taImpl.lastDataEvents.size()); |
| assertEquals(ts1, taImpl.lastDataEvents.get(1).getTimestamp()); |
| assertEquals(mockId1, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // add new event |
| taImpl.setLastEventSent(mockTezEvent2); |
| assertEquals(2, taImpl.lastDataEvents.size()); |
| assertEquals(ts2, taImpl.lastDataEvents.get(1).getTimestamp()); |
| assertEquals(mockId2, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // over-write earlier value |
| } |
| |
| @Test(timeout = 5000) |
| public void testFailure() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
| int expectedEventsAtRunning = 6; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| verifyEventType( |
| arg.getAllValues().subList(0, |
| expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1); |
| |
| taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| |
| taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, |
| TaskFailureType.NON_FATAL, "0", |
| TaskAttemptTerminationCause.APPLICATION_ERROR)); |
| |
| assertEquals("Task attempt is not in the FAIL_IN_PROGRESS state", taImpl.getInternalState(), |
| TaskAttemptStateInternal.FAIL_IN_PROGRESS); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID); |
| assertEquals(1, taImpl.getDiagnostics().size()); |
| assertEquals("0", taImpl.getDiagnostics().get(0)); |
| assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause()); |
| |
| assertEquals(TaskAttemptStateInternal.FAIL_IN_PROGRESS, taImpl.getInternalState()); |
| taImpl.handle(new TaskAttemptEventTezEventUpdate(taImpl.getID(), Collections.EMPTY_LIST)); |
| assertFalse( |
| "InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in FAIL_IN_PROGRESS state", |
| eventHandler.internalError); |
| |
| taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "1", |
| TaskAttemptTerminationCause.CONTAINER_EXITED)); |
| // verify unregister is not invoked again |
| verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); |
| assertEquals(2, taImpl.getDiagnostics().size()); |
| assertEquals("1", taImpl.getDiagnostics().get(1)); |
| // err cause does not change |
| assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause()); |
| |
| int expectedEvenstAfterTerminating = expectedEventsAtRunning + 5; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); |
| |
| |
| Event e = verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1); |
| TaskEventTAFailed failedEvent = (TaskEventTAFailed) e; |
| assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2); |
| } |
| |
| @Test(timeout = 5000) |
| public void testFailureFatalError() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
| int expectedEventsAtRunning = 6; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| verifyEventType( |
| arg.getAllValues().subList(0, |
| expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1); |
| |
| taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| |
| taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, |
| TaskFailureType.FATAL, "0", |
| TaskAttemptTerminationCause.APPLICATION_ERROR)); |
| |
| assertEquals("Task attempt is not in the FAIL_IN_PROGRESS state", taImpl.getInternalState(), |
| TaskAttemptStateInternal.FAIL_IN_PROGRESS); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID); |
| assertEquals(1, taImpl.getDiagnostics().size()); |
| assertEquals("0", taImpl.getDiagnostics().get(0)); |
| assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause()); |
| |
| assertEquals(TaskAttemptStateInternal.FAIL_IN_PROGRESS, taImpl.getInternalState()); |
| taImpl.handle(new TaskAttemptEventTezEventUpdate(taImpl.getID(), Collections.EMPTY_LIST)); |
| assertFalse( |
| "InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in FAIL_IN_PROGRESS state", |
| eventHandler.internalError); |
| |
| taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "1", |
| TaskAttemptTerminationCause.CONTAINER_EXITED)); |
| // verify unregister is not invoked again |
| verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); |
| assertEquals(2, taImpl.getDiagnostics().size()); |
| assertEquals("1", taImpl.getDiagnostics().get(1)); |
| // err cause does not change |
| assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause()); |
| |
| int expectedEvenstAfterTerminating = expectedEventsAtRunning + 5; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); |
| |
| Event e = verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1); |
| TaskEventTAFailed failedEvent = (TaskEventTAFailed) e; |
| assertEquals(TaskFailureType.FATAL, failedEvent.getTaskFailureType()); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2); |
| } |
| |
| @Test |
| public void testProgressTimeStampUpdate() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 75); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| Clock mockClock = mock(Clock.class); |
| when(mockClock.getTime()).thenReturn(50l); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, mockClock, |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
| when(mockClock.getTime()).thenReturn(100l); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| verify(eventHandler, atLeast(1)).handle(arg.capture()); |
| if (arg.getValue() instanceof TaskAttemptEventAttemptFailed) { |
| TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue(); |
| assertEquals(taImpl.getID(), fEvent.getTaskAttemptID()); |
| assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause()); |
| taImpl.handle(fEvent); |
| fail("Should not fail since the timestamps do not differ by progress interval config"); |
| } else { |
| Assert.assertEquals("Task Attempt's internal state should be RUNNING!", |
| taImpl.getInternalState(), TaskAttemptStateInternal.RUNNING); |
| } |
| when(mockClock.getTime()).thenReturn(200l); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| verify(eventHandler, atLeast(1)).handle(arg.capture()); |
| Assert.assertTrue("This should have been an attempt failed event!", arg.getValue() instanceof TaskAttemptEventAttemptFailed); |
| } |
| |
| @Test |
| public void testStatusUpdateWithNullCounters() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
| TezCounters counters = new TezCounters(); |
| counters.findCounter("group", "counter").increment(1); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(counters, 0.1f, null, false))); |
| assertEquals(1, taImpl.getCounters().findCounter("group", "counter").getValue()); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| assertEquals(1, taImpl.getCounters().findCounter("group", "counter").getValue()); |
| counters.findCounter("group", "counter").increment(1); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(counters, 0.1f, null, false))); |
| assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue()); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue()); |
| } |
| |
| @Test (timeout = 60000L) |
| public void testProgressAfterSubmit() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 50); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| MockClock mockClock = new MockClock(); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, mockClock, |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| mockClock.incrementTime(20L); |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| mockClock.incrementTime(55L); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| verify(eventHandler, atLeast(1)).handle(arg.capture()); |
| if (arg.getValue() instanceof TaskAttemptEvent) { |
| taImpl.handle((TaskAttemptEvent) arg.getValue()); |
| } |
| Assert.assertEquals("Task Attempt's internal state should be SUBMITTED!", |
| taImpl.getInternalState(), TaskAttemptStateInternal.SUBMITTED); |
| } |
| |
| @Test (timeout = 5000) |
| public void testNoProgressFail() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 75); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| Clock mockClock = mock(Clock.class); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, mockClock, |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
| when(mockClock.getTime()).thenReturn(100l); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, true))); |
| // invocations and time updated |
| assertEquals(100l, taImpl.lastNotifyProgressTimestamp); |
| when(mockClock.getTime()).thenReturn(150l); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, true))); |
| // invocations and time updated |
| assertEquals(150l, taImpl.lastNotifyProgressTimestamp); |
| when(mockClock.getTime()).thenReturn(200l); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| // invocations and time not updated |
| assertEquals(150l, taImpl.lastNotifyProgressTimestamp); |
| when(mockClock.getTime()).thenReturn(250l); |
| taImpl.handle(new TaskAttemptEventStatusUpdate( |
| taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| // invocations and time not updated |
| assertEquals(150l, taImpl.lastNotifyProgressTimestamp); |
| // failed event sent to self |
| verify(eventHandler, atLeast(1)).handle(arg.capture()); |
| TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue(); |
| assertEquals(taImpl.getID(), fEvent.getTaskAttemptID()); |
| assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause()); |
| assertEquals(TaskFailureType.NON_FATAL, fEvent.getTaskFailureType()); |
| taImpl.handle(fEvent); |
| |
| assertEquals("Task attempt is not in the FAIL_IN_PROGRESS state", taImpl.getInternalState(), |
| TaskAttemptStateInternal.FAIL_IN_PROGRESS); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID); |
| assertEquals(1, taImpl.getDiagnostics().size()); |
| assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, taImpl.getTerminationCause()); |
| } |
| |
| @Test(timeout = 5000) |
| // Task events and task attempt events that belong to the same task are asserted to |
| // share one serializing hash; events of a different task are asserted to hash differently. |
| public void testEventSerializingHash() throws Exception { |
| TezVertexID vertex = TezVertexID.getInstance( |
| TezDAGID.getInstance(ApplicationId.newInstance(1, 2), 1), 1); |
| TezTaskID firstTask = TezTaskID.getInstance(vertex, 1); |
| TezTaskID secondTask = TezTaskID.getInstance(vertex, 2); |
| TezTaskAttemptID firstTaskAttempt0 = TezTaskAttemptID.getInstance(firstTask, 0); |
| TezTaskAttemptID firstTaskAttempt1 = TezTaskAttemptID.getInstance(firstTask, 1); |
| TezTaskAttemptID secondTaskAttempt1 = TezTaskAttemptID.getInstance(secondTask, 1); |
| |
| // Attempt-level events: three for the first task, one for the second. |
| TaskAttemptEvent failFirstAttempt0 = |
| new TaskAttemptEvent(firstTaskAttempt0, TaskAttemptEventType.TA_FAILED); |
| TaskAttemptEvent killFirstAttempt0 = |
| new TaskAttemptEvent(firstTaskAttempt0, TaskAttemptEventType.TA_KILL_REQUEST); |
| TaskAttemptEvent killFirstAttempt1 = |
| new TaskAttemptEvent(firstTaskAttempt1, TaskAttemptEventType.TA_KILL_REQUEST); |
| TaskAttemptEvent killSecondAttempt1 = |
| new TaskAttemptEvent(secondTaskAttempt1, TaskAttemptEventType.TA_KILL_REQUEST); |
| // Task-level events: two for the first task, one for the second. |
| TaskEvent firstTaskKilled = new TaskEvent(firstTask, TaskEventType.T_ATTEMPT_KILLED); |
| TaskEvent firstTaskFailed = new TaskEvent(firstTask, TaskEventType.T_ATTEMPT_FAILED); |
| TaskEvent secondTaskFailed = new TaskEvent(secondTask, TaskEventType.T_ATTEMPT_FAILED); |
| |
| // same attempt, different event types |
| assertEquals(failFirstAttempt0.getSerializingHash(), killFirstAttempt0.getSerializingHash()); |
| // different attempts of the same task |
| assertEquals(killFirstAttempt0.getSerializingHash(), killFirstAttempt1.getSerializingHash()); |
| // task events of the same task |
| assertEquals(firstTaskFailed.getSerializingHash(), firstTaskKilled.getSerializingHash()); |
| // attempt event versus task event of the same task |
| assertEquals(failFirstAttempt0.getSerializingHash(), firstTaskKilled.getSerializingHash()); |
| assertEquals(killSecondAttempt1.getSerializingHash(), secondTaskFailed.getSerializingHash()); |
| // events from different tasks are expected to differ here |
| assertFalse(firstTaskFailed.getSerializingHash() == secondTaskFailed.getSerializingHash()); |
| } |
| |
| @Test(timeout = 5000) |
| // Verifies that an attempt which is still STARTING (scheduled and submitted to a |
| // container, but with no TaskAttemptEventStartedRemotely delivered) accepts a status |
| // update and TA_DONE, and finishes in the SUCCEEDED state without diagnostics. |
| public void testCompletedAtSubmitted() throws ServicePluginException { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| // Spy so that every event the attempt dispatches can be counted and captured. |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| // Mocked container on 127.0.0.1 that the attempt is submitted to. |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| // Deliberately no TaskAttemptEventStartedRemotely: the attempt stays in STARTING. |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| // JUnit order is (message, expected, actual); the message names the state being checked. |
| assertEquals("Task attempt is not in the STARTING state", TaskAttemptState.STARTING, |
| taImpl.getState()); |
| |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
| int expectedEventsAtStarting = 4; |
| verify(eventHandler, times(expectedEventsAtStarting)).handle(arg.capture()); |
| |
| // Ensure status_updates are handled in the submitted state. |
| taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, |
| new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED, |
| taImpl.getState()); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID); |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| |
| int expectedEventsAfterTerminating = expectedEventsAtStarting + 3; |
| // Fresh captor so getAllValues() holds exactly the events handled so far. |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventsAfterTerminating)).handle(arg.capture()); |
| |
| // Only the events dispatched after the STARTING checkpoint are inspected. |
| List<Event> eventsAfterStarting = arg.getAllValues().subList(expectedEventsAtStarting, |
| expectedEventsAfterTerminating); |
| Event e = verifyEventType(eventsAfterStarting, TaskEventTASucceeded.class, 1); |
| assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, e.getType()); |
| verifyEventType(eventsAfterStarting, AMSchedulerEventTAEnded.class, 1); |
| verifyEventType(eventsAfterStarting, DAGEventCounterUpdate.class, 1); |
| } |
| |
| @Test(timeout = 5000) |
| // Verifies the plain success path with speculation enabled: the attempt is scheduled, |
| // submitted, started remotely, receives a status update and TA_DONE, and ends SUCCEEDED. |
| // The events dispatched along the way are counted and checked by type. |
| public void testSuccess() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| // Spy so that every event the attempt dispatches can be counted and captured. |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| // Speculation is switched on; SpeculatorEventTaskAttemptStatusUpdate events are checked below. |
| taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| // Mocked container on 127.0.0.1 that the attempt is submitted to. |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| // JUnit order is (message, expected, actual). |
| assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, |
| taImpl.getState()); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
| int expectedEventsAtRunning = 6; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| verifyEventType( |
| arg.getAllValues().subList(0, |
| expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1); |
| |
| taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, |
| new TaskStatusUpdateEvent(null, 0.1f, null, false))); |
| |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED, |
| taImpl.getState()); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID); |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| |
| int expectedEventsAfterTerminating = expectedEventsAtRunning + 5; |
| // Fresh captor so getAllValues() holds exactly the events handled so far. |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventsAfterTerminating)).handle(arg.capture()); |
| |
| // Only the events dispatched after the RUNNING checkpoint are inspected. |
| List<Event> eventsAfterRunning = arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEventsAfterTerminating); |
| Event e = verifyEventType(eventsAfterRunning, TaskEventTASucceeded.class, 1); |
| assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, e.getType()); |
| verifyEventType(eventsAfterRunning, AMSchedulerEventTAEnded.class, 1); |
| verifyEventType(eventsAfterRunning, DAGEventCounterUpdate.class, 1); |
| verifyEventType(eventsAfterRunning, SpeculatorEventTaskAttemptStatusUpdate.class, 2); |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure Container Preemption race with task completion is handled correctly by |
| // the TaskAttempt: a TA_CONTAINER_TERMINATED_BY_SYSTEM event that arrives after the |
| // attempt has SUCCEEDED must not dispatch new events, add diagnostics, or change the |
| // termination cause. |
| public void testContainerPreemptedAfterSuccess() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| // Spy so that every event the attempt dispatches can be counted and captured. |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| // Mocked container on 127.0.0.1 that the attempt is submitted to. |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| // JUnit order is (message, expected, actual). |
| assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, |
| taImpl.getState()); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| |
| int expectedEventsAtRunning = 5; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED, |
| taImpl.getState()); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID); |
| |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| |
| int expectedEventsAfterTerminating = expectedEventsAtRunning + 3; |
| // Fresh captor so getAllValues() holds exactly the events handled so far. |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventsAfterTerminating)).handle(arg.capture()); |
| |
| // Only the events dispatched after the RUNNING checkpoint are inspected. |
| List<Event> eventsAfterRunning = arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEventsAfterTerminating); |
| verifyEventType(eventsAfterRunning, TaskEventTASucceeded.class, 1); |
| verifyEventType(eventsAfterRunning, AMSchedulerEventTAEnded.class, 1); |
| verifyEventType(eventsAfterRunning, DAGEventCounterUpdate.class, 1); |
| |
| // The container is reported terminated only after the attempt already succeeded. |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, |
| TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM)); |
| // verify unregister is not invoked again |
| verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); |
| // "+ 0": the late termination must not have dispatched any further event. |
| int expectedEventAfterTerminated = expectedEventsAfterTerminating + 0; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); |
| |
| // Verify that the diagnostic message included in the Terminated event is not |
| // captured - TA already succeeded. Error cause should be the default value |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause()); |
| } |
| |
| @Test(timeout = 5000) |
| // A node failure reported after a non-leaf attempt has SUCCEEDED must move it to KILLED, |
| // and the KILLED attempt must then accept a late TA_TEZ_EVENT_UPDATE without an |
| // internal error. (The trailing "false" constructor argument is presumably the |
| // leaf-vertex flag - confirm against MockTaskAttemptImpl.) |
| public void testNodeFailedNonLeafVertex() throws Exception { |
| ApplicationId applicationId = ApplicationId.newInstance(1, 2); |
| TezTaskID taskId = TezTaskID.getInstance( |
| TezVertexID.getInstance(TezDAGID.getInstance(applicationId, 1), 1), 1); |
| |
| // Spy so that every dispatched event can be counted and captured. |
| MockEventHandler handlerSpy = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface communicator = createMockTaskAttemptListener(); |
| |
| Configuration conf = new Configuration(); |
| conf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| conf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList("127.0.0.1")), null); |
| Resource capability = Resource.newInstance(1024, 1); |
| |
| // Mocked container on 127.0.0.1 that the attempt is submitted to. |
| NodeId node = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId containerId = ContainerId.newInstance( |
| ApplicationAttemptId.newInstance(applicationId, 0), 3); |
| Container mockContainer = mock(Container.class); |
| when(mockContainer.getId()).thenReturn(containerId); |
| when(mockContainer.getNodeId()).thenReturn(node); |
| when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containerMap = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containerMap.addContainerIfNew(mockContainer, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containerMap).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler heartbeats = mock(TaskHeartbeatHandler.class); |
| MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(taskId, 1, handlerSpy, |
| communicator, conf, new SystemClock(), |
| heartbeats, appCtx, false, |
| capability, createFakeContainerContext(), false); |
| TezTaskAttemptID attemptId = attempt.getID(); |
| |
| ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class); |
| |
| // Drive the attempt to RUNNING. |
| attempt.handle(new TaskAttemptEventSchedule(attemptId, 0, 0)); |
| attempt.handle(new TaskAttemptEventSubmitted(attemptId, containerId)); |
| attempt.handle(new TaskAttemptEventStartedRemotely(attemptId)); |
| assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, |
| attempt.getState()); |
| verify(heartbeats).register(attemptId); |
| |
| int eventsWhenRunning = 5; |
| verify(handlerSpy, times(eventsWhenRunning)).handle(captor.capture()); |
| |
| // Complete the attempt. |
| attempt.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED, |
| attempt.getState()); |
| verify(heartbeats).unregister(attemptId); |
| assertEquals(0, attempt.getDiagnostics().size()); |
| |
| int eventsWhenSucceeded = eventsWhenRunning + 3; |
| captor = ArgumentCaptor.forClass(Event.class); |
| verify(handlerSpy, times(eventsWhenSucceeded)).handle(captor.capture()); |
| |
| // Events dispatched between RUNNING and SUCCEEDED. |
| List<Event> successEvents = |
| captor.getAllValues().subList(eventsWhenRunning, eventsWhenSucceeded); |
| verifyEventType(successEvents, TaskEventTASucceeded.class, 1); |
| verifyEventType(successEvents, AMSchedulerEventTAEnded.class, 1); |
| verifyEventType(successEvents, DAGEventCounterUpdate.class, 1); |
| |
| // Report the node failure after the attempt has already succeeded. |
| attempt.handle(new TaskAttemptEventNodeFailed(attemptId, "NodeDecomissioned", |
| TaskAttemptTerminationCause.NODE_FAILED)); |
| // The attempt must now be KILLED. |
| assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED, |
| attempt.getState()); |
| // unregister must still have been called exactly once |
| verify(heartbeats, times(1)).unregister(attemptId); |
| assertEquals(true, attempt.inputFailedReported); |
| // Two more events are expected; one of them is the TaskEventTAKilled sent to the task. |
| int eventsAfterNodeFailure = eventsWhenSucceeded + 2; |
| captor = ArgumentCaptor.forClass(Event.class); |
| verify(handlerSpy, times(eventsAfterNodeFailure)).handle(captor.capture()); |
| List<Event> nodeFailureEvents = |
| captor.getAllValues().subList(eventsWhenSucceeded, eventsAfterNodeFailure); |
| verifyEventType(nodeFailureEvents, TaskEventTAKilled.class, 1); |
| |
| // State stays KILLED and the cause is the node failure. |
| assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED, |
| attempt.getState()); |
| assertEquals(TaskAttemptTerminationCause.NODE_FAILED, attempt.getTerminationCause()); |
| |
| // A late TezEvent update in KILLED must not flag an internal error on the handler. |
| assertEquals(TaskAttemptStateInternal.KILLED, attempt.getInternalState()); |
| attempt.handle(new TaskAttemptEventTezEventUpdate(attempt.getID(), Collections.EMPTY_LIST)); |
| assertFalse( |
| "InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in KILLED state", |
| handlerSpy.internalError); |
| } |
| |
| @Test(timeout = 5000) |
| // A node failure reported after a leaf attempt has SUCCEEDED must be ignored: no new |
| // events, state stays SUCCEEDED, termination cause stays at its default. (The trailing |
| // "true" constructor argument is presumably the leaf-vertex flag - confirm against |
| // MockTaskAttemptImpl.) |
| public void testNodeFailedLeafVertex() throws Exception { |
| ApplicationId applicationId = ApplicationId.newInstance(1, 2); |
| TezTaskID taskId = TezTaskID.getInstance( |
| TezVertexID.getInstance(TezDAGID.getInstance(applicationId, 1), 1), 1); |
| |
| // Spy so that every dispatched event can be counted and captured. |
| MockEventHandler handlerSpy = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface communicator = createMockTaskAttemptListener(); |
| |
| Configuration conf = new Configuration(); |
| conf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| conf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList("127.0.0.1")), null); |
| Resource capability = Resource.newInstance(1024, 1); |
| |
| // Mocked container on 127.0.0.1 that the attempt is submitted to. |
| NodeId node = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId containerId = ContainerId.newInstance( |
| ApplicationAttemptId.newInstance(applicationId, 0), 3); |
| Container mockContainer = mock(Container.class); |
| when(mockContainer.getId()).thenReturn(containerId); |
| when(mockContainer.getNodeId()).thenReturn(node); |
| when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containerMap = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containerMap.addContainerIfNew(mockContainer, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containerMap).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler heartbeats = mock(TaskHeartbeatHandler.class); |
| TaskAttemptImpl attempt = new MockTaskAttemptImpl(taskId, 1, handlerSpy, |
| communicator, conf, new SystemClock(), |
| heartbeats, appCtx, false, |
| capability, createFakeContainerContext(), true); |
| TezTaskAttemptID attemptId = attempt.getID(); |
| |
| ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class); |
| |
| // Drive the attempt to RUNNING. |
| attempt.handle(new TaskAttemptEventSchedule(attemptId, 0, 0)); |
| attempt.handle(new TaskAttemptEventSubmitted(attemptId, containerId)); |
| attempt.handle(new TaskAttemptEventStartedRemotely(attemptId)); |
| assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, |
| attempt.getState()); |
| verify(heartbeats).register(attemptId); |
| |
| int eventsWhenRunning = 5; |
| verify(handlerSpy, times(eventsWhenRunning)).handle(captor.capture()); |
| |
| // Complete the attempt. |
| attempt.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED, |
| attempt.getState()); |
| verify(heartbeats).unregister(attemptId); |
| assertEquals(0, attempt.getDiagnostics().size()); |
| |
| int eventsWhenSucceeded = eventsWhenRunning + 3; |
| captor = ArgumentCaptor.forClass(Event.class); |
| verify(handlerSpy, times(eventsWhenSucceeded)).handle(captor.capture()); |
| |
| // Events dispatched between RUNNING and SUCCEEDED. |
| List<Event> successEvents = |
| captor.getAllValues().subList(eventsWhenRunning, eventsWhenSucceeded); |
| verifyEventType(successEvents, TaskEventTASucceeded.class, 1); |
| verifyEventType(successEvents, AMSchedulerEventTAEnded.class, 1); |
| verifyEventType(successEvents, DAGEventCounterUpdate.class, 1); |
| |
| // Report the node failure after the attempt has already succeeded. |
| attempt.handle(new TaskAttemptEventNodeFailed(attemptId, "NodeDecomissioned", |
| TaskAttemptTerminationCause.NODE_FAILED)); |
| |
| // "+ 0": the handler must not have seen any additional event. |
| int eventsAfterNodeFailure = eventsWhenSucceeded + 0; |
| captor = ArgumentCaptor.forClass(Event.class); |
| verify(handlerSpy, times(eventsAfterNodeFailure)).handle(captor.capture()); |
| |
| // The attempt remains SUCCEEDED. |
| assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED, |
| attempt.getState()); |
| // unregister must still have been called exactly once |
| verify(heartbeats, times(1)).unregister(attemptId); |
| // termination cause is untouched and keeps its default value |
| assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, attempt.getTerminationCause()); |
| } |
| |
| @Test(timeout = 5000) |
| // Verifies that multiple TooManyFetchFailures are handled correctly by the |
| // TaskAttempt. |
| public void testMultipleOutputFailed() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler mockEh = new MockEventHandler(); |
| MockEventHandler eventHandler = spy(mockEh); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); |
| doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); |
| DAGImpl mockDAG = mock(DAGImpl.class); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, |
| TaskAttemptEventType.TA_DONE)); |
| assertEquals("Task attempt is not in succeeded state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID); |
| |
| int expectedEventsTillSucceeded = 8; |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class); |
| verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture()); |
| verify(mockHistHandler, times(2)).handle(histArg.capture()); // start and finish |
| DAGHistoryEvent histEvent = histArg.getValue(); |
| TaskAttemptFinishedEvent finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent(); |
| long finishTime = finishEvent.getFinishTime(); |
| verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2); |
| |
| InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1); |
| EventMetaData mockMeta = mock(EventMetaData.class); |
| TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class); |
| when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); |
| TezTaskID destTaskID = mock(TezTaskID.class); |
| TezVertexID destVertexID = mock(TezVertexID.class); |
| when(mockDestId1.getTaskID()).thenReturn(destTaskID); |
| when(destTaskID.getVertexID()).thenReturn(destVertexID); |
| Vertex destVertex = mock(VertexImpl.class); |
| when(destVertex.getRunningTasks()).thenReturn(11); |
| when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex); |
| when(appCtx.getCurrentDAG()).thenReturn(mockDAG); |
| TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta); |
| taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11)); |
| |
| // failure threshold not met. state is SUCCEEDED |
| assertEquals("Task attempt is not in succeeded state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| |
| // sending same error again doesnt change anything |
| taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11)); |
| assertEquals("Task attempt is not in succeeded state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| // default value of error cause |
| assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause()); |
| |
| // different destination attempt reports error. now threshold crossed |
| TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class); |
| when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2); |
| destTaskID = mock(TezTaskID.class); |
| destVertexID = mock(TezVertexID.class); |
| when(mockDestId2.getTaskID()).thenReturn(destTaskID); |
| when(destTaskID.getVertexID()).thenReturn(destVertexID); |
| destVertex = mock(VertexImpl.class); |
| when(destVertex.getRunningTasks()).thenReturn(11); |
| when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex); |
| taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11)); |
| |
| assertEquals("Task attempt is not in FAILED state", taImpl.getState(), |
| TaskAttemptState.FAILED); |
| assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl.getTerminationCause()); |
| // verify unregister is not invoked again |
| verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); |
| verify(mockHistHandler, times(3)).handle(histArg.capture()); |
| histEvent = histArg.getValue(); |
| finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent(); |
| assertEquals(TaskFailureType.NON_FATAL, finishEvent.getTaskFailureType()); |
| long newFinishTime = finishEvent.getFinishTime(); |
| Assert.assertEquals(finishTime, newFinishTime); |
| |
| assertEquals(true, taImpl.inputFailedReported); |
| int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2; |
| arg.getAllValues().clear(); |
| verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(arg.capture()); |
| Event e = verifyEventType( |
| arg.getAllValues().subList(expectedEventsTillSucceeded, |
| expectedEventsAfterFetchFailure), TaskEventTAFailed.class, 1); |
| TaskEventTAFailed failedEvent = (TaskEventTAFailed) e; |
| assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); |
| |
| taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 1)); |
| assertEquals("Task attempt is not in FAILED state, still", |
| taImpl.getState(), TaskAttemptState.FAILED); |
| assertFalse( |
| "InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES", |
| eventHandler.internalError); |
| // No new events. |
| verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle( |
| arg.capture()); |
| |
| Configuration newVertexConf = new Configuration(vertexConf); |
| newVertexConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, |
| 1); |
| createMockVertex(newVertexConf); |
| |
| TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2); |
| MockTaskAttemptImpl taImpl2 = new MockTaskAttemptImpl(taskID2, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID2 = taImpl2.getID(); |
| |
| taImpl2.handle(new TaskAttemptEventSchedule(taskAttemptID2, 0, 0)); |
| taImpl2.handle(new TaskAttemptEventSubmitted(taskAttemptID2, contId)); |
| taImpl2.handle(new TaskAttemptEventStartedRemotely(taskAttemptID2)); |
| verify(mockHeartbeatHandler).register(taskAttemptID2); |
| taImpl2.handle(new TaskAttemptEvent(taskAttemptID2, TaskAttemptEventType.TA_DONE)); |
| assertEquals("Task attempt is not in succeeded state", taImpl2.getState(), |
| TaskAttemptState.SUCCEEDED); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID2); |
| |
| mockReEvent = InputReadErrorEvent.create("", 1, 1); |
| mockMeta = mock(EventMetaData.class); |
| mockDestId1 = mock(TezTaskAttemptID.class); |
| when(mockDestId1.getTaskID()).thenReturn(destTaskID); |
| when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); |
| tzEvent = new TezEvent(mockReEvent, mockMeta); |
| //This should fail even when MAX_ALLOWED_OUTPUT_FAILURES_FRACTION is within limits, as |
| // MAX_ALLOWED_OUTPUT_FAILURES has crossed the limit. |
| taImpl2.handle(new TaskAttemptEventOutputFailed(taskAttemptID2, tzEvent, 8)); |
| assertEquals("Task attempt is not in failed state", taImpl2.getState(), |
| TaskAttemptState.FAILED); |
| assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl2.getTerminationCause()); |
| // verify unregister is not invoked again |
| verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID2); |
| |
| Clock mockClock = mock(Clock.class); |
| int readErrorTimespanSec = 1; |
| |
| newVertexConf = new Configuration(vertexConf); |
| newVertexConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, |
| 10); |
| newVertexConf.setInt( |
| TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, |
| readErrorTimespanSec); |
| createMockVertex(newVertexConf); |
| |
| TezTaskID taskID3 = TezTaskID.getInstance(vertexID, 3); |
| MockTaskAttemptImpl taImpl3 = new MockTaskAttemptImpl(taskID3, 1, eventHandler, |
| taListener, taskConf, mockClock, |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID3 = taImpl3.getID(); |
| |
| taImpl3.handle(new TaskAttemptEventSchedule(taskAttemptID3, 0, 0)); |
| taImpl3.handle(new TaskAttemptEventSubmitted(taskAttemptID3, contId)); |
| taImpl3.handle(new TaskAttemptEventStartedRemotely(taskAttemptID3)); |
| verify(mockHeartbeatHandler).register(taskAttemptID3); |
| taImpl3.handle(new TaskAttemptEvent(taskAttemptID3, TaskAttemptEventType.TA_DONE)); |
| assertEquals("Task attempt is not in succeeded state", taImpl3.getState(), |
| TaskAttemptState.SUCCEEDED); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID3); |
| |
| mockReEvent = InputReadErrorEvent.create("", 1, 1); |
| mockMeta = mock(EventMetaData.class); |
| mockDestId1 = mock(TezTaskAttemptID.class); |
| when(mockDestId1.getTaskID()).thenReturn(destTaskID); |
| when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); |
| tzEvent = new TezEvent(mockReEvent, mockMeta); |
| when(mockClock.getTime()).thenReturn(1000L); |
| when(destVertex.getRunningTasks()).thenReturn(1000); |
| // time deadline not exceeded for a couple of read error events |
| taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000)); |
| assertEquals("Task attempt is not in succeeded state", taImpl3.getState(), |
| TaskAttemptState.SUCCEEDED); |
| when(mockClock.getTime()).thenReturn(1500L); |
| taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000)); |
| assertEquals("Task attempt is not in succeeded state", taImpl3.getState(), |
| TaskAttemptState.SUCCEEDED); |
| // exceed the time threshold |
| when(mockClock.getTime()).thenReturn(2001L); |
| taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000)); |
| assertEquals("Task attempt is not in FAILED state", taImpl3.getState(), |
| TaskAttemptState.FAILED); |
| assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl3.getTerminationCause()); |
| // verify unregister is not invoked again |
| verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID3); |
| } |
| |
| @Test(timeout = 60000) |
| public void testTAFailureBasedOnRunningTasks() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler mockEh = new MockEventHandler(); |
| MockEventHandler eventHandler = spy(mockEh); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| @SuppressWarnings("deprecation") |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); |
| doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); |
| DAGImpl mockDAG = mock(DAGImpl.class); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), false); |
| TezTaskAttemptID taskAttemptID = taImpl.getID(); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); |
| verify(mockHeartbeatHandler).register(taskAttemptID); |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, |
| TaskAttemptEventType.TA_DONE)); |
| assertEquals("Task attempt is not in succeeded state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| verify(mockHeartbeatHandler).unregister(taskAttemptID); |
| |
| int expectedEventsTillSucceeded = 8; |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class); |
| verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture()); |
| verify(mockHistHandler, times(2)).handle(histArg.capture()); // start and finish |
| DAGHistoryEvent histEvent = histArg.getValue(); |
| TaskAttemptFinishedEvent finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent(); |
| long finishTime = finishEvent.getFinishTime(); |
| verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2); |
| |
| InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1); |
| EventMetaData mockMeta = mock(EventMetaData.class); |
| TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class); |
| when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); |
| TezTaskID destTaskID = mock(TezTaskID.class); |
| TezVertexID destVertexID = mock(TezVertexID.class); |
| when(mockDestId1.getTaskID()).thenReturn(destTaskID); |
| when(destTaskID.getVertexID()).thenReturn(destVertexID); |
| Vertex destVertex = mock(VertexImpl.class); |
| when(destVertex.getRunningTasks()).thenReturn(5); |
| when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex); |
| when(appCtx.getCurrentDAG()).thenReturn(mockDAG); |
| TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta); |
| taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11)); |
| |
| // failure threshold is met due to running tasks. state is FAILED |
| assertEquals("Task attempt is not in FAILED state", taImpl.getState(), |
| TaskAttemptState.FAILED); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Test(timeout = 5000) |
| public void testKilledInNew() throws ServicePluginException { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container, 0, 0, 0); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); |
| MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mockHeartbeatHandler, appCtx, false, |
| resource, createFakeContainerContext(), true); |
| Assert.assertEquals(TaskAttemptStateInternal.NEW, taImpl.getInternalState()); |
| taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it", |
| TaskAttemptTerminationCause.TERMINATED_BY_CLIENT)); |
| Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState()); |
| |
| Assert.assertEquals(0, taImpl.taskAttemptStartedEventLogged); |
| Assert.assertEquals(1, taImpl.taskAttemptFinishedEventLogged); |
| } |
| |
| @Test |
| public void testMapTaskIsBlamedImmediatelyOnLocalFetchFailure() throws ServicePluginException { |
| // local fetch failure or disk read error at source -> turn source attempt to FAIL_IN_PROGRESS |
| testMapTaskFailingForFetchFailureType(true, true, TaskAttemptStateInternal.FAIL_IN_PROGRESS); |
| testMapTaskFailingForFetchFailureType(true, false, TaskAttemptStateInternal.FAIL_IN_PROGRESS); |
| testMapTaskFailingForFetchFailureType(false, true, TaskAttemptStateInternal.FAIL_IN_PROGRESS); |
| |
| // remote fetch failure -> won't change current state |
| testMapTaskFailingForFetchFailureType(false, false, TaskAttemptStateInternal.NEW); |
| } |
| |
| private void testMapTaskFailingForFetchFailureType(boolean isLocalFetch, |
| boolean isDiskErrorAtSource, TaskAttemptStateInternal expectedState) { |
| EventHandler eventHandler = mock(EventHandler.class); |
| TezTaskID taskID = |
| TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); |
| TaskAttemptImpl sourceAttempt = new MockTaskAttemptImpl(taskID, 1, eventHandler, null, |
| new Configuration(), SystemClock.getInstance(), mock(TaskHeartbeatHandler.class), appCtx, |
| false, null, null, false); |
| |
| // the original read error event, sent by reducer task |
| InputReadErrorEvent inputReadErrorEvent = |
| InputReadErrorEvent.create("", 0, 1, 1, isLocalFetch, isDiskErrorAtSource); |
| TezTaskAttemptID destTaskAttemptId = mock(TezTaskAttemptID.class); |
| when(destTaskAttemptId.getTaskID()).thenReturn(mock(TezTaskID.class)); |
| when(destTaskAttemptId.getTaskID().getVertexID()).thenReturn(mock(TezVertexID.class)); |
| when(appCtx.getCurrentDAG()).thenReturn(mock(DAG.class)); |
| when(appCtx.getCurrentDAG().getVertex(Mockito.any(TezVertexID.class))) |
| .thenReturn(mock(Vertex.class)); |
| when(appCtx.getCurrentDAG().getVertex(Mockito.any(TezVertexID.class)).getRunningTasks()) |
| .thenReturn(100); |
| |
| EventMetaData mockMeta = mock(EventMetaData.class); |
| when(mockMeta.getTaskAttemptID()).thenReturn(destTaskAttemptId); |
| TezEvent tezEvent = new TezEvent(inputReadErrorEvent, mockMeta); |
| |
| // the event is propagated to map task's event handler |
| TaskAttemptEventOutputFailed outputFailedEvent = |
| new TaskAttemptEventOutputFailed(sourceAttempt.getID(), tezEvent, 11); |
| |
| Assert.assertEquals(TaskAttemptStateInternal.NEW, sourceAttempt.getInternalState()); |
| TaskAttemptStateInternal resultState = new TaskAttemptImpl.OutputReportedFailedTransition() |
| .transition(sourceAttempt, outputFailedEvent); |
| Assert.assertEquals(expectedState, resultState); |
| } |
| |
| private Event verifyEventType(List<Event> events, |
| Class<? extends Event> eventClass, int expectedOccurences) { |
| int count = 0; |
| Event ret = null; |
| for (Event e : events) { |
| if (eventClass.isInstance(e)) { |
| count++; |
| ret = e; |
| } |
| } |
| assertEquals( |
| "Mismatch in num occurences of event: " + eventClass.getCanonicalName(), |
| expectedOccurences, count); |
| return ret; |
| } |
| |
| public static class MockEventHandler implements EventHandler { |
| public boolean internalError; |
| |
| @Override |
| public void handle(Event event) { |
| if (event instanceof DAGEvent) { |
| DAGEvent je = ((DAGEvent) event); |
| if (DAGEventType.INTERNAL_ERROR == je.getType()) { |
| internalError = true; |
| } |
| } |
| } |
| } |
| |
| private class MockTaskAttemptImpl extends TaskAttemptImpl { |
| |
| public int taskAttemptStartedEventLogged = 0; |
| public int taskAttemptFinishedEventLogged = 0; |
| public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, |
| EventHandler eventHandler, TaskCommunicatorManagerInterface tal, |
| Configuration conf, Clock clock, |
| TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, |
| boolean isRescheduled, |
| Resource resource, ContainerContext containerContext, boolean leafVertex) { |
| super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber), |
| eventHandler, tal, conf, |
| clock, taskHeartbeatHandler, appContext, |
| isRescheduled, resource, containerContext, leafVertex, mockTask, |
| locationHint, null, null); |
| } |
| |
| boolean inputFailedReported = false; |
| |
| @Override |
| protected Vertex getVertex() { |
| return mockVertex; |
| } |
| |
| @Override |
| protected void logJobHistoryAttemptStarted() { |
| taskAttemptStartedEventLogged++; |
| super.logJobHistoryAttemptStarted(); |
| } |
| |
| @Override |
| protected void logJobHistoryAttemptFinishedEvent( |
| TaskAttemptStateInternal state) { |
| taskAttemptFinishedEventLogged++; |
| super.logJobHistoryAttemptFinishedEvent(state); |
| } |
| |
| @Override |
| protected void logJobHistoryAttemptUnsuccesfulCompletion( |
| TaskAttemptState state, TaskFailureType taskFailureType) { |
| taskAttemptFinishedEventLogged++; |
| super.logJobHistoryAttemptUnsuccesfulCompletion(state, taskFailureType); |
| } |
| |
| @Override |
| protected void sendInputFailedToConsumers() { |
| inputFailedReported = true; |
| } |
| } |
| |
| private static ContainerContext createFakeContainerContext() { |
| return new ContainerContext(new HashMap<String, LocalResource>(), |
| new Credentials(), new HashMap<String, String>(), ""); |
| } |
| |
| private TaskCommunicatorManagerInterface createMockTaskAttemptListener() throws |
| ServicePluginException { |
| TaskCommunicatorManagerInterface taListener = mock(TaskCommunicatorManagerInterface.class); |
| TaskCommunicator taskComm = mock(TaskCommunicator.class); |
| doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); |
| doReturn(new TaskCommunicatorWrapper(taskComm)).when(taListener).getTaskCommunicator(0); |
| return taListener; |
| } |
| } |