blob: edeb222a4f03df7eb039414bf20501ca27cdfb5e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import junit.framework.TestCase;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.io.IOException;
import static org.apache.hadoop.mapred.CapacityTestUtils.*;
public class TestContainerQueue extends TestCase {
// Fake task tracker manager the scheduler under test is wired to;
// (re)created by setUp(int, int, int).
CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager;
// Scheduler under test.
CapacityTaskScheduler scheduler;
// Fake clock handed to the scheduler's constructor.
CapacityTestUtils.FakeClock clock;
// Poller installed on the scheduler in place of its own job
// initialization poller.
CapacityTestUtils.ControlledInitializationPoller controlledInitializationPoller;
/**
 * JUnit fixture hook: builds the default fixture of two task trackers
 * with two map tasks and one reduce task per tracker.
 */
@Override
protected void setUp() {
  final int trackers = 2;
  final int mapTasksPerTracker = 2;
  final int reduceTasksPerTracker = 1;
  setUp(trackers, mapTasksPerTracker, reduceTasksPerTracker);
}
/**
 * (Re)creates the whole fixture: a fake task tracker manager of the given
 * size, a fake clock, and a scheduler wired to both with a controlled
 * initialization poller and the manager's default job configuration.
 *
 * @param numTaskTrackers          number of fake task trackers
 * @param numMapTasksPerTracker    map tasks per tracker
 * @param numReduceTasksPerTracker reduce tasks per tracker
 */
private void setUp(
    int numTaskTrackers, int numMapTasksPerTracker,
    int numReduceTasksPerTracker) {
  this.taskTrackerManager = new CapacityTestUtils.FakeTaskTrackerManager(
      numTaskTrackers, numMapTasksPerTracker, numReduceTasksPerTracker);

  this.clock = new CapacityTestUtils.FakeClock();
  this.scheduler = new CapacityTaskScheduler(this.clock);
  this.scheduler.setTaskTrackerManager(this.taskTrackerManager);

  // Keep the real JobInitializationPoller out of the way: the test installs
  // its own poller instead.
  CapacityTestUtils.ControlledInitializationPoller poller =
      new CapacityTestUtils.ControlledInitializationPoller(
          this.scheduler.jobQueuesManager, this.taskTrackerManager);
  this.controlledInitializationPoller = poller;
  this.scheduler.setInitializationPoller(poller);

  this.scheduler.setConf(this.taskTrackerManager.defaultJobConf);
}
/**
 * Checks how the minimum capacity is distributed across the children and
 * grandchildren of the root queue.
 * <p/>
 * The hierarchy has two container queues configured with 25% each, and two
 * job queues configured with 50% under each of them. The resulting map
 * capacity is checked twice: with 1000 slots passed to the root, and with
 * a single slot, where every share is expected to be 0.
 */
public void testMinCapacity() {
  AbstractQueue rt = QueueHierarchyBuilder.createRootAbstractQueue();

  // level 1 children
  QueueSchedulingContext a1 = new QueueSchedulingContext("a", 25, -1, -1);
  QueueSchedulingContext a2 = new QueueSchedulingContext("b", 25, -1, -1);
  AbstractQueue q = new ContainerQueue(rt, a1);
  AbstractQueue ql = new ContainerQueue(rt, a2);

  // level 2 children
  QueueSchedulingContext a = new QueueSchedulingContext("aa", 50, -1, -1);
  QueueSchedulingContext b = new QueueSchedulingContext("ab", 50, -1, -1);
  QueueSchedulingContext c = new QueueSchedulingContext("ac", 50, -1, -1);
  QueueSchedulingContext d = new QueueSchedulingContext("ad", 50, -1, -1);
  AbstractQueue q1 = new JobQueue(q, a);
  AbstractQueue q2 = new JobQueue(q, b);
  AbstractQueue q3 = new JobQueue(ql, c);
  AbstractQueue q4 = new JobQueue(ql, d);

  // NOTE(review): arguments are presumably (map slots, reduce slots) in the
  // cluster -- confirm against AbstractQueue.update.
  rt.update(1000, 1000);

  // share at level 1: (1000 * 25) / 100
  assertEquals(250, q.getQueueSchedulingContext().getMapTSC().getCapacity());
  assertEquals(250, ql.getQueueSchedulingContext().getMapTSC().getCapacity());
  // share at level 2: (1000 * 25 / 100) * (50 / 100)
  assertEquals(125, q1.getQueueSchedulingContext().getMapTSC().getCapacity());
  assertEquals(125, q2.getQueueSchedulingContext().getMapTSC().getCapacity());
  assertEquals(125, q3.getQueueSchedulingContext().getMapTSC().getCapacity());
  assertEquals(125, q4.getQueueSchedulingContext().getMapTSC().getCapacity());

  // With a single slot no queue is expected to be given any capacity.
  rt.update(1, 1);
  assertEquals(0, q.getQueueSchedulingContext().getMapTSC().getCapacity());
  assertEquals(0, ql.getQueueSchedulingContext().getMapTSC().getCapacity());
  assertEquals(0, q1.getQueueSchedulingContext().getMapTSC().getCapacity());
  assertEquals(0, q2.getQueueSchedulingContext().getMapTSC().getCapacity());
  assertEquals(0, q3.getQueueSchedulingContext().getMapTSC().getCapacity());
  assertEquals(0, q4.getQueueSchedulingContext().getMapTSC().getCapacity());
}
/**
 * Submits a single large job to queue "R.a" on a four-tracker fixture and
 * checks that tasks are handed out to tt1 and tt2 while nothing is
 * assigned to tt3, i.e. "R.a" does not get more than two slots.
 *
 * @throws IOException propagated from the fake cluster or the scheduler
 */
public void testMaxCapacity() throws IOException {
  this.setUp(4, 1, 1);
  taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);

