/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.Arrays;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
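/**
 * A {@link Schedulable} that wraps a single application in the fair
 * scheduler hierarchy. It tracks the application's demand, current resource
 * usage and start time, and handles assigning and reserving containers on
 * nodes that are offered to the application.
 */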
@Private
@Unstable
public class AppSchedulable extends Schedulable {
private FairScheduler scheduler;
private FSSchedulerApp app;
private Resource demand = Resources.createResource(0);
private boolean runnable = false; // everyone starts as not runnable
private long startTime;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
private FSLeafQueue queue;
private RMContainerTokenSecretManager containerTokenSecretManager;
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
this.startTime = System.currentTimeMillis();
this.queue = queue;
this.containerTokenSecretManager = scheduler.
getContainerTokenSecretManager();
}
@Override
public String getName() {
return app.getApplicationId().toString();
}
public FSSchedulerApp getApp() {
return app;
}
@Override
public void updateDemand() {
demand = Resources.createResource(0);
// Demand is current consumption plus outstanding requests
Resources.addTo(demand, app.getCurrentConsumption());
// Add up outstanding resource requests
for (Priority p : app.getPriorities()) {
for (ResourceRequest r : app.getResourceRequests(p).values()) {
Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
Resources.addTo(demand, total);
}
}
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public Resource getResourceUsage() {
return app.getCurrentConsumption();
}
@Override
public Resource getMinShare() {
return Resources.createResource(0);
}
/**
* Get the metrics reference from the containing queue.
*/
public QueueMetrics getMetrics() {
return queue.getMetrics();
}
@Override
public double getWeight() {
return scheduler.getAppWeight(this);
}
@Override
public Priority getPriority() {
// Right now per-app priorities are not passed to the scheduler,
// so everyone has the same priority.
Priority p = recordFactory.newRecordInstance(Priority.class);
p.setPriority(1);
return p;
}
/**
* Is this application runnable? Runnable means that the user and queue
* application counts are within configured quotas.
*/
public boolean getRunnable() {
return runnable;
}
public void setRunnable(boolean runnable) {
this.runnable = runnable;
}
/**
* Create and return a container object reflecting an allocation for the
* given application on the given node with the given capability and
* priority.
*/
public Container createContainer(
FSSchedulerApp application, FSSchedulerNode node,
Resource capability, Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
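// The container id pairs the application attempt id with a per-attempt
// counter that increases for every container created for this attempt.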
ContainerId containerId = BuilderUtils.newContainerId(application
.getApplicationAttemptId(), application.getNewContainerId());
ContainerToken containerToken = null;
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
containerToken =
containerTokenSecretManager.createContainerToken(containerId, nodeId,
application.getUser(), capability);
if (containerToken == null) {
return null; // Try again later.
}
}
// Create the container
Container container = BuilderUtils.newContainer(containerId, nodeId,
node.getRMNode().getHttpAddress(), capability, priority,
containerToken);
return container;
}
/**
* Reserve a spot for {@code container} on this {@code node}. If
* the container is {@code alreadyReserved} on the node, simply
* update the relevant bookkeeping. This dispatches to the relevant handlers
* in the {@link FSSchedulerNode} and {@link FSSchedulerApp} classes.
*/
private void reserve(FSSchedulerApp application, Priority priority,
FSSchedulerNode node, Container container, boolean alreadyReserved) {
LOG.info("Making reservation: node=" + node.getHostName() +
" app_id=" + app.getApplicationId());
if (!alreadyReserved) {
RMContainer rmContainer = application.reserve(node, priority, null,
container);
node.reserveResource(application, priority, rmContainer);
getMetrics().reserveResource(app.getUser(),
container.getResource());
scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
container.getResource());
}
else {
RMContainer rmContainer = node.getReservedContainer();
application.reserve(node, priority, rmContainer, container);
node.reserveResource(application, priority, rmContainer);
}
}
/**
* Remove the reservation on {@code node} for {@code application} at the given
* {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
* handlers for an unreservation.
*/
private void unreserve(FSSchedulerApp application, Priority priority,
FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
application.unreserve(node, priority);
node.unreserveResource(application);
getMetrics().unreserveResource(
application.getUser(), rmContainer.getContainer().getResource());
scheduler.getRootQueueMetrics().unreserveResource(
application.getUser(), rmContainer.getContainer().getResource());
}
/**
* Assign a container on this node to satisfy {@code request}. If the node does
* not have enough memory, create a reservation instead. This is called once we
* are sure this node should serve the request.
*/
private Resource assignContainer(FSSchedulerNode node,
FSSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, boolean reserved) {
// How much does this request need?
Resource capability = request.getCapability();
// How much does the node have?
Resource available = node.getAvailableResource();
Container container = null;
if (reserved) {
container = node.getReservedContainer().getContainer();
} else {
container = createContainer(application, node, capability, priority);
}
// Can we allocate a container on this node?
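// Memory is the only dimension checked here: the request fits only if the
// node's free memory covers the full requested capability.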
int availableContainers =
available.getMemory() / capability.getMemory();
if (availableContainers > 0) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
application.allocate(type, node, priority, request, container);
if (allocatedContainer == null) {
// Did the application need this resource?
return Resources.none();
}
else {
// TODO this should subtract resource just assigned
// TEMPORARY
getMetrics().setAvailableResourcesToQueue(
scheduler.getClusterCapacity());
}
// If we had previously made a reservation, delete it
if (reserved) {
unreserve(application, priority, node);
}
// Inform the node
node.allocateContainer(application.getApplicationId(),
allocatedContainer);
return container.getResource();
} else {
// The desired container won't fit here, so reserve
reserve(application, priority, node, container, reserved);
return FairScheduler.CONTAINER_RESERVED;
}
}
@Override
public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
LOG.info("Node offered to app: " + getName() + " reserved: " + reserved);
if (reserved) {
RMContainer rmContainer = node.getReservedContainer();
Priority priority = rmContainer.getReservedPriority();
// Make sure the application still needs requests at this priority
if (app.getTotalRequiredResources(priority) == 0) {
unreserve(app, priority, node);
return Resources.none();
}
} else {
// If this app is over quota, don't schedule anything
if (!(getRunnable())) { return Resources.none(); }
}
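// If the node already holds a reservation for this app, only retry the
// reserved priority; otherwise consider every priority with outstanding requests.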
Collection<Priority> prioritiesToTry = (reserved) ?
Arrays.asList(node.getReservedContainer().getReservedPriority()) :
app.getPriorities();
// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack or off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
synchronized (app) {
for (Priority priority : prioritiesToTry) {
if (app.getTotalRequiredResources(priority) <= 0) {
continue;
}
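// Record that this app was offered a chance to schedule at this priority;
// the count of such opportunities drives the delay-scheduling decision below.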
app.addSchedulingOpportunity(priority);
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = app.getResourceRequest(priority,
node.getHostName());
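// Delay scheduling: based on how many opportunities this priority has
// already passed up, decide whether the app may accept a rack-local or
// off-switch assignment yet, or should keep waiting for better locality.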
NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
scheduler.getRackLocalityThreshold());
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) {
return assignContainer(node, app, priority,
localRequest, NodeType.NODE_LOCAL, reserved);
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
allowedLocality.equals(NodeType.OFF_SWITCH))) {
return assignContainer(node, app, priority, rackLocalRequest,
NodeType.RACK_LOCAL, reserved);
}
ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
RMNode.ANY);
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
return assignContainer(node, app, priority, offSwitchRequest,
NodeType.OFF_SWITCH, reserved);
}
}
}
return Resources.none();
}
}