MAPREDUCE-2067. Distinct minicluster services (e.g. NN and JT) overwrite each other's service policies. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1002905 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index fc96883..199c3b2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -310,6 +310,9 @@
MAPREDUCE-1989. Fixes error message in gridmix when user resolver is set
and no user list is given. (Ravi Gummadi via amareshwari)
+ MAPREDUCE-2067. Distinct minicluster services (e.g. NN and JT) overwrite
+ each other's service policies. (Aaron T. Myers via tomwhite)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index 851fba5..06238f6 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -1451,12 +1452,6 @@
= conf.getClass(JT_TASK_SCHEDULER,
JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
-
- // Set service-level authorization security policy
- if (conf.getBoolean(
- ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
- ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
- }
int handlerCount = conf.getInt(JT_IPC_HANDLER_COUNT, 10);
this.interTrackerServer = RPC.getServer(ClientProtocol.class,
@@ -1464,6 +1459,13 @@
addr.getHostName(),
addr.getPort(), handlerCount,
false, conf, secretManager);
+
+ // Set service-level authorization security policy
+ if (conf.getBoolean(
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+ this.interTrackerServer.refreshServiceAcl(conf, new MapReducePolicyProvider());
+ }
+
if (LOG.isDebugEnabled()) {
Properties p = System.getProperties();
for (Iterator it = p.keySet().iterator(); it.hasNext();) {
@@ -4353,10 +4355,10 @@
@Override
public void refreshServiceAcl() throws IOException {
if (!conf.getBoolean(
- ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
throw new AuthorizationException("Service Level Authorization not enabled!");
}
- ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
+ this.interTrackerServer.refreshServiceAcl(conf, new MapReducePolicyProvider());
}
@Override
diff --git a/src/java/org/apache/hadoop/mapred/TaskTracker.java b/src/java/org/apache/hadoop/mapred/TaskTracker.java
index 0b125e2..d24b626 100644
--- a/src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -100,7 +101,6 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
@@ -642,24 +642,25 @@
this.jvmManager = new JvmManager(this);
- // Set service-level authorization security policy
- if (this.fConf.getBoolean(
- ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
- PolicyProvider policyProvider =
- (PolicyProvider)(ReflectionUtils.newInstance(
- this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
- MapReducePolicyProvider.class, PolicyProvider.class),
- this.fConf));
- ServiceAuthorizationManager.refresh(fConf, policyProvider);
- }
-
// RPC initialization
- int max = maxMapSlots > maxReduceSlots ?
+ int max = maxMapSlots > maxReduceSlots ?
maxMapSlots : maxReduceSlots;
//set the num handlers to max*2 since canCommit may wait for the duration
//of a heartbeat RPC
this.taskReportServer = RPC.getServer(this.getClass(), this, bindAddress,
tmpPort, 2 * max, false, this.fConf, this.jobTokenSecretManager);
+
+ // Set service-level authorization security policy
+ if (this.fConf.getBoolean(
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+ PolicyProvider policyProvider =
+ (PolicyProvider)(ReflectionUtils.newInstance(
+ this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
+ MapReducePolicyProvider.class, PolicyProvider.class),
+ this.fConf));
+ this.taskReportServer.refreshServiceAcl(fConf, policyProvider);
+ }
+
this.taskReportServer.start();
// get the assigned address
diff --git a/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java b/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
index 6a14707..38b0aee 100644
--- a/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
+++ b/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
@@ -21,6 +21,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -28,6 +29,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
@@ -55,10 +57,35 @@
// Start the mini clusters
dfs = new MiniDFSCluster(conf, slaves, true, null);
+
+ // Ensure that the protocols authorized on the name node are only the HDFS protocols.
+ Set<Class<?>> protocolsWithAcls = NameNodeAdapter.getRpcServer(dfs.getNameNode())
+ .getServiceAuthorizationManager().getProtocolsWithAcls();
+ Service[] hdfsServices = new HDFSPolicyProvider().getServices();
+ for (Service service : hdfsServices) {
+ if (!protocolsWithAcls.contains(service.getProtocol()))
+ fail("service authorization manager has no entry for protocol " + service.getProtocol());
+ }
+ if (hdfsServices.length != protocolsWithAcls.size())
+ fail("there should be an entry for every HDFS service in the protocols with ACLs map");
+
fileSys = dfs.getFileSystem();
JobConf mrConf = new JobConf(conf);
mr = new MiniMRCluster(slaves, fileSys.getUri().toString(), 1,
null, null, mrConf);
+
+ // Ensure that the protocols configured for the name node did not change
+ // when the MR cluster was started.
+ protocolsWithAcls = NameNodeAdapter.getRpcServer(dfs.getNameNode())
+ .getServiceAuthorizationManager().getProtocolsWithAcls();
+ hdfsServices = new HDFSPolicyProvider().getServices();
+ for (Service service : hdfsServices) {
+ if (!protocolsWithAcls.contains(service.getProtocol()))
+ fail("service authorization manager has no entry for protocol " + service.getProtocol());
+ }
+ if (hdfsServices.length != protocolsWithAcls.size())
+ fail("there should be an entry for every HDFS service in the protocols with ACLs map");
+
// make cleanup inline sothat validation of existence of these directories
// can be done
mr.setInlineCleanupThreads();