  AbstractQueue root = QueueHierarchyBuilder.createRootAbstractQueue();

  // NOTE(review): the third constructor argument is presumably the
  // max-capacity percentage (50 and 30 here, -1 when unset) -- confirm.
  QueueSchedulingContext ctxA = new QueueSchedulingContext("R.a", 25, 50, -1);
  QueueSchedulingContext ctxB = new QueueSchedulingContext("R.b", 25, 30, -1);
  QueueSchedulingContext ctxC = new QueueSchedulingContext("R.c", 50, -1, -1);

  // Three job queues directly under the root.
  JobQueue queueA = new JobQueue(root, ctxA);
  JobQueue queueB = new JobQueue(root, ctxB);
  JobQueue queueC = new JobQueue(root, ctxC);
  scheduler.jobQueuesManager.addQueue(queueA);
  scheduler.jobQueuesManager.addQueue(queueB);
  scheduler.jobQueuesManager.addQueue(queueC);

  scheduler.setRoot(root);
  root.update(4, 4);
  scheduler.updateContextInfoForTests();

  // The only job in the system goes to "R.a".
  taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "R.a", "u1");

  // {tracker, expected map attempt, expected reduce attempt}
  String[][] expectedPerTracker = {
      {"tt1",
       "attempt_test_0001_m_000001_0 on tt1",
       "attempt_test_0001_r_000001_0 on tt1"},
      {"tt2",
       "attempt_test_0001_m_000002_0 on tt2",
       "attempt_test_0001_r_000002_0 on tt2"},
  };

  // Queue "R.a" should not get more than 2 slots: tt1 and tt2 are served...
  Map<String, String> expectedStrings = new HashMap<String, String>();
  for (String[] row : expectedPerTracker) {
    expectedStrings.clear();
    expectedStrings.put(CapacityTestUtils.MAP, row[1]);
    expectedStrings.put(CapacityTestUtils.REDUCE, row[2]);
    checkMultipleTaskAssignment(
        taskTrackerManager, scheduler, row[0], expectedStrings);
  }

  // ...and tt3 gets nothing.
  assertNull(
      scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt3")));
}
/**
 * Checks that queues created without a configured capacity (-1) end up
 * with the capacity left over by their configured siblings after
 * {@code distributeUnConfiguredCapacity()} is called on the root.
 */
