YARN-1278. Fixed NodeManager to not delete local resources for apps on resync command from RM - a bug caused by YARN-1149. Contributed by Hitesh Shah.
svn merge --ignore-ancestry -c 1529657 ../../trunk/
Merge with minor fixes for conflicts.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1529659 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index eda3ea0..affce45 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -122,6 +122,9 @@
     and log-dirs correctly even when there are no resources to localize for the
     container. (Siddharth Seth via vinodkv)
 
+    YARN-1278. Fixed NodeManager to not delete local resources for apps on resync
+    command from RM - a bug caused by YARN-1149. (Hitesh Shah via vinodkv)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
index e5e5537..5f7d01e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
@@ -25,13 +25,39 @@
 public class CMgrCompletedContainersEvent extends ContainerManagerEvent {
 
   private final List<ContainerId> containerToCleanup;
-  
-  public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup) {
+  private final Reason reason;
+
+  public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup,
+                                      Reason reason) {
     super(ContainerManagerEventType.FINISH_CONTAINERS);
     this.containerToCleanup = containersToCleanup;
+    this.reason = reason;
   }
 
   public List<ContainerId> getContainersToCleanup() {
     return this.containerToCleanup;
   }
+
+  public Reason getReason() {
+    return reason;
+  }
+
+  public static enum Reason {
+    /**
+     * Container is killed as NodeManager is shutting down
+     */
+    ON_SHUTDOWN,
+
+    /**
+     * Container is killed as the NodeManager is re-syncing with the
+     * ResourceManager
+     */
+    ON_NODEMANAGER_RESYNC,
+
+    /**
+     * Container is killed on request by the ResourceManager
+     */
+    BY_RESOURCEMANAGER
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 31a20ea..a76b1f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -226,7 +226,8 @@ protected void resyncWithRM() {
       public void run() {
         LOG.info("Notifying ContainerManager to block new container-requests");
         containerManager.setBlockNewContainerRequests(true);
-        containerManager.cleanUpApplications(NodeManagerEventType.RESYNC);
+        LOG.info("Cleaning up running containers on resync");
+        containerManager.cleanupContainersOnNMResync();
         ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
       }
     }.start();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index a46a094..9b68bec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -471,7 +471,8 @@ public void run() {
                 .getContainersToCleanup();
             if (!containersToCleanup.isEmpty()) {
               dispatcher.getEventHandler().handle(
-                  new CMgrCompletedContainersEvent(containersToCleanup));
+                  new CMgrCompletedContainersEvent(containersToCleanup,
+                    CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
             }
             List<ApplicationId> appsToCleanup =
                 response.getApplicationsToCleanup();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index e6393e3..33088fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -87,7 +87,6 @@
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManagerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
@@ -306,7 +305,7 @@ public void serviceStop() throws Exception {
     try {
       serviceStopped = true;
       if (context != null) {
-        cleanUpApplications(NodeManagerEventType.SHUTDOWN);
+        cleanUpApplicationsOnNMShutDown();
       }
     } finally {
       this.writeLock.unlock();
@@ -320,7 +319,7 @@ public void serviceStop() throws Exception {
     super.serviceStop();
   }
 
-  public void cleanUpApplications(NodeManagerEventType eventType) {
+  public void cleanUpApplicationsOnNMShutDown() {
     Map<ApplicationId, Application> applications =
         this.context.getApplications();
     if (applications.isEmpty()) {
@@ -336,33 +335,15 @@ public void cleanUpApplications(NodeManagerEventType eventType) {
 
     LOG.info("Waiting for Applications to be Finished");
 
-    switch (eventType) {
-      case SHUTDOWN:
-        long waitStartTime = System.currentTimeMillis();
-        while (!applications.isEmpty()
-            && System.currentTimeMillis() - waitStartTime
-                < waitForContainersOnShutdownMillis) {
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ex) {
-            LOG.warn("Interrupted while sleeping on applications finish on shutdown",
-              ex);
-          }
-        }
-        break;
-      case RESYNC:
-        while (!applications.isEmpty()) {
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ex) {
-            LOG.warn("Interrupted while sleeping on applications finish on resync",
-              ex);
-          }
-        }
-        break;
-      default:
-        throw new YarnRuntimeException("Get an unknown NodeManagerEventType: "
-            + eventType);
+    long waitStartTime = System.currentTimeMillis();
+    while (!applications.isEmpty()
+        && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ex) {
+        LOG.warn(
+          "Interrupted while sleeping on applications finish on shutdown", ex);
+      }
     }
 
     // All applications Finished
@@ -374,6 +355,40 @@ public void cleanUpApplications(NodeManagerEventType eventType) {
     }
   }
 
+  /** Kills all known containers on NM resync and waits until none remain. */
+  public void cleanupContainersOnNMResync() {
+    Map<ContainerId, Container> containers = context.getContainers();
+    if (containers.isEmpty()) {
+      return;
+    }
+    LOG.info("Containers still running on "
+        + CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC + " : "
+        + containers.keySet());
+    List<ContainerId> containerIds =
+      new ArrayList<ContainerId>(containers.keySet());
+    LOG.info("Waiting for containers to be killed");
+    this.handle(new CMgrCompletedContainersEvent(containerIds,
+      CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC));
+    while (!containers.isEmpty()) {
+      try {
+        Thread.sleep(1000);
+        nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
+      } catch (InterruptedException ex) {
+        LOG.warn("Interrupted while sleeping on container kill on resync", ex);
+        // Restore the flag and stop waiting rather than swallow it.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+    // Non-empty here only if the wait above was interrupted.
+    if (containers.isEmpty()) {
+      LOG.info("All containers in DONE state");
+    } else {
+      LOG.info("Done waiting for containers to be killed. Still alive: " +
+        containers.keySet());
+    }
+  }
+
   // Get the remoteUGI corresponding to the api call.
   protected UserGroupInformation getRemoteUgi()
       throws YarnException {
@@ -852,7 +867,7 @@ public void handle(ContainerManagerEvent event) {
       break;
     default:
         throw new YarnRuntimeException(
-            "Get an unknown ContainerManagerEvent type: " + event.getType());
+            "Got an unknown ContainerManagerEvent type: " + event.getType());
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index 3e0846b..f2090fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -102,7 +102,11 @@ public void testKillContainersOnResync() throws IOException,
     } catch (BrokenBarrierException e) {
     }
     Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount());
-
+    // Only containers should be killed on resync; apps should remain. That
+    // way local resources for apps can be reused after resync without
+    // relocalization.
+    Assert.assertTrue(nm.getNMContext().getApplications()
+      .containsKey(cId.getApplicationAttemptId().getApplicationId()));
     Assert.assertFalse(assertionFailedInThread.get());
 
     nm.stop();
@@ -285,7 +289,6 @@ public void run() {
             recordFactory.newRecordInstance(ContainerLaunchContext.class);
         try {
           while (!isStopped && numContainers < 10) {
-            ContainerId cId = TestNodeManagerShutdown.createContainerId();
             StartContainerRequest scRequest =
                 StartContainerRequest.newInstance(containerLaunchContext,
                   null);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index f200974..70837b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -93,6 +93,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
+@SuppressWarnings("rawtypes")
 public class TestNodeStatusUpdater {
 
   // temp fix until metrics system can auto-detect itself running in unit test:
@@ -352,7 +353,6 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) {
 
   private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
 
-    private Context context;
     private final long rmStartIntervalMS;
     private final boolean rmNeverStart;
     public ResourceTracker resourceTracker;
@@ -360,7 +360,6 @@ public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
         long rmStartIntervalMS, boolean rmNeverStart) {
       super(context, dispatcher, healthChecker, metrics);
-      this.context = context;
       this.rmStartIntervalMS = rmStartIntervalMS;
       this.rmNeverStart = rmNeverStart;
     }
@@ -376,8 +375,8 @@ protected ResourceTracker getRMClient() throws IOException {
       RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
       resourceTracker =
           (ResourceTracker) RetryProxy.create(ResourceTracker.class,
-            new MyResourceTracker6(this.context, rmStartIntervalMS,
-              rmNeverStart), retryPolicy);
+            new MyResourceTracker6(rmStartIntervalMS, rmNeverStart),
+            retryPolicy);
       return resourceTracker;
     }
 
@@ -683,14 +682,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
 
   private class MyResourceTracker6 implements ResourceTracker {
 
-    private final Context context;
     private long rmStartIntervalMS;
     private boolean rmNeverStart;
     private final long waitStartTime;
 
-    public MyResourceTracker6(Context context, long rmStartIntervalMS,
-        boolean rmNeverStart) {
-      this.context = context;
+    public MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart) {
       this.rmStartIntervalMS = rmStartIntervalMS;
       this.rmNeverStart = rmNeverStart;
       this.waitStartTime = System.currentTimeMillis();
@@ -866,8 +862,8 @@ protected ContainerManagerImpl createContainerManager(Context context,
             metrics, aclsManager, dirsHandler) {
 
           @Override
-          public void cleanUpApplications(NodeManagerEventType eventType) {
-            super.cleanUpApplications(NodeManagerEventType.SHUTDOWN);
+          public void cleanUpApplicationsOnNMShutDown() {
+            super.cleanUpApplicationsOnNMShutDown();
             numCleanups.incrementAndGet();
           }
         };