MAPREDUCE-2067.  Distinct minicluster services (e.g. NN and JT) overwrite each other's service policies.  Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1002905 13f79535-47bb-0310-9956-ffa450edef68
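
Summary for reviewers: service-level authorization used to be refreshed
through the static ServiceAuthorizationManager.refresh(), which keeps a
single JVM-wide policy table, so whichever minicluster daemon started last
(e.g. the JT after the NN) clobbered the ACLs of the others. The patch
instead refreshes the ACLs held by each daemon's own ipc.Server. A minimal
before/after sketch in JobTracker terms (names taken from the diff below;
imports and surrounding setup elided):

    // Before: one static, JVM-wide policy table -- the last daemon to
    // start wins, wiping out the policies of daemons started earlier.
    if (conf.getBoolean(
        ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
      ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
    }

    // After: refresh the ACLs owned by this daemon's RPC server, once the
    // server object exists; other daemons' policy tables are untouched.
    if (conf.getBoolean(
        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
      this.interTrackerServer.refreshServiceAcl(conf, new MapReducePolicyProvider());
    }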
diff --git a/CHANGES.txt b/CHANGES.txt
index fc96883..199c3b2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -310,6 +310,9 @@
     MAPREDUCE-1989. Fixes error message in gridmix when user resolver is set
     and no user list is given. (Ravi Gummadi via amareshwari)
 
+    MAPREDUCE-2067.  Distinct minicluster services (e.g. NN and JT) overwrite
+    each other's service policies.  (Aaron T. Myers via tomwhite)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index 851fba5..06238f6 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -1451,12 +1452,6 @@
       = conf.getClass(JT_TASK_SCHEDULER,
           JobQueueTaskScheduler.class, TaskScheduler.class);
     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
-                                           
-    // Set service-level authorization security policy
-    if (conf.getBoolean(
-          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
-      ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
-    }
     
     int handlerCount = conf.getInt(JT_IPC_HANDLER_COUNT, 10);
     this.interTrackerServer = RPC.getServer(ClientProtocol.class,
@@ -1464,6 +1459,13 @@
                                             addr.getHostName(), 
                                             addr.getPort(), handlerCount, 
                                             false, conf, secretManager);
+
+    // Set service-level authorization security policy
+    if (conf.getBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      this.interTrackerServer.refreshServiceAcl(conf, new MapReducePolicyProvider());
+    }
+
     if (LOG.isDebugEnabled()) {
       Properties p = System.getProperties();
       for (Iterator it = p.keySet().iterator(); it.hasNext();) {
@@ -4353,10 +4355,10 @@
   @Override
   public void refreshServiceAcl() throws IOException {
     if (!conf.getBoolean(
-            ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
       throw new AuthorizationException("Service Level Authorization not enabled!");
     }
-    ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
+    this.interTrackerServer.refreshServiceAcl(conf, new MapReducePolicyProvider());
   }
 
   @Override
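
Because refreshServiceAcl() above now delegates to interTrackerServer, a
runtime policy refresh rewrites only that server's ACL table. Assuming the
stock admin tooling on this branch (not touched by this patch), such a
refresh is typically triggered with:

    bin/hadoop mradmin -refreshServiceAcl
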
diff --git a/src/java/org/apache/hadoop/mapred/TaskTracker.java b/src/java/org/apache/hadoop/mapred/TaskTracker.java
index 0b125e2..d24b626 100644
--- a/src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -100,7 +101,6 @@
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
@@ -642,24 +642,25 @@
     
     this.jvmManager = new JvmManager(this);
 
-    // Set service-level authorization security policy
-    if (this.fConf.getBoolean(
-          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
-      PolicyProvider policyProvider = 
-        (PolicyProvider)(ReflectionUtils.newInstance(
-            this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
-                MapReducePolicyProvider.class, PolicyProvider.class), 
-            this.fConf));
-      ServiceAuthorizationManager.refresh(fConf, policyProvider);
-    }
-    
     // RPC initialization
-    int max = maxMapSlots > maxReduceSlots ? 
+    int max = maxMapSlots > maxReduceSlots ?
                        maxMapSlots : maxReduceSlots;
     //set the num handlers to max*2 since canCommit may wait for the duration
     //of a heartbeat RPC
     this.taskReportServer = RPC.getServer(this.getClass(), this, bindAddress,
         tmpPort, 2 * max, false, this.fConf, this.jobTokenSecretManager);
+
+    // Set service-level authorization security policy
+    if (this.fConf.getBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      PolicyProvider policyProvider = 
+        (PolicyProvider)(ReflectionUtils.newInstance(
+            this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                MapReducePolicyProvider.class, PolicyProvider.class), 
+            this.fConf));
+      this.taskReportServer.refreshServiceAcl(fConf, policyProvider);
+    }
+
     this.taskReportServer.start();
 
     // get the assigned address
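
Two things worth noting in the TaskTracker hunk above. First, the
authorization block moves to after RPC.getServer() because the ACLs now
live on the Server instance, which must exist before it can be refreshed.
Second, PolicyProvider.POLICY_PROVIDER_CONFIG is still honored, so a
deployment can substitute its own provider; only the refresh target
changed. A configuration sketch (MyPolicyProvider is a hypothetical
subclass, not part of this patch):

    Configuration conf = new Configuration();
    // Enable service-level authorization and plug in a custom provider;
    // the TaskTracker instantiates it reflectively, as in the hunk above.
    conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, true);
    conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
        MyPolicyProvider.class, PolicyProvider.class);
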
diff --git a/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java b/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
index 6a14707..38b0aee 100644
--- a/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
+++ b/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
@@ -21,6 +21,7 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,6 +29,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobConf;
@@ -55,10 +57,35 @@
       
       // Start the mini clusters
       dfs = new MiniDFSCluster(conf, slaves, true, null);
+
+      // Ensure that the protocols authorized on the name node are only the HDFS protocols.
+      Set<Class<?>> protocolsWithAcls = NameNodeAdapter.getRpcServer(dfs.getNameNode())
+          .getServiceAuthorizationManager().getProtocolsWithAcls();
+      Service[] hdfsServices = new HDFSPolicyProvider().getServices();
+      for (Service service : hdfsServices) {
+        assertTrue("service authorization manager has no entry for protocol "
+            + service.getProtocol(), protocolsWithAcls.contains(service.getProtocol()));
+      }
+      assertEquals("there should be an entry for every HDFS service in the "
+          + "protocols with ACLs map", hdfsServices.length, protocolsWithAcls.size());
+
       fileSys = dfs.getFileSystem();
       JobConf mrConf = new JobConf(conf);
       mr = new MiniMRCluster(slaves, fileSys.getUri().toString(), 1, 
                              null, null, mrConf);
+
+      // Ensure that the protocols configured for the name node did not change
+      // when the MR cluster was started.
+      protocolsWithAcls = NameNodeAdapter.getRpcServer(dfs.getNameNode())
+          .getServiceAuthorizationManager().getProtocolsWithAcls();
+      hdfsServices = new HDFSPolicyProvider().getServices();
+      for (Service service : hdfsServices) {
+        assertTrue("service authorization manager has no entry for protocol "
+            + service.getProtocol(), protocolsWithAcls.contains(service.getProtocol()));
+      }
+      assertEquals("there should be an entry for every HDFS service in the "
+          + "protocols with ACLs map", hdfsServices.length, protocolsWithAcls.size());
+
       // make cleanup inline so that validation of existence of these directories
       // can be done
       mr.setInlineCleanupThreads();