public void testDistributedUnConfiguredCapacity() {
  AbstractQueue rt = QueueHierarchyBuilder.createRootAbstractQueue();

  // Level 1: "a" is configured with 50%, "b" is left unconfigured.
  QueueSchedulingContext a1 = new QueueSchedulingContext("a", 50, -1, -1);
  QueueSchedulingContext a2 = new QueueSchedulingContext("b", -1, -1, -1);
  AbstractQueue rtChild1 = new ContainerQueue(rt, a1);
  AbstractQueue rtChild2 = new ContainerQueue(rt, a2);

  // Level 2: under "a", "ab" is configured with 30% and "ac" is left
  // unconfigured; under "b", "ad" is configured with 100%.
  QueueSchedulingContext b = new QueueSchedulingContext("ab", 30, -1, -1);
  QueueSchedulingContext c = new QueueSchedulingContext("ac", -1, -1, -1);
  QueueSchedulingContext d = new QueueSchedulingContext("ad", 100, -1, -1);
  // "ab" and "ad" are never inspected below; they are created only because
  // the test relies on the constructor attaching a queue to its parent.
  new JobQueue(rtChild1, b);
  AbstractQueue q1 = new JobQueue(rtChild1, c);
  new JobQueue(rtChild2, d);

  rt.distributeUnConfiguredCapacity();

  // "b" picks up what "a" left over at level 1: 100 - 50 = 50%.
  assertEquals(
      50.0f, rtChild2.getQueueSchedulingContext().getCapacityPercent(), 0.0f);
  // "ac" picks up what "ab" left over under "a": 100 - 30 = 70%.
  assertEquals(
      70.0f, q1.getQueueSchedulingContext().getCapacityPercent(), 0.0f);
}
/**
 * Builds the default hierarchy: 60% for rt.sch, 40% for rt.gta and, below
 * rt.sch, 80% for rt.sch.prod and 20% for rt.sch.misc.
 *
 * @return the created queues keyed by queue name
 */
private Map<String, AbstractQueue> setUpHierarchy() {
  return setUpHierarchy(60, 40, 80, 20);
}
/**
 * Builds a two-level queue hierarchy under a fresh root queue, installs
 * the root on the scheduler and registers every job queue with the
 * scheduler's job queues manager.
 * <pre>
 *   root
 *    |-- rt.sch          (container queue)
 *    |     |-- rt.sch.prod
 *    |     `-- rt.sch.misc
 *    `-- rt.gta
 * </pre>
 *
 * @param a capacity for the rt.sch queue
 * @param b capacity for the rt.gta queue
 * @param c capacity for the rt.sch.prod queue
 * @param d capacity for the rt.sch.misc queue
 * @return all created queues, the root included, keyed by their name
 */
private Map<String, AbstractQueue> setUpHierarchy(
    int a, int b, int c, int d) {
  Map<String, AbstractQueue> queuesByName =
      new HashMap<String, AbstractQueue>();

  AbstractQueue root = QueueHierarchyBuilder.createRootAbstractQueue();
  queuesByName.put(root.getName(), root);
  scheduler.setRoot(root);

  // First level of the hierarchy.
  QueueSchedulingContext schContext =
      new QueueSchedulingContext("rt.sch", a, -1, -1);
  QueueSchedulingContext gtaContext =
      new QueueSchedulingContext("rt.gta", b, -1, -1);
  AbstractQueue schQueue = new ContainerQueue(root, schContext);
  // rt.gta has no children, so it cannot be declared as a ContainerQueue.
  JobQueue gtaQueue = new JobQueue(root, gtaContext);
  queuesByName.put(schQueue.getName(), schQueue);
  queuesByName.put(gtaQueue.getName(), gtaQueue);
  scheduler.jobQueuesManager.addQueue(gtaQueue);

  // Second level: the children of rt.sch.
  QueueSchedulingContext prodContext =
      new QueueSchedulingContext("rt.sch.prod", c, -1, -1);
  QueueSchedulingContext miscContext =
      new QueueSchedulingContext("rt.sch.misc", d, -1, -1);
  JobQueue prodQueue = new JobQueue(schQueue, prodContext);
  JobQueue miscQueue = new JobQueue(schQueue, miscContext);
  queuesByName.put(prodQueue.getName(), prodQueue);
  queuesByName.put(miscQueue.getName(), miscQueue);
  scheduler.jobQueuesManager.addQueue(prodQueue);
  scheduler.jobQueuesManager.addQueue(miscQueue);

  return queuesByName;
}
/**
 * Verifies that capacities are allocated properly in hierarchical queues.
 *
 * The test case sets up a hierarchy of queues on a nine-tracker fixture
 * and submits one job to each leaf queue. It then walks through the
 * trackers tt1..tt9 in order, checking which job's tasks are assigned to
 * each tracker and how many map slots every queue occupies afterwards.
 *
 * @throws Exception propagated from the fake cluster or the scheduler
 */
