YARN-7290. Method canContainerBePreempted can return true when it shouldn't. (Contributed by Steven Rand)
(cherry picked from commit 2bde3aedf139368fc71f053d8dd6580b498ff46d)
(cherry picked from commit f335d509d3a778f11265d3f45800dd6e75f7be59)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index bbd4418..2aa45b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -588,7 +588,8 @@
}
}
- boolean canContainerBePreempted(RMContainer container) {
+ boolean canContainerBePreempted(RMContainer container,
+ Resource alreadyConsideringForPreemption) {
if (!isPreemptable()) {
return false;
}
@@ -610,6 +611,15 @@
// Check if the app's allocation will be over its fairshare even
// after preempting this container
+ Resource usageAfterPreemption = getUsageAfterPreemptingContainer(
+ container.getAllocatedResource(),
+ alreadyConsideringForPreemption);
+
+ return !isUsageBelowShare(usageAfterPreemption, getFairShare());
+ }
+
+ private Resource getUsageAfterPreemptingContainer(Resource containerResources,
+ Resource alreadyConsideringForPreemption) {
Resource usageAfterPreemption = Resources.clone(getResourceUsage());
// Subtract resources of containers already queued for preemption
@@ -617,10 +627,13 @@
Resources.subtractFrom(usageAfterPreemption, resourcesToBePreempted);
}
- // Subtract this container's allocation to compute usage after preemption
- Resources.subtractFrom(
- usageAfterPreemption, container.getAllocatedResource());
- return !isUsageBelowShare(usageAfterPreemption, getFairShare());
+ // Subtract resources of this container and other containers of this app
+ // that the FSPreemptionThread is already considering for preemption.
+ Resources.subtractFrom(usageAfterPreemption, containerResources);
+ Resources.subtractFrom(usageAfterPreemption,
+ alreadyConsideringForPreemption);
+
+ return usageAfterPreemption;
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index b3e59c5..47e580d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -19,7 +19,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -29,7 +29,9 @@
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
@@ -130,10 +132,21 @@
}
} // End of iteration through nodes for one RR
- if (bestContainers != null && bestContainers.containers.size() > 0) {
- containersToPreempt.addAll(bestContainers.containers);
- // Reserve the containers for the starved app
- trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
+ if (bestContainers != null) {
+ List<RMContainer> containers = bestContainers.getAllContainers();
+ if (containers.size() > 0) {
+ containersToPreempt.addAll(containers);
+ // Reserve the containers for the starved app
+ trackPreemptionsAgainstNode(containers, starvedApp);
+ // Warn application about containers to be killed
+ for (RMContainer container : containers) {
+ FSAppAttempt app = scheduler.getSchedulerApp(
+ container.getApplicationAttemptId());
+ LOG.info("Preempting container " + container +
+ " from queue " + app.getQueueName());
+ app.trackContainerForPreemption(container);
+ }
+ }
}
}
} // End of iteration over RRs
@@ -170,10 +183,12 @@
for (RMContainer container : containersToCheck) {
FSAppAttempt app =
scheduler.getSchedulerApp(container.getApplicationAttemptId());
+ ApplicationId appId = app.getApplicationId();
- if (app.canContainerBePreempted(container)) {
+ if (app.canContainerBePreempted(container,
+ preemptableContainers.getResourcesToPreemptForApp(appId))) {
// Flag container for preemption
- if (!preemptableContainers.addContainer(container)) {
+ if (!preemptableContainers.addContainer(container, appId)) {
return null;
}
@@ -199,15 +214,6 @@
}
private void preemptContainers(List<RMContainer> containers) {
- // Warn application about containers to be killed
- for (RMContainer container : containers) {
- ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
- FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
- LOG.info("Preempting container " + container +
- " from queue " + app.getQueueName());
- app.trackContainerForPreemption(container);
- }
-
// Schedule timer task to kill containers
preemptionTimer.schedule(
new PreemptContainersTask(containers), warnTimeBeforeKill);
@@ -237,14 +243,14 @@
* A class to track preemptable containers.
*/
private static class PreemptableContainers {
- List<RMContainer> containers;
+ Map<ApplicationId, List<RMContainer>> containersByApp;
int numAMContainers;
int maxAMContainers;
PreemptableContainers(int maxAMContainers) {
- containers = new ArrayList<>();
numAMContainers = 0;
this.maxAMContainers = maxAMContainers;
+ this.containersByApp = new HashMap<>();
}
/**
@@ -254,7 +260,7 @@
* @param container the container to add
* @return true if success; false otherwise
*/
- private boolean addContainer(RMContainer container) {
+ private boolean addContainer(RMContainer container, ApplicationId appId) {
if (container.isAMContainer()) {
numAMContainers++;
if (numAMContainers >= maxAMContainers) {
@@ -262,8 +268,30 @@
}
}
- containers.add(container);
+ if (!containersByApp.containsKey(appId)) {
+ containersByApp.put(appId, new ArrayList<>());
+ }
+
+ containersByApp.get(appId).add(container);
return true;
}
+
+ private List<RMContainer> getAllContainers() {
+ List<RMContainer> allContainers = new ArrayList<>();
+ for (List<RMContainer> containersForApp : containersByApp.values()) {
+ allContainers.addAll(containersForApp);
+ }
+ return allContainers;
+ }
+
+ private Resource getResourcesToPreemptForApp(ApplicationId appId) {
+ Resource resourcesToPreempt = Resources.createResource(0, 0);
+ if (containersByApp.containsKey(appId)) {
+ for (RMContainer container : containersByApp.get(appId)) {
+ Resources.addTo(resourcesToPreempt, container.getAllocatedResource());
+ }
+ }
+ return resourcesToPreempt;
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 3163024..ac5d9fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -278,11 +278,12 @@
preemptHalfResources(queue2);
}
- private void verifyPreemption(int numStarvedAppContainers)
+ private void verifyPreemption(int numStarvedAppContainers,
+ int numGreedyAppContainers)
throws InterruptedException {
// Sleep long enough for four containers to be preempted.
for (int i = 0; i < 1000; i++) {
- if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) {
+ if (greedyApp.getLiveContainers().size() == numGreedyAppContainers) {
break;
}
Thread.sleep(10);
@@ -290,12 +291,12 @@
// Post preemption, verify the greedyApp has the correct # of containers.
assertEquals("Incorrect # of containers on the greedy app",
- 2 * numStarvedAppContainers, greedyApp.getLiveContainers().size());
+ numGreedyAppContainers, greedyApp.getLiveContainers().size());
// Verify the queue metrics are set appropriately. The greedyApp started
// with 8 1GB, 1vcore containers.
assertEquals("Incorrect # of preempted containers in QueueMetrics",
- 8 - 2 * numStarvedAppContainers,
+ 8 - numGreedyAppContainers,
greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
// Verify the node is reserved for the starvingApp
@@ -340,7 +341,7 @@
String queue = "root.preemptable.child-1";
submitApps(queue, queue);
if (fairsharePreemption) {
- verifyPreemption(2);
+ verifyPreemption(2, 4);
} else {
verifyNoPreemption();
}
@@ -349,13 +350,13 @@
@Test
public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.preemptable.child-2");
- verifyPreemption(2);
+ verifyPreemption(2, 4);
}
@Test
public void testPreemptionBetweenNonSiblingQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
- verifyPreemption(2);
+ verifyPreemption(2, 4);
}
@Test
@@ -389,7 +390,7 @@
setNumAMContainersPerNode(2);
preemptHalfResources("root.preemptable.child-2");
- verifyPreemption(2);
+ verifyPreemption(2, 4);
ArrayList<RMContainer> containers =
(ArrayList<RMContainer>) starvingApp.getLiveContainers();
@@ -402,6 +403,22 @@
}
@Test
+ public void testAppNotPreemptedBelowFairShare() throws Exception {
+ takeAllResources("root.preemptable.child-1");
+ tryPreemptMoreThanFairShare("root.preemptable.child-2");
+ }
+
+ private void tryPreemptMoreThanFairShare(String queueName)
+ throws InterruptedException {
+ ApplicationAttemptId appAttemptId
+ = createSchedulingRequest(3 * GB, 3, queueName, "default",
+ NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
+ starvingApp = scheduler.getSchedulerApp(appAttemptId);
+
+ verifyPreemption(1, 5);
+ }
+
+ @Test
public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
throws InterruptedException {
// Run this test only for fairshare preemption
@@ -414,10 +431,10 @@
// Submit a job so half the resources go to parent's sibling
preemptHalfResources("root.preemptable-sibling");
- verifyPreemption(2);
+ verifyPreemption(2, 4);
// Submit a job to the child's sibling to force preemption from the child
preemptHalfResources("root.preemptable.child-2");
- verifyPreemption(1);
+ verifyPreemption(1, 2);
}
}