blob: 65bce5c9318a8dbbcd13250e85c7069fdde93007 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import static java.util.Collections.emptySet;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSet;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assume.assumeThat;
public class TestQueueParsing {
/** Logger used by the configuration helpers to trace their progress. */
private static final Logger LOG =
LoggerFactory.getLogger(TestQueueParsing.class);
// Full path strings of the queues used in this class, each built from its
// parent's path.
private static final String A_PATH = CapacitySchedulerConfiguration.ROOT + ".a";
private static final String B_PATH = CapacitySchedulerConfiguration.ROOT + ".b";
private static final String C_PATH = CapacitySchedulerConfiguration.ROOT + ".c";
private static final String A1_PATH = A_PATH + ".a1";
private static final String A2_PATH = A_PATH + ".a2";
private static final String B1_PATH = B_PATH + ".b1";
private static final String B2_PATH = B_PATH + ".b2";
private static final String B3_PATH = B_PATH + ".b3";
private static final String B4_PATH = B_PATH + ".b4";
private static final String C1_PATH = C_PATH + ".c1";
private static final String C2_PATH = C_PATH + ".c2";
private static final String C3_PATH = C_PATH + ".c3";
private static final String C4_PATH = C_PATH + ".c4";
private static final String C11_PATH = C1_PATH + ".c11";
private static final String C12_PATH = C1_PATH + ".c12";
private static final String C13_PATH = C1_PATH + ".c13";
// Paths of the "x"/"y" queues used by testQueueParsingWithMoveQueue. These are
// spelled out literally rather than derived from
// CapacitySchedulerConfiguration.ROOT (NOTE(review): assumes ROOT is "root").
private static final String AX_PATH = "root.a.x";
private static final String AY_PATH = "root.a.y";
private static final String X_PATH = "root.x";
// QueuePath objects wrapping the path strings above; these are what the
// CapacitySchedulerConfiguration setters in this class are called with.
private static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
private static final QueuePath A = new QueuePath(A_PATH);
private static final QueuePath B = new QueuePath(B_PATH);
private static final QueuePath C = new QueuePath(C_PATH);
private static final QueuePath A1 = new QueuePath(A1_PATH);
private static final QueuePath A2 = new QueuePath(A2_PATH);
private static final QueuePath B1 = new QueuePath(B1_PATH);
private static final QueuePath B2 = new QueuePath(B2_PATH);
private static final QueuePath B3 = new QueuePath(B3_PATH);
private static final QueuePath B4 = new QueuePath(B4_PATH);
private static final QueuePath C1 = new QueuePath(C1_PATH);
private static final QueuePath C2 = new QueuePath(C2_PATH);
private static final QueuePath C3 = new QueuePath(C3_PATH);
private static final QueuePath C4 = new QueuePath(C4_PATH);
private static final QueuePath C11 = new QueuePath(C11_PATH);
private static final QueuePath C12 = new QueuePath(C12_PATH);
private static final QueuePath C13 = new QueuePath(C13_PATH);
private static final QueuePath AX = new QueuePath(AX_PATH);
private static final QueuePath AY = new QueuePath(AY_PATH);
private static final QueuePath X = new QueuePath(X_PATH);
/** Tolerance used when comparing floating-point capacity values. */
private static final double DELTA = 0.001;
/** Multiplier for memory sizes such as {@code 8 * GB} (presumably MB per GB). */
private static final int GB = 1024;
/**
 * Node label manager re-created by {@link #setup()} before each test and
 * handed to every MockRM built by {@code createMockRMWithLabels}.
 */
private RMNodeLabelsManager nodeLabelManager;
/**
 * Creates a fresh {@link NullRMNodeLabelsManager} for every test, initializes
 * it and records the resource available for the default (no-label) partition.
 */
@Before
public void setup() throws Exception {
  YarnConfiguration yarnConf = new YarnConfiguration();
  yarnConf.setClass(
      YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
  Resource noLabelResource = Resources.createResource(2 * 8 * GB, 2 * 16);

  // Keep a concretely typed reference so no cast is needed below.
  NullRMNodeLabelsManager labelsManager = new NullRMNodeLabelsManager();
  nodeLabelManager = labelsManager;
  labelsManager.init(yarnConf);
  labelsManager.setResourceForLabel(CommonNodeLabelsManager.NO_LABEL, noLabelResource);
}
/**
 * Populates {@code conf} with the label-free, three-level hierarchy used by
 * most tests in this class.
 * <pre>
 * root -> a, b, c
 * a    -> a1, a2
 * b    -> b1, b2, b3
 * c    -> c1, c2, c3, c4
 * c1   -> c11, c12, c13
 * </pre>
 *
 * @param conf the scheduler configuration to populate
 */
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
  // Level 1: children of root; b gets no explicit maximum capacity.
  String[] rootChildren = {"a", "b", "c"};
  conf.setQueues(ROOT, rootChildren);
  conf.setCapacity(A, 10);
  conf.setMaximumCapacity(A, 15);
  conf.setCapacity(B, 20);
  conf.setCapacity(C, 70);
  conf.setMaximumCapacity(C, 70);
  LOG.info("Setup top-level queues");

  // Level 2: children of a
  String[] childrenOfA = {"a1", "a2"};
  conf.setQueues(A, childrenOfA);
  conf.setCapacity(A1, 30);
  conf.setMaximumCapacity(A1, 45);
  conf.setCapacity(A2, 70);
  conf.setMaximumCapacity(A2, 85);

  // Level 2: children of b
  String[] childrenOfB = {"b1", "b2", "b3"};
  conf.setQueues(B, childrenOfB);
  conf.setCapacity(B1, 50);
  conf.setMaximumCapacity(B1, 85);
  conf.setCapacity(B2, 30);
  conf.setMaximumCapacity(B2, 35);
  conf.setCapacity(B3, 20);
  conf.setMaximumCapacity(B3, 35);

  // Level 2: children of c
  String[] childrenOfC = {"c1", "c2", "c3", "c4"};
  conf.setQueues(C, childrenOfC);
  conf.setCapacity(C1, 50);
  conf.setMaximumCapacity(C1, 55);
  conf.setCapacity(C2, 10);
  conf.setMaximumCapacity(C2, 25);
  conf.setCapacity(C3, 35);
  conf.setMaximumCapacity(C3, 38);
  conf.setCapacity(C4, 5);
  conf.setMaximumCapacity(C4, 5);
  LOG.info("Setup 2nd-level queues");

  // Level 3: children of c1
  String[] childrenOfC1 = {"c11", "c12", "c13"};
  conf.setQueues(C1, childrenOfC1);
  conf.setCapacity(C11, 15);
  conf.setMaximumCapacity(C11, 30);
  conf.setCapacity(C12, 45);
  conf.setMaximumCapacity(C12, 70);
  conf.setCapacity(C13, 40);
  conf.setMaximumCapacity(C13, 40);
  LOG.info("Setup 3rd-level queues");
}
/**
 * Configures the top-level queues a, b and c, writing the raw queue list of
 * root with stray whitespace around the names.
 *
 * @param conf the scheduler configuration to populate
 */
private void setupQueueConfigurationWithSpacesShouldBeTrimmed(
    CapacitySchedulerConfiguration conf) {
  // Write the property directly so the untrimmed value reaches the parser.
  String rootQueuesKey =
      QueuePrefixes.getQueuePrefix(ROOT) + CapacitySchedulerConfiguration.QUEUES;
  conf.set(rootQueuesKey, " a ,b, c");

  conf.setCapacity(A, 10);
  conf.setMaximumCapacity(A, 15);

  conf.setCapacity(B, 20);

  conf.setCapacity(C, 70);
  conf.setMaximumCapacity(C, 70);
}
/**
 * Configures root -> a, b, c and a -> a1, a2, writing both raw queue lists
 * with stray whitespace around the names.
 *
 * @param conf the scheduler configuration to populate
 */
private void setupNestedQueueConfigurationWithSpacesShouldBeTrimmed(
    CapacitySchedulerConfiguration conf) {
  // Top level: untrimmed child list of root
  String rootQueuesKey =
      QueuePrefixes.getQueuePrefix(ROOT) + CapacitySchedulerConfiguration.QUEUES;
  conf.set(rootQueuesKey, " a ,b, c");
  conf.setCapacity(A, 10);
  conf.setMaximumCapacity(A, 15);
  conf.setCapacity(B, 20);
  conf.setCapacity(C, 70);
  conf.setMaximumCapacity(C, 70);

  // Second level: untrimmed child list of a
  String queuesKeyOfA =
      QueuePrefixes.getQueuePrefix(A) + CapacitySchedulerConfiguration.QUEUES;
  conf.set(queuesKeyOfA, "a1, a2 ");
  conf.setCapacity(A1, 60);
  conf.setCapacity(A2, 40);
}
/**
 * Populates {@code conf} with a two-level hierarchy (root -> a, b;
 * a -> a1, a2; b -> b1, b2, b3) that configures no node labels.
 *
 * @param conf the scheduler configuration to populate
 */
private void setupQueueConfigurationWithoutLabels(CapacitySchedulerConfiguration conf) {
  // Children of root
  String[] rootChildren = {"a", "b"};
  conf.setQueues(ROOT, rootChildren);
  conf.setCapacity(A, 10);
  conf.setMaximumCapacity(A, 15);
  conf.setCapacity(B, 90);
  LOG.info("Setup top-level queues");

  // Children of a
  String[] childrenOfA = {"a1", "a2"};
  conf.setQueues(A, childrenOfA);
  conf.setCapacity(A1, 30);
  conf.setMaximumCapacity(A1, 45);
  conf.setCapacity(A2, 70);
  conf.setMaximumCapacity(A2, 85);

  // Children of b
  String[] childrenOfB = {"b1", "b2", "b3"};
  conf.setQueues(B, childrenOfB);
  conf.setCapacity(B1, 50);
  conf.setMaximumCapacity(B1, 85);
  conf.setCapacity(B2, 30);
  conf.setMaximumCapacity(B2, 35);
  conf.setCapacity(B3, 20);
  conf.setMaximumCapacity(B3, 35);
}
/**
 * Populates {@code conf} with the two-level hierarchy root -> a, b;
 * a -> a1, a2; b -> b1, b2, b3, including capacities for the node labels
 * "red" and "blue". Queues a and b may access both labels, a2 is restricted
 * to "red", and the remaining queues set no accessible labels of their own.
 *
 * @param conf the scheduler configuration to populate
 */
private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
  final String red = "red";
  final String blue = "blue";
  final Set<String> redAndBlue = ImmutableSet.of(red, blue);

