blob: 434a4b525815be2e62c3cf5de918621ecb06bbf9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
/**
 * Unit tests for {@link TaskAttemptImpl} state transitions: scheduling
 * (locality hints and host resolution), kill requests while starting,
 * container termination while running or after success, and repeated
 * output-failure reports.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestTaskAttempt {

  /**
   * A {@link RawLocalFileSystem} whose {@code getFileStatus} returns a fixed,
   * non-directory {@link FileStatus} for any path instead of consulting disk.
   */
  public static class StubbedFS extends RawLocalFileSystem {
    @Override
    public FileStatus getFileStatus(Path f) throws IOException {
      return new FileStatus(1, false, 1, 1, 1, f);
    }
  }

  // TODO: The MR-era tests testMRAppHistoryForMap / testMRAppHistoryForReduce
  // (retry counts and diagnostics for failing attempts; intended to move to
  // TestTask) and testSlotMillisCounterUpdate (slot-millis counters) used to
  // live here as commented-out code. They depend on MRApp and must be
  // rewritten without it before being restored.

  /**
   * Verifies that scheduling an attempt with a host-only location hint emits
   * exactly one launch request that carries all hinted hosts (in order) and a
   * single rack.
   */
  @Test
  public void testLocalityRequest() {
    TaskAttemptImpl.ScheduleTaskattemptTransition sta =
        new TaskAttemptImpl.ScheduleTaskattemptTransition();
    EventHandler eventHandler = mock(EventHandler.class);
    Set<String> hosts = new TreeSet<String>();
    hosts.add("host1");
    hosts.add("host2");
    hosts.add("host3");
    TaskLocationHint locationHint = new TaskLocationHint(hosts, null);
    TezTaskID taskID = new TezTaskID(
        new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
        mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
        mock(TaskHeartbeatHandler.class), mock(AppContext.class),
        locationHint, false, Resource.newInstance(1024, 1),
        createFakeContainerContext());

    TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
    sta.transition(taImpl, sEvent);

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(1)).handle(arg.capture());
    Event launchEvent = arg.getAllValues().get(0);
    if (!(launchEvent instanceof AMSchedulerEventTALaunchRequest)) {
      fail("Event not of type "
          + AMSchedulerEventTALaunchRequest.class.getName());
    }
    // TODO Move the Rack request check to the client after TEZ-125 is fixed.
    AMSchedulerEventTALaunchRequest lre =
        (AMSchedulerEventTALaunchRequest) launchEvent;
    String[] requestedRacks = lre.getRacks();
    assertEquals(1, requestedRacks.length);
    String[] requestedHosts = lre.getHosts();
    assertEquals(3, requestedHosts.length);
    for (int i = 0; i < 3; i++) {
      assertEquals("host" + (i + 1), requestedHosts[i]);
    }
  }

  /**
   * Verifies that scheduling calls {@code resolveHosts} on the hinted hosts and
   * that the resulting launch request contains every resolved host name.
   */
  // TODO Move to the client post TEZ-125.
  @Test
  public void testHostResolveAttempt() throws Exception {
    TaskAttemptImpl.ScheduleTaskattemptTransition sta =
        new TaskAttemptImpl.ScheduleTaskattemptTransition();
    EventHandler eventHandler = mock(EventHandler.class);
    String[] hosts = new String[] { "192.168.1.1", "host2", "host3" };
    Set<String> resolved = new TreeSet<String>(
        Arrays.asList(new String[] { "host1", "host2", "host3" }));
    TaskLocationHint locationHint = new TaskLocationHint(
        new TreeSet<String>(Arrays.asList(hosts)), null);
    TezTaskID taskID = new TezTaskID(
        new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
        mock(TaskAttemptListener.class), new Configuration(),
        new SystemClock(), mock(TaskHeartbeatHandler.class),
        mock(AppContext.class), locationHint, false,
        Resource.newInstance(1024, 1), createFakeContainerContext());
    TaskAttemptImpl spyTa = spy(taImpl);
    // doReturn(..).when(spy) stubs without invoking the real resolveHosts,
    // which presumably performs name resolution.
    doReturn(resolved.toArray(new String[3])).when(spyTa).resolveHosts(hosts);

    TaskAttemptEventSchedule mockTAEvent = mock(TaskAttemptEventSchedule.class);
    sta.transition(spyTa, mockTAEvent);
    verify(spyTa).resolveHosts(hosts);

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(1)).handle(arg.capture());
    Event launchEvent = arg.getAllValues().get(0);
    if (!(launchEvent instanceof AMSchedulerEventTALaunchRequest)) {
      fail("Event not of type "
          + AMSchedulerEventTALaunchRequest.class.getName());
    }
    AMSchedulerEventTALaunchRequest cre =
        (AMSchedulerEventTALaunchRequest) launchEvent;
    // Every resolved host must appear among the requested hosts.
    Set<String> missingHosts = new HashSet<String>(resolved);
    for (String h : cre.getHosts()) {
      missingHosts.remove(h);
    }
    assertEquals("Resolved hosts missing from launch request: " + missingHosts,
        0, missingHosts.size());
  }

  /**
   * Ensures the DAG does not go into an error state if an attempt kill is
   * received while STARTING, nor when a second kill arrives afterwards.
   */
  @Test
  public void testLaunchFailedWhileKilling() throws Exception {
    ApplicationId appId = ApplicationId.newInstance(1, 2);
    TezDAGID dagID = new TezDAGID(appId, 1);
    TezVertexID vertexID = new TezVertexID(dagID, 1);
    TezTaskID taskID = new TezTaskID(vertexID, 1);
    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);

    MockEventHandler eventHandler = new MockEventHandler();
    TaskAttemptListener taListener = createMockTaskAttemptListener();
    AppContext mockAppContext = mock(AppContext.class);
    doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
        taListener, createTaskConf(), new SystemClock(),
        mock(TaskHeartbeatHandler.class), mockAppContext,
        createLocalhostLocationHint(), false, Resource.newInstance(1024, 1),
        createFakeContainerContext());

    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID,
        Priority.newInstance(3)));
    // At state STARTING.
    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
    // At some KILLING state.
    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
    assertFalse(eventHandler.internalError);
  }

  /**
   * Ensures ContainerTerminating fails a RUNNING attempt (recording its
   * diagnostic and emitting a task update and a scheduler TA-ended event), and
   * that a subsequent ContainerTerminated only appends its diagnostic.
   */
  @Test
  public void testContainerTerminationWhileRunning() throws Exception {
    ApplicationId appId = ApplicationId.newInstance(1, 2);
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(appId, 0);
    TezDAGID dagID = new TezDAGID(appId, 1);
    TezVertexID vertexID = new TezVertexID(dagID, 1);
    TezTaskID taskID = new TezTaskID(vertexID, 1);
    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);

    MockEventHandler eventHandler = spy(new MockEventHandler());
    TaskAttemptListener taListener = createMockTaskAttemptListener();
    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
    AppContext appCtx =
        createAppContextWithContainer(createMockContainer(contId));
    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
        taListener, createTaskConf(), new SystemClock(),
        mock(TaskHeartbeatHandler.class), appCtx,
        createLocalhostLocationHint(), false, Resource.newInstance(1024, 1),
        createFakeContainerContext());

    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
    // At state STARTING.
    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
        null));
    assertEquals("Task attempt is not in the RUNNING state",
        TaskAttemptState.RUNNING, taImpl.getState());
    int expectedEventsAtRunning = 3;
    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());

    taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
        "Terminating"));
    assertFalse(
        "InternalError occurred trying to handle TA_CONTAINER_TERMINATING",
        eventHandler.internalError);
    assertEquals("Task attempt is not in the FAILED state",
        TaskAttemptState.FAILED, taImpl.getState());
    assertEquals(1, taImpl.getDiagnostics().size());
    assertEquals("Terminating", taImpl.getDiagnostics().get(0));

    int expectedEventsAfterTerminating = expectedEventsAtRunning + 3;
    arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(expectedEventsAfterTerminating)).handle(
        arg.capture());
    List<Event> terminatingEvents = arg.getAllValues().subList(
        expectedEventsAtRunning, expectedEventsAfterTerminating);
    verifyEventType(terminatingEvents, TaskEventTAUpdate.class, 1);
    verifyEventType(terminatingEvents, AMSchedulerEventTAEnded.class, 1);

    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
        "Terminated"));
    // The attempt has already failed: no further events are expected.
    int expectedEventsAfterTerminated = expectedEventsAfterTerminating;
    arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(expectedEventsAfterTerminated)).handle(
        arg.capture());
    assertEquals(2, taImpl.getDiagnostics().size());
    assertEquals("Terminated", taImpl.getDiagnostics().get(1));
  }

  /**
   * Ensures ContainerTerminated on a RUNNING attempt raises no internal error
   * and records the event's diagnostic.
   */
  @Test
  public void testContainerTerminatedWhileRunning() throws Exception {
    ApplicationId appId = ApplicationId.newInstance(1, 2);
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(appId, 0);
    TezDAGID dagID = new TezDAGID(appId, 1);
    TezVertexID vertexID = new TezVertexID(dagID, 1);
    TezTaskID taskID = new TezTaskID(vertexID, 1);
    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);

    MockEventHandler eventHandler = new MockEventHandler();
    TaskAttemptListener taListener = createMockTaskAttemptListener();
    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
    AppContext appCtx =
        createAppContextWithContainer(createMockContainer(contId));
    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
        taListener, createTaskConf(), new SystemClock(),
        mock(TaskHeartbeatHandler.class), appCtx,
        createLocalhostLocationHint(), false, Resource.newInstance(1024, 1),
        createFakeContainerContext());

    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
    // At state STARTING.
    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
        null));
    assertEquals("Task attempt is not in running state",
        TaskAttemptState.RUNNING, taImpl.getState());

    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
        "Terminated"));
    assertFalse(
        "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
        eventHandler.internalError);
    assertEquals("Terminated", taImpl.getDiagnostics().get(0));
    // TODO Ensure TA_TERMINATING after this is ignored.
  }

  /**
   * Ensures a ContainerTerminated event received after the attempt has
   * SUCCEEDED emits no further events and does not record its diagnostic.
   */
  @Test
  public void testContainerTerminatedAfterSuccess() throws Exception {
    ApplicationId appId = ApplicationId.newInstance(1, 2);
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(appId, 0);
    TezDAGID dagID = new TezDAGID(appId, 1);
    TezVertexID vertexID = new TezVertexID(dagID, 1);
    TezTaskID taskID = new TezTaskID(vertexID, 1);
    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);

    MockEventHandler eventHandler = spy(new MockEventHandler());
    TaskAttemptListener taListener = createMockTaskAttemptListener();
    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
    AppContext appCtx =
        createAppContextWithContainer(createMockContainer(contId));
    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
        taListener, createTaskConf(), new SystemClock(),
        mock(TaskHeartbeatHandler.class), appCtx,
        createLocalhostLocationHint(), false, Resource.newInstance(1024, 1),
        createFakeContainerContext());

    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
    // At state STARTING.
    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
        null));
    assertEquals("Task attempt is not in the RUNNING state",
        TaskAttemptState.RUNNING, taImpl.getState());
    int expectedEventsAtRunning = 3;
    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());

    taImpl.handle(new TaskAttemptEvent(taskAttemptID,
        TaskAttemptEventType.TA_DONE));
    assertEquals("Task attempt is not in the SUCCEEDED state",
        TaskAttemptState.SUCCEEDED, taImpl.getState());
    assertEquals(0, taImpl.getDiagnostics().size());

    int expectedEventsAfterSuccess = expectedEventsAtRunning + 3;
    arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(expectedEventsAfterSuccess)).handle(
        arg.capture());
    List<Event> successEvents = arg.getAllValues().subList(
        expectedEventsAtRunning, expectedEventsAfterSuccess);
    verifyEventType(successEvents, TaskEventTAUpdate.class, 1);
    verifyEventType(successEvents, AMSchedulerEventTAEnded.class, 1);

    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
        "Terminated"));
    // The attempt has already succeeded: no further events are expected.
    int expectedEventsAfterTerminated = expectedEventsAfterSuccess;
    arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(expectedEventsAfterTerminated)).handle(
        arg.capture());
    // Verify that the diagnostic message included in the Terminated event is
    // not captured - TA already succeeded.
    assertEquals(0, taImpl.getDiagnostics().size());
  }

  /**
   * Verifies that the first output-failure report against a SUCCEEDED attempt
   * fails it (emitting one task update), and that a second report leaves it
   * FAILED without new events or internal errors.
   */
  @Test
  public void testMultipleOutputFailed() throws Exception {
    ApplicationId appId = ApplicationId.newInstance(1, 2);
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(appId, 0);
    TezDAGID dagID = new TezDAGID(appId, 1);
    TezVertexID vertexID = new TezVertexID(dagID, 1);
    TezTaskID taskID = new TezTaskID(vertexID, 1);
    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);

    MockEventHandler eventHandler = spy(new MockEventHandler());
    TaskAttemptListener taListener = createMockTaskAttemptListener();
    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
    AppContext appCtx =
        createAppContextWithContainer(createMockContainer(contId));
    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
        taListener, createTaskConf(), new SystemClock(),
        mock(TaskHeartbeatHandler.class), appCtx,
        createLocalhostLocationHint(), false, Resource.newInstance(1024, 1),
        createFakeContainerContext());

    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
    // At state STARTING.
    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
        null));
    taImpl.handle(new TaskAttemptEvent(taskAttemptID,
        TaskAttemptEventType.TA_DONE));
    assertEquals("Task attempt is not in succeeded state",
        TaskAttemptState.SUCCEEDED, taImpl.getState());
    int expectedEventsTillSucceeded = 6;
    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(expectedEventsTillSucceeded)).handle(
        arg.capture());
    verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);

    InputReadErrorEvent reEvent = new InputReadErrorEvent("", 0, 1);
    EventMetaData mockMeta = mock(EventMetaData.class);
    TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
    when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId);
    TezEvent tzEvent = new TezEvent(reEvent, mockMeta);

    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 1));
    int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
    // Use a fresh captor rather than clearing getAllValues(), which is not
    // guaranteed to be the captor's live backing list.
    arg = ArgumentCaptor.forClass(Event.class);
    verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(
        arg.capture());
    verifyEventType(
        arg.getAllValues().subList(expectedEventsTillSucceeded,
            expectedEventsAfterFetchFailure), TaskEventTAUpdate.class, 1);
    assertEquals("Task attempt is not in FAILED state",
        TaskAttemptState.FAILED, taImpl.getState());

    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 1));
    assertEquals("Task attempt is not in FAILED state, still",
        TaskAttemptState.FAILED, taImpl.getState());
    assertFalse(
        "InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES",
        eventHandler.internalError);
    // No new events.
    verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(
        arg.capture());
  }

  /**
   * Asserts that exactly {@code expectedOccurrences} elements of
   * {@code events} are instances of {@code eventClass}.
   */
  private static void verifyEventType(List<Event> events,
      Class<? extends Event> eventClass, int expectedOccurrences) {
    int count = 0;
    for (Event e : events) {
      if (eventClass.isInstance(e)) {
        count++;
      }
    }
    assertEquals(
        "Mismatch in num occurences of event: " + eventClass.getCanonicalName(),
        expectedOccurrences, count);
  }

  /** Creates a TaskAttemptListener mock whose address is localhost:0. */
  private static TaskAttemptListener createMockTaskAttemptListener() {
    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
    when(taListener.getAddress()).thenReturn(
        new InetSocketAddress("localhost", 0));
    return taListener;
  }

  /**
   * Creates a Configuration that maps the "file" scheme to {@link StubbedFS}
   * and disables the FileSystem cache for it.
   */
  private static Configuration createTaskConf() {
    Configuration taskConf = new Configuration();
    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
    taskConf.setBoolean("fs.file.impl.disable.cache", true);
    return taskConf;
  }

  /** Creates a location hint naming only host 127.0.0.1 and no racks. */
  private static TaskLocationHint createLocalhostLocationHint() {
    return new TaskLocationHint(
        new HashSet<String>(Arrays.asList(new String[] { "127.0.0.1" })), null);
  }

  /**
   * Creates a Container mock with the given id, node 127.0.0.1:0 and HTTP
   * address localhost:0.
   */
  private static Container createMockContainer(ContainerId contId) {
    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
    Container container = mock(Container.class);
    when(container.getId()).thenReturn(contId);
    when(container.getNodeId()).thenReturn(nid);
    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
    return container;
  }

  /**
   * Creates an AppContext mock whose container map holds {@code container}
   * and whose cluster info is a default {@link ClusterInfo}.
   */
  private static AppContext createAppContextWithContainer(Container container) {
    AppContext appCtx = mock(AppContext.class);
    AMContainerMap containers = new AMContainerMap(
        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
        appCtx);
    containers.addContainerIfNew(container);
    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
    doReturn(containers).when(appCtx).getAllContainers();
    return appCtx;
  }

  /**
   * Event handler that records whether a {@link DAGEventType#INTERNAL_ERROR}
   * DAG event was seen; all other events are ignored.
   */
  public static class MockEventHandler implements EventHandler {
    public boolean internalError;

    @Override
    public void handle(Event event) {
      if (event instanceof DAGEvent) {
        DAGEvent je = (DAGEvent) event;
        if (DAGEventType.INTERNAL_ERROR == je.getType()) {
          internalError = true;
        }
      }
    }
  }

  /**
   * {@link TaskAttemptImpl} with remote task-spec creation returning null and
   * all job-history logging hooks turned into no-ops.
   */
  private static class MockTaskAttemptImpl extends TaskAttemptImpl {

    public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
        EventHandler eventHandler, TaskAttemptListener tal,
        Configuration conf, Clock clock,
        TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
        TaskLocationHint locationHint, boolean isRescheduled,
        Resource resource, ContainerContext containerContext) {
      super(taskId, attemptNumber, eventHandler, tal, conf,
          clock, taskHeartbeatHandler, appContext,
          locationHint, isRescheduled, resource, containerContext);
    }

    @Override
    protected TaskSpec createRemoteTaskSpec() {
      // FIXME
      return null;
    }

    @Override
    protected void logJobHistoryAttemptStarted() {
    }

    @Override
    protected void logJobHistoryAttemptFinishedEvent(
        TaskAttemptStateInternal state) {
    }

    @Override
    protected void logJobHistoryAttemptUnsuccesfulCompletion(
        TaskAttemptState state) {
    }
  }

  /** Creates a ContainerContext with empty resources, credentials and env. */
  private static ContainerContext createFakeContainerContext() {
    return new ContainerContext(new HashMap<String, LocalResource>(),
        new Credentials(), new HashMap<String, String>(), "");
  }
}