blob: a5e07494718fcba013839bf8417f53ef512b9c3b [file] [log] [blame]
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestParentQueue {
private static final Log LOG = LogFactory.getLog(TestParentQueue.class);
RMContext rmContext;
CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
final static int GB = 1024;
final static String DEFAULT_RACK = "/default";
@Before
public void setUp() throws Exception {
rmContext = TestUtils.getMockRMContext();
csConf = new CapacitySchedulerConfiguration();
csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getMinimumResourceCapability()).thenReturn(
Resources.createResource(GB));
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(16*GB));
}
private static final String A = "a";
private static final String B = "b";
private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B});
conf.setCapacity(CapacityScheduler.ROOT, 100);
final String Q_A = CapacityScheduler.ROOT + "." + A;
conf.setCapacity(Q_A, 30);
final String Q_B = CapacityScheduler.ROOT + "." + B;
conf.setCapacity(Q_B, 70);
LOG.info("Setup top-level queues a and b");
}
private void stubQueueAllocation(final Queue queue,
final Resource clusterResource, final SchedulerNode node,
final int allocation) {
// Simulate the queue allocation
doAnswer(new Answer<Resource>() {
@Override
public Resource answer(InvocationOnMock invocation) throws Throwable {
try {
throw new Exception();
} catch (Exception e) {
LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() +
" alloc=" + allocation + " node=" + node.getHostName());
}
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
allocatedResource);
} else {
((LeafQueue)queue).allocateResource(clusterResource, "",
allocatedResource);
}
// Next call - nothing
if (allocation > 0) {
doReturn(Resources.none()).when(queue).assignContainers(
eq(clusterResource), eq(node));
// Mock the node's resource availability
Resource available = node.getAvailableResource();
doReturn(Resources.subtractFrom(available, allocatedResource)).
when(node).getAvailableResource();
}
return allocatedResource;
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node));
}
private float computeQueueUtilization(Queue queue,
int expectedMemory, Resource clusterResource) {
return (expectedMemory /
(clusterResource.getMemory() * queue.getAbsoluteCapacity()));
}
@Test
public void testSingleLevelQueues() throws Exception {
// Setup queue configs
setupSingleLevelQueues(csConf);
Map<String, Queue> queues = new HashMap<String, Queue>();
Queue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacityScheduler.ROOT, queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
// Setup some nodes
final int memoryPerNode = 10;
final int numNodes = 2;
SchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
Resources.createResource(numNodes * (memoryPerNode*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Start testing
LeafQueue a = (LeafQueue)queues.get(A);
LeafQueue b = (LeafQueue)queues.get(B);
final float delta = 0.0001f;
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0);
assertEquals(0.0f, a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 1*GB, clusterResource),
b.getUtilization(), delta);
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
assertEquals(computeQueueUtilization(a, 2*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 2*GB, clusterResource),
b.getUtilization(), delta);
// Now, B should get the scheduling opportunity
// since A has 2/6G while B has 2/14G
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
stubQueueAllocation(b, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
assertEquals(computeQueueUtilization(a, 3*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 4*GB, clusterResource),
b.getUtilization(), delta);
// Now, B should still get the scheduling opportunity
// since A has 3/6G while B has 4/14G
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 4*GB);
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
assertEquals(computeQueueUtilization(a, 3*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 8*GB, clusterResource),
b.getUtilization(), delta);
// Now, A should get the scheduling opportunity
// since A has 3/6G while B has 8/14G
stubQueueAllocation(a, clusterResource, node_1, 1*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1);
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
assertEquals(computeQueueUtilization(a, 4*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 9*GB, clusterResource),
b.getUtilization(), delta);
}
private static final String C = "c";
private static final String D = "d";
private static final String A1 = "a1";
private static final String A2 = "a2";
private static final String B1 = "b1";
private static final String B2 = "b2";
private static final String B3 = "b3";
private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B, C, D});
conf.setCapacity(CapacityScheduler.ROOT, 100);
final String Q_A = CapacityScheduler.ROOT + "." + A;
conf.setCapacity(Q_A, 10);
final String Q_B = CapacityScheduler.ROOT + "." + B;
conf.setCapacity(Q_B, 50);
final String Q_C = CapacityScheduler.ROOT + "." + C;
conf.setCapacity(Q_C, 20);
final String Q_D = CapacityScheduler.ROOT + "." + D;
conf.setCapacity(Q_D, 20);
// Define 2-nd level queues
conf.setQueues(Q_A, new String[] {A1, A2});
conf.setCapacity(Q_A + "." + A1, 50);
conf.setCapacity(Q_A + "." + A2, 50);
conf.setQueues(Q_B, new String[] {B1, B2, B3});
conf.setCapacity(Q_B + "." + B1, 10);
conf.setCapacity(Q_B + "." + B2, 20);
conf.setCapacity(Q_B + "." + B3, 70);
}
@Test
public void testMultiLevelQueues() throws Exception {
// Setup queue configs
setupMultiLevelQueues(csConf);
Map<String, Queue> queues = new HashMap<String, Queue>();
Queue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacityScheduler.ROOT, queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
// Setup some nodes
final int memoryPerNode = 10;
final int numNodes = 3;
SchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_2 =
TestUtils.getMockNode("host_2", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
Resources.createResource(numNodes * (memoryPerNode*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Start testing
Queue a = queues.get(A);
Queue b = queues.get(B);
Queue c = queues.get(C);
Queue d = queues.get(D);
Queue a1 = queues.get(A1);
Queue a2 = queues.get(A2);
Queue b1 = queues.get(B1);
Queue b2 = queues.get(B2);
Queue b3 = queues.get(B3);
final float delta = 0.0001f;
// Simulate C returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
assertEquals(computeQueueUtilization(a, 0*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 0*GB, clusterResource),
b.getUtilization(), delta);
assertEquals(computeQueueUtilization(c, 1*GB, clusterResource),
c.getUtilization(), delta);
assertEquals(computeQueueUtilization(d, 0*GB, clusterResource),
d.getUtilization(), delta);
reset(a); reset(b); reset(c);
// Now get B2 to allocate
// A = 0/3, B = 0/15, C = 1/6, D=0/6
stubQueueAllocation(a, clusterResource, node_1, 0*GB);
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
root.assignContainers(clusterResource, node_1);
assertEquals(computeQueueUtilization(a, 0*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 4*GB, clusterResource),
b.getUtilization(), delta);
assertEquals(computeQueueUtilization(c, 1*GB, clusterResource),
c.getUtilization(), delta);
reset(a); reset(b); reset(c);
// Now get both A1, C & B3 to allocate in right order
// A = 0/3, B = 4/15, C = 1/6, D=0/6
stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
stubQueueAllocation(c, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0);
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
assertEquals(computeQueueUtilization(a, 1*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 6*GB, clusterResource),
b.getUtilization(), delta);
assertEquals(computeQueueUtilization(c, 3*GB, clusterResource),
c.getUtilization(), delta);
reset(a); reset(b); reset(c);
// Now verify max-capacity
// A = 1/3, B = 6/15, C = 3/6, D=0/6
// Ensure a1 won't alloc above max-cap although it should get
// scheduling opportunity now, right after a2
LOG.info("here");
((ParentQueue)a).setMaxCapacity(.1f); // a should be capped at 3/30
stubQueueAllocation(a1, clusterResource, node_2, 1*GB); // shouldn't be
// allocated due
// to max-cap
stubQueueAllocation(a2, clusterResource, node_2, 2*GB);
stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
stubQueueAllocation(c, clusterResource, node_2, 1*GB);
root.assignContainers(clusterResource, node_2);
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
assertEquals(computeQueueUtilization(a, 3*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 8*GB, clusterResource),
b.getUtilization(), delta);
assertEquals(computeQueueUtilization(c, 4*GB, clusterResource),
c.getUtilization(), delta);
reset(a); reset(b); reset(c);
}
@After
public void tearDown() throws Exception {
}
}