blob: 005f40bcadd007662122aa5ea62a59fdf106c480 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
/**
 * Tests the user/group to queue mapping support of the
 * {@link CapacityScheduler}.
 *
 * <p>Mappings are supplied through
 * {@link CapacitySchedulerConfiguration#QUEUE_MAPPING} as a comma separated
 * list of entries such as {@code u:<user>:<queue>} or
 * {@code g:<group>:<queue>}. The tests use two top-level queues,
 * {@code q1} and {@code q2}, and resolve groups through
 * {@link SimpleGroupsMapping}.
 */
public class TestQueueMappings {

  private static final Log LOG = LogFactory.getLog(TestQueueMappings.class);

  /** Short names of the two top-level queues used by every test. */
  private static final String Q1 = "q1";
  private static final String Q2 = "q2";

  /** Fully qualified paths (root.q1 / root.q2) of the test queues. */
  private final static String Q1_PATH =
      CapacitySchedulerConfiguration.ROOT + "." + Q1;
  private final static String Q2_PATH =
      CapacitySchedulerConfiguration.ROOT + "." + Q2;

  /** Resource manager created by a test; stopped again in tearDown. */
  private MockRM resourceManager;

  /**
   * Stops the mock resource manager, if the test created one, so that a
   * failing assertion does not leave it running.
   *
   * @throws Exception if stopping the resource manager fails
   */
  @After
  public void tearDown() throws Exception {
    if (resourceManager != null) {
      LOG.info("Stopping the resource manager");
      resourceManager.stop();
    }
  }