  // root and its children
  String[] rootChildren = {"a", "b"};
  conf.setQueues(ROOT, rootChildren);
  conf.setCapacityByLabel(ROOT, red, 100);
  conf.setCapacityByLabel(ROOT, blue, 100);
  conf.setCapacity(A, 10);
  conf.setMaximumCapacity(A, 15);
  conf.setCapacity(B, 90);
  LOG.info("Setup top-level queues");

  // Queue a: labels and children
  String[] childrenOfA = {"a1", "a2"};
  conf.setQueues(A, childrenOfA);
  conf.setAccessibleNodeLabels(A, redAndBlue);
  conf.setCapacityByLabel(A, red, 50);
  conf.setMaximumCapacityByLabel(A, red, 50);
  conf.setCapacityByLabel(A, blue, 50);

  // a1 sets no accessible labels of its own
  conf.setCapacity(A1, 30);
  conf.setMaximumCapacity(A1, 45);
  conf.setCapacityByLabel(A1, red, 50);
  conf.setCapacityByLabel(A1, blue, 100);

  // a2 is limited to "red"
  conf.setCapacity(A2, 70);
  conf.setMaximumCapacity(A2, 85);
  conf.setAccessibleNodeLabels(A2, ImmutableSet.of(red));
  conf.setCapacityByLabel(A2, red, 50);
  conf.setMaximumCapacityByLabel(A2, red, 60);

  // Queue b: labels and children
  String[] childrenOfB = {"b1", "b2", "b3"};
  conf.setQueues(B, childrenOfB);
  conf.setAccessibleNodeLabels(B, ImmutableSet.of(red, blue));
  conf.setCapacityByLabel(B, red, 50);
  conf.setCapacityByLabel(B, blue, 50);

  conf.setCapacity(B1, 50);
  conf.setMaximumCapacity(B1, 85);
  conf.setCapacityByLabel(B1, red, 50);
  conf.setCapacityByLabel(B1, blue, 50);

