MAPREDUCE-1664. Job Acls affect Queue Acls.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@998003 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index e4392fa..d28a041 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@
     MAPREDUCE-1866. Removes deprecated class
     org.apache.hadoop.streaming.UTF8ByteArrayUtils. (amareshwari)
 
+    MAPREDUCE-1664. Changes the behaviour of the combination of job-acls
+    when they function together with queue-acls. (Ravi Gummadi via vinodkv)
+
   NEW FEATURES
 
     MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.
diff --git a/conf/mapred-queues.xml.template b/conf/mapred-queues.xml.template
index 252f5af..ce6cd20 100644
--- a/conf/mapred-queues.xml.template
+++ b/conf/mapred-queues.xml.template
@@ -18,10 +18,10 @@
 <!-- This is the template for queue configuration. The format supports nesting of
      queues within queues - a feature called hierarchical queues. All queues are
      defined within the 'queues' tag which is the top level element for this
-     XML document.
-     The 'aclsEnabled' attribute should be set to true, if ACLs should be checked
-     on queue operations such as submitting jobs, killing jobs etc. -->
-<queues aclsEnabled="false">
+     XML document. The queue acls configured here for different queues are
+     checked for authorization only if the configuration property
+     mapreduce.cluster.acls.enabled is set to true. -->
+<queues>
 
   <!-- Configuration for a queue is specified by defining a 'queue' element. -->
   <queue>
@@ -40,17 +40,37 @@
 
     <!-- Specifies the ACLs to check for submitting jobs to this queue.
          If set to '*', it allows all users to submit jobs to the queue.
+         If set to ' '(i.e. space), no user will be allowed to do this
+         operation. The default value for any queue acl is ' '.
          For specifying a list of users and groups the format to use is
-         user1,user2 group1,group2 -->
-    <acl-submit-job>*</acl-submit-job>
+         user1,user2 group1,group2
 
-    <!-- Specifies the ACLs to check for modifying jobs in this queue.
-         Modifications include killing jobs, tasks of jobs or changing
+         It is only used if authorization is enabled in Map/Reduce by setting
+         the configuration property mapreduce.cluster.acls.enabled to true.
+
+         Irrespective of this ACL configuration, the user who started the
+         cluster and cluster administrators configured via
+         mapreduce.cluster.administrators can do this operation. -->
+    <acl-submit-job> </acl-submit-job>
+
+    <!-- Specifies the ACLs to check for viewing and modifying jobs in this
+         queue. Modifications include killing jobs, tasks of jobs or changing
          priorities.
-         If set to '*', it allows all users to submit jobs to the queue.
+         If set to '*', it allows all users to view, modify jobs of the queue.
+         If set to ' '(i.e. space), no user will be allowed to do this
+         operation.
          For specifying a list of users and groups the format to use is
-         user1,user2 group1,group2 -->
-    <acl-administer-jobs>*</acl-administer-jobs>
+         user1,user2 group1,group2
+
+         It is only used if authorization is enabled in Map/Reduce by setting
+         the configuration property mapreduce.cluster.acls.enabled to true.
+
+         Irrespective of this ACL configuration, the user who started the
+         cluster and cluster administrators configured via
+         mapreduce.cluster.administrators can do the above operations on all
+         the jobs in all the queues. The job owner can do all the above
+         operations on his/her job irrespective of this ACL configuration. -->
+    <acl-administer-jobs> </acl-administer-jobs>
   </queue>
 
   <!-- Here is a sample of a hierarchical queue configuration
diff --git a/src/c++/task-controller/task-controller.c b/src/c++/task-controller/task-controller.c
index 117f0a7..eb0cbaa 100644
--- a/src/c++/task-controller/task-controller.c
+++ b/src/c++/task-controller/task-controller.c
@@ -227,6 +227,14 @@
 }
 
 /**
+ * Get the job ACLs file for the given job log dir.
+ */
+char *get_job_acls_file(const char *log_dir) {
+  return concatenate(JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN, "job_acls_file",
+                     1, log_dir);
+}
+
+/**
  * Function to check if the passed tt_root is present in mapreduce.cluster.local.dir
  * the task-controller is configured with.
  */
@@ -517,12 +525,20 @@
 }
 
 /**
- * Function to prepare the job log dir for the child. It gives the user
- * ownership of the job's log-dir to the user and group ownership to the
- * user running tasktracker.
- *     *  sudo chown user:mapred log-dir/userlogs/$jobid
- *     *  sudo chmod -R 2770 log-dir/userlogs/$jobid // user is same as tt_user
- *     *  sudo chmod -R 2570 log-dir/userlogs/$jobid // user is not tt_user
+ * Function to prepare the job log dir(and job acls file in it) for the child.
+ * It gives the user ownership of the job's log-dir to the user and
+ * group ownership to the user running tasktracker(i.e. tt_user).
+ *
+ *   *  sudo chown user:mapred log-dir/userlogs/$jobid
+ *   *    if user is not $tt_user,
+ *   *      sudo chmod 2570 log-dir/userlogs/$jobid
+ *   *    else
+ *   *      sudo chmod 2770 log-dir/userlogs/$jobid
+ *   *  sudo chown user:mapred log-dir/userlogs/$jobid/job-acls.xml
+ *   *    if user is not $tt_user,
+ *   *      sudo chmod 2570 log-dir/userlogs/$jobid/job-acls.xml
+ *   *    else
+ *   *      sudo chmod 2770 log-dir/userlogs/$jobid/job-acls.xml
  */
 int prepare_job_logs(const char *log_dir, const char *job_id,
     mode_t permissions) {
@@ -559,6 +575,42 @@
     free(job_log_dir);
     return -1;
   }
+
+  //set ownership and permissions for job_log_dir/job-acls.xml, if exists.
+  char *job_acls_file = get_job_acls_file(job_log_dir);
+  if (job_acls_file == NULL) {
+    fprintf(LOGFILE, "Couldn't get job acls file %s.\n", job_acls_file);
+    free(job_log_dir);
+    return -1;
+  }
+
+  struct stat filestat1;
+  if (stat(job_acls_file, &filestat1) != 0) {
+    if (errno == ENOENT) {
+#ifdef DEBUG
+      fprintf(LOGFILE, "job_acls_file %s doesn't exist. Not doing anything.\n",
+          job_acls_file);
+#endif
+      free(job_acls_file);
+      free(job_log_dir);
+      return 0;
+    } else {
+      // stat failed because of something else!
+      fprintf(LOGFILE, "Failed to stat the job_acls_file %s\n", job_acls_file);
+      free(job_acls_file);
+      free(job_log_dir);
+      return -1;
+    }
+  }
+
+  if (secure_single_path(job_acls_file, user_detail->pw_uid, tasktracker_gid,
+      permissions, 1) != 0) {
+    fprintf(LOGFILE, "Failed to secure the job acls file %s\n", job_acls_file);
+    free(job_acls_file);
+    free(job_log_dir);
+    return -1;
+  }
+  free(job_acls_file);
   free(job_log_dir);
   return 0;
 }
diff --git a/src/c++/task-controller/task-controller.h b/src/c++/task-controller/task-controller.h
index 662b9ab..55d1221 100644
--- a/src/c++/task-controller/task-controller.h
+++ b/src/c++/task-controller/task-controller.h
@@ -90,6 +90,8 @@
 
 #define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
 
+#define JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN "%s/job-acls.xml"
+
 #define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
 
 #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
diff --git a/src/c++/task-controller/tests/test-task-controller.c b/src/c++/task-controller/tests/test-task-controller.c
index 23a5067..d6c2531 100644
--- a/src/c++/task-controller/tests/test-task-controller.c
+++ b/src/c++/task-controller/tests/test-task-controller.c
@@ -183,6 +183,19 @@
   assert(ret == 0);
 }
 
