blob: 3e580735e2b5b2800ed9b26651dfe1b434ac51cc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Identifies over-utilized resources within a queue and tries to normalize
* them to resolve resource allocation anomalies with respect to priority and
* user-limit.
*/
public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
/**
 * Orders {@link TempAppPerPartition} entries by the {@link Priority} built
 * from each entry's priority value; entries whose priorities are equal are
 * ordered by application id.
 */
@SuppressWarnings("serial")
static class TAPriorityComparator
    implements Serializable, Comparator<TempAppPerPartition> {

  @Override
  public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
    Priority left = Priority.newInstance(ta1.getPriority());
    Priority right = Priority.newInstance(ta2.getPriority());
    // Equal priorities are disambiguated by the application id ordering.
    return left.equals(right)
        ? ta1.getApplicationId().compareTo(ta2.getApplicationId())
        : left.compareTo(right);
  }
}
/**
 * Comparator used for fair ordering of apps within a queue.
 *
 * <p>Apps that belong to the same user are ordered by the comparator of the
 * ordering policy of the first app's leaf queue. Apps of different users are
 * ordered by each user's used resource (AM deducted) from least to most;
 * when neither usage is less than the other, the application id decides.
 */
static class TAFairOrderingComparator
    implements Comparator<TempAppPerPartition> {
  private final ResourceCalculator rc;
  private final Resource clusterRes;

  /**
   * @param rc calculator used to compare the users' resource usage
   * @param clusterRes cluster resource passed to the calculator
   */
  TAFairOrderingComparator(ResourceCalculator rc, Resource clusterRes) {
    this.rc = rc;
    this.clusterRes = clusterRes;
  }

  @Override
  public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
    if (ta1.getUser().equals(ta2.getUser())) {
      // Same user: defer to the comparator of the queue's ordering policy.
      AbstractComparatorOrderingPolicy<FiCaSchedulerApp> acop =
          (AbstractComparatorOrderingPolicy<FiCaSchedulerApp>)
              ta1.getFiCaSchedulerApp().getCSLeafQueue().getOrderingPolicy();
      return acop.getComparator()
          .compare(ta1.getFiCaSchedulerApp(), ta2.getFiCaSchedulerApp());
    }

    Resource usedByUser1 = ta1.getTempUserPerPartition().getUsedDeductAM();
    Resource usedByUser2 = ta2.getTempUserPerPartition().getUsedDeductAM();

    // Check both directions explicitly. Two usages can be unequal and yet
    // neither "less than" the other under the calculator; returning a fixed
    // value in that case would make compare(a, b) and compare(b, a) agree
    // in sign, which violates the Comparator contract.
    if (Resources.lessThan(rc, clusterRes, usedByUser1, usedByUser2)) {
      return -1;
    }
    if (Resources.lessThan(rc, clusterRes, usedByUser2, usedByUser1)) {
      return 1;
    }
    // Usage is equivalent for ordering purposes: fall back to application id.
    return ta1.getApplicationId().compareTo(ta2.getApplicationId());
  }
}
private static final Log LOG =
    LogFactory.getLog(IntraQueueCandidatesSelector.class);

/**
 * Plugin this selector uses to compute ideal allocations, resource demand
 * and the preemptable apps of a queue.
 */
IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin;

/** Preemption context supplied at construction time. */
final CapacitySchedulerPreemptionContext context;

/**
 * Creates the selector together with its FIFO intra-queue compute plugin.
 *
 * @param preemptionContext context handed to the superclass and the plugin
 */
IntraQueueCandidatesSelector(
    CapacitySchedulerPreemptionContext preemptionContext) {
  super(preemptionContext);
  this.context = preemptionContext;
  this.fifoPreemptionComputePlugin =
      new FifoIntraQueuePreemptionPlugin(rc, preemptionContext);
}
/**
 * Selects containers for intra-queue preemption.
 *
 * <p>First computes the intra-queue preemption demand, then deducts the
 * resources of candidates that were already selected, and finally walks
 * every partition's under-served queues and tries to preempt containers
 * from each preemptable app while holding the leaf queue's read lock.
 *
 * @param selectedCandidates containers selected so far; newly selected
 *          containers are added to this map
 * @param clusterResource total cluster resource
 * @param totalPreemptedResourceAllowed resource that may still be preempted
 *          in this round
 * @return the {@code selectedCandidates} map that was passed in
 */
