YARN-4752. Improved preemption in FairScheduler. (kasha)

Contains:
YARN-5605. Preempt containers (all on one node) to meet the requirement of starved applications
YARN-5821. Drop left-over preemption-related code and clean up method visibilities in the Schedulable hierarchy
YARN-5783. Verify identification of starved applications.
YARN-5819. Verify fairshare and minshare preemption
YARN-5885. Cleanup YARN-4752 branch for merge

Change-Id: Iee0962377d019dd64dc69a020725d2eaf360858c
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 760b0ea..462e02a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -143,6 +143,10 @@
   public static Resource none() {
     return NONE;
   }
+
+  public static boolean isNone(Resource other) {
+    return NONE.equals(other);
+  }
   
   public static Resource unbounded() {
     return UNBOUNDED;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 80811b1..feb20ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -624,6 +624,23 @@
   }
 
   /**
+   * Method to return the next resource request to be serviced.
+   *
+   * In the initial implementation, we just pick any {@link ResourceRequest}
+   * corresponding to the highest priority.
+   *
+   * @return next {@link ResourceRequest} to allocate resources for.
+   */
+  @Unstable
+  public synchronized ResourceRequest getNextResourceRequest() {
+    for (ResourceRequest rr:
+        resourceRequestMap.get(schedulerKeys.firstKey()).values()) {
+      return rr;
+    }
+    return null;
+  }
+
+  /**
    * Returns if the place (node/rack today) is either blacklisted by the
    * application (user) or the system
    *
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index bc52816..b3ef471 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -1253,6 +1253,22 @@
     unconfirmedAllocatedVcores.addAndGet(-res.getVirtualCores());
   }
 
+  @Override
+  public int hashCode() {
+    return getApplicationAttemptId().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (! (o instanceof SchedulerApplicationAttempt)) {
+      return false;
+    }
+
+    SchedulerApplicationAttempt other = (SchedulerApplicationAttempt) o;
+    return (this == other ||
+        this.getApplicationAttemptId().equals(other.getApplicationAttemptId()));
+  }
+
   /**
    * Different state for Application Master, user can see this state from web UI
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f076e4f..498f34f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -1101,4 +1101,20 @@
       }
     }
   }
+
+  /*
+   * Overriding to appease findbugs
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  /*
+   * Overriding to appease findbugs
+   */
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index df20117..39f4a3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -18,18 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -53,6 +52,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -78,10 +78,17 @@
   private ResourceWeights resourceWeights;
   private Resource demand = Resources.createResource(0);
   private FairScheduler scheduler;
+  private FSQueue fsQueue;
   private Resource fairShare = Resources.createResource(0, 0);
-  private Resource preemptedResources = Resources.createResource(0);
-  private RMContainerComparator comparator = new RMContainerComparator();
-  private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
+  // Preemption related variables
+  private final Resource preemptedResources = Resources.clone(Resources.none());
+  private final Set<RMContainer> containersToPreempt = new HashSet<>();
+  private Resource fairshareStarvation = Resources.none();
+  private long lastTimeAtFairShare;
+
+  // minShareStarvation attributed to this application by the leaf queue
+  private Resource minshareStarvation = Resources.none();
 
   // Used to record node reservation by an app.
   // Key = RackName, Value = Set of Nodes reserved by app on rack
@@ -107,12 +114,14 @@
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
 
     this.scheduler = scheduler;
+    this.fsQueue = queue;
     this.startTime = scheduler.getClock().getTime();
+    this.lastTimeAtFairShare = this.startTime;
     this.appPriority = Priority.newInstance(1);
     this.resourceWeights = new ResourceWeights();
   }
 
-  public ResourceWeights getResourceWeights() {
+  ResourceWeights getResourceWeights() {
     return resourceWeights;
   }
 
@@ -123,7 +132,7 @@
     return queue.getMetrics();
   }
 