+void test_get_job_acls_file() {
+  char *job_acls_file = (char *) get_job_acls_file(
+    "/tmp/testing/userlogs/job_200906101234_0001");
+  printf("job acls file obtained is %s\n", job_acls_file);
+  int ret = 0;
+  if (strcmp(job_acls_file,
+    "/tmp/testing/userlogs/job_200906101234_0001/job-acls.xml") != 0) {
+    ret = -1;
+  }
+  free(job_acls_file);
+  assert(ret == 0);
+}
+
 void test_get_task_log_dir() {
   char *logdir = (char *) get_task_log_dir("/tmp/testing",
     "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
@@ -219,6 +232,9 @@
   printf("\nTesting get_job_log_dir()\n");
   test_get_job_log_dir();
 
+  printf("\nTesting get_job_acls_file()\n");
+  test_get_job_acls_file();
+
   printf("\nTesting get_task_log_dir()\n");
   test_get_task_log_dir();
 
diff --git a/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java b/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
index 99a5213..cf77cb7 100644
--- a/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
+++ b/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.QueueState;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -584,10 +585,8 @@
       for (String queueName : queueNames) {
         HashMap<String, AccessControlList> aclsMap
           = new HashMap<String, AccessControlList>();
-        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-          String key = QueueManager.toFullPropertyName(
-            queueName,
-            oper.getAclName());
+        for (QueueACL qAcl : QueueACL.values()) {
+          String key = toFullPropertyName(queueName, qAcl.getAclName());
           aclsMap.put(key, allEnabledAcl);
         }
         queues[i++] = new Queue(queueName, aclsMap, QueueState.RUNNING);
diff --git a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
index 5f25800..b6f0c4b 100644
--- a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
+++ b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskReport;
@@ -410,6 +411,11 @@
   }
 
   @Override
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException,
       InterruptedException {
     throw new UnsupportedOperationException();
diff --git a/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml b/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
index 8d83877..33d73b6 100644
--- a/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
+++ b/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
@@ -1762,12 +1762,14 @@
         
         <section>
           <title>Job Authorization</title>
-          <p>Job level authorization is enabled on the cluster, if the configuration
-          <code>mapreduce.cluster.job-authorization-enabled</code> is set to
-          true. When enabled, access control checks are done by the JobTracker
-          and the TaskTracker before allowing users to view
-          job details or to modify a job using Map/Reduce APIs,
-          CLI or web user interfaces.</p>
+          <p>Job level authorization and queue level authorization are enabled
+          on the cluster, if the configuration
+          <code>mapreduce.cluster.acls.enabled</code> is set to
+          true. When enabled, access control checks are done by (a) the
+          JobTracker before allowing users to submit jobs to queues and
+          administering these jobs and (b) by the JobTracker and the TaskTracker
+          before allowing users to view job details or to modify a job using
+          MapReduce APIs, CLI or web user interfaces.</p>
          
           <p>A job submitter can specify access control lists for viewing or
           modifying a job via the configuration properties
@@ -1775,11 +1777,13 @@
           <code>mapreduce.job.acl-modify-job</code> respectively. By default, 
           nobody is given access in these properties.</p> 
           
-          <p>However, irrespective of the ACLs configured, a job's owner,
-          the superuser and the members of an admin configured supergroup
-          (<code>mapreduce.cluster.permissions.supergroup</code>) always
-          have access to view and modify a job.</p>
-          
+          <p>However, irrespective of the job ACLs configured, a job's owner,
+          the user who started the cluster and members of an admin configured
+          supergroup (<code>mapreduce.cluster.permissions.supergroup</code>)
+          and queue administrators of the queue to which the job was submitted
+          to (<code>acl-administer-jobs</code>) always have access to view and
+          modify a job.</p>
+
           <p> A job view ACL authorizes users against the configured 
           <code>mapreduce.job.acl-view-job</code> before returning possibly 
           sensitive information about a job, like: </p>
@@ -1801,10 +1805,13 @@
             <li> killing/failing a task of a job </li>
             <li> setting the priority of a job </li>
           </ul>
-          <p>These operations are also protected by the queue level ACL,
-          "acl-administer-jobs", configured via mapred-queue-acls.xml. The caller
-          will be authorized against both queue level ACLs and job level ACLs,
-          depending on what is enabled.</p>
+          <p>These view and modify operations on jobs are also permitted by
+          the queue level ACL, "acl-administer-jobs", configured via
+          mapred-queue-acls.xml. The caller will be able to do the operation
+          if he/she is part of either queue admins ACL or job modification ACL
+          or the user who started the cluster or a member of an admin configured
+          supergroup (<code>mapreduce.cluster.permissions.supergroup</code>).
+          </p>
           
           <p>The format of a job level ACL is the same as the format for a
           queue level ACL as defined in the
diff --git a/src/java/mapred-default.xml b/src/java/mapred-default.xml
index 2db49b5..86eb85f 100644
--- a/src/java/mapred-default.xml
+++ b/src/java/mapred-default.xml
@@ -923,7 +923,7 @@
   <name>mapreduce.job.queuename</name>
   <value>default</value>
   <description> Queue to which a job is submitted. This must match one of the
-    queues defined in mapred.queue.names for the system. Also, the ACL setup
+    queues defined in mapred-queues.xml for the system. Also, the ACL setup
     for the queue must allow the current user to submit a job to the queue.
     Before specifying a queue, ensure that the system is configured with 
     the queue, and access is allowed for submitting jobs to the queue.
@@ -931,69 +931,85 @@
 </property>
 
 <property>
-  <name>mapreduce.cluster.job-authorization-enabled</name>
+  <name>mapreduce.cluster.acls.enabled</name>
   <value>false</value>
-  <description> Boolean flag that specifies if job-level authorization checks
-  should be enabled on the jobs submitted to the cluster.  Job-level
-  authorization is enabled if this flag is set to true or disabled otherwise.
-  It is disabled by default. If enabled, access control checks are made by
-  JobTracker and TaskTracker when requests are made by users for viewing the
-  job-details (See mapreduce.job.acl-view-job) or for modifying the job
-  (See mapreduce.job.acl-modify-job) using Map/Reduce APIs, RPCs or via the
-  console and web user interfaces.
+  <description> Specifies whether ACLs should be checked
+    for authorization of users for doing various queue and job level operations.
+    ACLs are disabled by default. If enabled, access control checks are made by
+    JobTracker and TaskTracker when requests are made by users for queue
+    operations like submit job to a queue and kill a job in the queue and job
+    operations like viewing the job-details (See mapreduce.job.acl-view-job)
+    or for modifying the job (See mapreduce.job.acl-modify-job) using
+    Map/Reduce APIs, RPCs or via the console and web user interfaces.
+    For enabling this flag (mapreduce.cluster.acls.enabled), this is to be set
+    to true in mapred-site.xml on JobTracker node and on all TaskTracker nodes.
   </description>
 </property>
 
 <property>
   <name>mapreduce.job.acl-modify-job</name>
-  <value></value>
+  <value> </value>
   <description> Job specific access-control list for 'modifying' the job. It
     is only used if authorization is enabled in Map/Reduce by setting the
-    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    configuration property mapreduce.cluster.acls.enabled to true.
     This specifies the list of users and/or groups who can do modification
     operations on the job. For specifying a list of users and groups the
     format to use is "user1,user2 group1,group". If set to '*', it allows all
-    users/groups to modify this job. If set to '', it allows none. This
-    configuration is used to guard all the modifications with respect to this
-    job and takes care of all the following operations:
+    users/groups to modify this job. If set to ' '(i.e. space), it allows
+    none. This configuration is used to guard all the modifications with respect
+    to this job and takes care of all the following operations:
       o killing this job
       o killing a task of this job, failing a task of this job
       o setting the priority of this job
     Each of these operations are also protected by the per-queue level ACL
     "acl-administer-jobs" configured via mapred-queues.xml. So a caller should
-    have the authorization to satisfy both the queue-level ACL and the
+    have the authorization to satisfy either the queue-level ACL or the
     job-level ACL.
 
-    Irrespective of this ACL configuration, job-owner, superuser and members
-    of supergroup configured on JobTracker via 
-    "mapreduce.cluster.permissions.supergroup",
-    can do all the modification operations.
+    Irrespective of this ACL configuration, (a) job-owner, (b) the user who
+    started the cluster, (c) members of an admin configured supergroup
+    configured via mapreduce.cluster.permissions.supergroup and (d) queue
+    administrators of the queue to which this job was submitted to configured
+    via acl-administer-jobs for the specific queue in mapred-queues.xml can
+    do all the modification operations on a job.
 
-    By default, nobody else besides job-owner, superuser/supergroup can
-    perform modification operations on a job that they don't own.
+    By default, nobody else besides job-owner, the user who started the cluster,
+    members of supergroup and queue administrators can perform modification
+    operations on a job.
   </description>
 </property>
 
 <property>
   <name>mapreduce.job.acl-view-job</name>
-  <value></value>
+  <value> </value>
   <description> Job specific access-control list for 'viewing' the job. It is
     only used if authorization is enabled in Map/Reduce by setting the
-    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    configuration property mapreduce.cluster.acls.enabled to true.
     This specifies the list of users and/or groups who can view private details
     about the job. For specifying a list of users and groups the
     format to use is "user1,user2 group1,group". If set to '*', it allows all
-    users/groups to modify this job. If set to '', it allows none. This
-    configuration is used to guard some of the job-views and at present only
-    protects APIs that can return possibly sensitive information of the
-    job-owner like
+    users/groups to view this job. If set to ' '(i.e. space), it allows
+    none. This configuration is used to guard some of the job-views and at
+    present only protects APIs that can return possibly sensitive information
+    of the job-owner like
       o job-level counters
       o task-level counters
       o tasks' diagnostic information
       o task-logs displayed on the TaskTracker web-UI and
       o job.xml showed by the JobTracker's web-UI
-    Every other piece information of jobs is still accessible by any other
-    users, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+    Every other piece of information of jobs is still accessible by any other
+    user, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+
+    Irrespective of this ACL configuration, (a) job-owner, (b) the user who
+    started the cluster, (c) members of an admin configured supergroup
+    configured via mapreduce.cluster.permissions.supergroup and (d) queue
+    administrators of the queue to which this job was submitted to configured
+    via acl-administer-jobs for the specific queue in mapred-queues.xml can
+    do all the view operations on a job.
+
+    By default, nobody else besides job-owner, the user who started the
+    cluster, members of supergroup and queue administrators can perform
+    view operations on a job.
   </description>
 </property>
 
diff --git a/src/java/mapred-queues-default.xml b/src/java/mapred-queues-default.xml
index 4a33f36..f222cd7 100644
--- a/src/java/mapred-queues-default.xml
+++ b/src/java/mapred-queues-default.xml
@@ -17,13 +17,13 @@
 -->
 <!-- This is the default mapred-queues.xml file that is loaded in the case
      that the user does not have such a file on their classpath. -->
-<queues aclsEnabled="false">
+<queues>
   <queue>
     <name>default</name>
     <properties>
     </properties>
     <state>running</state>
-    <acl-submit-job>*</acl-submit-job>
-    <acl-administer-jobs>*</acl-administer-jobs>
+    <acl-submit-job> </acl-submit-job>
+    <acl-administer-jobs> </acl-administer-jobs>
   </queue>
 </queues>
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/mapred/ACLsManager.java b/src/java/org/apache/hadoop/mapred/ACLsManager.java
new file mode 100644
index 0000000..21a5052
--- /dev/null
+++ b/src/java/org/apache/hadoop/mapred/ACLsManager.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.AuditLogger.Constants;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Manages MapReduce cluster administrators and access checks for
+ * job level operations and queue level operations.
+ * Uses JobACLsManager for access checks of job level operations and
+ * QueueManager for queue operations.
+ */
+@InterfaceAudience.Private
+class ACLsManager {
+
+  // MROwner(user who started this mapreduce cluster)'s ugi
+  private final UserGroupInformation mrOwner;
+  // members of supergroup are mapreduce cluster administrators
+  private final String superGroup;
+  
+  private final JobACLsManager jobACLsManager;
+  private final QueueManager queueManager;
+  
+  private final boolean aclsEnabled;
+
+  ACLsManager(Configuration conf, JobACLsManager jobACLsManager,
+      QueueManager queueManager) throws IOException {
+
+    mrOwner = UserGroupInformation.getCurrentUser();
+    superGroup = conf.get(MRConfig.MR_SUPERGROUP, "supergroup");
+    
+    aclsEnabled = conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+
+    this.jobACLsManager = jobACLsManager;
+    this.queueManager = queueManager;
+  }
+
+  UserGroupInformation getMROwner() {
+    return mrOwner;
+  }
+
+  String getSuperGroup() {
+    return superGroup;
+  }
+
+  JobACLsManager getJobACLsManager() {
+    return jobACLsManager;
+  }
+
+  /**
+   * Is the calling user an admin for the mapreduce cluster ?
+   * i.e. either cluster owner or member of the supergroup
+   *      mapreduce.cluster.permissions.supergroup.
+   * @return true, if user is an admin
+   */
+  boolean isMRAdmin(UserGroupInformation callerUGI) {
+    if (mrOwner.getShortUserName().equals(callerUGI.getShortUserName())) {
+      return true;
+    }
+    String[] groups = callerUGI.getGroupNames();
+    for(int i=0; i < groups.length; ++i) {
+      if (groups[i].equals(superGroup)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>Otherwise, if the operation is not a job operation(for eg.
+   *  submit-job-to-queue), then allow only (a) clusterOwner(who started the
+   *  cluster), (b) cluster administrators and (c) members of
+   *  queue-submit-job-acl for the queue.</li>
+   * <li>If the operation is a job operation, then allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) cluster administrators,
+   * (d) members of queue admins acl for the queue and (e) members of job
+   * acl for the job operation</li>
+   * </ul>
+   * 
+   * @param job   the job on which operation is requested
+   * @param callerUGI  the user who is requesting the operation
+   * @param operation  the operation for which authorization is needed
+   * @throws AccessControlException
+   */
+   void checkAccess(JobInProgress job, UserGroupInformation callerUGI,
+       Operation operation) throws AccessControlException {
+
+    String queue = job.getProfile().getQueueName();
+    String jobId = job.getJobID().toString();
+    JobStatus jobStatus = job.getStatus();
+    String jobOwner = jobStatus.getUsername();
+    AccessControlList jobAcl =
+        jobStatus.getJobACLs().get(operation.jobACLNeeded);
+
+    checkAccess(jobId, callerUGI, queue, operation, jobOwner, jobAcl);
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed job operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>Otherwise, allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) cluster administrators,
+   * (d) members of job acl for the jobOperation</li>
+   * </ul>
+   * 
+   * @param jobStatus  the status of the job
+   * @param callerUGI  the user who is trying to perform the operation
+   * @param queue      the job queue name
+   * @param operation  the operation for which authorization is needed
+   */
+  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
+      String queue, Operation operation) throws AccessControlException {
+
+    String jobId = jobStatus.getJobID().toString();
+    String jobOwner = jobStatus.getUsername();
+    AccessControlList jobAcl =
+      jobStatus.getJobACLs().get(operation.jobACLNeeded);
+
+    // If acls are enabled, check if callerUGI is jobOwner, queue admin,
+    // cluster admin or part of job ACL
+    checkAccess(jobId, callerUGI, queue, operation, jobOwner, jobAcl);
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>Otherwise, if the operation is not a job operation(for eg.
+   *  submit-job-to-queue), then allow only (a) clusterOwner(who started the
+   *  cluster), (b) cluster administrators and (c) members of
+   *  queue-submit-job-acl for the queue.</li>
+   * <li>If the operation is a job operation, then allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) cluster administrators,
+   * (d) members of queue admins acl for the queue and (e) members of job
+   * acl for the job operation</li>
+   * </ul>
+   * 
+   * @param jobId      the job id
+   * @param callerUGI  the user who is trying to perform the operation
+   * @param queue      the job queue name
+   * @param operation  the operation for which authorization is needed
+   * @param jobOwner   the user who submitted(or is submitting) this job
+   * @param jobAcl     could be job-view-acl or job-modify-acl depending on the
+   *                   job operation.
+   */
+  void checkAccess(String jobId, UserGroupInformation callerUGI,
+      String queue, Operation operation, String jobOwner,
+      AccessControlList jobAcl) throws AccessControlException {
+
+    String user = callerUGI.getShortUserName();
+    String targetResource = jobId + " in queue " + queue;
+
+    if (!aclsEnabled) {
+      AuditLogger.logSuccess(user, operation.name(), targetResource);
+      return;
+    }
+
+    // Allow mapreduce cluster admins to do any queue operation and
+    // any job operation
+    if (isMRAdmin(callerUGI)) {
+      AuditLogger.logSuccess(user, operation.name(), targetResource);
+      return;
+    }
+
+    if (operation == Operation.SUBMIT_JOB) {
+      // This is strictly queue operation(not a job operation)
+      if (!queueManager.hasAccess(queue, operation.qACLNeeded, callerUGI)) {
+        AuditLogger.logFailure(user, operation.name(),
+            queueManager.getQueueACL(queue, operation.qACLNeeded).toString(),
+            targetResource, Constants.UNAUTHORIZED_USER);
+
+        throw new AccessControlException("User "
+            + callerUGI.getShortUserName() + " cannot perform "
+            + "operation " + operation.name() + " on queue " + queue
+            + ".\n Please run \"hadoop queue -showacls\" "
+            + "command to find the queues you have access to .");
+      } else {
+        AuditLogger.logSuccess(user, operation.name(), targetResource);
+        return;
+      }
+    }
+
+    // Check if callerUGI is queueAdmin(in some cases only), jobOwner or
+    // part of job-acl.
+
+    // queueManager and queue are null only when called from
+    // TaskTracker(i.e. from TaskLogServlet) for the operation VIEW_TASK_LOGS.
+    // Caller of this method takes care of checking if callerUGI is a
+    // queue administrator for that operation.
+    if (operation == Operation.VIEW_TASK_LOGS) {
+      if (jobACLsManager.checkAccess(callerUGI, operation.jobACLNeeded,
+          jobOwner, jobAcl)) {
+        AuditLogger.logSuccess(user, operation.name(), targetResource);
+        return;
+      }
+    } else if (queueManager.hasAccess(queue, operation.qACLNeeded, callerUGI) ||
+        jobACLsManager.checkAccess(callerUGI, operation.jobACLNeeded,
+            jobOwner, jobAcl)) {
+      AuditLogger.logSuccess(user, operation.name(), targetResource);
+      return;
+    }
+
+    AuditLogger.logFailure(user, operation.name(), jobAcl.toString(),
+        targetResource, Constants.UNAUTHORIZED_USER);
+
+    throw new AccessControlException("User "
+        + callerUGI.getShortUserName() + " cannot perform operation "
+        + operation.name() + " on " + jobId + " that is in the queue "
+        + queue);
+  }
+
+}
diff --git a/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java b/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
index dd34fac..10e4879 100644
--- a/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
+++ b/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.AccessControlException;
@@ -52,7 +51,7 @@
   private FileSystem fs;
   private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
 
-  private JobACLsManager jobACLsManager = null;
+  private ACLsManager aclsManager;
 
   public static final Log LOG =
           LogFactory.getLog(CompletedJobStatusStore.class);
@@ -62,7 +61,8 @@
   final static FsPermission JOB_STATUS_STORE_DIR_PERMISSION = FsPermission
       .createImmutable((short) 0750); // rwxr-x--
 
-  CompletedJobStatusStore(JobACLsManager aclsManager, Configuration conf)
+
+  CompletedJobStatusStore(Configuration conf, ACLsManager aclsManager)
       throws IOException {
     active =
       conf.getBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
@@ -104,7 +104,7 @@
         deleteJobStatusDirs();
       }
 
-      this.jobACLsManager = aclsManager;
+      this.aclsManager = aclsManager;
 
       LOG.info("Completed job store activated/configured with retain-time : " 
                + retainTime + " , job-info-dir : " + jobInfoDir);
@@ -301,7 +301,7 @@
   }
 
   /**
-   * This method retrieves Counters information from DFS stored using
+   * This method retrieves Counters information from file stored using
    * store method.
    *
    * @param jobId the jobId for which Counters is queried
@@ -315,9 +315,13 @@
         FSDataInputStream dataIn = getJobInfoFile(jobId);
         if (dataIn != null) {
           JobStatus jobStatus = readJobStatus(dataIn);
-          jobACLsManager.checkAccess(jobStatus,
-              UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB);
-          readJobProfile(dataIn);
+          JobProfile profile = readJobProfile(dataIn);
+          String queue = profile.getQueueName();
+          // authorize the user for job view access
+          aclsManager.checkAccess(jobStatus,
+              UserGroupInformation.getCurrentUser(), queue,
+              Operation.VIEW_JOB_COUNTERS);
+
           counters = readCounters(dataIn);
           dataIn.close();
         }
diff --git a/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java b/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
index c962884..1f05869 100644
--- a/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
+++ b/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import static org.apache.hadoop.mapred.QueueManager.*;
@@ -46,7 +47,7 @@
       return;
     }
     List<Queue> listq = createQueues(conf);
-    this.setAclsEnabled(conf.getBoolean("mapred.acls.enabled", false));
+    this.setAclsEnabled(conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false));
     root = new Queue();
     root.setName("");
     for (Queue q : listq) {
@@ -78,9 +79,8 @@
    */
   private QueueState getQueueState(String name, Configuration conf) {
     String stateVal = conf.get(
-      QueueManager.toFullPropertyName(
-        name,"state"),
-      QueueState.RUNNING.getStateName());
+        toFullPropertyName(name, "state"),
+        QueueState.RUNNING.getStateName());
     return QueueState.getState(stateVal);
   }
 
@@ -105,21 +105,11 @@
       queues = conf.getStrings(MAPRED_QUEUE_NAMES_KEY);
     }
 
-    // check if the acls flag is defined
-    String aclsEnable = conf.get("mapred.acls.enabled");
-    if (aclsEnable != null) {
-      LOG.warn(
-        "Configuring \"mapred.acls.enabled\" in mapred-site.xml or " +
-          "hadoop-site.xml is deprecated. Configure " +
-          "queue hierarchy in " +
-          QUEUE_CONF_FILE_NAME);
-    }
-
     // check if acls are defined
     if (queues != null) {
       for (String queue : queues) {
-        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-          String key = toFullPropertyName(queue, oper.getAclName());
+        for (QueueACL qAcl : QueueACL.values()) {
+          String key = toFullPropertyName(queue, qAcl.getAclName());
           String aclString = conf.get(key);
           if (aclString != null) {
             LOG.warn(
@@ -149,8 +139,8 @@
     Configuration conf) {
     HashMap<String, AccessControlList> map =
       new HashMap<String, AccessControlList>();
-    for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-      String aclKey = toFullPropertyName(name, oper.getAclName());
+    for (QueueACL qAcl : QueueACL.values()) {
+      String aclKey = toFullPropertyName(name, qAcl.getAclName());
       map.put(
         aclKey, new AccessControlList(
           conf.get(
diff --git a/src/java/org/apache/hadoop/mapred/JSPUtil.java b/src/java/org/apache/hadoop/mapred/JSPUtil.java
index 45730a5..183df63 100644
--- a/src/java/org/apache/hadoop/mapred/JSPUtil.java
+++ b/src/java/org/apache/hadoop/mapred/JSPUtil.java
@@ -95,14 +95,14 @@
    *         and decide if view should be allowed or not. Job will be null if
    *         the job with given jobid doesnot exist at the JobTracker.
    */
-  public static JobWithViewAccessCheck checkAccessAndGetJob(JobTracker jt,
+  public static JobWithViewAccessCheck checkAccessAndGetJob(final JobTracker jt,
       JobID jobid, HttpServletRequest request, HttpServletResponse response)
       throws ServletException, IOException {
     final JobInProgress job = jt.getJob(jobid);
     JobWithViewAccessCheck myJob = new JobWithViewAccessCheck(job);
 
     String user = request.getRemoteUser();
-    if (user != null && job != null && jt.isJobLevelAuthorizationEnabled()) {
+    if (user != null && job != null && jt.areACLsEnabled()) {
       final UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(user);
       try {
@@ -110,7 +110,8 @@
           public Void run() throws IOException, ServletException {
 
             // checks job view permission
-            job.checkAccess(ugi, JobACL.VIEW_JOB);
+            jt.getACLsManager().checkAccess(job, ugi,
+                Operation.VIEW_JOB_DETAILS);
             return null;
           }
         });
@@ -475,10 +476,10 @@
    * Read a job-history log file and construct the corresponding {@link JobInfo}
    * . Also cache the {@link JobInfo} for quick serving further requests.
    * 
-   * @param logFile
-   * @param fs
-   * @param jobTracker
-   * @return JobInfo
+   * @param logFile      the job history log file
+   * @param fs           job tracker file system
+   * @param jobTracker   the job tracker
+   * @return JobInfo     job's basic information
    * @throws IOException
    */
   static JobInfo getJobInfo(Path logFile, FileSystem fs,
@@ -506,20 +507,18 @@
       }
     }
 
-    jobTracker.getJobACLsManager().checkAccess(JobID.forName(jobid),
-        UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB,
-        jobInfo.getUsername(), jobInfo.getJobACLs().get(JobACL.VIEW_JOB));
     return jobInfo;
   }
 
   /**
-   * Check the access for users to view job-history pages.
+   * Check the access for users to view job-history pages and return
+   * {@link JobInfo}.
    * 
-   * @param request
-   * @param response
-   * @param jobTracker
-   * @param fs
-   * @param logFile
+   * @param request     http servlet request
+   * @param response    http servlet response
+   * @param jobTracker  the job tracker
+   * @param fs          job tracker file system
+   * @param logFile     the job history log file
    * @return the job if authorization is disabled or if the authorization checks
    *         pass. Otherwise return null.
    * @throws IOException
@@ -533,19 +532,24 @@
     String jobid =
         JobHistory.getJobIDFromHistoryFilePath(logFile).toString();
     String user = request.getRemoteUser();
-    JobInfo job = null;
+
+    JobInfo jobInfo = JSPUtil.getJobInfo(logFile, fs, jobTracker);
     if (user != null) {
+      // authorize user for job-view access
       try {
         final UserGroupInformation ugi =
             UserGroupInformation.createRemoteUser(user);
-        job =
-            ugi.doAs(new PrivilegedExceptionAction<JobHistoryParser.JobInfo>() {
-              public JobInfo run() throws IOException {
-                // checks job view permission
-                JobInfo jobInfo = JSPUtil.getJobInfo(logFile, fs, jobTracker);
-                return jobInfo;
-              }
-            });
+        
+        AccessControlList viewJobAcl = jobInfo.getJobACLs().get(JobACL.VIEW_JOB);
+        if (viewJobAcl == null) {
+          // may be older job history file of earlier unsecure cluster
+          viewJobAcl = new AccessControlList("*");
+        }
+
+        jobTracker.getACLsManager().checkAccess(jobid, ugi,
+            jobInfo.getJobQueueName(), Operation.VIEW_JOB_DETAILS,
+            jobInfo.getUsername(), viewJobAcl);
+
       } catch (AccessControlException e) {
         String errMsg =
             String.format(
@@ -557,11 +561,9 @@
         JSPUtil.setErrorAndForward(errMsg, request, response);
         return null;
       }
-    } else {
-      // no authorization needed
-      job = JSPUtil.getJobInfo(logFile, fs, jobTracker);
-    }
-    return job;
+    } // else { no authorization needed }
+
+    return jobInfo;
   }
 
   /**
@@ -574,7 +576,7 @@
   static void printJobACLs(JobTracker tracker,
       Map<JobACL, AccessControlList> jobAcls, JspWriter out)
       throws IOException {
-    if (tracker.isJobLevelAuthorizationEnabled()) {
+    if (tracker.areACLsEnabled()) {
       // Display job-view-acls and job-modify-acls configured for this job
       out.print("<b>Job-ACLs:</b><br>");
       for (JobACL aclName : JobACL.values()) {
@@ -587,5 +589,9 @@
         }
       }
     }
+    else {
+      out.print("<b>Job-ACLs: " + new AccessControlList("*").toString()
+          + "</b><br>");
+    }
   }
 }
diff --git a/src/java/org/apache/hadoop/mapred/JobACLsManager.java b/src/java/org/apache/hadoop/mapred/JobACLsManager.java
index 48dfe43..14265e4 100644
--- a/src/java/org/apache/hadoop/mapred/JobACLsManager.java
+++ b/src/java/org/apache/hadoop/mapred/JobACLsManager.java
@@ -20,26 +20,25 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.mapred.AuditLogger.Constants;
 import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 
 @InterfaceAudience.Private
-public abstract class JobACLsManager {
+class JobACLsManager {
 
-  static final Log LOG = LogFactory.getLog(JobACLsManager.class);
+  JobConf conf;
 
-  public static final String UNAUTHORIZED_JOB_ACCESS_ERROR =
-      " is not authorized for performing the operation ";
-  protected abstract boolean isJobLevelAuthorizationEnabled();
+  public JobACLsManager(JobConf conf) {
+    this.conf = conf;
+  }
 
-  protected abstract boolean isSuperUserOrSuperGroup(
-      UserGroupInformation callerUGI);
+  boolean areACLsEnabled() {
+    return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+  }
 
   /**
    * Construct the jobACLs from the configuration so that they can be kept in
@@ -54,7 +53,7 @@
         new HashMap<JobACL, AccessControlList>();
 
     // Don't construct anything if authorization is disabled.
-    if (!isJobLevelAuthorizationEnabled()) {
+    if (!areACLsEnabled()) {
       return acls;
     }
 
@@ -64,7 +63,7 @@
       if (aclConfigured == null) {
         // If ACLs are not configured at all, we grant no access to anyone. So
         // jobOwner and superuser/supergroup _only_ can do 'stuff'
-        aclConfigured = "";
+        aclConfigured = " ";
       }
       acls.put(aclName, new AccessControlList(aclConfigured));
     }
@@ -72,69 +71,34 @@
   }
 
   /**
-   * If authorization is enabled, checks whether the user (in the callerUGI) is
-   * authorized to perform the operation specified by 'jobOperation' on the job.
+   * If authorization is enabled, checks whether the user (in the callerUGI)
+   * is authorized to perform the operation specified by 'jobOperation' on
+   * the job by checking if the user is jobOwner or part of job ACL for the
+   * specific job operation.
    * <ul>
    * <li>The owner of the job can do any operation on the job</li>
-   * <li>The superuser/supergroup is always permitted to do operations on any
-   * job.</li>
    * <li>For all other users/groups job-acls are checked</li>
    * </ul>
-   * 
-   * @param jobStatus
-   * @param callerUGI
-   * @param jobOperation
-   */
-  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
-      JobACL jobOperation) throws AccessControlException {
-
-    JobID jobId = jobStatus.getJobID();
-    String jobOwner = jobStatus.getUsername();
-    AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
-    checkAccess(jobId, callerUGI, jobOperation, jobOwner, acl);
-  }
-
-  /**
-   * If authorization is enabled, checks whether the user (in the callerUGI) is
-   * authorized to perform the operation specified by 'jobOperation' on the job.
-   * <ul>
-   * <li>The owner of the job can do any operation on the job</li>
-   * <li>The superuser/supergroup is always permitted to do operations on any
-   * job.</li>
-   * <li>For all other users/groups job-acls are checked</li>
-   * </ul>
-   * @param jobId
    * @param callerUGI
    * @param jobOperation
    * @param jobOwner
    * @param jobACL
    * @throws AccessControlException
    */
-  void checkAccess(JobID jobId, UserGroupInformation callerUGI,
-      JobACL jobOperation, String jobOwner, AccessControlList jobACL)
-      throws AccessControlException {
+  boolean checkAccess(UserGroupInformation callerUGI,
+      JobACL jobOperation, String jobOwner, AccessControlList jobACL) {
 
     String user = callerUGI.getShortUserName();
-    if (!isJobLevelAuthorizationEnabled()) {
-      return;
+    if (!areACLsEnabled()) {
+      return true;
     }
 
-    // Allow uperusers/supergroups
-    // Allow Job-owner as job's owner is always part of all the ACLs
-    if (callerUGI.getShortUserName().equals(jobOwner)
-        || isSuperUserOrSuperGroup(callerUGI)
+    // Allow Job-owner for any operation on the job
+    if (user.equals(jobOwner)
         || jobACL.isUserAllowed(callerUGI)) {
-      AuditLogger.logSuccess(user, jobOperation.name(),  jobId.toString());
-      return;
+      return true;
     }
 
-    AuditLogger.logFailure(user, jobOperation.name(), null, jobId.toString(),
-                           Constants.UNAUTHORIZED_USER);
-    throw new AccessControlException(callerUGI
-        + UNAUTHORIZED_JOB_ACCESS_ERROR
-        + jobOperation.toString() + " on " + jobId + ". "
-        + jobOperation.toString()
-        + " Access control list configured for this job : "
-        + jobACL.toString());
+    return false;
   }
 }
diff --git a/src/java/org/apache/hadoop/mapred/JobInProgress.java b/src/java/org/apache/hadoop/mapred/JobInProgress.java
index 3d99044..edee0f4 100644
--- a/src/java/org/apache/hadoop/mapred/JobInProgress.java
+++ b/src/java/org/apache/hadoop/mapred/JobInProgress.java
@@ -23,7 +23,6 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -43,7 +42,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,7 +49,6 @@
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -79,13 +76,9 @@
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -432,7 +425,7 @@
         String desc = "The username " + conf.getUser() + " obtained from the "
             + "conf doesn't match the username " + user + " the user "
             + "authenticated as";
-        AuditLogger.logFailure(user, Queue.QueueOperation.SUBMIT_JOB.name(),
+        AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(),
             conf.getUser(), jobId.toString(), desc);
         throw new IOException(desc);
       }
@@ -724,12 +717,13 @@
     String username = conf.getUser();
     if (username == null) { username = ""; }
     String jobname = conf.getJobName();
-    if (jobname == null) { jobname = ""; }
+    String jobQueueName = conf.getQueueName();
+
     setUpLocalizedJobConf(conf, jobId);
     jobHistory.setupEventWriter(jobId, conf);
     JobSubmittedEvent jse =
         new JobSubmittedEvent(jobId, jobname, username, this.startTime,
-            jobFile.toString(), status.getJobACLs());
+            jobFile.toString(), status.getJobACLs(), jobQueueName);
     jobHistory.logEvent(jse, jobId);
     
   }
@@ -742,25 +736,6 @@
   }
 
   /**
-   * If authorization is enabled on the JobTracker, checks whether the user (in
-   * the callerUGI) is authorized to perform the operation specify by
-   * 'jobOperation' on the job.
-   * <ul>
-   * <li>The owner of the job can do any operation on the job</li>
-   * <li>The superuser/supergroup of the JobTracker is always permitted to do
-   * operations on any job.</li>
-   * <li>For all other users/groups job-acls are checked</li>
-   * </ul>
-   * 
-   * @param callerUGI
-   * @param jobOperation
-   */
-  void checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)
-      throws AccessControlException {
-    jobtracker.getJobACLsManager().checkAccess(status, callerUGI, jobOperation);
-  }
-
-  /**
    * If the number of taks is greater than the configured value
    * throw an exception that will fail job initialization
    */
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index 915985d..851fba5 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -77,7 +77,6 @@
 import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@@ -105,6 +104,7 @@
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -1289,7 +1289,6 @@
                                                 "expireLaunchingTasks");
 
   final CompletedJobStatusStore completedJobStatusStore;
-  private JobTrackerJobACLsManager jobACLsManager;
   Thread completedJobsStoreThread = null;
   final RecoveryManager recoveryManager;
 
@@ -1330,8 +1329,8 @@
   FileSystem fs = null;
   Path systemDir = null;
   JobConf conf;
-  private final UserGroupInformation mrOwner;
-  private final String supergroup;
+
+  private final ACLsManager aclsManager;
 
   long limitMaxMemForMapTasks;
   long limitMaxMemForReduceTasks;
@@ -1347,7 +1346,7 @@
     retiredJobsCacheSize = 0;
     infoServer = null;
     queueManager = null;
-    supergroup = null;
+    aclsManager = null;
     taskScheduler = null;
     trackerIdentifier = null;
     recoveryManager = null;
@@ -1355,7 +1354,6 @@
     completedJobStatusStore = null;
     tasktrackerExpiryInterval = 0;
     myInstrumentation = new JobTrackerMetricsInst(this, new JobConf());
-    mrOwner = null;
     secretManager = null;
     localFs = null;
   }
@@ -1382,11 +1380,7 @@
     UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME,
         localMachine);
-    mrOwner = UserGroupInformation.getCurrentUser();
-    
-    supergroup = conf.get(MR_SUPERGROUP, "supergroup");
-    LOG.info("Starting jobtracker with owner as " + mrOwner.getShortUserName() 
-             + " and supergroup as " + supergroup);
+
     clock = newClock;
     
     long secretKeyInterval = 
@@ -1443,9 +1437,15 @@
     this.hostsReader = new HostsFileReader(conf.get(JTConfig.JT_HOSTS_FILENAME, ""),
                                            conf.get(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, ""));
 
-    Configuration queuesConf = new Configuration(this.conf);
-    queueManager = new QueueManager(queuesConf);
+    Configuration clusterConf = new Configuration(this.conf);
+    queueManager = new QueueManager(clusterConf);
     
+    aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager);
+
+    LOG.info("Starting jobtracker with owner as " +
+        getMROwner().getShortUserName() + " and supergroup as " +
+        getSuperGroup());
+
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
       = conf.getClass(JT_TASK_SCHEDULER,
@@ -1526,7 +1526,7 @@
       try {
         // if we haven't contacted the namenode go ahead and do it
         if (fs == null) {
-          fs = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+          fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
             public FileSystem run() throws IOException {
               return FileSystem.get(conf);
           }});
@@ -1538,9 +1538,10 @@
         }
         try {
           FileStatus systemDirStatus = fs.getFileStatus(systemDir);
-          if (!systemDirStatus.getOwner().equals(mrOwner.getShortUserName())) {
+          if (!systemDirStatus.getOwner().equals(
+              getMROwner().getShortUserName())) {
             throw new AccessControlException("The systemdir " + systemDir + 
-                " is not owned by " + mrOwner.getShortUserName());
+                " is not owned by " + getMROwner().getShortUserName());
           }
           if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
             LOG.warn("Incorrect permissions on " + systemDir + 
@@ -1607,7 +1608,8 @@
     final String historyLogDir = 
       jobHistory.getCompletedJobHistoryLocation().toString();
     infoServer.setAttribute("historyLogDir", historyLogDir);
-    FileSystem historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+    FileSystem historyFS = getMROwner().doAs(
+        new PrivilegedExceptionAction<FileSystem>() {
       public FileSystem run() throws IOException {
         return new Path(historyLogDir).getFileSystem(conf);
       }
@@ -1620,10 +1622,8 @@
     this.numTaskCacheLevels = conf.getInt(JT_TASKCACHE_LEVELS, 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
-    // Initialize the jobACLSManager
-    jobACLsManager = new JobTrackerJobACLsManager(this);
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
+    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -3056,7 +3056,7 @@
         throw ioe;
       }
       try {
-        checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null);
+        aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
       } catch (AccessControlException ace) {
         LOG.warn("Access denied for user " + job.getJobConf().getUser()
             + ". Ignoring job " + jobId, ace);
@@ -3109,19 +3109,8 @@
     LOG.info("Job " + jobId + " added successfully for user '" 
              + job.getJobConf().getUser() + "' to queue '" 
              + job.getJobConf().getQueueName() + "'");
-    AuditLogger.logSuccess(job.getUser(),
-        Queue.QueueOperation.SUBMIT_JOB.name(), jobId.toString());
-    return job.getStatus();
-  }
 
-  /**
-   * Is job-level authorization enabled on the JT?
-   * 
-   * @return
-   */
-  boolean isJobLevelAuthorizationEnabled() {
-    return conf.getBoolean(
-        MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+    return job.getStatus();
   }
 
   /**
@@ -3143,45 +3132,12 @@
   }
 
   /**
-   * Check the ACLs for a user doing the passed queue-operation and the passed
-   * job operation.
-   * <ul>
-   * <li>Superuser/supergroup can do any operation on the job</li>
-   * <li>For any other user/group, the configured ACLs for the corresponding
-   * queue and the job are checked.</li>
-   * </ul>
-   * 
-   * @param job
-   * @param callerUGI
-   * @param oper
-   * @param jobOperation
-   * @throws AccessControlException
-   * @throws IOException
+   * Are ACLs for authorization checks enabled on the MR cluster?
+   *
+   * @return true if ACLs(job acls and queue acls) are enabled
    */
-  private void checkAccess(JobInProgress job,
-      UserGroupInformation callerUGI, Queue.QueueOperation oper,
-      JobACL jobOperation) throws AccessControlException {
-
-    // get the queue and verify the queue access
-    String queue = job.getProfile().getQueueName();
-    if (!queueManager.hasAccess(queue, job, oper, callerUGI)) {
-      throw new AccessControlException("User " 
-                            + callerUGI.getShortUserName() 
-                            + " cannot perform "
-                            + "operation " + oper + " on queue " + queue +
-                            ".\n Please run \"hadoop queue -showacls\" " +
-                            "command to find the queues you have access" +
-                            " to .");
-    }
-
-    // check nulls, for e.g., submitJob RPC doesn't have a jobOperation as the
-    // job itself isn't created by that time.
-    if (jobOperation == null) {
-      return;
-    }
-
-    // check the access to the job
-    job.checkAccess(callerUGI, jobOperation);
+  boolean areACLsEnabled() {
+    return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
   }
 
   /**@deprecated use {@link #getClusterStatus(boolean)}*/
@@ -3295,8 +3251,8 @@
     }
 
     // check both queue-level and job-level access
-    checkAccess(job, UserGroupInformation.getCurrentUser(),
-        Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+    aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+        Operation.KILL_JOB);
 
     killJob(job);
   }
@@ -3530,8 +3486,8 @@
       JobInProgress job = jobs.get(oldJobID);
       if (job != null) {
 	// check the job-access
-        job.checkAccess(UserGroupInformation.getCurrentUser(),
-            JobACL.VIEW_JOB);
+        aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+            Operation.VIEW_JOB_COUNTERS);
 
         if (!isJobInited(job)) {
 	  return EMPTY_COUNTERS;
@@ -3703,8 +3659,8 @@
     // Check authorization
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
-      job.checkAccess(UserGroupInformation.getCurrentUser(),
-          JobACL.VIEW_JOB);
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.VIEW_JOB_DETAILS);
     } else { 
       return EMPTY_TASK_REPORTS;
     }
@@ -3779,8 +3735,8 @@
     if (job != null) {
 
       // check the access to the job.
-      job.checkAccess(UserGroupInformation.getCurrentUser(),
-          JobACL.VIEW_JOB);
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.VIEW_JOB_DETAILS);
 
       if (isJobInited(job)) {
         TaskInProgress tip = job.getTaskInProgress(tipId);
@@ -3851,8 +3807,9 @@
     if (tip != null) {
 
       // check both queue-level and job-level access
-      checkAccess(tip.getJob(), UserGroupInformation.getCurrentUser(),
-          Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+      aclsManager.checkAccess(tip.getJob(),
+          UserGroupInformation.getCurrentUser(),
+          shouldFail ? Operation.FAIL_TASK : Operation.KILL_TASK);
 
       return tip.killTask(taskid, shouldFail);
     }
@@ -3899,8 +3856,9 @@
    */
   public String getStagingAreaDir() throws IOException {
     try {
-      final String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
+      final String user =
+          UserGroupInformation.getCurrentUser().getShortUserName();
+      return getMROwner().doAs(new PrivilegedExceptionAction<String>() {
         @Override
         public String run() throws Exception {
           Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
@@ -3923,6 +3881,18 @@
     return jobHistory.getCompletedJobHistoryLocation().toString();
   }
 
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    AccessControlList acl =
+        queueManager.getQueueACL(queueName, QueueACL.ADMINISTER_JOBS);
+    if (acl == null) {
+      acl = new AccessControlList(" ");
+    }
+    return acl;
+  }
+
   ///////////////////////////////////////////////////////////////
   // JobTracker methods
   ///////////////////////////////////////////////////////////////
@@ -3954,8 +3924,8 @@
     if (job != null) {
 
       // check both queue-level and job-level access
-      checkAccess(job, UserGroupInformation.getCurrentUser(),
-          Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.SET_JOB_PRIORITY);
 
       synchronized (taskScheduler) {
         JobStatus oldStatus = (JobStatus)job.getStatus().clone();
@@ -4136,24 +4106,6 @@
       removeMarkedTasks(trackerName);
     }
   }
-  
-  /**
-   * Is the calling user a super user? Or part of the supergroup?
-   * @return true, if it is a super user
-   */
-  static boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI,
-      UserGroupInformation superUser, String superGroup) {
-    if (superUser.getShortUserName().equals(callerUGI.getShortUserName())) {
-      return true;
-    }
-    String[] groups = callerUGI.getGroupNames();
-    for(int i=0; i < groups.length; ++i) {
-      if (groups[i].equals(superGroup)) {
-        return true;
-      }
-    }
-    return false;
-  }
 
   /**
    * Rereads the config to get hosts and exclude list file names.
@@ -4162,10 +4114,9 @@
   public synchronized void refreshNodes() throws IOException {
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     // check access
-    if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser(), mrOwner,
-                                 supergroup)) {
+    if (!isMRAdmin(UserGroupInformation.getCurrentUser())) {
       AuditLogger.logFailure(user, Constants.REFRESH_NODES,
-          mrOwner + " " + supergroup, Constants.JOBTRACKER,
+          getMROwner() + " " + getSuperGroup(), Constants.JOBTRACKER,
           Constants.UNAUTHORIZED_USER);
       throw new AccessControlException(user + 
                                        " is not authorized to refresh nodes.");
@@ -4175,15 +4126,19 @@
     // call the actual api
     refreshHosts();
   }
-  
+
   UserGroupInformation getMROwner() {
-    return mrOwner;
+    return aclsManager.getMROwner();
   }
 
   String getSuperGroup() {
-    return supergroup;
+    return aclsManager.getSuperGroup();
   }
-  
+
+  boolean isMRAdmin(UserGroupInformation ugi) {
+    return aclsManager.isMRAdmin(ugi);
+  }
+
   private synchronized void refreshHosts() throws IOException {
     // Reread the config to get HOSTS and HOSTS_EXCLUDE filenames.
     // Update the file names and refresh internal includes and excludes list
@@ -4260,8 +4215,8 @@
         if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
           dumpConfiguration(new PrintWriter(System.out));
           System.out.println();
-          QueueManager.dumpConfiguration(new PrintWriter(System.out),
-              new JobConf());
+          Configuration conf = new Configuration();
+          QueueManager.dumpConfiguration(new PrintWriter(System.out), conf);
         }
         else {
           System.out.println("usage: JobTracker [-dumpConfiguration]");
@@ -4612,16 +4567,20 @@
     UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME,
         localMachine);
-    mrOwner = UserGroupInformation.getCurrentUser();
-    supergroup = conf.get(MRConfig.MR_SUPERGROUP, "supergroup");
     
     secretManager = null;
     
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
         conf.get("mapred.hosts.exclude", ""));
     // queue manager
-    Configuration queuesConf = new Configuration(this.conf);
-    queueManager = new QueueManager(queuesConf);
+    Configuration clusterConf = new Configuration(this.conf);
+    queueManager = new QueueManager(clusterConf);
+
+    aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager);
+
+    LOG.info("Starting jobtracker with owner as " +
+        getMROwner().getShortUserName() + " and supergroup as " +
+        getSuperGroup());
 
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
@@ -4646,7 +4605,7 @@
     jobHistory = new JobHistory();
     final JobTracker jtFinal = this;
     try {
-      historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+      historyFS = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
         public FileSystem run() throws IOException {
           jobHistory.init(jtFinal, conf, jtFinal.localMachine, jtFinal.startTime);
           jobHistory.initDone(conf, fs);
@@ -4690,11 +4649,8 @@
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
-    // Initialize the jobACLSManager
-    jobACLsManager = new JobTrackerJobACLsManager(this);
-
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
+    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
   }
 
   /**
@@ -4756,9 +4712,13 @@
   }
 
   JobACLsManager getJobACLsManager() {
-    return jobACLsManager;
+    return aclsManager.getJobACLsManager();
   }
-  
+
+  ACLsManager getACLsManager() {
+    return aclsManager;
+  }
+
   /**
    * 
    * @return true if delegation token operation is allowed
diff --git a/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java b/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java
deleted file mode 100644
index 9dac26a..0000000
--- a/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Manages the job ACLs and the operations on them at JobTracker.
- *
- */
-@InterfaceAudience.Private
-public class JobTrackerJobACLsManager extends JobACLsManager {
-
-  static final Log LOG = LogFactory.getLog(JobTrackerJobACLsManager.class);
-
-  private JobTracker jobTracker = null;
-
-  public JobTrackerJobACLsManager(JobTracker tracker) {
-    jobTracker = tracker;
-  }
-
-  @Override
-  protected boolean isJobLevelAuthorizationEnabled() {
-    return jobTracker.isJobLevelAuthorizationEnabled();
-  }
-
-  @Override
-  protected boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
-    return JobTracker.isSuperUserOrSuperGroup(callerUGI,
-        jobTracker.getMROwner(), jobTracker.getSuperGroup());
-  }
-
-}
diff --git a/src/java/org/apache/hadoop/mapred/LocalJobRunner.java b/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 1338d07..dad802d 100644
--- a/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -51,7 +51,6 @@
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -59,6 +58,7 @@
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 
 /** Implements MapReduce locally, in-process, for debugging. */
@@ -698,6 +698,13 @@
   }
 
   /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+	  return new AccessControlList(" ");// no queue admins for local job runner
+  }
+
+  /**
    * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
    */
   public String getStagingAreaDir() throws IOException {
diff --git a/src/java/org/apache/hadoop/mapred/Operation.java b/src/java/org/apache/hadoop/mapred/Operation.java
new file mode 100644
index 0000000..4df9be4
--- /dev/null
+++ b/src/java/org/apache/hadoop/mapred/Operation.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.JobACL;
+
+/**
+ * Generic operation that maps to the dependent set of ACLs that drive the
+ * authorization of the operation.
+ */
+@InterfaceAudience.Private
+public enum Operation {
+  VIEW_JOB_COUNTERS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+  VIEW_JOB_DETAILS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+  VIEW_TASK_LOGS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+  KILL_JOB(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  FAIL_TASK(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  KILL_TASK(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  SET_JOB_PRIORITY(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  SUBMIT_JOB(QueueACL.SUBMIT_JOB, null);
+  
+  public QueueACL qACLNeeded;
+  public JobACL jobACLNeeded;
+  
+  Operation(QueueACL qACL, JobACL jobACL) {
+    this.qACLNeeded = qACL;
+    this.jobACLNeeded = jobACL;
+  }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/mapred/Queue.java b/src/java/org/apache/hadoop/mapred/Queue.java
index 85f33db..8f4f3e8 100644
--- a/src/java/org/apache/hadoop/mapred/Queue.java
+++ b/src/java/org/apache/hadoop/mapred/Queue.java
@@ -42,8 +42,7 @@
   private String name = null;
 
   //acls list
-  private Map<String, 
-              org.apache.hadoop.security.authorize.AccessControlList> acls;
+  private Map<String, AccessControlList> acls;
 
   //Queue State
   private QueueState state = QueueState.RUNNING;
@@ -59,34 +58,6 @@
   private Properties props;
 
   /**
-   * Enum representing an operation that can be performed on a queue.
-   */
-  static enum QueueOperation {
-    SUBMIT_JOB ("acl-submit-job", false),
-    ADMINISTER_JOBS ("acl-administer-jobs", true);
-    // TODO: Add ACL for LIST_JOBS when we have ability to authenticate
-    //       users in UI
-    // TODO: Add ACL for CHANGE_ACL when we have an admin tool for
-    //       configuring queues.
-
-    private final String aclName;
-    private final boolean jobOwnerAllowed;
-
-    QueueOperation(String aclName, boolean jobOwnerAllowed) {
-      this.aclName = aclName;
-      this.jobOwnerAllowed = jobOwnerAllowed;
-    }
-
-    final String getAclName() {
-      return aclName;
-    }
-
-    final boolean isJobOwnerAllowed() {
-      return jobOwnerAllowed;
-    }
-  }
-
-  /**
    * Default constructor is useful in creating the hierarchy.
    * The variables are populated using mutator methods.
    */
@@ -133,7 +104,7 @@
    * @return Map containing the operations that can be performed and
    *          who can perform the operations.
    */
-  Map<String, org.apache.hadoop.security.authorize.AccessControlList> getAcls() {
+  Map<String, AccessControlList> getAcls() {
     return acls;
   }
   
diff --git a/src/java/org/apache/hadoop/mapred/QueueACL.java b/src/java/org/apache/hadoop/mapred/QueueACL.java
new file mode 100644
index 0000000..c659c34
--- /dev/null
+++ b/src/java/org/apache/hadoop/mapred/QueueACL.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Enum representing an AccessControlList that drives set of operations that
+ * can be performed on a queue.
+ */
+@InterfaceAudience.Private
+public enum QueueACL {
+  SUBMIT_JOB ("acl-submit-job"),
+  ADMINISTER_JOBS ("acl-administer-jobs");
+  // Currently this ACL acl-administer-jobs is checked for the operations
+  // FAIL_TASK, KILL_TASK, KILL_JOB, SET_JOB_PRIORITY and VIEW_JOB.
+
+  // TODO: Add ACL for LIST_JOBS when we have ability to authenticate
+  //       users in UI
+  // TODO: Add ACL for CHANGE_ACL when we have an admin tool for
+  //       configuring queues.
+
+  private final String aclName;
+
+  QueueACL(String aclName) {
+    this.aclName = aclName;
+  }
+
+  public final String getAclName() {
+    return aclName;
+  }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java b/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
index f8011e9..e0738d4 100644
--- a/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
+++ b/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
@@ -20,7 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.Queue.QueueOperation;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
@@ -72,11 +72,17 @@
   static final String QUEUE_TAG = "queue";
   static final String ACL_SUBMIT_JOB_TAG = "acl-submit-job";
   static final String ACL_ADMINISTER_JOB_TAG = "acl-administer-jobs";
+
+  // The value read from queues config file for this tag is not used at all.
+  // To enable queue acls and job acls, mapreduce.cluster.acls.enabled is
+  // to be set in mapred-site.xml
+  @Deprecated
+  static final String ACLS_ENABLED_TAG = "aclsEnabled";
+
   static final String PROPERTIES_TAG = "properties";
   static final String STATE_TAG = "state";
   static final String QUEUE_NAME_TAG = "name";
   static final String QUEUES_TAG = "queues";
-  static final String ACLS_ENABLED_TAG = "aclsEnabled";
   static final String PROPERTY_TAG = "property";
   static final String KEY_TAG = "key";
   static final String VALUE_TAG = "value";
@@ -88,7 +94,8 @@
     
   }
 
-  QueueConfigurationParser(String confFile) {
+  QueueConfigurationParser(String confFile, boolean areAclsEnabled) {
+    aclsEnabled = areAclsEnabled;
     File file = new File(confFile).getAbsoluteFile();
     if (!file.exists()) {
       throw new RuntimeException("Configuration file not found at " +
@@ -105,7 +112,8 @@
     }
   }
 
-  QueueConfigurationParser(InputStream xmlInput) {
+  QueueConfigurationParser(InputStream xmlInput, boolean areAclsEnabled) {
+    aclsEnabled = areAclsEnabled;
     loadFrom(xmlInput);
   }
 
@@ -184,8 +192,14 @@
       NamedNodeMap nmp = queuesNode.getAttributes();
       Node acls = nmp.getNamedItem(ACLS_ENABLED_TAG);
 
-      if (acls != null && acls.getTextContent().equalsIgnoreCase("true")) {
-        setAclsEnabled(true);
+      if (acls != null) {
+        LOG.warn("Configuring " + ACLS_ENABLED_TAG + " flag in " +
+            QueueManager.QUEUE_CONF_FILE_NAME + " is not valid. " +
+            "This tag is ignored. Configure " +
+            MRConfig.MR_ACLS_ENABLED + " in mapred-site.xml. See the " +
+            " documentation of " + MRConfig.MR_ACLS_ENABLED +
+            ", which is used for enabling job level authorization and " +
+            " queue level authorization.");
       }
 
       NodeList props = queuesNode.getChildNodes();
@@ -269,9 +283,9 @@
         name += nameValue;
         newQueue.setName(name);
         submitKey = toFullPropertyName(name,
-            Queue.QueueOperation.SUBMIT_JOB.getAclName());
+            QueueACL.SUBMIT_JOB.getAclName());
         adminKey = toFullPropertyName(name,
-            Queue.QueueOperation.ADMINISTER_JOBS.getAclName());
+            QueueACL.ADMINISTER_JOBS.getAclName());
       }
 
       if (QUEUE_TAG.equals(field.getTagName()) && field.hasChildNodes()) {
@@ -299,11 +313,11 @@
     }
     
     if (!acls.containsKey(submitKey)) {
-      acls.put(submitKey, new AccessControlList("*"));
+      acls.put(submitKey, new AccessControlList(" "));
     }
     
     if (!acls.containsKey(adminKey)) {
-      acls.put(adminKey, new AccessControlList("*"));
+      acls.put(adminKey, new AccessControlList(" "));
     }
     
     //Set acls
diff --git a/src/java/org/apache/hadoop/mapred/QueueManager.java b/src/java/org/apache/hadoop/mapred/QueueManager.java
index 0b75916..326d627 100644
--- a/src/java/org/apache/hadoop/mapred/QueueManager.java
+++ b/src/java/org/apache/hadoop/mapred/QueueManager.java
@@ -20,11 +20,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.AuditLogger.Constants;
 import org.apache.hadoop.mapred.TaskScheduler.QueueRefresher;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -39,7 +39,6 @@
 import java.io.Writer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.List;
@@ -82,24 +81,25 @@
  * in mapred-site.xml. However, when configured in the latter, there is
  * no support for hierarchical queues.
  */
-
-class QueueManager {
+@InterfaceAudience.Private
+public class QueueManager {
 
   private static final Log LOG = LogFactory.getLog(QueueManager.class);
 
   // Map of a queue name and Queue object
   private Map<String, Queue> leafQueues = new HashMap<String,Queue>();
   private Map<String, Queue> allQueues = new HashMap<String, Queue>();
-  static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
+  public static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
   static final String QUEUE_CONF_DEFAULT_FILE_NAME = "mapred-queues-default.xml";
 
-  // Prefix in configuration for queue related keys
-  static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
-    = "mapred.queue.";
-  //Resource in which queue acls are configured.
+  //Prefix in configuration for queue related keys
+  static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = "mapred.queue.";
 
+  //Resource in which queue acls are configured.
   private Queue root = null;
-  private boolean isAclEnabled = false;
+  
+  // represents if job and queue acls are enabled on the mapreduce cluster
+  private boolean areAclsEnabled = false;
 
   /**
    * Factory method to create an appropriate instance of a queue
@@ -117,7 +117,7 @@
    * @return Queue configuration parser
    */
   static QueueConfigurationParser getQueueConfigurationParser(
-    Configuration conf, boolean reloadConf) {
+    Configuration conf, boolean reloadConf, boolean areAclsEnabled) {
     if (conf != null && conf.get(
       DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY) != null) {
       if (reloadConf) {
@@ -136,7 +136,8 @@
       InputStream stream = null;
       try {
         stream = xmlInUrl.openStream();
-        return new QueueConfigurationParser(new BufferedInputStream(stream));
+        return new QueueConfigurationParser(new BufferedInputStream(stream),
+            areAclsEnabled);
       } catch (IOException ioe) {
         throw new RuntimeException("Couldn't open queue configuration at " +
                                    xmlInUrl, ioe);
@@ -146,8 +147,13 @@
     }
   }
 
-  public QueueManager() {
-    initialize(getQueueConfigurationParser(null, false));
+  QueueManager() {// acls are disabled
+    this(false);
+  }
+
+  QueueManager(boolean areAclsEnabled) {
+    this.areAclsEnabled = areAclsEnabled;
+    initialize(getQueueConfigurationParser(null, false, areAclsEnabled));
   }
 
   /**
@@ -159,10 +165,11 @@
    * is found in mapred-site.xml, it will then look for site configuration
    * in mapred-queues.xml supporting hierarchical queues.
    *
-   * @param conf Configuration object where queue configuration is specified.
+   * @param clusterConf    mapreduce cluster configuration
    */
-  public QueueManager(Configuration conf) {
-    initialize(getQueueConfigurationParser(conf, false));
+  public QueueManager(Configuration clusterConf) {
+    areAclsEnabled = clusterConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+    initialize(getQueueConfigurationParser(clusterConf, false, areAclsEnabled));
   }
 
   /**
@@ -174,8 +181,10 @@
    *
    * @param confFile File where the queue configuration is found.
    */
-  QueueManager(String confFile) {
-    QueueConfigurationParser cp = new QueueConfigurationParser(confFile);
+  QueueManager(String confFile, boolean areAclsEnabled) {
+    this.areAclsEnabled = areAclsEnabled;
+    QueueConfigurationParser cp =
+        new QueueConfigurationParser(confFile, areAclsEnabled);
     initialize(cp);
   }
 
@@ -196,10 +205,8 @@
     allQueues.putAll(leafQueues);
 
     LOG.info("AllQueues : " + allQueues + "; LeafQueues : " + leafQueues);
-    this.isAclEnabled = cp.isAclsEnabled();
   }
 
-
   /**
    * Return the set of leaf level queues configured in the system to
    * which jobs are submitted.
@@ -215,52 +222,22 @@
   }
 
   /**
-   * Return true if the given {@link Queue.QueueOperation} can be
-   * performed by the specified user on the given queue.
+   * Return true if the given user is part of the ACL for the given
+   * {@link QueueACL} name for the given queue.
    * <p/>
    * An operation is allowed if all users are provided access for this
    * operation, or if either the user or any of the groups specified is
    * provided access.
    *
    * @param queueName Queue on which the operation needs to be performed.
-   * @param oper      The operation to perform
+   * @param qACL      The queue ACL name to be checked
    * @param ugi       The user and groups who wish to perform the operation.
-   * @return true if the operation is allowed, false otherwise.
+   * @return true     if the operation is allowed, false otherwise.
    */
   public synchronized boolean hasAccess(
-    String queueName,
-    Queue.QueueOperation oper,
-    UserGroupInformation ugi) {
-    return hasAccess(queueName, null, oper, ugi);
-  }
+    String queueName, QueueACL qACL, UserGroupInformation ugi) {
 
-  /**
-   * Return true if the given {@link Queue.QueueOperation} can be
-   * performed by the specified user on the specified job in the given queue.
-   * <p/>
-   * An operation is allowed either if the owner of the job is the user
-   * performing the task, all users are provided access for this
-   * operation, or if either the user or any of the groups specified is
-   * provided access.
-   * <p/>
-   * If the {@link Queue.QueueOperation} is not job specific then the
-   * job parameter is ignored.
-   *
-   * @param queueName Queue on which the operation needs to be performed.
-   * @param job       The {@link JobInProgress} on which the operation is being
-   *                  performed.
-   * @param oper      The operation to perform
-   * @param ugi       The user and groups who wish to perform the operation.
-   * @return true if the operation is allowed, false otherwise.
-   */
-  public synchronized boolean hasAccess(
-    String queueName, JobInProgress job,
-    Queue.QueueOperation oper,
-    UserGroupInformation ugi) {
-    
     Queue q = leafQueues.get(queueName);
-    String user = ugi.getShortUserName();
-    String jobId = job == null ? "-" : job.getJobID().toString();
 
     if (q == null) {
       LOG.info("Queue " + queueName + " is not present");
@@ -272,50 +249,23 @@
       return false;
     }
 
-    if (!isAclsEnabled()) {
+    if (!areAclsEnabled()) {
       return true;
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        "checking access for : "
-          + QueueManager.toFullPropertyName(queueName, oper.getAclName()));
-    }
-
-    if (oper.isJobOwnerAllowed()) {
-      if (job != null
-        && job.getJobConf().getUser().equals(ugi.getShortUserName())) {
-        AuditLogger.logSuccess(user, oper.name(), queueName);
-        return true;
-      }
+      LOG.debug("Checking access for the acl " + toFullPropertyName(queueName,
+        qACL.getAclName()) + " for user " + ugi.getShortUserName());
     }
 
     AccessControlList acl = q.getAcls().get(
-      toFullPropertyName(
-        queueName,
-        oper.getAclName()));
+        toFullPropertyName(queueName, qACL.getAclName()));
     if (acl == null) {
-      AuditLogger.logFailure(user, oper.name(), null, queueName,
-                             "Disabled queue ACLs, job : " + jobId);
       return false;
     }
 
-    // Check the ACL list
-    boolean allowed = acl.isAllAllowed();
-    if (!allowed) {
-      // Check the allowed users list
-      if (acl.isUserAllowed(ugi)) {
-        allowed = true;
-      }
-    }
-    if (allowed) {
-      AuditLogger.logSuccess(user, oper.name(), queueName);
-    } else {
-      AuditLogger.logFailure(user, oper.name(), null, queueName,
-                             Constants.UNAUTHORIZED_USER + ", job : " + jobId);
-    }
-
-    return allowed;
+    // Check if user is part of the ACL
+    return acl.isUserAllowed(ugi);
   }
 
   /**
@@ -392,7 +342,7 @@
 
     // Create a new configuration parser using the passed conf object.
     QueueConfigurationParser cp =
-        QueueManager.getQueueConfigurationParser(conf, true);
+        getQueueConfigurationParser(conf, true, areAclsEnabled);
 
     /*
      * (1) Validate the refresh of properties owned by QueueManager. As of now,
@@ -441,7 +391,8 @@
     LOG.info("Queue configuration is refreshed successfully.");
   }
 
-  static final String toFullPropertyName(
+  // this method is for internal use only
+  public static final String toFullPropertyName(
     String queue,
     String property) {
     return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
@@ -512,16 +463,16 @@
     //List of all QueueAclsInfo objects , this list is returned
     ArrayList<QueueAclsInfo> queueAclsInfolist =
       new ArrayList<QueueAclsInfo>();
-    Queue.QueueOperation[] operations = Queue.QueueOperation.values();
+    QueueACL[] qAcls = QueueACL.values();
     for (String queueName : leafQueues.keySet()) {
       QueueAclsInfo queueAclsInfo = null;
       ArrayList<String> operationsAllowed = null;
-      for (Queue.QueueOperation operation : operations) {
-        if (hasAccess(queueName, operation, ugi)) {
+      for (QueueACL qAcl : qAcls) {
+        if (hasAccess(queueName, qAcl, ugi)) {
           if (operationsAllowed == null) {
             operationsAllowed = new ArrayList<String>();
           }
-          operationsAllowed.add(operation.getAclName());
+          operationsAllowed.add(qAcl.getAclName());
         }
       }
       if (operationsAllowed != null) {
@@ -534,8 +485,7 @@
       }
     }
     return queueAclsInfolist.toArray(
-      new QueueAclsInfo[
-        queueAclsInfolist.size()]);
+      new QueueAclsInfo[queueAclsInfolist.size()]);
   }
 
   /**
@@ -611,8 +561,8 @@
    *
    * @return true if ACLs are enabled.
    */
-  boolean isAclsEnabled() {
-    return isAclEnabled;
+  boolean areAclsEnabled() {
+    return areAclsEnabled;
   }
 
   /**
@@ -623,7 +573,30 @@
   Queue getRoot() {
     return root;
   }
-  
+
+  /**
+   * Returns the specific queue ACL for the given queue.
+   * Returns null if the given queue does not exist or the acl is not
+   * configured for that queue.
+   * If acls are disabled(mapreduce.cluster.acls.enabled set to false), returns
+   * ACL with all users.
+   */
+  synchronized AccessControlList getQueueACL(String queueName,
+      QueueACL qACL) {
+    if (areAclsEnabled) {
+      Queue q = leafQueues.get(queueName);
+      if (q != null) {
+        return q.getAcls().get(toFullPropertyName(
+          queueName, qACL.getAclName()));
+      }
+      else {
+        LOG.warn("Queue " + queueName + " is not present.");
+        return null;
+      }
+    }
+    return new AccessControlList("*");
+  }
+
   /**
    * Dumps the configuration of hierarchy of queues
    * @param out the writer object to which dump is written
@@ -646,17 +619,21 @@
         MAPRED_QUEUE_NAMES_KEY) != null) {
       return;
     }
+    
     JsonFactory dumpFactory = new JsonFactory();
     JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
     QueueConfigurationParser parser;
+    boolean aclsEnabled = false;
+    if (conf != null) {
+      aclsEnabled = conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+    }
     if (configFile != null && !"".equals(configFile)) {
-      parser = new QueueConfigurationParser(configFile);
+      parser = new QueueConfigurationParser(configFile, aclsEnabled);
     }
     else {
-      parser = QueueManager.getQueueConfigurationParser(null, false);
+      parser = getQueueConfigurationParser(null, false, aclsEnabled);
     }
     dumpGenerator.writeStartObject();
-    dumpGenerator.writeBooleanField("acls_enabled", parser.isAclsEnabled());
     dumpGenerator.writeFieldName("queues");
     dumpGenerator.writeStartArray();
     dumpConfiguration(dumpGenerator,parser.getRoot().getChildren());
@@ -684,11 +661,11 @@
       AccessControlList administerJobsList = null;
       if (queue.getAcls() != null) {
         submitJobList =
-          queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
-              Queue.QueueOperation.SUBMIT_JOB.getAclName()));
+          queue.getAcls().get(toFullPropertyName(queue.getName(),
+              QueueACL.SUBMIT_JOB.getAclName()));
         administerJobsList =
-          queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
-              Queue.QueueOperation.ADMINISTER_JOBS.getAclName()));
+          queue.getAcls().get(toFullPropertyName(queue.getName(),
+              QueueACL.ADMINISTER_JOBS.getAclName()));
       }
       String aclsSubmitJobValue = " ";
       if (submitJobList != null ) {
diff --git a/src/java/org/apache/hadoop/mapred/TaskLogServlet.java b/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
index 77d769e..17da588 100644
--- a/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
+++ b/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
@@ -30,9 +30,9 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobACL;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -121,42 +121,54 @@
    * users and groups specified in configuration using
    * mapreduce.job.acl-view-job to view job.
    */
-  private void checkAccessForTaskLogs(JobConf conf, String user, JobID jobId,
+  private void checkAccessForTaskLogs(JobConf conf, String user, String jobId,
       TaskTracker tracker) throws AccessControlException {
 
-    if (!tracker.isJobLevelAuthorizationEnabled()) {
+    if (!tracker.areACLsEnabled()) {
       return;
     }
 
-    // buiild job view acl by reading from conf
+    // build job view ACL by reading from conf
     AccessControlList jobViewACL = tracker.getJobACLsManager().
         constructJobACLs(conf).get(JobACL.VIEW_JOB);
 
+    // read job queue name from conf
+    String queue = conf.getQueueName();
+
+    // build queue admins ACL by reading from conf
+    AccessControlList queueAdminsACL = new AccessControlList(
+        conf.get(toFullPropertyName(queue,
+            QueueACL.ADMINISTER_JOBS.getAclName()), " "));
+
     String jobOwner = conf.get(JobContext.USER_NAME);
     UserGroupInformation callerUGI =
         UserGroupInformation.createRemoteUser(user);
 
-    tracker.getJobACLsManager().checkAccess(jobId, callerUGI, JobACL.VIEW_JOB,
-        jobOwner, jobViewACL);
+    // check if user is queue admin or cluster admin or jobOwner or member of
+    // job-view-acl
+    if (!queueAdminsACL.isUserAllowed(callerUGI)) {
+      tracker.getACLsManager().checkAccess(jobId, callerUGI, queue,
+          Operation.VIEW_TASK_LOGS, jobOwner, jobViewACL);
+    }
   }
 
   /**
-   * Builds a Configuration object by reading the xml file.
+   * Builds a JobConf object by reading the job-acls.xml file.
    * This doesn't load the default resources.
    *
-   * Returns null if job-acls.xml is not there in userlogs/$jobid/attempt-dir on
+   * Returns null if job-acls.xml is not there in userlogs/$jobid on
    * local file system. This can happen when we restart the cluster with job
    * level authorization enabled(but was disabled on earlier cluster) and
    * viewing task logs of old jobs(i.e. jobs finished on earlier unsecure
    * cluster).
    */
-  static Configuration getConfFromJobACLsFile(TaskAttemptID attemptId,
-      boolean isCleanup) {
+  static JobConf getConfFromJobACLsFile(JobID jobId) {
     Path jobAclsFilePath = new Path(
-        TaskLog.getAttemptDir(attemptId, isCleanup).toString(), TaskRunner.jobACLsFile);
-    Configuration conf = null;
+        TaskLog.getJobDir(jobId).toString(),
+        TaskTracker.jobACLsFile);
+    JobConf conf = null;
     if (new File(jobAclsFilePath.toUri().getPath()).exists()) {
-      conf = new Configuration(false);
+      conf = new JobConf(false);
       conf.addResource(jobAclsFilePath);
     }
     return conf;
@@ -228,15 +240,15 @@
       ServletContext context = getServletContext();
       TaskTracker taskTracker = (TaskTracker) context.getAttribute(
           "task.tracker");
+      JobID jobId = attemptId.getJobID();
+
       // get jobACLConf from ACLs file
-      Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
+      JobConf jobACLConf = getConfFromJobACLsFile(jobId);
       // Ignore authorization if job-acls.xml is not found
       if (jobACLConf != null) {
-        JobID jobId = attemptId.getJobID();
-
         try {
-          checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
-              taskTracker);
+          checkAccessForTaskLogs(jobACLConf, user,
+              jobId.toString(), taskTracker);
         } catch (AccessControlException e) {
           String errMsg = "User " + user + " failed to view tasklogs of job " +
               jobId + "!\n\n" + e.getMessage();
diff --git a/src/java/org/apache/hadoop/mapred/TaskRunner.java b/src/java/org/apache/hadoop/mapred/TaskRunner.java
index 7435f85..73a4326 100644
--- a/src/java/org/apache/hadoop/mapred/TaskRunner.java
+++ b/src/java/org/apache/hadoop/mapred/TaskRunner.java
@@ -19,7 +19,6 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -34,7 +33,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -77,8 +75,6 @@
   protected JobConf conf;
   JvmManager jvmManager;
 
-  static String jobACLsFile = "job-acl.xml";
-
   public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
       JobConf conf) {
     this.tip = tip;
@@ -286,34 +282,10 @@
       Localizer.PermissionsHandler.setPermissions(logDir,
           Localizer.PermissionsHandler.sevenZeroZero);
     }
-    // write job acls into a file to know the access for task logs
-    writeJobACLs(logDir);
+
     return logFiles;
   }
 
-  // Writes job-view-acls and user name into an xml file
-  private void writeJobACLs(File logDir) throws IOException {
-    File aclFile = new File(logDir, TaskRunner.jobACLsFile);
-    Configuration aclConf = new Configuration(false);
-
-    // set the job view acls in aclConf
-    String jobViewACLs = conf.get(MRJobConfig.JOB_ACL_VIEW_JOB);
-    if (jobViewACLs != null) {
-      aclConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACLs);
-    }
-    // set jobOwner as mapreduce.job.user.name in aclConf
-    String jobOwner = conf.getUser();
-    aclConf.set(MRJobConfig.USER_NAME, jobOwner);
-    FileOutputStream out = new FileOutputStream(aclFile);
-    try {
-      aclConf.writeXml(out);
-    } finally {
-      out.close();
-    }
-    Localizer.PermissionsHandler.setPermissions(aclFile,
-        Localizer.PermissionsHandler.sevenZeroZero);
-  }
-
   /**
    * Write the child's configuration to the disk and set it in configuration so
    * that the child can pick it up from there.
diff --git a/src/java/org/apache/hadoop/mapred/TaskTracker.java b/src/java/org/apache/hadoop/mapred/TaskTracker.java
index c515bc7..0b125e2 100644
--- a/src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -20,6 +20,7 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
@@ -78,6 +79,7 @@
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
@@ -160,6 +162,11 @@
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
+  // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
+  // each job at job localization time. This will be used by TaskLogServlet for
+  // authorizing viewing of task logs of that job
+  static String jobACLsFile = "job-acls.xml";
+
   volatile boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
@@ -249,9 +256,7 @@
   private int maxReduceSlots;
   private int failures;
 
-  // MROwner's ugi
-  private UserGroupInformation mrOwner;
-  private String supergroup;
+  private ACLsManager aclsManager;
   
   // Performance-related config knob to send an out-of-band heartbeat
   // on task completion
@@ -275,9 +280,6 @@
   private long reservedPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
 
-  // Manages job acls of jobs in TaskTracker
-  private TaskTrackerJobACLsManager jobACLsManager;
-
   /**
    * the minimum interval between jobtracker polls
    */
@@ -590,13 +592,11 @@
    * close().
    */
   synchronized void initialize() throws IOException, InterruptedException {
-    UserGroupInformation.setConfiguration(fConf);
-    SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
-    mrOwner = UserGroupInformation.getCurrentUser();
 
-    supergroup = fConf.get(MRConfig.MR_SUPERGROUP, "supergroup");
-    LOG.info("Starting tasktracker with owner as " + mrOwner.getShortUserName()
-             + " and supergroup as " + supergroup);
+    aclsManager = new ACLsManager(fConf, new JobACLsManager(fConf), null);
+    LOG.info("Starting tasktracker with owner as " +
+        getMROwner().getShortUserName() + " and supergroup as " +
+        getSuperGroup());
 
     localFs = FileSystem.getLocal(fConf);
     // use configured nameserver & interface to get local hostname
@@ -687,7 +687,8 @@
     this.distributedCacheManager.startCleanupThread();
 
     this.jobClient = (InterTrackerProtocol) 
-    mrOwner.doAs(new PrivilegedExceptionAction<Object>() {
+    UserGroupInformation.getLoginUser().doAs(
+        new PrivilegedExceptionAction<Object>() {
       public Object run() throws IOException {
         return RPC.waitForProxy(InterTrackerProtocol.class,
             InterTrackerProtocol.versionID, 
@@ -735,19 +736,22 @@
   }
 
   UserGroupInformation getMROwner() {
-    return mrOwner;
+    return aclsManager.getMROwner();
   }
 
   String getSuperGroup() {
-    return supergroup;
+    return aclsManager.getSuperGroup();
   }
-  
+
+  boolean isMRAdmin(UserGroupInformation ugi) {
+    return aclsManager.isMRAdmin(ugi);
+  }
+
   /**
-   * Is job level authorization enabled on the TT ?
+   * Are ACLs for authorization checks enabled on the MR cluster ?
    */
-  boolean isJobLevelAuthorizationEnabled() {
-    return fConf.getBoolean(
-        MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+  boolean areACLsEnabled() {
+    return fConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
   }
 
   public static Class<?>[] getInstrumentationClasses(Configuration conf) {
@@ -998,7 +1002,7 @@
        
         JobConf localJobConf = localizeJobFiles(t, rjob);
         // initialize job log directory
-        initializeJobLogDir(jobId);
+        initializeJobLogDir(jobId, localJobConf);
 
         // Now initialize the job via task-controller so as to set
         // ownership/permissions of jars, job-work-dir. Note that initializeJob
@@ -1098,12 +1102,64 @@
     return localJobConf;
   }
 
-  // create job userlog dir
-  void initializeJobLogDir(JobID jobId) {
+  // Create job userlog dir.
+  // Create job acls file in job log dir, if needed.
+  void initializeJobLogDir(JobID jobId, JobConf localJobConf)
+      throws IOException {
     // remove it from tasklog cleanup thread first,
     // it might be added there because of tasktracker reinit or restart
     taskLogCleanupThread.unmarkJobFromLogDeletion(jobId);
     localizer.initializeJobLogDir(jobId);
+
+    if (areACLsEnabled()) {
+      // Create job-acls.xml file in job userlog dir and write the needed
+      // info for authorization of users for viewing task logs of this job.
+      writeJobACLs(localJobConf, TaskLog.getJobDir(jobId));
+    }
+  }
+
+  /**
+   *  Creates job-acls.xml under the given directory logDir and writes
+   *  job-view-acl, queue-admins-acl, jobOwner name and queue name into this
+   *  file.
+   *  queue name is the queue to which the job was submitted.
+   *  queue-admins-acl is the queue admins ACL of the queue to which this
+   *  job was submitted.
+   * @param conf   job configuration
+   * @param logDir job userlog dir
+   * @throws IOException
+   */
+  private static void writeJobACLs(JobConf conf, File logDir)
+      throws IOException {
+    File aclFile = new File(logDir, jobACLsFile);
+    JobConf aclConf = new JobConf(false);
+
+    // set the job view acl in aclConf
+    String jobViewACL = conf.get(MRJobConfig.JOB_ACL_VIEW_JOB, " ");
+    aclConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACL);
+
+    // set the job queue name in aclConf
+    String queue = conf.getQueueName();
+    aclConf.setQueueName(queue);
+
+    // set the queue admins acl in aclConf
+    String qACLName = toFullPropertyName(queue,
+        QueueACL.ADMINISTER_JOBS.getAclName());
+    String queueAdminsACL = conf.get(qACLName, " ");
+    aclConf.set(qACLName, queueAdminsACL);
+
+    // set jobOwner as user.name in aclConf
+    String jobOwner = conf.getUser();
+    aclConf.set("user.name", jobOwner);
+
+    FileOutputStream out = new FileOutputStream(aclFile);
+    try {
+      aclConf.writeXml(out);
+    } finally {
+      out.close();
+    }
+    Localizer.PermissionsHandler.setPermissions(aclFile,
+        Localizer.PermissionsHandler.sevenZeroZero);
   }
 
   /**
@@ -1318,8 +1374,10 @@
     checkJettyPort(httpPort);
     // create task log cleanup thread
     setTaskLogCleanupThread(new UserLogCleaner(fConf));
-    // Initialize the jobACLSManager
-    jobACLsManager = new TaskTrackerJobACLsManager(this);
+
+    UserGroupInformation.setConfiguration(fConf);
+    SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
+
     initialize();
   }
 
@@ -4043,7 +4101,11 @@
       return localJobTokenFileStr;
     }
 
-    TaskTrackerJobACLsManager getJobACLsManager() {
-      return jobACLsManager;
+    JobACLsManager getJobACLsManager() {
+      return aclsManager.getJobACLsManager();
+    }
+    
+    ACLsManager getACLsManager() {
+      return aclsManager;
     }
 }
diff --git a/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java b/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java
deleted file mode 100644
index f1f34e4..0000000
--- a/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Manages the job ACLs and the operations on them at TaskTracker.
- *
- */
-@InterfaceAudience.Private
-public class TaskTrackerJobACLsManager extends JobACLsManager {
-
-  static final Log LOG = LogFactory.getLog(TaskTrackerJobACLsManager.class);
-
-  private TaskTracker taskTracker = null;
-
-  public TaskTrackerJobACLsManager(TaskTracker tracker) {
-    taskTracker = tracker;
-  }
-
-  @Override
-  protected boolean isJobLevelAuthorizationEnabled() {
-    return taskTracker.isJobLevelAuthorizationEnabled();
-  }
-
-  @Override
-  protected boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
-    return JobTracker.isSuperUserOrSuperGroup(callerUGI,
-        taskTracker.getMROwner(), taskTracker.getSuperGroup());
-  }
-}
diff --git a/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index db2c5f8..bd317cf 100644
--- a/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -41,12 +41,15 @@
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.QueueACL;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.codehaus.jackson.JsonParseException;
@@ -355,6 +358,14 @@
       conf.setInt("mapred.map.tasks", maps);
       LOG.info("number of splits:" + maps);
 
+      // write "queue admins of the queue to which job is being submitted"
+      // to job file.
+      String queue = conf.get(MRJobConfig.QUEUE_NAME,
+          JobConf.DEFAULT_QUEUE_NAME);
+      AccessControlList acl = submitClient.getQueueAdmins(queue);
+      conf.set(toFullPropertyName(queue,
+          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
+
       // Write job file to submit dir
       writeConf(conf, submitJobFile);
       
diff --git a/src/java/org/apache/hadoop/mapreduce/MRConfig.java b/src/java/org/apache/hadoop/mapreduce/MRConfig.java
index 545fefb..e3439aa 100644
--- a/src/java/org/apache/hadoop/mapreduce/MRConfig.java
+++ b/src/java/org/apache/hadoop/mapreduce/MRConfig.java
@@ -37,8 +37,7 @@
   public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
   public static final String REDUCEMEMORY_MB = 
     "mapreduce.cluster.reducememory.mb";
-  public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG = 
-    "mapreduce.cluster.job-authorization-enabled";
+  public static final String MR_ACLS_ENABLED = "mapreduce.cluster.acls.enabled";
   public static final String MR_SUPERGROUP =
     "mapreduce.cluster.permissions.supergroup";
 
diff --git a/src/java/org/apache/hadoop/mapreduce/QueueInfo.java b/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
index 992c8fc..366bc18 100644
--- a/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
+++ b/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
@@ -60,7 +60,7 @@
    */
   public QueueInfo() {
     // make it running by default.
-    this.queueState = queueState.RUNNING;
+    this.queueState = QueueState.RUNNING;
     children = new ArrayList<QueueInfo>();
     props = new Properties();
   }
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr b/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
index b8d0c76..3d2bc49 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
@@ -77,8 +77,9 @@
           {"name": "jobConfPath", "type": "string"},
           {"name": "acls", "type": {"type": "map",
                                     "values": "string"
-                                    }
-          }
+                                   }
+          },
+          {"name": "jobQueueName", "type": "string"}
       ]
      },
 
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
index 2c25bae..1306224 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
@@ -314,6 +314,7 @@
     info.submitTime = event.getSubmitTime();
     info.jobConfPath = event.getJobConfPath();
     info.jobACLs = event.getJobAcls();
+    info.jobQueueName = event.getJobQueueName();
   }
 
   /**
@@ -325,6 +326,7 @@
     JobID jobid;
     String username;
     String jobname;
+    String jobQueueName;
     String jobConfPath;
     long launchTime;
     int totalMaps;
@@ -349,7 +351,7 @@
       submitTime = launchTime = finishTime = -1;
       totalMaps = totalReduces = failedMaps = failedReduces = 0;
       finishedMaps = finishedReduces = 0;
-      username = jobname = jobConfPath = "";
+      username = jobname = jobConfPath = jobQueueName = "";
       tasksMap = new HashMap<TaskID, TaskInfo>();
       jobACLs = new HashMap<JobACL, AccessControlList>();
     }
@@ -358,6 +360,7 @@
     public void printAll() {
       System.out.println("JOBNAME: " + jobname);
       System.out.println("USERNAME: " + username);
+      System.out.println("JOB_QUEUE_NAME: " + jobQueueName);
       System.out.println("SUBMIT_TIME" + submitTime);
       System.out.println("LAUNCH_TIME: " + launchTime);
       System.out.println("JOB_STATUS: " + jobStatus);
@@ -383,6 +386,8 @@
     public String getUsername() { return username; }
     /** Get the job name */
     public String getJobname() { return jobname; }
+    /** Get the job queue name */
+    public String getJobQueueName() { return jobQueueName; }
     /** Get the path for the job configuration file */
     public String getJobConfPath() { return jobConfPath; }
     /** Get the job launch time */
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
index c0aecb4..b1785e0 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
@@ -40,18 +40,6 @@
   private JobSubmitted datum = new JobSubmitted();
 
   /**
-   * @deprecated Use
-   *             {@link #JobSubmittedEvent(JobID, String, String, long, String, Map)}
-   *             instead.
-   */
-  @Deprecated
-  public JobSubmittedEvent(JobID id, String jobName, String userName,
-      long submitTime, String jobConfPath) {
-    this(id, jobName, userName, submitTime, jobConfPath,
-        new HashMap<JobACL, AccessControlList>());
-  }
-
-  /**
    * Create an event to record job submission
    * @param id The job Id of the job
    * @param jobName Name of the job
@@ -59,10 +47,11 @@
    * @param submitTime Time of submission
    * @param jobConfPath Path of the Job Configuration file
    * @param jobACLs The configured acls for the job.
+   * @param jobQueueName The job-queue to which this job was submitted
    */
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath,
-      Map<JobACL, AccessControlList> jobACLs) {
+      Map<JobACL, AccessControlList> jobACLs, String jobQueueName) {
     datum.jobid = new Utf8(id.toString());
     datum.jobName = new Utf8(jobName);
     datum.userName = new Utf8(userName);
@@ -74,6 +63,9 @@
           entry.getValue().getAclString()));
     }
     datum.acls = jobAcls;
+    if (jobQueueName != null) {
+      datum.jobQueueName = new Utf8(jobQueueName);
+    }
   }
 
   JobSubmittedEvent() {}
@@ -87,6 +79,13 @@
   public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the Job name */
   public String getJobName() { return datum.jobName.toString(); }
+  /** Get the Job queue name */
+  public String getJobQueueName() {
+    if (datum.jobQueueName != null) {
+      return datum.jobQueueName.toString();
+    }
+    return null;
+  }
   /** Get the user name */
   public String getUserName() { return datum.userName.toString(); }
   /** Get the submit time */
diff --git a/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java b/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
index 82b13bf..700919b 100644
--- a/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
+++ b/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -108,8 +109,10 @@
    * Version 32: Added delegation tokens (add, renew, cancel)
    * Version 33: Added JobACLs to JobStatus as part of MAPREDUCE-1307
    * Version 34: Modified submitJob to use Credentials instead of TokenStorage.
+   * Version 35: Added the method getQueueAdmins(queueName) as part of
+   *             MAPREDUCE-1664.
    */
-  public static final long versionID = 34L;
+  public static final long versionID = 35L;
 
   /**
    * Allocate a name for the job.
@@ -144,6 +147,17 @@
   
   public long getTaskTrackerExpiryInterval() throws IOException,
                                                InterruptedException;
+  
+  /**
+   * Get the administrators of the given job-queue.
+   * This method is for hadoop internal use only.
+   * @param queueName
+   * @return Queue administrators ACL for the queue to which the job is
+   *         submitted
+   * @throws IOException
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException;
+
   /**
    * Kill the indicated job
    */
diff --git a/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java b/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
index eb8b44a..fb20212 100644
--- a/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
+++ b/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
@@ -54,6 +54,9 @@
       new String[] {MRConfig.MAPMEMORY_MB});
     Configuration.addDeprecation("mapred.cluster.reduce.memory.mb", 
       new String[] {MRConfig.REDUCEMEMORY_MB});
+    Configuration.addDeprecation("mapred.acls.enabled", 
+        new String[] {MRConfig.MR_ACLS_ENABLED});
+
     Configuration.addDeprecation("mapred.cluster.max.map.memory.mb", 
       new String[] {JTConfig.JT_MAX_MAPMEMORY_MB});
     Configuration.addDeprecation("mapred.cluster.max.reduce.memory.mb", 
diff --git a/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java b/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
index c76cf3d..dee6f57 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
@@ -22,7 +22,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.QueueState;
@@ -31,15 +30,6 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
-import static org.apache.hadoop.mapred.Queue.*;
-import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueue;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesNode;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
-
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.transform.TransformerException;
 import javax.xml.transform.Transformer;
@@ -53,13 +43,18 @@
 import java.util.Set;
 import java.io.File;
 import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
 
 //@Private
 public class QueueManagerTestUtils {
-  final static String CONFIG = new File("test-mapred-queues.xml")
-      .getAbsolutePath();
+  /**
+   * Queue-configuration file for tests that start a cluster and wish to modify
+   * the queue configuration. This file is always in the unit tests classpath,
+   * so QueueManager started through JobTracker will automatically pick this up.
+   */
+  public static final String QUEUES_CONFIG_FILE_PATH = new File(System
+      .getProperty("test.build.extraconf", "build/test/extraconf"),
+      QueueManager.QUEUE_CONF_FILE_NAME).getAbsolutePath();
+
   private static final Log LOG = LogFactory.getLog(QueueManagerTestUtils.class);
 
   /**
@@ -76,7 +71,7 @@
   }
 
   public static void createSimpleDocument(Document doc) throws Exception {
-    Element queues = createQueuesNode(doc, "true");
+    Element queues = createQueuesNode(doc);
 
     // Create parent level queue q1.
     Element q1 = createQueue(doc, "q1");
@@ -106,8 +101,8 @@
     queues.appendChild(p1);
   }
 
-  static void createSimpleDocumentWithAcls(Document doc, String aclsEnabled) {
-    Element queues = createQueuesNode(doc, aclsEnabled);
+  static void createSimpleDocumentWithAcls(Document doc) {
+    Element queues = createQueuesNode(doc);
 
     // Create parent level queue q1.
     Element q1 = createQueue(doc, "q1");
@@ -150,8 +145,56 @@
     queues.appendChild(p1);
   }
 
+  /**
+   * Creates all given queues as 1st level queues(no nesting)
+   * @param doc         the queues config document
+   * @param queueNames  the queues to be added to the queues config document
+   * @param submitAcls  acl-submit-job acls for each of the queues
+   * @param adminsAcls  acl-administer-jobs acls for each of the queues
+   * @throws Exception
+   */
+  public static void createSimpleDocument(Document doc, String[] queueNames,
+      String[] submitAcls, String[] adminsAcls) throws Exception {
+
+    Element queues = createQueuesNode(doc);
+
+    // Create all queues as 1st level queues(no nesting)
+    for (int i = 0; i < queueNames.length; i++) {
+      Element q = createQueue(doc, queueNames[i]);
+
+      q.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+      q.appendChild(createAcls(doc,
+          QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, submitAcls[i]));
+      q.appendChild(createAcls(doc,
+          QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, adminsAcls[i]));
+      queues.appendChild(q);
+    }
+  }
+
+  /**
+   * Creates queues configuration file with given queues at 1st level(i.e.
+   * no nesting of queues) and with the given queue acls.
+   * @param queueNames        queue names which are to be configured
+   * @param submitAclStrings  acl-submit-job acls for each of the queues
+   * @param adminsAclStrings  acl-administer-jobs acls for each of the queues
+   * The configuration is written to the file at QUEUES_CONFIG_FILE_PATH.
+   * @throws Exception
+   */
+  public static void createQueuesConfigFile(String[] queueNames,
+      String[] submitAclStrings, String[] adminsAclStrings)
+  throws Exception {
+    if (queueNames.length > submitAclStrings.length ||
+        queueNames.length > adminsAclStrings.length) {
+      LOG.error("Number of queues is more than acls given.");
+      return;
+    }
+    Document doc = createDocument();
+    createSimpleDocument(doc, queueNames, submitAclStrings, adminsAclStrings);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+  }
+
   public static void refreshSimpleDocument(Document doc) throws Exception {
-    Element queues = createQueuesNode(doc, "true");
+    Element queues = createQueuesNode(doc);
 
     // Create parent level queue q1.
     Element q1 = createQueue(doc, "q1");
@@ -189,10 +232,9 @@
    * @param enable
    * @return the created element.
    */
-  public static Element createQueuesNode(Document doc, String enable) {
+  public static Element createQueuesNode(Document doc) {
     Element queues = doc.createElement("queues");
     doc.appendChild(queues);
-    queues.setAttribute("aclsEnabled", enable);
     return queues;
   }
 
@@ -240,9 +282,12 @@
     return propsElement;
   }
 
-  public static void checkForConfigFile() {
-    if (new File(CONFIG).exists()) {
-      new File(CONFIG).delete();
+  /**
+   * Delete queues configuration file if exists
+   */
+  public static void deleteQueuesConfigFile() {
+    if (new File(QUEUES_CONFIG_FILE_PATH).exists()) {
+      new File(QUEUES_CONFIG_FILE_PATH).delete();
     }
   }
 
@@ -257,7 +302,7 @@
   public static void writeQueueConfigurationFile(String filePath,
       JobQueueInfo[] rootQueues) throws Exception {
     Document doc = createDocument();
-    Element queueElements = createQueuesNode(doc, String.valueOf(true));
+    Element queueElements = createQueuesNode(doc);
     for (JobQueueInfo rootQ : rootQueues) {
       queueElements.appendChild(QueueConfigurationParser.getQueueElement(doc,
           rootQ));
@@ -265,27 +310,6 @@
     writeToFile(doc, filePath);
   }
 
-  static class QueueManagerConfigurationClassLoader extends ClassLoader {
-    @Override
-    public URL getResource(String name) {
-      if (!name.equals(QueueManager.QUEUE_CONF_FILE_NAME)) {
-        return super.getResource(name);
-      } else {
-        File resourceFile = new File(CONFIG);
-        if (!resourceFile.exists()) {
-          throw new IllegalStateException(
-              "Queue Manager configuration file not found");
-        }
-        try {
-          return resourceFile.toURL();
-        } catch (MalformedURLException e) {
-          LOG.fatal("Unable to form URL for the resource file : ");
-        }
-        return super.getResource(name);
-      }
-    }
-  }
-
   static Job submitSleepJob(final int numMappers, final int numReducers, final long mapSleepTime,
       final long reduceSleepTime, boolean shouldComplete, String userInfo,
       String queueName, Configuration clientConf) throws IOException,
@@ -329,12 +353,4 @@
   }
 
   static MiniMRCluster miniMRCluster;
-
-  static void setUpCluster(Configuration conf) throws IOException {
-    JobConf jobConf = new JobConf(conf);
-    String namenode = "file:///";
-    Thread.currentThread().setContextClassLoader(
-        new QueueManagerTestUtils.QueueManagerConfigurationClassLoader());
-    miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
-  }
 }
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
index 4446629..22420a1 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
@@ -589,7 +590,7 @@
     
     // Also JobACLs should be correct
     if (mr.getJobTrackerRunner().getJobTracker()
-        .isJobLevelAuthorizationEnabled()) {
+        .areACLsEnabled()) {
       AccessControlList acl = new AccessControlList(
           conf.get(JobACL.VIEW_JOB.getAclName(), " "));
       assertTrue("VIEW_JOB ACL is not properly logged to history file.",
@@ -601,6 +602,9 @@
           acl.toString().equals(
           jobInfo.getJobACLs().get(JobACL.MODIFY_JOB).toString()));
     }
+    
+    // Validate the job queue name
+    assertTrue(jobInfo.getJobQueueName().equals(conf.getQueueName()));
   }
 
   public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
@@ -726,10 +730,11 @@
   /** Run a job that will be succeeded and validate its history file format
    *  and its content.
    */
-  public void testJobHistoryFile() throws IOException {
+  public void testJobHistoryFile() throws Exception {
     MiniMRCluster mr = null;
     try {
       JobConf conf = new JobConf();
+
       // keep for less time
       conf.setLong("mapred.jobtracker.retirejob.check", 1000);
       conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
@@ -739,7 +744,7 @@
       conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
 
       // Enable ACLs so that they are logged to history
-      conf.setBoolean(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+      conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
 
       mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
 
@@ -966,7 +971,8 @@
       Map<JobACL, AccessControlList> jobACLs =
           new HashMap<JobACL, AccessControlList>();
       JobSubmittedEvent jse =
-        new JobSubmittedEvent(jobId, "job", "user", 12345, "path", jobACLs);
+        new JobSubmittedEvent(jobId, "job", "user", 12345, "path", jobACLs,
+        "default");
       jh.logEvent(jse, jobId);
       jh.closeWriter(jobId);
 
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
index c869962..7ef641f 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
@@ -63,6 +63,7 @@
                     "~!@#$%^&*()_+QWERTYUIOP{}|ASDFGHJKL:\"'ZXCVBNM<>?" +
                     "\t\b\n\f\"\n in it";
 
+    String weirdJobQueueName = "my\njob\nQueue\\";
     conf.setUser(username);
 
     MiniMRCluster mr = null;
@@ -84,7 +85,7 @@
     jobACLs.put(JobACL.MODIFY_JOB, modifyJobACL);
     JobSubmittedEvent jse =
         new JobSubmittedEvent(jobId, weirdJob, username, 12345, weirdPath,
-            jobACLs);
+            jobACLs, weirdJobQueueName);
     jh.logEvent(jse, jobId);
 
     JobFinishedEvent jfe =
@@ -121,6 +122,7 @@
 
     assertTrue (jobInfo.getUsername().equals(username));
     assertTrue(jobInfo.getJobname().equals(weirdJob));
+    assertTrue(jobInfo.getJobQueueName().equals(weirdJobQueueName));
     assertTrue(jobInfo.getJobConfPath().equals(weirdPath));
     Map<JobACL, AccessControlList> parsedACLs = jobInfo.getJobACLs();
     assertEquals(2, parsedACLs.size());
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
index 94735fe..7b9f3c1 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
@@ -17,31 +17,31 @@
  */
 package org.apache.hadoop.mapred;
 
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.checkForConfigFile;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.QUEUES_CONFIG_FILE_PATH;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.setUpCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.deleteQueuesConfigFile;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
-import java.io.File;
-import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.QueueInfo;
+import org.junit.After;
 import org.junit.Test;
 import org.w3c.dom.Document;
 
 public class TestJobQueueClient {
+
+  @After
+  public void tearDown() throws Exception {
+    deleteQueuesConfigFile();
+  }
+
   @Test
   public void testQueueOrdering() throws Exception {
     // create some sample queues in a hierarchy..
@@ -91,13 +91,15 @@
   
   @Test
   public void testGetQueue() throws Exception {
-    checkForConfigFile();
+
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    createSimpleDocumentWithAcls(doc, "true");
-    writeToFile(doc, CONFIG);
-    Configuration conf = new Configuration();
-    conf.addResource(CONFIG);
-    setUpCluster(conf);
+    createSimpleDocumentWithAcls(doc);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    JobConf jobConf = new JobConf();
+    String namenode = "file:///";
+    miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
+
     JobClient jc = new JobClient(miniMRCluster.createJobConf());
     // test for existing queue
     QueueInfo queueInfo = jc.getQueueInfo("q1");
@@ -105,7 +107,5 @@
     // try getting a non-existing queue
     queueInfo = jc.getQueueInfo("queue");
     assertNull(queueInfo);
-
-    new File(CONFIG).delete();
   }
 }
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java b/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
index 0b2ed67..3f13f4d 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
@@ -26,7 +26,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -202,6 +201,13 @@
     File jobLogDir = TaskLog.getJobDir(jobId);
     checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
         ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+    // check job-acls.xml file permissions
+    checkFilePermissions(jobLogDir.toString() + Path.SEPARATOR
+        + TaskTracker.jobACLsFile, expectedFilePerms, task.getUser(),
+        ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+    
+    // validate the content of job ACLs file
+    validateJobACLsFileContent();
   }
 
   @Override
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
index 47011b7..6518c93 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import javax.security.auth.login.LoginException;
 import junit.framework.TestCase;
+
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -31,58 +34,38 @@
   private QueueManager queueManager;
   private JobConf conf = null;
   UserGroupInformation currentUGI = null;
-  String submitAcl = Queue.QueueOperation.SUBMIT_JOB.getAclName();
-  String adminAcl  = Queue.QueueOperation.ADMINISTER_JOBS.getAclName();
+  String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
+  String adminAcl  = QueueACL.ADMINISTER_JOBS.getAclName();
 
-  private void setupConfForNoAccess() throws IOException,LoginException {
+  @Override
+  protected void tearDown() {
+    deleteQueuesConfigFile();
+  }
+
+  // No access for queues for the user currentUGI
+  private void setupConfForNoAccess() throws Exception {
     currentUGI = UserGroupInformation.getLoginUser();
     String userName = currentUGI.getUserName();
+
+    String[] queueNames = {"qu1", "qu2"};
+    // Only user u1 has access for queue qu1
+    // Only group g2 has acls for the queue qu2
+    createQueuesConfigFile(
+        queueNames, new String[]{"u1", " g2"}, new String[]{"u1", " g2"});
+
     conf = new JobConf();
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
 
-    conf.setBoolean("mapred.acls.enabled",true);
-
-    conf.set("mapred.queue.names", "qu1,qu2");
-    //Only user u1 has access
-    conf.set("mapred.queue.qu1.acl-submit-job", "u1");
-    conf.set("mapred.queue.qu1.acl-administer-jobs", "u1");
-    //q2 only group g2 has acls for the queues
-    conf.set("mapred.queue.qu2.acl-submit-job", " g2");
-    conf.set("mapred.queue.qu2.acl-administer-jobs", " g2");
     queueManager = new QueueManager(conf);
-
   }
 
   /**
    *  sets up configuration for acls test.
    * @return
    */
-  private void setupConf(boolean aclSwitch) throws IOException,LoginException{
+  private void setupConf(boolean aclSwitch) throws Exception{
     currentUGI = UserGroupInformation.getLoginUser();
     String userName = currentUGI.getUserName();
-    conf = new JobConf();
-
-    conf.setBoolean("mapred.acls.enabled", aclSwitch);
-
-    conf.set("mapred.queue.names", "qu1,qu2,qu3,qu4,qu5,qu6,qu7");
-    //q1 Has acls for all the users, supports both submit and administer
-    conf.set("mapred.queue.qu1.acl-submit-job", "*");
-    conf.set("mapred.queue.qu1-acl-administer-jobs", "*");
-    //q2 only u2 has acls for the queues
-    conf.set("mapred.queue.qu2.acl-submit-job", "u2");
-    conf.set("mapred.queue.qu2.acl-administer-jobs", "u2");
-    //q3  Only u2 has submit operation access rest all have administer access
-    conf.set("mapred.queue.qu3.acl-submit-job", "u2");
-    conf.set("mapred.queue.qu3.acl-administer-jobs", "*");
-    //q4 Only u2 has administer access , anyone can do submit
-    conf.set("mapred.queue.qu4.acl-submit-job", "*");
-    conf.set("mapred.queue.qu4.acl-administer-jobs", "u2");
-    //qu6 only current user has submit access
-    conf.set("mapred.queue.qu6.acl-submit-job",userName);
-    conf.set("mapred.queue.qu6.acl-administrator-jobs","u2");
-    //qu7 only current user has administrator access
-    conf.set("mapred.queue.qu7.acl-submit-job","u2");
-    conf.set("mapred.queue.qu7.acl-administrator-jobs",userName);
-    //qu8 only current group has access
     StringBuilder groupNames = new StringBuilder("");
     String[] ugiGroupNames = currentUGI.getGroupNames();
     int max = ugiGroupNames.length-1;
@@ -92,22 +75,38 @@
         groupNames.append(",");
       }
     }
-    conf.set("mapred.queue.qu5.acl-submit-job"," "+groupNames.toString());
-    conf.set("mapred.queue.qu5.acl-administrator-jobs"," "
-            +groupNames.toString());
+    String groupsAcl = " " + groupNames.toString();
+
+    //q1 Has acls for all the users, supports both submit and administer
+    //q2 only u2 has acls for the queues
+    //q3  Only u2 has submit operation access rest all have administer access
+    //q4 Only u2 has administer access , anyone can do submit
+    //qu5 only the current user's groups have access
+    //qu6 only current user has submit access
+    //qu7 only current user has administrator access
+    String[] queueNames =
+        {"qu1", "qu2", "qu3", "qu4", "qu5", "qu6", "qu7"};
+    String[] submitAcls =
+        {"*", "u2", "u2", "*", groupsAcl, userName, "u2"};
+    String[] adminsAcls =
+        {"*", "u2", "*", "u2", groupsAcl, "u2", userName};
+    createQueuesConfigFile(queueNames, submitAcls, adminsAcls);
+
+    conf = new JobConf();
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, aclSwitch);
 
     queueManager = new QueueManager(conf);
   }
 
-  public void testQueueAclsForCurrentuser() throws IOException,LoginException {
+  public void testQueueAclsForCurrentuser() throws Exception {
     setupConf(true);
     QueueAclsInfo[] queueAclsInfoList =
             queueManager.getQueueAcls(currentUGI);
     checkQueueAclsInfo(queueAclsInfoList);
   }
 
-  public void testQueueAclsForCurrentUserAclsDisabled() throws IOException,
-          LoginException {
+  // Acls are disabled on the mapreduce cluster
+  public void testQueueAclsForCurrentUserAclsDisabled() throws Exception {
     setupConf(false);
     //fetch the acls info for current user.
     QueueAclsInfo[] queueAclsInfoList = queueManager.
@@ -115,7 +114,7 @@
     checkQueueAclsInfo(queueAclsInfoList);
   }
 
-  public void testQueueAclsForNoAccess() throws IOException,LoginException {
+  public void testQueueAclsForNoAccess() throws Exception {
     setupConfForNoAccess();
     QueueAclsInfo[] queueAclsInfoList = queueManager.
             getQueueAcls(currentUGI);
@@ -124,7 +123,7 @@
 
   private void checkQueueAclsInfo(QueueAclsInfo[] queueAclsInfoList)
           throws IOException {
-    if (conf.get("mapred.acls.enabled").equalsIgnoreCase("true")) {
+    if (conf.get(MRConfig.MR_ACLS_ENABLED).equalsIgnoreCase("true")) {
       for (int i = 0; i < queueAclsInfoList.length; i++) {
         QueueAclsInfo acls = queueAclsInfoList[i];
         String queueName = acls.getQueueName();
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
index 3f5b344..0a7dd3c 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
@@ -22,8 +22,11 @@
 import org.apache.commons.logging.LogFactory;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
 import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import static org.junit.Assert.*;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -31,7 +34,6 @@
 import org.junit.Test;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
-import java.io.File;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -47,26 +49,32 @@
 
   @After
   public void tearDown() throws Exception {
-    new File(CONFIG).delete();
+    deleteQueuesConfigFile();
+  }
+
+  // create UGI with the given user name and the fixed group name "myGroup"
+  private UserGroupInformation createUGI(String userName) {
+    return UserGroupInformation.createUserForTesting(
+        userName, new String[]{"myGroup"});
   }
 
   @Test
   public void testDefault() throws Exception {
+    deleteQueuesConfigFile();
     QueueManager qm = new QueueManager();
     Queue root = qm.getRoot();
     assertEquals(root.getChildren().size(), 1);
     assertEquals(root.getChildren().iterator().next().getName(), "default");
-    assertFalse(qm.isAclsEnabled());
     assertNull(root.getChildren().iterator().next().getChildren());
   }
 
   @Test
   public void testXMLParsing() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
     Set<Queue> rootQueues = qm.getRoot().getChildren();
     List<String> names = new ArrayList<String>();
     for (Queue q : rootQueues) {
@@ -101,62 +109,63 @@
 
     assertTrue(
       q.getAcls().get(
-        QueueManager.toFullPropertyName(
+          toFullPropertyName(
           q.getName(), ACL_SUBMIT_JOB_TAG)).isUserAllowed(
-        UserGroupInformation.createRemoteUser("u1")));
+              createUGI("u1")));
 
     assertTrue(
       q.getAcls().get(
-        QueueManager.toFullPropertyName(
+          toFullPropertyName(
           q.getName(),
           ACL_ADMINISTER_JOB_TAG))
-        .isUserAllowed(UserGroupInformation.createRemoteUser("u2")));
+        .isUserAllowed(createUGI("u2")));
     assertTrue(q.getState().equals(QueueState.STOPPED));
   }
 
   @Test
   public void testhasAccess() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    createSimpleDocumentWithAcls(doc,"true");
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    createSimpleDocumentWithAcls(doc);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
 
     UserGroupInformation ugi;
     // test for acls access when acls are set with *
-    ugi = UserGroupInformation.createRemoteUser("u1");
+    ugi = createUGI("u1");
     assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
-        Queue.QueueOperation.SUBMIT_JOB, ugi));
-    ugi = UserGroupInformation.createRemoteUser("u2");
+        QueueACL.SUBMIT_JOB, ugi));
+    ugi = createUGI("u2");
     assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
-        Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+        QueueACL.ADMINISTER_JOBS, ugi));
     
     // test for acls access when acls are not set with *
-    ugi = UserGroupInformation.createRemoteUser("u1");
+    ugi = createUGI("u1");
     assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
-        Queue.QueueOperation.SUBMIT_JOB, ugi));
-    ugi = UserGroupInformation.createRemoteUser("u2");
+        QueueACL.SUBMIT_JOB, ugi));
+    ugi = createUGI("u2");
     assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
-        Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+        QueueACL.ADMINISTER_JOBS, ugi));
     
-    // test for acls access when acls are not specified but acls is enabled
-    ugi = UserGroupInformation.createRemoteUser("u1");
-    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
-        Queue.QueueOperation.SUBMIT_JOB, ugi));
-    ugi = UserGroupInformation.createRemoteUser("u2");
-    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
-        Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+    // Test for acls access when acls are not specified but acls are enabled.
+    // By default, the queue acls for any queue are empty.
+    ugi = createUGI("u1");
+    assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+        QueueACL.SUBMIT_JOB, ugi));
+    ugi = createUGI("u2");
+    assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+        QueueACL.ADMINISTER_JOBS, ugi));
     
     assertTrue(qm.isRunning("p1" + NAME_SEPARATOR + "p13"));
   }
   
   @Test
   public void testQueueView() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
     
     for (Queue queue : qm.getRoot().getChildren()) {
       checkHierarchy(queue, qm);
@@ -176,25 +185,21 @@
 
   @Test
   public void testhasAccessForParent() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
 
-    UserGroupInformation ugi =
-      UserGroupInformation.createRemoteUser("u1");
-    assertFalse(
-      qm.hasAccess(
-        "p1",
-        Queue.QueueOperation.SUBMIT_JOB, ugi));
+    UserGroupInformation ugi = createUGI("u1");
+    assertFalse(qm.hasAccess("p1", QueueACL.SUBMIT_JOB, ugi));
   }
 
   @Test
   public void testValidation() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    Element queues = createQueuesNode(doc, "false");
+    Element queues = createQueuesNode(doc);
     Element q1 = createQueue(doc, "q1");
 
     q1.appendChild(createAcls(doc, "acl-submit-job", "u1"));
@@ -203,9 +208,9 @@
     q1.appendChild(createQueue(doc, "p16"));
 
     queues.appendChild(q1);
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
       fail("Should throw an exception as configuration is wrong ");
     } catch (RuntimeException re) {
       LOG.info(re.getMessage());
@@ -214,27 +219,27 @@
 
   @Test
   public void testInvalidName() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    Element queues = createQueuesNode(doc, "false");
+    Element queues = createQueuesNode(doc);
     Element q1 = createQueue(doc, "");
     queues.appendChild(q1);
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
       fail("Should throw an exception as configuration is wrong ");
     } catch (Exception re) {
       re.printStackTrace();
       LOG.info(re.getMessage());
     }
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     doc = createDocument();
-    queues = createQueuesNode(doc, "false");
+    queues = createQueuesNode(doc);
     q1 = doc.createElement("queue");
     queues.appendChild(q1);
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
       fail("Should throw an exception as configuration is wrong ");
     } catch (RuntimeException re) {
       re.printStackTrace();
@@ -244,10 +249,10 @@
 
   @Test
   public void testMissingConfigFile() throws Exception {
-    checkForConfigFile(); // deletes file
+    deleteQueuesConfigFile(); // deletes file
 
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
       fail("Should throw an exception for missing file when " +
            "explicitly passed.");
     } catch (RuntimeException re) {
@@ -266,9 +271,9 @@
 
   @Test
   public void testEmptyProperties() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    Element queues = createQueuesNode(doc, "false");
+    Element queues = createQueuesNode(doc);
     Element q1 = createQueue(doc, "q1");
     Element p = createProperties(doc, null);
     q1.appendChild(p);
@@ -277,11 +282,11 @@
 
   @Test
   public void testEmptyFile() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
       fail("Should throw an exception as configuration is wrong ");
     } catch (Exception re) {
       re.printStackTrace();
@@ -291,11 +296,11 @@
 
   @Test
   public void testJobQueueInfoGeneration() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
 
     List<JobQueueInfo> rootQueues =
       qm.getRoot().getJobQueueInfo().getChildren();
@@ -338,11 +343,11 @@
    */
   @Test
   public void testRefresh() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
     Queue beforeRefreshRoot = qm.getRoot();
     //remove the file and create new one.
     Set<Queue> rootQueues = beforeRefreshRoot.getChildren();
@@ -360,16 +365,16 @@
             "p1" + NAME_SEPARATOR + "p12")) {
             assertTrue(
               child.getAcls().get(
-                QueueManager.toFullPropertyName(
+                  toFullPropertyName(
                   child.getName(), ACL_SUBMIT_JOB_TAG))
-                .isUserAllowed(UserGroupInformation.createRemoteUser("u1")));
+                .isUserAllowed(createUGI("u1")));
 
             assertTrue(
               child.getAcls().get(
-                QueueManager.toFullPropertyName(
+                  toFullPropertyName(
                   child.getName(),
                   ACL_ADMINISTER_JOB_TAG))
-                .isUserAllowed(UserGroupInformation.createRemoteUser("u2")));
+                .isUserAllowed(createUGI("u2")));
             assertTrue(child.getState().equals(QueueState.STOPPED));
           } else {
             assertTrue(child.getState().equals(QueueState.RUNNING));
@@ -377,11 +382,11 @@
         }
       }
     }
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     doc = createDocument();
     refreshSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, true);
     qm.getRoot().isHierarchySameAs(cp.getRoot());
     qm.setQueues(
       cp.getRoot().getChildren().toArray(
@@ -403,17 +408,17 @@
             "p1" + NAME_SEPARATOR + "p12")) {
             assertTrue(
               child.getAcls().get(
-                QueueManager.toFullPropertyName(
+                  toFullPropertyName(
                   child.getName(),
                   ACL_SUBMIT_JOB_TAG))
-                .isUserAllowed(UserGroupInformation.createRemoteUser("u3")));
+                .isUserAllowed(createUGI("u3")));
 
             assertTrue(
               child.getAcls().get(
-                QueueManager.toFullPropertyName(
+                  toFullPropertyName(
                   child.getName(),
                   ACL_ADMINISTER_JOB_TAG))
-                .isUserAllowed(UserGroupInformation.createRemoteUser("u4")));
+                .isUserAllowed(createUGI("u4")));
             assertTrue(child.getState().equals(QueueState.RUNNING));
           } else {
             assertTrue(child.getState().equals(QueueState.STOPPED));
@@ -425,20 +430,20 @@
 
   @Test
   public void testRefreshWithInvalidFile() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
 
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     doc = createDocument();
-    Element queues = createQueuesNode(doc, "false");
+    Element queues = createQueuesNode(doc);
     Element q1 = createQueue(doc, "");
     queues.appendChild(q1);
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+      QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, false);
 
       fail("Should throw an exception as configuration is wrong ");
     } catch (Throwable re) {
@@ -548,19 +553,21 @@
    */
   @Test
   public void testDumpConfiguration() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);    
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+
     StringWriter out = new StringWriter();
-    QueueManager.dumpConfiguration(out,CONFIG,null);
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    QueueManager.dumpConfiguration(out, QUEUES_CONFIG_FILE_PATH, conf);
+
     ObjectMapper mapper = new ObjectMapper();
     // parse the Json dump
     JsonQueueTree queueTree =
       mapper.readValue(out.toString(), JsonQueueTree.class);
-    
-    // check if acls_enabled is correct
-    assertEquals(true, queueTree.isAcls_enabled());
+
     // check for the number of top-level queues
     assertEquals(2, queueTree.getQueues().length);
     
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerRefresh.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerRefresh.java
index 5f20f08..b46c758 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerRefresh.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerRefresh.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,6 +36,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
+
 import org.junit.After;
 import org.junit.Test;
 
@@ -48,19 +49,12 @@
   private static final Log LOG =
       LogFactory.getLog(TestQueueManagerRefresh.class);
 
-  String queueConfigPath =
-      System.getProperty("test.build.extraconf", "build/test/extraconf");
-  File queueConfigFile =
-      new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
-
   /**
    * Remove the configuration file after the test's done.
    */
   @After
   public void tearDown() {
-    if (queueConfigFile.exists()) {
-      queueConfigFile.delete();
-    }
+    deleteQueuesConfigFile();
   }
 
   /**
@@ -96,8 +90,8 @@
     JobQueueInfo[] queues = getSimpleQueueHierarchy();
 
     // write the configuration file
-    QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+    writeQueueConfigurationFile(
+        QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
 
     QueueManager qManager = new QueueManager();
 
@@ -107,8 +101,8 @@
     queues[0].addChild(newQueue);
 
     // Rewrite the configuration file
-    QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+    writeQueueConfigurationFile(
+        QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
 
     testRefreshFailureWithChangeOfHierarchy(qManager);
 
@@ -127,8 +121,8 @@
     JobQueueInfo[] queues = getSimpleQueueHierarchy();
 
     // write the configuration file
-    QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+    writeQueueConfigurationFile(
+        QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
 
     QueueManager qManager = new QueueManager();
 
@@ -137,8 +131,8 @@
     queues[0].removeChild(q2);
 
     // Rewrite the configuration file
-    QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+    writeQueueConfigurationFile(
+        QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
 
     testRefreshFailureWithChangeOfHierarchy(qManager);
   }
@@ -187,8 +181,8 @@
     JobQueueInfo[] queues = getSimpleQueueHierarchy();
 
     // write the configuration file
-    QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+    writeQueueConfigurationFile(
+        QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
 
     QueueManager qManager = new QueueManager();
 
@@ -226,8 +220,8 @@
     }
 
     // write the configuration file
-    QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+    writeQueueConfigurationFile(
+        QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
 
     QueueManager qManager = new QueueManager();
 
@@ -261,8 +255,8 @@
     JobQueueInfo[] queues = getSimpleQueueHierarchy();
 
     // write the configuration file
-    QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+    writeQueueConfigurationFile(
+        QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] });
 
     QueueManager qManager = new QueueManager();
 
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
index ca14c3a..4e60a04 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
@@ -38,6 +38,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -48,16 +49,21 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import static org.apache.hadoop.mapred.DeprecatedQueueConfigurationParser.*;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 
 public class TestQueueManagerWithDeprecatedConf extends TestCase {
 
-  static final Log LOG = LogFactory.getLog(TestQueueManagerWithDeprecatedConf.class);
-  
+  static final Log LOG = LogFactory.getLog(
+      TestQueueManagerWithDeprecatedConf.class);
+
+  String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
+  String adminAcl  = QueueACL.ADMINISTER_JOBS.getAclName();
 
   
   public void testMultipleQueues() {
     JobConf conf = new JobConf();
-    conf.set("mapred.queue.names", "q1,q2,Q3");
+    conf.set(DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY,
+        "q1,q2,Q3");
     QueueManager qMgr = new QueueManager(conf);
     Set<String> expQueues = new TreeSet<String>();
     expQueues.add("q1");
@@ -68,7 +74,8 @@
 
   public void testSchedulerInfo() {
     JobConf conf = new JobConf();
-    conf.set("mapred.queue.names", "qq1,qq2");
+    conf.set(DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY,
+        "qq1,qq2");
     QueueManager qMgr = new QueueManager(conf);
     qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
     qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
@@ -85,44 +92,50 @@
     try {
       // queue properties with which the cluster is started.
       Properties hadoopConfProps = new Properties();
-      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
-      hadoopConfProps.put("mapred.acls.enabled", "true");
-      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      hadoopConfProps.put(DeprecatedQueueConfigurationParser.
+          MAPRED_QUEUE_NAMES_KEY, "default,q1,q2");
+      hadoopConfProps.put(MRConfig.MR_ACLS_ENABLED, "true");
 
-      //properties for mapred-queue-acls.xml
       UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser("unknownUser");
-      hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
-      hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u1");
-      hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "*");
-      hadoopConfProps.put("mapred.queue.default.acl-administer-jobs", ugi.getUserName());
-      hadoopConfProps.put("mapred.queue.q1.acl-administer-jobs", "u1");
-      hadoopConfProps.put("mapred.queue.q2.acl-administer-jobs", "*");
+      hadoopConfProps.put(toFullPropertyName(
+          "default", submitAcl), ugi.getUserName());
+      hadoopConfProps.put(toFullPropertyName(
+          "q1", submitAcl), "u1");
+      hadoopConfProps.put(toFullPropertyName(
+          "q2", submitAcl), "*");
+      hadoopConfProps.put(toFullPropertyName(
+          "default", adminAcl), ugi.getUserName());
+      hadoopConfProps.put(toFullPropertyName(
+          "q1", adminAcl), "u2");
+      hadoopConfProps.put(toFullPropertyName(
+          "q2", adminAcl), "*");
 
       UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
 
       Configuration conf = new JobConf();
+      conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
       QueueManager queueManager = new QueueManager(conf);
-      //Testing access to queue.
+      //Testing job submission access to queues.
       assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("default", Queue.QueueOperation.
+          queueManager.hasAccess("default", QueueACL.
               SUBMIT_JOB, ugi));
       assertFalse("User Job Submission failed.",
-          queueManager.hasAccess("q1", Queue.QueueOperation.
+          queueManager.hasAccess("q1", QueueACL.
               SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("q2", Queue.QueueOperation.
+          queueManager.hasAccess("q2", QueueACL.
               SUBMIT_JOB, ugi));
-      //Testing the admin acls
+      //Testing the administer-jobs acls
       assertTrue("User Job Submission failed.",
-           queueManager.hasAccess("default", Queue.QueueOperation.ADMINISTER_JOBS, ugi));
-       assertFalse("User Job Submission failed.",
-           queueManager.hasAccess("q1", Queue.QueueOperation.
-               ADMINISTER_JOBS, ugi));
-       assertTrue("User Job Submission failed.",
-           queueManager.hasAccess("q2", Queue.QueueOperation.
-               ADMINISTER_JOBS, ugi));
-
+          queueManager.hasAccess("default",
+              QueueACL.ADMINISTER_JOBS, ugi));
+      assertFalse("User Job Submission failed.",
+          queueManager.hasAccess("q1", QueueACL.
+              ADMINISTER_JOBS, ugi));
+      assertTrue("User Job Submission failed.",
+          queueManager.hasAccess("q2", QueueACL.
+              ADMINISTER_JOBS, ugi));
  
     } finally {
       //Cleanup the configuration files in all cases
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
index c3200fc..0e1fd48 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
@@ -19,8 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import static org.apache.hadoop.mapred.QueueConfigurationParser.NAME_SEPARATOR;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.checkForConfigFile;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.QUEUES_CONFIG_FILE_PATH;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.deleteQueuesConfigFile;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
@@ -29,7 +29,6 @@
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.setUpCluster;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.submitSleepJob;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
 import static org.junit.Assert.assertEquals;
@@ -37,7 +36,6 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.File;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Properties;
@@ -47,11 +45,12 @@
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -60,21 +59,26 @@
 
   private static Configuration conf;
 
-  @BeforeClass
-  public static void setUp() throws Exception {
-    checkForConfigFile();
-    Document doc = createDocument();
-    createSimpleDocumentWithAcls(doc, "true");
-    writeToFile(doc, CONFIG);
-    conf = new Configuration();
-    conf.addResource(CONFIG);
-    conf.set("mapred.committer.job.setup.cleanup.needed", "false");
-    setUpCluster(conf);
-  }
-
   @AfterClass
   public static void tearDown() throws Exception {
-    new File(CONFIG).delete();
+    deleteQueuesConfigFile();
+  }
+
+  private void startCluster(boolean aclsEnabled)
+      throws Exception {
+
+    deleteQueuesConfigFile();
+    Document doc = createDocument();
+    createSimpleDocumentWithAcls(doc);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    conf = new Configuration();
+    conf.set(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, aclsEnabled);
+
+    JobConf jobConf = new JobConf(conf);
+    String namenode = "file:///";
+    miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
+
   }
 
   /**
@@ -83,6 +87,8 @@
    */
   @Test(expected = IOException.class)
   public void testSubmitJobForStoppedQueue() throws Exception {
+    startCluster(true);
+
     submitSleepJob(10, 10, 100, 100, false, null,
         "p1" + NAME_SEPARATOR + "p14", conf);
     fail("queue p1:p14 is in stopped state and should not accept jobs");
@@ -94,8 +100,10 @@
    */
   @Test(expected = IOException.class)
   public void testSubmitJobForContainerQueue() throws Exception {
-      submitSleepJob(10, 10, 100, 100, false, null, "p1", conf);
-      fail("queue p1 is a container queue and cannot have jobs");
+    startCluster(true);
+
+    submitSleepJob(10, 10, 100, 100, false, null, "p1", conf);
+    fail("queue p1 is a container queue and cannot have jobs");
   }
 
   /**
@@ -104,12 +112,16 @@
    */
   @Test
   public void testAclsForSubmitJob() throws Exception {
+    startCluster(true);
+
     Job job;
+    try {
     // submit job to queue p1:p13 with unspecified acls 
     job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1" + NAME_SEPARATOR
         + "p13", conf);
-    assertTrue("Job submission for u1 failed in queue : p1:p13.",
-        job.isSuccessful());
+    fail("user u1 cannot submit jobs to queue p1:p13");
+    } catch (Exception e) {
+    }
     // check for access to submit the job
     try {
       job = submitSleepJob(0, 0, 0, 0, false, "u2,g1", "p1" + NAME_SEPARATOR
@@ -117,11 +129,17 @@
       fail("user u2 cannot submit jobs to queue p1:p11");
     } catch (Exception e) {
     }
-    // submit job to queue p1:p11 with acls-submit-job as u1
+    // submit job to queue p1:p11 with acl-submit-job as u1
     job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1"
         + NAME_SEPARATOR + "p11", conf);
     assertTrue("Job submission for u1 failed in queue : p1:p11.",
         job.isSuccessful());
+    
+    // submit job to queue p1:p12 with acl-submit-job as *
+    job = submitSleepJob(0, 0, 0, 0, true, "u2,g1", "p1"
+        + NAME_SEPARATOR + "p12", conf);
+    assertTrue("Job submission for u2 failed in queue : p1:p12.",
+        job.isSuccessful());
   }
 
   /**
@@ -130,6 +148,8 @@
    */
   @Test
   public void testAccessToKillJob() throws Exception {
+    startCluster(true);
+
     Job job = submitSleepJob(1, 1, 100, 100, false, "u1,g1", "p1"
         + NAME_SEPARATOR + "p11", conf);
     final JobConf jobConf = miniMRCluster.createJobConf();
@@ -229,11 +249,13 @@
    */
   @Test
   public void testSubmitJobsAfterRefresh() throws Exception {
+    startCluster(true);
+
     // test for refresh
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     refreshDocument(doc);
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     MRAdmin admin = new MRAdmin(miniMRCluster.createJobConf());
     admin.run(new String[] { "-refreshQueues" });
     try {
@@ -242,15 +264,15 @@
       fail("user u1 is not in the submit jobs' list");
     } catch (Exception e) {
     }
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     doc = createDocument();
-    createSimpleDocumentWithAcls(doc, "true");
-    writeToFile(doc, CONFIG);
+    createSimpleDocumentWithAcls(doc);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     admin.run(new String[] { "-refreshQueues" });
   }
 
   private void refreshDocument(Document doc) {
-    Element queues = createQueuesNode(doc, "true");
+    Element queues = createQueuesNode(doc);
 
     // Create parent level queue q1.
     Element q1 = createQueue(doc, "q1");
@@ -298,12 +320,7 @@
    */
   @Test
   public void testAclsDisabled() throws Exception {
-    checkForConfigFile();
-    Document doc = createDocument();
-    createSimpleDocumentWithAcls(doc, "false");
-    writeToFile(doc, CONFIG);
-    MRAdmin admin = new MRAdmin(miniMRCluster.createJobConf());
-    admin.run(new String[] { "-refreshQueues" });
+    startCluster(false);
 
     // submit job to queue p1:p11 by any user not in acls-submit-job
     Job job = submitSleepJob(0, 0, 0, 0, true, "u2,g1", "p1" + NAME_SEPARATOR
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java b/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
index f796a7a..b77e9d7 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
@@ -32,6 +32,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesConfigFile;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -156,11 +160,9 @@
     // clean up
     FileSystem fs = FileSystem.get(new Configuration());
     fs.delete(TEST_DIR, true);
-    
+
     JobConf conf = new JobConf();
     conf.set(JTConfig.JT_JOBHISTORY_BLOCK_SIZE, "1024");
-    conf.set(
-      DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY, "default");
     
     MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
     JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
@@ -239,10 +241,11 @@
                                       true);
     mr.getJobTrackerConf().setInt(JTConfig.JT_TASKS_PER_JOB, 25);
     
-    mr.getJobTrackerConf().setBoolean("mapred.acls.enabled" , true);
+    mr.getJobTrackerConf().setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+
     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    mr.getJobTrackerConf().set("mapred.queue.default.acl-submit-job", 
-                               ugi.getUserName());
+    mr.getJobTrackerConf().set(toFullPropertyName(
+        "default", QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());
 
     // start the jobtracker
     LOG.info("Starting jobtracker");
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java b/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
index 869ea6a..c95cc57 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
@@ -39,12 +39,12 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
@@ -122,12 +122,14 @@
     System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath());
 
     trackerFConf = new JobConf();
+
     trackerFConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     localDirs = new String[numLocalDirs];
     for (int i = 0; i < numLocalDirs; i++) {
       localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
     }
     trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs);
+    trackerFConf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
 
     // Create the job configuration file. Same as trackerConf in this test.
     jobConf = new JobConf(trackerFConf);
@@ -139,6 +141,15 @@
     jobConf.setInt(MRJobConfig.USER_LOG_RETAIN_HOURS, 0);
     jobConf.setUser(getJobOwner().getShortUserName());
 
+    String queue = "default";
+    // set job queue name in job conf
+    jobConf.setQueueName(queue);
+    // Set queue admins acl in job conf similar to what JobClient does so that
+    // it goes into job conf also.
+    jobConf.set(toFullPropertyName(queue,
+        QueueACL.ADMINISTER_JOBS.getAclName()),
+        "qAdmin1,qAdmin2 qAdminsGroup1,qAdminsGroup2");
+
     Job job = Job.getInstance(jobConf);
     String jtIdentifier = "200907202331";
     jobId = new JobID(jtIdentifier, 1);
@@ -527,6 +538,37 @@
         .exists());
     checkFilePermissions(jobLogDir.toString(), "drwx------", task.getUser(),
         taskTrackerUGI.getGroupNames()[0]);
+
+    // Make sure that the job ACLs file job-acls.xml exists in job userlog dir
+    File jobACLsFile = new File(jobLogDir, TaskTracker.jobACLsFile);
+    assertTrue("JobACLsFile is missing in the job userlog dir " + jobLogDir,
+        jobACLsFile.exists());
+
+    // With default task controller, the job-acls.xml file is owned by TT and
+    // permissions are 700
+    checkFilePermissions(jobACLsFile.getAbsolutePath(), "-rwx------",
+        taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
+
+    validateJobACLsFileContent();
+  }
+
+  // Validate the contents of jobACLsFile (i.e. user name, job-view-acl, queue
+  // name and queue-admins-acl).
+  protected void validateJobACLsFileContent() {
+    JobConf jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(jobId);
+
+    assertTrue(jobACLsConf.get("user.name").equals(
+        localizedJobConf.getUser()));
+    assertTrue(jobACLsConf.get(MRJobConfig.JOB_ACL_VIEW_JOB).
+        equals(localizedJobConf.get(MRJobConfig.JOB_ACL_VIEW_JOB)));
+
+    String queue = localizedJobConf.getQueueName();
+    assertTrue(queue.equalsIgnoreCase(jobACLsConf.getQueueName()));
+
+    String qACLName = toFullPropertyName(queue,
+        QueueACL.ADMINISTER_JOBS.getAclName());
+    assertTrue(jobACLsConf.get(qACLName).equals(
+        localizedJobConf.get(qACLName)));
   }
 
   /**
@@ -645,24 +687,6 @@
         + expectedStderr.toString() + " Observed : "
         + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
         attemptLogFiles[1].toString()));
-
-    // Make sure that the job ACLs file exists in the task log dir
-    File jobACLsFile = new File(logDir, TaskRunner.jobACLsFile);
-    assertTrue("JobACLsFile is missing in the task log dir " + logDir,
-        jobACLsFile.exists());
-
-    // With default task controller, the job-acls file is owned by TT and
-    // permissions are 700
-    checkFilePermissions(jobACLsFile.getAbsolutePath(), "-rwx------",
-        taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
-
-    // Validate the contents of jobACLsFile(both user name and job-view-acls)
-    Configuration jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(task
-        .getTaskID(), task.isTaskCleanupTask());
-    assertTrue(jobACLsConf.get(MRJobConfig.USER_NAME).equals(
-        localizedJobConf.getUser()));
-    assertTrue(jobACLsConf.get(MRJobConfig.JOB_ACL_VIEW_JOB).
-        equals(localizedJobConf.get(MRJobConfig.JOB_ACL_VIEW_JOB)));
   }
 
   /**
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java b/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
index f2aa35b..1707305 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
@@ -55,6 +55,7 @@
     taskLogCleanupThread = new UserLogCleaner(conf);
     taskLogCleanupThread.setClock(myClock);
     tt = new TaskTracker();
+    tt.setConf(new JobConf(conf));
     tt.setLocalizer(localizer);
     tt.setTaskLogCleanupThread(taskLogCleanupThread);
   }
@@ -67,8 +68,9 @@
   private File localizeJob(JobID jobid) throws IOException {
     File jobUserlog = TaskLog.getJobDir(jobid);
 
+    JobConf conf = new JobConf();
     // localize job log directory
-    tt.initializeJobLogDir(jobid);
+    tt.initializeJobLogDir(jobid, conf);
     assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
     return jobUserlog;
   }
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java b/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
index 39cbe0d..25f6a92 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
@@ -40,6 +40,8 @@
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -57,12 +59,18 @@
   private static final Log LOG = LogFactory.getLog(
       TestWebUIAuthorization.class);
 
-  // user1 submits the jobs
+  // users who submit the jobs
   private static final String jobSubmitter = "user1";
+  private static final String jobSubmitter1 = "user11";
+  private static final String jobSubmitter2 = "user12";
+  private static final String jobSubmitter3 = "user13";
+
   // mrOwner starts the cluster
   private static String mrOwner = null;
   // member of supergroup
   private static final String superGroupMember = "user2";
+  // admin of "default" queue
+  private static final String qAdmin = "user3";
   // "colleague1" is there in job-view-acls config
   private static final String viewColleague = "colleague1";
   // "colleague2" is there in job-modify-acls config
@@ -72,10 +80,17 @@
   // "evilJohn" is not having view/modify access on the jobs
   private static final String unauthorizedUser = "evilJohn";
 
+  @Override
   protected void setUp() throws Exception {
     // do not do anything
   };
 
+  @Override
+  protected void tearDown() throws Exception {
+    deleteQueuesConfigFile();
+    super.tearDown();
+  }
+
   /** access a url, ignoring some IOException such as the page does not exist */
   static int getHttpStatusCode(String urlstring, String userName,
       String method) throws IOException {
@@ -112,45 +127,51 @@
    * Validates the given jsp/servlet against different user names who
    * can(or cannot) view the job.
    * (1) jobSubmitter can view the job
-   * (2) superGroupMember can view the job
-   * (3) user mentioned in job-view-acls should be able to view the job
-   * (4) user mentioned in job-modify-acls but not in job-view-acls
+   * (2) superGroupMember can view any job
+   * (3) mrOwner can view any job
+   * (4) qAdmins of the queue to which the job is submitted can view any job
+   *     in that queue.
+   * (5) user mentioned in job-view-acl should be able to view the
+   *     job irrespective of job-modify-acl.
+   * (6) user mentioned in job-modify-acl but not in job-view-acl
    *     cannot view the job
-   * (5) other unauthorized users cannot view the job
+   * (7) other unauthorized users cannot view the job
    */
   private void validateViewJob(String url, String method)
       throws IOException {
-    assertEquals("Incorrect return code for " + jobSubmitter,
+    assertEquals("Incorrect return code for job submitter " + jobSubmitter,
         HttpURLConnection.HTTP_OK, getHttpStatusCode(url, jobSubmitter,
             method));
-    assertEquals("Incorrect return code for " + superGroupMember,
-        HttpURLConnection.HTTP_OK, getHttpStatusCode(url, superGroupMember,
-            method));
-    assertEquals("Incorrect return code for " + mrOwner,
+    assertEquals("Incorrect return code for supergroup-member " +
+        superGroupMember, HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, superGroupMember, method));
+    assertEquals("Incorrect return code for MR-owner " + mrOwner,
         HttpURLConnection.HTTP_OK, getHttpStatusCode(url, mrOwner, method));
-    assertEquals("Incorrect return code for " + viewColleague,
-        HttpURLConnection.HTTP_OK, getHttpStatusCode(url, viewColleague,
-            method));
-    assertEquals("Incorrect return code for " + viewAndModifyColleague,
-        HttpURLConnection.HTTP_OK, getHttpStatusCode(url,
-            viewAndModifyColleague, method));
-    assertEquals("Incorrect return code for " + modifyColleague,
-        HttpURLConnection.HTTP_UNAUTHORIZED, getHttpStatusCode(url,
-            modifyColleague, method));
-    assertEquals("Incorrect return code for " + unauthorizedUser,
-        HttpURLConnection.HTTP_UNAUTHORIZED, getHttpStatusCode(url,
-            unauthorizedUser, method));
+    assertEquals("Incorrect return code for queue admin " + qAdmin,
+        HttpURLConnection.HTTP_OK, getHttpStatusCode(url, qAdmin, method));
+    assertEquals("Incorrect return code for user in job-view-acl " +
+        viewColleague, HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, viewColleague, method));
+    assertEquals("Incorrect return code for user in job-view-acl and " +
+        "job-modify-acl " + viewAndModifyColleague, HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, viewAndModifyColleague, method));
+    assertEquals("Incorrect return code for user in job-modify-acl " +
+        modifyColleague, HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, modifyColleague, method));
+    assertEquals("Incorrect return code for unauthorizedUser " +
+        unauthorizedUser, HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, unauthorizedUser, method));
   }
 
   /**
    * Validates the given jsp/servlet against different user names who
    * can(or cannot) modify the job.
-   * (1) jobSubmitter and superGroupMember can modify the job. But we are not
-   *     validating this in this method. Let the caller explicitly validate
-   *     this, if needed.
-   * (2) user mentioned in job-view-acls but not in job-modify-acls cannot
+   * (1) jobSubmitter, mrOwner, qAdmin and superGroupMember can modify the job.
+   *     But we are not validating this in this method. Let the caller
+   *     explicitly validate this, if needed.
+   * (2) user mentioned in job-view-acl but not in job-modify-acl cannot
    *     modify the job
-   * (3) user mentioned in job-modify-acls (irrespective of job-view-acls)
+   * (3) user mentioned in job-modify-acl (irrespective of job-view-acl)
    *     can modify the job
    * (4) other unauthorized users cannot modify the job
    */
@@ -240,13 +261,13 @@
     Properties props = new Properties();
     props.setProperty("hadoop.http.filter.initializers",
         DummyFilterInitializer.class.getName());
-    props.setProperty(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG,
+    props.setProperty(MRConfig.MR_ACLS_ENABLED,
         String.valueOf(true));
+
     props.setProperty("dfs.permissions.enabled", "false");
     props.setProperty("mapred.job.tracker.history.completed.location",
         "historyDoneFolderOnHDFS");
-    props.setProperty("mapreduce.job.committer.setup.cleanup.needed",
-        "false");
+    props.setProperty(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
     props.setProperty(MRConfig.MR_SUPERGROUP, "superGroup");
 
     MyGroupsProvider.mapping.put(jobSubmitter, Arrays.asList("group1"));
@@ -255,11 +276,17 @@
     MyGroupsProvider.mapping.put(unauthorizedUser, Arrays.asList("evilSociety"));
     MyGroupsProvider.mapping.put(superGroupMember, Arrays.asList("superGroup"));
     MyGroupsProvider.mapping.put(viewAndModifyColleague, Arrays.asList("group3"));
+    MyGroupsProvider.mapping.put(qAdmin, Arrays.asList("group4"));
+
     mrOwner = UserGroupInformation.getCurrentUser().getShortUserName();
     MyGroupsProvider.mapping.put(mrOwner, Arrays.asList(
-        new String[] { "group4", "group5" }));
+        new String[] { "group5", "group6" }));
 
-    startCluster(true, props);
+    String[] queueNames = {"default"};
+    String[] submitAclStrings = new String[] { jobSubmitter };
+    String[] adminsAclStrings = new String[] { qAdmin };
+    startCluster(props, queueNames, submitAclStrings, adminsAclStrings);
+
     MiniMRCluster cluster = getMRCluster();
     int infoPort = cluster.getJobTrackerRunner().getJobTrackerInfoPort();
 
@@ -338,30 +365,57 @@
             Integer.toString(attemptsMap.get(attempt).getHttpPort()),
             attempt.toString()) + "&filter=" + TaskLog.LogName.STDERR;
         validateViewJob(stderrURL, "GET");
+      }
+    }
 
-        // delete job-acls.xml file from the task log dir of attempt and verify
-        // if unauthorized users can view task logs of attempt.
-        File attemptLogDir = TaskLog.getAttemptDir(
-            org.apache.hadoop.mapred.TaskAttemptID.downgrade(attempt), false);
-        Path jobACLsFilePath = new Path(attemptLogDir.toString(),
-            TaskRunner.jobACLsFile);
-        new File(jobACLsFilePath.toUri().getPath()).delete();
+    // For each tip, let us test the effect of deletion of job-acls.xml file and
+    // deletion of task log dir for each of the attempts of the tip.
+
+    // delete job-acls.xml file from the job userlog dir and verify
+    // if unauthorized users can view task logs of each attempt.
+    Path jobACLsFilePath = new Path(TaskLog.getJobDir(jobid).toString(),
+        TaskTracker.jobACLsFile);
+    assertTrue("Could not delete job-acls.xml file.",
+        new File(jobACLsFilePath.toUri().getPath()).delete());
+
+    for (TaskID tip : tipsMap.keySet()) {
+
+      Map<TaskAttemptID, TaskAttemptInfo> attemptsMap =
+        tipsMap.get(tip).getAllTaskAttempts();
+      for (TaskAttemptID attempt : attemptsMap.keySet()) {
+
+        String stdoutURL = TaskLogServlet.getTaskLogUrl("localhost",
+            Integer.toString(attemptsMap.get(attempt).getHttpPort()),
+            attempt.toString()) + "&filter=" + TaskLog.LogName.STDOUT;
+
+        String stderrURL = TaskLogServlet.getTaskLogUrl("localhost",
+            Integer.toString(attemptsMap.get(attempt).getHttpPort()),
+            attempt.toString()) + "&filter=" + TaskLog.LogName.STDERR;
+
+        // unauthorized users can view task logs of each attempt because
+        // job-acls.xml file is deleted.
         assertEquals("Incorrect return code for " + unauthorizedUser,
             HttpURLConnection.HTTP_OK, getHttpStatusCode(stdoutURL,
-            unauthorizedUser, "GET"));
+                unauthorizedUser, "GET"));
         assertEquals("Incorrect return code for " + unauthorizedUser,
             HttpURLConnection.HTTP_OK, getHttpStatusCode(stderrURL,
-            unauthorizedUser, "GET"));
+                unauthorizedUser, "GET"));
 
         // delete the whole task log dir of attempt and verify that we get
         // correct response code (i.e. HTTP_GONE) when task logs are accessed.
+        File attemptLogDir = TaskLog.getAttemptDir(
+            org.apache.hadoop.mapred.TaskAttemptID.downgrade(attempt), false);
         FileUtil.fullyDelete(attemptLogDir);
+
+        // Try accessing tasklogs - STDOUT and STDERR now(after the whole
+        // attempt log dir is deleted).
         assertEquals("Incorrect return code for " + jobSubmitter,
             HttpURLConnection.HTTP_GONE, getHttpStatusCode(stdoutURL,
-            jobSubmitter, "GET"));
+                jobSubmitter, "GET"));
+
         assertEquals("Incorrect return code for " + jobSubmitter,
             HttpURLConnection.HTTP_GONE, getHttpStatusCode(stderrURL,
-            jobSubmitter, "GET"));
+                jobSubmitter, "GET"));
       }
     }
 
