YARN-7290. Method canContainerBePreempted can return true when it shouldn't. (Contributed by Steven Rand)

(cherry picked from commit 2bde3aedf139368fc71f053d8dd6580b498ff46d)
(cherry picked from commit f335d509d3a778f11265d3f45800dd6e75f7be59)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index bbd4418..2aa45b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -588,7 +588,8 @@
     }
   }
 
-  boolean canContainerBePreempted(RMContainer container) {
+  boolean canContainerBePreempted(RMContainer container,
+                                  Resource alreadyConsideringForPreemption) {
     if (!isPreemptable()) {
       return false;
     }
@@ -610,6 +611,15 @@
 
     // Check if the app's allocation will be over its fairshare even
     // after preempting this container
+    Resource usageAfterPreemption = getUsageAfterPreemptingContainer(
+            container.getAllocatedResource(),
+            alreadyConsideringForPreemption);
+
+    return !isUsageBelowShare(usageAfterPreemption, getFairShare());
+  }
+
+  private Resource getUsageAfterPreemptingContainer(Resource containerResources,
+          Resource alreadyConsideringForPreemption) {
     Resource usageAfterPreemption = Resources.clone(getResourceUsage());
 
     // Subtract resources of containers already queued for preemption
@@ -617,10 +627,13 @@
       Resources.subtractFrom(usageAfterPreemption, resourcesToBePreempted);
     }
 
-    // Subtract this container's allocation to compute usage after preemption
-    Resources.subtractFrom(
-        usageAfterPreemption, container.getAllocatedResource());
-    return !isUsageBelowShare(usageAfterPreemption, getFairShare());
+    // Subtract resources of this container and other containers of this app
+    // that the FSPreemptionThread is already considering for preemption.
+    Resources.subtractFrom(usageAfterPreemption, containerResources);
+    Resources.subtractFrom(usageAfterPreemption,
+            alreadyConsideringForPreemption);
+
+    return usageAfterPreemption;
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index b3e59c5..47e580d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -19,7 +19,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -29,7 +29,9 @@
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.locks.Lock;
@@ -130,10 +132,21 @@
           }
         } // End of iteration through nodes for one RR
 
-        if (bestContainers != null && bestContainers.containers.size() > 0) {
-          containersToPreempt.addAll(bestContainers.containers);
-          // Reserve the containers for the starved app
-          trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
+        if (bestContainers != null) {
+          List<RMContainer> containers = bestContainers.getAllContainers();
+          if (containers.size() > 0) {
+            containersToPreempt.addAll(containers);
+            // Reserve the containers for the starved app
+            trackPreemptionsAgainstNode(containers, starvedApp);
+            // Warn application about containers to be killed
+            for (RMContainer container : containers) {
+              FSAppAttempt app = scheduler.getSchedulerApp(
+                      container.getApplicationAttemptId());
+              LOG.info("Preempting container " + container +
+                      " from queue " + app.getQueueName());
+              app.trackContainerForPreemption(container);
+            }
+          }
         }
       }
     } // End of iteration over RRs
