blob: c46ecd97896febc8479fe4cf47cbcf003badc4b9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestAllocationFileLoaderService {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"/tmp")).getAbsolutePath();
final static String ALLOC_FILE = new File(TEST_DIR,
"test-queues").getAbsolutePath();
private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml";
@Test
public void testGetAllocationFileFromFileSystem()
throws IOException, URISyntaxException {
Configuration conf = new YarnConfiguration();
File baseDir =
new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
MiniDFSCluster hdfsCluster = builder.build();
String fsAllocPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort()
+ Path.SEPARATOR + TEST_FAIRSCHED_XML;
URL fschedURL = Thread.currentThread().getContextClassLoader()
.getResource(TEST_FAIRSCHED_XML);
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(fsAllocPath, allocationFile.toString());
assertTrue(fs.exists(allocationFile));
hdfsCluster.shutdown(true);
}
@Test (expected = UnsupportedFileSystemException.class)
public void testDenyGetAllocationFileFromUnsupportedFileSystem()
throws UnsupportedFileSystemException {
Configuration conf = new YarnConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.getAllocationFile(conf);
}
@Test
public void testGetAllocationFileFromClasspath() {
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
TEST_FAIRSCHED_XML);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService();
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
assertTrue(fs.exists(allocationFile));
} catch (IOException e) {
fail("Unable to access allocation file from classpath: " + e);
}
}
@Test (timeout = 10000)
public void testReload() throws Exception {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println(" <queue name=\"queueA\">");
out.println(" <maxRunningApps>1</maxRunningApps>");
out.println(" </queue>");
out.println(" <queue name=\"queueB\" />");
out.println(" <queuePlacementPolicy>");
out.println(" <rule name='default' />");
out.println(" </queuePlacementPolicy>");
out.println("</allocations>");
out.close();
ControlledClock clock = new ControlledClock();
clock.setTime(0);
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(
clock);
allocLoader.reloadIntervalMs = 5;
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
// Verify conf
QueuePlacementPolicy policy = allocConf.getPlacementPolicy();
List<QueuePlacementRule> rules = policy.getRules();
assertEquals(1, rules.size());
assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass());
assertEquals(1, allocConf.getQueueMaxApps("root.queueA"));
assertEquals(2, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.size());
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueA"));
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueB"));
confHolder.allocConf = null;
// Modify file and advance the clock
out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println(" <queue name=\"queueB\">");
out.println(" <maxRunningApps>3</maxRunningApps>");
out.println(" </queue>");
out.println(" <queuePlacementPolicy>");
out.println(" <rule name='specified' />");
out.println(" <rule name='nestedUserQueue' >");
out.println(" <rule name='primaryGroup' />");
out.println(" </rule>");
out.println(" <rule name='default' />");
out.println(" </queuePlacementPolicy>");
out.println("</allocations>");
out.close();
clock.tickMsec(System.currentTimeMillis()
+ AllocationFileLoaderService.ALLOC_RELOAD_WAIT_MS + 10000);
allocLoader.start();
while (confHolder.allocConf == null) {
Thread.sleep(20);
}
// Verify conf
allocConf = confHolder.allocConf;
policy = allocConf.getPlacementPolicy();
rules = policy.getRules();
assertEquals(3, rules.size());
assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
assertEquals(QueuePlacementRule.NestedUserQueue.class, rules.get(1)
.getClass());
assertEquals(QueuePlacementRule.PrimaryGroup.class,
((NestedUserQueue) (rules.get(1))).nestedRule.getClass());
assertEquals(QueuePlacementRule.Default.class, rules.get(2).getClass());
assertEquals(3, allocConf.getQueueMaxApps("root.queueB"));
assertEquals(1, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.size());
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueB"));
}
@Test
public void testAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<queue name=\"queueA\">");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("<maxResources>2048mb,10vcores</maxResources>");
out.println("</queue>");
// Give queue B a minimum of 2048 M
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<maxResources>5120mb,110vcores</maxResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("<schedulingPolicy>fair</schedulingPolicy>");
out.println("</queue>");
// Give queue C no minimum
out.println("<queue name=\"queueC\">");
out.println("<minResources>5120mb,0vcores</minResources>");
out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
out.println("</queue>");
// Give queue D a limit of 3 running apps and 0.4f maxAMShare
out.println("<queue name=\"queueD\">");
out.println("<maxRunningApps>3</maxRunningApps>");
out.println("<maxAMShare>0.4</maxAMShare>");
out.println("</queue>");
// Give queue E a preemption timeout of one minute
out.println("<queue name=\"queueE\">");
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
out.println("</queue>");
// Make queue F a parent queue without configured leaf queues using the
// 'type' attribute
out.println("<queue name=\"queueF\" type=\"parent\" >");
out.println("<maxChildResources>2048mb,64vcores</maxChildResources>");
out.println("</queue>");
// Create hierarchical queues G,H, with different min/fair share preemption
// timeouts and preemption thresholds. Also add a child default to make sure
// it doesn't impact queue H.
out.println("<queue name=\"queueG\">");
out.println("<maxChildResources>2048mb,64vcores</maxChildResources>");
out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>");
out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>");
out.println("<fairSharePreemptionThreshold>0.6</fairSharePreemptionThreshold>");
out.println(" <queue name=\"queueH\">");
out.println(" <fairSharePreemptionTimeout>180</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>40</minSharePreemptionTimeout>");
out.println(" <fairSharePreemptionThreshold>0.7</fairSharePreemptionThreshold>");
out.println(" </queue>");
out.println("</queue>");
// Set default limit of apps per queue to 15
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
// Set default limit of max resource per queue to 4G and 100 cores
out.println("<queueMaxResourcesDefault>4096mb,100vcores</queueMaxResourcesDefault>");
// Set default limit of apps per user to 5
out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
// Set default limit of AMResourceShare to 0.5f
out.println("<queueMaxAMShareDefault>0.5f</queueMaxAMShareDefault>");
// Give user1 a limit of 10 jobs
out.println("<user name=\"user1\">");
out.println("<maxRunningApps>10</maxRunningApps>");
out.println("</user>");
// Set default min share preemption timeout to 2 minutes
out.println("<defaultMinSharePreemptionTimeout>120"
+ "</defaultMinSharePreemptionTimeout>");
// Set default fair share preemption timeout to 5 minutes
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
// Set default fair share preemption threshold to 0.4
out.println("<defaultFairSharePreemptionThreshold>0.4</defaultFairSharePreemptionThreshold>");
// Set default scheduling policy to DRF
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
assertEquals(6, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(2048, 10),
queueConf.getMaxResources("root.queueA").getResource());
assertEquals(Resources.createResource(5120, 110),
queueConf.getMaxResources("root.queueB").getResource());
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueC").getResource());
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueD").getResource());
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueE").getResource());
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueF").getResource());
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueG").getResource());
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueG.queueH").getResource());
assertEquals(Resources.createResource(1024, 0),
queueConf.getMinResources("root.queueA"));
assertEquals(Resources.createResource(2048, 0),
queueConf.getMinResources("root.queueB"));
assertEquals(Resources.createResource(5120, 0),
queueConf.getMinResources("root.queueC"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueD"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueE"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueF"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueG"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueG.queueH"));
assertNull("Max child resources unexpectedly set for queue root.queueA",
queueConf.getMaxChildResources("root.queueA"));
assertNull("Max child resources unexpectedly set for queue root.queueB",
queueConf.getMaxChildResources("root.queueB"));
assertNull("Max child resources unexpectedly set for queue root.queueC",
queueConf.getMaxChildResources("root.queueC"));
assertNull("Max child resources unexpectedly set for queue root.queueD",
queueConf.getMaxChildResources("root.queueD"));
assertNull("Max child resources unexpectedly set for queue root.queueE",
queueConf.getMaxChildResources("root.queueE"));
assertEquals(Resources.createResource(2048, 64),
queueConf.getMaxChildResources("root.queueF").getResource());
assertEquals(Resources.createResource(2048, 64),
queueConf.getMaxChildResources("root.queueG").getResource());
assertNull("Max child resources unexpectedly set for "
+ "queue root.queueG.queueH",
queueConf.getMaxChildResources("root.queueG.queueH"));
assertEquals(15, queueConf.getQueueMaxApps("root."
+ YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueConf.getQueueMaxApps("root.queueA"));
assertEquals(15, queueConf.getQueueMaxApps("root.queueB"));
assertEquals(15, queueConf.getQueueMaxApps("root.queueC"));
assertEquals(3, queueConf.getQueueMaxApps("root.queueD"));
assertEquals(15, queueConf.getQueueMaxApps("root.queueE"));
assertEquals(10, queueConf.getUserMaxApps("user1"));
assertEquals(5, queueConf.getUserMaxApps("user2"));
assertEquals(.5f, queueConf.getQueueMaxAMShare("root." + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01);
assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01);
// Root should get * ACL
assertEquals("*", queueConf.getQueueAcl("root",
QueueACL.ADMINISTER_QUEUE).getAclString());
assertEquals("*", queueConf.getQueueAcl("root",
QueueACL.SUBMIT_APPLICATIONS).getAclString());
// Unspecified queues should get default ACL
assertEquals(" ", queueConf.getQueueAcl("root.queueA",
QueueACL.ADMINISTER_QUEUE).getAclString());
assertEquals(" ", queueConf.getQueueAcl("root.queueA",
QueueACL.SUBMIT_APPLICATIONS).getAclString());
// Queue B ACL
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueB",
QueueACL.ADMINISTER_QUEUE).getAclString());
// Queue C ACL
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC",
QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueF"));
assertEquals(50000, queueConf.getMinSharePreemptionTimeout("root.queueG"));
assertEquals(40000, queueConf.getMinSharePreemptionTimeout("root.queueG.queueH"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueF"));
assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG"));
assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH"));
assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueF"), 0.01);
assertEquals(.6f,
queueConf.getFairSharePreemptionThreshold("root.queueG"), 0.01);
assertEquals(.7f,
queueConf.getFairSharePreemptionThreshold("root.queueG.queueH"), 0.01);
assertTrue(queueConf.getConfiguredQueues()
.get(FSQueueType.PARENT)
.contains("root.queueF"));
assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.PARENT)
.contains("root.queueG"));
assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueG.queueH"));
// Verify existing queues have default scheduling policy
assertEquals(DominantResourceFairnessPolicy.NAME,
queueConf.getSchedulingPolicy("root").getName());
assertEquals(DominantResourceFairnessPolicy.NAME,
queueConf.getSchedulingPolicy("root.queueA").getName());
// Verify default is overriden if specified explicitly
assertEquals(FairSharePolicy.NAME,
queueConf.getSchedulingPolicy("root.queueB").getName());
// Verify new queue gets default scheduling policy
assertEquals(DominantResourceFairnessPolicy.NAME,
queueConf.getSchedulingPolicy("root.newqueue").getName());
}
@Test
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<pool name=\"queueA\">");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</pool>");
// Give queue B a minimum of 2048 M
out.println("<pool name=\"queueB\">");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("</pool>");
// Give queue C no minimum
out.println("<pool name=\"queueC\">");
out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
out.println("</pool>");
// Give queue D a limit of 3 running apps
out.println("<pool name=\"queueD\">");
out.println("<maxRunningApps>3</maxRunningApps>");
out.println("</pool>");
// Give queue E a preemption timeout of one minute and 0.3f threshold
out.println("<pool name=\"queueE\">");
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
out.println("<fairSharePreemptionThreshold>0.3</fairSharePreemptionThreshold>");
out.println("</pool>");
// Set default limit of apps per queue to 15
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
// Set default limit of apps per user to 5
out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
// Give user1 a limit of 10 jobs
out.println("<user name=\"user1\">");
out.println("<maxRunningApps>10</maxRunningApps>");
out.println("</user>");
// Set default min share preemption timeout to 2 minutes
out.println("<defaultMinSharePreemptionTimeout>120"
+ "</defaultMinSharePreemptionTimeout>");
// Set fair share preemption timeout to 5 minutes
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
// Set default fair share preemption threshold to 0.6f
out.println("<defaultFairSharePreemptionThreshold>0.6</defaultFairSharePreemptionThreshold>");
out.println("</allocations>");
out.close();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
assertEquals(5, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(1024, 0),
queueConf.getMinResources("root.queueA"));
assertEquals(Resources.createResource(2048, 0),
queueConf.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueC"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueD"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueE"));
assertEquals(15, queueConf.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueConf.getQueueMaxApps("root.queueA"));
assertEquals(15, queueConf.getQueueMaxApps("root.queueB"));
assertEquals(15, queueConf.getQueueMaxApps("root.queueC"));
assertEquals(3, queueConf.getQueueMaxApps("root.queueD"));
assertEquals(15, queueConf.getQueueMaxApps("root.queueE"));
assertEquals(10, queueConf.getUserMaxApps("user1"));
assertEquals(5, queueConf.getUserMaxApps("user2"));
// Unspecified queues should get default ACL
assertEquals(" ", queueConf.getQueueAcl("root.queueA",
QueueACL.ADMINISTER_QUEUE).getAclString());
assertEquals(" ", queueConf.getQueueAcl("root.queueA",
QueueACL.SUBMIT_APPLICATIONS).getAclString());
// Queue B ACL
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueB",
QueueACL.ADMINISTER_QUEUE).getAclString());
// Queue C ACL
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC",
QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root."
+ YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
assertEquals(.3f,
queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
}
@Test
public void testSimplePlacementPolicyFromConf() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
List<QueuePlacementRule> rules = placementPolicy.getRules();
assertEquals(2, rules.size());
assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
assertEquals(false, rules.get(0).create);
assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass());
}
/**
* Verify that you can't place queues at the same level as the root queue in
* the allocations file.
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueAlongsideRoot() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\">");
out.println("</queue>");
out.println("<queue name=\"other\">");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
}
/**
* Verify that you can't include periods as the queue name in the allocations
* file.
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueNameContainingPeriods() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"parent1.child1\">");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
}
/**
* Verify that you can't have the queue name with whitespace only in the
* allocations file.
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueNameContainingOnlyWhitespace() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\" \">");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
}
@Test
public void testParentTagWithReservation() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"parent\" type=\"parent\">");
out.println("<reservation>");
out.println("</reservation>");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
try {
allocLoader.reloadAllocations();
} catch (AllocationConfigurationException ex) {
assertEquals(ex.getMessage(), "The configuration settings for root.parent"
+ " are invalid. A queue element that contains child queue elements"
+ " or that has the type='parent' attribute cannot also include a"
+ " reservation element.");
}
}
@Test
public void testParentWithReservation() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"parent\">");
out.println("<reservation>");
out.println("</reservation>");
out.println(" <queue name=\"child\">");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
try {
allocLoader.reloadAllocations();
} catch (AllocationConfigurationException ex) {
assertEquals(ex.getMessage(), "The configuration settings for root.parent"
+ " are invalid. A queue element that contains child queue elements"
+ " or that has the type='parent' attribute cannot also include a"
+ " reservation element.");
}
}
@Test
public void testParentTagWithChild() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"parent\" type=\"parent\">");
out.println(" <queue name=\"child\">");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
// Check whether queue 'parent' and 'child' are loaded successfully
assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.PARENT)
.contains("root.parent"));
assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.parent.child"));
}
/**
* Verify that you can't have the queue name with just a non breaking
* whitespace in the allocations file.
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueNameContainingNBWhitespace() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new OutputStreamWriter(
new FileOutputStream(ALLOC_FILE), StandardCharsets.UTF_8));
out.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
out.println("<allocations>");
out.println("<queue name=\"\u00a0\">");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
}
/**
* Verify that defaultQueueSchedulingMode can't accept FIFO as a value.
*/
@Test (expected = AllocationConfigurationException.class)
public void testDefaultQueueSchedulingModeIsFIFO() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<defaultQueueSchedulingPolicy>fifo</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
}
@Test
public void testReservableQueue() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"reservable\">");
out.println("<reservation>");
out.println("</reservation>");
out.println("</queue>");
out.println("<queue name=\"other\">");
out.println("</queue>");
out.println("<reservation-agent>DummyAgentName</reservation-agent>");
out.println("<reservation-policy>AnyAdmissionPolicy</reservation-policy>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
String reservableQueueName = "root.reservable";
String nonreservableQueueName = "root.other";
assertFalse(allocConf.isReservable(nonreservableQueueName));
assertTrue(allocConf.isReservable(reservableQueueName));
assertTrue(allocConf.getMoveOnExpiry(reservableQueueName));
assertEquals(ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW,
allocConf.getReservationWindow(reservableQueueName));
assertEquals(100, allocConf.getInstantaneousMaxCapacity
(reservableQueueName),
0.0001);
assertEquals(
"DummyAgentName",
allocConf.getReservationAgent(reservableQueueName));
assertEquals(100, allocConf.getAverageCapacity(reservableQueueName), 0.001);
assertFalse(allocConf.getShowReservationAsQueues(reservableQueueName));
assertEquals("AnyAdmissionPolicy",
allocConf.getReservationAdmissionPolicy(reservableQueueName));
assertEquals(ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_PLANNER_NAME,
allocConf.getReplanner(reservableQueueName));
assertEquals(ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_ENFORCEMENT_WINDOW,
allocConf.getEnforcementWindow(reservableQueueName));
}
/**
* Verify that you can't have dynamic user queue and reservable queue on
* the same queue
*/
@Test (expected = AllocationConfigurationException.class)
public void testReservableCannotBeCombinedWithDynamicUserQueue()
throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"notboth\" type=\"parent\" >");
out.println("<reservation>");
out.println("</reservation>");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
}
private class ReloadListener implements AllocationFileLoaderService.Listener {
public AllocationConfiguration allocConf;
@Override
public void onReload(AllocationConfiguration info) {
allocConf = info;
}
}
}