blob: 57f91478fe281b77d338e9cf2ac41720a0baf7dc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
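/**
* An {@link AppPlacementAllocator} that tracks a single
* {@link SchedulingRequest} and places its containers according to that
* request's placement constraint, for example affinity or anti-affinity
* within one application (intra-app) or across applications (inter-app).
*/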
public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
    extends AppPlacementAllocator<N> {
  private static final Logger LOG =
      LoggerFactory.getLogger(SingleConstraintAppPlacementAllocator.class);

  /** The only resource name for which pending asks are reported. */
  private static final String ANY_RESOURCE_NAME = "*";

  // Both locks come from the same ReentrantReadWriteLock created in the
  // constructor. Most methods that touch schedulingRequest take one of them;
  // the plain getters (getSchedulingRequest, getPrimaryRequestedNodePartition,
  // getTargetNodePartition) read the fields without locking.
  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;

  // Private copy of the accepted request; null until the first request has
  // passed validation.
  private SchedulingRequest schedulingRequest = null;
  // Node partition derived from the accepted request's placement constraint.
  private String targetNodePartition;
  // Both managers are obtained from the RMContext in initialize().
  private AllocationTagsManager allocationTagsManager;
  private PlacementConstraintManager placementConstraintManager;

  /**
   * Creates an allocator with no scheduling request. {@link #initialize} must
   * still be called to wire in the managers used for constraint checks.
   */
  public SingleConstraintAppPlacementAllocator() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
  }

  /**
   * Rejects {@link ResourceRequest}s: this allocator only handles
   * {@link SchedulingRequest}s.
   *
   * @param requests resource requests sent for this allocator's scheduler key
   * @param recoverPreemptedRequestForAContainer unused
   * @return always {@code null} when {@code requests} is null or empty
   * @throws SchedulerInvalidResoureRequestException if {@code requests}
   *         contains at least one element
   */
  @Override
  public PendingAskUpdateResult updatePendingAsk(
      Collection<ResourceRequest> requests,
      boolean recoverPreemptedRequestForAContainer) {
    if (requests != null && !requests.isEmpty()) {
      throw new SchedulerInvalidResoureRequestException(
          this.getClass().getName()
              + " not be able to handle ResourceRequest, there exists a "
              + "SchedulingRequest with the same scheduler key="
              + SchedulerRequestKey.create(requests.iterator().next())
              + ", please send ResourceRequest with a different allocationId and "
              + "priority");
    }
    // Do nothing
    return null;
  }

  /**
   * Applies a new or updated scheduling request. The caller is expected to
   * hold the write lock.
   *
   * <p>When no request is stored yet, the new request is validated and
   * stored. Otherwise the new request must equal the stored one in everything
   * except {@code numAllocations}, which is then updated in place.
   *
   * <p>Note that {@code newSchedulingRequest}'s sizing is mutated: on return
   * (normal or exceptional) its {@code numAllocations} holds the effective new
   * value, which for a recovered container is the stored value plus one.
   *
   * @param newSchedulingRequest the incoming request
   * @param recoverContainer whether this call re-adds one allocation for a
   *        recovered container
   * @return the change in pending asks, or {@code null} if numAllocations did
   *         not change
   * @throws SchedulerInvalidResoureRequestException if recovering without a
   *         stored request, if fields other than numAllocations differ, if the
   *         new numAllocations is negative, or if a new request fails
   *         validation
   */
  private PendingAskUpdateResult internalUpdatePendingAsk(
      SchedulingRequest newSchedulingRequest, boolean recoverContainer) {
    if (schedulingRequest == null) {
      // When it is a recover container, there must exist a schedulingRequest.
      if (recoverContainer) {
        throw new SchedulerInvalidResoureRequestException("Trying to recover a "
            + "container request=" + newSchedulingRequest + ", however "
            + "there's no existing scheduling request, this should not happen.");
      }
      // For a new schedulingRequest, we need to validate if we support its
      // asks. This will update internal partitions, etc. after the
      // SchedulingRequest is valid.
      validateAndSetSchedulingRequest(newSchedulingRequest);
      return new PendingAskUpdateResult(null,
          new PendingAsk(newSchedulingRequest.getResourceSizing()), null,
          targetNodePartition);
    }

    // We have an old scheduling request: make sure nothing changed except
    // sizing. To avoid copying the data structure, temporarily overwrite the
    // new request's numAllocations with the stored value and compare the two
    // objects as a whole.
    ResourceSizing newSizing = newSchedulingRequest.getResourceSizing();
    int existingNumAllocations =
        schedulingRequest.getResourceSizing().getNumAllocations();
    // A recovered container request always means "one more than we have now".
    int newNumAllocations = recoverContainer
        ? existingNumAllocations + 1
        : newSizing.getNumAllocations();

    boolean onlySizingDiffers;
    newSizing.setNumAllocations(existingNumAllocations);
    try {
      onlySizingDiffers = schedulingRequest.equals(newSchedulingRequest);
    } finally {
      // Rollback #numAllocations, even if the comparison itself failed.
      newSizing.setNumAllocations(newNumAllocations);
    }

    if (!onlySizingDiffers) {
      throw new SchedulerInvalidResoureRequestException(
          "Invalid updated SchedulingRequest added to scheduler, "
              + " we only allows changing numAllocations for the updated "
              + "SchedulingRequest. Old=" + schedulingRequest
              + " new=" + newSchedulingRequest
              + ", if any fields need to be updated, please cancel the "
              + "old request (by setting numAllocations to 0) and send a "
              + "SchedulingRequest with different combination of "
              + "priority/allocationId");
    }
    if (newNumAllocations == existingNumAllocations) {
      // No update on pending asks, return null.
      return null;
    }
    // Basic sanity check
    if (newNumAllocations < 0) {
      throw new SchedulerInvalidResoureRequestException(
          "numAllocation in ResourceSizing field must be >= 0, "
              + "updating schedulingRequest failed.");
    }

    // Build the result before touching the stored request so that the first
    // PendingAsk is created from the old numAllocations.
    PendingAskUpdateResult updateResult = new PendingAskUpdateResult(
        new PendingAsk(schedulingRequest.getResourceSizing()),
        new PendingAsk(newSchedulingRequest.getResourceSizing()),
        targetNodePartition, targetNodePartition);

    // Ok, now everything is same except numAllocation, update numAllocation.
    this.schedulingRequest.getResourceSizing().setNumAllocations(
        newNumAllocations);
    LOG.info("Update numAllocation from old={} to new={}",
        existingNumAllocations, newNumAllocations);
    return updateResult;
  }

  /**
   * Adds or updates the scheduling request under the write lock.
   *
   * @param schedulerRequestKey not used by this implementation
   * @param newSchedulingRequest the incoming request
   * @param recoverPreemptedRequestForAContainer whether one allocation is
   *        being re-added for a recovered container
   * @return the change in pending asks, or {@code null} if nothing changed
   * @throws SchedulerInvalidResoureRequestException if the request is rejected
   */
  @Override
  public PendingAskUpdateResult updatePendingAsk(
      SchedulerRequestKey schedulerRequestKey,
      SchedulingRequest newSchedulingRequest,
      boolean recoverPreemptedRequestForAContainer) {
    writeLock.lock();
    try {
      return internalUpdatePendingAsk(newSchedulingRequest,
          recoverPreemptedRequestForAContainer);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Always throws; the given message is prefixed with this allocator's
   * application id and scheduler request key.
   *
   * @param message description of the validation failure
   * @throws SchedulerInvalidResoureRequestException always
   */
  private void throwExceptionWithMetaInfo(String message) {
    throw new SchedulerInvalidResoureRequestException(
        "AppId=" + appSchedulingInfo.getApplicationId()
            + " Key=" + this.schedulerRequestKey
            + ". Exception message:" + message);
  }

  /**
   * Validates a first-time scheduling request and, if acceptable, stores a
   * private copy of it together with its target node partition.
   *
   * @param newSchedulingRequest the request to validate and adopt
   * @throws SchedulerInvalidResoureRequestException if the request has no
   *         resource sizing, asks for a non-GUARANTEED execution type, or
   *         names more than one node partition in a target expression
   */
  private void validateAndSetSchedulingRequest(SchedulingRequest
      newSchedulingRequest)
      throws SchedulerInvalidResoureRequestException {
    // Check sizing exists
    if (newSchedulingRequest.getResourceSizing() == null
        || newSchedulingRequest.getResourceSizing().getResources() == null) {
      throwExceptionWithMetaInfo(
          "No ResourceSizing found in the scheduling request, please double "
              + "check");
    }

    // Check execution type == GUARANTEED
    if (newSchedulingRequest.getExecutionType() != null
        && newSchedulingRequest.getExecutionType().getExecutionType()
        != ExecutionType.GUARANTEED) {
      throwExceptionWithMetaInfo(
          "Only GUARANTEED execution type is supported.");
    }

    // Compute both pieces of state first and publish them together, so a
    // failure while copying cannot leave a partition without a request.
    String validatedPartition = validateAndGetTargetNodePartition(
        newSchedulingRequest.getPlacementConstraint());
    SchedulingRequest requestCopy = new SchedulingRequestPBImpl(
        ((SchedulingRequestPBImpl) newSchedulingRequest).getProto());
    this.targetNodePartition = validatedPartition;
    this.schedulingRequest = requestCopy;

    LOG.info("Successfully added SchedulingRequest to app={}"
            + " placementConstraint=[{}]. nodePartition={}",
        appSchedulingInfo.getApplicationAttemptId(),
        schedulingRequest.getPlacementConstraint(), targetNodePartition);
  }

  /**
   * Tentatively finds the node partition named in the placement constraint.
   * Currently only a single constraint is handled.
   *
   * <p>The first non-null partition value found in a node-partition target
   * expression wins. If none is found, the application's default node label
   * expression is used, or {@link RMNodeLabelsManager#NO_LABEL} when that is
   * null.
   *
   * @param placementConstraint the request's constraint, may be null
   * @return the partition to target
   * @throws SchedulerInvalidResoureRequestException if a node-partition target
   *         expression lists more than one value
   */
  private String validateAndGetTargetNodePartition(
      PlacementConstraint placementConstraint) {
    String defaultNodeLabelExpression =
        appSchedulingInfo.getDefaultNodeLabelExpression();
    String fallbackPartition = defaultNodeLabelExpression == null
        ? RMNodeLabelsManager.NO_LABEL : defaultNodeLabelExpression;

    if (placementConstraint == null) {
      return fallbackPartition;
    }
    PlacementConstraint.AbstractConstraint ac =
        placementConstraint.getConstraintExpr();
    // instanceof is false for null, so this also covers a missing expression.
    if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
      return fallbackPartition;
    }

    PlacementConstraint.SingleConstraint singleConstraint =
        (PlacementConstraint.SingleConstraint) ac;
    for (PlacementConstraint.TargetExpression targetExpression :
        singleConstraint.getTargetExpressions()) {
      // Only node-partition expressions are of interest here.
      boolean isNodePartition =
          NODE_ATTRIBUTE.equals(targetExpression.getTargetType())
              && NODE_PARTITION.equals(targetExpression.getTargetKey());
      if (!isNodePartition) {
        continue;
      }
      Set<String> values = targetExpression.getTargetValues();
      if (values == null || values.isEmpty()) {
        continue;
      }
      if (values.size() > 1) {
        throwExceptionWithMetaInfo(
            "Inside one targetExpression, we only support"
                + " affinity to at most one node partition now");
      }
      String requestedPartition = values.iterator().next();
      // A null value must not replace the fallback; keep looking instead.
      if (requestedPartition != null) {
        return requestedPartition;
      }
    }
    return fallbackPartition;
  }

  /**
   * @return always an empty map; this allocator holds no ResourceRequests
   */
  @Override
  public Map<String, ResourceRequest> getResourceRequests() {
    return Collections.emptyMap();
  }

  /**
   * Returns the pending ask for the given resource name.
   *
   * @param resourceName resource name to query
   * @return a {@link PendingAsk} built from the stored request's sizing when
   *         {@code resourceName} is {@code "*"} and a request is stored,
   *         otherwise {@link PendingAsk#ZERO}
   */
  @Override
  public PendingAsk getPendingAsk(String resourceName) {
    readLock.lock();
    try {
      if (ANY_RESOURCE_NAME.equals(resourceName) && schedulingRequest != null) {
        return new PendingAsk(schedulingRequest.getResourceSizing());
      }
      return PendingAsk.ZERO;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns the number of allocations still pending for the resource name.
   *
   * @param resourceName resource name to query
   * @return the stored request's numAllocations when {@code resourceName} is
   *         {@code "*"} and a request is stored, otherwise 0
   */
  @Override
  public int getOutstandingAsksCount(String resourceName) {
    readLock.lock();
    try {
      if (ANY_RESOURCE_NAME.equals(resourceName) && schedulingRequest != null) {
        return schedulingRequest.getResourceSizing().getNumAllocations();
      }
      return 0;
    } finally {
      readLock.unlock();
    }
  }

  /** Deducts pending #allocations of the stored request by 1. */
  private void decreasePendingNumAllocation() {
    ResourceSizing sizing = schedulingRequest.getResourceSizing();
    sizing.setNumAllocations(sizing.getNumAllocations() - 1);
  }

  /**
   * Hands out one container's worth of the stored request and decrements the
   * pending allocation count by one, under the write lock. A scheduling
   * request must already be stored.
   *
   * @param schedulerKey not used by this implementation
   * @param type not used by this implementation
   * @param node not used by this implementation
   * @return a {@link ContainerRequest} wrapping a copy of the stored request
   *         with numAllocations set to 1
   */
  @Override
  public ContainerRequest allocate(SchedulerRequestKey schedulerKey,
      NodeType type, SchedulerNode node) {
    writeLock.lock();
    try {
      // Per container scheduling request, it is just a copy of existing
      // scheduling request with #allocations=1
      SchedulingRequest containerSchedulingRequest =
          new SchedulingRequestPBImpl(
              ((SchedulingRequestPBImpl) schedulingRequest).getProto());
      containerSchedulingRequest.getResourceSizing().setNumAllocations(1);

      // Deduct sizing
      decreasePendingNumAllocation();

      return new ContainerRequest(containerSchedulingRequest);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Checks that there is still something pending and that the node satisfies
   * the stored request's placement constraints. The caller is expected to hold
   * the read lock.
   *
   * @param node candidate node
   * @param dcOpt optional collector passed through to the constraint check
   * @return {@code true} only if a request is stored, it has numAllocations
   *         greater than zero, and the constraint check passes; a failed
   *         allocation-tags query counts as a placement attempt and yields
   *         {@code false}
   */
  private boolean checkCardinalityAndPending(SchedulerNode node,
      Optional<DiagnosticsCollector> dcOpt) {
    // Do we have a request at all, and does it still have pending resource?
    if (schedulingRequest == null
        || schedulingRequest.getResourceSizing().getNumAllocations() <= 0) {
      return false;
    }

    // node type will be ignored.
    try {
      return PlacementConstraintsUtil.canSatisfyConstraints(
          appSchedulingInfo.getApplicationId(), schedulingRequest, node,
          placementConstraintManager, allocationTagsManager, dcOpt);
    } catch (InvalidAllocationTagsQueryException e) {
      LOG.warn("Failed to query node cardinality:", e);
      this.incrementPlacementAttempt();
      return false;
    }
  }

  /**
   * Reports whether a container of the stored request can go on the node.
   *
   * @param type ignored
   * @param node candidate node
   * @return the result of the pending/constraint check, taken under the read
   *         lock
   */
  @Override
  public boolean canAllocate(NodeType type, SchedulerNode node) {
    readLock.lock();
    try {
      return checkCardinalityAndPending(node, Optional.empty());
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @param resourceName ignored
   * @return always {@code true}
   */
  @Override
  public boolean canDelayTo(String resourceName) {
    return true;
  }

  /**
   * Same as {@link #precheckNode(SchedulerNode, SchedulingMode, Optional)}
   * with no diagnostics collector.
   */
  @Override
  public boolean precheckNode(SchedulerNode schedulerNode,
      SchedulingMode schedulingMode) {
    return precheckNode(schedulerNode, schedulingMode, Optional.empty());
  }

  /**
   * Checks the node's partition against the target partition and, if they
   * match, the pending count and placement constraints.
   *
   * @param schedulerNode candidate node
   * @param schedulingMode when {@code RESPECT_PARTITION_EXCLUSIVITY} the
   *        node's own partition is compared, otherwise
   *        {@link RMNodeLabelsManager#NO_LABEL} is compared
   * @param dcOpt optional collector that is told about a partition mismatch
   *        and is passed on to the constraint check
   * @return {@code true} if the node passes all checks; {@code false} if no
   *         request is stored, the partition differs, or the
   *         pending/constraint check fails
   */
  @Override
  public boolean precheckNode(SchedulerNode schedulerNode,
      SchedulingMode schedulingMode,
      Optional<DiagnosticsCollector> dcOpt) {
    // We will only look at node label = nodeLabelToLookAt according to
    // schedulingMode and partition of node.
    final String nodePartitionToLookAt =
        schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
            ? schedulerNode.getPartition()
            : RMNodeLabelsManager.NO_LABEL;

    readLock.lock();
    try {
      // Without an accepted request there is no target partition to compare
      // against and nothing to place.
      if (schedulingRequest == null || targetNodePartition == null) {
        return false;
      }

      // Check node partition as well as cardinality/pending resources.
      if (!targetNodePartition.equals(nodePartitionToLookAt)) {
        dcOpt.ifPresent(collector -> collector.collectPartitionDiagnostics(
            targetNodePartition, nodePartitionToLookAt));
        return false;
      }

      return checkCardinalityAndPending(schedulerNode, dcOpt);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the target node partition, read without locking; null until a
   *         request has been accepted
   */
  @Override
  public String getPrimaryRequestedNodePartition() {
    return targetNodePartition;
  }

  /**
   * @return always 1
   */
  @Override
  public int getUniqueLocationAsks() {
    return 1;
  }

  /** Logs the stored scheduling request at INFO level, if there is one. */
  @Override
  public void showRequests() {
    readLock.lock();
    try {
      if (schedulingRequest != null) {
        LOG.info("{}", schedulingRequest);
      }
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the stored request itself (not a copy), read without locking;
   *         null until a request has been accepted
   */
  @Override
  public SchedulingRequest getSchedulingRequest() {
    return schedulingRequest;
  }

  /**
   * @return the target node partition, read without locking
   */
  @VisibleForTesting
  String getTargetNodePartition() {
    return targetNodePartition;
  }

  /**
   * Initializes the superclass and picks up the allocation-tags and
   * placement-constraint managers from the RM context.
   *
   * @param appSchedulingInfo passed to the superclass
   * @param schedulerRequestKey passed to the superclass
   * @param rmContext source of the two managers
   */
  @Override
  public void initialize(AppSchedulingInfo appSchedulingInfo,
      SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
    super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
    this.allocationTagsManager = rmContext.getAllocationTagsManager();
    this.placementConstraintManager = rmContext.getPlacementConstraintManager();
  }
}