blob: 53a2ba00b3a259b3e0278bb2e279e9f61bfd3cdd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import com.google.common.base.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
/**
* Tests the behavior of TaskAttemptListenerImpl.
*/
@RunWith(MockitoJUnitRunner.class)
public class TestTaskAttemptListenerImpl {
private static final String ATTEMPT1_ID =
"attempt_123456789012_0001_m_000001_0";
private static final String ATTEMPT2_ID =
"attempt_123456789012_0001_m_000002_0";
private static final TaskAttemptId TASKATTEMPTID1 =
TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID));
private static final TaskAttemptId TASKATTEMPTID2 =
TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID));
@Mock
private AppContext appCtx;
@Mock
private JobTokenSecretManager secret;
@Mock
private RMHeartbeatHandler rmHeartbeatHandler;
@Mock
private TaskHeartbeatHandler hbHandler;
@Mock
private Dispatcher dispatcher;
@Mock
private Task task;
@SuppressWarnings("rawtypes")
@Mock
private EventHandler<Event> ea;
@SuppressWarnings("rawtypes")
@Captor
private ArgumentCaptor<Event> eventCaptor;
private JVMId id;
private WrappedJvmID wid;
private TaskAttemptID attemptID;
private TaskAttemptId attemptId;
private ReduceTaskStatus firstReduceStatus;
private ReduceTaskStatus secondReduceStatus;
private ReduceTaskStatus thirdReduceStatus;
private MockTaskAttemptListenerImpl listener;
/**
* Extension of the original TaskAttemptImpl
* for testing purposes
*/
public static class MockTaskAttemptListenerImpl
extends TaskAttemptListenerImpl {
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler) {
super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
}
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
TaskHeartbeatHandler hbHandler) {
super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
this.taskHeartbeatHandler = hbHandler;
}
@Override
protected void registerHeartbeatHandler(Configuration conf) {
//Empty
}
@Override
protected void startRpcServer() {
//Empty
}
@Override
protected void stopRpcServer() {
//Empty
}
}
@After
public void after() throws IOException {
if (listener != null) {
listener.close();
listener = null;
}
}
@Test (timeout=5000)
public void testGetTask() throws IOException {
configureMocks();
startListener(false);
// Verify ask before registration.
//The JVM ID has not been registered yet so we should kill it.
JvmContext context = new JvmContext();
context.jvmId = id;
JvmTask result = listener.getTask(context);
assertNotNull(result);
assertTrue(result.shouldDie);
// Verify ask after registration but before launch.
// Don't kill, should be null.
//Now put a task with the ID
listener.registerPendingTask(task, wid);
result = listener.getTask(context);
assertNull(result);
// Unregister for more testing.
listener.unregister(attemptId, wid);
// Verify ask after registration and launch
//Now put a task with the ID
listener.registerPendingTask(task, wid);
listener.registerLaunchedTask(attemptId, wid);
verify(hbHandler).register(attemptId);
result = listener.getTask(context);
assertNotNull(result);
assertFalse(result.shouldDie);
// Don't unregister yet for more testing.
//Verify that if we call it again a second time we are told to die.
result = listener.getTask(context);
assertNotNull(result);
assertTrue(result.shouldDie);
listener.unregister(attemptId, wid);
// Verify after unregistration.
result = listener.getTask(context);
assertNotNull(result);
assertTrue(result.shouldDie);
// test JVMID
JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
assertNotNull(jvmid);
try {
JVMId.forName("jvm_001_002_m_004_006");
Assert.fail();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
"TaskId string : jvm_001_002_m_004_006 is not properly formed");
}
}
@Test (timeout=5000)
public void testJVMId() {
JVMId jvmid = new JVMId("test", 1, true, 2);
JVMId jvmid1 = JVMId.forName("jvm_test_0001_m_000002");
// test compare methot should be the same
assertEquals(0, jvmid.compareTo(jvmid1));
}
@Test (timeout=10000)
public void testGetMapCompletionEvents() throws IOException {
TaskAttemptCompletionEvent[] empty = {};
TaskAttemptCompletionEvent[] taskEvents = {
createTce(0, true, TaskAttemptCompletionEventStatus.OBSOLETE),
createTce(1, false, TaskAttemptCompletionEventStatus.FAILED),
createTce(2, true, TaskAttemptCompletionEventStatus.SUCCEEDED),
createTce(3, false, TaskAttemptCompletionEventStatus.FAILED) };
TaskAttemptCompletionEvent[] mapEvents = { taskEvents[0], taskEvents[2] };
Job mockJob = mock(Job.class);
when(mockJob.getTaskAttemptCompletionEvents(0, 100))
.thenReturn(taskEvents);
when(mockJob.getTaskAttemptCompletionEvents(0, 2))
.thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
when(mockJob.getTaskAttemptCompletionEvents(2, 100))
.thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(
TypeConverter.fromYarn(mapEvents));
when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(
TypeConverter.fromYarn(mapEvents));
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
TypeConverter.fromYarn(empty));
configureMocks();
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
listener = new MockTaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, hbHandler) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
}
};
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
JobID jid = new JobID("12345", 1);
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
MapTaskCompletionEventsUpdate update =
listener.getMapCompletionEvents(jid, 0, 100, tid);
assertEquals(2, update.events.length);
update = listener.getMapCompletionEvents(jid, 0, 2, tid);
assertEquals(2, update.events.length);
update = listener.getMapCompletionEvents(jid, 2, 100, tid);
assertEquals(0, update.events.length);
}
private static TaskAttemptCompletionEvent createTce(int eventId,
boolean isMap, TaskAttemptCompletionEventStatus status) {
JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
: org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(eventId);
tce.setAttemptId(attemptId);
tce.setStatus(status);
return tce;
}
@Test (timeout=10000)
public void testCommitWindow() throws IOException {
SystemClock clock = SystemClock.getInstance();
configureMocks();
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
Job mockJob = mock(Job.class);
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
when(appCtx.getClock()).thenReturn(clock);
listener = new MockTaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, hbHandler) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
}
};
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
// verify commit not allowed when RM heartbeat has not occurred recently
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
boolean canCommit = listener.canCommit(tid);
assertFalse(canCommit);
verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
// verify commit allowed when RM heartbeat is recent
when(rmHeartbeatHandler.getLastHeartbeatTime())
.thenReturn(clock.getTime());
canCommit = listener.canCommit(tid);
assertTrue(canCommit);
verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
}
@Test
public void testSingleStatusUpdate()
throws IOException, InterruptedException {
configureMocks();
startListener(true);
listener.statusUpdate(attemptID, firstReduceStatus);
verify(ea).handle(eventCaptor.capture());
TaskAttemptStatusUpdateEvent updateEvent =
(TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
assertEquals(1, status.fetchFailedMaps.size());
assertEquals(Phase.SHUFFLE, status.phase);
}
@Test
public void testStatusUpdateEventCoalescing()
throws IOException, InterruptedException {
configureMocks();
startListener(true);
listener.statusUpdate(attemptID, firstReduceStatus);
listener.statusUpdate(attemptID, secondReduceStatus);
verify(ea).handle(any(Event.class));
ConcurrentMap<TaskAttemptId,
AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
listener.getAttemptIdToStatus();
TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get();
assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2));
assertEquals(2, status.fetchFailedMaps.size());
assertEquals(Phase.SORT, status.phase);
}
@Test
public void testCoalescedStatusUpdatesCleared()
throws IOException, InterruptedException {
// First two events are coalesced, the third is not
configureMocks();
startListener(true);
listener.statusUpdate(attemptID, firstReduceStatus);
listener.statusUpdate(attemptID, secondReduceStatus);
ConcurrentMap<TaskAttemptId,
AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
listener.getAttemptIdToStatus();
attemptIdToStatus.get(attemptId).set(null);
listener.statusUpdate(attemptID, thirdReduceStatus);
verify(ea, times(2)).handle(eventCaptor.capture());
TaskAttemptStatusUpdateEvent updateEvent =
(TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
assertNull(status.fetchFailedMaps);
assertEquals(Phase.REDUCE, status.phase);
}
@Test
public void testStatusUpdateFromUnregisteredTask() throws Exception {
configureMocks();
ControlledClock clock = new ControlledClock();
clock.setTime(0);
doReturn(clock).when(appCtx).getClock();
final TaskAttemptListenerImpl tal = new TaskAttemptListenerImpl(appCtx,
secret, rmHeartbeatHandler, null) {
@Override
protected void startRpcServer() {
// Empty
}
@Override
protected void stopRpcServer() {
// Empty
}
};
Configuration conf = new Configuration();
conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
tal.init(conf);
tal.start();
assertFalse(tal.statusUpdate(attemptID, firstReduceStatus));
tal.registerPendingTask(task, wid);
tal.registerLaunchedTask(attemptId, wid);
assertTrue(tal.statusUpdate(attemptID, firstReduceStatus));
// verify attempt is still reported as found if recently unregistered
tal.unregister(attemptId, wid);
assertTrue(tal.statusUpdate(attemptID, firstReduceStatus));
// verify attempt is not found if not recently unregistered
long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
clock.setTime(unregisterTimeout + 1);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return !tal.statusUpdate(attemptID, firstReduceStatus);
} catch (Exception e) {
throw new RuntimeException("status update failed", e);
}
}
}, 10, 10000);
}
private void configureMocks() {
firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE,
new Counters());
firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID));
secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT,
new Counters());
secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID));
thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
TaskStatus.State.RUNNING, "", "RUNNING", "",
TaskStatus.Phase.REDUCE, new Counters());
when(dispatcher.getEventHandler()).thenReturn(ea);
when(appCtx.getEventHandler()).thenReturn(ea);
listener = new MockTaskAttemptListenerImpl(appCtx, secret,
rmHeartbeatHandler, hbHandler);
id = new JVMId("foo", 1, true, 1);
wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
attemptId = TypeConverter.toYarn(attemptID);
}
private void startListener(boolean registerTask) {
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
if (registerTask) {
listener.registerPendingTask(task, wid);
listener.registerLaunchedTask(attemptId, wid);
}
}
}