Apache Hadoop 2.0.3-alpha-rc0 release.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/tags/release-2.0.3-alpha-rc0@1443313 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-common-project/hadoop-common/src/main/docs/releasenotes.html b/hadoop-common-project/hadoop-common/src/main/docs/releasenotes.html
index aa7bc11..6af2f47 100644
--- a/hadoop-common-project/hadoop-common/src/main/docs/releasenotes.html
+++ b/hadoop-common-project/hadoop-common/src/main/docs/releasenotes.html
@@ -769,6 +769,10 @@
      Major new feature reported by Arun C Murthy and fixed by Arun C Murthy (capacityscheduler , scheduler)<br>
      <b>Enhance CS to schedule accounting for both memory and cpu cores</b><br>
      <blockquote>With YARN being a general purpose system, it would be useful for several applications (MPI et al) to specify not just memory but also CPU (cores) for their resource requirements. Thus, it would be useful to the CapacityScheduler to account for both.</blockquote></li>
+<li> <a href="https://issues.apache.org/jira/browse/MAPREDUCE-4977">MAPREDUCE-4977</a>.
+     Major improvement reported by Alejandro Abdelnur and fixed by Alejandro Abdelnur (documentation)<br>
+     <b>Documentation for pluggable shuffle and pluggable sort</b><br>
+     <blockquote></blockquote></li>
 <li> <a href="https://issues.apache.org/jira/browse/MAPREDUCE-4971">MAPREDUCE-4971</a>.
      Minor improvement reported by Arun C Murthy and fixed by Arun C Murthy <br>
      <b>Minor extensibility enhancements </b><br>
@@ -906,6 +910,10 @@
      Major bug reported by Jason Lowe and fixed by Jason Lowe (mr-am)<br>
      <b>JobImpl.finished doesn't expect ERROR as a final job state</b><br>
      <blockquote></blockquote></li>
+<li> <a href="https://issues.apache.org/jira/browse/MAPREDUCE-4822">MAPREDUCE-4822</a>.
+     Trivial improvement reported by Robert Joseph Evans and fixed by Chu Tong (jobhistoryserver)<br>
+     <b>Unnecessary conversions in History Events</b><br>
+     <blockquote></blockquote></li>
 <li> <a href="https://issues.apache.org/jira/browse/MAPREDUCE-4819">MAPREDUCE-4819</a>.
      Blocker bug reported by Jason Lowe and fixed by Bikas Saha (mr-am)<br>
      <b>AM can rerun job after reporting final job status to the client</b><br>
@@ -1182,6 +1190,10 @@
      Major bug reported by Aaron T. Myers and fixed by Aaron T. Myers (namenode)<br>
      <b>2NN will fail to checkpoint after an HDFS upgrade from a pre-federation version of HDFS</b><br>
      <blockquote></blockquote></li>
+<li> <a href="https://issues.apache.org/jira/browse/HDFS-4458">HDFS-4458</a>.
+     Major bug reported by wenwupeng and fixed by Binglin Chang (balancer)<br>
+     <b>start balancer failed with "Failed to create file [/system/balancer.id]"  if configure IP on fs.defaultFS</b><br>
+     <blockquote></blockquote></li>
 <li> <a href="https://issues.apache.org/jira/browse/HDFS-4456">HDFS-4456</a>.
      Major new feature reported by Tsz Wo (Nicholas), SZE and fixed by Plamen Jeliazkov (webhdfs)<br>
      <b>Add concat to HttpFS and WebHDFS REST API docs</b><br>
@@ -1936,6 +1948,10 @@
      Major new feature reported by Dmytro Molkov and fixed by Konstantin Shvachko (namenode)<br>
      <b>Plugable block id generation </b><br>
      <blockquote></blockquote></li>
+<li> <a href="https://issues.apache.org/jira/browse/HADOOP-9289">HADOOP-9289</a>.
+     Blocker bug reported by Daryn Sharp and fixed by Daryn Sharp (fs)<br>
+     <b>FsShell rm -f fails for non-matching globs</b><br>
+     <blockquote></blockquote></li>
 <li> <a href="https://issues.apache.org/jira/browse/HADOOP-9278">HADOOP-9278</a>.
      Major bug reported by Chris Nauroth and fixed by Chris Nauroth (fs)<br>
      <b>HarFileSystem may leak file handle</b><br>