  conf.setCapacity(B2, 30);
  conf.setMaximumCapacity(B2, 35);
  conf.setCapacityByLabel(B2, red, 25);
  conf.setCapacityByLabel(B2, blue, 25);

  conf.setCapacity(B3, 20);
  conf.setMaximumCapacity(B3, 35);
  conf.setCapacityByLabel(B3, red, 25);
  conf.setCapacityByLabel(B3, blue, 25);
}
/**
 * Populates {@code conf} with a labelled two-level hierarchy in which some
 * leaf queues deliberately have a capacity above their maximum capacity
 * (a1 and b1 for label "red", b2 for the default partition). It is used by
 * the leaf-queue "disable elasticity" test.
 *
 * @param conf the scheduler configuration to populate
 */
private void setupQueueConfigurationWithLabelsAndReleaseCheck(
    CapacitySchedulerConfiguration conf) {
  final String red = "red";
  final String blue = "blue";

  // root and its children
  String[] rootChildren = {"a", "b"};
  conf.setQueues(ROOT, rootChildren);
  conf.setCapacityByLabel(ROOT, red, 100);
  conf.setCapacityByLabel(ROOT, blue, 100);
  // The cap <= max-cap check is not needed
  conf.setCapacity(A, 50);
  conf.setMaximumCapacity(A, 100);
  conf.setCapacity(B, 50);
  conf.setMaximumCapacity(B, 100);
  LOG.info("Setup top-level queues");

  // Queue a: labels and children
  String[] childrenOfA = {"a1", "a2"};
  conf.setQueues(A, childrenOfA);
  conf.setAccessibleNodeLabels(A, ImmutableSet.of(red, blue));
  conf.setCapacityByLabel(A, red, 50);
  conf.setMaximumCapacityByLabel(A, red, 100);
  conf.setCapacityByLabel(A, blue, 30);
  conf.setMaximumCapacityByLabel(A, blue, 50);

  // a1: "red" capacity (60) exceeds its "red" maximum capacity (30)
  conf.setCapacity(A1, 60);
  conf.setMaximumCapacity(A1, 60);
  conf.setCapacityByLabel(A1, red, 60);
  conf.setMaximumCapacityByLabel(A1, red, 30);
  conf.setCapacityByLabel(A1, blue, 100);
  conf.setMaximumCapacityByLabel(A1, blue, 100);

  // a2 is limited to "red"
  conf.setCapacity(A2, 40);
  conf.setMaximumCapacity(A2, 85);
  conf.setAccessibleNodeLabels(A2, ImmutableSet.of(red));
  conf.setCapacityByLabel(A2, red, 40);
  conf.setMaximumCapacityByLabel(A2, red, 60);

  // Queue b: labels and children
  String[] childrenOfB = {"b1", "b2", "b3"};
  conf.setQueues(B, childrenOfB);
  conf.setAccessibleNodeLabels(B, ImmutableSet.of(red, blue));
  conf.setCapacityByLabel(B, red, 50);
  conf.setMaximumCapacityByLabel(B, red, 100);
  conf.setCapacityByLabel(B, blue, 70);
  conf.setMaximumCapacityByLabel(B, blue, 100);

  // b1: "red" capacity (60) exceeds its "red" maximum capacity (30)
  conf.setCapacity(B1, 10);
  conf.setMaximumCapacity(B1, 10);
  conf.setCapacityByLabel(B1, red, 60);
  conf.setMaximumCapacityByLabel(B1, red, 30);
  conf.setCapacityByLabel(B1, blue, 50);
  conf.setMaximumCapacityByLabel(B1, blue, 100);

  // b2: default-partition capacity (80) exceeds its maximum capacity (40)
  conf.setCapacity(B2, 80);
  conf.setMaximumCapacity(B2, 40);
  conf.setCapacityByLabel(B2, red, 30);
  conf.setCapacityByLabel(B2, blue, 25);

  conf.setCapacity(B3, 10);
  conf.setMaximumCapacity(B3, 25);
  conf.setCapacityByLabel(B3, red, 10);
  conf.setCapacityByLabel(B3, blue, 25);
}
/**
 * Populates {@code conf} with root -> a, b and a -> a1, a2. Queue a may
 * access "red" and "blue", a1 sets no accessible labels of its own, a2 is
 * limited to "red" and b is given an explicitly empty label set.
 *
 * @param conf the scheduler configuration to populate
 */
private void setupQueueConfigurationWithLabelsInherit(
    CapacitySchedulerConfiguration conf) {
  final String red = "red";
  final String blue = "blue";

  // root and its children
  String[] rootChildren = {"a", "b"};
  conf.setQueues(ROOT, rootChildren);
  conf.setCapacityByLabel(ROOT, red, 100);
  conf.setCapacityByLabel(ROOT, blue, 100);

  // Queue a
  conf.setCapacity(A, 10);
  conf.setMaximumCapacity(A, 15);
  String[] childrenOfA = {"a1", "a2"};
  conf.setQueues(A, childrenOfA);
  conf.setAccessibleNodeLabels(A, ImmutableSet.of(red, blue));
  conf.setCapacityByLabel(A, red, 100);
  conf.setCapacityByLabel(A, blue, 100);

  // Queue b: explicitly no accessible labels
  conf.setCapacity(B, 90);
  conf.setAccessibleNodeLabels(B, CommonNodeLabelsManager.EMPTY_STRING_SET);

  // Children of a: a1 leaves its accessible labels unset
  conf.setCapacity(A1, 30);
  conf.setMaximumCapacity(A1, 45);
  conf.setCapacityByLabel(A1, red, 50);
  conf.setCapacityByLabel(A1, blue, 100);

  // a2 is limited to "red"
  conf.setCapacity(A2, 70);
  conf.setMaximumCapacity(A2, 85);
  conf.setAccessibleNodeLabels(A2, ImmutableSet.of(red));
  conf.setCapacityByLabel(A2, red, 50);
}
/**
 * Populates {@code conf} with a single level of queues (root -> a, b), both
 * of which may access the labels "red" and "blue".
 *
 * @param conf the scheduler configuration to populate
 */