@Override
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
    Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
    Resource clusterResource, Resource totalPreemptedResourceAllowed) {
  // 1. Calculate the abnormality within each queue one by one.
  computeIntraQueuePreemptionDemand(
      clusterResource, totalPreemptedResourceAllowed, selectedCandidates);

  // 2. Previous selectors (with higher priority) could have already
  // selected containers. We need to deduct pre-emptable resources
  // based on already selected candidates.
  CapacitySchedulerPreemptionUtils
      .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
          selectedCandidates);

  // 3. Loop through all partitions to select containers for preemption.
  for (String partition : preemptionContext.getAllPartitions()) {
    LinkedHashSet<String> queueNames = preemptionContext
        .getUnderServedQueuesPerPartition(partition);

    // Error check to handle non-mapped labels to queue.
    if (null == queueNames) {
      continue;
    }

    // 4. Iterate from most under-served queue in order.
    for (String queueName : queueNames) {
      LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
          RMNodeLabelsManager.NO_LABEL).leafQueue;

      // skip if not a leafqueue
      if (null == leafQueue) {
        continue;
      }

      // Don't preempt if intra-queue preemption is disabled for this queue.
      if (leafQueue.getIntraQueuePreemptionDisabled()) {
        continue;
      }

      // 5. Calculate the resource to obtain per partition
      Map<String, Resource> resToObtainByPartition =
          fifoPreemptionComputePlugin
              .getResourceDemandFromAppsPerQueue(queueName, partition);

      // Default preemption iterator considers only FIFO+priority. For
      // userlimit preemption, it's possible that some lower priority apps
      // need resources from a high priority app of another user. Hence use
      // apps ordered by userlimit starvation as well.
      Collection<FiCaSchedulerApp> apps = fifoPreemptionComputePlugin
          .getPreemptableApps(queueName, partition);

      // 6. Get user-limit to ensure that we do not preempt resources which
      // will force user's resource to come under its UL.
      Map<String, Resource> rollingResourceUsagePerUser = new HashMap<>();
      initializeUsageAndUserLimitForCompute(clusterResource, partition,
          leafQueue, rollingResourceUsagePerUser);

      // 7. Based on the selected resource demand per partition, select
      // containers with known policy from inter-queue preemption.
      // The lock is acquired before entering the try block so that the
      // finally clause only ever releases a lock that was really obtained.
      leafQueue.getReadLock().lock();
      try {
        for (FiCaSchedulerApp app : apps) {
          preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
              clusterResource, totalPreemptedResourceAllowed,
              resToObtainByPartition, rollingResourceUsagePerUser);
        }
      } finally {
        leafQueue.getReadLock().unlock();
      }
    }
  }

  return selectedCandidates;
}
/**
 * Seeds the rolling per-user usage map with a copy of the resource each user
 * of the leaf queue currently uses in the given partition.
 *
 * @param clusterResource total cluster resource (not read by this method)
 * @param partition partition whose usage is read
 * @param leafQueue queue whose users are visited
 * @param rollingResourceUsagePerUser map populated with one entry per user
 */
private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
    String partition, LeafQueue leafQueue,
    Map<String, Resource> rollingResourceUsagePerUser) {
  for (String userName : leafQueue.getAllUsers()) {
    // Clone so that the rolling computation never mutates the queue's own
    // usage record.
    Resource usedCopy = Resources.clone(
        leafQueue.getUser(userName).getResourceUsage().getUsed(partition));
    rollingResourceUsagePerUser.put(userName, usedCopy);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Rolling resource usage for user:" + userName + " is : "
          + rollingResourceUsagePerUser.get(userName));
    }
  }
}
/**
 * Walks the sorted live containers of one app and tries to select them for
 * preemption until the demand map is empty or the intra-queue policy says
 * the container must be skipped.
 *
 * @param leafQueue queue the app belongs to
 * @param app app whose live containers are examined
 * @param selectedCandidates containers selected so far
 * @param clusterResource total cluster resource
 * @param totalPreemptedResourceAllowed resource that may still be preempted
 * @param resToObtainByPartition outstanding demand per partition
 * @param rollingResourceUsagePerUser rolling usage per user; the entry of
 *          the app's user is reduced when the order policy is
 *          {@code USERLIMIT_FIRST} and a container is selected
 */
