blob: b1b340269d87f040e9259d05ae15e0cb4811cc10 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <p>
* This class has the following functionality:
* 1) Keeps track of pending resource requests when following events happen:
* - New ResourceRequests are added to scheduler.
* - New containers get allocated.
*
* 2) Determines the order that the nodes given in the {@link CandidateNodeSet}
* will be used for allocating containers.
* </p>
*
* <p>
* And different set of resource requests (E.g., resource requests with the
* same schedulerKey) can have one instance of AppPlacementAllocator, each
* AppPlacementAllocator can have different ways to order nodes depends on
* requests.
* </p>
*/
public abstract class AppPlacementAllocator<N extends SchedulerNode> {
protected AppSchedulingInfo appSchedulingInfo;
protected SchedulerRequestKey schedulerRequestKey;
protected RMContext rmContext;
private AtomicInteger placementAttempt = new AtomicInteger(0);
private MultiNodeSortingManager<N> multiNodeSortingManager = null;
private String multiNodeSortPolicyName;
private static final Logger LOG =
LoggerFactory.getLogger(AppPlacementAllocator.class);
/**
* Get iterator of preferred node depends on requirement and/or availability.
* @param candidateNodeSet input CandidateNodeSet
* @return iterator of preferred node
*/
public Iterator<N> getPreferredNodeIterator(
CandidateNodeSet<N> candidateNodeSet) {
// Now only handle the case that single node in the candidateNodeSet
// TODO, Add support to multi-hosts inside candidateNodeSet which is passed
// in.
N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
if (singleNode != null) {
return IteratorUtils.singletonIterator(singleNode);
}
// singleNode will be null if Multi-node placement lookup is enabled, and
// hence could consider sorting policies.
return multiNodeSortingManager.getMultiNodeSortIterator(
candidateNodeSet.getAllNodes().values(),
candidateNodeSet.getPartition(),
multiNodeSortPolicyName);
}
/**
* Replace existing pending asks by the new requests
*
* @param requests new asks
* @param recoverPreemptedRequestForAContainer if we're recovering resource
* requests for preempted container
* @return true if total pending resource changed
*/
public abstract PendingAskUpdateResult updatePendingAsk(
Collection<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer);
/**
* Replace existing pending asks by the new SchedulingRequest
*
* @param schedulerRequestKey scheduler request key
* @param schedulingRequest new asks
* @param recoverPreemptedRequestForAContainer if we're recovering resource
* requests for preempted container
* @return true if total pending resource changed
*/
public abstract PendingAskUpdateResult updatePendingAsk(
SchedulerRequestKey schedulerRequestKey,
SchedulingRequest schedulingRequest,
boolean recoverPreemptedRequestForAContainer);
/**
* Get pending ResourceRequests by given schedulerRequestKey
* @return Map of resourceName to ResourceRequest
*/
public abstract Map<String, ResourceRequest> getResourceRequests();
/**
* Get pending ask for given resourceName. If there's no such pendingAsk,
* returns {@link PendingAsk#ZERO}
*
* @param resourceName resourceName
* @return PendingAsk
*/
public abstract PendingAsk getPendingAsk(String resourceName);
/**
* Get #pending-allocations for given resourceName. If there's no such
* pendingAsk, returns 0
*
* @param resourceName resourceName
* @return #pending-allocations
*/
public abstract int getOutstandingAsksCount(String resourceName);
/**
* Notify container allocated.
* @param schedulerKey SchedulerRequestKey for this ResourceRequest
* @param type Type of the allocation
* @param node Which node this container allocated on
* @return ContainerRequest which include resource requests associated with
* the container. This will be used by scheduler to recover requests.
* Please refer to {@link ContainerRequest} for more details.
*/
public abstract ContainerRequest allocate(SchedulerRequestKey schedulerKey,
NodeType type, SchedulerNode node);
/**
* We can still have pending requirement for a given NodeType and node
* @param type Locality Type
* @param node which node we will allocate on
* @return true if we has pending requirement
*/
public abstract boolean canAllocate(NodeType type, SchedulerNode node);
/**
* Can delay to give locality?
* TODO: This should be moved out of AppPlacementAllocator
* and should belong to specific delay scheduling policy impl.
* See YARN-7457 for more details.
*
* @param resourceName resourceName
* @return can/cannot
*/
public abstract boolean canDelayTo(String resourceName);
/**
* Does this {@link AppPlacementAllocator} accept resources on given node?
*
* @param schedulerNode schedulerNode
* @param schedulingMode schedulingMode
* @param dcOpt optional diagnostics collector
* @return accepted/not
*/
public abstract boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode,
Optional<DiagnosticsCollector> dcOpt);
public abstract boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode);
/**
* It is possible that one request can accept multiple node partition,
* So this method returns primary node partition for pending resource /
* headroom calculation.
*
* @return primary requested node partition
*/
public abstract String getPrimaryRequestedNodePartition();
/**
* @return number of unique location asks with #pending greater than 0,
* (like /rack1, host1, etc.).
*
* TODO: This should be moved out of AppPlacementAllocator
* and should belong to specific delay scheduling policy impl.
* See YARN-7457 for more details.
*/
public abstract int getUniqueLocationAsks();
/**
* Print human-readable requests to LOG debug.
*/
public abstract void showRequests();
/**
* Initialize this allocator, this will be called by Factory automatically.
*
* @param appSchedulingInfo appSchedulingInfo
* @param schedulerRequestKey schedulerRequestKey
* @param rmContext rmContext
*/
public void initialize(AppSchedulingInfo appSchedulingInfo,
SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
this.appSchedulingInfo = appSchedulingInfo;
this.rmContext = rmContext;
this.schedulerRequestKey = schedulerRequestKey;
multiNodeSortPolicyName = appSchedulingInfo
.getApplicationSchedulingEnvs().get(
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS);
multiNodeSortingManager = (MultiNodeSortingManager<N>) rmContext
.getMultiNodeSortingManager();
if (LOG.isDebugEnabled()) {
LOG.debug(
"nodeLookupPolicy used for " + appSchedulingInfo.getApplicationId()
+ " is " + ((multiNodeSortPolicyName != null)
? multiNodeSortPolicyName : ""));
}
}
/**
* Get pending SchedulingRequest.
* @return SchedulingRequest
*/
public abstract SchedulingRequest getSchedulingRequest();
public int getPlacementAttempt() {
return placementAttempt.get();
}
public void incrementPlacementAttempt() {
placementAttempt.getAndIncrement();
}
}