private void setupQueueConfigurationWithSingleLevel(
    CapacitySchedulerConfiguration conf) {
  final String red = "red";
  final String blue = "blue";

  String[] rootChildren = {"a", "b"};
  conf.setQueues(ROOT, rootChildren);

  // Queue a: 10% of the default partition, 90% of each label
  conf.setCapacity(A, 10);
  conf.setMaximumCapacity(A, 15);
  conf.setAccessibleNodeLabels(A, ImmutableSet.of(red, blue));
  conf.setCapacityByLabel(A, red, 90);
  conf.setCapacityByLabel(A, blue, 90);

  // Queue b: 90% of the default partition, 10% of each label
  conf.setCapacity(B, 90);
  conf.setAccessibleNodeLabels(B, ImmutableSet.of(red, blue));
  conf.setCapacityByLabel(B, red, 10);
  conf.setCapacityByLabel(B, blue, 10);
}
/**
 * Starts a {@link MockRM} for {@code conf} without adding any cluster node
 * labels; delegates to {@link #createMockRMWithLabels} with an empty set.
 *
 * @param conf configuration handed to the MockRM
 * @return the started MockRM
 * @throws Exception if the delegate call fails
 */
public MockRM createMockRMWithoutLabels(YarnConfiguration conf) throws Exception {
  Set<String> noLabels = emptySet();
  return createMockRMWithLabels(conf, noLabels);
}
/**
 * Adds {@code nodeLabels} to the shared node label manager, maps one host
 * ("h1", "h2", ...) to each label, then creates and starts a {@link MockRM}
 * that uses that manager and registers every such host with it.
 *
 * @param conf configuration handed to the MockRM
 * @param nodeLabels cluster node labels to add; may be empty
 * @return the started MockRM
 * @throws Exception if adding the labels, starting the RM or registering a
 *     node fails
 */
public MockRM createMockRMWithLabels(
    YarnConfiguration conf, Set<String> nodeLabels) throws Exception {
  nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(nodeLabels);

  // One host per label, each carrying exactly that label.
  Map<NodeId, Set<String>> labelsByNode = new HashMap<>();
  int hostIndex = 1;
  for (String nodeLabel : nodeLabels) {
    NodeId nodeId = NodeId.newInstance("h" + hostIndex, 0);
    labelsByNode.put(nodeId, toSet(nodeLabel));
    hostIndex++;
  }
  nodeLabelManager.addLabelsToNode(labelsByNode);

  // Make the RM use the manager prepared above.
  MockRM mockRm = new MockRM(conf) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      return nodeLabelManager;
    }
  };
  mockRm.getRMContext().setNodeLabelManager(nodeLabelManager);
  mockRm.start();

  for (NodeId nodeId : labelsByNode.keySet()) {
    mockRm.registerNode(nodeId.toString(), 8 * GB);
  }
  return mockRm;
}
/**
 * Parses the default three-level hierarchy and verifies the absolute
 * capacity and absolute maximum capacity of one queue per level.
 */
