| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.File; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.authorize.AccessControlList; |
| import org.apache.hadoop.yarn.Clock; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.AsyncDispatcher; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; |
| import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class TestFairScheduler { |
| |
| private class MockClock implements Clock { |
| private long time = 0; |
| @Override |
| public long getTime() { |
| return time; |
| } |
| |
| public void tick(int seconds) { |
| time = time + seconds * 1000; |
| } |
| |
| } |
| |
  // Scratch directory for test data, taken from the "test.build.data"
  // system property (defaults to /tmp).
  final static String TEST_DIR = new File(System.getProperty("test.build.data",
      "/tmp")).getAbsolutePath();

  // Path of the allocation (queue configuration) file that individual
  // tests write before re-initializing the QueueManager.
  final static String ALLOC_FILE = new File(TEST_DIR,
      "test-queues").getAbsolutePath();

  private FairScheduler scheduler;
  private ResourceManager resourceManager;
  private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

  private int APP_ID = 1; // Incrementing counter for scheduling apps
  private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
| |
| // HELPER METHODS |
  /**
   * Creates a fresh FairScheduler backed by a newly initialized
   * ResourceManager before each test, and starts the RM's event dispatcher
   * so scheduler events posted by tests are actually delivered.
   */
  @Before
  public void setUp() throws IOException {
    scheduler = new FairScheduler();
    Configuration conf = createConfiguration();
    // All tests assume only one assignment per node update
    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
    Store store = StoreFactory.getStore(conf);
    resourceManager = new ResourceManager(store);
    resourceManager.init(conf);
    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
    scheduler.reinitialize(conf, resourceManager.getRMContext());
  }
| |
  /**
   * Drops references to the scheduler and ResourceManager after each test.
   */
  @After
  public void tearDown() {
    // NOTE(review): the dispatcher started in setUp() is never explicitly
    // stopped here, only dereferenced -- confirm whether stopping the RM is
    // needed to avoid leaking threads across tests.
    scheduler = null;
    resourceManager = null;
  }
| |
| private Configuration createConfiguration() { |
| Configuration conf = new YarnConfiguration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, |
| ResourceScheduler.class); |
| return conf; |
| } |
| |
| private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { |
| ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class); |
| ApplicationId appIdImpl = recordFactory.newRecordInstance(ApplicationId.class); |
| appIdImpl.setId(appId); |
| attId.setAttemptId(attemptId); |
| attId.setApplicationId(appIdImpl); |
| return attId; |
| } |
| |
| |
| private ResourceRequest createResourceRequest(int memory, String host, int priority, int numContainers) { |
| ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); |
| request.setCapability(Resources.createResource(memory)); |
| request.setHostName(host); |
| request.setNumContainers(numContainers); |
| Priority prio = recordFactory.newRecordInstance(Priority.class); |
| prio.setPriority(priority); |
| request.setPriority(prio); |
| return request; |
| } |
| |
  /**
   * Creates a single-container, priority-1 request in the given queue for
   * the given user and submits it to the scheduler.
   */
  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId) {
    return createSchedulingRequest(memory, queueId, userId, 1);
  }
| |
  /**
   * Creates a priority-1 request for the given number of containers and
   * submits it to the scheduler.
   */
  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers) {
    return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
  }
