blob: add14ab2fdc717a201b503d4f21a6bd4d3e19adc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
public class TestQueueParsing {
private static final Log LOG = LogFactory.getLog(TestQueueParsing.class);
private static final double DELTA = 0.000001;
private RMNodeLabelsManager nodeLabelManager;
@Before
public void setup() {
nodeLabelManager = new NullRMNodeLabelsManager();
nodeLabelManager.init(new YarnConfiguration());
nodeLabelManager.start();
}
@Test
public void testQueueParsing() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(conf);
capacityScheduler.setRMContext(TestUtils.getMockRMContext());
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, TestUtils.getMockRMContext());
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
CSQueue b1 = capacityScheduler.getQueue("b1");
Assert.assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
Assert.assertEquals("Parent B has no MAX_CAP",
0.85, b1.getAbsoluteMaximumCapacity(), DELTA);
CSQueue c12 = capacityScheduler.getQueue("c12");
Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.7 * 0.55 * 0.7,
c12.getAbsoluteMaximumCapacity(), DELTA);
ServiceOperations.stopQuietly(capacityScheduler);
}
private void setupQueueConfigurationWithSpacesShouldBeTrimmed(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.set(
CapacitySchedulerConfiguration
.getQueuePrefix(CapacitySchedulerConfiguration.ROOT)
+ CapacitySchedulerConfiguration.QUEUES, " a ,b, c");
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 15);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 20);
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
conf.setCapacity(C, 70);
conf.setMaximumCapacity(C, 70);
}
private void setupNestedQueueConfigurationWithSpacesShouldBeTrimmed(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.set(
CapacitySchedulerConfiguration
.getQueuePrefix(CapacitySchedulerConfiguration.ROOT)
+ CapacitySchedulerConfiguration.QUEUES, " a ,b, c");
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 15);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 20);
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
conf.setCapacity(C, 70);
conf.setMaximumCapacity(C, 70);
// sub queues for A
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(A)
+ CapacitySchedulerConfiguration.QUEUES, "a1, a2 ");
final String A1 = CapacitySchedulerConfiguration.ROOT + ".a.a1";
conf.setCapacity(A1, 60);
final String A2 = CapacitySchedulerConfiguration.ROOT + ".a.a2";
conf.setCapacity(A2, 40);
}
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 15);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 20);
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
conf.setCapacity(C, 70);
conf.setMaximumCapacity(C, 70);
LOG.info("Setup top-level queues");
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[] {"a1", "a2"});
conf.setCapacity(A1, 30);
conf.setMaximumCapacity(A1, 45);
conf.setCapacity(A2, 70);
conf.setMaximumCapacity(A2, 85);
final String B1 = B + ".b1";
final String B2 = B + ".b2";
final String B3 = B + ".b3";
conf.setQueues(B, new String[] {"b1", "b2", "b3"});
conf.setCapacity(B1, 50);
conf.setMaximumCapacity(B1, 85);
conf.setCapacity(B2, 30);
conf.setMaximumCapacity(B2, 35);
conf.setCapacity(B3, 20);
conf.setMaximumCapacity(B3, 35);
final String C1 = C + ".c1";
final String C2 = C + ".c2";
final String C3 = C + ".c3";
final String C4 = C + ".c4";
conf.setQueues(C, new String[] {"c1", "c2", "c3", "c4"});
conf.setCapacity(C1, 50);
conf.setMaximumCapacity(C1, 55);
conf.setCapacity(C2, 10);
conf.setMaximumCapacity(C2, 25);
conf.setCapacity(C3, 35);
conf.setMaximumCapacity(C3, 38);
conf.setCapacity(C4, 5);
conf.setMaximumCapacity(C4, 5);
LOG.info("Setup 2nd-level queues");
// Define 3rd-level queues
final String C11 = C1 + ".c11";
final String C12 = C1 + ".c12";
final String C13 = C1 + ".c13";
conf.setQueues(C1, new String[] {"c11", "c12", "c13"});
conf.setCapacity(C11, 15);
conf.setMaximumCapacity(C11, 30);
conf.setCapacity(C12, 45);
conf.setMaximumCapacity(C12, 70);
conf.setCapacity(C13, 40);
conf.setMaximumCapacity(C13, 40);
LOG.info("Setup 3rd-level queues");
}
@Test (expected=java.lang.IllegalArgumentException.class)
public void testRootQueueParsing() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
// non-100 percent value will throw IllegalArgumentException
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 90);
CapacityScheduler capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(new YarnConfiguration());
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, null);
ServiceOperations.stopQuietly(capacityScheduler);
}
public void testMaxCapacity() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 50);
conf.setMaximumCapacity(A, 60);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 50);
conf.setMaximumCapacity(B, 45); // Should throw an exception
boolean fail = false;
CapacityScheduler capacityScheduler;
try {
capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(new YarnConfiguration());
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, null);
} catch (IllegalArgumentException iae) {
fail = true;
}
Assert.assertTrue("Didn't throw IllegalArgumentException for wrong maxCap",
fail);
conf.setMaximumCapacity(B, 60);
// Now this should work
capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(new YarnConfiguration());
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, null);
fail = false;
try {
LeafQueue a = (LeafQueue)capacityScheduler.getQueue(A);
a.setMaxCapacity(45);
} catch (IllegalArgumentException iae) {
fail = true;
}
Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " +
"setMaxCap", fail);
capacityScheduler.stop();
}
private void setupQueueConfigurationWithoutLabels(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 15);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 90);
LOG.info("Setup top-level queues");
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[] {"a1", "a2"});
conf.setCapacity(A1, 30);
conf.setMaximumCapacity(A1, 45);
conf.setCapacity(A2, 70);
conf.setMaximumCapacity(A2, 85);
final String B1 = B + ".b1";
final String B2 = B + ".b2";
final String B3 = B + ".b3";
conf.setQueues(B, new String[] {"b1", "b2", "b3"});
conf.setCapacity(B1, 50);
conf.setMaximumCapacity(B1, 85);
conf.setCapacity(B2, 30);
conf.setMaximumCapacity(B2, 35);
conf.setCapacity(B3, 20);
conf.setMaximumCapacity(B3, 35);
}
private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 15);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 90);
LOG.info("Setup top-level queues");
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[] {"a1", "a2"});
conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
conf.setCapacityByLabel(A, "red", 50);
conf.setMaximumCapacityByLabel(A, "red", 50);
conf.setCapacityByLabel(A, "blue", 50);
conf.setCapacity(A1, 30);
conf.setMaximumCapacity(A1, 45);
conf.setCapacityByLabel(A1, "red", 50);
conf.setCapacityByLabel(A1, "blue", 100);
conf.setCapacity(A2, 70);
conf.setMaximumCapacity(A2, 85);
conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
conf.setCapacityByLabel(A2, "red", 50);
conf.setMaximumCapacityByLabel(A2, "red", 60);
final String B1 = B + ".b1";
final String B2 = B + ".b2";
final String B3 = B + ".b3";
conf.setQueues(B, new String[] {"b1", "b2", "b3"});
conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue"));
conf.setCapacityByLabel(B, "red", 50);
conf.setCapacityByLabel(B, "blue", 50);
conf.setCapacity(B1, 50);
conf.setMaximumCapacity(B1, 85);
conf.setCapacityByLabel(B1, "red", 50);
conf.setCapacityByLabel(B1, "blue", 50);
conf.setCapacity(B2, 30);
conf.setMaximumCapacity(B2, 35);
conf.setCapacityByLabel(B2, "red", 25);
conf.setCapacityByLabel(B2, "blue", 25);
conf.setCapacity(B3, 20);
conf.setMaximumCapacity(B3, 35);
conf.setCapacityByLabel(B3, "red", 25);
conf.setCapacityByLabel(B3, "blue", 25);
}
private void setupQueueConfigurationWithLabelsAndReleaseCheck
(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
// The cap <= max-cap check is not needed
conf.setCapacity(A, 50);
conf.setMaximumCapacity(A, 100);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 50);
conf.setMaximumCapacity(B, 100);
LOG.info("Setup top-level queues");
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[] {"a1", "a2"});
conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
conf.setCapacityByLabel(A, "red", 50);
conf.setMaximumCapacityByLabel(A, "red", 100);
conf.setCapacityByLabel(A, "blue", 30);
conf.setMaximumCapacityByLabel(A, "blue", 50);
conf.setCapacity(A1, 60);
conf.setMaximumCapacity(A1, 60);
conf.setCapacityByLabel(A1, "red", 60);
conf.setMaximumCapacityByLabel(A1, "red", 30);
conf.setCapacityByLabel(A1, "blue", 100);
conf.setMaximumCapacityByLabel(A1, "blue", 100);
conf.setCapacity(A2, 40);
conf.setMaximumCapacity(A2, 85);
conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
conf.setCapacityByLabel(A2, "red", 40);
conf.setMaximumCapacityByLabel(A2, "red", 60);
final String B1 = B + ".b1";
final String B2 = B + ".b2";
final String B3 = B + ".b3";
conf.setQueues(B, new String[] {"b1", "b2", "b3"});
conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue"));
conf.setCapacityByLabel(B, "red", 50);
conf.setMaximumCapacityByLabel(B, "red", 100);
conf.setCapacityByLabel(B, "blue", 70);
conf.setMaximumCapacityByLabel(B, "blue", 100);
conf.setCapacity(B1, 10);
conf.setMaximumCapacity(B1, 10);
conf.setCapacityByLabel(B1, "red", 60);
conf.setMaximumCapacityByLabel(B1, "red", 30);
conf.setCapacityByLabel(B1, "blue", 50);
conf.setMaximumCapacityByLabel(B1, "blue", 100);
conf.setCapacity(B2, 80);
conf.setMaximumCapacity(B2, 40);
conf.setCapacityByLabel(B2, "red", 30);
conf.setCapacityByLabel(B2, "blue", 25);
conf.setCapacity(B3, 10);
conf.setMaximumCapacity(B3, 25);
conf.setCapacityByLabel(B3, "red", 10);
conf.setCapacityByLabel(B3, "blue", 25);
}
private void setupQueueConfigurationWithLabelsInherit(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
// Set A configuration
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 15);
conf.setQueues(A, new String[] {"a1", "a2"});
conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
conf.setCapacityByLabel(A, "red", 100);
conf.setCapacityByLabel(A, "blue", 100);
// Set B configuraiton
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 90);
conf.setAccessibleNodeLabels(B, CommonNodeLabelsManager.EMPTY_STRING_SET);
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setCapacity(A1, 30);
conf.setMaximumCapacity(A1, 45);
conf.setCapacityByLabel(A1, "red", 50);
conf.setCapacityByLabel(A1, "blue", 100);
conf.setCapacity(A2, 70);
conf.setMaximumCapacity(A2, 85);
conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
conf.setCapacityByLabel(A2, "red", 50);
}
private void setupQueueConfigurationWithSingleLevel(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
// Set A configuration
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 15);
conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
conf.setCapacityByLabel(A, "red", 90);
conf.setCapacityByLabel(A, "blue", 90);
// Set B configuraiton
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 90);
conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue"));
conf.setCapacityByLabel(B, "red", 10);
conf.setCapacityByLabel(B, "blue", 10);
}
@Test
public void testQueueParsingReinitializeWithLabels() throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("red", "blue"));
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfigurationWithoutLabels(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(conf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(conf);
capacityScheduler.start();
csConf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithLabels(csConf);
conf = new YarnConfiguration(csConf);
capacityScheduler.reinitialize(conf, rmContext);
checkQueueLabels(capacityScheduler);
ServiceOperations.stopQuietly(capacityScheduler);
}
private void checkQueueLabels(CapacityScheduler capacityScheduler) {
// queue-A is red, blue
Assert.assertTrue(capacityScheduler.getQueue("a").getAccessibleNodeLabels()
.containsAll(ImmutableSet.of("red", "blue")));
// queue-A1 inherits A's configuration
Assert.assertTrue(capacityScheduler.getQueue("a1")
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
// queue-A2 is "red"
Assert.assertEquals(1, capacityScheduler.getQueue("a2")
.getAccessibleNodeLabels().size());
Assert.assertTrue(capacityScheduler.getQueue("a2")
.getAccessibleNodeLabels().contains("red"));
// queue-B is "red"/"blue"
Assert.assertTrue(capacityScheduler.getQueue("b").getAccessibleNodeLabels()
.containsAll(ImmutableSet.of("red", "blue")));
// queue-B2 inherits "red"/"blue"
Assert.assertTrue(capacityScheduler.getQueue("b2")
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
// check capacity of A2
CSQueue qA2 = capacityScheduler.getQueue("a2");
Assert.assertEquals(0.7, qA2.getCapacity(), DELTA);
Assert.assertEquals(0.5, qA2.getQueueCapacities().getCapacity("red"), DELTA);
Assert.assertEquals(0.07, qA2.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.25, qA2.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(0.3, qA2.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
// check capacity of B3
CSQueue qB3 = capacityScheduler.getQueue("b3");
Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.125, qB3.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(1, qB3.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
}
private void checkQueueLabelsWithLeafQueueDisableElasticity
(CapacityScheduler capacityScheduler) {
// queue-A is red, blue
Assert.assertTrue(capacityScheduler.getQueue("a").getAccessibleNodeLabels()
.containsAll(ImmutableSet.of("red", "blue")));
// queue-A1 inherits A's configuration
Assert.assertTrue(capacityScheduler.getQueue("a1")
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
// queue-A2 is "red"
Assert.assertEquals(1, capacityScheduler.getQueue("a2")
.getAccessibleNodeLabels().size());
Assert.assertTrue(capacityScheduler.getQueue("a2")
.getAccessibleNodeLabels().contains("red"));
// queue-B is "red"/"blue"
Assert.assertTrue(capacityScheduler.getQueue("b").getAccessibleNodeLabels()
.containsAll(ImmutableSet.of("red", "blue")));
// queue-B2 inherits "red"/"blue"
Assert.assertTrue(capacityScheduler.getQueue("b2")
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
// check capacity of A2
CSQueue qA2 = capacityScheduler.getQueue("a2");
Assert.assertEquals(0.4, qA2.getCapacity(), DELTA);
Assert.assertEquals(0.4, qA2.getQueueCapacities()
.getCapacity("red"), DELTA);
Assert.assertEquals(0.2, qA2.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.2, qA2.getQueueCapacities()
.getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.85, qA2.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(0.6, qA2.getQueueCapacities()
.getAbsoluteMaximumCapacity("red"), DELTA);
// check disable elasticity at leaf queue level without label
CSQueue qB2 = capacityScheduler.getQueue("b2");
Assert.assertEquals(0.4, qB2.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.4, qB2.getAbsoluteMaximumCapacity(), DELTA);
// check disable elasticity at leaf queue level with label
CSQueue qA1 = capacityScheduler.getQueue("a1");
Assert.assertEquals(0.3, qA1.getQueueCapacities().
getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.3, qA1.getQueueCapacities().
getAbsoluteMaximumCapacity("red"), DELTA);
CSQueue qB1 = capacityScheduler.getQueue("b1");
Assert.assertEquals(0.3, qB1.getQueueCapacities()
.getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.3, qB1.getQueueCapacities()
.getAbsoluteMaximumCapacity("red"), DELTA);
// check capacity of B3
CSQueue qB3 = capacityScheduler.getQueue("b3");
Assert.assertEquals(0.05, qB3.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.175, qB3.getQueueCapacities()
.getAbsoluteCapacity("blue"), DELTA);
Assert.assertEquals(0.25, qB3.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(1, qB3.getQueueCapacities()
.getAbsoluteMaximumCapacity("blue"), DELTA);
}
private void
checkQueueLabelsInheritConfig(CapacityScheduler capacityScheduler) {
// queue-A is red, blue
Assert.assertTrue(capacityScheduler.getQueue("a").getAccessibleNodeLabels()
.containsAll(ImmutableSet.of("red", "blue")));
// queue-A1 inherits A's configuration
Assert.assertTrue(capacityScheduler.getQueue("a1")
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
// queue-A2 is "red"
Assert.assertEquals(1, capacityScheduler.getQueue("a2")
.getAccessibleNodeLabels().size());
Assert.assertTrue(capacityScheduler.getQueue("a2")
.getAccessibleNodeLabels().contains("red"));
// queue-B is "red"/"blue"
Assert.assertTrue(capacityScheduler.getQueue("b").getAccessibleNodeLabels()
.isEmpty());
}
@Test
public void testQueueParsingWithLabels() throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabels(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
checkQueueLabels(capacityScheduler);
ServiceOperations.stopQuietly(capacityScheduler);
}
@Test
public void testQueueParsingWithLeafQueueDisableElasticity()
throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity
(ImmutableSet.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabelsAndReleaseCheck(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
checkQueueLabelsWithLeafQueueDisableElasticity(capacityScheduler);
ServiceOperations.stopQuietly(capacityScheduler);
}
@Test
public void testQueueParsingWithLabelsInherit() throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabelsInherit(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
checkQueueLabelsInheritConfig(capacityScheduler);
ServiceOperations.stopQuietly(capacityScheduler);
}
@Test
public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager()
throws IOException {
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabels(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
nodeLabelsManager.init(conf);
nodeLabelsManager.start();
rmContext.setNodeLabelManager(nodeLabelsManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
ServiceOperations.stopQuietly(nodeLabelsManager);
}
@Test
public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager()
throws IOException {
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabelsInherit(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
nodeLabelsManager.init(conf);
nodeLabelsManager.start();
rmContext.setNodeLabelManager(nodeLabelsManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
ServiceOperations.stopQuietly(nodeLabelsManager);
}
@Test
public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager()
throws IOException {
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithSingleLevel(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
nodeLabelsManager.init(conf);
nodeLabelsManager.start();
rmContext.setNodeLabelManager(nodeLabelsManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
ServiceOperations.stopQuietly(nodeLabelsManager);
}
@Test
public void testQueueParsingWhenLabelsNotExist() throws IOException {
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabels(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
nodeLabelsManager.init(conf);
nodeLabelsManager.start();
rmContext.setNodeLabelManager(nodeLabelsManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
ServiceOperations.stopQuietly(nodeLabelsManager);
}
@Test
public void testQueueParsingWithUnusedLabels() throws IOException {
final ImmutableSet<String> labels = ImmutableSet.of("red", "blue");
// Initialize a cluster with labels, but doesn't use them, reinitialize
// shouldn't fail
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(labels);
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT, labels);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(conf);
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, rmContext);
// check root queue's capacity by label -- they should be all zero
CSQueue root = capacityScheduler.getQueue(CapacitySchedulerConfiguration.ROOT);
Assert.assertEquals(0, root.getQueueCapacities().getCapacity("red"), DELTA);
Assert.assertEquals(0, root.getQueueCapacities().getCapacity("blue"), DELTA);
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
CSQueue b1 = capacityScheduler.getQueue("b1");
Assert.assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
Assert.assertEquals("Parent B has no MAX_CAP", 0.85,
b1.getAbsoluteMaximumCapacity(), DELTA);
CSQueue c12 = capacityScheduler.getQueue("c12");
Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.7 * 0.55 * 0.7, c12.getAbsoluteMaximumCapacity(),
DELTA);
capacityScheduler.stop();
}
@Test
public void testQueueParsingShouldTrimSpaces() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfigurationWithSpacesShouldBeTrimmed(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(conf);
capacityScheduler.setRMContext(TestUtils.getMockRMContext());
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, TestUtils.getMockRMContext());
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertNotNull(a);
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
CSQueue c = capacityScheduler.getQueue("c");
Assert.assertNotNull(c);
Assert.assertEquals(0.70, c.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.70, c.getAbsoluteMaximumCapacity(), DELTA);
}
@Test
public void testNestedQueueParsingShouldTrimSpaces() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupNestedQueueConfigurationWithSpacesShouldBeTrimmed(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(conf);
capacityScheduler.setRMContext(TestUtils.getMockRMContext());
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, TestUtils.getMockRMContext());
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertNotNull(a);
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
CSQueue c = capacityScheduler.getQueue("c");
Assert.assertNotNull(c);
Assert.assertEquals(0.70, c.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.70, c.getAbsoluteMaximumCapacity(), DELTA);
CSQueue a1 = capacityScheduler.getQueue("a1");
Assert.assertNotNull(a1);
Assert.assertEquals(0.10 * 0.6, a1.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.15, a1.getAbsoluteMaximumCapacity(), DELTA);
CSQueue a2 = capacityScheduler.getQueue("a2");
Assert.assertNotNull(a2);
Assert.assertEquals(0.10 * 0.4, a2.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.15, a2.getAbsoluteMaximumCapacity(), DELTA);
}
/**
* Test init a queue configuration, children's capacity for a given label
* doesn't equals to 100%. This expect IllegalArgumentException thrown.
*/
@Test(expected = IllegalArgumentException.class)
public void testQueueParsingFailWhenSumOfChildrenNonLabeledCapacityNot100Percent()
throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet
.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfiguration(csConf);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 5);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
}
/**
* Test init a queue configuration, children's capacity for a given label
* doesn't equals to 100%. This expect IllegalArgumentException thrown.
*/
@Test(expected = IllegalArgumentException.class)
public void testQueueParsingFailWhenSumOfChildrenLabeledCapacityNot100Percent()
throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet
.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabels(csConf);
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT + ".b.b3",
"red", 24);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
}
/**
* Test init a queue configuration, children's capacity for a given label
* doesn't equals to 100%. This expect IllegalArgumentException thrown.
*/
@Test(expected = IllegalArgumentException.class)
public void testQueueParsingWithSumOfChildLabelCapacityNot100PercentWithWildCard()
throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet
.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabels(csConf);
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT + ".b.b3",
"red", 24);
csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT,
ImmutableSet.of(RMNodeLabelsManager.ANY));
csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT + ".b",
ImmutableSet.of(RMNodeLabelsManager.ANY));
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
}
@Test(expected = IOException.class)
public void testQueueParsingWithMoveQueue()
throws IOException {
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
csConf.setQueues("root", new String[] { "a" });
csConf.setQueues("root.a", new String[] { "x", "y" });
csConf.setCapacity("root.a", 100);
csConf.setCapacity("root.a.x", 50);
csConf.setCapacity("root.a.y", 50);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
csConf.setQueues("root", new String[] { "a", "x" });
csConf.setQueues("root.a", new String[] { "y" });
csConf.setCapacity("root.x", 50);
csConf.setCapacity("root.a", 50);
csConf.setCapacity("root.a.y", 100);
capacityScheduler.reinitialize(csConf, rmContext);
}
@Test(timeout = 60000, expected = java.lang.IllegalArgumentException.class)
public void testRMStartWrongNodeCapacity() throws Exception {
YarnConfiguration config = new YarnConfiguration();
nodeLabelManager = new NullRMNodeLabelsManager();
nodeLabelManager.init(config);
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a" });
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 100);
conf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z"));
conf.setCapacityByLabel(A, "x", 100);
conf.setCapacityByLabel(A, "y", 100);
conf.setCapacityByLabel(A, "z", 70);
MockRM rm = null;
try {
rm = new MockRM(conf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return nodeLabelManager;
}
};
} finally {
IOUtils.closeStream(rm);
}
}
@Test
public void testQueueOrderingPolicyUpdatedAfterReinitialize()
throws IOException {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfigurationWithoutLabels(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(conf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(conf);
capacityScheduler.start();
// Add a new b4 queue
csConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".b",
new String[] { "b1", "b2", "b3", "b4" });
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b.b4", 0f);
ParentQueue bQ = (ParentQueue) capacityScheduler.getQueue("b");
checkEqualsToQueueSet(bQ.getChildQueues(),
new String[] { "b1", "b2", "b3" });
capacityScheduler.reinitialize(new YarnConfiguration(csConf), rmContext);
// Check child queue of b
checkEqualsToQueueSet(bQ.getChildQueues(),
new String[] { "b1", "b2", "b3", "b4" });
PriorityUtilizationQueueOrderingPolicy queueOrderingPolicy =
(PriorityUtilizationQueueOrderingPolicy) bQ.getQueueOrderingPolicy();
checkEqualsToQueueSet(queueOrderingPolicy.getQueues(),
new String[] { "b1", "b2", "b3", "b4" });
ServiceOperations.stopQuietly(capacityScheduler);
}
private void checkEqualsToQueueSet(List<CSQueue> queues, String[] queueNames) {
Set<String> existedQueues = new HashSet<>();
for (CSQueue q : queues) {
existedQueues.add(q.getQueueName());
}
for (String q : queueNames) {
Assert.assertTrue(existedQueues.remove(q));
}
Assert.assertTrue(existedQueues.isEmpty());
}
}