| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import static org.junit.Assert.*; |
| import static org.mockito.Mockito.*; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| public class TestLeafQueue { |
| private static final Log LOG = LogFactory.getLog(TestLeafQueue.class); |
| |
| private final RecordFactory recordFactory = |
| RecordFactoryProvider.getRecordFactory(null); |
| |
| RMContext rmContext; |
| CapacityScheduler cs; |
| CapacitySchedulerConfiguration csConf; |
| CapacitySchedulerContext csContext; |
| |
| Queue root; |
| Map<String, Queue> queues = new HashMap<String, Queue>(); |
| |
| final static int GB = 1024; |
| final static String DEFAULT_RACK = "/default"; |
| |
| @Before |
| public void setUp() throws Exception { |
| cs = new CapacityScheduler(); |
| rmContext = TestUtils.getMockRMContext(); |
| |
| csConf = |
| new CapacitySchedulerConfiguration(); |
| setupQueueConfiguration(csConf); |
| |
| |
| csContext = mock(CapacitySchedulerContext.class); |
| when(csContext.getConfiguration()).thenReturn(csConf); |
| when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB)); |
| when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB)); |
| root = |
| CapacityScheduler.parseQueue(csContext, csConf, null, "root", |
| queues, queues, |
| CapacityScheduler.queueComparator, |
| CapacityScheduler.applicationComparator, |
| TestUtils.spyHook); |
| |
| cs.reinitialize(csConf, null, rmContext); |
| } |
| |
| private static final String A = "a"; |
| private static final String B = "b"; |
| private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { |
| |
| // Define top-level queues |
| conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B}); |
| conf.setCapacity(CapacityScheduler.ROOT, 100); |
| |
| final String Q_A = CapacityScheduler.ROOT + "." + A; |
| conf.setCapacity(Q_A, 10); |
| |
| final String Q_B = CapacityScheduler.ROOT + "." + B; |
| conf.setCapacity(Q_B, 90); |
| |
| LOG.info("Setup top-level queues a and b"); |
| } |
| |
| private LeafQueue stubLeafQueue(LeafQueue queue) { |
| |
| // Mock some methods for ease in these unit tests |
| |
| // 1. LeafQueue.createContainer to return dummy containers |
| doAnswer( |
| new Answer<Container>() { |
| @Override |
| public Container answer(InvocationOnMock invocation) |
| throws Throwable { |
| final SchedulerApp application = |
| (SchedulerApp)(invocation.getArguments()[0]); |
| final ContainerId containerId = |
| TestUtils.getMockContainerId(application); |
| |
| Container container = TestUtils.getMockContainer( |
| containerId, |
| ((SchedulerNode)(invocation.getArguments()[1])).getNodeID(), |
| (Resource)(invocation.getArguments()[2])); |
| return container; |
| } |
| } |
| ). |
| when(queue).createContainer( |
| any(SchedulerApp.class), |
| any(SchedulerNode.class), |
| any(Resource.class)); |
| |
| // 2. Stub out LeafQueue.parent.completedContainer |
| Queue parent = queue.getParent(); |
| doNothing().when(parent).completedContainer( |
| any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class), |
| any(RMContainer.class), any(RMContainerEventType.class)); |
| |
| return queue; |
| } |
| |
| @Test |
| public void testSingleQueueWithOneUser() throws Exception { |
| |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); |
| |
| // Users |
| final String user_0 = "user_0"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = |
| TestUtils.getMockApplicationAttemptId(0, 0); |
| SchedulerApp app_0 = |
| new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null); |
| a.submitApplication(app_0, user_0, A); |
| |
| final ApplicationAttemptId appAttemptId_1 = |
| TestUtils.getMockApplicationAttemptId(1, 0); |
| SchedulerApp app_1 = |
| new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null); |
| a.submitApplication(app_1, user_0, A); // same user |
| |
| // Setup some nodes |
| String host_0 = "host_0"; |
| SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); |
| |
| final int numNodes = 1; |
| Resource clusterResource = Resources.createResource(numNodes * (8*GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests |
| Priority priority = TestUtils.createMockPriority(1); |
| app_0.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, priority, |
| recordFactory))); |
| |
| app_1.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority, |
| recordFactory))); |
| |
| // Start testing... |
| |
| // Only 1 container |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(1*GB, a.getUsedResources().getMemory()); |
| assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also |
| // you can get one container more than user-limit |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(2*GB, a.getUsedResources().getMemory()); |
| assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Can't allocate 3rd due to user-limit |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(2*GB, a.getUsedResources().getMemory()); |
| assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Bump up user-limit-factor, now allocate should work |
| a.setUserLimitFactor(10); |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(3*GB, a.getUsedResources().getMemory()); |
| assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // One more should work, for app_1, due to user-limit-factor |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(4*GB, a.getUsedResources().getMemory()); |
| assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Test max-capacity |
| // Now - no more allocs since we are at max-cap |
| a.setMaxCapacity(0.5f); |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(4*GB, a.getUsedResources().getMemory()); |
| assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Release each container from app_0 |
| for (RMContainer rmContainer : app_0.getLiveContainers()) { |
| a.completedContainer(clusterResource, app_0, node_0, rmContainer, |
| RMContainerEventType.KILL); |
| } |
| assertEquals(1*GB, a.getUsedResources().getMemory()); |
| assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Release each container from app_1 |
| for (RMContainer rmContainer : app_1.getLiveContainers()) { |
| a.completedContainer(clusterResource, app_1, node_0, rmContainer, |
| RMContainerEventType.KILL); |
| } |
| assertEquals(0*GB, a.getUsedResources().getMemory()); |
| assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| } |
| |
| @Test |
| public void testSingleQueueWithMultipleUsers() throws Exception { |
| |
| // Mock the queue |
| LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); |
| |
| // Users |
| final String user_0 = "user_0"; |
| final String user_1 = "user_1"; |
| final String user_2 = "user_2"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = |
| TestUtils.getMockApplicationAttemptId(0, 0); |
| SchedulerApp app_0 = |
| new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null); |
| a.submitApplication(app_0, user_0, A); |
| |
| final ApplicationAttemptId appAttemptId_1 = |
| TestUtils.getMockApplicationAttemptId(1, 0); |
| SchedulerApp app_1 = |
| new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null); |
| a.submitApplication(app_1, user_0, A); // same user |
| |
| // Setup some nodes |
| String host_0 = "host_0"; |
| SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); |
| |
| final int numNodes = 1; |
| Resource clusterResource = Resources.createResource(numNodes * (8*GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests |
| Priority priority = TestUtils.createMockPriority(1); |
| app_0.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority, |
| recordFactory))); |
| |
| app_1.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority, |
| recordFactory))); |
| |
| /** |
| * Start testing... |
| */ |
| |
| // Only 1 container |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(1*GB, a.getUsedResources().getMemory()); |
| assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also |
| // you can get one container more than user-limit |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(2*GB, a.getUsedResources().getMemory()); |
| assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Can't allocate 3rd due to user-limit |
| a.setUserLimit(25); |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(2*GB, a.getUsedResources().getMemory()); |
| assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Submit more apps |
| final ApplicationAttemptId appAttemptId_2 = |
| TestUtils.getMockApplicationAttemptId(2, 0); |
| SchedulerApp app_2 = |
| new SchedulerApp(appAttemptId_2, user_1, a, rmContext, null); |
| a.submitApplication(app_2, user_1, A); |
| |
| final ApplicationAttemptId appAttemptId_3 = |
| TestUtils.getMockApplicationAttemptId(3, 0); |
| SchedulerApp app_3 = |
| new SchedulerApp(appAttemptId_3, user_2, a, rmContext, null); |
| a.submitApplication(app_3, user_2, A); |
| |
| app_2.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 3*GB, 1, priority, |
| recordFactory))); |
| |
| app_3.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority, |
| recordFactory))); |
| |
| // Now allocations should goto app_2 since |
| // user_0 is at limit inspite of high user-limit-factor |
| a.setUserLimitFactor(10); |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(5*GB, a.getUsedResources().getMemory()); |
| assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(3*GB, app_2.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); |
| |
| // Now allocations should goto app_0 since |
| // user_0 is at user-limit not above it |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(6*GB, a.getUsedResources().getMemory()); |
| assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(3*GB, app_2.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); |
| |
| // Test max-capacity |
| // Now - no more allocs since we are at max-cap |
| a.setMaxCapacity(0.5f); |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(6*GB, a.getUsedResources().getMemory()); |
| assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(3*GB, app_2.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); |
| |
| // Revert max-capacity and user-limit-factor |
| // Now, allocations should goto app_3 since it's under user-limit |
| a.setMaxCapacity(-1); |
| a.setUserLimitFactor(1); |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(7*GB, a.getUsedResources().getMemory()); |
| assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(3*GB, app_2.getCurrentConsumption().getMemory()); |
| assertEquals(1*GB, app_3.getCurrentConsumption().getMemory()); |
| |
| // Now we should assign to app_3 again since user_2 is under user-limit |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(8*GB, a.getUsedResources().getMemory()); |
| assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(3*GB, app_2.getCurrentConsumption().getMemory()); |
| assertEquals(2*GB, app_3.getCurrentConsumption().getMemory()); |
| |
| // 8. Release each container from app_0 |
| for (RMContainer rmContainer : app_0.getLiveContainers()) { |
| a.completedContainer(clusterResource, app_0, node_0, rmContainer, |
| RMContainerEventType.KILL); |
| } |
| assertEquals(5*GB, a.getUsedResources().getMemory()); |
| assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(3*GB, app_2.getCurrentConsumption().getMemory()); |
| assertEquals(2*GB, app_3.getCurrentConsumption().getMemory()); |
| |
| // 9. Release each container from app_2 |
| for (RMContainer rmContainer : app_2.getLiveContainers()) { |
| a.completedContainer(clusterResource, app_2, node_0, rmContainer, |
| RMContainerEventType.KILL); |
| } |
| assertEquals(2*GB, a.getUsedResources().getMemory()); |
| assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_2.getCurrentConsumption().getMemory()); |
| assertEquals(2*GB, app_3.getCurrentConsumption().getMemory()); |
| |
| // 10. Release each container from app_3 |
| for (RMContainer rmContainer : app_3.getLiveContainers()) { |
| a.completedContainer(clusterResource, app_3, node_0, rmContainer, |
| RMContainerEventType.KILL); |
| } |
| assertEquals(0*GB, a.getUsedResources().getMemory()); |
| assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_2.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); |
| } |
| |
| @Test |
| public void testReservation() throws Exception { |
| |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); |
| |
| // Users |
| final String user_0 = "user_0"; |
| final String user_1 = "user_1"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = |
| TestUtils.getMockApplicationAttemptId(0, 0); |
| SchedulerApp app_0 = |
| new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null); |
| a.submitApplication(app_0, user_0, A); |
| |
| final ApplicationAttemptId appAttemptId_1 = |
| TestUtils.getMockApplicationAttemptId(1, 0); |
| SchedulerApp app_1 = |
| new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null); |
| a.submitApplication(app_1, user_1, A); |
| |
| // Setup some nodes |
| String host_0 = "host_0"; |
| SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); |
| |
| final int numNodes = 1; |
| Resource clusterResource = Resources.createResource(numNodes * (8*GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests |
| Priority priority = TestUtils.createMockPriority(1); |
| app_0.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority, |
| recordFactory))); |
| |
| app_1.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 1, priority, |
| recordFactory))); |
| |
| // Start testing... |
| |
| // Only 1 container |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(1*GB, a.getUsedResources().getMemory()); |
| assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also |
| // you can get one container more than user-limit |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(2*GB, a.getUsedResources().getMemory()); |
| assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| |
| // Now, reservation should kick in for app_1 |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(6*GB, a.getUsedResources().getMemory()); |
| assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(4*GB, app_1.getCurrentReservation().getMemory()); |
| assertEquals(2*GB, node_0.getUsedResource().getMemory()); |
| |
| // Now free 1 container from app_0 i.e. 1G |
| a.completedContainer(clusterResource, app_0, node_0, |
| app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL); |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(5*GB, a.getUsedResources().getMemory()); |
| assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(4*GB, app_1.getCurrentReservation().getMemory()); |
| assertEquals(1*GB, node_0.getUsedResource().getMemory()); |
| |
| // Now finish another container from app_0 and fulfill the reservation |
| a.completedContainer(clusterResource, app_0, node_0, |
| app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL); |
| a.assignContainers(clusterResource, node_0); |
| assertEquals(4*GB, a.getUsedResources().getMemory()); |
| assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); |
| assertEquals(0*GB, app_1.getCurrentReservation().getMemory()); |
| assertEquals(4*GB, node_0.getUsedResource().getMemory()); |
| } |
| |
| |
| @Test |
| public void testLocalityScheduling() throws Exception { |
| |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); |
| |
| // User |
| String user_0 = "user_0"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = |
| TestUtils.getMockApplicationAttemptId(0, 0); |
| SchedulerApp app_0 = |
| spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null)); |
| a.submitApplication(app_0, user_0, A); |
| |
| // Setup some nodes and racks |
| String host_0 = "host_0"; |
| String rack_0 = "rack_0"; |
| SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); |
| |
| String host_1 = "host_1"; |
| String rack_1 = "rack_1"; |
| SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); |
| |
| String host_2 = "host_2"; |
| String rack_2 = "rack_2"; |
| SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); |
| |
| final int numNodes = 3; |
| Resource clusterResource = Resources.createResource(numNodes * (8*GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests and submit |
| Priority priority = TestUtils.createMockPriority(1); |
| List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>(); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(host_0, 1*GB, 1, |
| priority, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(rack_0, 1*GB, 1, |
| priority, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(host_1, 1*GB, 1, |
| priority, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(rack_1, 1*GB, 1, |
| priority, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, // one extra |
| priority, recordFactory)); |
| app_0.updateResourceRequests(app_0_requests_0); |
| |
| // Start testing... |
| |
| // Start with off switch, shouldn't allocate due to delay scheduling |
| a.assignContainers(clusterResource, node_2); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), |
| any(Priority.class), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(1, app_0.getSchedulingOpportunities(priority)); |
| assertEquals(3, app_0.getTotalRequiredResources(priority)); |
| |
| // Another off switch, shouldn't allocate due to delay scheduling |
| a.assignContainers(clusterResource, node_2); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), |
| any(Priority.class), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(2, app_0.getSchedulingOpportunities(priority)); |
| assertEquals(3, app_0.getTotalRequiredResources(priority)); |
| |
| // Another off switch, shouldn't allocate due to delay scheduling |
| a.assignContainers(clusterResource, node_2); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), |
| any(Priority.class), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(3, app_0.getSchedulingOpportunities(priority)); |
| assertEquals(3, app_0.getTotalRequiredResources(priority)); |
| |
| // Another off switch, now we should allocate |
| // since missedOpportunities=3 and reqdContainers=3 |
| a.assignContainers(clusterResource, node_2); |
| verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), |
| any(Priority.class), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset |
| assertEquals(2, app_0.getTotalRequiredResources(priority)); |
| |
| // NODE_LOCAL - node_0 |
| a.assignContainers(clusterResource, node_0); |
| verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), |
| any(Priority.class), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset |
| assertEquals(1, app_0.getTotalRequiredResources(priority)); |
| |
| // NODE_LOCAL - node_1 |
| a.assignContainers(clusterResource, node_1); |
| verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), |
| any(Priority.class), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset |
| assertEquals(0, app_0.getTotalRequiredResources(priority)); |
| |
| // Add 1 more request to check for RACK_LOCAL |
| app_0_requests_0.clear(); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(host_1, 1*GB, 1, |
| priority, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(rack_1, 1*GB, 1, |
| priority, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // one extra |
| priority, recordFactory)); |
| app_0.updateResourceRequests(app_0_requests_0); |
| assertEquals(1, app_0.getTotalRequiredResources(priority)); |
| |
| String host_3 = "host_3"; // on rack_1 |
| SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); |
| |
| a.assignContainers(clusterResource, node_3); |
| verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), |
| any(Priority.class), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset |
| assertEquals(0, app_0.getTotalRequiredResources(priority)); |
| } |
| |
| @Test |
| public void testApplicationPriorityScheduling() throws Exception { |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); |
| |
| // User |
| String user_0 = "user_0"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = |
| TestUtils.getMockApplicationAttemptId(0, 0); |
| SchedulerApp app_0 = |
| spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null)); |
| a.submitApplication(app_0, user_0, A); |
| |
| // Setup some nodes and racks |
| String host_0 = "host_0"; |
| String rack_0 = "rack_0"; |
| SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); |
| |
| String host_1 = "host_1"; |
| String rack_1 = "rack_1"; |
| SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); |
| |
| String host_2 = "host_2"; |
| String rack_2 = "rack_2"; |
| SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); |
| |
| final int numNodes = 3; |
| Resource clusterResource = Resources.createResource(numNodes * (8*GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests and submit |
| List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>(); |
| |
| // P1 |
| Priority priority_1 = TestUtils.createMockPriority(1); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(host_0, 1*GB, 1, |
| priority_1, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(rack_0, 1*GB, 1, |
| priority_1, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(host_1, 1*GB, 1, |
| priority_1, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(rack_1, 1*GB, 1, |
| priority_1, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, |
| priority_1, recordFactory)); |
| |
| // P2 |
| Priority priority_2 = TestUtils.createMockPriority(2); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(host_2, 2*GB, 1, |
| priority_2, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(rack_2, 2*GB, 1, |
| priority_2, recordFactory)); |
| app_0_requests_0.add( |
| TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, |
| priority_2, recordFactory)); |
| |
| app_0.updateResourceRequests(app_0_requests_0); |
| |
| // Start testing... |
| |
| // Start with off switch, shouldn't allocate P1 due to delay scheduling |
| // thus, no P2 either! |
| a.assignContainers(clusterResource, node_2); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), |
| eq(priority_1), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(1, app_0.getSchedulingOpportunities(priority_1)); |
| assertEquals(2, app_0.getTotalRequiredResources(priority_1)); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), |
| eq(priority_2), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); |
| assertEquals(1, app_0.getTotalRequiredResources(priority_2)); |
| |
| // Another off-switch, shouldn't allocate P1 due to delay scheduling |
| // thus, no P2 either! |
| a.assignContainers(clusterResource, node_2); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), |
| eq(priority_1), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(2, app_0.getSchedulingOpportunities(priority_1)); |
| assertEquals(2, app_0.getTotalRequiredResources(priority_1)); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), |
| eq(priority_2), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); |
| assertEquals(1, app_0.getTotalRequiredResources(priority_2)); |
| |
| // Another off-switch, shouldn allocate OFF_SWITCH P1 |
| a.assignContainers(clusterResource, node_2); |
| verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), |
| eq(priority_1), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); |
| assertEquals(1, app_0.getTotalRequiredResources(priority_1)); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), |
| eq(priority_2), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); |
| assertEquals(1, app_0.getTotalRequiredResources(priority_2)); |
| |
| // Now, DATA_LOCAL for P1 |
| a.assignContainers(clusterResource, node_0); |
| verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), |
| eq(priority_1), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); |
| assertEquals(0, app_0.getTotalRequiredResources(priority_1)); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_0), |
| eq(priority_2), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); |
| assertEquals(1, app_0.getTotalRequiredResources(priority_2)); |
| |
| // Now, OFF_SWITCH for P2 |
| a.assignContainers(clusterResource, node_1); |
| verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), |
| eq(priority_1), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); |
| assertEquals(0, app_0.getTotalRequiredResources(priority_1)); |
| verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1), |
| eq(priority_2), any(ResourceRequest.class), any(Container.class)); |
| assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); |
| assertEquals(0, app_0.getTotalRequiredResources(priority_2)); |
| |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| } |
| } |