| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.concurrent.locks.Lock; |
| |
| /** |
| * Thread that handles FairScheduler preemption. |
| */ |
| class FSPreemptionThread extends Thread { |
| private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class); |
| protected final FSContext context; |
| private final FairScheduler scheduler; |
| private final long warnTimeBeforeKill; |
| private final long delayBeforeNextStarvationCheck; |
| private final Timer preemptionTimer; |
| private final Lock schedulerReadLock; |
| |
| FSPreemptionThread(FairScheduler scheduler) { |
| setDaemon(true); |
| setName("FSPreemptionThread"); |
| this.scheduler = scheduler; |
| this.context = scheduler.getContext(); |
| FairSchedulerConfiguration fsConf = scheduler.getConf(); |
| context.setPreemptionEnabled(); |
| context.setPreemptionUtilizationThreshold( |
| fsConf.getPreemptionUtilizationThreshold()); |
| preemptionTimer = new Timer("Preemption Timer", true); |
| |
| warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill(); |
| long allocDelay = (fsConf.isContinuousSchedulingEnabled() |
| ? 10 * fsConf.getContinuousSchedulingSleepMs() // 10 runs |
| : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats |
| delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay + |
| fsConf.getWaitTimeBeforeNextStarvationCheck(); |
| schedulerReadLock = scheduler.getSchedulerReadLock(); |
| } |
| |
| @Override |
| public void run() { |
| while (!Thread.interrupted()) { |
| try { |
| FSAppAttempt starvedApp = context.getStarvedApps().take(); |
| // Hold the scheduler readlock so this is not concurrent with the |
| // update thread. |
| schedulerReadLock.lock(); |
| try { |
| preemptContainers(identifyContainersToPreempt(starvedApp)); |
| } finally { |
| schedulerReadLock.unlock(); |
| } |
| starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck); |
| } catch (InterruptedException e) { |
| LOG.info("Preemption thread interrupted! Exiting."); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| /** |
| * Given an app, identify containers to preempt to satisfy the app's |
| * starvation. |
| * |
| * Mechanics: |
| * 1. Fetch all {@link ResourceRequest}s corresponding to the amount of |
| * starvation. |
| * 2. For each {@link ResourceRequest}, iterate through matching |
| * nodes and identify containers to preempt all on one node, also |
| * optimizing for least number of AM container preemptions. |
| * |
| * @param starvedApp starved application for which we are identifying |
| * preemption targets |
| * @return list of containers to preempt to satisfy starvedApp |
| */ |
| private List<RMContainer> identifyContainersToPreempt( |
| FSAppAttempt starvedApp) { |
| List<RMContainer> containersToPreempt = new ArrayList<>(); |
| |
| // Iterate through enough RRs to address app's starvation |
| for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) { |
| for (int i = 0; i < rr.getNumContainers(); i++) { |
| PreemptableContainers bestContainers = null; |
| List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker() |
| .getNodesByResourceName(rr.getResourceName()); |
| int maxAMContainers = Integer.MAX_VALUE; |
| |
| for (FSSchedulerNode node : potentialNodes) { |
| PreemptableContainers preemptableContainers = |
| identifyContainersToPreemptOnNode( |
| rr.getCapability(), node, maxAMContainers); |
| |
| if (preemptableContainers != null) { |
| // This set is better than any previously identified set. |
| bestContainers = preemptableContainers; |
| maxAMContainers = bestContainers.numAMContainers; |
| |
| if (maxAMContainers == 0) { |
| break; |
| } |
| } |
| } // End of iteration through nodes for one RR |
| |
| if (bestContainers != null) { |
| List<RMContainer> containers = bestContainers.getAllContainers(); |
| if (containers.size() > 0) { |
| containersToPreempt.addAll(containers); |
| // Reserve the containers for the starved app |
| trackPreemptionsAgainstNode(containers, starvedApp); |
| // Warn application about containers to be killed |
| for (RMContainer container : containers) { |
| FSAppAttempt app = scheduler.getSchedulerApp( |
| container.getApplicationAttemptId()); |
| LOG.info("Preempting container " + container + |
| " from queue " + app.getQueueName()); |
| app.trackContainerForPreemption(container); |
| } |
| } |
| } |
| } |
| } // End of iteration over RRs |
| return containersToPreempt; |
| } |
| |
| /** |
| * Identify containers to preempt on a given node. Try to find a list with |
| * least AM containers to avoid preempting AM containers. This method returns |
| * a non-null set of containers only if the number of AM containers is less |
| * than maxAMContainers. |
| * |
| * @param request resource requested |
| * @param node the node to check |
| * @param maxAMContainers max allowed AM containers in the set |
| * @return list of preemptable containers with fewer AM containers than |
| * maxAMContainers if such a list exists; null otherwise. |
| */ |
| private PreemptableContainers identifyContainersToPreemptOnNode( |
| Resource request, FSSchedulerNode node, int maxAMContainers) { |
| PreemptableContainers preemptableContainers = |
| new PreemptableContainers(maxAMContainers); |
| |
| // Figure out list of containers to consider |
| List<RMContainer> containersToCheck = |
| node.getRunningContainersWithAMsAtTheEnd(); |
| containersToCheck.removeAll(node.getContainersForPreemption()); |
| |
| // Initialize potential with unallocated but not reserved resources |
| Resource potential = Resources.subtractFromNonNegative( |
| Resources.clone(node.getUnallocatedResource()), |
| node.getTotalReserved()); |
| |
| for (RMContainer container : containersToCheck) { |
| FSAppAttempt app = |
| scheduler.getSchedulerApp(container.getApplicationAttemptId()); |
| ApplicationId appId = app.getApplicationId(); |
| |
| if (app.canContainerBePreempted(container, |
| preemptableContainers.getResourcesToPreemptForApp(appId))) { |
| // Flag container for preemption |
| if (!preemptableContainers.addContainer(container, appId)) { |
| return null; |
| } |
| |
| Resources.addTo(potential, container.getAllocatedResource()); |
| } |
| |
| // Check if we have already identified enough containers |
| if (Resources.fitsIn(request, potential)) { |
| return preemptableContainers; |
| } |
| } |
| |
| // Return null if the sum of all preemptable containers' resources |
| // isn't enough to satisfy the starved request. |
| return null; |
| } |
| |
| private void trackPreemptionsAgainstNode(List<RMContainer> containers, |
| FSAppAttempt app) { |
| FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker() |
| .getNode(containers.get(0).getNodeId()); |
| node.addContainersForPreemption(containers, app); |
| } |
| |
| private void preemptContainers(List<RMContainer> containers) { |
| // Schedule timer task to kill containers |
| preemptionTimer.schedule( |
| new PreemptContainersTask(containers), warnTimeBeforeKill); |
| } |
| |
| private class PreemptContainersTask extends TimerTask { |
| private final List<RMContainer> containers; |
| |
| PreemptContainersTask(List<RMContainer> containers) { |
| this.containers = containers; |
| } |
| |
| @Override |
| public void run() { |
| for (RMContainer container : containers) { |
| ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus( |
| container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); |
| |
| LOG.info("Killing container " + container); |
| scheduler.completedContainer( |
| container, status, RMContainerEventType.KILL); |
| } |
| } |
| } |
| |
| /** |
| * A class to track preemptable containers. |
| */ |
| private static class PreemptableContainers { |
| Map<ApplicationId, List<RMContainer>> containersByApp; |
| int numAMContainers; |
| int maxAMContainers; |
| |
| PreemptableContainers(int maxAMContainers) { |
| numAMContainers = 0; |
| this.maxAMContainers = maxAMContainers; |
| this.containersByApp = new HashMap<>(); |
| } |
| |
| /** |
| * Add a container if the number of AM containers is less than |
| * maxAMContainers. |
| * |
| * @param container the container to add |
| * @return true if success; false otherwise |
| */ |
| private boolean addContainer(RMContainer container, ApplicationId appId) { |
| if (container.isAMContainer()) { |
| numAMContainers++; |
| if (numAMContainers >= maxAMContainers) { |
| return false; |
| } |
| } |
| |
| if (!containersByApp.containsKey(appId)) { |
| containersByApp.put(appId, new ArrayList<>()); |
| } |
| |
| containersByApp.get(appId).add(container); |
| return true; |
| } |
| |
| private List<RMContainer> getAllContainers() { |
| List<RMContainer> allContainers = new ArrayList<>(); |
| for (List<RMContainer> containersForApp : containersByApp.values()) { |
| allContainers.addAll(containersForApp); |
| } |
| return allContainers; |
| } |
| |
| private Resource getResourcesToPreemptForApp(ApplicationId appId) { |
| Resource resourcesToPreempt = Resources.createResource(0, 0); |
| if (containersByApp.containsKey(appId)) { |
| for (RMContainer container : containersByApp.get(appId)) { |
| Resources.addTo(resourcesToPreempt, container.getAllocatedResource()); |
| } |
| } |
| return resourcesToPreempt; |
| } |
| } |
| } |