YARN-4888. Changes in scheduler to identify resource-requests explicitly by allocation-id. (Subru Krishnan via wangda)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 6c337cf..d9ce11f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -303,7 +303,7 @@
optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6;
optional ExecutionTypeRequestProto execution_type_request = 7;
- optional int64 allocation_request_id = 8 [default = -1];
+ optional int64 allocation_request_id = 8 [default = 0];
}
message ExecutionTypeRequestProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 5aa1c41..8ecbea7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -236,7 +236,8 @@
public static Container newContainer(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
- Token containerToken, ExecutionType executionType) {
+ Token containerToken, ExecutionType executionType,
+ long allocationRequestId) {
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
@@ -245,6 +246,7 @@
container.setPriority(priority);
container.setContainerToken(containerToken);
container.setExecutionType(executionType);
+ container.setAllocationRequestId(allocationRequestId);
return container;
}
@@ -252,7 +254,15 @@
String nodeHttpAddress, Resource resource, Priority priority,
Token containerToken) {
return newContainer(containerId, nodeId, nodeHttpAddress, resource,
- priority, containerToken, ExecutionType.GUARANTEED);
+ priority, containerToken, ExecutionType.GUARANTEED, 0);
+ }
+
+ public static Container newContainer(ContainerId containerId, NodeId nodeId,
+ String nodeHttpAddress, Resource resource, Priority priority,
+ Token containerToken, long allocationRequestId) {
+ return newContainer(containerId, nodeId, nodeHttpAddress, resource,
+ priority, containerToken, ExecutionType.GUARANTEED,
+ allocationRequestId);
}
public static <T extends Token> T newToken(Class<T> tokenClass,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index ce5bda0..4723233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -163,7 +163,8 @@
Container container = BuilderUtils.newContainer(
cId, nodeId, nodeId.getHost() + ":" + webpagePort,
capability, rr.getPriority(), containerToken,
- containerTokenIdentifier.getExecutionType());
+ containerTokenIdentifier.getExecutionType(),
+ rr.getAllocationRequestId());
return container;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
index b4988be..4b640ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
@@ -30,9 +30,7 @@
Comparable<SchedulerRequestKey> {
private final Priority priority;
-
- public static final SchedulerRequestKey UNDEFINED =
- new SchedulerRequestKey(Priority.UNDEFINED);
+ private final long allocationRequestId;
/**
* Factory method to generate a SchedulerRequestKey from a ResourceRequest.
@@ -40,7 +38,8 @@
* @return SchedulerRequestKey
*/
public static SchedulerRequestKey create(ResourceRequest req) {
- return new SchedulerRequestKey(req.getPriority());
+ return new SchedulerRequestKey(req.getPriority(),
+ req.getAllocationRequestId());
}
/**
@@ -50,11 +49,13 @@
* @return SchedulerRequestKey
*/
public static SchedulerRequestKey extractFrom(Container container) {
- return new SchedulerRequestKey(container.getPriority());
+ return new SchedulerRequestKey(container.getPriority(),
+ container.getAllocationRequestId());
}
- private SchedulerRequestKey(Priority priority) {
+ private SchedulerRequestKey(Priority priority, long allocationRequestId) {
this.priority = priority;
+ this.allocationRequestId = allocationRequestId;
}
/**
@@ -66,6 +67,15 @@
return priority;
}
+ /**
+ * Get the Id of the associated {@link ResourceRequest}.
+ *
+ * @return the Id of the associated {@link ResourceRequest}
+ */
+ public long getAllocationRequestId() {
+ return allocationRequestId;
+ }
+
@Override
public int compareTo(SchedulerRequestKey o) {
if (o == null) {
@@ -75,7 +85,12 @@
return 1;
}
}
- return o.getPriority().compareTo(priority);
+ int priorityCompare = o.getPriority().compareTo(priority);
+ // we first sort by priority and then by allocationRequestId
+ if (priorityCompare != 0) {
+ return priorityCompare;
+ }
+ return Long.compare(allocationRequestId, o.getAllocationRequestId());
}
@Override
@@ -88,12 +103,20 @@
}
SchedulerRequestKey that = (SchedulerRequestKey) o;
- return getPriority().equals(that.getPriority());
+ if (getAllocationRequestId() != that.getAllocationRequestId()) {
+ return false;
+ }
+ return getPriority() != null ?
+ getPriority().equals(that.getPriority()) :
+ that.getPriority() == null;
}
@Override
public int hashCode() {
- return getPriority().hashCode();
+ int result = getPriority() != null ? getPriority().hashCode() : 0;
+ result = 31 * result + (int) (getAllocationRequestId() ^ (
+ getAllocationRequestId() >>> 32));
+ return result;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 21114f7..8d4042c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,10 +37,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
-
-
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
@@ -54,9 +54,6 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Allocate normal (new) containers, considers locality/label, etc. Using
* delayed scheduling mechanism to get better locality allocation.
@@ -681,8 +678,10 @@
application.getNewContainerId());
// Create the container
- return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
- .getHttpAddress(), capability, schedulerKey.getPriority(), null);
+ return BuilderUtils.newContainer(containerId, nodeId,
+ node.getRMNode().getHttpAddress(), capability,
+ schedulerKey.getPriority(), null,
+ schedulerKey.getAllocationRequestId());
}
private ContainerAllocation handleNewContainerAllocation(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 8f074cd..9e5a807 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -20,8 +20,8 @@
import java.io.Serializable;
import java.text.DecimalFormat;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -55,8 +55,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -495,9 +495,10 @@
getApplicationAttemptId(), getNewContainerId());
// Create the container
- Container container =
- BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
- .getHttpAddress(), capability, schedulerKey.getPriority(), null);
+ Container container = BuilderUtils.newContainer(containerId, nodeId,
+ node.getRMNode().getHttpAddress(), capability,
+ schedulerKey.getPriority(), null,
+ schedulerKey.getAllocationRequestId());
return container;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index fe8d0af..2863a97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -710,9 +710,10 @@
.getApplicationAttemptId(), application.getNewContainerId());
// Create the container
- Container container =
- BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
- .getHttpAddress(), capability, schedulerKey.getPriority(), null);
+ Container container = BuilderUtils.newContainer(containerId, nodeId,
+ node.getRMNode().getHttpAddress(), capability,
+ schedulerKey.getPriority(), null,
+ schedulerKey.getAllocationRequestId());
// Allocate!
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 8f6a6c1..1b11472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -128,7 +128,13 @@
public void addRequests(String[] hosts, int memory, int priority,
int containers) throws Exception {
- requests.addAll(createReq(hosts, memory, priority, containers));
+ addRequests(hosts, memory, priority, containers, 0L);
+ }
+
+ public void addRequests(String[] hosts, int memory, int priority,
+ int containers, long allocationRequestId) throws Exception {
+ requests.addAll(
+ createReq(hosts, memory, priority, containers, allocationRequestId));
}
public AllocateResponse schedule() throws Exception {
@@ -159,17 +165,19 @@
List<ContainerId> releases, String labelExpression) throws Exception {
List<ResourceRequest> reqs =
createReq(new String[] { host }, memory, priority, numContainers,
- labelExpression);
+ labelExpression, 0L);
return allocate(reqs, releases);
}
- public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
- int containers) throws Exception {
- return createReq(hosts, memory, priority, containers, null);
+ public List<ResourceRequest> createReq(String[] hosts, int memory,
+ int priority, int containers, long allocationRequestId) throws Exception {
+ return createReq(hosts, memory, priority, containers, null,
+ allocationRequestId);
}
- public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
- int containers, String labelExpression) throws Exception {
+ public List<ResourceRequest> createReq(String[] hosts, int memory,
+ int priority, int containers, String labelExpression,
+ long allocationRequestId) throws Exception {
List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
if (hosts != null) {
for (String host : hosts) {
@@ -178,10 +186,12 @@
ResourceRequest hostReq =
createResourceReq(host, memory, priority, containers,
labelExpression);
+ hostReq.setAllocationRequestId(allocationRequestId);
reqs.add(hostReq);
ResourceRequest rackReq =
createResourceReq("/default-rack", memory, priority, containers,
labelExpression);
+ rackReq.setAllocationRequestId(allocationRequestId);
reqs.add(rackReq);
}
}
@@ -189,6 +199,7 @@
ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, memory,
priority, containers, labelExpression);
+ offRackReq.setAllocationRequestId(allocationRequestId);
reqs.add(offRackReq);
return reqs;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index a1c6294..503ea34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -18,12 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.junit.Assert;
import org.junit.Test;
@@ -70,4 +75,34 @@
blacklistRemovals);
Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
}
+
+ @Test
+ public void testSchedulerRequestKeyOrdering() {
+ TreeSet<SchedulerRequestKey> ts = new TreeSet<>();
+ ts.add(TestUtils.toSchedulerKey(Priority.newInstance(1), 1));
+ ts.add(TestUtils.toSchedulerKey(Priority.newInstance(1), 2));
+ ts.add(TestUtils.toSchedulerKey(Priority.newInstance(0), 4));
+ ts.add(TestUtils.toSchedulerKey(Priority.newInstance(0), 3));
+ ts.add(TestUtils.toSchedulerKey(Priority.newInstance(2), 5));
+ ts.add(TestUtils.toSchedulerKey(Priority.newInstance(2), 6));
+ Iterator<SchedulerRequestKey> iter = ts.iterator();
+ SchedulerRequestKey sk = iter.next();
+ Assert.assertEquals(0, sk.getPriority().getPriority());
+ Assert.assertEquals(3, sk.getAllocationRequestId());
+ sk = iter.next();
+ Assert.assertEquals(0, sk.getPriority().getPriority());
+ Assert.assertEquals(4, sk.getAllocationRequestId());
+ sk = iter.next();
+ Assert.assertEquals(1, sk.getPriority().getPriority());
+ Assert.assertEquals(1, sk.getAllocationRequestId());
+ sk = iter.next();
+ Assert.assertEquals(1, sk.getPriority().getPriority());
+ Assert.assertEquals(2, sk.getAllocationRequestId());
+ sk = iter.next();
+ Assert.assertEquals(2, sk.getPriority().getPriority());
+ Assert.assertEquals(5, sk.getAllocationRequestId());
+ sk = iter.next();
+ Assert.assertEquals(2, sk.getPriority().getPriority());
+ Assert.assertEquals(6, sk.getAllocationRequestId());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java
new file mode 100644
index 0000000..e60fd6f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for checking Scheduling with allocationRequestId, i.e. mapping of
+ * allocated containers to the original client {@code ResourceRequest}.
+ */
+public class TestSchedulingWithAllocationRequestId
+ extends ParameterizedSchedulerTestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSchedulingWithAllocationRequestId.class);
+ private static final int GB = 1024;
+
+ @Test
+ public void testMultipleAllocationRequestIds() throws Exception {
+ configureScheduler();
+ YarnConfiguration conf = getConf();
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
+ MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB);
+ RMApp app1 = rm.submitApp(2048);
+ // kick the scheduling
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+
+ // add request for containers with id 10 & 20
+ am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 1, 1, 10L);
+ AllocateResponse allocResponse = am1.schedule(); // send the request
+ am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
+ allocResponse = am1.schedule(); // send the request
+
+ // check if request id 10 is satisfied
+ nm1.nodeHeartbeat(true);
+ allocResponse = am1.schedule(); // send the request
+ while (allocResponse.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse = am1.schedule();
+ }
+ List<Container> allocated = allocResponse.getAllocatedContainers();
+ Assert.assertEquals(1, allocated.size());
+ checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
+
+ // check now if request id 20 is satisfied
+ nm2.nodeHeartbeat(true);
+ while (allocResponse.getAllocatedContainers().size() < 2) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse = am1.schedule();
+ }
+
+ allocated = allocResponse.getAllocatedContainers();
+ Assert.assertEquals(2, allocated.size());
+ for (Container container : allocated) {
+ checkAllocatedContainer(container, 2 * GB, nm2.getNodeId(), 20);
+ }
+ } finally {
+ if (rm != null) {
+ rm.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleAllocationRequestDiffPriority() throws Exception {
+ configureScheduler();
+ YarnConfiguration conf = getConf();
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
+ MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB);
+ RMApp app1 = rm.submitApp(2048);
+ // kick the scheduling
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+
+ // add request for containers with id 10 & 20
+ am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 2, 1, 10L);
+ AllocateResponse allocResponse = am1.schedule(); // send the request
+ am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
+ allocResponse = am1.schedule(); // send the request
+
+ // check if request id 20 is satisfied first
+ nm2.nodeHeartbeat(true);
+ while (allocResponse.getAllocatedContainers().size() < 2) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse = am1.schedule();
+ }
+
+ List<Container> allocated = allocResponse.getAllocatedContainers();
+ Assert.assertEquals(2, allocated.size());
+ for (Container container : allocated) {
+ checkAllocatedContainer(container, 2 * GB, nm2.getNodeId(), 20);
+ }
+
+ // check now if request id 10 is satisfied
+ nm1.nodeHeartbeat(true);
+ allocResponse = am1.schedule(); // send the request
+ while (allocResponse.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse = am1.schedule();
+ }
+ allocated = allocResponse.getAllocatedContainers();
+ Assert.assertEquals(1, allocated.size());
+ checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
+ } finally {
+ if (rm != null) {
+ rm.stop();
+ }
+ }
+ }
+
+ private void checkAllocatedContainer(Container allocated, int memory,
+ NodeId nodeId, long allocationRequestId) {
+ Assert.assertEquals(memory, allocated.getResource().getMemorySize());
+ Assert.assertEquals(nodeId, allocated.getNodeId());
+ Assert.assertEquals(allocationRequestId,
+ allocated.getAllocationRequestId());
+ }
+
+ @Test
+ public void testMultipleAppsWithAllocationReqId() throws Exception {
+ configureScheduler();
+ YarnConfiguration conf = getConf();
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+
+ // Register node1
+ String host0 = "host_0";
+ String host1 = "host_1";
+ MockNM nm1 =
+ new MockNM(host0 + ":1234", 8 * GB, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ // Register node2
+ MockNM nm2 =
+ new MockNM(host1 + ":2351", 8 * GB, rm.getResourceTrackerService());
+ nm2.registerNode();
+
+ // submit 1st app
+ RMApp app1 = rm.submitApp(1 * GB, "user_0", "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+ // Submit app1 RR with allocationReqId = 5
+ int numContainers = 1;
+ am1.addRequests(new String[] {host0, host1 }, 1 * GB, 1, numContainers,
+ 5L);
+ AllocateResponse allocResponse = am1.schedule();
+
+ // wait for containers to be allocated.
+ nm1.nodeHeartbeat(true);
+ allocResponse = am1.schedule(); // send the request
+ while (allocResponse.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse = am1.schedule();
+ }
+
+ List<Container> allocated = allocResponse.getAllocatedContainers();
+ Assert.assertEquals(1, allocated.size());
+ checkAllocatedContainer(allocated.get(0), 1 * GB, nm1.getNodeId(), 5L);
+
+ // Submit another application
+ RMApp app2 = rm.submitApp(1 * GB, "user_1", "a2");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+ // Submit app2 RR with allocationReqId = 5
+ am2.addRequests(new String[] {host0, host1 }, 2 * GB, 1, numContainers,
+ 5L);
+ am2.schedule();
+
+ // wait for containers to be allocated.
+ nm2.nodeHeartbeat(true);
+ allocResponse = am2.schedule(); // send the request
+ while (allocResponse.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse = am2.schedule();
+ }
+
+ allocated = allocResponse.getAllocatedContainers();
+ Assert.assertEquals(1, allocated.size());
+ checkAllocatedContainer(allocated.get(0), 2 * GB, nm2.getNodeId(), 5L);
+
+ // Now submit app2 RR with allocationReqId = 10
+ am2.addRequests(new String[] {host0, host1 }, 3 * GB, 1, numContainers,
+ 10L);
+ am2.schedule();
+
+ // wait for containers to be allocated.
+ nm1.nodeHeartbeat(true);
+ allocResponse = am2.schedule(); // send the request
+ while (allocResponse.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse = am2.schedule();
+ }
+
+ allocated = allocResponse.getAllocatedContainers();
+ Assert.assertEquals(1, allocated.size());
+ checkAllocatedContainer(allocated.get(0), 3 * GB, nm1.getNodeId(), 10L);
+
+ // Now submit app1 RR with allocationReqId = 10
+ am1.addRequests(new String[] {host0, host1 }, 4 * GB, 1, numContainers,
+ 10L);
+ am1.schedule();
+
+ // wait for containers to be allocated.
+ nm2.nodeHeartbeat(true);
+ allocResponse = am1.schedule(); // send the request
+ while (allocResponse.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse = am1.schedule();
+ }
+
+ allocated = allocResponse.getAllocatedContainers();
+ Assert.assertEquals(1, allocated.size());
+ checkAllocatedContainer(allocated.get(0), 4 * GB, nm2.getNodeId(), 10L);
+ } finally {
+ if (rm != null) {
+ rm.stop();
+ }
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index c808b5a..66e833f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -51,7 +51,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -412,4 +411,11 @@
return SchedulerRequestKey.create(ResourceRequest.newInstance(
Priority.newInstance(pri), null, null, 0));
}
+
+ public static SchedulerRequestKey toSchedulerKey(Priority pri,
+ long allocationRequestId) {
+ ResourceRequest req = ResourceRequest.newInstance(pri, null, null, 0);
+ req.setAllocationRequestId(allocationRequestId);
+ return SchedulerRequestKey.create(req);
+ }
}