MAPREDUCE-1935. Makes DistCp work in a secure environment. Contributed by Boris Shkolnik.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@965629 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index a9f935f..58215ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -83,6 +83,9 @@
MAPREDUCE-1848. Put number of speculative, data local, rack local
tasks in JobTracker metrics. (Scott Chen via dhruba)
+ MAPREDUCE-1935. Makes the Distcp to work in a secure environment.
+ (Boris Shkolnik via ddas)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java b/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
index e6e4887..a290b49 100644
--- a/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
+++ b/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.net.URI;
-import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,18 +30,18 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -111,8 +110,34 @@
token.setService(new Text(fs_addr));
credentials.addToken(new Text(fs_addr), token);
- LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr +
+ LOG.info("Got dt for " + p + ";uri="+ fs_addr +
";t.service="+token.getService());
+ } else if (fs instanceof HftpFileSystem) {
+ String fs_addr = buildDTServiceName(fs.getUri());
+ Token<DelegationTokenIdentifier> token =
+ TokenCache.getDelegationToken(credentials, fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ continue;
+ }
+ //the initialize method of hftp, called via FileSystem.get() done
+ //earlier gets a delegation token
+
+ Token<? extends TokenIdentifier> t = ((HftpFileSystem) fs).getDelegationToken();
+ if (t != null) {
+ credentials.addToken(new Text(fs_addr), t);
+
+ // in this case port in fs_addr is port for hftp request, but
+ // token's port is for RPC
+ // to find the correct DT we need to know the mapping between Hftp port
+ // and RPC one. hence this new setting in the conf.
+ URI uri = ((HftpFileSystem) fs).getUri();
+ String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+buildDTServiceName(uri);
+ conf.set(key, t.getService().toString());
+ LOG.info("GOT dt for " + p + " and stored in conf as " + key + "="
+ + t.getService());
+
+ }
}
}
}
@@ -184,7 +209,7 @@
return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
}
- static String buildDTServiceName(URI uri) {
+ public static String buildDTServiceName(URI uri) {
int port = uri.getPort();
if(port == -1)
port = NameNode.DEFAULT_PORT;
diff --git a/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java b/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
index f7e12ea..b9ed0f3 100644
--- a/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
+++ b/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
@@ -20,16 +20,13 @@
import java.io.IOException;
import java.net.URI;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -43,10 +40,12 @@
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@InterfaceAudience.Private
@@ -106,13 +105,10 @@
//managing the list of tokens using Map
// jobId=>List<tokens>
- private static List<DelegationTokenToRenew> delegationTokens =
- Collections.synchronizedList(new ArrayList<DelegationTokenToRenew>());
+ private static Set<DelegationTokenToRenew> delegationTokens =
+ Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
//adding token
private static void addTokenToList(DelegationTokenToRenew t) {
- //check to see if the token already exists in the list
- if (delegationTokens.contains(t))
- return;
delegationTokens.add(t);
}
diff --git a/src/tools/org/apache/hadoop/tools/DistCp.java b/src/tools/org/apache/hadoop/tools/DistCp.java
index 7a505d9..29658a8 100644
--- a/src/tools/org/apache/hadoop/tools/DistCp.java
+++ b/src/tools/org/apache/hadoop/tools/DistCp.java
@@ -34,28 +34,25 @@
import java.util.Stack;
import java.util.StringTokenizer;
-import javax.security.auth.login.LoginException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -65,16 +62,15 @@
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -739,19 +735,22 @@
}
/** Sanity check for srcPath */
- private static void checkSrcPath(Configuration conf, List<Path> srcPaths,
- JobConf jobConf) throws IOException {
+ private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths)
+ throws IOException {
List<IOException> rslt = new ArrayList<IOException>();
List<Path> unglobbed = new LinkedList<Path>();
// get tokens for all the required FileSystems..
+ // also set the renewer as the JobTracker for the hftp case
+ jobConf.set(HftpFileSystem.HFTP_RENEWER,
+ jobConf.get(JobTracker.JT_USER_NAME, ""));
Path[] ps = new Path[srcPaths.size()];
ps = srcPaths.toArray(ps);
- TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, conf);
+ TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);
for (Path p : srcPaths) {
- FileSystem fs = p.getFileSystem(conf);
+ FileSystem fs = p.getFileSystem(jobConf);
FileStatus[] inputs = fs.globStatus(p);
if(inputs != null && inputs.length > 0) {
@@ -782,7 +781,7 @@
JobConf job = createJobConf(conf);
- checkSrcPath(conf, args.srcs, job);
+ checkSrcPath(job, args.srcs);
if (args.preservedAttributes != null) {
job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
}