@@ -2084,6 +2100,10 @@
      Major bug reported by Robert Parker and fixed by Robert Parker <br>
      <b>Hadoop-Common-0.23-Build Fails to build in Jenkins</b><br>
      <blockquote></blockquote></li>
+<li> <a href="https://issues.apache.org/jira/browse/HADOOP-9070">HADOOP-9070</a>.
+     Blocker sub-task reported by Daryn Sharp and fixed by Daryn Sharp (ipc)<br>
+     <b>Kerberos SASL server cannot find kerberos key</b><br>
+     <blockquote></blockquote></li>
 <li> <a href="https://issues.apache.org/jira/browse/HADOOP-9067">HADOOP-9067</a>.
      Minor test reported by Ivan A. Veselovsky and fixed by Ivan A. Veselovsky <br>
      <b>provide test for method org.apache.hadoop.fs.LocalFileSystem.reportChecksumFailure(Path, FSDataInputStream, long, FSDataInputStream, long)</b><br>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 35a8155..d9f78c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -466,6 +466,10 @@
     HDFS-4468.  Use the new StringUtils methods added by HADOOP-9252 and fix
     TestHDFSCLI and TestQuota. (szetszwo)
 
+    HDFS-4458. In DFSUtil.getNameServiceUris(..), convert default fs URI using
+    NetUtils.createSocketAddr(..) for being consistent with other addresses.
+    (Binglin Chang via szetszwo)
+
   BREAKDOWN OF HDFS-3077 SUBTASKS
 
     HDFS-3077. Quorum-based protocol for reading and writing edit logs.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 613f8de..773b1ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -764,6 +764,13 @@
     
     // Add the default URI if it is an HDFS URI.
     URI defaultUri = FileSystem.getDefaultUri(conf);
+    // If defaultUri has an explicit port, rebuild it from a socket address
+    // so that an ip:port authority is converted to hostname:port format.
+    if (defaultUri != null && (defaultUri.getPort() != -1)) {
+      defaultUri = createUri(defaultUri.getScheme(),
+          NetUtils.createSocketAddr(defaultUri.getHost(), 
+              defaultUri.getPort()));
+    }
     if (defaultUri != null &&
         HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
         !nonPreferredUris.contains(defaultUri)) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index 06b4ed0..75596cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -619,6 +619,16 @@
     
     assertEquals(1, uris.size());
     assertTrue(uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR)));
+
+    // Make sure that when FS_DEFAULT_NAME_KEY is configured with an IP
+    // address, it is automatically converted to a hostname.
+    conf = new HdfsConfiguration();
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:8020");
+    uris = DFSUtil.getNameServiceUris(conf);
+    assertEquals(1, uris.size());
+    for (URI uri : uris) {
+      assertFalse(uri.getHost().equals("127.0.0.1"));
+    }
   }
   
   @Test
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index eb3f429..2bd09b6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -64,6 +64,9 @@
     MAPREDUCE-4971. Minor extensibility enhancements to Counters & 
     FileOutputFormat. (Arun C Murthy via sseth)
 
+    MAPREDUCE-4977. Documentation for pluggable shuffle and pluggable sort. 
+    (tucu)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4893. Fixed MR ApplicationMaster to do optimal assignment of
@@ -535,6 +538,9 @@
     MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the
     number of map completion event type conversions. (Jason Lowe via sseth)
 
+    MAPREDUCE-4822. Unnecessary conversions in History Events. (Chu Tong via
+    jlowe)
+
   BUG FIXES
 
     MAPREDUCE-4458. Warn if java.library.path is used for AM or Task
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
index d4f9fa3..62df2aa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
@@ -178,7 +178,7 @@
 
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(taskType.toString());
+    return taskType;
   }
   /** Get the task status */
   public String getTaskStatus() { return taskStatus.toString(); }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
