YARN-7469. Capacity Scheduler Intra-queue preemption: User can starve if newest app is exactly at user limit. Contributed by Eric Payne.
(cherry picked from commit 61ace174cdcbca9d22abce7aa0aa71148f37ad55)
(cherry picked from commit c3fb49667a4c11d993056e9e3c8ca4ec9479538f)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 00ae3da..3332f2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -203,6 +203,12 @@
Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.selected);
Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.getAMUsed());
+ if (context.getIntraQueuePreemptionOrderPolicy()
+ .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
+ Resources.subtractFromNonNegative(preemtableFromApp,
+ tmpApp.getFiCaSchedulerApp().getCSLeafQueue().getMinimumAllocation());
+ }
+
// Calculate toBePreempted from apps as follows:
// app.preemptable = min(max(app.used - app.selected - app.ideal, 0),
// intra_q_preemptable)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 4fc0ea4..0bc5cb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -358,6 +358,9 @@
queue = (LeafQueue) nameToCSQueues.get(queueName);
queue.getApplications().add(app);
queue.getAllApplications().add(app);
+ when(queue.getMinimumAllocation())
+ .thenReturn(Resource.newInstance(1,1));
+ when(app.getCSLeafQueue()).thenReturn(queue);
HashSet<String> users = userMap.get(queueName);
if (null == users) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
index 7df52f9..0440db3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
@@ -896,4 +896,39 @@
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
+
+ @Test
+ public void testSimpleIntraQueuePreemptionOneUserUnderOneUserAtOneUserAbove()
+ throws IOException {
+ conf.setFloat(CapacitySchedulerConfiguration.
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+ (float) 0.5);
+
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 100 1 0]);" + // root
+ "-a(=[100 100 100 1 0])"; // a
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,65,false,0,user1);" +
+ "a\t" // app2 in a
+ + "(1,1,n1,,35,false,0,user2);" +
+ "a\t" // app3 in a
+ + "(1,1,n1,,0,false,1,user3)"
+ ;
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // app2 is right at its user limit and app1 needs one resource. Should
+ // preempt 1 container.
+ verify(mDisp, times(1)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ }
}