YARN-4752. Improved preemption in FairScheduler. (kasha)
Contains:
YARN-5605. Preempt containers (all on one node) to meet the requirement of starved applications
YARN-5821. Drop left-over preemption-related code and clean up method visibilities in the Schedulable hierarchy
YARN-5783. Verify identification of starved applications.
YARN-5819. Verify fairshare and minshare preemption
YARN-5885. Cleanup YARN-4752 branch for merge
Change-Id: Iee0962377d019dd64dc69a020725d2eaf360858c
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 760b0ea..462e02a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -143,6 +143,10 @@
public static Resource none() {
return NONE;
}
+
+ public static boolean isNone(Resource other) {
+ return NONE.equals(other);
+ }
public static Resource unbounded() {
return UNBOUNDED;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 80811b1..feb20ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -624,6 +624,23 @@
}
/**
+ * Method to return the next resource request to be serviced.
+ *
+ * In the initial implementation, we just pick any {@link ResourceRequest}
+ * corresponding to the highest priority.
+ *
+ * @return next {@link ResourceRequest} to allocate resources for, or
+ * {@code null} if there is no outstanding request.
+ */
+ @Unstable
+ public synchronized ResourceRequest getNextResourceRequest() {
+ // firstKey() throws NoSuchElementException on an empty map, and there may
+ // be no request map for that key; report "nothing pending" in both cases
+ // instead of failing.
+ if (schedulerKeys.isEmpty()) {
+ return null;
+ }
+ Map<String, ResourceRequest> requests =
+ resourceRequestMap.get(schedulerKeys.firstKey());
+ if (requests == null || requests.isEmpty()) {
+ return null;
+ }
+ return requests.values().iterator().next();
+ }
+
+ /**
* Returns if the place (node/rack today) is either blacklisted by the
* application (user) or the system
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index bc52816..b3ef471 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -1253,6 +1253,22 @@
unconfirmedAllocatedVcores.addAndGet(-res.getVirtualCores());
}
+ @Override
+ public int hashCode() {
+ return getApplicationAttemptId().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (! (o instanceof SchedulerApplicationAttempt)) {
+ return false;
+ }
+
+ SchedulerApplicationAttempt other = (SchedulerApplicationAttempt) o;
+ return (this == other ||
+ this.getApplicationAttemptId().equals(other.getApplicationAttemptId()));
+ }
+
/**
* Different state for Application Master, user can see this state from web UI
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f076e4f..498f34f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -1101,4 +1101,20 @@
}
}
}
+
+ /*
+ * Overriding to appease findbugs
+ */
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ /*
+ * Overriding to appease findbugs
+ */
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index df20117..39f4a3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -18,18 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -53,6 +52,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -78,10 +78,17 @@
private ResourceWeights resourceWeights;
private Resource demand = Resources.createResource(0);
private FairScheduler scheduler;
+ private FSQueue fsQueue;
private Resource fairShare = Resources.createResource(0, 0);
- private Resource preemptedResources = Resources.createResource(0);
- private RMContainerComparator comparator = new RMContainerComparator();
- private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
+ // Preemption related variables
+ private final Resource preemptedResources = Resources.clone(Resources.none());
+ private final Set<RMContainer> containersToPreempt = new HashSet<>();
+ private Resource fairshareStarvation = Resources.none();
+ private long lastTimeAtFairShare;
+
+ // minShareStarvation attributed to this application by the leaf queue
+ private Resource minshareStarvation = Resources.none();
// Used to record node reservation by an app.
// Key = RackName, Value = Set of Nodes reserved by app on rack
@@ -107,12 +114,14 @@
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
this.scheduler = scheduler;
+ this.fsQueue = queue;
this.startTime = scheduler.getClock().getTime();
+ this.lastTimeAtFairShare = this.startTime;
this.appPriority = Priority.newInstance(1);
this.resourceWeights = new ResourceWeights();
}
- public ResourceWeights getResourceWeights() {
+ ResourceWeights getResourceWeights() {
return resourceWeights;
}
@@ -123,7 +132,7 @@
return queue.getMetrics();
}
- public void containerCompleted(RMContainer rmContainer,
+ void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
try {
writeLock.lock();
@@ -143,6 +152,7 @@
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
+ untrackContainerForPreemption(rmContainer);
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
@@ -152,9 +162,6 @@
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
this.attemptResourceUsage.decUsed(containerResource);
- // remove from preemption map if it is completed
- preemptionMap.remove(rmContainer);
-
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
} finally {
@@ -484,7 +491,7 @@
* @param schedulerKey Scheduler Key
* @param level NodeType
*/
- public void resetAllowedLocalityLevel(
+ void resetAllowedLocalityLevel(
SchedulerRequestKey schedulerKey, NodeType level) {
NodeType old;
try {
@@ -498,57 +505,113 @@
+ " priority " + schedulerKey.getPriority());
}
- // related methods
- public void addPreemption(RMContainer container, long time) {
- assert preemptionMap.get(container) == null;
- try {
- writeLock.lock();
- preemptionMap.put(container, time);
- Resources.addTo(preemptedResources, container.getAllocatedResource());
- } finally {
- writeLock.unlock();
- }
- }
-
- public Long getContainerPreemptionTime(RMContainer container) {
- return preemptionMap.get(container);
- }
-
- public Set<RMContainer> getPreemptionContainers() {
- return preemptionMap.keySet();
- }
-
@Override
public FSLeafQueue getQueue() {
- return (FSLeafQueue)super.getQueue();
+ Queue queue = super.getQueue();
+ assert queue instanceof FSLeafQueue;
+ return (FSLeafQueue) queue;
}
- public Resource getPreemptedResources() {
- return preemptedResources;
+ // Preemption related methods
+
+ /**
+ * Get overall starvation - fairshare and attributed minshare.
+ *
+ * @return total starvation attributed to this application
+ */
+ Resource getStarvation() {
+ return Resources.add(fairshareStarvation, minshareStarvation);
}
- public void resetPreemptedResources() {
- preemptedResources = Resources.createResource(0);
- for (RMContainer container : getPreemptionContainers()) {
+ /**
+ * Set the minshare starvation attributed to this app. To be called only from
+ * {@link FSLeafQueue#updateStarvedApps}.
+ *
+ * @param starvation minshare starvation attributed to this app
+ */
+ void setMinshareStarvation(Resource starvation) {
+ this.minshareStarvation = starvation;
+ }
+
+ /**
+ * Reset the minshare starvation attributed to this application. To be
+ * called only from {@link FSLeafQueue#updateStarvedApps}
+ */
+ void resetMinshareStarvation() {
+ this.minshareStarvation = Resources.none();
+ }
+
+ /**
+ * Mark a container to be preempted and add its resources to the
+ * preempted-resources tally. Tracking an already-tracked container has no
+ * further effect, so its resources are never counted twice.
+ *
+ * @param container container selected for preemption
+ */
+ void trackContainerForPreemption(RMContainer container) {
+ // The set and the resource tally are updated under the same lock so they
+ // stay consistent with each other.
+ synchronized (preemptedResources) {
+ if (containersToPreempt.add(container)) {
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
}
- public void clearPreemptedResources() {
- preemptedResources.setMemorySize(0);
- preemptedResources.setVirtualCores(0);
+ }
+
+ /**
+ * Stop tracking a container for preemption. This is invoked from
+ * containerCompleted for every completed container, so resources are only
+ * subtracted when the container was actually tracked; otherwise the tally
+ * would drift below zero and inflate {@link #getResourceUsage()}.
+ *
+ * @param container container that is no longer a preemption candidate
+ */
+ private void untrackContainerForPreemption(RMContainer container) {
+ synchronized (preemptedResources) {
+ if (containersToPreempt.remove(container)) {
+ Resources.subtractFrom(preemptedResources,
+ container.getAllocatedResource());
+ }
+ }
+ }
+
+ Set<RMContainer> getPreemptionContainers() {
+ return containersToPreempt;
+ }
+
+ private Resource getPreemptedResources() {
+ synchronized (preemptedResources) {
+ return preemptedResources;
+ }
+ }
+
+ boolean canContainerBePreempted(RMContainer container) {
+ // Sanity check that the app owns this container
+ if (!getLiveContainersMap().containsKey(container.getContainerId()) &&
+ !newlyAllocatedContainers.contains(container)) {
+ LOG.error("Looking to preempt container " + container +
+ ". Container does not belong to app " + getApplicationId());
+ return false;
+ }
+
+ if (containersToPreempt.contains(container)) {
+ // The container is already under consideration for preemption
+ return false;
+ }
+
+ // Check if any of the parent queues are not preemptable
+ // TODO (YARN-5831): Propagate the "preemptable" flag all the way down to
+ // the app to avoid recursing up every time.
+ for (FSQueue q = getQueue();
+ !q.getQueueName().equals("root");
+ q = q.getParent()) {
+ if (!q.isPreemptable()) {
+ return false;
+ }
+ }
+
+ // Check if the app's allocation will be over its fairshare even
+ // after preempting this container
+ Resource currentUsage = getResourceUsage();
+ Resource fairshare = getFairShare();
+ Resource overFairShareBy = Resources.subtract(currentUsage, fairshare);
+
+ return (Resources.fitsIn(container.getAllocatedResource(),
+ overFairShareBy));
}
/**
* Create and return a container object reflecting an allocation for the
- * given appliction on the given node with the given capability and
+ * given application on the given node with the given capability and
* priority.
+ *
* @param node Node
* @param capability Capability
* @param schedulerKey Scheduler Key
* @return Container
*/
- public Container createContainer(FSSchedulerNode node, Resource capability,
+ private Container createContainer(FSSchedulerNode node, Resource capability,
SchedulerRequestKey schedulerKey) {
NodeId nodeId = node.getRMNode().getNodeID();
@@ -556,12 +619,10 @@
getApplicationAttemptId(), getNewContainerId());
// Create the container
- Container container = BuilderUtils.newContainer(containerId, nodeId,
+ return BuilderUtils.newContainer(containerId, nodeId,
node.getRMNode().getHttpAddress(), capability,
schedulerKey.getPriority(), null,
schedulerKey.getAllocationRequestId());
-
- return container;
}
/**
@@ -816,7 +877,8 @@
}
Collection<SchedulerRequestKey> keysToTry = (reserved) ?
- Arrays.asList(node.getReservedContainer().getReservedSchedulerKey()) :
+ Collections.singletonList(
+ node.getReservedContainer().getReservedSchedulerKey()) :
getSchedulerKeys();
// For each priority, see if we can schedule a node local, rack local
@@ -974,7 +1036,7 @@
* Node that the application has an existing reservation on
* @return whether the reservation on the given node is valid.
*/
- public boolean assignReservedContainer(FSSchedulerNode node) {
+ boolean assignReservedContainer(FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
SchedulerRequestKey reservedSchedulerKey =
rmContainer.getReservedSchedulerKey();
@@ -1003,17 +1065,43 @@
return true;
}
- static class RMContainerComparator implements Comparator<RMContainer>,
- Serializable {
- @Override
- public int compare(RMContainer c1, RMContainer c2) {
- int ret = c1.getContainer().getPriority().compareTo(
- c2.getContainer().getPriority());
- if (ret == 0) {
- return c2.getContainerId().compareTo(c1.getContainerId());
- }
- return ret;
+ /**
+ * Helper method that computes the extent of fairshare starvation.
+ */
+ Resource fairShareStarvation() {
+ Resource threshold = Resources.multiply(
+ getFairShare(), fsQueue.getFairSharePreemptionThreshold());
+ Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+
+ long now = scheduler.getClock().getTime();
+ boolean starved = Resources.greaterThan(
+ fsQueue.getPolicy().getResourceCalculator(),
+ scheduler.getClusterResource(), starvation, Resources.none());
+
+ if (!starved) {
+ lastTimeAtFairShare = now;
}
+
+ if (starved &&
+ (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) {
+ this.fairshareStarvation = starvation;
+ } else {
+ this.fairshareStarvation = Resources.none();
+ }
+ return this.fairshareStarvation;
+ }
+
+ ResourceRequest getNextResourceRequest() {
+ return appSchedulingInfo.getNextResourceRequest();
+ }
+
+ /**
+ * Helper method that captures if this app is identified to be starved.
+ * @return true if the app is starved for fairshare, false otherwise
+ */
+ @VisibleForTesting
+ boolean isStarvedForFairShare() {
+ return !Resources.isNone(fairshareStarvation);
}
/* Schedulable methods implementation */
@@ -1045,14 +1133,13 @@
@Override
public Resource getResourceUsage() {
- // Here the getPreemptedResources() always return zero, except in
- // a preemption round
- // In the common case where preempted resource is zero, return the
- // current consumption Resource object directly without calling
- // Resources.subtract which creates a new Resource object for each call.
- return getPreemptedResources().equals(Resources.none()) ?
- getCurrentConsumption() :
- Resources.subtract(getCurrentConsumption(), getPreemptedResources());
+ /*
+ * getPreemptedResources() returns zero, except when there are containers
+ * to preempt. Avoid creating an object in the common case.
+ */
+ return getPreemptedResources().equals(Resources.none())
+ ? getCurrentConsumption()
+ : Resources.subtract(getCurrentConsumption(), getPreemptedResources());
}
@Override
@@ -1131,24 +1218,19 @@
diagnosticMessageBldr.toString());
}
- /**
- * Preempt a running container according to the priority
+ /*
+ * Overriding to appease findbugs
*/
@Override
- public RMContainer preemptContainer() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("App " + getName() + " is going to preempt a running " +
- "container");
- }
+ public int hashCode() {
+ return super.hashCode();
+ }
- RMContainer toBePreempted = null;
- for (RMContainer container : getLiveContainers()) {
- if (!getPreemptionContainers().contains(container) &&
- (toBePreempted == null ||
- comparator.compare(toBePreempted, container) > 0)) {
- toBePreempted = container;
- }
- }
- return toBePreempted;
+ /*
+ * Overriding to appease findbugs
+ */
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
new file mode 100644
index 0000000..56bc99c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+/**
+ * Helper class that holds basic information to be passed around
+ * FairScheduler classes. Think of this as a glorified map that holds key
+ * information about the scheduler.
+ */
+public class FSContext {
+ // Preemption-related info
+ private boolean preemptionEnabled = false;
+ private float preemptionUtilizationThreshold;
+ private FSStarvedApps starvedApps;
+
+ public boolean isPreemptionEnabled() {
+ return preemptionEnabled;
+ }
+
+ public void setPreemptionEnabled() {
+ this.preemptionEnabled = true;
+ if (starvedApps == null) {
+ starvedApps = new FSStarvedApps();
+ }
+ }
+
+ public FSStarvedApps getStarvedApps() {
+ return starvedApps;
+ }
+
+ public float getPreemptionUtilizationThreshold() {
+ return preemptionUtilizationThreshold;
+ }
+
+ public void setPreemptionUtilizationThreshold(
+ float preemptionUtilizationThreshold) {
+ this.preemptionUtilizationThreshold = preemptionUtilizationThreshold;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index c393759..343e9c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -45,16 +44,20 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
+import static org.apache.hadoop.yarn.util.resource.Resources.none;
+
@Private
@Unstable
public class FSLeafQueue extends FSQueue {
- private static final Log LOG = LogFactory.getLog(
- FSLeafQueue.class.getName());
+ private static final Log LOG = LogFactory.getLog(FSLeafQueue.class.getName());
+ private static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
- private final List<FSAppAttempt> runnableApps = // apps that are runnable
- new ArrayList<FSAppAttempt>();
- private final List<FSAppAttempt> nonRunnableApps =
- new ArrayList<FSAppAttempt>();
+ private FairScheduler scheduler;
+ private FSContext context;
+
+ // apps that are runnable
+ private final List<FSAppAttempt> runnableApps = new ArrayList<>();
+ private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>();
// get a lock with fair distribution for app list updates
private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
private final Lock readLock = rwl.readLock();
@@ -64,24 +67,23 @@
// Variables used for preemption
private long lastTimeAtMinShare;
- private long lastTimeAtFairShareThreshold;
-
+
// Track the AM resource usage for this queue
private Resource amResourceUsage;
private final ActiveUsersManager activeUsersManager;
- public static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
public FSLeafQueue(String name, FairScheduler scheduler,
FSParentQueue parent) {
super(name, scheduler, parent);
+ this.scheduler = scheduler;
+ this.context = scheduler.getContext();
this.lastTimeAtMinShare = scheduler.getClock().getTime();
- this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime();
activeUsersManager = new ActiveUsersManager(getMetrics());
amResourceUsage = Resource.newInstance(0, 0);
}
- public void addApp(FSAppAttempt app, boolean runnable) {
+ void addApp(FSAppAttempt app, boolean runnable) {
writeLock.lock();
try {
if (runnable) {
@@ -108,7 +110,7 @@
* Removes the given app from this queue.
* @return whether or not the app was runnable
*/
- public boolean removeApp(FSAppAttempt app) {
+ boolean removeApp(FSAppAttempt app) {
boolean runnable = false;
// Remove app from runnable/nonRunnable list while holding the write lock
@@ -139,7 +141,7 @@
* Removes the given app if it is non-runnable and belongs to this queue
* @return true if the app is removed, false otherwise
*/
- public boolean removeNonRunnableApp(FSAppAttempt app) {
+ boolean removeNonRunnableApp(FSAppAttempt app) {
writeLock.lock();
try {
return nonRunnableApps.remove(app);
@@ -148,7 +150,7 @@
}
}
- public boolean isRunnableApp(FSAppAttempt attempt) {
+ boolean isRunnableApp(FSAppAttempt attempt) {
readLock.lock();
try {
return runnableApps.contains(attempt);
@@ -157,7 +159,7 @@
}
}
- public boolean isNonRunnableApp(FSAppAttempt attempt) {
+ boolean isNonRunnableApp(FSAppAttempt attempt) {
readLock.lock();
try {
return nonRunnableApps.contains(attempt);
@@ -166,30 +168,8 @@
}
}
- public void resetPreemptedResources() {
- readLock.lock();
- try {
- for (FSAppAttempt attempt : runnableApps) {
- attempt.resetPreemptedResources();
- }
- } finally {
- readLock.unlock();
- }
- }
-
- public void clearPreemptedResources() {
- readLock.lock();
- try {
- for (FSAppAttempt attempt : runnableApps) {
- attempt.clearPreemptedResources();
- }
- } finally {
- readLock.unlock();
- }
- }
-
- public List<FSAppAttempt> getCopyOfNonRunnableAppSchedulables() {
- List<FSAppAttempt> appsToReturn = new ArrayList<FSAppAttempt>();
+ List<FSAppAttempt> getCopyOfNonRunnableAppSchedulables() {
+ List<FSAppAttempt> appsToReturn = new ArrayList<>();
readLock.lock();
try {
appsToReturn.addAll(nonRunnableApps);
@@ -223,17 +203,78 @@
}
super.policy = policy;
}
-
+
@Override
- public void recomputeShares() {
+ public void updateInternal(boolean checkStarvation) {
readLock.lock();
try {
policy.computeShares(runnableApps, getFairShare());
+ if (checkStarvation) {
+ updateStarvedApps();
+ }
} finally {
readLock.unlock();
}
}
+ /**
+ * Helper method to identify starved applications. This needs to be called
+ * ONLY from {@link #updateInternal}, after the application shares
+ * are updated.
+ *
+ * A queue can be starving due to fairshare or minshare.
+ *
+ * Minshare is defined only on the queue and not the applications.
+ * Fairshare is defined for both the queue and the applications.
+ *
+ * If this queue is starved due to minshare, we need to identify the most
+ * deserving apps if they themselves are not starved due to fairshare.
+ *
+ * If this queue is starving due to fairshare, there must be at least
+ * one application that is starved. And, even if the queue is not
+ * starved due to fairshare, there might still be starved applications.
+ *
+ * Every app with demand is checked for fairshare starvation: the apps are
+ * ordered by the policy comparator rather than by starvation, and
+ * {@link FSAppAttempt#fairShareStarvation()} also refreshes each app's
+ * recorded starvation and its last-time-at-fairshare timestamp.
+ */
+ private void updateStarvedApps() {
+ // First identify starved applications and track total amount of
+ // starvation (in resources)
+ Resource fairShareStarvation = Resources.clone(none());
+
+ // Fetch apps with unmet demand, ordered by the policy comparator
+ TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
+ for (FSAppAttempt app : appsWithDemand) {
+ // No early exit: a non-starved app may precede a starved one, and
+ // skipping the call would leave stale starvation state on the app.
+ Resource appStarvation = app.fairShareStarvation();
+ if (!Resources.isNone(appStarvation)) {
+ context.getStarvedApps().addStarvedApp(app);
+ Resources.addTo(fairShareStarvation, appStarvation);
+ }
+ }
+
+ // Compute extent of minshare starvation
+ Resource minShareStarvation = minShareStarvation();
+
+ // Compute minshare starvation that is not subsumed by fairshare starvation
+ Resources.subtractFrom(minShareStarvation, fairShareStarvation);
+
+ // Keep adding apps to the starved list until the unmet demand goes over
+ // the remaining minshare
+ for (FSAppAttempt app : appsWithDemand) {
+ if (Resources.greaterThan(policy.getResourceCalculator(),
+ scheduler.getClusterResource(), minShareStarvation, none())) {
+ Resource appPendingDemand =
+ Resources.subtract(app.getDemand(), app.getResourceUsage());
+ Resources.subtractFrom(minShareStarvation, appPendingDemand);
+ app.setMinshareStarvation(appPendingDemand);
+ context.getStarvedApps().addStarvedApp(app);
+ } else {
+ // Reset minshare starvation in case we had set it in a previous
+ // iteration
+ app.resetMinshareStarvation();
+ }
+ }
+ }
+
@Override
public Resource getDemand() {
return demand;
@@ -256,7 +297,7 @@
return usage;
}
- public Resource getAmResourceUsage() {
+ Resource getAmResourceUsage() {
return amResourceUsage;
}
@@ -299,7 +340,7 @@
@Override
public Resource assignContainer(FSSchedulerNode node) {
- Resource assigned = Resources.none();
+ Resource assigned = none();
if (LOG.isDebugEnabled()) {
LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
getName() + " fairShare: " + getFairShare());
@@ -309,26 +350,12 @@
return assigned;
}
- // Apps that have resource demands.
- TreeSet<FSAppAttempt> pendingForResourceApps =
- new TreeSet<FSAppAttempt>(policy.getComparator());
- readLock.lock();
- try {
- for (FSAppAttempt app : runnableApps) {
- Resource pending = app.getAppAttemptResourceUsage().getPending();
- if (!pending.equals(Resources.none())) {
- pendingForResourceApps.add(app);
- }
- }
- } finally {
- readLock.unlock();
- }
- for (FSAppAttempt sched : pendingForResourceApps) {
+ for (FSAppAttempt sched : fetchAppsWithDemand()) {
if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
continue;
}
assigned = sched.assignContainer(node);
- if (!assigned.equals(Resources.none())) {
+ if (!assigned.equals(none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned container in queue:" + getName() + " " +
"container:" + assigned);
@@ -339,40 +366,21 @@
return assigned;
}
- @Override
- public RMContainer preemptContainer() {
- RMContainer toBePreempted = null;
-
- // If this queue is not over its fair share, reject
- if (!preemptContainerPreCheck()) {
- return toBePreempted;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Queue " + getName() + " is going to preempt a container " +
- "from its applications.");
- }
-
- // Choose the app that is most over fair share
- Comparator<Schedulable> comparator = policy.getComparator();
- FSAppAttempt candidateSched = null;
+ private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
+ TreeSet<FSAppAttempt> pendingForResourceApps =
+ new TreeSet<>(policy.getComparator());
readLock.lock();
try {
- for (FSAppAttempt sched : runnableApps) {
- if (candidateSched == null ||
- comparator.compare(sched, candidateSched) > 0) {
- candidateSched = sched;
+ for (FSAppAttempt app : runnableApps) {
+ Resource pending = app.getAppAttemptResourceUsage().getPending();
+ if (!pending.equals(none())) {
+ pendingForResourceApps.add(app);
}
}
} finally {
readLock.unlock();
}
-
- // Preempt from the selected app
- if (candidateSched != null) {
- toBePreempted = candidateSched.preemptContainer();
- }
- return toBePreempted;
+ return pendingForResourceApps;
}
@Override
@@ -384,7 +392,7 @@
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
QueueUserACLInfo userAclInfo =
recordFactory.newRecordInstance(QueueUserACLInfo.class);
- List<QueueACL> operations = new ArrayList<QueueACL>();
+ List<QueueACL> operations = new ArrayList<>();
for (QueueACL operation : QueueACL.values()) {
if (hasAccess(operation, user)) {
operations.add(operation);
@@ -396,23 +404,10 @@
return Collections.singletonList(userAclInfo);
}
- public long getLastTimeAtMinShare() {
- return lastTimeAtMinShare;
- }
-
private void setLastTimeAtMinShare(long lastTimeAtMinShare) {
this.lastTimeAtMinShare = lastTimeAtMinShare;
}
- public long getLastTimeAtFairShareThreshold() {
- return lastTimeAtFairShareThreshold;
- }
-
- private void setLastTimeAtFairShareThreshold(
- long lastTimeAtFairShareThreshold) {
- this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
- }
-
@Override
public int getNumRunnableApps() {
readLock.lock();
@@ -423,7 +418,7 @@
}
}
- public int getNumNonRunnableApps() {
+ int getNumNonRunnableApps() {
readLock.lock();
try {
return nonRunnableApps.size();
@@ -475,10 +470,11 @@
/**
* Check whether this queue can run this application master under the
* maxAMShare limit.
- * @param amResource
+ *
+ * @param amResource resources required to run the AM
* @return true if this queue can run
*/
- public boolean canRunAppAM(Resource amResource) {
+ boolean canRunAppAM(Resource amResource) {
if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
return true;
}
@@ -503,7 +499,7 @@
return Resources.fitsIn(ifRunAMResource, maxAMResource);
}
- public void addAMResourceUsage(Resource amResource) {
+ void addAMResourceUsage(Resource amResource) {
if (amResource != null) {
Resources.addTo(amResourceUsage, amResource);
}
@@ -516,21 +512,8 @@
}
/**
- * Update the preemption fields for the queue, i.e. the times since last was
- * at its guaranteed share and over its fair share threshold.
- */
- public void updateStarvationStats() {
- long now = scheduler.getClock().getTime();
- if (!isStarvedForMinShare()) {
- setLastTimeAtMinShare(now);
- }
- if (!isStarvedForFairShare()) {
- setLastTimeAtFairShareThreshold(now);
- }
- }
-
- /** Allows setting weight for a dynamically created queue
- * Currently only used for reservation based queues
+ * Set the weight of a dynamically created queue.
+ * Currently only used for reservation-based queues.
* @param weight queue weight
*/
public void setWeights(float weight) {
@@ -538,37 +521,69 @@
}
/**
- * Helper method to check if the queue should preempt containers
+ * Helper method to compute the amount of minshare starvation.
+ * As a side effect, records the current time as the last time this queue
+ * was at its minshare whenever it is not currently starved.
*
- * @return true if check passes (can preempt) or false otherwise
+ * @return the extent of minshare starvation
*/
- private boolean preemptContainerPreCheck() {
- return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
- getFairShare());
- }
-
- /**
- * Is a queue being starved for its min share.
- */
- @VisibleForTesting
- boolean isStarvedForMinShare() {
- return isStarved(getMinShare());
- }
-
- /**
- * Is a queue being starved for its fair share threshold.
- */
- @VisibleForTesting
- boolean isStarvedForFairShare() {
- return isStarved(
- Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
- }
-
- private boolean isStarved(Resource share) {
+ private Resource minShareStarvation() {
+ // If demand < minshare, we should use demand to determine starvation
Resource desiredShare = Resources.min(policy.getResourceCalculator(),
- scheduler.getClusterResource(), share, getDemand());
- Resource resourceUsage = getResourceUsage();
- return Resources.lessThan(policy.getResourceCalculator(),
- scheduler.getClusterResource(), resourceUsage, desiredShare);
+ scheduler.getClusterResource(), getMinShare(), getDemand());
+
+ // Usage may exceed the desired share in some dimension; clamp each
+ // component at zero so over-allocation never counts as starvation.
+ Resource starvation = Resources.componentwiseMax(Resources.none(),
+ Resources.subtract(desiredShare, getResourceUsage()));
+ boolean starved = !Resources.isNone(starvation);
+
+ long now = scheduler.getClock().getTime();
+ if (!starved) {
+ // Record that the queue is not starved
+ setLastTimeAtMinShare(now);
+ }
+
+ if (now - lastTimeAtMinShare < getMinSharePreemptionTimeout()) {
+ // the queue is not starved for the preemption timeout
+ starvation = Resources.clone(Resources.none());
+ }
+
+ return starvation;
+ }
+
+ /**
+ * Check whether this queue is starved for minShare.
+ * @return whether starved for minshare
+ */
+ private boolean isStarvedForMinShare() {
+ return !Resources.isNone(minShareStarvation());
+ }
+
+ /**
+ * Check whether any runnable app in this queue is starved for fairshare.
+ * @return whether starved for fairshare
+ */
+ private boolean isStarvedForFairShare() {
+ readLock.lock();
+ try {
+ for (FSAppAttempt app : runnableApps) {
+ if (app.isStarvedForFairShare()) {
+ return true;
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return false;
+ }
+
+ /**
+ * Helper method for tests to check if a queue is starved.
+ * @return whether starved for either minshare or fairshare
+ */
+ @VisibleForTesting
+ boolean isStarved() {
+ return isStarvedForMinShare() || isStarvedForFairShare();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index 53ac8c9..16570aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -61,7 +60,7 @@
super(name, scheduler, parent);
}
- public void addChildQueue(FSQueue child) {
+ void addChildQueue(FSQueue child) {
writeLock.lock();
try {
childQueues.add(child);
@@ -70,7 +69,7 @@
}
}
- public void removeChildQueue(FSQueue child) {
+ void removeChildQueue(FSQueue child) {
writeLock.lock();
try {
childQueues.remove(child);
@@ -80,20 +79,20 @@
}
@Override
- public void recomputeShares() {
+ void updateInternal(boolean checkStarvation) {
readLock.lock();
try {
policy.computeShares(childQueues, getFairShare());
for (FSQueue childQueue : childQueues) {
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
- childQueue.recomputeShares();
+ childQueue.updateInternal(checkStarvation);
}
} finally {
readLock.unlock();
}
}
- public void recomputeSteadyShares() {
+ void recomputeSteadyShares() {
readLock.lock();
try {
policy.computeSteadyShares(childQueues, getSteadyFairShare());
@@ -188,7 +187,7 @@
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
- List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
+ List<QueueUserACLInfo> userAcls = new ArrayList<>();
// Add queue acls
userAcls.add(getUserAclInfo(user));
@@ -246,39 +245,6 @@
}
@Override
- public RMContainer preemptContainer() {
- RMContainer toBePreempted = null;
-
- // Find the childQueue which is most over fair share
- FSQueue candidateQueue = null;
- Comparator<Schedulable> comparator = policy.getComparator();
-
- readLock.lock();
- try {
- for (FSQueue queue : childQueues) {
- // Skip selection for non-preemptable queue
- if (!queue.isPreemptable()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("skipping from queue=" + getName()
- + " because it's a non-preemptable queue");
- }
- } else if (candidateQueue == null ||
- comparator.compare(queue, candidateQueue) > 0) {
- candidateQueue = queue;
- }
- }
- } finally {
- readLock.unlock();
- }
-
- // Let the selected queue choose which of its container to preempt
- if (candidateQueue != null) {
- toBePreempted = candidateQueue.preemptContainer();
- }
- return toBePreempted;
- }
-
- @Override
public List<FSQueue> getChildQueues() {
readLock.lock();
try {
@@ -300,8 +266,8 @@
}
super.policy = policy;
}
-
- public void incrementRunnableApps() {
+
+ void incrementRunnableApps() {
writeLock.lock();
try {
runnableApps++;
@@ -310,7 +276,7 @@
}
}
- public void decrementRunnableApps() {
+ void decrementRunnableApps() {
writeLock.lock();
try {
runnableApps--;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
new file mode 100644
index 0000000..3579857
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Thread that handles FairScheduler preemption. It repeatedly takes the next
+ * starved application from the scheduler's {@link FSContext}, identifies
+ * running containers (all on one node) whose preemption would satisfy that
+ * app's next resource request, warns the owning applications, and kills the
+ * containers after {@code warnTimeBeforeKill} ms on a separate timer thread.
+ */
+class FSPreemptionThread extends Thread {
+ private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class);
+ protected final FSContext context;
+ private final FairScheduler scheduler;
+ private final long warnTimeBeforeKill;
+ private final Timer preemptionTimer;
+
+ FSPreemptionThread(FairScheduler scheduler) {
+ this.scheduler = scheduler;
+ this.context = scheduler.getContext();
+ FairSchedulerConfiguration fsConf = scheduler.getConf();
+ context.setPreemptionEnabled();
+ context.setPreemptionUtilizationThreshold(
+ fsConf.getPreemptionUtilizationThreshold());
+ warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
+ preemptionTimer = new Timer("Preemption Timer", true);
+
+ setDaemon(true);
+ setName("FSPreemptionThread");
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ FSAppAttempt starvedApp = context.getStarvedApps().take();
+ if (!Resources.isNone(starvedApp.getStarvation())) {
+ List<RMContainer> containers =
+ identifyContainersToPreempt(starvedApp);
+ if (containers != null) {
+ preemptContainers(containers);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Preemption thread interrupted! Exiting.");
+ // Preserve the interrupt status for anyone further up the stack.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+
+ /**
+ * Given an app, identify containers to preempt to satisfy the app's next
+ * resource request. Nodes reserved by another app are skipped, as are
+ * containers already marked for preemption on a node.
+ *
+ * @param starvedApp starved application for which we are identifying
+ * preemption targets
+ * @return list of containers to preempt to satisfy starvedApp (empty if a
+ * candidate node already has enough unallocated resources), or null if the
+ * app cannot be satisfied by preempting any running containers
+ */
+ private List<RMContainer> identifyContainersToPreempt(
+ FSAppAttempt starvedApp) {
+ List<RMContainer> containers = new ArrayList<>(); // return value
+
+ // Find the nodes that match the next resource request
+ ResourceRequest request = starvedApp.getNextResourceRequest();
+ if (request == null) {
+ // No outstanding request to satisfy.
+ return null;
+ }
+ // TODO (KK): Should we check other resource requests if we can't match
+ // the first one?
+
+ Resource requestCapability = request.getCapability();
+ List<FSSchedulerNode> potentialNodes =
+ scheduler.getNodeTracker().getNodesByResourceName(
+ request.getResourceName());
+
+ // From the potential nodes, pick a node that has enough containers
+ // from apps over their fairshare
+ for (FSSchedulerNode node : potentialNodes) {
+ // Reset containers for the new node being considered.
+ containers.clear();
+
+ // TODO (YARN-5829): Attempt to reserve the node for starved app. The
+ // subsequent if-check needs to be reworked accordingly.
+ FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
+ if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
+ // This node is already reserved by another app. Let us not consider
+ // this for preemption.
+ continue;
+ }
+
+ // Figure out list of containers to consider
+ List<RMContainer> containersToCheck =
+ node.getCopiedListOfRunningContainers();
+ containersToCheck.removeAll(node.getContainersForPreemption());
+
+ // Initialize potential with unallocated resources
+ Resource potential = Resources.clone(node.getUnallocatedResource());
+ for (RMContainer container : containersToCheck) {
+ // Stop once enough resources are identified, so we never flag more
+ // containers than needed (or any, if the node already has room).
+ if (Resources.fitsIn(requestCapability, potential)) {
+ break;
+ }
+
+ FSAppAttempt app =
+ scheduler.getSchedulerApp(container.getApplicationAttemptId());
+ // Guard against the app no longer being known to the scheduler.
+ if (app != null && app.canContainerBePreempted(container)) {
+ // Flag container for preemption
+ containers.add(container);
+ Resources.addTo(potential, container.getAllocatedResource());
+ }
+ }
+
+ // Check if we have identified enough containers on this node
+ if (Resources.fitsIn(requestCapability, potential)) {
+ // Mark the containers as being considered for preemption on the node.
+ // Make sure the containers are subsequently removed by calling
+ // FSSchedulerNode#removeContainerForPreemption.
+ node.addContainersForPreemption(containers);
+ return containers;
+ }
+ // TODO (YARN-5829): Unreserve the node for the starved app.
+ }
+ return null;
+ }
+
+ /**
+ * Warn the owning applications about {@code containers} and schedule a
+ * task that kills them after {@code warnTimeBeforeKill} ms.
+ *
+ * @param containers containers to preempt; nothing happens if empty
+ */
+ private void preemptContainers(List<RMContainer> containers) {
+ if (containers.isEmpty()) {
+ return;
+ }
+
+ // Warn application about containers to be killed
+ for (RMContainer container : containers) {
+ ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
+ FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
+ if (app == null) {
+ // Nobody to warn; the kill task below still un-marks the container.
+ continue;
+ }
+ FSLeafQueue queue = app.getQueue();
+ LOG.info("Preempting container " + container +
+ " from queue " + queue.getName());
+ app.trackContainerForPreemption(container);
+ }
+
+ // Schedule timer task to kill containers
+ preemptionTimer.schedule(
+ new PreemptContainersTask(containers), warnTimeBeforeKill);
+ }
+
+ /**
+ * Timer task that kills the given containers and un-marks them as being
+ * considered for preemption on their nodes.
+ */
+ private class PreemptContainersTask extends TimerTask {
+ private final List<RMContainer> containers;
+
+ PreemptContainersTask(List<RMContainer> containers) {
+ this.containers = containers;
+ }
+
+ @Override
+ public void run() {
+ for (RMContainer container : containers) {
+ ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus(
+ container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
+
+ LOG.info("Killing container " + container);
+ scheduler.completedContainer(
+ container, status, RMContainerEventType.KILL);
+
+ FSSchedulerNode containerNode = (FSSchedulerNode)
+ scheduler.getNodeTracker().getNode(container.getAllocatedNode());
+ // NOTE(review): assumes getNode returns null for a removed node. An
+ // NPE here would cancel the Timer, after which every later
+ // preemptionTimer.schedule() call throws IllegalStateException.
+ if (containerNode != null) {
+ containerNode.removeContainerForPreemption(container);
+ }
+ }
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 5fa2ee1..572b5f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -253,7 +253,7 @@
return steadyFairShare;
}
- public void setSteadyFairShare(Resource steadyFairShare) {
+ void setSteadyFairShare(Resource steadyFairShare) {
this.steadyFairShare = steadyFairShare;
metrics.setSteadyFairShare(steadyFairShare);
}
@@ -262,27 +262,27 @@
return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
}
- public long getFairSharePreemptionTimeout() {
+ long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
}
- public void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
+ void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
}
- public long getMinSharePreemptionTimeout() {
+ long getMinSharePreemptionTimeout() {
return minSharePreemptionTimeout;
}
- public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
+ void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
this.minSharePreemptionTimeout = minSharePreemptionTimeout;
}
- public float getFairSharePreemptionThreshold() {
+ float getFairSharePreemptionThreshold() {
return fairSharePreemptionThreshold;
}
- public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
+ void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
}
@@ -292,9 +292,25 @@
/**
* Recomputes the shares for all child queues and applications based on this
- * queue's current share
+ * queue's current share, and checks for starvation.
+ *
+ * @param checkStarvation whether to check for fairshare or minshare
+ * starvation on update
*/
- public abstract void recomputeShares();
+ abstract void updateInternal(boolean checkStarvation);
+
+ /**
+ * Set this queue's fair share to {@code fairShare}, then recompute the
+ * shares of child queues and applications via {@link #updateInternal}.
+ *
+ * @param fairShare fair share to assign to this queue
+ * @param checkStarvation whether to check for fairshare or minshare
+ * starvation on update
+ */
+ public void update(Resource fairShare, boolean checkStarvation) {
+ setFairShare(fairShare);
+ updateInternal(checkStarvation);
+ }
/**
* Update the min/fair share preemption timeouts, threshold and preemption
@@ -347,7 +363,7 @@
*
* @return true if check passes (can assign) or false otherwise
*/
- protected boolean assignContainerPreCheck(FSSchedulerNode node) {
+ boolean assignContainerPreCheck(FSSchedulerNode node) {
if (!Resources.fitsIn(getResourceUsage(), maxShare)
|| node.getReservedContainer() != null) {
return false;
@@ -403,7 +419,7 @@
return null;
}
- public boolean fitsInMaxShare(Resource additionalResource) {
+ boolean fitsInMaxShare(Resource additionalResource) {
Resource usagePlusAddition =
Resources.add(getResourceUsage(), additionalResource);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 024ec67..a27a222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -29,6 +29,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
@Private
@Unstable
public class FSSchedulerNode extends SchedulerNode {
@@ -36,6 +40,8 @@
private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
private FSAppAttempt reservedAppSchedulable;
+ private final Set<RMContainer> containersForPreemption =
+ new ConcurrentSkipListSet<>();
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
super(node, usePortForNodeName);
@@ -99,8 +105,36 @@
this.reservedAppSchedulable = null;
}
- public synchronized FSAppAttempt getReservedAppSchedulable() {
+ synchronized FSAppAttempt getReservedAppSchedulable() {
return reservedAppSchedulable;
}
+ /**
+ * Mark {@code containers} as being considered for preemption so they are
+ * not considered again. Every marked container needs a corresponding call
+ * to {@link #removeContainerForPreemption}; otherwise it is never
+ * considered for preemption again and stays referenced by this node,
+ * leaking memory.
+ *
+ * @param containers containers to mark
+ */
+ void addContainersForPreemption(Collection<RMContainer> containers) {
+ containersForPreemption.addAll(containers);
+ }
+
+ /**
+ * @return live (not copied) set of containers marked for preemption.
+ */
+ Set<RMContainer> getContainersForPreemption() {
+ return containersForPreemption;
+ }
+
+ /**
+ * Remove container from the set of containers marked for preemption.
+ *
+ * @param container container to remove
+ */
+ void removeContainerForPreemption(RMContainer container) {
+ containersForPreemption.remove(container);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
new file mode 100644
index 0000000..4f28e41
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Helper class to track starved applications.
+ *
+ * Initially, this uses a blocking queue. We could use other data structures
+ * in the future. This class also has some methods to simplify testing.
+ */
+class FSStarvedApps {
+
+ // List of apps to be processed by the preemption thread.
+ private final PriorityBlockingQueue<FSAppAttempt> appsToProcess;
+
+ // App being currently processed. This assumes a single reader. Volatile
+ // because it is written by the reader in take() and read in
+ // addStarvedApp(), which may be called from a different thread.
+ private volatile FSAppAttempt appBeingProcessed;
+
+ FSStarvedApps() {
+ appsToProcess = new PriorityBlockingQueue<>(10, new StarvationComparator());
+ }
+
+ /**
+ * Add a starved application if it is not already added.
+ * @param app application to add
+ */
+ void addStarvedApp(FSAppAttempt app) {
+ if (!app.equals(appBeingProcessed) && !appsToProcess.contains(app)) {
+ appsToProcess.add(app);
+ }
+ }
+
+ /**
+ * Blocking call to fetch the next app to process. The returned app is
+ * tracked until the next call to this method. This tracking assumes a
+ * single reader.
+ *
+ * @return starved application to process
+ * @throws InterruptedException if interrupted while waiting
+ */
+ FSAppAttempt take() throws InterruptedException {
+ // Reset appBeingProcessed before the blocking call
+ appBeingProcessed = null;
+
+ // Blocking call to fetch the next starved application
+ FSAppAttempt app = appsToProcess.take();
+ appBeingProcessed = app;
+ return app;
+ }
+
+ /**
+ * Orders apps by current starvation: memory first, then vcores, smallest
+ * first. This total order agrees with a componentwise "fits in" check (if
+ * one app's starvation fits in another's, it never sorts later), but unlike
+ * a fits-in-only check it is antisymmetric and returns 0 for ties, as the
+ * {@link Comparator} contract requires.
+ *
+ * NOTE(review): starvation can change while an app waits in the queue and
+ * the heap is not re-sorted when it does; confirm this is acceptable.
+ */
+ private static class StarvationComparator implements
+ Comparator<FSAppAttempt>, Serializable {
+ private static final long serialVersionUID = 1;
+
+ @Override
+ public int compare(FSAppAttempt app1, FSAppAttempt app2) {
+ Resource starvation1 = app1.getStarvation();
+ Resource starvation2 = app2.getStarvation();
+ int ret = Long.compare(
+ starvation1.getMemorySize(), starvation2.getMemorySize());
+ if (ret == 0) {
+ ret = Integer.compare(
+ starvation1.getVirtualCores(), starvation2.getVirtualCores());
+ }
+ return ret;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 1d04710..571f2e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -24,7 +24,6 @@
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -123,6 +122,7 @@
AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> {
private FairSchedulerConfiguration conf;
+ private FSContext context;
private Resource incrAllocation;
private QueueManager queueMgr;
private boolean usePortForNodeName;
@@ -150,6 +150,9 @@
@VisibleForTesting
Thread schedulingThread;
+
+ Thread preemptionThread;
+
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@@ -157,25 +160,6 @@
FSQueueMetrics rootMetrics;
FSOpDurations fsOpDurations;
- // Time when we last updated preemption vars
- protected long lastPreemptionUpdateTime;
- // Time we last ran preemptTasksIfNecessary
- private long lastPreemptCheckTime;
-
- // Preemption related variables
- protected boolean preemptionEnabled;
- protected float preemptionUtilizationThreshold;
-
- // How often tasks are preempted
- protected long preemptionInterval;
-
- // ms to wait before force killing stuff (must be longer than a couple
- // of heartbeats to give task-kill commands a chance to act).
- protected long waitTimeBeforeKill;
-
- // Containers whose AMs have been warned that they will be preempted soon.
- private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
-
private float reservableNodesRatio; // percentage of available nodes
// an app can be reserved on
@@ -211,11 +195,17 @@
public FairScheduler() {
super(FairScheduler.class.getName());
+ context = new FSContext();
allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
+ /** Returns the {@link FSContext} shared with the preemption thread. */
+ public FSContext getContext() {
+ return context;
+ }
+
public boolean isAtLeastReservationThreshold(
ResourceCalculator resourceCalculator, Resource resource) {
return Resources.greaterThanOrEqual(resourceCalculator,
@@ -296,7 +286,6 @@
}
long start = getClock().getTime();
update();
- preemptTasksIfNecessary();
long duration = getClock().getTime() - start;
fsOpDurations.addUpdateThreadRunDuration(duration);
} catch (InterruptedException ie) {
@@ -338,7 +327,6 @@
try {
writeLock.lock();
long start = getClock().getTime();
- updateStarvationStats(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue();
@@ -346,214 +334,30 @@
rootQueue.updateDemand();
Resource clusterResource = getClusterResource();
- rootQueue.setFairShare(clusterResource);
- // Recursively compute fair shares for all queues
- // and update metrics
- rootQueue.recomputeShares();
+ rootQueue.update(clusterResource, shouldAttemptPreemption());
+
+ // Update metrics, including the duration of this update call
updateRootQueueMetrics();
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addUpdateCallDuration(duration);
if (LOG.isDebugEnabled()) {
if (--updatesToSkipForDebug < 0) {
updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
- LOG.debug("Cluster Capacity: " + clusterResource + " Allocations: "
- + rootMetrics.getAllocatedResources() + " Availability: "
- + Resource.newInstance(rootMetrics.getAvailableMB(),
- rootMetrics.getAvailableVirtualCores()) + " Demand: " + rootQueue
- .getDemand());
+ LOG.debug("Cluster Capacity: " + clusterResource +
+ " Allocations: " + rootMetrics.getAllocatedResources() +
+ " Availability: " + Resource.newInstance(
+ rootMetrics.getAvailableMB(),
+ rootMetrics.getAvailableVirtualCores()) +
+ " Demand: " + rootQueue.getDemand());
}
- }
- long duration = getClock().getTime() - start;
- fsOpDurations.addUpdateCallDuration(duration);
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Update the preemption fields for all QueueScheduables, i.e. the times since
- * each queue last was at its guaranteed share and over its fair share
- * threshold for each type of task.
- */
- private void updateStarvationStats() {
- lastPreemptionUpdateTime = getClock().getTime();
- for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
- sched.updateStarvationStats();
- }
- }
-
- /**
- * Check for queues that need tasks preempted, either because they have been
- * below their guaranteed share for minSharePreemptionTimeout or they have
- * been below their fair share threshold for the fairSharePreemptionTimeout. If
- * such queues exist, compute how many tasks of each type need to be preempted
- * and then select the right ones using preemptTasks.
- */
- protected void preemptTasksIfNecessary() {
- try {
- writeLock.lock();
- if (!shouldAttemptPreemption()) {
- return;
- }
-
- long curTime = getClock().getTime();
- if (curTime - lastPreemptCheckTime < preemptionInterval) {
- return;
- }
- lastPreemptCheckTime = curTime;
-
- Resource resToPreempt = Resources.clone(Resources.none());
- for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
- Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
- }
- if (isResourceGreaterThanNone(resToPreempt)) {
- preemptResources(resToPreempt);
}
} finally {
writeLock.unlock();
}
}
- /**
- * Preempt a quantity of resources. Each round, we start from the root queue,
- * level-by-level, until choosing a candidate application.
- * The policy for prioritizing preemption for each queue depends on its
- * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is
- * most over its fair share; (2) FIFO, choose the childSchedulable that is
- * latest launched.
- * Inside each application, we further prioritize preemption by choosing
- * containers with lowest priority to preempt.
- * We make sure that no queue is placed below its fair share in the process.
- */
- protected void preemptResources(Resource toPreempt) {
- long start = getClock().getTime();
- if (Resources.equals(toPreempt, Resources.none())) {
- return;
- }
-
- // Scan down the list of containers we've already warned and kill them
- // if we need to. Remove any containers from the list that we don't need
- // or that are no longer running.
- Iterator<RMContainer> warnedIter = warnedContainers.iterator();
- while (warnedIter.hasNext()) {
- RMContainer container = warnedIter.next();
- if ((container.getState() == RMContainerState.RUNNING ||
- container.getState() == RMContainerState.ALLOCATED) &&
- isResourceGreaterThanNone(toPreempt)) {
- warnOrKillContainer(container);
- Resources.subtractFrom(toPreempt, container.getContainer().getResource());
- } else {
- warnedIter.remove();
- }
- }
-
- try {
- // Reset preemptedResource for each app
- for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
- queue.resetPreemptedResources();
- }
-
- while (isResourceGreaterThanNone(toPreempt)) {
- RMContainer container =
- getQueueManager().getRootQueue().preemptContainer();
- if (container == null) {
- break;
- } else {
- warnOrKillContainer(container);
- warnedContainers.add(container);
- Resources.subtractFrom(
- toPreempt, container.getContainer().getResource());
- }
- }
- } finally {
- // Clear preemptedResources for each app
- for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
- queue.clearPreemptedResources();
- }
- }
-
- long duration = getClock().getTime() - start;
- fsOpDurations.addPreemptCallDuration(duration);
- }
-
- private boolean isResourceGreaterThanNone(Resource toPreempt) {
- return (toPreempt.getMemorySize() > 0) || (toPreempt.getVirtualCores() > 0);
- }
-
- protected void warnOrKillContainer(RMContainer container) {
- ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
- FSAppAttempt app = getSchedulerApp(appAttemptId);
- FSLeafQueue queue = app.getQueue();
- LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
- "res=" + container.getContainer().getResource() +
- ") from queue " + queue.getName());
-
- Long time = app.getContainerPreemptionTime(container);
-
- if (time != null) {
- // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
- // proceed with kill
- if (time + waitTimeBeforeKill < getClock().getTime()) {
- ContainerStatus status =
- SchedulerUtils.createPreemptedContainerStatus(
- container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
-
- // TODO: Not sure if this ever actually adds this to the list of cleanup
- // containers on the RMNode (see SchedulerNode.releaseContainer()).
- super.completedContainer(container, status, RMContainerEventType.KILL);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Killing container" + container +
- " (after waiting for preemption for " +
- (getClock().getTime() - time) + "ms)");
- }
- }
- } else {
- // track the request in the FSAppAttempt itself
- app.addPreemption(container, getClock().getTime());
- }
- }
-
- /**
- * Return the resource amount that this queue is allowed to preempt, if any.
- * If the queue has been below its min share for at least its preemption
- * timeout, it should preempt the difference between its current share and
- * this min share. If it has been below its fair share preemption threshold
- * for at least the fairSharePreemptionTimeout, it should preempt enough tasks
- * to get up to its full fair share. If both conditions hold, we preempt the
- * max of the two amounts (this shouldn't happen unless someone sets the
- * timeouts to be identical for some reason).
- */
- protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
- long minShareTimeout = sched.getMinSharePreemptionTimeout();
- long fairShareTimeout = sched.getFairSharePreemptionTimeout();
- Resource resDueToMinShare = Resources.none();
- Resource resDueToFairShare = Resources.none();
- ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
- Resource clusterResource = getClusterResource();
- if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
- Resource target = Resources.componentwiseMin(
- sched.getMinShare(), sched.getDemand());
- resDueToMinShare = Resources.max(calc, clusterResource,
- Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
- }
- if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
- Resource target = Resources.componentwiseMin(
- sched.getFairShare(), sched.getDemand());
- resDueToFairShare = Resources.max(calc, clusterResource,
- Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
- }
- Resource deficit = Resources.max(calc, clusterResource,
- resDueToMinShare, resDueToFairShare);
- if (Resources.greaterThan(calc, clusterResource,
- deficit, Resources.none())) {
- String message = "Should preempt " + deficit + " res for queue "
- + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
- + ", resDueToFairShare = " + resDueToFairShare;
- LOG.info(message);
- }
- return deficit;
- }
-
public RMContainerTokenSecretManager
getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
@@ -1197,12 +1001,13 @@
* @return true if preemption should be attempted, false otherwise.
*/
private boolean shouldAttemptPreemption() {
- if (preemptionEnabled) {
+ if (context.isPreemptionEnabled()) {
+ // Snapshot the cluster resource once so both ratios use the same value
Resource clusterResource = getClusterResource();
- return (preemptionUtilizationThreshold < Math.max(
+ return (context.getPreemptionUtilizationThreshold() < Math.max(
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemorySize(),
(float) rootMetrics.getAllocatedVirtualCores() /
clusterResource.getVirtualCores()));
}
return false;
}
@@ -1390,15 +1195,10 @@
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
- preemptionEnabled = this.conf.getPreemptionEnabled();
- preemptionUtilizationThreshold =
- this.conf.getPreemptionUtilizationThreshold();
assignMultiple = this.conf.getAssignMultiple();
maxAssignDynamic = this.conf.isMaxAssignDynamic();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
- preemptionInterval = this.conf.getPreemptionInterval();
- waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
usePortForNodeName = this.conf.getUsePortForNodeName();
reservableNodesRatio = this.conf.getReservableNodes();
@@ -1436,6 +1236,10 @@
schedulingThread.setName("FairSchedulerContinuousScheduling");
schedulingThread.setDaemon(true);
}
+
+ if (this.conf.getPreemptionEnabled()) {
+ createPreemptionThread();
+ }
} finally {
writeLock.unlock();
}
@@ -1452,6 +1256,15 @@
}
}
+ /**
+ * Create the thread that serves starved applications when preemption is
+ * enabled. Tests override this to install a mock preemption thread.
+ */
+ @VisibleForTesting
+ protected void createPreemptionThread() {
+ preemptionThread = new FSPreemptionThread(this);
+ }
+
private void updateReservationThreshold() {
Resource newThreshold = Resources.multiply(
getIncrementResourceCapability(),
@@ -1471,6 +1284,9 @@
"schedulingThread is null");
schedulingThread.start();
}
+ if (preemptionThread != null) {
+ preemptionThread.start();
+ }
allocsLoader.start();
} finally {
writeLock.unlock();
@@ -1503,6 +1319,10 @@
schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
}
}
+ if (preemptionThread != null) {
+ preemptionThread.interrupt();
+ preemptionThread.join(THREAD_JOIN_TIMEOUT_MS);
+ }
if (allocsLoader != null) {
allocsLoader.stop();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
index 289887f..cf78405 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
@@ -55,50 +55,45 @@
* Name of job/queue, used for debugging as well as for breaking ties in
* scheduling order deterministically.
*/
- public String getName();
+ String getName();
/**
* Maximum number of resources required by this Schedulable. This is defined as
* number of currently utilized resources + number of unlaunched resources (that
* are either not yet launched or need to be speculated).
*/
- public Resource getDemand();
+ Resource getDemand();
/** Get the aggregate amount of resources consumed by the schedulable. */
- public Resource getResourceUsage();
+ Resource getResourceUsage();
/** Minimum Resource share assigned to the schedulable. */
- public Resource getMinShare();
+ Resource getMinShare();
/** Maximum Resource share assigned to the schedulable. */
- public Resource getMaxShare();
+ Resource getMaxShare();
/** Job/queue weight in fair sharing. */
- public ResourceWeights getWeights();
+ ResourceWeights getWeights();
- /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
- public long getStartTime();
+ /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables. */
+ long getStartTime();
/** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
- public Priority getPriority();
+ Priority getPriority();
/** Refresh the Schedulable's demand and those of its children if any. */
- public void updateDemand();
+ void updateDemand();
/**
* Assign a container on this node if possible, and return the amount of
* resources assigned.
*/
- public Resource assignContainer(FSSchedulerNode node);
-
- /**
- * Preempt a container from this Schedulable if possible.
- */
- public RMContainer preemptContainer();
+ Resource assignContainer(FSSchedulerNode node);
/** Get the fair share assigned to this Schedulable. */
- public Resource getFairShare();
+ Resource getFairShare();
/** Assign a fair share to this Schedulable. */
- public void setFairShare(Resource fairShare);
+ void setFairShare(Resource fairShare);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 8e6272a..992b75d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -17,14 +17,6 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import org.junit.Assert;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,7 +31,9 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -50,9 +44,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
public class FairSchedulerTestBase {
public final static String TEST_DIR =
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
@@ -70,9 +72,14 @@
private static final int SLEEP_DURATION = 10;
private static final int SLEEP_RETRIES = 1000;
+ /**
+ * Nodes added via {@link #addNode}, in order; its size seeds the next node id.
+ */
+ protected final List<RMNode> rmNodes = new ArrayList<>();
+
// Helper methods
public Configuration createConfiguration() {
- Configuration conf = new YarnConfiguration();
+ conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
@@ -280,4 +287,18 @@
Assert.assertEquals(resource.getVirtualCores(),
app.getCurrentConsumption().getVirtualCores());
}
+
+ /**
+ * Register a new node with the scheduler and remember it in {@link #rmNodes}.
+ * @param memory memory capacity of the node, in MB
+ * @param cores number of virtual cores on the node
+ */
+ protected void addNode(int memory, int cores) {
+ int nodeId = rmNodes.size() + 1;
+ String address = "127.0.0." + nodeId;
+ RMNode newNode = MockNodes.newNodeInfo(1,
+ Resources.createResource(memory, cores), nodeId, address);
+ scheduler.handle(new NodeAddedSchedulerEvent(newNode));
+ rmNodes.add(newNode);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
new file mode 100644
index 0000000..25780cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link FairScheduler} whose preemption thread only records the starved
+ * applications it is handed, so tests can verify starvation detection
+ * without any containers being preempted.
+ */
+public class FairSchedulerWithMockPreemption extends FairScheduler {
+  @Override
+  protected void createPreemptionThread() {
+    preemptionThread = new MockPreemptionThread(this);
+  }
+
+  /**
+   * Takes starved apps from the scheduler context and counts them. The
+   * counters are written by this thread but read by the test thread, so
+   * they use thread-safe types.
+   */
+  static class MockPreemptionThread extends FSPreemptionThread {
+    private final Set<FSAppAttempt> appsAdded = ConcurrentHashMap.newKeySet();
+    private final AtomicInteger totalAppsAdded = new AtomicInteger();
+
+    MockPreemptionThread(FairScheduler scheduler) {
+      super(scheduler);
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          FSAppAttempt app = context.getStarvedApps().take();
+          appsAdded.add(app);
+          totalAppsAdded.incrementAndGet();
+        } catch (InterruptedException e) {
+          // Interrupted when the scheduler stops; let the thread exit.
+          return;
+        }
+      }
+    }
+
+    /** Number of distinct apps taken so far. */
+    int uniqueAppsAdded() {
+      return appsAdded.size();
+    }
+
+    /** Total number of apps taken, counting repeats of the same app. */
+    int totalAppsAdded() {
+      return totalAppsAdded.get();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
index 5a170cf..e802f42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
@@ -86,11 +86,6 @@
}
@Override
- public RMContainer preemptContainer() {
- return null;
- }
-
- @Override
public Resource getFairShare() {
return this.fairShare;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
new file mode 100644
index 0000000..a5b2d86
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Test class to verify identification of app starvation.
+ */
+public class TestFSAppStarvation extends FairSchedulerTestBase {
+
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
+
+  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+  private static final int NODE_CAPACITY_MULTIPLE = 4;
+  private static final String[] QUEUES =
+      {"no-preemption", "minshare", "fairshare.child", "drf.child"};
+
+  // Bound on how long to wait for MockPreemptionThread to process apps
+  private static final int WAIT_RETRIES = 100;
+  private static final long WAIT_INTERVAL_MS = 10;
+
+  private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread;
+
+  @Before
+  public void setup() {
+    createConfiguration();
+    conf.set(YarnConfiguration.RM_SCHEDULER,
+        FairSchedulerWithMockPreemption.class.getCanonicalName());
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+  }
+
+  @After
+  public void teardown() {
+    ALLOC_FILE.delete();
+    conf = null;
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  /*
+   * Test to verify application starvation is computed only when preemption
+   * is enabled.
+   */
+  @Test
+  public void testPreemptionDisabled() throws Exception {
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
+
+    setupClusterAndSubmitJobs();
+
+    assertNull("Found starved apps even when preemption is turned off",
+        scheduler.getContext().getStarvedApps());
+  }
+
+  /*
+   * Test to verify application starvation is computed correctly when
+   * preemption is turned on.
+   */
+  @Test
+  public void testPreemptionEnabled() throws Exception {
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    // MockPreemptionThread consumes starved apps asynchronously
+    waitUntil(() -> preemptionThread.uniqueAppsAdded() >= 3);
+    assertEquals("Expecting 3 starved applications, one each for the "
+        + "minshare, fairshare and drf queues",
+        3, preemptionThread.uniqueAppsAdded());
+
+    // Verify the apps get added again on a subsequent update
+    scheduler.update();
+    waitUntil(() -> preemptionThread.totalAppsAdded()
+        > preemptionThread.uniqueAppsAdded());
+
+    verifyLeafQueueStarvation();
+    assertTrue("Starved apps are not added again on a subsequent update",
+        preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
+  }
+
+  /*
+   * Test to verify app starvation is computed only when the cluster
+   * utilization threshold is over the preemption threshold.
+   */
+  @Test
+  public void testClusterUtilizationThreshold() throws Exception {
+    // Set preemption threshold to 1.1, so the utilization is always lower
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f);
+
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Found starved apps when preemption threshold is over 100%", 0,
+        preemptionThread.totalAppsAdded());
+  }
+
+  private void verifyLeafQueueStarvation() {
+    for (String q : QUEUES) {
+      if (!q.equals("no-preemption")) {
+        boolean isStarved =
+            scheduler.getQueueManager().getLeafQueue(q, false).isStarved();
+        assertTrue("Queue " + q + " is not marked starved", isStarved);
+      }
+    }
+  }
+
+  private void setupClusterAndSubmitJobs() throws Exception {
+    setupStarvedCluster();
+    submitAppsToEachLeafQueue();
+    sendEnoughNodeUpdatesToAssignFully();
+
+    // Sleep to hit the preemption timeouts
+    Thread.sleep(10);
+
+    // Scheduler update to populate starved apps
+    scheduler.update();
+
+    // Let MockPreemptionThread run; tests that expect starved apps also
+    // poll for them with waitUntil, since yield() guarantees nothing
+    Thread.yield();
+  }
+
+  /**
+   * Poll {@code condition} until it holds or WAIT_RETRIES attempts spaced
+   * WAIT_INTERVAL_MS apart have elapsed. Callers assert afterwards, so a
+   * timeout surfaces as an ordinary assertion failure.
+   */
+  private static void waitUntil(BooleanSupplier condition)
+      throws InterruptedException {
+    for (int i = 0; i < WAIT_RETRIES && !condition.getAsBoolean(); i++) {
+      Thread.sleep(WAIT_INTERVAL_MS);
+    }
+  }
+
+  /**
+   * Set up the cluster for starvation testing:
+   * 1. Create FS allocation file
+   * 2. Create and start MockRM
+   * 3. Add two nodes to the cluster
+   * 4. Submit an app that uses up all resources on the cluster
+   */
+  private void setupStarvedCluster() throws IOException {
+    try (PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE))) {
+      out.println("<?xml version=\"1.0\"?>");
+      out.println("<allocations>");
+
+      // Default queue
+      out.println("<queue name=\"default\">");
+      out.println("</queue>");
+
+      // Queue with preemption disabled
+      out.println("<queue name=\"no-preemption\">");
+      out.println("<fairSharePreemptionThreshold>0" +
+          "</fairSharePreemptionThreshold>");
+      out.println("</queue>");
+
+      // Queue with minshare preemption enabled
+      out.println("<queue name=\"minshare\">");
+      out.println("<fairSharePreemptionThreshold>0" +
+          "</fairSharePreemptionThreshold>");
+      out.println("<minSharePreemptionTimeout>0" +
+          "</minSharePreemptionTimeout>");
+      out.println("<minResources>2048mb,2vcores</minResources>");
+      out.println("</queue>");
+
+      // FAIR queue with fairshare preemption enabled
+      out.println("<queue name=\"fairshare\">");
+      out.println("<fairSharePreemptionThreshold>1" +
+          "</fairSharePreemptionThreshold>");
+      out.println("<fairSharePreemptionTimeout>0" +
+          "</fairSharePreemptionTimeout>");
+      out.println("<schedulingPolicy>fair</schedulingPolicy>");
+      addChildQueue(out);
+      out.println("</queue>");
+
+      // DRF queue with fairshare preemption enabled
+      out.println("<queue name=\"drf\">");
+      out.println("<fairSharePreemptionThreshold>1" +
+          "</fairSharePreemptionThreshold>");
+      out.println("<fairSharePreemptionTimeout>0" +
+          "</fairSharePreemptionTimeout>");
+      out.println("<schedulingPolicy>drf</schedulingPolicy>");
+      addChildQueue(out);
+      out.println("</queue>");
+
+      out.println("</allocations>");
+    }
+
+    assertTrue("Allocation file does not exist, not running the test",
+        ALLOC_FILE.exists());
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
+        scheduler.preemptionThread;
+
+    // Create and add two nodes to the cluster
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+
+    // Create an app that takes up all the resources on the cluster
+    ApplicationAttemptId app
+        = createSchedulingRequest(1024, 1, "root.default", "default", 8);
+
+    scheduler.update();
+    sendEnoughNodeUpdatesToAssignFully();
+
+    assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
+  }
+
+  private void addChildQueue(PrintWriter out) {
+    // Child queue with the same fairshare preemption settings as its parent
+    out.println("<queue name=\"child\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("</queue>");
+  }
+
+  private void submitAppsToEachLeafQueue() {
+    for (String queue : QUEUES) {
+      createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
+    }
+    scheduler.update();
+  }
+
+  private void sendEnoughNodeUpdatesToAssignFully() {
+    for (RMNode node : rmNodes) {
+      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+          new NodeUpdateSchedulerEvent(node);
+      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+        scheduler.handle(nodeUpdateSchedulerEvent);
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 0a2ce81..98de8db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -106,12 +105,8 @@
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
- out.println("<queue name=\"queueA\">");
- out.println("<minResources>2048mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<minResources>2048mb,0vcores</minResources>");
- out.println("</queue>");
+ out.println("<queue name=\"queueA\"></queue>");
+ out.println("<queue name=\"queueB\"></queue>");
out.println("</allocations>");
out.close();
@@ -144,162 +139,6 @@
scheduler.update();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
-
- // Queue A should be above min share, B below.
- FSLeafQueue queueA =
- scheduler.getQueueManager().getLeafQueue("queueA", false);
- FSLeafQueue queueB =
- scheduler.getQueueManager().getLeafQueue("queueB", false);
- assertFalse(queueA.isStarvedForMinShare());
- assertTrue(queueB.isStarvedForMinShare());
-
- // Node checks in again, should allocate for B
- scheduler.handle(nodeEvent2);
- // Now B should have min share ( = demand here)
- assertFalse(queueB.isStarvedForMinShare());
- }
-
- @Test (timeout = 5000)
- public void testIsStarvedForFairShare() throws Exception {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>.2</weight>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>.8</weight>");
- out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>");
- out.println("<queue name=\"queueB1\">");
- out.println("</queue>");
- out.println("<queue name=\"queueB2\">");
- out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>");
- out.println("</queue>");
- out.println("</queue>");
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
- out.println("</allocations>");
- out.close();
-
- resourceManager = new MockRM(conf);
- resourceManager.start();
- scheduler = (FairScheduler) resourceManager.getResourceScheduler();
-
- // Add one big node (only care about aggregate capacity)
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- scheduler.update();
-
- // Queue A wants 4 * 1024. Node update gives this all to A
- createSchedulingRequest(1 * 1024, "queueA", "user1", 4);
- scheduler.update();
- NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
- for (int i = 0; i < 4; i ++) {
- scheduler.handle(nodeEvent2);
- }
-
- QueueManager queueMgr = scheduler.getQueueManager();
- FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
- assertEquals(4 * 1024, queueA.getResourceUsage().getMemorySize());
-
- // Both queue B1 and queue B2 want 3 * 1024
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3);
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3);
- scheduler.update();
- for (int i = 0; i < 4; i ++) {
- scheduler.handle(nodeEvent2);
- }
-
- FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false);
- FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false);
- assertEquals(2 * 1024, queueB1.getResourceUsage().getMemorySize());
- assertEquals(2 * 1024, queueB2.getResourceUsage().getMemorySize());
-
- // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share
- // threshold is 1.6 * 1024
- assertFalse(queueB1.isStarvedForFairShare());
-
- // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share
- // threshold is 2.4 * 1024
- assertTrue(queueB2.isStarvedForFairShare());
-
- // Node checks in again
- scheduler.handle(nodeEvent2);
- scheduler.handle(nodeEvent2);
- assertEquals(3 * 1024, queueB1.getResourceUsage().getMemorySize());
- assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize());
-
- // Both queue B1 and queue B2 usages go to 3 * 1024
- assertFalse(queueB1.isStarvedForFairShare());
- assertFalse(queueB2.isStarvedForFairShare());
- }
-
- @Test (timeout = 5000)
- public void testIsStarvedForFairShareDRF() throws Exception {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>.5</weight>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>.5</weight>");
- out.println("</queue>");
- out.println("<defaultFairSharePreemptionThreshold>1</defaultFairSharePreemptionThreshold>");
- out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
- out.println("</allocations>");
- out.close();
-
- resourceManager = new MockRM(conf);
- resourceManager.start();
- scheduler = (FairScheduler) resourceManager.getResourceScheduler();
-
- // Add one big node (only care about aggregate capacity)
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- scheduler.update();
-
- // Queue A wants 7 * 1024, 1. Node update gives this all to A
- createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1);
- scheduler.update();
- NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeEvent2);
-
- QueueManager queueMgr = scheduler.getQueueManager();
- FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
- assertEquals(7 * 1024, queueA.getResourceUsage().getMemorySize());
- assertEquals(1, queueA.getResourceUsage().getVirtualCores());
-
- // Queue B has 3 reqs :
- // 1) 2 * 1024, 5 .. which will be granted
- // 2) 1 * 1024, 1 .. which will be granted
- // 3) 1 * 1024, 1 .. which wont
- createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1);
- createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2);
- scheduler.update();
- for (int i = 0; i < 3; i ++) {
- scheduler.handle(nodeEvent2);
- }
-
- FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false);
- assertEquals(3 * 1024, queueB.getResourceUsage().getMemorySize());
- assertEquals(6, queueB.getResourceUsage().getVirtualCores());
-
- scheduler.update();
-
- // Verify that Queue us not starved for fair share..
- // Since the Starvation logic now uses DRF when the policy = drf, The
- // Queue should not be starved
- assertFalse(queueB.isStarvedForFairShare());
}
@Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 2cbe507..36ee685 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -17,1467 +17,259 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .TestUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.util.ControlledClock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
import org.junit.After;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.Collection;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+/**
+ * Tests to verify fairshare and minshare preemption, using parameterization.
+ */
+@RunWith(Parameterized.class)
public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
- private final static String ALLOC_FILE = new File(TEST_DIR,
- TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
+ private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
- private ControlledClock clock;
+ // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+ private static final int NODE_CAPACITY_MULTIPLE = 4;
- private static class StubbedFairScheduler extends FairScheduler {
- public long lastPreemptMemory = -1;
+ private final boolean fairsharePreemption;
- @Override
- protected void preemptResources(Resource toPreempt) {
- lastPreemptMemory = toPreempt.getMemorySize();
- }
+ // App that takes up the entire cluster
+ private FSAppAttempt greedyApp;
- public void resetLastPreemptResources() {
- lastPreemptMemory = -1;
- }
+ // Starving app that is expected to instigate preemption
+ private FSAppAttempt starvingApp;
+
+ @Parameterized.Parameters
+ public static Collection<Boolean[]> getParameters() {
+ return Arrays.asList(new Boolean[][] {
+ {true}, {false}});
}
- public Configuration createConfiguration() {
- Configuration conf = super.createConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class,
- ResourceScheduler.class);
- conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- return conf;
+ public TestFairSchedulerPreemption(Boolean fairshare) throws IOException {
+ fairsharePreemption = fairshare;
+ writeAllocFile();
}
@Before
- public void setup() throws IOException {
- conf = createConfiguration();
- clock = new ControlledClock();
+ public void setup() { // fresh conf per test: preemption on, no utilization threshold, kill without waiting
+ conf = createConfiguration(); // assign explicitly; teardown() nulls conf, so don't rely on a side effect of the base method
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+ ALLOC_FILE.getAbsolutePath());
+ conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+ conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+ conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
}
@After
public void teardown() {
+ ALLOC_FILE.delete();
+ conf = null;
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
- conf = null;
}
- private void startResourceManagerWithStubbedFairScheduler(float utilizationThreshold) {
- conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
- utilizationThreshold);
+ private void writeAllocFile() throws IOException { // called from the constructor, once per parameterized instance; teardown() deletes the file
+ /*
+ * Queue hierarchy written below:
+ * root
+ *   |--- preemptable
+ *   |      |--- child-1
+ *   |      |--- child-2
+ *   |--- nonpreemptable
+ *          |--- child-1
+ *          |--- child-2
+ */
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+
+ out.println("<queue name=\"preemptable\">");
+ writePreemptionParams(out); // fairshare or minshare settings, depending on fairsharePreemption
+
+ // Child-1
+ out.println("<queue name=\"child-1\">");
+ writeResourceParams(out); // writes minResources only when testing minshare preemption
+ out.println("</queue>");
+
+ // Child-2
+ out.println("<queue name=\"child-2\">");
+ writeResourceParams(out);
+ out.println("</queue>");
+
+ out.println("</queue>"); // end of preemptable queue
+
+ // Queue with preemption disallowed
+ out.println("<queue name=\"nonpreemptable\">");
+ out.println("<allowPreemptionFrom>false" +
+ "</allowPreemptionFrom>");
+ writePreemptionParams(out);
+
+ // Child-1
+ out.println("<queue name=\"child-1\">");
+ writeResourceParams(out);
+ out.println("</queue>");
+
+ // Child-2
+ out.println("<queue name=\"child-2\">");
+ writeResourceParams(out);
+ out.println("</queue>");
+
+ out.println("</queue>"); // end of nonpreemptable queue
+
+ out.println("</allocations>");
+ out.close(); // NOTE(review): PrintWriter swallows I/O errors; out.checkError() would surface a truncated file
+
+ assertTrue("Allocation file does not exist, not running the test",
+ ALLOC_FILE.exists());
+ }
+
+ private void writePreemptionParams(PrintWriter out) {
+ if (fairsharePreemption) {
+ out.println("<fairSharePreemptionThreshold>1" +
+ "</fairSharePreemptionThreshold>");
+ out.println("<fairSharePreemptionTimeout>0" +
+ "</fairSharePreemptionTimeout>");
+ } else {
+ out.println("<minSharePreemptionTimeout>0" +
+ "</minSharePreemptionTimeout>");
+ }
+ }
+
+ private void writeResourceParams(PrintWriter out) {
+ if (!fairsharePreemption) {
+ out.println("<minResources>4096mb,4vcores</minResources>");
+ }
+ }
+
+ private void setupCluster() throws IOException {
resourceManager = new MockRM(conf);
resourceManager.start();
+ scheduler = (FairScheduler) resourceManager.getResourceScheduler();
- assertTrue(
- resourceManager.getResourceScheduler() instanceof StubbedFairScheduler);
- scheduler = (FairScheduler)resourceManager.getResourceScheduler();
-
- scheduler.setClock(clock);
- scheduler.updateInterval = 60 * 1000;
+ // Create and add two nodes to the cluster
+ addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+ addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
}
- // YARN-4648: The starting code for ResourceManager mock is originated from
- // TestFairScheduler. It should be keep as it was to guarantee no changing
- // behaviour of ResourceManager preemption.
- private void startResourceManagerWithRealFairScheduler() {
- scheduler = new FairScheduler();
- conf = new YarnConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
- ResourceScheduler.class);
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
- conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
- 1024);
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
- conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
- conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
- conf.setFloat(
- FairSchedulerConfiguration
- .RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE,
- TEST_RESERVATION_THRESHOLD);
-
- resourceManager = new MockRM(conf);
-
- // TODO: This test should really be using MockRM. For now starting stuff
- // that is needed at a bare minimum.
- ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
- resourceManager.getRMContext().getStateStore().start();
-
- // to initialize the master key
- resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
-
- scheduler.setRMContext(resourceManager.getRMContext());
- }
-
- private void stopResourceManager() {
- if (scheduler != null) {
- scheduler.stop();
- scheduler = null;
- }
- if (resourceManager != null) {
- resourceManager.stop();
- resourceManager = null;
- }
- QueueMetrics.clearQueueMetrics();
- DefaultMetricsSystem.shutdown();
- }
-
- private void registerNodeAndSubmitApp(
- int memory, int vcores, int appContainers, int appMemory) {
- RMNode node1 = MockNodes.newNodeInfo(
- 1, Resources.createResource(memory, vcores), 1, "node1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- assertEquals("Incorrect amount of resources in the cluster",
- memory, scheduler.rootMetrics.getAvailableMB());
- assertEquals("Incorrect amount of resources in the cluster",
- vcores, scheduler.rootMetrics.getAvailableVirtualCores());
-
- createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
- scheduler.update();
- // Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 3; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
- }
- assertEquals("app1's request is not met",
- memory - appContainers * appMemory,
- scheduler.rootMetrics.getAvailableMB());
- }
-
- @Test
- public void testPreemptionWithFreeResources() throws Exception {
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"default\">");
- out.println("<maxResources>0mb,0vcores</maxResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>1</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>1</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
- out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
- out.println("</allocations>");
- out.close();
-
- startResourceManagerWithStubbedFairScheduler(0f);
- // Create node with 4GB memory and 4 vcores
- registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
-
- // Verify submitting another request triggers preemption
- createSchedulingRequest(1024, "queueB", "user1", 1, 1);
- scheduler.update();
- clock.tickSec(6);
-
- ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
- scheduler.preemptTasksIfNecessary();
- assertEquals("preemptResources() should have been called", 1024,
- ((StubbedFairScheduler) scheduler).lastPreemptMemory);
-
- resourceManager.stop();
-
- startResourceManagerWithStubbedFairScheduler(0.8f);
- // Create node with 4GB memory and 4 vcores
- registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
-
- // Verify submitting another request doesn't trigger preemption
- createSchedulingRequest(1024, "queueB", "user1", 1, 1);
- scheduler.update();
- clock.tickSec(6);
-
- ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
- scheduler.preemptTasksIfNecessary();
- assertEquals("preemptResources() should not have been called", -1,
- ((StubbedFairScheduler) scheduler).lastPreemptMemory);
-
- resourceManager.stop();
-
- startResourceManagerWithStubbedFairScheduler(0.7f);
- // Create node with 4GB memory and 4 vcores
- registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
-
- // Verify submitting another request triggers preemption
- createSchedulingRequest(1024, "queueB", "user1", 1, 1);
- scheduler.update();
- clock.tickSec(6);
-
- ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
- scheduler.preemptTasksIfNecessary();
- assertEquals("preemptResources() should have been called", 1024,
- ((StubbedFairScheduler) scheduler).lastPreemptMemory);
- }
-
- @Test (timeout = 5000)
- /**
- * Make sure containers are chosen to be preempted in the correct order.
- */
- public void testChoiceOfPreemptedContainers() throws Exception {
- startResourceManagerWithRealFairScheduler();
- conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
- conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
-
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>.25</weight>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>.25</weight>");
- out.println("</queue>");
- out.println("<queue name=\"queueC\">");
- out.println("<weight>.25</weight>");
- out.println("</queue>");
- out.println("<queue name=\"default\">");
- out.println("<weight>.25</weight>");
- out.println("</queue>");
- out.println("</allocations>");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create two nodes
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- // Queue A and B each request two applications
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
- createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
- createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
-
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
- createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
- createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
-
- scheduler.update();
-
- scheduler.getQueueManager().getLeafQueue("queueA", true)
- .setPolicy(SchedulingPolicy.parse("fifo"));
- scheduler.getQueueManager().getLeafQueue("queueB", true)
- .setPolicy(SchedulingPolicy.parse("fair"));
-
- // Sufficient node check-ins to fully schedule containers
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- for (int i = 0; i < 4; i++) {
- scheduler.handle(nodeUpdate1);
- scheduler.handle(nodeUpdate2);
- }
-
- assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-
- // Now new requests arrive from queueC and default
- createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
- scheduler.update();
-
- // We should be able to claw back one container from queueA and queueB each.
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
- // First verify we are adding containers to preemption list for the app.
- // For queueA (fifo), app2 is selected.
- // For queueB (fair), app4 is selected.
- assertTrue("App2 should have container to be preempted",
- !Collections.disjoint(
- scheduler.getSchedulerApp(app2).getLiveContainers(),
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
- assertTrue("App4 should have container to be preempted",
- !Collections.disjoint(
- scheduler.getSchedulerApp(app2).getLiveContainers(),
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
-
- // Pretend 15 seconds have passed
- clock.tickSec(15);
-
- // Trigger a kill by insisting we want containers back
- scheduler.preemptResources(Resources.createResource(2 * 1024));
-
- // At this point the containers should have been killed (since we are not simulating AM)
- assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- // Inside each app, containers are sorted according to their priorities.
- // Containers with priority 4 are preempted for app2 and app4.
- Set<RMContainer> set = new HashSet<RMContainer>();
- for (RMContainer container :
- scheduler.getSchedulerApp(app2).getLiveContainers()) {
- if (container.getAllocatedSchedulerKey().getPriority().getPriority() ==
- 4) {
- set.add(container);
+ private void sendEnoughNodeUpdatesToAssignFully() {
+ for (RMNode node : rmNodes) {
+ NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+ new NodeUpdateSchedulerEvent(node);
+ for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+ scheduler.handle(nodeUpdateSchedulerEvent);
}
}
- for (RMContainer container :
- scheduler.getSchedulerApp(app4).getLiveContainers()) {
- if (container.getAllocatedSchedulerKey().getPriority().getPriority() ==
- 4) {
- set.add(container);
+ }
+
+ /**
+ * Submit an app to {@code queue1} that takes over the entire cluster, then
+ * submit an app with larger (2048 MB, 2 vcore) containers to {@code queue2}
+ * that can only be satisfied by preempting containers of the first app.
+ *
+ * @param queue1 first queue
+ * @param queue2 second queue
+ * @throws InterruptedException if interrupted while waiting
+ */
+ private void submitApps(String queue1, String queue2)
+ throws InterruptedException {
+ // Create an app that takes up all the resources on the cluster
+ ApplicationAttemptId appAttemptId1
+ = createSchedulingRequest(1024, 1, queue1, "default",
+ NODE_CAPACITY_MULTIPLE * rmNodes.size());
+ greedyApp = scheduler.getSchedulerApp(appAttemptId1);
+ scheduler.update();
+ sendEnoughNodeUpdatesToAssignFully();
+ assertEquals(8, greedyApp.getLiveContainers().size()); // 8 = NODE_CAPACITY_MULTIPLE (4) * 2 nodes; assumes setupCluster() added two nodes
+
+ // Create a starving app asking for the whole cluster again, in containers twice as large
+ ApplicationAttemptId appAttemptId2
+ = createSchedulingRequest(2048, 2, queue2, "default",
+ NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
+ starvingApp = scheduler.getSchedulerApp(appAttemptId2);
+
+ // Brief pause before update(); presumably lets the 0 s preemption timeouts from the allocation file elapse
+ Thread.sleep(10);
+
+ scheduler.update();
+ }
+
+ private void verifyPreemption() throws InterruptedException {
+ // Sleep long enough for four containers to be preempted. Note that the
+ // starved app must be queued four times for containers to be preempted.
+ for (int i = 0; i < 10000; i++) {
+ if (greedyApp.getLiveContainers().size() == 4) {
+ break;
}
+ Thread.sleep(10);
}
- assertTrue("Containers with priority=4 in app2 and app4 should be " +
- "preempted.", set.isEmpty());
- // Trigger a kill by insisting we want containers back
- scheduler.preemptResources(Resources.createResource(2 * 1024));
+ // Verify the right amount of containers are preempted from greedyApp
+ assertEquals(4, greedyApp.getLiveContainers().size());
- // Pretend 15 seconds have passed
- clock.tickSec(15);
+ sendEnoughNodeUpdatesToAssignFully();
- // We should be able to claw back another container from A and B each.
- // For queueA (fifo), continue preempting from app2.
- // For queueB (fair), even app4 has a lowest priority container with p=4, it
- // still preempts from app3 as app3 is most over fair share.
- scheduler.preemptResources(Resources.createResource(2 * 1024));
+ // Verify the preempted containers are assigned to starvingApp
+ assertEquals(2, starvingApp.getLiveContainers().size());
+ }
- assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-
- // Now A and B are below fair share, so preemption shouldn't do anything
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- assertTrue("App1 should have no container to be preempted",
- scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
- assertTrue("App2 should have no container to be preempted",
- scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
- assertTrue("App3 should have no container to be preempted",
- scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
- assertTrue("App4 should have no container to be preempted",
- scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
- stopResourceManager();
+ private void verifyNoPreemption() throws InterruptedException {
+ // Sleep long enough to ensure not even one container is preempted.
+ for (int i = 0; i < 600; i++) {
+ if (greedyApp.getLiveContainers().size() != 8) {
+ break;
+ }
+ Thread.sleep(10);
+ }
+ assertEquals(8, greedyApp.getLiveContainers().size());
}
@Test
- public void testPreemptionIsNotDelayedToNextRound() throws Exception {
- startResourceManagerWithRealFairScheduler();
-
- conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
-
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>8</weight>");
- out.println("<queue name=\"queueA1\" />");
- out.println("<queue name=\"queueA2\" />");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>2</weight>");
- out.println("</queue>");
- out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
- out.println("</allocations>");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Add a node of 8G
- RMNode node1 = MockNodes.newNodeInfo(1,
- Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- // Run apps in queueA.A1 and queueB
- ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
- "queueA.queueA1", "user1", 7, 1);
- // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
- ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
- "user2", 1, 1);
-
- scheduler.update();
-
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- for (int i = 0; i < 8; i++) {
- scheduler.handle(nodeUpdate1);
+ public void testPreemptionWithinSameLeafQueue() throws Exception {
+ setupCluster();
+ String queue = "root.preemptable.child-1";
+ submitApps(queue, queue);
+ if (fairsharePreemption) {
+ verifyPreemption();
+ } else {
+ verifyNoPreemption();
}
-
- // verify if the apps got the containers they requested
- assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-
- // Now submit an app in queueA.queueA2
- ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
- "queueA.queueA2", "user3", 7, 1);
- scheduler.update();
-
- // Let 11 sec pass
- clock.tickSec(11);
-
- scheduler.update();
- Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
- .getLeafQueue("queueA.queueA2", false), clock.getTime());
- assertEquals(3277, toPreempt.getMemorySize());
-
- // verify if the 3 containers required by queueA2 are preempted in the same
- // round
- scheduler.preemptResources(toPreempt);
- assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
- .size());
- stopResourceManager();
- }
-
- @Test (timeout = 5000)
- /**
- * Tests the timing of decision to preempt tasks.
- */
- public void testPreemptionDecision() throws Exception {
- startResourceManagerWithRealFairScheduler();
-
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"default\">");
- out.println("<maxResources>0mb,0vcores</maxResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueC\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueD\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
- out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
- out.println("</allocations>");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create four nodes
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
- // Queue A and B each request three containers
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
-
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
- ApplicationAttemptId app5 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
- ApplicationAttemptId app6 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
-
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 2; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
- }
-
- // Now new requests arrive from queues C and D
- ApplicationAttemptId app7 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
- ApplicationAttemptId app8 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
- ApplicationAttemptId app9 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
- ApplicationAttemptId app10 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
- ApplicationAttemptId app11 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
- ApplicationAttemptId app12 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
-
- scheduler.update();
-
- FSLeafQueue schedC =
- scheduler.getQueueManager().getLeafQueue("queueC", true);
- FSLeafQueue schedD =
- scheduler.getQueueManager().getLeafQueue("queueD", true);
-
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
- // After minSharePreemptionTime has passed, they should want to preempt min
- // share.
- clock.tickSec(6);
- assertEquals(
- 1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemorySize());
- assertEquals(
- 1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
-
- // After fairSharePreemptionTime has passed, they should want to preempt
- // fair share.
- scheduler.update();
- clock.tickSec(6);
- assertEquals(
- 1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemorySize());
- assertEquals(
- 1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
- stopResourceManager();
}
@Test
-/**
- * Tests the timing of decision to preempt tasks.
- */
- public void testPreemptionDecisionWithDRF() throws Exception {
- startResourceManagerWithRealFairScheduler();
-
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"default\">");
- out.println("<maxResources>0mb,0vcores</maxResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,1vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,2vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueC\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,3vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueD\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,2vcores</minResources>");
- out.println("</queue>");
- out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
- out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
- out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
- out.println("</allocations>");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create four nodes
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
- // Queue A and B each request three containers
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
-
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
- ApplicationAttemptId app5 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
- ApplicationAttemptId app6 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
-
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 2; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
- }
-
- // Now new requests arrive from queues C and D
- ApplicationAttemptId app7 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
- ApplicationAttemptId app8 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
- ApplicationAttemptId app9 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
- ApplicationAttemptId app10 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1);
- ApplicationAttemptId app11 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2);
- ApplicationAttemptId app12 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3);
-
- scheduler.update();
-
- FSLeafQueue schedC =
- scheduler.getQueueManager().getLeafQueue("queueC", true);
- FSLeafQueue schedD =
- scheduler.getQueueManager().getLeafQueue("queueD", true);
-
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
-
- // Test :
- // 1) whether componentWise min works as expected.
- // 2) DRF calculator is used
-
- // After minSharePreemptionTime has passed, they should want to preempt min
- // share.
- clock.tickSec(6);
- Resource res = scheduler.resourceDeficit(schedC, clock.getTime());
- assertEquals(1024, res.getMemorySize());
- // Demand = 3
- assertEquals(3, res.getVirtualCores());
-
- res = scheduler.resourceDeficit(schedD, clock.getTime());
- assertEquals(1024, res.getMemorySize());
- // Demand = 6, but min share = 2
- assertEquals(2, res.getVirtualCores());
-
- // After fairSharePreemptionTime has passed, they should want to preempt
- // fair share.
- scheduler.update();
- clock.tickSec(6);
- res = scheduler.resourceDeficit(schedC, clock.getTime());
- assertEquals(1536, res.getMemorySize());
- assertEquals(3, res.getVirtualCores());
-
- res = scheduler.resourceDeficit(schedD, clock.getTime());
- assertEquals(1536, res.getMemorySize());
- // Demand = 6, but fair share = 3
- assertEquals(3, res.getVirtualCores());
- stopResourceManager();
+ public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
+ setupCluster();
+ submitApps("root.preemptable.child-1", "root.preemptable.child-2");
+ verifyPreemption();
}
@Test
- /**
- * Tests the various timing of decision to preempt tasks.
- */
- public void testPreemptionDecisionWithVariousTimeout() throws Exception {
- startResourceManagerWithRealFairScheduler();
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"default\">");
- out.println("<maxResources>0mb,0vcores</maxResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>1</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>2</weight>");
- out.println("<minSharePreemptionTimeout>10</minSharePreemptionTimeout>");
- out.println("<fairSharePreemptionTimeout>25</fairSharePreemptionTimeout>");
- out.println("<queue name=\"queueB1\">");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
- out.println("</queue>");
- out.println("<queue name=\"queueB2\">");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("<fairSharePreemptionTimeout>20</fairSharePreemptionTimeout>");
- out.println("</queue>");
- out.println("</queue>");
- out.println("<queue name=\"queueC\">");
- out.println("<weight>1</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
- out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
- out.println("</allocations>");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Check the min/fair share preemption timeout for each queue
- QueueManager queueMgr = scheduler.getQueueManager();
- assertEquals(30000, queueMgr.getQueue("root")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("default")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueA")
- .getFairSharePreemptionTimeout());
- assertEquals(25000, queueMgr.getQueue("queueB")
- .getFairSharePreemptionTimeout());
- assertEquals(25000, queueMgr.getQueue("queueB.queueB1")
- .getFairSharePreemptionTimeout());
- assertEquals(20000, queueMgr.getQueue("queueB.queueB2")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueC")
- .getFairSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("root")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("default")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueA")
- .getMinSharePreemptionTimeout());
- assertEquals(10000, queueMgr.getQueue("queueB")
- .getMinSharePreemptionTimeout());
- assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
- .getMinSharePreemptionTimeout());
- assertEquals(10000, queueMgr.getQueue("queueB.queueB2")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueC")
- .getMinSharePreemptionTimeout());
-
- // Create one big node
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- // Queue A takes all resources
- for (int i = 0; i < 6; i ++) {
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
- }
-
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- for (int i = 0; i < 6; i++) {
- scheduler.handle(nodeUpdate1);
- }
-
- // Now new requests arrive from queues B1, B2 and C
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2);
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3);
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2);
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3);
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
- scheduler.update();
-
- FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true);
- FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true);
- FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
-
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime())));
-
- // After 5 seconds, queueB1 wants to preempt min share
- scheduler.update();
- clock.tickSec(6);
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
- assertEquals(
- 0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
- assertEquals(
- 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
- // After 10 seconds, queueB2 wants to preempt min share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
- assertEquals(
- 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
- // After 15 seconds, queueC wants to preempt min share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
- // After 20 seconds, queueB2 should want to preempt fair share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
- // After 25 seconds, queueB1 should want to preempt fair share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
- // After 30 seconds, queueC should want to preempt fair share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
- assertEquals(
- 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
- stopResourceManager();
+ public void testPreemptionBetweenNonSiblingQueues() throws Exception {
+ setupCluster();
+ submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
+ verifyPreemption();
}
@Test
- /**
- * Tests the decision to preempt tasks respect to non-preemptable queues
- * 1, Queues as follow:
- * queueA(non-preemptable)
- * queueB(preemptable)
- * parentQueue(non-preemptable)
- * --queueC(preemptable)
- * queueD(preemptable)
- * 2, Submit request to queueA, queueB, queueC, and all of them are over MinShare
- * 3, Now all resource are occupied
- * 4, Submit request to queueD, and need to preempt resource from other queues
- * 5, Only preemptable queue(queueB) would be preempted.
- */
- public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
- startResourceManagerWithRealFairScheduler();
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"default\">");
- out.println("<maxResources>0mb,0vcores</maxResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<queue name=\"parentQueue\">");
- out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
- out.println("<queue name=\"queueC\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("</queue>");
- out.println("<queue name=\"queueD\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>2048mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
- out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
- out.println("</allocations>");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create four nodes(3G each)
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
- RMNode node4 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
- "127.0.0.4");
- NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
- scheduler.handle(nodeEvent4);
-
- // Submit apps to queueA, queueB, queueC,
- // now all resource of the cluster is occupied
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
-
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 3; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
-
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
- scheduler.handle(nodeUpdate4);
- }
-
- assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
- // Now new requests arrive from queues D
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
- scheduler.update();
- FSLeafQueue schedD =
- scheduler.getQueueManager().getLeafQueue("queueD", true);
-
- // After minSharePreemptionTime has passed, 2G resource should preempted from
- // queueB to queueD
- clock.tickSec(6);
- assertEquals(2048,
- scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
-
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- // now only app2 is selected to be preempted
- assertTrue("App2 should have container to be preempted",
- !Collections.disjoint(
- scheduler.getSchedulerApp(app2).getLiveContainers(),
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
- assertTrue("App1 should not have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app1).getLiveContainers(),
- scheduler.getSchedulerApp(app1).getPreemptionContainers()));
- assertTrue("App3 should not have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app3).getLiveContainers(),
- scheduler.getSchedulerApp(app3).getPreemptionContainers()));
- // Pretend 20 seconds have passed
- clock.tickSec(20);
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- for (int i = 0; i < 3; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
-
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
- scheduler.handle(nodeUpdate4);
- }
- // after preemption
- assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- stopResourceManager();
- }
-
- @Test
- /**
- * Tests the decision to preempt tasks when allowPreemptionFrom is set false on
- * all queues.
- * Then none of them would be preempted actually.
- * 1, Queues as follow:
- * queueA(non-preemptable)
- * queueB(non-preemptable)
- * parentQueue(non-preemptable)
- * --queueC(preemptable)
- * parentQueue(preemptable)
- * --queueD(non-preemptable)
- * 2, Submit request to queueB, queueC, queueD, and all of them are over MinShare
- * 3, Now all resource are occupied
- * 4, Submit request to queueA, and need to preempt resource from other queues
- * 5, None of queues would be preempted.
- */
- public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
- throws Exception {
- startResourceManagerWithRealFairScheduler();
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"default\">");
- out.println("<maxResources>0mb,0vcores</maxResources>");
- out.println("</queue>");
- out.println("<queue name=\"queueA\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>2048mb,0vcores</minResources>");
- out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
- out.println("</queue>");
- out.println("<queue name=\"parentQueue1\">");
- out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
- out.println("<queue name=\"queueC\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("</queue>");
- out.println("<queue name=\"parentQueue2\">");
- out.println("<queue name=\"queueD\">");
- out.println("<weight>.25</weight>");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
- out.println("</queue>");
- out.println("</queue>");
- out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
- out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
- out.println("</allocations>");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create four nodes(3G each)
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
- RMNode node4 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
- "127.0.0.4");
- NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
- scheduler.handle(nodeEvent4);
-
- // Submit apps to queueB, queueC, queueD
- // now all resource of the cluster is occupied
-
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3);
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 3; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
-
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
- scheduler.handle(nodeUpdate4);
- }
-
- assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
- // Now new requests arrive from queues A
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
- scheduler.update();
- FSLeafQueue schedA =
- scheduler.getQueueManager().getLeafQueue("queueA", true);
-
- // After minSharePreemptionTime has passed, resource deficit is 2G
- clock.tickSec(6);
- assertEquals(2048,
- scheduler.resourceDeficit(schedA, clock.getTime()).getMemorySize());
-
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- // now none app is selected to be preempted
- assertTrue("App1 should have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app1).getLiveContainers(),
- scheduler.getSchedulerApp(app1).getPreemptionContainers()));
- assertTrue("App2 should not have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app2).getLiveContainers(),
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
- assertTrue("App3 should not have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app3).getLiveContainers(),
- scheduler.getSchedulerApp(app3).getPreemptionContainers()));
- // Pretend 20 seconds have passed
- clock.tickSec(20);
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- for (int i = 0; i < 3; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
-
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
- scheduler.handle(nodeUpdate4);
- }
- // after preemption
- assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- stopResourceManager();
- }
-
- @Test
- public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
- startResourceManagerWithRealFairScheduler();
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"default\">");
- out.println("</queue>");
- out.println("<queue name=\"queueA\">");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<queue name=\"queueB1\">");
- out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
- out.println("</queue>");
- out.println("<queue name=\"queueB2\">");
- out.println("</queue>");
- out.println("</queue>");
- out.println("<queue name=\"queueC\">");
- out.println("</queue>");
- out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
- out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
- out.print("<fairSharePreemptionTimeout>40</fairSharePreemptionTimeout>");
- out.println("</allocations>");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Check the min/fair share preemption timeout for each queue
- QueueManager queueMgr = scheduler.getQueueManager();
- assertEquals(30000, queueMgr.getQueue("root")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("default")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueA")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueB")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueC")
- .getFairSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("root")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("default")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueA")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueB")
- .getMinSharePreemptionTimeout());
- assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueC")
- .getMinSharePreemptionTimeout());
-
- // If both exist, we take the default one
- out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"default\">");
- out.println("</queue>");
- out.println("<queue name=\"queueA\">");
- out.println("</queue>");
- out.println("<queue name=\"queueB\">");
- out.println("<queue name=\"queueB1\">");
- out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
- out.println("</queue>");
- out.println("<queue name=\"queueB2\">");
- out.println("</queue>");
- out.println("</queue>");
- out.println("<queue name=\"queueC\">");
- out.println("</queue>");
- out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
- out.print("<defaultFairSharePreemptionTimeout>25</defaultFairSharePreemptionTimeout>");
- out.print("<fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>");
- out.println("</allocations>");
- out.close();
-
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- assertEquals(25000, queueMgr.getQueue("root")
- .getFairSharePreemptionTimeout());
- stopResourceManager();
- }
-
- @Test(timeout = 5000)
- public void testRecoverRequestAfterPreemption() throws Exception {
- startResourceManagerWithRealFairScheduler();
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
-
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- SchedulerRequestKey schedulerKey = TestUtils.toSchedulerKey(20);
- String host = "127.0.0.1";
- int GB = 1024;
-
- // Create Node and raised Node Added event
- RMNode node = MockNodes.newNodeInfo(1,
- Resources.createResource(16 * 1024, 4), 0, host);
- NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
- scheduler.handle(nodeEvent);
-
- // Create 3 container requests and place it in ask
- List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
- schedulerKey.getPriority().getPriority(), 1, true);
- ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
- node.getRackName(), schedulerKey.getPriority().getPriority(), 1,
- true);
- ResourceRequest offRackRequest = createResourceRequest(GB, 1,
- ResourceRequest.ANY, schedulerKey.getPriority().getPriority(), 1, true);
- ask.add(nodeLocalRequest);
- ask.add(rackLocalRequest);
- ask.add(offRackRequest);
-
- // Create Request and update
- ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
- "user1", ask);
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
- scheduler.handle(nodeUpdate);
-
- assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
- .size());
- SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId);
-
- // ResourceRequest will be empty once NodeUpdate is completed
- Assert.assertNull(app.getResourceRequest(schedulerKey, host));
-
- ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
- RMContainer rmContainer = app.getRMContainer(containerId1);
-
- // Create a preempt event and register for preemption
- scheduler.warnOrKillContainer(rmContainer);
-
- // Wait for few clock ticks
- clock.tickSec(5);
-
- // preempt now
- scheduler.warnOrKillContainer(rmContainer);
-
- // Trigger container rescheduled event
- scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
- SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
-
- List<ResourceRequest> requests = rmContainer.getResourceRequests();
- // Once recovered, resource request will be present again in app
- Assert.assertEquals(3, requests.size());
- for (ResourceRequest request : requests) {
- Assert.assertEquals(1,
- app.getResourceRequest(schedulerKey, request.getResourceName())
- .getNumContainers());
- }
-
- // Send node heartbeat
- scheduler.update();
- scheduler.handle(nodeUpdate);
-
- List<Container> containers = scheduler.allocate(appAttemptId,
- Collections.<ResourceRequest> emptyList(),
- Collections.<ContainerId> emptyList(), null, null, null, null).getContainers();
-
- // Now with updated ResourceRequest, a container is allocated for AM.
- Assert.assertTrue(containers.size() == 1);
- stopResourceManager();
+ public void testNoPreemptionFromDisallowedQueue() throws Exception {
+ setupCluster();
+ submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
+ verifyNoPreemption();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java
new file mode 100644
index 0000000..5736f75
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * QueueManager tests that require a real scheduler.
+ */
+public class TestQueueManagerRealScheduler extends FairSchedulerTestBase {
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-queue-mgr");
+
+  /**
+   * Writes the initial allocation file, points the configuration at it and
+   * starts a MockRM whose FairScheduler the tests inspect.
+   */
+  @Before
+  public void setup() throws IOException {
+    createConfiguration();
+    writeAllocFile(30, 40);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+  }
+
+  /**
+   * Stops the MockRM started in {@link #setup()}, then removes the
+   * allocation file it was configured with.
+   */
+  @After
+  public void teardown() {
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+    // Delete immediately instead of only registering deleteOnExit(), which
+    // leaves the file on disk until the JVM exits; fall back to it only if
+    // the immediate delete fails.
+    if (ALLOC_FILE.exists() && !ALLOC_FILE.delete()) {
+      ALLOC_FILE.deleteOnExit();
+    }
+  }
+
+  /**
+   * Writes the allocation file used by these tests: queues default, queueA,
+   * queueB (with children queueB1 and queueB2) and queueC. Only queueB1
+   * sets its own minSharePreemptionTimeout (5); the default min share
+   * preemption timeout is 15. Timeouts are written in seconds.
+   *
+   * @param defaultFairShareTimeout value of
+   *     {@code defaultFairSharePreemptionTimeout}, in seconds
+   * @param fairShareTimeout value of the top-level
+   *     {@code fairSharePreemptionTimeout} element, in seconds
+   * @throws IOException if the file cannot be opened or written
+   */
+  private void writeAllocFile(int defaultFairShareTimeout,
+      int fairShareTimeout) throws IOException {
+    try (PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE))) {
+      out.println("<?xml version=\"1.0\"?>");
+      out.println("<allocations>");
+      out.println("<queue name=\"default\">");
+      out.println("</queue>");
+      out.println("<queue name=\"queueA\">");
+      out.println("</queue>");
+      out.println("<queue name=\"queueB\">");
+      out.println("<queue name=\"queueB1\">");
+      out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
+      out.println("</queue>");
+      out.println("<queue name=\"queueB2\">");
+      out.println("</queue>");
+      out.println("</queue>");
+      out.println("<queue name=\"queueC\">");
+      out.println("</queue>");
+      out.println("<defaultMinSharePreemptionTimeout>15"
+          + "</defaultMinSharePreemptionTimeout>");
+      out.println("<defaultFairSharePreemptionTimeout>"
+          + defaultFairShareTimeout + "</defaultFairSharePreemptionTimeout>");
+      out.println("<fairSharePreemptionTimeout>"
+          + fairShareTimeout + "</fairSharePreemptionTimeout>");
+      out.println("</allocations>");
+      // PrintWriter swallows I/O errors; checkError() flushes and reports
+      // them so a truncated allocation file fails here, not in the asserts.
+      if (out.checkError()) {
+        throw new IOException("Failed to write " + ALLOC_FILE);
+      }
+    }
+  }
+
+  /**
+   * Verifies the fair/min share preemption timeouts resolved for every
+   * queue, and that reinitializing with a lower default fair share timeout
+   * takes effect.
+   */
+  @Test
+  public void testBackwardsCompatiblePreemptionConfiguration()
+      throws IOException {
+    // Check the min/fair share preemption timeout for each queue. The
+    // allocation file uses seconds; the queues report milliseconds. The
+    // fair share timeout comes from defaultFairSharePreemptionTimeout (30),
+    // not the top-level fairSharePreemptionTimeout (40).
+    QueueManager queueMgr = scheduler.getQueueManager();
+    assertEquals(30000, queueMgr.getQueue("root")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("default")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueA")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueC")
+        .getFairSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("root")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("default")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueA")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueB")
+        .getMinSharePreemptionTimeout());
+    assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueC")
+        .getMinSharePreemptionTimeout());
+
+    // Lower the default fair share preemption timeout, reload, and verify
+    // the new value is picked up.
+    writeAllocFile(25, 30);
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+    assertEquals(25000, queueMgr.getQueue("root")
+        .getFairSharePreemptionTimeout());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
index dea2dd1..57c7301 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
@@ -330,11 +330,6 @@
}
@Override
- public RMContainer preemptContainer() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public Resource getFairShare() {
throw new UnsupportedOperationException();
}