@@ -170,10 +183,12 @@
     for (RMContainer container : containersToCheck) {
       FSAppAttempt app =
           scheduler.getSchedulerApp(container.getApplicationAttemptId());
+      ApplicationId appId = app.getApplicationId();
 
-      if (app.canContainerBePreempted(container)) {
+      if (app.canContainerBePreempted(container,
+              preemptableContainers.getResourcesToPreemptForApp(appId))) {
         // Flag container for preemption
-        if (!preemptableContainers.addContainer(container)) {
+        if (!preemptableContainers.addContainer(container, appId)) {
           return null;
         }
 
@@ -199,15 +214,6 @@
   }
 
   private void preemptContainers(List<RMContainer> containers) {
-    // Warn application about containers to be killed
-    for (RMContainer container : containers) {
-      ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
-      FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
-      LOG.info("Preempting container " + container +
-          " from queue " + app.getQueueName());
-      app.trackContainerForPreemption(container);
-    }
-
     // Schedule timer task to kill containers
     preemptionTimer.schedule(
         new PreemptContainersTask(containers), warnTimeBeforeKill);
@@ -237,14 +243,14 @@
    * A class to track preemptable containers.
    */
   private static class PreemptableContainers {
-    List<RMContainer> containers;
+    Map<ApplicationId, List<RMContainer>> containersByApp;
     int numAMContainers;
     int maxAMContainers;
 
     PreemptableContainers(int maxAMContainers) {
-      containers = new ArrayList<>();
       numAMContainers = 0;
       this.maxAMContainers = maxAMContainers;
+      this.containersByApp = new HashMap<>();
     }
 
     /**
@@ -254,7 +260,7 @@
      * @param container the container to add
      * @return true if success; false otherwise
      */
-    private boolean addContainer(RMContainer container) {
+    private boolean addContainer(RMContainer container, ApplicationId appId) {
       if (container.isAMContainer()) {
         numAMContainers++;
         if (numAMContainers >= maxAMContainers) {
@@ -262,8 +268,30 @@
         }
       }
 
-      containers.add(container);
+      if (!containersByApp.containsKey(appId)) {
+        containersByApp.put(appId, new ArrayList<>());
+      }
+
+      containersByApp.get(appId).add(container);
       return true;
     }
+
+    private List<RMContainer> getAllContainers() {
+      List<RMContainer> allContainers = new ArrayList<>();
+      for (List<RMContainer> containersForApp : containersByApp.values()) {
+        allContainers.addAll(containersForApp);
+      }
+      return allContainers;
+    }
+
+    private Resource getResourcesToPreemptForApp(ApplicationId appId) {
+      Resource resourcesToPreempt = Resources.createResource(0, 0);
+      if (containersByApp.containsKey(appId)) {
+        for (RMContainer container : containersByApp.get(appId)) {
+          Resources.addTo(resourcesToPreempt, container.getAllocatedResource());
+        }
+      }
+      return resourcesToPreempt;
+    }
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 3163024..ac5d9fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -278,11 +278,12 @@
     preemptHalfResources(queue2);
   }
 
-  private void verifyPreemption(int numStarvedAppContainers)
+  private void verifyPreemption(int numStarvedAppContainers,
+                                int numGreedyAppContainers)
       throws InterruptedException {
     // Sleep long enough for four containers to be preempted.
     for (int i = 0; i < 1000; i++) {
-      if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) {
+      if (greedyApp.getLiveContainers().size() == numGreedyAppContainers) {
         break;
       }
       Thread.sleep(10);
@@ -290,12 +291,12 @@
 
     // Post preemption, verify the greedyApp has the correct # of containers.
     assertEquals("Incorrect # of containers on the greedy app",
-        2 * numStarvedAppContainers, greedyApp.getLiveContainers().size());
+            numGreedyAppContainers, greedyApp.getLiveContainers().size());
 
     // Verify the queue metrics are set appropriately. The greedyApp started
     // with 8 1GB, 1vcore containers.
     assertEquals("Incorrect # of preempted containers in QueueMetrics",
-        8 - 2 * numStarvedAppContainers,
+        8 - numGreedyAppContainers,
         greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
 
     // Verify the node is reserved for the starvingApp
@@ -340,7 +341,7 @@
     String queue = "root.preemptable.child-1";
     submitApps(queue, queue);
     if (fairsharePreemption) {
-      verifyPreemption(2);
+      verifyPreemption(2, 4);
     } else {
       verifyNoPreemption();
     }
@@ -349,13 +350,13 @@
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
-    verifyPreemption(2);
+    verifyPreemption(2, 4);
   }
 
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
     submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
-    verifyPreemption(2);
+    verifyPreemption(2, 4);
   }
 
   @Test
@@ -389,7 +390,7 @@
     setNumAMContainersPerNode(2);
     preemptHalfResources("root.preemptable.child-2");
 
-    verifyPreemption(2);
+    verifyPreemption(2, 4);
 
     ArrayList<RMContainer> containers =
         (ArrayList<RMContainer>) starvingApp.getLiveContainers();
@@ -402,6 +403,22 @@
   }
 
   @Test
+  public void testAppNotPreemptedBelowFairShare() throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    tryPreemptMoreThanFairShare("root.preemptable.child-2");
+  }
+
+  private void tryPreemptMoreThanFairShare(String queueName)
+          throws InterruptedException {
+    ApplicationAttemptId appAttemptId
+            = createSchedulingRequest(3 * GB, 3, queueName, "default",
+            NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
+    starvingApp = scheduler.getSchedulerApp(appAttemptId);
+
+    verifyPreemption(1, 5);
+  }
+
+  @Test
   public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
       throws InterruptedException {
     // Run this test only for fairshare preemption
@@ -414,10 +431,10 @@
 
     // Submit a job so half the resources go to parent's sibling
     preemptHalfResources("root.preemptable-sibling");
-    verifyPreemption(2);
+    verifyPreemption(2, 4);
 
     // Submit a job to the child's sibling to force preemption from the child
     preemptHalfResources("root.preemptable.child-2");
-    verifyPreemption(1);
+    verifyPreemption(1, 2);
   }
 }