@@ -377,6 +431,21 @@
   }
 
   /**
+   * Creates queues configuration file with the given queues and acls and starts
+   * cluster with that queues configuration file.
+   * @param props   configuration properties to inject to the mini cluster
+   * @param queueNames   the job queues on the cluster 
+   * @param submitAclStrings acl-submit-job acls for all queues
+   * @param adminsAclStrings acl-administer-jobs acls for all queues
+   * @throws Exception
+   */
+  private void startCluster(Properties props, String[] queueNames,
+      String[] submitAclStrings, String[] adminsAclStrings) throws Exception {
+    createQueuesConfigFile(queueNames, submitAclStrings, adminsAclStrings);
+    startCluster(true, props);
+  }
+
+  /**
    * Starts a sleep job and tries to kill the job using jobdetails.jsp as
    * (1) viewColleague (2) unauthorizedUser (3) modifyColleague
    * (4) viewAndModifyColleague (5) mrOwner (6) superGroupMember and
@@ -387,11 +456,13 @@
    * (1) jobSubmitter, mrOwner and superGroupMember can do both view and modify
    *     on the job. But we are not validating this in this method. Let the
    *     caller explicitly validate this, if needed.
-   * (2) user mentioned in job-view-acls but not in job-modify-acls cannot
+   * (2) user mentioned in job-view-acls and job-modify-acls can do this
+   * (3) user mentioned in job-view-acls but not in job-modify-acls cannot
    *     do this
-   * (3) user mentioned in job-modify-acls but not in job-view-acls cannot
+   * (4) user mentioned in job-modify-acls but not in job-view-acls cannot
    *     do this
-   * (4) other unauthorized users cannot do this
+   * (5) qAdmin cannot do this because he doesn't have view access to the job
+   * (6) other unauthorized users cannot do this
    *
    * @throws Exception
    */
