blob: 6f3762335e34490d03e06dc2be3ea3c39359a123 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
/**
* Represents an application attempt from the viewpoint of the FIFO or Capacity
* scheduler.
*/
@Private
@Unstable
public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
private final Set<ContainerId> containersToPreempt =
new HashSet<ContainerId>();
private CapacityHeadroomProvider headroomProvider;
private ResourceCalculator rc = new DefaultResourceCalculator();
private ResourceScheduler scheduler;
private AbstractContainerAllocator containerAllocator;
/**
* to hold the message if its app doesn't not get container from a node
*/
private String appSkipNodeDiagnostics;
private Map<ContainerId, SchedContainerChangeRequest> toBeRemovedIncRequests =
new ConcurrentHashMap<>();
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) {
this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
Priority.newInstance(0), false);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
appPriority, isAttemptRecovering, null);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
ActivitiesManager activitiesManager) {
super(applicationAttemptId, user, queue, abstractUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
Resource amResource;
String partition;
if (rmApp == null || rmApp.getAMResourceRequests() == null
|| rmApp.getAMResourceRequests().isEmpty()) {
// the rmApp may be undefined (the resource manager checks for this too)
// and unmanaged applications do not provide an amResource request
// in these cases, provide a default using the scheduler
amResource = rmContext.getScheduler().getMinimumResourceCapability();
partition = CommonNodeLabelsManager.NO_LABEL;
} else {
amResource = rmApp.getAMResourceRequests().get(0).getCapability();
partition =
(rmApp.getAMResourceRequests().get(0)
.getNodeLabelExpression() == null)
? CommonNodeLabelsManager.NO_LABEL
: rmApp.getAMResourceRequests().get(0).getNodeLabelExpression();
}
setAppAMNodePartitionName(partition);
setAMResource(partition, amResource);
setPriority(appPriority);
setAttemptRecovering(isAttemptRecovering);
scheduler = rmContext.getScheduler();
if (scheduler.getResourceCalculator() != null) {
rc = scheduler.getResourceCalculator();
}
containerAllocator = new ContainerAllocator(this, rc, rmContext,
activitiesManager);
}
public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
try {
writeLock.lock();
ContainerId containerId = rmContainer.getContainerId();
// Remove from the list of containers
if (null == liveContainers.remove(containerId)) {
return false;
}
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(containerId, containerStatus, event));
containersToPreempt.remove(containerId);
// In order to save space in the audit log, only include the partition
// if it is not the default partition.
String containerPartition = null;
if (partition != null && !partition.isEmpty()) {
containerPartition = partition;
}
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
"SchedulerApp", getApplicationId(), containerId, containerResource,
getQueueName(), containerPartition);
// Update usage metrics
queue.getMetrics().releaseResources(partition,
getUser(), 1, containerResource);
attemptResourceUsage.decUsed(partition, containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
return true;
} finally {
writeLock.unlock();
}
}
public RMContainer allocate(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, Container container) {
try {
readLock.lock();
if (isStopped) {
return null;
}
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getOutstandingAsksCount(schedulerKey) <= 0) {
return null;
}
SchedulingPlacementSet<FiCaSchedulerNode> ps =
appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
if (null == ps) {
LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName()
+ " for application=" + getApplicationId() + " schedulerRequestKey="
+ schedulerKey);
return null;
}
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container, schedulerKey,
this.getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), this.rmContext,
ps.getPrimaryRequestedNodePartition());
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
// FIXME, should set when confirmed
updateAMContainerDiagnostics(AMState.ASSIGNED, null);
return rmContainer;
} finally {
readLock.unlock();
}
}
private boolean rmContainerInFinalState(RMContainer rmContainer) {
if (null == rmContainer) {
return false;
}
return rmContainer.completed();
}
private boolean anyContainerInFinalState(
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request
.getContainersToRelease()) {
if (rmContainerInFinalState(c.getRmContainer())) {
if (LOG.isDebugEnabled()) {
LOG.debug("To-release container=" + c.getRmContainer()
+ " is in final state");
}
return true;
}
}
for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> c : request
.getContainersToAllocate()) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> r : c
.getToRelease()) {
if (rmContainerInFinalState(r.getRmContainer())) {
if (LOG.isDebugEnabled()) {
LOG.debug("To-release container=" + r.getRmContainer()
+ ", for to a new allocated container, is in final state");
}
return true;
}
}
if (null != c.getAllocateFromReservedContainer()) {
if (rmContainerInFinalState(
c.getAllocateFromReservedContainer().getRmContainer())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Allocate from reserved container" + c
.getAllocateFromReservedContainer().getRmContainer()
+ " is in final state");
}
return true;
}
}
}
for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> c : request
.getContainersToReserve()) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> r : c
.getToRelease()) {
if (rmContainerInFinalState(r.getRmContainer())) {
if (LOG.isDebugEnabled()) {
LOG.debug("To-release container=" + r.getRmContainer()
+ ", for a reserved container, is in final state");
}
return true;
}
}
}
return false;
}
private boolean commonCheckContainerAllocation(
Resource cluster,
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
// Make sure node is not reserved by anyone else
RMContainer reservedContainerOnNode =
schedulerContainer.getSchedulerNode().getReservedContainer();
if (reservedContainerOnNode != null) {
// adding NP check as this proposal could not be allocated from reserved
// container in async-scheduling mode
if (allocation.getAllocateFromReservedContainer() == null) {
return false;
}
RMContainer fromReservedContainer =
allocation.getAllocateFromReservedContainer().getRmContainer();
if (fromReservedContainer != reservedContainerOnNode) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Try to allocate from a non-existed reserved container");
}
return false;
}
}
// Do we have enough space on this node?
Resource availableResource = Resources.clone(
schedulerContainer.getSchedulerNode().getUnallocatedResource());
// If we have any to-release container in non-reserved state, they are
// from lazy-preemption, add their consumption to available resource
// of this node
if (allocation.getToRelease() != null && !allocation.getToRelease()
.isEmpty()) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
releaseContainer : allocation.getToRelease()) {
// Only consider non-reserved container (reserved container will
// not affect available resource of node) on the same node
if (releaseContainer.getRmContainer().getState()
!= RMContainerState.RESERVED
&& releaseContainer.getSchedulerNode() == schedulerContainer
.getSchedulerNode()) {
Resources.addTo(availableResource,
releaseContainer.getRmContainer().getAllocatedResource());
}
}
}
if (!Resources.fitsIn(rc, cluster,
allocation.getAllocatedOrReservedResource(),
availableResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Node doesn't have enough available resource, asked="
+ allocation.getAllocatedOrReservedResource() + " available="
+ availableResource);
}
return false;
}
return true;
}
public boolean accept(Resource cluster,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
List<ResourceRequest> resourceRequests = null;
boolean reReservation = false;
try {
readLock.lock();
// First make sure no container in release list in final state
if (anyContainerInFinalState(request)) {
return false;
}
// TODO, make sure all scheduler nodes are existed
// TODO, make sure all node labels are not changed
if (request.anythingAllocatedOrReserved()) {
/*
* 1) If this is a newly allocated container, check if the node is reserved
* / not-reserved by any other application
* 2) If this is a newly reserved container, check if the node is reserved or not
*/
// Assume we have only one container allocated or reserved
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
allocation = request.getFirstAllocatedOrReservedContainer();
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
schedulerContainer = allocation.getAllocatedOrReservedContainer();
if (schedulerContainer.isAllocated()) {
// When allocate a new container
resourceRequests =
schedulerContainer.getRmContainer().getResourceRequests();
// Check pending resource request
if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey())) {
if (LOG.isDebugEnabled()) {
LOG.debug("No pending resource for: nodeType=" + allocation
.getAllocationLocalityType() + ", node=" + schedulerContainer
.getSchedulerNode() + ", requestKey=" + schedulerContainer
.getSchedulerRequestKey() + ", application="
+ getApplicationAttemptId());
}
return false;
}
// Common part of check container allocation regardless if it is a
// increase container or regular container
if (!commonCheckContainerAllocation(cluster, allocation,
schedulerContainer)) {
return false;
}
} else {
// Container reserved first time will be NEW, after the container
// accepted & confirmed, it will become RESERVED state
if (schedulerContainer.getRmContainer().getState()
== RMContainerState.RESERVED) {
// Check if node currently reserved by other application, there may
// be some outdated proposals in async-scheduling environment
if (schedulerContainer.getRmContainer() != schedulerContainer
.getSchedulerNode().getReservedContainer()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Try to re-reserve a container, but node "
+ schedulerContainer.getSchedulerNode()
+ " is already reserved by another container="
+ schedulerContainer.getSchedulerNode()
.getReservedContainer());
}
return false;
}
// Set reReservation == true
reReservation = true;
} else {
// When reserve a resource (state == NEW is for new container,
// state == RUNNING is for increase container).
// Just check if the node is not already reserved by someone
if (schedulerContainer.getSchedulerNode().getReservedContainer()
!= null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Try to reserve a container, but the node is "
+ "already reserved by another container="
+ schedulerContainer.getSchedulerNode()
.getReservedContainer().getContainerId());
}
return false;
}
}
}
}
} finally {
readLock.unlock();
}
// Skip check parent if this is a re-reservation container
boolean accepted = true;
if (!reReservation) {
// Check parent if anything allocated or reserved
if (request.anythingAllocatedOrReserved()) {
accepted = getCSLeafQueue().accept(cluster, request);
}
}
// When rejected, recover resource requests for this app
if (!accepted && resourceRequests != null) {
recoverResourceRequestsForContainer(resourceRequests);
}
return accepted;
}
public boolean apply(Resource cluster,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
boolean reReservation = false;
try {
writeLock.lock();
// If we allocated something
if (request.anythingAllocatedOrReserved()) {
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
allocation = request.getFirstAllocatedOrReservedContainer();
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
schedulerContainer = allocation.getAllocatedOrReservedContainer();
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey())
<= 0) {
return false;
}
RMContainer rmContainer = schedulerContainer.getRmContainer();
reReservation =
(!schedulerContainer.isAllocated()) && (rmContainer.getState()
== RMContainerState.RESERVED);
// Generate new containerId if it is not an allocation for increasing
// Or re-reservation
if (rmContainer.getContainer().getId() == null) {
rmContainer.setContainerId(BuilderUtils
.newContainerId(getApplicationAttemptId(),
getNewContainerId()));
}
ContainerId containerId = rmContainer.getContainerId();
if (schedulerContainer.isAllocated()) {
// This allocation is from a reserved container
// Unreserve it first
if (allocation.getAllocateFromReservedContainer() != null) {
RMContainer reservedContainer =
allocation.getAllocateFromReservedContainer().getRmContainer();
// Handling container allocation
// Did we previously reserve containers at this 'priority'?
unreserve(schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getSchedulerNode(), reservedContainer);
}
// Allocate a new container
addToNewlyAllocatedContainers(
schedulerContainer.getSchedulerNode(), rmContainer);
liveContainers.put(containerId, rmContainer);
// Deduct pending resource requests
List<ResourceRequest> requests = appSchedulingInfo.allocate(
allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getRmContainer().getContainer());
((RMContainerImpl) rmContainer).setResourceRequests(requests);
attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
allocation.getAllocatedOrReservedResource());
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.START));
// Inform the node
schedulerContainer.getSchedulerNode().allocateContainer(
rmContainer);
// update locality statistics,
incNumAllocatedContainers(allocation.getAllocationLocalityType(),
allocation.getRequestLocalityType());
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId=" + containerId
.getApplicationAttemptId() + " container=" + containerId
+ " host=" + rmContainer.getAllocatedNode().getHost()
+ " type=" + allocation.getAllocationLocalityType());
}
// In order to save space in the audit log, only include the partition
// if it is not the default partition.
String partition =
schedulerContainer.getSchedulerNode().getPartition();
if (partition != null && partition.isEmpty()) {
partition = null;
}
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), containerId,
allocation.getAllocatedOrReservedResource(), getQueueName(),
partition);
} else {
// If the rmContainer's state is already updated to RESERVED, this is
// a reReservation
reserve(schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getRmContainer(),
schedulerContainer.getRmContainer().getContainer(),
reReservation);
}
}
} finally {
writeLock.unlock();
}
// Don't bother CS leaf queue if it is a re-reservation
if (!reReservation) {
getCSLeafQueue().apply(cluster, request);
}
return true;
}
public boolean unreserve(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, RMContainer rmContainer) {
try {
writeLock.lock();
// Done with the reservation?
if (internalUnreserve(node, schedulerKey)) {
node.unreserveResource(this);
// Update reserved metrics
queue.getMetrics().unreserveResource(node.getPartition(),
getUser(), rmContainer.getReservedResource());
queue.decReservedResource(node.getPartition(),
rmContainer.getReservedResource());
return true;
}
return false;
} finally {
writeLock.unlock();
}
}
private boolean internalUnreserve(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(schedulerKey);
if (reservedContainers != null) {
RMContainer reservedContainer =
reservedContainers.remove(node.getNodeID());
// unreserve is now triggered in new scenarios (preemption)
// as a consequence reservedcontainer might be null, adding NP-checks
if (reservedContainer != null
&& reservedContainer.getContainer() != null
&& reservedContainer.getContainer().getResource() != null) {
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(schedulerKey);
}
// Reset the re-reservation count
resetReReservations(schedulerKey);
Resource resource = reservedContainer.getReservedResource();
this.attemptResourceUsage.decReserved(node.getPartition(), resource);
LOG.info("Application " + getApplicationId() + " unreserved "
+ " on node " + node + ", currently has "
+ reservedContainers.size()
+ " at priority " + schedulerKey.getPriority()
+ "; currentReservation " + this.attemptResourceUsage.getReserved()
+ " on node-label=" + node.getPartition());
return true;
}
}
return false;
}
public Map<String, Resource> getTotalPendingRequestsPerPartition() {
try {
readLock.lock();
Map<String, Resource> ret = new HashMap<>();
for (SchedulerRequestKey schedulerKey : appSchedulingInfo
.getSchedulerKeys()) {
SchedulingPlacementSet<FiCaSchedulerNode> ps =
appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
String nodePartition = ps.getPrimaryRequestedNodePartition();
Resource res = ret.get(nodePartition);
if (null == res) {
res = Resources.createResource(0);
ret.put(nodePartition, res);
}
PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
if (ask.getCount() > 0) {
Resources.addTo(res, Resources
.multiply(ask.getPerAllocationResource(),
ask.getCount()));
}
}
return ret;
} finally {
readLock.unlock();
}
}
public void markContainerForPreemption(ContainerId cont) {
try {
writeLock.lock();
// ignore already completed containers
if (liveContainers.containsKey(cont)) {
containersToPreempt.add(cont);
}
} finally {
writeLock.unlock();
}
}
/**
* This method produces an Allocation that includes the current view
* of the resources that will be allocated to and preempted from this
* application.
*
* @param resourceCalculator resourceCalculator
* @param clusterResource clusterResource
* @param minimumAllocation minimumAllocation
* @return an allocation
*/
public Allocation getAllocation(ResourceCalculator resourceCalculator,
Resource clusterResource, Resource minimumAllocation) {
try {
writeLock.lock();
Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
new HashSet<ContainerId>(containersToPreempt));
containersToPreempt.clear();
Resource tot = Resource.newInstance(0, 0);
for (ContainerId c : currentContPreemption) {
Resources.addTo(tot, liveContainers.get(c).getContainer()
.getResource());
}
int numCont = (int) Math.ceil(
Resources.divide(rc, clusterResource, tot, minimumAllocation));
ResourceRequest rr = ResourceRequest.newBuilder()
.priority(Priority.UNDEFINED).resourceName(ResourceRequest.ANY)
.capability(minimumAllocation).numContainers(numCont).build();
List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
List<Container> newlyPromotedContainers = pullNewlyPromotedContainers();
List<Container> newlyDemotedContainers = pullNewlyDemotedContainers();
List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
Resource headroom = getHeadroom();
setApplicationHeadroomForMetrics(headroom);
return new Allocation(newlyAllocatedContainers, headroom, null,
currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
newlyIncreasedContainers, newlyDecreasedContainers,
newlyPromotedContainers, newlyDemotedContainers);
} finally {
writeLock.unlock();
}
}
@VisibleForTesting
public NodeId getNodeIdToUnreserve(
SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
ResourceCalculator rc, Resource clusterResource) {
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
schedulerKey);
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers
.entrySet()) {
NodeId nodeId = entry.getKey();
RMContainer reservedContainer = entry.getValue();
Resource reservedResource = reservedContainer.getReservedResource();
// make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
reservedResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"unreserving node with reservation size: " + reservedResource
+ " in order to allocate container with size: "
+ resourceNeedUnreserve);
}
return nodeId;
}
}
}
return null;
}
public void setHeadroomProvider(
CapacityHeadroomProvider headroomProvider) {
try {
writeLock.lock();
this.headroomProvider = headroomProvider;
} finally {
writeLock.unlock();
}
}
@Override
public Resource getHeadroom() {
try {
readLock.lock();
if (headroomProvider != null) {
return headroomProvider.getHeadroom();
}
return super.getHeadroom();
} finally {
readLock.unlock();
}
}
@Override
public void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
try {
writeLock.lock();
super.transferStateFromPreviousAttempt(appAttempt);
this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider;
} finally {
writeLock.unlock();
}
}
public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node,
RMContainer rmContainer, Container container, boolean reReservation) {
// Update reserved metrics if this is the first reservation
// rmContainer will be moved to reserved in the super.reserve
if (!reReservation) {
queue.getMetrics().reserveResource(node.getPartition(),
getUser(), container.getResource());
}
// Inform the application
rmContainer = super.reserve(node, schedulerKey, rmContainer, container);
// Update the node
node.reserveResource(this, schedulerKey, rmContainer);
}
@VisibleForTesting
public RMContainer findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
Resource minimumUnreservedResource) {
try {
readLock.lock();
// need to unreserve some other container first
NodeId idToUnreserve = getNodeIdToUnreserve(schedulerKey,
minimumUnreservedResource, rc, clusterResource);
if (idToUnreserve == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("checked to see if could unreserve for app but nothing "
+ "reserved that matches for this app");
}
return null;
}
FiCaSchedulerNode nodeToUnreserve =
((CapacityScheduler) scheduler).getNode(idToUnreserve);
if (nodeToUnreserve == null) {
LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving for app: " + getApplicationId() + " on nodeId: "
+ idToUnreserve
+ " in order to replace reserved application and place it on node: "
+ node.getNodeID() + " needing: " + minimumUnreservedResource);
}
// headroom
Resources.addTo(getHeadroom(),
nodeToUnreserve.getReservedContainer().getReservedResource());
return nodeToUnreserve.getReservedContainer();
} finally {
readLock.unlock();
}
}
public LeafQueue getCSLeafQueue() {
return (LeafQueue)queue;
}
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode, RMContainer reservedContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("pre-assignContainers for application "
+ getApplicationId());
showRequests();
}
return containerAllocator.assignContainers(clusterResource, ps,
schedulingMode, currentResourceLimits, reservedContainer);
}
public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition,
String newPartition) {
Resource containerResource = rmContainer.getAllocatedResource();
this.attemptResourceUsage.decUsed(oldPartition, containerResource);
this.attemptResourceUsage.incUsed(newPartition, containerResource);
getCSLeafQueue().decUsedResource(oldPartition, containerResource, this);
getCSLeafQueue().incUsedResource(newPartition, containerResource, this);
// Update new partition name if container is AM and also update AM resource
if (rmContainer.isAMContainer()) {
setAppAMNodePartitionName(newPartition);
this.attemptResourceUsage.decAMUsed(oldPartition, containerResource);
this.attemptResourceUsage.incAMUsed(newPartition, containerResource);
getCSLeafQueue().decAMUsedResource(oldPartition, containerResource, this);
getCSLeafQueue().incAMUsedResource(newPartition, containerResource, this);
}
}
protected void getPendingAppDiagnosticMessage(
StringBuilder diagnosticMessage) {
LeafQueue queue = getCSLeafQueue();
diagnosticMessage.append(" Details : AM Partition = ");
diagnosticMessage.append(appAMNodePartitionName.isEmpty()
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName);
diagnosticMessage.append("; ");
diagnosticMessage.append("AM Resource Request = ");
diagnosticMessage.append(getAMResource(appAMNodePartitionName));
diagnosticMessage.append("; ");
diagnosticMessage.append("Queue Resource Limit for AM = ");
diagnosticMessage
.append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName));
diagnosticMessage.append("; ");
diagnosticMessage.append("User AM Resource Limit of the queue = ");
diagnosticMessage.append(queue.getUserAMResourceLimitPerPartition(
appAMNodePartitionName, getUser()));
diagnosticMessage.append("; ");
diagnosticMessage.append("Queue AM Resource Usage = ");
diagnosticMessage.append(
queue.getQueueResourceUsage().getAMUsed(appAMNodePartitionName));
diagnosticMessage.append("; ");
}
protected void getActivedAppDiagnosticMessage(
StringBuilder diagnosticMessage) {
LeafQueue queue = getCSLeafQueue();
QueueCapacities queueCapacities = queue.getQueueCapacities();
diagnosticMessage.append(" Details : AM Partition = ");
diagnosticMessage.append(appAMNodePartitionName.isEmpty()
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName);
diagnosticMessage.append(" ; ");
diagnosticMessage.append("Partition Resource = ");
diagnosticMessage.append(rmContext.getNodeLabelManager()
.getResourceByLabel(appAMNodePartitionName, Resources.none()));
diagnosticMessage.append(" ; ");
diagnosticMessage.append("Queue's Absolute capacity = ");
diagnosticMessage.append(
queueCapacities.getAbsoluteCapacity(appAMNodePartitionName) * 100);
diagnosticMessage.append(" % ; ");
diagnosticMessage.append("Queue's Absolute used capacity = ");
diagnosticMessage.append(
queueCapacities.getAbsoluteUsedCapacity(appAMNodePartitionName) * 100);
diagnosticMessage.append(" % ; ");
diagnosticMessage.append("Queue's Absolute max capacity = ");
diagnosticMessage.append(
queueCapacities.getAbsoluteMaximumCapacity(appAMNodePartitionName)
* 100);
diagnosticMessage.append(" % ; ");
}
/**
* Set the message temporarily if the reason is known for why scheduling did
* not happen for a given node, if not message will be over written
* @param message Message of app skip diagnostics
*/
public void updateAppSkipNodeDiagnostics(String message) {
this.appSkipNodeDiagnostics = message;
}
public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {
// FIXME, update AM diagnostics when global scheduling is enabled
if (null == node) {
return;
}
if (isWaitingForAMContainer()) {
StringBuilder diagnosticMessageBldr = new StringBuilder();
if (appSkipNodeDiagnostics != null) {
diagnosticMessageBldr.append(appSkipNodeDiagnostics);
appSkipNodeDiagnostics = null;
}
diagnosticMessageBldr.append(
CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG);
diagnosticMessageBldr.append(node.getNodeID());
diagnosticMessageBldr.append(" ( Partition : ");
diagnosticMessageBldr.append(node.getLabels());
diagnosticMessageBldr.append(", Total resource : ");
diagnosticMessageBldr.append(node.getTotalResource());
diagnosticMessageBldr.append(", Available resource : ");
diagnosticMessageBldr.append(node.getUnallocatedResource());
diagnosticMessageBldr.append(" ).");
updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString());
}
}
@Override
@SuppressWarnings("unchecked")
public SchedulingPlacementSet<FiCaSchedulerNode> getSchedulingPlacementSet(
SchedulerRequestKey schedulerRequestKey) {
return super.getSchedulingPlacementSet(schedulerRequestKey);
}
/**
* Recalculates the per-app, percent of queue metric, specific to the
* Capacity Scheduler.
*/
@Override
public ApplicationResourceUsageReport getResourceUsageReport() {
try {
// Use write lock here because
// SchedulerApplicationAttempt#getResourceUsageReport updated fields
// TODO: improve this
writeLock.lock();
ApplicationResourceUsageReport report = super.getResourceUsageReport();
Resource cluster = rmContext.getScheduler().getClusterResource();
Resource totalPartitionRes =
rmContext.getNodeLabelManager().getResourceByLabel(
getAppAMNodePartitionName(), cluster);
ResourceCalculator calc =
rmContext.getScheduler().getResourceCalculator();
float queueUsagePerc = 0.0f;
if (!calc.isInvalidDivisor(totalPartitionRes)) {
float queueAbsMaxCapPerPartition =
((AbstractCSQueue) getQueue()).getQueueCapacities()
.getAbsoluteCapacity(getAppAMNodePartitionName());
if (queueAbsMaxCapPerPartition != 0) {
queueUsagePerc = calc.divide(totalPartitionRes,
report.getUsedResources(),
Resources.multiply(totalPartitionRes, queueAbsMaxCapPerPartition))
* 100;
}
if (!(Float.isNaN(queueUsagePerc) || Float.isInfinite(queueUsagePerc))) {
report.setQueueUsagePercentage(queueUsagePerc);
}
}
return report;
} finally {
writeLock.unlock();
}
}
public ReentrantReadWriteLock.WriteLock getWriteLock() {
return this.writeLock;
}
/**
* Move reservation from one node to another
* Comparing to unreserve container on source node and reserve a new
* container on target node. This method will not create new RMContainer
* instance. And this operation is atomic.
*
* @param reservedContainer to be moved reserved container
* @param sourceNode source node
* @param targetNode target node
*
* @return succeeded or not
*/
public boolean moveReservation(RMContainer reservedContainer,
FiCaSchedulerNode sourceNode, FiCaSchedulerNode targetNode) {
try {
writeLock.lock();
if (!sourceNode.getPartition().equals(targetNode.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Failed to move reservation, two nodes are in different partition");
}
return false;
}
// Update reserved container to node map
Map<NodeId, RMContainer> map = reservedContainers.get(
reservedContainer.getReservedSchedulerKey());
if (null == map) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot find reserved container map.");
}
return false;
}
// Check if reserved container changed
if (sourceNode.getReservedContainer() != reservedContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("To-be-moved container already updated.");
}
return false;
}
// Check if target node is empty, acquires lock of target node to make sure
// reservation happens transactional
synchronized (targetNode){
if (targetNode.getReservedContainer() != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Target node is already occupied before moving");
}
}
try {
targetNode.reserveResource(this,
reservedContainer.getReservedSchedulerKey(), reservedContainer);
} catch (IllegalStateException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reserve on target node failed, e=", e);
}
return false;
}
// Set source node's reserved container to null
sourceNode.setReservedContainer(null);
map.remove(sourceNode.getNodeID());
// Update reserved container
reservedContainer.handle(
new RMContainerReservedEvent(reservedContainer.getContainerId(),
reservedContainer.getReservedResource(), targetNode.getNodeID(),
reservedContainer.getReservedSchedulerKey()));
// Add to target node
map.put(targetNode.getNodeID(), reservedContainer);
return true;
}
} finally {
writeLock.unlock();
}
}
/*
* Overriding to appease findbugs
*/
@Override
public int hashCode() {
return super.hashCode();
}
/*
* Overriding to appease findbugs
*/
@Override
public boolean equals(Object o) {
return super.equals(o);
}
}