private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
    FiCaSchedulerApp app,
    Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
    Resource clusterResource, Resource totalPreemptedResourceAllowed,
    Map<String, Resource> resToObtainByPartition,
    Map<String, Resource> rollingResourceUsagePerUser) {

  // ToDo: Reuse reservation selector here.
  List<RMContainer> sortedContainers =
      new ArrayList<>(app.getLiveContainers());
  sortContainers(sortedContainers);

  if (LOG.isDebugEnabled()) {
    LOG.debug(
        "totalPreemptedResourceAllowed for preemption at this round is :"
            + totalPreemptedResourceAllowed);
  }

  Resource userRollingUsage =
      rollingResourceUsagePerUser.get(app.getUser());

  for (RMContainer container : sortedContainers) {
    // Nothing left to obtain: stop processing this app altogether.
    if (resToObtainByPartition.isEmpty()) {
      return;
    }

    // Ignore containers that were preselected, are already marked
    // killable, or host the AM (AM containers are not preempted for now).
    boolean notEligible =
        CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
            container, selectedCandidates)
        || (null != preemptionContext.getKillableContainers()
            && preemptionContext.getKillableContainers()
                .contains(container.getContainerId()))
        || container.isAMContainer();
    if (notEligible) {
      continue;
    }

    // When the policy reports that taking this container would push the
    // user's usage to or below its user limit, stop looking at this app.
    if (fifoPreemptionComputePlugin.skipContainerBasedOnIntraQueuePolicy(
        app, clusterResource, userRollingUsage, container)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Skipping container: " + container.getContainerId()
                + " with resource:" + container.getAllocatedResource()
                + " as UserLimit for user:" + app.getUser()
                + " with resource usage: " + userRollingUsage
                + " is going under UL");
      }
      break;
    }

    // Try to preempt this container.
    boolean preempted = CapacitySchedulerPreemptionUtils
        .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
            resToObtainByPartition, container, clusterResource,
            selectedCandidates, totalPreemptedResourceAllowed);

    // Once a container is selected, reduce its user's rolling usage when
    // the user-limit-first order policy is in effect.
    if (preempted) {
      if (preemptionContext.getIntraQueuePreemptionOrderPolicy()
          .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
        Resources.subtractFrom(userRollingUsage,
            container.getAllocatedResource());
      }
    }
  }
}
/**
 * Asks the compute plugin to work out the ideal allocation of the apps in
 * every under-served leaf queue of every partition, skipping queues whose
 * used capacity is below the configured minimum threshold.
 *
 * @param clusterResource total cluster resource
 * @param totalPreemptedResourceAllowed resource that may still be preempted
 * @param selectedCandidates containers selected so far
 */
private void computeIntraQueuePreemptionDemand(Resource clusterResource,
    Resource totalPreemptedResourceAllowed,
    Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {

  // 1. Visit each partition and compute the demand inside it.
  for (String partition : context.getAllPartitions()) {
    LinkedHashSet<String> underServedQueues =
        context.getUnderServedQueuesPerPartition(partition);
    if (underServedQueues == null) {
      continue;
    }

    // 2. Visit every queue registered for this partition.
    for (String queueName : underServedQueues) {
      TempQueuePerPartition tempQueue =
          context.getQueueByPartition(queueName, partition);
      LeafQueue leaf = tempQueue.leafQueue;

      // Parent queues carry no leaf queue and are skipped.
      if (leaf == null) {
        continue;
      }

      // 3. reassignableResource = used - actuallyToBePreempted. This is
      // the upper limit when splitting the apps' quota within the queue.
      Resource reassignable = Resources.subtract(tempQueue.getUsed(),
          tempQueue.getActuallyToBePreempted());

      // 4. Only queues whose used capacity reaches the minimum threshold
      // are considered for intra-queue preemption.
      if (leaf.getQueueCapacities().getUsedCapacity(partition) < context
          .getMinimumThresholdForIntraQueuePreemption()) {
        continue;
      }

      // 5. Compute the allocation of all apps based on the queue's
      // unallocated capacity.
      fifoPreemptionComputePlugin.computeAppsIdealAllocation(
          clusterResource, tempQueue, selectedCandidates,
          totalPreemptedResourceAllowed, reassignable,
          context.getMaxAllowableLimitForIntraQueuePreemption());
    }
  }
}
}