blob: 95e05337f836bafd1e776c4c98ed1c3928783ee2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* For an application, resource limits and resource requests, decide how to
* allocate container. This is to make application resource allocation logic
* extensible.
*/
public abstract class AbstractContainerAllocator {
private static final Log LOG = LogFactory.getLog(AbstractContainerAllocator.class);
FiCaSchedulerApp application;
AppSchedulingInfo appInfo;
final ResourceCalculator rc;
final RMContext rmContext;
ActivitiesManager activitiesManager;
public AbstractContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
this(application, rc, rmContext, null);
}
public AbstractContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext,
ActivitiesManager activitiesManager) {
this.application = application;
this.appInfo =
application == null ? null : application.getAppSchedulingInfo();
this.rc = rc;
this.rmContext = rmContext;
this.activitiesManager = activitiesManager;
}
protected CSAssignment getCSAssignmentFromAllocateResult(
Resource clusterResource, ContainerAllocation result,
RMContainer rmContainer, FiCaSchedulerNode node) {
// Handle skipped
CSAssignment.SkippedType skipped =
(result.getAllocationState() == AllocationState.APP_SKIPPED) ?
CSAssignment.SkippedType.OTHER :
CSAssignment.SkippedType.NONE;
CSAssignment assignment = new CSAssignment(skipped);
assignment.setApplication(application);
// Handle excess reservation
assignment.setExcessReservation(result.getContainerToBeUnreserved());
assignment.setRequestLocalityType(result.requestLocalityType);
// If we allocated something
if (Resources.greaterThan(rc, clusterResource,
result.getResourceToBeAllocated(), Resources.none())) {
Resource allocatedResource = result.getResourceToBeAllocated();
RMContainer updatedContainer = result.getUpdatedContainer();
assignment.setResource(allocatedResource);
assignment.setType(result.getContainerNodeType());
if (result.getAllocationState() == AllocationState.RESERVED) {
// This is a reserved container
LOG.info("Reserved container " + " application="
+ application.getApplicationId() + " resource=" + allocatedResource
+ " queue=" + this.toString() + " cluster=" + clusterResource);
assignment.getAssignmentInformation().addReservationDetails(
updatedContainer, application.getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrReservations();
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
allocatedResource);
if (rmContainer != null) {
ActivitiesLogger.APP.recordAppActivityWithAllocation(
activitiesManager, node, application, updatedContainer,
ActivityState.RE_RESERVED);
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
} else {
ActivitiesLogger.APP.recordAppActivityWithAllocation(
activitiesManager, node, application, updatedContainer,
ActivityState.RESERVED);
ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
updatedContainer.getContainerId(), ActivityState.RESERVED,
ActivityDiagnosticConstant.EMPTY);
}
} else if (result.getAllocationState() == AllocationState.ALLOCATED){
// This is a new container
// Inform the ordering policy
LOG.info("assignedContainer" + " application attempt=" + application
.getApplicationAttemptId() + " container=" + updatedContainer
.getContainerId() + " queue=" + this + " clusterResource="
+ clusterResource + " type=" + assignment.getType()
+ " requestedPartition="
+ updatedContainer.getNodeLabelExpression());
assignment.getAssignmentInformation().addAllocationDetails(
updatedContainer, application.getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrAllocations();
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
allocatedResource);
if (rmContainer != null) {
assignment.setFulfilledReservation(true);
assignment.setFulfilledReservedContainer(rmContainer);
}
ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager,
node, application, updatedContainer, ActivityState.ALLOCATED);
ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
updatedContainer.getContainerId(), ActivityState.ACCEPTED,
ActivityDiagnosticConstant.EMPTY);
// Update unformed resource
application.incUnconfirmedRes(allocatedResource);
}
assignment.setContainersToKill(result.getToKillContainers());
} else {
if (result.getAllocationState() == AllocationState.QUEUE_SKIPPED) {
assignment.setSkippedType(
CSAssignment.SkippedType.QUEUE_LIMIT);
}
}
return assignment;
}
/**
* allocate needs to handle following stuffs:
*
* <ul>
* <li>Select request: Select a request to allocate. E.g. select a resource
* request based on requirement/priority/locality.</li>
* <li>Check if a given resource can be allocated based on resource
* availability</li>
* <li>Do allocation: this will decide/create allocated/reserved
* container, this will also update metrics</li>
* </ul>
*
* @param clusterResource clusterResource
* @param candidates CandidateNodeSet
* @param schedulingMode scheduling mode (exclusive or nonexclusive)
* @param resourceLimits resourceLimits
* @param reservedContainer reservedContainer
* @return CSAssignemnt proposal
*/
public abstract CSAssignment assignContainers(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
RMContainer reservedContainer);
}