YARN-3992. TestApplicationPriority.testApplicationPriorityAllocation fails intermittently. (Contributed by Sunil G)

(cherry picked from commit df9e7280db58baddd02d6e23d3685efb8d5f1b97)
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7988536..aff5b2a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -680,6 +680,9 @@
     YARN-433. When RM is catching up with node updates then it should not expire
     acquired containers. (Xuan Gong via zxu)
 
+    YARN-3992. TestApplicationPriority.testApplicationPriorityAllocation fails 
+    intermittently. (Contributed by Sunil G)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 0e253600..5660b78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -309,16 +309,20 @@
   public ApplicationAttemptId getApplicationAttemptId() {
     return this.attemptId;
   }
-  
+
   public List<Container> allocateAndWaitForContainers(int nContainer,
       int memory, MockNM nm) throws Exception {
+    return allocateAndWaitForContainers("ANY", nContainer, memory, nm);
+  }
+
+  public List<Container> allocateAndWaitForContainers(String host,
+      int nContainer, int memory, MockNM nm) throws Exception {
     // AM request for containers
-    allocate("ANY", memory, nContainer, null);
+    allocate(host, memory, nContainer, null);
     // kick the scheduler
     nm.nodeHeartbeat(true);
-    List<Container> conts =
-        allocate(new ArrayList<ResourceRequest>(), null)
-            .getAllocatedContainers();
+    List<Container> conts = allocate(new ArrayList<ResourceRequest>(), null)
+        .getAllocatedContainers();
     while (conts.size() < nContainer) {
       nm.nodeHeartbeat(true);
       conts.addAll(allocate(new ArrayList<ResourceRequest>(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 80eff06..db094e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -22,20 +22,16 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -44,7 +40,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@@ -59,8 +54,6 @@
 import org.junit.Test;
 
 public class TestApplicationPriority {
-  private static final Log LOG = LogFactory
-      .getLog(TestApplicationPriority.class);
   private final int GB = 1024;
 
   private YarnConfiguration conf;
@@ -166,19 +159,10 @@
     MockAM am1 = MockRM.launchAM(app1, rm, nm1);
     am1.registerAppAttempt();
 
-    // add request for containers
-    am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 7);
-    AllocateResponse alloc1Response = am1.schedule(); // send the request
+    // allocate 7 containers for App1
+    List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1",
+        7, 2 * GB, nm1);
 
-    // kick the scheduler, 7 containers will be allocated for App1
-    nm1.nodeHeartbeat(true);
-    while (alloc1Response.getAllocatedContainers().size() < 1) {
-      LOG.info("Waiting for containers to be created for app 1...");
-      Thread.sleep(100);
-      alloc1Response = am1.schedule();
-    }
-
-    List<Container> allocated1 = alloc1Response.getAllocatedContainers();
     Assert.assertEquals(7, allocated1.size());
     Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
 
@@ -193,9 +177,7 @@
     RMApp app2 = rm.submitApp(1 * GB, appPriority2);
 
     // kick the scheduler, 1 GB which was free is given to AM of App2
-    nm1.nodeHeartbeat(true);
-    MockAM am2 = rm.sendAMLaunched(app2.getCurrentAppAttempt()
-        .getAppAttemptId());
+    MockAM am2 = MockRM.launchAM(app2, rm, nm1);
     am2.registerAppAttempt();
 
     // check node report, 16 GB used and 0 GB available
@@ -210,7 +192,7 @@
     FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
         .get(app1.getApplicationId()).getCurrentAppAttempt();
 
-    // kill 2 containers to free up some space
+    // kill 2 containers of App1 to free up some space
     int counter = 0;
     for (Container c : allocated1) {
       if (++counter > 2) {
@@ -224,22 +206,16 @@
     Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory());
     Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory());
 
-    // add request for containers App1
-    am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 10);
-    am1.schedule(); // send the request for App1
-
-    // add request for containers App2
-    am2.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 3);
-    AllocateResponse alloc1Response4 = am2.schedule(); // send the request
+    // send updated request for App1
+    am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList<ContainerId>());
 
     // kick the scheduler, since App2 priority is more than App1, it will get
     // remaining cluster space.
-    nm1.nodeHeartbeat(true);
-    while (alloc1Response4.getAllocatedContainers().size() < 1) {
-      LOG.info("Waiting for containers to be created for app 2...");
-      Thread.sleep(100);
-      alloc1Response4 = am2.schedule();
-    }
+    List<Container> allocated2 = am2.allocateAndWaitForContainers("127.0.0.1",
+        2, 2 * GB, nm1);
+
+    // App2 has got 2 containers now.
+    Assert.assertEquals(2, allocated2.size());
 
     // check node report, 16 GB used and 0 GB available
     report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
@@ -268,19 +244,10 @@
     MockAM am1 = MockRM.launchAM(app1, rm, nm1);
     am1.registerAppAttempt();
 
-    // add request for containers
-    am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 1 * GB, 1, 7);
-    AllocateResponse alloc1Response = am1.schedule(); // send the request
-
     // kick the scheduler, 7 containers will be allocated for App1
-    nm1.nodeHeartbeat(true);
-    while (alloc1Response.getAllocatedContainers().size() < 1) {
-      LOG.info("Waiting for containers to be created for app 1...");
-      Thread.sleep(100);
-      alloc1Response = am1.schedule();
-    }
+    List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1",
+        7, 1 * GB, nm1);
 
-    List<Container> allocated1 = alloc1Response.getAllocatedContainers();
     Assert.assertEquals(7, allocated1.size());
     Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
 
@@ -308,9 +275,7 @@
     rm.killApp(app1.getApplicationId());
 
     // kick the scheduler, app3 (high among pending) gets free space
-    nm1.nodeHeartbeat(true);
-    MockAM am3 = rm.sendAMLaunched(app3.getCurrentAppAttempt()
-        .getAppAttemptId());
+    MockAM am3 = MockRM.launchAM(app3, rm, nm1);
     am3.registerAppAttempt();
 
     // check node report, 1 GB used and 7 GB available