YARN-4888. Changes in scheduler to identify resource-requests explicitly by allocation-id. (Subru Krishnan via wangda)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 6c337cf..d9ce11f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -303,7 +303,7 @@
   optional bool relax_locality = 5 [default = true];
   optional string node_label_expression = 6;
   optional ExecutionTypeRequestProto execution_type_request = 7;
-  optional int64 allocation_request_id = 8 [default = -1];
+  optional int64 allocation_request_id = 8 [default = 0];
 }
 
 message ExecutionTypeRequestProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 5aa1c41..8ecbea7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -236,7 +236,8 @@
 
   public static Container newContainer(ContainerId containerId, NodeId nodeId,
       String nodeHttpAddress, Resource resource, Priority priority,
-      Token containerToken, ExecutionType executionType) {
+      Token containerToken, ExecutionType executionType,
+      long allocationRequestId) {
     Container container = recordFactory.newRecordInstance(Container.class);
     container.setId(containerId);
     container.setNodeId(nodeId);
@@ -245,6 +246,7 @@
     container.setPriority(priority);
     container.setContainerToken(containerToken);
     container.setExecutionType(executionType);
+    container.setAllocationRequestId(allocationRequestId);
     return container;
   }
 
@@ -252,7 +254,15 @@
       String nodeHttpAddress, Resource resource, Priority priority,
       Token containerToken) {
     return newContainer(containerId, nodeId, nodeHttpAddress, resource,
-        priority, containerToken, ExecutionType.GUARANTEED);
+        priority, containerToken, ExecutionType.GUARANTEED, 0);
+  }
+
+  public static Container newContainer(ContainerId containerId, NodeId nodeId,
+      String nodeHttpAddress, Resource resource, Priority priority,
+      Token containerToken, long allocationRequestId) {
+    return newContainer(containerId, nodeId, nodeHttpAddress, resource,
+        priority, containerToken, ExecutionType.GUARANTEED,
+        allocationRequestId);
   }
 
   public static <T extends Token> T newToken(Class<T> tokenClass,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index ce5bda0..4723233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -163,7 +163,8 @@
     Container container = BuilderUtils.newContainer(
         cId, nodeId, nodeId.getHost() + ":" + webpagePort,
         capability, rr.getPriority(), containerToken,
-        containerTokenIdentifier.getExecutionType());
+        containerTokenIdentifier.getExecutionType(),
+        rr.getAllocationRequestId());
     return container;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
index b4988be..4b640ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
@@ -30,9 +30,7 @@
     Comparable<SchedulerRequestKey> {
 
   private final Priority priority;
-
-  public static final SchedulerRequestKey UNDEFINED =
-      new SchedulerRequestKey(Priority.UNDEFINED);
+  private final long allocationRequestId;
 
   /**
    * Factory method to generate a SchedulerRequestKey from a ResourceRequest.
@@ -40,7 +38,8 @@
    * @return SchedulerRequestKey
    */
   public static SchedulerRequestKey create(ResourceRequest req) {
-    return new SchedulerRequestKey(req.getPriority());
+    return new SchedulerRequestKey(req.getPriority(),
+        req.getAllocationRequestId());
   }
 
   /**
@@ -50,11 +49,13 @@
    * @return SchedulerRequestKey
    */
   public static SchedulerRequestKey extractFrom(Container container) {
-    return new SchedulerRequestKey(container.getPriority());
+    return new SchedulerRequestKey(container.getPriority(),
+        container.getAllocationRequestId());
   }
 
-  private SchedulerRequestKey(Priority priority) {
+  private SchedulerRequestKey(Priority priority, long allocationRequestId) {
     this.priority = priority;
+    this.allocationRequestId = allocationRequestId;
   }
 
   /**
@@ -66,6 +67,15 @@
     return priority;
   }
 
+  /**
+   * Get the Id of the associated {@link ResourceRequest}.
+   *
+   * @return the Id of the associated {@link ResourceRequest}
+   */
+  public long getAllocationRequestId() {
+    return allocationRequestId;
+  }
+
   @Override
   public int compareTo(SchedulerRequestKey o) {
     if (o == null) {
@@ -75,7 +85,12 @@
         return 1;
       }
     }
-    return o.getPriority().compareTo(priority);
+    int priorityCompare = o.getPriority().compareTo(priority);
+    // we first sort by priority and then by allocationRequestId
+    if (priorityCompare != 0) {
+      return priorityCompare;
+    }
+    return Long.compare(allocationRequestId, o.getAllocationRequestId());
   }
 
   @Override
@@ -88,12 +103,20 @@
     }
 
     SchedulerRequestKey that = (SchedulerRequestKey) o;
