| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.dag.impl; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RawLocalFileSystem; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.event.Event; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.tez.dag.api.TaskLocationHint; |
| import org.apache.tez.dag.api.oldrecords.TaskAttemptState; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.ClusterInfo; |
| import org.apache.tez.dag.app.ContainerContext; |
| import org.apache.tez.dag.app.ContainerHeartbeatHandler; |
| import org.apache.tez.dag.app.TaskAttemptListener; |
| import org.apache.tez.dag.app.TaskHeartbeatHandler; |
| import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; |
| import org.apache.tez.dag.app.dag.Vertex; |
| import org.apache.tez.dag.app.dag.event.DAGEvent; |
| import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; |
| import org.apache.tez.dag.app.dag.event.DAGEventType; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; |
| import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; |
| import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; |
| import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; |
| import org.apache.tez.dag.app.rm.container.AMContainerMap; |
| import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezTaskID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.apache.tez.runtime.api.events.InputReadErrorEvent; |
| import org.apache.tez.runtime.api.impl.EventMetaData; |
| import org.apache.tez.runtime.api.impl.TaskSpec; |
| import org.apache.tez.runtime.api.impl.TezEvent; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.mockito.ArgumentCaptor; |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| public class TestTaskAttempt { |
| |
| static public class StubbedFS extends RawLocalFileSystem { |
| @Override |
| public FileStatus getFileStatus(Path f) throws IOException { |
| return new FileStatus(1, false, 1, 1, 1, f); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testLocalityRequest() { |
| TaskAttemptImpl.ScheduleTaskattemptTransition sta = |
| new TaskAttemptImpl.ScheduleTaskattemptTransition(); |
| |
| EventHandler eventHandler = mock(EventHandler.class); |
| Set<String> hosts = new TreeSet<String>(); |
| hosts.add("host1"); |
| hosts.add("host2"); |
| hosts.add("host3"); |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(hosts, null); |
| |
| TezTaskID taskID = TezTaskID.getInstance( |
| TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), |
| mock(TaskHeartbeatHandler.class), mock(AppContext.class), |
| locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); |
| |
| TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class); |
| |
| sta.transition(taImpl, sEvent); |
| |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(1)).handle(arg.capture()); |
| if (!(arg.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) { |
| fail("Second event not of type " |
| + AMSchedulerEventTALaunchRequest.class.getName()); |
| } |
| // TODO Move the Rack request check to the client after TEZ-125 is fixed. |
| Set<String> requestedRacks = taImpl.taskRacks; |
| assertEquals(1, requestedRacks.size()); |
| assertEquals(3, taImpl.taskHosts.size()); |
| for (int i = 0; i < 3; i++) { |
| String host = ("host" + (i + 1)); |
| assertEquals(host, true, taImpl.taskHosts.contains(host)); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testPriority() { |
| TaskAttemptImpl.ScheduleTaskattemptTransition sta = |
| new TaskAttemptImpl.ScheduleTaskattemptTransition(); |
| |
| EventHandler eventHandler = mock(EventHandler.class); |
| TezTaskID taskID = TezTaskID.getInstance( |
| TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), |
| mock(TaskHeartbeatHandler.class), mock(AppContext.class), |
| null, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); |
| |
| TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), |
| mock(TaskHeartbeatHandler.class), mock(AppContext.class), |
| null, true, Resource.newInstance(1024, 1), createFakeContainerContext(), false); |
| |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class); |
| when(sEvent.getPriorityLowLimit()).thenReturn(3); |
| when(sEvent.getPriorityHighLimit()).thenReturn(1); |
| sta.transition(taImpl, sEvent); |
| verify(eventHandler, times(1)).handle(arg.capture()); |
| AMSchedulerEventTALaunchRequest launchEvent = (AMSchedulerEventTALaunchRequest)arg.getValue(); |
| Assert.assertEquals(2, launchEvent.getPriority()); |
| sta.transition(taImplReScheduled, sEvent); |
| verify(eventHandler, times(2)).handle(arg.capture()); |
| launchEvent = (AMSchedulerEventTALaunchRequest)arg.getValue(); |
| Assert.assertEquals(1, launchEvent.getPriority()); |
| |
| when(sEvent.getPriorityLowLimit()).thenReturn(6); |
| when(sEvent.getPriorityHighLimit()).thenReturn(4); |
| sta.transition(taImpl, sEvent); |
| verify(eventHandler, times(3)).handle(arg.capture()); |
| launchEvent = (AMSchedulerEventTALaunchRequest)arg.getValue(); |
| Assert.assertEquals(5, launchEvent.getPriority()); |
| sta.transition(taImplReScheduled, sEvent); |
| verify(eventHandler, times(4)).handle(arg.capture()); |
| launchEvent = (AMSchedulerEventTALaunchRequest)arg.getValue(); |
| Assert.assertEquals(4, launchEvent.getPriority()); |
| |
| when(sEvent.getPriorityLowLimit()).thenReturn(5); |
| when(sEvent.getPriorityHighLimit()).thenReturn(5); |
| sta.transition(taImpl, sEvent); |
| verify(eventHandler, times(5)).handle(arg.capture()); |
| launchEvent = (AMSchedulerEventTALaunchRequest)arg.getValue(); |
| Assert.assertEquals(5, launchEvent.getPriority()); |
| sta.transition(taImplReScheduled, sEvent); |
| verify(eventHandler, times(6)).handle(arg.capture()); |
| launchEvent = (AMSchedulerEventTALaunchRequest)arg.getValue(); |
| Assert.assertEquals(5, launchEvent.getPriority()); |
| } |
| |
| @Test(timeout = 5000) |
| // Tests that an attempt is made to resolve the localized hosts to racks. |
| // TODO Move to the client post TEZ-125. |
| public void testHostResolveAttempt() throws Exception { |
| TaskAttemptImpl.ScheduleTaskattemptTransition sta = |
| new TaskAttemptImpl.ScheduleTaskattemptTransition(); |
| |
| EventHandler eventHandler = mock(EventHandler.class); |
| String hosts[] = new String[] { "127.0.0.1", "host2", "host3" }; |
| Set<String> resolved = new TreeSet<String>( |
| Arrays.asList(new String[]{ "host1", "host2", "host3" })); |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( |
| new TreeSet<String>(Arrays.asList(hosts)), null); |
| |
| TezTaskID taskID = TezTaskID.getInstance( |
| TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| mock(TaskAttemptListener.class), new Configuration(), |
| new SystemClock(), mock(TaskHeartbeatHandler.class), |
| mock(AppContext.class), locationHint, false, Resource.newInstance(1024, |
| 1), createFakeContainerContext(), false); |
| |
| TaskAttemptImpl spyTa = spy(taImpl); |
| when(spyTa.resolveHosts(hosts)).thenReturn( |
| resolved.toArray(new String[3])); |
| |
| TaskAttemptEventSchedule mockTAEvent = mock(TaskAttemptEventSchedule.class); |
| |
| sta.transition(spyTa, mockTAEvent); |
| verify(spyTa).resolveHosts(hosts); |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(1)).handle(arg.capture()); |
| if (!(arg.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) { |
| fail("Second Event not of type ContainerRequestEvent"); |
| } |
| Map<String, Boolean> expected = new HashMap<String, Boolean>(); |
| expected.put("host1", true); |
| expected.put("host2", true); |
| expected.put("host3", true); |
| Set<String> requestedHosts = spyTa.taskHosts; |
| for (String h : requestedHosts) { |
| expected.remove(h); |
| } |
| assertEquals(0, expected.size()); |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure the dag does not go into an error state if a attempt kill is |
| // received while STARTING |
| public void testLaunchFailedWhileKilling() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); |
| |
| MockEventHandler eventHandler = new MockEventHandler(); |
| TaskAttemptListener taListener = mock(TaskAttemptListener.class); |
| when(taListener.getAddress()).thenReturn( |
| new InetSocketAddress("localhost", 0)); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| AppContext mockAppContext = mock(AppContext.class); |
| doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo(); |
| |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mock(TaskHeartbeatHandler.class), mockAppContext, locationHint, false, |
| resource, createFakeContainerContext(), false); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| // At state STARTING. |
| taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null)); |
| // At some KILLING state. |
| taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null)); |
| // taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID, |
| // null)); |
| assertFalse(eventHandler.internalError); |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure ContainerTerminating and ContainerTerminated is handled correctly by |
| // the TaskAttempt |
| public void testContainerTerminationWhileRunning() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskAttemptListener taListener = mock(TaskAttemptListener.class); |
| when(taListener.getAddress()).thenReturn( |
| new InetSocketAddress("localhost", 0)); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AppContext appCtx = mock(AppContext.class); |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, |
| resource, createFakeContainerContext(), false); |
| |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| // At state STARTING. |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, |
| null)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| |
| int expectedEventsAtRunning = 3; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| |
| taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID, |
| "Terminating")); |
| assertFalse( |
| "InternalError occurred trying to handle TA_CONTAINER_TERMINATING", |
| eventHandler.internalError); |
| |
| assertEquals("Task attempt is not in the FAILED state", taImpl.getState(), |
| TaskAttemptState.FAILED); |
| |
| assertEquals(1, taImpl.getDiagnostics().size()); |
| assertEquals("Terminating", taImpl.getDiagnostics().get(0)); |
| |
| int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); |
| |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); |
| |
| taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, |
| "Terminated")); |
| int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); |
| |
| assertEquals(2, taImpl.getDiagnostics().size()); |
| assertEquals("Terminated", taImpl.getDiagnostics().get(1)); |
| } |
| |
| |
| @Test(timeout = 5000) |
| // Ensure ContainerTerminated is handled correctly by the TaskAttempt |
| public void testContainerTerminatedWhileRunning() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); |
| |
| MockEventHandler eventHandler = new MockEventHandler(); |
| TaskAttemptListener taListener = mock(TaskAttemptListener.class); |
| when(taListener.getAddress()).thenReturn( |
| new InetSocketAddress("localhost", 0)); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AppContext appCtx = mock(AppContext.class); |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, |
| resource, createFakeContainerContext(), false); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| // At state STARTING. |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, |
| null)); |
| assertEquals("Task attempt is not in running state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated")); |
| assertFalse( |
| "InternalError occurred trying to handle TA_CONTAINER_TERMINATED", |
| eventHandler.internalError); |
| |
| assertEquals("Terminated", taImpl.getDiagnostics().get(0)); |
| |
| // TODO Ensure TA_TERMINATING after this is ingored. |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure ContainerTerminating and ContainerTerminated is handled correctly by |
| // the TaskAttempt |
| public void testContainerTerminatedAfterSuccess() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskAttemptListener taListener = mock(TaskAttemptListener.class); |
| when(taListener.getAddress()).thenReturn( |
| new InetSocketAddress("localhost", 0)); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AppContext appCtx = mock(AppContext.class); |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, |
| resource, createFakeContainerContext(), false); |
| |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| // At state STARTING. |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, |
| null)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| |
| int expectedEventsAtRunning = 3; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| |
| int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); |
| |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); |
| |
| taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, |
| "Terminated")); |
| int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); |
| |
| // Verify that the diagnostic message included in the Terminated event is not |
| // captured - TA already succeeded. |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure Container Preemption race with task completion is handled correctly by |
| // the TaskAttempt |
| public void testContainerPreemptedAfterSuccess() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskAttemptListener taListener = mock(TaskAttemptListener.class); |
| when(taListener.getAddress()).thenReturn( |
| new InetSocketAddress("localhost", 0)); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AppContext appCtx = mock(AppContext.class); |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, |
| resource, createFakeContainerContext(), false); |
| |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| // At state STARTING. |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, |
| null)); |
| assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), |
| TaskAttemptState.RUNNING); |
| |
| int expectedEventsAtRunning = 3; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| |
| int expectedEventsAfterTerminating = expectedEventsAtRunning + 3; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventsAfterTerminating)).handle(arg.capture()); |
| |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEventsAfterTerminating), TaskEventTAUpdate.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEventsAfterTerminating), AMSchedulerEventTAEnded.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEventsAfterTerminating), DAGEventCounterUpdate.class, 1); |
| |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, |
| TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM)); |
| int expectedEventAfterTerminated = expectedEventsAfterTerminating + 0; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); |
| |
| // Verify that the diagnostic message included in the Terminated event is not |
| // captured - TA already succeeded. |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure node failure on Successful Non-Leaf tasks cause them to be marked as KILLED |
| public void testNodeFailedNonLeafVertex() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskAttemptListener taListener = mock(TaskAttemptListener.class); |
| when(taListener.getAddress()).thenReturn( |
| new InetSocketAddress("localhost", 0)); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AppContext appCtx = mock(AppContext.class); |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, |
| resource, createFakeContainerContext(), false); |
| |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| // At state STARTING. |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, |
| null)); |
| assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, |
| taImpl.getState()); |
| |
| int expectedEventsAtRunning = 3; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED, |
| taImpl.getState()); |
| |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| |
| int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); |
| |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); |
| |
| // Send out a Node Failure. |
| taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned")); |
| // Verify in KILLED state |
| assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED, |
| taImpl.getState()); |
| assertEquals(true, taImpl.inputFailedReported); |
| // Verify one event to the Task informing it about FAILURE. No events to scheduler. Counter event. |
| int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 2; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventsNodeFailure)).handle(arg.capture()); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEvenstAfterTerminating, |
| expectedEventsNodeFailure), TaskEventTAUpdate.class, 1); |
| |
| // Verify still in KILLED state |
| assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED, |
| taImpl.getState()); |
| } |
| |
| @Test(timeout = 5000) |
| // Ensure node failure on Successful Leaf tasks do not cause them to be marked as KILLED |
| public void testNodeFailedLeafVertex() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); |
| |
| MockEventHandler eventHandler = spy(new MockEventHandler()); |
| TaskAttemptListener taListener = mock(TaskAttemptListener.class); |
| when(taListener.getAddress()).thenReturn( |
| new InetSocketAddress("localhost", 0)); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AppContext appCtx = mock(AppContext.class); |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, |
| resource, createFakeContainerContext(), true); |
| |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| // At state STARTING. |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, |
| null)); |
| assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, |
| taImpl.getState()); |
| |
| int expectedEventsAtRunning = 3; |
| verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); |
| |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); |
| |
| assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED, |
| taImpl.getState()); |
| |
| assertEquals(0, taImpl.getDiagnostics().size()); |
| |
| int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); |
| |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsAtRunning, |
| expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); |
| |
| // Send out a Node Failure. |
| taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned")); |
| |
| // Verify no additional events |
| int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 0; |
| arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventsNodeFailure)).handle(arg.capture()); |
| |
| // Verify still in SUCCEEDED state |
| assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED, |
| taImpl.getState()); |
| } |
| |
| @Test(timeout = 5000) |
| // Verifies that multiple TooManyFetchFailures are handled correctly by the |
| // TaskAttempt. |
| public void testMultipleOutputFailed() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 2); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 0); |
| TezDAGID dagID = TezDAGID.getInstance(appId, 1); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); |
| TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); |
| |
| MockEventHandler mockEh = new MockEventHandler(); |
| MockEventHandler eventHandler = spy(mockEh); |
| TaskAttemptListener taListener = mock(TaskAttemptListener.class); |
| when(taListener.getAddress()).thenReturn( |
| new InetSocketAddress("localhost", 0)); |
| |
| Configuration taskConf = new Configuration(); |
| taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); |
| taskConf.setBoolean("fs.file.impl.disable.cache", true); |
| |
| TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( |
| new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); |
| Resource resource = Resource.newInstance(1024, 1); |
| |
| NodeId nid = NodeId.newInstance("127.0.0.1", 0); |
| ContainerId contId = ContainerId.newInstance(appAttemptId, 3); |
| Container container = mock(Container.class); |
| when(container.getId()).thenReturn(contId); |
| when(container.getNodeId()).thenReturn(nid); |
| when(container.getNodeHttpAddress()).thenReturn("localhost:0"); |
| |
| AppContext appCtx = mock(AppContext.class); |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), |
| new ContainerContextMatcher(), appCtx); |
| containers.addContainerIfNew(container); |
| |
| doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); |
| doReturn(containers).when(appCtx).getAllContainers(); |
| |
| MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, |
| taListener, taskConf, new SystemClock(), |
| mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, |
| resource, createFakeContainerContext(), false); |
| |
| taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); |
| // At state STARTING. |
| taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, |
| null)); |
| taImpl.handle(new TaskAttemptEvent(taskAttemptID, |
| TaskAttemptEventType.TA_DONE)); |
| assertEquals("Task attempt is not in succeeded state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| |
| int expectedEventsTillSucceeded = 6; |
| ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture()); |
| verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2); |
| |
| InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1); |
| EventMetaData mockMeta = mock(EventMetaData.class); |
| TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class); |
| when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); |
| TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta); |
| taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4)); |
| |
| // failure threshold not met. state is SUCCEEDED |
| assertEquals("Task attempt is not in succeeded state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| |
| // sending same error again doesnt change anything |
| taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4)); |
| assertEquals("Task attempt is not in succeeded state", taImpl.getState(), |
| TaskAttemptState.SUCCEEDED); |
| |
| // different destination attempt reports error. now threshold crossed |
| TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class); |
| when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2); |
| taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4)); |
| |
| assertEquals("Task attempt is not in FAILED state", taImpl.getState(), |
| TaskAttemptState.FAILED); |
| |
| assertEquals(true, taImpl.inputFailedReported); |
| int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2; |
| arg.getAllValues().clear(); |
| verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(arg.capture()); |
| verifyEventType( |
| arg.getAllValues().subList(expectedEventsTillSucceeded, |
| expectedEventsAfterFetchFailure), TaskEventTAUpdate.class, 1); |
| |
| taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 1)); |
| assertEquals("Task attempt is not in FAILED state, still", |
| taImpl.getState(), TaskAttemptState.FAILED); |
| assertFalse( |
| "InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES", |
| eventHandler.internalError); |
| // No new events. |
| verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle( |
| arg.capture()); |
| } |
| |
| private void verifyEventType(List<Event> events, |
| Class<? extends Event> eventClass, int expectedOccurences) { |
| int count = 0; |
| for (Event e : events) { |
| if (eventClass.isInstance(e)) { |
| count++; |
| } |
| } |
| assertEquals( |
| "Mismatch in num occurences of event: " + eventClass.getCanonicalName(), |
| expectedOccurences, count); |
| } |
| |
| public static class MockEventHandler implements EventHandler { |
| public boolean internalError; |
| |
| @Override |
| public void handle(Event event) { |
| if (event instanceof DAGEvent) { |
| DAGEvent je = ((DAGEvent) event); |
| if (DAGEventType.INTERNAL_ERROR == je.getType()) { |
| internalError = true; |
| } |
| } |
| } |
| }; |
| |
| private class MockTaskAttemptImpl extends TaskAttemptImpl { |
| TaskLocationHint locationHint; |
| |
| public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, |
| EventHandler eventHandler, TaskAttemptListener tal, |
| Configuration conf, Clock clock, |
| TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, |
| TaskLocationHint locationHint, boolean isRescheduled, |
| Resource resource, ContainerContext containerContext, boolean leafVertex) { |
| super(taskId, attemptNumber, eventHandler, tal, conf, |
| clock, taskHeartbeatHandler, appContext, |
| isRescheduled, resource, containerContext, leafVertex); |
| this.locationHint = locationHint; |
| } |
| |
| Vertex mockVertex = mock(Vertex.class); |
| boolean inputFailedReported = false; |
| |
| @Override |
| public TaskLocationHint getTaskLocationHint() { |
| return locationHint; |
| } |
| |
| @Override |
| protected Vertex getVertex() { |
| return mockVertex; |
| } |
| |
| @Override |
| protected TaskSpec createRemoteTaskSpec() { |
| // FIXME |
| return null; |
| } |
| |
| @Override |
| protected void logJobHistoryAttemptStarted() { |
| } |
| |
| @Override |
| protected void logJobHistoryAttemptFinishedEvent( |
| TaskAttemptStateInternal state) { |
| |
| } |
| |
| @Override |
| protected void logJobHistoryAttemptUnsuccesfulCompletion( |
| TaskAttemptState state) { |
| } |
| |
| @Override |
| protected void sendInputFailedToConsumers() { |
| inputFailedReported = true; |
| } |
| } |
| |
| private static ContainerContext createFakeContainerContext() { |
| return new ContainerContext(new HashMap<String, LocalResource>(), |
| new Credentials(), new HashMap<String, String>(), ""); |
| } |
| } |