| |
| private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers, int priority) { |
| ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); |
| scheduler.addApplication(id, queueId, userId); |
| List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); |
| ResourceRequest request = createResourceRequest(memory, "*", priority, numContainers); |
| ask.add(request); |
| scheduler.allocate(id, ask, new ArrayList<ContainerId>()); |
| return id; |
| } |
| |
| // TESTS |
| |
| @Test |
| public void testAggregateCapacityTracking() throws Exception { |
| // Add a node |
| RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); |
| NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); |
| scheduler.handle(nodeEvent1); |
| assertEquals(1024, scheduler.getClusterCapacity().getMemory()); |
| |
| // Add another node |
| RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512)); |
| NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); |
| scheduler.handle(nodeEvent2); |
| assertEquals(1536, scheduler.getClusterCapacity().getMemory()); |
| |
| // Remove the first node |
| NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1); |
| scheduler.handle(nodeEvent3); |
| assertEquals(512, scheduler.getClusterCapacity().getMemory()); |
| } |
| |
| @Test |
| public void testSimpleFairShareCalculation() { |
| // Add one big node (only care about aggregate capacity) |
| RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024)); |
| NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); |
| scheduler.handle(nodeEvent1); |
| |
| // Have two queues which want entire cluster capacity |
| createSchedulingRequest(10 * 1024, "queue1", "user1"); |
| createSchedulingRequest(10 * 1024, "queue2", "user1"); |
| |
| scheduler.update(); |
| |
| Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); |
| assertEquals(3, queues.size()); |
| |
| for (FSQueue p : queues) { |
| if (p.getName() != "default") { |
| assertEquals(5120, p.getQueueSchedulable().getFairShare().getMemory()); |
| } |
| } |
| } |
| |
| @Test |
| public void testSimpleContainerAllocation() { |
| // Add a node |
| RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); |
| NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); |
| scheduler.handle(nodeEvent1); |
| |
| // Add another node |
| RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512)); |
| NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); |
| scheduler.handle(nodeEvent2); |
| |
| createSchedulingRequest(512, "queue1", "user1", 2); |
| |
| scheduler.update(); |
| |
| NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, |
| new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()); |
| scheduler.handle(updateEvent); |
| |
| // Asked for less than min_allocation. |
| assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, |
| scheduler.getQueueManager().getQueue("queue1"). |
| getQueueSchedulable().getResourceUsage().getMemory()); |
| |
| NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, |
| new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()); |
| scheduler.handle(updateEvent2); |
| |
| assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). |
| getQueueSchedulable().getResourceUsage().getMemory()); |
| } |
| |
  /**
   * When a node is full, a second queue's request should result in a
   * reservation on that node. The reservation must survive an allocation
   * made on a different node and be released on the reserving node's next
   * heartbeat once the demand has been satisfied elsewhere.
   */
  @Test
  public void testSimpleContainerReservation() throws InterruptedException {
    // Add a node
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

    // Queue 1 requests full capacity of node
    createSchedulingRequest(1024, "queue1", "user1", 1);
    scheduler.update();
    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
    scheduler.handle(updateEvent);

    // Make sure queue 1 is allocated app capacity
    assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
        getQueueSchedulable().getResourceUsage().getMemory());

    // Now queue 2 requests likewise; the node is already full.
    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
    scheduler.update();
    scheduler.handle(updateEvent);

    // Make sure queue 2 is waiting with a reservation (no usage yet)
    assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
        getQueueSchedulable().getResourceUsage().getMemory());
    assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());

    // Now another node checks in with capacity
    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
    scheduler.handle(nodeEvent2);
    scheduler.handle(updateEvent2);

    // Make sure this goes to queue 2
    assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
        getQueueSchedulable().getResourceUsage().getMemory());

    // The old reservation should still be there...
    assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());
    // ... but it should disappear when we update the first node.
    scheduler.handle(updateEvent);
    assertEquals(0, scheduler.applications.get(attId).getCurrentReservation().getMemory());

  }
| |
  /**
   * With USER_AS_DEFAULT_QUEUE enabled, an app submitted to "default"
   * should land in a queue named after its user; with the setting disabled,
   * the app should stay in the "default" queue.
   */
  @Test
  public void testUserAsDefaultQueue() throws Exception {
    Configuration conf = createConfiguration();
    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
    scheduler.reinitialize(conf, resourceManager.getRMContext());
    AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
        createAppAttemptId(1, 1), "default", "user1");
    scheduler.handle(appAddedEvent);
    // App was redirected to the per-user queue "user1".
    assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size());
    assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size());

    // Disable the feature and submit a second app to "default".
    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
    scheduler.reinitialize(conf, resourceManager.getRMContext());
    AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
        createAppAttemptId(2, 1), "default", "user2");
    scheduler.handle(appAddedEvent2);
    // Second app stays in "default"; no "user2" queue usage.
    assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size());
    assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size());
    assertEquals(0, scheduler.getQueueManager().getQueue("user2").getApplications().size());
  }