  /**
   * Declares q1 and q2 as the children of the root queue with capacities
   * 10 and 90 respectively.
   *
   * @param conf the capacity scheduler configuration to populate
   */
  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
    // Define top-level queues
    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { Q1, Q2 });

    conf.setCapacity(Q1_PATH, 10);
    conf.setCapacity(Q2_PATH, 90);

    LOG.info("Setup top-level queues q1 and q2");
  }

  /**
   * Exercises queue mappings in two phases: first the parsing and
   * resolution of mappings on a standalone scheduler, then the placement
   * of submitted applications through a {@link MockRM}.
   *
   * @throws Exception on any unexpected failure
   */
  @Test (timeout = 60000)
  public void testQueueMapping() throws Exception {
    checkMappingsOnStandaloneScheduler();
    checkAppPlacementThroughMockRM();
  }

  /**
   * Starts a standalone scheduler, verifies that malformed mappings are
   * rejected on reinitialization and that valid mappings resolve user "a"
   * to the expected queue name. The scheduler is always stopped before
   * returning.
   *
   * @throws Exception on any unexpected failure
   */
  private void checkMappingsOnStandaloneScheduler() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    setupQueueConfiguration(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    CapacityScheduler cs = new CapacityScheduler();
    RMContext rmContext = TestUtils.getMockRMContext();
    cs.setConf(conf);
    cs.setRMContext(rmContext);
    cs.init(conf);
    cs.start();

    // The scheduler was started above, so it must be stopped even when an
    // assertion below fails; otherwise the service outlives the test.
    try {
      conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
          SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
      conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
          "true");

      // configuration parsing tests - negative test cases
      checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
      checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
      checkInvalidQMapping(conf, cs, "g:a", "no queue specified");
      checkInvalidQMapping(conf, cs, "u:a:b,g:a",
          "multiple mappings with invalid mapping");
      checkInvalidQMapping(conf, cs, "u:a:b,g:a:d:e", "too many path segments");
      checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
      checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
      checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");

      // simple base case for mapping user to queue
      conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1);
      cs.reinitialize(conf, null);
      checkQMapping("a", Q1, cs);

      // group mapping test
      conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1);
      cs.reinitialize(conf, null);
      checkQMapping("a", Q1, cs);

      // %user tests
      conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2);
      cs.reinitialize(conf, null);
      checkQMapping("a", Q2, cs);

      conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user");
      cs.reinitialize(conf, null);
      checkQMapping("a", "a", cs);

      // %primary_group tests
      conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
          "u:%user:%primary_group");
      cs.reinitialize(conf, null);
      checkQMapping("a", "agroup", cs);

      // non-primary group mapping
      conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
          "g:asubgroup1:" + Q1);
      cs.reinitialize(conf, null);
      checkQMapping("a", Q1, cs);

      // space trimming
      conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
      cs.reinitialize(conf, null);
      checkQMapping("a", Q1, cs);
    } finally {
      cs.stop();
    }
  }

  /**
   * Starts a {@link MockRM} backed by the capacity scheduler and verifies
   * which queue submitted applications land in, with the mapping override
   * both enabled and disabled, and that a mapping to a non-existent queue
   * makes reinitialization fail.
   *
   * @throws Exception on any unexpected failure
   */
  private void checkAppPlacementThroughMockRM() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.set(YarnConfiguration.RM_SCHEDULER,
        CapacityScheduler.class.getName());
    setupQueueConfiguration(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    resourceManager = new MockRM(csConf);
    resourceManager.start();

    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
    conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
        "true");
    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
    resourceManager.getResourceScheduler().reinitialize(conf, null);

    // ensure that if the user specifies a Q that is still overridden
    checkAppQueue(resourceManager, "user", Q2, Q1);

    // toggle admin override and retry
    conf.setBoolean(
        CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
        false);
    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
    // NOTE(review): this re-applies the queue layout to csConf while
    // reinitialize() is given conf - confirm whether it is still needed.
    setupQueueConfiguration(csConf);
    resourceManager.getResourceScheduler().reinitialize(conf, null);

    checkAppQueue(resourceManager, "user", Q2, Q2);

    // ensure that if a user does not specify a Q, the user mapping is used
    checkAppQueue(resourceManager, "user", null, Q1);

    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2);
    setupQueueConfiguration(csConf);
    resourceManager.getResourceScheduler().reinitialize(conf, null);

    // ensure that if a user does not specify a Q, the group mapping is used
    checkAppQueue(resourceManager, "user", null, Q2);

    // if the mapping specifies a queue that does not exist, the job is rejected
    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
        "u:user:non_existent_queue");
    setupQueueConfiguration(csConf);

    boolean fail = false;
    try {
      resourceManager.getResourceScheduler().reinitialize(conf, null);
    } catch (IOException ioex) {
      // expected: the mapping refers to a queue that was never configured
      fail = true;
    }
    Assert.assertTrue("queue initialization failed for non-existent q", fail);

    // tearDown() stops the resource manager as well; this covers the
    // successful path explicitly.
    resourceManager.stop();
  }

  /**
   * Submits an application and asserts the queue it ends up in.
   *
   * @param resourceManager the running resource manager to submit to
   * @param user the submitting user
   * @param submissionQueue the queue requested at submission, or
   *          {@code null} when the user does not request one
   * @param expected the queue the application must be placed in; an empty
   *          string means the application is expected to reach the FAILED
   *          state and have no scheduler application
   * @throws Exception if submission or waiting for the state fails
   */
  private void checkAppQueue(MockRM resourceManager, String user,
      String submissionQueue, String expected)
      throws Exception {
    RMApp app = resourceManager.submitApp(200, "name", user,
        new HashMap<ApplicationAccessType, String>(), false, submissionQueue, -1,
        null, "MAPREDUCE", false);
    RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED
        : RMAppState.ACCEPTED;
    resourceManager.waitForState(app.getApplicationId(), expectedState);

    // get scheduler app
    CapacityScheduler cs = (CapacityScheduler)
        resourceManager.getResourceScheduler();
    SchedulerApplication schedulerApp =
        cs.getSchedulerApplications().get(app.getApplicationId());

    // An application unknown to the scheduler is compared as queue "".
    String queue = "";
    if (schedulerApp != null) {
      queue = schedulerApp.getQueue().getQueueName();
    }
    Assert.assertTrue("expected " + expected + " actual " + queue,
        expected.equals(queue));
    Assert.assertEquals(expected, app.getQueue());
  }

  /**
   * Sets the given mapping and asserts that reinitializing the scheduler
   * with it throws an {@link IOException}.
   *
   * @param conf the configuration to set the mapping on
   * @param cs the scheduler to reinitialize
   * @param mapping the (invalid) queue mapping string
   * @param reason short description of why the mapping is invalid, used in
   *          the assertion message
   * @throws IOException declared for callers; the expected exception from
   *           reinitialization is caught here
   */
  private void checkInvalidQMapping(YarnConfiguration conf,
      CapacityScheduler cs,
      String mapping, String reason)
      throws IOException {
    boolean fail = false;
    try {
      conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, mapping);
      cs.reinitialize(conf, null);
    } catch (IOException ex) {
      // expected: the mapping is malformed
      fail = true;
    }
    Assert.assertTrue("invalid mapping did not throw exception for " + reason,
        fail);
  }

  /**
   * Asserts that the scheduler maps the given user to the expected queue.
   *
   * @param user the user to look up
   * @param expected the queue name the user must map to
   * @param cs the scheduler holding the mappings under test
   * @throws IOException if the lookup fails
   */
  private void checkQMapping(String user, String expected, CapacityScheduler cs)
      throws IOException {
    String actual = cs.getMappedQueueForTest(user);
    Assert.assertTrue("expected " + expected + " actual " + actual,
        expected.equals(actual));
  }
}