-  public void containerCompleted(RMContainer rmContainer,
+  void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     try {
       writeLock.lock();
@@ -143,6 +152,7 @@
 
       // Remove from the list of containers
       liveContainers.remove(rmContainer.getContainerId());
+      untrackContainerForPreemption(rmContainer);
 
       Resource containerResource = rmContainer.getContainer().getResource();
       RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
@@ -152,9 +162,6 @@
       queue.getMetrics().releaseResources(getUser(), 1, containerResource);
       this.attemptResourceUsage.decUsed(containerResource);
 
-      // remove from preemption map if it is completed
-      preemptionMap.remove(rmContainer);
-
       // Clear resource utilization metrics cache.
       lastMemoryAggregateAllocationUpdateTime = -1;
     } finally {
@@ -484,7 +491,7 @@
    * @param schedulerKey Scheduler Key
    * @param level NodeType
    */
-  public void resetAllowedLocalityLevel(
+  void resetAllowedLocalityLevel(
       SchedulerRequestKey schedulerKey, NodeType level) {
     NodeType old;
     try {
@@ -498,57 +505,113 @@
         + " priority " + schedulerKey.getPriority());
   }
 
-  // related methods
-  public void addPreemption(RMContainer container, long time) {
-    assert preemptionMap.get(container) == null;
-    try {
-      writeLock.lock();
-      preemptionMap.put(container, time);
-      Resources.addTo(preemptedResources, container.getAllocatedResource());
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  public Long getContainerPreemptionTime(RMContainer container) {
-    return preemptionMap.get(container);
-  }
-
-  public Set<RMContainer> getPreemptionContainers() {
-    return preemptionMap.keySet();
-  }
-  
   @Override
   public FSLeafQueue getQueue() {
-    return (FSLeafQueue)super.getQueue();
+    Queue queue = super.getQueue();
+    assert queue instanceof FSLeafQueue;
+    return (FSLeafQueue) queue;
   }
 
-  public Resource getPreemptedResources() {
-    return preemptedResources;
+  // Preemption related methods
+
+  /**
+   * Get overall starvation - fairshare and attributed minshare.
+   *
+   * @return total starvation attributed to this application
+   */
+  Resource getStarvation() {
+    return Resources.add(fairshareStarvation, minshareStarvation);
   }
 
-  public void resetPreemptedResources() {
-    preemptedResources = Resources.createResource(0);
-    for (RMContainer container : getPreemptionContainers()) {
+  /**
+   * Set the minshare attributed to this application. To be called only from
+   * {@link FSLeafQueue#updateStarvedApps}.
+   *
+   * @param starvation minshare starvation attributed to this app
+   */
+  void setMinshareStarvation(Resource starvation) {
+    this.minshareStarvation = starvation;
+  }
+
+  /**
+   * Reset the minshare starvation attributed to this application. To be
+   * called only from {@link FSLeafQueue#updateStarvedApps}
+   */
+  void resetMinshareStarvation() {
+    this.minshareStarvation = Resources.none();
+  }
+
+  void trackContainerForPreemption(RMContainer container) {
+    containersToPreempt.add(container);
+    synchronized (preemptedResources) {
       Resources.addTo(preemptedResources, container.getAllocatedResource());
     }
   }
 
-  public void clearPreemptedResources() {
-    preemptedResources.setMemorySize(0);
-    preemptedResources.setVirtualCores(0);
+  private void untrackContainerForPreemption(RMContainer container) {
+    synchronized (preemptedResources) {
+      Resources.subtractFrom(preemptedResources,
+          container.getAllocatedResource());
+    }
+    containersToPreempt.remove(container);
+  }
+
+  Set<RMContainer> getPreemptionContainers() {
+    return containersToPreempt;
+  }
+
+  private Resource getPreemptedResources() {
+    synchronized (preemptedResources) {
+      return preemptedResources;
+    }
+  }
+
+  boolean canContainerBePreempted(RMContainer container) {
+    // Sanity check that the app owns this container
+    if (!getLiveContainersMap().containsKey(container.getContainerId()) &&
+        !newlyAllocatedContainers.contains(container)) {
+      LOG.error("Looking to preempt container " + container +
+          ". Container does not belong to app " + getApplicationId());
+      return false;
+    }
+
+    if (containersToPreempt.contains(container)) {
+      // The container is already under consideration for preemption
+      return false;
+    }
+
+    // Check if any of the parent queues are not preemptable
+    // TODO (YARN-5831): Propagate the "preemptable" flag all the way down to
+    // the app to avoid recursing up every time.
+    for (FSQueue q = getQueue();
+        !q.getQueueName().equals("root");
+        q = q.getParent()) {
+      if (!q.isPreemptable()) {
+        return false;
+      }
+    }
+
+    // Check if the app's allocation will be over its fairshare even
+    // after preempting this container
+    Resource currentUsage = getResourceUsage();
+    Resource fairshare = getFairShare();
+    Resource overFairShareBy = Resources.subtract(currentUsage, fairshare);
+
+    return (Resources.fitsIn(container.getAllocatedResource(),
+        overFairShareBy));
   }
 
   /**
    * Create and return a container object reflecting an allocation for the
-   * given appliction on the given node with the given capability and
+   * given application on the given node with the given capability and
    * priority.
+   *
    * @param node Node
    * @param capability Capability
    * @param schedulerKey Scheduler Key
    * @return Container
    */
-  public Container createContainer(FSSchedulerNode node, Resource capability,
+  private Container createContainer(FSSchedulerNode node, Resource capability,
       SchedulerRequestKey schedulerKey) {
 
     NodeId nodeId = node.getRMNode().getNodeID();
@@ -556,12 +619,10 @@
         getApplicationAttemptId(), getNewContainerId());
 
     // Create the container
-    Container container = BuilderUtils.newContainer(containerId, nodeId,
+    return BuilderUtils.newContainer(containerId, nodeId,
         node.getRMNode().getHttpAddress(), capability,
         schedulerKey.getPriority(), null,
         schedulerKey.getAllocationRequestId());
-
-    return container;
   }
 
   /**
@@ -816,7 +877,8 @@
     }
 
     Collection<SchedulerRequestKey> keysToTry = (reserved) ?
-        Arrays.asList(node.getReservedContainer().getReservedSchedulerKey()) :
+        Collections.singletonList(
+            node.getReservedContainer().getReservedSchedulerKey()) :
         getSchedulerKeys();
 
     // For each priority, see if we can schedule a node local, rack local
@@ -974,7 +1036,7 @@
    *     Node that the application has an existing reservation on
    * @return whether the reservation on the given node is valid.
    */
-  public boolean assignReservedContainer(FSSchedulerNode node) {
+  boolean assignReservedContainer(FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
     SchedulerRequestKey reservedSchedulerKey =
         rmContainer.getReservedSchedulerKey();
@@ -1003,17 +1065,43 @@
     return true;
   }
 
-  static class RMContainerComparator implements Comparator<RMContainer>,
-      Serializable {
-    @Override
-    public int compare(RMContainer c1, RMContainer c2) {
-      int ret = c1.getContainer().getPriority().compareTo(
-          c2.getContainer().getPriority());
-      if (ret == 0) {
-        return c2.getContainerId().compareTo(c1.getContainerId());
-      }
-      return ret;
+  /**
+   * Helper method that computes the extent of fairshare fairshareStarvation.
+   */
+  Resource fairShareStarvation() {
+    Resource threshold = Resources.multiply(
+        getFairShare(), fsQueue.getFairSharePreemptionThreshold());
+    Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+
+    long now = scheduler.getClock().getTime();
+    boolean starved = Resources.greaterThan(
+        fsQueue.getPolicy().getResourceCalculator(),
+        scheduler.getClusterResource(), starvation, Resources.none());
+
+    if (!starved) {
+      lastTimeAtFairShare = now;
     }
+
+    if (starved &&
+        (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) {
+      this.fairshareStarvation = starvation;
+    } else {
+      this.fairshareStarvation = Resources.none();
+    }
+    return this.fairshareStarvation;
+  }
+
+  ResourceRequest getNextResourceRequest() {
+    return appSchedulingInfo.getNextResourceRequest();
+  }
+
+  /**
+   * Helper method that captures if this app is identified to be starved.
+   * @return true if the app is starved for fairshare, false otherwise
+   */
+  @VisibleForTesting
+  boolean isStarvedForFairShare() {
+    return !Resources.isNone(fairshareStarvation);
   }
 
   /* Schedulable methods implementation */
@@ -1045,14 +1133,13 @@
 
   @Override
   public Resource getResourceUsage() {
-    // Here the getPreemptedResources() always return zero, except in
-    // a preemption round
-    // In the common case where preempted resource is zero, return the
-    // current consumption Resource object directly without calling
-    // Resources.subtract which creates a new Resource object for each call.
-    return getPreemptedResources().equals(Resources.none()) ?
-        getCurrentConsumption() :
-        Resources.subtract(getCurrentConsumption(), getPreemptedResources());
+    /*
+     * getResourcesToPreempt() returns zero, except when there are containers
+     * to preempt. Avoid creating an object in the common case.
+     */
+    return getPreemptedResources().equals(Resources.none())
+        ? getCurrentConsumption()
+        : Resources.subtract(getCurrentConsumption(), getPreemptedResources());
   }
 
   @Override
@@ -1131,24 +1218,19 @@
         diagnosticMessageBldr.toString());
   }
 
-  /**
-   * Preempt a running container according to the priority
+  /*
+   * Overriding to appease findbugs
    */
   @Override
-  public RMContainer preemptContainer() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("App " + getName() + " is going to preempt a running " +
-          "container");
-    }
+  public int hashCode() {
+    return super.hashCode();
+  }
 
-    RMContainer toBePreempted = null;
-    for (RMContainer container : getLiveContainers()) {
-      if (!getPreemptionContainers().contains(container) &&
-          (toBePreempted == null ||
-              comparator.compare(toBePreempted, container) > 0)) {
-        toBePreempted = container;
-      }
-    }
-    return toBePreempted;
+  /*
+   * Overriding to appease findbugs
+   */
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
new file mode 100644
index 0000000..56bc99c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+/**
+ * Helper class that holds basic information to be passed around
+ * FairScheduler classes. Think of this as a glorified map that holds key
+ * information about the scheduler.
+ */
+public class FSContext {
+  // Preemption-related info
+  private boolean preemptionEnabled = false;
+  private float preemptionUtilizationThreshold;
+  private FSStarvedApps starvedApps;
+
+  public boolean isPreemptionEnabled() {
+    return preemptionEnabled;
+  }
+
+  public void setPreemptionEnabled() {
+    this.preemptionEnabled = true;
+    if (starvedApps == null) {
+      starvedApps = new FSStarvedApps();
+    }
+  }
+
+  public FSStarvedApps getStarvedApps() {
+    return starvedApps;
+  }
+
+  public float getPreemptionUtilizationThreshold() {
+    return preemptionUtilizationThreshold;
+  }
+
+  public void setPreemptionUtilizationThreshold(
+      float preemptionUtilizationThreshold) {
+    this.preemptionUtilizationThreshold = preemptionUtilizationThreshold;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index c393759..343e9c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -45,16 +44,20 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import static org.apache.hadoop.yarn.util.resource.Resources.none;
+
 @Private
 @Unstable
 public class FSLeafQueue extends FSQueue {
-  private static final Log LOG = LogFactory.getLog(
-      FSLeafQueue.class.getName());
+  private static final Log LOG = LogFactory.getLog(FSLeafQueue.class.getName());
+  private static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
 
-  private final List<FSAppAttempt> runnableApps = // apps that are runnable
-      new ArrayList<FSAppAttempt>();
-  private final List<FSAppAttempt> nonRunnableApps =
-      new ArrayList<FSAppAttempt>();
+  private FairScheduler scheduler;
+  private FSContext context;
+
+  // apps that are runnable
+  private final List<FSAppAttempt> runnableApps = new ArrayList<>();
+  private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>();
   // get a lock with fair distribution for app list updates
   private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
   private final Lock readLock = rwl.readLock();
@@ -64,24 +67,23 @@
   
   // Variables used for preemption
   private long lastTimeAtMinShare;
-  private long lastTimeAtFairShareThreshold;
-  
+
   // Track the AM resource usage for this queue
   private Resource amResourceUsage;
 
   private final ActiveUsersManager activeUsersManager;
-  public static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
 
   public FSLeafQueue(String name, FairScheduler scheduler,
       FSParentQueue parent) {
     super(name, scheduler, parent);
+    this.scheduler = scheduler;
+    this.context = scheduler.getContext();
     this.lastTimeAtMinShare = scheduler.getClock().getTime();
-    this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime();
     activeUsersManager = new ActiveUsersManager(getMetrics());
     amResourceUsage = Resource.newInstance(0, 0);
   }
   
-  public void addApp(FSAppAttempt app, boolean runnable) {
+  void addApp(FSAppAttempt app, boolean runnable) {
     writeLock.lock();
     try {
       if (runnable) {
@@ -108,7 +110,7 @@
    * Removes the given app from this queue.
    * @return whether or not the app was runnable
    */
-  public boolean removeApp(FSAppAttempt app) {
+  boolean removeApp(FSAppAttempt app) {
     boolean runnable = false;
 
     // Remove app from runnable/nonRunnable list while holding the write lock
@@ -139,7 +141,7 @@
    * Removes the given app if it is non-runnable and belongs to this queue
    * @return true if the app is removed, false otherwise
    */
-  public boolean removeNonRunnableApp(FSAppAttempt app) {
+  boolean removeNonRunnableApp(FSAppAttempt app) {
     writeLock.lock();
     try {
       return nonRunnableApps.remove(app);
@@ -148,7 +150,7 @@
     }
   }
 
-  public boolean isRunnableApp(FSAppAttempt attempt) {
+  boolean isRunnableApp(FSAppAttempt attempt) {
     readLock.lock();
     try {
       return runnableApps.contains(attempt);
@@ -157,7 +159,7 @@
     }
   }
 
-  public boolean isNonRunnableApp(FSAppAttempt attempt) {
+  boolean isNonRunnableApp(FSAppAttempt attempt) {
     readLock.lock();
     try {
       return nonRunnableApps.contains(attempt);
@@ -166,30 +168,8 @@
     }
   }
 
-  public void resetPreemptedResources() {
-    readLock.lock();
-    try {
-      for (FSAppAttempt attempt : runnableApps) {
-        attempt.resetPreemptedResources();
-      }
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public void clearPreemptedResources() {
-    readLock.lock();
-    try {
-      for (FSAppAttempt attempt : runnableApps) {
-        attempt.clearPreemptedResources();
-      }
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public List<FSAppAttempt> getCopyOfNonRunnableAppSchedulables() {
-    List<FSAppAttempt> appsToReturn = new ArrayList<FSAppAttempt>();
+  List<FSAppAttempt> getCopyOfNonRunnableAppSchedulables() {
+    List<FSAppAttempt> appsToReturn = new ArrayList<>();
     readLock.lock();
     try {
       appsToReturn.addAll(nonRunnableApps);
@@ -223,17 +203,78 @@
     }
     super.policy = policy;
   }
-  
+
   @Override
-  public void recomputeShares() {
+  public void updateInternal(boolean checkStarvation) {
     readLock.lock();
     try {
       policy.computeShares(runnableApps, getFairShare());
+      if (checkStarvation) {
+        updateStarvedApps();
+      }
     } finally {
       readLock.unlock();
     }
   }
 
+  /**
+   * Helper method to identify starved applications. This needs to be called
+   * ONLY from {@link #updateInternal}, after the application shares
+   * are updated.
+   *
+   * A queue can be starving due to fairshare or minshare.
+   *
+   * Minshare is defined only on the queue and not the applications.
+   * Fairshare is defined for both the queue and the applications.
+   *
+   * If this queue is starved due to minshare, we need to identify the most
+   * deserving apps if they themselves are not starved due to fairshare.
+   *
+   * If this queue is starving due to fairshare, there must be at least
+   * one application that is starved. And, even if the queue is not
+   * starved due to fairshare, there might still be starved applications.
+   */
+  private void updateStarvedApps() {
+    // First identify starved applications and track total amount of
+    // starvation (in resources)
+    Resource fairShareStarvation = Resources.clone(none());
+
+    // Fetch apps with unmet demand sorted by fairshare starvation
+    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
+    for (FSAppAttempt app : appsWithDemand) {
+      Resource appStarvation = app.fairShareStarvation();
+      if (!Resources.equals(Resources.none(), appStarvation))  {
+        context.getStarvedApps().addStarvedApp(app);
+        Resources.addTo(fairShareStarvation, appStarvation);
+      } else {
+        break;
+      }
+    }
+
+    // Compute extent of minshare starvation
+    Resource minShareStarvation = minShareStarvation();
+
+    // Compute minshare starvation that is not subsumed by fairshare starvation
+    Resources.subtractFrom(minShareStarvation, fairShareStarvation);
+
+    // Keep adding apps to the starved list until the unmet demand goes over
+    // the remaining minshare
+    for (FSAppAttempt app : appsWithDemand) {
+      if (Resources.greaterThan(policy.getResourceCalculator(),
+          scheduler.getClusterResource(), minShareStarvation, none())) {
+        Resource appPendingDemand =
+            Resources.subtract(app.getDemand(), app.getResourceUsage());
+        Resources.subtractFrom(minShareStarvation, appPendingDemand);
+        app.setMinshareStarvation(appPendingDemand);
+        context.getStarvedApps().addStarvedApp(app);
+      } else {
+        // Reset minshare starvation in case we had set it in a previous
+        // iteration
+        app.resetMinshareStarvation();
+      }
+    }
+  }
+
   @Override
   public Resource getDemand() {
     return demand;
@@ -256,7 +297,7 @@
     return usage;
   }
 
-  public Resource getAmResourceUsage() {
+  Resource getAmResourceUsage() {
     return amResourceUsage;
   }
 
@@ -299,7 +340,7 @@
 
   @Override
   public Resource assignContainer(FSSchedulerNode node) {
-    Resource assigned = Resources.none();
+    Resource assigned = none();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
           getName() + " fairShare: " + getFairShare());
@@ -309,26 +350,12 @@
       return assigned;
     }
 
-    // Apps that have resource demands.
-    TreeSet<FSAppAttempt> pendingForResourceApps =
-        new TreeSet<FSAppAttempt>(policy.getComparator());
-    readLock.lock();
-    try {
-      for (FSAppAttempt app : runnableApps) {
-        Resource pending = app.getAppAttemptResourceUsage().getPending();
-        if (!pending.equals(Resources.none())) {
-          pendingForResourceApps.add(app);
-        }
-      }
-    } finally {
-      readLock.unlock();
-    }
-    for (FSAppAttempt sched : pendingForResourceApps) {
+    for (FSAppAttempt sched : fetchAppsWithDemand()) {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
         continue;
       }
       assigned = sched.assignContainer(node);
-      if (!assigned.equals(Resources.none())) {
+      if (!assigned.equals(none())) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Assigned container in queue:" + getName() + " " +
               "container:" + assigned);
@@ -339,40 +366,21 @@
     return assigned;
   }
 
-  @Override
-  public RMContainer preemptContainer() {
-    RMContainer toBePreempted = null;
-
-    // If this queue is not over its fair share, reject
-    if (!preemptContainerPreCheck()) {
-      return toBePreempted;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Queue " + getName() + " is going to preempt a container " +
-          "from its applications.");
-    }
-
-    // Choose the app that is most over fair share
-    Comparator<Schedulable> comparator = policy.getComparator();
-    FSAppAttempt candidateSched = null;
+  private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
+    TreeSet<FSAppAttempt> pendingForResourceApps =
+        new TreeSet<>(policy.getComparator());
     readLock.lock();
     try {
-      for (FSAppAttempt sched : runnableApps) {
-        if (candidateSched == null ||
-            comparator.compare(sched, candidateSched) > 0) {
-          candidateSched = sched;
+      for (FSAppAttempt app : runnableApps) {
+        Resource pending = app.getAppAttemptResourceUsage().getPending();
+        if (!pending.equals(none())) {
+          pendingForResourceApps.add(app);
         }
       }
     } finally {
       readLock.unlock();
     }
-
-    // Preempt from the selected app
-    if (candidateSched != null) {
-      toBePreempted = candidateSched.preemptContainer();
-    }
-    return toBePreempted;
+    return pendingForResourceApps;
   }
 
   @Override
@@ -384,7 +392,7 @@
   public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
     QueueUserACLInfo userAclInfo =
       recordFactory.newRecordInstance(QueueUserACLInfo.class);
-    List<QueueACL> operations = new ArrayList<QueueACL>();
+    List<QueueACL> operations = new ArrayList<>();
     for (QueueACL operation : QueueACL.values()) {
       if (hasAccess(operation, user)) {
         operations.add(operation);
@@ -396,23 +404,10 @@
     return Collections.singletonList(userAclInfo);
   }
   
-  public long getLastTimeAtMinShare() {
-    return lastTimeAtMinShare;
-  }
-
   private void setLastTimeAtMinShare(long lastTimeAtMinShare) {
     this.lastTimeAtMinShare = lastTimeAtMinShare;
   }
 
-  public long getLastTimeAtFairShareThreshold() {
-    return lastTimeAtFairShareThreshold;
-  }
-
-  private void setLastTimeAtFairShareThreshold(
-      long lastTimeAtFairShareThreshold) {
-    this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
-  }
-
   @Override
   public int getNumRunnableApps() {
     readLock.lock();
@@ -423,7 +418,7 @@
     }
   }
 
-  public int getNumNonRunnableApps() {
+  int getNumNonRunnableApps() {
     readLock.lock();
     try {
       return nonRunnableApps.size();
@@ -475,10 +470,11 @@
   /**
    * Check whether this queue can run this application master under the
    * maxAMShare limit.
-   * @param amResource
+   *
+   * @param amResource resources required to run the AM
    * @return true if this queue can run
    */
-  public boolean canRunAppAM(Resource amResource) {
+  boolean canRunAppAM(Resource amResource) {
     if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
       return true;
     }
@@ -503,7 +499,7 @@
     return Resources.fitsIn(ifRunAMResource, maxAMResource);
   }
 
-  public void addAMResourceUsage(Resource amResource) {
+  void addAMResourceUsage(Resource amResource) {
     if (amResource != null) {
       Resources.addTo(amResourceUsage, amResource);
     }
@@ -516,21 +512,8 @@
   }
 
   /**
-   * Update the preemption fields for the queue, i.e. the times since last was
-   * at its guaranteed share and over its fair share threshold.
-   */
-  public void updateStarvationStats() {
-    long now = scheduler.getClock().getTime();
-    if (!isStarvedForMinShare()) {
-      setLastTimeAtMinShare(now);
-    }
-    if (!isStarvedForFairShare()) {
-      setLastTimeAtFairShareThreshold(now);
-    }
-  }
-
-  /** Allows setting weight for a dynamically created queue
-   * Currently only used for reservation based queues
+   * Allows setting weight for a dynamically created queue.
+   * Currently only used for reservation based queues.
    * @param weight queue weight
    */
   public void setWeights(float weight) {
@@ -538,37 +521,61 @@
   }
 
   /**
-   * Helper method to check if the queue should preempt containers
+   * Helper method to compute the amount of minshare starvation.
    *
-   * @return true if check passes (can preempt) or false otherwise
+   * @return the extent of minshare starvation
    */
-  private boolean preemptContainerPreCheck() {
-    return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
-        getFairShare());
-  }
-
-  /**
-   * Is a queue being starved for its min share.
-   */
-  @VisibleForTesting
-  boolean isStarvedForMinShare() {
-    return isStarved(getMinShare());
-  }
-
-  /**
-   * Is a queue being starved for its fair share threshold.
-   */
-  @VisibleForTesting
-  boolean isStarvedForFairShare() {
-    return isStarved(
-        Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
-  }
-
-  private boolean isStarved(Resource share) {
+  private Resource minShareStarvation() {
+    // If demand < minshare, we should use demand to determine starvation
     Resource desiredShare = Resources.min(policy.getResourceCalculator(),
-            scheduler.getClusterResource(), share, getDemand());
-    Resource resourceUsage = getResourceUsage();
-    return Resources.lessThan(policy.getResourceCalculator(),
-            scheduler.getClusterResource(), resourceUsage, desiredShare);
+        scheduler.getClusterResource(), getMinShare(), getDemand());
+
+    Resource starvation = Resources.subtract(desiredShare, getResourceUsage());
+    boolean starved = !Resources.isNone(starvation);
+
+    long now = scheduler.getClock().getTime();
+    if (!starved) {
+      // Record that the queue is not starved
+      setLastTimeAtMinShare(now);
+    }
+
+    if (now - lastTimeAtMinShare < getMinSharePreemptionTimeout()) {
+      // the queue is not starved for the preemption timeout
+      starvation = Resources.clone(Resources.none());
+    }
+
+    return starvation;
+  }
+
+  /**
+   * Helper method for tests to check if a queue is starved for minShare.
+   * @return whether starved for minshare
+   */
+  @VisibleForTesting
+  private boolean isStarvedForMinShare() {
+    return !Resources.isNone(minShareStarvation());
+  }
+
+  /**
+   * Helper method for tests to check if a queue is starved for fairshare.
+   * @return whether starved for fairshare
+   */
+  @VisibleForTesting
+  private boolean isStarvedForFairShare() {
+    for (FSAppAttempt app : runnableApps) {
+      if (app.isStarvedForFairShare()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Helper method for tests to check if a queue is starved.
+   * @return whether starved for either minshare or fairshare
+   */
+  @VisibleForTesting
+  boolean isStarved() {
+    return isStarvedForMinShare() || isStarvedForFairShare();
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index 53ac8c9..16570aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -61,7 +60,7 @@
     super(name, scheduler, parent);
   }
   
-  public void addChildQueue(FSQueue child) {
+  void addChildQueue(FSQueue child) {
     writeLock.lock();
     try {
       childQueues.add(child);
@@ -70,7 +69,7 @@
     }
   }
 
-  public void removeChildQueue(FSQueue child) {
+  void removeChildQueue(FSQueue child) {
     writeLock.lock();
     try {
       childQueues.remove(child);
@@ -80,20 +79,20 @@
   }
 
   @Override
-  public void recomputeShares() {
+  public void updateInternal(boolean checkStarvation) {
     readLock.lock();
     try {
       policy.computeShares(childQueues, getFairShare());
       for (FSQueue childQueue : childQueues) {
         childQueue.getMetrics().setFairShare(childQueue.getFairShare());
-        childQueue.recomputeShares();
+        childQueue.updateInternal(checkStarvation);
       }
     } finally {
       readLock.unlock();
     }
   }
 
-  public void recomputeSteadyShares() {
+  void recomputeSteadyShares() {
     readLock.lock();
     try {
       policy.computeSteadyShares(childQueues, getSteadyFairShare());
@@ -188,7 +187,7 @@
   
   @Override
   public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
-    List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
+    List<QueueUserACLInfo> userAcls = new ArrayList<>();
     
     // Add queue acls
     userAcls.add(getUserAclInfo(user));
@@ -246,39 +245,6 @@
   }
 
   @Override
-  public RMContainer preemptContainer() {
-    RMContainer toBePreempted = null;
-
-    // Find the childQueue which is most over fair share
-    FSQueue candidateQueue = null;
-    Comparator<Schedulable> comparator = policy.getComparator();
-
-    readLock.lock();
-    try {
-      for (FSQueue queue : childQueues) {
-        // Skip selection for non-preemptable queue
-        if (!queue.isPreemptable()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("skipping from queue=" + getName()
-                + " because it's a non-preemptable queue");
-          }
-        } else if (candidateQueue == null ||
-                  comparator.compare(queue, candidateQueue) > 0) {
-          candidateQueue = queue;
-        }
-      }
-    } finally {
-      readLock.unlock();
-    }
-
-    // Let the selected queue choose which of its container to preempt
-    if (candidateQueue != null) {
-      toBePreempted = candidateQueue.preemptContainer();
-    }
-    return toBePreempted;
-  }
-
-  @Override
   public List<FSQueue> getChildQueues() {
     readLock.lock();
     try {
@@ -300,8 +266,8 @@
     }
     super.policy = policy;
   }
-  
-  public void incrementRunnableApps() {
+
+  void incrementRunnableApps() {
     writeLock.lock();
     try {
       runnableApps++;
@@ -310,7 +276,7 @@
     }
   }
   
-  public void decrementRunnableApps() {
+  void decrementRunnableApps() {
     writeLock.lock();
     try {
       runnableApps--;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
new file mode 100644
index 0000000..3579857
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Thread that handles FairScheduler preemption.
+ */
+class FSPreemptionThread extends Thread {
+  private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class);
+  protected final FSContext context;
+  private final FairScheduler scheduler;
+  private final long warnTimeBeforeKill;
+  private final Timer preemptionTimer;
+
+  FSPreemptionThread(FairScheduler scheduler) {
+    this.scheduler = scheduler;
+    this.context = scheduler.getContext();
+    FairSchedulerConfiguration fsConf = scheduler.getConf();
+    context.setPreemptionEnabled();
+    context.setPreemptionUtilizationThreshold(
+        fsConf.getPreemptionUtilizationThreshold());
+    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
+    preemptionTimer = new Timer("Preemption Timer", true);
+
+    setDaemon(true);
+    setName("FSPreemptionThread");
+  }
+
+  public void run() {
+    while (!Thread.interrupted()) {
+      FSAppAttempt starvedApp;
+      try{
+        starvedApp = context.getStarvedApps().take();
+        if (!Resources.isNone(starvedApp.getStarvation())) {
+          List<RMContainer> containers =
+              identifyContainersToPreempt(starvedApp);
+          if (containers != null) {
+            preemptContainers(containers);
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.info("Preemption thread interrupted! Exiting.");
+        return;
+      }
+    }
+  }
+
+  /**
+   * Given an app, identify containers to preempt to satisfy the app's next
+   * resource request.
+   *
+   * @param starvedApp starved application for which we are identifying
+   *                   preemption targets
+   * @return list of containers to preempt to satisfy starvedApp, null if the
+   * app cannot be satisfied by preempting any running containers
+   */
+  private List<RMContainer> identifyContainersToPreempt(
+      FSAppAttempt starvedApp) {
+    List<RMContainer> containers = new ArrayList<>(); // return value
+
+    // Find the nodes that match the next resource request
+    ResourceRequest request = starvedApp.getNextResourceRequest();
+    // TODO (KK): Should we check other resource requests if we can't match
+    // the first one?
+
+    Resource requestCapability = request.getCapability();
+    List<FSSchedulerNode> potentialNodes =
+        scheduler.getNodeTracker().getNodesByResourceName(
+            request.getResourceName());
+
+    // From the potential nodes, pick a node that has enough containers
+    // from apps over their fairshare
+    for (FSSchedulerNode node : potentialNodes) {
+      // Reset containers for the new node being considered.
+      containers.clear();
+
+      // TODO (YARN-5829): Attempt to reserve the node for starved app. The
+      // subsequent if-check needs to be reworked accordingly.
+      FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
+      if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
+        // This node is already reserved by another app. Let us not consider
+        // this for preemption.
+        continue;
+      }
+
+      // Figure out list of containers to consider
+      List<RMContainer> containersToCheck =
+          node.getCopiedListOfRunningContainers();
+      containersToCheck.removeAll(node.getContainersForPreemption());
+
+      // Initialize potential with unallocated resources
+      Resource potential = Resources.clone(node.getUnallocatedResource());
+      for (RMContainer container : containersToCheck) {
+        FSAppAttempt app =
+            scheduler.getSchedulerApp(container.getApplicationAttemptId());
+
+        if (app.canContainerBePreempted(container)) {
+          // Flag container for preemption
+          containers.add(container);
+          Resources.addTo(potential, container.getAllocatedResource());
+        }
+
+        // Check if we have already identified enough containers
+        if (Resources.fitsIn(requestCapability, potential)) {
+          // Mark the containers as being considered for preemption on the node.
+          // Make sure the containers are subsequently removed by calling
+          // FSSchedulerNode#removeContainerForPreemption.
+          node.addContainersForPreemption(containers);
+          return containers;
+        } else {
+          // TODO (YARN-5829): Unreserve the node for the starved app.
+        }
+      }
+    }
+    return null;
+  }
+
+  private void preemptContainers(List<RMContainer> containers) {
+    // Warn application about containers to be killed
+    for (RMContainer container : containers) {
+      ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
+      FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
+      FSLeafQueue queue = app.getQueue();
+      LOG.info("Preempting container " + container +
+          " from queue " + queue.getName());
+      app.trackContainerForPreemption(container);
+    }
+
+    // Schedule timer task to kill containers
+    preemptionTimer.schedule(
+        new PreemptContainersTask(containers), warnTimeBeforeKill);
+  }
+
+  private class PreemptContainersTask extends TimerTask {
+    private List<RMContainer> containers;
+
+    PreemptContainersTask(List<RMContainer> containers) {
+      this.containers = containers;
+    }
+
+    @Override
+    public void run() {
+      for (RMContainer container : containers) {
+        ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus(
+            container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
+
+        LOG.info("Killing container " + container);
+        scheduler.completedContainer(
+            container, status, RMContainerEventType.KILL);
+
+        FSSchedulerNode containerNode = (FSSchedulerNode)
+            scheduler.getNodeTracker().getNode(container.getAllocatedNode());
+        containerNode.removeContainerForPreemption(container);
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 5fa2ee1..572b5f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -253,7 +253,7 @@
     return steadyFairShare;
   }
 
-  public void setSteadyFairShare(Resource steadyFairShare) {
+  void setSteadyFairShare(Resource steadyFairShare) {
     this.steadyFairShare = steadyFairShare;
     metrics.setSteadyFairShare(steadyFairShare);
   }
@@ -262,27 +262,27 @@
     return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
   }
 
-  public long getFairSharePreemptionTimeout() {
+  long getFairSharePreemptionTimeout() {
     return fairSharePreemptionTimeout;
   }
 
-  public void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
+  void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
     this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
   }
 
-  public long getMinSharePreemptionTimeout() {
+  long getMinSharePreemptionTimeout() {
     return minSharePreemptionTimeout;
   }
 
-  public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
+  void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
     this.minSharePreemptionTimeout = minSharePreemptionTimeout;
   }
 
-  public float getFairSharePreemptionThreshold() {
+  float getFairSharePreemptionThreshold() {
     return fairSharePreemptionThreshold;
   }
 
-  public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
+  void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
     this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
   }
 
@@ -292,9 +292,17 @@
 
   /**
    * Recomputes the shares for all child queues and applications based on this
-   * queue's current share
+   * queue's current share, and checks for starvation.
+   *
+   * @param checkStarvation whether to check for fairshare or minshare
+   *                        starvation on update
    */
-  public abstract void recomputeShares();
+  abstract void updateInternal(boolean checkStarvation);
+
+  public void update(Resource fairShare, boolean checkStarvation) {
+    setFairShare(fairShare);
+    updateInternal(checkStarvation);
+  }
 
   /**
    * Update the min/fair share preemption timeouts, threshold and preemption
@@ -347,7 +355,7 @@
    * 
    * @return true if check passes (can assign) or false otherwise
    */
-  protected boolean assignContainerPreCheck(FSSchedulerNode node) {
+  boolean assignContainerPreCheck(FSSchedulerNode node) {
     if (!Resources.fitsIn(getResourceUsage(), maxShare)
         || node.getReservedContainer() != null) {
       return false;
@@ -403,7 +411,7 @@
     return null;
   }
 
-  public boolean fitsInMaxShare(Resource additionalResource) {
+  boolean fitsInMaxShare(Resource additionalResource) {
     Resource usagePlusAddition =
         Resources.add(getResourceUsage(), additionalResource);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 024ec67..a27a222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -29,6 +29,10 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
 @Private
 @Unstable
 public class FSSchedulerNode extends SchedulerNode {
@@ -36,6 +40,8 @@
   private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
 
   private FSAppAttempt reservedAppSchedulable;
+  private final Set<RMContainer> containersForPreemption =
+      new ConcurrentSkipListSet<>();
 
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     super(node, usePortForNodeName);
@@ -99,8 +105,36 @@
     this.reservedAppSchedulable = null;
   }
 
-  public synchronized FSAppAttempt getReservedAppSchedulable() {
+  synchronized FSAppAttempt getReservedAppSchedulable() {
     return reservedAppSchedulable;
   }
 
+  /**
+   * Mark {@code containers} as being considered for preemption so they are
+   * not considered again. A call to this requires a corresponding call to
+   * {@link #removeContainerForPreemption} to ensure we do not mark a
+   * container for preemption and never consider it again and avoid memory
+   * leaks.
+   *
+   * @param containers container to mark
+   */
+  void addContainersForPreemption(Collection<RMContainer> containers) {
+    containersForPreemption.addAll(containers);
+  }
+
+  /**
+   * @return set of containers marked for preemption.
+   */
+  Set<RMContainer> getContainersForPreemption() {
+    return containersForPreemption;
+  }
+
+  /**
+   * Remove container from the set of containers marked for preemption.
+   *
+   * @param container container to remove
+   */
+  void removeContainerForPreemption(RMContainer container) {
+    containersForPreemption.remove(container);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
new file mode 100644
index 0000000..4f28e41
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Helper class to track starved applications.
+ *
+ * Initially, this uses a blocking queue. We could use other data structures
+ * in the future. This class also has some methods to simplify testing.
+ */
+class FSStarvedApps {
+
+  // List of apps to be processed by the preemption thread.
+  private PriorityBlockingQueue<FSAppAttempt> appsToProcess;
+
+  // App being currently processed. This assumes a single reader.
+  private FSAppAttempt appBeingProcessed;
+
+  FSStarvedApps() {
+    appsToProcess = new PriorityBlockingQueue<>(10, new StarvationComparator());
+  }
+
+  /**
+   * Add a starved application if it is not already added.
+   * @param app application to add
+   */
+  void addStarvedApp(FSAppAttempt app) {
+    if (!app.equals(appBeingProcessed) && !appsToProcess.contains(app)) {
+      appsToProcess.add(app);
+    }
+  }
+
+  /**
+   * Blocking call to fetch the next app to process. The returned app is
+   * tracked until the next call to this method. This tracking assumes a
+   * single reader.
+   *
+   * @return starved application to process
+   * @throws InterruptedException if interrupted while waiting
+   */
+  FSAppAttempt take() throws InterruptedException {
+    // Reset appBeingProcessed before the blocking call
+    appBeingProcessed = null;
+
+    // Blocking call to fetch the next starved application
+    FSAppAttempt app = appsToProcess.take();
+    appBeingProcessed = app;
+    return app;
+  }
+
+  private static class StarvationComparator implements
+      Comparator<FSAppAttempt>, Serializable {
+    private static final long serialVersionUID = 1;
+
+    @Override
+    public int compare(FSAppAttempt app1, FSAppAttempt app2) {
+      int ret = 1;
+      if (Resources.fitsIn(app1.getStarvation(), app2.getStarvation())) {
+        ret = -1;
+      }
+      return ret;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 1d04710..571f2e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -24,7 +24,6 @@
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -123,6 +122,7 @@
     AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> {
   private FairSchedulerConfiguration conf;
 
+  private FSContext context;
   private Resource incrAllocation;
   private QueueManager queueMgr;
   private boolean usePortForNodeName;
@@ -150,6 +150,9 @@
 
   @VisibleForTesting
   Thread schedulingThread;
+
+  Thread preemptionThread;
+
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
@@ -157,25 +160,6 @@
   FSQueueMetrics rootMetrics;
   FSOpDurations fsOpDurations;
 
-  // Time when we last updated preemption vars
-  protected long lastPreemptionUpdateTime;
-  // Time we last ran preemptTasksIfNecessary
-  private long lastPreemptCheckTime;
-
-  // Preemption related variables
-  protected boolean preemptionEnabled;
-  protected float preemptionUtilizationThreshold;
-
-  // How often tasks are preempted
-  protected long preemptionInterval; 
-  
-  // ms to wait before force killing stuff (must be longer than a couple
-  // of heartbeats to give task-kill commands a chance to act).
-  protected long waitTimeBeforeKill; 
-  
-  // Containers whose AMs have been warned that they will be preempted soon.
-  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
-
   private float reservableNodesRatio; // percentage of available nodes
                                       // an app can be reserved on
 
@@ -211,11 +195,17 @@
 
   public FairScheduler() {
     super(FairScheduler.class.getName());
+    context = new FSContext();
     allocsLoader = new AllocationFileLoaderService();
     queueMgr = new QueueManager(this);
     maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
   }
 
+  @VisibleForTesting
+  public FSContext getContext() {
+    return context;
+  }
+
   public boolean isAtLeastReservationThreshold(
       ResourceCalculator resourceCalculator, Resource resource) {
     return Resources.greaterThanOrEqual(resourceCalculator,
@@ -296,7 +286,6 @@
           }
           long start = getClock().getTime();
           update();
-          preemptTasksIfNecessary();
           long duration = getClock().getTime() - start;
           fsOpDurations.addUpdateThreadRunDuration(duration);
         } catch (InterruptedException ie) {
@@ -338,7 +327,6 @@
     try {
       writeLock.lock();
       long start = getClock().getTime();
-      updateStarvationStats(); // Determine if any queues merit preemption
 
       FSQueue rootQueue = queueMgr.getRootQueue();
 
@@ -346,214 +334,30 @@
       rootQueue.updateDemand();
 
       Resource clusterResource = getClusterResource();
-      rootQueue.setFairShare(clusterResource);
-      // Recursively compute fair shares for all queues
-      // and update metrics
-      rootQueue.recomputeShares();
+      rootQueue.update(clusterResource, shouldAttemptPreemption());
+
+      // Update metrics
       updateRootQueueMetrics();
 
       if (LOG.isDebugEnabled()) {
         if (--updatesToSkipForDebug < 0) {
           updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
-          LOG.debug("Cluster Capacity: " + clusterResource + "  Allocations: "
-              + rootMetrics.getAllocatedResources() + "  Availability: "
-              + Resource.newInstance(rootMetrics.getAvailableMB(),
-              rootMetrics.getAvailableVirtualCores()) + "  Demand: " + rootQueue
-              .getDemand());
+          LOG.debug("Cluster Capacity: " + clusterResource +
+              "  Allocations: " + rootMetrics.getAllocatedResources() +
+              "  Availability: " + Resource.newInstance(
+              rootMetrics.getAvailableMB(),
+              rootMetrics.getAvailableVirtualCores()) +
+              "  Demand: " + rootQueue.getDemand());
         }
-      }
 
-      long duration = getClock().getTime() - start;
-      fsOpDurations.addUpdateCallDuration(duration);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  /**
-   * Update the preemption fields for all QueueScheduables, i.e. the times since
-   * each queue last was at its guaranteed share and over its fair share
-   * threshold for each type of task.
-   */
-  private void updateStarvationStats() {
-    lastPreemptionUpdateTime = getClock().getTime();
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      sched.updateStarvationStats();
-    }
-  }
-
-  /**
-   * Check for queues that need tasks preempted, either because they have been
-   * below their guaranteed share for minSharePreemptionTimeout or they have
-   * been below their fair share threshold for the fairSharePreemptionTimeout. If
-   * such queues exist, compute how many tasks of each type need to be preempted
-   * and then select the right ones using preemptTasks.
-   */
-  protected void preemptTasksIfNecessary() {
-    try {
-      writeLock.lock();
-      if (!shouldAttemptPreemption()) {
-        return;
-      }
-
-      long curTime = getClock().getTime();
-      if (curTime - lastPreemptCheckTime < preemptionInterval) {
-        return;
-      }
-      lastPreemptCheckTime = curTime;
-
-      Resource resToPreempt = Resources.clone(Resources.none());
-      for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-        Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
-      }
-      if (isResourceGreaterThanNone(resToPreempt)) {
-        preemptResources(resToPreempt);
+        long duration = getClock().getTime() - start;
+        fsOpDurations.addUpdateCallDuration(duration);
       }
     } finally {
       writeLock.unlock();
     }
   }
 
-  /**
-   * Preempt a quantity of resources. Each round, we start from the root queue,
-   * level-by-level, until choosing a candidate application.
-   * The policy for prioritizing preemption for each queue depends on its
-   * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is
-   * most over its fair share; (2) FIFO, choose the childSchedulable that is
-   * latest launched.
-   * Inside each application, we further prioritize preemption by choosing
-   * containers with lowest priority to preempt.
-   * We make sure that no queue is placed below its fair share in the process.
-   */
-  protected void preemptResources(Resource toPreempt) {
-    long start = getClock().getTime();
-    if (Resources.equals(toPreempt, Resources.none())) {
-      return;
-    }
-
-    // Scan down the list of containers we've already warned and kill them
-    // if we need to.  Remove any containers from the list that we don't need
-    // or that are no longer running.
-    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
-    while (warnedIter.hasNext()) {
-      RMContainer container = warnedIter.next();
-      if ((container.getState() == RMContainerState.RUNNING ||
-              container.getState() == RMContainerState.ALLOCATED) &&
-              isResourceGreaterThanNone(toPreempt)) {
-        warnOrKillContainer(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
-      } else {
-        warnedIter.remove();
-      }
-    }
-
-    try {
-      // Reset preemptedResource for each app
-      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
-        queue.resetPreemptedResources();
-      }
-
-      while (isResourceGreaterThanNone(toPreempt)) {
-        RMContainer container =
-            getQueueManager().getRootQueue().preemptContainer();
-        if (container == null) {
-          break;
-        } else {
-          warnOrKillContainer(container);
-          warnedContainers.add(container);
-          Resources.subtractFrom(
-              toPreempt, container.getContainer().getResource());
-        }
-      }
-    } finally {
-      // Clear preemptedResources for each app
-      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
-        queue.clearPreemptedResources();
-      }
-    }
-
-    long duration = getClock().getTime() - start;
-    fsOpDurations.addPreemptCallDuration(duration);
-  }
-
-  private boolean isResourceGreaterThanNone(Resource toPreempt) {
-    return (toPreempt.getMemorySize() > 0) || (toPreempt.getVirtualCores() > 0);
-  }
-
-  protected void warnOrKillContainer(RMContainer container) {
-    ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
-    FSAppAttempt app = getSchedulerApp(appAttemptId);
-    FSLeafQueue queue = app.getQueue();
-    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-        "res=" + container.getContainer().getResource() +
-        ") from queue " + queue.getName());
-    
-    Long time = app.getContainerPreemptionTime(container);
-
-    if (time != null) {
-      // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
-      // proceed with kill
-      if (time + waitTimeBeforeKill < getClock().getTime()) {
-        ContainerStatus status =
-          SchedulerUtils.createPreemptedContainerStatus(
-            container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
-
-        // TODO: Not sure if this ever actually adds this to the list of cleanup
-        // containers on the RMNode (see SchedulerNode.releaseContainer()).
-        super.completedContainer(container, status, RMContainerEventType.KILL);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Killing container" + container +
-                  " (after waiting for preemption for " +
-                  (getClock().getTime() - time) + "ms)");
-        }
-      }
-    } else {
-      // track the request in the FSAppAttempt itself
-      app.addPreemption(container, getClock().getTime());
-    }
-  }
-
-  /**
-   * Return the resource amount that this queue is allowed to preempt, if any.
-   * If the queue has been below its min share for at least its preemption
-   * timeout, it should preempt the difference between its current share and
-   * this min share. If it has been below its fair share preemption threshold
-   * for at least the fairSharePreemptionTimeout, it should preempt enough tasks
-   * to get up to its full fair share. If both conditions hold, we preempt the
-   * max of the two amounts (this shouldn't happen unless someone sets the
-   * timeouts to be identical for some reason).
-   */
-  protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
-    long minShareTimeout = sched.getMinSharePreemptionTimeout();
-    long fairShareTimeout = sched.getFairSharePreemptionTimeout();
-    Resource resDueToMinShare = Resources.none();
-    Resource resDueToFairShare = Resources.none();
-    ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
-    Resource clusterResource = getClusterResource();
-    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
-      Resource target = Resources.componentwiseMin(
-          sched.getMinShare(), sched.getDemand());
-      resDueToMinShare = Resources.max(calc, clusterResource,
-          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
-    }
-    if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
-      Resource target = Resources.componentwiseMin(
-              sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(calc, clusterResource,
-          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
-    }
-    Resource deficit = Resources.max(calc, clusterResource,
-        resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(calc, clusterResource,
-        deficit, Resources.none())) {
-      String message = "Should preempt " + deficit + " res for queue "
-          + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
-          + ", resDueToFairShare = " + resDueToFairShare;
-      LOG.info(message);
-    }
-    return deficit;
-  }
-
   public RMContainerTokenSecretManager
       getContainerTokenSecretManager() {
     return rmContext.getContainerTokenSecretManager();
@@ -1197,12 +1001,12 @@
    * @return true if preemption should be attempted, false otherwise.
    */
   private boolean shouldAttemptPreemption() {
-    if (preemptionEnabled) {
-      Resource clusterResource = getClusterResource();
-      return (preemptionUtilizationThreshold < Math.max(
-          (float) rootMetrics.getAllocatedMB() / clusterResource.getMemorySize(),
+    if (context.isPreemptionEnabled()) {
+      return (context.getPreemptionUtilizationThreshold() < Math.max(
+          (float) rootMetrics.getAllocatedMB() /
+              getClusterResource().getMemorySize(),
           (float) rootMetrics.getAllocatedVirtualCores() /
-              clusterResource.getVirtualCores()));
+              getClusterResource().getVirtualCores()));
     }
     return false;
   }
@@ -1390,15 +1194,10 @@
       rackLocalityThreshold = this.conf.getLocalityThresholdRack();
       nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
       rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
-      preemptionUtilizationThreshold =
-          this.conf.getPreemptionUtilizationThreshold();
       assignMultiple = this.conf.getAssignMultiple();
       maxAssignDynamic = this.conf.isMaxAssignDynamic();
       maxAssign = this.conf.getMaxAssign();
       sizeBasedWeight = this.conf.getSizeBasedWeight();
-      preemptionInterval = this.conf.getPreemptionInterval();
-      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
       usePortForNodeName = this.conf.getUsePortForNodeName();
       reservableNodesRatio = this.conf.getReservableNodes();
 
@@ -1436,6 +1235,10 @@
         schedulingThread.setName("FairSchedulerContinuousScheduling");
         schedulingThread.setDaemon(true);
       }
+
+      if (this.conf.getPreemptionEnabled()) {
+        createPreemptionThread();
+      }
     } finally {
       writeLock.unlock();
     }
@@ -1452,6 +1255,11 @@
     }
   }
 
+  @VisibleForTesting
+  protected void createPreemptionThread() {
+    preemptionThread = new FSPreemptionThread(this);
+  }
+
   private void updateReservationThreshold() {
     Resource newThreshold = Resources.multiply(
         getIncrementResourceCapability(),
@@ -1471,6 +1279,9 @@
             "schedulingThread is null");
         schedulingThread.start();
       }
+      if (preemptionThread != null) {
+        preemptionThread.start();
+      }
       allocsLoader.start();
     } finally {
       writeLock.unlock();
@@ -1503,6 +1314,10 @@
           schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
         }
       }
+      if (preemptionThread != null) {
+        preemptionThread.interrupt();
+        preemptionThread.join(THREAD_JOIN_TIMEOUT_MS);
+      }
       if (allocsLoader != null) {
         allocsLoader.stop();
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
index 289887f..cf78405 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
@@ -55,50 +55,45 @@
    * Name of job/queue, used for debugging as well as for breaking ties in
    * scheduling order deterministically.
    */
-  public String getName();
+  String getName();
 
   /**
    * Maximum number of resources required by this Schedulable. This is defined as
    * number of currently utilized resources + number of unlaunched resources (that
    * are either not yet launched or need to be speculated).
    */
-  public Resource getDemand();
+  Resource getDemand();
 
   /** Get the aggregate amount of resources consumed by the schedulable. */
-  public Resource getResourceUsage();
+  Resource getResourceUsage();
 
   /** Minimum Resource share assigned to the schedulable. */
-  public Resource getMinShare();
+  Resource getMinShare();
 
   /** Maximum Resource share assigned to the schedulable. */
-  public Resource getMaxShare();
+  Resource getMaxShare();
 
   /** Job/queue weight in fair sharing. */
-  public ResourceWeights getWeights();
+  ResourceWeights getWeights();
 
   /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
-  public long getStartTime();
+  long getStartTime();
 
  /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
-  public Priority getPriority();
+  Priority getPriority();
 
   /** Refresh the Schedulable's demand and those of its children if any. */
-  public void updateDemand();
+  void updateDemand();
 
   /**
    * Assign a container on this node if possible, and return the amount of
    * resources assigned.
    */
-  public Resource assignContainer(FSSchedulerNode node);
-
-  /**
-   * Preempt a container from this Schedulable if possible.
-   */
-  public RMContainer preemptContainer();
+  Resource assignContainer(FSSchedulerNode node);
 
   /** Get the fair share assigned to this Schedulable. */
-  public Resource getFairShare();
+  Resource getFairShare();
 
   /** Assign a fair share to this Schedulable. */
-  public void setFairShare(Resource fairShare);
+  void setFairShare(Resource fairShare);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 8e6272a..992b75d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -17,14 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import org.junit.Assert;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,7 +31,9 @@
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -50,9 +44,17 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import org.junit.Assert;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
 public class FairSchedulerTestBase {
   public final static String TEST_DIR =
       new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
@@ -70,9 +72,14 @@
   private static final int SLEEP_DURATION = 10;
   private static final int SLEEP_RETRIES = 1000;
 
+  /**
+   * The list of nodes added to the cluster using the {@link #addNode} method.
+   */
+  protected final List<RMNode> rmNodes = new ArrayList<>();
+
   // Helper methods
   public Configuration createConfiguration() {
-    Configuration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
         ResourceScheduler.class);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
@@ -280,4 +287,18 @@
     Assert.assertEquals(resource.getVirtualCores(),
         app.getCurrentConsumption().getVirtualCores());
   }
+
+  /**
+   * Add a node to the cluster and track the nodes in {@link #rmNodes}.
+   * @param memory memory capacity of the node
+   * @param cores cpu capacity of the node
+   */
+  protected void addNode(int memory, int cores) {
+    int id = rmNodes.size() + 1;
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id,
+            "127.0.0." + id);
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+    rmNodes.add(node);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
new file mode 100644
index 0000000..25780cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class FairSchedulerWithMockPreemption extends FairScheduler {
+  @Override
+  protected void createPreemptionThread() {
+    preemptionThread = new MockPreemptionThread(this);
+  }
+
+  static class MockPreemptionThread extends FSPreemptionThread {
+    private Set<FSAppAttempt> appsAdded = new HashSet<>();
+    private int totalAppsAdded = 0;
+
+    MockPreemptionThread(FairScheduler scheduler) {
+      super(scheduler);
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          FSAppAttempt app = context.getStarvedApps().take();
+          appsAdded.add(app);
+          totalAppsAdded++;
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+
+    int uniqueAppsAdded() {
+      return appsAdded.size();
+    }
+
+    int totalAppsAdded() {
+      return totalAppsAdded;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
index 5a170cf..e802f42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
@@ -86,11 +86,6 @@
   }
 
   @Override
-  public RMContainer preemptContainer() {
-    return null;
-  }
-
-  @Override
   public Resource getFairShare() {
     return this.fairShare;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
new file mode 100644
index 0000000..a5b2d86
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Test class to verify identification of app starvation
+ */
+public class TestFSAppStarvation extends FairSchedulerTestBase {
+
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
+
+  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+  private static final int NODE_CAPACITY_MULTIPLE = 4;
+  private static final String[] QUEUES =
+      {"no-preemption", "minshare", "fairshare.child", "drf.child"};
+
+  private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread;
+
+  @Before
+  public void setup() {
+    createConfiguration();
+    conf.set(YarnConfiguration.RM_SCHEDULER,
+        FairSchedulerWithMockPreemption.class.getCanonicalName());
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+  }
+
+  @After
+  public void teardown() {
+    ALLOC_FILE.delete();
+    conf = null;
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  /*
+   * Test to verify application starvation is computed only when preemption
+   * is enabled.
+   */
+  @Test
+  public void testPreemptionDisabled() throws Exception {
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
+
+    setupClusterAndSubmitJobs();
+
+    assertNull("Found starved apps even when preemption is turned off",
+        scheduler.getContext().getStarvedApps());
+  }
+
+  /*
+   * Test to verify application starvation is computed correctly when
+   * preemption is turned on.
+   */
+  @Test
+  public void testPreemptionEnabled() throws Exception {
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Expecting 3 starved applications, one each for the "
+            + "minshare and fairshare queues",
+        3, preemptionThread.uniqueAppsAdded());
+
+    // Verify the apps get added again on a subsequent update
+    scheduler.update();
+    Thread.yield();
+
+    verifyLeafQueueStarvation();
+    assertTrue("Each app is marked as starved exactly once",
+        preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
+  }
+
+  /*
+   * Test to verify app starvation is computed only when the cluster
+   * utilization threshold is over the preemption threshold.
+   */
+  @Test
+  public void testClusterUtilizationThreshold() throws Exception {
+    // Set preemption threshold to 1.1, so the utilization is always lower
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f);
+
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Found starved apps when preemption threshold is over 100%", 0,
+        preemptionThread.totalAppsAdded());
+  }
+
+  private void verifyLeafQueueStarvation() {
+    for (String q : QUEUES) {
+      if (!q.equals("no-preemption")) {
+        boolean isStarved =
+            scheduler.getQueueManager().getLeafQueue(q, false).isStarved();
+        assertTrue(isStarved);
+      }
+    }
+  }
+
+  private void setupClusterAndSubmitJobs() throws Exception {
+    setupStarvedCluster();
+    submitAppsToEachLeafQueue();
+    sendEnoughNodeUpdatesToAssignFully();
+
+    // Sleep to hit the preemption timeouts
+    Thread.sleep(10);
+
+    // Scheduler update to populate starved apps
+    scheduler.update();
+
+    // Wait for apps to be processed by MockPreemptionThread
+    Thread.yield();
+  }
+
+  /**
+   * Setup the cluster for starvation testing:
+   * 1. Create FS allocation file
+   * 2. Create and start MockRM
+   * 3. Add two nodes to the cluster
+   * 4. Submit an app that uses up all resources on the cluster
+   */
+  private void setupStarvedCluster() throws IOException {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+
+    // Default queue
+    out.println("<queue name=\"default\">");
+    out.println("</queue>");
+
+    // Queue with preemption disabled
+    out.println("<queue name=\"no-preemption\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("</queue>");
+
+    // Queue with minshare preemption enabled
+    out.println("<queue name=\"minshare\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<minSharePreemptionTimeout>0" +
+        "</minSharePreemptionTimeout>");
+    out.println("<minResources>2048mb,2vcores</minResources>");
+    out.println("</queue>");
+
+    // FAIR queue with fairshare preemption enabled
+    out.println("<queue name=\"fairshare\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("<schedulingPolicy>fair</schedulingPolicy>");
+    addChildQueue(out);
+    out.println("</queue>");
+
+    // DRF queue with fairshare preemption enabled
+    out.println("<queue name=\"drf\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("<schedulingPolicy>drf</schedulingPolicy>");
+    addChildQueue(out);
+    out.println("</queue>");
+
+    out.println("</allocations>");
+    out.close();
+
+    assertTrue("Allocation file does not exist, not running the test",
+        ALLOC_FILE.exists());
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
+        scheduler.preemptionThread;
+
+    // Create and add two nodes to the cluster
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+
+    // Create an app that takes up all the resources on the cluster
+    ApplicationAttemptId app
+        = createSchedulingRequest(1024, 1, "root.default", "default", 8);
+
+    scheduler.update();
+    sendEnoughNodeUpdatesToAssignFully();
+
+    assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
+  }
+
+  private void addChildQueue(PrintWriter out) {
+    // Child queue under fairshare with same settings
+    out.println("<queue name=\"child\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("</queue>");
+  }
+
+  private void submitAppsToEachLeafQueue() {
+    for (String queue : QUEUES) {
+      createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
+    }
+    scheduler.update();
+  }
+
+  private void sendEnoughNodeUpdatesToAssignFully() {
+    for (RMNode node : rmNodes) {
+      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+          new NodeUpdateSchedulerEvent(node);
+      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+        scheduler.handle(nodeUpdateSchedulerEvent);
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 0a2ce81..98de8db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -106,12 +105,8 @@
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
+    out.println("<queue name=\"queueA\"></queue>");
+    out.println("<queue name=\"queueB\"></queue>");
     out.println("</allocations>");
     out.close();
 
@@ -144,162 +139,6 @@
     scheduler.update();
     Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
     assertEquals(3, queues.size());
-
-    // Queue A should be above min share, B below.
-    FSLeafQueue queueA =
-        scheduler.getQueueManager().getLeafQueue("queueA", false);
-    FSLeafQueue queueB =
-        scheduler.getQueueManager().getLeafQueue("queueB", false);
-    assertFalse(queueA.isStarvedForMinShare());
-    assertTrue(queueB.isStarvedForMinShare());
-
-    // Node checks in again, should allocate for B
-    scheduler.handle(nodeEvent2);
-    // Now B should have min share ( = demand here)
-    assertFalse(queueB.isStarvedForMinShare());
-  }
-
-  @Test (timeout = 5000)
-  public void testIsStarvedForFairShare() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.2</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.8</weight>");
-    out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>");
-    out.println("<queue name=\"queueB1\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB2\">");
-    out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
-    out.println("</allocations>");
-    out.close();
-
-    resourceManager = new MockRM(conf);
-    resourceManager.start();
-    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
-            "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    scheduler.update();
-
-    // Queue A wants 4 * 1024. Node update gives this all to A
-    createSchedulingRequest(1 * 1024, "queueA", "user1", 4);
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    for (int i = 0; i < 4; i ++) {
-      scheduler.handle(nodeEvent2);
-    }
-
-    QueueManager queueMgr = scheduler.getQueueManager();
-    FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
-    assertEquals(4 * 1024, queueA.getResourceUsage().getMemorySize());
-
-    // Both queue B1 and queue B2 want 3 * 1024
-    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3);
-    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3);
-    scheduler.update();
-    for (int i = 0; i < 4; i ++) {
-      scheduler.handle(nodeEvent2);
-    }
-
-    FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false);
-    FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false);
-    assertEquals(2 * 1024, queueB1.getResourceUsage().getMemorySize());
-    assertEquals(2 * 1024, queueB2.getResourceUsage().getMemorySize());
-
-    // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share
-    // threshold is 1.6 * 1024
-    assertFalse(queueB1.isStarvedForFairShare());
-
-    // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share
-    // threshold is 2.4 * 1024
-    assertTrue(queueB2.isStarvedForFairShare());
-
-    // Node checks in again
-    scheduler.handle(nodeEvent2);
-    scheduler.handle(nodeEvent2);
-    assertEquals(3 * 1024, queueB1.getResourceUsage().getMemorySize());
-    assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize());
-
-    // Both queue B1 and queue B2 usages go to 3 * 1024
-    assertFalse(queueB1.isStarvedForFairShare());
-    assertFalse(queueB2.isStarvedForFairShare());
-  }
-
-  @Test (timeout = 5000)
-  public void testIsStarvedForFairShareDRF() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.5</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.5</weight>");
-    out.println("</queue>");
-    out.println("<defaultFairSharePreemptionThreshold>1</defaultFairSharePreemptionThreshold>");
-    out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
-    out.println("</allocations>");
-    out.close();
-
-    resourceManager = new MockRM(conf);
-    resourceManager.start();
-    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-            MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
-                    "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    scheduler.update();
-
-    // Queue A wants 7 * 1024, 1. Node update gives this all to A
-    createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1);
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(nodeEvent2);
-
-    QueueManager queueMgr = scheduler.getQueueManager();
-    FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
-    assertEquals(7 * 1024, queueA.getResourceUsage().getMemorySize());
-    assertEquals(1, queueA.getResourceUsage().getVirtualCores());
-
-    // Queue B has 3 reqs :
-    // 1) 2 * 1024, 5 .. which will be granted
-    // 2) 1 * 1024, 1 .. which will be granted
-    // 3) 1 * 1024, 1 .. which wont
-    createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1);
-    createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2);
-    scheduler.update();
-    for (int i = 0; i < 3; i ++) {
-      scheduler.handle(nodeEvent2);
-    }
-
-    FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false);
-    assertEquals(3 * 1024, queueB.getResourceUsage().getMemorySize());
-    assertEquals(6, queueB.getResourceUsage().getVirtualCores());
-
-    scheduler.update();
-
-    // Verify that Queue us not starved for fair share..
-    // Since the Starvation logic now uses DRF when the policy = drf, The
-    // Queue should not be starved
-    assertFalse(queueB.isStarvedForFairShare());
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 2cbe507..36ee685 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -17,1467 +17,259 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .TestUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.util.ControlledClock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
 import org.junit.After;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.Collection;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+/**
+ * Tests to verify fairshare and minshare preemption, using parameterization.
+ */
+@RunWith(Parameterized.class)
 public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
-  private final static String ALLOC_FILE = new File(TEST_DIR,
-      TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
 
-  private ControlledClock clock;
+  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+  private static final int NODE_CAPACITY_MULTIPLE = 4;
 
-  private static class StubbedFairScheduler extends FairScheduler {
-    public long lastPreemptMemory = -1;
+  private final boolean fairsharePreemption;
 
-    @Override
-    protected void preemptResources(Resource toPreempt) {
-      lastPreemptMemory = toPreempt.getMemorySize();
-    }
+  // App that takes up the entire cluster
+  private FSAppAttempt greedyApp;
 
-    public void resetLastPreemptResources() {
-      lastPreemptMemory = -1;
-    }
+  // Starving app that is expected to instigate preemption
+  private FSAppAttempt starvingApp;
+
+  @Parameterized.Parameters
+  public static Collection<Boolean[]> getParameters() {
+    return Arrays.asList(new Boolean[][] {
+        {true}, {false}});
   }
 
-  public Configuration createConfiguration() {
-    Configuration conf = super.createConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class,
-        ResourceScheduler.class);
-    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    return conf;
+  public TestFairSchedulerPreemption(Boolean fairshare) throws IOException {
+    fairsharePreemption = fairshare;
+    writeAllocFile();
   }
 
   @Before
-  public void setup() throws IOException {
-    conf = createConfiguration();
-    clock = new ControlledClock();
+  public void setup() {
+    createConfiguration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+    conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
   }
 
   @After
   public void teardown() {
+    ALLOC_FILE.delete();
+    conf = null;
     if (resourceManager != null) {
       resourceManager.stop();
       resourceManager = null;
     }
-    conf = null;
   }
 
-  private void startResourceManagerWithStubbedFairScheduler(float utilizationThreshold) {
-    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
-        utilizationThreshold);
+  private void writeAllocFile() throws IOException {
+    /*
+     * Queue hierarchy:
+     * root
+     * |--- preemptable
+     *      |--- child-1
+     *      |--- child-2
+     * |--- nonpreemptible
+     *      |--- child-1
+     *      |--- child-2
+     */
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+
+    out.println("<queue name=\"preemptable\">");
+    writePreemptionParams(out);
+
+    // Child-1
+    out.println("<queue name=\"child-1\">");
+    writeResourceParams(out);
+    out.println("</queue>");
+
+    // Child-2
+    out.println("<queue name=\"child-2\">");
+    writeResourceParams(out);
+    out.println("</queue>");
+
+    out.println("</queue>"); // end of preemptable queue
+
+    // Queue with preemption disallowed
+    out.println("<queue name=\"nonpreemptable\">");
+    out.println("<allowPreemptionFrom>false" +
+        "</allowPreemptionFrom>");
+    writePreemptionParams(out);
+
+    // Child-1
+    out.println("<queue name=\"child-1\">");
+    writeResourceParams(out);
+    out.println("</queue>");
+
+    // Child-2
+    out.println("<queue name=\"child-2\">");
+    writeResourceParams(out);
+    out.println("</queue>");
+
+    out.println("</queue>"); // end of nonpreemptable queue
+
+    out.println("</allocations>");
+    out.close();
+
+    assertTrue("Allocation file does not exist, not running the test",
+        ALLOC_FILE.exists());
+  }
+
+  private void writePreemptionParams(PrintWriter out) {
+    if (fairsharePreemption) {
+      out.println("<fairSharePreemptionThreshold>1" +
+          "</fairSharePreemptionThreshold>");
+      out.println("<fairSharePreemptionTimeout>0" +
+          "</fairSharePreemptionTimeout>");
+    } else {
+      out.println("<minSharePreemptionTimeout>0" +
+          "</minSharePreemptionTimeout>");
+    }
+  }
+
+  private void writeResourceParams(PrintWriter out) {
+    if (!fairsharePreemption) {
+      out.println("<minResources>4096mb,4vcores</minResources>");
+    }
+  }
+
+  private void setupCluster() throws IOException {
     resourceManager = new MockRM(conf);
     resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
 
-    assertTrue(
-        resourceManager.getResourceScheduler() instanceof StubbedFairScheduler);
-    scheduler = (FairScheduler)resourceManager.getResourceScheduler();
-
-    scheduler.setClock(clock);
-    scheduler.updateInterval = 60 * 1000;
+    // Create and add two nodes to the cluster
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
   }
 
-  // YARN-4648: The starting code for ResourceManager mock is originated from
-  // TestFairScheduler. It should be keep as it was to guarantee no changing
-  // behaviour of ResourceManager preemption.
-  private void startResourceManagerWithRealFairScheduler() {
-    scheduler = new FairScheduler();
-    conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
-            ResourceScheduler.class);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
-    conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
-            1024);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
-    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
-    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
-    conf.setFloat(
-            FairSchedulerConfiguration
-                    .RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE,
-            TEST_RESERVATION_THRESHOLD);
-
-    resourceManager = new MockRM(conf);
-
-    // TODO: This test should really be using MockRM. For now starting stuff
-    // that is needed at a bare minimum.
-    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
-    resourceManager.getRMContext().getStateStore().start();
-
-    // to initialize the master key
-    resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
-
-    scheduler.setRMContext(resourceManager.getRMContext());
-  }
-
-  private void stopResourceManager() {
-    if (scheduler != null) {
-      scheduler.stop();
-      scheduler = null;
-    }
-    if (resourceManager != null) {
-      resourceManager.stop();
-      resourceManager = null;
-    }
-    QueueMetrics.clearQueueMetrics();
-    DefaultMetricsSystem.shutdown();
-  }
-
-  private void registerNodeAndSubmitApp(
-      int memory, int vcores, int appContainers, int appMemory) {
-    RMNode node1 = MockNodes.newNodeInfo(
-        1, Resources.createResource(memory, vcores), 1, "node1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    assertEquals("Incorrect amount of resources in the cluster",
-        memory, scheduler.rootMetrics.getAvailableMB());
-    assertEquals("Incorrect amount of resources in the cluster",
-        vcores, scheduler.rootMetrics.getAvailableVirtualCores());
-
-    createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
-    scheduler.update();
-    // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 3; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-      scheduler.handle(nodeUpdate1);
-    }
-    assertEquals("app1's request is not met",
-        memory - appContainers * appMemory,
-        scheduler.rootMetrics.getAvailableMB());
-  }
-
-  @Test
-  public void testPreemptionWithFreeResources() throws Exception {
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("<maxResources>0mb,0vcores</maxResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>1</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>1</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
-    out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
-    out.println("</allocations>");
-    out.close();
-
-    startResourceManagerWithStubbedFairScheduler(0f);
-    // Create node with 4GB memory and 4 vcores
-    registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
-
-    // Verify submitting another request triggers preemption
-    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
-    scheduler.update();
-    clock.tickSec(6);
-
-    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
-    scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should have been called", 1024,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
-
-    resourceManager.stop();
-
-    startResourceManagerWithStubbedFairScheduler(0.8f);
-    // Create node with 4GB memory and 4 vcores
-    registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
-
-    // Verify submitting another request doesn't trigger preemption
-    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
-    scheduler.update();
-    clock.tickSec(6);
-
-    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
-    scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should not have been called", -1,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
-
-    resourceManager.stop();
-
-    startResourceManagerWithStubbedFairScheduler(0.7f);
-    // Create node with 4GB memory and 4 vcores
-    registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
-
-    // Verify submitting another request triggers preemption
-    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
-    scheduler.update();
-    clock.tickSec(6);
-
-    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
-    scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should have been called", 1024,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
-  }
-
-  @Test (timeout = 5000)
-  /**
-   * Make sure containers are chosen to be preempted in the correct order.
-   */
-  public void testChoiceOfPreemptedContainers() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
-    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
-    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
-
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.25</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.25</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueC\">");
-    out.println("<weight>.25</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"default\">");
-    out.println("<weight>.25</weight>");
-    out.println("</queue>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Create two nodes
-    RMNode node1 =
-            MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
-                    "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    RMNode node2 =
-            MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
-                    "127.0.0.2");
-    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
-    scheduler.handle(nodeEvent2);
-
-    // Queue A and B each request two applications
-    ApplicationAttemptId app1 =
-            createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
-    createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
-    ApplicationAttemptId app2 =
-            createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
-    createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
-
-    ApplicationAttemptId app3 =
-            createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
-    createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
-    ApplicationAttemptId app4 =
-            createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
-    createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
-
-    scheduler.update();
-
-    scheduler.getQueueManager().getLeafQueue("queueA", true)
-            .setPolicy(SchedulingPolicy.parse("fifo"));
-    scheduler.getQueueManager().getLeafQueue("queueB", true)
-            .setPolicy(SchedulingPolicy.parse("fair"));
-
-    // Sufficient node check-ins to fully schedule containers
-    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-    NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
-    for (int i = 0; i < 4; i++) {
-      scheduler.handle(nodeUpdate1);
-      scheduler.handle(nodeUpdate2);
-    }
-
-    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-
-    // Now new requests arrive from queueC and default
-    createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
-    createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
-    createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
-    createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
-    scheduler.update();
-
-    // We should be able to claw back one container from queueA and queueB each.
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
-    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
-    // First verify we are adding containers to preemption list for the app.
-    // For queueA (fifo), app2 is selected.
-    // For queueB (fair), app4 is selected.
-    assertTrue("App2 should have container to be preempted",
-            !Collections.disjoint(
-                    scheduler.getSchedulerApp(app2).getLiveContainers(),
-                    scheduler.getSchedulerApp(app2).getPreemptionContainers()));
-    assertTrue("App4 should have container to be preempted",
-            !Collections.disjoint(
-                    scheduler.getSchedulerApp(app2).getLiveContainers(),
-                    scheduler.getSchedulerApp(app2).getPreemptionContainers()));
-
-    // Pretend 15 seconds have passed
-    clock.tickSec(15);
-
-    // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
-
-    // At this point the containers should have been killed (since we are not simulating AM)
-    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    // Inside each app, containers are sorted according to their priorities.
-    // Containers with priority 4 are preempted for app2 and app4.
-    Set<RMContainer> set = new HashSet<RMContainer>();
-    for (RMContainer container :
-            scheduler.getSchedulerApp(app2).getLiveContainers()) {
-      if (container.getAllocatedSchedulerKey().getPriority().getPriority() ==
-          4) {
-        set.add(container);
+  private void sendEnoughNodeUpdatesToAssignFully() {
+    for (RMNode node : rmNodes) {
+      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+          new NodeUpdateSchedulerEvent(node);
+      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+        scheduler.handle(nodeUpdateSchedulerEvent);
       }
     }
-    for (RMContainer container :
-            scheduler.getSchedulerApp(app4).getLiveContainers()) {
-      if (container.getAllocatedSchedulerKey().getPriority().getPriority() ==
-          4) {
-        set.add(container);
+  }
+
+  /**
+   * Submit application to {@code queue1} and take over the entire cluster.
+   * Submit application with larger containers to {@code queue2} that
+   * requires preemption from the first application.
+   *
+   * @param queue1 first queue
+   * @param queue2 second queue
+   * @throws InterruptedException if interrupted while waiting
+   */
+  private void submitApps(String queue1, String queue2)
+      throws InterruptedException {
+    // Create an app that takes up all the resources on the cluster
+    ApplicationAttemptId appAttemptId1
+        = createSchedulingRequest(1024, 1, queue1, "default",
+        NODE_CAPACITY_MULTIPLE * rmNodes.size());
+    greedyApp = scheduler.getSchedulerApp(appAttemptId1);
+    scheduler.update();
+    sendEnoughNodeUpdatesToAssignFully();
+    assertEquals(8, greedyApp.getLiveContainers().size());
+
+    // Create an app that takes up all the resources on the cluster
+    ApplicationAttemptId appAttemptId2
+        = createSchedulingRequest(2048, 2, queue2, "default",
+        NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
+    starvingApp = scheduler.getSchedulerApp(appAttemptId2);
+
+    // Sleep long enough to pass
+    Thread.sleep(10);
+
+    scheduler.update();
+  }
+
+  private void verifyPreemption() throws InterruptedException {
+    // Sleep long enough for four containers to be preempted. Note that the
+    // starved app must be queued four times for containers to be preempted.
+    for (int i = 0; i < 10000; i++) {
+      if (greedyApp.getLiveContainers().size() == 4) {
+        break;
       }
+      Thread.sleep(10);
     }
-    assertTrue("Containers with priority=4 in app2 and app4 should be " +
-            "preempted.", set.isEmpty());
 
-    // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    // Verify the right amount of containers are preempted from greedyApp
+    assertEquals(4, greedyApp.getLiveContainers().size());
 
-    // Pretend 15 seconds have passed
-    clock.tickSec(15);
+    sendEnoughNodeUpdatesToAssignFully();
 
-    // We should be able to claw back another container from A and B each.
-    // For queueA (fifo), continue preempting from app2.
-    // For queueB (fair), even app4 has a lowest priority container with p=4, it
-    // still preempts from app3 as app3 is most over fair share.
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    // Verify the preempted containers are assigned to starvingApp
+    assertEquals(2, starvingApp.getLiveContainers().size());
+  }
 
-    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-
-    // Now A and B are below fair share, so preemption shouldn't do anything
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
-    assertTrue("App1 should have no container to be preempted",
-            scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
-    assertTrue("App2 should have no container to be preempted",
-            scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
-    assertTrue("App3 should have no container to be preempted",
-            scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
-    assertTrue("App4 should have no container to be preempted",
-            scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
-    stopResourceManager();
+  private void verifyNoPreemption() throws InterruptedException {
+    // Sleep long enough to ensure not even one container is preempted.
+    for (int i = 0; i < 600; i++) {
+      if (greedyApp.getLiveContainers().size() != 8) {
+        break;
+      }
+      Thread.sleep(10);
+    }
+    assertEquals(8, greedyApp.getLiveContainers().size());
   }
 
   @Test
-  public void testPreemptionIsNotDelayedToNextRound() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-
-    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
-    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
-
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>8</weight>");
-    out.println("<queue name=\"queueA1\" />");
-    out.println("<queue name=\"queueA2\" />");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>2</weight>");
-    out.println("</queue>");
-    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Add a node of 8G
-    RMNode node1 = MockNodes.newNodeInfo(1,
-            Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    // Run apps in queueA.A1 and queueB
-    ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
-            "queueA.queueA1", "user1", 7, 1);
-    // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
-    ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
-            "user2", 1, 1);
-
-    scheduler.update();
-
-    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-    for (int i = 0; i < 8; i++) {
-      scheduler.handle(nodeUpdate1);
+  public void testPreemptionWithinSameLeafQueue() throws Exception {
+    setupCluster();
+    String queue = "root.preemptable.child-1";
+    submitApps(queue, queue);
+    if (fairsharePreemption) {
+      verifyPreemption();
+    } else {
+      verifyNoPreemption();
     }
-
-    // verify if the apps got the containers they requested
-    assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-
-    // Now submit an app in queueA.queueA2
-    ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
-            "queueA.queueA2", "user3", 7, 1);
-    scheduler.update();
-
-    // Let 11 sec pass
-    clock.tickSec(11);
-
-    scheduler.update();
-    Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
-            .getLeafQueue("queueA.queueA2", false), clock.getTime());
-    assertEquals(3277, toPreempt.getMemorySize());
-
-    // verify if the 3 containers required by queueA2 are preempted in the same
-    // round
-    scheduler.preemptResources(toPreempt);
-    assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
-            .size());
-    stopResourceManager();
-  }
-
-  @Test (timeout = 5000)
-  /**
-   * Tests the timing of decision to preempt tasks.
-   */
-  public void testPreemptionDecision() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("<maxResources>0mb,0vcores</maxResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueC\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueD\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Create four nodes
-    RMNode node1 =
-            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
-                    "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    RMNode node2 =
-            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
-                    "127.0.0.2");
-    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
-    scheduler.handle(nodeEvent2);
-
-    RMNode node3 =
-            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
-                    "127.0.0.3");
-    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
-    scheduler.handle(nodeEvent3);
-
-    // Queue A and B each request three containers
-    ApplicationAttemptId app1 =
-            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
-    ApplicationAttemptId app2 =
-            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
-    ApplicationAttemptId app3 =
-            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
-
-    ApplicationAttemptId app4 =
-            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
-    ApplicationAttemptId app5 =
-            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
-    ApplicationAttemptId app6 =
-            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
-
-    scheduler.update();
-
-    // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 2; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-      scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
-      scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
-    }
-
-    // Now new requests arrive from queues C and D
-    ApplicationAttemptId app7 =
-            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
-    ApplicationAttemptId app8 =
-            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
-    ApplicationAttemptId app9 =
-            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
-    ApplicationAttemptId app10 =
-            createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
-    ApplicationAttemptId app11 =
-            createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
-    ApplicationAttemptId app12 =
-            createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
-
-    scheduler.update();
-
-    FSLeafQueue schedC =
-            scheduler.getQueueManager().getLeafQueue("queueC", true);
-    FSLeafQueue schedD =
-            scheduler.getQueueManager().getLeafQueue("queueD", true);
-
-    assertTrue(Resources.equals(
-            Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
-    assertTrue(Resources.equals(
-            Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
-    // After minSharePreemptionTime has passed, they should want to preempt min
-    // share.
-    clock.tickSec(6);
-    assertEquals(
-            1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemorySize());
-    assertEquals(
-            1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
-
-    // After fairSharePreemptionTime has passed, they should want to preempt
-    // fair share.
-    scheduler.update();
-    clock.tickSec(6);
-    assertEquals(
-            1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemorySize());
-    assertEquals(
-            1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
-    stopResourceManager();
   }
 
   @Test
-/**
- * Tests the timing of decision to preempt tasks.
- */
-  public void testPreemptionDecisionWithDRF() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("<maxResources>0mb,0vcores</maxResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,1vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,2vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueC\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,3vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueD\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,2vcores</minResources>");
-    out.println("</queue>");
-    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
-    out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Create four nodes
-    RMNode node1 =
-            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1,
-                    "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    RMNode node2 =
-            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2,
-                    "127.0.0.2");
-    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
-    scheduler.handle(nodeEvent2);
-
-    RMNode node3 =
-            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3,
-                    "127.0.0.3");
-    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
-    scheduler.handle(nodeEvent3);
-
-    // Queue A and B each request three containers
-    ApplicationAttemptId app1 =
-            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
-    ApplicationAttemptId app2 =
-            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
-    ApplicationAttemptId app3 =
-            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
-
-    ApplicationAttemptId app4 =
-            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
-    ApplicationAttemptId app5 =
-            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
-    ApplicationAttemptId app6 =
-            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
-
-    scheduler.update();
-
-    // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 2; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-      scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
-      scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
-    }
-
-    // Now new requests arrive from queues C and D
-    ApplicationAttemptId app7 =
-            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
-    ApplicationAttemptId app8 =
-            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
-    ApplicationAttemptId app9 =
-            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
-    ApplicationAttemptId app10 =
-            createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1);
-    ApplicationAttemptId app11 =
-            createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2);
-    ApplicationAttemptId app12 =
-            createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3);
-
-    scheduler.update();
-
-    FSLeafQueue schedC =
-            scheduler.getQueueManager().getLeafQueue("queueC", true);
-    FSLeafQueue schedD =
-            scheduler.getQueueManager().getLeafQueue("queueD", true);
-
-    assertTrue(Resources.equals(
-            Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
-    assertTrue(Resources.equals(
-            Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
-
-    // Test :
-    // 1) whether componentWise min works as expected.
-    // 2) DRF calculator is used
-
-    // After minSharePreemptionTime has passed, they should want to preempt min
-    // share.
-    clock.tickSec(6);
-    Resource res = scheduler.resourceDeficit(schedC, clock.getTime());
-    assertEquals(1024, res.getMemorySize());
-    // Demand = 3
-    assertEquals(3, res.getVirtualCores());
-
-    res = scheduler.resourceDeficit(schedD, clock.getTime());
-    assertEquals(1024, res.getMemorySize());
-    // Demand = 6, but min share = 2
-    assertEquals(2, res.getVirtualCores());
-
-    // After fairSharePreemptionTime has passed, they should want to preempt
-    // fair share.
-    scheduler.update();
-    clock.tickSec(6);
-    res = scheduler.resourceDeficit(schedC, clock.getTime());
-    assertEquals(1536, res.getMemorySize());
-    assertEquals(3, res.getVirtualCores());
-
-    res = scheduler.resourceDeficit(schedD, clock.getTime());
-    assertEquals(1536, res.getMemorySize());
-    // Demand = 6, but fair share = 3
-    assertEquals(3, res.getVirtualCores());
-    stopResourceManager();
+  public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
+    setupCluster();
+    submitApps("root.preemptable.child-1", "root.preemptable.child-2");
+    verifyPreemption();
   }
 
   @Test
-  /**
-   * Tests the various timing of decision to preempt tasks.
-   */
-  public void testPreemptionDecisionWithVariousTimeout() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("<maxResources>0mb,0vcores</maxResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>1</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>2</weight>");
-    out.println("<minSharePreemptionTimeout>10</minSharePreemptionTimeout>");
-    out.println("<fairSharePreemptionTimeout>25</fairSharePreemptionTimeout>");
-    out.println("<queue name=\"queueB1\">");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB2\">");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("<fairSharePreemptionTimeout>20</fairSharePreemptionTimeout>");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueC\">");
-    out.println("<weight>1</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
-    out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Check the min/fair share preemption timeout for each queue
-    QueueManager queueMgr = scheduler.getQueueManager();
-    assertEquals(30000, queueMgr.getQueue("root")
-            .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("default")
-            .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueA")
-            .getFairSharePreemptionTimeout());
-    assertEquals(25000, queueMgr.getQueue("queueB")
-            .getFairSharePreemptionTimeout());
-    assertEquals(25000, queueMgr.getQueue("queueB.queueB1")
-            .getFairSharePreemptionTimeout());
-    assertEquals(20000, queueMgr.getQueue("queueB.queueB2")
-            .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueC")
-            .getFairSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("root")
-            .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("default")
-            .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueA")
-            .getMinSharePreemptionTimeout());
-    assertEquals(10000, queueMgr.getQueue("queueB")
-            .getMinSharePreemptionTimeout());
-    assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
-            .getMinSharePreemptionTimeout());
-    assertEquals(10000, queueMgr.getQueue("queueB.queueB2")
-            .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueC")
-            .getMinSharePreemptionTimeout());
-
-    // Create one big node
-    RMNode node1 =
-            MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1,
-                    "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    // Queue A takes all resources
-    for (int i = 0; i < 6; i ++) {
-      createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
-    }
-
-    scheduler.update();
-
-    // Sufficient node check-ins to fully schedule containers
-    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-    for (int i = 0; i < 6; i++) {
-      scheduler.handle(nodeUpdate1);
-    }
-
-    // Now new requests arrive from queues B1, B2 and C
-    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1);
-    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2);
-    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3);
-    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1);
-    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2);
-    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3);
-    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
-    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
-    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
-    scheduler.update();
-
-    FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true);
-    FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true);
-    FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
-
-    assertTrue(Resources.equals(
-            Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime())));
-    assertTrue(Resources.equals(
-            Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime())));
-    assertTrue(Resources.equals(
-            Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime())));
-
-    // After 5 seconds, queueB1 wants to preempt min share
-    scheduler.update();
-    clock.tickSec(6);
-    assertEquals(
-            1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
-    assertEquals(
-            0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
-    assertEquals(
-            0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
-    // After 10 seconds, queueB2 wants to preempt min share
-    scheduler.update();
-    clock.tickSec(5);
-    assertEquals(
-            1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
-    assertEquals(
-            1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
-    assertEquals(
-            0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
-    // After 15 seconds, queueC wants to preempt min share
-    scheduler.update();
-    clock.tickSec(5);
-    assertEquals(
-            1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
-    assertEquals(
-            1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
-    assertEquals(
-            1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
-    // After 20 seconds, queueB2 should want to preempt fair share
-    scheduler.update();
-    clock.tickSec(5);
-    assertEquals(
-            1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
-    assertEquals(
-            1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
-    assertEquals(
-            1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
-    // After 25 seconds, queueB1 should want to preempt fair share
-    scheduler.update();
-    clock.tickSec(5);
-    assertEquals(
-            1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
-    assertEquals(
-            1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
-    assertEquals(
-            1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-
-    // After 30 seconds, queueC should want to preempt fair share
-    scheduler.update();
-    clock.tickSec(5);
-    assertEquals(
-            1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
-    assertEquals(
-            1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
-    assertEquals(
-            1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-    stopResourceManager();
+  public void testPreemptionBetweenNonSiblingQueues() throws Exception {
+    setupCluster();
+    submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
+    verifyPreemption();
   }
 
   @Test
-  /**
-   * Tests the decision to preempt tasks respect to non-preemptable queues
-   * 1, Queues as follow:
-   *   queueA(non-preemptable)
-   *   queueB(preemptable)
-   *   parentQueue(non-preemptable)
-   *     --queueC(preemptable)
-   *   queueD(preemptable)
-   * 2, Submit request to queueA, queueB, queueC, and all of them are over MinShare
-   * 3, Now all resource are occupied
-   * 4, Submit request to queueD, and need to preempt resource from other queues
-   * 5, Only preemptable queue(queueB) would be preempted.
-   */
-  public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("<maxResources>0mb,0vcores</maxResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"parentQueue\">");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("<queue name=\"queueC\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueD\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Create four nodes(3G each)
-    RMNode node1 =
-            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
-                    "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    RMNode node2 =
-            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
-                    "127.0.0.2");
-    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
-    scheduler.handle(nodeEvent2);
-
-    RMNode node3 =
-            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
-                    "127.0.0.3");
-    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
-    scheduler.handle(nodeEvent3);
-
-    RMNode node4 =
-            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
-                    "127.0.0.4");
-    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
-    scheduler.handle(nodeEvent4);
-
-    // Submit apps to queueA, queueB, queueC,
-    // now all resource of the cluster is occupied
-    ApplicationAttemptId app1 =
-            createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
-    ApplicationAttemptId app2 =
-            createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
-    ApplicationAttemptId app3 =
-            createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
-
-    scheduler.update();
-
-    // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 3; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-      scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
-      scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
-
-      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
-      scheduler.handle(nodeUpdate4);
-    }
-
-    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
-    // Now new requests arrive from queues D
-    ApplicationAttemptId app4 =
-            createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
-    scheduler.update();
-    FSLeafQueue schedD =
-            scheduler.getQueueManager().getLeafQueue("queueD", true);
-
-    // After minSharePreemptionTime has passed, 2G resource should preempted from
-    // queueB to queueD
-    clock.tickSec(6);
-    assertEquals(2048,
-            scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
-
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
-    // now only app2 is selected to be preempted
-    assertTrue("App2 should have container to be preempted",
-            !Collections.disjoint(
-                    scheduler.getSchedulerApp(app2).getLiveContainers(),
-                    scheduler.getSchedulerApp(app2).getPreemptionContainers()));
-    assertTrue("App1 should not have container to be preempted",
-            Collections.disjoint(
-                    scheduler.getSchedulerApp(app1).getLiveContainers(),
-                    scheduler.getSchedulerApp(app1).getPreemptionContainers()));
-    assertTrue("App3 should not have container to be preempted",
-            Collections.disjoint(
-                    scheduler.getSchedulerApp(app3).getLiveContainers(),
-                    scheduler.getSchedulerApp(app3).getPreemptionContainers()));
-    // Pretend 20 seconds have passed
-    clock.tickSec(20);
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
-    for (int i = 0; i < 3; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-      scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
-      scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
-
-      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
-      scheduler.handle(nodeUpdate4);
-    }
-    // after preemption
-    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    stopResourceManager();
-  }
-
-  @Test
-  /**
-   * Tests the decision to preempt tasks when allowPreemptionFrom is set false on
-   * all queues.
-   * Then none of them would be preempted actually.
-   * 1, Queues as follow:
-   *   queueA(non-preemptable)
-   *   queueB(non-preemptable)
-   *   parentQueue(non-preemptable)
-   *     --queueC(preemptable)
-   *   parentQueue(preemptable)
-   *     --queueD(non-preemptable)
-   * 2, Submit request to queueB, queueC, queueD, and all of them are over MinShare
-   * 3, Now all resource are occupied
-   * 4, Submit request to queueA, and need to preempt resource from other queues
-   * 5, None of queues would be preempted.
-   */
-  public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
-          throws Exception {
-    startResourceManagerWithRealFairScheduler();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("<maxResources>0mb,0vcores</maxResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("</queue>");
-    out.println("<queue name=\"parentQueue1\">");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("<queue name=\"queueC\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<queue name=\"parentQueue2\">");
-    out.println("<queue name=\"queueD\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Create four nodes(3G each)
-    RMNode node1 =
-            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
-                    "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    RMNode node2 =
-            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
-                    "127.0.0.2");
-    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
-    scheduler.handle(nodeEvent2);
-
-    RMNode node3 =
-            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
-                    "127.0.0.3");
-    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
-    scheduler.handle(nodeEvent3);
-
-    RMNode node4 =
-            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
-                    "127.0.0.4");
-    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
-    scheduler.handle(nodeEvent4);
-
-    // Submit apps to queueB, queueC, queueD
-    // now all resource of the cluster is occupied
-
-    ApplicationAttemptId app1 =
-            createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
-    ApplicationAttemptId app2 =
-            createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2);
-    ApplicationAttemptId app3 =
-            createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3);
-    scheduler.update();
-
-    // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 3; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-      scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
-      scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
-
-      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
-      scheduler.handle(nodeUpdate4);
-    }
-
-    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
-    // Now new requests arrive from queues A
-    ApplicationAttemptId app4 =
-            createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
-    scheduler.update();
-    FSLeafQueue schedA =
-            scheduler.getQueueManager().getLeafQueue("queueA", true);
-
-    // After minSharePreemptionTime has passed, resource deficit is 2G
-    clock.tickSec(6);
-    assertEquals(2048,
-            scheduler.resourceDeficit(schedA, clock.getTime()).getMemorySize());
-
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
-    // now none app is selected to be preempted
-    assertTrue("App1 should have container to be preempted",
-            Collections.disjoint(
-                    scheduler.getSchedulerApp(app1).getLiveContainers(),
-                    scheduler.getSchedulerApp(app1).getPreemptionContainers()));
-    assertTrue("App2 should not have container to be preempted",
-            Collections.disjoint(
-                    scheduler.getSchedulerApp(app2).getLiveContainers(),
-                    scheduler.getSchedulerApp(app2).getPreemptionContainers()));
-    assertTrue("App3 should not have container to be preempted",
-            Collections.disjoint(
-                    scheduler.getSchedulerApp(app3).getLiveContainers(),
-                    scheduler.getSchedulerApp(app3).getPreemptionContainers()));
-    // Pretend 20 seconds have passed
-    clock.tickSec(20);
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
-    for (int i = 0; i < 3; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-      scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
-      scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
-
-      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
-      scheduler.handle(nodeUpdate4);
-    }
-    // after preemption
-    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    stopResourceManager();
-  }
-
-  @Test
-  public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<queue name=\"queueB1\">");
-    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB2\">");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueC\">");
-    out.println("</queue>");
-    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
-    out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
-    out.print("<fairSharePreemptionTimeout>40</fairSharePreemptionTimeout>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Check the min/fair share preemption timeout for each queue
-    QueueManager queueMgr = scheduler.getQueueManager();
-    assertEquals(30000, queueMgr.getQueue("root")
-            .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("default")
-            .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueA")
-            .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueB")
-            .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
-            .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
-            .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueC")
-            .getFairSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("root")
-            .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("default")
-            .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueA")
-            .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueB")
-            .getMinSharePreemptionTimeout());
-    assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
-            .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
-            .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueC")
-            .getMinSharePreemptionTimeout());
-
-    // If both exist, we take the default one
-    out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<queue name=\"queueB1\">");
-    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB2\">");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueC\">");
-    out.println("</queue>");
-    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
-    out.print("<defaultFairSharePreemptionTimeout>25</defaultFairSharePreemptionTimeout>");
-    out.print("<fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    assertEquals(25000, queueMgr.getQueue("root")
-            .getFairSharePreemptionTimeout());
-    stopResourceManager();
-  }
-
-  @Test(timeout = 5000)
-  public void testRecoverRequestAfterPreemption() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
-
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    SchedulerRequestKey schedulerKey = TestUtils.toSchedulerKey(20);
-    String host = "127.0.0.1";
-    int GB = 1024;
-
-    // Create Node and raised Node Added event
-    RMNode node = MockNodes.newNodeInfo(1,
-            Resources.createResource(16 * 1024, 4), 0, host);
-    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
-    scheduler.handle(nodeEvent);
-
-    // Create 3 container requests and place it in ask
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
-            schedulerKey.getPriority().getPriority(), 1, true);
-    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
-        node.getRackName(), schedulerKey.getPriority().getPriority(), 1,
-        true);
-    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
-        ResourceRequest.ANY, schedulerKey.getPriority().getPriority(), 1, true);
-    ask.add(nodeLocalRequest);
-    ask.add(rackLocalRequest);
-    ask.add(offRackRequest);
-
-    // Create Request and update
-    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
-            "user1", ask);
-    scheduler.update();
-
-    // Sufficient node check-ins to fully schedule containers
-    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
-    scheduler.handle(nodeUpdate);
-
-    assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
-            .size());
-    SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId);
-
-    // ResourceRequest will be empty once NodeUpdate is completed
-    Assert.assertNull(app.getResourceRequest(schedulerKey, host));
-
-    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
-    RMContainer rmContainer = app.getRMContainer(containerId1);
-
-    // Create a preempt event and register for preemption
-    scheduler.warnOrKillContainer(rmContainer);
-
-    // Wait for few clock ticks
-    clock.tickSec(5);
-
-    // preempt now
-    scheduler.warnOrKillContainer(rmContainer);
-
-    // Trigger container rescheduled event
-    scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
-            SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
-
-    List<ResourceRequest> requests = rmContainer.getResourceRequests();
-    // Once recovered, resource request will be present again in app
-    Assert.assertEquals(3, requests.size());
-    for (ResourceRequest request : requests) {
-      Assert.assertEquals(1,
-              app.getResourceRequest(schedulerKey, request.getResourceName())
-                      .getNumContainers());
-    }
-
-    // Send node heartbeat
-    scheduler.update();
-    scheduler.handle(nodeUpdate);
-
-    List<Container> containers = scheduler.allocate(appAttemptId,
-            Collections.<ResourceRequest> emptyList(),
-            Collections.<ContainerId> emptyList(), null, null, null, null).getContainers();
-
-    // Now with updated ResourceRequest, a container is allocated for AM.
-    Assert.assertTrue(containers.size() == 1);
-    stopResourceManager();
+  public void testNoPreemptionFromDisallowedQueue() throws Exception {
+    setupCluster();
+    submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
+    verifyNoPreemption();
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java
new file mode 100644
index 0000000..5736f75
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * QueueManager tests that require a real scheduler
+ */
+public class TestQueueManagerRealScheduler extends FairSchedulerTestBase {
+  private final static File ALLOC_FILE = new File(TEST_DIR, "test-queue-mgr");
+
+  @Before
+  public void setup() throws IOException {
+    createConfiguration();
+    writeAllocFile(30, 40);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+  }
+
+  @After
+  public void teardown() {
+    ALLOC_FILE.deleteOnExit();
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  private void writeAllocFile(int defaultFairShareTimeout,
+      int fairShareTimeout) throws IOException {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<queue name=\"queueB1\">");
+    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB2\">");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>15"
+        + "</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>" +
+        + defaultFairShareTimeout + "</defaultFairSharePreemptionTimeout>");
+    out.println("<fairSharePreemptionTimeout>"
+        + fairShareTimeout + "</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+  }
+
+  @Test
+  public void testBackwardsCompatiblePreemptionConfiguration()
+      throws IOException {
+    // Check the min/fair share preemption timeout for each queue
+    QueueManager queueMgr = scheduler.getQueueManager();
+    assertEquals(30000, queueMgr.getQueue("root")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("default")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueA")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueC")
+        .getFairSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("root")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("default")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueA")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueB")
+        .getMinSharePreemptionTimeout());
+    assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueC")
+        .getMinSharePreemptionTimeout());
+
+    // Lower the fairshare preemption timeouts and verify it is picked
+    // correctly.
+    writeAllocFile(25, 30);
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+    assertEquals(25000, queueMgr.getQueue("root")
+        .getFairSharePreemptionTimeout());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
index dea2dd1..57c7301 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
@@ -330,11 +330,6 @@
       }
 
       @Override
-      public RMContainer preemptContainer() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
       public Resource getFairShare() {
         throw new UnsupportedOperationException();
       }