@@ -419,8 +490,11 @@
           getHttpStatusCode(url, unauthorizedUser, "POST"));
       assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
           getHttpStatusCode(url, modifyColleague, "POST"));
+
       assertEquals(HttpURLConnection.HTTP_OK,
           getHttpStatusCode(url, viewAndModifyColleague, "POST"));
+      assertTrue("killJob using jobdetails.jsp failed for a job for which "
+          + "user has job-view and job-modify permissions", job.isComplete());
     } finally {
       if (!job.isComplete()) {
         LOG.info("Killing job " + jobid + " from finally block");
@@ -430,14 +504,16 @@
       }
     }
 
-    // check if jobSubmitter, mrOwner and superGroupMember can do killJob
-    // using jobdetails.jsp url
+    // Check if jobSubmitter, mrOwner, superGroupMember and queueAdmins
+    // can do killJob using jobdetails.jsp url
     confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
-                                       jobSubmitter);
+                                      jobSubmitter);
     confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
-                                       mrOwner);
+                                      mrOwner);
     confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
-                                       superGroupMember);
+                                      superGroupMember);
+    confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
+                                      qAdmin);
   }
 
   /**
@@ -486,47 +562,47 @@
     // jobTrackerJSP killJob url
     String url = jobTrackerJSP + "&killJobs=true";
     // view-job-acl doesn't matter for killJob from jobtracker jsp page
-    conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+    conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, " ");
     
-    // Let us start jobs as 4 different users(none of these 4 users is
+    // Let us start 4 jobs as 4 different users(none of these 4 users is
     // mrOwner and none of these users is a member of superGroup). So only
-    // based on the config JobContext.JOB_ACL_MODIFY_JOB being set here,
-    // killJob on each of the jobs will be succeeded.
+    // based on the config MRJobConfig.JOB_ACL_MODIFY_JOB being set here
+    // and the jobSubmitter, killJob on each of the jobs will succeed.
 
     // start 1st job.
     // Out of these 4 users, only jobSubmitter can do killJob on 1st job
-    conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, "");
+    conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, " ");
     Job job1 = startSleepJobAsUser(jobSubmitter, conf);
     org.apache.hadoop.mapreduce.JobID jobid = job1.getJobID();
     getTIPId(cluster, jobid);// wait till the map task is started
     url = url.concat("&jobCheckBox=" + jobid.toString());
     // start 2nd job.
-    // Out of these 4 users, only viewColleague can do killJob on 2nd job
-    Job job2 = startSleepJobAsUser(viewColleague, conf);
+    // Out of these 4 users, only jobSubmitter1 can do killJob on 2nd job
+    Job job2 = startSleepJobAsUser(jobSubmitter1, conf);
     jobid = job2.getJobID();
     getTIPId(cluster, jobid);// wait till the map task is started
     url = url.concat("&jobCheckBox=" + jobid.toString());
     // start 3rd job.
-    // Out of these 4 users, only modifyColleague can do killJob on 3rd job
-    Job job3 = startSleepJobAsUser(modifyColleague, conf);
+    // Out of these 4 users, only jobSubmitter2 can do killJob on 3rd job
+    Job job3 = startSleepJobAsUser(jobSubmitter2, conf);
     jobid = job3.getJobID();
     getTIPId(cluster, jobid);// wait till the map task is started
     url = url.concat("&jobCheckBox=" + jobid.toString());
     // start 4rd job.
-    // Out of these 4 users, viewColleague and viewAndModifyColleague
+    // Out of these 4 users, jobSubmitter1 and jobSubmitter3
     // can do killJob on 4th job
-    conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, viewColleague);
-    Job job4 = startSleepJobAsUser(viewAndModifyColleague, conf);
+    conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, jobSubmitter1);
+    Job job4 = startSleepJobAsUser(jobSubmitter3, conf);
     jobid = job4.getJobID();
     getTIPId(cluster, jobid);// wait till the map task is started
     url = url.concat("&jobCheckBox=" + jobid.toString());
 
     try {
-      // Try killing all the 4 jobs as user viewColleague who can kill only
+      // Try killing all the 4 jobs as user jobSubmitter1 who can kill only
       // 2nd and 4th jobs. Check if 1st and 3rd jobs are not killed and
       // 2nd and 4th jobs got killed
       assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
-          getHttpStatusCode(url, viewColleague, "POST"));
+          getHttpStatusCode(url, jobSubmitter1, "POST"));
       assertFalse("killJob succeeded for a job for which user doesnot "
           + " have job-modify permission", job1.isComplete());
       assertFalse("killJob succeeded for a job for which user doesnot "
@@ -557,12 +633,12 @@
     Properties props = new Properties();
     props.setProperty("hadoop.http.filter.initializers",
         DummyFilterInitializer.class.getName());
-    props.setProperty(
-       MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, String.valueOf(true));
+    props.setProperty(MRConfig.MR_ACLS_ENABLED, String.valueOf(true));
+
     props.setProperty("dfs.permissions.enabled", "false");
     
     props.setProperty(JSPUtil.PRIVATE_ACTIONS_KEY, "true");
-    props.setProperty("mapreduce.job.committer.setup.cleanup.needed", "false");
+    props.setProperty(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
     props.setProperty(MRConfig.MR_SUPERGROUP, "superGroup");
 
     MyGroupsProvider.mapping.put(jobSubmitter, Arrays.asList("group1"));
@@ -571,12 +647,22 @@
     MyGroupsProvider.mapping.put(unauthorizedUser, Arrays.asList("evilSociety"));
     MyGroupsProvider.mapping.put(superGroupMember, Arrays.asList("superGroup"));
     MyGroupsProvider.mapping.put(viewAndModifyColleague, Arrays.asList("group3"));
+    MyGroupsProvider.mapping.put(qAdmin, Arrays.asList("group4"));
 
     mrOwner = UserGroupInformation.getCurrentUser().getShortUserName();
     MyGroupsProvider.mapping.put(mrOwner, Arrays.asList(
-        new String[] { "group4", "group5" }));
+        new String[] { "group5", "group6" }));
+    
+    MyGroupsProvider.mapping.put(jobSubmitter1, Arrays.asList("group7"));
+    MyGroupsProvider.mapping.put(jobSubmitter2, Arrays.asList("group7"));
+    MyGroupsProvider.mapping.put(jobSubmitter3, Arrays.asList("group7"));
 
-    startCluster(true, props);
+    String[] queueNames = {"default"};
+    String[] submitAclStrings = {jobSubmitter + "," + jobSubmitter1 + ","
+        + jobSubmitter2 + "," + jobSubmitter3};
+    String[] adminsAclStrings = new String[]{qAdmin};
+    startCluster(props, queueNames, submitAclStrings, adminsAclStrings);
+
     MiniMRCluster cluster = getMRCluster();
     int infoPort = cluster.getJobTrackerRunner().getJobTrackerInfoPort();
 
@@ -630,8 +716,8 @@
         viewAndModifyColleague);
     confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, jobSubmitter);
     confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, mrOwner);
-    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL,
-        superGroupMember);
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, superGroupMember);
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, qAdmin);
 
     // validate killing of multiple jobs using jobtracker jsp and check
     // if all the jobs which can be killed by user are actually the ones that
@@ -676,7 +762,7 @@
         "&changeJobPriority=true&setJobPriority="+"HIGH"+"&jobCheckBox=" +
         jobid.toString();
     validateModifyJob(jobTrackerJSPSetJobPriorityAction, "GET");
-    // jobSubmitter, mrOwner and superGroupMember are not validated for
+    // jobSubmitter, mrOwner, qAdmin and superGroupMember are not validated for
     // job-modify permission in validateModifyJob(). So let us do it
     // explicitly here
     assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
@@ -684,6 +770,8 @@
     assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
         jobTrackerJSPSetJobPriorityAction, superGroupMember, "GET"));
     assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
+        jobTrackerJSPSetJobPriorityAction, qAdmin, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
         jobTrackerJSPSetJobPriorityAction, mrOwner, "GET"));
   }
 
@@ -759,6 +847,8 @@
     assertEquals(HttpURLConnection.HTTP_OK,
         getHttpStatusCode(jobTrackerJSP, mrOwner, "GET"));
     assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, qAdmin, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
         getHttpStatusCode(jobTrackerJSP, superGroupMember, "GET"));
   }
 }
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java b/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
index 27ea213..de2fcc4 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
@@ -26,11 +26,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobInProgress;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Operation;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Before;
@@ -56,23 +57,33 @@
           TestJobACLs.class.getCanonicalName() + Path.SEPARATOR
               + "completed-job-store");
 
+  private String jobSubmitter = "jobSubmitter";
+  private String viewColleague = "viewColleague";
+  private String modifyColleague = "modifyColleague";
+  private String qAdmin = "qAdmin";
+
   /**
    * Start the cluster before running the actual test.
    * 
    * @throws IOException
    */
   @Before