@Test
public void testQueueParsing() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  setupQueueConfiguration(csConf);
  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithoutLabels(conf);
  try {
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();

    CSQueue a = capacityScheduler.getQueue("a");
    Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);

    // b has no explicit maximum capacity, so b1's own 85% is the absolute value
    CSQueue b1 = capacityScheduler.getQueue("b1");
    Assert.assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals("Parent B has no MAX_CAP",
        0.85, b1.getAbsoluteMaximumCapacity(), DELTA);

    // Absolute values multiply down the c -> c1 -> c12 chain
    CSQueue c12 = capacityScheduler.getQueue("c12");
    Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.7 * 0.55 * 0.7,
        c12.getAbsoluteMaximumCapacity(), DELTA);
  } finally {
    // The RM was previously never stopped by this test
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Setting a capacity on the root queue is expected to be rejected with an
 * {@link IllegalArgumentException}.
 */
@Test(expected = IllegalArgumentException.class)
public void testRootQueueParsing() throws Exception {
  CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();

  // non-100 percent value will throw IllegalArgumentException
  schedulerConf.setCapacity(ROOT, 90);
}
/**
 * Starts the RM with a label-free queue configuration, reinitializes the
 * scheduler with the labelled configuration and verifies the resulting
 * queue labels and capacities.
 */
@Test
public void testQueueParsingReinitializeWithLabels() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithoutLabels(csConf);
  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
  try {
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();

    // Switch to the labelled configuration on the running scheduler
    csConf = new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithLabels(csConf);
    conf = new YarnConfiguration(csConf);
    capacityScheduler.reinitialize(conf, rm.getRMContext());

    checkQueueLabels(capacityScheduler);
  } finally {
    // Stop the RM even when reinitialization or an assertion fails
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Starts the RM directly with the labelled queue configuration and verifies
 * the resulting queue labels and capacities.
 */
@Test
public void testQueueParsingWithLabels() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithLabels(csConf);
  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
  try {
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
    checkQueueLabels(capacityScheduler);
  } finally {
    // Stop the RM even when an assertion fails
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Asserts the accessible node labels and selected capacities that result
 * from {@code setupQueueConfigurationWithLabels}.
 *
 * @param capacityScheduler scheduler whose queues are inspected
 */
private void checkQueueLabels(CapacityScheduler capacityScheduler) {
  // a: "red" and "blue"
  CSQueue queueA = capacityScheduler.getQueue("a");
  Assert.assertTrue(
      queueA.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // a1 sets no labels of its own and is expected to take over those of a
  CSQueue queueA1 = capacityScheduler.getQueue("a1");
  Assert.assertTrue(
      queueA1.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // a2: only "red"
  CSQueue queueA2 = capacityScheduler.getQueue("a2");
  Assert.assertEquals(1, queueA2.getAccessibleNodeLabels().size());
  Assert.assertTrue(queueA2.getAccessibleNodeLabels().contains("red"));

  // b: "red" and "blue"
  CSQueue queueB = capacityScheduler.getQueue("b");
  Assert.assertTrue(
      queueB.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // b2 sets no labels of its own and is expected to take over those of b
  CSQueue queueB2 = capacityScheduler.getQueue("b2");
  Assert.assertTrue(
      queueB2.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // Capacities of a2, for the default partition and for "red"
  Assert.assertEquals(0.7, queueA2.getCapacity(), DELTA);
  Assert.assertEquals(0.5, queueA2.getQueueCapacities().getCapacity("red"), DELTA);
  Assert.assertEquals(0.07, queueA2.getAbsoluteCapacity(), DELTA);
  Assert.assertEquals(
      0.25, queueA2.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
  Assert.assertEquals(0.1275, queueA2.getAbsoluteMaximumCapacity(), DELTA);
  Assert.assertEquals(
      0.3, queueA2.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);

  // Capacities of b3, for the default partition and for "red"
  CSQueue queueB3 = capacityScheduler.getQueue("b3");
  Assert.assertEquals(0.18, queueB3.getAbsoluteCapacity(), DELTA);
  Assert.assertEquals(
      0.125, queueB3.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
  Assert.assertEquals(0.35, queueB3.getAbsoluteMaximumCapacity(), DELTA);
  Assert.assertEquals(
      1, queueB3.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
}
/**
 * Starts the RM with a configuration in which some leaf queues have a
 * capacity above their maximum capacity and verifies the parsed labels and
 * capacities.
 */
@Test
public void testQueueParsingWithLeafQueueDisableElasticity() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithLabelsAndReleaseCheck(csConf);
  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
  try {
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
    checkQueueLabelsWithLeafQueueDisableElasticity(capacityScheduler);
  } finally {
    // Stop the RM even when an assertion fails
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Asserts the accessible node labels and selected capacities that result
 * from {@code setupQueueConfigurationWithLabelsAndReleaseCheck}. This
 * includes the leaf queues whose absolute capacity and absolute maximum
 * capacity are expected to coincide.
 *
 * @param capacityScheduler scheduler whose queues are inspected
 */
private void checkQueueLabelsWithLeafQueueDisableElasticity(
    CapacityScheduler capacityScheduler) {
  // a: "red" and "blue"
  CSQueue queueA = capacityScheduler.getQueue("a");
  Assert.assertTrue(
      queueA.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // a1 sets no labels of its own and is expected to take over those of a
  CSQueue queueA1 = capacityScheduler.getQueue("a1");
  Assert.assertTrue(
      queueA1.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // a2: only "red"
  CSQueue queueA2 = capacityScheduler.getQueue("a2");
  Assert.assertEquals(1, queueA2.getAccessibleNodeLabels().size());
  Assert.assertTrue(queueA2.getAccessibleNodeLabels().contains("red"));

  // b: "red" and "blue"
  CSQueue queueB = capacityScheduler.getQueue("b");
  Assert.assertTrue(
      queueB.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // b2 sets no labels of its own and is expected to take over those of b
  CSQueue queueB2 = capacityScheduler.getQueue("b2");
  Assert.assertTrue(
      queueB2.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // Capacities of a2, for the default partition and for "red"
  Assert.assertEquals(0.4, queueA2.getCapacity(), DELTA);
  Assert.assertEquals(0.4, queueA2.getQueueCapacities().getCapacity("red"), DELTA);
  Assert.assertEquals(0.2, queueA2.getAbsoluteCapacity(), DELTA);
  Assert.assertEquals(
      0.2, queueA2.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
  Assert.assertEquals(0.85, queueA2.getAbsoluteMaximumCapacity(), DELTA);
  Assert.assertEquals(
      0.6, queueA2.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);

  // Disabled elasticity on a leaf queue, default partition (b2)
  Assert.assertEquals(0.4, queueB2.getAbsoluteCapacity(), DELTA);
  Assert.assertEquals(0.4, queueB2.getAbsoluteMaximumCapacity(), DELTA);

  // Disabled elasticity on leaf queues, label "red" (a1 and b1)
  Assert.assertEquals(
      0.3, queueA1.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
  Assert.assertEquals(
      0.3, queueA1.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
  CSQueue queueB1 = capacityScheduler.getQueue("b1");
  Assert.assertEquals(
      0.3, queueB1.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
  Assert.assertEquals(
      0.3, queueB1.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);

  // Capacities of b3, for the default partition and for "blue"
  CSQueue queueB3 = capacityScheduler.getQueue("b3");
  Assert.assertEquals(0.05, queueB3.getAbsoluteCapacity(), DELTA);
  Assert.assertEquals(
      0.175, queueB3.getQueueCapacities().getAbsoluteCapacity("blue"), DELTA);
  Assert.assertEquals(0.25, queueB3.getAbsoluteMaximumCapacity(), DELTA);
  Assert.assertEquals(
      1, queueB3.getQueueCapacities().getAbsoluteMaximumCapacity("blue"), DELTA);
}
/**
 * Starts the RM with the label-inheritance configuration and verifies the
 * accessible node labels of the parsed queues.
 */
@Test
public void testQueueParsingWithLabelsInherit() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithLabelsInherit(csConf);
  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
  try {
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
    checkQueueLabelsInheritConfig(capacityScheduler);
  } finally {
    // Stop the RM even when an assertion fails
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Asserts the accessible node labels that result from
 * {@code setupQueueConfigurationWithLabelsInherit}.
 *
 * @param capacityScheduler scheduler whose queues are inspected
 */
private void checkQueueLabelsInheritConfig(CapacityScheduler capacityScheduler) {
  // a: "red" and "blue"
  CSQueue queueA = capacityScheduler.getQueue("a");
  Assert.assertTrue(
      queueA.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // a1 sets no labels of its own and is expected to take over those of a
  CSQueue queueA1 = capacityScheduler.getQueue("a1");
  Assert.assertTrue(
      queueA1.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));

  // a2: only "red"
  CSQueue queueA2 = capacityScheduler.getQueue("a2");
  Assert.assertEquals(1, queueA2.getAccessibleNodeLabels().size());
  Assert.assertTrue(queueA2.getAccessibleNodeLabels().contains("red"));

  // b was configured with an empty label set, so it has no accessible labels
  CSQueue queueB = capacityScheduler.getQueue("b");
  Assert.assertTrue(queueB.getAccessibleNodeLabels().isEmpty());
}
/**
 * The labelled queue configuration must still let the RM start when no
 * label has been added to the node label manager.
 */
@Test
public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager() throws Exception {
  CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithLabels(schedulerConf);

  // Passes if start-up does not throw; nothing else is asserted.
  MockRM mockRm = createMockRMWithoutLabels(new YarnConfiguration(schedulerConf));
  ServiceOperations.stopQuietly(mockRm);
}
/**
 * The label-inheritance queue configuration must still let the RM start
 * when no label has been added to the node label manager.
 */
@Test
public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager() throws Exception {
  CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithLabelsInherit(schedulerConf);

  // Passes if start-up does not throw; nothing else is asserted.
  MockRM mockRm = createMockRMWithoutLabels(new YarnConfiguration(schedulerConf));
  ServiceOperations.stopQuietly(mockRm);
}
/**
 * The single-level labelled queue configuration must still let the RM start
 * when no label has been added to the node label manager.
 */
@Test
public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager() throws Exception {
  CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithSingleLevel(schedulerConf);

  // Passes if start-up does not throw; nothing else is asserted.
  MockRM mockRm = createMockRMWithoutLabels(new YarnConfiguration(schedulerConf));
  ServiceOperations.stopQuietly(mockRm);
}
/**
 * The labelled queue configuration must still let the RM start when the
 * cluster has no node labels.
 *
 * <p>NOTE(review): the body is identical to
 * {@code testQueueParsingWhenLabelsNotExistedInNodeLabelManager}.
 */
@Test
public void testQueueParsingWhenLabelsNotExist() throws Exception {
  CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithLabels(schedulerConf);

  // Passes if start-up does not throw; nothing else is asserted.
  MockRM mockRm = createMockRMWithoutLabels(new YarnConfiguration(schedulerConf));
  ServiceOperations.stopQuietly(mockRm);
}
/**
 * Starts a cluster that has the labels "red" and "blue" but whose queues
 * configure no capacity for them, reinitializes the scheduler and verifies
 * that the label capacities of root are zero and the default-partition
 * capacities are unaffected.
 */
@Test
public void testQueueParsingWithUnusedLabels() throws Exception {
  // Initialize a cluster with labels, but don't use them, reinitialize
  // shouldn't fail
  Set<String> labels = ImmutableSet.of("red", "blue");
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  setupQueueConfiguration(csConf);
  csConf.setAccessibleNodeLabels(ROOT, labels);
  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithLabels(conf, labels);
  try {
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
    capacityScheduler.reinitialize(csConf, rm.getRMContext());

    // check root queue's capacity by label -- they should be all zero
    CSQueue root = capacityScheduler.getQueue(CapacitySchedulerConfiguration.ROOT);
    Assert.assertEquals(0, root.getQueueCapacities().getCapacity("red"), DELTA);
    Assert.assertEquals(0, root.getQueueCapacities().getCapacity("blue"), DELTA);

    CSQueue a = capacityScheduler.getQueue("a");
    Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);

    CSQueue b1 = capacityScheduler.getQueue("b1");
    Assert.assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals("Parent B has no MAX_CAP", 0.85,
        b1.getAbsoluteMaximumCapacity(), DELTA);

    CSQueue c12 = capacityScheduler.getQueue("c12");
    Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.7 * 0.55 * 0.7, c12.getAbsoluteMaximumCapacity(),
        DELTA);
  } finally {
    // Stop the RM even when reinitialization or an assertion fails
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Verifies that queue names listed with surrounding whitespace are parsed
 * into queues that can be looked up by their trimmed names.
 */
@Test
public void testQueueParsingShouldTrimSpaces() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithSpacesShouldBeTrimmed(csConf);
  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithoutLabels(conf);
  try {
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();

    // Configured as " a "
    CSQueue a = capacityScheduler.getQueue("a");
    Assert.assertNotNull(a);
    Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);

    // Configured as " c"
    CSQueue c = capacityScheduler.getQueue("c");
    Assert.assertNotNull(c);
    Assert.assertEquals(0.70, c.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.70, c.getAbsoluteMaximumCapacity(), DELTA);
  } finally {
    // The RM was previously never stopped by this test
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Verifies that queue names listed with surrounding whitespace are trimmed
 * both for the children of root and for the children of a nested queue.
 */
@Test
public void testNestedQueueParsingShouldTrimSpaces() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  setupNestedQueueConfigurationWithSpacesShouldBeTrimmed(csConf);
  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithoutLabels(conf);
  try {
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();

    CSQueue a = capacityScheduler.getQueue("a");
    Assert.assertNotNull(a);
    Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);

    CSQueue c = capacityScheduler.getQueue("c");
    Assert.assertNotNull(c);
    Assert.assertEquals(0.70, c.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.70, c.getAbsoluteMaximumCapacity(), DELTA);

    // a1 and a2 set no maximum capacity; the expected absolute maximum is a's 15%
    CSQueue a1 = capacityScheduler.getQueue("a1");
    Assert.assertNotNull(a1);
    Assert.assertEquals(0.10 * 0.6, a1.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.15, a1.getAbsoluteMaximumCapacity(), DELTA);

    CSQueue a2 = capacityScheduler.getQueue("a2");
    Assert.assertNotNull(a2);
    Assert.assertEquals(0.10 * 0.4, a2.getAbsoluteCapacity(), DELTA);
    Assert.assertEquals(0.15, a2.getAbsoluteMaximumCapacity(), DELTA);
  } finally {
    // The RM was previously never stopped by this test
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Initializes a queue configuration in which the children's capacities for
 * the default (non-labelled) partition do not add up to 100%. Creating the
 * RM is expected to fail with a {@link ServiceStateException} (presumably
 * wrapping the validation error raised by the scheduler).
 */
@Test(expected = ServiceStateException.class)
public void testQueueParsingFailWhenSumOfChildrenNonLabeledCapacityNot100Percent()
    throws Exception {
  CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
  // If the new queue mode is used it's allowed to leave
  // some of the resources of a parent queue unallocated
  assumeThat(schedulerConf.isLegacyQueueMode(), is(true));

  setupQueueConfiguration(schedulerConf);
  // c2 drops from 10 to 5, so the children of c now total 95
  schedulerConf.setCapacity(C2, 5);

  MockRM mockRm = createMockRMWithLabels(
      new YarnConfiguration(schedulerConf), ImmutableSet.of("red", "blue"));
  ServiceOperations.stopQuietly(mockRm);
}
/**
 * Initializes a queue configuration in which the children's capacities for
 * the label "red" do not add up to 100%. Creating the RM is expected to
 * fail with a {@link ServiceStateException} (presumably wrapping the
 * validation error raised by the scheduler).
 */
@Test(expected = ServiceStateException.class)
public void testQueueParsingFailWhenSumOfChildrenLabeledCapacityNot100Percent()
    throws Exception {
  CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
  // If the new queue mode is used it's allowed to leave
  // some of the resources of a parent queue unallocated
  assumeThat(schedulerConf.isLegacyQueueMode(), is(true));

  setupQueueConfigurationWithLabels(schedulerConf);
  // b3 drops from 25 to 24 for "red", so the children of b now total 99
  schedulerConf.setCapacityByLabel(B3, "red", 24);

  MockRM mockRm = createMockRMWithLabels(
      new YarnConfiguration(schedulerConf), ImmutableSet.of("red", "blue"));
  ServiceOperations.stopQuietly(mockRm);
}
/**
 * Same as the labelled "not 100%" case, but root and b declare their
 * accessible labels with the {@code ANY} wildcard instead of listing them.
 * Creating the RM is expected to fail with a {@link ServiceStateException}
 * (presumably wrapping the validation error raised by the scheduler).
 */
@Test(expected = ServiceStateException.class)
public void testQueueParsingWithSumOfChildLabelCapacityNot100PercentWithWildCard()
    throws Exception {
  CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
  // If the new queue mode is used it's allowed to leave
  // some of the resources of a parent queue unallocated
  assumeThat(schedulerConf.isLegacyQueueMode(), is(true));

  setupQueueConfigurationWithLabels(schedulerConf);
  // b3 drops from 25 to 24 for "red", so the children of b now total 99
  schedulerConf.setCapacityByLabel(B3, "red", 24);
  // Replace the explicit label lists of root and b by the wildcard
  schedulerConf.setAccessibleNodeLabels(ROOT, ImmutableSet.of(RMNodeLabelsManager.ANY));
  schedulerConf.setAccessibleNodeLabels(B, ImmutableSet.of(RMNodeLabelsManager.ANY));

  MockRM mockRm = createMockRMWithLabels(
      new YarnConfiguration(schedulerConf), ImmutableSet.of("red", "blue"));
  ServiceOperations.stopQuietly(mockRm);
}
/**
 * Starts the RM with root -> a -> {x, y} and then reinitializes the
 * scheduler with a configuration in which x is a child of root instead of
 * a. The reinitialization is expected to fail with an {@link IOException}.
 */
@Test(expected = IOException.class)
public void testQueueParsingWithMoveQueue() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  csConf.setQueues(ROOT, new String[] {"a"});
  csConf.setQueues(A, new String[] {"x", "y"});
  csConf.setCapacity(A, 100);
  csConf.setCapacity(AX, 50);
  csConf.setCapacity(AY, 50);
  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithoutLabels(conf);
  try {
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();

    // New layout: root -> {a, x}, a -> {y}
    csConf.setQueues(ROOT, new String[] {"a", "x"});
    csConf.setQueues(A, new String[] {"y"});
    csConf.setCapacity(X, 50);
    csConf.setCapacity(A, 50);
    csConf.setCapacity(AY, 100);
    capacityScheduler.reinitialize(csConf, rm.getRMContext());
  } finally {
    // reinitialize() is expected to throw, so a stop call placed after it
    // would never run; the finally block guarantees the RM is stopped.
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Configures root with the single child a, whose capacity for label "z" is
 * only 70%. Creating the RM is expected to fail with a
 * {@link ServiceStateException} in legacy queue mode.
 */
@Test(timeout = 60000, expected = ServiceStateException.class)
public void testRMStartWrongNodeCapacity() throws Exception {
  CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
  Set<String> clusterLabels = ImmutableSet.of("x", "y", "z");
  // If the new queue mode is used it's allowed to leave
  // some of the resources of a parent queue unallocated
  assumeThat(schedulerConf.isLegacyQueueMode(), is(true));

  // root and its single child
  String[] rootChildren = {"a"};
  schedulerConf.setQueues(ROOT, rootChildren);
  schedulerConf.setCapacityByLabel(ROOT, "x", 100);
  schedulerConf.setCapacityByLabel(ROOT, "y", 100);
  schedulerConf.setCapacityByLabel(ROOT, "z", 100);

  // Queue a: full capacity everywhere except label "z"
  schedulerConf.setCapacity(A, 100);
  schedulerConf.setAccessibleNodeLabels(A, clusterLabels);
  schedulerConf.setCapacityByLabel(A, "x", 100);
  schedulerConf.setCapacityByLabel(A, "y", 100);
  schedulerConf.setCapacityByLabel(A, "z", 70);

  MockRM mockRm = createMockRMWithLabels(new YarnConfiguration(schedulerConf), clusterLabels);
  ServiceOperations.stopQuietly(mockRm);
}
/**
 * Verifies that adding a child queue ({@code b4}) under {@code b} through
 * reinitialization is reflected both in the parent's child-queue list and in
 * the queue set held by its {@link PriorityUtilizationQueueOrderingPolicy}.
 *
 * @throws Exception if the mock RM cannot be created or reinitialization
 *         fails
 */
@Test
public void testQueueOrderingPolicyUpdatedAfterReinitialize() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();
  setupQueueConfigurationWithoutLabels(csConf);
  YarnConfiguration conf = new YarnConfiguration(csConf);

  MockRM rm = createMockRMWithoutLabels(conf);
  try {
    CapacityScheduler capacityScheduler =
        (CapacityScheduler) rm.getResourceScheduler();

    // Add a new b4 queue
    csConf.setQueues(B,
        new String[] { "b1", "b2", "b3", "b4" });
    csConf.setCapacity(B4, 0f);

    // Before reinitialization only the original three children exist.
    ParentQueue bQ = (ParentQueue) capacityScheduler.getQueue("b");
    checkEqualsToQueueSet(bQ.getChildQueues(),
        new String[] { "b1", "b2", "b3" });

    capacityScheduler.reinitialize(csConf, rm.getRMContext());

    // Check child queue of b (same ParentQueue reference as before)
    checkEqualsToQueueSet(bQ.getChildQueues(),
        new String[] { "b1", "b2", "b3", "b4" });

    // The ordering policy must see the new queue as well.
    PriorityUtilizationQueueOrderingPolicy queueOrderingPolicy =
        (PriorityUtilizationQueueOrderingPolicy) bQ.getQueueOrderingPolicy();
    checkEqualsToQueueSet(queueOrderingPolicy.getQueues(),
        new String[] { "b1", "b2", "b3", "b4" });
  } finally {
    // Stop the RM even if an assertion above fails, so it is not leaked
    // into subsequent tests.
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Configures the root and its only child {@code a} using weights (including
 * a weight of 70 for label {@code z} on {@code a}) and checks that the
 * absolute capacity of both queues is 1 for the default partition and for
 * each of the labels {@code x}, {@code y} and {@code z}.
 *
 * @throws Exception if the mock RM cannot be created
 */
@Test(timeout = 60000)
public void testQueueCapacityWithWeight() throws Exception {
  Set<String> labels = ImmutableSet.of("x", "y", "z");
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();

  // Define top-level queues
  csConf.setQueues(ROOT, new String[] {"a"});
  csConf.setLabeledQueueWeight(ROOT, "x", 100);
  csConf.setLabeledQueueWeight(ROOT, "y", 100);
  csConf.setLabeledQueueWeight(ROOT, "z", 100);

  csConf.setNonLabeledQueueWeight(A, 100);
  csConf.setAccessibleNodeLabels(A, labels);
  csConf.setLabeledQueueWeight(A, "x", 100);
  csConf.setLabeledQueueWeight(A, "y", 100);
  csConf.setLabeledQueueWeight(A, "z", 70);

  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithLabels(conf, labels);
  try {
    verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "", 1f);
    verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "x", 1f);
    verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "y", 1f);
    verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "z", 1f);

    verifyQueueAbsCapacity(rm, A_PATH, "", 1f);
    verifyQueueAbsCapacity(rm, A_PATH, "x", 1f);
    verifyQueueAbsCapacity(rm, A_PATH, "y", 1f);
    verifyQueueAbsCapacity(rm, A_PATH, "z", 1f);
  } finally {
    // Stop the RM even if one of the capacity checks fails.
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Checks how user-limit settings are resolved for leaf queues.
 *
 * <p>Phase one sets scheduler-wide defaults (user limit 20, user limit
 * factor 2.0), overrides them on queue {@code a} (15 and 1.5) and leaves
 * queue {@code b} unconfigured; {@code b} is expected to pick up the
 * configured defaults. Phase two reinitializes with a fresh configuration
 * that has no such defaults; {@code b} is then expected to report a user
 * limit of 100 and a user limit factor of 1.
 *
 * @throws Exception if the mock RM cannot be created or reinitialization
 *         fails
 */
@Test
public void testQueueParsingWithDefaultUserLimitValues() throws Exception {
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration();

  // Define top-level queues
  csConf.setQueues(ROOT, new String[] {"a", "b"});

  // Set default value
  csConf.setDefaultUserLimit(20);
  csConf.setDefaultUserLimitFactor(2.0f);

  // Set A configuration and let B use default values
  csConf.setCapacity(A, 50);
  csConf.setUserLimit(A, 15);
  csConf.setUserLimitFactor(A, 1.5f);
  csConf.setCapacity(B, 50);

  YarnConfiguration conf = new YarnConfiguration(csConf);
  MockRM rm = createMockRMWithoutLabels(conf);
  try {
    // Test
    CapacityScheduler capacityScheduler =
        (CapacityScheduler) rm.getResourceScheduler();
    LeafQueue leafA = (LeafQueue) capacityScheduler.getQueue(A_PATH);
    LeafQueue leafB = (LeafQueue) capacityScheduler.getQueue(B_PATH);
    Assert.assertEquals(15, leafA.getUserLimit(), DELTA);
    Assert.assertEquals(1.5, leafA.getUserLimitFactor(), DELTA);
    Assert.assertEquals(20, leafB.getUserLimit(), DELTA);
    Assert.assertEquals(2.0, leafB.getUserLimitFactor(), DELTA);

    // Use hadoop default value
    csConf = new CapacitySchedulerConfiguration();
    csConf.setQueues(ROOT, new String[] {"a", "b"});
    csConf.setCapacity(A, 50);
    csConf.setUserLimit(A, 15);
    csConf.setUserLimitFactor(A, 1.5f);
    csConf.setCapacity(B, 50);
    capacityScheduler.reinitialize(csConf, rm.getRMContext());

    // Look the queues up again so the checks reflect the scheduler's
    // state after reinitialization.
    leafA = (LeafQueue) capacityScheduler.getQueue(A_PATH);
    leafB = (LeafQueue) capacityScheduler.getQueue(B_PATH);
    Assert.assertEquals(15, leafA.getUserLimit(), DELTA);
    Assert.assertEquals(1.5, leafA.getUserLimitFactor(), DELTA);
    Assert.assertEquals(100, leafB.getUserLimit(), DELTA);
    Assert.assertEquals(1, leafB.getUserLimitFactor(), DELTA);
  } finally {
    // Stop the RM on every path so it is not leaked into later tests.
    ServiceOperations.stopQuietly(rm);
  }
}
/**
 * Asserts that a queue's absolute capacity for the given label matches the
 * expected value, within a tolerance of 1e-6.
 *
 * @param rm the mock RM whose scheduler is queried; its scheduler is cast
 *        to {@link CapacityScheduler}
 * @param queuePath identifier passed to the scheduler's queue lookup
 * @param label node label to check (callers in this file pass the empty
 *        string for the non-labeled case)
 * @param expectedAbsCapacity the absolute capacity the queue should report
 */
private void verifyQueueAbsCapacity(MockRM rm, String queuePath, String label,
    float expectedAbsCapacity) {
  final CSQueue target =
      ((CapacityScheduler) rm.getResourceScheduler()).getQueue(queuePath);
  Assert.assertEquals(
      expectedAbsCapacity,
      target.getQueueCapacities().getAbsoluteCapacity(label),
      1e-6);
}
/**
 * Asserts that the short names of the given queues form exactly the set of
 * expected names: every expected name must be present, and no other name
 * may remain once the expected ones have been removed.
 *
 * @param queues the queues whose short names are collected
 * @param queueNames the names expected to be present
 */
private void checkEqualsToQueueSet(List<CSQueue> queues, String[] queueNames) {
  final Set<String> remaining = new HashSet<>();
  queues.forEach(queue -> remaining.add(queue.getQueueShortName()));

  // Tick off each expected name; a name that is absent fails here.
  for (int i = 0; i < queueNames.length; i++) {
    boolean wasPresent = remaining.remove(queueNames[i]);
    Assert.assertTrue(wasPresent);
  }

  // Anything left over is a queue that was not expected.
  Assert.assertTrue(remaining.isEmpty());
}
}