| |
| @Test |
| public void testFairShareWithMinAlloc() throws Exception { |
| Configuration conf = createConfiguration(); |
| conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); |
| scheduler.reinitialize(conf, resourceManager.getRMContext()); |
| |
| PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); |
| out.println("<?xml version=\"1.0\"?>"); |
| out.println("<allocations>"); |
| out.println("<queue name=\"queueA\">"); |
| out.println("<minResources>1024</minResources>"); |
| out.println("</queue>"); |
| out.println("<queue name=\"queueB\">"); |
| out.println("<minResources>2048</minResources>"); |
| out.println("</queue>"); |
| out.println("</allocations>"); |
| out.close(); |
| |
| QueueManager queueManager = scheduler.getQueueManager(); |
| queueManager.initialize(); |
| |
| // Add one big node (only care about aggregate capacity) |
| RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024)); |
| NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); |
| scheduler.handle(nodeEvent1); |
| |
| createSchedulingRequest(2 * 1024, "queueA", "user1"); |
| createSchedulingRequest(2 * 1024, "queueB", "user1"); |
| |
| scheduler.update(); |
| |
| Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); |
| assertEquals(3, queues.size()); |
| |
| for (FSQueue p : queues) { |
| if (p.getName().equals("queueA")) { |
| assertEquals(1024, p.getQueueSchedulable().getFairShare().getMemory()); |
| } |
| else if (p.getName().equals("queueB")) { |
| assertEquals(2048, p.getQueueSchedulable().getFairShare().getMemory()); |
| } |
| } |
| |
| } |
| |
  /**
   * Make allocation requests and ensure they are reflected in queue demand:
   * queue1's demand is its single 2*min request, while queue2's demand is
   * the sum across both of its attempts (2*min + 2*1*min + 2*min).
   */
  @Test
  public void testQueueDemandCalculation() throws Exception {
    ApplicationAttemptId id11 = createAppAttemptId(1, 1);
    scheduler.addApplication(id11, "queue1", "user1");
    ApplicationAttemptId id21 = createAppAttemptId(2, 1);
    scheduler.addApplication(id21, "queue2", "user1");
    ApplicationAttemptId id22 = createAppAttemptId(2, 2);
    scheduler.addApplication(id22, "queue2", "user1");

    int minReqSize = YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB;

    // First ask, queue1 requests 1 large (minReqSize * 2).
    List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
    ResourceRequest request1 = createResourceRequest(minReqSize * 2, "*", 1, 1);
    ask1.add(request1);
    scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());

    // Second ask, queue2 requests 1 large + (2 * minReqSize)
    List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
    ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 1);
    ResourceRequest request3 = createResourceRequest(minReqSize, "bar", 1, 2);
    ask2.add(request2);
    ask2.add(request3);
    scheduler.allocate(id21, ask2, new ArrayList<ContainerId>());

    // Third ask, queue2 requests 1 large
    List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
    ResourceRequest request4 = createResourceRequest(2 * minReqSize, "*", 1, 1);
    ask3.add(request4);
    scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());

    scheduler.update();

    // Demand is aggregated per queue across all of its applications.
    assertEquals(2 * minReqSize, scheduler.getQueueManager().getQueue("queue1")
        .getQueueSchedulable().getDemand().getMemory());
    assertEquals(2 * minReqSize + 2 * minReqSize + (2 * minReqSize), scheduler
        .getQueueManager().getQueue("queue2").getQueueSchedulable().getDemand()
        .getMemory());
  }
