blob: dd297b3f5c021811496ac4df126ca52be25347f1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
public class TestJobQueueTaskScheduler extends TestCase {
private static int jobCounter;
private static int taskCounter;
/**
 * Puts both static id counters back to zero, so the next fake job and the
 * next fake task attempt are numbered from 1 again.
 */
static void resetCounters() {
  taskCounter = 0;
  jobCounter = 0;
}
/**
 * A {@link JobInProgress} stand-in for scheduler tests. It skips task
 * initialisation and, whenever asked for a task, fabricates a new map or
 * reduce task, reports it to the {@link FakeTaskTrackerManager} it was
 * constructed with, and bumps the matching running-task counter.
 */
static class FakeJobInProgress extends JobInProgress {
  private final FakeTaskTrackerManager taskTrackerManager;

  /**
   * Creates a job whose id is {@code ("test", n)}, with {@code n} taken from
   * the shared job counter after incrementing it. The job starts with a
   * {@code PREP} status, {@code NORMAL} priority and the current wall-clock
   * time as its start time.
   *
   * @param jobConf configuration supplying the user and job name
   * @param taskTrackerManager manager notified of every task handed out
   * @param jt job tracker passed through to the superclass constructor
   * @throws IOException declared by the superclass constructor
   */
  public FakeJobInProgress(JobConf jobConf,
      FakeTaskTrackerManager taskTrackerManager, JobTracker jt)
      throws IOException {
    super(new JobID("test", ++jobCounter), jobConf, jt);
    this.taskTrackerManager = taskTrackerManager;
    this.startTime = System.currentTimeMillis();

    // Configure the status completely before publishing it in the field.
    JobStatus initialStatus = new JobStatus(getJobID(), 0f, 0f,
        JobStatus.PREP, jobConf.getUser(), jobConf.getJobName(), "", "");
    initialStatus.setJobPriority(JobPriority.NORMAL);
    initialStatus.setStartTime(startTime);
    this.status = initialStatus;
  }

  /** Intentionally empty: the fake job has no real tasks to set up. */
  @Override
  public synchronized void initTasks() throws IOException {
    // do nothing
  }

  /** Delegates to {@link #obtainNewMapTask}; locality is not modelled. */
  @Override
  public Task obtainNewLocalMapTask(TaskTrackerStatus tts, int clusterSize,
      int ignored) throws IOException {
    return obtainNewMapTask(tts, clusterSize, ignored);
  }

  /** Delegates to {@link #obtainNewMapTask}; locality is not modelled. */
  @Override
  public Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
      int clusterSize, int ignored) throws IOException {
    return obtainNewMapTask(tts, clusterSize, ignored);
  }

  /**
   * Fabricates a map task for the given tracker, records it with the task
   * tracker manager and increments {@code runningMapTasks}.
   *
   * @return the new task; its {@code toString()} is
   *         {@code "<task id> on <tracker name>"}
   */
  @Override
  public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
      int ignored) throws IOException {
    final TaskAttemptID mapAttempt = getTaskAttemptID(TaskType.MAP);
    final Task mapTask =
        new MapTask("", mapAttempt, 0, new JobSplit.TaskSplitIndex(), 1) {
          @Override
          public String toString() {
            return formatAssignment(getTaskID(), tts.getTrackerName());
          }
        };
    taskTrackerManager.update(tts.getTrackerName(), mapTask);
    runningMapTasks += 1;
    return mapTask;
  }

  /**
   * Fabricates a reduce task for the given tracker, records it with the task
   * tracker manager and increments {@code runningReduceTasks}.
   *
   * @return the new task; its {@code toString()} is
   *         {@code "<task id> on <tracker name>"}
   */
  @Override
  public Task obtainNewReduceTask(final TaskTrackerStatus tts,
      int clusterSize, int ignored) throws IOException {
    final TaskAttemptID reduceAttempt = getTaskAttemptID(TaskType.REDUCE);
    final Task reduceTask = new ReduceTask("", reduceAttempt, 0, 10, 1) {
      @Override
      public String toString() {
        return formatAssignment(getTaskID(), tts.getTrackerName());
      }
    };
    taskTrackerManager.update(tts.getTrackerName(), reduceTask);
    runningReduceTasks += 1;
    return reduceTask;
  }

  /** Renders the "task on tracker" label the tests compare against. */
  private static String formatAssignment(Object taskId, String trackerName) {
    return String.format("%s on %s", taskId, trackerName);
  }

  /**
   * Builds an attempt id for this job using the next value of the shared
   * task counter as the task number and 0 as the last constructor argument.
   */
  private TaskAttemptID getTaskAttemptID(TaskType type) {
    final JobID owner = getJobID();
    final String jtIdentifier = owner.getJtIdentifier();
    final int jobNumber = owner.getId();
    final int taskNumber = ++taskCounter;
    return new TaskAttemptID(jtIdentifier, jobNumber, type, taskNumber, 0);
  }
}
/**
 * An in-memory {@link TaskTrackerManager} for scheduler tests. It holds two
 * trackers named {@code tt1} and {@code tt2}, counts the map and reduce tasks
 * reported through {@link #update}, and forwards submitted jobs to the
 * registered {@link JobInProgressListener}s.
 */
