// blob: f843db402c4c8afafab587d761167d313cc0eed5
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
 * Preemption candidate selector that walks every leaf queue known to the
 * preemption context and, for each queue, picks containers in three passes:
 * <ol>
 *   <li>containers registered by the queue as ignoring partition
 *       exclusivity, visited in descending order of the queue's sorted
 *       set;</li>
 *   <li>reserved and live containers of each application, in the order
 *       produced by the queue ordering policy's preemption iterator (AM
 *       containers are set aside during this pass);</li>
 *   <li>the AM containers that were set aside, as long as more resources
 *       are still needed and the skipped AM size stays above the queue's
 *       computed AM capacity.</li>
 * </ol>
 */
public class FifoCandidatesSelector
    extends PreemptionCandidatesSelector {
  private static final Log LOG =
      LogFactory.getLog(FifoCandidatesSelector.class);

  /** Computes the ideal allocation before candidates are selected. */
  private final PreemptableResourceCalculator preemptableAmountCalculator;

  /**
   * Creates a selector bound to the given preemption context.
   *
   * @param preemptionContext context passed to the superclass and to the
   *          preemptable-resource calculator
   * @param includeReservedResource forwarded unchanged to
   *          {@link PreemptableResourceCalculator}
   */
  FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
      boolean includeReservedResource) {
    super(preemptionContext);
    preemptableAmountCalculator = new PreemptableResourceCalculator(
        preemptionContext, includeReservedResource);
  }

  /**
   * Selects containers to preempt from every preemptable leaf queue.
   *
   * @param selectedCandidates containers already chosen by earlier
   *          selectors; newly chosen containers are added to this same map
   * @param clusterResource total cluster resource
   * @param totalPreemptionAllowed remaining preemption budget, passed on to
   *          the helpers that perform the actual selection
   * @return the same {@code selectedCandidates} map instance
   */
  @Override
  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
      Resource clusterResource, Resource totalPreemptionAllowed) {
    // Calculate how much resources we need to preempt
    preemptableAmountCalculator.computeIdealAllocation(clusterResource,
        totalPreemptionAllowed);

    // Previous selectors (with higher priority) could have already
    // selected containers. We need to deduct preemptable resources
    // based on already selected candidates.
    CapacitySchedulerPreemptionUtils
        .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
            selectedCandidates);

    // Shared across queues; preemptAMContainers() clears it after each queue.
    List<RMContainer> skippedAMContainerlist = new ArrayList<>();

    // Loop all leaf queues
    for (String queueName : preemptionContext.getLeafQueueNames()) {
      // check if preemption disabled for the queue
      if (preemptionContext.getQueueByPartition(queueName,
          RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("skipping from queue=" + queueName
              + " because it's a non-preemptable queue");
        }
        continue;
      }

      // compute resToObtainByPartition considered inter-queue preemption
      LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
          RMNodeLabelsManager.NO_LABEL).leafQueue;
      Map<String, Resource> resToObtainByPartition =
          CapacitySchedulerPreemptionUtils
              .getResToObtainByPartitionForLeafQueue(preemptionContext,
                  queueName, clusterResource);

      // Acquire the lock BEFORE entering try: if lock() itself fails, the
      // finally block must not attempt to release a lock we never held.
      leafQueue.getReadLock().lock();
      try {
        // go through all ignore-partition-exclusivity containers first to
        // make sure such containers will be preemptionCandidates first
        preemptIgnoreExclusivityContainers(leafQueue, resToObtainByPartition,
            clusterResource, selectedCandidates, totalPreemptionAllowed);

        // preempt other containers
        Resource skippedAMSize = Resource.newInstance(0, 0);
        Iterator<FiCaSchedulerApp> desc =
            leafQueue.getOrderingPolicy().getPreemptionIterator();
        while (desc.hasNext()) {
          FiCaSchedulerApp fc = desc.next();
          // When we complete preempt from one partition, we will remove from
          // resToObtainByPartition, so when it becomes empty, we can get no
          // more preemption is needed
          if (resToObtainByPartition.isEmpty()) {
            break;
          }

          preemptFrom(fc, clusterResource, resToObtainByPartition,
              skippedAMContainerlist, skippedAMSize, selectedCandidates,
              totalPreemptionAllowed);
        }

        // Can try preempting AMContainers (still saving atmost
        // maxAMCapacityForThisQueue AMResource's) if more resources are
        // required to be preemptionCandidates from this Queue.
        Resource maxAMCapacityForThisQueue = Resources.multiply(
            Resources.multiply(clusterResource,
                leafQueue.getAbsoluteCapacity()),
            leafQueue.getMaxAMResourcePerQueuePercent());

        preemptAMContainers(clusterResource, selectedCandidates,
            skippedAMContainerlist, resToObtainByPartition, skippedAMSize,
            maxAMCapacityForThisQueue, totalPreemptionAllowed);
      } finally {
        leafQueue.getReadLock().unlock();
      }
    }

    return selectedCandidates;
  }

  /**
   * Tries to preempt the containers the queue reports as ignoring partition
   * exclusivity, for every partition that still has resources to obtain.
   *
   * @param leafQueue queue whose ignore-exclusivity containers are examined
   * @param resToObtainByPartition outstanding resources per partition; the
   *          preemption helper is expected to shrink this map as partitions
   *          are satisfied
   * @param clusterResource total cluster resource
   * @param selectedCandidates containers selected so far (updated by the
   *          preemption helper)
   * @param totalPreemptionAllowed remaining preemption budget
   */
  private void preemptIgnoreExclusivityContainers(LeafQueue leafQueue,
      Map<String, Resource> resToObtainByPartition, Resource clusterResource,
      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
      Resource totalPreemptionAllowed) {
    Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
        leafQueue.getIgnoreExclusivityRMContainers();

    // Iterate over a snapshot of the keys: partitions are removed from
    // resToObtainByPartition once satisfied, and removing entries while
    // walking the live key view would break the iteration.
    List<String> partitions = new ArrayList<>(resToObtainByPartition.keySet());
    for (String partition : partitions) {
      // Partition was satisfied (and removed) while handling an earlier one.
      if (!resToObtainByPartition.containsKey(partition)) {
        continue;
      }
      if (!ignorePartitionExclusivityContainers.containsKey(partition)) {
        continue;
      }

      TreeSet<RMContainer> rmContainers =
          ignorePartitionExclusivityContainers.get(partition);
      // We will check container from reverse order, so latter submitted
      // application's containers will be preemptionCandidates first.
      for (RMContainer c : rmContainers.descendingSet()) {
        if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
            selectedCandidates)) {
          // Skip already selected containers
          continue;
        }
        // The outcome is not needed here: whether or not the container was
        // selected, we simply move on to the next one.
        CapacitySchedulerPreemptionUtils
            .tryPreemptContainerAndDeductResToObtain(rc,
                preemptionContext, resToObtainByPartition, c,
                clusterResource, selectedCandidates,
                totalPreemptionAllowed);
      }
    }
  }

  /**
   * As more resources are needed for preemption, saved AMContainers has to be
   * rescanned. Such AMContainers can be preemptionCandidates based on
   * resToObtain, but maxAMCapacityForThisQueue resources will be still
   * retained. The skipped list is cleared before returning.
   *
   * @param clusterResource total cluster resource
   * @param preemptMap containers selected so far; updated by the preemption
   *          helper
   * @param skippedAMContainerlist AM containers set aside earlier; emptied by
   *          this method
   * @param resToObtainByPartition outstanding resources per partition;
   *          scanning stops once this map is empty
   * @param skippedAMSize total size of the skipped AM containers; reduced by
   *          each container that gets preempted
   * @param maxAMCapacityForThisQueue scanning stops once
   *          {@code skippedAMSize} is less than or equal to this value
   * @param totalPreemptionAllowed remaining preemption budget
   */
  private void preemptAMContainers(Resource clusterResource,
      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
      List<RMContainer> skippedAMContainerlist,
      Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
      Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) {
    for (RMContainer c : skippedAMContainerlist) {
      // Got required amount of resources for preemption, can stop now
      if (resToObtainByPartition.isEmpty()) {
        break;
      }
      // Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
      // container selection iteration for preemption will be stopped.
      if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
          maxAMCapacityForThisQueue)) {
        break;
      }

      boolean preempted = CapacitySchedulerPreemptionUtils
          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
              resToObtainByPartition, c, clusterResource, preemptMap,
              totalPreemptionAllowed);
      if (preempted) {
        Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
      }
    }
    skippedAMContainerlist.clear();
  }

  /**
   * Given a target preemption for a specific application, select containers
   * to preempt (after unreserving all reservation for that app).
   *
   * @param app application whose reserved and live containers are examined
   * @param clusterResource total cluster resource
   * @param resToObtainByPartition outstanding resources per partition; the
   *          method returns as soon as this map is empty
   * @param skippedAMContainerlist receives every AM container encountered
   * @param skippedAMSize incremented by the size of every AM container
   *          encountered
   * @param selectedContainers containers selected so far; updated by the
   *          preemption helper
   * @param totalPreemptionAllowed remaining preemption budget
   */
  @SuppressWarnings("unchecked")
  private void preemptFrom(FiCaSchedulerApp app,
      Resource clusterResource, Map<String, Resource> resToObtainByPartition,
      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
      Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
      Resource totalPreemptionAllowed) {
    ApplicationAttemptId appId = app.getApplicationAttemptId();

    // first drop reserved containers towards rsrcPreempt
    // (copy the collection so we iterate a stable snapshot)
    List<RMContainer> reservedContainers =
        new ArrayList<>(app.getReservedContainers());
    for (RMContainer c : reservedContainers) {
      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
          selectedContainers)) {
        continue;
      }
      if (resToObtainByPartition.isEmpty()) {
        return;
      }

      // Try to preempt this container
      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
          selectedContainers, totalPreemptionAllowed);

      // NOTE(review): the kill event is dispatched without consulting the
      // return value of the helper above - confirm this is intended.
      if (!preemptionContext.isObserveOnly()) {
        preemptionContext.getRMContext().getDispatcher().getEventHandler()
            .handle(new ContainerPreemptEvent(appId, c,
                SchedulerEventType.KILL_RESERVED_CONTAINER));
      }
    }

    // if more resources are to be freed go through all live containers in
    // reverse priority and reverse allocation order and mark them for
    // preemption
    List<RMContainer> liveContainers =
        new ArrayList<>(app.getLiveContainers());
    sortContainers(liveContainers);

    for (RMContainer c : liveContainers) {
      if (resToObtainByPartition.isEmpty()) {
        return;
      }

      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
          selectedContainers)) {
        continue;
      }

      // Skip already marked to killable containers
      if (null != preemptionContext.getKillableContainers() && preemptionContext
          .getKillableContainers().contains(c.getContainerId())) {
        continue;
      }

      // Skip AM Container from preemption for now.
      if (c.isAMContainer()) {
        skippedAMContainerlist.add(c);
        Resources.addTo(skippedAMSize, c.getAllocatedResource());
        continue;
      }

      // Try to preempt this container
      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
          selectedContainers, totalPreemptionAllowed);
    }
  }
}