MAPREDUCE-1958. The MapReduce part corresponding to HADOOP-6873. Contributed by Boris Shkolnik & Owen O'Malley.
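
This is the MapReduce side of the generic delegation-token API from
HADOOP-6873: TokenCache.obtainTokensForNamenodesInternal() now calls
FileSystem.getCanonicalServiceName() and getDelegationToken(renewer)
instead of special-casing DistributedFileSystem and HftpFileSystem,
JobSubmitter splits file-based credential loading into
readTokensFromFiles() and logs delegation tokens at debug level via a
new printTokens(), and DistCp drops its HFTP renewer plumbing since the
renewer (the JobTracker principal) is now passed uniformly.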

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@982087 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 78cb88b..a913556 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -223,6 +223,9 @@
     of configuration properties "mapreduce.job.name" and "mapred.job.name".
     (Ravi Gummadi via amareshwari)
 
+    MAPREDUCE-1958. The MapReduce part corresponding to HADOOP-6873.
+    (Boris Shkolnik & Owen O'Malley via ddas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index c1836cd..db2c5f8 100644
--- a/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -46,6 +47,7 @@
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -340,12 +342,7 @@
       TokenCache.obtainTokensForNamenodes(job.getCredentials(),
           new Path[] { submitJobDir }, conf);
       
-      // load the binary file, if the user has one
-      String binaryTokenFilename = conf.get("mapreduce.job.credentials.binary");
-      if (binaryTokenFilename != null) {
-        job.getCredentials().readTokenStorageFile(
-            new Path("file:///" + binaryTokenFilename), conf);
-      }
+      populateTokenCache(conf, job.getCredentials());
 
       copyAndConfigureFiles(job, submitJobDir);
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
@@ -364,7 +361,7 @@
       //
       // Now, actually submit the job (using the submit name)
       //
-      populateTokenCache(conf, job.getCredentials());
+      printTokens(jobId, job.getCredentials());
       status = submitClient.submitJob(
           jobId, submitJobDir.toString(), job.getCredentials());
       if (status != null) {
@@ -411,6 +408,21 @@
   }
   
 
+  
+  @SuppressWarnings("unchecked")
+  private void printTokens(JobID jobId,
+      Credentials credentials) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Printing tokens for job: " + jobId);
+      for(Token<?> token: credentials.getAllTokens()) {
+        if (token.getKind().toString().equals("HDFS_DELEGATION_TOKEN")) {
+          LOG.debug("Submitting with " +
+              DFSClient.stringifyToken((Token<org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier>) token));
+        }
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private <T extends InputSplit>
   int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
@@ -494,11 +506,18 @@
     }
   }
   
-  // get secret keys and tokens and store them into TokenCache
   @SuppressWarnings("unchecked")
-  private void populateTokenCache(Configuration conf, Credentials credentials)
-      throws IOException {
-    // create TokenStorage object with user secretKeys
+  private void readTokensFromFiles(Configuration conf, Credentials credentials)
+  throws IOException {
+    // add tokens and secrets coming from a token storage file
+    String binaryTokenFilename =
+      conf.get("mapreduce.job.credentials.binary");
+    if (binaryTokenFilename != null) {
+      Credentials binary = Credentials.readTokenStorageFile(
+          new Path("file:///" + binaryTokenFilename), conf);
+      credentials.addAll(binary);
+    }
+    // add secret keys coming from a json file
     String tokensFileName = conf.get("mapreduce.job.credentials.json");
     if(tokensFileName != null) {
       LOG.info("loading user's secret keys from " + tokensFileName);
@@ -523,10 +542,17 @@
       if(json_error)
         LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
     }
-    
+  }
+
+  // get secret keys and tokens and store them into TokenCache
+  @SuppressWarnings("unchecked")
+  private void populateTokenCache(Configuration conf, Credentials credentials) 
+  throws IOException {
+    readTokensFromFiles(conf, credentials);
     // add the delegation tokens from configuration
     String [] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
-    LOG.info("adding the following namenodes' delegation tokens:" + Arrays.toString(nameNodes));
+    LOG.debug("adding the following namenodes' delegation tokens:" + 
+        Arrays.toString(nameNodes));
     if(nameNodes != null) {
       Path [] ps = new Path[nameNodes.length];
       for(int i=0; i< nameNodes.length; i++) {
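
For context, a minimal sketch of how a client could pre-fetch credentials
into the binary file that readTokensFromFiles() merges via
Credentials.addAll(); the file path and the renewer string "jt" are
illustrative, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.token.Token;

    public class PrefetchTokens {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Credentials creds = new Credentials();

        // Fetch a delegation token from the default FileSystem; "jt"
        // stands in for the JobTracker principal's short name.
        FileSystem fs = FileSystem.get(conf);
        Token<?> token = fs.getDelegationToken("jt");
        if (token != null) {
          creds.addToken(new Text(fs.getCanonicalServiceName()), token);
        }

        // Persist the credentials locally. JobSubmitter prepends
        // "file:///" to the config value, so set a bare local path.
        creds.writeTokenStorageFile(new Path("file:///tmp/job.tokens"), conf);
        conf.set("mapreduce.job.credentials.binary", "/tmp/job.tokens");
      }
    }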
diff --git a/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java b/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
index 92339a8..b39d90b 100644
--- a/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
+++ b/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
@@ -106,74 +106,40 @@
     // get jobtracker principal id (for the renewer)
     KerberosName jtKrbName = 
       new KerberosName(conf.get(JTConfig.JT_USER_NAME,""));
-    Text delegTokenRenewer = new Text(jtKrbName.getShortName());
+    
+    String delegTokenRenewer = jtKrbName.getShortName();
+    boolean readFile = true;
 
-    boolean notReadFile = true;
-
-    // TODO: Connecting to the namenode is not required in the case,
-    // where we already have the credentials in the file
-    if(fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem)fs;
-      URI uri = dfs.getUri();
-      String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
-
-      // see if we already have the token
-      Token<DelegationTokenIdentifier> token = 
-        TokenCache.getDelegationToken(credentials, fs_addr); 
-      if(token != null) {
-        LOG.debug("DT for " + token.getService()  + " is already present");
-        return;
-      }
-      if (notReadFile) { //read the file only once
+    String fsName = fs.getCanonicalServiceName();
+    if (TokenCache.getDelegationToken(credentials, fsName) == null) {
+      // TODO: find a better place for this block of code,
+      // which reads the credentials file
+      if (readFile) {
+        readFile = false;
         String binaryTokenFilename =
           conf.get("mapreduce.job.credentials.binary");
         if (binaryTokenFilename != null) {
-          credentials.readTokenStorageFile(new Path("file:///" +  
-              binaryTokenFilename), conf);
+          Credentials binary;
+          try {
+            binary = Credentials.readTokenStorageFile(
+                new Path("file:///" +  binaryTokenFilename), conf);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+          credentials.addAll(binary);
         }
-        notReadFile = false;
-        token = 
-          TokenCache.getDelegationToken(credentials, fs_addr); 
-        if(token != null) {
-          LOG.debug("DT for " + token.getService()  + " is already present");
+        if (TokenCache.getDelegationToken(credentials, fsName) != null) {
+          LOG.debug("DT for " + fsName  + " is already present");
           return;
         }
       }
-      // get the token
-      token = dfs.getDelegationToken(delegTokenRenewer);
-      if(token==null) 
-        throw new IOException("Token from " + fs_addr + " is null");
-
-      token.setService(new Text(fs_addr));
-      credentials.addToken(new Text(fs_addr), token);
-      LOG.info("Got dt for " + uri + ";uri="+ fs_addr + 
-          ";t.service="+token.getService());
-    } else if (fs instanceof HftpFileSystem) {
-      HftpFileSystem hfs = (HftpFileSystem)fs;
-      URI uri = hfs.getUri();
-      String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
-      Token<DelegationTokenIdentifier> token = 
-        TokenCache.getDelegationToken(credentials, fs_addr); 
-      if(token != null) {
-        LOG.debug("DT for " + token.getService()  + " is already present");
-        return;
-      }
-      //the initialize method of hftp, called via FileSystem.get() done
-      //earlier gets a delegation token
-
-      Token<? extends TokenIdentifier> t = hfs.getDelegationToken();
-      if (t != null) {
-        credentials.addToken(new Text(fs_addr), t);
-
-        // in this case port in fs_addr is port for hftp request, but
-        // token's port is for RPC
-        // to find the correct DT we need to know the mapping between Hftp port 
-        // and RPC one. hence this new setting in the conf.
-        String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+fs_addr;
-        conf.set(key, t.getService().toString());
-        LOG.info("GOT dt for " + uri + " and stored in conf as " + key + "=" 
-            + t.getService());
-
+      Token<?> token = fs.getDelegationToken(delegTokenRenewer);
+      if (token != null) {
+        Text fsNameText = new Text(fsName);
+        token.setService(fsNameText);
+        credentials.addToken(fsNameText, token);
+        LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName + 
+            ";t.service="+token.getService());
       }
     }
   }
@@ -213,14 +179,14 @@
   public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
   throws IOException {
     Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
-    
-    Credentials ts = new Credentials();
-    ts.readTokenStorageFile(localJobTokenFile, conf);
+
+    Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
 
     if(LOG.isDebugEnabled()) {
-      LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath() 
-        +"; num of sec keys  = " + ts.numberOfSecretKeys() + " Number of tokens " + 
-        ts.numberOfTokens());
+      LOG.debug("Task: Loaded jobTokenFile from: "+
+          localJobTokenFile.toUri().getPath() 
+          +"; num of sec keys  = " + ts.numberOfSecretKeys() +
+          " Number of tokens " +  ts.numberOfTokens());
     }
     return ts;
   }
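
The heart of the change above is a single code path for every FileSystem
(note also that Credentials.readTokenStorageFile() is now used as a
static factory returning a fresh Credentials object, which loadTokens()
returns directly). A standalone sketch of the pattern, using only the
public FileSystem and Credentials APIs; the wrapper class is
illustrative:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.token.Token;

    public class GenericTokenFetch {
      public static void obtain(FileSystem fs, Credentials credentials,
          String renewer) throws IOException {
        // The canonical service name keys the credentials cache, and
        // getDelegationToken() dispatches polymorphically (HDFS, HFTP,
        // ...), replacing the instanceof checks this patch removes.
        Text service = new Text(fs.getCanonicalServiceName());
        if (credentials.getToken(service) != null) {
          return; // already have a token for this service
        }
        Token<?> token = fs.getDelegationToken(renewer);
        if (token != null) {
          token.setService(service);
          credentials.addToken(service, token);
        }
      }
    }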
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java b/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
index f8cce42..1b7fcc4 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
@@ -271,7 +271,6 @@
   
   @Test
   public void testGetTokensForNamenodes() throws IOException {
-    FileSystem fs = dfsCluster.getFileSystem();
     
     Credentials credentials = new Credentials();
     TokenCache.obtainTokensForNamenodesInternal(credentials, new Path[] { p1,
@@ -305,13 +304,15 @@
 
     DelegationTokenSecretManager dtSecretManager = 
       dfsCluster.getNamesystem().getDelegationTokenSecretManager();
+    String renewer = "renewer";
+    jConf.set(JTConfig.JT_USER_NAME, renewer);
     DelegationTokenIdentifier dtId = 
-      new DelegationTokenIdentifier(new Text("user"), new Text("renewer"), null);
+      new DelegationTokenIdentifier(new Text("user"), new Text(renewer), null);
     final Token<DelegationTokenIdentifier> t = 
       new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
 
     final URI uri = new URI("hftp://host:2222/file1");
-    String fs_addr = 
+    final String fs_addr = 
       SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
     t.setService(new Text(fs_addr));
 
@@ -329,12 +330,19 @@
       public Token<DelegationTokenIdentifier>  answer(InvocationOnMock invocation)
       throws Throwable {
         return t;
-      }}).when(hfs).getDelegationToken();
-
-
+      }}).when(hfs).getDelegationToken(renewer);
+    
+      //when(hfs.getCanonicalServiceName()).thenReturn(fs_addr);
+    Mockito.doAnswer(new Answer<String>(){
+      @Override
+      public String answer(InvocationOnMock invocation)
+      throws Throwable {
+        return fs_addr;
+      }}).when(hfs).getCanonicalServiceName();
+    
     Credentials credentials = new Credentials();
     Path p = new Path(uri.toString());
-    System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr);
+    System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr + "; rn=" + renewer);
     TokenCache.obtainTokensForNamenodesInternal(hfs, credentials, jConf);
 
     Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens();
@@ -350,11 +358,6 @@
       }
       assertTrue("didn't find token for " + p, found);
     }
-    // also check the conf value
-    String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY + fs_addr;
-    String confKey = jConf.get(key);
-    assertEquals("jconf key for HFTP DT is not correct", confKey,
-        t.getService().toString());
   }
 
 }
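
The doAnswer() stubbing above, and the commented-out thenReturn() line,
suggest Mockito generics friction on the mocked return types. An
equivalent, shorter form uses doReturn(), which is not checked against
the method's generic return type; the sketch assumes the test's hfs,
fs_addr, renewer and t, inside a method that declares IOException:

    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    // Stub the two generic FileSystem methods without doAnswer()
    // boilerplate or @SuppressWarnings.
    HftpFileSystem hfs = mock(HftpFileSystem.class);
    doReturn(fs_addr).when(hfs).getCanonicalServiceName();
    doReturn(t).when(hfs).getDelegationToken(renewer);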
diff --git a/src/tools/org/apache/hadoop/tools/DistCp.java b/src/tools/org/apache/hadoop/tools/DistCp.java
index 29658a8..5cd49f4 100644
--- a/src/tools/org/apache/hadoop/tools/DistCp.java
+++ b/src/tools/org/apache/hadoop/tools/DistCp.java
@@ -740,10 +740,6 @@
     List<IOException> rslt = new ArrayList<IOException>();
     List<Path> unglobbed = new LinkedList<Path>();
     
-    // get tokens for all the required FileSystems..
-    // also set the renewer as the JobTracker for the hftp case
-    jobConf.set(HftpFileSystem.HFTP_RENEWER, 
-        jobConf.get(JobTracker.JT_USER_NAME, ""));
     Path[] ps = new Path[srcPaths.size()];
     ps = srcPaths.toArray(ps);
     TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);
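
With obtainTokensForNamenodesInternal() now deriving the renewer from the
JobTracker principal for every FileSystem, HFTP included, DistCp no longer
needs to forward it through HftpFileSystem.HFTP_RENEWER, so the block above
is simply deleted.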