-  public void setup() throws IOException {
+  public void setup() throws Exception {
     // Start the cluster
     startCluster(false);
   }
 
-  private void startCluster(boolean reStart) throws IOException {
-    UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
+  private void startCluster(boolean reStart) throws Exception {
+
+    // Configure job queues
+    String[] queueNames = {"default"};
+    createQueuesConfigFile(queueNames,
+        new String[] { jobSubmitter }, new String[] { qAdmin });
+
     JobConf conf = new JobConf();
 
-    // Enable job-level authorization
-    conf.setBoolean(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+    // Enable queue and job level authorization
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
 
     // Enable CompletedJobStore
     FileSystem fs = FileSystem.getLocal(conf);
@@ -84,6 +95,7 @@
     conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
     conf.set(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
 
+    UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
     mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, MR_UGI, conf);
   }
 
@@ -92,6 +104,7 @@
    */
   @After
   public void tearDown() {
+    deleteQueuesConfigFile();
     if (mr != null) {
       mr.shutdown();
     }
@@ -106,10 +119,10 @@
    * @throws ClassNotFoundException
    */
   @Test
-  public void testACLS() throws IOException, InterruptedException,
-      ClassNotFoundException {
+  public void testACLS() throws Exception {
     verifyACLViewJob();
-    verifyACLModifyJob();
+    verifyACLModifyJob(modifyColleague);
+    verifyACLModifyJob(qAdmin);
     verifyACLPersistence();
   }
 
@@ -123,18 +136,21 @@
 
     // Set the job up.
     final Configuration myConf = mr.createJobConf();
-    myConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "user1,user3");
+    myConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, viewColleague);
 
     // Submit the job as user1
-    Job job = submitJobAsUser(myConf, "user1");
+    Job job = submitJobAsUser(myConf, jobSubmitter);
 
     final JobID jobId = job.getJobID();
 
     // Try operations as an unauthorized user.
-    verifyViewJobAsUnauthorizedUser(myConf, jobId, "user2");
+    verifyViewJobAsUnauthorizedUser(myConf, jobId, modifyColleague);
 
-    // Try operations as an authorized user.
-    verifyViewJobAsAuthorizedUser(myConf, jobId, "user3");
+    // Try operations as an authorized user, who is part of view-job-acl.
+    verifyViewJobAsAuthorizedUser(myConf, jobId, viewColleague);
+
+    // Try operations as an authorized user, who is a queue administrator.
+    verifyViewJobAsAuthorizedUser(myConf, jobId, qAdmin);
 
     // Clean up the job
     job.killJob();
@@ -242,7 +258,7 @@
           fail("AccessControlException expected..");
         } catch (IOException ioe) {
           assertTrue(ioe.getMessage().contains(
-              JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.VIEW_JOB));
+              " cannot perform operation " + JobACL.VIEW_JOB));
         } catch (InterruptedException e) {
           fail("Exception .. interrupted.." + e);
         }