static class FakeTaskTrackerManager implements TaskTrackerManager {
  int maps = 0;
  int reduces = 0;
  int maxMapTasksPerTracker = 2;
  int maxReduceTasksPerTracker = 2;
  List<JobInProgressListener> listeners =
      new ArrayList<JobInProgressListener>();
  QueueManager queueManager;

  private Map<String, TaskTracker> trackers =
      new HashMap<String, TaskTracker>();

  /** Creates the queue manager and registers trackers tt1 and tt2. */
  public FakeTaskTrackerManager() {
    queueManager = new QueueManager(new JobConf());
    addTracker("tt1", "tt1.host", 1);
    addTracker("tt2", "tt2.host", 2);
  }

  /**
   * Registers one tracker with an empty task-report list and the per-tracker
   * map/reduce maxima configured on this manager.
   *
   * @param name tracker name, also used as the lookup key
   * @param host host string handed to the tracker status
   * @param port third status constructor argument
   *        (NOTE(review): presumably the HTTP port - confirm against
   *        TaskTrackerStatus)
   */
  private void addTracker(String name, String host, int port) {
    TaskTracker tracker = new TaskTracker(name);
    TaskTrackerStatus initial = new TaskTrackerStatus(name, host, port,
        new ArrayList<TaskStatus>(), 0,
        maxMapTasksPerTracker,
        maxReduceTasksPerTracker);
    tracker.setStatus(initial);
    trackers.put(name, tracker);
  }

  /**
   * Reports a {@code RUNNING} cluster built from the tracker count, the
   * current map/reduce counters, and the per-tracker maxima multiplied by
   * the tracker count.
   */
  @Override
  public ClusterStatus getClusterStatus() {
    final int trackerCount = trackers.size();
    final int totalMapCapacity = trackerCount * maxMapTasksPerTracker;
    final int totalReduceCapacity = trackerCount * maxReduceTasksPerTracker;
    return new ClusterStatus(trackerCount, 0,
        10 * 60 * 1000,
        maps, reduces,
        totalMapCapacity,
        totalReduceCapacity,
        JobTrackerStatus.RUNNING);
  }

  /** Always 0 in this fake. */
  @Override
  public int getNumberOfUniqueHosts() {
    return 0;
  }

  /** Returns a new list holding the current status of every tracker. */
  @Override
  public Collection<TaskTrackerStatus> taskTrackers() {
    List<TaskTrackerStatus> snapshot = new ArrayList<TaskTrackerStatus>();
    Iterator<TaskTracker> it = trackers.values().iterator();
    while (it.hasNext()) {
      snapshot.add(it.next().getStatus());
    }
    return snapshot;
  }

  @Override
  public void addJobInProgressListener(JobInProgressListener listener) {
    listeners.add(listener);
  }

  @Override
  public void removeJobInProgressListener(JobInProgressListener listener) {
    listeners.remove(listener);
  }

  @Override
  public QueueManager getQueueManager() {
    return queueManager;
  }