| |
| @Test |
| public void testAppAdditionAndRemoval() throws Exception { |
| AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent( |
| createAppAttemptId(1, 1), "default", "user1"); |
| scheduler.handle(appAddedEvent1); |
| |
| // Scheduler should have one queue (the default) |
| assertEquals(1, scheduler.getQueueManager().getQueues().size()); |
| |
| // That queue should have one app |
| assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size()); |
| |
| AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent( |
| createAppAttemptId(1, 1), RMAppAttemptState.FINISHED); |
| |
| // Now remove app |
| scheduler.handle(appRemovedEvent1); |
| |
| // Default queue should have no apps |
| assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size()); |
| } |
| |
| @Test |
| public void testAllocationFileParsing() throws Exception { |
| Configuration conf = createConfiguration(); |
| conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); |
| scheduler.reinitialize(conf, resourceManager.getRMContext()); |
| |
| PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); |
| out.println("<?xml version=\"1.0\"?>"); |
| out.println("<allocations>"); |
| // Give queue A a minimum of 1024 M |
| out.println("<queue name=\"queueA\">"); |
| out.println("<minResources>1024</minResources>"); |
| out.println("</queue>"); |
| // Give queue B a minimum of 2048 M |
| out.println("<queue name=\"queueB\">"); |
| out.println("<minResources>2048</minResources>"); |
| out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>"); |
| out.println("</queue>"); |
| // Give queue C no minimum |
| out.println("<queue name=\"queueC\">"); |
| out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>"); |
| out.println("</queue>"); |
| // Give queue D a limit of 3 running apps |
| out.println("<queue name=\"queueD\">"); |
| out.println("<maxRunningApps>3</maxRunningApps>"); |
| out.println("</queue>"); |
| // Give queue E a preemption timeout of one minute |
| out.println("<queue name=\"queueE\">"); |
| out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>"); |
| out.println("</queue>"); |
| // Set default limit of apps per queue to 15 |
| out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>"); |
| // Set default limit of apps per user to 5 |
| out.println("<userMaxAppsDefault>5</userMaxAppsDefault>"); |
| // Give user1 a limit of 10 jobs |
| out.println("<user name=\"user1\">"); |
| out.println("<maxRunningApps>10</maxRunningApps>"); |
| out.println("</user>"); |
| // Set default min share preemption timeout to 2 minutes |
| out.println("<defaultMinSharePreemptionTimeout>120" |
| + "</defaultMinSharePreemptionTimeout>"); |
| // Set fair share preemption timeout to 5 minutes |
| out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>"); |
| out.println("</allocations>"); |
| out.close(); |
| |
| QueueManager queueManager = scheduler.getQueueManager(); |
| queueManager.initialize(); |
| |
| assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); |
| |
| assertEquals(Resources.createResource(1024), |
| queueManager.getMinResources("queueA")); |
| assertEquals(Resources.createResource(2048), |
| queueManager.getMinResources("queueB")); |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources("queueC")); |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources("queueD")); |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources("queueE")); |
| |
| assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME)); |
| assertEquals(15, queueManager.getQueueMaxApps("queueA")); |
| assertEquals(15, queueManager.getQueueMaxApps("queueB")); |
| assertEquals(15, queueManager.getQueueMaxApps("queueC")); |
| assertEquals(3, queueManager.getQueueMaxApps("queueD")); |
| assertEquals(15, queueManager.getQueueMaxApps("queueE")); |
| assertEquals(10, queueManager.getUserMaxApps("user1")); |
| assertEquals(5, queueManager.getUserMaxApps("user2")); |
| |
| // Unspecified queues should get default ACL |
| Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA"); |
| assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE)); |
| assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString()); |
| assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS)); |
| assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); |
| |
| // Queue B ACL |
| Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB"); |
| assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE)); |
| assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString()); |
| |
| // Queue c ACL |
| Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC"); |
| assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS)); |
| assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); |
| |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout( |
| YarnConfiguration.DEFAULT_QUEUE_NAME)); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB")); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC")); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD")); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); |
| assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE")); |
| assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); |
| } |
| |
| @Test |
| public void testBackwardsCompatibleAllocationFileParsing() throws Exception { |
| Configuration conf = createConfiguration(); |
| conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); |
| scheduler.reinitialize(conf, resourceManager.getRMContext()); |
| |
| PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); |
| out.println("<?xml version=\"1.0\"?>"); |
| out.println("<allocations>"); |
| // Give queue A a minimum of 1024 M |
| out.println("<pool name=\"queueA\">"); |
| out.println("<minResources>1024</minResources>"); |
| out.println("</pool>"); |
| // Give queue B a minimum of 2048 M |
| out.println("<pool name=\"queueB\">"); |
| out.println("<minResources>2048</minResources>"); |
| out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>"); |
| out.println("</pool>"); |
| // Give queue C no minimum |
| out.println("<pool name=\"queueC\">"); |
| out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>"); |
| out.println("</pool>"); |
| // Give queue D a limit of 3 running apps |
| out.println("<pool name=\"queueD\">"); |
| out.println("<maxRunningApps>3</maxRunningApps>"); |
| out.println("</pool>"); |
| // Give queue E a preemption timeout of one minute |
| out.println("<pool name=\"queueE\">"); |
| out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>"); |
| out.println("</pool>"); |
| // Set default limit of apps per queue to 15 |
| out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>"); |
| // Set default limit of apps per user to 5 |
| out.println("<userMaxAppsDefault>5</userMaxAppsDefault>"); |
| // Give user1 a limit of 10 jobs |
| out.println("<user name=\"user1\">"); |
| out.println("<maxRunningApps>10</maxRunningApps>"); |
| out.println("</user>"); |
| // Set default min share preemption timeout to 2 minutes |
| out.println("<defaultMinSharePreemptionTimeout>120" |
| + "</defaultMinSharePreemptionTimeout>"); |
| // Set fair share preemption timeout to 5 minutes |
| out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>"); |
| out.println("</allocations>"); |
| out.close(); |
| |
| QueueManager queueManager = scheduler.getQueueManager(); |
| queueManager.initialize(); |
| |
| assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); |
| |
| assertEquals(Resources.createResource(1024), |
| queueManager.getMinResources("queueA")); |
| assertEquals(Resources.createResource(2048), |
| queueManager.getMinResources("queueB")); |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources("queueC")); |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources("queueD")); |
| assertEquals(Resources.createResource(0), |
| queueManager.getMinResources("queueE")); |
| |
| assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME)); |
| assertEquals(15, queueManager.getQueueMaxApps("queueA")); |
| assertEquals(15, queueManager.getQueueMaxApps("queueB")); |
| assertEquals(15, queueManager.getQueueMaxApps("queueC")); |
| assertEquals(3, queueManager.getQueueMaxApps("queueD")); |
| assertEquals(15, queueManager.getQueueMaxApps("queueE")); |
| assertEquals(10, queueManager.getUserMaxApps("user1")); |
| assertEquals(5, queueManager.getUserMaxApps("user2")); |
| |
| // Unspecified queues should get default ACL |
| Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA"); |
| assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE)); |
| assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString()); |
| assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS)); |
| assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); |
| |
| // Queue B ACL |
| Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB"); |
| assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE)); |
| assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString()); |
| |
| // Queue c ACL |
| Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC"); |
| assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS)); |
| assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); |
| |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout( |
| YarnConfiguration.DEFAULT_QUEUE_NAME)); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB")); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC")); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD")); |
| assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); |
| assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE")); |
| assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); |
| } |
| |
  /**
   * A queue whose usage is below its configured minResources should be
   * reported as starved for min share; once its demand is met it should
   * no longer be starved.
   */
  @Test
  public void testIsStarvedForMinShare() throws Exception {
    Configuration conf = createConfiguration();
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    scheduler.reinitialize(conf, resourceManager.getRMContext());

    // Both queues get a 2048 MB minimum share.
    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<queue name=\"queueA\">");
    out.println("<minResources>2048</minResources>");
    out.println("</queue>");
    out.println("<queue name=\"queueB\">");
    out.println("<minResources>2048</minResources>");
    out.println("</queue>");
    out.println("</allocations>");
    out.close();

    QueueManager queueManager = scheduler.getQueueManager();
    queueManager.initialize();

    // Add one big node (only care about aggregate capacity)
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

    // Queue A wants 3 * 1024. Node update gives this all to A
    createSchedulingRequest(3 * 1024, "queueA", "user1");
    scheduler.update();
    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
        new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
    scheduler.handle(nodeEvent2);

    // Queue B arrives and wants 1 * 1024
    createSchedulingRequest(1 * 1024, "queueB", "user1");
    scheduler.update();
    Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
    assertEquals(3, queues.size());

    // Queue A should be above min share, B below.
    for (FSQueue p : queues) {
      if (p.getName().equals("queueA")) {
        assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
      }
      else if (p.getName().equals("queueB")) {
        assertEquals(true, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
      }
    }

    // Node checks in again, should allocate for B
    scheduler.handle(nodeEvent2);
    // Now B should have min share ( = demand here)
    for (FSQueue p : queues) {
      if (p.getName().equals("queueB")) {
        assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
      }
    }
  }
