| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyBoolean; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.doNothing; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.when; |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.DrainDispatcher; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; |
| import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| public class TestReservations { |
| |
| private static final Log LOG = LogFactory.getLog(TestReservations.class); |
| |
| private final RecordFactory recordFactory = RecordFactoryProvider |
| .getRecordFactory(null); |
| |
| RMContext rmContext; |
| RMContext spyRMContext; |
| CapacityScheduler cs; |
| // CapacitySchedulerConfiguration csConf; |
| CapacitySchedulerContext csContext; |
| |
| private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); |
| |
| CSQueue root; |
| Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); |
| Map<String, CSQueue> oldQueues = new HashMap<String, CSQueue>(); |
| |
| final static int GB = 1024; |
| final static String DEFAULT_RACK = "/default"; |
| |
| @Before |
| public void setUp() throws Exception { |
| CapacityScheduler spyCs = new CapacityScheduler(); |
| cs = spy(spyCs); |
| rmContext = TestUtils.getMockRMContext(); |
| |
| } |
| |
| private void setup(CapacitySchedulerConfiguration csConf) throws Exception { |
| |
| csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true); |
| final String newRoot = "root" + System.currentTimeMillis(); |
| // final String newRoot = "root"; |
| |
| setupQueueConfiguration(csConf, newRoot); |
| YarnConfiguration conf = new YarnConfiguration(); |
| cs.setConf(conf); |
| |
| csContext = mock(CapacitySchedulerContext.class); |
| when(csContext.getConfiguration()).thenReturn(csConf); |
| when(csContext.getConf()).thenReturn(conf); |
| when(csContext.getMinimumResourceCapability()).thenReturn( |
| Resources.createResource(GB, 1)); |
| when(csContext.getMaximumResourceCapability()).thenReturn( |
| Resources.createResource(16 * GB, 12)); |
| when(csContext.getClusterResource()).thenReturn( |
| Resources.createResource(100 * 16 * GB, 100 * 12)); |
| when(csContext.getApplicationComparator()).thenReturn( |
| CapacityScheduler.applicationComparator); |
| when(csContext.getQueueComparator()).thenReturn( |
| CapacityScheduler.queueComparator); |
| when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); |
| when(csContext.getRMContext()).thenReturn(rmContext); |
| RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager( |
| conf); |
| containerTokenSecretManager.rollMasterKey(); |
| when(csContext.getContainerTokenSecretManager()).thenReturn( |
| containerTokenSecretManager); |
| |
| root = CapacityScheduler.parseQueue(csContext, csConf, null, |
| CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); |
| |
| spyRMContext = spy(rmContext); |
| when(spyRMContext.getScheduler()).thenReturn(cs); |
| |
| cs.setRMContext(spyRMContext); |
| cs.init(csConf); |
| cs.start(); |
| } |
| |
| private static final String A = "a"; |
| |
| private void setupQueueConfiguration(CapacitySchedulerConfiguration conf, |
| final String newRoot) { |
| |
| // Define top-level queues |
| conf.setQueues(CapacitySchedulerConfiguration.ROOT, |
| new String[] { newRoot }); |
| conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100); |
| conf.setAcl(CapacitySchedulerConfiguration.ROOT, |
| QueueACL.SUBMIT_APPLICATIONS, " "); |
| |
| final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." |
| + newRoot; |
| conf.setQueues(Q_newRoot, new String[] { A }); |
| conf.setCapacity(Q_newRoot, 100); |
| conf.setMaximumCapacity(Q_newRoot, 100); |
| conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " "); |
| |
| final String Q_A = Q_newRoot + "." + A; |
| conf.setCapacity(Q_A, 100f); |
| conf.setMaximumCapacity(Q_A, 100); |
| conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*"); |
| |
| } |
| |
| static LeafQueue stubLeafQueue(LeafQueue queue) { |
| |
| // Mock some methods for ease in these unit tests |
| |
| // 1. LeafQueue.createContainer to return dummy containers |
| doAnswer(new Answer<Container>() { |
| @Override |
| public Container answer(InvocationOnMock invocation) throws Throwable { |
| final FiCaSchedulerApp application = (FiCaSchedulerApp) (invocation |
| .getArguments()[0]); |
| final ContainerId containerId = TestUtils |
| .getMockContainerId(application); |
| |
| Container container = TestUtils.getMockContainer(containerId, |
| ((FiCaSchedulerNode) (invocation.getArguments()[1])).getNodeID(), |
| (Resource) (invocation.getArguments()[2]), |
| ((Priority) invocation.getArguments()[3])); |
| return container; |
| } |
| }).when(queue).createContainer(any(FiCaSchedulerApp.class), |
| any(FiCaSchedulerNode.class), any(Resource.class), any(Priority.class)); |
| |
| // 2. Stub out LeafQueue.parent.completedContainer |
| CSQueue parent = queue.getParent(); |
| doNothing().when(parent).completedContainer(any(Resource.class), |
| any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), |
| any(RMContainer.class), any(ContainerStatus.class), |
| any(RMContainerEventType.class), any(CSQueue.class), anyBoolean()); |
| |
| return queue; |
| } |
| |
| @Test |
| public void testReservation() throws Exception { |
| // Test that we now unreserve and use a node that has space |
| |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| setup(csConf); |
| |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); |
| |
| // Users |
| final String user_0 = "user_0"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = TestUtils |
| .getMockApplicationAttemptId(0, 0); |
| FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class)); |
| |
| a.submitApplicationAttempt(app_0, user_0); |
| |
| final ApplicationAttemptId appAttemptId_1 = TestUtils |
| .getMockApplicationAttemptId(1, 0); |
| FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| a.submitApplicationAttempt(app_1, user_0); |
| |
| // Setup some nodes |
| String host_0 = "host_0"; |
| FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_1 = "host_1"; |
| FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_2 = "host_2"; |
| FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, |
| 8 * GB); |
| |
| when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); |
| when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); |
| when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); |
| |
| final int numNodes = 3; |
| Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests |
| Priority priorityAM = TestUtils.createMockPriority(1); |
| Priority priorityMap = TestUtils.createMockPriority(5); |
| Priority priorityReduce = TestUtils.createMockPriority(10); |
| |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, |
| priorityAM, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, |
| priorityReduce, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, |
| priorityMap, recordFactory))); |
| |
| // Start testing... |
| // Only AM |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(2 * GB, a.getUsedResources().getMemory()); |
| assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(22 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(2 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| |
| // Only 1 map - simulating reduce |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(5 * GB, a.getUsedResources().getMemory()); |
| assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(19 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| |
| // Only 1 map to other node - simulating reduce |
| a.assignContainers(clusterResource, node_1, |
| new ResourceLimits(clusterResource)); |
| assertEquals(8 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(16 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(16 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(null, node_0.getReservedContainer()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); |
| |
| // try to assign reducer (5G on node 0 and should reserve) |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(13 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(5 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(11 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(11 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() |
| .getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); |
| |
| // assign reducer to node 2 |
| a.assignContainers(clusterResource, node_2, |
| new ResourceLimits(clusterResource)); |
| assertEquals(18 * GB, a.getUsedResources().getMemory()); |
| assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(5 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(6 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(6 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() |
| .getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(5 * GB, node_2.getUsedResource().getMemory()); |
| assertEquals(1, app_0.getTotalRequiredResources(priorityReduce)); |
| |
| // node_1 heartbeat and unreserves from node_0 in order to allocate |
| // on node_1 |
| a.assignContainers(clusterResource, node_1, |
| new ResourceLimits(clusterResource)); |
| assertEquals(18 * GB, a.getUsedResources().getMemory()); |
| assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(18 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(6 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(6 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(null, node_0.getReservedContainer()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(8 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(5 * GB, node_2.getUsedResource().getMemory()); |
| assertEquals(0, app_0.getTotalRequiredResources(priorityReduce)); |
| } |
| |
| @Test |
| public void testReservationNoContinueLook() throws Exception { |
| // Test that with reservations-continue-look-all-nodes feature off |
| // we don't unreserve and show we could get stuck |
| |
| queues = new HashMap<String, CSQueue>(); |
| // test that the deadlock occurs when turned off |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| csConf.setBoolean( |
| "yarn.scheduler.capacity.reservations-continue-look-all-nodes", false); |
| setup(csConf); |
| |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); |
| |
| // Users |
| final String user_0 = "user_0"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = TestUtils |
| .getMockApplicationAttemptId(0, 0); |
| FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class)); |
| |
| a.submitApplicationAttempt(app_0, user_0); |
| |
| final ApplicationAttemptId appAttemptId_1 = TestUtils |
| .getMockApplicationAttemptId(1, 0); |
| FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| a.submitApplicationAttempt(app_1, user_0); |
| |
| // Setup some nodes |
| String host_0 = "host_0"; |
| FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_1 = "host_1"; |
| FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_2 = "host_2"; |
| FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, |
| 8 * GB); |
| |
| when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); |
| when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); |
| when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); |
| |
| final int numNodes = 3; |
| Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests |
| Priority priorityAM = TestUtils.createMockPriority(1); |
| Priority priorityMap = TestUtils.createMockPriority(5); |
| Priority priorityReduce = TestUtils.createMockPriority(10); |
| |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, |
| priorityAM, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, |
| priorityReduce, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, |
| priorityMap, recordFactory))); |
| |
| // Start testing... |
| // Only AM |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(2 * GB, a.getUsedResources().getMemory()); |
| assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(22 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(2 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| |
| // Only 1 map - simulating reduce |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(5 * GB, a.getUsedResources().getMemory()); |
| assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(19 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| |
| // Only 1 map to other node - simulating reduce |
| a.assignContainers(clusterResource, node_1, |
| new ResourceLimits(clusterResource)); |
| assertEquals(8 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(16 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(16 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(null, node_0.getReservedContainer()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); |
| |
| // try to assign reducer (5G on node 0 and should reserve) |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(13 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(5 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(11 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(11 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() |
| .getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); |
| |
| // assign reducer to node 2 |
| a.assignContainers(clusterResource, node_2, |
| new ResourceLimits(clusterResource)); |
| assertEquals(18 * GB, a.getUsedResources().getMemory()); |
| assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(5 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(6 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(6 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() |
| .getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(5 * GB, node_2.getUsedResource().getMemory()); |
| assertEquals(1, app_0.getTotalRequiredResources(priorityReduce)); |
| |
| // node_1 heartbeat and won't unreserve from node_0, potentially stuck |
| // if AM doesn't handle |
| a.assignContainers(clusterResource, node_1, |
| new ResourceLimits(clusterResource)); |
| assertEquals(18 * GB, a.getUsedResources().getMemory()); |
| assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(5 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(6 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(6 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() |
| .getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(5 * GB, node_2.getUsedResource().getMemory()); |
| assertEquals(1, app_0.getTotalRequiredResources(priorityReduce)); |
| } |
| |
| @Test |
| public void testAssignContainersNeedToUnreserve() throws Exception { |
| // Test that we now unreserve and use a node that has space |
| |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| setup(csConf); |
| |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); |
| |
| // Users |
| final String user_0 = "user_0"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = TestUtils |
| .getMockApplicationAttemptId(0, 0); |
| FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class)); |
| |
| a.submitApplicationAttempt(app_0, user_0); |
| |
| final ApplicationAttemptId appAttemptId_1 = TestUtils |
| .getMockApplicationAttemptId(1, 0); |
| FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| a.submitApplicationAttempt(app_1, user_0); |
| |
| // Setup some nodes |
| String host_0 = "host_0"; |
| FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_1 = "host_1"; |
| FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, |
| 8 * GB); |
| |
| when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); |
| when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); |
| |
| final int numNodes = 2; |
| Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests |
| Priority priorityAM = TestUtils.createMockPriority(1); |
| Priority priorityMap = TestUtils.createMockPriority(5); |
| Priority priorityReduce = TestUtils.createMockPriority(10); |
| |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, |
| priorityAM, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, |
| priorityReduce, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, |
| priorityMap, recordFactory))); |
| |
| // Start testing... |
| // Only AM |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(2 * GB, a.getUsedResources().getMemory()); |
| assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(14 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(2 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| |
| // Only 1 map - simulating reduce |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(5 * GB, a.getUsedResources().getMemory()); |
| assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(11 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| |
| // Only 1 map to other node - simulating reduce |
| a.assignContainers(clusterResource, node_1, |
| new ResourceLimits(clusterResource)); |
| assertEquals(8 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(8 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(null, node_0.getReservedContainer()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); |
| |
| // try to assign reducer (5G on node 0 and should reserve) |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(13 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(5 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(3 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(3 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() |
| .getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); |
| |
| // could allocate but told need to unreserve first |
| a.assignContainers(clusterResource, node_1, |
| new ResourceLimits(clusterResource)); |
| assertEquals(13 * GB, a.getUsedResources().getMemory()); |
| assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(3 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(3 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(null, node_0.getReservedContainer()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(8 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(1, app_0.getTotalRequiredResources(priorityReduce)); |
| } |
| |
| @Test |
| public void testGetAppToUnreserve() throws Exception { |
| |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| setup(csConf); |
| final String user_0 = "user_0"; |
| final ApplicationAttemptId appAttemptId_0 = TestUtils |
| .getMockApplicationAttemptId(0, 0); |
| LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); |
| FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| |
| String host_0 = "host_0"; |
| FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_1 = "host_1"; |
| FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, |
| 8 * GB); |
| |
| Resource clusterResource = Resources.createResource(2 * 8 * GB); |
| |
| // Setup resource-requests |
| Priority priorityMap = TestUtils.createMockPriority(5); |
| Resource capability = Resources.createResource(2*GB, 0); |
| |
| RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); |
| SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); |
| RMContext rmContext = mock(RMContext.class); |
| ContainerAllocationExpirer expirer = |
| mock(ContainerAllocationExpirer.class); |
| DrainDispatcher drainDispatcher = new DrainDispatcher(); |
| when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); |
| when(rmContext.getDispatcher()).thenReturn(drainDispatcher); |
| when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); |
| when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); |
| ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( |
| app_0.getApplicationId(), 1); |
| ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); |
| Container container = TestUtils.getMockContainer(containerId, |
| node_1.getNodeID(), Resources.createResource(2*GB), priorityMap); |
| RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, |
| node_1.getNodeID(), "user", rmContext); |
| |
| Container container_1 = TestUtils.getMockContainer(containerId, |
| node_0.getNodeID(), Resources.createResource(1*GB), priorityMap); |
| RMContainer rmContainer_1 = new RMContainerImpl(container_1, appAttemptId, |
| node_0.getNodeID(), "user", rmContext); |
| |
| // no reserved containers |
| NodeId unreserveId = |
| app_0.getNodeIdToUnreserve(priorityMap, capability, |
| cs.getResourceCalculator(), clusterResource); |
| assertEquals(null, unreserveId); |
| |
| // no reserved containers - reserve then unreserve |
| app_0.reserve(node_0, priorityMap, rmContainer_1, container_1); |
| app_0.unreserve(node_0, priorityMap); |
| unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability, |
| cs.getResourceCalculator(), clusterResource); |
| assertEquals(null, unreserveId); |
| |
| // no container large enough is reserved |
| app_0.reserve(node_0, priorityMap, rmContainer_1, container_1); |
| unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability, |
| cs.getResourceCalculator(), clusterResource); |
| assertEquals(null, unreserveId); |
| |
| // reserve one that is now large enough |
| app_0.reserve(node_1, priorityMap, rmContainer, container); |
| unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability, |
| cs.getResourceCalculator(), clusterResource); |
| assertEquals(node_1.getNodeID(), unreserveId); |
| } |
| |
| @Test |
| public void testFindNodeToUnreserve() throws Exception { |
| |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| setup(csConf); |
| final String user_0 = "user_0"; |
| final ApplicationAttemptId appAttemptId_0 = TestUtils |
| .getMockApplicationAttemptId(0, 0); |
| LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); |
| FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| |
| String host_1 = "host_1"; |
| FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, |
| 8 * GB); |
| |
| // Setup resource-requests |
| Priority priorityMap = TestUtils.createMockPriority(5); |
| Resource capability = Resources.createResource(2 * GB, 0); |
| |
| RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); |
| SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); |
| RMContext rmContext = mock(RMContext.class); |
| ContainerAllocationExpirer expirer = |
| mock(ContainerAllocationExpirer.class); |
| DrainDispatcher drainDispatcher = new DrainDispatcher(); |
| when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); |
| when(rmContext.getDispatcher()).thenReturn(drainDispatcher); |
| when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); |
| when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); |
| ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( |
| app_0.getApplicationId(), 1); |
| ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); |
| Container container = TestUtils.getMockContainer(containerId, |
| node_1.getNodeID(), Resources.createResource(2*GB), priorityMap); |
| RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, |
| node_1.getNodeID(), "user", rmContext); |
| |
| // nothing reserved |
| boolean res = a.findNodeToUnreserve(csContext.getClusterResource(), |
| node_1, app_0, priorityMap, capability); |
| assertFalse(res); |
| |
| // reserved but scheduler doesn't know about that node. |
| app_0.reserve(node_1, priorityMap, rmContainer, container); |
| node_1.reserveResource(app_0, priorityMap, rmContainer); |
| res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0, |
| priorityMap, capability); |
| assertFalse(res); |
| } |
| |
| @Test |
| public void testAssignToQueue() throws Exception { |
| |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| setup(csConf); |
| |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); |
| |
| // Users |
| final String user_0 = "user_0"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = TestUtils |
| .getMockApplicationAttemptId(0, 0); |
| FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class)); |
| |
| a.submitApplicationAttempt(app_0, user_0); |
| |
| final ApplicationAttemptId appAttemptId_1 = TestUtils |
| .getMockApplicationAttemptId(1, 0); |
| FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| a.submitApplicationAttempt(app_1, user_0); |
| |
| // Setup some nodes |
| String host_0 = "host_0"; |
| FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_1 = "host_1"; |
| FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_2 = "host_2"; |
| FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, |
| 8 * GB); |
| |
| when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); |
| when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); |
| when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); |
| |
| final int numNodes = 2; |
| Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests |
| Priority priorityAM = TestUtils.createMockPriority(1); |
| Priority priorityMap = TestUtils.createMockPriority(5); |
| Priority priorityReduce = TestUtils.createMockPriority(10); |
| |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, |
| priorityAM, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, |
| priorityReduce, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, |
| priorityMap, recordFactory))); |
| |
| // Start testing... |
| // Only AM |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(2 * GB, a.getUsedResources().getMemory()); |
| assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(14 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(2 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| |
| // Only 1 map - simulating reduce |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(5 * GB, a.getUsedResources().getMemory()); |
| assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(11 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| |
| // Only 1 map to other node - simulating reduce |
| a.assignContainers(clusterResource, node_1, |
| new ResourceLimits(clusterResource)); |
| assertEquals(8 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(null, node_0.getReservedContainer()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| |
| // allocate to queue so that the potential new capacity is greater then |
| // absoluteMaxCapacity |
| Resource capability = Resources.createResource(32 * GB, 0); |
| ResourceLimits limits = new ResourceLimits(clusterResource); |
| boolean res = |
| a.canAssignToThisQueue(clusterResource, |
| CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none()); |
| assertFalse(res); |
| assertEquals(limits.getAmountNeededUnreserve(), Resources.none()); |
| |
| |
| // now add in reservations and make sure it continues if config set |
| // allocate to queue so that the potential new capacity is greater then |
| // absoluteMaxCapacity |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(13 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(5 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(3 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(3 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| |
| capability = Resources.createResource(5 * GB, 0); |
| limits = new ResourceLimits(clusterResource); |
| res = |
| a.canAssignToThisQueue(clusterResource, |
| CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources |
| .createResource(5 * GB)); |
| assertTrue(res); |
| // 16GB total, 13GB consumed (8 allocated, 5 reserved). asking for 5GB so we would have to |
| // unreserve 2GB to get the total 5GB needed. |
| // also note vcore checks not enabled |
| assertEquals(Resources.createResource(2 * GB, 3), limits.getAmountNeededUnreserve()); |
| |
| // tell to not check reservations |
| limits = new ResourceLimits(clusterResource); |
| res = |
| a.canAssignToThisQueue(clusterResource, |
| CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none()); |
| assertFalse(res); |
| assertEquals(Resources.none(), limits.getAmountNeededUnreserve()); |
| |
| |
| refreshQueuesTurnOffReservationsContLook(a, csConf); |
| |
| // should return false since reservations continue look is off. |
| limits = new ResourceLimits(clusterResource); |
| res = |
| a.canAssignToThisQueue(clusterResource, |
| CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none()); |
| assertFalse(res); |
| assertEquals(limits.getAmountNeededUnreserve(), Resources.none()); |
| limits = new ResourceLimits(clusterResource); |
| res = |
| a.canAssignToThisQueue(clusterResource, |
| CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources |
| .createResource(5 * GB)); |
| assertFalse(res); |
| assertEquals(Resources.none(), limits.getAmountNeededUnreserve()); |
| } |
| |
| public void refreshQueuesTurnOffReservationsContLook(LeafQueue a, |
| CapacitySchedulerConfiguration csConf) throws Exception { |
| // before reinitialization |
| assertEquals(true, a.getReservationContinueLooking()); |
| assertEquals(true, |
| ((ParentQueue) a.getParent()).getReservationContinueLooking()); |
| |
| csConf.setBoolean( |
| CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false); |
| Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>(); |
| CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null, |
| CapacitySchedulerConfiguration.ROOT, newQueues, queues, |
| TestUtils.spyHook); |
| queues = newQueues; |
| root.reinitialize(newRoot, cs.getClusterResource()); |
| |
| // after reinitialization |
| assertEquals(false, a.getReservationContinueLooking()); |
| assertEquals(false, |
| ((ParentQueue) a.getParent()).getReservationContinueLooking()); |
| } |
| |
| @Test |
| public void testContinueLookingReservationsAfterQueueRefresh() |
| throws Exception { |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| setup(csConf); |
| |
| // Manipulate queue 'e' |
| LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); |
| |
| refreshQueuesTurnOffReservationsContLook(a, csConf); |
| } |
| |
| @Test |
| public void testAssignToUser() throws Exception { |
| |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| setup(csConf); |
| |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); |
| |
| // Users |
| final String user_0 = "user_0"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = TestUtils |
| .getMockApplicationAttemptId(0, 0); |
| FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class)); |
| a.submitApplicationAttempt(app_0, user_0); |
| |
| final ApplicationAttemptId appAttemptId_1 = TestUtils |
| .getMockApplicationAttemptId(1, 0); |
| FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| a.submitApplicationAttempt(app_1, user_0); |
| |
| // Setup some nodes |
| String host_0 = "host_0"; |
| FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_1 = "host_1"; |
| FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_2 = "host_2"; |
| FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, |
| 8 * GB); |
| |
| when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); |
| when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); |
| when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); |
| |
| final int numNodes = 2; |
| Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests |
| Priority priorityAM = TestUtils.createMockPriority(1); |
| Priority priorityMap = TestUtils.createMockPriority(5); |
| Priority priorityReduce = TestUtils.createMockPriority(10); |
| |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, |
| priorityAM, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, |
| priorityMap, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, |
| priorityReduce, recordFactory))); |
| |
| // Start testing... |
| // Only AM |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(2 * GB, a.getUsedResources().getMemory()); |
| assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(14 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(2 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| |
| // Only 1 map - simulating reduce |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(5 * GB, a.getUsedResources().getMemory()); |
| assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(11 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| |
| // Only 1 map to other node - simulating reduce |
| a.assignContainers(clusterResource, node_1, |
| new ResourceLimits(clusterResource)); |
| assertEquals(8 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(null, node_0.getReservedContainer()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| |
| // now add in reservations and make sure it continues if config set |
| // allocate to queue so that the potential new capacity is greater then |
| // absoluteMaxCapacity |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(13 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(5 * GB, app_0.getCurrentReservation().getMemory()); |
| |
| assertEquals(5 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(3 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(3 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| |
| // not over the limit |
| Resource limit = Resources.createResource(14 * GB, 0); |
| ResourceLimits userResourceLimits = new ResourceLimits(clusterResource); |
| boolean res = a.assignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits); |
| assertTrue(res); |
| assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve()); |
| |
| // set limit so it subtracts reservations and it can continue |
| limit = Resources.createResource(12 * GB, 0); |
| userResourceLimits = new ResourceLimits(clusterResource); |
| res = a.assignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits); |
| assertTrue(res); |
| // limit set to 12GB, we are using 13GB (8 allocated, 5 reserved), to get under limit |
| // we need to unreserve 1GB |
| // also note vcore checks not enabled |
| assertEquals(Resources.createResource(1 * GB, 4), |
| userResourceLimits.getAmountNeededUnreserve()); |
| |
| refreshQueuesTurnOffReservationsContLook(a, csConf); |
| userResourceLimits = new ResourceLimits(clusterResource); |
| |
| // should now return false since feature off |
| res = a.assignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits); |
| assertFalse(res); |
| assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve()); |
| } |
| |
| @Test |
| public void testReservationsNoneAvailable() throws Exception { |
| // Test that we now unreserve and use a node that has space |
| |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| setup(csConf); |
| |
| // Manipulate queue 'a' |
| LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); |
| |
| // Users |
| final String user_0 = "user_0"; |
| |
| // Submit applications |
| final ApplicationAttemptId appAttemptId_0 = TestUtils |
| .getMockApplicationAttemptId(0, 0); |
| FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class)); |
| |
| a.submitApplicationAttempt(app_0, user_0); |
| |
| final ApplicationAttemptId appAttemptId_1 = TestUtils |
| .getMockApplicationAttemptId(1, 0); |
| FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, |
| mock(ActiveUsersManager.class), spyRMContext); |
| a.submitApplicationAttempt(app_1, user_0); |
| |
| // Setup some nodes |
| String host_0 = "host_0"; |
| FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_1 = "host_1"; |
| FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, |
| 8 * GB); |
| String host_2 = "host_2"; |
| FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, |
| 8 * GB); |
| |
| when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); |
| when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); |
| when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); |
| |
| final int numNodes = 3; |
| Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); |
| when(csContext.getNumClusterNodes()).thenReturn(numNodes); |
| |
| // Setup resource-requests |
| Priority priorityAM = TestUtils.createMockPriority(1); |
| Priority priorityMap = TestUtils.createMockPriority(5); |
| Priority priorityReduce = TestUtils.createMockPriority(10); |
| Priority priorityLast = TestUtils.createMockPriority(12); |
| |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, |
| priorityAM, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, |
| priorityMap, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true, |
| priorityReduce, recordFactory))); |
| app_0.updateResourceRequests(Collections.singletonList(TestUtils |
| .createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true, |
| priorityLast, recordFactory))); |
| |
| // Start testing... |
| // Only AM |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(2 * GB, a.getUsedResources().getMemory()); |
| assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(22 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(2 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| |
| // Only 1 map - simulating reduce |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(5 * GB, a.getUsedResources().getMemory()); |
| assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(19 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| |
| // Only 1 map to other node - simulating reduce |
| a.assignContainers(clusterResource, node_1, |
| new ResourceLimits(clusterResource)); |
| assertEquals(8 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(16 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(16 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| |
| // try to assign reducer (5G on node 0), but tell it's resource limits < |
| // used (8G) + required (5G). It will not reserved since it has to unreserve |
| // some resource. Even with continous reservation looking, we don't allow |
| // unreserve resource to reserve container. |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(Resources.createResource(10 * GB))); |
| assertEquals(8 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(16 * GB, a.getMetrics().getAvailableMB()); |
| // app_0's headroom = limit (10G) - used (8G) = 2G |
| assertEquals(2 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| |
| // try to assign reducer (5G on node 0), but tell it's resource limits < |
| // used (8G) + required (5G). It will not reserved since it has to unreserve |
| // some resource. Unfortunately, there's nothing to unreserve. |
| a.assignContainers(clusterResource, node_2, |
| new ResourceLimits(Resources.createResource(10 * GB))); |
| assertEquals(8 * GB, a.getUsedResources().getMemory()); |
| assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(16 * GB, a.getMetrics().getAvailableMB()); |
| // app_0's headroom = limit (10G) - used (8G) = 2G |
| assertEquals(2 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(0 * GB, node_2.getUsedResource().getMemory()); |
| |
| // let it assign 5G to node_2 |
| a.assignContainers(clusterResource, node_2, |
| new ResourceLimits(clusterResource)); |
| assertEquals(13 * GB, a.getUsedResources().getMemory()); |
| assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(0 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(11 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(11 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(5 * GB, node_2.getUsedResource().getMemory()); |
| |
| // reserve 8G node_0 |
| a.assignContainers(clusterResource, node_0, |
| new ResourceLimits(clusterResource)); |
| assertEquals(21 * GB, a.getUsedResources().getMemory()); |
| assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(8 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(3 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(3 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(5 * GB, node_2.getUsedResource().getMemory()); |
| |
| // try to assign (8G on node 2). No room to allocate, |
| // continued to try due to having reservation above, |
| // but hits queue limits so can't reserve anymore. |
| a.assignContainers(clusterResource, node_2, |
| new ResourceLimits(clusterResource)); |
| assertEquals(21 * GB, a.getUsedResources().getMemory()); |
| assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); |
| assertEquals(8 * GB, a.getMetrics().getReservedMB()); |
| assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); |
| assertEquals(3 * GB, a.getMetrics().getAvailableMB()); |
| assertEquals(3 * GB, app_0.getHeadroom().getMemory()); |
| assertEquals(5 * GB, node_0.getUsedResource().getMemory()); |
| assertEquals(3 * GB, node_1.getUsedResource().getMemory()); |
| assertEquals(5 * GB, node_2.getUsedResource().getMemory()); |
| } |
| } |