index 10b8c1f..a779fca 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
@@ -176,11 +176,11 @@
   public TaskID getTaskId() { return attemptId.getTaskID(); }
   /** Get the attempt id */
   public TaskAttemptID getAttemptId() {
-    return TaskAttemptID.forName(attemptId.toString());
+    return attemptId;
   }
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(taskType.toString());
+    return taskType;
   }
   /** Get the task status */
   public String getTaskStatus() { return taskStatus.toString(); }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
index a62ca38..78b9ca9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
@@ -105,11 +105,11 @@
   public TaskID getTaskId() { return attemptId.getTaskID(); }
   /** Get the task attempt id */
   public TaskAttemptID getAttemptId() {
-    return TaskAttemptID.forName(attemptId.toString());
+    return attemptId;
   }
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(taskType.toString());
+    return taskType;
   }
   /** Get the task status */
   public String getTaskStatus() { return taskStatus.toString(); }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
index 55de80c..edbf009 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
@@ -95,14 +95,10 @@
   }
 
   /** Get task id */
-  public TaskID getTaskId() { return TaskID.forName(taskid.toString()); }
+  public TaskID getTaskId() { return taskid; }
   /** Get successful task attempt id */
   public TaskAttemptID getSuccessfulTaskAttemptId() {
-    if(successfulAttemptId != null)
-    {
-      return TaskAttemptID.forName(successfulAttemptId.toString());
-    }
-    return null;
+    return successfulAttemptId;
   }
   /** Get the task finish time */
   public long getFinishTime() { return finishTime; }
@@ -110,7 +106,7 @@
   public Counters getCounters() { return counters; }
   /** Get task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(taskType.toString());
+    return taskType;
   }
   /** Get task status */
   public String getTaskStatus() { return status.toString(); }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
new file mode 100644
index 0000000..8dd2f2e
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
@@ -0,0 +1,96 @@
+~~ Licensed under the Apache License, Version 2.0 (the "License");
+~~ you may not use this file except in compliance with the License.
+~~ You may obtain a copy of the License at
+~~
+~~   http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License. See accompanying LICENSE file.
+
+  ---
+  Hadoop Map Reduce Next Generation-${project.version} - Pluggable Shuffle and Pluggable Sort
+  ---
+  ---
+  ${maven.build.timestamp}
+
+Hadoop MapReduce Next Generation - Pluggable Shuffle and Pluggable Sort
+
+  \[ {{{./index.html}Go Back}} \]
+
+* Introduction
+
+  The pluggable shuffle and pluggable sort capabilities allow replacing the
+  built-in shuffle and sort logic with alternate implementations. Example use
+  cases are: using an application protocol other than HTTP, such as RDMA,
+  for shuffling data from the Map nodes to the Reducer nodes; or
+  replacing the sort logic with custom algorithms that enable Hash aggregation
+  and Limit-N query.
+
+  <<IMPORTANT:>> The pluggable shuffle and pluggable sort capabilities are 
+  experimental and unstable. This means the provided APIs may change and break 
+  compatibility in future versions of Hadoop.
+
+* Implementing a Custom Shuffle and a Custom Sort 
+
+  A custom shuffle implementation requires a
+  <<<org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.AuxiliaryService>>> 
+  implementation class running in the NodeManagers and a 
+  <<<org.apache.hadoop.mapred.ShuffleConsumerPlugin>>> implementation class
+  running in the Reducer tasks.
+
+  The default implementations provided by Hadoop can be used as references:
+
+    * <<<org.apache.hadoop.mapred.ShuffleHandler>>>
+    
+    * <<<org.apache.hadoop.mapreduce.task.reduce.Shuffle>>>
+
+  A custom sort implementation requires a <<<org.apache.hadoop.mapred.MapOutputCollector>>>
+  implementation class running in the Mapper tasks and (optionally, depending
+  on the sort implementation) a <<<org.apache.hadoop.mapred.ShuffleConsumerPlugin>>> 
+  implementation class running in the Reducer tasks.
+
+  The default implementations provided by Hadoop can be used as references:
+
+  * <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>>
+  
+  * <<<org.apache.hadoop.mapreduce.task.reduce.Shuffle>>>
+
+* Configuration
+
+  Except for the auxiliary service running in the NodeManagers serving the
+  shuffle (by default the <<<ShuffleHandler>>>), all the pluggable components
+  run in the job tasks. This means they can be configured on a per-job basis.
+  The auxiliary service serving the shuffle must be configured in the
+  NodeManagers' configuration.
+
+** Job Configuration Properties (on a per-job basis):
+
+*--------------------------------------+---------------------+-----------------+
+| <<Property>>                         | <<Default Value>>   | <<Explanation>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<mapreduce.job.reduce.shuffle.consumer.plugin.class>>> | <<<org.apache.hadoop.mapreduce.task.reduce.Shuffle>>>         | The <<<ShuffleConsumerPlugin>>> implementation to use |
+*--------------------------------------+---------------------+-----------------+
+| <<<mapreduce.job.map.output.collector.class>>>   | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation to use |
+*--------------------------------------+---------------------+-----------------+
+
+  These properties can also be set in the <<<mapred-site.xml>>> to change the default values for all jobs.
+
+** NodeManager Configuration Properties (<<<yarn-site.xml>>> on all nodes):
+
+*--------------------------------------+---------------------+-----------------+
+| <<Property>>                         | <<Default Value>>   | <<Explanation>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<yarn.nodemanager.aux-services>>> | <<<...,mapreduce.shuffle>>>  | The auxiliary service name |
+*--------------------------------------+---------------------+-----------------+
+| <<<yarn.nodemanager.aux-services.mapreduce.shuffle.class>>>   | <<<org.apache.hadoop.mapred.ShuffleHandler>>> | The auxiliary service class to use |
+*--------------------------------------+---------------------+-----------------+
+
+  <<IMPORTANT:>> If setting an auxiliary service in addition to the default
+  <<<mapreduce.shuffle>>> service, then a new service key should be added to the
+  <<<yarn.nodemanager.aux-services>>> property, for example <<<mapreduce.shufflex>>>.
+  Then the property defining the corresponding class must be
+  <<<yarn.nodemanager.aux-services.mapreduce.shufflex.class>>>.
+  
\ No newline at end of file
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index 080ca0a..1263081 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -65,6 +65,7 @@
 
     <menu name="MapReduce" inherit="top">
       <item name="Encrypted Shuffle" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html"/>
