MAPREDUCE-1664. Job Acls affect Queue Acls.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@998003 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index e4392fa..d28a041 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@
MAPREDUCE-1866. Removes deprecated class
org.apache.hadoop.streaming.UTF8ByteArrayUtils. (amareshwari)
+ MAPREDUCE-1664. Changes the behaviour of the combination of job-acls
+ when they function together with queue-acls. (Ravi Gummadi via vinodkv)
+
NEW FEATURES
MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.
diff --git a/conf/mapred-queues.xml.template b/conf/mapred-queues.xml.template
index 252f5af..ce6cd20 100644
--- a/conf/mapred-queues.xml.template
+++ b/conf/mapred-queues.xml.template
@@ -18,10 +18,10 @@
<!-- This is the template for queue configuration. The format supports nesting of
queues within queues - a feature called hierarchical queues. All queues are
defined within the 'queues' tag which is the top level element for this
- XML document.
- The 'aclsEnabled' attribute should be set to true, if ACLs should be checked
- on queue operations such as submitting jobs, killing jobs etc. -->
-<queues aclsEnabled="false">
+ XML document. The queue acls configured here for different queues are
+ checked for authorization only if the configuration property
+ mapreduce.cluster.acls.enabled is set to true. -->
+<queues>
<!-- Configuration for a queue is specified by defining a 'queue' element. -->
<queue>
@@ -40,17 +40,37 @@
<!-- Specifies the ACLs to check for submitting jobs to this queue.
If set to '*', it allows all users to submit jobs to the queue.
+ If set to ' '(i.e. space), no user will be allowed to do this
+ operation. The default value for any queue acl is ' '.
For specifying a list of users and groups the format to use is
- user1,user2 group1,group2 -->
- <acl-submit-job>*</acl-submit-job>
+ user1,user2 group1,group2
- <!-- Specifies the ACLs to check for modifying jobs in this queue.
- Modifications include killing jobs, tasks of jobs or changing
+ It is only used if authorization is enabled in Map/Reduce by setting
+ the configuration property mapreduce.cluster.acls.enabled to true.
+
+ Irrespective of this ACL configuration, the user who started the
+ cluster and cluster administrators configured via
+ mapreduce.cluster.administrators can do this operation. -->
+ <acl-submit-job> </acl-submit-job>
+
+ <!-- Specifies the ACLs to check for viewing and modifying jobs in this
+ queue. Modifications include killing jobs, tasks of jobs or changing
priorities.
- If set to '*', it allows all users to submit jobs to the queue.
+ If set to '*', it allows all users to view, modify jobs of the queue.
+ If set to ' '(i.e. space), no user will be allowed to do this
+ operation.
For specifying a list of users and groups the format to use is
- user1,user2 group1,group2 -->
- <acl-administer-jobs>*</acl-administer-jobs>
+ user1,user2 group1,group2
+
+ It is only used if authorization is enabled in Map/Reduce by setting
+ the configuration property mapreduce.cluster.acls.enabled to true.
+
+ Irrespective of this ACL configuration, the user who started the
+ cluster and cluster administrators configured via
+ mapreduce.cluster.administrators can do the above operations on all
+ the jobs in all the queues. The job owner can do all the above
+ operations on his/her job irrespective of this ACL configuration. -->
+ <acl-administer-jobs> </acl-administer-jobs>
</queue>
<!-- Here is a sample of a hierarchical queue configuration
diff --git a/src/c++/task-controller/task-controller.c b/src/c++/task-controller/task-controller.c
index 117f0a7..eb0cbaa 100644
--- a/src/c++/task-controller/task-controller.c
+++ b/src/c++/task-controller/task-controller.c
@@ -227,6 +227,14 @@
}
/**
+ * Get the job ACLs file for the given job log dir.
+ */
+char *get_job_acls_file(const char *log_dir) {
+ return concatenate(JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN, "job_acls_file",
+ 1, log_dir);
+}
+
+/**
* Function to check if the passed tt_root is present in mapreduce.cluster.local.dir
* the task-controller is configured with.
*/
@@ -517,12 +525,20 @@
}
/**
- * Function to prepare the job log dir for the child. It gives the user
- * ownership of the job's log-dir to the user and group ownership to the
- * user running tasktracker.
- * * sudo chown user:mapred log-dir/userlogs/$jobid
- * * sudo chmod -R 2770 log-dir/userlogs/$jobid // user is same as tt_user
- * * sudo chmod -R 2570 log-dir/userlogs/$jobid // user is not tt_user
+ * Function to prepare the job log dir(and job acls file in it) for the child.
+ * It gives the user ownership of the job's log-dir to the user and
+ * group ownership to the user running tasktracker(i.e. tt_user).
+ *
+ * * sudo chown user:mapred log-dir/userlogs/$jobid
+ * * if user is not $tt_user,
+ * * sudo chmod 2570 log-dir/userlogs/$jobid
+ * * else
+ * * sudo chmod 2770 log-dir/userlogs/$jobid
+ * * sudo chown user:mapred log-dir/userlogs/$jobid/job-acls.xml
+ * * if user is not $tt_user,
+ * * sudo chmod 2570 log-dir/userlogs/$jobid/job-acls.xml
+ * * else
+ * * sudo chmod 2770 log-dir/userlogs/$jobid/job-acls.xml
*/
int prepare_job_logs(const char *log_dir, const char *job_id,
mode_t permissions) {
@@ -559,6 +575,42 @@
free(job_log_dir);
return -1;
}
+
+ //set ownership and permissions for job_log_dir/job-acls.xml, if exists.
+ char *job_acls_file = get_job_acls_file(job_log_dir);
+ if (job_acls_file == NULL) {
+ fprintf(LOGFILE, "Couldn't get job acls file %s.\n", job_acls_file);
+ free(job_log_dir);
+ return -1;
+ }
+
+ struct stat filestat1;
+ if (stat(job_acls_file, &filestat1) != 0) {
+ if (errno == ENOENT) {
+#ifdef DEBUG
+ fprintf(LOGFILE, "job_acls_file %s doesn't exist. Not doing anything.\n",
+ job_acls_file);
+#endif
+ free(job_acls_file);
+ free(job_log_dir);
+ return 0;
+ } else {
+ // stat failed because of something else!
+ fprintf(LOGFILE, "Failed to stat the job_acls_file %s\n", job_acls_file);
+ free(job_acls_file);
+ free(job_log_dir);
+ return -1;
+ }
+ }
+
+ if (secure_single_path(job_acls_file, user_detail->pw_uid, tasktracker_gid,
+ permissions, 1) != 0) {
+ fprintf(LOGFILE, "Failed to secure the job acls file %s\n", job_acls_file);
+ free(job_acls_file);
+ free(job_log_dir);
+ return -1;
+ }
+ free(job_acls_file);
free(job_log_dir);
return 0;
}
diff --git a/src/c++/task-controller/task-controller.h b/src/c++/task-controller/task-controller.h
index 662b9ab..55d1221 100644
--- a/src/c++/task-controller/task-controller.h
+++ b/src/c++/task-controller/task-controller.h
@@ -90,6 +90,8 @@
#define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
+#define JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN "%s/job-acls.xml"
+
#define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
diff --git a/src/c++/task-controller/tests/test-task-controller.c b/src/c++/task-controller/tests/test-task-controller.c
index 23a5067..d6c2531 100644
--- a/src/c++/task-controller/tests/test-task-controller.c
+++ b/src/c++/task-controller/tests/test-task-controller.c
@@ -183,6 +183,19 @@
assert(ret == 0);
}
+void test_get_job_acls_file() {
+ char *job_acls_file = (char *) get_job_acls_file(
+ "/tmp/testing/userlogs/job_200906101234_0001");
+ printf("job acls file obtained is %s\n", job_acls_file);
+ int ret = 0;
+ if (strcmp(job_acls_file,
+ "/tmp/testing/userlogs/job_200906101234_0001/job-acls.xml") != 0) {
+ ret = -1;
+ }
+ free(job_acls_file);
+ assert(ret == 0);
+}
+
void test_get_task_log_dir() {
char *logdir = (char *) get_task_log_dir("/tmp/testing",
"job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
@@ -219,6 +232,9 @@
printf("\nTesting get_job_log_dir()\n");
test_get_job_log_dir();
+ printf("\nTesting get_job_acls_file()\n");
+ test_get_job_acls_file();
+
printf("\nTesting get_task_log_dir()\n");
test_get_task_log_dir();
diff --git a/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java b/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
index 99a5213..cf77cb7 100644
--- a/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
+++ b/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueState;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -584,10 +585,8 @@
for (String queueName : queueNames) {
HashMap<String, AccessControlList> aclsMap
= new HashMap<String, AccessControlList>();
- for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
- String key = QueueManager.toFullPropertyName(
- queueName,
- oper.getAclName());
+ for (QueueACL qAcl : QueueACL.values()) {
+ String key = toFullPropertyName(queueName, qAcl.getAclName());
aclsMap.put(key, allEnabledAcl);
}
queues[i++] = new Queue(queueName, aclsMap, QueueState.RUNNING);
diff --git a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
index 5f25800..b6f0c4b 100644
--- a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
+++ b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskReport;
@@ -410,6 +411,11 @@
}
@Override
+ public AccessControlList getQueueAdmins(String queueName) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException,
InterruptedException {
throw new UnsupportedOperationException();
diff --git a/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml b/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
index 8d83877..33d73b6 100644
--- a/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
+++ b/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
@@ -1762,12 +1762,14 @@
<section>
<title>Job Authorization</title>
- <p>Job level authorization is enabled on the cluster, if the configuration
- <code>mapreduce.cluster.job-authorization-enabled</code> is set to
- true. When enabled, access control checks are done by the JobTracker
- and the TaskTracker before allowing users to view
- job details or to modify a job using Map/Reduce APIs,
- CLI or web user interfaces.</p>
+ <p>Job level authorization and queue level authorization are enabled
+ on the cluster, if the configuration
+ <code>mapreduce.cluster.acls.enabled</code> is set to
+ true. When enabled, access control checks are done by (a) the
+ JobTracker before allowing users to submit jobs to queues and
+ administering these jobs and (b) by the JobTracker and the TaskTracker
+ before allowing users to view job details or to modify a job using
+ MapReduce APIs, CLI or web user interfaces.</p>
<p>A job submitter can specify access control lists for viewing or
modifying a job via the configuration properties
@@ -1775,11 +1777,13 @@
<code>mapreduce.job.acl-modify-job</code> respectively. By default,
nobody is given access in these properties.</p>
- <p>However, irrespective of the ACLs configured, a job's owner,
- the superuser and the members of an admin configured supergroup
- (<code>mapreduce.cluster.permissions.supergroup</code>) always
- have access to view and modify a job.</p>
-
+ <p>However, irrespective of the job ACLs configured, a job's owner,
+ the user who started the cluster and members of an admin configured
+ supergroup (<code>mapreduce.cluster.permissions.supergroup</code>)
+ and queue administrators of the queue to which the job was submitted
+ to (<code>acl-administer-jobs</code>) always have access to view and
+ modify a job.</p>
+
<p> A job view ACL authorizes users against the configured
<code>mapreduce.job.acl-view-job</code> before returning possibly
sensitive information about a job, like: </p>
@@ -1801,10 +1805,13 @@
<li> killing/failing a task of a job </li>
<li> setting the priority of a job </li>
</ul>
- <p>These operations are also protected by the queue level ACL,
- "acl-administer-jobs", configured via mapred-queue-acls.xml. The caller
- will be authorized against both queue level ACLs and job level ACLs,
- depending on what is enabled.</p>
+ <p>These view and modify operations on jobs are also permitted by
+ the queue level ACL, "acl-administer-jobs", configured via
+ mapred-queue-acls.xml. The caller will be able to do the operation
+ if he/she is part of either queue admins ACL or job modification ACL
+ or the user who started the cluster or a member of an admin configured
+ supergroup (<code>mapreduce.cluster.permissions.supergroup</code>).
+ </p>
<p>The format of a job level ACL is the same as the format for a
queue level ACL as defined in the
diff --git a/src/java/mapred-default.xml b/src/java/mapred-default.xml
index 2db49b5..86eb85f 100644
--- a/src/java/mapred-default.xml
+++ b/src/java/mapred-default.xml
@@ -923,7 +923,7 @@
<name>mapreduce.job.queuename</name>
<value>default</value>
<description> Queue to which a job is submitted. This must match one of the
- queues defined in mapred.queue.names for the system. Also, the ACL setup
+ queues defined in mapred-queues.xml for the system. Also, the ACL setup
for the queue must allow the current user to submit a job to the queue.
Before specifying a queue, ensure that the system is configured with
the queue, and access is allowed for submitting jobs to the queue.
@@ -931,69 +931,85 @@
</property>
<property>
- <name>mapreduce.cluster.job-authorization-enabled</name>
+ <name>mapreduce.cluster.acls.enabled</name>
<value>false</value>
- <description> Boolean flag that specifies if job-level authorization checks
- should be enabled on the jobs submitted to the cluster. Job-level
- authorization is enabled if this flag is set to true or disabled otherwise.
- It is disabled by default. If enabled, access control checks are made by
- JobTracker and TaskTracker when requests are made by users for viewing the
- job-details (See mapreduce.job.acl-view-job) or for modifying the job
- (See mapreduce.job.acl-modify-job) using Map/Reduce APIs, RPCs or via the
- console and web user interfaces.
+ <description> Specifies whether ACLs should be checked
+ for authorization of users for doing various queue and job level operations.
+ ACLs are disabled by default. If enabled, access control checks are made by
+ JobTracker and TaskTracker when requests are made by users for queue
+ operations like submit job to a queue and kill a job in the queue and job
+ operations like viewing the job-details (See mapreduce.job.acl-view-job)
+ or for modifying the job (See mapreduce.job.acl-modify-job) using
+ Map/Reduce APIs, RPCs or via the console and web user interfaces.
+ For enabling this flag(mapreduce.cluster.acls.enabled), this is to be set
+ to true in mapred-site.xml on JobTracker node and on all TaskTracker nodes.
</description>
</property>
<property>
<name>mapreduce.job.acl-modify-job</name>
- <value></value>
+ <value> </value>
<description> Job specific access-control list for 'modifying' the job. It
is only used if authorization is enabled in Map/Reduce by setting the
- configuration property mapreduce.cluster.job-authorization-enabled to true.
+ configuration property mapreduce.cluster.acls.enabled to true.
This specifies the list of users and/or groups who can do modification
operations on the job. For specifying a list of users and groups the
format to use is "user1,user2 group1,group". If set to '*', it allows all
- users/groups to modify this job. If set to '', it allows none. This
- configuration is used to guard all the modifications with respect to this
- job and takes care of all the following operations:
+ users/groups to modify this job. If set to ' '(i.e. space), it allows
+ none. This configuration is used to guard all the modifications with respect
+ to this job and takes care of all the following operations:
o killing this job
o killing a task of this job, failing a task of this job
o setting the priority of this job
Each of these operations are also protected by the per-queue level ACL
"acl-administer-jobs" configured via mapred-queues.xml. So a caller should
- have the authorization to satisfy both the queue-level ACL and the
+ have the authorization to satisfy either the queue-level ACL or the
job-level ACL.
- Irrespective of this ACL configuration, job-owner, superuser and members
- of supergroup configured on JobTracker via
- "mapreduce.cluster.permissions.supergroup",
- can do all the modification operations.
+ Irrespective of this ACL configuration, (a) job-owner, (b) the user who
+ started the cluster, (c) members of an admin configured supergroup
+ configured via mapreduce.cluster.permissions.supergroup and (d) queue
+ administrators of the queue to which this job was submitted to configured
+ via acl-administer-jobs for the specific queue in mapred-queues.xml can
+ do all the modification operations on a job.
- By default, nobody else besides job-owner, superuser/supergroup can
- perform modification operations on a job that they don't own.
+ By default, nobody else besides job-owner, the user who started the cluster,
+ members of supergroup and queue administrators can perform modification
+ operations on a job.
</description>
</property>
<property>
<name>mapreduce.job.acl-view-job</name>
- <value></value>
+ <value> </value>
<description> Job specific access-control list for 'viewing' the job. It is
only used if authorization is enabled in Map/Reduce by setting the
- configuration property mapreduce.cluster.job-authorization-enabled to true.
+ configuration property mapreduce.cluster.acls.enabled to true.
This specifies the list of users and/or groups who can view private details
about the job. For specifying a list of users and groups the
format to use is "user1,user2 group1,group". If set to '*', it allows all
- users/groups to modify this job. If set to '', it allows none. This
- configuration is used to guard some of the job-views and at present only
- protects APIs that can return possibly sensitive information of the
- job-owner like
+ users/groups to modify this job. If set to ' '(i.e. space), it allows
+ none. This configuration is used to guard some of the job-views and at
+ present only protects APIs that can return possibly sensitive information
+ of the job-owner like
o job-level counters
o task-level counters
o tasks' diagnostic information
o task-logs displayed on the TaskTracker web-UI and
o job.xml showed by the JobTracker's web-UI
- Every other piece information of jobs is still accessible by any other
- users, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+ Every other piece of information of jobs is still accessible by any other
+ user, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+
+ Irrespective of this ACL configuration, (a) job-owner, (b) the user who
+ started the cluster, (c) members of an admin configured supergroup
+ configured via mapreduce.cluster.permissions.supergroup and (d) queue
+ administrators of the queue to which this job was submitted to configured
+ via acl-administer-jobs for the specific queue in mapred-queues.xml can
+ do all the view operations on a job.
+
+ By default, nobody else besides job-owner, the user who started the
+ cluster, memebers of supergroup and queue administrators can perform
+ view operations on a job.
</description>
</property>
diff --git a/src/java/mapred-queues-default.xml b/src/java/mapred-queues-default.xml
index 4a33f36..f222cd7 100644
--- a/src/java/mapred-queues-default.xml
+++ b/src/java/mapred-queues-default.xml
@@ -17,13 +17,13 @@
-->
<!-- This is the default mapred-queues.xml file that is loaded in the case
that the user does not have such a file on their classpath. -->
-<queues aclsEnabled="false">
+<queues>
<queue>
<name>default</name>
<properties>
</properties>
<state>running</state>
- <acl-submit-job>*</acl-submit-job>
- <acl-administer-jobs>*</acl-administer-jobs>
+ <acl-submit-job> </acl-submit-job>
+ <acl-administer-jobs> </acl-administer-jobs>
</queue>
</queues>
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/mapred/ACLsManager.java b/src/java/org/apache/hadoop/mapred/ACLsManager.java
new file mode 100644
index 0000000..21a5052
--- /dev/null
+++ b/src/java/org/apache/hadoop/mapred/ACLsManager.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.AuditLogger.Constants;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Manages MapReduce cluster administrators and access checks for
+ * job level operations and queue level operations.
+ * Uses JobACLsManager for access checks of job level operations and
+ * QueueManager for queue operations.
+ */
+@InterfaceAudience.Private
+class ACLsManager {
+
+ // MROwner(user who started this mapreduce cluster)'s ugi
+ private final UserGroupInformation mrOwner;
+ // members of supergroup are mapreduce cluster administrators
+ private final String superGroup;
+
+ private final JobACLsManager jobACLsManager;
+ private final QueueManager queueManager;
+
+ private final boolean aclsEnabled;
+
+ ACLsManager(Configuration conf, JobACLsManager jobACLsManager,
+ QueueManager queueManager) throws IOException {
+
+ mrOwner = UserGroupInformation.getCurrentUser();
+ superGroup = conf.get(MRConfig.MR_SUPERGROUP, "supergroup");
+
+ aclsEnabled = conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+
+ this.jobACLsManager = jobACLsManager;
+ this.queueManager = queueManager;
+ }
+
+ UserGroupInformation getMROwner() {
+ return mrOwner;
+ }
+
+ String getSuperGroup() {
+ return superGroup;
+ }
+
+ JobACLsManager getJobACLsManager() {
+ return jobACLsManager;
+ }
+
+ /**
+ * Is the calling user an admin for the mapreduce cluster ?
+ * i.e. either cluster owner or member of the supergroup
+ * mapreduce.cluster.permissions.supergroup.
+ * @return true, if user is an admin
+ */
+ boolean isMRAdmin(UserGroupInformation callerUGI) {
+ if (mrOwner.getShortUserName().equals(callerUGI.getShortUserName())) {
+ return true;
+ }
+ String[] groups = callerUGI.getGroupNames();
+ for(int i=0; i < groups.length; ++i) {
+ if (groups[i].equals(superGroup)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Check the ACLs for a user doing the passed operation.
+ * <ul>
+ * <li>If ACLs are disabled, allow all users.</li>
+ * <li>Otherwise, if the operation is not a job operation(for eg.
+ * submit-job-to-queue), then allow only (a) clusterOwner(who started the
+ * cluster), (b) cluster administrators and (c) members of
+ * queue-submit-job-acl for the queue.</li>
+ * <li>If the operation is a job operation, then allow only (a) jobOwner,
+ * (b) clusterOwner(who started the cluster), (c) cluster administrators,
+ * (d) members of queue admins acl for the queue and (e) members of job
+ * acl for the job operation</li>
+ * </ul>
+ *
+ * @param job the job on which operation is requested
+ * @param callerUGI the user who is requesting the operation
+ * @param operation the operation for which authorization is needed
+ * @throws AccessControlException
+ */
+ void checkAccess(JobInProgress job, UserGroupInformation callerUGI,
+ Operation operation) throws AccessControlException {
+
+ String queue = job.getProfile().getQueueName();
+ String jobId = job.getJobID().toString();
+ JobStatus jobStatus = job.getStatus();
+ String jobOwner = jobStatus.getUsername();
+ AccessControlList jobAcl =
+ jobStatus.getJobACLs().get(operation.jobACLNeeded);
+
+ checkAccess(jobId, callerUGI, queue, operation, jobOwner, jobAcl);
+ }
+
+ /**
+ * Check the ACLs for a user doing the passed job operation.
+ * <ul>
+ * <li>If ACLs are disabled, allow all users.</li>
+ * <li>Otherwise, allow only (a) jobOwner,
+ * (b) clusterOwner(who started the cluster), (c) cluster administrators,
+ * (d) members of job acl for the jobOperation</li>
+ * </ul>
+ *
+ * @param jobStatus the status of the job
+ * @param callerUGI the user who is trying to perform the operation
+ * @param queue the job queue name
+ * @param operation the operation for which authorization is needed
+ */
+ void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
+ String queue, Operation operation) throws AccessControlException {
+
+ String jobId = jobStatus.getJobID().toString();
+ String jobOwner = jobStatus.getUsername();
+ AccessControlList jobAcl =
+ jobStatus.getJobACLs().get(operation.jobACLNeeded);
+
+ // If acls are enabled, check if callerUGI is jobOwner, queue admin,
+ // cluster admin or part of job ACL
+ checkAccess(jobId, callerUGI, queue, operation, jobOwner, jobAcl);
+ }
+
+ /**
+ * Check the ACLs for a user doing the passed operation.
+ * <ul>
+ * <li>If ACLs are disabled, allow all users.</li>
+ * <li>Otherwise, if the operation is not a job operation(for eg.
+ * submit-job-to-queue), then allow only (a) clusterOwner(who started the
+ * cluster), (b) cluster administrators and (c) members of
+ * queue-submit-job-acl for the queue.</li>
+ * <li>If the operation is a job operation, then allow only (a) jobOwner,
+ * (b) clusterOwner(who started the cluster), (c) cluster administrators,
+ * (d) members of queue admins acl for the queue and (e) members of job
+ * acl for the job operation</li>
+ * </ul>
+ *
+ * @param jobId the job id
+ * @param callerUGI the user who is trying to perform the operation
+ * @param queue the job queue name
+ * @param operation the operation for which authorization is needed
+ * @param jobOwner the user who submitted(or is submitting) this job
+ * @param jobAcl could be job-view-acl or job-modify-acl depending on the
+ * job operation.
+ */
+ void checkAccess(String jobId, UserGroupInformation callerUGI,
+ String queue, Operation operation, String jobOwner,
+ AccessControlList jobAcl) throws AccessControlException {
+
+ String user = callerUGI.getShortUserName();
+ String targetResource = jobId + " in queue " + queue;
+
+ if (!aclsEnabled) {
+ AuditLogger.logSuccess(user, operation.name(), targetResource);
+ return;
+ }
+
+ // Allow mapreduce cluster admins to do any queue operation and
+ // any job operation
+ if (isMRAdmin(callerUGI)) {
+ AuditLogger.logSuccess(user, operation.name(), targetResource);
+ return;
+ }
+
+ if (operation == Operation.SUBMIT_JOB) {
+ // This is strictly queue operation(not a job operation)
+ if (!queueManager.hasAccess(queue, operation.qACLNeeded, callerUGI)) {
+ AuditLogger.logFailure(user, operation.name(),
+ queueManager.getQueueACL(queue, operation.qACLNeeded).toString(),
+ targetResource, Constants.UNAUTHORIZED_USER);
+
+ throw new AccessControlException("User "
+ + callerUGI.getShortUserName() + " cannot perform "
+ + "operation " + operation.name() + " on queue " + queue
+ + ".\n Please run \"hadoop queue -showacls\" "
+ + "command to find the queues you have access to .");
+ } else {
+ AuditLogger.logSuccess(user, operation.name(), targetResource);
+ return;
+ }
+ }
+
+ // Check if callerUGI is queueAdmin(in some cases only), jobOwner or
+ // part of job-acl.
+
+ // queueManager and queue are null only when called from
+ // TaskTracker(i.e. from TaskLogServlet) for the operation VIEW_TASK_LOGS.
+ // Caller of this method takes care of checking if callerUGI is a
+ // queue administrator for that operation.
+ if (operation == Operation.VIEW_TASK_LOGS) {
+ if (jobACLsManager.checkAccess(callerUGI, operation.jobACLNeeded,
+ jobOwner, jobAcl)) {
+ AuditLogger.logSuccess(user, operation.name(), targetResource);
+ return;
+ }
+ } else if (queueManager.hasAccess(queue, operation.qACLNeeded, callerUGI) ||
+ jobACLsManager.checkAccess(callerUGI, operation.jobACLNeeded,
+ jobOwner, jobAcl)) {
+ AuditLogger.logSuccess(user, operation.name(), targetResource);
+ return;
+ }
+
+ AuditLogger.logFailure(user, operation.name(), jobAcl.toString(),
+ targetResource, Constants.UNAUTHORIZED_USER);
+
+ throw new AccessControlException("User "
+ + callerUGI.getShortUserName() + " cannot perform operation "
+ + operation.name() + " on " + jobId + " that is in the queue "
+ + queue);
+ }
+
+}
diff --git a/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java b/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
index dd34fac..10e4879 100644
--- a/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
+++ b/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.AccessControlException;
@@ -52,7 +51,7 @@
private FileSystem fs;
private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
- private JobACLsManager jobACLsManager = null;
+ private ACLsManager aclsManager;
public static final Log LOG =
LogFactory.getLog(CompletedJobStatusStore.class);
@@ -62,7 +61,8 @@
final static FsPermission JOB_STATUS_STORE_DIR_PERMISSION = FsPermission
.createImmutable((short) 0750); // rwxr-x--
- CompletedJobStatusStore(JobACLsManager aclsManager, Configuration conf)
+
+ CompletedJobStatusStore(Configuration conf, ACLsManager aclsManager)
throws IOException {
active =
conf.getBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
@@ -104,7 +104,7 @@
deleteJobStatusDirs();
}
- this.jobACLsManager = aclsManager;
+ this.aclsManager = aclsManager;
LOG.info("Completed job store activated/configured with retain-time : "
+ retainTime + " , job-info-dir : " + jobInfoDir);
@@ -301,7 +301,7 @@
}
/**
- * This method retrieves Counters information from DFS stored using
+ * This method retrieves Counters information from file stored using
* store method.
*
* @param jobId the jobId for which Counters is queried
@@ -315,9 +315,13 @@
FSDataInputStream dataIn = getJobInfoFile(jobId);
if (dataIn != null) {
JobStatus jobStatus = readJobStatus(dataIn);
- jobACLsManager.checkAccess(jobStatus,
- UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB);
- readJobProfile(dataIn);
+ JobProfile profile = readJobProfile(dataIn);
+ String queue = profile.getQueueName();
+ // authorize the user for job view access
+ aclsManager.checkAccess(jobStatus,
+ UserGroupInformation.getCurrentUser(), queue,
+ Operation.VIEW_JOB_COUNTERS);
+
counters = readCounters(dataIn);
dataIn.close();
}
diff --git a/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java b/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
index c962884..1f05869 100644
--- a/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
+++ b/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.authorize.AccessControlList;
import static org.apache.hadoop.mapred.QueueManager.*;
@@ -46,7 +47,7 @@
return;
}
List<Queue> listq = createQueues(conf);
- this.setAclsEnabled(conf.getBoolean("mapred.acls.enabled", false));
+ this.setAclsEnabled(conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false));
root = new Queue();
root.setName("");
for (Queue q : listq) {
@@ -78,9 +79,8 @@
*/
private QueueState getQueueState(String name, Configuration conf) {
String stateVal = conf.get(
- QueueManager.toFullPropertyName(
- name,"state"),
- QueueState.RUNNING.getStateName());
+ toFullPropertyName(name, "state"),
+ QueueState.RUNNING.getStateName());
return QueueState.getState(stateVal);
}
@@ -105,21 +105,11 @@
queues = conf.getStrings(MAPRED_QUEUE_NAMES_KEY);
}
- // check if the acls flag is defined
- String aclsEnable = conf.get("mapred.acls.enabled");
- if (aclsEnable != null) {
- LOG.warn(
- "Configuring \"mapred.acls.enabled\" in mapred-site.xml or " +
- "hadoop-site.xml is deprecated. Configure " +
- "queue hierarchy in " +
- QUEUE_CONF_FILE_NAME);
- }
-
// check if acls are defined
if (queues != null) {
for (String queue : queues) {
- for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
- String key = toFullPropertyName(queue, oper.getAclName());
+ for (QueueACL qAcl : QueueACL.values()) {
+ String key = toFullPropertyName(queue, qAcl.getAclName());
String aclString = conf.get(key);
if (aclString != null) {
LOG.warn(
@@ -149,8 +139,8 @@
Configuration conf) {
HashMap<String, AccessControlList> map =
new HashMap<String, AccessControlList>();
- for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
- String aclKey = toFullPropertyName(name, oper.getAclName());
+ for (QueueACL qAcl : QueueACL.values()) {
+ String aclKey = toFullPropertyName(name, qAcl.getAclName());
map.put(
aclKey, new AccessControlList(
conf.get(
diff --git a/src/java/org/apache/hadoop/mapred/JSPUtil.java b/src/java/org/apache/hadoop/mapred/JSPUtil.java
index 45730a5..183df63 100644
--- a/src/java/org/apache/hadoop/mapred/JSPUtil.java
+++ b/src/java/org/apache/hadoop/mapred/JSPUtil.java
@@ -95,14 +95,14 @@
* and decide if view should be allowed or not. Job will be null if
* the job with given jobid doesnot exist at the JobTracker.
*/
- public static JobWithViewAccessCheck checkAccessAndGetJob(JobTracker jt,
+ public static JobWithViewAccessCheck checkAccessAndGetJob(final JobTracker jt,
JobID jobid, HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
final JobInProgress job = jt.getJob(jobid);
JobWithViewAccessCheck myJob = new JobWithViewAccessCheck(job);
String user = request.getRemoteUser();
- if (user != null && job != null && jt.isJobLevelAuthorizationEnabled()) {
+ if (user != null && job != null && jt.areACLsEnabled()) {
final UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(user);
try {
@@ -110,7 +110,8 @@
public Void run() throws IOException, ServletException {
// checks job view permission
- job.checkAccess(ugi, JobACL.VIEW_JOB);
+ jt.getACLsManager().checkAccess(job, ugi,
+ Operation.VIEW_JOB_DETAILS);
return null;
}
});
@@ -475,10 +476,10 @@
* Read a job-history log file and construct the corresponding {@link JobInfo}
* . Also cache the {@link JobInfo} for quick serving further requests.
*
- * @param logFile
- * @param fs
- * @param jobTracker
- * @return JobInfo
+ * @param logFile the job history log file
+ * @param fs job tracker file system
+ * @param jobTracker the job tracker
+ * @return JobInfo job's basic information
* @throws IOException
*/
static JobInfo getJobInfo(Path logFile, FileSystem fs,
@@ -506,20 +507,18 @@
}
}
- jobTracker.getJobACLsManager().checkAccess(JobID.forName(jobid),
- UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB,
- jobInfo.getUsername(), jobInfo.getJobACLs().get(JobACL.VIEW_JOB));
return jobInfo;
}
/**
- * Check the access for users to view job-history pages.
+ * Check the access for users to view job-history pages and return
+ * {@link JobInfo}.
*
- * @param request
- * @param response
- * @param jobTracker
- * @param fs
- * @param logFile
+ * @param request http servlet request
+ * @param response http servlet response
+ * @param jobTracker the job tracker
+ * @param fs job tracker file system
+ * @param logFile the job history log file
* @return the job if authorization is disabled or if the authorization checks
* pass. Otherwise return null.
* @throws IOException
@@ -533,19 +532,24 @@
String jobid =
JobHistory.getJobIDFromHistoryFilePath(logFile).toString();
String user = request.getRemoteUser();
- JobInfo job = null;
+
+ JobInfo jobInfo = JSPUtil.getJobInfo(logFile, fs, jobTracker);
if (user != null) {
+ // authorize user for job-view access
try {
final UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(user);
- job =
- ugi.doAs(new PrivilegedExceptionAction<JobHistoryParser.JobInfo>() {
- public JobInfo run() throws IOException {
- // checks job view permission
- JobInfo jobInfo = JSPUtil.getJobInfo(logFile, fs, jobTracker);
- return jobInfo;
- }
- });
+
+ AccessControlList viewJobAcl = jobInfo.getJobACLs().get(JobACL.VIEW_JOB);
+ if (viewJobAcl == null) {
+ // may be older job history file of earlier unsecure cluster
+ viewJobAcl = new AccessControlList("*");
+ }
+
+ jobTracker.getACLsManager().checkAccess(jobid, ugi,
+ jobInfo.getJobQueueName(), Operation.VIEW_JOB_DETAILS,
+ jobInfo.getUsername(), viewJobAcl);
+
} catch (AccessControlException e) {
String errMsg =
String.format(
@@ -557,11 +561,9 @@
JSPUtil.setErrorAndForward(errMsg, request, response);
return null;
}
- } else {
- // no authorization needed
- job = JSPUtil.getJobInfo(logFile, fs, jobTracker);
- }
- return job;
+ } // else { no authorization needed }
+
+ return jobInfo;
}
/**
@@ -574,7 +576,7 @@
static void printJobACLs(JobTracker tracker,
Map<JobACL, AccessControlList> jobAcls, JspWriter out)
throws IOException {
- if (tracker.isJobLevelAuthorizationEnabled()) {
+ if (tracker.areACLsEnabled()) {
// Display job-view-acls and job-modify-acls configured for this job
out.print("<b>Job-ACLs:</b><br>");
for (JobACL aclName : JobACL.values()) {
@@ -587,5 +589,9 @@
}
}
}
+ else {
+ out.print("<b>Job-ACLs: " + new AccessControlList("*").toString()
+ + "</b><br>");
+ }
}
}
diff --git a/src/java/org/apache/hadoop/mapred/JobACLsManager.java b/src/java/org/apache/hadoop/mapred/JobACLsManager.java
index 48dfe43..14265e4 100644
--- a/src/java/org/apache/hadoop/mapred/JobACLsManager.java
+++ b/src/java/org/apache/hadoop/mapred/JobACLsManager.java
@@ -20,26 +20,25 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.mapred.AuditLogger.Constants;
import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@InterfaceAudience.Private
-public abstract class JobACLsManager {
+class JobACLsManager {
- static final Log LOG = LogFactory.getLog(JobACLsManager.class);
+ JobConf conf;
- public static final String UNAUTHORIZED_JOB_ACCESS_ERROR =
- " is not authorized for performing the operation ";
- protected abstract boolean isJobLevelAuthorizationEnabled();
+ public JobACLsManager(JobConf conf) {
+ this.conf = conf;
+ }
- protected abstract boolean isSuperUserOrSuperGroup(
- UserGroupInformation callerUGI);
+ boolean areACLsEnabled() {
+ return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+ }
/**
* Construct the jobACLs from the configuration so that they can be kept in
@@ -54,7 +53,7 @@
new HashMap<JobACL, AccessControlList>();
// Don't construct anything if authorization is disabled.
- if (!isJobLevelAuthorizationEnabled()) {
+ if (!areACLsEnabled()) {
return acls;
}
@@ -64,7 +63,7 @@
if (aclConfigured == null) {
// If ACLs are not configured at all, we grant no access to anyone. So
// jobOwner and superuser/supergroup _only_ can do 'stuff'
- aclConfigured = "";
+ aclConfigured = " ";
}
acls.put(aclName, new AccessControlList(aclConfigured));
}
@@ -72,69 +71,34 @@
}
/**
- * If authorization is enabled, checks whether the user (in the callerUGI) is
- * authorized to perform the operation specified by 'jobOperation' on the job.
+ * If authorization is enabled, checks whether the user (in the callerUGI)
+ * is authorized to perform the operation specified by 'jobOperation' on
+ * the job by checking if the user is jobOwner or part of job ACL for the
+ * specific job operation.
* <ul>
* <li>The owner of the job can do any operation on the job</li>
- * <li>The superuser/supergroup is always permitted to do operations on any
- * job.</li>
* <li>For all other users/groups job-acls are checked</li>
* </ul>
- *
- * @param jobStatus
- * @param callerUGI
- * @param jobOperation
- */
- void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
- JobACL jobOperation) throws AccessControlException {
-
- JobID jobId = jobStatus.getJobID();
- String jobOwner = jobStatus.getUsername();
- AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
- checkAccess(jobId, callerUGI, jobOperation, jobOwner, acl);
- }
-
- /**
- * If authorization is enabled, checks whether the user (in the callerUGI) is
- * authorized to perform the operation specified by 'jobOperation' on the job.
- * <ul>
- * <li>The owner of the job can do any operation on the job</li>
- * <li>The superuser/supergroup is always permitted to do operations on any
- * job.</li>
- * <li>For all other users/groups job-acls are checked</li>
- * </ul>
- * @param jobId
* @param callerUGI
* @param jobOperation
* @param jobOwner
* @param jobACL
* @throws AccessControlException
*/
- void checkAccess(JobID jobId, UserGroupInformation callerUGI,
- JobACL jobOperation, String jobOwner, AccessControlList jobACL)
- throws AccessControlException {
+ boolean checkAccess(UserGroupInformation callerUGI,
+ JobACL jobOperation, String jobOwner, AccessControlList jobACL) {
String user = callerUGI.getShortUserName();
- if (!isJobLevelAuthorizationEnabled()) {
- return;
+ if (!areACLsEnabled()) {
+ return true;
}
- // Allow uperusers/supergroups
- // Allow Job-owner as job's owner is always part of all the ACLs
- if (callerUGI.getShortUserName().equals(jobOwner)
- || isSuperUserOrSuperGroup(callerUGI)
+ // Allow Job-owner for any operation on the job
+ if (user.equals(jobOwner)
|| jobACL.isUserAllowed(callerUGI)) {
- AuditLogger.logSuccess(user, jobOperation.name(), jobId.toString());
- return;
+ return true;
}
- AuditLogger.logFailure(user, jobOperation.name(), null, jobId.toString(),
- Constants.UNAUTHORIZED_USER);
- throw new AccessControlException(callerUGI
- + UNAUTHORIZED_JOB_ACCESS_ERROR
- + jobOperation.toString() + " on " + jobId + ". "
- + jobOperation.toString()
- + " Access control list configured for this job : "
- + jobACL.toString());
+ return false;
}
}
diff --git a/src/java/org/apache/hadoop/mapred/JobInProgress.java b/src/java/org/apache/hadoop/mapred/JobInProgress.java
index 3d99044..edee0f4 100644
--- a/src/java/org/apache/hadoop/mapred/JobInProgress.java
+++ b/src/java/org/apache/hadoop/mapred/JobInProgress.java
@@ -23,7 +23,6 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.HashMap;
@@ -43,7 +42,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,7 +49,6 @@
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
-import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
@@ -79,13 +76,9 @@
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -432,7 +425,7 @@
String desc = "The username " + conf.getUser() + " obtained from the "
+ "conf doesn't match the username " + user + " the user "
+ "authenticated as";
- AuditLogger.logFailure(user, Queue.QueueOperation.SUBMIT_JOB.name(),
+ AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(),
conf.getUser(), jobId.toString(), desc);
throw new IOException(desc);
}
@@ -724,12 +717,13 @@
String username = conf.getUser();
if (username == null) { username = ""; }
String jobname = conf.getJobName();
- if (jobname == null) { jobname = ""; }
+ String jobQueueName = conf.getQueueName();
+
setUpLocalizedJobConf(conf, jobId);
jobHistory.setupEventWriter(jobId, conf);
JobSubmittedEvent jse =
new JobSubmittedEvent(jobId, jobname, username, this.startTime,
- jobFile.toString(), status.getJobACLs());
+ jobFile.toString(), status.getJobACLs(), jobQueueName);
jobHistory.logEvent(jse, jobId);
}
@@ -742,25 +736,6 @@
}
/**
- * If authorization is enabled on the JobTracker, checks whether the user (in
- * the callerUGI) is authorized to perform the operation specify by
- * 'jobOperation' on the job.
- * <ul>
- * <li>The owner of the job can do any operation on the job</li>
- * <li>The superuser/supergroup of the JobTracker is always permitted to do
- * operations on any job.</li>
- * <li>For all other users/groups job-acls are checked</li>
- * </ul>
- *
- * @param callerUGI
- * @param jobOperation
- */
- void checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)
- throws AccessControlException {
- jobtracker.getJobACLsManager().checkAccess(status, callerUGI, jobOperation);
- }
-
- /**
* If the number of taks is greater than the configured value
* throw an exception that will fail job initialization
*/
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index 915985d..851fba5 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -77,7 +77,6 @@
import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@@ -105,6 +104,7 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -1289,7 +1289,6 @@
"expireLaunchingTasks");
final CompletedJobStatusStore completedJobStatusStore;
- private JobTrackerJobACLsManager jobACLsManager;
Thread completedJobsStoreThread = null;
final RecoveryManager recoveryManager;
@@ -1330,8 +1329,8 @@
FileSystem fs = null;
Path systemDir = null;
JobConf conf;
- private final UserGroupInformation mrOwner;
- private final String supergroup;
+
+ private final ACLsManager aclsManager;
long limitMaxMemForMapTasks;
long limitMaxMemForReduceTasks;
@@ -1347,7 +1346,7 @@
retiredJobsCacheSize = 0;
infoServer = null;
queueManager = null;
- supergroup = null;
+ aclsManager = null;
taskScheduler = null;
trackerIdentifier = null;
recoveryManager = null;
@@ -1355,7 +1354,6 @@
completedJobStatusStore = null;
tasktrackerExpiryInterval = 0;
myInstrumentation = new JobTrackerMetricsInst(this, new JobConf());
- mrOwner = null;
secretManager = null;
localFs = null;
}
@@ -1382,11 +1380,7 @@
UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME,
localMachine);
- mrOwner = UserGroupInformation.getCurrentUser();
-
- supergroup = conf.get(MR_SUPERGROUP, "supergroup");
- LOG.info("Starting jobtracker with owner as " + mrOwner.getShortUserName()
- + " and supergroup as " + supergroup);
+
clock = newClock;
long secretKeyInterval =
@@ -1443,9 +1437,15 @@
this.hostsReader = new HostsFileReader(conf.get(JTConfig.JT_HOSTS_FILENAME, ""),
conf.get(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, ""));
- Configuration queuesConf = new Configuration(this.conf);
- queueManager = new QueueManager(queuesConf);
+ Configuration clusterConf = new Configuration(this.conf);
+ queueManager = new QueueManager(clusterConf);
+ aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager);
+
+ LOG.info("Starting jobtracker with owner as " +
+ getMROwner().getShortUserName() + " and supergroup as " +
+ getSuperGroup());
+
// Create the scheduler
Class<? extends TaskScheduler> schedulerClass
= conf.getClass(JT_TASK_SCHEDULER,
@@ -1526,7 +1526,7 @@
try {
// if we haven't contacted the namenode go ahead and do it
if (fs == null) {
- fs = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return FileSystem.get(conf);
}});
@@ -1538,9 +1538,10 @@
}
try {
FileStatus systemDirStatus = fs.getFileStatus(systemDir);
- if (!systemDirStatus.getOwner().equals(mrOwner.getShortUserName())) {
+ if (!systemDirStatus.getOwner().equals(
+ getMROwner().getShortUserName())) {
throw new AccessControlException("The systemdir " + systemDir +
- " is not owned by " + mrOwner.getShortUserName());
+ " is not owned by " + getMROwner().getShortUserName());
}
if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
LOG.warn("Incorrect permissions on " + systemDir +
@@ -1607,7 +1608,8 @@
final String historyLogDir =
jobHistory.getCompletedJobHistoryLocation().toString();
infoServer.setAttribute("historyLogDir", historyLogDir);
- FileSystem historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ FileSystem historyFS = getMROwner().doAs(
+ new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return new Path(historyLogDir).getFileSystem(conf);
}
@@ -1620,10 +1622,8 @@
this.numTaskCacheLevels = conf.getInt(JT_TASKCACHE_LEVELS,
NetworkTopology.DEFAULT_HOST_LEVEL);
- // Initialize the jobACLSManager
- jobACLsManager = new JobTrackerJobACLsManager(this);
//initializes the job status store
- completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
+ completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
}
private static SimpleDateFormat getDateFormat() {
@@ -3056,7 +3056,7 @@
throw ioe;
}
try {
- checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null);
+ aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
} catch (AccessControlException ace) {
LOG.warn("Access denied for user " + job.getJobConf().getUser()
+ ". Ignoring job " + jobId, ace);
@@ -3109,19 +3109,8 @@
LOG.info("Job " + jobId + " added successfully for user '"
+ job.getJobConf().getUser() + "' to queue '"
+ job.getJobConf().getQueueName() + "'");
- AuditLogger.logSuccess(job.getUser(),
- Queue.QueueOperation.SUBMIT_JOB.name(), jobId.toString());
- return job.getStatus();
- }
- /**
- * Is job-level authorization enabled on the JT?
- *
- * @return
- */
- boolean isJobLevelAuthorizationEnabled() {
- return conf.getBoolean(
- MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+ return job.getStatus();
}
/**
@@ -3143,45 +3132,12 @@
}
/**
- * Check the ACLs for a user doing the passed queue-operation and the passed
- * job operation.
- * <ul>
- * <li>Superuser/supergroup can do any operation on the job</li>
- * <li>For any other user/group, the configured ACLs for the corresponding
- * queue and the job are checked.</li>
- * </ul>
- *
- * @param job
- * @param callerUGI
- * @param oper
- * @param jobOperation
- * @throws AccessControlException
- * @throws IOException
+ * Are ACLs for authorization checks enabled on the MR cluster ?
+ *
+ * @return true if ACLs(job acls and queue acls) are enabled
*/
- private void checkAccess(JobInProgress job,
- UserGroupInformation callerUGI, Queue.QueueOperation oper,
- JobACL jobOperation) throws AccessControlException {
-
- // get the queue and verify the queue access
- String queue = job.getProfile().getQueueName();
- if (!queueManager.hasAccess(queue, job, oper, callerUGI)) {
- throw new AccessControlException("User "
- + callerUGI.getShortUserName()
- + " cannot perform "
- + "operation " + oper + " on queue " + queue +
- ".\n Please run \"hadoop queue -showacls\" " +
- "command to find the queues you have access" +
- " to .");
- }
-
- // check nulls, for e.g., submitJob RPC doesn't have a jobOperation as the
- // job itself isn't created by that time.
- if (jobOperation == null) {
- return;
- }
-
- // check the access to the job
- job.checkAccess(callerUGI, jobOperation);
+ boolean areACLsEnabled() {
+ return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
}
/**@deprecated use {@link #getClusterStatus(boolean)}*/
@@ -3295,8 +3251,8 @@
}
// check both queue-level and job-level access
- checkAccess(job, UserGroupInformation.getCurrentUser(),
- Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+ aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+ Operation.KILL_JOB);
killJob(job);
}
@@ -3530,8 +3486,8 @@
JobInProgress job = jobs.get(oldJobID);
if (job != null) {
// check the job-access
- job.checkAccess(UserGroupInformation.getCurrentUser(),
- JobACL.VIEW_JOB);
+ aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+ Operation.VIEW_JOB_COUNTERS);
if (!isJobInited(job)) {
return EMPTY_COUNTERS;
@@ -3703,8 +3659,8 @@
// Check authorization
JobInProgress job = jobs.get(jobid);
if (job != null) {
- job.checkAccess(UserGroupInformation.getCurrentUser(),
- JobACL.VIEW_JOB);
+ aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+ Operation.VIEW_JOB_DETAILS);
} else {
return EMPTY_TASK_REPORTS;
}
@@ -3779,8 +3735,8 @@
if (job != null) {
// check the access to the job.
- job.checkAccess(UserGroupInformation.getCurrentUser(),
- JobACL.VIEW_JOB);
+ aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+ Operation.VIEW_JOB_DETAILS);
if (isJobInited(job)) {
TaskInProgress tip = job.getTaskInProgress(tipId);
@@ -3851,8 +3807,9 @@
if (tip != null) {
// check both queue-level and job-level access
- checkAccess(tip.getJob(), UserGroupInformation.getCurrentUser(),
- Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+ aclsManager.checkAccess(tip.getJob(),
+ UserGroupInformation.getCurrentUser(),
+ shouldFail ? Operation.FAIL_TASK : Operation.KILL_TASK);
return tip.killTask(taskid, shouldFail);
}
@@ -3899,8 +3856,9 @@
*/
public String getStagingAreaDir() throws IOException {
try {
- final String user = UserGroupInformation.getCurrentUser().getShortUserName();
- return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
+ final String user =
+ UserGroupInformation.getCurrentUser().getShortUserName();
+ return getMROwner().doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
@@ -3923,6 +3881,18 @@
return jobHistory.getCompletedJobHistoryLocation().toString();
}
+ /**
+ * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
+ */
+ public AccessControlList getQueueAdmins(String queueName) throws IOException {
+ AccessControlList acl =
+ queueManager.getQueueACL(queueName, QueueACL.ADMINISTER_JOBS);
+ if (acl == null) {
+ acl = new AccessControlList(" ");
+ }
+ return acl;
+ }
+
///////////////////////////////////////////////////////////////
// JobTracker methods
///////////////////////////////////////////////////////////////
@@ -3954,8 +3924,8 @@
if (job != null) {
// check both queue-level and job-level access
- checkAccess(job, UserGroupInformation.getCurrentUser(),
- Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+ aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+ Operation.SET_JOB_PRIORITY);
synchronized (taskScheduler) {
JobStatus oldStatus = (JobStatus)job.getStatus().clone();
@@ -4136,24 +4106,6 @@
removeMarkedTasks(trackerName);
}
}
-
- /**
- * Is the calling user a super user? Or part of the supergroup?
- * @return true, if it is a super user
- */
- static boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI,
- UserGroupInformation superUser, String superGroup) {
- if (superUser.getShortUserName().equals(callerUGI.getShortUserName())) {
- return true;
- }
- String[] groups = callerUGI.getGroupNames();
- for(int i=0; i < groups.length; ++i) {
- if (groups[i].equals(superGroup)) {
- return true;
- }
- }
- return false;
- }
/**
* Rereads the config to get hosts and exclude list file names.
@@ -4162,10 +4114,9 @@
public synchronized void refreshNodes() throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
// check access
- if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser(), mrOwner,
- supergroup)) {
+ if (!isMRAdmin(UserGroupInformation.getCurrentUser())) {
AuditLogger.logFailure(user, Constants.REFRESH_NODES,
- mrOwner + " " + supergroup, Constants.JOBTRACKER,
+ getMROwner() + " " + getSuperGroup(), Constants.JOBTRACKER,
Constants.UNAUTHORIZED_USER);
throw new AccessControlException(user +
" is not authorized to refresh nodes.");
@@ -4175,15 +4126,19 @@
// call the actual api
refreshHosts();
}
-
+
UserGroupInformation getMROwner() {
- return mrOwner;
+ return aclsManager.getMROwner();
}
String getSuperGroup() {
- return supergroup;
+ return aclsManager.getSuperGroup();
}
-
+
+ boolean isMRAdmin(UserGroupInformation ugi) {
+ return aclsManager.isMRAdmin(ugi);
+ }
+
private synchronized void refreshHosts() throws IOException {
// Reread the config to get HOSTS and HOSTS_EXCLUDE filenames.
// Update the file names and refresh internal includes and excludes list
@@ -4260,8 +4215,8 @@
if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
dumpConfiguration(new PrintWriter(System.out));
System.out.println();
- QueueManager.dumpConfiguration(new PrintWriter(System.out),
- new JobConf());
+ Configuration conf = new Configuration();
+ QueueManager.dumpConfiguration(new PrintWriter(System.out), conf);
}
else {
System.out.println("usage: JobTracker [-dumpConfiguration]");
@@ -4612,16 +4567,20 @@
UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME,
localMachine);
- mrOwner = UserGroupInformation.getCurrentUser();
- supergroup = conf.get(MRConfig.MR_SUPERGROUP, "supergroup");
secretManager = null;
this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
conf.get("mapred.hosts.exclude", ""));
// queue manager
- Configuration queuesConf = new Configuration(this.conf);
- queueManager = new QueueManager(queuesConf);
+ Configuration clusterConf = new Configuration(this.conf);
+ queueManager = new QueueManager(clusterConf);
+
+ aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager);
+
+ LOG.info("Starting jobtracker with owner as " +
+ getMROwner().getShortUserName() + " and supergroup as " +
+ getSuperGroup());
// Create the scheduler
Class<? extends TaskScheduler> schedulerClass
@@ -4646,7 +4605,7 @@
jobHistory = new JobHistory();
final JobTracker jtFinal = this;
try {
- historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ historyFS = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
jobHistory.init(jtFinal, conf, jtFinal.localMachine, jtFinal.startTime);
jobHistory.initDone(conf, fs);
@@ -4690,11 +4649,8 @@
this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels",
NetworkTopology.DEFAULT_HOST_LEVEL);
- // Initialize the jobACLSManager
- jobACLsManager = new JobTrackerJobACLsManager(this);
-
//initializes the job status store
- completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
+ completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
}
/**
@@ -4756,9 +4712,13 @@
}
JobACLsManager getJobACLsManager() {
- return jobACLsManager;
+ return aclsManager.getJobACLsManager();
}
-
+
+ ACLsManager getACLsManager() {
+ return aclsManager;
+ }
+
/**
*
* @return true if delegation token operation is allowed
diff --git a/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java b/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java
deleted file mode 100644
index 9dac26a..0000000
--- a/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Manages the job ACLs and the operations on them at JobTracker.
- *
- */
-@InterfaceAudience.Private
-public class JobTrackerJobACLsManager extends JobACLsManager {
-
- static final Log LOG = LogFactory.getLog(JobTrackerJobACLsManager.class);
-
- private JobTracker jobTracker = null;
-
- public JobTrackerJobACLsManager(JobTracker tracker) {
- jobTracker = tracker;
- }
-
- @Override
- protected boolean isJobLevelAuthorizationEnabled() {
- return jobTracker.isJobLevelAuthorizationEnabled();
- }
-
- @Override
- protected boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
- return JobTracker.isSuperUserOrSuperGroup(callerUGI,
- jobTracker.getMROwner(), jobTracker.getSuperGroup());
- }
-
-}
diff --git a/src/java/org/apache/hadoop/mapred/LocalJobRunner.java b/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 1338d07..dad802d 100644
--- a/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -51,7 +51,6 @@
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -59,6 +58,7 @@
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
/** Implements MapReduce locally, in-process, for debugging. */
@@ -698,6 +698,13 @@
}
/**
+ * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getQueueAdmins()
+ */
+ public AccessControlList getQueueAdmins(String queueName) throws IOException {
+ return new AccessControlList(" ");// no queue admins for local job runner
+ }
+
+ /**
* @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
*/
public String getStagingAreaDir() throws IOException {
diff --git a/src/java/org/apache/hadoop/mapred/Operation.java b/src/java/org/apache/hadoop/mapred/Operation.java
new file mode 100644
index 0000000..4df9be4
--- /dev/null
+++ b/src/java/org/apache/hadoop/mapred/Operation.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.JobACL;
+
+/**
+ * Generic operation that maps to the dependent set of ACLs that drive the
+ * authorization of the operation.
+ */
+@InterfaceAudience.Private
+public enum Operation {
+ VIEW_JOB_COUNTERS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+ VIEW_JOB_DETAILS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+ VIEW_TASK_LOGS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+ KILL_JOB(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+ FAIL_TASK(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+ KILL_TASK(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+ SET_JOB_PRIORITY(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+ SUBMIT_JOB(QueueACL.SUBMIT_JOB, null);
+
+ public QueueACL qACLNeeded;
+ public JobACL jobACLNeeded;
+
+ Operation(QueueACL qACL, JobACL jobACL) {
+ this.qACLNeeded = qACL;
+ this.jobACLNeeded = jobACL;
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/mapred/Queue.java b/src/java/org/apache/hadoop/mapred/Queue.java
index 85f33db..8f4f3e8 100644
--- a/src/java/org/apache/hadoop/mapred/Queue.java
+++ b/src/java/org/apache/hadoop/mapred/Queue.java
@@ -42,8 +42,7 @@
private String name = null;
//acls list
- private Map<String,
- org.apache.hadoop.security.authorize.AccessControlList> acls;
+ private Map<String, AccessControlList> acls;
//Queue State
private QueueState state = QueueState.RUNNING;
@@ -59,34 +58,6 @@
private Properties props;
/**
- * Enum representing an operation that can be performed on a queue.
- */
- static enum QueueOperation {
- SUBMIT_JOB ("acl-submit-job", false),
- ADMINISTER_JOBS ("acl-administer-jobs", true);
- // TODO: Add ACL for LIST_JOBS when we have ability to authenticate
- // users in UI
- // TODO: Add ACL for CHANGE_ACL when we have an admin tool for
- // configuring queues.
-
- private final String aclName;
- private final boolean jobOwnerAllowed;
-
- QueueOperation(String aclName, boolean jobOwnerAllowed) {
- this.aclName = aclName;
- this.jobOwnerAllowed = jobOwnerAllowed;
- }
-
- final String getAclName() {
- return aclName;
- }
-
- final boolean isJobOwnerAllowed() {
- return jobOwnerAllowed;
- }
- }
-
- /**
* Default constructor is useful in creating the hierarchy.
* The variables are populated using mutator methods.
*/
@@ -133,7 +104,7 @@
* @return Map containing the operations that can be performed and
* who can perform the operations.
*/
- Map<String, org.apache.hadoop.security.authorize.AccessControlList> getAcls() {
+ Map<String, AccessControlList> getAcls() {
return acls;
}
diff --git a/src/java/org/apache/hadoop/mapred/QueueACL.java b/src/java/org/apache/hadoop/mapred/QueueACL.java
new file mode 100644
index 0000000..c659c34
--- /dev/null
+++ b/src/java/org/apache/hadoop/mapred/QueueACL.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Enum representing an AccessControlList that drives set of operations that
+ * can be performed on a queue.
+ */
+@InterfaceAudience.Private
+public enum QueueACL {
+ SUBMIT_JOB ("acl-submit-job"),
+ ADMINISTER_JOBS ("acl-administer-jobs");
+ // Currently this ACL acl-administer-jobs is checked for the operations
+ // FAIL_TASK, KILL_TASK, KILL_JOB, SET_JOB_PRIORITY and VIEW_JOB.
+
+ // TODO: Add ACL for LIST_JOBS when we have ability to authenticate
+ // users in UI
+ // TODO: Add ACL for CHANGE_ACL when we have an admin tool for
+ // configuring queues.
+
+ private final String aclName;
+
+ QueueACL(String aclName) {
+ this.aclName = aclName;
+ }
+
+ public final String getAclName() {
+ return aclName;
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java b/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
index f8011e9..e0738d4 100644
--- a/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
+++ b/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
@@ -20,7 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.Queue.QueueOperation;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.authorize.AccessControlList;
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
@@ -72,11 +72,17 @@
static final String QUEUE_TAG = "queue";
static final String ACL_SUBMIT_JOB_TAG = "acl-submit-job";
static final String ACL_ADMINISTER_JOB_TAG = "acl-administer-jobs";
+
+ // The value read from queues config file for this tag is not used at all.
+ // To enable queue acls and job acls, mapreduce.cluster.acls.enabled is
+ // to be set in mapred-site.xml
+ @Deprecated
+ static final String ACLS_ENABLED_TAG = "aclsEnabled";
+
static final String PROPERTIES_TAG = "properties";
static final String STATE_TAG = "state";
static final String QUEUE_NAME_TAG = "name";
static final String QUEUES_TAG = "queues";
- static final String ACLS_ENABLED_TAG = "aclsEnabled";
static final String PROPERTY_TAG = "property";
static final String KEY_TAG = "key";
static final String VALUE_TAG = "value";
@@ -88,7 +94,8 @@
}
- QueueConfigurationParser(String confFile) {
+ QueueConfigurationParser(String confFile, boolean areAclsEnabled) {
+ aclsEnabled = areAclsEnabled;
File file = new File(confFile).getAbsoluteFile();
if (!file.exists()) {
throw new RuntimeException("Configuration file not found at " +
@@ -105,7 +112,8 @@
}
}
- QueueConfigurationParser(InputStream xmlInput) {
+ QueueConfigurationParser(InputStream xmlInput, boolean areAclsEnabled) {
+ aclsEnabled = areAclsEnabled;
loadFrom(xmlInput);
}
@@ -184,8 +192,14 @@
NamedNodeMap nmp = queuesNode.getAttributes();
Node acls = nmp.getNamedItem(ACLS_ENABLED_TAG);
- if (acls != null && acls.getTextContent().equalsIgnoreCase("true")) {
- setAclsEnabled(true);
+ if (acls != null) {
+ LOG.warn("Configuring " + ACLS_ENABLED_TAG + " flag in " +
+ QueueManager.QUEUE_CONF_FILE_NAME + " is not valid. " +
+ "This tag is ignored. Configure " +
+ MRConfig.MR_ACLS_ENABLED + " in mapred-site.xml. See the " +
+ " documentation of " + MRConfig.MR_ACLS_ENABLED +
+ ", which is used for enabling job level authorization and " +
+ " queue level authorization.");
}
NodeList props = queuesNode.getChildNodes();
@@ -269,9 +283,9 @@
name += nameValue;
newQueue.setName(name);
submitKey = toFullPropertyName(name,
- Queue.QueueOperation.SUBMIT_JOB.getAclName());
+ QueueACL.SUBMIT_JOB.getAclName());
adminKey = toFullPropertyName(name,
- Queue.QueueOperation.ADMINISTER_JOBS.getAclName());
+ QueueACL.ADMINISTER_JOBS.getAclName());
}
if (QUEUE_TAG.equals(field.getTagName()) && field.hasChildNodes()) {
@@ -299,11 +313,11 @@
}
if (!acls.containsKey(submitKey)) {
- acls.put(submitKey, new AccessControlList("*"));
+ acls.put(submitKey, new AccessControlList(" "));
}
if (!acls.containsKey(adminKey)) {
- acls.put(adminKey, new AccessControlList("*"));
+ acls.put(adminKey, new AccessControlList(" "));
}
//Set acls
diff --git a/src/java/org/apache/hadoop/mapred/QueueManager.java b/src/java/org/apache/hadoop/mapred/QueueManager.java
index 0b75916..326d627 100644
--- a/src/java/org/apache/hadoop/mapred/QueueManager.java
+++ b/src/java/org/apache/hadoop/mapred/QueueManager.java
@@ -20,11 +20,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.AuditLogger.Constants;
import org.apache.hadoop.mapred.TaskScheduler.QueueRefresher;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -39,7 +39,6 @@
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.List;
@@ -82,24 +81,25 @@
* in mapred-site.xml. However, when configured in the latter, there is
* no support for hierarchical queues.
*/
-
-class QueueManager {
+@InterfaceAudience.Private
+public class QueueManager {
private static final Log LOG = LogFactory.getLog(QueueManager.class);
// Map of a queue name and Queue object
private Map<String, Queue> leafQueues = new HashMap<String,Queue>();
private Map<String, Queue> allQueues = new HashMap<String, Queue>();
- static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
+ public static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
static final String QUEUE_CONF_DEFAULT_FILE_NAME = "mapred-queues-default.xml";
- // Prefix in configuration for queue related keys
- static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
- = "mapred.queue.";
- //Resource in which queue acls are configured.
+ //Prefix in configuration for queue related keys
+ static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = "mapred.queue.";
+ //Resource in which queue acls are configured.
private Queue root = null;
- private boolean isAclEnabled = false;
+
+ // represents if job and queue acls are enabled on the mapreduce cluster
+ private boolean areAclsEnabled = false;
/**
* Factory method to create an appropriate instance of a queue
@@ -117,7 +117,7 @@
* @return Queue configuration parser
*/
static QueueConfigurationParser getQueueConfigurationParser(
- Configuration conf, boolean reloadConf) {
+ Configuration conf, boolean reloadConf, boolean areAclsEnabled) {
if (conf != null && conf.get(
DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY) != null) {
if (reloadConf) {
@@ -136,7 +136,8 @@
InputStream stream = null;
try {
stream = xmlInUrl.openStream();
- return new QueueConfigurationParser(new BufferedInputStream(stream));
+ return new QueueConfigurationParser(new BufferedInputStream(stream),
+ areAclsEnabled);
} catch (IOException ioe) {
throw new RuntimeException("Couldn't open queue configuration at " +
xmlInUrl, ioe);
@@ -146,8 +147,13 @@
}
}
- public QueueManager() {
- initialize(getQueueConfigurationParser(null, false));
+ QueueManager() {// acls are disabled
+ this(false);
+ }
+
+ QueueManager(boolean areAclsEnabled) {
+ this.areAclsEnabled = areAclsEnabled;
+ initialize(getQueueConfigurationParser(null, false, areAclsEnabled));
}
/**
@@ -159,10 +165,11 @@
* is found in mapred-site.xml, it will then look for site configuration
* in mapred-queues.xml supporting hierarchical queues.
*
- * @param conf Configuration object where queue configuration is specified.
+ * @param clusterConf mapreduce cluster configuration
*/
- public QueueManager(Configuration conf) {
- initialize(getQueueConfigurationParser(conf, false));
+ public QueueManager(Configuration clusterConf) {
+ areAclsEnabled = clusterConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+ initialize(getQueueConfigurationParser(clusterConf, false, areAclsEnabled));
}
/**
@@ -174,8 +181,10 @@
*
* @param confFile File where the queue configuration is found.
*/
- QueueManager(String confFile) {
- QueueConfigurationParser cp = new QueueConfigurationParser(confFile);
+ QueueManager(String confFile, boolean areAclsEnabled) {
+ this.areAclsEnabled = areAclsEnabled;
+ QueueConfigurationParser cp =
+ new QueueConfigurationParser(confFile, areAclsEnabled);
initialize(cp);
}
@@ -196,10 +205,8 @@
allQueues.putAll(leafQueues);
LOG.info("AllQueues : " + allQueues + "; LeafQueues : " + leafQueues);
- this.isAclEnabled = cp.isAclsEnabled();
}
-
/**
* Return the set of leaf level queues configured in the system to
* which jobs are submitted.
@@ -215,52 +222,22 @@
}
/**
- * Return true if the given {@link Queue.QueueOperation} can be
- * performed by the specified user on the given queue.
+ * Return true if the given user is part of the ACL for the given
+ * {@link QueueACL} name for the given queue.
* <p/>
* An operation is allowed if all users are provided access for this
* operation, or if either the user or any of the groups specified is
* provided access.
*
* @param queueName Queue on which the operation needs to be performed.
- * @param oper The operation to perform
+ * @param qACL The queue ACL name to be checked
* @param ugi The user and groups who wish to perform the operation.
- * @return true if the operation is allowed, false otherwise.
+ * @return true if the operation is allowed, false otherwise.
*/
public synchronized boolean hasAccess(
- String queueName,
- Queue.QueueOperation oper,
- UserGroupInformation ugi) {
- return hasAccess(queueName, null, oper, ugi);
- }
+ String queueName, QueueACL qACL, UserGroupInformation ugi) {
- /**
- * Return true if the given {@link Queue.QueueOperation} can be
- * performed by the specified user on the specified job in the given queue.
- * <p/>
- * An operation is allowed either if the owner of the job is the user
- * performing the task, all users are provided access for this
- * operation, or if either the user or any of the groups specified is
- * provided access.
- * <p/>
- * If the {@link Queue.QueueOperation} is not job specific then the
- * job parameter is ignored.
- *
- * @param queueName Queue on which the operation needs to be performed.
- * @param job The {@link JobInProgress} on which the operation is being
- * performed.
- * @param oper The operation to perform
- * @param ugi The user and groups who wish to perform the operation.
- * @return true if the operation is allowed, false otherwise.
- */
- public synchronized boolean hasAccess(
- String queueName, JobInProgress job,
- Queue.QueueOperation oper,
- UserGroupInformation ugi) {
-
Queue q = leafQueues.get(queueName);
- String user = ugi.getShortUserName();
- String jobId = job == null ? "-" : job.getJobID().toString();
if (q == null) {
LOG.info("Queue " + queueName + " is not present");
@@ -272,50 +249,23 @@
return false;
}
- if (!isAclsEnabled()) {
+ if (!areAclsEnabled()) {
return true;
}
if (LOG.isDebugEnabled()) {
- LOG.debug(
- "checking access for : "
- + QueueManager.toFullPropertyName(queueName, oper.getAclName()));
- }
-
- if (oper.isJobOwnerAllowed()) {
- if (job != null
- && job.getJobConf().getUser().equals(ugi.getShortUserName())) {
- AuditLogger.logSuccess(user, oper.name(), queueName);
- return true;
- }
+ LOG.debug("Checking access for the acl " + toFullPropertyName(queueName,
+ qACL.getAclName()) + " for user " + ugi.getShortUserName());
}
AccessControlList acl = q.getAcls().get(
- toFullPropertyName(
- queueName,
- oper.getAclName()));
+ toFullPropertyName(queueName, qACL.getAclName()));
if (acl == null) {
- AuditLogger.logFailure(user, oper.name(), null, queueName,
- "Disabled queue ACLs, job : " + jobId);
return false;
}
- // Check the ACL list
- boolean allowed = acl.isAllAllowed();
- if (!allowed) {
- // Check the allowed users list
- if (acl.isUserAllowed(ugi)) {
- allowed = true;
- }
- }
- if (allowed) {
- AuditLogger.logSuccess(user, oper.name(), queueName);
- } else {
- AuditLogger.logFailure(user, oper.name(), null, queueName,
- Constants.UNAUTHORIZED_USER + ", job : " + jobId);
- }
-
- return allowed;
+ // Check if user is part of the ACL
+ return acl.isUserAllowed(ugi);
}
/**
@@ -392,7 +342,7 @@
// Create a new configuration parser using the passed conf object.
QueueConfigurationParser cp =
- QueueManager.getQueueConfigurationParser(conf, true);
+ getQueueConfigurationParser(conf, true, areAclsEnabled);
/*
* (1) Validate the refresh of properties owned by QueueManager. As of now,
@@ -441,7 +391,8 @@
LOG.info("Queue configuration is refreshed successfully.");
}
- static final String toFullPropertyName(
+ // this method is for internal use only
+ public static final String toFullPropertyName(
String queue,
String property) {
return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
@@ -512,16 +463,16 @@
//List of all QueueAclsInfo objects , this list is returned
ArrayList<QueueAclsInfo> queueAclsInfolist =
new ArrayList<QueueAclsInfo>();
- Queue.QueueOperation[] operations = Queue.QueueOperation.values();
+ QueueACL[] qAcls = QueueACL.values();
for (String queueName : leafQueues.keySet()) {
QueueAclsInfo queueAclsInfo = null;
ArrayList<String> operationsAllowed = null;
- for (Queue.QueueOperation operation : operations) {
- if (hasAccess(queueName, operation, ugi)) {
+ for (QueueACL qAcl : qAcls) {
+ if (hasAccess(queueName, qAcl, ugi)) {
if (operationsAllowed == null) {
operationsAllowed = new ArrayList<String>();
}
- operationsAllowed.add(operation.getAclName());
+ operationsAllowed.add(qAcl.getAclName());
}
}
if (operationsAllowed != null) {
@@ -534,8 +485,7 @@
}
}
return queueAclsInfolist.toArray(
- new QueueAclsInfo[
- queueAclsInfolist.size()]);
+ new QueueAclsInfo[queueAclsInfolist.size()]);
}
/**
@@ -611,8 +561,8 @@
*
* @return true if ACLs are enabled.
*/
- boolean isAclsEnabled() {
- return isAclEnabled;
+ boolean areAclsEnabled() {
+ return areAclsEnabled;
}
/**
@@ -623,7 +573,30 @@
Queue getRoot() {
return root;
}
-
+
+ /**
+ * Returns the specific queue ACL for the given queue.
+ * Returns null if the given queue does not exist or the acl is not
+ * configured for that queue.
+ * If acls are disabled(mapreduce.cluster.acls.enabled set to false), returns
+ * ACL with all users.
+ */
+ synchronized AccessControlList getQueueACL(String queueName,
+ QueueACL qACL) {
+ if (areAclsEnabled) {
+ Queue q = leafQueues.get(queueName);
+ if (q != null) {
+ return q.getAcls().get(toFullPropertyName(
+ queueName, qACL.getAclName()));
+ }
+ else {
+ LOG.warn("Queue " + queueName + " is not present.");
+ return null;
+ }
+ }
+ return new AccessControlList("*");
+ }
+
/**
* Dumps the configuration of hierarchy of queues
* @param out the writer object to which dump is written
@@ -646,17 +619,21 @@
MAPRED_QUEUE_NAMES_KEY) != null) {
return;
}
+
JsonFactory dumpFactory = new JsonFactory();
JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
QueueConfigurationParser parser;
+ boolean aclsEnabled = false;
+ if (conf != null) {
+ aclsEnabled = conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+ }
if (configFile != null && !"".equals(configFile)) {
- parser = new QueueConfigurationParser(configFile);
+ parser = new QueueConfigurationParser(configFile, aclsEnabled);
}
else {
- parser = QueueManager.getQueueConfigurationParser(null, false);
+ parser = getQueueConfigurationParser(null, false, aclsEnabled);
}
dumpGenerator.writeStartObject();
- dumpGenerator.writeBooleanField("acls_enabled", parser.isAclsEnabled());
dumpGenerator.writeFieldName("queues");
dumpGenerator.writeStartArray();
dumpConfiguration(dumpGenerator,parser.getRoot().getChildren());
@@ -684,11 +661,11 @@
AccessControlList administerJobsList = null;
if (queue.getAcls() != null) {
submitJobList =
- queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
- Queue.QueueOperation.SUBMIT_JOB.getAclName()));
+ queue.getAcls().get(toFullPropertyName(queue.getName(),
+ QueueACL.SUBMIT_JOB.getAclName()));
administerJobsList =
- queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
- Queue.QueueOperation.ADMINISTER_JOBS.getAclName()));
+ queue.getAcls().get(toFullPropertyName(queue.getName(),
+ QueueACL.ADMINISTER_JOBS.getAclName()));
}
String aclsSubmitJobValue = " ";
if (submitJobList != null ) {
diff --git a/src/java/org/apache/hadoop/mapred/TaskLogServlet.java b/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
index 77d769e..17da588 100644
--- a/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
+++ b/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
@@ -30,9 +30,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -121,42 +121,54 @@
* users and groups specified in configuration using
* mapreduce.job.acl-view-job to view job.
*/
- private void checkAccessForTaskLogs(JobConf conf, String user, JobID jobId,
+ private void checkAccessForTaskLogs(JobConf conf, String user, String jobId,
TaskTracker tracker) throws AccessControlException {
- if (!tracker.isJobLevelAuthorizationEnabled()) {
+ if (!tracker.areACLsEnabled()) {
return;
}
- // buiild job view acl by reading from conf
+ // buiild job view ACL by reading from conf
AccessControlList jobViewACL = tracker.getJobACLsManager().
constructJobACLs(conf).get(JobACL.VIEW_JOB);
+ // read job queue name from conf
+ String queue = conf.getQueueName();
+
+ // build queue admins ACL by reading from conf
+ AccessControlList queueAdminsACL = new AccessControlList(
+ conf.get(toFullPropertyName(queue,
+ QueueACL.ADMINISTER_JOBS.getAclName()), " "));
+
String jobOwner = conf.get(JobContext.USER_NAME);
UserGroupInformation callerUGI =
UserGroupInformation.createRemoteUser(user);
- tracker.getJobACLsManager().checkAccess(jobId, callerUGI, JobACL.VIEW_JOB,
- jobOwner, jobViewACL);
+ // check if user is queue admin or cluster admin or jobOwner or member of
+ // job-view-acl
+ if (!queueAdminsACL.isUserAllowed(callerUGI)) {
+ tracker.getACLsManager().checkAccess(jobId, callerUGI, queue,
+ Operation.VIEW_TASK_LOGS, jobOwner, jobViewACL);
+ }
}
/**
- * Builds a Configuration object by reading the xml file.
+ * Builds a JobConf object by reading the job-acls.xml file.
* This doesn't load the default resources.
*
- * Returns null if job-acls.xml is not there in userlogs/$jobid/attempt-dir on
+ * Returns null if job-acls.xml is not there in userlogs/$jobid on
* local file system. This can happen when we restart the cluster with job
* level authorization enabled(but was disabled on earlier cluster) and
* viewing task logs of old jobs(i.e. jobs finished on earlier unsecure
* cluster).
*/
- static Configuration getConfFromJobACLsFile(TaskAttemptID attemptId,
- boolean isCleanup) {
+ static JobConf getConfFromJobACLsFile(JobID jobId) {
Path jobAclsFilePath = new Path(
- TaskLog.getAttemptDir(attemptId, isCleanup).toString(), TaskRunner.jobACLsFile);
- Configuration conf = null;
+ TaskLog.getJobDir(jobId).toString(),
+ TaskTracker.jobACLsFile);
+ JobConf conf = null;
if (new File(jobAclsFilePath.toUri().getPath()).exists()) {
- conf = new Configuration(false);
+ conf = new JobConf(false);
conf.addResource(jobAclsFilePath);
}
return conf;
@@ -228,15 +240,15 @@
ServletContext context = getServletContext();
TaskTracker taskTracker = (TaskTracker) context.getAttribute(
"task.tracker");
+ JobID jobId = attemptId.getJobID();
+
// get jobACLConf from ACLs file
- Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
+ JobConf jobACLConf = getConfFromJobACLsFile(jobId);
// Ignore authorization if job-acls.xml is not found
if (jobACLConf != null) {
- JobID jobId = attemptId.getJobID();
-
try {
- checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
- taskTracker);
+ checkAccessForTaskLogs(jobACLConf, user,
+ jobId.toString(), taskTracker);
} catch (AccessControlException e) {
String errMsg = "User " + user + " failed to view tasklogs of job " +
jobId + "!\n\n" + e.getMessage();
diff --git a/src/java/org/apache/hadoop/mapred/TaskRunner.java b/src/java/org/apache/hadoop/mapred/TaskRunner.java
index 7435f85..73a4326 100644
--- a/src/java/org/apache/hadoop/mapred/TaskRunner.java
+++ b/src/java/org/apache/hadoop/mapred/TaskRunner.java
@@ -19,7 +19,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -34,7 +33,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -77,8 +75,6 @@
protected JobConf conf;
JvmManager jvmManager;
- static String jobACLsFile = "job-acl.xml";
-
public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker,
JobConf conf) {
this.tip = tip;
@@ -286,34 +282,10 @@
Localizer.PermissionsHandler.setPermissions(logDir,
Localizer.PermissionsHandler.sevenZeroZero);
}
- // write job acls into a file to know the access for task logs
- writeJobACLs(logDir);
+
return logFiles;
}
- // Writes job-view-acls and user name into an xml file
- private void writeJobACLs(File logDir) throws IOException {
- File aclFile = new File(logDir, TaskRunner.jobACLsFile);
- Configuration aclConf = new Configuration(false);
-
- // set the job view acls in aclConf
- String jobViewACLs = conf.get(MRJobConfig.JOB_ACL_VIEW_JOB);
- if (jobViewACLs != null) {
- aclConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACLs);
- }
- // set jobOwner as mapreduce.job.user.name in aclConf
- String jobOwner = conf.getUser();
- aclConf.set(MRJobConfig.USER_NAME, jobOwner);
- FileOutputStream out = new FileOutputStream(aclFile);
- try {
- aclConf.writeXml(out);
- } finally {
- out.close();
- }
- Localizer.PermissionsHandler.setPermissions(aclFile,
- Localizer.PermissionsHandler.sevenZeroZero);
- }
-
/**
* Write the child's configuration to the disk and set it in configuration so
* that the child can pick it up from there.
diff --git a/src/java/org/apache/hadoop/mapred/TaskTracker.java b/src/java/org/apache/hadoop/mapred/TaskTracker.java
index c515bc7..0b125e2 100644
--- a/src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -20,6 +20,7 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
@@ -78,6 +79,7 @@
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
@@ -160,6 +162,11 @@
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
+ // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
+ // each job at job localization time. This will be used by TaskLogServlet for
+ // authorizing viewing of task logs of that job
+ static String jobACLsFile = "job-acls.xml";
+
volatile boolean running = true;
private LocalDirAllocator localDirAllocator;
@@ -249,9 +256,7 @@
private int maxReduceSlots;
private int failures;
- // MROwner's ugi
- private UserGroupInformation mrOwner;
- private String supergroup;
+ private ACLsManager aclsManager;
// Performance-related config knob to send an out-of-band heartbeat
// on task completion
@@ -275,9 +280,6 @@
private long reservedPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
- // Manages job acls of jobs in TaskTracker
- private TaskTrackerJobACLsManager jobACLsManager;
-
/**
* the minimum interval between jobtracker polls
*/
@@ -590,13 +592,11 @@
* close().
*/
synchronized void initialize() throws IOException, InterruptedException {
- UserGroupInformation.setConfiguration(fConf);
- SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
- mrOwner = UserGroupInformation.getCurrentUser();
- supergroup = fConf.get(MRConfig.MR_SUPERGROUP, "supergroup");
- LOG.info("Starting tasktracker with owner as " + mrOwner.getShortUserName()
- + " and supergroup as " + supergroup);
+ aclsManager = new ACLsManager(fConf, new JobACLsManager(fConf), null);
+ LOG.info("Starting tasktracker with owner as " +
+ getMROwner().getShortUserName() + " and supergroup as " +
+ getSuperGroup());
localFs = FileSystem.getLocal(fConf);
// use configured nameserver & interface to get local hostname
@@ -687,7 +687,8 @@
this.distributedCacheManager.startCleanupThread();
this.jobClient = (InterTrackerProtocol)
- mrOwner.doAs(new PrivilegedExceptionAction<Object>() {
+ UserGroupInformation.getLoginUser().doAs(
+ new PrivilegedExceptionAction<Object>() {
public Object run() throws IOException {
return RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
@@ -735,19 +736,22 @@
}
UserGroupInformation getMROwner() {
- return mrOwner;
+ return aclsManager.getMROwner();
}
String getSuperGroup() {
- return supergroup;
+ return aclsManager.getSuperGroup();
}
-
+
+ boolean isMRAdmin(UserGroupInformation ugi) {
+ return aclsManager.isMRAdmin(ugi);
+ }
+
/**
- * Is job level authorization enabled on the TT ?
+ * Are ACLs for authorization checks enabled on the MR cluster ?
*/
- boolean isJobLevelAuthorizationEnabled() {
- return fConf.getBoolean(
- MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+ boolean areACLsEnabled() {
+ return fConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
}
public static Class<?>[] getInstrumentationClasses(Configuration conf) {
@@ -998,7 +1002,7 @@
JobConf localJobConf = localizeJobFiles(t, rjob);
// initialize job log directory
- initializeJobLogDir(jobId);
+ initializeJobLogDir(jobId, localJobConf);
// Now initialize the job via task-controller so as to set
// ownership/permissions of jars, job-work-dir. Note that initializeJob
@@ -1098,12 +1102,64 @@
return localJobConf;
}
- // create job userlog dir
- void initializeJobLogDir(JobID jobId) {
+ // Create job userlog dir.
+ // Create job acls file in job log dir, if needed.
+ void initializeJobLogDir(JobID jobId, JobConf localJobConf)
+ throws IOException {
// remove it from tasklog cleanup thread first,
// it might be added there because of tasktracker reinit or restart
taskLogCleanupThread.unmarkJobFromLogDeletion(jobId);
localizer.initializeJobLogDir(jobId);
+
+ if (areACLsEnabled()) {
+ // Create job-acls.xml file in job userlog dir and write the needed
+ // info for authorization of users for viewing task logs of this job.
+ writeJobACLs(localJobConf, TaskLog.getJobDir(jobId));
+ }
+ }
+
+ /**
+ * Creates job-acls.xml under the given directory logDir and writes
+ * job-view-acl, queue-admins-acl, jobOwner name and queue name into this
+ * file.
+ * queue name is the queue to which the job was submitted to.
+ * queue-admins-acl is the queue admins ACL of the queue to which this
+ * job was submitted to.
+ * @param conf job configuration
+ * @param logDir job userlog dir
+ * @throws IOException
+ */
+ private static void writeJobACLs(JobConf conf, File logDir)
+ throws IOException {
+ File aclFile = new File(logDir, jobACLsFile);
+ JobConf aclConf = new JobConf(false);
+
+ // set the job view acl in aclConf
+ String jobViewACL = conf.get(MRJobConfig.JOB_ACL_VIEW_JOB, " ");
+ aclConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACL);
+
+ // set the job queue name in aclConf
+ String queue = conf.getQueueName();
+ aclConf.setQueueName(queue);
+
+ // set the queue admins acl in aclConf
+ String qACLName = toFullPropertyName(queue,
+ QueueACL.ADMINISTER_JOBS.getAclName());
+ String queueAdminsACL = conf.get(qACLName, " ");
+ aclConf.set(qACLName, queueAdminsACL);
+
+ // set jobOwner as user.name in aclConf
+ String jobOwner = conf.getUser();
+ aclConf.set("user.name", jobOwner);
+
+ FileOutputStream out = new FileOutputStream(aclFile);
+ try {
+ aclConf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ Localizer.PermissionsHandler.setPermissions(aclFile,
+ Localizer.PermissionsHandler.sevenZeroZero);
}
/**
@@ -1318,8 +1374,10 @@
checkJettyPort(httpPort);
// create task log cleanup thread
setTaskLogCleanupThread(new UserLogCleaner(fConf));
- // Initialize the jobACLSManager
- jobACLsManager = new TaskTrackerJobACLsManager(this);
+
+ UserGroupInformation.setConfiguration(fConf);
+ SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
+
initialize();
}
@@ -4043,7 +4101,11 @@
return localJobTokenFileStr;
}
- TaskTrackerJobACLsManager getJobACLsManager() {
- return jobACLsManager;
+ JobACLsManager getJobACLsManager() {
+ return aclsManager.getJobACLsManager();
+ }
+
+ ACLsManager getACLsManager() {
+ return aclsManager;
}
}
diff --git a/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java b/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java
deleted file mode 100644
index f1f34e4..0000000
--- a/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Manages the job ACLs and the operations on them at TaskTracker.
- *
- */
-@InterfaceAudience.Private
-public class TaskTrackerJobACLsManager extends JobACLsManager {
-
- static final Log LOG = LogFactory.getLog(TaskTrackerJobACLsManager.class);
-
- private TaskTracker taskTracker = null;
-
- public TaskTrackerJobACLsManager(TaskTracker tracker) {
- taskTracker = tracker;
- }
-
- @Override
- protected boolean isJobLevelAuthorizationEnabled() {
- return taskTracker.isJobLevelAuthorizationEnabled();
- }
-
- @Override
- protected boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
- return JobTracker.isSuperUserOrSuperGroup(callerUGI,
- taskTracker.getMROwner(), taskTracker.getSuperGroup());
- }
-}
diff --git a/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index db2c5f8..bd317cf 100644
--- a/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -41,12 +41,15 @@
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.QueueACL;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.codehaus.jackson.JsonParseException;
@@ -355,6 +358,14 @@
conf.setInt("mapred.map.tasks", maps);
LOG.info("number of splits:" + maps);
+ // write "queue admins of the queue to which job is being submitted"
+ // to job file.
+ String queue = conf.get(MRJobConfig.QUEUE_NAME,
+ JobConf.DEFAULT_QUEUE_NAME);
+ AccessControlList acl = submitClient.getQueueAdmins(queue);
+ conf.set(toFullPropertyName(queue,
+ QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
+
// Write job file to submit dir
writeConf(conf, submitJobFile);
diff --git a/src/java/org/apache/hadoop/mapreduce/MRConfig.java b/src/java/org/apache/hadoop/mapreduce/MRConfig.java
index 545fefb..e3439aa 100644
--- a/src/java/org/apache/hadoop/mapreduce/MRConfig.java
+++ b/src/java/org/apache/hadoop/mapreduce/MRConfig.java
@@ -37,8 +37,7 @@
public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
public static final String REDUCEMEMORY_MB =
"mapreduce.cluster.reducememory.mb";
- public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG =
- "mapreduce.cluster.job-authorization-enabled";
+ public static final String MR_ACLS_ENABLED = "mapreduce.cluster.acls.enabled";
public static final String MR_SUPERGROUP =
"mapreduce.cluster.permissions.supergroup";
diff --git a/src/java/org/apache/hadoop/mapreduce/QueueInfo.java b/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
index 992c8fc..366bc18 100644
--- a/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
+++ b/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
@@ -60,7 +60,7 @@
*/
public QueueInfo() {
// make it running by default.
- this.queueState = queueState.RUNNING;
+ this.queueState = QueueState.RUNNING;
children = new ArrayList<QueueInfo>();
props = new Properties();
}
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr b/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
index b8d0c76..3d2bc49 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
@@ -77,8 +77,9 @@
{"name": "jobConfPath", "type": "string"},
{"name": "acls", "type": {"type": "map",
"values": "string"
- }
- }
+ }
+ },
+ {"name": "jobQueueName", "type": "string"}
]
},
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
index 2c25bae..1306224 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
@@ -314,6 +314,7 @@
info.submitTime = event.getSubmitTime();
info.jobConfPath = event.getJobConfPath();
info.jobACLs = event.getJobAcls();
+ info.jobQueueName = event.getJobQueueName();
}
/**
@@ -325,6 +326,7 @@
JobID jobid;
String username;
String jobname;
+ String jobQueueName;
String jobConfPath;
long launchTime;
int totalMaps;
@@ -349,7 +351,7 @@
submitTime = launchTime = finishTime = -1;
totalMaps = totalReduces = failedMaps = failedReduces = 0;
finishedMaps = finishedReduces = 0;
- username = jobname = jobConfPath = "";
+ username = jobname = jobConfPath = jobQueueName = "";
tasksMap = new HashMap<TaskID, TaskInfo>();
jobACLs = new HashMap<JobACL, AccessControlList>();
}
@@ -358,6 +360,7 @@
public void printAll() {
System.out.println("JOBNAME: " + jobname);
System.out.println("USERNAME: " + username);
+ System.out.println("JOB_QUEUE_NAME: " + jobQueueName);
System.out.println("SUBMIT_TIME" + submitTime);
System.out.println("LAUNCH_TIME: " + launchTime);
System.out.println("JOB_STATUS: " + jobStatus);
@@ -383,6 +386,8 @@
public String getUsername() { return username; }
/** Get the job name */
public String getJobname() { return jobname; }
+ /** Get the job queue name */
+ public String getJobQueueName() { return jobQueueName; }
/** Get the path for the job configuration file */
public String getJobConfPath() { return jobConfPath; }
/** Get the job launch time */
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
index c0aecb4..b1785e0 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
@@ -40,18 +40,6 @@
private JobSubmitted datum = new JobSubmitted();
/**
- * @deprecated Use
- * {@link #JobSubmittedEvent(JobID, String, String, long, String, Map)}
- * instead.
- */
- @Deprecated
- public JobSubmittedEvent(JobID id, String jobName, String userName,
- long submitTime, String jobConfPath) {
- this(id, jobName, userName, submitTime, jobConfPath,
- new HashMap<JobACL, AccessControlList>());
- }
-
- /**
* Create an event to record job submission
* @param id The job Id of the job
* @param jobName Name of the job
@@ -59,10 +47,11 @@
* @param submitTime Time of submission
* @param jobConfPath Path of the Job Configuration file
* @param jobACLs The configured acls for the job.
+ * @param jobQueueName The job-queue to which this job was submitted to
*/
public JobSubmittedEvent(JobID id, String jobName, String userName,
long submitTime, String jobConfPath,
- Map<JobACL, AccessControlList> jobACLs) {
+ Map<JobACL, AccessControlList> jobACLs, String jobQueueName) {
datum.jobid = new Utf8(id.toString());
datum.jobName = new Utf8(jobName);
datum.userName = new Utf8(userName);
@@ -74,6 +63,9 @@
entry.getValue().getAclString()));
}
datum.acls = jobAcls;
+ if (jobQueueName != null) {
+ datum.jobQueueName = new Utf8(jobQueueName);
+ }
}
JobSubmittedEvent() {}
@@ -87,6 +79,13 @@
public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
/** Get the Job name */
public String getJobName() { return datum.jobName.toString(); }
+ /** Get the Job queue name */
+ public String getJobQueueName() {
+ if (datum.jobQueueName != null) {
+ return datum.jobQueueName.toString();
+ }
+ return null;
+ }
/** Get the user name */
public String getUserName() { return datum.userName.toString(); }
/** Get the submit time */
diff --git a/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java b/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
index 82b13bf..700919b 100644
--- a/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
+++ b/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.mapreduce.server.jobtracker.State;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
@@ -108,8 +109,10 @@
* Version 32: Added delegation tokens (add, renew, cancel)
* Version 33: Added JobACLs to JobStatus as part of MAPREDUCE-1307
* Version 34: Modified submitJob to use Credentials instead of TokenStorage.
+ * Version 35: Added the method getQueueAdmins(queueName) as part of
+ * MAPREDUCE-1664.
*/
- public static final long versionID = 34L;
+ public static final long versionID = 35L;
/**
* Allocate a name for the job.
@@ -144,6 +147,17 @@
public long getTaskTrackerExpiryInterval() throws IOException,
InterruptedException;
+
+ /**
+ * Get the administrators of the given job-queue.
+ * This method is for hadoop internal use only.
+ * @param queueName
+ * @return Queue administrators ACL for the queue to which job is
+ * submitted to
+ * @throws IOException
+ */
+ public AccessControlList getQueueAdmins(String queueName) throws IOException;
+
/**
* Kill the indicated job
*/
diff --git a/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java b/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
index eb8b44a..fb20212 100644
--- a/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
+++ b/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
@@ -54,6 +54,9 @@
new String[] {MRConfig.MAPMEMORY_MB});
Configuration.addDeprecation("mapred.cluster.reduce.memory.mb",
new String[] {MRConfig.REDUCEMEMORY_MB});
+ Configuration.addDeprecation("mapred.acls.enabled",
+ new String[] {MRConfig.MR_ACLS_ENABLED});
+
Configuration.addDeprecation("mapred.cluster.max.map.memory.mb",
new String[] {JTConfig.JT_MAX_MAPMEMORY_MB});
Configuration.addDeprecation("mapred.cluster.max.reduce.memory.mb",
diff --git a/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java b/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
index c76cf3d..dee6f57 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
@@ -22,7 +22,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueState;
@@ -31,15 +30,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import static org.apache.hadoop.mapred.Queue.*;
-import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueue;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesNode;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
-
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerException;
import javax.xml.transform.Transformer;
@@ -53,13 +43,18 @@
import java.util.Set;
import java.io.File;
import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
//@Private
public class QueueManagerTestUtils {
- final static String CONFIG = new File("test-mapred-queues.xml")
- .getAbsolutePath();
+ /**
+ * Queue-configuration file for tests that start a cluster and wish to modify
+ * the queue configuration. This file is always in the unit tests classpath,
+ * so QueueManager started through JobTracker will automatically pick this up.
+ */
+ public static final String QUEUES_CONFIG_FILE_PATH = new File(System
+ .getProperty("test.build.extraconf", "build/test/extraconf"),
+ QueueManager.QUEUE_CONF_FILE_NAME).getAbsolutePath();
+
private static final Log LOG = LogFactory.getLog(QueueManagerTestUtils.class);
/**
@@ -76,7 +71,7 @@
}
public static void createSimpleDocument(Document doc) throws Exception {
- Element queues = createQueuesNode(doc, "true");
+ Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
@@ -106,8 +101,8 @@
queues.appendChild(p1);
}
- static void createSimpleDocumentWithAcls(Document doc, String aclsEnabled) {
- Element queues = createQueuesNode(doc, aclsEnabled);
+ static void createSimpleDocumentWithAcls(Document doc) {
+ Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
@@ -150,8 +145,56 @@
queues.appendChild(p1);
}
+ /**
+ * Creates all given queues as 1st level queues(no nesting)
+ * @param doc the queues config document
+ * @param queueNames the queues to be added to the queues config document
+ * @param submitAcls acl-submit-job acls for each of the queues
+ * @param adminsAcls acl-administer-jobs acls for each of the queues
+ * @throws Exception
+ */
+ public static void createSimpleDocument(Document doc, String[] queueNames,
+ String[] submitAcls, String[] adminsAcls) throws Exception {
+
+ Element queues = createQueuesNode(doc);
+
+ // Create all queues as 1st level queues(no nesting)
+ for (int i = 0; i < queueNames.length; i++) {
+ Element q = createQueue(doc, queueNames[i]);
+
+ q.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+ q.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, submitAcls[i]));
+ q.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, adminsAcls[i]));
+ queues.appendChild(q);
+ }
+ }
+
+ /**
+ * Creates queues configuration file with given queues at 1st level(i.e.
+ * no nesting of queues) and with the given queue acls.
+ * @param queueNames queue names which are to be configured
+ * @param submitAclStrings acl-submit-job acls for each of the queues
+ * @param adminsAclStrings acl-administer-jobs acls for each of the queues
+ * @return Configuration the queues configuration
+ * @throws Exception
+ */
+ public static void createQueuesConfigFile(String[] queueNames,
+ String[] submitAclStrings, String[] adminsAclStrings)
+ throws Exception {
+ if (queueNames.length > submitAclStrings.length ||
+ queueNames.length > adminsAclStrings.length) {
+ LOG.error("Number of queues is more than acls given.");
+ return;
+ }
+ Document doc = createDocument();
+ createSimpleDocument(doc, queueNames, submitAclStrings, adminsAclStrings);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ }
+
public static void refreshSimpleDocument(Document doc) throws Exception {
- Element queues = createQueuesNode(doc, "true");
+ Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
@@ -189,10 +232,9 @@
* @param enable
* @return the created element.
*/
- public static Element createQueuesNode(Document doc, String enable) {
+ public static Element createQueuesNode(Document doc) {
Element queues = doc.createElement("queues");
doc.appendChild(queues);
- queues.setAttribute("aclsEnabled", enable);
return queues;
}
@@ -240,9 +282,12 @@
return propsElement;
}
- public static void checkForConfigFile() {
- if (new File(CONFIG).exists()) {
- new File(CONFIG).delete();
+ /**
+ * Delete queues configuration file if exists
+ */
+ public static void deleteQueuesConfigFile() {
+ if (new File(QUEUES_CONFIG_FILE_PATH).exists()) {
+ new File(QUEUES_CONFIG_FILE_PATH).delete();
}
}
@@ -257,7 +302,7 @@
public static void writeQueueConfigurationFile(String filePath,
JobQueueInfo[] rootQueues) throws Exception {
Document doc = createDocument();
- Element queueElements = createQueuesNode(doc, String.valueOf(true));
+ Element queueElements = createQueuesNode(doc);
for (JobQueueInfo rootQ : rootQueues) {
queueElements.appendChild(QueueConfigurationParser.getQueueElement(doc,
rootQ));
@@ -265,27 +310,6 @@
writeToFile(doc, filePath);
}
- static class QueueManagerConfigurationClassLoader extends ClassLoader {
- @Override
- public URL getResource(String name) {
- if (!name.equals(QueueManager.QUEUE_CONF_FILE_NAME)) {
- return super.getResource(name);
- } else {
- File resourceFile = new File(CONFIG);
- if (!resourceFile.exists()) {
- throw new IllegalStateException(
- "Queue Manager configuration file not found");
- }
- try {
- return resourceFile.toURL();
- } catch (MalformedURLException e) {
- LOG.fatal("Unable to form URL for the resource file : ");
- }
- return super.getResource(name);
- }
- }
- }
-
static Job submitSleepJob(final int numMappers, final int numReducers, final long mapSleepTime,
final long reduceSleepTime, boolean shouldComplete, String userInfo,
String queueName, Configuration clientConf) throws IOException,
@@ -329,12 +353,4 @@
}
static MiniMRCluster miniMRCluster;
-
- static void setUpCluster(Configuration conf) throws IOException {
- JobConf jobConf = new JobConf(conf);
- String namenode = "file:///";
- Thread.currentThread().setContextClassLoader(
- new QueueManagerTestUtils.QueueManagerConfigurationClassLoader());
- miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
- }
}
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
index 4446629..22420a1 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
@@ -589,7 +590,7 @@
// Also JobACLs should be correct
if (mr.getJobTrackerRunner().getJobTracker()
- .isJobLevelAuthorizationEnabled()) {
+ .areACLsEnabled()) {
AccessControlList acl = new AccessControlList(
conf.get(JobACL.VIEW_JOB.getAclName(), " "));
assertTrue("VIEW_JOB ACL is not properly logged to history file.",
@@ -601,6 +602,9 @@
acl.toString().equals(
jobInfo.getJobACLs().get(JobACL.MODIFY_JOB).toString()));
}
+
+ // Validate the job queue name
+ assertTrue(jobInfo.getJobQueueName().equals(conf.getQueueName()));
}
public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
@@ -726,10 +730,11 @@
/** Run a job that will be succeeded and validate its history file format
* and its content.
*/
- public void testJobHistoryFile() throws IOException {
+ public void testJobHistoryFile() throws Exception {
MiniMRCluster mr = null;
try {
JobConf conf = new JobConf();
+
// keep for less time
conf.setLong("mapred.jobtracker.retirejob.check", 1000);
conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
@@ -739,7 +744,7 @@
conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
// Enable ACLs so that they are logged to history
- conf.setBoolean(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
@@ -966,7 +971,8 @@
Map<JobACL, AccessControlList> jobACLs =
new HashMap<JobACL, AccessControlList>();
JobSubmittedEvent jse =
- new JobSubmittedEvent(jobId, "job", "user", 12345, "path", jobACLs);
+ new JobSubmittedEvent(jobId, "job", "user", 12345, "path", jobACLs,
+ "default");
jh.logEvent(jse, jobId);
jh.closeWriter(jobId);
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
index c869962..7ef641f 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
@@ -63,6 +63,7 @@
"~!@#$%^&*()_+QWERTYUIOP{}|ASDFGHJKL:\"'ZXCVBNM<>?" +
"\t\b\n\f\"\n in it";
+ String weirdJobQueueName = "my\njob\nQueue\\";
conf.setUser(username);
MiniMRCluster mr = null;
@@ -84,7 +85,7 @@
jobACLs.put(JobACL.MODIFY_JOB, modifyJobACL);
JobSubmittedEvent jse =
new JobSubmittedEvent(jobId, weirdJob, username, 12345, weirdPath,
- jobACLs);
+ jobACLs, weirdJobQueueName);
jh.logEvent(jse, jobId);
JobFinishedEvent jfe =
@@ -121,6 +122,7 @@
assertTrue (jobInfo.getUsername().equals(username));
assertTrue(jobInfo.getJobname().equals(weirdJob));
+ assertTrue(jobInfo.getJobQueueName().equals(weirdJobQueueName));
assertTrue(jobInfo.getJobConfPath().equals(weirdPath));
Map<JobACL, AccessControlList> parsedACLs = jobInfo.getJobACLs();
assertEquals(2, parsedACLs.size());
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
index 94735fe..7b9f3c1 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
@@ -17,31 +17,31 @@
*/
package org.apache.hadoop.mapred;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.checkForConfigFile;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.QUEUES_CONFIG_FILE_PATH;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.setUpCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.deleteQueuesConfigFile;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import java.io.File;
-import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.QueueInfo;
+import org.junit.After;
import org.junit.Test;
import org.w3c.dom.Document;
public class TestJobQueueClient {
+
+ @After
+ public void tearDown() throws Exception {
+ deleteQueuesConfigFile();
+ }
+
@Test
public void testQueueOrdering() throws Exception {
// create some sample queues in a hierarchy..
@@ -91,13 +91,15 @@
@Test
public void testGetQueue() throws Exception {
- checkForConfigFile();
+
+ deleteQueuesConfigFile();
Document doc = createDocument();
- createSimpleDocumentWithAcls(doc, "true");
- writeToFile(doc, CONFIG);
- Configuration conf = new Configuration();
- conf.addResource(CONFIG);
- setUpCluster(conf);
+ createSimpleDocumentWithAcls(doc);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ JobConf jobConf = new JobConf();
+ String namenode = "file:///";
+ miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
+
JobClient jc = new JobClient(miniMRCluster.createJobConf());
// test for existing queue
QueueInfo queueInfo = jc.getQueueInfo("q1");
@@ -105,7 +107,5 @@
// try getting a non-existing queue
queueInfo = jc.getQueueInfo("queue");
assertNull(queueInfo);
-
- new File(CONFIG).delete();
}
}
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java b/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
index 0b2ed67..3f13f4d 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
@@ -26,7 +26,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.UserGroupInformation;
@@ -202,6 +201,13 @@
File jobLogDir = TaskLog.getJobDir(jobId);
checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ // check job-acls.xml file permissions
+ checkFilePermissions(jobLogDir.toString() + Path.SEPARATOR
+ + TaskTracker.jobACLsFile, expectedFilePerms, task.getUser(),
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+
+ // validate the content of job ACLs file
+ validateJobACLsFileContent();
}
@Override
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
index 47011b7..6518c93 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import javax.security.auth.login.LoginException;
import junit.framework.TestCase;
+
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -31,58 +34,38 @@
private QueueManager queueManager;
private JobConf conf = null;
UserGroupInformation currentUGI = null;
- String submitAcl = Queue.QueueOperation.SUBMIT_JOB.getAclName();
- String adminAcl = Queue.QueueOperation.ADMINISTER_JOBS.getAclName();
+ String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
+ String adminAcl = QueueACL.ADMINISTER_JOBS.getAclName();
- private void setupConfForNoAccess() throws IOException,LoginException {
+ @Override
+ protected void tearDown() {
+ deleteQueuesConfigFile();
+ }
+
+ // No access for queues for the user currentUGI
+ private void setupConfForNoAccess() throws Exception {
currentUGI = UserGroupInformation.getLoginUser();
String userName = currentUGI.getUserName();
+
+ String[] queueNames = {"qu1", "qu2"};
+ // Only user u1 has access for queue qu1
+ // Only group g2 has acls for the queue qu2
+ createQueuesConfigFile(
+ queueNames, new String[]{"u1", " g2"}, new String[]{"u1", " g2"});
+
conf = new JobConf();
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
- conf.setBoolean("mapred.acls.enabled",true);
-
- conf.set("mapred.queue.names", "qu1,qu2");
- //Only user u1 has access
- conf.set("mapred.queue.qu1.acl-submit-job", "u1");
- conf.set("mapred.queue.qu1.acl-administer-jobs", "u1");
- //q2 only group g2 has acls for the queues
- conf.set("mapred.queue.qu2.acl-submit-job", " g2");
- conf.set("mapred.queue.qu2.acl-administer-jobs", " g2");
queueManager = new QueueManager(conf);
-
}
/**
* sets up configuration for acls test.
* @return
*/
- private void setupConf(boolean aclSwitch) throws IOException,LoginException{
+ private void setupConf(boolean aclSwitch) throws Exception{
currentUGI = UserGroupInformation.getLoginUser();
String userName = currentUGI.getUserName();
- conf = new JobConf();
-
- conf.setBoolean("mapred.acls.enabled", aclSwitch);
-
- conf.set("mapred.queue.names", "qu1,qu2,qu3,qu4,qu5,qu6,qu7");
- //q1 Has acls for all the users, supports both submit and administer
- conf.set("mapred.queue.qu1.acl-submit-job", "*");
- conf.set("mapred.queue.qu1-acl-administer-jobs", "*");
- //q2 only u2 has acls for the queues
- conf.set("mapred.queue.qu2.acl-submit-job", "u2");
- conf.set("mapred.queue.qu2.acl-administer-jobs", "u2");
- //q3 Only u2 has submit operation access rest all have administer access
- conf.set("mapred.queue.qu3.acl-submit-job", "u2");
- conf.set("mapred.queue.qu3.acl-administer-jobs", "*");
- //q4 Only u2 has administer access , anyone can do submit
- conf.set("mapred.queue.qu4.acl-submit-job", "*");
- conf.set("mapred.queue.qu4.acl-administer-jobs", "u2");
- //qu6 only current user has submit access
- conf.set("mapred.queue.qu6.acl-submit-job",userName);
- conf.set("mapred.queue.qu6.acl-administrator-jobs","u2");
- //qu7 only current user has administrator access
- conf.set("mapred.queue.qu7.acl-submit-job","u2");
- conf.set("mapred.queue.qu7.acl-administrator-jobs",userName);
- //qu8 only current group has access
StringBuilder groupNames = new StringBuilder("");
String[] ugiGroupNames = currentUGI.getGroupNames();
int max = ugiGroupNames.length-1;
@@ -92,22 +75,38 @@
groupNames.append(",");
}
}
- conf.set("mapred.queue.qu5.acl-submit-job"," "+groupNames.toString());
- conf.set("mapred.queue.qu5.acl-administrator-jobs"," "
- +groupNames.toString());
+ String groupsAcl = " " + groupNames.toString();
+
+ //q1 Has acls for all the users, supports both submit and administer
+ //q2 only u2 has acls for the queues
+ //q3 Only u2 has submit operation access rest all have administer access
+ //q4 Only u2 has administer access , anyone can do submit
+ //qu5 only current user's groups has access
+ //qu6 only current user has submit access
+ //qu7 only current user has administrator access
+ String[] queueNames =
+ {"qu1", "qu2", "qu3", "qu4", "qu5", "qu6", "qu7"};
+ String[] submitAcls =
+ {"*", "u2", "u2", "*", groupsAcl, userName, "u2"};
+ String[] adminsAcls =
+ {"*", "u2", "*", "u2", groupsAcl, "u2", userName};
+ createQueuesConfigFile(queueNames, submitAcls, adminsAcls);
+
+ conf = new JobConf();
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, aclSwitch);
queueManager = new QueueManager(conf);
}
- public void testQueueAclsForCurrentuser() throws IOException,LoginException {
+ public void testQueueAclsForCurrentuser() throws Exception {
setupConf(true);
QueueAclsInfo[] queueAclsInfoList =
queueManager.getQueueAcls(currentUGI);
checkQueueAclsInfo(queueAclsInfoList);
}
- public void testQueueAclsForCurrentUserAclsDisabled() throws IOException,
- LoginException {
+ // Acls are disabled on the mapreduce cluster
+ public void testQueueAclsForCurrentUserAclsDisabled() throws Exception {
setupConf(false);
//fetch the acls info for current user.
QueueAclsInfo[] queueAclsInfoList = queueManager.
@@ -115,7 +114,7 @@
checkQueueAclsInfo(queueAclsInfoList);
}
- public void testQueueAclsForNoAccess() throws IOException,LoginException {
+ public void testQueueAclsForNoAccess() throws Exception {
setupConfForNoAccess();
QueueAclsInfo[] queueAclsInfoList = queueManager.
getQueueAcls(currentUGI);
@@ -124,7 +123,7 @@
private void checkQueueAclsInfo(QueueAclsInfo[] queueAclsInfoList)
throws IOException {
- if (conf.get("mapred.acls.enabled").equalsIgnoreCase("true")) {
+ if (conf.get(MRConfig.MR_ACLS_ENABLED).equalsIgnoreCase("true")) {
for (int i = 0; i < queueAclsInfoList.length; i++) {
QueueAclsInfo acls = queueAclsInfoList[i];
String queueName = acls.getQueueName();
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
index 3f5b344..0a7dd3c 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
@@ -22,8 +22,11 @@
import org.apache.commons.logging.LogFactory;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import static org.junit.Assert.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jackson.map.ObjectMapper;
@@ -31,7 +34,6 @@
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import java.io.File;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
@@ -47,26 +49,32 @@
@After
public void tearDown() throws Exception {
- new File(CONFIG).delete();
+ deleteQueuesConfigFile();
+ }
+
+ // create UGI with the given user name and the fixed group name "myGroup"
+ private UserGroupInformation createUGI(String userName) {
+ return UserGroupInformation.createUserForTesting(
+ userName, new String[]{"myGroup"});
}
@Test
public void testDefault() throws Exception {
+ deleteQueuesConfigFile();
QueueManager qm = new QueueManager();
Queue root = qm.getRoot();
assertEquals(root.getChildren().size(), 1);
assertEquals(root.getChildren().iterator().next().getName(), "default");
- assertFalse(qm.isAclsEnabled());
assertNull(root.getChildren().iterator().next().getChildren());
}
@Test
public void testXMLParsing() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
Set<Queue> rootQueues = qm.getRoot().getChildren();
List<String> names = new ArrayList<String>();
for (Queue q : rootQueues) {
@@ -101,62 +109,63 @@
assertTrue(
q.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
q.getName(), ACL_SUBMIT_JOB_TAG)).isUserAllowed(
- UserGroupInformation.createRemoteUser("u1")));
+ createUGI("u1")));
assertTrue(
q.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
q.getName(),
ACL_ADMINISTER_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u2")));
+ .isUserAllowed(createUGI("u2")));
assertTrue(q.getState().equals(QueueState.STOPPED));
}
@Test
public void testhasAccess() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- createSimpleDocumentWithAcls(doc,"true");
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ createSimpleDocumentWithAcls(doc);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
UserGroupInformation ugi;
// test for acls access when acls are set with *
- ugi = UserGroupInformation.createRemoteUser("u1");
+ ugi = createUGI("u1");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
- Queue.QueueOperation.SUBMIT_JOB, ugi));
- ugi = UserGroupInformation.createRemoteUser("u2");
+ QueueACL.SUBMIT_JOB, ugi));
+ ugi = createUGI("u2");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
- Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+ QueueACL.ADMINISTER_JOBS, ugi));
// test for acls access when acls are not set with *
- ugi = UserGroupInformation.createRemoteUser("u1");
+ ugi = createUGI("u1");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
- Queue.QueueOperation.SUBMIT_JOB, ugi));
- ugi = UserGroupInformation.createRemoteUser("u2");
+ QueueACL.SUBMIT_JOB, ugi));
+ ugi = createUGI("u2");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
- Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+ QueueACL.ADMINISTER_JOBS, ugi));
- // test for acls access when acls are not specified but acls is enabled
- ugi = UserGroupInformation.createRemoteUser("u1");
- assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
- Queue.QueueOperation.SUBMIT_JOB, ugi));
- ugi = UserGroupInformation.createRemoteUser("u2");
- assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
- Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+ // Test for acls access when acls are not specified but acls are enabled.
+ // By default, the queue acls for any queue are empty.
+ ugi = createUGI("u1");
+ assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+ QueueACL.SUBMIT_JOB, ugi));
+ ugi = createUGI("u2");
+ assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+ QueueACL.ADMINISTER_JOBS, ugi));
assertTrue(qm.isRunning("p1" + NAME_SEPARATOR + "p13"));
}
@Test
public void testQueueView() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
for (Queue queue : qm.getRoot().getChildren()) {
checkHierarchy(queue, qm);
@@ -176,25 +185,21 @@
@Test
public void testhasAccessForParent() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser("u1");
- assertFalse(
- qm.hasAccess(
- "p1",
- Queue.QueueOperation.SUBMIT_JOB, ugi));
+ UserGroupInformation ugi = createUGI("u1");
+ assertFalse(qm.hasAccess("p1", QueueACL.SUBMIT_JOB, ugi));
}
@Test
public void testValidation() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- Element queues = createQueuesNode(doc, "false");
+ Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "q1");
q1.appendChild(createAcls(doc, "acl-submit-job", "u1"));
@@ -203,9 +208,9 @@
q1.appendChild(createQueue(doc, "p16"));
queues.appendChild(q1);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
fail("Should throw an exception as configuration is wrong ");
} catch (RuntimeException re) {
LOG.info(re.getMessage());
@@ -214,27 +219,27 @@
@Test
public void testInvalidName() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- Element queues = createQueuesNode(doc, "false");
+ Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "");
queues.appendChild(q1);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
fail("Should throw an exception as configuration is wrong ");
} catch (Exception re) {
re.printStackTrace();
LOG.info(re.getMessage());
}
- checkForConfigFile();
+ deleteQueuesConfigFile();
doc = createDocument();
- queues = createQueuesNode(doc, "false");
+ queues = createQueuesNode(doc);
q1 = doc.createElement("queue");
queues.appendChild(q1);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
fail("Should throw an exception as configuration is wrong ");
} catch (RuntimeException re) {
re.printStackTrace();
@@ -244,10 +249,10 @@
@Test
public void testMissingConfigFile() throws Exception {
- checkForConfigFile(); // deletes file
+ deleteQueuesConfigFile(); // deletes file
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
fail("Should throw an exception for missing file when " +
"explicitly passed.");
} catch (RuntimeException re) {
@@ -266,9 +271,9 @@
@Test
public void testEmptyProperties() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- Element queues = createQueuesNode(doc, "false");
+ Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "q1");
Element p = createProperties(doc, null);
q1.appendChild(p);
@@ -277,11 +282,11 @@
@Test
public void testEmptyFile() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
fail("Should throw an exception as configuration is wrong ");
} catch (Exception re) {
re.printStackTrace();
@@ -291,11 +296,11 @@
@Test
public void testJobQueueInfoGeneration() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
List<JobQueueInfo> rootQueues =
qm.getRoot().getJobQueueInfo().getChildren();
@@ -338,11 +343,11 @@
*/
@Test
public void testRefresh() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
Queue beforeRefreshRoot = qm.getRoot();
//remove the file and create new one.
Set<Queue> rootQueues = beforeRefreshRoot.getChildren();
@@ -360,16 +365,16 @@
"p1" + NAME_SEPARATOR + "p12")) {
assertTrue(
child.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
child.getName(), ACL_SUBMIT_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u1")));
+ .isUserAllowed(createUGI("u1")));
assertTrue(
child.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
child.getName(),
ACL_ADMINISTER_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u2")));
+ .isUserAllowed(createUGI("u2")));
assertTrue(child.getState().equals(QueueState.STOPPED));
} else {
assertTrue(child.getState().equals(QueueState.RUNNING));
@@ -377,11 +382,11 @@
}
}
}
- checkForConfigFile();
+ deleteQueuesConfigFile();
doc = createDocument();
refreshSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, true);
qm.getRoot().isHierarchySameAs(cp.getRoot());
qm.setQueues(
cp.getRoot().getChildren().toArray(
@@ -403,17 +408,17 @@
"p1" + NAME_SEPARATOR + "p12")) {
assertTrue(
child.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
child.getName(),
ACL_SUBMIT_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u3")));
+ .isUserAllowed(createUGI("u3")));
assertTrue(
child.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
child.getName(),
ACL_ADMINISTER_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u4")));
+ .isUserAllowed(createUGI("u4")));
assertTrue(child.getState().equals(QueueState.RUNNING));
} else {
assertTrue(child.getState().equals(QueueState.STOPPED));
@@ -425,20 +430,20 @@
@Test
public void testRefreshWithInvalidFile() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
- checkForConfigFile();
+ deleteQueuesConfigFile();
doc = createDocument();
- Element queues = createQueuesNode(doc, "false");
+ Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "");
queues.appendChild(q1);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+ QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, false);
fail("Should throw an exception as configuration is wrong ");
} catch (Throwable re) {
@@ -548,19 +553,21 @@
*/
@Test
public void testDumpConfiguration() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+
StringWriter out = new StringWriter();
- QueueManager.dumpConfiguration(out,CONFIG,null);
+ Configuration conf = new Configuration(false);
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+ QueueManager.dumpConfiguration(out, QUEUES_CONFIG_FILE_PATH, conf);
+
ObjectMapper mapper = new ObjectMapper();
// parse the Json dump
JsonQueueTree queueTree =
mapper.readValue(out.toString(), JsonQueueTree.class);
-
- // check if acls_enabled is correct
- assertEquals(true, queueTree.isAcls_enabled());
+
// check for the number of top-level queues
assertEquals(2, queueTree.getQueues().length);
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerRefresh.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerRefresh.java
index 5f20f08..b46c758 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerRefresh.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerRefresh.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.mapred;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -37,6 +36,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
+
import org.junit.After;
import org.junit.Test;
@@ -48,19 +49,12 @@
private static final Log LOG =
LogFactory.getLog(TestQueueManagerRefresh.class);
- String queueConfigPath =
- System.getProperty("test.build.extraconf", "build/test/extraconf");
- File queueConfigFile =
- new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
-
/**
* Remove the configuration file after the test's done.
*/
@After
public void tearDown() {
- if (queueConfigFile.exists()) {
- queueConfigFile.delete();
- }
+ deleteQueuesConfigFile();
}
/**
@@ -96,8 +90,8 @@
JobQueueInfo[] queues = getSimpleQueueHierarchy();
// write the configuration file
- QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ writeQueueConfigurationFile(
+ QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
@@ -107,8 +101,8 @@
queues[0].addChild(newQueue);
// Rewrite the configuration file
- QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ writeQueueConfigurationFile(
+ QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
testRefreshFailureWithChangeOfHierarchy(qManager);
@@ -127,8 +121,8 @@
JobQueueInfo[] queues = getSimpleQueueHierarchy();
// write the configuration file
- QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ writeQueueConfigurationFile(
+ QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
@@ -137,8 +131,8 @@
queues[0].removeChild(q2);
// Rewrite the configuration file
- QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ writeQueueConfigurationFile(
+ QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
testRefreshFailureWithChangeOfHierarchy(qManager);
}
@@ -187,8 +181,8 @@
JobQueueInfo[] queues = getSimpleQueueHierarchy();
// write the configuration file
- QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ writeQueueConfigurationFile(
+ QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
@@ -226,8 +220,8 @@
}
// write the configuration file
- QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ writeQueueConfigurationFile(
+ QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
@@ -261,8 +255,8 @@
JobQueueInfo[] queues = getSimpleQueueHierarchy();
// write the configuration file
- QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ writeQueueConfigurationFile(
+ QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
index ca14c3a..4e60a04 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
@@ -38,6 +38,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -48,16 +49,21 @@
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.mapred.DeprecatedQueueConfigurationParser.*;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
public class TestQueueManagerWithDeprecatedConf extends TestCase {
- static final Log LOG = LogFactory.getLog(TestQueueManagerWithDeprecatedConf.class);
-
+ static final Log LOG = LogFactory.getLog(
+ TestQueueManagerWithDeprecatedConf.class);
+
+ String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
+ String adminAcl = QueueACL.ADMINISTER_JOBS.getAclName();
public void testMultipleQueues() {
JobConf conf = new JobConf();
- conf.set("mapred.queue.names", "q1,q2,Q3");
+ conf.set(DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY,
+ "q1,q2,Q3");
QueueManager qMgr = new QueueManager(conf);
Set<String> expQueues = new TreeSet<String>();
expQueues.add("q1");
@@ -68,7 +74,8 @@
public void testSchedulerInfo() {
JobConf conf = new JobConf();
- conf.set("mapred.queue.names", "qq1,qq2");
+ conf.set(DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY,
+ "qq1,qq2");
QueueManager qMgr = new QueueManager(conf);
qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
@@ -85,44 +92,50 @@
try {
// queue properties with which the cluster is started.
Properties hadoopConfProps = new Properties();
- hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
- hadoopConfProps.put("mapred.acls.enabled", "true");
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+ hadoopConfProps.put(DeprecatedQueueConfigurationParser.
+ MAPRED_QUEUE_NAMES_KEY, "default,q1,q2");
+ hadoopConfProps.put(MRConfig.MR_ACLS_ENABLED, "true");
- //properties for mapred-queue-acls.xml
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("unknownUser");
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
- hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u1");
- hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "*");
- hadoopConfProps.put("mapred.queue.default.acl-administer-jobs", ugi.getUserName());
- hadoopConfProps.put("mapred.queue.q1.acl-administer-jobs", "u1");
- hadoopConfProps.put("mapred.queue.q2.acl-administer-jobs", "*");
+ hadoopConfProps.put(toFullPropertyName(
+ "default", submitAcl), ugi.getUserName());
+ hadoopConfProps.put(toFullPropertyName(
+ "q1", submitAcl), "u1");
+ hadoopConfProps.put(toFullPropertyName(
+ "q2", submitAcl), "*");
+ hadoopConfProps.put(toFullPropertyName(
+ "default", adminAcl), ugi.getUserName());
+ hadoopConfProps.put(toFullPropertyName(
+ "q1", adminAcl), "u2");
+ hadoopConfProps.put(toFullPropertyName(
+ "q2", adminAcl), "*");
UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
Configuration conf = new JobConf();
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
QueueManager queueManager = new QueueManager(conf);
- //Testing access to queue.
+ //Testing job submission access to queues.
assertTrue("User Job Submission failed.",
- queueManager.hasAccess("default", Queue.QueueOperation.
+ queueManager.hasAccess("default", QueueACL.
SUBMIT_JOB, ugi));
assertFalse("User Job Submission failed.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
+ queueManager.hasAccess("q1", QueueACL.
SUBMIT_JOB, ugi));
assertTrue("User Job Submission failed.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
+ queueManager.hasAccess("q2", QueueACL.
SUBMIT_JOB, ugi));
- //Testing the admin acls
+ //Testing the administer-jobs acls
assertTrue("User Job Submission failed.",
- queueManager.hasAccess("default", Queue.QueueOperation.ADMINISTER_JOBS, ugi));
- assertFalse("User Job Submission failed.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
- ADMINISTER_JOBS, ugi));
- assertTrue("User Job Submission failed.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- ADMINISTER_JOBS, ugi));
-
+ queueManager.hasAccess("default",
+ QueueACL.ADMINISTER_JOBS, ugi));
+ assertFalse("User Job Submission failed.",
+ queueManager.hasAccess("q1", QueueACL.
+ ADMINISTER_JOBS, ugi));
+ assertTrue("User Job Submission failed.",
+ queueManager.hasAccess("q2", QueueACL.
+ ADMINISTER_JOBS, ugi));
} finally {
//Cleanup the configuration files in all cases
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
index c3200fc..0e1fd48 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
@@ -19,8 +19,8 @@
package org.apache.hadoop.mapred;
import static org.apache.hadoop.mapred.QueueConfigurationParser.NAME_SEPARATOR;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.checkForConfigFile;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.QUEUES_CONFIG_FILE_PATH;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.deleteQueuesConfigFile;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
@@ -29,7 +29,6 @@
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.setUpCluster;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.submitSleepJob;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
import static org.junit.Assert.assertEquals;
@@ -37,7 +36,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
@@ -47,11 +45,12 @@
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -60,21 +59,26 @@
private static Configuration conf;
- @BeforeClass
- public static void setUp() throws Exception {
- checkForConfigFile();
- Document doc = createDocument();
- createSimpleDocumentWithAcls(doc, "true");
- writeToFile(doc, CONFIG);
- conf = new Configuration();
- conf.addResource(CONFIG);
- conf.set("mapred.committer.job.setup.cleanup.needed", "false");
- setUpCluster(conf);
- }
-
@AfterClass
public static void tearDown() throws Exception {
- new File(CONFIG).delete();
+ deleteQueuesConfigFile();
+ }
+
+ private void startCluster(boolean aclsEnabled)
+ throws Exception {
+
+ deleteQueuesConfigFile();
+ Document doc = createDocument();
+ createSimpleDocumentWithAcls(doc);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ conf = new Configuration();
+ conf.set(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, aclsEnabled);
+
+ JobConf jobConf = new JobConf(conf);
+ String namenode = "file:///";
+ miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
+
}
/**
@@ -83,6 +87,8 @@
*/
@Test(expected = IOException.class)
public void testSubmitJobForStoppedQueue() throws Exception {
+ startCluster(true);
+
submitSleepJob(10, 10, 100, 100, false, null,
"p1" + NAME_SEPARATOR + "p14", conf);
fail("queue p1:p14 is in stopped state and should not accept jobs");
@@ -94,8 +100,10 @@
*/
@Test(expected = IOException.class)
public void testSubmitJobForContainerQueue() throws Exception {
- submitSleepJob(10, 10, 100, 100, false, null, "p1", conf);
- fail("queue p1 is a container queue and cannot have jobs");
+ startCluster(true);
+
+ submitSleepJob(10, 10, 100, 100, false, null, "p1", conf);
+ fail("queue p1 is a container queue and cannot have jobs");
}
/**
@@ -104,12 +112,16 @@
*/
@Test
public void testAclsForSubmitJob() throws Exception {
+ startCluster(true);
+
Job job;
+ try {
// submit job to queue p1:p13 with unspecified acls
job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1" + NAME_SEPARATOR
+ "p13", conf);
- assertTrue("Job submission for u1 failed in queue : p1:p13.",
- job.isSuccessful());
+ fail("user u1 cannot submit jobs to queue p1:p13");
+ } catch (Exception e) {
+ }
// check for access to submit the job
try {
job = submitSleepJob(0, 0, 0, 0, false, "u2,g1", "p1" + NAME_SEPARATOR
@@ -117,11 +129,17 @@
fail("user u2 cannot submit jobs to queue p1:p11");
} catch (Exception e) {
}
- // submit job to queue p1:p11 with acls-submit-job as u1
+ // submit job to queue p1:p11 with acl-submit-job as u1
job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1"
+ NAME_SEPARATOR + "p11", conf);
assertTrue("Job submission for u1 failed in queue : p1:p11.",
job.isSuccessful());
+
+ // submit job to queue p1:p12 with acl-submit-job as *
+ job = submitSleepJob(0, 0, 0, 0, true, "u2,g1", "p1"
+ + NAME_SEPARATOR + "p12", conf);
+ assertTrue("Job submission for u2 failed in queue : p1:p12.",
+ job.isSuccessful());
}
/**
@@ -130,6 +148,8 @@
*/
@Test
public void testAccessToKillJob() throws Exception {
+ startCluster(true);
+
Job job = submitSleepJob(1, 1, 100, 100, false, "u1,g1", "p1"
+ NAME_SEPARATOR + "p11", conf);
final JobConf jobConf = miniMRCluster.createJobConf();
@@ -229,11 +249,13 @@
*/
@Test
public void testSubmitJobsAfterRefresh() throws Exception {
+ startCluster(true);
+
// test for refresh
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
refreshDocument(doc);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
MRAdmin admin = new MRAdmin(miniMRCluster.createJobConf());
admin.run(new String[] { "-refreshQueues" });
try {
@@ -242,15 +264,15 @@
fail("user u1 is not in the submit jobs' list");
} catch (Exception e) {
}
- checkForConfigFile();
+ deleteQueuesConfigFile();
doc = createDocument();
- createSimpleDocumentWithAcls(doc, "true");
- writeToFile(doc, CONFIG);
+ createSimpleDocumentWithAcls(doc);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
admin.run(new String[] { "-refreshQueues" });
}
private void refreshDocument(Document doc) {
- Element queues = createQueuesNode(doc, "true");
+ Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
@@ -298,12 +320,7 @@
*/
@Test
public void testAclsDisabled() throws Exception {
- checkForConfigFile();
- Document doc = createDocument();
- createSimpleDocumentWithAcls(doc, "false");
- writeToFile(doc, CONFIG);
- MRAdmin admin = new MRAdmin(miniMRCluster.createJobConf());
- admin.run(new String[] { "-refreshQueues" });
+ startCluster(false);
// submit job to queue p1:p11 by any user not in acls-submit-job
Job job = submitSleepJob(0, 0, 0, 0, true, "u2,g1", "p1" + NAME_SEPARATOR
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java b/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
index f796a7a..b77e9d7 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
@@ -32,6 +32,10 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesConfigFile;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.UserGroupInformation;
@@ -156,11 +160,9 @@
// clean up
FileSystem fs = FileSystem.get(new Configuration());
fs.delete(TEST_DIR, true);
-
+
JobConf conf = new JobConf();
conf.set(JTConfig.JT_JOBHISTORY_BLOCK_SIZE, "1024");
- conf.set(
- DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY, "default");
MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
@@ -239,10 +241,11 @@
true);
mr.getJobTrackerConf().setInt(JTConfig.JT_TASKS_PER_JOB, 25);
- mr.getJobTrackerConf().setBoolean("mapred.acls.enabled" , true);
+ mr.getJobTrackerConf().setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- mr.getJobTrackerConf().set("mapred.queue.default.acl-submit-job",
- ugi.getUserName());
+ mr.getJobTrackerConf().set(toFullPropertyName(
+ "default", QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());
// start the jobtracker
LOG.info("Starting jobtracker");
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java b/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
index 869ea6a..c95cc57 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
@@ -39,12 +39,12 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
@@ -122,12 +122,14 @@
System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath());
trackerFConf = new JobConf();
+
trackerFConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
localDirs = new String[numLocalDirs];
for (int i = 0; i < numLocalDirs; i++) {
localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
}
trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs);
+ trackerFConf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
// Create the job configuration file. Same as trackerConf in this test.
jobConf = new JobConf(trackerFConf);
@@ -139,6 +141,15 @@
jobConf.setInt(MRJobConfig.USER_LOG_RETAIN_HOURS, 0);
jobConf.setUser(getJobOwner().getShortUserName());
+ String queue = "default";
+ // set job queue name in job conf
+ jobConf.setQueueName(queue);
+ // Set queue admins acl in job conf similar to what JobClient does so that
+ // it goes into job conf also.
+ jobConf.set(toFullPropertyName(queue,
+ QueueACL.ADMINISTER_JOBS.getAclName()),
+ "qAdmin1,qAdmin2 qAdminsGroup1,qAdminsGroup2");
+
Job job = Job.getInstance(jobConf);
String jtIdentifier = "200907202331";
jobId = new JobID(jtIdentifier, 1);
@@ -527,6 +538,37 @@
.exists());
checkFilePermissions(jobLogDir.toString(), "drwx------", task.getUser(),
taskTrackerUGI.getGroupNames()[0]);
+
+ // Make sure that the job ACLs file job-acls.xml exists in job userlog dir
+ File jobACLsFile = new File(jobLogDir, TaskTracker.jobACLsFile);
+ assertTrue("JobACLsFile is missing in the job userlog dir " + jobLogDir,
+ jobACLsFile.exists());
+
+ // With default task controller, the job-acls.xml file is owned by TT and
+ // permissions are 700
+ checkFilePermissions(jobACLsFile.getAbsolutePath(), "-rwx------",
+ taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
+
+ validateJobACLsFileContent();
+ }
+
+ // Validate the contents of jobACLsFile ( i.e. user name, job-view-acl, queue
+ // name and queue-admins-acl ).
+ protected void validateJobACLsFileContent() {
+ JobConf jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(jobId);
+
+ assertTrue(jobACLsConf.get("user.name").equals(
+ localizedJobConf.getUser()));
+ assertTrue(jobACLsConf.get(MRJobConfig.JOB_ACL_VIEW_JOB).
+ equals(localizedJobConf.get(MRJobConfig.JOB_ACL_VIEW_JOB)));
+
+ String queue = localizedJobConf.getQueueName();
+ assertTrue(queue.equalsIgnoreCase(jobACLsConf.getQueueName()));
+
+ String qACLName = toFullPropertyName(queue,
+ QueueACL.ADMINISTER_JOBS.getAclName());
+ assertTrue(jobACLsConf.get(qACLName).equals(
+ localizedJobConf.get(qACLName)));
}
/**
@@ -645,24 +687,6 @@
+ expectedStderr.toString() + " Observed : "
+ attemptLogFiles[1].toString(), expectedStderr.toString().equals(
attemptLogFiles[1].toString()));
-
- // Make sure that the job ACLs file exists in the task log dir
- File jobACLsFile = new File(logDir, TaskRunner.jobACLsFile);
- assertTrue("JobACLsFile is missing in the task log dir " + logDir,
- jobACLsFile.exists());
-
- // With default task controller, the job-acls file is owned by TT and
- // permissions are 700
- checkFilePermissions(jobACLsFile.getAbsolutePath(), "-rwx------",
- taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
-
- // Validate the contents of jobACLsFile(both user name and job-view-acls)
- Configuration jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(task
- .getTaskID(), task.isTaskCleanupTask());
- assertTrue(jobACLsConf.get(MRJobConfig.USER_NAME).equals(
- localizedJobConf.getUser()));
- assertTrue(jobACLsConf.get(MRJobConfig.JOB_ACL_VIEW_JOB).
- equals(localizedJobConf.get(MRJobConfig.JOB_ACL_VIEW_JOB)));
}
/**
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java b/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
index f2aa35b..1707305 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
@@ -55,6 +55,7 @@
taskLogCleanupThread = new UserLogCleaner(conf);
taskLogCleanupThread.setClock(myClock);
tt = new TaskTracker();
+ tt.setConf(new JobConf(conf));
tt.setLocalizer(localizer);
tt.setTaskLogCleanupThread(taskLogCleanupThread);
}
@@ -67,8 +68,9 @@
private File localizeJob(JobID jobid) throws IOException {
File jobUserlog = TaskLog.getJobDir(jobid);
+ JobConf conf = new JobConf();
// localize job log directory
- tt.initializeJobLogDir(jobid);
+ tt.initializeJobLogDir(jobid, conf);
assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
return jobUserlog;
}
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java b/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
index 39cbe0d..25f6a92 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
@@ -40,6 +40,8 @@
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
@@ -57,12 +59,18 @@
private static final Log LOG = LogFactory.getLog(
TestWebUIAuthorization.class);
- // user1 submits the jobs
+ // users who submit the jobs
private static final String jobSubmitter = "user1";
+ private static final String jobSubmitter1 = "user11";
+ private static final String jobSubmitter2 = "user12";
+ private static final String jobSubmitter3 = "user13";
+
// mrOwner starts the cluster
private static String mrOwner = null;
// member of supergroup
private static final String superGroupMember = "user2";
+ // admin of "default" queue
+ private static final String qAdmin = "user3";
// "colleague1" is there in job-view-acls config
private static final String viewColleague = "colleague1";
// "colleague2" is there in job-modify-acls config
@@ -72,10 +80,17 @@
// "evilJohn" is not having view/modify access on the jobs
private static final String unauthorizedUser = "evilJohn";
+ @Override
protected void setUp() throws Exception {
// do not do anything
};
+ @Override
+ protected void tearDown() throws Exception {
+ deleteQueuesConfigFile();
+ super.tearDown();
+ }
+
/** access a url, ignoring some IOException such as the page does not exist */
static int getHttpStatusCode(String urlstring, String userName,
String method) throws IOException {
@@ -112,45 +127,51 @@
* Validates the given jsp/servlet against different user names who
* can(or cannot) view the job.
* (1) jobSubmitter can view the job
- * (2) superGroupMember can view the job
- * (3) user mentioned in job-view-acls should be able to view the job
- * (4) user mentioned in job-modify-acls but not in job-view-acls
+ * (2) superGroupMember can view any job
+ * (3) mrOwner can view any job
+ * (4) qAdmins of the queue to which job is submitted to can view any job in
+ * that queue.
+ * (5) user mentioned in job-view-acl should be able to view the
+ * job irrespective of job-modify-acl.
+ * (6) user mentioned in job-modify-acl but not in job-view-acl
* cannot view the job
- * (5) other unauthorized users cannot view the job
+ * (7) other unauthorized users cannot view the job
*/
private void validateViewJob(String url, String method)
throws IOException {
- assertEquals("Incorrect return code for " + jobSubmitter,
+ assertEquals("Incorrect return code for job submitter " + jobSubmitter,
HttpURLConnection.HTTP_OK, getHttpStatusCode(url, jobSubmitter,
method));
- assertEquals("Incorrect return code for " + superGroupMember,
- HttpURLConnection.HTTP_OK, getHttpStatusCode(url, superGroupMember,
- method));
- assertEquals("Incorrect return code for " + mrOwner,
+ assertEquals("Incorrect return code for supergroup-member " +
+ superGroupMember, HttpURLConnection.HTTP_OK,
+ getHttpStatusCode(url, superGroupMember, method));
+ assertEquals("Incorrect return code for MR-owner " + mrOwner,
HttpURLConnection.HTTP_OK, getHttpStatusCode(url, mrOwner, method));
- assertEquals("Incorrect return code for " + viewColleague,
- HttpURLConnection.HTTP_OK, getHttpStatusCode(url, viewColleague,
- method));
- assertEquals("Incorrect return code for " + viewAndModifyColleague,
- HttpURLConnection.HTTP_OK, getHttpStatusCode(url,
- viewAndModifyColleague, method));
- assertEquals("Incorrect return code for " + modifyColleague,
- HttpURLConnection.HTTP_UNAUTHORIZED, getHttpStatusCode(url,
- modifyColleague, method));
- assertEquals("Incorrect return code for " + unauthorizedUser,
- HttpURLConnection.HTTP_UNAUTHORIZED, getHttpStatusCode(url,
- unauthorizedUser, method));
+ assertEquals("Incorrect return code for queue admin " + qAdmin,
+ HttpURLConnection.HTTP_OK, getHttpStatusCode(url, qAdmin, method));
+ assertEquals("Incorrect return code for user in job-view-acl " +
+ viewColleague, HttpURLConnection.HTTP_OK,
+ getHttpStatusCode(url, viewColleague, method));
+ assertEquals("Incorrect return code for user in job-view-acl and " +
+ "job-modify-acl " + viewAndModifyColleague, HttpURLConnection.HTTP_OK,
+ getHttpStatusCode(url, viewAndModifyColleague, method));
+ assertEquals("Incorrect return code for user in job-modify-acl " +
+ modifyColleague, HttpURLConnection.HTTP_UNAUTHORIZED,
+ getHttpStatusCode(url, modifyColleague, method));
+ assertEquals("Incorrect return code for unauthorizedUser " +
+ unauthorizedUser, HttpURLConnection.HTTP_UNAUTHORIZED,
+ getHttpStatusCode(url, unauthorizedUser, method));
}
/**
* Validates the given jsp/servlet against different user names who
* can(or cannot) modify the job.
- * (1) jobSubmitter and superGroupMember can modify the job. But we are not
- * validating this in this method. Let the caller explicitly validate
- * this, if needed.
- * (2) user mentioned in job-view-acls but not in job-modify-acls cannot
+ * (1) jobSubmitter, mrOwner, qAdmin and superGroupMember can modify the job.
+ * But we are not validating this in this method. Let the caller
+ * explicitly validate this, if needed.
+ * (2) user mentioned in job-view-acl but not in job-modify-acl cannot
* modify the job
- * (3) user mentioned in job-modify-acls (irrespective of job-view-acls)
+ * (3) user mentioned in job-modify-acl (irrespective of job-view-acl)
* can modify the job
* (4) other unauthorized users cannot modify the job
*/
@@ -240,13 +261,13 @@
Properties props = new Properties();
props.setProperty("hadoop.http.filter.initializers",
DummyFilterInitializer.class.getName());
- props.setProperty(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG,
+ props.setProperty(MRConfig.MR_ACLS_ENABLED,
String.valueOf(true));
+
props.setProperty("dfs.permissions.enabled", "false");
props.setProperty("mapred.job.tracker.history.completed.location",
"historyDoneFolderOnHDFS");
- props.setProperty("mapreduce.job.committer.setup.cleanup.needed",
- "false");
+ props.setProperty(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
props.setProperty(MRConfig.MR_SUPERGROUP, "superGroup");
MyGroupsProvider.mapping.put(jobSubmitter, Arrays.asList("group1"));
@@ -255,11 +276,17 @@
MyGroupsProvider.mapping.put(unauthorizedUser, Arrays.asList("evilSociety"));
MyGroupsProvider.mapping.put(superGroupMember, Arrays.asList("superGroup"));
MyGroupsProvider.mapping.put(viewAndModifyColleague, Arrays.asList("group3"));
+ MyGroupsProvider.mapping.put(qAdmin, Arrays.asList("group4"));
+
mrOwner = UserGroupInformation.getCurrentUser().getShortUserName();
MyGroupsProvider.mapping.put(mrOwner, Arrays.asList(
- new String[] { "group4", "group5" }));
+ new String[] { "group5", "group6" }));
- startCluster(true, props);
+ String[] queueNames = {"default"};
+ String[] submitAclStrings = new String[] { jobSubmitter };
+ String[] adminsAclStrings = new String[] { qAdmin };
+ startCluster(props, queueNames, submitAclStrings, adminsAclStrings);
+
MiniMRCluster cluster = getMRCluster();
int infoPort = cluster.getJobTrackerRunner().getJobTrackerInfoPort();
@@ -338,30 +365,57 @@
Integer.toString(attemptsMap.get(attempt).getHttpPort()),
attempt.toString()) + "&filter=" + TaskLog.LogName.STDERR;
validateViewJob(stderrURL, "GET");
+ }
+ }
- // delete job-acls.xml file from the task log dir of attempt and verify
- // if unauthorized users can view task logs of attempt.
- File attemptLogDir = TaskLog.getAttemptDir(
- org.apache.hadoop.mapred.TaskAttemptID.downgrade(attempt), false);
- Path jobACLsFilePath = new Path(attemptLogDir.toString(),
- TaskRunner.jobACLsFile);
- new File(jobACLsFilePath.toUri().getPath()).delete();
+ // For each tip, let us test the effect of deletion of job-acls.xml file and
+ // deletion of task log dir for each of the attempts of the tip.
+
+ // delete job-acls.xml file from the job userlog dir and verify
+ // if unauthorized users can view task logs of each attempt.
+ Path jobACLsFilePath = new Path(TaskLog.getJobDir(jobid).toString(),
+ TaskTracker.jobACLsFile);
+ assertTrue("Could not delete job-acls.xml file.",
+ new File(jobACLsFilePath.toUri().getPath()).delete());
+
+ for (TaskID tip : tipsMap.keySet()) {
+
+ Map<TaskAttemptID, TaskAttemptInfo> attemptsMap =
+ tipsMap.get(tip).getAllTaskAttempts();
+ for (TaskAttemptID attempt : attemptsMap.keySet()) {
+
+ String stdoutURL = TaskLogServlet.getTaskLogUrl("localhost",
+ Integer.toString(attemptsMap.get(attempt).getHttpPort()),
+ attempt.toString()) + "&filter=" + TaskLog.LogName.STDOUT;;
+
+ String stderrURL = TaskLogServlet.getTaskLogUrl("localhost",
+ Integer.toString(attemptsMap.get(attempt).getHttpPort()),
+ attempt.toString()) + "&filter=" + TaskLog.LogName.STDERR;
+
+ // unauthorized users can view task logs of each attempt because
+ // job-acls.xml file is deleted.
assertEquals("Incorrect return code for " + unauthorizedUser,
HttpURLConnection.HTTP_OK, getHttpStatusCode(stdoutURL,
- unauthorizedUser, "GET"));
+ unauthorizedUser, "GET"));
assertEquals("Incorrect return code for " + unauthorizedUser,
HttpURLConnection.HTTP_OK, getHttpStatusCode(stderrURL,
- unauthorizedUser, "GET"));
+ unauthorizedUser, "GET"));
// delete the whole task log dir of attempt and verify that we get
// correct response code (i.e. HTTP_GONE) when task logs are accessed.
+ File attemptLogDir = TaskLog.getAttemptDir(
+ org.apache.hadoop.mapred.TaskAttemptID.downgrade(attempt), false);
FileUtil.fullyDelete(attemptLogDir);
+
+ // Try accessing tasklogs - STDOUT and STDERR now(after the whole
+ // attempt log dir is deleted).
assertEquals("Incorrect return code for " + jobSubmitter,
HttpURLConnection.HTTP_GONE, getHttpStatusCode(stdoutURL,
- jobSubmitter, "GET"));
+ jobSubmitter, "GET"));
+
assertEquals("Incorrect return code for " + jobSubmitter,
HttpURLConnection.HTTP_GONE, getHttpStatusCode(stderrURL,
- jobSubmitter, "GET"));
+ jobSubmitter, "GET"));
}
}
@@ -377,6 +431,21 @@
}
/**
+ * Creates queues configuration file with the given queues and acls and starts
+ * cluster with that queues configuration file.
+ * @param props configuration properties to inject to the mini cluster
+ * @param queueNames the job queues on the cluster
+ * @param submitAclStrings acl-submit-job acls for all queues
+ * @param adminsAclStrings acl-administer-jobs acls for all queues
+ * @throws Exception
+ */
+ private void startCluster(Properties props, String[] queueNames,
+ String[] submitAclStrings, String[] adminsAclStrings) throws Exception {
+ createQueuesConfigFile(queueNames, submitAclStrings, adminsAclStrings);
+ startCluster(true, props);
+ }
+
+ /**
* Starts a sleep job and tries to kill the job using jobdetails.jsp as
* (1) viewColleague (2) unauthorizedUser (3) modifyColleague
* (4) viewAndModifyColleague (5) mrOwner (6) superGroupMember and
@@ -387,11 +456,13 @@
* (1) jobSubmitter, mrOwner and superGroupMember can do both view and modify
* on the job. But we are not validating this in this method. Let the
* caller explicitly validate this, if needed.
- * (2) user mentioned in job-view-acls but not in job-modify-acls cannot
+ * (2) user mentioned in job-view-acls and job-modify-acls can do this
+ * (3) user mentioned in job-view-acls but not in job-modify-acls cannot
* do this
- * (3) user mentioned in job-modify-acls but not in job-view-acls cannot
+ * (4) user mentioned in job-modify-acls but not in job-view-acls cannot
* do this
- * (4) other unauthorized users cannot do this
+ * (5) qAdmin cannot do this because he doesn't have view access to the job
+ * (6) other unauthorized users cannot do this
*
* @throws Exception
*/
@@ -419,8 +490,11 @@
getHttpStatusCode(url, unauthorizedUser, "POST"));
assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
getHttpStatusCode(url, modifyColleague, "POST"));
+
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, viewAndModifyColleague, "POST"));
+ assertTrue("killJob using jobdetails.jsp failed for a job for which "
+ + "user has job-view and job-modify permissions", job.isComplete());
} finally {
if (!job.isComplete()) {
LOG.info("Killing job " + jobid + " from finally block");
@@ -430,14 +504,16 @@
}
}
- // check if jobSubmitter, mrOwner and superGroupMember can do killJob
- // using jobdetails.jsp url
+ // Check if jobSubmitter, mrOwner, superGroupMember and queueAdmins
+ // can do killJob using jobdetails.jsp url
confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
- jobSubmitter);
+ jobSubmitter);
confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
- mrOwner);
+ mrOwner);
confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
- superGroupMember);
+ superGroupMember);
+ confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
+ qAdmin);
}
/**
@@ -486,47 +562,47 @@
// jobTrackerJSP killJob url
String url = jobTrackerJSP + "&killJobs=true";
// view-job-acl doesn't matter for killJob from jobtracker jsp page
- conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+ conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, " ");
- // Let us start jobs as 4 different users(none of these 4 users is
+ // Let us start 4 jobs as 4 different users(none of these 4 users is
// mrOwner and none of these users is a member of superGroup). So only
- // based on the config JobContext.JOB_ACL_MODIFY_JOB being set here,
- // killJob on each of the jobs will be succeeded.
+ // based on the config MRJobConfig.JOB_ACL_MODIFY_JOB being set here
+ // and the jobSubmitter, killJob on each of the jobs will be succeeded.
// start 1st job.
// Out of these 4 users, only jobSubmitter can do killJob on 1st job
- conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, "");
+ conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, " ");
Job job1 = startSleepJobAsUser(jobSubmitter, conf);
org.apache.hadoop.mapreduce.JobID jobid = job1.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
url = url.concat("&jobCheckBox=" + jobid.toString());
// start 2nd job.
- // Out of these 4 users, only viewColleague can do killJob on 2nd job
- Job job2 = startSleepJobAsUser(viewColleague, conf);
+ // Out of these 4 users, only jobSubmitter1 can do killJob on 2nd job
+ Job job2 = startSleepJobAsUser(jobSubmitter1, conf);
jobid = job2.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
url = url.concat("&jobCheckBox=" + jobid.toString());
// start 3rd job.
- // Out of these 4 users, only modifyColleague can do killJob on 3rd job
- Job job3 = startSleepJobAsUser(modifyColleague, conf);
+ // Out of these 4 users, only jobSubmitter2 can do killJob on 3rd job
+ Job job3 = startSleepJobAsUser(jobSubmitter2, conf);
jobid = job3.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
url = url.concat("&jobCheckBox=" + jobid.toString());
// start 4rd job.
- // Out of these 4 users, viewColleague and viewAndModifyColleague
+ // Out of these 4 users, jobSubmitter1 and jobSubmitter3
// can do killJob on 4th job
- conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, viewColleague);
- Job job4 = startSleepJobAsUser(viewAndModifyColleague, conf);
+ conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, jobSubmitter1);
+ Job job4 = startSleepJobAsUser(jobSubmitter3, conf);
jobid = job4.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
url = url.concat("&jobCheckBox=" + jobid.toString());
try {
- // Try killing all the 4 jobs as user viewColleague who can kill only
+ // Try killing all the 4 jobs as user jobSubmitter1 who can kill only
// 2nd and 4th jobs. Check if 1st and 3rd jobs are not killed and
// 2nd and 4th jobs got killed
assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
- getHttpStatusCode(url, viewColleague, "POST"));
+ getHttpStatusCode(url, jobSubmitter1, "POST"));
assertFalse("killJob succeeded for a job for which user doesnot "
+ " have job-modify permission", job1.isComplete());
assertFalse("killJob succeeded for a job for which user doesnot "
@@ -557,12 +633,12 @@
Properties props = new Properties();
props.setProperty("hadoop.http.filter.initializers",
DummyFilterInitializer.class.getName());
- props.setProperty(
- MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, String.valueOf(true));
+ props.setProperty(MRConfig.MR_ACLS_ENABLED, String.valueOf(true));
+
props.setProperty("dfs.permissions.enabled", "false");
props.setProperty(JSPUtil.PRIVATE_ACTIONS_KEY, "true");
- props.setProperty("mapreduce.job.committer.setup.cleanup.needed", "false");
+ props.setProperty(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
props.setProperty(MRConfig.MR_SUPERGROUP, "superGroup");
MyGroupsProvider.mapping.put(jobSubmitter, Arrays.asList("group1"));
@@ -571,12 +647,22 @@
MyGroupsProvider.mapping.put(unauthorizedUser, Arrays.asList("evilSociety"));
MyGroupsProvider.mapping.put(superGroupMember, Arrays.asList("superGroup"));
MyGroupsProvider.mapping.put(viewAndModifyColleague, Arrays.asList("group3"));
+ MyGroupsProvider.mapping.put(qAdmin, Arrays.asList("group4"));
mrOwner = UserGroupInformation.getCurrentUser().getShortUserName();
MyGroupsProvider.mapping.put(mrOwner, Arrays.asList(
- new String[] { "group4", "group5" }));
+ new String[] { "group5", "group6" }));
+
+ MyGroupsProvider.mapping.put(jobSubmitter1, Arrays.asList("group7"));
+ MyGroupsProvider.mapping.put(jobSubmitter2, Arrays.asList("group7"));
+ MyGroupsProvider.mapping.put(jobSubmitter3, Arrays.asList("group7"));
- startCluster(true, props);
+ String[] queueNames = {"default"};
+ String[] submitAclStrings = {jobSubmitter + "," + jobSubmitter1 + ","
+ + jobSubmitter2 + "," + jobSubmitter3};
+ String[] adminsAclStrings = new String[]{qAdmin};
+ startCluster(props, queueNames, submitAclStrings, adminsAclStrings);
+
MiniMRCluster cluster = getMRCluster();
int infoPort = cluster.getJobTrackerRunner().getJobTrackerInfoPort();
@@ -630,8 +716,8 @@
viewAndModifyColleague);
confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, jobSubmitter);
confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, mrOwner);
- confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL,
- superGroupMember);
+ confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, superGroupMember);
+ confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, qAdmin);
// validate killing of multiple jobs using jobtracker jsp and check
// if all the jobs which can be killed by user are actually the ones that
@@ -676,7 +762,7 @@
"&changeJobPriority=true&setJobPriority="+"HIGH"+"&jobCheckBox=" +
jobid.toString();
validateModifyJob(jobTrackerJSPSetJobPriorityAction, "GET");
- // jobSubmitter, mrOwner and superGroupMember are not validated for
+ // jobSubmitter, mrOwner, qAdmin and superGroupMember are not validated for
// job-modify permission in validateModifyJob(). So let us do it
// explicitly here
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
@@ -684,6 +770,8 @@
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
jobTrackerJSPSetJobPriorityAction, superGroupMember, "GET"));
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
+ jobTrackerJSPSetJobPriorityAction, qAdmin, "GET"));
+ assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
jobTrackerJSPSetJobPriorityAction, mrOwner, "GET"));
}
@@ -759,6 +847,8 @@
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, mrOwner, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
+ getHttpStatusCode(jobTrackerJSP, qAdmin, "GET"));
+ assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, superGroupMember, "GET"));
}
}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java b/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
index 27ea213..de2fcc4 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
@@ -26,11 +26,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Operation;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Before;
@@ -56,23 +57,33 @@
TestJobACLs.class.getCanonicalName() + Path.SEPARATOR
+ "completed-job-store");
+ private String jobSubmitter = "jobSubmitter";
+ private String viewColleague = "viewColleague";
+ private String modifyColleague = "modifyColleague";
+ private String qAdmin = "qAdmin";
+
/**
* Start the cluster before running the actual test.
*
* @throws IOException
*/
@Before
- public void setup() throws IOException {
+ public void setup() throws Exception {
// Start the cluster
startCluster(false);
}
- private void startCluster(boolean reStart) throws IOException {
- UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
+ private void startCluster(boolean reStart) throws Exception {
+
+ // Configure job queues
+ String[] queueNames = {"default"};
+ createQueuesConfigFile(queueNames,
+ new String[] { jobSubmitter }, new String[] { qAdmin });
+
JobConf conf = new JobConf();
- // Enable job-level authorization
- conf.setBoolean(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+ // Enable queue and job level authorization
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
// Enable CompletedJobStore
FileSystem fs = FileSystem.getLocal(conf);
@@ -84,6 +95,7 @@
conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
conf.set(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
+ UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, MR_UGI, conf);
}
@@ -92,6 +104,7 @@
*/
@After
public void tearDown() {
+ deleteQueuesConfigFile();
if (mr != null) {
mr.shutdown();
}
@@ -106,10 +119,10 @@
* @throws ClassNotFoundException
*/
@Test
- public void testACLS() throws IOException, InterruptedException,
- ClassNotFoundException {
+ public void testACLS() throws Exception {
verifyACLViewJob();
- verifyACLModifyJob();
+ verifyACLModifyJob(modifyColleague);
+ verifyACLModifyJob(qAdmin);
verifyACLPersistence();
}
@@ -123,18 +136,21 @@
// Set the job up.
final Configuration myConf = mr.createJobConf();
- myConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "user1,user3");
+ myConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, viewColleague);
// Submit the job as user1
- Job job = submitJobAsUser(myConf, "user1");
+ Job job = submitJobAsUser(myConf, jobSubmitter);
final JobID jobId = job.getJobID();
// Try operations as an unauthorized user.
- verifyViewJobAsUnauthorizedUser(myConf, jobId, "user2");
+ verifyViewJobAsUnauthorizedUser(myConf, jobId, modifyColleague);
- // Try operations as an authorized user.
- verifyViewJobAsAuthorizedUser(myConf, jobId, "user3");
+ // Try operations as an authorized user, who is part of view-job-acl.
+ verifyViewJobAsAuthorizedUser(myConf, jobId, viewColleague);
+
+ // Try operations as an authorized user, who is a queue administrator.
+ verifyViewJobAsAuthorizedUser(myConf, jobId, qAdmin);
// Clean up the job
job.killJob();
@@ -242,7 +258,7 @@
fail("AccessControlException expected..");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains(
- JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.VIEW_JOB));
+ " cannot perform operation " + JobACL.VIEW_JOB));
} catch (InterruptedException e) {
fail("Exception .. interrupted.." + e);
}
@@ -253,7 +269,7 @@
fail("AccessControlException expected..");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains(
- JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.VIEW_JOB));
+ " cannot perform operation " + JobACL.VIEW_JOB));
} catch (InterruptedException e) {
fail("Exception .. interrupted.." + e);
}
@@ -264,29 +280,29 @@
}
/**
- * Verify JobContext.Job_ACL_MODIFY_JOB
+ * Verify MRConfig.Job_ACL_MODIFY_JOB
*
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
- private void verifyACLModifyJob() throws IOException,
+ private void verifyACLModifyJob(String authorizedUser) throws IOException,
InterruptedException, ClassNotFoundException {
// Set the job up.
final Configuration myConf = mr.createJobConf();
- myConf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, "user1,user3");
+ myConf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, modifyColleague);
// Submit the job as user1
- Job job = submitJobAsUser(myConf, "user1");
+ Job job = submitJobAsUser(myConf, jobSubmitter);
final JobID jobId = job.getJobID();
// Try operations as an unauthorized user.
- verifyModifyJobAsUnauthorizedUser(myConf, jobId, "user2");
+ verifyModifyJobAsUnauthorizedUser(myConf, jobId, viewColleague);
// Try operations as an authorized user.
- verifyModifyJobAsAuthorizedUser(myConf, jobId, "user3");
+ verifyModifyJobAsAuthorizedUser(myConf, jobId, authorizedUser);
}
private void verifyModifyJobAsAuthorizedUser(
@@ -357,7 +373,7 @@
fail("AccessControlException expected..");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains(
- JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.MODIFY_JOB));
+ " cannot perform operation " + Operation.KILL_JOB));
} catch (InterruptedException e) {
fail("Exception .. interrupted.." + e);
}
@@ -368,7 +384,7 @@
fail("AccessControlException expected..");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains(
- JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.MODIFY_JOB));
+ " cannot perform operation " + Operation.SET_JOB_PRIORITY));
} catch (InterruptedException e) {
fail("Exception .. interrupted.." + e);
}
@@ -378,15 +394,14 @@
});
}
- private void verifyACLPersistence() throws IOException,
- InterruptedException {
+ private void verifyACLPersistence() throws Exception {
// Set the job up.
final Configuration myConf = mr.createJobConf();
- myConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "user2 group2");
+ myConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, viewColleague + " group2");
// Submit the job as user1
- Job job = submitJobAsUser(myConf, "user1");
+ Job job = submitJobAsUser(myConf, jobSubmitter);
final JobID jobId = job.getJobID();
@@ -406,11 +421,14 @@
final Configuration myNewJobConf = mr.createJobConf();
// Now verify view-job works off CompletedJobStore
- verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, "user2");
+ verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, viewColleague);
+ verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, qAdmin);
// Only JobCounters is persisted on the JobStore. So test counters only.
UserGroupInformation unauthorizedUGI =
- UserGroupInformation.createUserForTesting("user3", new String[] {});
+ UserGroupInformation.createUserForTesting(
+ modifyColleague, new String[] {});
+
unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
@SuppressWarnings("null")
@Override
@@ -432,7 +450,7 @@
fail("AccessControlException expected..");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains(
- JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.VIEW_JOB));
+ " cannot perform operation " + Operation.VIEW_JOB_COUNTERS));
} catch (InterruptedException e) {
fail("Exception .. interrupted.." + e);
}
diff --git a/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java b/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
index 5466f69..15d4ce1 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
@@ -21,10 +21,7 @@
import java.io.IOException;
import java.io.InputStream;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaParseException;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
-import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
diff --git a/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java b/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
index 7ca0891..dd2649e 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.tools.rumen;
-import java.text.ParseException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -72,7 +71,11 @@
String submitTime = line.get("SUBMIT_TIME");
String jobConf = line.get("JOBCONF");
String user = line.get("USER");
+ if (user == null) {
+ user = "nulluser";
+ }
String jobName = line.get("JOBNAME");
+ String jobQueueName = line.get("JOB_QUEUE");// could be null
if (submitTime != null) {
Job20LineHistoryEventEmitter that =
@@ -82,8 +85,8 @@
Map<JobACL, AccessControlList> jobACLs =
new HashMap<JobACL, AccessControlList>();
- return new JobSubmittedEvent(jobID, jobName, user == null ? "nulluser"
- : user, that.originalSubmitTime, jobConf, jobACLs);
+ return new JobSubmittedEvent(jobID, jobName, user,
+ that.originalSubmitTime, jobConf, jobACLs, jobQueueName);
}
return null;
diff --git a/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java b/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
index 59c7dec..ca77dda 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
@@ -544,6 +544,8 @@
result.setJobName(event.getJobName());
result.setUser(event.getUserName());
result.setSubmitTime(event.getSubmitTime());
+ // job queue name is set when conf file is processed.
+ // See JobBuilder.process(Properties) method for details.
}
private void processJobStatusChangedEvent(JobStatusChangedEvent event) {