blob: 9efc7ee9a34231338f8218b21b39639a806a5f17 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.LinkedHashMap;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.TaskTracker.TaskLauncher;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.junit.Test;
/**
* Tests {@link TaskLauncher}
*
*/
public class TestTaskLauncher {
private static int expectedLaunchAttemptId = 1;
private static class MyTaskTracker extends TaskTracker {
// override startNewTask just to set the runState,
// not to launch the task really
@Override
void startNewTask(TaskInProgress tip) {
assertEquals(expectedLaunchAttemptId, tip.getTask().getTaskID().getId());
tip.getStatus().setRunState(TaskStatus.State.RUNNING);
}
}
/**
* Tests the case "task waiting to be launched is killed externally".
*
* Launches a task which will wait for ever to get slots. Kill the
* task and see if launcher is able to come out of the wait and pickup a
* another task.
*
* @throws IOException
*/
@Test
public void testExternalKillForLaunchTask() throws IOException {
// setup a TaskTracker
JobConf ttConf = new JobConf();
ttConf.setInt(TTConfig.TT_MAP_SLOTS, 4);
TaskTracker tt = new MyTaskTracker();
tt.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
tt.setConf(ttConf);
tt.setIndexCache(new IndexCache(ttConf));
tt.setTaskMemoryManagerEnabledFlag();
// start map-task launcher with four slots
TaskLauncher mapLauncher = tt.new TaskLauncher(TaskType.MAP, 4);
mapLauncher.start();
// launch a task which requires five slots
String jtId = "test";
TaskAttemptID attemptID = new TaskAttemptID(jtId, 1, TaskType.MAP, 0, 0);
Task task = new MapTask(null, attemptID, 0, null, 5);
mapLauncher.addToTaskQueue(new LaunchTaskAction(task));
// verify that task is added to runningTasks
TaskInProgress killTip = tt.runningTasks.get(attemptID);
assertNotNull(killTip);
// wait for a while for launcher to pick up the task
// this loop waits atmost for 30 seconds
for (int i = 0; i < 300; i++) {
if (mapLauncher.getNumWaitingTasksToLaunch() == 0) {
break;
}
UtilsForTests.waitFor(100);
}
assertEquals("Launcher didnt pick up the task " + attemptID + "to launch",
0, mapLauncher.getNumWaitingTasksToLaunch());
// Now, that launcher has picked up the task, it waits until all five slots
// are available. i.e. it waits for-ever
// lets kill the task so that map launcher comes out
tt.processKillTaskAction(new KillTaskAction(attemptID));
assertEquals(TaskStatus.State.KILLED, killTip.getRunState());
// launch another attempt which requires only one slot
TaskAttemptID runningAttemptID = new TaskAttemptID(jtId, 1, TaskType.MAP,
0, expectedLaunchAttemptId);
mapLauncher.addToTaskQueue(new LaunchTaskAction(new MapTask(null,
runningAttemptID, 0, null, 1)));
TaskInProgress runningTip = tt.runningTasks.get(runningAttemptID);
assertNotNull(runningTip);
// wait for a while for the task to be launched
// this loop waits at most for 30 seconds
for (int i = 0; i < 300; i++) {
if (runningTip.getRunState().equals(TaskStatus.State.RUNNING)) {
break;
}
UtilsForTests.waitFor(100);
}
// verify that the task went to running
assertEquals(TaskStatus.State.RUNNING, runningTip.getRunState());
}
}