| |
  /**
   * A queue whose usage lags its weighted fair share should be reported as
   * starved for fair share; once its entire demand is satisfied it should
   * no longer be starved.
   */
  @Test
  public void testIsStarvedForFairShare() throws Exception {
    Configuration conf = createConfiguration();
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    scheduler.reinitialize(conf, resourceManager.getRMContext());

    // queueA carries a quarter of the weight, queueB three quarters.
    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<queue name=\"queueA\">");
    out.println("<weight>.25</weight>");
    out.println("</queue>");
    out.println("<queue name=\"queueB\">");
    out.println("<weight>.75</weight>");
    out.println("</queue>");
    out.println("</allocations>");
    out.close();

    QueueManager queueManager = scheduler.getQueueManager();
    queueManager.initialize();

    // Add one big node (only care about aggregate capacity)
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

    // Queue A wants 3 * 1024. Node update gives this all to A
    createSchedulingRequest(3 * 1024, "queueA", "user1");
    scheduler.update();
    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
        new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
    scheduler.handle(nodeEvent2);

    // Queue B arrives and wants 1 * 1024
    createSchedulingRequest(1 * 1024, "queueB", "user1");
    scheduler.update();
    Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
    assertEquals(3, queues.size());

    // Queue A should be above fair share, B below.
    for (FSQueue p : queues) {
      if (p.getName().equals("queueA")) {
        assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
      }
      else if (p.getName().equals("queueB")) {
        assertEquals(true, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
      }
    }

    // Node checks in again, should allocate for B
    scheduler.handle(nodeEvent2);
    // B should not be starved for fair share, since entire demand is
    // satisfied.
    for (FSQueue p : queues) {
      if (p.getName().equals("queueB")) {
        assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
      }
    }
  }
