/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
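/**
 * Preemption candidates selector used by the priority-utilization ordering
 * policy: when a reserved container from a higher-priority queue cannot be
 * allocated within the configured delay, it selects containers from
 * lower-priority queues to preempt, and can optionally try to move the
 * reservation to a better node.
 */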
public class QueuePriorityContainerCandidateSelector
extends PreemptionCandidatesSelector {
private static final Log LOG =
LogFactory.getLog(QueuePriorityContainerCandidateSelector.class);
// Configured timeout before doing reserved container preemption
private long minTimeout;
// Allow moving reservations around for better placement?
private boolean allowMoveReservation;
// All the reserved containers in the system which could possibly preempt from
// queues with lower priorities
private List<RMContainer> reservedContainers;
// From -> To
// A digraph representing whether one queue has higher priority than another.
// For example, a->b means queue=a has higher priority than queue=b
private Table<String, String, Boolean> priorityDigraph =
HashBasedTable.create();
private Resource clusterResource;
private Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates;
private Resource totalPreemptionAllowed;
// A cached scheduler node map, will be refreshed each round.
private Map<NodeId, TempSchedulerNode> tempSchedulerNodeMap = new HashMap<>();
// Have we touched (made any changes to) the node in this round?
// Once a node is touched, we will not try to move reservations to it.
private Set<NodeId> touchedNodes;
// Resources marked to be preempted from other queues.
// <Queue, Partition, Resource-marked-to-be-preempted-from-other-queue>
private Table<String, String, Resource> toPreemptedFromOtherQueues =
HashBasedTable.create();
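// Orders containers by queue priority: a container whose queue may preempt
// the other's queue sorts first; if neither queue may preempt the other,
// fall back to container creation time (older first).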
private final Comparator<RMContainer>
CONTAINER_CREATION_TIME_COMPARATOR = new Comparator<RMContainer>() {
@Override
public int compare(RMContainer o1, RMContainer o2) {
if (preemptionAllowed(o1.getQueueName(), o2.getQueueName())) {
return -1;
} else if (preemptionAllowed(o2.getQueueName(), o1.getQueueName())) {
return 1;
}
// If two queues cannot preempt each other, compare creation time.
return Long.compare(o1.getCreationTime(), o2.getCreationTime());
}
};
QueuePriorityContainerCandidateSelector(
CapacitySchedulerPreemptionContext preemptionContext) {
super(preemptionContext);
// Initialize parameters
CapacitySchedulerConfiguration csc =
preemptionContext.getScheduler().getConfiguration();
minTimeout = csc.getPUOrderingPolicyUnderUtilizedPreemptionDelay();
allowMoveReservation =
csc.getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation();
}
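/**
 * Get the list of queues from the given queue up to the root, starting with
 * the queue itself.
 */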
private List<TempQueuePerPartition> getPathToRoot(TempQueuePerPartition tq) {
List<TempQueuePerPartition> list = new ArrayList<>();
while (tq != null) {
list.add(tq);
tq = tq.parent;
}
return list;
}
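/**
 * Build the priority digraph over all leaf-queue pairs: for each pair, walk
 * both paths to the root, compare the relative priorities of the direct
 * ancestors below the lowest common ancestor, and record which leaf queue
 * may preempt the other.
 */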
private void initializePriorityDigraph() {
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing priority preemption directed graph:");
}
// Make sure we iterate all leaf queue combinations
for (String q1 : preemptionContext.getLeafQueueNames()) {
for (String q2 : preemptionContext.getLeafQueueNames()) {
// Make sure we only calculate each combination once instead of all
// permutations
if (q1.compareTo(q2) < 0) {
TempQueuePerPartition tq1 = preemptionContext.getQueueByPartition(q1,
RMNodeLabelsManager.NO_LABEL);
TempQueuePerPartition tq2 = preemptionContext.getQueueByPartition(q2,
RMNodeLabelsManager.NO_LABEL);
List<TempQueuePerPartition> path1 = getPathToRoot(tq1);
List<TempQueuePerPartition> path2 = getPathToRoot(tq2);
// Get the direct ancestors below the LCA (lowest common ancestor)
int i = path1.size() - 1;
int j = path2.size() - 1;
while (path1.get(i).queueName.equals(path2.get(j).queueName)) {
i--;
j--;
}
// compare priority of path1[i] and path2[j]
int p1 = path1.get(i).relativePriority;
int p2 = path2.get(j).relativePriority;
if (p1 < p2) {
priorityDigraph.put(q2, q1, true);
if (LOG.isDebugEnabled()) {
LOG.debug("- Added priority ordering edge: " + q2 + " >> " + q1);
}
} else if (p2 < p1) {
priorityDigraph.put(q1, q2, true);
if (LOG.isDebugEnabled()) {
LOG.debug("- Added priority ordering edge: " + q1 + " >> " + q2);
}
}
}
}
}
}
/**
 * Do we allow demandingQueue to preempt resources from toBePreemptedQueue?
 *
 * @param demandingQueue the queue demanding resources
 * @param toBePreemptedQueue the queue to preempt resources from
 * @return true if preemption is allowed, false otherwise
 */
private boolean preemptionAllowed(String demandingQueue,
String toBePreemptedQueue) {
return priorityDigraph.contains(demandingQueue,
toBePreemptedQueue);
}
/**
 * Can we preempt enough resources on the given node to satisfy the asked
 * resource?
 *
 * @param requiredResource the asked resource
 * @param demandingQueue the queue demanding the resource
 * @param schedulerNode the node to preempt from
 * @param lookingForNewReservationPlacement are we trying to move a
 * reservation to this node
 * @param newlySelectedContainers newly selected containers, populated when
 * we can preempt enough resources from the node
 *
 * @return true if enough resources can be preempted, false otherwise
 */
private boolean canPreemptEnoughResourceForAsked(Resource requiredResource,
String demandingQueue, FiCaSchedulerNode schedulerNode,
boolean lookingForNewReservationPlacement,
List<RMContainer> newlySelectedContainers) {
// Do not check touched nodes again.
if (touchedNodes.contains(schedulerNode.getNodeID())) {
return false;
}
TempSchedulerNode node = tempSchedulerNodeMap.get(schedulerNode.getNodeID());
if (null == node) {
node = TempSchedulerNode.fromSchedulerNode(schedulerNode);
tempSchedulerNodeMap.put(schedulerNode.getNodeID(), node);
}
if (null != schedulerNode.getReservedContainer()
&& lookingForNewReservationPlacement) {
// Node already reserved by others, skip this node.
// We will not try to move a reservation to a node which already has one.
return false;
}
// Resources needed to preempt = asked - (node.total - node.allocated)
Resource lacking = Resources.subtract(requiredResource, Resources
.subtract(node.getTotalResource(), node.getAllocatedResource()));
// On each host, simply check whether we could preempt containers from
// lower-priority queues or not
List<RMContainer> runningContainers = node.getRunningContainers();
Collections.sort(runningContainers, CONTAINER_CREATION_TIME_COMPARATOR);
// First of all, consider already selected containers
for (RMContainer runningContainer : runningContainers) {
if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
runningContainer, selectedCandidates)) {
Resources.subtractFrom(lacking,
runningContainer.getAllocatedResource());
}
}
// If we can already allocate the reserved container after preemption,
// skip the following steps
if (Resources.fitsIn(rc, clusterResource, lacking,
Resources.none())) {
return true;
}
Resource allowed = Resources.clone(totalPreemptionAllowed);
Resource selected = Resources.createResource(0);
for (RMContainer runningContainer : runningContainers) {
if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
runningContainer, selectedCandidates)) {
// ignore selected containers
continue;
}
// Only preempt resources from queues with lower priority
if (!preemptionAllowed(demandingQueue,
runningContainer.getQueueName())) {
continue;
}
// Don't preempt AM container
if (runningContainer.isAMContainer()) {
continue;
}
// Do not allow preempting more than the limit
if (Resources.greaterThanOrEqual(rc, clusterResource, allowed,
runningContainer.getAllocatedResource())) {
Resources.subtractFrom(allowed,
runningContainer.getAllocatedResource());
Resources.subtractFrom(lacking,
runningContainer.getAllocatedResource());
Resources.addTo(selected, runningContainer.getAllocatedResource());
if (null != newlySelectedContainers) {
newlySelectedContainers.add(runningContainer);
}
}
// Lacking <= 0 means we can allocate the reserved container
if (Resources.fitsIn(rc, clusterResource, lacking, Resources.none())) {
return true;
}
}
return false;
}
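/**
 * Sanity checks before moving a reserved container to a new node: skip
 * reservations tied to container updates (increase / promote), hard-locality
 * requests, and nodes whose partition does not match the reservation's node
 * label expression.
 */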
private boolean preChecksForMovingReservedContainerToNode(
RMContainer reservedContainer, FiCaSchedulerNode newNode) {
// Don't do this if it has hard-locality preferences
if (reservedContainer.getReservedSchedulerKey().getContainerToUpdate()
!= null) {
// This means a container update request (like increase / promote)
return false;
}
// For normal requests
FiCaSchedulerApp app =
preemptionContext.getScheduler().getApplicationAttempt(
reservedContainer.getApplicationAttemptId());
if (!app.getAppSchedulingInfo().canDelayTo(
reservedContainer.getReservedSchedulerKey(), ResourceRequest.ANY)) {
// This is a hard locality request
return false;
}
// Check if newNode's partition matches requested partition
if (!StringUtils.equals(reservedContainer.getNodeLabelExpression(),
newNode.getPartition())) {
return false;
}
return true;
}
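/**
 * Try to find another node where enough resources could be preempted from
 * lower-priority queues for the given reserved container, and ask the
 * scheduler to move the reservation there when such a node is found.
 */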
private void tryToMakeBetterReservationPlacement(
RMContainer reservedContainer,
List<FiCaSchedulerNode> allSchedulerNodes) {
for (FiCaSchedulerNode targetNode : allSchedulerNodes) {
// Precheck if we can move the rmContainer to the new targetNode
if (!preChecksForMovingReservedContainerToNode(reservedContainer,
targetNode)) {
continue;
}
if (canPreemptEnoughResourceForAsked(
reservedContainer.getReservedResource(),
reservedContainer.getQueueName(), targetNode, true, null)) {
NodeId fromNode = reservedContainer.getNodeId();
// We can place the container on this targetNode, so go ahead and notify
// the scheduler
if (preemptionContext.getScheduler().moveReservedContainer(
reservedContainer, targetNode)) {
LOG.info("Successfully moved reserved container=" + reservedContainer
.getContainerId() + " from targetNode=" + fromNode
+ " to targetNode=" + targetNode.getNodeID());
touchedNodes.add(targetNode.getNodeID());
}
}
}
}
/**
 * Is the demanding queue already satisfied for the given partition?
 * A satisfied queue is not allowed to preempt resources from other queues.
 * @param demandingQueue the queue demanding resources
 * @param partition the node partition
 * @return true if the queue is satisfied, false otherwise
 */
private boolean isQueueSatisfied(String demandingQueue,
String partition) {
TempQueuePerPartition tq = preemptionContext.getQueueByPartition(
demandingQueue, partition);
if (null == tq) {
return false;
}
Resource guaranteed = tq.getGuaranteed();
Resource usedDeductReserved = Resources.subtract(tq.getUsed(),
tq.getReserved());
Resource markedToPreemptFromOtherQueue = toPreemptedFromOtherQueues.get(
demandingQueue, partition);
if (null == markedToPreemptFromOtherQueue) {
markedToPreemptFromOtherQueue = Resources.none();
}
// return used - reserved + to-preempt-from-other-queues >= guaranteed
boolean flag = Resources.greaterThanOrEqual(rc, clusterResource,
Resources.add(usedDeductReserved, markedToPreemptFromOtherQueue),
guaranteed);
return flag;
}
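/**
 * Record the given amount of resources as marked to be preempted from other
 * queues on behalf of the given queue and partition.
 */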
private void incToPreempt(String queue, String partition,
Resource allocated) {
Resource total = toPreemptedFromOtherQueues.get(queue, partition);
if (null == total) {
total = Resources.createResource(0);
toPreemptedFromOtherQueues.put(queue, partition, total);
}
Resources.addTo(total, allocated);
}
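/**
 * Select preemption candidates for reserved containers of higher-priority
 * queues: rebuild the queue priority digraph, then for each reserved
 * container that has been pending longer than minTimeout, try to preempt
 * enough resources from lower-priority queues on its node, or optionally
 * move the reservation to a better node.
 */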
@Override
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource,
Resource totalPreemptedResourceAllowed) {
// Initialize digraph from queues
// TODO (wangda): only do this when queue refreshed.
priorityDigraph.clear();
initializePriorityDigraph();
// When all queues are set to the same priority, or priority is not
// respected, return directly.
if (priorityDigraph.isEmpty()) {
return selectedCandidates;
}
// Save parameters to be shared by other methods
this.selectedCandidates = selectedCandidates;
this.clusterResource = clusterResource;
this.totalPreemptionAllowed = totalPreemptedResourceAllowed;
toPreemptedFromOtherQueues.clear();
reservedContainers = new ArrayList<>();
// Clear the temp-scheduler-node map every time we do a selection of
// containers.
tempSchedulerNodeMap.clear();
touchedNodes = new HashSet<>();
// Add all reserved containers for analysis
List<FiCaSchedulerNode> allSchedulerNodes =
preemptionContext.getScheduler().getAllNodes();
for (FiCaSchedulerNode node : allSchedulerNodes) {
RMContainer reservedContainer = node.getReservedContainer();
if (null != reservedContainer) {
// Add to the reservedContainers list if the queue that the reserved
// container belongs to has higher priority than at least one other queue
if (priorityDigraph.containsRow(
reservedContainer.getQueueName())) {
reservedContainers.add(reservedContainer);
}
}
}
// Sort reserved container by creation time
Collections.sort(reservedContainers, CONTAINER_CREATION_TIME_COMPARATOR);
long currentTime = System.currentTimeMillis();
// From the beginning of the list
for (RMContainer reservedContainer : reservedContainers) {
// Only try to preempt for a reserved container once it has been pending
// longer than minTimeout since its creation and still cannot be allocated
if (currentTime - reservedContainer.getCreationTime() < minTimeout) {
continue;
}
FiCaSchedulerNode node = preemptionContext.getScheduler().getNode(
reservedContainer.getReservedNode());
if (null == node) {
// Something is wrong, ignore
continue;
}
List<RMContainer> newlySelectedToBePreemptContainers = new ArrayList<>();
// Check if we can preempt for this queue
// We will skip if the demanding queue is already satisfied.
String demandingQueueName = reservedContainer.getQueueName();
boolean demandingQueueSatisfied = isQueueSatisfied(demandingQueueName,
node.getPartition());
// We will continue to check whether it is possible to preempt the reserved
// container's resources from the node.
boolean canPreempt = false;
if (!demandingQueueSatisfied) {
canPreempt = canPreemptEnoughResourceForAsked(
reservedContainer.getReservedResource(), demandingQueueName, node,
false, newlySelectedToBePreemptContainers);
}
// Add the selected containers if we can allocate the reserved container by
// preempting others
if (canPreempt) {
touchedNodes.add(node.getNodeID());
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to preempt following containers to make reserved "
+ "container=" + reservedContainer.getContainerId() + " on node="
+ node.getNodeID() + " can be allocated:");
}
// Update to-be-preempted resources
incToPreempt(demandingQueueName, node.getPartition(),
reservedContainer.getReservedResource());
for (RMContainer c : newlySelectedToBePreemptContainers) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --container=" + c.getContainerId() + " resource=" + c
.getReservedResource());
}
Set<RMContainer> containers = selectedCandidates.get(
c.getApplicationAttemptId());
if (null == containers) {
containers = new HashSet<>();
selectedCandidates.put(c.getApplicationAttemptId(), containers);
}
containers.add(c);
// Update totalPreemptedResourceAllowed
Resources.subtractFrom(totalPreemptedResourceAllowed,
c.getAllocatedResource());
}
} else if (!demandingQueueSatisfied) {
// We failed to preempt enough resources to allocate the container on the
// reserved node, so we will try to see if we can reserve the container on
// a better host. Only do this if the demanding queue is not satisfied.
//
// TODO (wangda): do more tests before making it usable
//
if (allowMoveReservation) {
tryToMakeBetterReservationPlacement(reservedContainer,
allSchedulerNodes);
}
}
}
return selectedCandidates;
}
}