| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocVcores; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupAdditionalQueues; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A3; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A_CAPACITY; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_PATH; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_CAPACITY; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_PATH; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.DEFAULT; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ROOT; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkApplicationResourceUsage; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkNodeResourceUsage; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkPendingResource; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkPendingResourceGreaterThanZero; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMove; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.toSet; |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.waitforNMRegistered; |
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertContainerKilled;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertMemory;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertNoPreemption;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertPreemption;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertTime;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.updateNodeResource;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.waitMemory;
import static org.assertj.core.api.Assertions.assertThat;
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.net.InetSocketAddress; |
| import java.security.PrivilegedAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.BrokenBarrierException; |
| import java.util.concurrent.CyclicBarrier; |
| |
| import org.apache.hadoop.util.Sets; |
| import org.apache.hadoop.service.ServiceStateException; |
| import org.apache.hadoop.yarn.server.api.records.NodeStatus; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.net.NetworkTopology; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.Groups; |
| import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; |
| import org.apache.hadoop.security.TestGroupsCaching; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.yarn.LocalConfigurationProvider; |
| import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerUpdateType; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.PreemptionMessage; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceInformation; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.AsyncDispatcher; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.server.resourcemanager.Application; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockAM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockNM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; |
| import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.Task; |
| import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; |
| import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; |
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; |
| import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceUtils; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| |
| import java.util.function.Supplier; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| public class TestCapacityScheduler { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestCapacityScheduler.class); |
| private final static ContainerUpdates NULL_UPDATE_REQUESTS = |
| new ContainerUpdates(); |
| private ResourceManager resourceManager = null; |
| private RMContext mockContext; |
| |
| private static final double DELTA = 0.0001; |
| |
| @Before |
| public void setUp() throws Exception { |
| resourceManager = createResourceManager(); |
| mockContext = createMockRMContext(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| stopResourceManager(resourceManager); |
| } |
| |
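  // Both init and reinitialize should reject configurations where the
  // minimum allocation is larger than the maximum (memory and vcores).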
  @Test(timeout = 30000)
| public void testConfValidation() throws Exception { |
| CapacityScheduler scheduler = new CapacityScheduler(); |
| scheduler.setRMContext(resourceManager.getRMContext()); |
| Configuration conf = new YarnConfiguration(); |
| |
| setMinAllocMb(conf, 2048); |
| setMaxAllocMb(conf, 1024); |
| try { |
| scheduler.init(conf); |
| fail("Exception is expected because the min memory allocation is" + |
| " larger than the max memory allocation."); |
| } catch (YarnRuntimeException e) { |
| // Exception is expected. |
| assertTrue("The thrown exception is not the expected one.", |
| e.getMessage().startsWith( |
| "Invalid resource scheduler memory")); |
| } |
| |
| conf = new YarnConfiguration(); |
| setMinAllocVcores(conf, 2); |
| setMaxAllocVcores(conf, 1); |
| try { |
| scheduler.reinitialize(conf, mockContext); |
| fail("Exception is expected because the min vcores allocation is" + |
| " larger than the max vcores allocation."); |
| } catch (YarnRuntimeException e) { |
| // Exception is expected. |
| assertTrue("The thrown exception is not the expected one.", |
| e.getMessage().startsWith( |
| "Invalid resource scheduler vcores")); |
| } |
| } |
| |
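  // End-to-end smoke test: two applications on two nodes; verifies that
  // per-application and per-node usage matches the allocations made on each
  // node heartbeat.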
| @Test |
| public void testCapacityScheduler() throws Exception { |
| |
| LOG.info("--- START: testCapacityScheduler ---"); |
| |
| NodeStatus mockNodeStatus = createMockNodeStatus(); |
| |
| // Register node1 |
| String host_0 = "host_0"; |
| NodeManager nm_0 = |
| registerNode(resourceManager, host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, |
| Resources.createResource(4 * GB, 1), mockNodeStatus); |
| |
| // Register node2 |
| String host_1 = "host_1"; |
| NodeManager nm_1 = |
| registerNode(resourceManager, host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, |
| Resources.createResource(2 * GB, 1), mockNodeStatus); |
| |
| // ResourceRequest priorities |
| Priority priority_0 = Priority.newInstance(0); |
| Priority priority_1 = Priority.newInstance(1); |
| |
| // Submit an application |
| Application application_0 = new Application("user_0", "a1", resourceManager); |
| application_0.submit(); |
| |
| application_0.addNodeManager(host_0, 1234, nm_0); |
| application_0.addNodeManager(host_1, 1234, nm_1); |
| |
| Resource capability_0_0 = Resources.createResource(1 * GB, 1); |
| application_0.addResourceRequestSpec(priority_1, capability_0_0); |
| |
| Resource capability_0_1 = Resources.createResource(2 * GB, 1); |
| application_0.addResourceRequestSpec(priority_0, capability_0_1); |
| |
| Task task_0_0 = new Task(application_0, priority_1, |
| new String[] {host_0, host_1}); |
| application_0.addTask(task_0_0); |
| |
| // Submit another application |
| Application application_1 = new Application("user_1", "b2", resourceManager); |
| application_1.submit(); |
| |
| application_1.addNodeManager(host_0, 1234, nm_0); |
| application_1.addNodeManager(host_1, 1234, nm_1); |
| |
| Resource capability_1_0 = Resources.createResource(3 * GB, 1); |
| application_1.addResourceRequestSpec(priority_1, capability_1_0); |
| |
| Resource capability_1_1 = Resources.createResource(2 * GB, 1); |
| application_1.addResourceRequestSpec(priority_0, capability_1_1); |
| |
| Task task_1_0 = new Task(application_1, priority_1, |
| new String[] {host_0, host_1}); |
| application_1.addTask(task_1_0); |
| |
| // Send resource requests to the scheduler |
| application_0.schedule(); |
| application_1.schedule(); |
| |
| // Send a heartbeat to kick the tires on the Scheduler |
| LOG.info("Kick!"); |
| |
| // task_0_0 and task_1_0 allocated, used=4G |
| nodeUpdate(resourceManager, nm_0); |
| |
| // nothing allocated |
| nodeUpdate(resourceManager, nm_1); |
| |
| // Get allocations from the scheduler |
| application_0.schedule(); // task_0_0 |
| checkApplicationResourceUsage(1 * GB, application_0); |
| |
| application_1.schedule(); // task_1_0 |
| checkApplicationResourceUsage(3 * GB, application_1); |
| |
    checkNodeResourceUsage(4 * GB, nm_0); // task_0_0 (1G) and task_1_0 (3G)
    checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available
| |
| LOG.info("Adding new tasks..."); |
| |
| Task task_1_1 = new Task(application_1, priority_0, |
| new String[] {ResourceRequest.ANY}); |
| application_1.addTask(task_1_1); |
| |
| application_1.schedule(); |
| |
| Task task_0_1 = new Task(application_0, priority_0, |
| new String[] {host_0, host_1}); |
| application_0.addTask(task_0_1); |
| |
| application_0.schedule(); |
| |
| // Send a heartbeat to kick the tires on the Scheduler |
| LOG.info("Sending hb from " + nm_0.getHostName()); |
| // nothing new, used=4G |
| nodeUpdate(resourceManager, nm_0); |
| |
| LOG.info("Sending hb from " + nm_1.getHostName()); |
    // task_0_1 is preferred due to locality, used=2G
| nodeUpdate(resourceManager, nm_1); |
| |
| // Get allocations from the scheduler |
| LOG.info("Trying to allocate..."); |
| application_0.schedule(); |
| checkApplicationResourceUsage(1 * GB, application_0); |
| |
| application_1.schedule(); |
| checkApplicationResourceUsage(5 * GB, application_1); |
| |
| nodeUpdate(resourceManager, nm_0); |
| nodeUpdate(resourceManager, nm_1); |
| |
    checkNodeResourceUsage(4 * GB, nm_0);
    checkNodeResourceUsage(2 * GB, nm_1);
| |
| LOG.info("--- END: testCapacityScheduler ---"); |
| } |
| |
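  // With ASSIGN_MULTIPLE_ENABLED=false, at most one container should be
  // allocated per node heartbeat.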
| @Test |
| public void testNotAssignMultiple() throws Exception { |
| LOG.info("--- START: testNotAssignMultiple ---"); |
| ResourceManager rm = new ResourceManager() { |
| @Override |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); |
| mgr.init(getConfig()); |
| return mgr; |
| } |
| }; |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.setBoolean( |
| CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, false); |
| setupQueueConfiguration(csConf); |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| rm.init(conf); |
| rm.getRMContext().getContainerTokenSecretManager().rollMasterKey(); |
| rm.getRMContext().getNMTokenSecretManager().rollMasterKey(); |
| ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start(); |
| RMContext mC = mock(RMContext.class); |
| when(mC.getConfigurationProvider()).thenReturn( |
| new LocalConfigurationProvider()); |
| |
| NodeStatus mockNodeStatus = createMockNodeStatus(); |
| |
| // Register node1 |
| String host0 = "host_0"; |
| NodeManager nm0 = |
| registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, |
| Resources.createResource(10 * GB, 10), mockNodeStatus); |
| |
| // ResourceRequest priorities |
| Priority priority0 = Priority.newInstance(0); |
| Priority priority1 = Priority.newInstance(1); |
| |
| // Submit an application |
| Application application0 = new Application("user_0", "a1", rm); |
| application0.submit(); |
| application0.addNodeManager(host0, 1234, nm0); |
| |
| Resource capability00 = Resources.createResource(1 * GB, 1); |
| application0.addResourceRequestSpec(priority0, capability00); |
| |
| Resource capability01 = Resources.createResource(2 * GB, 1); |
| application0.addResourceRequestSpec(priority1, capability01); |
| |
| Task task00 = |
| new Task(application0, priority0, new String[] {host0}); |
| Task task01 = |
| new Task(application0, priority1, new String[] {host0}); |
| application0.addTask(task00); |
| application0.addTask(task01); |
| |
| // Submit another application |
| Application application1 = new Application("user_1", "b2", rm); |
| application1.submit(); |
| application1.addNodeManager(host0, 1234, nm0); |
| |
| Resource capability10 = Resources.createResource(3 * GB, 1); |
| application1.addResourceRequestSpec(priority0, capability10); |
| |
| Resource capability11 = Resources.createResource(4 * GB, 1); |
| application1.addResourceRequestSpec(priority1, capability11); |
| |
| Task task10 = new Task(application1, priority0, new String[] {host0}); |
| Task task11 = new Task(application1, priority1, new String[] {host0}); |
| application1.addTask(task10); |
| application1.addTask(task11); |
| |
| // Send resource requests to the scheduler |
| application0.schedule(); |
| |
| application1.schedule(); |
| |
| // Send a heartbeat to kick the tires on the Scheduler |
| LOG.info("Kick!"); |
| |
    // task10 allocated, used=3G
| nodeUpdate(rm, nm0); |
| |
| // Get allocations from the scheduler |
| application0.schedule(); |
| application1.schedule(); |
    // Only one task should be scheduled per heartbeat
    checkNodeResourceUsage(3 * GB, nm0); // task10 (3G)
| checkApplicationResourceUsage(0 * GB, application0); |
| checkApplicationResourceUsage(3 * GB, application1); |
| |
| // Another heartbeat |
| nodeUpdate(rm, nm0); |
| application0.schedule(); |
| checkApplicationResourceUsage(1 * GB, application0); |
| application1.schedule(); |
| checkApplicationResourceUsage(3 * GB, application1); |
| checkNodeResourceUsage(4 * GB, nm0); |
| LOG.info("--- END: testNotAssignMultiple ---"); |
| rm.stop(); |
| } |
| |
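  // With ASSIGN_MULTIPLE_ENABLED=true and MAX_ASSIGN_PER_HEARTBEAT=2, up to
  // two containers should be allocated per node heartbeat.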
| @Test |
| public void testAssignMultiple() throws Exception { |
| LOG.info("--- START: testAssignMultiple ---"); |
| ResourceManager rm = new ResourceManager() { |
| @Override |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); |
| mgr.init(getConfig()); |
| return mgr; |
| } |
| }; |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.setBoolean( |
| CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true); |
| // Each heartbeat will assign 2 containers at most |
| csConf.setInt(CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, 2); |
| setupQueueConfiguration(csConf); |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| rm.init(conf); |
| rm.getRMContext().getContainerTokenSecretManager().rollMasterKey(); |
| rm.getRMContext().getNMTokenSecretManager().rollMasterKey(); |
| ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start(); |
| RMContext mC = mock(RMContext.class); |
| when(mC.getConfigurationProvider()).thenReturn( |
| new LocalConfigurationProvider()); |
| |
| NodeStatus mockNodeStatus = createMockNodeStatus(); |
| |
| // Register node1 |
| String host0 = "host_0"; |
| NodeManager nm0 = |
| registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, |
| Resources.createResource(10 * GB, 10), mockNodeStatus); |
| |
| // ResourceRequest priorities |
| Priority priority0 = Priority.newInstance(0); |
| Priority priority1 = Priority.newInstance(1); |
| |
| // Submit an application |
| Application application0 = new Application("user_0", "a1", rm); |
| application0.submit(); |
| application0.addNodeManager(host0, 1234, nm0); |
| |
| Resource capability00 = Resources.createResource(1 * GB, 1); |
| application0.addResourceRequestSpec(priority0, capability00); |
| |
| Resource capability01 = Resources.createResource(2 * GB, 1); |
| application0.addResourceRequestSpec(priority1, capability01); |
| |
| Task task00 = new Task(application0, priority0, new String[] {host0}); |
| Task task01 = new Task(application0, priority1, new String[] {host0}); |
| application0.addTask(task00); |
| application0.addTask(task01); |
| |
| // Submit another application |
| Application application1 = new Application("user_1", "b2", rm); |
| application1.submit(); |
| application1.addNodeManager(host0, 1234, nm0); |
| |
| Resource capability10 = Resources.createResource(3 * GB, 1); |
| application1.addResourceRequestSpec(priority0, capability10); |
| |
| Resource capability11 = Resources.createResource(4 * GB, 1); |
| application1.addResourceRequestSpec(priority1, capability11); |
| |
| Task task10 = |
| new Task(application1, priority0, new String[] {host0}); |
| Task task11 = |
| new Task(application1, priority1, new String[] {host0}); |
| application1.addTask(task10); |
| application1.addTask(task11); |
| |
| // Send resource requests to the scheduler |
| application0.schedule(); |
| |
| application1.schedule(); |
| |
| // Send a heartbeat to kick the tires on the Scheduler |
| LOG.info("Kick!"); |
| |
    // task00 (1G) and task10 (3G) allocated, used=4G
| nodeUpdate(rm, nm0); |
| |
| // Get allocations from the scheduler |
| application0.schedule(); |
| application1.schedule(); |
    // Up to two tasks per heartbeat should be scheduled
    checkNodeResourceUsage(4 * GB, nm0); // task00 (1G) + task10 (3G)
| checkApplicationResourceUsage(1 * GB, application0); |
| checkApplicationResourceUsage(3 * GB, application1); |
| |
| // Another heartbeat |
| nodeUpdate(rm, nm0); |
| application0.schedule(); |
| checkApplicationResourceUsage(3 * GB, application0); |
| application1.schedule(); |
| checkApplicationResourceUsage(7 * GB, application1); |
| checkNodeResourceUsage(10 * GB, nm0); |
| LOG.info("--- END: testAssignMultiple ---"); |
| rm.stop(); |
| } |
| |
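  // Maximum capacity defaults to MAXIMUM_CAPACITY_VALUE, can be overridden,
  // and setting it to -1 restores the default.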
| @Test |
| public void testMaximumCapacitySetup() { |
| float delta = 0.0000001f; |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); |
| assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE, |
| conf.getNonLabeledQueueMaximumCapacity(A), delta); |
| conf.setMaximumCapacity(A, 50.0f); |
| assertEquals(50.0f, conf.getNonLabeledQueueMaximumCapacity(A), delta); |
| conf.setMaximumCapacity(A, -1); |
| assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE, |
| conf.getNonLabeledQueueMaximumCapacity(A), delta); |
| } |
| |
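  // A per-queue maximum allocation (set on a1) should override the
  // scheduler-wide default, which still applies when no queue is given.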
| @Test |
| public void testQueueMaximumAllocations() { |
| CapacityScheduler scheduler = new CapacityScheduler(); |
| scheduler.setConf(new YarnConfiguration()); |
| scheduler.setRMContext(resourceManager.getRMContext()); |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); |
| |
| setupQueueConfiguration(conf); |
| setMaxAllocMb(conf, A1, 1024); |
| setMaxAllocVcores(conf, A1, 1); |
| |
| scheduler.init(conf); |
| scheduler.start(); |
| |
| Resource maxAllocationForQueue = |
| scheduler.getMaximumResourceCapability("a1"); |
| Resource maxAllocation1 = scheduler.getMaximumResourceCapability(""); |
| Resource maxAllocation2 = scheduler.getMaximumResourceCapability(null); |
| Resource maxAllocation3 = scheduler.getMaximumResourceCapability(); |
| |
| Assert.assertEquals(maxAllocation1, maxAllocation2); |
| Assert.assertEquals(maxAllocation1, maxAllocation3); |
| Assert.assertEquals( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, |
| maxAllocation1.getMemorySize()); |
| Assert.assertEquals( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, |
| maxAllocation1.getVirtualCores()); |
| |
| Assert.assertEquals(1024, maxAllocationForQueue.getMemorySize()); |
| Assert.assertEquals(1, maxAllocationForQueue.getVirtualCores()); |
| scheduler.stop(); |
| } |
| |
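  // Absolute resource capacities ("[memory=...,vcores=...]") should be
  // parsed into the configured minimum resources, per queue and per label.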
| @Test |
| public void testParseQueueWithAbsoluteResource() { |
| String childQueue = "testQueue"; |
| String labelName = "testLabel"; |
| QueuePath childQueuePath = new QueuePath("root." + childQueue); |
| |
| CapacityScheduler cs = new CapacityScheduler(); |
| cs.setConf(new YarnConfiguration()); |
| cs.setRMContext(resourceManager.getRMContext()); |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); |
| |
| conf.setQueues(ROOT, new String[] {childQueue}); |
| conf.setCapacity(childQueuePath, "[memory=20480,vcores=200]"); |
| conf.setAccessibleNodeLabels(childQueuePath, |
| Sets.newHashSet(labelName)); |
| conf.setCapacityByLabel(ROOT, labelName, "[memory=10240,vcores=100]"); |
| conf.setCapacityByLabel(childQueuePath, labelName, |
| "[memory=4096,vcores=10]"); |
| |
| cs.init(conf); |
| cs.start(); |
| |
    Resource rootQueueLabelCapacity =
        cs.getQueue("root").getQueueResourceQuotas()
            .getConfiguredMinResource(labelName);
    assertEquals(10240, rootQueueLabelCapacity.getMemorySize());
    assertEquals(100, rootQueueLabelCapacity.getVirtualCores());
| |
| QueueResourceQuotas childQueueQuotas = |
| cs.getQueue(childQueue).getQueueResourceQuotas(); |
| Resource childQueueCapacity = childQueueQuotas.getConfiguredMinResource(); |
| assertEquals(20480, childQueueCapacity.getMemorySize()); |
| assertEquals(200, childQueueCapacity.getVirtualCores()); |
| |
| Resource childQueueLabelCapacity = |
| childQueueQuotas.getConfiguredMinResource(labelName); |
| assertEquals(4096, childQueueLabelCapacity.getMemorySize()); |
| assertEquals(10, childQueueLabelCapacity.getVirtualCores()); |
| cs.stop(); |
| } |
| |
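  // getQueueInfo should report queue name, path and child queues, and each
  // queue should appear exactly once in the user ACL info.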
| @Test |
| public void testCapacitySchedulerInfo() throws Exception { |
| QueueInfo queueInfo = resourceManager.getResourceScheduler().getQueueInfo("a", true, true); |
| Assert.assertEquals("Queue Name should be a", "a", |
| queueInfo.getQueueName()); |
| Assert.assertEquals("Queue Path should be root.a", "root.a", |
| queueInfo.getQueuePath()); |
| Assert.assertEquals("Child Queues size should be 2", 2, |
| queueInfo.getChildQueues().size()); |
| |
| List<QueueUserACLInfo> userACLInfo = resourceManager.getResourceScheduler().getQueueUserAclInfo(); |
| Assert.assertNotNull(userACLInfo); |
| for (QueueUserACLInfo queueUserACLInfo : userACLInfo) { |
| Assert.assertEquals(1, getQueueCount(userACLInfo, |
| queueUserACLInfo.getQueueName())); |
| } |
| |
| } |
| |
| private int getQueueCount(List<QueueUserACLInfo> queueInformation, String queueName) { |
| int result = 0; |
| for (QueueUserACLInfo queueUserACLInfo : queueInformation) { |
| if (queueName.equals(queueUserACLInfo.getQueueName())) { |
| result++; |
| } |
| } |
| return result; |
| } |
| |
| @Test |
| public void testAllocateReorder() throws Exception { |
| |
| //Confirm that allocation (resource request) alone will trigger a change in |
| //application ordering where appropriate |
| |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| LeafQueue q = (LeafQueue) cs.getQueue("default"); |
| Assert.assertNotNull(q); |
| |
| FairOrderingPolicy fop = new FairOrderingPolicy(); |
| fop.setSizeBasedWeight(true); |
| q.setOrderingPolicy(fop); |
| |
| String host = "127.0.0.1"; |
| RMNode node = |
| MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); |
| cs.handle(new NodeAddedSchedulerEvent(node)); |
| |
| ApplicationAttemptId appAttemptId1 = appHelper(rm, cs, 100, 1, "default", "user"); |
| ApplicationAttemptId appAttemptId2 = appHelper(rm, cs, 100, 2, "default", "user"); |
| |
| RecordFactory recordFactory = |
| RecordFactoryProvider.getRecordFactory(null); |
| |
| Priority priority = TestUtils.createMockPriority(1); |
| ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory); |
| |
| //This will allocate for app1 |
| cs.allocate(appAttemptId1, |
| Collections.<ResourceRequest>singletonList(r1), null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS); |
| |
| //And this will result in container assignment for app1 |
| CapacityScheduler.schedule(cs); |
| |
| //Verify that app1 is still first in assignment order |
    //This happens because app2 has no demand (a magnitude of NaN), which
    //results in app1 and app2 being equal in the fairness comparison,
    //falling back to FIFO (start-time) ordering
| assertEquals(q.getOrderingPolicy().getAssignmentIterator( |
| IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(), |
| appAttemptId1.getApplicationId().toString()); |
| |
| //Now, allocate for app2 (this would be the first/AM allocation) |
| ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory); |
| cs.allocate(appAttemptId2, |
| Collections.<ResourceRequest>singletonList(r2), null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS); |
| |
| //In this case we do not perform container assignment because we want to |
| //verify re-ordering based on the allocation alone |
| |
| //Now, the first app for assignment is app2 |
| assertEquals(q.getOrderingPolicy().getAssignmentIterator( |
| IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(), |
| appAttemptId2.getApplicationId().toString()); |
| |
| rm.stop(); |
| } |
| |
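  // Shrink a node's resources below its current usage and verify the
  // over-commit handling: negative available resource, no preemption without
  // a kill timeout, preemption messages with one, and finally a killed
  // container.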
| @Test |
| public void testResourceOverCommit() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| ResourceScheduler scheduler = rm.getResourceScheduler(); |
| |
| MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB); |
| NodeId nmId = nm.getNodeId(); |
| RMApp app = MockRMAppSubmitter.submitWithMemory(2048, rm); |
| // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm |
| nm.nodeHeartbeat(true); |
| RMAppAttempt attempt1 = app.getCurrentAppAttempt(); |
| MockAM am = rm.sendAMLaunched(attempt1.getAppAttemptId()); |
| am.registerAppAttempt(); |
| assertMemory(scheduler, nmId, 2 * GB, 2 * GB); |
| |
| // add request for 1 container of 2 GB |
| am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 1); |
| AllocateResponse alloc1Response = am.schedule(); // send the request |
| |
    // kick the scheduler; the 2 GB container is allocated, 0 GB remaining
| nm.nodeHeartbeat(true); |
| while (alloc1Response.getAllocatedContainers().isEmpty()) { |
| LOG.info("Waiting for containers to be created for app 1..."); |
| Thread.sleep(100); |
| alloc1Response = am.schedule(); |
| } |
| |
| List<Container> allocated1 = alloc1Response.getAllocatedContainers(); |
| assertEquals(1, allocated1.size()); |
| Container c1 = allocated1.get(0); |
| assertEquals(2 * GB, c1.getResource().getMemorySize()); |
| assertEquals(nmId, c1.getNodeId()); |
| |
| // check node report, 4 GB used and 0 GB available |
| assertMemory(scheduler, nmId, 4 * GB, 0); |
| nm.nodeHeartbeat(true); |
| assertEquals(4 * GB, nm.getCapability().getMemorySize()); |
| |
| // update node resource to 2 GB, so resource is over-consumed |
| updateNodeResource(rm, nmId, 2 * GB, 2, -1); |
    // the used resource should still be 4 GB, with negative available resource
| waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000); |
    // check that we did not get a preemption request
| assertNoPreemption(am.schedule().getPreemptionMessage()); |
| |
| // check that the NM got the updated resources |
| nm.nodeHeartbeat(true); |
| assertEquals(2 * GB, nm.getCapability().getMemorySize()); |
| |
| // check container can complete successfully with resource over-commitment |
| ContainerStatus containerStatus = BuilderUtils.newContainerStatus( |
| c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource()); |
| nm.containerStatus(containerStatus); |
| |
| LOG.info("Waiting for containers to be finished for app 1..."); |
| GenericTestUtils.waitFor( |
| () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000); |
| assertEquals(1, am.schedule().getCompletedContainersStatuses().size()); |
| assertMemory(scheduler, nmId, 2 * GB, 0); |
| |
    // verify no NPE is triggered in schedule after the resource is updated
| am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1); |
| AllocateResponse allocResponse2 = am.schedule(); |
| assertTrue("Shouldn't have enough resource to allocate containers", |
| allocResponse2.getAllocatedContainers().isEmpty()); |
| // try 10 times as scheduling is an async process |
| for (int i = 0; i < 10; i++) { |
| Thread.sleep(100); |
| allocResponse2 = am.schedule(); |
| assertTrue("Shouldn't have enough resource to allocate containers", |
| allocResponse2.getAllocatedContainers().isEmpty()); |
| } |
| |
| // increase the resources again to 5 GB to schedule the 3GB container |
| updateNodeResource(rm, nmId, 5 * GB, 2, -1); |
| waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000); |
| |
| // kick the scheduling and check it took effect |
| nm.nodeHeartbeat(true); |
| while (allocResponse2.getAllocatedContainers().isEmpty()) { |
| LOG.info("Waiting for containers to be created for app 1..."); |
| Thread.sleep(100); |
| allocResponse2 = am.schedule(); |
| } |
| assertEquals(1, allocResponse2.getAllocatedContainers().size()); |
| Container c2 = allocResponse2.getAllocatedContainers().get(0); |
| assertEquals(3 * GB, c2.getResource().getMemorySize()); |
| assertEquals(nmId, c2.getNodeId()); |
| assertMemory(scheduler, nmId, 5 * GB, 0); |
| |
| // reduce the resources and trigger a preempt request to the AM for c2 |
| updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000); |
| waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000); |
| |
| PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage(); |
| assertPreemption(c2.getId(), preemptMsg); |
| |
    // increase the resources again; this should stop the container kills
| updateNodeResource(rm, nmId, 5 * GB, 2, -1); |
| waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000); |
| Thread.sleep(3 * 1000); |
| assertMemory(scheduler, nmId, 5 * GB, 0); |
| |
| // reduce the resources again to trigger a preempt request to the AM for c2 |
| long t0 = Time.now(); |
| updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000); |
| waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000); |
| |
| preemptMsg = am.schedule().getPreemptionMessage(); |
| assertPreemption(c2.getId(), preemptMsg); |
| |
| // wait until the scheduler kills the container |
| GenericTestUtils.waitFor(() -> { |
| try { |
| nm.nodeHeartbeat(true); // trigger preemption in the NM |
| } catch (Exception e) { |
| LOG.error("Cannot heartbeat", e); |
| } |
| SchedulerNodeReport report = scheduler.getNodeReport(nmId); |
| return report.getAvailableResource().getMemorySize() > 0; |
| }, 200, 5 * 1000); |
| assertMemory(scheduler, nmId, 2 * GB, 1 * GB); |
| |
| List<ContainerStatus> completedContainers = |
| am.schedule().getCompletedContainersStatuses(); |
| assertEquals(1, completedContainers.size()); |
| ContainerStatus c2status = completedContainers.get(0); |
| assertContainerKilled(c2.getId(), c2status); |
| |
| assertTime(2000, Time.now() - t0); |
| |
| rm.stop(); |
| } |
| |
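  // Register many nodes and drive the scheduling loop directly; it should
  // complete without errors.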
| @Test |
| public void testAsyncScheduling() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| final int NODES = 100; |
| |
| // Register nodes |
| for (int i=0; i < NODES; ++i) { |
| String host = "192.168.1." + i; |
| RMNode node = |
| MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); |
| cs.handle(new NodeAddedSchedulerEvent(node)); |
| } |
| |
| // Now directly exercise the scheduling loop |
| for (int i=0; i < NODES; ++i) { |
| CapacityScheduler.schedule(cs); |
| } |
| rm.stop(); |
| } |
| |
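  // Polls until the app-level and current-attempt preemption metrics match
  // the expected values.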
| private void waitForAppPreemptionInfo(RMApp app, Resource preempted, |
| int numAMPreempted, int numTaskPreempted, |
| Resource currentAttemptPreempted, boolean currentAttemptAMPreempted, |
| int numLatestAttemptTaskPreempted) throws InterruptedException { |
| while (true) { |
| RMAppMetrics appPM = app.getRMAppMetrics(); |
| RMAppAttemptMetrics attemptPM = |
| app.getCurrentAppAttempt().getRMAppAttemptMetrics(); |
| |
| if (appPM.getResourcePreempted().equals(preempted) |
| && appPM.getNumAMContainersPreempted() == numAMPreempted |
| && appPM.getNumNonAMContainersPreempted() == numTaskPreempted |
| && attemptPM.getResourcePreempted().equals(currentAttemptPreempted) |
| && app.getCurrentAppAttempt().getRMAppAttemptMetrics() |
| .getIsPreempted() == currentAttemptAMPreempted |
| && attemptPM.getNumNonAMContainersPreempted() == |
| numLatestAttemptTaskPreempted) { |
| return; |
| } |
| Thread.sleep(500); |
| } |
| } |
| |
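  // Polls until the app's current attempt differs from the given attempt id.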
| private void waitForNewAttemptCreated(RMApp app, |
| ApplicationAttemptId previousAttemptId) throws InterruptedException { |
    while (app.getCurrentAppAttempt().getAppAttemptId()
        .equals(previousAttemptId)) {
| Thread.sleep(500); |
| } |
| } |
| |
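  // Holds the CapacityScheduler lock from another thread and verifies that
  // an allocate call (here, a container release) does not block on it.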
| @Test(timeout = 30000) |
| public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception { |
| final YarnConfiguration conf = new YarnConfiguration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| MyContainerManager containerManager = new MyContainerManager(); |
| final MockRMWithAMS rm = |
| new MockRMWithAMS(conf, containerManager); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("localhost:1234", 5120); |
| |
| Map<ApplicationAccessType, String> acls = |
| new HashMap<ApplicationAccessType, String>(2); |
| acls.put(ApplicationAccessType.VIEW_APP, "*"); |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1024, rm) |
| .withAppName("appname") |
| .withUser("appuser") |
| .withAcls(acls) |
| .build(); |
| RMApp app = MockRMAppSubmitter.submit(rm, data); |
| |
| nm1.nodeHeartbeat(true); |
| |
| RMAppAttempt attempt = app.getCurrentAppAttempt(); |
| ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); |
| int msecToWait = 10000; |
| int msecToSleep = 100; |
| while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED |
| && msecToWait > 0) { |
| LOG.info("Waiting for AppAttempt to reach LAUNCHED state. " |
| + "Current state is " + attempt.getAppAttemptState()); |
| Thread.sleep(msecToSleep); |
| msecToWait -= msecToSleep; |
| } |
| Assert.assertEquals(attempt.getAppAttemptState(), |
| RMAppAttemptState.LAUNCHED); |
| |
| // Create a client to the RM. |
| final YarnRPC rpc = YarnRPC.create(conf); |
| |
| UserGroupInformation currentUser = |
| UserGroupInformation.createRemoteUser(applicationAttemptId.toString()); |
| Credentials credentials = containerManager.getContainerCredentials(); |
| final InetSocketAddress rmBindAddress = |
| rm.getApplicationMasterService().getBindAddress(); |
| Token<? extends TokenIdentifier> amRMToken = |
| MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress, |
| credentials.getAllTokens()); |
| currentUser.addToken(amRMToken); |
| ApplicationMasterProtocol client = |
| currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() { |
| @Override |
| public ApplicationMasterProtocol run() { |
| return (ApplicationMasterProtocol) rpc.getProxy( |
| ApplicationMasterProtocol.class, rmBindAddress, conf); |
| } |
| }); |
| |
| RegisterApplicationMasterRequest request = |
| RegisterApplicationMasterRequest.newInstance("localhost", 12345, ""); |
| client.registerApplicationMaster(request); |
| |
| // Allocate a container |
| List<ResourceRequest> asks = Collections.singletonList( |
| ResourceRequest.newInstance( |
| Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1)); |
| AllocateRequest allocateRequest = |
| AllocateRequest.newInstance(0, 0.0f, asks, null, null); |
| client.allocate(allocateRequest); |
| |
| // Make sure the container is allocated in RM |
| nm1.nodeHeartbeat(true); |
| ContainerId containerId2 = |
| ContainerId.newContainerId(applicationAttemptId, 2); |
| Assert.assertTrue(rm.waitForState(nm1, containerId2, |
| RMContainerState.ALLOCATED)); |
| |
| // Acquire the container |
| allocateRequest = AllocateRequest.newInstance(1, 0.0f, null, null, null); |
| client.allocate(allocateRequest); |
| |
| // Launch the container |
| final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| RMContainer rmContainer = cs.getRMContainer(containerId2); |
| rmContainer.handle( |
| new RMContainerEvent(containerId2, RMContainerEventType.LAUNCHED)); |
| |
| // grab the scheduler lock from another thread |
| // and verify an allocate call in this thread doesn't block on it |
| final CyclicBarrier barrier = new CyclicBarrier(2); |
| Thread otherThread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| synchronized(cs) { |
| try { |
| barrier.await(); |
| barrier.await(); |
| } catch (InterruptedException | BrokenBarrierException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| }); |
| otherThread.start(); |
| barrier.await(); |
| List<ContainerId> release = Collections.singletonList(containerId2); |
| allocateRequest = |
| AllocateRequest.newInstance(2, 0.0f, null, release, null); |
| client.allocate(allocateRequest); |
| barrier.await(); |
| otherThread.join(); |
| |
| rm.stop(); |
| } |
| |
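  // Preemption metrics should accumulate across attempts: non-AM kills count
  // as task preemptions, while killing the AM container counts as an AM
  // preemption and resets the per-attempt numbers.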
| @Test(timeout = 120000) |
| public void testPreemptionInfo() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
    int CONTAINER_MEMORY = 1024;

    // start RM
    MockRM rm1 = new MockRM(conf);
| rm1.start(); |
| |
| // get scheduler |
| CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); |
| |
| // start NM |
| MockNM nm1 = |
| new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); |
| nm1.registerNode(); |
| |
| // create app and launch the AM |
| RMApp app0 = MockRMAppSubmitter.submitWithMemory(CONTAINER_MEMORY, rm1); |
| MockAM am0 = MockRM.launchAM(app0, rm1, nm1); |
| am0.registerAppAttempt(); |
| |
| // get scheduler app |
| FiCaSchedulerApp schedulerAppAttempt = |
| cs.getSchedulerApplications().get(app0.getApplicationId()) |
| .getCurrentAppAttempt(); |
| |
| // allocate some containers and launch them |
| List<Container> allocatedContainers = |
| am0.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); |
| |
| // kill the 3 containers |
| for (Container c : allocatedContainers) { |
| cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); |
| } |
| |
| // check values |
| waitForAppPreemptionInfo(app0, |
| Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3, |
| Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); |
| |
| // kill app0-attempt0 AM container |
| cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0 |
| .getCurrentAppAttempt().getMasterContainer().getId())); |
| |
    // wait for app0's current attempt to fail and a new attempt to be created
| waitForNewAttemptCreated(app0, am0.getApplicationAttemptId()); |
| |
| // check values |
| waitForAppPreemptionInfo(app0, |
| Resource.newInstance(CONTAINER_MEMORY * 4, 4), 1, 3, |
| Resource.newInstance(0, 0), false, 0); |
| |
| // launch app0-attempt1 |
| MockAM am1 = MockRM.launchAM(app0, rm1, nm1); |
| am1.registerAppAttempt(); |
| |
| schedulerAppAttempt = |
| cs.getSchedulerApplications().get(app0.getApplicationId()) |
| .getCurrentAppAttempt(); |
| |
| // allocate some containers and launch them |
| allocatedContainers = |
| am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); |
| for (Container c : allocatedContainers) { |
| cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); |
| } |
| |
| // check values |
| waitForAppPreemptionInfo(app0, |
| Resource.newInstance(CONTAINER_MEMORY * 7, 7), 1, 6, |
| Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); |
| |
| rm1.stop(); |
| } |
| |
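  // After a container is marked killable, its ResourceRequests should be
  // added back to the app so an equivalent container can be allocated again.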
| @Test(timeout = 300000) |
| public void testRecoverRequestAfterPreemption() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| MockRM rm1 = new MockRM(conf); |
| rm1.start(); |
| MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000); |
| RMApp app1 = MockRMAppSubmitter.submitWithMemory(1024, rm1); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); |
| CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); |
| |
| // request a container. |
| am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>()); |
| ContainerId containerId1 = ContainerId.newContainerId( |
| am1.getApplicationAttemptId(), 2); |
| rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED); |
| |
| RMContainer rmContainer = cs.getRMContainer(containerId1); |
| List<ResourceRequest> requests = |
| rmContainer.getContainerRequest().getResourceRequests(); |
| FiCaSchedulerApp app = cs.getApplicationAttempt(am1 |
| .getApplicationAttemptId()); |
| |
| FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode()); |
| for (ResourceRequest request : requests) { |
| // Skip the OffRack and RackLocal resource requests. |
| if (request.getResourceName().equals(node.getRackName()) |
| || request.getResourceName().equals(ResourceRequest.ANY)) { |
| continue; |
| } |
| |
      // The node-local resource request has already been cleared from the RM
      // after allocation.
| Assert.assertEquals(0, |
| app.getOutstandingAsksCount(SchedulerRequestKey.create(request), |
| request.getResourceName())); |
| } |
| |
| // Call killContainer to preempt the container |
| cs.markContainerForKillable(rmContainer); |
| |
| Assert.assertEquals(3, requests.size()); |
| for (ResourceRequest request : requests) { |
| // Resource request must have added back in RM after preempt event |
| // handling. |
| Assert.assertEquals(1, |
| app.getOutstandingAsksCount(SchedulerRequestKey.create(request), |
| request.getResourceName())); |
| } |
| |
| // New container will be allocated and will move to ALLOCATED state |
| ContainerId containerId2 = ContainerId.newContainerId( |
| am1.getApplicationAttemptId(), 3); |
| rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); |
| |
| // allocate container |
| List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(), |
| new ArrayList<ContainerId>()).getAllocatedContainers(); |
| |
    // Now, with the recovered ResourceRequest, a new container is allocated.
    Assert.assertEquals(1, containers.size());
| rm1.stop(); |
| } |
| |
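  // The preemption-disabled flag is inherited from the parent queue unless a
  // queue overrides it explicitly.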
| @Test |
| public void testPreemptionDisabled() throws Exception { |
| CapacityScheduler cs = new CapacityScheduler(); |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); |
| conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); |
| RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, |
| null, new RMContainerTokenSecretManager(conf), |
| new NMTokenSecretManagerInRM(conf), |
| new ClientToAMTokenSecretManagerInRM(), null); |
| setupQueueConfiguration(conf); |
| cs.setConf(new YarnConfiguration()); |
| cs.setRMContext(resourceManager.getRMContext()); |
| cs.init(conf); |
| cs.start(); |
| cs.reinitialize(conf, rmContext); |
| |
| CSQueue rootQueue = cs.getRootQueue(); |
| CSQueue queueB = findQueue(rootQueue, B_PATH); |
| CSQueue queueB2 = findQueue(queueB, B2_PATH); |
| |
| // When preemption turned on for the whole system |
| // (yarn.resourcemanager.scheduler.monitor.enable=true), and with no other |
| // preemption properties set, queue root.b.b2 should be preemptable. |
| assertFalse("queue " + B2 + " should default to preemptable", |
| queueB2.getPreemptionDisabled()); |
| |
| // Disable preemption at the root queue level. |
| // The preemption property should be inherited from root all the |
| // way down so that root.b.b2 should NOT be preemptable. |
| conf.setPreemptionDisabled(rootQueue.getQueuePathObject(), true); |
| cs.reinitialize(conf, rmContext); |
| assertTrue( |
| "queue " + B2 + " should have inherited non-preemptability from root", |
| queueB2.getPreemptionDisabled()); |
| |
| // Enable preemption for root (grandparent) but disable for root.b (parent). |
| // root.b.b2 should inherit property from parent and NOT be preemptable |
| conf.setPreemptionDisabled(rootQueue.getQueuePathObject(), false); |
| conf.setPreemptionDisabled(queueB.getQueuePathObject(), true); |
| cs.reinitialize(conf, rmContext); |
| assertTrue( |
| "queue " + B2 + " should have inherited non-preemptability from parent", |
| queueB2.getPreemptionDisabled()); |
| |
| // When preemption is turned on for root.b.b2, it should be preemptable |
| // even though preemption is disabled on root.b (parent). |
| conf.setPreemptionDisabled(queueB2.getQueuePathObject(), false); |
| cs.reinitialize(conf, rmContext); |
| assertFalse("queue " + B2 + " should have been preemptable", |
| queueB2.getPreemptionDisabled()); |
| cs.stop(); |
| } |
| |
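  // Requests nContainer containers of the given size and waits for each to
  // reach the ALLOCATED state.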
| private void waitContainerAllocated(MockAM am, int mem, int nContainer, |
| int startContainerId, MockRM rm, MockNM nm) throws Exception { |
| for (int cId = startContainerId; cId < startContainerId + nContainer; cId++) { |
| am.allocate("*", mem, 1, new ArrayList<ContainerId>()); |
| ContainerId containerId = |
| ContainerId.newContainerId(am.getApplicationAttemptId(), cId); |
| Assert.assertTrue(rm.waitForState(nm, containerId, |
| RMContainerState.ALLOCATED)); |
| } |
| } |
| |
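| // Verifies that SchedulerRequestKeys and their ResourceRequests are |
| // cleaned out of AppSchedulingInfo as asks are satisfied, zeroed out, |
| // or cancelled by the AM. |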
| @Test |
| public void testSchedulerKeyGarbageCollection() throws Exception { |
| YarnConfiguration conf = |
| new YarnConfiguration(new CapacitySchedulerConfiguration()); |
| conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); |
| |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| HashMap<NodeId, MockNM> nodes = new HashMap<>(); |
| MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm1.getNodeId(), nm1); |
| MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm2.getNodeId(), nm2); |
| MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm3.getNodeId(), nm3); |
| MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm4.getNodeId(), nm4); |
| nm1.registerNode(); |
| nm2.registerNode(); |
| nm3.registerNode(); |
| nm4.registerNode(); |
| |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("default") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| ApplicationAttemptId attemptId = |
| app1.getCurrentAppAttempt().getAppAttemptId(); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); |
| ResourceScheduler scheduler = rm.getResourceScheduler(); |
| |
| // All nodes 1 - 4 will be applicable for scheduling. |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(true); |
| nm3.nodeHeartbeat(true); |
| nm4.nodeHeartbeat(true); |
| |
| Thread.sleep(1000); |
| |
| AllocateResponse allocateResponse = am1.allocate( |
| Arrays.asList( |
| newResourceRequest(1, 1, ResourceRequest.ANY, |
| Resources.createResource(3 * GB), 1, true, |
| ExecutionType.GUARANTEED), |
| newResourceRequest(2, 2, ResourceRequest.ANY, |
| Resources.createResource(3 * GB), 1, true, |
| ExecutionType.GUARANTEED), |
| newResourceRequest(3, 3, ResourceRequest.ANY, |
| Resources.createResource(3 * GB), 1, true, |
| ExecutionType.GUARANTEED), |
| newResourceRequest(4, 4, ResourceRequest.ANY, |
| Resources.createResource(3 * GB), 1, true, |
| ExecutionType.GUARANTEED) |
| ), |
| null); |
| List<Container> allocatedContainers = allocateResponse |
| .getAllocatedContainers(); |
| Assert.assertEquals(0, allocatedContainers.size()); |
| |
| Collection<SchedulerRequestKey> schedulerKeys = |
| ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId) |
| .getAppSchedulingInfo().getSchedulerKeys(); |
| Assert.assertEquals(4, schedulerKeys.size()); |
| |
| // Get a node to heartbeat, at which point 1 container should be |
| // allocated |
| nm1.nodeHeartbeat(true); |
| Thread.sleep(200); |
| allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); |
| allocatedContainers = allocateResponse.getAllocatedContainers(); |
| Assert.assertEquals(1, allocatedContainers.size()); |
| |
| // Verify 1 outstanding schedulerKey is removed |
| Assert.assertEquals(3, schedulerKeys.size()); |
| |
| List <ResourceRequest> resReqs = |
| ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId) |
| .getAppSchedulingInfo().getAllResourceRequests(); |
| |
| // Verify 1 outstanding schedulerKey is removed from the |
| // rrMap as well |
| Assert.assertEquals(3, resReqs.size()); |
| |
| // Verify one more container allocation on node nm2, |
| // and ensure the outstanding schedulerKeys go down |
| nm2.nodeHeartbeat(true); |
| Thread.sleep(200); |
| |
| // Update the allocate request to send numContainers=0 |
| // for the already-satisfied container |
| allocateResponse = am1.allocate(Arrays.asList( |
| newResourceRequest(1, |
| allocatedContainers.get(0).getAllocationRequestId(), |
| ResourceRequest.ANY, |
| Resources.createResource(3 * GB), 0, true, |
| ExecutionType.GUARANTEED) |
| ), |
| new ArrayList<>()); |
| allocatedContainers = allocateResponse.getAllocatedContainers(); |
| Assert.assertEquals(1, allocatedContainers.size()); |
| |
| // Verify 1 outstanding schedulerKey is removed |
| Assert.assertEquals(2, schedulerKeys.size()); |
| |
| resReqs = ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId) |
| .getAppSchedulingInfo().getAllResourceRequests(); |
| // Verify the map size has not increased due to the 0-container request |
| Assert.assertEquals(2, resReqs.size()); |
| |
| // Now Verify that the AM can cancel 1 Ask: |
| SchedulerRequestKey sk = schedulerKeys.iterator().next(); |
| am1.allocate( |
| Arrays.asList( |
| newResourceRequest(sk.getPriority().getPriority(), |
| sk.getAllocationRequestId(), |
| ResourceRequest.ANY, Resources.createResource(3 * GB), 0, true, |
| ExecutionType.GUARANTEED) |
| ), |
| null); |
| |
| schedulerKeys = |
| ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId) |
| .getAppSchedulingInfo().getSchedulerKeys(); |
| |
| Thread.sleep(200); |
| |
| // Verify 1 outstanding schedulerKey is removed because of the |
| // cancelled ask |
| Assert.assertEquals(1, schedulerKeys.size()); |
| |
| // Now verify that after the next node heartbeat, we allocate |
| // the last schedulerKey |
| nm3.nodeHeartbeat(true); |
| Thread.sleep(200); |
| allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); |
| allocatedContainers = allocateResponse.getAllocatedContainers(); |
| Assert.assertEquals(1, allocatedContainers.size()); |
| |
| // Verify there are no more outstanding schedulerKeys |
| Assert.assertEquals(0, schedulerKeys.size()); |
| resReqs = |
| ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId) |
| .getAppSchedulingInfo().getAllResourceRequests(); |
| Assert.assertEquals(0, resReqs.size()); |
| rm.stop(); |
| } |
| |
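| /** |
| * Builds a ResourceRequest with the given priority, allocation request id, |
| * resource name, capability, container count, relax-locality flag and |
| * execution type. |
| */ |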
| private static ResourceRequest newResourceRequest(int priority, |
| long allocReqId, String rName, Resource resource, int numContainers, |
| boolean relaxLoc, ExecutionType eType) { |
| ResourceRequest rr = ResourceRequest.newInstance( |
| Priority.newInstance(priority), rName, resource, numContainers, |
| relaxLoc, null, ExecutionTypeRequest.newInstance(eType, true)); |
| rr.setAllocationRequestId(allocReqId); |
| return rr; |
| } |
| |
| @Test |
| public void testHierarchyQueuesCurrentLimits() throws Exception { |
| /* |
| * Queue tree: |
| * Root |
| * / \ |
| * A B |
| * / \ / | \ |
| * A1 A2 B1 B2 B3 |
| */ |
| YarnConfiguration conf = |
| new YarnConfiguration( |
| setupQueueConfiguration(new CapacitySchedulerConfiguration())); |
| conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); |
| MockRM rm1 = new MockRM(conf); |
| rm1.start(); |
| MockNM nm1 = |
| new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService()); |
| nm1.registerNode(); |
| |
| MockRMAppSubmissionData data2 = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("b1") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm1, data2); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); |
| |
| waitContainerAllocated(am1, 1 * GB, 1, 2, rm1, nm1); |
| |
| // Maximum resource of b1 is 100 * 0.895 * 0.792 = 71 GB. |
| // 2 GB is already used (1 GB AM + one 1 GB container), so 71 - 2 = 69 GB. |
| Assert.assertEquals(69 * GB, |
| am1.doHeartbeat().getAvailableResources().getMemorySize()); |
| |
| MockRMAppSubmissionData data1 = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("b2") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app2 = MockRMAppSubmitter.submit(rm1, data1); |
| MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); |
| |
| // Allocate 5 containers, each one is 8 GB in am2 (40 GB in total) |
| waitContainerAllocated(am2, 8 * GB, 5, 2, rm1, nm1); |
| |
| // Allocate one more 1 GB container in b1 |
| waitContainerAllocated(am1, 1 * GB, 1, 3, rm1, nm1); |
| |
| // Total is 100 GB, |
| // B2 uses 41 GB (5 * 8GB containers and 1 AM container) |
| // B1 uses 3 GB (2 * 1GB containers and 1 AM container) |
| // Available is 100 - 41 - 3 = 56 GB |
| Assert.assertEquals(56 * GB, |
| am1.doHeartbeat().getAvailableResources().getMemorySize()); |
| |
| // Now submit app3 to a1 (under a different parent queue) to see if the |
| // headroom of app1 (in queue b1) is updated correctly |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("a1") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app3 = MockRMAppSubmitter.submit(rm1, data); |
| MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1); |
| |
| // Allocate 3 containers, each one is 8 GB in am3 (24 GB in total) |
| waitContainerAllocated(am3, 8 * GB, 3, 2, rm1, nm1); |
| |
| // Allocate one more 1 GB container in b1 |
| waitContainerAllocated(am1, 1 * GB, 1, 4, rm1, nm1); |
| |
| // Total is 100 GB, |
| // B2 uses 41 GB (5 * 8GB containers and 1 AM container) |
| // B1 uses 4 GB (3 * 1GB containers and 1 AM container) |
| // A1 uses 25 GB (3 * 8GB containers and 1 AM container) |
| // Available is 100 - 41 - 4 - 25 = 30 GB |
| Assert.assertEquals(30 * GB, |
| am1.doHeartbeat().getAvailableResources().getMemorySize()); |
| rm1.stop(); |
| } |
| |
| @Test |
| public void testParentQueueMaxCapsAreRespected() throws Exception { |
| /* |
| * Queue tree: |
| * Root |
| * / \ |
| * A B |
| * / \ |
| * A1 A2 |
| */ |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| csConf.setQueues(ROOT, new String[] {"a", "b"}); |
| csConf.setCapacity(A, 50); |
| csConf.setMaximumCapacity(A, 50); |
| csConf.setCapacity(B, 50); |
| |
| // Define 2nd-level queues |
| csConf.setQueues(A, new String[] {"a1", "a2"}); |
| csConf.setCapacity(A1, 50); |
| csConf.setUserLimitFactor(A1, 100.0f); |
| csConf.setCapacity(A2, 50); |
| csConf.setUserLimitFactor(A2, 100.0f); |
| csConf.setCapacity(B1, B1_CAPACITY); |
| csConf.setUserLimitFactor(B1, 100.0f); |
| |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); |
| |
| MockRM rm1 = new MockRM(conf); |
| rm1.start(); |
| MockNM nm1 = |
| new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService()); |
| nm1.registerNode(); |
| |
| // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB |
| MockRMAppSubmissionData data1 = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("a1") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm1, data1); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); |
| waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1); |
| |
| // Try to launch app2 in a2 asking for 2 GB; this should succeed |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("a2") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app2 = MockRMAppSubmitter.submit(rm1, data); |
| MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); |
| boolean allocationFailed = false; |
| try { |
| // Try to allocate one more container: a's usage=11G/max=12, |
| // a1's usage=9G/max=12, a2's usage=2G/max=12. |
| // If a2 asks for 2 GB more, the allocation must fail. |
| waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1); |
| } catch (AssertionError failure) { |
| // Expected: queue a's max capacity would be violated. |
| allocationFailed = true; |
| } finally { |
| rm1.stop(); |
| } |
| Assert.assertTrue("Shouldn't successfully allocate containers for am2, " |
| + "queue-a's max capacity will be violated if container allocated", |
| allocationFailed); |
| } |
| |
| @Test |
| public void testQueueHierarchyPendingResourceUpdate() throws Exception { |
| Configuration conf = |
| TestUtils.getConfigurationWithQueueLabels(new Configuration(false)); |
| conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); |
| |
| final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); |
| mgr.init(conf); |
| mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); |
| mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); |
| |
| MockRM rm = new MockRM(conf) { |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return mgr; |
| } |
| }; |
| |
| rm.start(); |
| MockNM nm1 = // label = x |
| new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); |
| nm1.registerNode(); |
| |
| MockNM nm2 = // label = "" |
| new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService()); |
| nm2.registerNode(); |
| |
| // Launch app1 in queue=a1 |
| MockRMAppSubmissionData data1 = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("a1") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data1); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); |
| |
| // Launch app2 in queue=b1 |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("b1") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app2 = MockRMAppSubmitter.submit(rm, data); |
| MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); |
| |
| // am1 asks for 8 * 1GB container for no label |
| am1.allocate(Arrays.asList(ResourceRequest.newInstance( |
| Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)), |
| null); |
| |
| checkPendingResource(rm, "a1", 8 * GB, null); |
| checkPendingResource(rm, "a", 8 * GB, null); |
| checkPendingResource(rm, "root", 8 * GB, null); |
| |
| // am2 asks for 8 * 1GB container for no label |
| am2.allocate(Arrays.asList(ResourceRequest.newInstance( |
| Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)), |
| null); |
| |
| checkPendingResource(rm, "a1", 8 * GB, null); |
| checkPendingResource(rm, "a", 8 * GB, null); |
| checkPendingResource(rm, "b1", 8 * GB, null); |
| checkPendingResource(rm, "b", 8 * GB, null); |
| // root = a + b |
| checkPendingResource(rm, "root", 16 * GB, null); |
| |
| // am2 asks for 8 * 1GB container in another priority for no label |
| am2.allocate(Arrays.asList(ResourceRequest.newInstance( |
| Priority.newInstance(2), "*", Resources.createResource(1 * GB), 8)), |
| null); |
| |
| checkPendingResource(rm, "a1", 8 * GB, null); |
| checkPendingResource(rm, "a", 8 * GB, null); |
| checkPendingResource(rm, "b1", 16 * GB, null); |
| checkPendingResource(rm, "b", 16 * GB, null); |
| // root = a + b |
| checkPendingResource(rm, "root", 24 * GB, null); |
| |
| // am1 now asks for one 4 GB container instead of 8 * 1 GB for priority=1 |
| am1.allocate(Arrays.asList(ResourceRequest.newInstance( |
| Priority.newInstance(1), "*", Resources.createResource(4 * GB), 1)), |
| null); |
| |
| checkPendingResource(rm, "a1", 4 * GB, null); |
| checkPendingResource(rm, "a", 4 * GB, null); |
| checkPendingResource(rm, "b1", 16 * GB, null); |
| checkPendingResource(rm, "b", 16 * GB, null); |
| // root = a + b |
| checkPendingResource(rm, "root", 20 * GB, null); |
| |
| // am1 asks for one 8 GB container with label=x |
| am1.allocate(Arrays.asList(ResourceRequest.newInstance( |
| Priority.newInstance(2), "*", Resources.createResource(8 * GB), 1, |
| true, "x")), null); |
| |
| checkPendingResource(rm, "a1", 4 * GB, null); |
| checkPendingResource(rm, "a", 4 * GB, null); |
| checkPendingResource(rm, "a1", 8 * GB, "x"); |
| checkPendingResource(rm, "a", 8 * GB, "x"); |
| checkPendingResource(rm, "b1", 16 * GB, null); |
| checkPendingResource(rm, "b", 16 * GB, null); |
| // root = a + b |
| checkPendingResource(rm, "root", 20 * GB, null); |
| checkPendingResource(rm, "root", 8 * GB, "x"); |
| |
| // some containers allocated for am1, pending resource should decrease |
| ContainerId containerId = |
| ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); |
| Assert.assertTrue(rm.waitForState(nm1, containerId, |
| RMContainerState.ALLOCATED)); |
| containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); |
| Assert.assertTrue(rm.waitForState(nm2, containerId, |
| RMContainerState.ALLOCATED)); |
| |
| checkPendingResource(rm, "a1", 0 * GB, null); |
| checkPendingResource(rm, "a", 0 * GB, null); |
| checkPendingResource(rm, "a1", 0 * GB, "x"); |
| checkPendingResource(rm, "a", 0 * GB, "x"); |
| // Some containers could have been allocated for am2 while we were |
| // allocating containers for am1, so just check that the pending |
| // resource of b1/b/root is > 0 |
| checkPendingResourceGreaterThanZero(rm, "b1", null); |
| checkPendingResourceGreaterThanZero(rm, "b", null); |
| // root = a + b |
| checkPendingResourceGreaterThanZero(rm, "root", null); |
| checkPendingResource(rm, "root", 0 * GB, "x"); |
| |
| // complete am2, pending resource should be 0 now |
| AppAttemptRemovedSchedulerEvent appRemovedEvent = |
| new AppAttemptRemovedSchedulerEvent( |
| am2.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false); |
| rm.getResourceScheduler().handle(appRemovedEvent); |
| |
| checkPendingResource(rm, "a1", 0 * GB, null); |
| checkPendingResource(rm, "a", 0 * GB, null); |
| checkPendingResource(rm, "a1", 0 * GB, "x"); |
| checkPendingResource(rm, "a", 0 * GB, "x"); |
| checkPendingResource(rm, "b1", 0 * GB, null); |
| checkPendingResource(rm, "b", 0 * GB, null); |
| checkPendingResource(rm, "root", 0 * GB, null); |
| checkPendingResource(rm, "root", 0 * GB, "x"); |
| rm.stop(); |
| } |
| |
| // Verifies the AM used resource for a LeafQueue when the AM |
| // ResourceRequest is less than the minimum allocation |
| @Test(timeout = 30000) |
| public void testAMUsedResource() throws Exception { |
| MockRM rm = setUpMove(); |
| rm.registerNode("127.0.0.1:1234", 4 * GB); |
| |
| Configuration conf = rm.getConfig(); |
| int minAllocMb = |
| conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); |
| int amMemory = 50; |
| assertTrue("AM memory is greater than or equal to minAllocation", |
| amMemory < minAllocMb); |
| Resource minAllocResource = Resource.newInstance(minAllocMb, 1); |
| String queueName = "a1"; |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(amMemory, rm) |
| .withAppName("app-1") |
| .withUser("user_0") |
| .withAcls(null) |
| .withQueue(queueName) |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp rmApp = MockRMAppSubmitter.submit(rm, data); |
| |
| assertEquals("RMApp does not containes minimum allocation", |
| minAllocResource, rmApp.getAMResourceRequests().get(0).getCapability()); |
| |
| ResourceScheduler scheduler = rm.getRMContext().getScheduler(); |
| LeafQueue queueA = |
| (LeafQueue) ((CapacityScheduler) scheduler).getQueue(queueName); |
| assertEquals("Minimum Resource for AM is incorrect", minAllocResource, |
| queueA.getUser("user_0").getResourceUsage().getAMUsed()); |
| rm.stop(); |
| } |
| |
| // Verifies headroom passed to ApplicationMaster has been updated in |
| // RMAppAttemptMetrics |
| @Test |
| public void testApplicationHeadRoom() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| ApplicationId appId = BuilderUtils.newApplicationId(100, 1); |
| ApplicationAttemptId appAttemptId = |
| BuilderUtils.newApplicationAttemptId(appId, 1); |
| |
| RMAppAttemptMetrics attemptMetric = |
| new RMAppAttemptMetrics(appAttemptId, rm.getRMContext()); |
| RMAppImpl app = mock(RMAppImpl.class); |
| when(app.getApplicationId()).thenReturn(appId); |
| RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); |
| Container container = mock(Container.class); |
| when(attempt.getMasterContainer()).thenReturn(container); |
| ApplicationSubmissionContext submissionContext = mock( |
| ApplicationSubmissionContext.class); |
| when(attempt.getSubmissionContext()).thenReturn(submissionContext); |
| when(attempt.getAppAttemptId()).thenReturn(appAttemptId); |
| when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); |
| when(app.getCurrentAppAttempt()).thenReturn(attempt); |
| |
| rm.getRMContext().getRMApps().put(appId, app); |
| |
| SchedulerEvent addAppEvent = |
| new AppAddedSchedulerEvent(appId, "default", "user"); |
| cs.handle(addAppEvent); |
| SchedulerEvent addAttemptEvent = |
| new AppAttemptAddedSchedulerEvent(appAttemptId, false); |
| cs.handle(addAttemptEvent); |
| |
| Allocation allocate = |
| cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(), |
| null, Collections.<ContainerId> emptyList(), null, null, |
| NULL_UPDATE_REQUESTS); |
| |
| Assert.assertNotNull(attempt); |
| |
| Assert |
| .assertEquals(Resource.newInstance(0, 0), allocate.getResourceLimit()); |
| Assert.assertEquals(Resource.newInstance(0, 0), |
| attemptMetric.getApplicationAttemptHeadroom()); |
| |
| // Add a node to cluster |
| Resource newResource = Resource.newInstance(4 * GB, 1); |
| RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1"); |
| cs.handle(new NodeAddedSchedulerEvent(node)); |
| |
| allocate = |
| cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(), |
| null, Collections.<ContainerId> emptyList(), null, null, |
| NULL_UPDATE_REQUESTS); |
| |
| // All resources should be sent as headroom |
| Assert.assertEquals(newResource, allocate.getResourceLimit()); |
| Assert.assertEquals(newResource, |
| attemptMetric.getApplicationAttemptHeadroom()); |
| |
| rm.stop(); |
| } |
| |
| @Test |
| public void testHeadRoomCalculationWithDRC() throws Exception { |
| // Test with a total cluster resource of 20 GB memory and 20 vcores. |
| // The queue where the two apps run has a user limit factor of 0.8. |
| // Allocate 10 GB memory and 1 vcore to app 1: |
| // app 1 should have headroom 20 GB * 0.8 - 10 GB = 6 GB memory |
| // and 20 * 0.8 - 1 = 15 vcores. |
| // Allocate 1 GB memory and 1 vcore to app 2: |
| // app 2 should have headroom 20 GB - 10 - 1 = 9 GB memory |
| // and 20 * 0.8 - 1 = 15 vcores. |
| |
| CapacitySchedulerConfiguration csconf = |
| new CapacitySchedulerConfiguration(); |
| csconf.setResourceComparator(DominantResourceCalculator.class); |
| |
| YarnConfiguration conf = new YarnConfiguration(csconf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| LeafQueue qb = (LeafQueue)cs.getQueue("default"); |
| qb.setUserLimitFactor((float)0.8); |
| |
| ApplicationAttemptId appAttemptId = appHelper(rm, cs, 100, 1, "default", "user1"); |
| ApplicationAttemptId appAttemptId2 = appHelper(rm, cs, 100, 2, "default", "user2"); |
| |
| // Add nodes to the cluster so that it has 20 GB and 20 vcores in total |
| Resource newResource = Resource.newInstance(10 * GB, 10); |
| RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1"); |
| cs.handle(new NodeAddedSchedulerEvent(node)); |
| |
| Resource newResource2 = Resource.newInstance(10 * GB, 10); |
| RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2"); |
| cs.handle(new NodeAddedSchedulerEvent(node2)); |
| |
| FiCaSchedulerApp fiCaApp1 = |
| cs.getSchedulerApplications().get(appAttemptId.getApplicationId()) |
| .getCurrentAppAttempt(); |
| |
| FiCaSchedulerApp fiCaApp2 = |
| cs.getSchedulerApplications().get(appAttemptId2.getApplicationId()) |
| .getCurrentAppAttempt(); |
| Priority u0Priority = TestUtils.createMockPriority(1); |
| RecordFactory recordFactory = |
| RecordFactoryProvider.getRecordFactory(null); |
| |
| // allocate container for app1 with 10GB memory and 1 vcore |
| fiCaApp1.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(ResourceRequest.ANY, 10*GB, 1, true, |
| u0Priority, recordFactory))); |
| cs.handle(new NodeUpdateSchedulerEvent(node)); |
| cs.handle(new NodeUpdateSchedulerEvent(node2)); |
| assertEquals(6*GB, fiCaApp1.getHeadroom().getMemorySize()); |
| assertEquals(15, fiCaApp1.getHeadroom().getVirtualCores()); |
| |
| // allocate container for app2 with 1GB memory and 1 vcore |
| fiCaApp2.updateResourceRequests(Collections.singletonList( |
| TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, |
| u0Priority, recordFactory))); |
| cs.handle(new NodeUpdateSchedulerEvent(node)); |
| cs.handle(new NodeUpdateSchedulerEvent(node2)); |
| assertEquals(9*GB, fiCaApp2.getHeadroom().getMemorySize()); |
| assertEquals(15, fiCaApp2.getHeadroom().getVirtualCores()); |
| rm.stop(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAMLimitUsage() throws Exception { |
| |
| CapacitySchedulerConfiguration config = |
| new CapacitySchedulerConfiguration(); |
| |
| config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, |
| DefaultResourceCalculator.class.getName()); |
| verifyAMLimitForLeafQueue(config); |
| |
| config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, |
| DominantResourceCalculator.class.getName()); |
| verifyAMLimitForLeafQueue(config); |
| } |
| |
| private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm, |
| ApplicationId appId) { |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt(); |
| } |
| |
| @Test |
| public void testPendingResourceUpdatedAccordingToIncreaseRequestChanges() |
| throws Exception { |
| Configuration conf = |
| TestUtils.getConfigurationWithQueueLabels(new Configuration(false)); |
| conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); |
| |
| final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); |
| mgr.init(conf); |
| |
| MockRM rm = new MockRM(conf) { |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return mgr; |
| } |
| }; |
| |
| rm.start(); |
| |
| MockNM nm1 = // label = "" |
| new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); |
| nm1.registerNode(); |
| |
| // Launch app1 in queue=a1 |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("a1") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); |
| |
| // Allocate two more containers |
| am1.allocate( |
| Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), |
| "*", Resources.createResource(2 * GB), 2)), |
| null); |
| ContainerId containerId1 = |
| ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); |
| ContainerId containerId2 = |
| ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); |
| ContainerId containerId3 = |
| ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); |
| Assert.assertTrue(rm.waitForState(nm1, containerId3, |
| RMContainerState.ALLOCATED)); |
| // Acquire them |
| am1.allocate(null, null); |
| sentRMContainerLaunched(rm, |
| ContainerId.newContainerId(am1.getApplicationAttemptId(), 1L)); |
| sentRMContainerLaunched(rm, |
| ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L)); |
| sentRMContainerLaunched(rm, |
| ContainerId.newContainerId(am1.getApplicationAttemptId(), 3L)); |
| |
| // am1 asks to change its AM container from 1GB to 3GB |
| am1.sendContainerResizingRequest(Arrays.asList( |
| UpdateContainerRequest |
| .newInstance(0, containerId1, |
| ContainerUpdateType.INCREASE_RESOURCE, |
| Resources.createResource(3 * GB), null))); |
| |
| FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId()); |
| |
| Assert.assertEquals(2 * GB, |
| app.getAppAttemptResourceUsage().getPending().getMemorySize()); |
| checkPendingResource(rm, "a1", 2 * GB, null); |
| checkPendingResource(rm, "a", 2 * GB, null); |
| checkPendingResource(rm, "root", 2 * GB, null); |
| |
| // am1 asks to change containerId2 (2G -> 3G) and containerId3 (2G -> 5G) |
| am1.sendContainerResizingRequest(Arrays.asList( |
| UpdateContainerRequest |
| .newInstance(0, containerId2, |
| ContainerUpdateType.INCREASE_RESOURCE, |
| Resources.createResource(3 * GB), null), |
| UpdateContainerRequest |
| .newInstance(0, containerId3, |
| ContainerUpdateType.INCREASE_RESOURCE, |
| Resources.createResource(5 * GB), null))); |
| |
| Assert.assertEquals(6 * GB, |
| app.getAppAttemptResourceUsage().getPending().getMemorySize()); |
| checkPendingResource(rm, "a1", 6 * GB, null); |
| checkPendingResource(rm, "a", 6 * GB, null); |
| checkPendingResource(rm, "root", 6 * GB, null); |
| |
| // am1 asks to change containerId1 (1G->3G), containerId2 (2G -> 4G) and |
| // containerId3 (2G -> 2G) |
| am1.sendContainerResizingRequest(Arrays.asList( |
| UpdateContainerRequest |
| .newInstance(0, containerId1, |
| ContainerUpdateType.INCREASE_RESOURCE, |
| Resources.createResource(3 * GB), null), |
| UpdateContainerRequest |
| .newInstance(0, containerId2, |
| ContainerUpdateType.INCREASE_RESOURCE, |
| Resources.createResource(4 * GB), null), |
| UpdateContainerRequest |
| .newInstance(0, containerId3, |
| ContainerUpdateType.INCREASE_RESOURCE, |
| Resources.createResource(2 * GB), null))); |
| Assert.assertEquals(4 * GB, |
| app.getAppAttemptResourceUsage().getPending().getMemorySize()); |
| checkPendingResource(rm, "a1", 4 * GB, null); |
| checkPendingResource(rm, "a", 4 * GB, null); |
| checkPendingResource(rm, "root", 4 * GB, null); |
| rm.stop(); |
| } |
| |
| private void verifyAMLimitForLeafQueue(CapacitySchedulerConfiguration config) |
| throws Exception { |
| MockRM rm = setUpMove(config); |
| int nodeMemory = 4 * GB; |
| rm.registerNode("127.0.0.1:1234", nodeMemory); |
| |
| String queueName = "a1"; |
| String userName = "user_0"; |
| ResourceScheduler scheduler = rm.getRMContext().getScheduler(); |
| LeafQueue queueA = |
| (LeafQueue) ((CapacityScheduler) scheduler).getQueue(queueName); |
| Resource amResourceLimit = queueA.getAMResourceLimit(); |
| |
| Resource amResource1 = |
| Resource.newInstance(amResourceLimit.getMemorySize() + 1024, |
| amResourceLimit.getVirtualCores() + 1); |
| Resource amResource2 = |
| Resource.newInstance(amResourceLimit.getMemorySize() + 2048, |
| amResourceLimit.getVirtualCores() + 1); |
| |
| // Wait for the scheduler to be updated with new node capacity |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| return scheduler.getMaximumResourceCapability().getMemorySize() == nodeMemory; |
| } |
| }, 100, 60 * 1000); |
| |
| MockRMAppSubmitter.submit(rm, |
| MockRMAppSubmissionData.Builder.createWithResource(amResource1, rm) |
| .withResource(amResource1) |
| .withAppName("app-1") |
| .withUser(userName) |
| .withAcls(null) |
| .withQueue(queueName) |
| .build()); |
| |
| MockRMAppSubmitter.submit(rm, |
| MockRMAppSubmissionData.Builder.createWithResource(amResource2, rm) |
| .withResource(amResource2) |
| .withAppName("app-2") |
| .withUser(userName) |
| .withAcls(null) |
| .withQueue(queueName) |
| .build()); |
| |
| // When the AM limit is exceeded, only 1 application will be activated; |
| // all remaining applications will stay pending |
| Assert.assertEquals("PendingApplications should be 1", 1, |
| queueA.getNumPendingApplications()); |
| Assert.assertEquals("Active applications should be 1", 1, |
| queueA.getNumActiveApplications()); |
| |
| Assert.assertEquals("User PendingApplications should be 1", 1, queueA |
| .getUser(userName).getPendingApplications()); |
| Assert.assertEquals("User Active applications should be 1", 1, queueA |
| .getUser(userName).getActiveApplications()); |
| rm.stop(); |
| } |
| |
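| // Drives the given container to the LAUNCHED state directly on the |
| // scheduler's RMContainer, failing the test if the container is unknown. |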
| private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| RMContainer rmContainer = cs.getRMContainer(containerId); |
| if (rmContainer != null) { |
| rmContainer.handle( |
| new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); |
| } else { |
| Assert.fail("Cannot find RMContainer"); |
| } |
| } |
| |
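| // Exercises reservations with the root queue unblocked: once x1 fills |
| // parent p1, x2's 2 GB ask cannot be satisfied (its usage stays 0) until |
| // a container in x1 is released. |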
| @Test |
| public void testCSReservationWithRootUnblocked() throws Exception { |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); |
| conf.setResourceComparator(DominantResourceCalculator.class); |
| setupOtherBlockedQueueConfiguration(conf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| ParentQueue q = (ParentQueue) cs.getQueue("p1"); |
| |
| Assert.assertNotNull(q); |
| String host = "127.0.0.1"; |
| String host1 = "test"; |
| RMNode node = |
| MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host); |
| RMNode node1 = |
| MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1); |
| cs.handle(new NodeAddedSchedulerEvent(node)); |
| cs.handle(new NodeAddedSchedulerEvent(node1)); |
| ApplicationAttemptId appAttemptId1 = |
| appHelper(rm, cs, 100, 1, "x1", "userX1"); |
| ApplicationAttemptId appAttemptId2 = |
| appHelper(rm, cs, 100, 2, "x2", "userX2"); |
| ApplicationAttemptId appAttemptId3 = |
| appHelper(rm, cs, 100, 3, "y1", "userY1"); |
| RecordFactory recordFactory = |
| RecordFactoryProvider.getRecordFactory(null); |
| |
| Priority priority = TestUtils.createMockPriority(1); |
| ResourceRequest y1Req = null; |
| ResourceRequest x1Req = null; |
| ResourceRequest x2Req = null; |
| for (int i = 0; i < 4; i++) { |
| y1Req = TestUtils.createResourceRequest( |
| ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); |
| cs.allocate(appAttemptId3, |
| Collections.<ResourceRequest>singletonList(y1Req), null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS); |
| CapacityScheduler.schedule(cs); |
| } |
| assertEquals("Y1 Used Resource should be 4 GB", 4 * GB, |
| cs.getQueue("y1").getUsedResources().getMemorySize()); |
| assertEquals("P2 Used Resource should be 4 GB", 4 * GB, |
| cs.getQueue("p2").getUsedResources().getMemorySize()); |
| |
| for (int i = 0; i < 7; i++) { |
| x1Req = TestUtils.createResourceRequest( |
| ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); |
| cs.allocate(appAttemptId1, |
| Collections.<ResourceRequest>singletonList(x1Req), null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS); |
| CapacityScheduler.schedule(cs); |
| } |
| assertEquals("X1 Used Resource should be 7 GB", 7 * GB, |
| cs.getQueue("x1").getUsedResources().getMemorySize()); |
| assertEquals("P1 Used Resource should be 7 GB", 7 * GB, |
| cs.getQueue("p1").getUsedResources().getMemorySize()); |
| |
| x2Req = TestUtils.createResourceRequest( |
| ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); |
| cs.allocate(appAttemptId2, |
| Collections.<ResourceRequest>singletonList(x2Req), null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS); |
| CapacityScheduler.schedule(cs); |
| assertEquals("X2 Used Resource should be 0", 0, |
| cs.getQueue("x2").getUsedResources().getMemorySize()); |
| assertEquals("P1 Used Resource should be 7 GB", 7 * GB, |
| cs.getQueue("p1").getUsedResources().getMemorySize()); |
| // This assignment should fail |
| x1Req = TestUtils.createResourceRequest( |
| ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); |
| cs.allocate(appAttemptId1, |
| Collections.<ResourceRequest>singletonList(x1Req), null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS); |
| CapacityScheduler.schedule(cs); |
| assertEquals("X1 Used Resource should be 7 GB", 7 * GB, |
| cs.getQueue("x1").getUsedResources().getMemorySize()); |
| assertEquals("P1 Used Resource should be 7 GB", 7 * GB, |
| cs.getQueue("p1").getUsedResources().getMemorySize()); |
| |
| // This one should go through |
| for (int i = 0; i < 4; i++) { |
| y1Req = TestUtils.createResourceRequest( |
| ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); |
| cs.allocate(appAttemptId3, |
| Collections.<ResourceRequest>singletonList(y1Req), null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS); |
| CapacityScheduler.schedule(cs); |
| } |
| assertEquals("P2 Used Resource should be 8 GB", 8 * GB, |
| cs.getQueue("p2").getUsedResources().getMemorySize()); |
| |
| // Free a container from X1 |
| ContainerId containerId = ContainerId.newContainerId(appAttemptId1, 2); |
| cs.handle(new ContainerExpiredSchedulerEvent(containerId)); |
| |
| // Schedule pending request |
| CapacityScheduler.schedule(cs); |
| assertEquals("X2 Used Resource should be 2 GB", 2 * GB, |
| cs.getQueue("x2").getUsedResources().getMemorySize()); |
| assertEquals("P1 Used Resource should be 8 GB", 8 * GB, |
| cs.getQueue("p1").getUsedResources().getMemorySize()); |
| assertEquals("P2 Used Resource should be 8 GB", 8 * GB, |
| cs.getQueue("p2").getUsedResources().getMemorySize()); |
| assertEquals("Root Used Resource should be 16 GB", 16 * GB, |
| cs.getRootQueue().getUsedResources().getMemorySize()); |
| rm.stop(); |
| } |
| |
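| // Verifies that queue a's pending 2 GB ask, blocked while queue b holds |
| // most of the cluster, is satisfied once containers in b expire. |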
| @Test |
| public void testCSQueueBlocked() throws Exception { |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); |
| setupBlockedQueueConfiguration(conf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| LeafQueue q = (LeafQueue) cs.getQueue("a"); |
| |
| Assert.assertNotNull(q); |
| String host = "127.0.0.1"; |
| String host1 = "test"; |
| RMNode node = |
| MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host); |
| RMNode node1 = |
| MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1); |
| cs.handle(new NodeAddedSchedulerEvent(node)); |
| cs.handle(new NodeAddedSchedulerEvent(node1)); |
| // add apps begin |
| ApplicationAttemptId appAttemptId1 = |
| appHelper(rm, cs, 100, 1, "a", "user1"); |
| ApplicationAttemptId appAttemptId2 = |
| appHelper(rm, cs, 100, 2, "b", "user2"); |
| // add apps end |
| |
| RecordFactory recordFactory = |
| RecordFactoryProvider.getRecordFactory(null); |
| |
| Priority priority = TestUtils.createMockPriority(1); |
| ResourceRequest r1 = TestUtils.createResourceRequest( |
| ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); |
| // This will allocate for app1 |
| cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1), |
| null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS).getContainers().size(); |
| CapacityScheduler.schedule(cs); |
| ResourceRequest r2 = null; |
| for (int i = 0; i < 13; i++) { |
| r2 = TestUtils.createResourceRequest( |
| ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); |
| cs.allocate(appAttemptId2, |
| Collections.<ResourceRequest>singletonList(r2), null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS); |
| CapacityScheduler.schedule(cs); |
| } |
| assertEquals("A Used Resource should be 2 GB", 2 * GB, |
| cs.getQueue("a").getUsedResources().getMemorySize()); |
| assertEquals("B Used Resource should be 13 GB", 13 * GB, |
| cs.getQueue("b").getUsedResources().getMemorySize()); |
| r1 = TestUtils.createResourceRequest( |
| ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); |
| r2 = TestUtils.createResourceRequest( |
| ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); |
| cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1), |
| null, Collections.<ContainerId>emptyList(), |
| null, null, NULL_UPDATE_REQUESTS).getContainers().size(); |
| CapacityScheduler.schedule(cs); |
| |
| cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2), |
| null, Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS); |
| CapacityScheduler.schedule(cs); |
| // Check blocked resource |
| assertEquals("A Used Resource should be 2 GB", 2 * GB, |
| cs.getQueue("a").getUsedResources().getMemorySize()); |
| assertEquals("B Used Resource should be 13 GB", 13 * GB, |
| cs.getQueue("b").getUsedResources().getMemorySize()); |
| |
| ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10); |
| ContainerId containerId2 = ContainerId.newContainerId(appAttemptId2, 11); |
| |
| cs.handle(new ContainerExpiredSchedulerEvent(containerId1)); |
| rm.drainEvents(); |
| CapacityScheduler.schedule(cs); |
| |
| cs.handle(new ContainerExpiredSchedulerEvent(containerId2)); |
| CapacityScheduler.schedule(cs); |
| rm.drainEvents(); |
| |
| assertEquals("A Used Resource should be 4 GB", 4 * GB, |
| cs.getQueue("a").getUsedResources().getMemorySize()); |
| assertEquals("B Used Resource should be 12 GB", 12 * GB, |
| cs.getQueue("b").getUsedResources().getMemorySize()); |
| assertEquals("Used Resource on Root should be 16 GB", 16 * GB, |
| cs.getRootQueue().getUsedResources().getMemorySize()); |
| rm.stop(); |
| } |
| |
| @Test |
| public void testAppAttemptLocalityStatistics() throws Exception { |
| Configuration conf = |
| TestUtils.getConfigurationWithMultipleQueues(new Configuration(false)); |
| conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); |
| |
| final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); |
| mgr.init(conf); |
| |
| MockRM rm = new MockRM(conf) { |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return mgr; |
| } |
| }; |
| |
| rm.start(); |
| MockNM nm1 = |
| new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); |
| nm1.registerNode(); |
| |
| // Launch app1 in queue=a1 |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("a") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| |
| // Got one offswitch request and offswitch allocation |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); |
| |
| // am1 asks for 1 GB resource on h1/default-rack/offswitch |
| am1.allocate(Arrays.asList(ResourceRequest |
| .newInstance(Priority.newInstance(1), "*", |
| Resources.createResource(1 * GB), 2), ResourceRequest |
| .newInstance(Priority.newInstance(1), "/default-rack", |
| Resources.createResource(1 * GB), 2), ResourceRequest |
| .newInstance(Priority.newInstance(1), "h1", |
| Resources.createResource(1 * GB), 1)), null); |
| |
| CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); |
| |
| // Got one nodelocal request and nodelocal allocation |
| cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId())); |
| |
| // Got one nodelocal request and racklocal allocation |
| cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId())); |
| |
| RMAppAttemptMetrics attemptMetrics = rm.getRMContext().getRMApps().get( |
| app1.getApplicationId()).getCurrentAppAttempt() |
| .getRMAppAttemptMetrics(); |
| |
| // We should get one node-local allocation, one rack-local allocation |
| // And one off-switch allocation |
| Assert.assertArrayEquals(new int[][] { { 1, 0, 0 }, { 0, 1, 0 }, { 0, 0, 1 } }, |
| attemptMetrics.getLocalityStatistics()); |
| rm.stop(); |
| } |
| |
| @Test(timeout = 30000) |
| public void testAMLimitDouble() throws Exception { |
| CapacitySchedulerConfiguration config = |
| new CapacitySchedulerConfiguration(); |
| config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, |
| DominantResourceCalculator.class.getName()); |
| CapacitySchedulerConfiguration conf = |
| new CapacitySchedulerConfiguration(config); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| conf.setInt("yarn.scheduler.minimum-allocation-mb", 512); |
| conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| rm.registerNode("127.0.0.1:1234", 10 * GB); |
| rm.registerNode("127.0.0.1:1235", 10 * GB); |
| rm.registerNode("127.0.0.1:1236", 10 * GB); |
| rm.registerNode("127.0.0.1:1237", 10 * GB); |
| ResourceScheduler scheduler = rm.getRMContext().getScheduler(); |
| waitforNMRegistered(scheduler, 4, 5); |
| LeafQueue queueA = |
| (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); |
| Resource amResourceLimit = queueA.getAMResourceLimit(); |
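| // With four 10 GB nodes and the default maximum-am-resource-percent of |
| // 0.1, the expected AM limit is 40 GB * 0.1 = 4096 MB; with the |
| // DominantResourceCalculator the vcore limit likewise resolves to 4. |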
| Assert.assertEquals(4096, amResourceLimit.getMemorySize()); |
| Assert.assertEquals(4, amResourceLimit.getVirtualCores()); |
| rm.stop(); |
| } |
| |
| @Test |
| public void testQueueMappingWithCurrentUserQueueMappingForaGroup() throws |
| Exception { |
| |
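| // User "a1" is statically mapped into group "agroup" below, and the |
| // g:agroup:%user mapping routes apps from that group's members to a |
| // queue named after the submitting user, i.e. queue a1, overriding the |
| // requested "default" queue. |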
| CapacitySchedulerConfiguration config = |
| new CapacitySchedulerConfiguration(); |
| config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| setupQueueConfiguration(config); |
| |
| config.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, |
| TestGroupsCaching.FakeunPrivilegedGroupMapping.class, ShellBasedUnixGroupsMapping.class); |
| config.set(CommonConfigurationKeys.HADOOP_USER_GROUP_STATIC_OVERRIDES, |
| "a1" +"=" + "agroup" + ""); |
| Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(config); |
| |
| config.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, |
| "g:agroup:%user"); |
| |
| MockRM rm = new MockRM(config); |
| rm.start(); |
| CapacityScheduler cs = ((CapacityScheduler) rm.getResourceScheduler()); |
| cs.start(); |
| |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(GB, rm) |
| .withAppName("appname") |
| .withUser("a1") |
| .withAcls(null) |
| .withQueue("default") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app = MockRMAppSubmitter.submit(rm, data); |
| List<ApplicationAttemptId> appsInA1 = cs.getAppsInQueue("a1"); |
| assertEquals(1, appsInA1.size()); |
| rm.stop(); |
| } |
| |
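| // checkAndGetApplicationLifetime caps a requested lifetime at the queue |
| // maximum and falls back to the queue default (or to the maximum when no |
| // default is set) for non-positive requests; a non-positive maximum |
| // leaves the lifetime uncapped. |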
| @Test(timeout = 30000) |
| public void testcheckAndGetApplicationLifetime() throws Exception { |
| long maxLifetime = 10; |
| long defaultLifetime = 5; |
| // positive integer value |
| CapacityScheduler cs = setUpCSQueue(maxLifetime, defaultLifetime); |
| Assert.assertEquals(maxLifetime, |
| cs.checkAndGetApplicationLifetime("default", 100)); |
| Assert.assertEquals(9, cs.checkAndGetApplicationLifetime("default", 9)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", -1)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", 0)); |
| Assert.assertEquals(maxLifetime, |
| cs.getMaximumApplicationLifetime("default")); |
| |
| maxLifetime = -1; |
| defaultLifetime = -1; |
| // test for default values |
| cs = setUpCSQueue(maxLifetime, defaultLifetime); |
| Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", -1)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", 0)); |
| Assert.assertEquals(maxLifetime, |
| cs.getMaximumApplicationLifetime("default")); |
| |
| maxLifetime = 10; |
| defaultLifetime = 10; |
| cs = setUpCSQueue(maxLifetime, defaultLifetime); |
| Assert.assertEquals(maxLifetime, |
| cs.checkAndGetApplicationLifetime("default", 100)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", -1)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", 0)); |
| Assert.assertEquals(maxLifetime, |
| cs.getMaximumApplicationLifetime("default")); |
| |
| maxLifetime = 0; |
| defaultLifetime = 0; |
| cs = setUpCSQueue(maxLifetime, defaultLifetime); |
| Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", -1)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", 0)); |
| |
| maxLifetime = 10; |
| defaultLifetime = -1; |
| cs = setUpCSQueue(maxLifetime, defaultLifetime); |
| Assert.assertEquals(maxLifetime, |
| cs.checkAndGetApplicationLifetime("default", 100)); |
| Assert.assertEquals(maxLifetime, |
| cs.checkAndGetApplicationLifetime("default", -1)); |
| Assert.assertEquals(maxLifetime, |
| cs.checkAndGetApplicationLifetime("default", 0)); |
| |
| maxLifetime = 5; |
| defaultLifetime = 10; |
| try { |
| setUpCSQueue(maxLifetime, defaultLifetime); |
| Assert.fail("Expected to fails since maxLifetime < defaultLifetime."); |
| } catch (ServiceStateException sse) { |
| Throwable rootCause = sse.getCause().getCause(); |
| Assert.assertTrue( |
| rootCause.getMessage().contains("can't exceed maximum lifetime")); |
| } |
| |
| maxLifetime = -1; |
| defaultLifetime = 10; |
| cs = setUpCSQueue(maxLifetime, defaultLifetime); |
| Assert.assertEquals(100, |
| cs.checkAndGetApplicationLifetime("default", 100)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", -1)); |
| Assert.assertEquals(defaultLifetime, |
| cs.checkAndGetApplicationLifetime("default", 0)); |
| } |
| |
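| // Builds a CapacityScheduler over a single "default" queue configured |
| // with the given maximum and default application lifetimes. |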
| private CapacityScheduler setUpCSQueue(long maxLifetime, |
| long defaultLifetime) { |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.setQueues(ROOT, |
| new String[] {"default"}); |
| csConf.setCapacity(DEFAULT, 100); |
| csConf.setMaximumLifetimePerQueue(DEFAULT, maxLifetime); |
| csConf.setDefaultLifetimePerQueue(DEFAULT, defaultLifetime); |
| |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| CapacityScheduler cs = new CapacityScheduler(); |
| |
| RMContext rmContext = TestUtils.getMockRMContext(); |
| cs.setConf(conf); |
| cs.setRMContext(rmContext); |
| cs.init(conf); |
| |
| return cs; |
| } |
| |
| @Test (timeout = 60000) |
| public void testClearRequestsBeforeApplyTheProposal() |
| throws Exception { |
| // init RM & NMs & Nodes |
| final MockRM rm = new MockRM(new CapacitySchedulerConfiguration()); |
| rm.start(); |
| final MockNM nm = rm.registerNode("h1:1234", 200 * GB); |
| |
| // submit app |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(200, rm) |
| .withAppName("app") |
| .withUser("user") |
| .build(); |
| final RMApp app = MockRMAppSubmitter.submit(rm, data); |
| MockRM.launchAndRegisterAM(app, rm, nm); |
| |
| // spy capacity scheduler to handle CapacityScheduler#apply |
| final Priority priority = Priority.newInstance(1); |
| final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| final CapacityScheduler spyCs = Mockito.spy(cs); |
| Mockito.doAnswer(new Answer<Object>() { |
| public Object answer(InvocationOnMock invocation) throws Exception { |
| // clear resource request before applying the proposal for container_2 |
| spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), |
| Arrays.asList(ResourceRequest.newInstance(priority, "*", |
| Resources.createResource(1 * GB), 0)), null, |
| Collections.<ContainerId>emptyList(), null, null, |
| NULL_UPDATE_REQUESTS); |
| // trigger real apply which can raise NPE before YARN-6629 |
| try { |
| FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( |
| app.getCurrentAppAttempt().getAppAttemptId()); |
| schedulerApp.apply((Resource) invocation.getArguments()[0], |
| (ResourceCommitRequest) invocation.getArguments()[1], |
| (Boolean) invocation.getArguments()[2]); |
| // The proposal for the removed request should be rejected |
| Assert.assertEquals(1, schedulerApp.getLiveContainers().size()); |
| } catch (Throwable e) { |
| Assert.fail(); |
| } |
| return null; |
| } |
| }).when(spyCs).tryCommit(Mockito.any(Resource.class), |
| Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean()); |
| |
| // rm allocates container_2 to reproduce the process that can raise NPE |
| spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), |
| Arrays.asList(ResourceRequest.newInstance(priority, "*", |
| Resources.createResource(1 * GB), 1)), null, |
| Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS); |
| spyCs.handle(new NodeUpdateSchedulerEvent( |
| spyCs.getNode(nm.getNodeId()).getRMNode())); |
| rm.stop(); |
| } |
| |
| // Test case for YARN-8528. |
| // Verifies that the ContainerAllocation constants hold correct |
| // values during scheduling. |
| @Test |
| public void testContainerAllocationLocalitySkipped() throws Exception { |
| Assert.assertEquals(AllocationState.APP_SKIPPED, |
| ContainerAllocation.APP_SKIPPED.getAllocationState()); |
| Assert.assertEquals(AllocationState.LOCALITY_SKIPPED, |
| ContainerAllocation.LOCALITY_SKIPPED.getAllocationState()); |
| Assert.assertEquals(AllocationState.PRIORITY_SKIPPED, |
| ContainerAllocation.PRIORITY_SKIPPED.getAllocationState()); |
| Assert.assertEquals(AllocationState.QUEUE_SKIPPED, |
| ContainerAllocation.QUEUE_SKIPPED.getAllocationState()); |
| |
| // init RM & NMs & Nodes |
| final MockRM rm = new MockRM(new CapacitySchedulerConfiguration()); |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| rm.start(); |
| final MockNM nm1 = rm.registerNode("h1:1234", 4 * GB); |
| final MockNM nm2 = rm.registerNode("h2:1234", 6 * GB); // maximum-allocation-mb = 6GB |
| |
| // Submit an app and request resources; container2 is larger than nm1's |
| // total resource, which will trigger a locality skip |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .build(); |
| final RMApp app = MockRMAppSubmitter.submit(rm, data); |
| final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); |
| am.addRequests(new String[] {"*"}, 5 * GB, 1, 1, 2); |
| am.schedule(); |
| |
| // container1 (am) should be acquired, container2 should not |
| RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| cs.handle(new NodeUpdateSchedulerEvent(node1)); |
| ContainerId cid = ContainerId.newContainerId(am.getApplicationAttemptId(), 1L); |
| assertThat(cs.getRMContainer(cid).getState()). |
| isEqualTo(RMContainerState.ACQUIRED); |
| cid = ContainerId.newContainerId(am.getApplicationAttemptId(), 2L); |
| Assert.assertNull(cs.getRMContainer(cid)); |
| |
| Assert.assertEquals(AllocationState.APP_SKIPPED, |
| ContainerAllocation.APP_SKIPPED.getAllocationState()); |
| Assert.assertEquals(AllocationState.LOCALITY_SKIPPED, |
| ContainerAllocation.LOCALITY_SKIPPED.getAllocationState()); |
| Assert.assertEquals(AllocationState.PRIORITY_SKIPPED, |
| ContainerAllocation.PRIORITY_SKIPPED.getAllocationState()); |
| Assert.assertEquals(AllocationState.QUEUE_SKIPPED, |
| ContainerAllocation.QUEUE_SKIPPED.getAllocationState()); |
| rm.stop(); |
| } |
| |
| /** |
| * Tests that CSQueueMetrics are not leaked when the scheduler is |
| * reinitialized with an updated queue hierarchy. |
| * @throws Exception |
| */ |
| @Test |
| public void testCSQueueMetricsDoesNotLeakOnReinit() throws Exception { |
| // Initialize resource map |
| Map<String, ResourceInformation> riMap = new HashMap<>(); |
| |
| // Initialize mandatory resources |
| ResourceInformation memory = |
| ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(), |
| ResourceInformation.MEMORY_MB.getUnits(), |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); |
| ResourceInformation vcores = |
| ResourceInformation.newInstance(ResourceInformation.VCORES.getName(), |
| ResourceInformation.VCORES.getUnits(), |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); |
| riMap.put(ResourceInformation.MEMORY_URI, memory); |
| riMap.put(ResourceInformation.VCORES_URI, vcores); |
| |
| ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); |
| |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.setResourceComparator(DominantResourceCalculator.class); |
| |
| setupQueueConfiguration(csConf); |
| |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| |
| // Don't reset resource types since we have already configured resource |
| // types |
| conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| csConf = new CapacitySchedulerConfiguration(); |
| setupAdditionalQueues(csConf); |
| cs.reinitialize(csConf, cs.getRMContext()); |
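| // setupAdditionalQueues introduces a new leaf a3 under root.a. If the |
| // old metrics survived the reinit, the freshly created a3 metrics would |
| // hang off a stale parent instance; the assertions below catch that. |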
| QueueMetrics a3DefaultPartitionMetrics = QueueMetrics.getQueueMetrics().get( |
| "default.root.a.a3"); |
| |
| Assert.assertSame("Different ParentQueue of siblings is a sign of a memory leak", |
| QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(), |
| QueueMetrics.getQueueMetrics().get("root.a.a3").getParentQueue()); |
| |
| Assert.assertSame("Different ParentQueue of partition metrics is a sign of a memory leak", |
| QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(), |
| a3DefaultPartitionMetrics.getParentQueue()); |
| rm.stop(); |
| } |
| |
| @Test |
| public void testCSQueueMetrics() throws Exception { |
| |
| // Initialize resource map |
| Map<String, ResourceInformation> riMap = new HashMap<>(); |
| |
| // Initialize mandatory resources |
| ResourceInformation memory = |
| ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(), |
| ResourceInformation.MEMORY_MB.getUnits(), |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); |
| ResourceInformation vcores = |
| ResourceInformation.newInstance(ResourceInformation.VCORES.getName(), |
| ResourceInformation.VCORES.getUnits(), |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); |
| riMap.put(ResourceInformation.MEMORY_URI, memory); |
| riMap.put(ResourceInformation.VCORES_URI, vcores); |
| riMap.put(TestQueueMetricsForCustomResources.CUSTOM_RES_1, |
| ResourceInformation.newInstance( |
| TestQueueMetricsForCustomResources.CUSTOM_RES_1, "", 1, 10)); |
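| // Mirroring the mandatory resources above, the custom resource is |
| // registered with a minimum allocation of 1 and a maximum of 10. |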
| |
| ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); |
| |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.setResourceComparator(DominantResourceCalculator.class); |
| |
| csConf.set(YarnConfiguration.RESOURCE_TYPES, |
| TestQueueMetricsForCustomResources.CUSTOM_RES_1); |
| |
| setupQueueConfiguration(csConf); |
| |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| |
| // Don't reset resource types since we have already configured resource |
| // types |
| conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| RMNode n1 = MockNodes.newNodeInfo(0, |
| MockNodes.newResource(50 * GB, 50, |
| ImmutableMap.<String, String> builder() |
| .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1, |
| String.valueOf(1000)) |
| .build()), |
| 1, "n1"); |
| RMNode n2 = MockNodes.newNodeInfo(0, |
| MockNodes.newResource(50 * GB, 50, |
| ImmutableMap.<String, String> builder() |
| .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1, |
| String.valueOf(2000)) |
| .build()), |
| 2, "n2"); |
| cs.handle(new NodeAddedSchedulerEvent(n1)); |
| cs.handle(new NodeAddedSchedulerEvent(n2)); |
| |
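| // The cluster now totals 100 GB, 100 vcores and 3000 units of the |
| // custom resource (1000 from n1 + 2000 from n2); the shares asserted |
| // below are cluster total * absolute queue capacity, rounded down |
| // (e.g. a1: 3000 * 10.5% * 30% = 94.5 -> 94). |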
| Map<String, Long> guaranteedCapA11 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getGuaranteedCapacity(); |
| assertEquals(94, guaranteedCapA11 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| Map<String, Long> maxCapA11 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getMaxCapacity(); |
| assertEquals(3000, maxCapA11 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| |
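| // Memory shares are normalized down to the 1 GB minimum allocation, |
| // with the capacities from setupQueueConfiguration (a = 10.5%, |
| // b = 89.5%, b1 = 79.2% of b): queue a gets 102400 MB * 10.5% = |
| // 10752 MB -> 10240 MB, and b1 gets 102400 * 89.5% * 79.2% -> 71680 MB. |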
| assertEquals(10240, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB()); |
| assertEquals(71680, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB()); |
| assertEquals(102400, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB()); |
| assertEquals(102400, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB()); |
| Map<String, Long> guaranteedCapA = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getGuaranteedCapacity(); |
| assertEquals(314, guaranteedCapA |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| Map<String, Long> maxCapA = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getMaxCapacity(); |
| assertEquals(3000, maxCapA |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| Map<String, Long> guaranteedCapB1 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getGuaranteedCapacity(); |
| assertEquals(2126, guaranteedCapB1 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| Map<String, Long> maxCapB1 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getMaxCapacity(); |
| assertEquals(3000, maxCapB1 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| |
| // Remove a node, metrics should be updated |
| cs.handle(new NodeRemovedSchedulerEvent(n2)); |
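| // With n2 removed the cluster halves to 50 GB / 1000 custom-resource |
| // units, so every share shrinks accordingly (queue a's custom share |
| // becomes floor(1000 * 10.5%) = 104 after float truncation). |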
| assertEquals(5120, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB()); |
| assertEquals(35840, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB()); |
| assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB()); |
| assertEquals(51200, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB()); |
| Map<String, Long> guaranteedCapA1 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getGuaranteedCapacity(); |
| |
| assertEquals(104, guaranteedCapA1 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| Map<String, Long> maxCapA1 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getMaxCapacity(); |
| assertEquals(1000, maxCapA1 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| Map<String, Long> guaranteedCapB11 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getGuaranteedCapacity(); |
| assertEquals(708, guaranteedCapB11 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| Map<String, Long> maxCapB11 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getMaxCapacity(); |
| assertEquals(1000, maxCapB11 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a") |
| .getMetrics()).getGuaranteedCapacity(), DELTA); |
| assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a") |
| .getMetrics()).getGuaranteedAbsoluteCapacity(), DELTA); |
| assertEquals(B1_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("b1") |
| .getMetrics()).getGuaranteedCapacity(), DELTA); |
| assertEquals((B_CAPACITY / 100) * (B1_CAPACITY / 100), ((CSQueueMetrics)cs |
| .getQueue("b1").getMetrics()).getGuaranteedAbsoluteCapacity(), DELTA); |
| assertEquals(1, ((CSQueueMetrics)cs.getQueue("a").getMetrics()) |
| .getMaxCapacity(), DELTA); |
| assertEquals(1, ((CSQueueMetrics)cs.getQueue("a").getMetrics()) |
| .getMaxAbsoluteCapacity(), DELTA); |
| assertEquals(1, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()) |
| .getMaxCapacity(), DELTA); |
| assertEquals(1, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()) |
| .getMaxAbsoluteCapacity(), DELTA); |
| |
| // Add child queue to a, and reinitialize. Metrics should be updated |
| csConf.setQueues(A, new String[] {"a1", "a2", "a3"}); |
| csConf.setCapacity(A2, 29.5f); |
| csConf.setCapacity(A3, 40.5f); |
| csConf.setMaximumCapacity(A3, 50.0f); |
| |
| cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null, |
| null, new RMContainerTokenSecretManager(csConf), |
| new NMTokenSecretManagerInRM(csConf), |
| new ClientToAMTokenSecretManagerInRM(), null)); |
| |
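| // Shares of the new children on the 50 GB cluster, e.g. a3: |
| // 51200 MB * 10.5% * 40.5% = 2177 MB -> normalized down to 2048 MB, |
| // while its 50% maximum capacity yields 25600 MB and 500 custom units. |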
| assertEquals(1024, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getGuaranteedMB()); |
| assertEquals(2048, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getGuaranteedMB()); |
| assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB()); |
| assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB()); |
| |
| Map<String, Long> guaranteedCapA2 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getGuaranteedCapacity(); |
| assertEquals(30, guaranteedCapA2 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| Map<String, Long> maxCapA2 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getMaxCapacity(); |
| assertEquals(1000, maxCapA2 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| |
| Map<String, Long> guaranteedCapA3 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getGuaranteedCapacity(); |
| assertEquals(42, guaranteedCapA3 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| Map<String, Long> maxCapA3 = |
| ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3") |
| .getMetrics()).getQueueMetricsForCustomResources()) |
| .getMaxCapacity(); |
| assertEquals(500, maxCapA3 |
| .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); |
| rm.stop(); |
| } |
| |
| @Test |
| public void testReservedContainerLeakWhenMoveApplication() throws Exception { |
| CapacitySchedulerConfiguration csConf |
| = new CapacitySchedulerConfiguration(); |
| csConf.setQueues(ROOT, new String[] {"a", "b"}); |
| csConf.setCapacity(A, 50); |
| csConf.setMaximumCapacity(A, 100); |
| csConf.setUserLimitFactor(A, 100); |
| csConf.setCapacity(B, 50); |
| csConf.setMaximumCapacity(B, 100); |
| csConf.setUserLimitFactor(B, 100); |
| |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); |
| mgr.init(conf); |
| MockRM rm1 = new MockRM(conf); |
| CapacityScheduler scheduler = (CapacityScheduler) rm1.getResourceScheduler(); |
| rm1.getRMContext().setNodeLabelManager(mgr); |
| rm1.start(); |
| MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8 * GB); |
| MockNM nm2 = rm1.registerNode("127.0.0.2:1234", 8 * GB); |
| /* |
| * Simulation: |
| * app1: 1 AM, 1 running container |
| * app2: 1 AM, 1 reserved container |
| */ |
| // Launch an app in queue a; its AM container should be launched on nm1 |
| MockRMAppSubmissionData submissionData = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) |
| .withAppName("app_1") |
| .withUser("user_1") |
| .withAcls(null) |
| .withQueue("a") |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm1, submissionData); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); |
| |
| // Launch another app in queue a; its AM container should also be launched on nm1 |
| submissionData = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) |
| .withAppName("app_2") |
| .withUser("user_1") |
| .withAcls(null) |
| .withQueue("a") |
| .build(); |
| RMApp app2 = MockRMAppSubmitter.submit(rm1, submissionData); |
| MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); |
| |
| am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>()); |
| // This container request should end up reserved |
| am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>()); |
| |
| RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| // Heartbeat nm1 twice: the first update allocates app1's container, |
| // the second reserves app2's container |
| scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1)); |
| scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1)); |
| |
| FiCaSchedulerApp schedulerApp1 = |
| scheduler.getApplicationAttempt(am1.getApplicationAttemptId()); |
| FiCaSchedulerApp schedulerApp2 = |
| scheduler.getApplicationAttempt(am2.getApplicationAttemptId()); |
| // APP1: 1 AM, 1 allocated container |
| Assert.assertEquals(2, schedulerApp1.getLiveContainers().size()); |
| // APP2: 1 AM, 1 reserved container |
| Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); |
| Assert.assertEquals(1, schedulerApp2.getReservedContainers().size()); |
| // Before moving app2, which holds one reserved container: |
| LeafQueue srcQueue = (LeafQueue) scheduler.getQueue("a"); |
| LeafQueue desQueue = (LeafQueue) scheduler.getQueue("b"); |
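| // Queue a now holds 4 containers (2 AMs, 1 allocated, 1 reserved) and |
| // 10 GB of used resources (1 + 1 + 4 + 4); reservations count as used. |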
| Assert.assertEquals(4, srcQueue.getNumContainers()); |
| Assert.assertEquals(10 * GB, srcQueue.getUsedResources().getMemorySize()); |
| Assert.assertEquals(0, desQueue.getNumContainers()); |
| Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize()); |
| // app1 resource usage: 5 GB used, nothing reserved |
| Assert.assertEquals(5 * GB, |
| schedulerApp1 |
| .getAppAttemptResourceUsage().getAllUsed().getMemorySize()); |
| Assert.assertEquals(0, |
| schedulerApp1.getCurrentReservation().getMemorySize()); |
| // app2 resource usage: 1 GB used, 4 GB reserved |
| Assert.assertEquals(1 * GB, |
| schedulerApp2 |
| .getAppAttemptResourceUsage().getAllUsed().getMemorySize()); |
| Assert.assertEquals(4 * GB, |
| schedulerApp2.getCurrentReservation().getMemorySize()); |
| // Move app2, which still holds the reserved container |
| scheduler.moveApplication(app2.getApplicationId(), "b"); |
| // Keep this kill order: if app1 were killed first, the freed space |
| // would let app2's reserved container be allocated |
| rm1.killApp(app2.getApplicationId()); |
| rm1.killApp(app1.getApplicationId()); |
| // After moving app2 and killing both apps, both queues must be empty: |
| Assert.assertEquals(0, srcQueue.getNumContainers()); |
| Assert.assertEquals(0, desQueue.getNumContainers()); |
| Assert.assertEquals(0, srcQueue.getUsedResources().getMemorySize()); |
| Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize()); |
| rm1.close(); |
| } |
| |
| /** |
| * (YARN-11191) This test ensures that no deadlock occurs while |
| * refreshQueues is called on the PreemptionManager (refresh thread) and |
| * AbstractCSQueue.getTotalKillableResource is called from the scheduler |
| * thread. |
| * |
| * @throws Exception a TestTimedOutException indicates a deadlock |
| */ |
| @Test(timeout = 20000) |
| public void testRefreshQueueWithOpenPreemption() throws Exception { |
| CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); |
| csConf.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT), new String[]{"a"}); |
| QueuePath a = new QueuePath("root.a"); |
| csConf.setCapacity(a, 100); |
| csConf.setQueues(a, new String[]{"b"}); |
| QueuePath b = new QueuePath("root.a.b"); |
| csConf.setCapacity(b, 100); |
| |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); |
| mgr.init(conf); |
| try (MockRM rm = new MockRM(conf)) { |
| CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); |
| PreemptionManager preemptionManager = scheduler.getPreemptionManager(); |
| rm.getRMContext().setNodeLabelManager(mgr); |
| rm.start(); |
| |
| AbstractParentQueue queue = (AbstractParentQueue) scheduler.getQueue("a"); |
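| // Deadlock scenario guarded against: SCHEDULE holds the queue read-lock |
| // and later wants the preemption manager's read-lock, REFRESH holds the |
| // preemption manager's write-lock and wants the queue's read-lock, and |
| // COMPLETE's pending write-lock blocks new readers, closing the cycle |
| // unless the refresh path only tryLocks the queue. |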
| |
| // The scheduler thread holds the queue's read-lock for 5 seconds and |
| // then acquires the preemption manager's read-lock |
| Thread schedulerThread = new Thread(() -> { |
| queue.readLock.lock(); |
| try { |
| Thread.sleep(5 * 1000); |
| preemptionManager.getKillableContainers("a", |
| queue.getDefaultNodeLabelExpression()); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } finally { |
| // Always release the read-lock, even on interruption |
| queue.readLock.unlock(); |
| } |
| }, "SCHEDULE"); |
| |
| // The complete thread locks and unlocks the queue's write-lock after 1 second |
| Thread completeThread = new Thread(() -> { |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| queue.writeLock.lock(); |
| queue.writeLock.unlock(); |
| }, "COMPLETE"); |
| |
| // The refresh thread acquires the preemption manager's write-lock after |
| // 2 seconds; while holding it, it calls getChildQueues(ByTryLock), which |
| // locks (tryLocks) the queue's read-lock |
| Thread refreshThread = new Thread(() -> { |
| try { |
| Thread.sleep(2 * 1000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| preemptionManager.refreshQueues(queue.getParent(), queue); |
| }, "REFRESH"); |
| schedulerThread.start(); |
| completeThread.start(); |
| refreshThread.start(); |
| |
| schedulerThread.join(); |
| completeThread.join(); |
| refreshThread.join(); |
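| // If the refresh path took the queue's read-lock unconditionally, the |
| // threads above would deadlock and the 20 second timeout would trip. |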
| } |
| } |
| } |