blob: de23d0a291cacb9fc232b295d98f5bbd6afbd063 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class ReservedContainerCandidatesSelector
extends PreemptionCandidatesSelector {
private static final Log LOG =
LogFactory.getLog(ReservedContainerCandidatesSelector.class);
private PreemptableResourceCalculator preemptableAmountCalculator;
/**
* A temporary data structure to remember what to preempt on a node
*/
private static class NodeForPreemption {
private float preemptionCost;
private FiCaSchedulerNode schedulerNode;
private List<RMContainer> selectedContainers;
public NodeForPreemption(float preemptionCost,
FiCaSchedulerNode schedulerNode, List<RMContainer> selectedContainers) {
this.preemptionCost = preemptionCost;
this.schedulerNode = schedulerNode;
this.selectedContainers = selectedContainers;
}
}
ReservedContainerCandidatesSelector(
CapacitySchedulerPreemptionContext preemptionContext) {
super(preemptionContext);
preemptableAmountCalculator = new PreemptableResourceCalculator(
preemptionContext, true);
}
@Override
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource,
Resource totalPreemptedResourceAllowed) {
// Calculate how much resources we need to preempt
preemptableAmountCalculator.computeIdealAllocation(clusterResource,
totalPreemptedResourceAllowed);
// Get queue to preemptable resource by partition
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition =
new HashMap<>();
for (String leafQueue : preemptionContext.getLeafQueueNames()) {
queueToPreemptableResourceByPartition.put(leafQueue,
CapacitySchedulerPreemptionUtils
.getResToObtainByPartitionForLeafQueue(preemptionContext,
leafQueue, clusterResource));
}
// Get list of nodes for preemption, ordered by preemption cost
List<NodeForPreemption> nodesForPreemption = getNodesForPreemption(
clusterResource, queueToPreemptableResourceByPartition,
selectedCandidates, totalPreemptedResourceAllowed);
for (NodeForPreemption nfp : nodesForPreemption) {
RMContainer reservedContainer = nfp.schedulerNode.getReservedContainer();
if (null == reservedContainer) {
continue;
}
NodeForPreemption preemptionResult = getPreemptionCandidatesOnNode(
nfp.schedulerNode, clusterResource,
queueToPreemptableResourceByPartition, selectedCandidates,
totalPreemptedResourceAllowed, false);
if (null != preemptionResult) {
for (RMContainer c : preemptionResult.selectedContainers) {
ApplicationAttemptId appId = c.getApplicationAttemptId();
Set<RMContainer> containers = selectedCandidates.get(appId);
if (null == containers) {
containers = new HashSet<>();
selectedCandidates.put(appId, containers);
}
containers.add(c);
if (LOG.isDebugEnabled()) {
LOG.debug(this.getClass().getName() + " Marked container=" + c
.getContainerId() + " from queue=" + c.getQueueName()
+ " to be preemption candidates");
}
}
}
}
return selectedCandidates;
}
private Resource getPreemptableResource(String queueName,
String partitionName,
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition) {
Map<String, Resource> partitionToPreemptable =
queueToPreemptableResourceByPartition.get(queueName);
if (null == partitionToPreemptable) {
return null;
}
Resource preemptable = partitionToPreemptable.get(partitionName);
return preemptable;
}
private boolean tryToPreemptFromQueue(Resource cluster, String queueName,
String partitionName,
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
Resource required, Resource totalPreemptionAllowed, boolean readOnly) {
Resource preemptable = getPreemptableResource(queueName, partitionName,
queueToPreemptableResourceByPartition);
if (null == preemptable) {
return false;
}
if (!Resources.fitsIn(rc, cluster, required, preemptable)) {
return false;
}
if (!Resources.fitsIn(rc, cluster, required, totalPreemptionAllowed)) {
return false;
}
if (!readOnly) {
Resources.subtractFrom(preemptable, required);
Resources.subtractFrom(totalPreemptionAllowed, required);
}
return true;
}
/**
* Try to check if we can preempt resources for reserved container in given node
* @param node
* @param cluster
* @param queueToPreemptableResourceByPartition it's a map of
* <queueName, <partition, preemptable-resource>>
* @param readOnly do we want to modify preemptable resource after we selected
* candidates
* @return NodeForPreemption if it's possible to preempt containers on the node
* to satisfy reserved resource
*/
private NodeForPreemption getPreemptionCandidatesOnNode(
FiCaSchedulerNode node, Resource cluster,
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource totalPreemptionAllowed, boolean readOnly) {
RMContainer reservedContainer = node.getReservedContainer();
Resource available = Resources.clone(node.getUnallocatedResource());
Resource totalSelected = Resources.createResource(0);
List<RMContainer> sortedRunningContainers =
node.getCopiedListOfRunningContainers();
List<RMContainer> selectedContainers = new ArrayList<>();
Map<ContainerId, RMContainer> killableContainers =
node.getKillableContainers();
// Sort running container by launch time, we preferred to preempt recent
// launched preempt container
Collections.sort(sortedRunningContainers, new Comparator<RMContainer>() {
@Override public int compare(RMContainer o1, RMContainer o2) {
return -1 * o1.getContainerId().compareTo(o2.getContainerId());
}
});
// First check: can we preempt containers to allocate the
// reservedContainer?
boolean canAllocateReservedContainer = false;
// At least, we can get available + killable resources from this node
Resource cur = Resources.add(available, node.getTotalKillableResources());
String partition = node.getPartition();
// Avoid preempt any container if required <= available + killable
if (Resources.fitsIn(rc, cluster, reservedContainer.getReservedResource(),
cur)) {
return null;
}
// Extra cost of am container preemption
float amPreemptionCost = 0f;
for (RMContainer c : sortedRunningContainers) {
String containerQueueName = c.getQueueName();
// Skip container if it is already marked killable
if (killableContainers.containsKey(c.getContainerId())) {
continue;
}
// An alternative approach is add a "penalty cost" if AM container is
// selected. Here for safety, avoid preempt AM container in any cases
if (c.isAMContainer()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip selecting AM container on host=" + node.getNodeID()
+ " AM container=" + c.getContainerId());
}
continue;
}
// Can we preempt container c?
// Check if we have quota to preempt this container
boolean canPreempt = tryToPreemptFromQueue(cluster, containerQueueName,
partition, queueToPreemptableResourceByPartition,
c.getAllocatedResource(), totalPreemptionAllowed, readOnly);
// If we can, add to selected container, and change resource accordingly.
if (canPreempt) {
if (!CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
selectedCandidates)) {
if (!readOnly) {
selectedContainers.add(c);
}
Resources.addTo(totalSelected, c.getAllocatedResource());
}
Resources.addTo(cur, c.getAllocatedResource());
if (Resources.fitsIn(rc, cluster,
reservedContainer.getReservedResource(), cur)) {
canAllocateReservedContainer = true;
break;
}
}
}
if (!canAllocateReservedContainer) {
if (!readOnly) {
// Revert queue preemption quotas
for (RMContainer c : selectedContainers) {
Resource res = getPreemptableResource(c.getQueueName(), partition,
queueToPreemptableResourceByPartition);
if (null == res) {
// This shouldn't happen in normal cases, one possible cause is
// container moved to different queue while executing preemption logic.
// Ignore such failures.
continue;
}
Resources.addTo(res, c.getAllocatedResource());
}
}
return null;
}
float ratio = Resources.ratio(rc, totalSelected,
reservedContainer.getReservedResource());
// Compute preemption score
NodeForPreemption nfp = new NodeForPreemption(ratio + amPreemptionCost,
node, selectedContainers);
return nfp;
}
private List<NodeForPreemption> getNodesForPreemption(Resource cluster,
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource totalPreemptionAllowed) {
List<NodeForPreemption> nfps = new ArrayList<>();
// get nodes have reserved container
for (FiCaSchedulerNode node : preemptionContext.getScheduler()
.getAllNodes()) {
if (node.getReservedContainer() != null) {
NodeForPreemption nfp = getPreemptionCandidatesOnNode(node, cluster,
queueToPreemptableResourceByPartition, selectedCandidates,
totalPreemptionAllowed, true);
if (null != nfp) {
// Null means we cannot preempt containers on the node to satisfy
// reserved container
nfps.add(nfp);
}
}
}
// Return sorted node-for-preemptions (by cost)
Collections.sort(nfps, new Comparator<NodeForPreemption>() {
@Override
public int compare(NodeForPreemption o1, NodeForPreemption o2) {
return Float.compare(o1.preemptionCost, o2.preemptionCost);
}
});
return nfps;
}
}