/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import static org.apache.hadoop.mapred.CapacityTestUtils.*;
import java.io.File;
import java.io.IOException;
import java.util.*;
public class TestCapacityScheduler extends TestCase {
static final Log LOG =
LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
String queueConfigPath =
System.getProperty("test.build.extraconf", "build/test/extraconf");
File queueConfigFile =
new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
private static int jobCounter;
private ControlledInitializationPoller controlledInitializationPoller;
protected JobConf conf;
protected CapacityTaskScheduler scheduler;
private FakeTaskTrackerManager taskTrackerManager;
private FakeClock clock;
@Override
protected void setUp() {
setUp(2, 2, 1);
}
private void setUp(
int numTaskTrackers, int numMapTasksPerTracker,
int numReduceTasksPerTracker) {
jobCounter = 0;
taskTrackerManager =
new FakeTaskTrackerManager(
numTaskTrackers, numMapTasksPerTracker,
numReduceTasksPerTracker);
clock = new FakeClock();
scheduler = new CapacityTaskScheduler(clock);
scheduler.setTaskTrackerManager(taskTrackerManager);
conf = new JobConf();
// Don't let the JobInitializationPoller come in our way.
conf.set("mapred.queue.names","default");
controlledInitializationPoller =
new ControlledInitializationPoller(scheduler.jobQueuesManager,
taskTrackerManager);
scheduler.setInitializationPoller(controlledInitializationPoller);
scheduler.setConf(conf);
// disable speculative execution by default.
conf.setMapSpeculativeExecution(false);
conf.setReduceSpeculativeExecution(false);
}
@Override
protected void tearDown() throws Exception {
if (scheduler != null) {
scheduler.terminate();
}
}
/**
* Test that a queue's maximum-capacity limit is enforced when assigning tasks.
* @throws IOException
*/
public void testMaxCapacity() throws IOException {
this.setUp(4, 1, 1);
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
.setMaxCapacityPercent(50.0f);
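// With 4 trackers of 1 map and 1 reduce slot each, the cluster has 4 map and
// 4 reduce slots; a max-capacity of 50% therefore caps this queue at 2
// concurrent maps and 2 concurrent reduces, which the assignments below
// exercise.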
//submit the Job
FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
JobStatus.PREP, 4, 4, "default", "user");
taskTrackerManager.initJob(fjob1);
HashMap<String, String> expectedStrings = new HashMap<String, String>();
expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
List<Task> task1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1", expectedStrings);
expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
List<Task> task2 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2", expectedStrings);
//we have already reached the max-capacity limit,
//so this call should return null
List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
assertNull(task3);
//Now complete the tasks from the first assignment (the map and reduce on tt1).
for (Task task : task1) {
taskTrackerManager.finishTask(
task.getTaskID().toString(), fjob1);
}
expectedStrings.put(MAP, "attempt_test_0001_m_000003_0 on tt1");
expectedStrings.put(REDUCE, "attempt_test_0001_r_000003_0 on tt1");
task2 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1", expectedStrings);
}
// test job run-state change
public void testJobRunStateChange() throws IOException {
// start the scheduler
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 1));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit the job
FakeJobInProgress fjob1 =
taskTrackerManager.submitJob(JobStatus.PREP, 1, 0, "default", "user");
FakeJobInProgress fjob2 =
taskTrackerManager.submitJob(JobStatus.PREP, 1, 0, "default", "user");
// test if changing the job priority/start-time works as expected in the
// waiting queue
testJobOrderChange(fjob1, fjob2, true);
// Init the jobs
// simulate the case where the job with a lower priority becomes running
// first (may be because of the setup tasks).
// init the lower ranked job first
taskTrackerManager.initJob(fjob2);
// init the higher ordered job later
taskTrackerManager.initJob(fjob1);
// check that the jobs are still in the waiting queue:
// jobs are not removed from the waiting queue until they are scheduled
assertEquals(
"Waiting queue is garbled on job init", 2,
scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
.size());
// test if changing the job priority/start-time works as expected in the
// running queue
testJobOrderChange(fjob1, fjob2, false);
// schedule a task
List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
// complete the job
taskTrackerManager.finishTask(
tasks.get(0).getTaskID().toString(),
fjob1);
// mark the job as complete
taskTrackerManager.finalizeJob(fjob1);
Collection<JobInProgress> rqueue =
scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
// check if the job is removed from the scheduler
assertFalse(
"Scheduler contains completed job",
rqueue.contains(fjob1));
// check if the running queue size is correct
assertEquals(
"Job finish garbles the queue",
1, rqueue.size());
}
// test if the queue reflects the changes
private void testJobOrderChange(
FakeJobInProgress fjob1,
FakeJobInProgress fjob2,
boolean waiting) {
String queueName = waiting ? "waiting" : "running";
// check if the jobs in the queue are the right order
JobInProgress[] jobs = getJobsInQueue(waiting);
assertTrue(
queueName + " queue doesn't contain job #1 in right order",
jobs[0].getJobID().equals(fjob1.getJobID()));
assertTrue(
queueName + " queue doesn't contain job #2 in right order",
jobs[1].getJobID().equals(fjob2.getJobID()));
// I. Check the start-time change
// Change job2 start-time and check if job2 bumps up in the queue
taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
jobs = getJobsInQueue(waiting);
assertTrue(
"Start time change didn't work as expected for job #2 in "
+ queueName + " queue",
jobs[0].getJobID().equals(fjob2.getJobID()));
assertTrue(
"Start time change didn't work as expected for job #1 in "
+ queueName + " queue",
jobs[1].getJobID().equals(fjob1.getJobID()));
// check if the queue is fine
assertEquals(
"Start-time change garbled the " + queueName + " queue",
2, jobs.length);
// II. Check the job priority change
// Bump up job1's priority and make sure job1 bumps up in the queue
taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
// Check if the priority changes are reflected
jobs = getJobsInQueue(waiting);
assertTrue(
"Priority change didn't work as expected for job #1 in "
+ queueName + " queue",
jobs[0].getJobID().equals(fjob1.getJobID()));
assertTrue(
"Priority change didn't work as expected for job #2 in "
+ queueName + " queue",
jobs[1].getJobID().equals(fjob2.getJobID()));
// check if the queue is fine
assertEquals(
"Priority change has garbled the " + queueName + " queue",
2, jobs.length);
// reset the queue state back to normal
taskTrackerManager.setStartTime(fjob1, fjob2.startTime - 1);
taskTrackerManager.setPriority(fjob1, JobPriority.NORMAL);
}
private JobInProgress[] getJobsInQueue(boolean waiting) {
Collection<JobInProgress> queue =
waiting
? scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
: scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
return queue.toArray(new JobInProgress[0]);
}
// tests if tasks can be assigned when there are multiple jobs from the same
// user
public void testJobFinished() throws Exception {
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit 2 jobs
FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, 3, 0, "default", "u1");
FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, 3, 0, "default", "u1");
// I. Check multiple assignments with running tasks within job
// ask for a task from first job
Task t = checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000001_0 on tt1");
// ask for another task from the first job
t = checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
// complete tasks
taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
// II. Check multiple assignments with running tasks across jobs
// ask for a task from first job
t = checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000003_0 on tt1");
// ask for a task from the second job
t = checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_m_000001_0 on tt1");
// complete tasks
taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
// III. Check multiple assignments with completed tasks across jobs
// ask for a task from the second job
t = checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_m_000002_0 on tt1");
// complete task
taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j2);
// IV. Check assignment with completed job
// finish first job
scheduler.jobQueuesManager.getJobQueue(j1).jobCompleted(j1);
// ask for another task from the second job
// if tasks can be assigned then the structures are properly updated
t = checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_m_000003_0 on tt1");
// complete task
taskTrackerManager.finishTask("attempt_test_0002_m_000003_0", j2);
}
/**
* Tests the submission of jobs to container queues and job queues.
* @throws Exception
*/
public void testJobSubmission() throws Exception {
JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
queues[0].getProperties().setProperty(
CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
queues[1].getProperties().setProperty(
CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
queues[2].getProperties().setProperty(
CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
setUp(1, 4, 4);
// use the queues from the config file.
taskTrackerManager.setQueueManager(new QueueManager());
scheduler.start();
// submit a job to the container queue
try {
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0,
queues[0].getQueueName(), "user");
fail("Jobs are being able to be submitted to the container queue");
} catch (Exception e) {
assertTrue(scheduler.getJobs(queues[0].getQueueName()).isEmpty());
}
FakeJobInProgress job = taskTrackerManager.submitJobAndInit(JobStatus.PREP,
1, 0, queues[1].getQueueName(), "user");
assertEquals(1, scheduler.getJobs(queues[1].getQueueName()).size());
assertTrue(scheduler.getJobs(queues[1].getQueueName()).contains(job));
// check that a task from the submitted job gets assigned
checkAssignment(taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_m_000001_0 on tt1");
// test for getJobs
HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
taskTrackerManager.submitJobs(1, 4, queues[2].getQueueName());
JobQueuesManager mgr = scheduler.jobQueuesManager;
//Raise status change events for jobs submitted.
raiseStatusChangeEvents(mgr, queues[2].getQueueName());
Collection<JobInProgress> jobs =
scheduler.getJobs(queues[2].getQueueName());
assertTrue(
"Number of jobs returned by scheduler is wrong"
, jobs.size() == 4);
assertTrue(
"Submitted jobs and returned jobs are not the same",
subJobsList.get("u1").containsAll(jobs));
}
//Basic test of capacity allocation across queues which have no
//capacity configured.
public void testCapacityAllocationToQueues() throws Exception {
String[] qs = {"default", "qAZ1", "qAZ2", "qAZ3", "qAZ4"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
queues.add(new FakeQueueInfo("qAZ1", -1.0f, true, 25));
queues.add(new FakeQueueInfo("qAZ2", -1.0f, true, 25));
queues.add(new FakeQueueInfo("qAZ3", -1.0f, true, 25));
queues.add(new FakeQueueInfo("qAZ4", -1.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
JobQueuesManager jqm = scheduler.jobQueuesManager;
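// default keeps its configured 25%; the remaining 75% is split equally
// among the 4 unconfigured queues, i.e. 75 / 4 = 18.75% each.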
assertEquals(18.75f, jqm.getJobQueue("qAZ1").qsc.getCapacityPercent());
assertEquals(18.75f, jqm.getJobQueue("qAZ2").qsc.getCapacityPercent());
assertEquals(18.75f, jqm.getJobQueue("qAZ3").qsc.getCapacityPercent());
assertEquals(18.75f, jqm.getJobQueue("qAZ4").qsc.getCapacityPercent());
}
public void testCapacityAllocFailureWithLowerMaxCapacity() throws Exception {
String[] qs = {"default", "qAZ1"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
FakeQueueInfo qi = new FakeQueueInfo("qAZ1", -1.0f, true, 25);
qi.maxCapacity = 40.0f;
queues.add(qi);
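// The single unconfigured queue qAZ1 would be handed the remaining 75% of
// capacity, which exceeds its configured maximum-capacity of 40%, so
// scheduler start-up must fail.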
taskTrackerManager.setFakeQueues(queues);
try {
scheduler.start();
fail("scheduler start should fail ");
}catch(IOException ise) {
Throwable e = ise.getCause();
assertTrue(e instanceof IllegalStateException);
assertEquals(
e.getMessage(),
" Capacity share (" + 75.0f + ")for unconfigured queue " + "qAZ1" +
" is greater than its maximum-capacity percentage " + 40.0f);
}
}
// Tests how capacity is computed and assignment of tasks done
// on the basis of the capacity.
public void testCapacityBasedAllocation() throws Exception {
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
// set the capacity to 10%, so that the queue's slot capacity is zero
// initially and grows slowly as the cluster capacity increases.
queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
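// The default setUp() gives 2 trackers with 2 map slots each (4 map slots
// in total). A queue's slot capacity is its percentage of the cluster's
// slots, rounded down, so initially default gets 0 slots (10% of 4) and
// q2 gets 3 (90% of 4); the verifyCapacity() calls below track this as
// trackers are added.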
// submit a job to the default queue
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 0, "default", "u1");
// submit a job to the second queue
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
// job from q2 runs first because it has some non-zero capacity.
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_m_000001_0 on tt1");
verifyCapacity(taskTrackerManager, "0", "default");
verifyCapacity(taskTrackerManager, "3", "q2");
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt3");
checkAssignment(
taskTrackerManager, scheduler, "tt2",
"attempt_test_0002_m_000002_0 on tt2");
verifyCapacity(taskTrackerManager, "0", "default");
verifyCapacity(taskTrackerManager, "5", "q2");
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt4");
checkAssignment(
taskTrackerManager, scheduler, "tt3",
"attempt_test_0002_m_000003_0 on tt3");
verifyCapacity(taskTrackerManager, "0", "default");
verifyCapacity(taskTrackerManager, "7", "q2");
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt5");
// now job from default should run, as it is furthest away
// in terms of runningMaps / capacity.
checkAssignment(
taskTrackerManager, scheduler, "tt4",
"attempt_test_0001_m_000001_0 on tt4");
verifyCapacity(taskTrackerManager, "1", "default");
verifyCapacity(taskTrackerManager, "9", "q2");
}
// test capacity transfer
public void testCapacityTransfer() throws Exception {
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
Map<String,String> expectedStrings = new HashMap<String,String>();
expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
// I should get another map task.
// No reduces, as the TT has only 1 reduce slot.
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
// Now we're at full capacity for maps. If I ask for another map task,
// I should get a map task from the default queue's capacity.
// same with reduces
expectedStrings.put(MAP,"attempt_test_0001_m_000003_0 on tt2");
expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
expectedStrings);
// and another
checkAssignment(
taskTrackerManager, scheduler, "tt2",
"attempt_test_0001_m_000004_0 on tt2");
}
/**
* Tests high-memory blocking with max capacity.
* @throws IOException
*/
public void testHighMemoryBlockingWithMaxCapacity()
throws IOException {
taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
taskTrackerManager.addQueues(new String[]{"defaultXYZM"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("defaultXYZM", 25.0f, true, 50));
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
.setMaxCapacityPercent(50);
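// 2 trackers x 2 slots give 4 map and 4 reduce slots; with 25% capacity and
// a 50% max-capacity the queue may occupy at most 2 map and 2 reduce slots.
// Each slot is 1GB, so a 2GB high-RAM task occupies 2 slots at once.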
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(1);
jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(2 * 1024);
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(2);
jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
//high ram map from job 1 and normal reduce task from job 1
HashMap<String,String> expectedStrings = new HashMap<String,String>();
expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
"tt1", expectedStrings);
checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 1, 100.0f,0,2);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
//we have reached the maximum limit for maps, so no more map tasks.
//One reduce slot is already used and only one more is left before we reach
//the max capacity for reduces. But the current 1 slot plus 2 slots for the
//high-RAM reduce would cross the maximum capacity, hence nothing is
//assigned in this call.
assertNull(scheduler.assignTasks(tracker("tt2")));
//complete the high ram job on tt1.
for (Task task : tasks) {
taskTrackerManager.finishTask(
task.getTaskID().toString(),
job1);
}
expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt2");
tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
"tt2", expectedStrings);
checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 2, 200.0f,0,2);
checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
//complete the tasks assigned on tt2.
for (Task task : tasks) {
taskTrackerManager.finishTask(
task.getTaskID().toString(),
job2);
}
expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt2");
expectedStrings.put(REDUCE,"attempt_test_0002_r_000002_0 on tt2");
tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
"tt2", expectedStrings);
}
/**
* test if user limits automatically adjust to max map or reduce limit
*/
public void testUserLimitsWithMaxCapacity() throws Exception {
setUp(2, 2, 2);
// set up some queues
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
.setMaxCapacityPercent(75);
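// 2 trackers x 2 slots give 4 map and 4 reduce slots; the 50% capacity
// gives this queue 2 slots of each type, and the 75% max-capacity caps it
// at 3.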
// submit a job
FakeJobInProgress fjob1 =
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
FakeJobInProgress fjob2 =
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
// for queue 'default', max capacity for maps and reduces is 3 slots each.
// The initial user limit of 50%, assuming 2 users per queue, is
// 1 map and 1 reduce per user; after max capacity it is 1.5 each.
// The first job is given one task of each type.
HashMap<String,String> expectedStrings = new HashMap<String,String>();
expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
"tt1", expectedStrings);
//for user u1 we have reached the initial limit of 1 task of each type;
//the next map and reduce tasks go to u2's job.
expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt1");
expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
"tt1", expectedStrings);
expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
"tt2", expectedStrings);
assertNull(scheduler.assignTasks(tracker("tt2")));
}
// Utility method to construct a map of expected strings
// with exactly one map task and one reduce task.
private void populateExpectedStrings(Map<String, String> expectedTaskStrings,
String mapTask, String reduceTask) {
expectedTaskStrings.clear();
expectedTaskStrings.put(CapacityTestUtils.MAP, mapTask);
expectedTaskStrings.put(CapacityTestUtils.REDUCE, reduceTask);
}
// test user limits
public void testUserLimits() throws Exception {
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity is 2 for maps and 1 for reduce.
// Since we're the only user, we should get tasks
Map<String, String> expectedTaskStrings = new HashMap<String, String>();
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0001_m_000001_0 on tt1",
"attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt1", expectedTaskStrings);
// Submit another job, from a different user
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// Now if I ask for a task, it should come from the second job
checkAssignment(taskTrackerManager, scheduler,
"tt1", "attempt_test_0002_m_000001_0 on tt1");
// Now we're at full capacity. If I ask for another task,
// I should get tasks from the default queue's capacity.
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0001_m_000002_0 on tt2",
"attempt_test_0002_r_000001_0 on tt2");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt2", expectedTaskStrings);
// and another
checkAssignment(taskTrackerManager, scheduler,
"tt2", "attempt_test_0002_m_000002_0 on tt2");
}
// test user limits when a 2nd job is submitted much after first job
public void testUserLimits2() throws Exception {
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2 and reduce is 1.
// Since we're the only user, we should get tasks
Map<String, String> expectedTaskStrings = new HashMap<String, String>();
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0001_m_000001_0 on tt1",
"attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt1", expectedTaskStrings);
// since we're the only job, we get another map
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
// Submit another job, from a different user
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// Now if I ask for a task, it should come from the second job
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0002_m_000001_0 on tt2",
"attempt_test_0002_r_000001_0 on tt2");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt2", expectedTaskStrings);
// and another
checkAssignment(
taskTrackerManager, scheduler, "tt2",
"attempt_test_0002_m_000002_0 on tt2");
}
// test user limits when a 2nd job is submitted much after first job
// and we need to wait for first job's task to complete
public void testUserLimits3() throws Exception {
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit a job
FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2 and reduces is 1.
// Since we're the only user, we should get a task
Map<String, String> expectedTaskStrings = new HashMap<String, String>();
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0001_m_000001_0 on tt1",
"attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt1", expectedTaskStrings);
// since we're the only job, we get another map
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
// we get more tasks from 'default queue'
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0001_m_000003_0 on tt2",
"attempt_test_0001_r_000002_0 on tt2");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt2", expectedTaskStrings);
checkAssignment(
taskTrackerManager, scheduler, "tt2",
"attempt_test_0001_m_000004_0 on tt2");
// Submit another job, from a different user
FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// one task of each type finishes
taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", j1);
// Now if I ask for a task, it should come from the second job
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0002_m_000001_0 on tt1",
"attempt_test_0002_r_000001_0 on tt1");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt1", expectedTaskStrings);
// another task from job1 finishes, so another new task goes to job2
taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_m_000002_0 on tt1");
// now we have equal number of tasks from each job. Whichever job's
// task finishes, that job gets a new task
taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
taskTrackerManager.finishTask("attempt_test_0001_r_000002_0", j1);
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0001_m_000005_0 on tt2",
"attempt_test_0001_r_000003_0 on tt2");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt2", expectedTaskStrings);
taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_m_000003_0 on tt1");
}
// test user limits with many users, more slots
public void testUserLimits4() throws Exception {
// set up one queue, with 10 map slots and 5 reduce slots
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// add some more TTs
taskTrackerManager.addTaskTracker("tt3");
taskTrackerManager.addTaskTracker("tt4");
taskTrackerManager.addTaskTracker("tt5");
// u1 submits job
FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// it gets the first 5 map slots and 5 reduce slots (one pair per tracker)
Map<String, String> expectedTaskStrings = new HashMap<String, String>();
for (int i=0; i<5; i++) {
String ttName = "tt"+(i+1);
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0001_m_00000"+(i+1)+"_0 on " + ttName,
"attempt_test_0001_r_00000"+(i+1)+"_0 on " + ttName);
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
ttName, expectedTaskStrings);
}
// u2 submits a job with 4 maps and 4 reduces
FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
// u2 should get the next 4 map slots
for (int i=0; i<4; i++) {
String ttName = "tt"+(i+1);
checkAssignment(taskTrackerManager, scheduler, ttName,
"attempt_test_0002_m_00000"+(i+1)+"_0 on " + ttName);
}
// last slot should go to u1, since u2 has no more tasks
checkAssignment(
taskTrackerManager, scheduler, "tt5",
"attempt_test_0001_m_000006_0 on tt5");
// u1 finishes tasks
taskTrackerManager.finishTask("attempt_test_0001_m_000006_0", j1);
taskTrackerManager.finishTask("attempt_test_0001_r_000005_0", j1);
// u1 submits a few more jobs
// All the jobs are inited when submitted because, with the addition of the
// eager job initializer, all jobs in this case would be initialized.
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// u2 also submits a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
// now u3 submits a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
// next map slot should go to u3, even though u2 has an earlier job, since
// user limits have changed and u1/u2 are over limits
// reduce slot will go to job 2, as it is still under limit.
populateExpectedStrings(expectedTaskStrings,
"attempt_test_0007_m_000001_0 on tt5",
"attempt_test_0002_r_000001_0 on tt5");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt5", expectedTaskStrings);
// some other task finishes and u3 gets it
taskTrackerManager.finishTask("attempt_test_0002_m_000004_0", j1);
checkAssignment(
taskTrackerManager, scheduler, "tt4",
"attempt_test_0007_m_000002_0 on tt4");
// now, u2 finishes a task
taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j1);
// next slot will go to u1, since u3 has nothing to run and u1's job is
// first in the queue
checkAssignment(
taskTrackerManager, scheduler, "tt2",
"attempt_test_0001_m_000007_0 on tt2");
}
/**
* Test to verify that high memory jobs hit user limits faster than any normal
* job.
*
* @throws IOException
*/
public void testUserLimitsForHighMemoryJobs()
throws IOException {
taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
scheduler.setTaskTrackerManager(taskTrackerManager);
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(
JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(
MRConfig.MAPMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(
JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(
MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// Submit one normal job to the other queue.
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(6);
jConf.setNumReduceTasks(6);
jConf.setUser("u1");
jConf.setQueueName("default");
FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
LOG.debug(
"Submit one high memory(2GB maps, 2GB reduces) job of "
+ "6 map and 6 reduce tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
jConf.setMemoryForReduceTask(2 * 1024);
jConf.setNumMapTasks(6);
jConf.setNumReduceTasks(6);
jConf.setQueueName("default");
jConf.setUser("u2");
FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
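// The single tracker has 10 map and 10 reduce slots; with 100% queue
// capacity and a 50% user limit, each of the two users may hold at most 5
// slots per task type. u1's 1GB (1-slot) tasks hit that limit after 5
// assignments, while u2's 2GB (2-slot) tasks hit it after 2 assignments
// (4 slots), since a third would require 6.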
// Verify that normal job takes 5 task assignments to hit user limits
Map<String, String> expectedStrings = new HashMap<String, String>();
for (int i = 0; i < 5; i++) {
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP,
"attempt_test_0001_m_00000" + (i + 1) + "_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE,
"attempt_test_0001_r_00000" + (i + 1) + "_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
}
// u1 now holds 5 map slots and 5 reduce slots while u2 has none, so u1's
// user limits are hit and u2 should get the next slots
for (int i = 0; i < 2; i++) {
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP,
"attempt_test_0002_m_00000" + (i + 1) + "_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE,
"attempt_test_0002_r_00000" + (i + 1) + "_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
} // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
// slots. Because of high memory tasks, giving u2 another task would
// overflow limits. So, no more tasks should be given to anyone.
assertNull(scheduler.assignTasks(tracker("tt1")));
}
/*
* Following is the testing strategy for testing scheduling information.
* - start capacity scheduler with two queues.
* - check the scheduling information with respect to the configuration
* which was used to configure the queues.
* - Submit 5 jobs to a queue.
* - Check the waiting jobs count, it should be 5.
* - Then run initializationPoller()
* - Check once again the waiting queue, it should be 5 jobs again.
* - Then raise status change events.
* - Assign tasks to a task tracker.
* - Check waiting job count, it should be 4 now and used map (%) = 100
* and used reduce (%) = 100
* - Finish the job and then check that the used percentage goes
* back to zero.
* - Then pick a job that is initialized but not yet scheduled and fail it.
* - Run the poller.
* - Check that the waiting job count is now 3.
* - Now fail a job which has not been initialized at all.
* - Run the poller, so that it can clean up the job queue.
* - Check the count; the waiting job count should be 2.
* - Now raise status change events to move the initialized jobs (which
* should be two in number) to the running queue.
* - Then schedule a map and a reduce of the job in the running queue.
* - Run the poller, because the poller maintains the waiting jobs count.
* Check the count; it should show 100% used maps and reduces and one
* waiting job.
* - Fail the running job.
* - Check the count; there should now be one waiting job and zero running
* tasks.
*/
public void testSchedulingInformation() throws Exception {
String[] qs = {"default", "q2"};
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
scheduler.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
scheduler.assignTasks(tracker("tt1")); // heartbeat
scheduler.assignTasks(tracker("tt2")); // heartbeat
int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
int totalReduces =
taskTrackerManager.getClusterStatus().getMaxReduceTasks();
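// With 2 trackers of 1 map and 1 reduce slot each, each queue's 50% share
// is a single slot of each type, which is why one running task later shows
// up as 100.0% of the queue's used capacity.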
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
String schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
String schedulingInfo2 =
queueManager.getJobQueueInfo("q2").getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 18);
assertEquals(infoStrings[0], "Queue configuration");
assertEquals(infoStrings[1], "Capacity Percentage: 50.0%");
assertEquals(infoStrings[2], "User Limit: 25%");
assertEquals(infoStrings[3], "Priority Supported: YES");
assertEquals(infoStrings[4], "-------------");
assertEquals(infoStrings[5], "Map tasks");
assertEquals(
infoStrings[6], "Capacity: " + totalMaps * 50 / 100
+ " slots");
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
assertEquals(infoStrings[9], "-------------");
assertEquals(infoStrings[10], "Reduce tasks");
assertEquals(
infoStrings[11], "Capacity: " + totalReduces * 50 / 100
+ " slots");
assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[13], "Running tasks: 0");
assertEquals(infoStrings[14], "-------------");
assertEquals(infoStrings[15], "Job info");
assertEquals(infoStrings[16], "Number of Waiting Jobs: 0");
assertEquals(infoStrings[17], "Number of users who have submitted jobs: 0");
assertEquals(schedulingInfo, schedulingInfo2);
//Testing with actual job submission.
ArrayList<FakeJobInProgress> userJobs =
taskTrackerManager.submitJobs(1, 5, "default").get("u1");
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
//waiting jobs should equal the number of jobs submitted.
assertEquals(infoStrings.length, 18);
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[13], "Running tasks: 0");
assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
assertEquals(infoStrings[17], "Number of users who have submitted jobs: 1");
//Initialize the jobs but don't raise events
controlledInitializationPoller.selectJobsToInitialize();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 18);
//should be the previous value, as nothing is scheduled because no events
//have been raised after initialization.
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[13], "Running tasks: 0");
assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
//Raise status change event so that jobs can move to running queue.
raiseStatusChangeEvents(scheduler.jobQueuesManager);
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
//assign one job
Map<String, String> strs = new HashMap<String, String>();
strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
List<Task> t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
strs);
//Initialize extra job.
controlledInitializationPoller.selectJobsToInitialize();
//Get scheduling information; the number of waiting jobs should now have
//changed to 4, as one is scheduled and has become running.
// make sure we update our stats
scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 22);
assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 1");
assertEquals(infoStrings[9], "Active users:");
assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[15], "Running tasks: 1");
assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
// make sure we update our stats
scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 22);
assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 1");
assertEquals(infoStrings[9], "Active users:");
assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[15], "Running tasks: 1");
assertEquals(infoStrings[16], "Active users:");
assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
//Complete the job and check the running tasks count
FakeJobInProgress u1j1 = userJobs.get(0);
for (Task task : t1) {
taskTrackerManager.finishTask(task.getTaskID().toString(), u1j1);
}
taskTrackerManager.finalizeJob(u1j1);
// make sure we update our stats
scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 18);
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[13], "Running tasks: 0");
assertEquals(infoStrings[16], "Number of Waiting Jobs: 4");
//Fail a job which is initialized but not scheduled and check the count.
FakeJobInProgress u1j2 = userJobs.get(1);
assertTrue(
"User1 job 2 not initialized ",
u1j2.getStatus().getRunState() == JobStatus.RUNNING);
taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
//Run initializer to clean up failed jobs
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 18);
//should be the previous value, as nothing is scheduled because no events
//have been raised after initialization.
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[13], "Running tasks: 0");
assertEquals(infoStrings[16], "Number of Waiting Jobs: 3");
//Fail a job which is not initialized but is in the waiting queue.
FakeJobInProgress u1j5 = userJobs.get(4);
assertFalse(
"User1 job 5 initialized ",
u1j5.getStatus().getRunState() == JobStatus.RUNNING);
taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
//run initializer to clean up failed job
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 18);
//should be the previous value, as nothing is scheduled because no events
//have been raised after initialization.
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[13], "Running tasks: 0");
assertEquals(infoStrings[16], "Number of Waiting Jobs: 2");
//Raise status change events, as none of the initialized jobs would be
//in the running queue since we just failed the second job (which was
//initialized) and completed the first one.
raiseStatusChangeEvents(scheduler.jobQueuesManager);
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
//Now schedule; the tasks should come from the user's job3, as job1
//succeeded, job2 failed, and job3 is now running
strs.clear();
strs.put(CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
strs.put(CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
strs);
FakeJobInProgress u1j3 = userJobs.get(2);
assertTrue(
"User Job 3 not running ",
u1j3.getStatus().getRunState() == JobStatus.RUNNING);
//now the running map count should be one and the waiting jobs should be
//one; run the poller, as it is responsible for the waiting count
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 22);
assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 1");
assertEquals(infoStrings[9], "Active users:");
assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[15], "Running tasks: 1");
assertEquals(infoStrings[16], "Active users:");
assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
assertEquals(infoStrings[20], "Number of Waiting Jobs: 1");
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
// make sure we update our stats
scheduler.updateContextInfoForTests();
//Now running counts should become zero
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 18);
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
assertEquals(infoStrings[16], "Number of Waiting Jobs: 1");
}
/**
* Test to verify that highMemoryJobs are scheduled like all other jobs when
* memory-based scheduling is not enabled.
*
* @throws IOException
*/
public void testDisabledMemoryBasedScheduling()
throws IOException {
LOG.debug("Starting the scheduler.");
taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// memory-based scheduling disabled by default.
scheduler.start();
LOG.debug(
"Submit one high memory job of 1 3GB map task "
+ "and 1 1GB reduce task.");
JobConf jConf = new JobConf();
jConf.setMemoryForMapTask(3 * 1024L); // 3GB
jConf.setMemoryForReduceTask(1 * 1024L); // 1 GB
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
jConf.setUser("u1");
taskTrackerManager.submitJobAndInit(JobStatus.RUNNING, jConf);
// assert that all tasks are launched even though they transgress the
// scheduling limits.
Map<String, String> expectedStrings = new HashMap<String, String>();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
}
/**
* Test reverting HADOOP-4979. If there is a high-memory job, we should now look
* at reduce tasks (if the map tasks are high-memory) or vice versa.
*
* @throws IOException
*/
public void testHighMemoryBlockingAcrossTaskTypes()
throws IOException {
// 1 tracker with 2 map and 2 reduce slots
taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// The situation : Two jobs in the queue. First job with only maps and no
// reduces and is a high memory job. Second job is a normal job with both
// maps and reduces.
// First job cannot run for want of memory for maps. In this case, second
// job's reduces should run.
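// Slot math: the single tracker has 2 map and 2 reduce slots of 1GB each,
// so one of job1's 2GB maps fills both map slots, while job2's 1GB reduces
// fill the two reduce slots one at a time.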
LOG.debug(
"Submit one high memory(2GB maps, 0MB reduces) job of "
+ "2 map tasks");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
jConf.setMemoryForReduceTask(0);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
LOG.debug(
"Submit another regular memory(1GB vmem maps/reduces) job of "
+ "2 map/red tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
// first, a map from j1 and a reduce from other job j2
Map<String,String> strs = new HashMap<String,String>();
strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
strs.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
strs);
// Total 2 map slots should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
//TT has 2 reduce slots, hence this call should get a reduce task
//from the other job
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_r_000002_0 on tt1");
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
//now all the slots are occupied, hence no more tasks will be
//assigned.
assertNull(scheduler.assignTasks(tracker("tt1")));
}
/**
* Tests that scheduler schedules normal jobs once high RAM jobs
* have been reserved to the limit.
*
* The test causes the scheduler to schedule a normal job on two
* trackers, and one task of the high RAM job on a third. Then it
* asserts that one of the first two trackers gets a reservation
* for the remaining task of the high RAM job. After this, it
* asserts that a normal job submitted later is allowed to run
* on a free slot, as all tasks of the high RAM job are either
* scheduled or reserved.
*
* @throws IOException
*/
public void testClusterBlockingForLackOfMemory()
throws IOException {
LOG.debug("Starting the scheduler.");
taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
taskTrackerManager.addQueues(new String[]{"default"});
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
// Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
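// 3 trackers x 2 slots give 6 map and 6 reduce slots, so each occupied slot
// accounts for roughly 16.7% of the queue in the checkOccupiedSlots()
// assertions below.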
LOG.debug(
"Submit one normal memory(1GB maps/reduces) job of "
+ "2 map, 2 reduce tasks.");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
// Fill a tt with this job's tasks.
Map<String, String> expectedStrings = new HashMap<String, String>();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
// Total 1 map slot and 1 reduce slot should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
assertEquals(JobQueue.getJobQueueSchedInfo(1, 1, 0, 1, 1, 0),
job1.getSchedulingInfo().toString());
checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
// fill another TT with the rest of the tasks of the job
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
expectedStrings);
LOG.debug(
"Submit one high memory(2GB maps/reduces) job of "
+ "2 map, 2 reduce tasks.");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
jConf.setMemoryForReduceTask(2 * 1024);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
// Have another TT run one task of each type of the high RAM
// job. This will fill up the TT.
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt3");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt3");
checkMultipleTaskAssignment(taskTrackerManager, scheduler,
"tt3", expectedStrings);
checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
assertEquals(JobQueue.getJobQueueSchedInfo(1, 2, 0, 1, 2, 0),
job2.getSchedulingInfo().toString());
checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
LOG.debug(
"Submit one normal memory(1GB maps/reduces) job of "
+ "1 map, 1 reduce tasks.");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
// Offer a TT that has insufficient space for task assignment.
// This will cause a reservation on it for the high RAM job.
assertNull(scheduler.assignTasks(tracker("tt1")));
// reserved tasktrackers contribute to occupied slots for maps and reduces
checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
LOG.info(job2.getSchedulingInfo());
assertEquals(JobQueue.getJobQueueSchedInfo(1, 2, 2, 1, 2, 2),
job2.getSchedulingInfo().toString());
assertEquals(JobQueue.getJobQueueSchedInfo(0, 0, 0, 0, 0, 0),
job3.getSchedulingInfo().toString());
// Reservations are already done for job2. So job3 should go ahead.
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt2");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt2");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
expectedStrings);
}
/**
* Testcase to verify the fix for an NPE (HADOOP-5641), when memory based
* scheduling is enabled and jobs are retired from memory while tasks
* are still active on some TaskTrackers.
*
* @throws IOException
*/
public void testMemoryMatchingWithRetiredJobs() throws IOException {
// create a cluster with a single node.
LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
// create scheduler
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
taskTrackerManager.addQueues(new String[]{"default"});
scheduler.setTaskTrackerManager(taskTrackerManager);
// enable memory-based scheduling
LOG.debug("Assume TT has 2GB for maps and 2GB for reduces");
scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024L);
scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 512);
scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024L);
scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 512);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit a normal job
LOG.debug("Submitting a normal job with 2 maps and 2 reduces");
JobConf jConf = new JobConf();
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
jConf.setMemoryForMapTask(512);
jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
// 1st cycle - 1 map and 1 reduce get assigned.
Map<String, String> expectedStrings = new HashMap<String, String>();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
List<Task> t = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
// Total 1 map slot and 1 reduce slot should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
checkMemReservedForTasksOnTT("tt1", 512L, 512L);
// kill this job!
taskTrackerManager.killJob(job1.getJobID(), false);
// No more map/reduce slots should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 0, 0, 0.0f);
checkOccupiedSlots(
"default", TaskType.REDUCE, 0, 0,
0.0f);
// retire the job
taskTrackerManager.retireJob(job1.getJobID());
// submit another job.
LOG.debug("Submitting another normal job with 2 maps and 2 reduces");
jConf = new JobConf();
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
jConf.setMemoryForMapTask(512);
jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
// since, with HADOOP-5964, we no longer rely on the job conf to get
// the memory occupied, scheduling should work correctly.
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
List<Task> t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
// now, no more can be assigned because all the slots are blocked.
assertNull(scheduler.assignTasks(tracker("tt1")));
// finish the tasks on the tracker.
for (Task task : t) {
taskTrackerManager.finishTask(task.getTaskID().toString(), job1);
}
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
// now new tasks can be assigned.
t = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
// total reserved memory stays the same: the memory freed by job1's
// finished tasks is now occupied by job2's newly assigned tasks.
checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
}
/*
* Test cases for Job Initialization poller.
*/
/*
* This test verifies that the correct number of jobs for the
* correct number of users is initialized.
* It also verifies that as jobs of users complete, new jobs
* from the correct users are initialized.
*/
public void testJobInitialization() throws Exception {
// set up the scheduler
String[] qs = {"default"};
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
scheduler.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
JobQueuesManager mgr = scheduler.jobQueuesManager;
JobInitializationPoller initPoller = scheduler.getInitializationPoller();
// submit 4 jobs each for 3 users.
HashMap<String, ArrayList<FakeJobInProgress>> userJobs =
taskTrackerManager.submitJobs(
3,
4, "default");
// get the jobs submitted.
ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
ArrayList<FakeJobInProgress> u2Jobs = userJobs.get("u2");
ArrayList<FakeJobInProgress> u3Jobs = userJobs.get("u3");
// reference to the initializedJobs data structure
// changes are reflected in the set as they are made by the poller
Set<JobID> initializedJobs = initPoller.getInitializedJobList();
// we should have 12 (3 x 4) jobs in the job queue
assertEquals(mgr.getJobQueue("default").getWaitingJobs().size(), 12);
// run one poller iteration.
controlledInitializationPoller.selectJobsToInitialize();
// the poller should initialize 6 jobs
// 3 users and 2 jobs from each
assertEquals(initializedJobs.size(), 6);
assertTrue(
"Initialized jobs didn't contain the user1 job 1",
initializedJobs.contains(u1Jobs.get(0).getJobID()));
assertTrue(
"Initialized jobs didn't contain the user1 job 2",
initializedJobs.contains(u1Jobs.get(1).getJobID()));
assertTrue(
"Initialized jobs didn't contain the user2 job 1",
initializedJobs.contains(u2Jobs.get(0).getJobID()));
assertTrue(
"Initialized jobs didn't contain the user2 job 2",
initializedJobs.contains(u2Jobs.get(1).getJobID()));
assertTrue(
"Initialized jobs didn't contain the user3 job 1",
initializedJobs.contains(u3Jobs.get(0).getJobID()));
assertTrue(
"Initialized jobs didn't contain the user3 job 2",
initializedJobs.contains(u3Jobs.get(1).getJobID()));
// now submit one more job from another user.
FakeJobInProgress u4j1 =
taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "default", "u4");
// run the poller again.
controlledInitializationPoller.selectJobsToInitialize();
// since no jobs have started running, there should be no
// change to the initialized jobs.
assertEquals(initializedJobs.size(), 6);
assertFalse(
"Initialized jobs contains user 4 jobs",
initializedJobs.contains(u4j1.getJobID()));
// This event simulates raising the event on completion of setup task
// and moves the job to the running list for the scheduler to pick up.
raiseStatusChangeEvents(mgr);
// get some tasks assigned.
Map<String, String> expectedStrings = new HashMap<String, String>();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
List<Task> t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt2");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt2");
List<Task> t2 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
expectedStrings);
for (Task task : t1) {
taskTrackerManager.finishTask(
task.getTaskID().toString(), u1Jobs.get(
0));
}
for (Task task : t2) {
taskTrackerManager.finishTask(
task.getTaskID().toString(), u1Jobs.get(
0));
}
// as some jobs have running tasks, the poller will now
// pick up new jobs to initialize.
controlledInitializationPoller.selectJobsToInitialize();
// count should still be the same
assertEquals(initializedJobs.size(), 6);
// new jobs that have got into the list
assertTrue(initializedJobs.contains(u1Jobs.get(2).getJobID()));
assertTrue(initializedJobs.contains(u1Jobs.get(3).getJobID()));
raiseStatusChangeEvents(mgr);
// the first two jobs are now running, so they are no longer in the initialized list.
assertFalse(
"Initialized jobs contains the user1 job 1",
initializedJobs.contains(u1Jobs.get(0).getJobID()));
assertFalse(
"Initialized jobs contains the user1 job 2",
initializedJobs.contains(u1Jobs.get(1).getJobID()));
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
// finish one more job
t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
for (Task task : t1) {
taskTrackerManager.finishTask(
task.getTaskID().toString(), u1Jobs.get(
2));
}
// no new jobs should be picked up, because the max user limit
// is still 3.
controlledInitializationPoller.selectJobsToInitialize();
assertEquals(initializedJobs.size(), 5);
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0004_m_000001_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0004_r_000001_0 on tt1");
// run 1 more job.
t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
for (Task task : t1) {
taskTrackerManager.finishTask(
task.getTaskID().toString(), u1Jobs.get(
3));
}
// Now initialized jobs should contain user 4's job, as
// user 1's jobs are all done and the number of users is
// below the limit
controlledInitializationPoller.selectJobsToInitialize();
assertEquals(initializedJobs.size(), 5);
assertTrue(initializedJobs.contains(u4j1.getJobID()));
controlledInitializationPoller.stopRunning();
}
/*
* testHighPriorityJobInitialization() verifies the behaviour when a high
* priority job is submitted to a queue and how initialization happens for it.
*/
public void testHighPriorityJobInitialization() throws Exception {
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
JobInitializationPoller initPoller = scheduler.getInitializationPoller();
Set<JobID> initializedJobsList = initPoller.getInitializedJobList();
// submit 3 jobs each for 3 users
taskTrackerManager.submitJobs(3, 3, "default");
controlledInitializationPoller.selectJobsToInitialize();
assertEquals(initializedJobsList.size(), 6);
// submit 2 jobs for a different user; one of them will be made high priority
FakeJobInProgress u4j1 = taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "default", "u4");
FakeJobInProgress u4j2 = taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "default", "u4");
controlledInitializationPoller.selectJobsToInitialize();
// shouldn't change
assertEquals(initializedJobsList.size(), 6);
assertFalse(
"Contains U4J1 high priority job ",
initializedJobsList.contains(u4j1.getJobID()));
assertFalse(
"Contains U4J2 Normal priority job ",
initializedJobsList.contains(u4j2.getJobID()));
// change priority of one job
taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
controlledInitializationPoller.selectJobsToInitialize();
// the high priority job should get initialized, but not the
// low priority job from u4, as we have already exceeded the
// limit.
assertEquals(initializedJobsList.size(), 7);
assertTrue(
"Does not contain U4J1 high priority job ",
initializedJobsList.contains(u4j1.getJobID()));
assertFalse(
"Contains U4J2 Normal priority job ",
initializedJobsList.contains(u4j2.getJobID()));
controlledInitializationPoller.stopRunning();
}
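/**
* Verifies that jobs move correctly between the waiting, initialized and
* running lists of the JobQueuesManager: a normally completing running job,
* a failed running job, a failed initialized job and a failed waiting job.
*
* @throws Exception
*/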
public void testJobMovement() throws Exception {
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
JobQueuesManager mgr = scheduler.jobQueuesManager;
// check proper running job movement and completion
checkRunningJobMovementAndCompletion();
// check failed running job movement
checkFailedRunningJobMovement();
// Check job movement of a failed initialized job
checkFailedInitializedJobMovement();
// Check failed waiting job movement
checkFailedWaitingJobMovement();
}
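/**
* Verifies that the scheduler starts and schedules tasks normally when the
* only configured queue is not named "default".
*
* @throws Exception
*/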
public void testStartWithoutDefaultQueueConfigured() throws Exception {
//configure a single queue which is not the default queue
String[] qs = {"q1"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("q1", 100.0f, true, 100));
taskTrackerManager.setFakeQueues(queues);
//Start the scheduler.
scheduler.start();
//Submit a job to the queue and verify that its tasks get assigned
FakeJobInProgress job =
taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "q1", "u1");
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q1");
Map<String,String> strs = new HashMap<String,String>();
strs.put(CapacityTestUtils.MAP,"attempt_test_0001_m_000001_0 on tt1");
strs.put(CapacityTestUtils.REDUCE,"attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
strs);
}
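/**
* Verifies that a job whose initialization always fails is cleaned up from
* the waiting list and is never moved to the running list.
*
* @throws Exception
*/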
public void testFailedJobInitalizations() throws Exception {
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
JobQueuesManager mgr = scheduler.jobQueuesManager;
//Submit a job whose initialization always fails.
FakeJobInProgress job =
new FakeFailingJobInProgress(
new JobID("test", ++jobCounter),
new JobConf(), taskTrackerManager, "u1", UtilsForTests.getJobTracker());
job.getStatus().setRunState(JobStatus.PREP);
taskTrackerManager.submitJob(job);
//check if job is present in waiting list.
assertEquals(
"Waiting job list does not contain submitted job",
1, mgr.getJobQueue("default").getWaitingJobCount());
assertTrue(
"Waiting job does not contain submitted job",
mgr.getJobQueue("default").getWaitingJobs().contains(job));
//initialization should fail now.
controlledInitializationPoller.selectJobsToInitialize();
//Check if the job has been properly cleaned up.
assertEquals(
"Waiting job list contains submitted job",
0, mgr.getJobQueue("default").getWaitingJobCount());
assertFalse(
"Waiting job contains submitted job",
mgr.getJobQueue("default").getWaitingJobs().contains(job));
assertFalse(
"Waiting job contains submitted job",
mgr.getJobQueue("default").getRunningJobs().contains(job));
}
/**
* Test case deals with normal jobs which have speculative maps and reduces.
* The following is the test executed:
* <ol>
* <li>Submit one job with speculative maps and reduces.</li>
* <li>Submit another job with no speculative execution.</li>
* <li>Observe that all tasks from the first job get scheduled, both
* speculative and normal tasks.</li>
* <li>Finish all of the first job's tasks; the second job's tasks then get
* scheduled.</li>
* </ol>
*
* @throws IOException
*/
public void testSpeculativeTaskScheduling() throws IOException {
String[] qs = {"default"};
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
scheduler.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
JobQueuesManager mgr = scheduler.jobQueuesManager;
JobConf conf = new JobConf();
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
conf.setMapSpeculativeExecution(true);
conf.setReduceSpeculativeExecution(true);
//Submit a job which would have one speculative map and one speculative
//reduce.
FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
JobStatus.PREP, conf);
conf = new JobConf();
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
//Submit a job which has no speculative map or reduce.
FakeJobInProgress fjob2 = taskTrackerManager.submitJob(
JobStatus.PREP, conf);
//Ask the poller to initialize all the submitted jobs and raise status
//change events.
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(mgr);
Map<String, String> strs = new HashMap<String, String>();
strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
strs);
assertTrue(
"Pending maps of job1 greater than zero",
(fjob1.pendingMaps() == 0));
assertTrue(
"Pending reduces of job1 greater than zero",
(fjob1.pendingReduces() == 0));
Map<String, String> str = new HashMap<String, String>();
str.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_1 on tt2");
str.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_1 on tt2");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
str);
taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", fjob1);
taskTrackerManager.finishTask("attempt_test_0001_m_000001_1", fjob1);
taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", fjob1);
taskTrackerManager.finishTask("attempt_test_0001_r_000001_1", fjob1);
taskTrackerManager.finalizeJob(fjob1);
str.clear();
str.put(CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
str.put(CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
str);
taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", fjob2);
taskTrackerManager.finishTask("attempt_test_0002_r_000001_0", fjob2);
taskTrackerManager.finalizeJob(fjob2);
}
/**
* Test to verify that TTs are reserved for high memory jobs, but only until
* a TT is reserved for each of the pending tasks.
*
* @throws IOException
*/
public void testTTReservingWithHighMemoryJobs()
throws IOException {
// 3 taskTrackers, 2 map and 0 reduce slots on each TT
taskTrackerManager = new FakeTaskTrackerManager(3, 2, 0);
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
scheduler.setTaskTrackerManager(taskTrackerManager);
// enable memory-based scheduling
// A normal job in the cluster would use 1GB maps/reduces
scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
LOG.debug(
"Submit a regular memory(1GB vmem maps/reduces) job of "
+ "3 map/red tasks");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(3);
jConf.setNumReduceTasks(3);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
// assign one map task of job1 on all the TTs
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000001_0 on tt1");
checkAssignment(
taskTrackerManager, scheduler, "tt2",
"attempt_test_0001_m_000002_0 on tt2");
checkAssignment(
taskTrackerManager, scheduler, "tt3",
"attempt_test_0001_m_000003_0 on tt3");
scheduler.updateContextInfoForTests();
LOG.info(job1.getSchedulingInfo());
assertEquals(JobQueue.getJobQueueSchedInfo(3, 3, 0, 0, 0, 0),
job1.getSchedulingInfo().toString());
LOG.debug(
"Submit one high memory(2GB maps, 0MB reduces) job of "
+ "2 map tasks");
jConf.setMemoryForMapTask(2 * 1024);
jConf.setMemoryForReduceTask(0);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
LOG.debug(
"Submit another regular memory(1GB vmem maps/reduces) job of "
+ "2 map/red tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
// Job2, a high memory job, cannot be accommodated on any TT. But with each
// trip to the scheduler, each of the TTs should get reserved by job2.
assertNull(scheduler.assignTasks(tracker("tt1")));
scheduler.updateContextInfoForTests();
LOG.info(job2.getSchedulingInfo());
assertEquals(JobQueue.getJobQueueSchedInfo(0, 0, 2, 0, 0, 0),
job2.getSchedulingInfo().toString());
assertNull(scheduler.assignTasks(tracker("tt2")));
scheduler.updateContextInfoForTests();
LOG.info(job2.getSchedulingInfo());
assertEquals(JobQueue.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
job2.getSchedulingInfo().toString());
// Job2 has only 2 pending tasks. So no more reservations. Job3 should get
// slots on tt3. tt1 and tt2 should not be assigned any slots with the
// reservation stats intact.
assertNull(scheduler.assignTasks(tracker("tt1")));
scheduler.updateContextInfoForTests();
LOG.info(job2.getSchedulingInfo());
assertEquals(JobQueue.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
job2.getSchedulingInfo().toString());
assertNull(scheduler.assignTasks(tracker("tt2")));
scheduler.updateContextInfoForTests();
LOG.info(job2.getSchedulingInfo());
assertEquals(JobQueue.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
job2.getSchedulingInfo().toString());
checkAssignment(
taskTrackerManager, scheduler, "tt3",
"attempt_test_0003_m_000001_0 on tt3");
scheduler.updateContextInfoForTests();
LOG.info(job2.getSchedulingInfo());
assertEquals(JobQueue.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
job2.getSchedulingInfo().toString());
// No more tasks left in job3 either
assertNull(scheduler.assignTasks(tracker("tt3")));
}
/**
* Test to verify that queue ordering is based on the number of slots
* occupied, and hence that the presence of high memory jobs is reflected
* properly while determining the used capacities of queues and thus the
* queue ordering.
*
* @throws IOException
*/
public void testQueueOrdering()
throws IOException {
taskTrackerManager = new FakeTaskTrackerManager(2, 6, 6);
scheduler.setTaskTrackerManager(taskTrackerManager);
String[] qs = {"default", "q1"};
String[] reversedQs = {qs[1], qs[0]};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 100));
queues.add(new FakeQueueInfo("q1", 50.0f, true, 100));
// enable memory-based scheduling
// A normal job in the cluster would use 1GB maps/reduces
scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
LOG.debug(
"Submit one high memory(2GB maps, 2GB reduces) job of "
+ "6 map and 6 reduce tasks");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
jConf.setMemoryForReduceTask(2 * 1024);
jConf.setNumMapTasks(6);
jConf.setNumReduceTasks(6);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
// Submit a normal job to the other queue.
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(6);
jConf.setNumReduceTasks(6);
jConf.setUser("u1");
jConf.setQueueName("q1");
FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
// Map and reduce of high memory job should be assigned
HashMap<String, String> expectedStrings = new HashMap<String, String>();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.MAP));
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.REDUCE));
// 1st map and reduce of normal job should be assigned
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
// 2nd map and reduce of normal job should be assigned
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
expectedStrings);
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
// Now both the queues are equally served. But the comparator doesn't change
// the order if queues are equally served.
// Hence, 3rd map and reduce of normal job should be assigned
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0002_m_000003_0 on tt2");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0002_r_000003_0 on tt2");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
expectedStrings);
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
// 2nd map and reduce of high memory job should be assigned
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
expectedStrings);
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.MAP));
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.REDUCE));
// 4th map and reduce of normal job should be assigned.
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP, "attempt_test_0002_m_000004_0 on tt2");
expectedStrings.put(
CapacityTestUtils.REDUCE, "attempt_test_0002_r_000004_0 on tt2");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
expectedStrings);
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
}
/**
* Tests whether 1 map and 1 reduce are assigned even if reduces span across
* multiple jobs or multiple queues.
*
* Creates a cluster with 6 map and 2 reduce slots.
* Submits 2 jobs:
* job1, with 6 maps and 1 reduce
* job2, with 2 maps and 1 reduce
*
* Checks that the first assignment assigns a map and a reduce.
* Checks that the second assignment assigns a map and a reduce
* (both from the other job and the other queue).
*
* The last 2 calls just check that we don't get any further reduces.
*
* @throws Exception
*/
public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
setUp(1, 6, 2);
// set up some queues
String[] qs = {"default", "q1"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
//Submit one job with 6 maps and 1 reduce to 'default', and another with
//2 maps and 1 reduce to 'q1'
taskTrackerManager.submitJobAndInit(
JobStatus.PREP, 6, 1, "default", "u1");
FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, 2, 1, "q1", "u2");
Map<String, String> str = new HashMap<String, String>();
str.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
str.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
// next assignment will be for the job in the second queue.
str.clear();
str.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
str.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
//now both the reduce slots are being used, hence we should now get only 1
//map task in this assignTasks call.
str.clear();
str.put(MAP, "attempt_test_0002_m_000002_0 on tt1");
checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
str.clear();
str.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
}
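/**
* Submits and initializes a job, moves it to the running state, verifies
* that it appears in both the waiting and running queues, schedules its
* tasks, and checks that it is removed from the initialized, waiting and
* running lists as it runs to completion.
*/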
private void checkRunningJobMovementAndCompletion() throws IOException {
JobQueuesManager mgr = scheduler.jobQueuesManager;
JobInitializationPoller p = scheduler.getInitializationPoller();
// submit a job
FakeJobInProgress job =
taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "default", "u1");
controlledInitializationPoller.selectJobsToInitialize();
assertEquals(p.getInitializedJobList().size(), 1);
// make it running.
raiseStatusChangeEvents(mgr);
// it should be there in both the queues.
assertTrue(
"Job not present in Job Queue",
mgr.getJobQueue("default").getWaitingJobs().contains(job));
assertTrue(
"Job not present in Running Queue",
mgr.getJobQueue("default").getRunningJobs().contains(job));
// assign a task
Map<String,String> strs = new HashMap<String,String>();
strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
strs.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
strs);
controlledInitializationPoller.selectJobsToInitialize();
// now this job should be removed from the initialized list.
assertTrue(p.getInitializedJobList().isEmpty());
// the job should also be removed from the job queue as tasks
// are scheduled
assertFalse(
"Job present in Job Queue",
mgr.getJobQueue("default").getWaitingJobs().contains(job));
// complete tasks and job
taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", job);
taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", job);
taskTrackerManager.finalizeJob(job);
// make sure it is removed from the run queue
assertFalse(
"Job present in running queue",
mgr.getJobQueue("default").getRunningJobs().contains(job));
}
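/**
* Verifies that a running job which gets killed is removed from the
* running queue.
*/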
private void checkFailedRunningJobMovement() throws IOException {
JobQueuesManager mgr = scheduler.jobQueuesManager;
//submit a job and initialize it
FakeJobInProgress job =
taskTrackerManager.submitJobAndInit(JobStatus.RUNNING, 1, 1, "default", "u1");
//check if the job is present in running queue.
assertTrue(
"Running jobs list does not contain submitted job",
mgr.getJobQueue("default").getRunningJobs().contains(job));
taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
//check if the job is properly removed from running queue.
assertFalse(
"Running jobs list does not contain submitted job",
mgr.getJobQueue("default").getRunningJobs().contains(job));
}
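/**
* Verifies that an initialized (but not yet running) job which gets killed
* is removed from both the waiting queue and the poller's initialized job
* list.
*/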
private void checkFailedInitializedJobMovement() throws IOException {
JobQueuesManager mgr = scheduler.jobQueuesManager;
JobInitializationPoller p = scheduler.getInitializationPoller();
//submit a job
FakeJobInProgress job = taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "default", "u1");
//Initialize the job
p.selectJobsToInitialize();
//Don't raise the status change event.
//check in waiting and initialized jobs list.
assertTrue(
"Waiting jobs list does not contain the job",
mgr.getJobQueue("default").getWaitingJobs().contains(job));
assertTrue(
"Initialized job does not contain the job",
p.getInitializedJobList().contains(job.getJobID()));
//fail the initialized job
taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
//Check that the job is no longer present in the waiting queue
assertFalse(
"Waiting jobs list contains failed job",
mgr.getJobQueue("default").getWaitingJobs().contains(job));
//run the poller to do the cleanup
p.selectJobsToInitialize();
//check for failed job in the initialized job list
assertFalse(
"Initialized jobs contains failed job",
p.getInitializedJobList().contains(job.getJobID()));
}
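/**
* Verifies that a waiting (uninitialized) job which gets killed is removed
* from the waiting queue.
*/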
private void checkFailedWaitingJobMovement() throws IOException {
JobQueuesManager mgr = scheduler.jobQueuesManager;
// submit a job
FakeJobInProgress job = taskTrackerManager.submitJob(
JobStatus.PREP, 1, 1, "default",
"u1");
// check in the waiting jobs list.
assertTrue(
"Waiting jobs list does not contain the job", mgr
.getJobQueue("default").getWaitingJobs().contains(job));
// fail the waiting job
taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
// Check that the job is no longer present in the waiting queue
assertFalse(
"Waiting jobs list contains failed job", mgr
.getJobQueue("default").getWaitingJobs().contains(job));
}
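/**
* Raises RUN_STATE_CHANGED events for all waiting jobs of the given queue
* (the default queue for the single-argument overload) that are already in
* the RUNNING state, simulating completion of their setup tasks so that the
* JobQueuesManager moves them to its running list.
*/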
private void raiseStatusChangeEvents(JobQueuesManager mgr) {
raiseStatusChangeEvents(mgr, "default");
}
private void raiseStatusChangeEvents(JobQueuesManager mgr, String queueName) {
Collection<JobInProgress> jips = mgr.getJobQueue(queueName)
.getWaitingJobs();
for (JobInProgress jip : jips) {
if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
JobStatusChangeEvent evt = new JobStatusChangeEvent(
jip,
EventType.RUN_STATE_CHANGED, jip.getStatus());
mgr.jobUpdated(evt);
}
}
}
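/**
* Convenience helper that returns the fake TaskTracker registered under the
* given name with the FakeTaskTrackerManager.
*/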
protected TaskTracker tracker(String taskTrackerName) {
return taskTrackerManager.getTaskTracker(taskTrackerName);
}
/**
* Get the amount of memory that is reserved for tasks on the taskTracker and
* verify that it matches what is expected.
*
* @param taskTracker name of the tracker to check
* @param expectedMemForMapsOnTT expected memory (in MB) reserved for maps, or null
* @param expectedMemForReducesOnTT expected memory (in MB) reserved for reduces, or null
*/
private void checkMemReservedForTasksOnTT(
String taskTracker,
Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
Long observedMemForMapsOnTT =
scheduler.memoryMatcher.getMemReservedForTasks(
tracker(taskTracker).getStatus(),
TaskType.MAP);
Long observedMemForReducesOnTT =
scheduler.memoryMatcher.getMemReservedForTasks(
tracker(taskTracker).getStatus(),
TaskType.REDUCE);
if (expectedMemForMapsOnTT == null) {
assertNull(observedMemForMapsOnTT);
} else {
assertEquals(expectedMemForMapsOnTT, observedMemForMapsOnTT);
}
if (expectedMemForReducesOnTT == null) {
assertNull(observedMemForReducesOnTT);
} else {
assertEquals(expectedMemForReducesOnTT, observedMemForReducesOnTT);
}
}
/**
* Verify the number of occupied slots of type 'type' for the queue 'queue'.
* incrMapIndex and incrReduceIndex are set when the expected output string
* changes; they adjust the index of the
* "Used capacity: %d (%.1f%% of Capacity)"
* line within the queue's scheduling info.
*
* @param queue
* @param type
* @param numActiveUsers in the queue at present.
* @param expectedOccupiedSlots
* @param expectedOccupiedSlotsPercent
* @param incrMapIndex
* @param incrReduceIndex
*/
private void checkOccupiedSlots(
String queue, TaskType type, int numActiveUsers, int expectedOccupiedSlots,
float expectedOccupiedSlotsPercent, int incrMapIndex, int incrReduceIndex) {
scheduler.updateContextInfoForTests();
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
String schedulingInfo = queueManager.getJobQueueInfo(queue)
.getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
int index = -1;
if (type.equals(TaskType.MAP)) {
index = 7 + incrMapIndex;
} else if (type.equals(TaskType.REDUCE)) {
index =
(numActiveUsers == 0 ? 12 : 13 + numActiveUsers) + incrReduceIndex;
}
LOG.info(infoStrings[index]);
assertEquals(
String.format(
"Used capacity: %d (%.1f%% of Capacity)", expectedOccupiedSlots,
expectedOccupiedSlotsPercent), infoStrings[index]);
}
/**
* @param queue
* @param type
* @param numActiveUsers
* @param expectedOccupiedSlots
* @param expectedOccupiedSlotsPercent
*/
private void checkOccupiedSlots(
String queue,
TaskType type, int numActiveUsers,
int expectedOccupiedSlots, float expectedOccupiedSlotsPercent
) {
checkOccupiedSlots(
queue, type, numActiveUsers, expectedOccupiedSlots,
expectedOccupiedSlotsPercent, 0, 0);
}
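/**
* Asserts that the observed queue ordering matches the expected ordering,
* element by element.
*/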
private void checkQueuesOrder(
String[] expectedOrder, String[] observedOrder) {
assertTrue(
"Observed and expected queues are not of same length.",
expectedOrder.length == observedOrder.length);
int i = 0;
for (String expectedQ : expectedOrder) {
assertTrue(
"Observed and expected queues are not in the same order. "
+ "Differ at index " + i + ". Got " + observedOrder[i]
+ " instead of " + expectedQ, expectedQ.equals(observedOrder[i]));
i++;
}
}
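/**
* Verifies that the deprecated upper limit on task vmem, set in bytes via
* JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, is picked up by the
* MemoryMatcher as the per-slot memory limits (3 MB here).
*/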
public void testDeprecatedMemoryValues() throws IOException {
// 2 map and 1 reduce slots
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
JobConf conf = (JobConf) (scheduler.getConf());
conf.set(
JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, String.valueOf(
1024 * 1024 * 3));
scheduler.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
assertEquals(3, MemoryMatcher.getLimitMaxMemForMapSlot());
assertEquals(3, MemoryMatcher.getLimitMaxMemForReduceSlot());
}
}