| |
| @Test |
| /** |
| * Make sure containers are chosen to be preempted in the correct order. Right |
| * now this means decreasing order of priority. |
| */ |
| public void testChoiceOfPreemptedContainers() throws Exception { |
| Configuration conf = createConfiguration(); |
| conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); |
| scheduler.reinitialize(conf, resourceManager.getRMContext()); |
| |
| PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); |
| out.println("<?xml version=\"1.0\"?>"); |
| out.println("<allocations>"); |
| out.println("<queue name=\"queueA\">"); |
| out.println("<weight>.25</weight>"); |
| out.println("</queue>"); |
| out.println("<queue name=\"queueB\">"); |
| out.println("<weight>.25</weight>"); |
| out.println("</queue>"); |
| out.println("<queue name=\"queueC\">"); |
| out.println("<weight>.25</weight>"); |
| out.println("</queue>"); |
| out.println("<queue name=\"queueD\">"); |
| out.println("<weight>.25</weight>"); |
| out.println("</queue>"); |
| out.println("</allocations>"); |
| out.close(); |
| |
| QueueManager queueManager = scheduler.getQueueManager(); |
| queueManager.initialize(); |
| |
| // Create four nodes |
| RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024)); |
| NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); |
| scheduler.handle(nodeEvent1); |
| |
| RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024)); |
| NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); |
| scheduler.handle(nodeEvent2); |
| |
| RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024)); |
| NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); |
| scheduler.handle(nodeEvent3); |
| |
| |
| // Queue A and B each request three containers |
| ApplicationAttemptId app1 = |
| createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); |
| ApplicationAttemptId app2 = |
| createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); |
| ApplicationAttemptId app3 = |
| createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); |
| |
| ApplicationAttemptId app4 = |
| createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); |
| ApplicationAttemptId app5 = |
| createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); |
| ApplicationAttemptId app6 = |
| createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); |
| |
| scheduler.update(); |
| |
| // Sufficient node check-ins to fully schedule containers |
| for (int i = 0; i < 2; i++) { |
| NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, |
| new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); |
| scheduler.handle(nodeUpdate1); |
| |
| NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2, |
| new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); |
| scheduler.handle(nodeUpdate2); |
| |
| NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3, |
| new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); |
| scheduler.handle(nodeUpdate3); |
| } |
| |
| assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app3).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app6).getLiveContainers().size()); |
| |
| // Now new requests arrive from queues C and D |
| ApplicationAttemptId app7 = |
| createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); |
| ApplicationAttemptId app8 = |
| createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); |
| ApplicationAttemptId app9 = |
| createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); |
| |
| ApplicationAttemptId app10 = |
| createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1); |
| ApplicationAttemptId app11 = |
| createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2); |
| ApplicationAttemptId app12 = |
| createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3); |
| |
| scheduler.update(); |
| |
| // We should be able to claw back one container from A and B each. |
| // Make sure it is lowest priority container. |
| scheduler.preemptResources(scheduler.getQueueSchedulables(), |
| Resources.createResource(2 * 1024)); |
| assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); |
| |
| // We should be able to claw back another container from A and B each. |
| // Make sure it is lowest priority container. |
| scheduler.preemptResources(scheduler.getQueueSchedulables(), |
| Resources.createResource(2 * 1024)); |
| assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); |
| |
| // Now A and B are below fair share, so preemption shouldn't do anything |
| scheduler.preemptResources(scheduler.getQueueSchedulables(), |
| Resources.createResource(2 * 1024)); |
| assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); |
| assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size()); |
| assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); |
| } |
| |
  @Test
  /**
   * Tests the timing of the decision to preempt tasks: nothing before the
   * min-share timeout, min share after it, and fair share after the
   * fair-share timeout.
   */
  public void testPreemptionDecision() throws Exception {
    Configuration conf = createConfiguration();
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    // Inject a mock clock so the test can advance time deterministically
    // past the preemption timeouts.
    MockClock clock = new MockClock();
    scheduler.setClock(clock);
    scheduler.reinitialize(conf, resourceManager.getRMContext());

    // Four equally weighted queues, each with a 1GB min share; min-share
    // preemption fires after 5 seconds, fair-share preemption after 10.
    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<queue name=\"queueA\">");
    out.println("<weight>.25</weight>");
    out.println("<minResources>1024</minResources>");
    out.println("</queue>");
    out.println("<queue name=\"queueB\">");
    out.println("<weight>.25</weight>");
    out.println("<minResources>1024</minResources>");
    out.println("</queue>");
    out.println("<queue name=\"queueC\">");
    out.println("<weight>.25</weight>");
    out.println("<minResources>1024</minResources>");
    out.println("</queue>");
    out.println("<queue name=\"queueD\">");
    out.println("<weight>.25</weight>");
    out.println("<minResources>1024</minResources>");
    out.println("</queue>");
    out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
    out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
    out.println("</allocations>");
    out.close();

    QueueManager queueManager = scheduler.getQueueManager();
    queueManager.initialize();

    // Create three nodes of 2GB each (6GB cluster total)
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
    scheduler.handle(nodeEvent2);

    RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
    scheduler.handle(nodeEvent3);


    // Queues A and B each request three 1GB containers, enough to fill the
    // whole cluster between them.
    ApplicationAttemptId app1 =
        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
    ApplicationAttemptId app2 =
        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
    ApplicationAttemptId app3 =
        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);

    ApplicationAttemptId app4 =
        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
    ApplicationAttemptId app5 =
        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
    ApplicationAttemptId app6 =
        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);

    scheduler.update();

    // Sufficient node check-ins to fully schedule containers
    for (int i = 0; i < 2; i++) {
      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
      scheduler.handle(nodeUpdate1);

      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
      scheduler.handle(nodeUpdate2);

      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
      scheduler.handle(nodeUpdate3);
    }

    // Now new requests arrive from queues C and D; the cluster is full, so
    // these queues sit at zero usage, below both min share and fair share.
    ApplicationAttemptId app7 =
        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
    ApplicationAttemptId app8 =
        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
    ApplicationAttemptId app9 =
        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);

    ApplicationAttemptId app10 =
        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
    ApplicationAttemptId app11 =
        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
    ApplicationAttemptId app12 =
        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);

    scheduler.update();

    FSQueueSchedulable schedC =
        scheduler.getQueueManager().getQueue("queueC").getQueueSchedulable();
    FSQueueSchedulable schedD =
        scheduler.getQueueManager().getQueue("queueD").getQueueSchedulable();

    // No timeout has elapsed yet, so neither queue wants to preempt anything.
    assertTrue(Resources.equals(
        Resources.none(), scheduler.resToPreempt(schedC, clock.getTime())));
    assertTrue(Resources.equals(
        Resources.none(), scheduler.resToPreempt(schedD, clock.getTime())));
    // After minSharePreemptionTime (5s) has passed, they should want to
    // preempt min share (1024).
    clock.tick(6);
    assertTrue(Resources.equals(
        Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
    assertTrue(Resources.equals(
        Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));

    // After fairSharePreemptionTimeout (10s total) has passed, they should
    // want to preempt their full fair share (6144 / 4 = 1536).
    scheduler.update();
    clock.tick(6);
    assertTrue(Resources.equals(
        Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
    assertTrue(Resources.equals(
        Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
  }
| } |