| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockAM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockNM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRM; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
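/**
 * Tests application priority support in the {@link CapacityScheduler}:
 * ordering of applications within a queue, allocation preference for
 * higher-priority applications, validation against the cluster maximum
 * priority, runtime priority updates, and behaviour across RM restart.
 */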
| public class TestApplicationPriority { |
| private final int GB = 1024; |
| |
| private YarnConfiguration conf; |
| |
| @Before |
| public void setUp() throws Exception { |
| conf = new YarnConfiguration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| } |
| |
| @Test |
| public void testApplicationOrderingWithPriority() throws Exception { |
| |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| LeafQueue q = (LeafQueue) cs.getQueue("default"); |
| Assert.assertNotNull(q); |
| |
| String host = "127.0.0.1"; |
| RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(16 * GB), 1, |
| host); |
| cs.handle(new NodeAddedSchedulerEvent(node)); |
| |
    // add app1 begin
| ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); |
| ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( |
| appId1, 1); |
| |
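    // Mock the RMApp and its attempt so the scheduler can resolve them from
    // the RMContext without driving the full application lifecycle.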
| RMAppAttemptMetrics attemptMetric1 = new RMAppAttemptMetrics(appAttemptId1, |
| rm.getRMContext()); |
| RMAppImpl app1 = mock(RMAppImpl.class); |
| when(app1.getApplicationId()).thenReturn(appId1); |
| RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); |
| when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); |
| when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); |
| when(app1.getCurrentAppAttempt()).thenReturn(attempt1); |
| |
| rm.getRMContext().getRMApps().put(appId1, app1); |
| |
| SchedulerEvent addAppEvent1 = new AppAddedSchedulerEvent(appId1, "default", |
| "user", null, Priority.newInstance(5)); |
| cs.handle(addAppEvent1); |
| SchedulerEvent addAttemptEvent1 = new AppAttemptAddedSchedulerEvent( |
| appAttemptId1, false); |
| cs.handle(addAttemptEvent1); |
| // add app1 end |
| |
| // add app2 begin |
| ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2); |
| ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId( |
| appId2, 1); |
| |
| RMAppAttemptMetrics attemptMetric2 = new RMAppAttemptMetrics(appAttemptId2, |
| rm.getRMContext()); |
| RMAppImpl app2 = mock(RMAppImpl.class); |
| when(app2.getApplicationId()).thenReturn(appId2); |
| RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class); |
| when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2); |
| when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2); |
| when(app2.getCurrentAppAttempt()).thenReturn(attempt2); |
| |
| rm.getRMContext().getRMApps().put(appId2, app2); |
| |
| SchedulerEvent addAppEvent2 = new AppAddedSchedulerEvent(appId2, "default", |
| "user", null, Priority.newInstance(8)); |
| cs.handle(addAppEvent2); |
| SchedulerEvent addAttemptEvent2 = new AppAttemptAddedSchedulerEvent( |
| appAttemptId2, false); |
| cs.handle(addAttemptEvent2); |
    // add app2 end
| |
    // Both apps are now in the queue; app2 should be ordered first since it
    // has the higher priority
    assertEquals(2, q.getApplications().size());
    assertEquals(appAttemptId2, q.getApplications().iterator().next()
        .getApplicationAttemptId());
| |
| rm.stop(); |
| } |
| |
| @Test |
| public void testApplicationPriorityAllocation() throws Exception { |
| |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| // Set Max Application Priority as 10 |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| Priority appPriority1 = Priority.newInstance(5); |
| MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); |
| RMApp app1 = rm.submitApp(1 * GB, appPriority1); |
| |
| // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 |
| MockAM am1 = MockRM.launchAM(app1, rm, nm1); |
| am1.registerAppAttempt(); |
| |
| // allocate 7 containers for App1 |
| List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1", |
| 7, 2 * GB, nm1); |
| |
| Assert.assertEquals(7, allocated1.size()); |
| Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize()); |
| |
| // check node report, 15 GB used (1 AM and 7 containers) and 1 GB available |
| SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( |
| nm1.getNodeId()); |
| Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| // Submit the second app App2 with priority 8 (Higher than App1) |
| Priority appPriority2 = Priority.newInstance(8); |
| RMApp app2 = rm.submitApp(1 * GB, appPriority2); |
| |
    // kick the scheduler; the 1 GB that was free is given to App2's AM
| MockAM am2 = MockRM.launchAM(app2, rm, nm1); |
| am2.registerAppAttempt(); |
| |
| // check node report, 16 GB used and 0 GB available |
| report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); |
| Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| // get scheduler |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| // get scheduler app |
| FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() |
| .get(app1.getApplicationId()).getCurrentAppAttempt(); |
| |
| // kill 2 containers of App1 to free up some space |
| int counter = 0; |
| for (Container c : allocated1) { |
| if (++counter > 2) { |
| break; |
| } |
| cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); |
| } |
| |
| // check node report, 12 GB used and 4 GB available |
| report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); |
| Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| // send updated request for App1 |
| am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList<ContainerId>()); |
| |
    // kick the scheduler; since App2's priority is higher than App1's, it
    // will get the remaining cluster space.
| List<Container> allocated2 = am2.allocateAndWaitForContainers("127.0.0.1", |
| 2, 2 * GB, nm1); |
| |
| // App2 has got 2 containers now. |
| Assert.assertEquals(2, allocated2.size()); |
| |
| // check node report, 16 GB used and 0 GB available |
| report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); |
| Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| rm.stop(); |
| } |
| |
| @Test |
| public void testPriorityWithPendingApplications() throws Exception { |
| |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| // Set Max Application Priority as 10 |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| Priority appPriority1 = Priority.newInstance(5); |
| MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * GB); |
| RMApp app1 = rm.submitApp(1 * GB, appPriority1); |
| |
| // kick the scheduler, 1 GB given to AM1, remaining 7GB on nm1 |
| MockAM am1 = MockRM.launchAM(app1, rm, nm1); |
| am1.registerAppAttempt(); |
| |
| // kick the scheduler, 7 containers will be allocated for App1 |
| List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1", |
| 7, 1 * GB, nm1); |
| |
| Assert.assertEquals(7, allocated1.size()); |
| Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemorySize()); |
| |
| // check node report, 8 GB used (1 AM and 7 containers) and 0 GB available |
| SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( |
| nm1.getNodeId()); |
| Assert.assertEquals(8 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| // Submit the second app App2 with priority 7 |
| Priority appPriority2 = Priority.newInstance(7); |
| RMApp app2 = rm.submitApp(1 * GB, appPriority2); |
| |
| // Submit the third app App3 with priority 8 |
| Priority appPriority3 = Priority.newInstance(8); |
| RMApp app3 = rm.submitApp(1 * GB, appPriority3); |
| |
    // Submit the fourth app App4 with priority 6
| Priority appPriority4 = Priority.newInstance(6); |
| RMApp app4 = rm.submitApp(1 * GB, appPriority4); |
| |
    // Only one app can run because the AM resource limit restricts further
    // activations. Kill app1; if app3 (the highest priority among the pending
    // apps) becomes active, it shows that priority ordering is honoured for
    // pending applications.
| rm.killApp(app1.getApplicationId()); |
| rm.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED); |
| |
    // kick the scheduler; app3 (highest priority among pending) gets the freed space
| MockAM am3 = MockRM.launchAM(app3, rm, nm1); |
| am3.registerAppAttempt(); |
| |
| // check node report, 1 GB used and 7 GB available |
| report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); |
| Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(7 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| rm.stop(); |
| } |
| |
| @Test |
| public void testMaxPriorityValidation() throws Exception { |
| |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| // Set Max Application Priority as 10 |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); |
| Priority maxPriority = Priority.newInstance(10); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
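    // Submit an application with priority 15, which is above the configured
    // cluster maximum of 10.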
| Priority appPriority1 = Priority.newInstance(15); |
| rm.registerNode("127.0.0.1:1234", 8 * GB); |
| RMApp app1 = rm.submitApp(1 * GB, appPriority1); |
| |
    // Submission should succeed and the priority should be capped at the
    // cluster maximum
    Assert.assertEquals(maxPriority,
        app1.getApplicationSubmissionContext().getPriority());
| rm.stop(); |
| } |
| |
| @Test |
| public void testUpdatePriorityAtRuntime() throws Exception { |
| |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| // Set Max Application Priority as 10 |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| Priority appPriority1 = Priority.newInstance(5); |
| MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); |
| RMApp app1 = rm.submitApp(1 * GB, appPriority1); |
| |
| // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 |
| MockAM am1 = MockRM.launchAM(app1, rm, nm1); |
| am1.registerAppAttempt(); |
| |
| // get scheduler |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| // Change the priority of App1 to 8 |
| Priority appPriority2 = Priority.newInstance(8); |
| UserGroupInformation ugi = UserGroupInformation |
| .createRemoteUser(app1.getUser()); |
| cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, |
| ugi); |
| |
| // get scheduler app |
| FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() |
| .get(app1.getApplicationId()).getCurrentAppAttempt(); |
| |
    // Verify that the new priority has been applied
    Assert.assertEquals(appPriority2, schedulerAppAttempt.getPriority());
    rm.stop();
  }
| |
| @Test |
| public void testUpdateInvalidPriorityAtRuntime() throws Exception { |
| |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| // Set Max Application Priority as 10 |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| Priority appPriority1 = Priority.newInstance(5); |
| MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); |
| RMApp app1 = rm.submitApp(1 * GB, appPriority1); |
| |
| // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 |
| MockAM am1 = MockRM.launchAM(app1, rm, nm1); |
| am1.registerAppAttempt(); |
| |
| // get scheduler |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
    // Attempt to change the priority of App1 to 15, above the cluster maximum
| Priority appPriority2 = Priority.newInstance(15); |
| UserGroupInformation ugi = UserGroupInformation |
| .createRemoteUser(app1.getUser()); |
| cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, |
| ugi); |
| |
| // get scheduler app |
| FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() |
| .get(app1.getApplicationId()).getCurrentAppAttempt(); |
| |
| // Verify whether priority 15 is reset to 10 |
| Priority appPriority3 = Priority.newInstance(10); |
| Assert.assertEquals(appPriority3, schedulerAppAttempt.getPriority()); |
| rm.stop(); |
| } |
| |
| @Test(timeout = 180000) |
| public void testRMRestartWithChangeInPriority() throws Exception { |
| conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, |
| false); |
| conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); |
| conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); |
| |
| // PHASE 1: create state in an RM |
| |
| // start RM |
| MockRM rm1 = new MockRM(conf); |
| MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); |
| rm1.start(); |
| |
| MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, |
| rm1.getResourceTrackerService()); |
| nm1.registerNode(); |
| |
| Priority appPriority1 = Priority.newInstance(5); |
| RMApp app1 = rm1.submitApp(1 * GB, appPriority1); |
| |
    // kick the scheduler, 1 GB given to AM1
| MockAM am1 = MockRM.launchAM(app1, rm1, nm1); |
| am1.registerAppAttempt(); |
| |
| // get scheduler |
| CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); |
| |
| // Change the priority of App1 to 8 |
| Priority appPriority2 = Priority.newInstance(8); |
| UserGroupInformation ugi = UserGroupInformation |
| .createRemoteUser(app1.getUser()); |
| cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, |
| ugi); |
| |
    // give the RM time to process and persist the priority update before
    // restarting
    Thread.sleep(1000);
| |
| // create new RM to represent restart and recover state |
| MockRM rm2 = new MockRM(conf, memStore); |
| |
| // start new RM |
| rm2.start(); |
| // change NM to point to new RM |
| nm1.setResourceTrackerService(rm2.getResourceTrackerService()); |
| |
| // Verify RM Apps after this restart |
| Assert.assertEquals(1, rm2.getRMContext().getRMApps().size()); |
| |
    // get the recovered app
| RMApp loadedApp = rm2.getRMContext().getRMApps() |
| .get(app1.getApplicationId()); |
| |
    // Verify that the updated priority (8) was recovered after the restart
| Assert.assertEquals(appPriority2, loadedApp.getApplicationPriority()); |
| |
| rm2.stop(); |
| rm1.stop(); |
| } |
| |
| @Test |
| public void testApplicationPriorityAllocationWithChangeInPriority() |
| throws Exception { |
| |
| Configuration conf = new Configuration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| // Set Max Application Priority as 10 |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| Priority appPriority1 = Priority.newInstance(5); |
| MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); |
| RMApp app1 = rm.submitApp(1 * GB, appPriority1); |
| |
| // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 |
| MockAM am1 = MockRM.launchAM(app1, rm, nm1); |
| am1.registerAppAttempt(); |
| |
| // add request for containers and wait for containers to be allocated. |
| int NUM_CONTAINERS = 7; |
| List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1", |
| NUM_CONTAINERS, 2 * GB, nm1); |
| |
| Assert.assertEquals(7, allocated1.size()); |
| Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize()); |
| |
| // check node report, 15 GB used (1 AM and 7 containers) and 1 GB available |
| SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( |
| nm1.getNodeId()); |
| Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| // Submit the second app App2 with priority 8 (Higher than App1) |
| Priority appPriority2 = Priority.newInstance(8); |
| RMApp app2 = rm.submitApp(1 * GB, appPriority2); |
| |
    // kick the scheduler; the 1 GB that was free is given to App2's AM
| MockAM am2 = MockRM.launchAM(app2, rm, nm1); |
| am2.registerAppAttempt(); |
| |
| // check node report, 16 GB used and 0 GB available |
| report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); |
| Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| // get scheduler |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| // get scheduler app |
| FiCaSchedulerApp schedulerAppAttemptApp1 = cs.getSchedulerApplications() |
| .get(app1.getApplicationId()).getCurrentAppAttempt(); |
| // kill 2 containers to free up some space |
| int counter = 0; |
| for (Iterator<Container> iterator = allocated1.iterator(); iterator |
| .hasNext();) { |
| Container c = iterator.next(); |
| if (++counter > 2) { |
| break; |
| } |
| cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId())); |
| iterator.remove(); |
| } |
| |
| // check node report, 12 GB used and 4 GB available |
| report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); |
| Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| // add request for containers App1 |
| am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList<ContainerId>()); |
| |
| // add request for containers App2 and wait for containers to get allocated |
| List<Container> allocated2 = am2.allocateAndWaitForContainers("127.0.0.1", |
| 2, 2 * GB, nm1); |
| |
| Assert.assertEquals(2, allocated2.size()); |
| // check node report, 16 GB used and 0 GB available |
| report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); |
| Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
| // kill 1 more |
| counter = 0; |
| for (Iterator<Container> iterator = allocated1.iterator(); iterator |
| .hasNext();) { |
| Container c = iterator.next(); |
| if (++counter > 1) { |
| break; |
| } |
| cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId())); |
| iterator.remove(); |
| } |
| |
| // check node report, 14 GB used and 2 GB available |
| report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); |
| Assert.assertEquals(14 * GB, report_nm1.getUsedResource().getMemorySize()); |
| Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize()); |
| |
    // Lower the priority of App2 to 3 (below App1's priority of 5)
| Priority appPriority3 = Priority.newInstance(3); |
| UserGroupInformation ugi = UserGroupInformation |
| .createRemoteUser(app2.getUser()); |
| cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null, |
| ugi); |
| |
| // add request for containers App2 |
| am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>()); |
| |
    // add request for containers for App1 and wait for allocation; since
    // App1 now has the higher priority, it will get a container.
| List<Container> allocated3 = am1.allocateAndWaitForContainers("127.0.0.1", |
| 1, 2 * GB, nm1); |
| |
| Assert.assertEquals(1, allocated3.size()); |
| // Now App1 will have 5 containers and 1 AM. App2 will have 2 containers. |
| Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size()); |
| rm.stop(); |
| } |
| |
| /** |
   * <p>
   * Test case verifies the order in which applications are activated after an
   * RM restart.
   * </p>
   * <li>App-1 and app-2 are submitted, scheduled and running with priorities
   * 5 and 6 respectively</li>
   * <li>App-3 is submitted and scheduled with priority 7, but is not
   * activated since the AMResourceLimit is reached</li>
   * <li>RM is restarted</li>
   * <li>All recovered apps are placed in the pending ordering policy, since
   * the AMResourceLimit is 0 until the NM re-registers</li>
   * <li>After NM registration, app-1 and app-2 are activated while app-3
   * stays pending</li>
   * <p>
   * Expected output : app-2 must be activated and ordered ahead of app-1
   * since it was running earlier with the higher priority
   * </p>
| * @throws Exception |
| */ |
| @Test |
| public void testOrderOfActivatingThePriorityApplicationOnRMRestart() |
| throws Exception { |
| conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); |
| conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); |
| conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); |
| |
| MockRM rm1 = new MockRM(conf); |
| MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); |
| rm1.start(); |
| |
| MockNM nm1 = |
| new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService()); |
| nm1.registerNode(); |
| rm1.drainEvents(); |
| |
| ResourceScheduler scheduler = rm1.getRMContext().getScheduler(); |
| LeafQueue defaultQueue = |
| (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); |
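    // Size each AM at half the queue's AM resource limit so that at most two
    // applications can be active at a time.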
| int memory = (int) (defaultQueue.getAMResourceLimit().getMemorySize() / 2); |
| |
| // App-1 with priority 5 submitted and running |
| Priority appPriority1 = Priority.newInstance(5); |
| RMApp app1 = rm1.submitApp(memory, appPriority1); |
| MockAM am1 = MockRM.launchAM(app1, rm1, nm1); |
| am1.registerAppAttempt(); |
| |
| // App-2 with priority 6 submitted and running |
| Priority appPriority2 = Priority.newInstance(6); |
| RMApp app2 = rm1.submitApp(memory, appPriority2); |
| MockAM am2 = MockRM.launchAM(app2, rm1, nm1); |
| am2.registerAppAttempt(); |
| |
| rm1.drainEvents(); |
| Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); |
| Assert.assertEquals(0, defaultQueue.getNumPendingApplications()); |
| |
    // App-3 with priority 7 submitted and scheduled, but not activated since
    // the AMResourceLimit has been reached
| Priority appPriority3 = Priority.newInstance(7); |
| RMApp app3 = rm1.submitApp(memory, appPriority3); |
| |
| rm1.drainEvents(); |
| Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); |
| Assert.assertEquals(1, defaultQueue.getNumPendingApplications()); |
| |
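    // The ordering policy should list app2 first since it has the higher
    // priority.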
| Iterator<FiCaSchedulerApp> iterator = |
| defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator(); |
| FiCaSchedulerApp fcApp2 = iterator.next(); |
| Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(), |
| fcApp2.getApplicationAttemptId()); |
| |
| FiCaSchedulerApp fcApp1 = iterator.next(); |
| Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(), |
| fcApp1.getApplicationAttemptId()); |
| |
| iterator = defaultQueue.getPendingApplications().iterator(); |
| FiCaSchedulerApp fcApp3 = iterator.next(); |
| Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(), |
| fcApp3.getApplicationAttemptId()); |
| |
| // create new RM to represent restart and recover state |
| MockRM rm2 = new MockRM(conf, memStore); |
| |
| // start new RM |
| rm2.start(); |
| // change NM to point to new RM |
| nm1.setResourceTrackerService(rm2.getResourceTrackerService()); |
| |
| // Verify RM Apps after this restart |
| Assert.assertEquals(3, rm2.getRMContext().getRMApps().size()); |
| |
| rm2.drainEvents(); |
| scheduler = rm2.getRMContext().getScheduler(); |
| defaultQueue = |
| (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); |
| |
| // wait for all applications to get added to scheduler |
| int count = 50; |
| while (count-- > 0) { |
| if (defaultQueue.getNumPendingApplications() == 3) { |
| break; |
| } |
| Thread.sleep(50); |
| } |
| |
    // Before NM registration, the AMResourceLimit is 0, so no applications
    // get activated.
| Assert.assertEquals(0, defaultQueue.getNumActiveApplications()); |
| Assert.assertEquals(3, defaultQueue.getNumPendingApplications()); |
| |
| // NM resync to new RM |
| nm1.registerNode(); |
| rm2.drainEvents(); |
| |
    // wait for applications to be activated
| count = 50; |
| while (count-- > 0) { |
| if (defaultQueue.getNumActiveApplications() == 2) { |
| break; |
| } |
| Thread.sleep(50); |
| } |
| |
| Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); |
| Assert.assertEquals(1, defaultQueue.getNumPendingApplications()); |
| |
    // verify the order of activated applications in the ordering policy
| iterator = |
| defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator(); |
| fcApp2 = iterator.next(); |
| Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(), |
| fcApp2.getApplicationAttemptId()); |
| |
| fcApp1 = iterator.next(); |
| Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(), |
| fcApp1.getApplicationAttemptId()); |
| |
    // verify the pending applications iterator; it should contain app-3's attempt
| iterator = defaultQueue.getPendingApplications().iterator(); |
| fcApp3 = iterator.next(); |
| Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(), |
| fcApp3.getApplicationAttemptId()); |
| |
| rm2.stop(); |
| rm1.stop(); |
| } |
| |
| @Test(timeout = 120000) |
| public void testUpdatePriorityOnPendingAppAndKillAttempt() throws Exception { |
| int maxPriority = 10; |
| int appPriority = 5; |
| YarnConfiguration conf = new YarnConfiguration(); |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, |
| maxPriority); |
| MockRM rm = new MockRM(conf); |
| rm.init(conf); |
| rm.start(); |
| |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| CSQueue defaultQueue = (LeafQueue) cs.getQueue("default"); |
| |
    // Update the priority of, and then kill, an application while no node
    // resources are registered
| RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority)); |
| Collection<FiCaSchedulerApp> appsPending = |
| ((LeafQueue) defaultQueue).getPendingApplications(); |
| Collection<FiCaSchedulerApp> activeApps = |
| ((LeafQueue) defaultQueue).getOrderingPolicy().getSchedulableEntities(); |
| |
| // Verify app is in pending state |
| Assert.assertEquals("Pending apps should be 1", 1, appsPending.size()); |
| Assert.assertEquals("Active apps should be 0", 0, activeApps.size()); |
| |
| // kill app1 which is pending |
| killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app1); |
| |
| // Check ordering policy size when resource is added |
| MockNM nm1 = |
| new MockNM("127.0.0.1:1234", 8096, rm.getResourceTrackerService()); |
| nm1.registerNode(); |
| RMApp app2 = rm.submitApp(1024, Priority.newInstance(appPriority)); |
| Assert.assertEquals("Pending apps should be 0", 0, appsPending.size()); |
| Assert.assertEquals("Active apps should be 1", 1, activeApps.size()); |
| RMApp app3 = rm.submitApp(1024, Priority.newInstance(appPriority)); |
| RMApp app4 = rm.submitApp(1024, Priority.newInstance(appPriority)); |
| Assert.assertEquals("Pending apps should be 2", 2, appsPending.size()); |
| Assert.assertEquals("Active apps should be 1", 1, activeApps.size()); |
| // kill app3, pending apps should reduce to 1 |
| killAppAndVerifyOrderingPolicy(rm, defaultQueue, 1, 1, app3); |
    // kill app2; the active app is removed and a pending app is activated in its place
| killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 1, app2); |
    // kill app4; all apps are now killed and both collections should be empty
| killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app4); |
| rm.stop(); |
| } |
| |
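  /**
   * Lowers the priority of the given app, removes its current attempt as
   * killed, and then verifies the expected number of pending and active
   * applications in the default queue.
   */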
| private void killAppAndVerifyOrderingPolicy(MockRM rm, CSQueue defaultQueue, |
| int appsPendingExpected, int activeAppsExpected, RMApp app) |
| throws YarnException { |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| UserGroupInformation ugi = UserGroupInformation |
| .createRemoteUser(app.getUser()); |
| cs.updateApplicationPriority(Priority.newInstance(2), |
| app.getApplicationId(), null, ugi); |
    SchedulerEvent removeAttempt = new AppAttemptRemovedSchedulerEvent(
        app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED,
        false);
| cs.handle(removeAttempt); |
| rm.drainEvents(); |
| Collection<FiCaSchedulerApp> appsPending = |
| ((LeafQueue) defaultQueue).getPendingApplications(); |
| Collection<FiCaSchedulerApp> activeApps = |
| ((LeafQueue) defaultQueue).getApplications(); |
| Assert.assertEquals("Pending apps should be " + appsPendingExpected, |
| appsPendingExpected, appsPending.size()); |
| Assert.assertEquals("Active apps should be " + activeAppsExpected, |
| activeAppsExpected, activeApps.size()); |
| } |
| |
| } |