-    return getPriority().equals(that.getPriority());
 
+    if (getAllocationRequestId() != that.getAllocationRequestId()) {
+      return false;
+    }
+    return getPriority() != null ?
+        getPriority().equals(that.getPriority()) :
+        that.getPriority() == null;
   }
 
   @Override
   public int hashCode() {
-    return getPriority().hashCode();
+    int result = getPriority() != null ? getPriority().hashCode() : 0;
+    result = 31 * result + (int) (getAllocationRequestId() ^ (
+        getAllocationRequestId() >>> 32));
+    return result;
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 21114f7..8d4042c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,10 +37,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
-
-
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
@@ -54,9 +54,6 @@
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Allocate normal (new) containers, considers locality/label, etc. Using
  * delayed scheduling mechanism to get better locality allocation.
@@ -681,8 +678,10 @@
             application.getNewContainerId());
 
     // Create the container
-    return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-        .getHttpAddress(), capability, schedulerKey.getPriority(), null);
+    return BuilderUtils.newContainer(containerId, nodeId,
+        node.getRMNode().getHttpAddress(), capability,
+        schedulerKey.getPriority(), null,
+        schedulerKey.getAllocationRequestId());
   }
   
   private ContainerAllocation handleNewContainerAllocation(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 8f074cd..9e5a807 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -20,8 +20,8 @@
 
 import java.io.Serializable;
 import java.text.DecimalFormat;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -55,8 +55,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -495,9 +495,10 @@
         getApplicationAttemptId(), getNewContainerId());
 
     // Create the container
-    Container container =
-        BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-            .getHttpAddress(), capability, schedulerKey.getPriority(), null);
+    Container container = BuilderUtils.newContainer(containerId, nodeId,
+        node.getRMNode().getHttpAddress(), capability,
+        schedulerKey.getPriority(), null,
+        schedulerKey.getAllocationRequestId());
 
     return container;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index fe8d0af..2863a97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -710,9 +710,10 @@
             .getApplicationAttemptId(), application.getNewContainerId());
 
         // Create the container
-        Container container =
-            BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-              .getHttpAddress(), capability, schedulerKey.getPriority(), null);
+        Container container = BuilderUtils.newContainer(containerId, nodeId,
+            node.getRMNode().getHttpAddress(), capability,
+            schedulerKey.getPriority(), null,
+            schedulerKey.getAllocationRequestId());
         
         // Allocate!
         
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 8f6a6c1..1b11472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -128,7 +128,13 @@
 
   public void addRequests(String[] hosts, int memory, int priority,
       int containers) throws Exception {
-    requests.addAll(createReq(hosts, memory, priority, containers));
+    addRequests(hosts, memory, priority, containers, 0L);
+  }
+
+  public void addRequests(String[] hosts, int memory, int priority,
+      int containers, long allocationRequestId) throws Exception {
+    requests.addAll(
+        createReq(hosts, memory, priority, containers, allocationRequestId));
   }
 
   public AllocateResponse schedule() throws Exception {
@@ -159,17 +165,19 @@
       List<ContainerId> releases, String labelExpression) throws Exception {
     List<ResourceRequest> reqs =
         createReq(new String[] { host }, memory, priority, numContainers,
-            labelExpression);
+            labelExpression, 0L);
     return allocate(reqs, releases);
   }
   
