YARN-10393. MR job live lock caused by completed state container leak in heartbeat between node manager and RM. Contributed by zhenzhao wang and Jim Brennan

(cherry picked from commit a1f7e760dffeeffb9cc739f734c0a91b81a0c9d0)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index ee85042..40c27c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -645,7 +645,7 @@
   @VisibleForTesting
   @Private
   public void removeOrTrackCompletedContainersFromContext(
-      List<ContainerId> containerIds) throws IOException {
+      List<ContainerId> containerIds) {
     Set<ContainerId> removedContainers = new HashSet<ContainerId>();
 
     pendingContainersToRemove.addAll(containerIds);
@@ -662,13 +662,13 @@
         removedContainers.add(containerId);
         iter.remove();
       }
+      pendingCompletedContainers.remove(containerId);
     }
 
     if (!removedContainers.isEmpty()) {
       LOG.info("Removed completed containers from NM context: "
           + removedContainers);
     }
-    pendingCompletedContainers.clear();
   }
 
   private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -1037,6 +1037,7 @@
     @SuppressWarnings("unchecked")
     public void run() {
       int lastHeartbeatID = 0;
+      boolean missedHearbeat = false;
       while (!isStopped) {
         // Send heartbeat
         try {
@@ -1083,6 +1084,20 @@
             removeOrTrackCompletedContainersFromContext(response
                 .getContainersToBeRemovedFromNM());
 
+            // If the last heartbeat was missed, it is possible that the
+            // RM saw this one as a duplicate and did not process it.
+            // If so, we can fail to notify the RM of these completed containers
+            // on the next heartbeat if we clear pendingCompletedContainers.
+            // If it wasn't a duplicate, the only impact is we might notify
+            // the RM twice, which it can handle.
+            if (!missedHearbeat) {
+              pendingCompletedContainers.clear();
+            } else {
+              LOG.info("skipped clearing pending completed containers due to " +
+                  "missed heartbeat");
+              missedHearbeat = false;
+            }
+
             logAggregationReportForAppsTempList.clear();
             lastHeartbeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
@@ -1158,6 +1173,7 @@
           // TODO Better error handling. Thread can die with the rest of the
           // NM still running.
           LOG.error("Caught exception in status-updater", e);
+          missedHearbeat = true;
         } finally {
           synchronized (heartbeatMonitor) {
             nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 11c3c35..67b2718 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -758,15 +758,11 @@
         } else if (heartBeatID == 2 || heartBeatID == 3) {
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
-          if (heartBeatID == 2) {
-            // NM should send completed containers again, since the last
-            // heartbeat is lost.
-            Assert.assertEquals(4, statuses.size());
-          } else {
-            // NM should not send completed containers again, since the last
-            // heartbeat is successful.
-            Assert.assertEquals(2, statuses.size());
-          }
+          // NM should send completed containers on heartbeat 2,
+          // since heartbeat 1 was lost.  It will send them again on
+          // heartbeat 3, because it does not clear them if the previous
+          // heartbeat was lost in case the RM treated it as a duplicate.
+          Assert.assertEquals(4, statuses.size());
           Assert.assertEquals(4, context.getContainers().size());
 
           boolean container2Exist = false, container3Exist = false,
@@ -797,14 +793,8 @@
               container5Exist = true;
             }
           }
-          if (heartBeatID == 2) {
-            Assert.assertTrue(container2Exist && container3Exist
-                && container4Exist && container5Exist);
-          } else {
-            // NM do not send completed containers again
-            Assert.assertTrue(container2Exist && !container3Exist
-                && container4Exist && !container5Exist);
-          }
+          Assert.assertTrue(container2Exist && container3Exist
+              && container4Exist && container5Exist);
 
           if (heartBeatID == 3) {
             finishedContainersPulledByAM.add(containerStatus3.getContainerId());