blob: 0a7dd3c9fe9e566a18e1912fa17ac6a7eb585ab2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.After;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class TestQueueManager {
private static final Log LOG = LogFactory.getLog(
TestQueueManager.class);
@After
public void tearDown() throws Exception {
deleteQueuesConfigFile();
}
// create UGI with the given user name and the fixed group name "myGroup"
private UserGroupInformation createUGI(String userName) {
return UserGroupInformation.createUserForTesting(
userName, new String[]{"myGroup"});
}
@Test
public void testDefault() throws Exception {
deleteQueuesConfigFile();
QueueManager qm = new QueueManager();
Queue root = qm.getRoot();
assertEquals(root.getChildren().size(), 1);
assertEquals(root.getChildren().iterator().next().getName(), "default");
assertNull(root.getChildren().iterator().next().getChildren());
}
@Test
public void testXMLParsing() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
Set<Queue> rootQueues = qm.getRoot().getChildren();
List<String> names = new ArrayList<String>();
for (Queue q : rootQueues) {
names.add(q.getName());
}
//Size of root.
assertEquals(rootQueues.size(), 2);
//check root level queues
assertTrue(names.contains("q1"));
assertTrue(names.contains("p1"));
//check for leaf names
Set<String> leafNames = qm.getLeafQueueNames();
Queue p = qm.getQueue("p1");
Set<Queue> children = p.getChildren();
assertTrue(children.size() == 2);
//check leaf level queues
assertTrue(
leafNames.contains(
"p1" + NAME_SEPARATOR + "p11"));
assertTrue(
leafNames.contains(
"p1" + NAME_SEPARATOR + "p12"));
Queue q = qm.getQueue(
"p1" + NAME_SEPARATOR + "p12");
assertTrue(
q.getAcls().get(
toFullPropertyName(
q.getName(), ACL_SUBMIT_JOB_TAG)).isUserAllowed(
createUGI("u1")));
assertTrue(
q.getAcls().get(
toFullPropertyName(
q.getName(),
ACL_ADMINISTER_JOB_TAG))
.isUserAllowed(createUGI("u2")));
assertTrue(q.getState().equals(QueueState.STOPPED));
}
@Test
public void testhasAccess() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocumentWithAcls(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
UserGroupInformation ugi;
// test for acls access when acls are set with *
ugi = createUGI("u1");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
QueueACL.SUBMIT_JOB, ugi));
ugi = createUGI("u2");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
QueueACL.ADMINISTER_JOBS, ugi));
// test for acls access when acls are not set with *
ugi = createUGI("u1");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
QueueACL.SUBMIT_JOB, ugi));
ugi = createUGI("u2");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
QueueACL.ADMINISTER_JOBS, ugi));
// Test for acls access when acls are not specified but acls are enabled.
// By default, the queue acls for any queue are empty.
ugi = createUGI("u1");
assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
QueueACL.SUBMIT_JOB, ugi));
ugi = createUGI("u2");
assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
QueueACL.ADMINISTER_JOBS, ugi));
assertTrue(qm.isRunning("p1" + NAME_SEPARATOR + "p13"));
}
@Test
public void testQueueView() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
for (Queue queue : qm.getRoot().getChildren()) {
checkHierarchy(queue, qm);
}
}
private void checkHierarchy(Queue queue, QueueManager queueManager) {
JobQueueInfo jobQueueInfo = queueManager.getJobQueueInfo(queue.getName());
assertEquals(queue.getName(),jobQueueInfo.getQueueName());
assertEquals(queue.getState(),jobQueueInfo.getState());
if (queue.getChildren() !=null && queue.getChildren().size() > 0) {
for (Queue childQueue : queue.getChildren()) {
checkHierarchy(childQueue, queueManager);
}
}
}
@Test
public void testhasAccessForParent() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
UserGroupInformation ugi = createUGI("u1");
assertFalse(qm.hasAccess("p1", QueueACL.SUBMIT_JOB, ugi));
}
@Test
public void testValidation() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "q1");
q1.appendChild(createAcls(doc, "acl-submit-job", "u1"));
q1.appendChild(createAcls(doc, "acl-administer-jobs", "u2"));
q1.appendChild(createQueue(doc, "p15"));
q1.appendChild(createQueue(doc, "p16"));
queues.appendChild(q1);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
fail("Should throw an exception as configuration is wrong ");
} catch (RuntimeException re) {
LOG.info(re.getMessage());
}
}
@Test
public void testInvalidName() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "");
queues.appendChild(q1);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
fail("Should throw an exception as configuration is wrong ");
} catch (Exception re) {
re.printStackTrace();
LOG.info(re.getMessage());
}
deleteQueuesConfigFile();
doc = createDocument();
queues = createQueuesNode(doc);
q1 = doc.createElement("queue");
queues.appendChild(q1);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
fail("Should throw an exception as configuration is wrong ");
} catch (RuntimeException re) {
re.printStackTrace();
LOG.info(re.getMessage());
}
}
@Test
public void testMissingConfigFile() throws Exception {
deleteQueuesConfigFile(); // deletes file
try {
new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
fail("Should throw an exception for missing file when " +
"explicitly passed.");
} catch (RuntimeException re) {
}
// If we just want to pick up the queues from the class loader
// it should fall through to the default. The class loader is set to
// load CONFIG for the "mapred-queues.xml" resource, but it's missing
// so should fall through to mapred-queues-default.xml
QueueManager qm = new QueueManager();
List<JobQueueInfo> rootQueues =
qm.getRoot().getJobQueueInfo().getChildren();
assertEquals(1, rootQueues.size());
assertEquals("default", rootQueues.get(0).getQueueName());
}
@Test
public void testEmptyProperties() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "q1");
Element p = createProperties(doc, null);
q1.appendChild(p);
queues.appendChild(q1);
}
@Test
public void testEmptyFile() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
fail("Should throw an exception as configuration is wrong ");
} catch (Exception re) {
re.printStackTrace();
LOG.info(re.getMessage());
}
}
@Test
public void testJobQueueInfoGeneration() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
List<JobQueueInfo> rootQueues =
qm.getRoot().getJobQueueInfo().getChildren();
assertEquals(rootQueues.size(), 2);
List<String> names = new ArrayList<String>();
for (JobQueueInfo q : rootQueues) {
names.add(q.getQueueName());
if (q.getQueueName().equals("q1")) {
Properties p = q.getProperties();
assertEquals(p.getProperty("capacity"), "10");
assertEquals(p.getProperty("maxCapacity"), "35");
assertTrue(q.getChildren().isEmpty());
} else if (q.getQueueName().equals("p1")) {
List<JobQueueInfo> children = q.getChildren();
assertEquals(children.size(), 2);
for (JobQueueInfo child : children) {
if (child.getQueueName().equals(
"p1" + NAME_SEPARATOR + "p12")) {
assertEquals(
child.getQueueState(), QueueState.STOPPED.getStateName());
} else if (child.getQueueName().equals(
"p1" + NAME_SEPARATOR + "p11")) {
assertEquals(
child.getQueueState(), QueueState.RUNNING.getStateName());
} else {
fail("Only 2 children");
}
}
} else {
fail("Only 2 queues with q1 and p1 ");
}
}
}
/**
* Test the refresh of queues.
*
* @throws Exception
*/
@Test
public void testRefresh() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
Queue beforeRefreshRoot = qm.getRoot();
//remove the file and create new one.
Set<Queue> rootQueues = beforeRefreshRoot.getChildren();
for (Queue qs : rootQueues) {
if (qs.getName().equals("q1")) {
assertEquals(qs.getProperties().getProperty("capacity"), "10");
assertEquals(qs.getProperties().getProperty("maxCapacity"), "35");
} else if (qs.getName().equals("p1")) {
Set<Queue> children = qs.getChildren();
for (Queue child : children) {
if (child.getName().equals(
"p1" + NAME_SEPARATOR + "p12")) {
assertTrue(
child.getAcls().get(
toFullPropertyName(
child.getName(), ACL_SUBMIT_JOB_TAG))
.isUserAllowed(createUGI("u1")));
assertTrue(
child.getAcls().get(
toFullPropertyName(
child.getName(),
ACL_ADMINISTER_JOB_TAG))
.isUserAllowed(createUGI("u2")));
assertTrue(child.getState().equals(QueueState.STOPPED));
} else {
assertTrue(child.getState().equals(QueueState.RUNNING));
}
}
}
}
deleteQueuesConfigFile();
doc = createDocument();
refreshSimpleDocument(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, true);
qm.getRoot().isHierarchySameAs(cp.getRoot());
qm.setQueues(
cp.getRoot().getChildren().toArray(
new Queue[cp.getRoot().getChildren().size()]));
Queue afterRefreshRoot = qm.getRoot();
//remove the file and create new one.
rootQueues = afterRefreshRoot.getChildren();
for (Queue qs : rootQueues) {
if (qs.getName().equals("q1")) {
assertEquals(qs.getProperties().getProperty("capacity"), "70");
assertEquals(qs.getProperties().getProperty("maxCapacity"), "35");
} else if (qs.getName().equals("p1")) {
Set<Queue> children = qs.getChildren();
for (Queue child : children) {
if (child.getName().equals(
"p1" + NAME_SEPARATOR + "p12")) {
assertTrue(
child.getAcls().get(
toFullPropertyName(
child.getName(),
ACL_SUBMIT_JOB_TAG))
.isUserAllowed(createUGI("u3")));
assertTrue(
child.getAcls().get(
toFullPropertyName(
child.getName(),
ACL_ADMINISTER_JOB_TAG))
.isUserAllowed(createUGI("u4")));
assertTrue(child.getState().equals(QueueState.RUNNING));
} else {
assertTrue(child.getState().equals(QueueState.STOPPED));
}
}
}
}
}
@Test
public void testRefreshWithInvalidFile() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
deleteQueuesConfigFile();
doc = createDocument();
Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "");
queues.appendChild(q1);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, false);
fail("Should throw an exception as configuration is wrong ");
} catch (Throwable re) {
re.printStackTrace();
LOG.info(re.getMessage());
}
}
/**
* Class to store the array of queues retrieved by parsing the string
* that is dumped in Json format
*/
static class JsonQueueTree {
boolean acls_enabled;
JsonQueue[] queues;
public JsonQueue[] getQueues() {
return queues;
}
public void setQueues(JsonQueue[] queues) {
this.queues = queues;
}
public boolean isAcls_enabled() {
return acls_enabled;
}
public void setAcls_enabled(boolean aclsEnabled) {
acls_enabled = aclsEnabled;
}
}
/**
* Class to store the contents of each queue that is dumped in JSON format.
*/
static class JsonQueue {
String name;
String state;
String acl_submit_job;
String acl_administer_jobs;
JsonProperty[] properties;
JsonQueue[] children;
public String getName() {
return name;
}
public String getState() {
return state;
}
public JsonProperty[] getProperties() {
return properties;
}
public JsonQueue[] getChildren() {
return children;
}
public void setName(String name) {
this.name = name;
}
public void setState(String state) {
this.state = state;
}
public void setProperties(JsonProperty[] properties) {
this.properties = properties;
}
public void setChildren(JsonQueue[] children) {
this.children = children;
}
public String getAcl_submit_job() {
return acl_submit_job;
}
public void setAcl_submit_job(String aclSubmitJob) {
acl_submit_job = aclSubmitJob;
}
public String getAcl_administer_jobs() {
return acl_administer_jobs;
}
public void setAcl_administer_jobs(String aclAdministerJobs) {
acl_administer_jobs = aclAdministerJobs;
}
}
/**
* Class to store the contents of attribute "properties" in Json dump
*/
static class JsonProperty {
String key;
String value;
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
/**
* checks the format of the dump in JSON format when
* QueueManager.dumpConfiguration(Writer) is called.
* @throws Exception
*/
@Test
public void testDumpConfiguration() throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
StringWriter out = new StringWriter();
Configuration conf = new Configuration(false);
conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
QueueManager.dumpConfiguration(out, QUEUES_CONFIG_FILE_PATH, conf);
ObjectMapper mapper = new ObjectMapper();
// parse the Json dump
JsonQueueTree queueTree =
mapper.readValue(out.toString(), JsonQueueTree.class);
// check for the number of top-level queues
assertEquals(2, queueTree.getQueues().length);
HashMap<String, JsonQueue> topQueues = new HashMap<String, JsonQueue>();
for (JsonQueue topQueue : queueTree.getQueues()) {
topQueues.put(topQueue.getName(), topQueue);
}
// check for consistency in number of children
assertEquals(2, topQueues.get("p1").getChildren().length);
HashMap<String, JsonQueue> childQueues = new HashMap<String, JsonQueue>();
for (JsonQueue child : topQueues.get("p1").getChildren()) {
childQueues.put(child.getName(), child);
}
// check for consistency in state
assertEquals("stopped", childQueues.get("p1:p12").getState());
// check for consistency in properties
HashMap<String, JsonProperty> q1_properties =
new HashMap<String, JsonProperty>();
for (JsonProperty prop : topQueues.get("q1").getProperties()) {
q1_properties.put(prop.getKey(), prop);
}
assertEquals("10", q1_properties.get("capacity").getValue());
assertEquals("35", q1_properties.get("maxCapacity").getValue());
// check for acls
assertEquals("u1 ", childQueues.get("p1:p12").getAcl_submit_job());
assertEquals("u2 ", childQueues.get("p1:p12").getAcl_administer_jobs());
}
}