-  public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
-      int containers) throws Exception {
-    return createReq(hosts, memory, priority, containers, null);
+  public List<ResourceRequest> createReq(String[] hosts, int memory,
+      int priority, int containers, long allocationRequestId) throws Exception {
+    return createReq(hosts, memory, priority, containers, null,
+        allocationRequestId);
   }
 
-  public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
-      int containers, String labelExpression) throws Exception {
+  public List<ResourceRequest> createReq(String[] hosts, int memory,
+      int priority, int containers, String labelExpression,
+      long allocationRequestId) throws Exception {
     List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
     if (hosts != null) {
       for (String host : hosts) {
@@ -178,10 +186,12 @@
           ResourceRequest hostReq =
               createResourceReq(host, memory, priority, containers,
                   labelExpression);
+          hostReq.setAllocationRequestId(allocationRequestId);
           reqs.add(hostReq);
           ResourceRequest rackReq =
               createResourceReq("/default-rack", memory, priority, containers,
                   labelExpression);
+          rackReq.setAllocationRequestId(allocationRequestId);
           reqs.add(rackReq);
         }
       }
@@ -189,6 +199,7 @@
 
     ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, memory,
         priority, containers, labelExpression);
+    offRackReq.setAllocationRequestId(allocationRequestId);
     reqs.add(offRackReq);
     return reqs;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index a1c6294..503ea34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -18,12 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.TreeSet;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.junit.Assert;
 import org.junit.Test;
@@ -70,4 +75,34 @@
         blacklistRemovals);
     Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
   }