public void testCapacityAllocationInHierarchicalQueues() throws Exception {
  this.setUp(9, 1, 1);
  taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);
  // set up some queues
  Map<String, AbstractQueue> map = setUpHierarchy();
  scheduler.updateContextInfoForTests();

  // verify initial capacity distribution
  assertEquals(
      3, map.get("rt.gta").getQueueSchedulingContext().getMapTSC()
          .getCapacity());
  assertEquals(
      5, map.get("rt.sch").getQueueSchedulingContext().getMapTSC()
          .getCapacity());
  assertEquals(
      4, map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC()
          .getCapacity());
  assertEquals(
      1, map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC()
          .getCapacity());

  final String[] queueNames =
      {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"};
  assertUsedCapacity(map, queueNames, new int[]{0, 0, 0, 0});

  // Only allow job submission to leaf queues.
  // First job (attempt_test_0001_*) goes to rt.sch.prod.
  taskTrackerManager.submitJobAndInit(
      JobStatus.PREP, 4, 4, "rt.sch.prod", "u1");
  // Second job (attempt_test_0002_*) goes to rt.sch.misc.
  taskTrackerManager.submitJobAndInit(
      JobStatus.PREP, 4, 4, "rt.sch.misc", "u1");
  // Third job (attempt_test_0003_*) goes to rt.gta.
  taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, "rt.gta", "u1");

  // One row per tracker, in the order the trackers are served:
  // {tracker, expected map attempt, expected reduce attempt}
  final String[][] expectedAssignments = {
      {"tt1", "attempt_test_0001_m_000001_0 on tt1",
              "attempt_test_0001_r_000001_0 on tt1"},
      {"tt2", "attempt_test_0003_m_000001_0 on tt2",
              "attempt_test_0003_r_000001_0 on tt2"},
      {"tt3", "attempt_test_0002_m_000001_0 on tt3",
              "attempt_test_0002_r_000001_0 on tt3"},
      {"tt4", "attempt_test_0003_m_000002_0 on tt4",
              "attempt_test_0003_r_000002_0 on tt4"},
      {"tt5", "attempt_test_0001_m_000002_0 on tt5",
              "attempt_test_0001_r_000002_0 on tt5"},
      {"tt6", "attempt_test_0001_m_000003_0 on tt6",
              "attempt_test_0001_r_000003_0 on tt6"},
      {"tt7", "attempt_test_0003_m_000003_0 on tt7",
              "attempt_test_0003_r_000003_0 on tt7"},
      {"tt8", "attempt_test_0001_m_000004_0 on tt8",
              "attempt_test_0001_r_000004_0 on tt8"},
      {"tt9", "attempt_test_0002_m_000002_0 on tt9",
              "attempt_test_0002_r_000002_0 on tt9"},
  };
  // Occupied map slots per queue (in queueNames order) after the matching
  // row above. There is deliberately no row for tt9: usage is not checked
  // after the last assignment.
  final int[][] expectedUsedSlots = {
      {0, 1, 1, 0},
      {1, 1, 1, 0},
      {1, 2, 1, 1},
      {2, 2, 1, 1},
      {2, 3, 2, 1},
      {2, 4, 3, 1},
      {3, 4, 3, 1},
      {3, 5, 4, 1},
  };

  Map<String, String> expectedStrings = new HashMap<String, String>();
  for (int i = 0; i < expectedAssignments.length; i++) {
    expectedStrings.clear();
    expectedStrings.put(CapacityTestUtils.MAP, expectedAssignments[i][1]);
    expectedStrings.put(CapacityTestUtils.REDUCE, expectedAssignments[i][2]);
    checkMultipleTaskAssignment(
        taskTrackerManager, scheduler, expectedAssignments[i][0],
        expectedStrings);
    if (i < expectedUsedSlots.length) {
      assertUsedCapacity(map, queueNames, expectedUsedSlots[i]);
    }
  }
}
/**
 * Test to make sure that capacity is divided at each level properly.
 *
 * The test case sets up a two level hierarchy of queues as follows:
 * - rt.sch
 * - rt.sch.prod
 * - rt.sch.misc
 * - rt.gta
 * Jobs are submitted to rt.sch.misc and rt.gta only. The expected
 * assignments show rt.sch.misc continuing to receive slots well past its
 * own capacity of one map slot, while rt.sch.prod stays unused.
 *
 * @throws IOException propagated from the fake cluster or the scheduler
 */