+      <item name="Pluggable Shuffle/Sort" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html"/>
     </menu>
     
     <menu name="YARN" inherit="top">
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 910a6d1..801c802 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -204,6 +204,9 @@
     YARN-370. Fix SchedulerUtils to correctly round up the resource for
     containers. (Zhijie Shen via acmurthy) 
 
+    YARN-355. Fixes a bug where RM app submission could jam under load.
+    (Daryn Sharp via sseth)
+
 Release 2.0.2-alpha - 2012-09-07 
 
     YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
index 14994f9..eb84b31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
@@ -25,13 +25,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -47,8 +45,6 @@
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -199,30 +195,6 @@
   }
 
 
-  // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
-  // are part of ClientRMProtocol.
-  @Private
-  public long renewRMDelegationToken(DelegationToken rmToken)
-      throws YarnRemoteException {
-    RenewDelegationTokenRequest request = Records
-        .newRecord(RenewDelegationTokenRequest.class);
-    request.setDelegationToken(rmToken);
-    RenewDelegationTokenResponse response = rmClient
-        .renewDelegationToken(request);
-    return response.getNextExpirationTime();
-  }
-
-  // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
-  // are part of ClietnRMProtocol
-  @Private
-  public void cancelRMDelegationToken(DelegationToken rmToken)
-      throws YarnRemoteException {
-    CancelDelegationTokenRequest request = Records
-        .newRecord(CancelDelegationTokenRequest.class);
-    request.setDelegationToken(rmToken);
-    rmClient.cancelDelegationToken(request);
-  }
-
   private GetQueueInfoRequest
       getQueueInfoRequest(String queueName, boolean includeApplications,
           boolean includeChildQueues, boolean recursive) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java