+
+  @Test
+  public void testSchedulerRequestKeyOrdering() {
+    TreeSet<SchedulerRequestKey> ts = new TreeSet<>();
+    ts.add(TestUtils.toSchedulerKey(Priority.newInstance(1), 1));
+    ts.add(TestUtils.toSchedulerKey(Priority.newInstance(1), 2));
+    ts.add(TestUtils.toSchedulerKey(Priority.newInstance(0), 4));
+    ts.add(TestUtils.toSchedulerKey(Priority.newInstance(0), 3));
+    ts.add(TestUtils.toSchedulerKey(Priority.newInstance(2), 5));
+    ts.add(TestUtils.toSchedulerKey(Priority.newInstance(2), 6));
+    Iterator<SchedulerRequestKey> iter = ts.iterator();
+    SchedulerRequestKey sk = iter.next();
+    Assert.assertEquals(0, sk.getPriority().getPriority());
+    Assert.assertEquals(3, sk.getAllocationRequestId());
+    sk = iter.next();
+    Assert.assertEquals(0, sk.getPriority().getPriority());
+    Assert.assertEquals(4, sk.getAllocationRequestId());
+    sk = iter.next();
+    Assert.assertEquals(1, sk.getPriority().getPriority());
+    Assert.assertEquals(1, sk.getAllocationRequestId());
+    sk = iter.next();
+    Assert.assertEquals(1, sk.getPriority().getPriority());
+    Assert.assertEquals(2, sk.getAllocationRequestId());
+    sk = iter.next();
+    Assert.assertEquals(2, sk.getPriority().getPriority());
+    Assert.assertEquals(5, sk.getAllocationRequestId());
+    sk = iter.next();
+    Assert.assertEquals(2, sk.getPriority().getPriority());
+    Assert.assertEquals(6, sk.getAllocationRequestId());
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java
new file mode 100644
index 0000000..e60fd6f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for checking Scheduling with allocationRequestId, i.e. mapping of
+ * allocated containers to the original client {@code ResourceRequest}.
+ */
+public class TestSchedulingWithAllocationRequestId
+    extends ParameterizedSchedulerTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSchedulingWithAllocationRequestId.class);
+  private static final int GB = 1024;
+
+  @Test
+  public void testMultipleAllocationRequestIds() throws Exception {
+    configureScheduler();
+    YarnConfiguration conf = getConf();
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
+      MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB);
+      RMApp app1 = rm.submitApp(2048);
+      // kick the scheduling
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      am1.registerAppAttempt();
+
+      // add request for containers with id 10 & 20
+      am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 1, 1, 10L);
+      AllocateResponse allocResponse = am1.schedule(); // send the request
+      am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
+      allocResponse = am1.schedule(); // send the request
+
+      // check if request id 10 is satisfied
+      nm1.nodeHeartbeat(true);
+      allocResponse = am1.schedule(); // send the request
+      while (allocResponse.getAllocatedContainers().size() < 1) {
+        LOG.info("Waiting for containers to be created for app 1...");
+        Thread.sleep(100);
+        allocResponse = am1.schedule();
+      }
+      List<Container> allocated = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(1, allocated.size());
+      checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
+
+      // check now if request id 20 is satisfied
+      nm2.nodeHeartbeat(true);
+      while (allocResponse.getAllocatedContainers().size() < 2) {
+        LOG.info("Waiting for containers to be created for app 1...");
+        Thread.sleep(100);
+        allocResponse = am1.schedule();
+      }
+
+      allocated = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(2, allocated.size());
+      for (Container container : allocated) {
+        checkAllocatedContainer(container, 2 * GB, nm2.getNodeId(), 20);
+      }
+    } finally {
+      if (rm != null) {
+        rm.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testMultipleAllocationRequestDiffPriority() throws Exception {
+    configureScheduler();
+    YarnConfiguration conf = getConf();
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
+      MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB);
+      RMApp app1 = rm.submitApp(2048);
+      // kick the scheduling
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      am1.registerAppAttempt();
+
+      // add request for containers with id 10 & 20
+      am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 2, 1, 10L);
+      AllocateResponse allocResponse = am1.schedule(); // send the request
+      am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
+      allocResponse = am1.schedule(); // send the request
+
+      // check if request id 20 is satisfied first
+      nm2.nodeHeartbeat(true);
+      while (allocResponse.getAllocatedContainers().size() < 2) {
+        LOG.info("Waiting for containers to be created for app 1...");
+        Thread.sleep(100);
+        allocResponse = am1.schedule();
+      }
+
+      List<Container> allocated = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(2, allocated.size());
+      for (Container container : allocated) {
+        checkAllocatedContainer(container, 2 * GB, nm2.getNodeId(), 20);
+      }
+
+      // check now if request id 10 is satisfied
+      nm1.nodeHeartbeat(true);
+      allocResponse = am1.schedule(); // send the request
+      while (allocResponse.getAllocatedContainers().size() < 1) {
+        LOG.info("Waiting for containers to be created for app 1...");
+        Thread.sleep(100);
+        allocResponse = am1.schedule();
+      }
+      allocated = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(1, allocated.size());
+      checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
+    } finally {
+      if (rm != null) {
+        rm.stop();
+      }
+    }
+  }
+
+  private void checkAllocatedContainer(Container allocated, int memory,
+      NodeId nodeId, long allocationRequestId) {
+    Assert.assertEquals(memory, allocated.getResource().getMemorySize());
+    Assert.assertEquals(nodeId, allocated.getNodeId());
+    Assert.assertEquals(allocationRequestId,
+        allocated.getAllocationRequestId());
+  }
+
+  @Test
+  public void testMultipleAppsWithAllocationReqId() throws Exception {
+    configureScheduler();
+    YarnConfiguration conf = getConf();
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      // Register node1
+      String host0 = "host_0";
+      String host1 = "host_1";
+      MockNM nm1 =
+          new MockNM(host0 + ":1234", 8 * GB, rm.getResourceTrackerService());
+      nm1.registerNode();
+
+      // Register node2
+      MockNM nm2 =
+          new MockNM(host1 + ":2351", 8 * GB, rm.getResourceTrackerService());
+      nm2.registerNode();
+
+      // submit 1st app
+      RMApp app1 = rm.submitApp(1 * GB, "user_0", "a1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+      // Submit app1 RR with allocationReqId = 5
+      int numContainers = 1;
+      am1.addRequests(new String[] {host0, host1 }, 1 * GB, 1, numContainers,
+          5L);
+      AllocateResponse allocResponse = am1.schedule();
+
+      // wait for containers to be allocated.
+      nm1.nodeHeartbeat(true);
+      allocResponse = am1.schedule(); // send the request
+      while (allocResponse.getAllocatedContainers().size() < 1) {
+        LOG.info("Waiting for containers to be created for app 1...");
+        Thread.sleep(100);
+        allocResponse = am1.schedule();
+      }
+
+      List<Container> allocated = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(1, allocated.size());
+      checkAllocatedContainer(allocated.get(0), 1 * GB, nm1.getNodeId(), 5L);
+
+      // Submit another application
+      RMApp app2 = rm.submitApp(1 * GB, "user_1", "a2");
+      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+      // Submit app2 RR with allocationReqId = 5
+      am2.addRequests(new String[] {host0, host1 }, 2 * GB, 1, numContainers,
+          5L);
+      am2.schedule();
+
+      // wait for containers to be allocated.
+      nm2.nodeHeartbeat(true);
+      allocResponse = am2.schedule(); // send the request
+      while (allocResponse.getAllocatedContainers().size() < 1) {
+        LOG.info("Waiting for containers to be created for app 1...");
+        Thread.sleep(100);
+        allocResponse = am2.schedule();
+      }
+
+      allocated = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(1, allocated.size());
+      checkAllocatedContainer(allocated.get(0), 2 * GB, nm2.getNodeId(), 5L);
+
+      // Now submit app2 RR with allocationReqId = 10
+      am2.addRequests(new String[] {host0, host1 }, 3 * GB, 1, numContainers,
+          10L);
+      am2.schedule();
+
+      // wait for containers to be allocated.
+      nm1.nodeHeartbeat(true);
+      allocResponse = am2.schedule(); // send the request
+      while (allocResponse.getAllocatedContainers().size() < 1) {
+        LOG.info("Waiting for containers to be created for app 1...");
+        Thread.sleep(100);
+        allocResponse = am2.schedule();
+      }
+
+      allocated = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(1, allocated.size());
+      checkAllocatedContainer(allocated.get(0), 3 * GB, nm1.getNodeId(), 10L);
+
+      // Now submit app1 RR with allocationReqId = 10
+      am1.addRequests(new String[] {host0, host1 }, 4 * GB, 1, numContainers,
+          10L);
+      am1.schedule();
+
+      // wait for containers to be allocated.
+      nm2.nodeHeartbeat(true);
+      allocResponse = am1.schedule(); // send the request
+      while (allocResponse.getAllocatedContainers().size() < 1) {
+        LOG.info("Waiting for containers to be created for app 1...");
+        Thread.sleep(100);
+        allocResponse = am1.schedule();
+      }
+
+      allocated = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(1, allocated.size());
+      checkAllocatedContainer(allocated.get(0), 4 * GB, nm2.getNodeId(), 10L);
+    } finally {
+      if (rm != null) {
+        rm.stop();
+      }
+    }
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index c808b5a..66e833f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -51,7 +51,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -412,4 +411,11 @@
     return SchedulerRequestKey.create(ResourceRequest.newInstance(
         Priority.newInstance(pri), null, null, 0));
   }
+
+  public static SchedulerRequestKey toSchedulerKey(Priority pri,
+      long allocationRequestId) {
+    ResourceRequest req = ResourceRequest.newInstance(pri, null, null, 0);
+    req.setAllocationRequestId(allocationRequestId);
+    return SchedulerRequestKey.create(req);
+  }
 }