public void testHierarchicalCapacityAllocation() throws IOException {
  this.setUp(8, 1, 1);
  taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);
  // set up some queues
  Map<String, AbstractQueue> map = setUpHierarchy(70, 30, 80, 20);
  scheduler.updateContextInfoForTests();

  // verify capacities as per the setup.
  assertEquals(
      2, map.get("rt.gta").getQueueSchedulingContext().getMapTSC()
          .getCapacity());
  assertEquals(
      5, map.get("rt.sch").getQueueSchedulingContext().getMapTSC()
          .getCapacity());
  assertEquals(
      4, map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC()
          .getCapacity());
  assertEquals(
      1, map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC()
          .getCapacity());

  final String[] queueNames =
      {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"};
  assertUsedCapacity(map, queueNames, new int[]{0, 0, 0, 0});

  // First job (attempt_test_0001_*) goes to rt.sch.misc.
  taskTrackerManager.submitJobAndInit(
      JobStatus.PREP, 20, 20, "rt.sch.misc", "u1");
  // Second job (attempt_test_0002_*) goes to rt.gta.
  taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "rt.gta", "u1");

  // One row per tracker, in the order the trackers are served:
  // {tracker, expected map attempt, expected reduce attempt}
  final String[][] expectedAssignments = {
      {"tt1", "attempt_test_0001_m_000001_0 on tt1",
              "attempt_test_0001_r_000001_0 on tt1"},
      {"tt2", "attempt_test_0002_m_000001_0 on tt2",
              "attempt_test_0002_r_000001_0 on tt2"},
      {"tt3", "attempt_test_0001_m_000002_0 on tt3",
              "attempt_test_0001_r_000002_0 on tt3"},
      {"tt4", "attempt_test_0001_m_000003_0 on tt4",
              "attempt_test_0001_r_000003_0 on tt4"},
      {"tt5", "attempt_test_0002_m_000002_0 on tt5",
              "attempt_test_0002_r_000002_0 on tt5"},
      {"tt6", "attempt_test_0001_m_000004_0 on tt6",
              "attempt_test_0001_r_000004_0 on tt6"},
      {"tt7", "attempt_test_0001_m_000005_0 on tt7",
              "attempt_test_0001_r_000005_0 on tt7"},
      {"tt8", "attempt_test_0001_m_000006_0 on tt8",
              "attempt_test_0001_r_000006_0 on tt8"},
  };
  // Occupied map slots per queue (in queueNames order) after the matching
  // row above. There is deliberately no row for tt8: usage is not checked
  // after the last assignment.
  final int[][] expectedUsedSlots = {
      {0, 1, 0, 1},
      {1, 1, 0, 1},
      {1, 2, 0, 2},
      {1, 3, 0, 3},
      {2, 3, 0, 3},
      {2, 4, 0, 4},
      {2, 5, 0, 5},
  };

  Map<String, String> expectedStrings = new HashMap<String, String>();
  for (int i = 0; i < expectedAssignments.length; i++) {
    expectedStrings.clear();
    expectedStrings.put(MAP, expectedAssignments[i][1]);
    expectedStrings.put(REDUCE, expectedAssignments[i][2]);
    checkMultipleTaskAssignment(
        taskTrackerManager, scheduler, expectedAssignments[i][0],
        expectedStrings);
    if (i < expectedUsedSlots.length) {
      assertUsedCapacity(map, queueNames, expectedUsedSlots[i]);
    }
  }
}
/**
 * Verifies that the number of map slots occupied in each named queue
 * matches the expected value. The scheduler's context information is
 * refreshed through {@code updateContextInfoForTests()} before checking.
 *
 * @param queueMap          queues keyed by name
 * @param queueNames        names of the queues to check
 * @param expectedUsedSlots expected occupied map slots, one entry per
 *                          entry of {@code queueNames}, in the same order
 */
private void assertUsedCapacity(Map<String, AbstractQueue> queueMap,
    String[] queueNames, int[] expectedUsedSlots) {
  scheduler.updateContextInfoForTests();
  assertEquals(
      "queueNames and expectedUsedSlots must have the same length",
      queueNames.length, expectedUsedSlots.length);
  for (int i = 0; i < queueNames.length; i++) {
    AbstractQueue queue = queueMap.get(queueNames[i]);
    // Fail with the offending name rather than a bare NPE on a typo.
    assertNotNull("no queue named " + queueNames[i], queue);
    TaskSchedulingContext mapTSC =
        queue.getQueueSchedulingContext().getMapTSC();
    assertEquals(
        "occupied map slots in queue " + queueNames[i],
        expectedUsedSlots[i], mapTSC.getNumSlotsOccupied());
  }
}
}