| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.Sets; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockAM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockNM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Set; |
| |
| public class TestCapacitySchedulerWeightMode { |
// Full queue paths, built from CapacitySchedulerConfiguration.ROOT, of the
// queues that the configuration helpers in this class set up.
| private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; |
| private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; |
// Children of A and B.
| private static final String A1 = A + ".a1"; |
| private static final String B1 = B + ".b1"; |
| private static final String B2 = B + ".b2"; |
| |
// Base YARN configuration; re-created by setUp() before every test.
| private YarnConfiguration conf; |
| |
// Node label manager; re-created and initialised by setUp() before every test.
| RMNodeLabelsManager mgr; |
| |
| @Before |
| public void setUp() throws Exception { |
| conf = new YarnConfiguration(); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| mgr = new NullRMNodeLabelsManager(); |
| mgr.init(conf); |
| } |
| |
| public static <E> Set<E> toSet(E... elements) { |
| Set<E> set = Sets.newHashSet(elements); |
| return set; |
| } |
| |
| /* |
| * Queue structure: |
| * root (*) |
| * ________________ |
| * / \ |
| * a x(weight=100), y(w=50) b y(w=50), z(w=100) |
| * ________________ ______________ |
| * / / \ |
| * a1 ([x,y]: w=100) b1(no) b2([y,z]: w=100) |
| */ |
| public static Configuration getCSConfWithQueueLabelsWeightOnly( |
| Configuration config) { |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( |
| config); |
| |
| // Define top-level queues |
| conf.setQueues(CapacitySchedulerConfiguration.ROOT, |
| new String[] { "a", "b" }); |
| conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "x", 100); |
| conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "y", 100); |
| conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "z", 100); |
| |
| conf.setLabeledQueueWeight(A, RMNodeLabelsManager.NO_LABEL, 1); |
| conf.setMaximumCapacity(A, 10); |
| conf.setAccessibleNodeLabels(A, toSet("x", "y")); |
| conf.setLabeledQueueWeight(A, "x", 100); |
| conf.setLabeledQueueWeight(A, "y", 50); |
| |
| conf.setLabeledQueueWeight(B, RMNodeLabelsManager.NO_LABEL, 9); |
| conf.setMaximumCapacity(B, 100); |
| conf.setAccessibleNodeLabels(B, toSet("y", "z")); |
| conf.setLabeledQueueWeight(B, "y", 50); |
| conf.setLabeledQueueWeight(B, "z", 100); |
| |
| // Define 2nd-level queues |
| conf.setQueues(A, new String[] { "a1" }); |
| conf.setLabeledQueueWeight(A1, RMNodeLabelsManager.NO_LABEL, 100); |
| conf.setMaximumCapacity(A1, 100); |
| conf.setAccessibleNodeLabels(A1, toSet("x", "y")); |
| conf.setDefaultNodeLabelExpression(A1, "x"); |
| conf.setLabeledQueueWeight(A1, "x", 100); |
| conf.setLabeledQueueWeight(A1, "y", 100); |
| |
| conf.setQueues(B, new String[] { "b1", "b2" }); |
| conf.setLabeledQueueWeight(B1, RMNodeLabelsManager.NO_LABEL, 50); |
| conf.setMaximumCapacity(B1, 50); |
| conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET); |
| |
| conf.setLabeledQueueWeight(B2, RMNodeLabelsManager.NO_LABEL, 50); |
| conf.setMaximumCapacity(B2, 50); |
| conf.setAccessibleNodeLabels(B2, toSet("y", "z")); |
| conf.setLabeledQueueWeight(B2, "y", 100); |
| conf.setLabeledQueueWeight(B2, "z", 100); |
| |
| return conf; |
| } |
| |
| /* |
| * Queue structure: |
| * root (*) |
| * _______________________ |
| * / \ |
| * a x(weight=100), y(w=50) b y(w=50), z(w=100) |
| * ________________ ______________ |
| * / / \ |
| * a1 ([x,y]: pct=100%) b1(no) b2([y,z]: percent=100%) |
| * |
| * Parent uses weight, child uses percentage |
| */ |
| public static Configuration getCSConfWithLabelsParentUseWeightChildUsePct( |
| Configuration config) { |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( |
| config); |
| |
| // Define top-level queues |
| conf.setQueues(CapacitySchedulerConfiguration.ROOT, |
| new String[] { "a", "b" }); |
| conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "x", 100); |
| conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "y", 100); |
| conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "z", 100); |
| |
| conf.setLabeledQueueWeight(A, RMNodeLabelsManager.NO_LABEL, 1); |
| conf.setMaximumCapacity(A, 10); |
| conf.setAccessibleNodeLabels(A, toSet("x", "y")); |
| conf.setLabeledQueueWeight(A, "x", 100); |
| conf.setLabeledQueueWeight(A, "y", 50); |
| |
| conf.setLabeledQueueWeight(B, RMNodeLabelsManager.NO_LABEL, 9); |
| conf.setMaximumCapacity(B, 100); |
| conf.setAccessibleNodeLabels(B, toSet("y", "z")); |
| conf.setLabeledQueueWeight(B, "y", 50); |
| conf.setLabeledQueueWeight(B, "z", 100); |
| |
| // Define 2nd-level queues |
| conf.setQueues(A, new String[] { "a1" }); |
| conf.setCapacityByLabel(A1, RMNodeLabelsManager.NO_LABEL, 100); |
| conf.setMaximumCapacity(A1, 100); |
| conf.setAccessibleNodeLabels(A1, toSet("x", "y")); |
| conf.setDefaultNodeLabelExpression(A1, "x"); |
| conf.setCapacityByLabel(A1, "x", 100); |
| conf.setCapacityByLabel(A1, "y", 100); |
| |
| conf.setQueues(B, new String[] { "b1", "b2" }); |
| conf.setCapacityByLabel(B1, RMNodeLabelsManager.NO_LABEL, 50); |
| conf.setMaximumCapacity(B1, 50); |
| conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET); |
| |
| conf.setCapacityByLabel(B2, RMNodeLabelsManager.NO_LABEL, 50); |
| conf.setMaximumCapacity(B2, 50); |
| conf.setAccessibleNodeLabels(B2, toSet("y", "z")); |
| conf.setCapacityByLabel(B2, "y", 100); |
| conf.setCapacityByLabel(B2, "z", 100); |
| |
| return conf; |
| } |
| |
| /* |
| * Queue structure: |
| * root (*) |
| * _______________________ |
| * / \ |
| * a x(=100%), y(50%) b y(=50%), z(=100%) |
| * ________________ ______________ |
| * / / \ |
| * a1 ([x,y]: w=100) b1(no) b2([y,z]: w=100) |
| * |
| * Parent uses weight, child uses percentage |
| */ |
| public static Configuration getCSConfWithLabelsParentUsePctChildUseWeight( |
| Configuration config) { |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( |
| config); |
| |
| // Define top-level queues |
| conf.setQueues(CapacitySchedulerConfiguration.ROOT, |
| new String[] { "a", "b" }); |
| conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "x", 100); |
| conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "y", 100); |
| conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "z", 100); |
| |
| conf.setCapacityByLabel(A, RMNodeLabelsManager.NO_LABEL, 10); |
| conf.setMaximumCapacity(A, 10); |
| conf.setAccessibleNodeLabels(A, toSet("x", "y")); |
| conf.setCapacityByLabel(A, "x", 100); |
| conf.setCapacityByLabel(A, "y", 50); |
| |
| conf.setCapacityByLabel(B, RMNodeLabelsManager.NO_LABEL, 90); |
| conf.setMaximumCapacity(B, 100); |
| conf.setAccessibleNodeLabels(B, toSet("y", "z")); |
| conf.setCapacityByLabel(B, "y", 50); |
| conf.setCapacityByLabel(B, "z", 100); |
| |
| // Define 2nd-level queues |
| conf.setQueues(A, new String[] { "a1" }); |
| conf.setCapacityByLabel(A1, RMNodeLabelsManager.NO_LABEL, 100); |
| conf.setMaximumCapacity(A1, 100); |
| conf.setAccessibleNodeLabels(A1, toSet("x", "y")); |
| conf.setDefaultNodeLabelExpression(A1, "x"); |
| conf.setCapacityByLabel(A1, "x", 100); |
| conf.setCapacityByLabel(A1, "y", 100); |
| |
| conf.setQueues(B, new String[] { "b1", "b2" }); |
| conf.setCapacityByLabel(B1, RMNodeLabelsManager.NO_LABEL, 50); |
| conf.setMaximumCapacity(B1, 50); |
| conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET); |
| |
| conf.setCapacityByLabel(B2, RMNodeLabelsManager.NO_LABEL, 50); |
| conf.setMaximumCapacity(B2, 50); |
| conf.setAccessibleNodeLabels(B2, toSet("y", "z")); |
| conf.setCapacityByLabel(B2, "y", 100); |
| conf.setCapacityByLabel(B2, "z", 100); |
| |
| return conf; |
| } |
| |
| /** |
| * This is an identical test of |
| * @see {@link TestNodeLabelContainerAllocation#testContainerAllocateWithComplexLabels()} |
| * The only difference is, instead of using label, it uses weight mode |
| * @throws Exception |
| */ |
| @Test(timeout = 300000) |
| public void testContainerAllocateWithComplexLabelsWeightOnly() throws Exception { |
| internalTestContainerAlloationWithNodeLabel( |
| getCSConfWithQueueLabelsWeightOnly(conf)); |
| } |
| |
| /** |
| * This is an identical test of |
| * @see {@link TestNodeLabelContainerAllocation#testContainerAllocateWithComplexLabels()} |
| * The only difference is, instead of using label, it uses weight mode: |
| * Parent uses weight, child uses percent |
| * @throws Exception |
| */ |
| @Test(timeout = 300000) |
| public void testContainerAllocateWithComplexLabelsWeightAndPercentMixed1() throws Exception { |
| internalTestContainerAlloationWithNodeLabel( |
| getCSConfWithLabelsParentUseWeightChildUsePct(conf)); |
| } |
| |
| /** |
| * This is an identical test of |
| * @see {@link TestNodeLabelContainerAllocation#testContainerAllocateWithComplexLabels()} |
| * The only difference is, instead of using label, it uses weight mode: |
| * Parent uses percent, child uses weight |
| * @throws Exception |
| */ |
| @Test(timeout = 300000) |
| public void testContainerAllocateWithComplexLabelsWeightAndPercentMixed2() throws Exception { |
| internalTestContainerAlloationWithNodeLabel( |
| getCSConfWithLabelsParentUsePctChildUseWeight(conf)); |
| } |
| |
| /** |
| * This checks whether the parent prints the correct log about the |
| * configured mode. |
| */ |
| @Test(timeout = 300000) |
| public void testGetCapacityOrWeightStringUsingWeights() throws IOException { |
| try (MockRM rm = new MockRM( |
| getCSConfWithQueueLabelsWeightOnly(conf))) { |
| rm.start(); |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| String capacityOrWeightString = ((ParentQueue) cs.getQueue(A)) |
| .getCapacityOrWeightString(); |
| validateCapacityOrWeightString(capacityOrWeightString, true); |
| |
| capacityOrWeightString = ((LeafQueue) cs.getQueue(A1)) |
| .getCapacityOrWeightString(); |
| validateCapacityOrWeightString(capacityOrWeightString, true); |
| |
| capacityOrWeightString = ((LeafQueue) cs.getQueue(A1)) |
| .getExtendedCapacityOrWeightString(); |
| validateCapacityOrWeightString(capacityOrWeightString, true); |
| } |
| } |
| |
| /** |
| * This checks whether the parent prints the correct log about the |
| * configured mode. |
| */ |
| @Test(timeout = 300000) |
| public void testGetCapacityOrWeightStringParentPctLeafWeights() |
| throws IOException { |
| try (MockRM rm = new MockRM( |
| getCSConfWithLabelsParentUseWeightChildUsePct(conf))) { |
| rm.start(); |
| CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| |
| String capacityOrWeightString = ((ParentQueue) cs.getQueue(A)) |
| .getCapacityOrWeightString(); |
| validateCapacityOrWeightString(capacityOrWeightString, true); |
| |
| capacityOrWeightString = ((LeafQueue) cs.getQueue(A1)) |
| .getCapacityOrWeightString(); |
| validateCapacityOrWeightString(capacityOrWeightString, false); |
| |
| capacityOrWeightString = ((LeafQueue) cs.getQueue(A1)) |
| .getExtendedCapacityOrWeightString(); |
| validateCapacityOrWeightString(capacityOrWeightString, false); |
| } |
| } |
| |
| private void internalTestContainerAlloationWithNodeLabel(Configuration csConf) |
| throws Exception { |
| /* |
| * Queue structure: |
| * root (*) |
| * ________________ |
| * / \ |
| * a x(100%), y(50%) b y(50%), z(100%) |
| * ________________ ______________ |
| * / / \ |
| * a1 (x,y) b1(no) b2(y,z) |
| * 100% y = 100%, z = 100% |
| * |
| * Node structure: |
| * h1 : x |
| * h2 : y |
| * h3 : y |
| * h4 : z |
| * h5 : NO |
| * |
| * Total resource: |
| * x: 4G |
| * y: 6G |
| * z: 2G |
| * *: 2G |
| * |
| * Resource of |
| * a1: x=4G, y=3G, NO=0.2G |
| * b1: NO=0.9G (max=1G) |
| * b2: y=3, z=2G, NO=0.9G (max=1G) |
| * |
| * Each node can only allocate two containers |
| */ |
| |
| // set node -> label |
| mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z")); |
| mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), |
| toSet("x"), NodeId.newInstance("h2", 0), toSet("y"), |
| NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0), |
| toSet("z"), NodeId.newInstance("h5", 0), |
| RMNodeLabelsManager.EMPTY_STRING_SET)); |
| |
| // inject node label manager |
| MockRM rm1 = new MockRM(csConf) { |
| @Override |
| public RMNodeLabelsManager createNodeLabelManager() { |
| return mgr; |
| } |
| }; |
| |
| rm1.getRMContext().setNodeLabelManager(mgr); |
| rm1.start(); |
| MockNM nm1 = rm1.registerNode("h1:1234", 2048); |
| MockNM nm2 = rm1.registerNode("h2:1234", 2048); |
| MockNM nm3 = rm1.registerNode("h3:1234", 2048); |
| MockNM nm4 = rm1.registerNode("h4:1234", 2048); |
| MockNM nm5 = rm1.registerNode("h5:1234", 2048); |
| |
| ContainerId containerId; |
| |
| // launch an app to queue a1 (label = x), and check all container will |
| // be allocated in h1 |
| MockRMAppSubmissionData data2 = |
| MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("a1") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm1, data2); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); |
| |
| // request a container (label = y). can be allocated on nm2 |
| am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y"); |
| containerId = |
| ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L); |
| Assert.assertTrue(rm1.waitForState(nm2, containerId, |
| RMContainerState.ALLOCATED)); |
| checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, |
| "h2"); |
| |
| // launch an app to queue b1 (label = y), and check all container will |
| // be allocated in h5 |
| MockRMAppSubmissionData data1 = |
| MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("b1") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app2 = MockRMAppSubmitter.submit(rm1, data1); |
| MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5); |
| |
| // request a container for AM, will succeed |
| // and now b1's queue capacity will be used, cannot allocate more containers |
| // (Maximum capacity reached) |
| am2.allocate("*", 1024, 1, new ArrayList<ContainerId>()); |
| containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2); |
| Assert.assertFalse(rm1.waitForState(nm4, containerId, |
| RMContainerState.ALLOCATED)); |
| Assert.assertFalse(rm1.waitForState(nm5, containerId, |
| RMContainerState.ALLOCATED)); |
| |
| // launch an app to queue b2 |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("b2") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app3 = MockRMAppSubmitter.submit(rm1, data); |
| MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5); |
| |
| // request a container. try to allocate on nm1 (label = x) and nm3 (label = |
| // y,z). Will successfully allocate on nm3 |
| am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y"); |
| containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2); |
| Assert.assertFalse(rm1.waitForState(nm1, containerId, |
| RMContainerState.ALLOCATED)); |
| Assert.assertTrue(rm1.waitForState(nm3, containerId, |
| RMContainerState.ALLOCATED)); |
| checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, |
| "h3"); |
| |
| // try to allocate container (request label = z) on nm4 (label = y,z). |
| // Will successfully allocate on nm4 only. |
| am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z"); |
| containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L); |
| Assert.assertTrue(rm1.waitForState(nm4, containerId, |
| RMContainerState.ALLOCATED)); |
| checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, |
| "h4"); |
| |
| rm1.close(); |
| } |
| |
| private void checkTaskContainersHost(ApplicationAttemptId attemptId, |
| ContainerId containerId, ResourceManager rm, String host) { |
| YarnScheduler scheduler = rm.getRMContext().getScheduler(); |
| SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId); |
| |
| Assert.assertTrue(appReport.getLiveContainers().size() > 0); |
| for (RMContainer c : appReport.getLiveContainers()) { |
| if (c.getContainerId().equals(containerId)) { |
| Assert.assertEquals(host, c.getAllocatedNode().getHost()); |
| } |
| } |
| } |
| |
| private void validateCapacityOrWeightString(String capacityOrWeightString, |
| boolean shouldContainWeight) { |
| Assert.assertEquals(shouldContainWeight, |
| capacityOrWeightString.contains("weight")); |
| Assert.assertEquals(shouldContainWeight, |
| capacityOrWeightString.contains("normalizedWeight")); |
| Assert.assertEquals(!shouldContainWeight, |
| capacityOrWeightString.contains("capacity")); |
| |
| } |
| } |