blob: 5c3fd43d99cd20936b5a3e93e17a0315b1a98abb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * A {@link SchedulingPlacementSet} that keeps an application's outstanding
 * {@link ResourceRequest}s in a map keyed by resource name (a node name, a
 * rack name, or {@link ResourceRequest#ANY}). On allocation it decrements the
 * node-, rack- and ANY-level asks that correspond to the chosen locality.
 *
 * <p>Thread-safety: {@link #updateResourceRequests} and {@link #allocate} run
 * under a write lock; {@link #getPendingAsk}, {@link #getOutstandingAsksCount},
 * {@link #canAllocate}, {@link #canDelayTo} and
 * {@link #getAcceptedResouceNames} run under the read lock. The backing map is
 * a {@link ConcurrentHashMap} and {@link #getResourceRequests} returns it
 * directly (a live view, not a copy).
 */
public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
    implements SchedulingPlacementSet<N> {
  private static final Log LOG =
      LogFactory.getLog(LocalitySchedulingPlacementSet.class);

  /** Outstanding requests keyed by {@link ResourceRequest#getResourceName()}. */
  private final Map<String, ResourceRequest> resourceRequestMap =
      new ConcurrentHashMap<>();
  private final AppSchedulingInfo appSchedulingInfo;
  /**
   * Partition taken from the most recent ANY request;
   * {@link RMNodeLabelsManager#NO_LABEL} when that request has no label.
   */
  private volatile String primaryRequestedPartition =
      RMNodeLabelsManager.NO_LABEL;

  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;

  /**
   * @param info scheduling info of the owning application; used to record
   *     requested partitions, pending resources and scheduler-key removal
   */
  public LocalitySchedulingPlacementSet(AppSchedulingInfo info) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
    this.appSchedulingInfo = info;
  }

  /**
   * Returns an iterator over the preferred nodes of the given placement set.
   * Only the single-node case is handled: the iterator yields that node, or
   * nothing when the placement set does not resolve to a single node.
   */
  @Override
  @SuppressWarnings("unchecked")
  public Iterator<N> getPreferredNodeIterator(
      PlacementSet<N> clusterPlacementSet) {
    // Now only handle the case that single node in placementSet
    // TODO, Add support to multi-hosts inside placement-set which is passed in.
    N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet);
    if (null != singleNode) {
      return IteratorUtils.singletonIterator(singleNode);
    }
    return IteratorUtils.emptyIterator();
  }

  /**
   * Null-safe check whether two requests carry different node label
   * expressions.
   *
   * @return {@code true} if exactly one label is {@code null}, or both are
   *     non-null and unequal; {@code false} if both are {@code null} or equal
   */
  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
      ResourceRequest requestTwo) {
    String requestOneLabelExp = requestOne.getNodeLabelExpression();
    String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
    if (requestOneLabelExp == null) {
      return requestTwoLabelExp != null;
    }
    return !requestOneLabelExp.equals(requestTwoLabelExp);
  }

  /**
   * Keeps node label expressions consistent with the ANY request.
   *
   * <p>For an ANY request whose label differs from the stored ANY request (or
   * when no ANY request is stored yet), the new label is copied onto every
   * stored non-ANY request. For a non-ANY request, the stored ANY request's
   * label (if any) is copied onto the incoming request, mutating it.
   */
  private void updateNodeLabels(ResourceRequest request) {
    String resourceName = request.getResourceName();
    if (resourceName.equals(ResourceRequest.ANY)) {
      ResourceRequest previousAnyRequest =
          getResourceRequest(resourceName);

      // When there is change in ANY request label expression, we should
      // update label for all resource requests already added of same
      // priority as ANY resource request.
      if ((null == previousAnyRequest) || hasRequestLabelChanged(
          previousAnyRequest, request)) {
        for (ResourceRequest r : resourceRequestMap.values()) {
          if (!r.getResourceName().equals(ResourceRequest.ANY)) {
            r.setNodeLabelExpression(request.getNodeLabelExpression());
          }
        }
      }
    } else {
      ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY);
      if (anyRequest != null) {
        request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
      }
    }
  }

  /**
   * Stores the given requests, replacing any previous request with the same
   * resource name, after reconciling node labels against the ANY request.
   *
   * @param requests requests to store; non-ANY entries may have their label
   *     expression overwritten
   * @param recoverPreemptedRequestForAContainer when {@code true} and a
   *     previous request exists for a resource name, the stored request's
   *     container count is set to the previous count plus one
   * @return the update result for the ANY request in {@code requests}, or
   *     {@code null} if none of them is an ANY request
   */
  @Override
  public ResourceRequestUpdateResult updateResourceRequests(
      Collection<ResourceRequest> requests,
      boolean recoverPreemptedRequestForAContainer) {
    // Acquire before try: if lock() failed inside the try, the finally's
    // unlock() would throw IllegalMonitorStateException and hide the cause.
    this.writeLock.lock();
    try {
      ResourceRequestUpdateResult updateResult = null;

      // Update resource requests
      for (ResourceRequest request : requests) {
        String resourceName = request.getResourceName();

        // Update node labels if required
        updateNodeLabels(request);

        // Increment number of containers if recovering preempted resources
        ResourceRequest lastRequest = resourceRequestMap.get(resourceName);
        if (recoverPreemptedRequestForAContainer && lastRequest != null) {
          request.setNumContainers(lastRequest.getNumContainers() + 1);
        }

        // Update asks
        resourceRequestMap.put(resourceName, request);

        if (resourceName.equals(ResourceRequest.ANY)) {
          String partition = request.getNodeLabelExpression() == null ?
              RMNodeLabelsManager.NO_LABEL :
              request.getNodeLabelExpression();

          this.primaryRequestedPartition = partition;

          // update the applications requested labels set
          appSchedulingInfo.addRequestedPartition(partition);

          updateResult = new ResourceRequestUpdateResult(lastRequest, request);
        }
      }
      return updateResult;
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * Returns the live backing map of outstanding requests keyed by resource
   * name. The map is not copied; callers see (and can cause) later changes.
   */
  @Override
  public Map<String, ResourceRequest> getResourceRequests() {
    return resourceRequestMap;
  }

  /** Lock-free lookup helper; callers hold the appropriate lock. */
  private ResourceRequest getResourceRequest(String resourceName) {
    return resourceRequestMap.get(resourceName);
  }

  /**
   * @return the capability and container count of the request for
   *     {@code resourceName}, or {@link PendingAsk#ZERO} if none is stored
   */
  @Override
  public PendingAsk getPendingAsk(String resourceName) {
    readLock.lock();
    try {
      ResourceRequest request = getResourceRequest(resourceName);
      if (null == request) {
        return PendingAsk.ZERO;
      } else {
        return new PendingAsk(request.getCapability(),
            request.getNumContainers());
      }
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the container count of the request for {@code resourceName}, or
   *     0 if none is stored
   */
  @Override
  public int getOutstandingAsksCount(String resourceName) {
    readLock.lock();
    try {
      ResourceRequest request = getResourceRequest(resourceName);
      if (null == request) {
        return 0;
      } else {
        return request.getNumContainers();
      }
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Decrements the ANY (off-switch) request by one container and decreases
   * the application's pending resource accordingly. When the ANY request
   * reaches zero, the scheduler key is dropped, deactivation is checked, the
   * ANY entry is removed, and if no requests remain the placement set is
   * removed for that key.
   */
  private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
      ResourceRequest offSwitchRequest) {
    int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
    offSwitchRequest.setNumContainers(numOffSwitchContainers);

    // Do we have any outstanding requests?
    // If there is nothing, we need to deactivate this application
    if (numOffSwitchContainers == 0) {
      appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey);
      appSchedulingInfo.checkForDeactivation();
      resourceRequestMap.remove(ResourceRequest.ANY);
      if (resourceRequestMap.isEmpty()) {
        appSchedulingInfo.removePlacementSets(schedulerRequestKey);
      }
    }

    appSchedulingInfo.decPendingResource(
        offSwitchRequest.getNodeLabelExpression(),
        offSwitchRequest.getCapability());
  }

  /**
   * Returns a clone of {@code request} with its container count set to 1;
   * used to record what a single allocation consumed.
   */
  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
    ResourceRequest newRequest = ResourceRequest.clone(request);
    newRequest.setNumContainers(1);
    return newRequest;
  }

  /**
   * The {@link ResourceScheduler} is allocating rack-local resources to the
   * application: decrements the rack and ANY requests and appends
   * single-container clones of both to {@code resourceRequests}.
   */
  private void allocateRackLocal(SchedulerRequestKey schedulerKey,
      SchedulerNode node, ResourceRequest rackLocalRequest,
      List<ResourceRequest> resourceRequests) {
    // Update future requirements
    decResourceRequest(node.getRackName(), rackLocalRequest);

    ResourceRequest offRackRequest = resourceRequestMap.get(
        ResourceRequest.ANY);
    decrementOutstanding(schedulerKey, offRackRequest);

    // Update cloned RackLocal and OffRack requests for recovery
    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
    resourceRequests.add(cloneResourceRequest(offRackRequest));
  }

  /**
   * The {@link ResourceScheduler} is allocating off-switch resources to the
   * application: decrements the ANY request and appends a single-container
   * clone of it to {@code resourceRequests}.
   */
  private void allocateOffSwitch(SchedulerRequestKey schedulerKey,
      ResourceRequest offSwitchRequest,
      List<ResourceRequest> resourceRequests) {
    // Update future requirements
    decrementOutstanding(schedulerKey, offSwitchRequest);
    // Update cloned OffRack requests for recovery
    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
  }

  /**
   * The {@link ResourceScheduler} is allocating data-local resources to the
   * application: decrements the node, rack and ANY requests and appends
   * single-container clones of all three to {@code resourceRequests}.
   */
  private void allocateNodeLocal(SchedulerRequestKey schedulerKey,
      SchedulerNode node, ResourceRequest nodeLocalRequest,
      List<ResourceRequest> resourceRequests) {
    // Update future requirements
    decResourceRequest(node.getNodeName(), nodeLocalRequest);

    ResourceRequest rackLocalRequest = resourceRequestMap.get(
        node.getRackName());
    decResourceRequest(node.getRackName(), rackLocalRequest);

    ResourceRequest offRackRequest = resourceRequestMap.get(
        ResourceRequest.ANY);
    decrementOutstanding(schedulerKey, offRackRequest);

    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
    resourceRequests.add(cloneResourceRequest(offRackRequest));
  }

  /**
   * Decrements {@code request} by one container and removes the entry for
   * {@code resourceName} once it reaches zero.
   */
  private void decResourceRequest(String resourceName,
      ResourceRequest request) {
    request.setNumContainers(request.getNumContainers() - 1);
    if (request.getNumContainers() == 0) {
      resourceRequestMap.remove(resourceName);
    }
  }

  /**
   * Checks that every request needed for an allocation of the given locality
   * on {@code node} exists with a positive container count: ANY always; plus
   * the node's rack for RACK_LOCAL/NODE_LOCAL; plus the node itself for
   * NODE_LOCAL.
   */
  @Override
  public boolean canAllocate(NodeType type, SchedulerNode node) {
    readLock.lock();
    try {
      ResourceRequest r = resourceRequestMap.get(
          ResourceRequest.ANY);
      if (r == null || r.getNumContainers() <= 0) {
        return false;
      }
      if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) {
        r = resourceRequestMap.get(node.getRackName());
        if (r == null || r.getNumContainers() <= 0) {
          return false;
        }
        if (type == NodeType.NODE_LOCAL) {
          r = resourceRequestMap.get(node.getNodeName());
          if (r == null || r.getNumContainers() <= 0) {
            return false;
          }
        }
      }

      return true;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return {@code true} if no request exists for {@code resourceName} or
   *     the stored request has relaxed locality
   */
  @Override
  public boolean canDelayTo(String resourceName) {
    readLock.lock();
    try {
      ResourceRequest request = getResourceRequest(resourceName);
      return request == null || request.getRelaxLocality();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Compares the primary requested partition against {@code nodePartition}
   * under {@link SchedulingMode#RESPECT_PARTITION_EXCLUSIVITY}, and against
   * {@link RMNodeLabelsManager#NO_LABEL} under any other mode.
   */
  @Override
  public boolean acceptNodePartition(String nodePartition,
      SchedulingMode schedulingMode) {
    // We will only look at node label = nodeLabelToLookAt according to
    // schedulingMode and partition of node.
    String nodePartitionToLookAt;
    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
      nodePartitionToLookAt = nodePartition;
    } else {
      nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
    }

    return primaryRequestedPartition.equals(nodePartitionToLookAt);
  }

  @Override
  public String getPrimaryRequestedNodePartition() {
    return primaryRequestedPartition;
  }

  /** @return the number of distinct resource names with a stored request */
  @Override
  public int getUniqueLocationAsks() {
    return resourceRequestMap.size();
  }

  /** Logs, at debug level, every stored request with a positive count. */
  @Override
  public void showRequests() {
    // Skip the walk and string building entirely when debug is off.
    if (!LOG.isDebugEnabled()) {
      return;
    }
    for (ResourceRequest request : resourceRequestMap.values()) {
      if (request.getNumContainers() > 0) {
        LOG.debug("\tRequest=" + request);
      }
    }
  }

  /**
   * Allocates one container of the given locality on {@code node}, updating
   * the outstanding asks.
   *
   * <p>NOTE(review): the request for the chosen locality (and, for node/rack
   * locality, the rack and ANY requests) is assumed to exist; presumably
   * callers check {@link #canAllocate} first, otherwise this throws
   * {@code NullPointerException}.
   *
   * @return single-container clones of every request that was decremented,
   *     ordered node, rack, ANY (only those relevant to {@code type})
   */
  @Override
  public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
      NodeType type, SchedulerNode node) {
    writeLock.lock();
    try {
      List<ResourceRequest> resourceRequests = new ArrayList<>();

      if (type == NodeType.NODE_LOCAL) {
        allocateNodeLocal(schedulerKey, node,
            resourceRequestMap.get(node.getNodeName()), resourceRequests);
      } else if (type == NodeType.RACK_LOCAL) {
        allocateRackLocal(schedulerKey, node,
            resourceRequestMap.get(node.getRackName()), resourceRequests);
      } else {
        allocateOffSwitch(schedulerKey,
            resourceRequestMap.get(ResourceRequest.ANY), resourceRequests);
      }

      return resourceRequests;
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Returns an iterator over the resource names that currently have a stored
   * request. The read lock only covers creating the iterator; iteration runs
   * against the live {@link ConcurrentHashMap} key set, whose iterators are
   * weakly consistent.
   */
  @Override
  public Iterator<String> getAcceptedResouceNames() {
    readLock.lock();
    try {
      return resourceRequestMap.keySet().iterator();
    } finally {
      readLock.unlock();
    }
  }
}