deleted file mode 100644
index 3f1caee..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.security;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-
-public class RMDelegationTokenRenewer extends TokenRenewer {
-
-  @Override
-  public boolean handleKind(Text kind) {
-    return RMDelegationTokenIdentifier.KIND_NAME.equals(kind);
-  }
-
-  @Override
-  public boolean isManaged(Token<?> token) throws IOException {
-    return true;
-  }
-
-  @Override
-  public long renew(Token<?> token, Configuration conf) throws IOException,
-      InterruptedException {
-    YarnClientImpl yarnClient = getYarnClient(conf,
-        SecurityUtil.getTokenServiceAddr(token));
-    try {
-      DelegationToken dToken = BuilderUtils.newDelegationToken(
-          token.getIdentifier(), token.getKind().toString(),
-          token.getPassword(), token.getService().toString());
-      return yarnClient.renewRMDelegationToken(dToken);
-    } finally {
-      yarnClient.stop();
-    }
-  }
-
-  @Override
-  public void cancel(Token<?> token, Configuration conf) throws IOException,
-      InterruptedException {
-    YarnClientImpl yarnClient = getYarnClient(conf,
-        SecurityUtil.getTokenServiceAddr(token));
-    try {
-      DelegationToken dToken = BuilderUtils.newDelegationToken(
-          token.getIdentifier(), token.getKind().toString(),
-          token.getPassword(), token.getService().toString());
-      yarnClient.cancelRMDelegationToken(dToken);
-      return;
-    } finally {
-      yarnClient.stop();
-    }
-  }
-
-  private YarnClientImpl getYarnClient(Configuration conf,
-      InetSocketAddress rmAddress) {
-    YarnClientImpl yarnClient = new YarnClientImpl(rmAddress);
-    yarnClient.init(conf);
-    yarnClient.start();
-    return yarnClient;
-  }
-}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
deleted file mode 100644
index 9e78b11..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
+++ /dev/null
@@ -1,14 +0,0 @@
-#
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-#
-org.apache.hadoop.yarn.security.RMDelegationTokenRenewer;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
index fd3dbf0..73bdce4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
@@ -19,10 +19,28 @@
 package org.apache.hadoop.yarn.security.client;
 
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 
 /**
  * Delegation Token Identifier that identifies the delegation tokens from the 
@@ -51,4 +69,100 @@
   public Text getKind() {
     return KIND_NAME;
   }
+  
+  public static class Renewer extends TokenRenewer {
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return KIND_NAME.equals(kind);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+
+    private static
+    AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> localSecretManager;
+    private static InetSocketAddress localServiceAddress;
+    
+    @Private
+    public static void setSecretManager(
+        AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> secretManager,
+        InetSocketAddress serviceAddress) {
+      localSecretManager = secretManager;
+      localServiceAddress = serviceAddress;
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public long renew(Token<?> token, Configuration conf) throws IOException,
+        InterruptedException {
+      final ClientRMProtocol rmClient = getRmClient(token, conf);
+      if (rmClient != null) {
+        try {
+          RenewDelegationTokenRequest request =
+              Records.newRecord(RenewDelegationTokenRequest.class);
+          request.setDelegationToken(convertToProtoToken(token));
+          return rmClient.renewDelegationToken(request).getNextExpirationTime();
+        } finally {
+          RPC.stopProxy(rmClient);
+        }
+      } else {
+        return localSecretManager.renewToken(
+            (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token, Configuration conf) throws IOException,
+        InterruptedException {
+      final ClientRMProtocol rmClient = getRmClient(token, conf);
+      if (rmClient != null) {
+        try {
+          CancelDelegationTokenRequest request =
+              Records.newRecord(CancelDelegationTokenRequest.class);
+          request.setDelegationToken(convertToProtoToken(token));
+          rmClient.cancelDelegationToken(request);
+        } finally {
+          RPC.stopProxy(rmClient);
+        }
+      } else {
+        localSecretManager.cancelToken(
+            (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+      }
+    }
+    
+    private static ClientRMProtocol getRmClient(Token<?> token,
+        Configuration conf) {
+      InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
+      if (localSecretManager != null) {
+        // return null if this RM issued the token (no RPC proxy is needed)
+        if (localServiceAddress.getAddress().isAnyLocalAddress()) {
+          if (NetUtils.isLocalAddress(addr.getAddress()) &&
+              addr.getPort() == localServiceAddress.getPort()) {
+            return null;
+          }
+        } else if (addr.equals(localServiceAddress)) {
+          return null;
+        }
+      }
+      final YarnRPC rpc = YarnRPC.create(conf);
+      return (ClientRMProtocol)rpc.getProxy(ClientRMProtocol.class, addr, conf);
+    }
+
+    // use the token's own renewer so the RM can always renew/cancel its tokens
+    @SuppressWarnings("unchecked")
+    private static String getRenewer(Token<?> token) throws IOException {
+      return ((Token<RMDelegationTokenIdentifier>)token).decodeIdentifier()
+          .getRenewer().toString();
+    }
+    
+    private static DelegationToken convertToProtoToken(Token<?> token) {
+      return BuilderUtils.newDelegationToken(
+          token.getIdentifier(), token.getKind().toString(),
+          token.getPassword(), token.getService().toString());
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
index 3380cb8..0e87a7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
@@ -13,3 +13,4 @@
 #
 org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer
 org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer
+org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier$Renewer
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index a464b3a..e0522a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -157,6 +157,10 @@
     this.server.start();
     clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
                                                server.getListenerAddress());
+    // enable RM to short-circuit token operations directly to itself
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(
+        rmDTSecretManager, clientBindAddress);
+    
     super.start();
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
index 3f78696..5ee851b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
@@ -17,13 +17,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
@@ -34,9 +33,15 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
@@ -46,12 +51,14 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.Before;
 import org.junit.Test;
 
 
@@ -59,6 +66,10 @@
 
   private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class);
   
+  @Before
+  public void resetSecretManager() {
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
+  }
   
   @Test
   public void testDelegationToken() throws IOException, InterruptedException {
@@ -200,7 +211,122 @@
         RPC.stopProxy(clientRMWithDT);
       }
     }
+  }
+  
+  @Test
+  public void testShortCircuitRenewCancel()
+      throws IOException, InterruptedException {
+    InetSocketAddress addr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);
+    checkShortCircuitRenewCancel(addr, addr, true);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelWildcardAddress()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr = new InetSocketAddress(123);
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()),
+        true);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelSameHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1),
+        false);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelDifferentHostSamePort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()),
+        false);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelDifferentHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1),
+        false);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr,
+                                            InetSocketAddress serviceAddr,
+                                            boolean shouldShortCircuit
+      ) throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.IPC_RPC_IMPL,
+        YarnBadRPC.class, YarnRPC.class);
     
+    RMDelegationTokenSecretManager secretManager =
+        mock(RMDelegationTokenSecretManager.class);
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr);
+
+    RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier(
+        new Text("owner"), new Text("renewer"), null);
+    Token<RMDelegationTokenIdentifier> token =
+        new Token<RMDelegationTokenIdentifier>(ident, secretManager);
+
+    SecurityUtil.setTokenService(token, serviceAddr);
+    if (shouldShortCircuit) {
+      token.renew(conf);
+      verify(secretManager).renewToken(eq(token), eq("renewer"));
+      reset(secretManager);
+      token.cancel(conf);
+      verify(secretManager).cancelToken(eq(token), eq("renewer"));
+    } else {
+      try {
+        token.renew(conf);
+        fail("renew should have called getProxy");
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage());
+      }
+      verify(secretManager, never()).renewToken(any(Token.class), anyString());
+      try {
+        token.cancel(conf);
+        fail("cancel should have called getProxy");
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage());
+      }
+      verify(secretManager, never()).cancelToken(any(Token.class), anyString());
+    }
+  }
+  
+  @SuppressWarnings("rawtypes")
+  public static class YarnBadRPC extends YarnRPC {
+    @Override
+    public Object getProxy(Class protocol, InetSocketAddress addr,
+        Configuration conf) {
+      throw new RuntimeException("getProxy");
+    }
+
+    @Override
+    public void stopProxy(Object proxy, Configuration conf) {
+      throw new RuntimeException("stopProxy");
+    }
+
+    @Override
+    public Server getServer(Class protocol, Object instance,
+        InetSocketAddress addr, Configuration conf,
+        SecretManager<? extends TokenIdentifier> secretManager,
+        int numHandlers, String portRangeConfig) {
+      throw new RuntimeException("getServer");
+    }
   }
   
   // Get the delegation token directly as it is a little difficult to setup