Merge r1293034 through r1293500 from 0.23.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1293501 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 99c5111..ea4ffe4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -187,6 +187,11 @@
     dfs.client.block.write.replace-datanode-on-failure.enable should be true.
     (szetszwo)
 
+    HDFS-3008. Negative caching of local addrs doesn't work. (eli)
+
+    HDFS-3006. In WebHDFS, when the return body is empty, set the Content-Type
+    to application/octet-stream instead of application/json.  (szetszwo)
+
 Release 0.23.1 - 2012-02-17 
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index be52b48..cdaf07e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -542,11 +542,12 @@
   private static boolean isLocalAddress(InetSocketAddress targetAddr) {
     InetAddress addr = targetAddr.getAddress();
     Boolean cached = localAddrMap.get(addr.getHostAddress());
-    if (cached != null && cached) {
+    if (cached != null) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Address " + targetAddr + " is local");
+        LOG.trace("Address " + targetAddr +
+                  (cached ? " is local" : " is not local"));
       }
-      return true;
+      return cached;
     }
 
     // Check if the address is any local or loop back
@@ -561,7 +562,8 @@
       }
     }
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Address " + targetAddr + " is local");
+      LOG.trace("Address " + targetAddr +
+                (local ? " is local" : " is not local"));
     }
     localAddrMap.put(addr.getHostAddress(), local);
     return local;
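
Note on the hunk above: the original code cached both results but only ever returned a cached true, so every lookup for a remote address repeated the interface scan. A minimal standalone sketch of the corrected negative-caching pattern (class and field names are illustrative, not the DFSClient internals):

    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;
    import java.util.concurrent.ConcurrentHashMap;

    public class LocalAddressCache {
      private static final ConcurrentHashMap<String, Boolean> CACHE =
          new ConcurrentHashMap<String, Boolean>();

      // Return the cached answer whether it is true or false; caching only
      // positive results (the original bug) forces a fresh NetworkInterface
      // scan on every call for addresses that are not local.
      public static boolean isLocal(InetAddress addr) throws SocketException {
        Boolean cached = CACHE.get(addr.getHostAddress());
        if (cached != null) {
          return cached;
        }
        boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
        if (!local) {
          local = NetworkInterface.getByInetAddress(addr) != null;
        }
        CACHE.put(addr.getHostAddress(), local);
        return local;
      }
    }
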
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
index 093cd9c..44ef527 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
@@ -117,7 +117,7 @@
   @PUT
   @Path("/")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response putRoot(
       final InputStream in,
       @Context final UserGroupInformation ugi,
@@ -147,7 +147,7 @@
   @PUT
   @Path("{" + UriFsPathParam.NAME + ":.*}")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response put(
       final InputStream in,
       @Context final UserGroupInformation ugi,
@@ -209,7 +209,7 @@
       final InetSocketAddress nnHttpAddr = NameNode.getHttpAddress(conf);
       final URI uri = new URI(WebHdfsFileSystem.SCHEME, null,
           nnHttpAddr.getHostName(), nnHttpAddr.getPort(), fullpath, null, null);
-      return Response.created(uri).type(MediaType.APPLICATION_JSON).build();
+      return Response.created(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
@@ -222,7 +222,7 @@
   @POST
   @Path("/")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response postRoot(
       final InputStream in,
       @Context final UserGroupInformation ugi,
@@ -243,7 +243,7 @@
   @POST
   @Path("{" + UriFsPathParam.NAME + ":.*}")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response post(
       final InputStream in,
       @Context final UserGroupInformation ugi,
@@ -287,7 +287,7 @@
         IOUtils.cleanup(LOG, out);
         IOUtils.cleanup(LOG, dfsclient);
       }
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index 44e5e13..e041b03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -215,7 +215,7 @@
   @PUT
   @Path("/")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response putRoot(
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -263,7 +263,7 @@
   @PUT
   @Path("{" + UriFsPathParam.NAME + ":.*}")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response put(
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -324,7 +324,7 @@
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
           fullpath, op.getValue(), -1L,
           permission, overwrite, bufferSize, replication, blockSize);
-      return Response.temporaryRedirect(uri).build();
+      return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     } 
     case MKDIRS:
     {
@@ -336,7 +336,7 @@
     {
       np.createSymlink(destination.getValue(), fullpath,
           PermissionParam.getDefaultFsPermission(), createParent.getValue());
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case RENAME:
     {
@@ -348,7 +348,7 @@
       } else {
         np.rename2(fullpath, destination.getValue(),
             s.toArray(new Options.Rename[s.size()]));
-        return Response.ok().type(MediaType.APPLICATION_JSON).build();
+        return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
       }
     }
     case SETREPLICATION:
@@ -364,17 +364,17 @@
       }
 
       np.setOwner(fullpath, owner.getValue(), group.getValue());
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case SETPERMISSION:
     {
       np.setPermission(fullpath, permission.getFsPermission());
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case SETTIMES:
     {
       np.setTimes(fullpath, modificationTime.getValue(), accessTime.getValue());
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case RENEWDELEGATIONTOKEN:
     {
@@ -389,7 +389,7 @@
       final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
       token.decodeFromUrlString(delegationTokenArgument.getValue());
       np.cancelDelegationToken(token);
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
@@ -406,7 +406,7 @@
   @POST
   @Path("/")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response postRoot(
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -427,7 +427,7 @@
   @POST
   @Path("{" + UriFsPathParam.NAME + ":.*}")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response post(
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -459,7 +459,7 @@
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
           fullpath, op.getValue(), -1L, bufferSize);
-      return Response.temporaryRedirect(uri).build();
+      return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
@@ -542,7 +542,7 @@
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
           fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
-      return Response.temporaryRedirect(uri).build();
+      return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GET_BLOCK_LOCATIONS:
     {
@@ -578,7 +578,7 @@
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
           fullpath, op.getValue(), -1L);
-      return Response.temporaryRedirect(uri).build();
+      return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETDELEGATIONTOKEN:
     {
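
Note on the WebHDFS hunks above: the handlers now both list application/octet-stream in @Produces and set it explicitly on responses with no entity, so zero-length bodies are no longer labeled application/json. A hedged sketch of the pattern with made-up resource names, not the WebHDFS classes:

    import javax.ws.rs.GET;
    import javax.ws.rs.PUT;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;

    @Path("/example")
    public class EmptyBodyResource {
      @PUT
      @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
      public Response put() {
        // No entity: advertising application/json here would be misleading.
        return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
      }

      @GET
      @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
      public Response get() {
        // A response that actually carries JSON still declares it explicitly.
        return Response.ok("{\"ok\":true}", MediaType.APPLICATION_JSON).build();
      }
    }
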
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
index 0c2372c..b551dd1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 
 import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -314,6 +315,8 @@
       conn.setRequestMethod(op.getType().toString());
       conn.connect();
       assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+      assertEquals(0, conn.getContentLength());
+      assertEquals(MediaType.APPLICATION_OCTET_STREAM, conn.getContentType());
       assertEquals((short)0755, webhdfs.getFileStatus(dir).getPermission().toShort());
       conn.disconnect();
     }
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 674f0a8..e22241b 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -36,6 +36,7 @@
   NEW FEATURES
 
   IMPROVEMENTS
+
     MAPREDUCE-3849. Change TokenCache's reading of the binary token file.
     (Daryn Sharp via bobby)
 
@@ -45,9 +46,17 @@
     MAPREDUCE-3877. Add a test to formalise the current state transitions
     of the yarn lifecycle. (stevel)
 
+    MAPREDUCE-3866. Fixed the bin/yarn script to not print the command line
+    unnecessarily. (vinodkv)
+
+    MAPREDUCE-3730. Modified RM to allow restarted NMs to rejoin the
+    cluster without waiting for expiry. (Jason Lowe via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
+    MAPREDUCE-3918. proc_historyserver no longer in command line arguments
+    for HistoryServer. (Jon Eagles via bobby)
 
     MAPREDUCE-3862.  Nodemanager can appear to hang on shutdown due to lingering
     DeletionService threads (Jason Lowe via bobby)
@@ -74,6 +83,13 @@
 
     MAPREDUCE-3878. Null user on filtered jobhistory job page (Jonathon Eagles
     via tgraves)
+
+    MAPREDUCE-3738. NM can hang during shutdown if AppLogAggregatorImpl
+    thread dies unexpectedly. (Jason Lowe via sseth)
+
+    MAPREDUCE-3904. Job history produced with mapreduce.cluster.acls.enabled
+    false cannot be viewed with mapreduce.cluster.acls.enabled true.
+    (Jonathon Eagles via tgraves)
  
 Release 0.23.1 - 2012-02-17 
 
diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred
index 898e053..eb13e60 100755
--- a/hadoop-mapreduce-project/bin/mapred
+++ b/hadoop-mapreduce-project/bin/mapred
@@ -136,4 +136,4 @@
 HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
 
 export CLASSPATH
-exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
+exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index cd357a2..0abccc4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -438,6 +438,9 @@
   public boolean checkAccess(UserGroupInformation callerUGI, 
       JobACL jobOperation) {
     AccessControlList jobACL = jobACLs.get(jobOperation);
+    if (jobACL == null) {
+      return true;
+    }
     return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 2461760..81387a0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -191,5 +191,16 @@
         null, null, null, true, null, 0, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Set up a job with ACLs enabled but no ACL recorded for the operation
+    Configuration conf5 = new Configuration();
+    conf5.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf5.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
+        null, null, null, true, null, 0, null);
+    Assert.assertTrue(job5.checkAccess(ugi1, null));
+    Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
index d0465d3..9584d05 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
@@ -330,6 +330,9 @@
       boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
     Map<JobACL, AccessControlList> jobACLs = jobInfo.getJobACLs();
     AccessControlList jobACL = jobACLs.get(jobOperation);
+    if (jobACL == null) {
+      return true;
+    }
     return aclsMgr.checkAccess(callerUGI, jobOperation, 
         jobInfo.getUsername(), jobACL);
   }
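
Note on the two checkAccess hunks above (JobImpl and CompletedJob): a job submitted with mapreduce.cluster.acls.enabled=false records no ACL for the operation, so a null lookup must mean "allow", or such histories become unviewable once ACLs are later enabled. A hedged sketch of the rule with simplified stand-in types, not the Hadoop classes:

    import java.util.Map;

    public class AclCheck {
      static boolean checkAccess(Map<String, String> jobAcls, String operation,
                                 String caller, String owner) {
        String acl = jobAcls.get(operation);
        if (acl == null) {
          return true; // no ACL recorded at submission: do not lock anyone out
        }
        // Simplified evaluation; the real work is delegated to the
        // aclsManager field seen in the hunks above.
        return caller.equals(owner) || acl.equals("*") || acl.contains(caller);
      }
    }
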
diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn
index d7dae8b..fe80061 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn
+++ b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn
@@ -221,6 +221,5 @@
   YARN_OPTS="$YARN_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
 fi  
 
-echo "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $YARN_OPTS -classpath "$CLASSPATH" $CLASS "$@"
 exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $YARN_OPTS -classpath "$CLASSPATH" $CLASS "$@"
 fi
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index fdd4ecb..d226464 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -133,8 +133,16 @@
   }
 
   @Override
+  public void run() {
+    try {
+      doAppLogAggregation();
+    } finally {
+      this.appAggregationFinished.set(true);
+    }
+  }
+
   @SuppressWarnings("unchecked")
-  public void run() {    
+  private void doAppLogAggregation() {
     ContainerId containerId;
 
     while (!this.appFinishing.get()) {
@@ -189,8 +197,6 @@
     this.dispatcher.getEventHandler().handle(
         new ApplicationEvent(this.appId,
             ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
-        
-    this.appAggregationFinished.set(true);
   }
 
   private Path getRemoteNodeTmpLogFileForApp() {
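
Note on the AppLogAggregatorImpl hunk above: moving the body into doAppLogAggregation() and setting appAggregationFinished in a finally block guarantees the flag flips even if the aggregation thread dies on a RuntimeException, so a NodeManager waiting on it during shutdown cannot hang (MAPREDUCE-3738). A minimal sketch of the pattern, with illustrative names:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class CrashSafeWorker implements Runnable {
      private final AtomicBoolean finished = new AtomicBoolean(false);

      @Override
      public void run() {
        try {
          doWork(); // may throw at any point
        } finally {
          finished.set(true); // runs even on a crash, so waiters terminate
        }
      }

      private void doWork() {
        throw new RuntimeException("simulated aggregator crash");
      }

      public void waitForFinish() throws InterruptedException {
        while (!finished.get()) {
          Thread.sleep(100); // mirrors a polling stop() loop
        }
      }
    }
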
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index a1853b3..18b8d9b 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -21,6 +21,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
 
@@ -69,6 +70,7 @@
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -536,4 +538,31 @@
     appAcls.put(ApplicationAccessType.VIEW_APP, "*");
     return appAcls;
   }
+
+  @Test(timeout=20000)
+  @SuppressWarnings("unchecked")
+  public void testStopAfterError() throws Exception {
+    DeletionService delSrvc = mock(DeletionService.class);
+
+    // get the AppLogAggregatorImpl thread to crash
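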
+    LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
+    when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, this.context, delSrvc,
+                                  mockedDirSvc);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    logAggregationService.handle(new LogHandlerAppStartedEvent(
+            application1, this.user, null,
+            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+
+    logAggregationService.stop();
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 75c91aa..d762766 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -177,17 +178,17 @@
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
         resolve(host), capability);
 
-    if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
-      LOG.info("Duplicate registration from the node at: " + host
-          + ", Sending SHUTDOWN Signal to the NodeManager");
-      regResponse.setNodeAction(NodeAction.SHUTDOWN);
-      response.setRegistrationResponse(regResponse);
-      return response;
+    RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
+    if (oldNode == null) {
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+    } else {
+      LOG.info("Reconnect from the node at: " + host);
+      this.nmLivelinessMonitor.unregister(nodeId);
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeReconnectEvent(nodeId, rmNode));
     }
 
-    this.rmContext.getDispatcher().getEventHandler().handle(
-        new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
-
     this.nmLivelinessMonitor.register(nodeId);
 
     LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index d562836..ef644be 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -28,6 +28,7 @@
   // ResourceTrackerService
   STATUS_UPDATE,
   REBOOTING,
+  RECONNECTED,
 
   // Source: Application
   CLEANUP_APP,
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 9b8892a..f0384da 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -110,9 +110,11 @@
                                            RMNodeEventType,
                                            RMNodeEvent>(RMNodeState.NEW)
   
-     //Transitions from RUNNING state
+     //Transitions from NEW state
      .addTransition(RMNodeState.NEW, RMNodeState.RUNNING, 
          RMNodeEventType.STARTED, new AddNodeTransition())
+
+     //Transitions from RUNNING state
      .addTransition(RMNodeState.RUNNING, 
          EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
@@ -129,11 +131,15 @@
          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
      .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+     .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
+         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
 
      //Transitions from UNHEALTHY state
      .addTransition(RMNodeState.UNHEALTHY, 
          EnumSet.of(RMNodeState.UNHEALTHY, RMNodeState.RUNNING),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
+     .addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
+         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
          
      // create the topology tables
      .installTopology(); 
@@ -372,6 +378,39 @@
     }
   }
   
+  public static class ReconnectNodeTransition implements
+      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      // Kill containers since node is rejoining.
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodeRemovedSchedulerEvent(rmNode));
+
+      RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
+      if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
+          && rmNode.getHttpPort() == newNode.getHttpPort()) {
+        // Reset heartbeat ID since node just restarted.
+        rmNode.getLastHeartBeatResponse().setResponseId(0);
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new NodeAddedSchedulerEvent(rmNode));
+      } else {
+        // Reconnected node differs, so replace old node and start new node
+        switch (rmNode.getState()) {
+        case RUNNING:
+          ClusterMetrics.getMetrics().decrNumActiveNodes();
+          break;
+        case UNHEALTHY:
+          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+          break;
+        }
+        rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
+      }
+    }
+  }
+
   public static class CleanUpAppTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
new file mode 100644
index 0000000..b1fa0ad
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class RMNodeReconnectEvent extends RMNodeEvent {
+  private RMNode reconnectedNode;
+
+  public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
+    super(nodeId, RMNodeEventType.RECONNECTED);
+    reconnectedNode = newNode;
+  }
+
+  public RMNode getReconnectedNode() {
+    return reconnectedNode;
+  }
+}
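
Note on the RMNodeImpl and RMNodeReconnectEvent hunks above: the new RECONNECTED rows are additions to a table-driven state machine, and the event subclass exists purely to carry the reconnected node as a payload through the dispatcher. A simplified stand-in for the table idea (not the Yarn StateMachineFactory API):

    import java.util.HashMap;
    import java.util.Map;

    public class TinyStateMachine {
      enum State { NEW, RUNNING, UNHEALTHY }
      enum Event { STARTED, RECONNECTED }

      private final Map<String, State> table = new HashMap<String, State>();
      private State current = State.NEW;

      TinyStateMachine addTransition(State from, State to, Event on) {
        table.put(from + "/" + on, to);
        return this;
      }

      void handle(Event e) {
        State next = table.get(current + "/" + e);
        if (next == null) {
          throw new IllegalStateException(e + " is not valid in " + current);
        }
        current = next;
      }

      public static void main(String[] args) {
        TinyStateMachine node = new TinyStateMachine()
            .addTransition(State.NEW, State.RUNNING, Event.STARTED)
            // the new rows: RECONNECTED is legal in RUNNING and UNHEALTHY
            .addTransition(State.RUNNING, State.RUNNING, Event.RECONNECTED)
            .addTransition(State.UNHEALTHY, State.UNHEALTHY, Event.RECONNECTED);
        node.handle(Event.STARTED);
        node.handle(Event.RECONNECTED); // no longer an invalid transition
      }
    }
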
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index aa7d23e..0f90f6c 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -666,7 +666,10 @@
 
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
-    Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+    if (node == null) {
+      return;
+    }
+    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
     root.updateClusterResource(clusterResource);
     --numNodeManagers;
 
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 6fcf1fe..1526683 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -731,6 +731,9 @@
 
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = getNode(nodeInfo.getNodeID());
+    if (node == null) {
+      return;
+    }
     // Kill running containers
     for(RMContainer container : node.getRunningContainers()) {
       containerCompleted(container, 
@@ -744,7 +747,7 @@
     this.nodes.remove(nodeInfo.getNodeID());
     
     // Update cluster metrics
-    Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
   }
 
   @Override
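
Note on the CapacityScheduler and FifoScheduler hunks above: removeNode is now a no-op for a node the scheduler does not know, and cluster capacity is decremented by what the scheduler recorded at add time rather than by the incoming RMNode's capability, so a reconnect with changed resources (remove old, add new) keeps the totals exact. A toy model of that bookkeeping rule, with assumed names and an int standing in for Resource:

    import java.util.HashMap;
    import java.util.Map;

    public class ClusterBook {
      private final Map<String, Integer> nodeMemory = new HashMap<String, Integer>();
      private int clusterMemory = 0;

      public synchronized void addNode(String id, int memory) {
        nodeMemory.put(id, memory);
        clusterMemory += memory;
      }

      public synchronized void removeNode(String id) {
        Integer recorded = nodeMemory.remove(id);
        if (recorded == null) {
          return; // unknown node (e.g. duplicate removal): no-op
        }
        clusterMemory -= recorded; // subtract what was added, not a new value
      }

      public synchronized int getClusterMemory() {
        return clusterMemory;
      }
    }
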
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 05b17a3..f30883f 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -19,23 +19,18 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 
 import com.google.common.collect.Lists;
 
@@ -195,8 +190,12 @@
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) {
+    return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++);
+  }
+
+  private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr, int hostnum) {
     final String rackName = "rack"+ rack;
-    final int nid = NODE_ID++;
+    final int nid = hostnum;
     final String hostName = "host"+ nid;
     final int port = 123;
     final NodeId nodeID = newNodeID(hostName, port);
@@ -219,4 +218,8 @@
   public static RMNode newNodeInfo(int rack, final Resource perNode) {
     return buildRMNode(rack, perNode, RMNodeState.RUNNING, "localhost:0");
   }
+
+  public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) {
+    return buildRMNode(rack, perNode, null, "localhost:0", hostnum);
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
index 2cca6f0..e995e51 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import junit.framework.Assert;
@@ -27,10 +28,17 @@
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -167,10 +175,37 @@
     testMinimumAllocation(conf);
   }
 
+  @Test
+  public void testReconnectedNode() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setQueues("default", new String[] {"default"});
+    conf.setCapacity("default", 100);
+    FifoScheduler fs = new FifoScheduler();
+    fs.reinitialize(conf, null, null);
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+
+    fs.handle(new NodeAddedSchedulerEvent(n1));
+    fs.handle(new NodeAddedSchedulerEvent(n2));
+    List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
+    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+    Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
+
+    // reconnect n1 with downgraded memory
+    n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
+    fs.handle(new NodeRemovedSchedulerEvent(n1));
+    fs.handle(new NodeAddedSchedulerEvent(n1));
+    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+
+    Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+  }
+
   public static void main(String[] args) throws Exception {
     TestFifoScheduler t = new TestFifoScheduler();
     t.test();
     t.testDefaultMinimumAllocation();
     t.testNonDefaultMinimumAllocation();
+    t.testReconnectedNode();
   }
 }
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 8b3f4a0..7826819 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -31,12 +31,17 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Test;
@@ -189,7 +194,7 @@
     conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
         .getAbsolutePath());
 
-    MockRM rm = new MockRM(conf);
+    rm = new MockRM(conf);
     rm.start();
 
     MockNM nm1 = rm.registerNode("host1:1234", 5120);
@@ -223,6 +228,61 @@
         ClusterMetrics.getMetrics().getUnhealthyNMs());
   }
 
+  @Test
+  public void testReconnectNode() throws Exception {
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = new MockRM() {
+      @Override
+      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+        return new SchedulerEventDispatcher(this.scheduler) {
+          @Override
+          public void handle(SchedulerEvent event) {
+            scheduler.handle(event);
+          }
+        };
+      }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 5120);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(false);
+    checkUnealthyNMCount(rm, nm2, true, 1);
+    final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
+    QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
+    Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
+
+    // reconnect of healthy node
+    nm1 = rm.registerNode("host1:1234", 5120);
+    HeartbeatResponse response = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    dispatcher.await();
+    Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
+    checkUnealthyNMCount(rm, nm2, true, 1);
+
+    // reconnect of unhealthy node
+    nm2 = rm.registerNode("host2:5678", 5120);
+    response = nm2.nodeHeartbeat(false);
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    dispatcher.await();
+    Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
+    checkUnealthyNMCount(rm, nm2, true, 1);
+
+    // reconnect of node with changed capability
+    nm1 = rm.registerNode("host2:5678", 10240);
+    dispatcher.await();
+    response = nm2.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
+  }
+
   private void writeToHostsFile(String... hosts) throws IOException {
     if (!hostFile.exists()) {
       TEMP_DIR.mkdirs();
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index bcfd09d..dbe21d1 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
@@ -41,12 +42,15 @@
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+  private final int GB = 1024;
   
   private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
   private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
@@ -97,8 +101,6 @@
 
     LOG.info("--- START: testCapacityScheduler ---");
         
-    final int GB = 1024;
-    
     // Register node1
     String host_0 = "host_0";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = 
@@ -340,4 +342,27 @@
     cs.reinitialize(conf, null, null);
   }
 
+  @Test
+  public void testReconnectedNode() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.reinitialize(csConf, null, null);
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n2));
+
+    Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory());
+
+    // reconnect n1 with downgraded memory
+    n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+
+    Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
+  }
 }