YARN-10393. MR job live lock caused by completed state container leak in heartbeat between node manager and RM. Contributed by zhenzhao wang and Jim Brennan
(cherry picked from commit a1f7e760dffeeffb9cc739f734c0a91b81a0c9d0)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index ee85042..40c27c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -645,7 +645,7 @@
@VisibleForTesting
@Private
public void removeOrTrackCompletedContainersFromContext(
- List<ContainerId> containerIds) throws IOException {
+ List<ContainerId> containerIds) {
Set<ContainerId> removedContainers = new HashSet<ContainerId>();
pendingContainersToRemove.addAll(containerIds);
@@ -662,13 +662,13 @@
removedContainers.add(containerId);
iter.remove();
}
+ pendingCompletedContainers.remove(containerId);
}
if (!removedContainers.isEmpty()) {
LOG.info("Removed completed containers from NM context: "
+ removedContainers);
}
- pendingCompletedContainers.clear();
}
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -1037,6 +1037,7 @@
@SuppressWarnings("unchecked")
public void run() {
int lastHeartbeatID = 0;
+ boolean missedHearbeat = false;
while (!isStopped) {
// Send heartbeat
try {
@@ -1083,6 +1084,20 @@
removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM());
+ // If the last heartbeat was missed, it is possible that the
+ // RM saw this one as a duplicate and did not process it.
+ // In that case, clearing pendingCompletedContainers here would
+ // cause the next heartbeat to omit these completed containers.
+ // If it wasn't a duplicate, the only impact is we might notify
+ // the RM twice, which it can handle.
+ if (!missedHearbeat) {
+ pendingCompletedContainers.clear();
+ } else {
+ LOG.info("skipped clearing pending completed containers due to " +
+ "missed heartbeat");
+ missedHearbeat = false;
+ }
+
logAggregationReportForAppsTempList.clear();
lastHeartbeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
@@ -1158,6 +1173,7 @@
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
+ missedHearbeat = true;
} finally {
synchronized (heartbeatMonitor) {
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 11c3c35..67b2718 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -758,15 +758,11 @@
} else if (heartBeatID == 2 || heartBeatID == 3) {
List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses();
- if (heartBeatID == 2) {
- // NM should send completed containers again, since the last
- // heartbeat is lost.
- Assert.assertEquals(4, statuses.size());
- } else {
- // NM should not send completed containers again, since the last
- // heartbeat is successful.
- Assert.assertEquals(2, statuses.size());
- }
+ // NM should send completed containers on heartbeat 2, since
+ // heartbeat 1 was lost. It sends them again on heartbeat 3:
+ // after a lost heartbeat the NM keeps them pending in case
+ // the RM treated heartbeat 2 as a duplicate and ignored it.
+ Assert.assertEquals(4, statuses.size());
Assert.assertEquals(4, context.getContainers().size());
boolean container2Exist = false, container3Exist = false,
@@ -797,14 +793,8 @@
container5Exist = true;
}
}
- if (heartBeatID == 2) {
- Assert.assertTrue(container2Exist && container3Exist
- && container4Exist && container5Exist);
- } else {
- // NM do not send completed containers again
- Assert.assertTrue(container2Exist && !container3Exist
- && container4Exist && !container5Exist);
- }
+ Assert.assertTrue(container2Exist && container3Exist
+ && container4Exist && container5Exist);
if (heartBeatID == 3) {
finishedContainersPulledByAM.add(containerStatus3.getContainerId());