/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import static org.apache.hadoop.mapred.QueueConfigurationParser.NAME_SEPARATOR;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.QUEUES_CONFIG_FILE_PATH;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.deleteQueuesConfigFile;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueue;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesNode;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.submitSleepJob;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.tools.MRAdmin;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
public class TestQueueManagerWithJobTracker {
private static Configuration conf;
@AfterClass
public static void tearDown() throws Exception {
deleteQueuesConfigFile();
}
String adminUser = "adminUser";
String adminGroup = "adminGroup";
String deprecatedSuperGroup = "superGroup";
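/**
* Writes a simple queue hierarchy with ACLs to the queues configuration
* file, optionally enables ACL checking, and starts a MiniMRCluster on it.
*/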
private void startCluster(boolean aclsEnabled)
throws Exception {
deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocumentWithAcls(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
conf = new Configuration();
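// skip the per-job setup/cleanup tasks; they are not needed for these tests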
conf.set(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
conf.setBoolean(MRConfig.MR_ACLS_ENABLED, aclsEnabled);
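// grant administrative access via both the deprecated supergroup key and
// the current admins key (adminUser plus members of adminGroup)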
conf.set(MRConfig.MR_SUPERGROUP, deprecatedSuperGroup);
conf.set(MRConfig.MR_ADMINS, adminUser + " " + adminGroup);
JobConf jobConf = new JobConf(conf);
String namenode = "file:///";
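// start a cluster with 3 task trackers on top of the local file system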
miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
}
/**
* Test to check that jobs cannot be submitted to a queue in STOPPED state
* @throws Exception
*/
@Test(expected = IOException.class)
public void testSubmitJobForStoppedQueue() throws Exception {
startCluster(true);
submitSleepJob(10, 10, 100, 100, false, null,
"p1" + NAME_SEPARATOR + "p14", conf);
fail("queue p1:p14 is in stopped state and should not accept jobs");
}
/**
* Test to check that jobs cannot be submitted to a container queue
* @throws Exception
*/
@Test(expected = IOException.class)
public void testSubmitJobForContainerQueue() throws Exception {
startCluster(true);
submitSleepJob(10, 10, 100, 100, false, null, "p1", conf);
fail("queue p1 is a container queue and cannot have jobs");
}
/**
* Tests job submission against the configured queue ACLs
* @throws Exception
*/
@Test
public void testAclsForSubmitJob() throws Exception {
startCluster(true);
Job job;
try {
// submit job to queue p1:p13, which specifies no ACLs; the default denies ordinary users
job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1" + NAME_SEPARATOR
+ "p13", conf);
fail("user u1 cannot submit jobs to queue p1:p13");
} catch (Exception e) {
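// expected: u1 is not authorized to submit jobs to p1:p13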
}
// check that admins always have submit access
job = submitSleepJob(0, 0, 0, 0, true, adminUser + ",g1",
"p1" + NAME_SEPARATOR + "p13", conf);
assertTrue("Admin user cannot submit jobs to queue p1:p13",
job.isSuccessful());
job = submitSleepJob(0, 0, 0, 0, true, "u1," + adminGroup,
"p1" + NAME_SEPARATOR + "p13", conf);
assertTrue("Admin group member cannot submit jobs to queue p1:p13",
job.isSuccessful());
job = submitSleepJob(0, 0, 0, 0, true, "u1," + deprecatedSuperGroup,
"p1" + NAME_SEPARATOR + "p13", conf);
assertTrue("Deprecated super group member cannot submit jobs to queue" +
" p1:p13", job.isSuccessful());
// check that a user without submit access is rejected
try {
job = submitSleepJob(0, 0, 0, 0, false, "u2,g1", "p1" + NAME_SEPARATOR
+ "p11", conf);
fail("user u2 cannot submit jobs to queue p1:p11");
} catch (Exception e) {
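// expected: u2 is not authorized to submit jobs to p1:p11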
}
// submit job to queue p1:p11 with acl-submit-job as u1
job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1"
+ NAME_SEPARATOR + "p11", conf);
assertTrue("Job submission for u1 failed in queue : p1:p11.",
job.isSuccessful());
// submit job to queue p1:p12 with acl-submit-job as *
job = submitSleepJob(0, 0, 0, 0, true, "u2,g1", "p1"
+ NAME_SEPARATOR + "p12", conf);
assertTrue("Job submission for u2 failed in queue : p1:p12.",
job.isSuccessful());
}
/**
* Tests which users are authorized to kill a job
* @throws Exception
*/
@Test
public void testAccessToKillJob() throws Exception {
startCluster(true);
Job job = submitSleepJob(1, 1, 100, 100, false, "u1,g1", "p1"
+ NAME_SEPARATOR + "p11", conf);
final JobConf jobConf = miniMRCluster.createJobConf();
Cluster cluster = null;
JobID jobID = job.getStatus().getJobID();
// Ensure that the JobInProgress is initialized before we issue a kill
// signal to the job.
JobTracker tracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
JobInProgress jip = tracker.getJob(org.apache.hadoop.mapred.JobID
.downgrade(jobID));
tracker.initJob(jip);
try {
final Configuration userConf =
new Configuration(miniMRCluster.createJobConf());
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting("someRandomUser",
new String[] { "someRandomGroup" });
cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run() throws IOException {
return new Cluster(userConf);
}
});
cluster.getJob(jobID).killJob();
fail("user 'someRandomeUser' is neither u1 nor in the administer group list");
} catch (Exception e) {
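// expected: someRandomUser may not kill the job; retry as the owner u1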
final Configuration userConf = new Configuration(miniMRCluster.createJobConf());
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting("u1",new String[]{"g1"});
cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run() throws IOException {
return new Cluster(userConf);
}
});
cluster.getJob(jobID).killJob();
// kill the running job
assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
}
job = submitSleepJob(1, 1, 100, 100, false, "u1,g1", "p1" + NAME_SEPARATOR
+ "p12", conf);
jobID = job.getStatus().getJobID();
// Ensure that the JobInProgress is initialized before we issue a kill
// signal to the job.
jip = tracker.getJob(org.apache.hadoop.mapred.JobID.downgrade(jobID));
tracker.initJob(jip);
tracker.killJob(job.getJobID());
// verify that the job reached the KILLED state
assertEquals("job submitted for u1 and queue p1:p12 is not killed.",
State.KILLED, cluster.getJob(jobID).getStatus().getState());
final Configuration userConf = new Configuration(miniMRCluster.createJobConf());
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting("u1",new String[]{"g1"});
cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run() throws IOException {
return new Cluster(userConf);
}
});
job = submitSleepJob(1, 1, 10, 10, false, "u1,g1", "p1" + NAME_SEPARATOR
+ "p11", conf);
jobID = job.getStatus().getJobID();
// Ensure that the JobInProgress is initialized before we issue a kill
// signal to the job.
jip = tracker.getJob(org.apache.hadoop.mapred.JobID.downgrade(jobID));
tracker.initJob(jip);
ugi =
UserGroupInformation.createUserForTesting("u3",new String[]{"g3"});
cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run() throws IOException {
return new Cluster(jobConf);
}
});
// try killing job with user not in administer list
try {
cluster.getJob(jobID).killJob();
fail("u3 not in administer list");
} catch (Exception e) {
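// expected: u3 may not kill the job; kill it as the owner u1 instead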
ugi =
UserGroupInformation.createUserForTesting("u1",new String[]{"g1"});
cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run() throws IOException {
return new Cluster(jobConf);
}
});
assertFalse(cluster.getJob(jobID).isComplete());
cluster.getJob(jobID).killJob();
// kill the running job
assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
}
// check kill access to admins
ugi =
UserGroupInformation.createUserForTesting(adminUser, new String[] { "g3" });
checkAccessToKill(tracker, jobConf, ugi);
ugi =
UserGroupInformation.createUserForTesting("u3", new String[] { adminGroup });
checkAccessToKill(tracker, jobConf, ugi);
ugi =
UserGroupInformation.createUserForTesting("u3",
new String[] { deprecatedSuperGroup });
checkAccessToKill(tracker, jobConf, ugi);
}
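/**
* Submits a sleep job as u1 to queue p1:p11 and verifies that
* <code>killer</code> is authorized to kill it.
*/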
private void checkAccessToKill(JobTracker tracker, final JobConf mrConf,
UserGroupInformation killer) throws IOException, InterruptedException,
ClassNotFoundException {
Job job = submitSleepJob(1, 1, 100, 100, false, "u1,g1",
"p1" + NAME_SEPARATOR + "p11", conf);
JobID jobID = job.getStatus().getJobID();
// Ensure that the JobInProgress is initialized before we issue a kill
// signal to the job.
JobInProgress jip = tracker.getJob(
org.apache.hadoop.mapred.JobID.downgrade(jobID));
tracker.initJob(jip);
Cluster cluster = killer.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run() throws IOException {
return new Cluster(mrConf);
}
});
cluster.getJob(jobID).killJob();
assertEquals("job not killed by " + killer,
cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
}
/**
* Tests job submission after a queue configuration refresh
* @throws Exception
*/
@Test
public void testSubmitJobsAfterRefresh() throws Exception {
startCluster(true);
// refresh with a configuration in which u1 may no longer submit to p1:p11
deleteQueuesConfigFile();
Document doc = createDocument();
refreshDocument(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
MRAdmin admin = new MRAdmin(miniMRCluster.createJobConf());
admin.run(new String[] { "-refreshQueues" });
try {
submitSleepJob(10, 10, 100, 100, false, "u1,g1", "p1"
+ NAME_SEPARATOR + "p11", conf);
fail("user u1 is not in the submit jobs' list");
} catch (Exception e) {
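// expected: after the refresh, u1 may no longer submit to p1:p11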
}
deleteQueuesConfigFile();
doc = createDocument();
createSimpleDocumentWithAcls(doc);
writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
admin.run(new String[] { "-refreshQueues" });
}
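/**
* Builds the refreshed queue hierarchy: a top-level queue q1 and a container
* queue p1 with children p11 (no submitters, u2 administers), p12 (open to
* everyone), p13 (running, default ACLs) and p14 (stopped).
*/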
private void refreshDocument(Document doc) {
Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
Properties props = new Properties();
props.setProperty("capacity", "10");
props.setProperty("maxCapacity", "35");
q1.appendChild(createProperties(doc, props));
queues.appendChild(q1);
// Create another parent level p1
Element p1 = createQueue(doc, "p1");
// append child p11 to p1
Element p11 = createQueue(doc, "p11");
p11.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, " "));
p11.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
p1.appendChild(p11);
Element p12 = createQueue(doc, "p12");
p12.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
p12.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "*"));
p12.appendChild(createAcls(doc,
QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "*"));
// append p12 to p1.
p1.appendChild(p12);
// append child p13 to p1
Element p13 = createQueue(doc, "p13");
p13.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
p1.appendChild(p13);
// append child p14 to p1
Element p14 = createQueue(doc, "p14");
p14.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
p1.appendChild(p14);
queues.appendChild(p1);
}
/**
* Tests job submission when ACLs are disabled
* @throws Exception
*/
@Test
public void testAclsDisabled() throws Exception {
startCluster(false);
// submit job to queue p1:p11 by any user not in acls-submit-job
Job job = submitSleepJob(0, 0, 0, 0, true, "u2,g1", "p1" + NAME_SEPARATOR
+ "p11", conf);
assertTrue("Job submitted for u2 in queue p1:p11 is not successful.",
job.isSuccessful());
// submit job to queue p1:p11 by user in acls-submit-job
job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1" + NAME_SEPARATOR
+ "p11", conf);
assertTrue("Job submitted for u2 in queue p1:p11 is not successful.",
job.isSuccessful());
job = submitSleepJob(1, 1, 0, 0, false, "u1,g1", "p1" + NAME_SEPARATOR
+ "p11", conf);
// kill the job by any user
final JobConf jobConf = miniMRCluster.createJobConf();
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting("u3",new String[]{"g3"});
Cluster cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run() throws IOException {
return new Cluster(jobConf);
}
});
JobID jobID = job.getStatus().getJobID();
// Ensure that the JobInProgress is initialized before we issue a kill
// signal to the job.
JobInProgress jip = miniMRCluster.getJobTrackerRunner().getJobTracker()
.getJob(org.apache.hadoop.mapred.JobID.downgrade(jobID));
miniMRCluster.getJobTrackerRunner().getJobTracker().initJob(jip);
cluster.getJob(jobID).killJob();
assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
}
}