MAPREDUCE-1718. Fixes a bug in the construction of jobconf key for the mapping that the tasks use at runtime for looking up delegation tokens. Contributed by Boris Shkolnik.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@967297 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 703763b..17e6314 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -195,6 +195,10 @@
MAPREDUCE-1925. Fix failing TestRumenJobTraces. (Ravi Gummadi via cdouglas)
+ MAPREDUCE-1718. Fixes a bug in the construction of jobconf key for the
+ mapping that the tasks use at runtime for looking up delegation tokens.
+ (Boris Shkolnik via ddas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java b/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
index fcb7509..92339a8 100644
--- a/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
+++ b/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
@@ -36,9 +36,9 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosName;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,82 +86,99 @@
static void obtainTokensForNamenodesInternal(Credentials credentials,
Path[] ps, Configuration conf) throws IOException {
- // get jobtracker principal id (for the renewer)
- KerberosName jtKrbName = new KerberosName(conf.get(JTConfig.JT_USER_NAME,
- ""));
- Text delegTokenRenewer = new Text(jtKrbName.getShortName());
- boolean notReadFile = true;
for(Path p: ps) {
- // TODO: Connecting to the namenode is not required in the case,
- // where we already have the credentials in the file
FileSystem fs = FileSystem.get(p.toUri(), conf);
- if(fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- URI uri = fs.getUri();
- String fs_addr = buildDTServiceName(uri);
-
- // see if we already have the token
- Token<DelegationTokenIdentifier> token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- continue;
- }
- if (notReadFile) { //read the file only once
- String binaryTokenFilename =
- conf.get("mapreduce.job.credentials.binary");
- if (binaryTokenFilename != null) {
- credentials.readTokenStorageFile(new Path("file:///" +
- binaryTokenFilename), conf);
- }
- notReadFile = false;
- token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- continue;
- }
- }
- // get the token
- token = dfs.getDelegationToken(delegTokenRenewer);
- if(token==null)
- throw new IOException("Token from " + fs_addr + " is null");
-
- token.setService(new Text(fs_addr));
- credentials.addToken(new Text(fs_addr), token);
- LOG.info("Got dt for " + p + ";uri="+ fs_addr +
- ";t.service="+token.getService());
- } else if (fs instanceof HftpFileSystem) {
- String fs_addr = buildDTServiceName(fs.getUri());
- Token<DelegationTokenIdentifier> token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- continue;
- }
- //the initialize method of hftp, called via FileSystem.get() done
- //earlier gets a delegation token
-
- Token<? extends TokenIdentifier> t = ((HftpFileSystem) fs).getDelegationToken();
- if (t != null) {
- credentials.addToken(new Text(fs_addr), t);
-
- // in this case port in fs_addr is port for hftp request, but
- // token's port is for RPC
- // to find the correct DT we need to know the mapping between Hftp port
- // and RPC one. hence this new setting in the conf.
- URI uri = ((HftpFileSystem) fs).getUri();
- String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+buildDTServiceName(uri);
- conf.set(key, t.getService().toString());
- LOG.info("GOT dt for " + p + " and stored in conf as " + key + "="
- + t.getService());
-
- }
- }
+ obtainTokensForNamenodesInternal(fs, credentials, conf);
}
}
/**
+ * get delegation token for a specific FS
+ * @param fs
+ * @param credentials
+ * @param p
+ * @param conf
+ * @throws IOException
+ */
+ static void obtainTokensForNamenodesInternal(FileSystem fs,
+ Credentials credentials, Configuration conf) throws IOException {
+
+ // get jobtracker principal id (for the renewer)
+ KerberosName jtKrbName =
+ new KerberosName(conf.get(JTConfig.JT_USER_NAME,""));
+ Text delegTokenRenewer = new Text(jtKrbName.getShortName());
+
+ boolean notReadFile = true;
+
+ // TODO: Connecting to the namenode is not required in the case,
+ // where we already have the credentials in the file
+ if(fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ URI uri = dfs.getUri();
+ String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+
+ // see if we already have the token
+ Token<DelegationTokenIdentifier> token =
+ TokenCache.getDelegationToken(credentials, fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ return;
+ }
+ if (notReadFile) { //read the file only once
+ String binaryTokenFilename =
+ conf.get("mapreduce.job.credentials.binary");
+ if (binaryTokenFilename != null) {
+ credentials.readTokenStorageFile(new Path("file:///" +
+ binaryTokenFilename), conf);
+ }
+ notReadFile = false;
+ token =
+ TokenCache.getDelegationToken(credentials, fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ return;
+ }
+ }
+ // get the token
+ token = dfs.getDelegationToken(delegTokenRenewer);
+ if(token==null)
+ throw new IOException("Token from " + fs_addr + " is null");
+
+ token.setService(new Text(fs_addr));
+ credentials.addToken(new Text(fs_addr), token);
+ LOG.info("Got dt for " + uri + ";uri="+ fs_addr +
+ ";t.service="+token.getService());
+ } else if (fs instanceof HftpFileSystem) {
+ HftpFileSystem hfs = (HftpFileSystem)fs;
+ URI uri = hfs.getUri();
+ String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+ Token<DelegationTokenIdentifier> token =
+ TokenCache.getDelegationToken(credentials, fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ return;
+ }
+ //the initialize method of hftp, called via FileSystem.get() done
+ //earlier gets a delegation token
+
+ Token<? extends TokenIdentifier> t = hfs.getDelegationToken();
+ if (t != null) {
+ credentials.addToken(new Text(fs_addr), t);
+
+ // in this case port in fs_addr is port for hftp request, but
+ // token's port is for RPC
+ // to find the correct DT we need to know the mapping between Hftp port
+ // and RPC one. hence this new setting in the conf.
+ String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+fs_addr;
+ conf.set(key, t.getService().toString());
+ LOG.info("GOT dt for " + uri + " and stored in conf as " + key + "="
+ + t.getService());
+
+ }
+ }
+ }
+
+ /**
* file name used on HDFS for generated job token
*/
@InterfaceAudience.Private
@@ -225,15 +242,4 @@
public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
}
-
- public static String buildDTServiceName(URI uri) {
- int port = uri.getPort();
- if(port == -1)
- port = NameNode.DEFAULT_PORT;
-
- // build the service name string "ip:port"
- StringBuffer sb = new StringBuffer();
- sb.append(NetUtils.normalizeHostName(uri.getHost())).append(":").append(port);
- return sb.toString();
- }
}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java b/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
index 8e5ac65..f8cce42 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
@@ -22,9 +22,12 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.HashMap;
@@ -37,8 +40,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -46,18 +52,20 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.Assert;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestTokenCache {
private static final int NUM_OF_KEYS = 10;
@@ -79,7 +87,7 @@
dts_size = dts.size();
- if(dts.size() != 2) { // one job token and one delegation token
+ if(dts_size != 2) { // one job token and one delegation token
throw new RuntimeException("tokens are not available"); // fail the test
}
@@ -270,7 +278,8 @@
p2 }, jConf);
// this token is keyed by hostname:port key.
- String fs_addr = TokenCache.buildDTServiceName(p1.toUri());
+ String fs_addr =
+ SecurityUtil.buildDTServiceName(p1.toUri(), NameNode.DEFAULT_PORT);
Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(
credentials, fs_addr);
System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " + nnt);
@@ -289,4 +298,63 @@
assertTrue("didn't find token for " + p1 ,found);
}
}
+
+ @Test
+ public void testGetTokensForHftpFS() throws IOException, URISyntaxException {
+ HftpFileSystem hfs = mock(HftpFileSystem.class);
+
+ DelegationTokenSecretManager dtSecretManager =
+ dfsCluster.getNamesystem().getDelegationTokenSecretManager();
+ DelegationTokenIdentifier dtId =
+ new DelegationTokenIdentifier(new Text("user"), new Text("renewer"), null);
+ final Token<DelegationTokenIdentifier> t =
+ new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
+
+ final URI uri = new URI("hftp://host:2222/file1");
+ String fs_addr =
+ SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+ t.setService(new Text(fs_addr));
+
+ //when(hfs.getUri()).thenReturn(uri);
+ Mockito.doAnswer(new Answer<URI>(){
+ @Override
+ public URI answer(InvocationOnMock invocation)
+ throws Throwable {
+ return uri;
+ }}).when(hfs).getUri();
+
+ //when(hfs.getDelegationToken()).thenReturn((Token<? extends TokenIdentifier>) t);
+ Mockito.doAnswer(new Answer<Token<DelegationTokenIdentifier>>(){
+ @Override
+ public Token<DelegationTokenIdentifier> answer(InvocationOnMock invocation)
+ throws Throwable {
+ return t;
+ }}).when(hfs).getDelegationToken();
+
+
+ Credentials credentials = new Credentials();
+ Path p = new Path(uri.toString());
+ System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr);
+ TokenCache.obtainTokensForNamenodesInternal(hfs, credentials, jConf);
+
+ Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens();
+ assertEquals("number of tokens is not 1", 1, tns.size());
+
+ boolean found = false;
+ for(Token<? extends TokenIdentifier> tt: tns) {
+ System.out.println("token="+tt);
+ if(tt.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
+ tt.getService().equals(new Text(fs_addr))) {
+ found = true;
+ assertEquals("different token", tt, t);
+ }
+ assertTrue("didn't find token for " + p, found);
+ }
+ // also check the conf value
+ String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY + fs_addr;
+ String confKey = jConf.get(key);
+ assertEquals("jconf key for HFTP DT is not correct", confKey,
+ t.getService().toString());
+ }
+
}