blob: dee6f57b72faab5850838202a4d263c5e61d30c2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
//import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerException;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.dom.DOMSource;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import java.util.Set;
import java.io.File;
import java.io.IOException;
//@Private
public class QueueManagerTestUtils {
/**
* Queue-configuration file for tests that start a cluster and wish to modify
* the queue configuration. This file is always in the unit tests classpath,
* so QueueManager started through JobTracker will automatically pick this up.
*/
public static final String QUEUES_CONFIG_FILE_PATH = new File(System
.getProperty("test.build.extraconf", "build/test/extraconf"),
QueueManager.QUEUE_CONF_FILE_NAME).getAbsolutePath();
private static final Log LOG = LogFactory.getLog(QueueManagerTestUtils.class);
/**
* Create and return a new instance of a DOM Document object to build a queue
* tree with.
*
* @return the created {@link Document}
* @throws Exception
*/
public static Document createDocument() throws Exception {
Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
.newDocument();
return doc;
}
public static void createSimpleDocument(Document doc) throws Exception {
Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
Properties props = new Properties();
props.setProperty("capacity", "10");
props.setProperty("maxCapacity", "35");
q1.appendChild(createProperties(doc, props));
queues.appendChild(q1);
// Create another parent level p1
Element p1 = createQueue(doc, "p1");
// append child p11 to p1
p1.appendChild(createQueue(doc, "p11"));
Element p12 = createQueue(doc, "p12");
p12.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
p12.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1"));
p12.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
// append p12 to p1.
p1.appendChild(p12);
queues.appendChild(p1);
}
static void createSimpleDocumentWithAcls(Document doc) {
Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
Properties props = new Properties();
props.setProperty("capacity", "10");
props.setProperty("maxCapacity", "35");
q1.appendChild(createProperties(doc, props));
queues.appendChild(q1);
// Create another parent level p1
Element p1 = createQueue(doc, "p1");
// append child p11 to p1
Element p11 = createQueue(doc, "p11");
p11.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1"));
p11.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
p1.appendChild(p11);
// append child p12 to p1
Element p12 = createQueue(doc, "p12");
p12.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
p12.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "*"));
p12.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "*"));
p1.appendChild(p12);
// append child p13 to p1
Element p13 = createQueue(doc, "p13");
p13.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
p1.appendChild(p13);
// append child p14 to p1
Element p14 = createQueue(doc, "p14");
p14.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
p1.appendChild(p14);
queues.appendChild(p1);
}
/**
* Creates all given queues as 1st level queues(no nesting)
* @param doc the queues config document
* @param queueNames the queues to be added to the queues config document
* @param submitAcls acl-submit-job acls for each of the queues
* @param adminsAcls acl-administer-jobs acls for each of the queues
* @throws Exception
*/
public static void createSimpleDocument(Document doc, String[] queueNames,
String[] submitAcls, String[] adminsAcls) throws Exception {
Element queues = createQueuesNode(doc);
// Create all queues as 1st level queues(no nesting)
for (int i = 0; i < queueNames.length; i++) {
Element q = createQueue(doc, queueNames[i]);
q.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
q.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, submitAcls[i]));
q.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, adminsAcls[i]));
queues.appendChild(q);
}
}
/**
* Creates queues configuration file with given queues at 1st level(i.e.
* no nesting of queues) and with the given queue acls.
* @param queueNames queue names which are to be configured
* @param submitAclStrings acl-submit-job acls for each of the queues
* @param adminsAclStrings acl-administer-jobs acls for each of the queues
* @return Configuration the queues configuration
* @throws Exception
*/
public static void createQueuesConfigFile(String[] queueNames,
String[] submitAclStrings, String[] adminsAclStrings)
throws Exception {
if (queueNames.length > submitAclStrings.length ||
queueNames.length > adminsAclStrings.length) {
LOG.error("Number of queues is more than acls given.");
return;
}
Document doc = createDocument();
createSimpleDocument(doc, queueNames, submitAclStrings, adminsAclStrings);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
}
public static void refreshSimpleDocument(Document doc) throws Exception {
Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
Properties props = new Properties();
props.setProperty("capacity", "70");
props.setProperty("maxCapacity", "35");
q1.appendChild(createProperties(doc, props));
queues.appendChild(q1);
// Create another parent level p1
Element p1 = createQueue(doc, "p1");
// append child p11 to p1
Element p11 = createQueue(doc, "p11");
p11.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
p1.appendChild(p11);
Element p12 = createQueue(doc, "p12");
p12.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
p12.appendChild(createAcls(doc, "acl-submit-job", "u3"));
p12.appendChild(createAcls(doc, "acl-administer-jobs", "u4"));
// append p12 to p1.
p1.appendChild(p12);
queues.appendChild(p1);
}
/**
* Create the root <queues></queues> element along with the
* <aclsEnabled></aclsEnabled> element.
*
* @param doc
* @param enable
* @return the created element.
*/
public static Element createQueuesNode(Document doc) {
Element queues = doc.createElement("queues");
doc.appendChild(queues);
return queues;
}
public static void writeToFile(Document doc, String filePath)
throws TransformerException {
Transformer trans = TransformerFactory.newInstance().newTransformer();
trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
trans.setOutputProperty(OutputKeys.INDENT, "yes");
DOMSource source = new DOMSource(doc);
trans.transform(source, new StreamResult(new File(filePath)));
}
public static Element createQueue(Document doc, String name) {
Element queue = doc.createElement("queue");
Element nameNode = doc.createElement("name");
nameNode.setTextContent(name);
queue.appendChild(nameNode);
return queue;
}
public static Element createAcls(Document doc, String aclName,
String listNames) {
Element acls = doc.createElement(aclName);
acls.setTextContent(listNames);
return acls;
}
public static Element createState(Document doc, String state) {
Element stateElement = doc.createElement("state");
stateElement.setTextContent(state);
return stateElement;
}
public static Element createProperties(Document doc, Properties props) {
Element propsElement = doc.createElement("properties");
if (props != null) {
Set<String> propList = props.stringPropertyNames();
for (String prop : propList) {
Element property = doc.createElement("property");
property.setAttribute("key", prop);
property.setAttribute("value", (String) props.get(prop));
propsElement.appendChild(property);
}
}
return propsElement;
}
/**
* Delete queues configuration file if exists
*/
public static void deleteQueuesConfigFile() {
if (new File(QUEUES_CONFIG_FILE_PATH).exists()) {
new File(QUEUES_CONFIG_FILE_PATH).delete();
}
}
/**
* Write the given queueHierarchy to the given file.
*
* @param filePath
*
* @param rootQueues
* @throws Exception
*/
public static void writeQueueConfigurationFile(String filePath,
JobQueueInfo[] rootQueues) throws Exception {
Document doc = createDocument();
Element queueElements = createQueuesNode(doc);
for (JobQueueInfo rootQ : rootQueues) {
queueElements.appendChild(QueueConfigurationParser.getQueueElement(doc,
rootQ));
}
writeToFile(doc, filePath);
}
static Job submitSleepJob(final int numMappers, final int numReducers, final long mapSleepTime,
final long reduceSleepTime, boolean shouldComplete, String userInfo,
String queueName, Configuration clientConf) throws IOException,
InterruptedException, ClassNotFoundException {
clientConf.set(JTConfig.JT_IPC_ADDRESS, "localhost:"
+ miniMRCluster.getJobTrackerPort());
UserGroupInformation ugi;
if (userInfo != null) {
String[] splits = userInfo.split(",");
String[] groups = new String[splits.length - 1];
System.arraycopy(splits, 1, groups, 0, splits.length - 1);
ugi = UserGroupInformation.createUserForTesting(splits[0], groups);
} else {
ugi = UserGroupInformation.getCurrentUser();
}
if (queueName != null) {
clientConf.set(JobContext.QUEUE_NAME, queueName);
}
final SleepJob sleep = new SleepJob();
sleep.setConf(clientConf);
Job job = ugi.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws IOException {
return sleep.createJob(numMappers, numReducers, mapSleepTime,
(int) mapSleepTime, reduceSleepTime, (int) reduceSleepTime);
}});
if (shouldComplete) {
job.waitForCompletion(false);
} else {
job.submit();
// miniMRCluster.getJobTrackerRunner().getJobTracker().jobsToComplete()[]
Cluster cluster = new Cluster(miniMRCluster.createJobConf());
JobStatus[] status = miniMRCluster.getJobTrackerRunner().getJobTracker()
.jobsToComplete();
JobID id = status[status.length -1].getJobID();
Job newJob = cluster.getJob(id);
cluster.close();
return newJob;
}
return job;
}
static MiniMRCluster miniMRCluster;
}