@@ -253,7 +269,7 @@
           fail("AccessControlException expected..");
         } catch (IOException ioe) {
           assertTrue(ioe.getMessage().contains(
-              JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.VIEW_JOB));
+              " cannot perform operation " + JobACL.VIEW_JOB));
         } catch (InterruptedException e) {
           fail("Exception .. interrupted.." + e);
         }
@@ -264,29 +280,29 @@
   }
 
   /**
-   * Verify JobContext.Job_ACL_MODIFY_JOB
+   * Verify MRJobConfig.JOB_ACL_MODIFY_JOB
    * 
    * @throws IOException
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  private void verifyACLModifyJob() throws IOException,
+  private void verifyACLModifyJob(String authorizedUser) throws IOException,
       InterruptedException, ClassNotFoundException {
 
     // Set the job up.
     final Configuration myConf = mr.createJobConf();
-    myConf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, "user1,user3");
+    myConf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, modifyColleague);
 
     // Submit the job as user1
-    Job job = submitJobAsUser(myConf, "user1");
+    Job job = submitJobAsUser(myConf, jobSubmitter);
 
     final JobID jobId = job.getJobID();
 
     // Try operations as an unauthorized user.
-    verifyModifyJobAsUnauthorizedUser(myConf, jobId, "user2");
+    verifyModifyJobAsUnauthorizedUser(myConf, jobId, viewColleague);
 
     // Try operations as an authorized user.
-    verifyModifyJobAsAuthorizedUser(myConf, jobId, "user3");
+    verifyModifyJobAsAuthorizedUser(myConf, jobId, authorizedUser);
   }
 
   private void verifyModifyJobAsAuthorizedUser(
@@ -357,7 +373,7 @@
           fail("AccessControlException expected..");
         } catch (IOException ioe) {
           assertTrue(ioe.getMessage().contains(
-            JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.MODIFY_JOB));
+              " cannot perform operation " + Operation.KILL_JOB));
         } catch (InterruptedException e) {
           fail("Exception .. interrupted.." + e);
         }
@@ -368,7 +384,7 @@
           fail("AccessControlException expected..");
         } catch (IOException ioe) {
           assertTrue(ioe.getMessage().contains(
-            JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.MODIFY_JOB));
+              " cannot perform operation " + Operation.SET_JOB_PRIORITY));
         } catch (InterruptedException e) {
           fail("Exception .. interrupted.." + e);
         }
@@ -378,15 +394,14 @@
     });
   }
 
-  private void verifyACLPersistence() throws IOException,
-      InterruptedException {
+  private void verifyACLPersistence() throws Exception {
 
     // Set the job up.
     final Configuration myConf = mr.createJobConf();
-    myConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "user2 group2");
+    myConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, viewColleague + " group2");
 
     // Submit the job as user1
-    Job job = submitJobAsUser(myConf, "user1");
+    Job job = submitJobAsUser(myConf, jobSubmitter);
 
     final JobID jobId = job.getJobID();
 
@@ -406,11 +421,14 @@
 
     final Configuration myNewJobConf = mr.createJobConf();
     // Now verify view-job works off CompletedJobStore
-    verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, "user2");
+    verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, viewColleague);
+    verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, qAdmin);
 
     // Only JobCounters is persisted on the JobStore. So test counters only.
     UserGroupInformation unauthorizedUGI =
-        UserGroupInformation.createUserForTesting("user3", new String[] {});
+        UserGroupInformation.createUserForTesting(
+            modifyColleague, new String[] {});
+
     unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
       @SuppressWarnings("null")
       @Override
@@ -432,7 +450,7 @@
           fail("AccessControlException expected..");
         } catch (IOException ioe) {
           assertTrue(ioe.getMessage().contains(
-              JobACLsManager.UNAUTHORIZED_JOB_ACCESS_ERROR + JobACL.VIEW_JOB));
+              " cannot perform operation " + Operation.VIEW_JOB_COUNTERS));
         } catch (InterruptedException e) {
           fail("Exception .. interrupted.." + e);
         }
diff --git a/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java b/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
index 5466f69..15d4ce1 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
@@ -21,10 +21,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaParseException;
 import org.apache.hadoop.mapreduce.jobhistory.EventReader;
-import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 
diff --git a/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java b/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
index 7ca0891..dd2649e 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.tools.rumen;
 
-import java.text.ParseException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -72,7 +71,11 @@
       String submitTime = line.get("SUBMIT_TIME");
       String jobConf = line.get("JOBCONF");
       String user = line.get("USER");
+      if (user == null) {
+        user = "nulluser";
+      }
       String jobName = line.get("JOBNAME");
+      String jobQueueName = line.get("JOB_QUEUE");// could be null
 
       if (submitTime != null) {
         Job20LineHistoryEventEmitter that =
@@ -82,8 +85,8 @@
 
         Map<JobACL, AccessControlList> jobACLs =
           new HashMap<JobACL, AccessControlList>();
-        return new JobSubmittedEvent(jobID, jobName, user == null ? "nulluser"
-            : user, that.originalSubmitTime, jobConf, jobACLs);
+        return new JobSubmittedEvent(jobID, jobName, user,
+            that.originalSubmitTime, jobConf, jobACLs, jobQueueName);
       }
 
       return null;
diff --git a/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java b/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
index 59c7dec..ca77dda 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
@@ -544,6 +544,8 @@
     result.setJobName(event.getJobName());
     result.setUser(event.getUserName());
     result.setSubmitTime(event.getSubmitTime());
+    // job queue name is set when conf file is processed.
+    // See JobBuilder.process(Properties) method for details.
   }
 
   private void processJobStatusChangedEvent(JobStatusChangedEvent event) {