  /** Returns the default minimum heartbeat interval constant. */
  @Override
  public int getNextHeartbeatInterval() {
    return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
  }

  /** No-op: the fake never kills jobs. */
  @Override
  public void killJob(JobID jobid) {
    // nothing to do
  }

  /** Always {@code null}: jobs are not tracked by id here. */
  @Override
  public JobInProgress getJob(JobID jobid) {
    return null;
  }

  /** Always reports success without doing anything. */
  @Override
  public boolean killTask(TaskAttemptID attemptId, boolean shouldFail) {
    return true;
  }

  public void initJob(JobInProgress job) {
    // do nothing
  }

  public void failJob(JobInProgress job) {
    // do nothing
  }

  // Test methods

  /** Announces the job to every registered listener via {@code jobAdded}. */
  public void submitJob(JobInProgress job) throws IOException {
    Iterator<JobInProgressListener> it = listeners.iterator();
    while (it.hasNext()) {
      it.next().jobAdded(job);
    }
  }

  /** Looks a tracker up by name; {@code null} when the name is unknown. */
  public TaskTracker getTaskTracker(String trackerID) {
    return trackers.get(trackerID);
  }

  /**
   * Records that {@code t} was handed to the named tracker: bumps the map or
   * reduce counter and appends a {@code RUNNING} task status to the
   * tracker's task reports.
   */
  public void update(String taskTrackerName, final Task t) {
    if (!t.isMapTask()) {
      reduces++;
    } else {
      maps++;
    }

    TaskStatus running = new TaskStatus() {
      @Override
      public boolean getIsMap() {
        return t.isMapTask();
      }

      @Override
      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
      }
    };
    running.setRunState(TaskStatus.State.RUNNING);

    TaskTracker target = trackers.get(taskTrackerName);
    TaskTrackerStatus targetStatus = target.getStatus();
    targetStatus.getTaskReports().add(running);
  }
}
protected JobConf jobConf;
protected TaskScheduler scheduler;
private FakeTaskTrackerManager taskTrackerManager;
/**
 * Prepares a fresh fixture: counters reset, a job configuration asking for
 * 10 map and 10 reduce tasks, a new fake tracker manager, and a started
 * scheduler wired to both.
 */
@Override
protected void setUp() throws Exception {
  resetCounters();

  jobConf = new JobConf();
  final JobConf conf = jobConf;
  conf.setNumMapTasks(10);
  conf.setNumReduceTasks(10);

  taskTrackerManager = new FakeTaskTrackerManager();

  // The field is assigned first so it is set even if wiring fails below.
  scheduler = createTaskScheduler();
  final TaskScheduler underTest = scheduler;
  underTest.setConf(conf);
  underTest.setTaskTrackerManager(taskTrackerManager);
  underTest.start();
}
/** Terminates the scheduler, if one was created. */
@Override
protected void tearDown() throws Exception {
  if (scheduler == null) {
    return;
  }
  scheduler.terminate();
}
/**
 * Factory for the scheduler under test. It is protected and non-final, so a
 * subclass can return a different {@link TaskScheduler}.
 *
 * @return a new {@link JobQueueTaskScheduler}
 */
protected TaskScheduler createTaskScheduler() {
  final TaskScheduler underTest = new JobQueueTaskScheduler();
  return underTest;
}
/**
 * Creates {@code numJobs} fake jobs, forces each into the given run state
 * and submits it to the tracker manager.
 *
 * @param taskTrackerManager manager the jobs are created with and submitted to
 * @param jobConf configuration shared by all created jobs
 * @param numJobs how many jobs to create; non-positive values create none
 * @param state run state set on each job's status before submission
 * @throws IOException if creating or submitting a job fails
 */
