/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
/**
* Represents an application attempt from the viewpoint of the FIFO or Capacity
* scheduler.
*/
@Private
@Unstable
public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
private final Set<ContainerId> containersToPreempt =
new HashSet<ContainerId>();
private CapacityHeadroomProvider headroomProvider;
private ResourceCalculator rc = new DefaultResourceCalculator();
private ResourceScheduler scheduler;
private AbstractContainerAllocator containerAllocator;
/**
* Holds the diagnostic message when this application does not get a
* container from a node.
*/
private String appSkipNodeDiagnostics;
private CapacitySchedulerContext capacitySchedulerContext;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
Priority.newInstance(0), false);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
appPriority, isAttemptRecovering, null);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
ActivitiesManager activitiesManager) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
Resource amResource;
String partition;
if (rmApp == null || rmApp.getAMResourceRequest() == null) {
// The rmApp may be undefined (the ResourceManager checks for this too),
// and unmanaged applications do not provide an amResource request;
// in these cases, provide a default using the scheduler
amResource = rmContext.getScheduler().getMinimumResourceCapability();
partition = CommonNodeLabelsManager.NO_LABEL;
} else {
amResource = rmApp.getAMResourceRequest().getCapability();
partition =
(rmApp.getAMResourceRequest().getNodeLabelExpression() == null)
? CommonNodeLabelsManager.NO_LABEL
: rmApp.getAMResourceRequest().getNodeLabelExpression();
}
setAppAMNodePartitionName(partition);
setAMResource(partition, amResource);
setPriority(appPriority);
setAttemptRecovering(isAttemptRecovering);
scheduler = rmContext.getScheduler();
if (scheduler.getResourceCalculator() != null) {
rc = scheduler.getResourceCalculator();
}
containerAllocator = new ContainerAllocator(this, rc, rmContext,
activitiesManager);
if (scheduler instanceof CapacityScheduler) {
capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
}
}
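/**
* Handle completion of a container: remove it from the live and newly
* allocated container lists, inform the RMContainer of the finished event,
* and release the container's resources from the queue metrics and this
* attempt's usage for the given partition.
*
* @return true if the container was live and has been released, false if
*         it had already been removed
*/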
public synchronized boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
ContainerId containerId = rmContainer.getContainerId();
// Remove from the list of containers
if (null == liveContainers.remove(containerId)) {
return false;
}
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(containerId, containerStatus, event));
containersToPreempt.remove(containerId);
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId, containerResource);
// Update usage metrics
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
attemptResourceUsage.decUsed(partition, containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
return true;
}
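/**
* Allocate a container on the given node for the given request: create
* and track the RMContainer, update this attempt's resource consumption
* for the node's partition, and fire the START event.
*
* @return the new RMContainer, or null if this attempt is stopped or the
*         request no longer requires containers
*/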
public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest request,
Container container) {
if (isStopped) {
return null;
}
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(schedulerKey) <= 0) {
return null;
}
// Create RMContainer
RMContainer rmContainer =
new RMContainerImpl(container, this.getApplicationAttemptId(),
node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
updateAMContainerDiagnostics(AMState.ASSIGNED, null);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
ContainerId containerId = container.getId();
liveContainers.put(containerId, rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, request, container);
attemptResourceUsage.incUsed(node.getPartition(), container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ containerId.getApplicationAttemptId()
+ " container=" + containerId + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), containerId, container.getResource());
return rmContainer;
}
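/**
* Release the reservation this application holds on the given node for
* the given scheduler key, updating the node and the queue's reserved
* resource accounting.
*
* @return true if a matching reservation was found and released
*/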
public synchronized boolean unreserve(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, RMContainer rmContainer) {
// Cancel the increase request (if it has a reserved increase request)
rmContainer.cancelIncreaseReservation();
// Done with the reservation?
if (internalUnreserve(node, schedulerKey)) {
node.unreserveResource(this);
// Update reserved metrics
queue.getMetrics().unreserveResource(getUser(),
rmContainer.getReservedResource());
queue.decReservedResource(node.getPartition(),
rmContainer.getReservedResource());
return true;
}
return false;
}
private boolean internalUnreserve(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(schedulerKey);
if (reservedContainers != null) {
RMContainer reservedContainer =
reservedContainers.remove(node.getNodeID());
// unreserve is now triggered in new scenarios (e.g. preemption); as a
// consequence the reservedContainer might be null, hence the null checks
if (reservedContainer != null
&& reservedContainer.getContainer() != null
&& reservedContainer.getContainer().getResource() != null) {
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(schedulerKey);
}
// Reset the re-reservation count
resetReReservations(schedulerKey);
Resource resource = reservedContainer.getReservedResource();
this.attemptResourceUsage.decReserved(node.getPartition(), resource);
LOG.info("Application " + getApplicationId() + " unreserved "
+ " on node " + node + ", currently has "
+ reservedContainers.size()
+ " at priority " + schedulerKey.getPriority()
+ "; currentReservation " + this.attemptResourceUsage.getReserved()
+ " on node-label=" + node.getPartition());
return true;
}
}
return false;
}
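/**
* Estimate the locality wait factor for delay scheduling: the number of
* unique resource names (hosts and racks) requested relative to the
* cluster size, capped at 1.
*/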
public synchronized float getLocalityWaitFactor(
SchedulerRequestKey schedulerKey, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =
Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
// waitFactor can't be more than '1', i.e. there is no point in skipping
// more than cluster-size scheduling opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
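/**
* @return the total resources still pending for this application, summed
*         over only the "ANY" resource requests to avoid double counting
*/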
public synchronized Resource getTotalPendingRequests() {
Resource ret = Resource.newInstance(0, 0);
for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
// To avoid double counting, only count the "ANY" resource requests
if (ResourceRequest.isAnyLocation(rr.getResourceName())){
Resources.addTo(ret,
Resources.multiply(rr.getCapability(), rr.getNumContainers()));
}
}
return ret;
}
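/**
* Mark a live container of this application as a candidate for
* preemption; already completed containers are ignored.
*/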
public synchronized void markContainerForPreemption(ContainerId cont) {
// ignore already completed containers
if (liveContainers.containsKey(cont)) {
containersToPreempt.add(cont);
}
}
/**
* This method produces an Allocation that includes the current view
* of the resources that will be allocated to and preempted from this
* application.
*
* @param rc the resource calculator used to size the preemption request
* @param clusterResource the total resources of the cluster
* @param minimumAllocation the scheduler's minimum container allocation
* @return an allocation
*/
public synchronized Allocation getAllocation(ResourceCalculator rc,
Resource clusterResource, Resource minimumAllocation) {
Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
new HashSet<ContainerId>(containersToPreempt));
containersToPreempt.clear();
Resource tot = Resource.newInstance(0, 0);
for(ContainerId c : currentContPreemption){
Resources.addTo(tot,
liveContainers.get(c).getContainer().getResource());
}
int numCont = (int) Math.ceil(
Resources.divide(rc, clusterResource, tot, minimumAllocation));
ResourceRequest rr = ResourceRequest.newInstance(
Priority.UNDEFINED, ResourceRequest.ANY,
minimumAllocation, numCont);
List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
Resource headroom = getHeadroom();
setApplicationHeadroomForMetrics(headroom);
return new Allocation(newlyAllocatedContainers, headroom, null,
currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
newlyIncreasedContainers, newlyDecreasedContainers);
}
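/**
* Find a node whose reservation for this scheduler key is at least as
* large as the given resource, so that unreserving it frees enough room;
* reservations for increase requests are skipped.
*
* @return the node id of a suitable reservation, or null if none fits
*/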
public synchronized NodeId getNodeIdToUnreserve(
SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
ResourceCalculator rc, Resource clusterResource) {
// For the first pass, keep this algorithm simple and just grab the
// first reservation that has enough resources
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers
.get(schedulerKey);
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
NodeId nodeId = entry.getKey();
RMContainer reservedContainer = entry.getValue();
if (reservedContainer.hasIncreaseReservation()) {
// Currently, only regular container allocation supports continuous
// reservation looking; we don't support cancelling an increase-request
// reservation when allocating a regular container.
continue;
}
Resource reservedResource = reservedContainer.getReservedResource();
// make sure we unreserve one with at least the same amount of
// resources, otherwise it could affect capacity limits
if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
reservedResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving node with reservation size: "
+ reservedResource
+ " in order to allocate container with size: " + resourceNeedUnreserve);
}
return nodeId;
}
}
}
return null;
}
public synchronized void setHeadroomProvider(
CapacityHeadroomProvider headroomProvider) {
this.headroomProvider = headroomProvider;
}
public synchronized CapacityHeadroomProvider getHeadroomProvider() {
return headroomProvider;
}
@Override
public synchronized Resource getHeadroom() {
if (headroomProvider != null) {
return headroomProvider.getHeadroom();
}
return super.getHeadroom();
}
@Override
public synchronized void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
super.transferStateFromPreviousAttempt(appAttempt);
this.headroomProvider =
((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
}
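/**
* Reserve resources on the node for an increase request of an existing
* container; on success, queue reserved metrics and the node's
* reservation state are updated.
*
* @return true if the reservation was recorded
*/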
public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node,
RMContainer rmContainer, Resource reservedResource) {
// Inform the application
if (super.reserveIncreasedContainer(node, schedulerKey, rmContainer,
reservedResource)) {
queue.getMetrics().reserveResource(getUser(), reservedResource);
// Update the node
node.reserveResource(this, schedulerKey, rmContainer);
// Succeeded
return true;
}
return false;
}
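/**
* Reserve the given container on the node for this application. Reserved
* metrics are only updated on the first reservation, i.e. when rmContainer
* is null.
*/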
public void reserve(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
// Update reserved metrics if this is the first reservation
if (rmContainer == null) {
queue.getMetrics().reserveResource(
getUser(), container.getResource());
}
// Inform the application
rmContainer = super.reserve(node, schedulerKey, rmContainer, container);
// Update the node
node.reserveResource(this, schedulerKey, rmContainer);
}
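/**
* Find a reservation of this application that can be cancelled to free at
* least minimumUnreservedResource for an allocation on the given node; the
* freed resources are credited back to the headroom.
*
* @return the reserved container to cancel, or null if none matches
*/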
@VisibleForTesting
public RMContainer findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
Resource minimumUnreservedResource) {
// need to unreserve some other container first
NodeId idToUnreserve =
getNodeIdToUnreserve(schedulerKey, minimumUnreservedResource,
rc, clusterResource);
if (idToUnreserve == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("checked to see if could unreserve for app but nothing "
+ "reserved that matches for this app");
}
return null;
}
FiCaSchedulerNode nodeToUnreserve =
((CapacityScheduler) scheduler).getNode(idToUnreserve);
if (nodeToUnreserve == null) {
LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving for app: " + getApplicationId()
+ " on nodeId: " + idToUnreserve
+ " in order to replace reserved application and place it on node: "
+ node.getNodeID() + " needing: " + minimumUnreservedResource);
}
// Credit the resources being unreserved back to this app's headroom
Resources.addTo(getHeadroom(), nodeToUnreserve
.getReservedContainer().getReservedResource());
return nodeToUnreserve.getReservedContainer();
}
public LeafQueue getCSLeafQueue() {
return (LeafQueue)queue;
}
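/**
* Try to assign containers to this application on the given node by
* delegating to the container allocator, under the given resource limits
* and scheduling mode.
*/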
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode, RMContainer reservedContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("pre-assignContainers for application "
+ getApplicationId());
showRequests();
}
synchronized (this) {
return containerAllocator.assignContainers(clusterResource, node,
schedulingMode, currentResourceLimits, reservedContainer);
}
}
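/**
* Move a container's resource accounting from its old node partition to
* its new one, for both this attempt and the leaf queue; AM usage is moved
* as well when the container is the AM container.
*/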
public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition,
String newPartition) {
Resource containerResource = rmContainer.getAllocatedResource();
this.attemptResourceUsage.decUsed(oldPartition, containerResource);
this.attemptResourceUsage.incUsed(newPartition, containerResource);
getCSLeafQueue().decUsedResource(oldPartition, containerResource, this);
getCSLeafQueue().incUsedResource(newPartition, containerResource, this);
// If this is the AM container, also update the AM partition name and the
// AM resource usage
if (rmContainer.isAMContainer()) {
setAppAMNodePartitionName(newPartition);
this.attemptResourceUsage.decAMUsed(oldPartition, containerResource);
this.attemptResourceUsage.incAMUsed(newPartition, containerResource);
getCSLeafQueue().decAMUsedResource(oldPartition, containerResource, this);
getCSLeafQueue().incAMUsedResource(newPartition, containerResource, this);
}
}
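/**
* Append diagnostics explaining why this application is still pending:
* the AM partition, the AM resource request, and the queue and user AM
* resource limits and usage for that partition.
*/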
protected void getPendingAppDiagnosticMessage(
StringBuilder diagnosticMessage) {
LeafQueue queue = getCSLeafQueue();
diagnosticMessage.append(" Details : AM Partition = ");
diagnosticMessage.append(appAMNodePartitionName.isEmpty()
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName);
diagnosticMessage.append("; ");
diagnosticMessage.append("AM Resource Request = ");
diagnosticMessage.append(getAMResource(appAMNodePartitionName));
diagnosticMessage.append("; ");
diagnosticMessage.append("Queue Resource Limit for AM = ");
diagnosticMessage
.append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName));
diagnosticMessage.append("; ");
diagnosticMessage.append("User AM Resource Limit of the queue = ");
diagnosticMessage.append(
queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName));
diagnosticMessage.append("; ");
diagnosticMessage.append("Queue AM Resource Usage = ");
diagnosticMessage.append(
queue.getQueueResourceUsage().getAMUsed(appAMNodePartitionName));
diagnosticMessage.append("; ");
}
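/**
* Append diagnostics for an activated application: the AM partition's
* total resource and the queue's absolute, absolute used and absolute max
* capacities for that partition.
*/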
protected void getActivedAppDiagnosticMessage(
StringBuilder diagnosticMessage) {
LeafQueue queue = getCSLeafQueue();
QueueCapacities queueCapacities = queue.getQueueCapacities();
diagnosticMessage.append(" Details : AM Partition = ");
diagnosticMessage.append(appAMNodePartitionName.isEmpty()
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName);
diagnosticMessage.append(" ; ");
diagnosticMessage.append("Partition Resource = ");
diagnosticMessage.append(rmContext.getNodeLabelManager()
.getResourceByLabel(appAMNodePartitionName, Resources.none()));
diagnosticMessage.append(" ; ");
diagnosticMessage.append("Queue's Absolute capacity = ");
diagnosticMessage.append(
queueCapacities.getAbsoluteCapacity(appAMNodePartitionName) * 100);
diagnosticMessage.append(" % ; ");
diagnosticMessage.append("Queue's Absolute used capacity = ");
diagnosticMessage.append(
queueCapacities.getAbsoluteUsedCapacity(appAMNodePartitionName) * 100);
diagnosticMessage.append(" % ; ");
diagnosticMessage.append("Queue's Absolute max capacity = ");
diagnosticMessage.append(
queueCapacities.getAbsoluteMaximumCapacity(appAMNodePartitionName)
* 100);
diagnosticMessage.append(" % ; ");
}
/**
* Temporarily set the message when the reason why scheduling did not
* happen for a given node is known; if not set again, the message will be
* overwritten by the next update.
*
* @param message the diagnostic message for skipping the node
*/
public void updateAppSkipNodeDiagnostics(String message) {
this.appSkipNodeDiagnostics = message;
}
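/**
* While this application is waiting for its AM container, record details
* of the last node processed (partition, total and available resources)
* in the AM container diagnostics.
*/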
public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {
if (isWaitingForAMContainer()) {
StringBuilder diagnosticMessageBldr = new StringBuilder();
if (appSkipNodeDiagnostics != null) {
diagnosticMessageBldr.append(appSkipNodeDiagnostics);
appSkipNodeDiagnostics = null;
}
diagnosticMessageBldr.append(
CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG);
diagnosticMessageBldr.append(node.getNodeID());
diagnosticMessageBldr.append(" ( Partition : ");
diagnosticMessageBldr.append(node.getLabels());
diagnosticMessageBldr.append(", Total resource : ");
diagnosticMessageBldr.append(node.getTotalResource());
diagnosticMessageBldr.append(", Available resource : ");
diagnosticMessageBldr.append(node.getUnallocatedResource());
diagnosticMessageBldr.append(" ).");
updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString());
}
}
}