static void submitJobs(FakeTaskTrackerManager taskTrackerManager, JobConf jobConf,
    int numJobs, int state)
    throws IOException {
  int remaining = numJobs;
  while (remaining > 0) {
    // Looked up once per job, exactly as many times as jobs are created.
    JobTracker jobTracker = UtilsForTests.getJobTracker();
    JobInProgress created =
        new FakeJobInProgress(jobConf, taskTrackerManager, jobTracker);
    created.getStatus().setRunState(state);
    taskTrackerManager.submitJob(created);
    remaining--;
  }
}
/** With no jobs submitted, a heartbeat from tt1 must yield no tasks. */
public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
  TaskTracker tt1 = tracker(taskTrackerManager, "tt1");
  List<Task> assigned = scheduler.assignTasks(tt1);
  assertEquals(0, assigned.size());
}
/**
 * Submits one job in each of the PREP, SUCCEEDED, FAILED and KILLED states
 * and checks that tt1 is still assigned nothing.
 */
public void testNonRunningJobsAreIgnored() throws IOException {
  final int[] nonRunningStates = {
      JobStatus.PREP, JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.KILLED
  };
  for (int state : nonRunningStates) {
    submitJobs(taskTrackerManager, jobConf, 1, state);
  }

  TaskTracker tt1 = tracker(taskTrackerManager, "tt1");
  List<Task> assigned = scheduler.assignTasks(tt1);
  assertEquals(0, assigned.size());
}
/**
 * With two running jobs, each tracker is expected to receive two maps and
 * one reduce on its first heartbeat, one more reduce on the second, and
 * nothing afterwards. Every expected task belongs to job 0001.
 */
public void testDefaultTaskAssignment() throws IOException {
  submitJobs(taskTrackerManager, jobConf, 2, JobStatus.RUNNING);

  final TaskTracker tt1 = tracker(taskTrackerManager, "tt1");
  final TaskTracker tt2 = tracker(taskTrackerManager, "tt2");
  final String[] nothing = new String[] {};

  // All slots are filled with job 1
  final String[] tt1FirstBeat = {
      "attempt_test_0001_m_000001_0 on tt1",
      "attempt_test_0001_m_000002_0 on tt1",
      "attempt_test_0001_r_000003_0 on tt1"};
  final String[] tt1SecondBeat = {"attempt_test_0001_r_000004_0 on tt1"};
  checkAssignment(scheduler, tt1, tt1FirstBeat);
  checkAssignment(scheduler, tt1, tt1SecondBeat);
  checkAssignment(scheduler, tt1, nothing);

  final String[] tt2FirstBeat = {
      "attempt_test_0001_m_000005_0 on tt2",
      "attempt_test_0001_m_000006_0 on tt2",
      "attempt_test_0001_r_000007_0 on tt2"};
  final String[] tt2SecondBeat = {"attempt_test_0001_r_000008_0 on tt2"};
  checkAssignment(scheduler, tt2, tt2FirstBeat);
  checkAssignment(scheduler, tt2, tt2SecondBeat);
  checkAssignment(scheduler, tt2, nothing);

  // Both trackers stay idle on further heartbeats.
  checkAssignment(scheduler, tt1, nothing);
  checkAssignment(scheduler, tt2, nothing);
}
/**
 * Shorthand for looking a tracker up by name on the fake manager.
 *
 * @return whatever {@code getTaskTracker} returns for {@code taskTrackerName}
 */
static TaskTracker tracker(FakeTaskTrackerManager taskTrackerManager,
    String taskTrackerName) {
  final TaskTracker found = taskTrackerManager.getTaskTracker(taskTrackerName);
  return found;
}
/**
 * Asks the scheduler for tasks for {@code taskTracker} and asserts that the
 * result is non-null, has as many entries as {@code expectedTaskStrings},
 * and that each task's {@code toString()} equals the expected string at the
 * same position.
 *
 * @throws IOException if the scheduler's {@code assignTasks} call fails
 */
static void checkAssignment(TaskScheduler scheduler, TaskTracker taskTracker,
    String[] expectedTaskStrings) throws IOException {
  final List<Task> assigned = scheduler.assignTasks(taskTracker);
  assertNotNull(assigned);

  final int expectedCount = expectedTaskStrings.length;
  assertEquals(expectedCount, assigned.size());

  int position = 0;
  for (String expected : expectedTaskStrings) {
    assertEquals(expected, assigned.get(position).toString());
    position++;
  }
}
}