// blob: 2863a97c871ad5208424e76f0562c9f8a139637f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
public class FifoScheduler extends
AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
Configurable {
// Class-wide logger.
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
// Factory used to instantiate YARN record objects (QueueInfo,
// QueueUserACLInfo, Resource) in this class.
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// Configuration supplied through setConf(); package-private.
Configuration conf;
// Read from the configuration in initScheduler() and handed to every
// FiCaSchedulerNode created in addNode().
private boolean usePortForNodeName;
// Created in initScheduler() on top of the queue metrics.
private ActiveUsersManager activeUsersManager;
// Name reported by the single queue this scheduler exposes.
private static final String DEFAULT_QUEUE_NAME = "default";
// Metrics of the default queue; created in initScheduler().
private QueueMetrics metrics;
// Calculator used when normalizing requests and comparing resources.
private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
/**
 * The single queue exposed by this scheduler. It reports the name
 * {@link #DEFAULT_QUEUE_NAME}, a fixed capacity of 1.0, grants every
 * {@link QueueACL} to everybody ("*") and leaves node-label, priority and
 * pending/reserved resource hooks unimplemented.
 */
private final Queue DEFAULT_QUEUE = new Queue() {

  /** @return the constant default queue name. */
  @Override
  public String getQueueName() {
    return DEFAULT_QUEUE_NAME;
  }

  /** @return the scheduler-wide queue metrics. */
  @Override
  public QueueMetrics getMetrics() {
    return metrics;
  }

  /**
   * Builds a fresh {@link QueueInfo} record. Both arguments are ignored
   * because the queue never has children.
   */
  @Override
  public QueueInfo getQueueInfo(
      boolean includeChildQueues, boolean recursive) {
    QueueInfo info = recordFactory.newRecordInstance(QueueInfo.class);
    info.setQueueName(getQueueName());
    info.setCapacity(1.0f);

    // Current capacity is the memory-based used/total ratio; an empty
    // cluster is reported as 0 to avoid dividing by zero.
    Resource cluster = getClusterResource();
    long clusterMemory = cluster.getMemorySize();
    float currentCapacity = (clusterMemory == 0)
        ? 0.0f
        : (float) usedResource.getMemorySize() / clusterMemory;
    info.setCurrentCapacity(currentCapacity);

    info.setMaximumCapacity(1.0f);
    info.setChildQueues(new ArrayList<QueueInfo>());
    info.setQueueState(QueueState.RUNNING);
    return info;
  }

  /** @return a new map granting every queue ACL to all users ("*"). */
  public Map<QueueACL, AccessControlList> getQueueAcls() {
    Map<QueueACL, AccessControlList> allOpen =
        new HashMap<QueueACL, AccessControlList>();
    QueueACL[] allAcls = QueueACL.values();
    for (int i = 0; i < allAcls.length; i++) {
      allOpen.put(allAcls[i], new AccessControlList("*"));
    }
    return allOpen;
  }

  /** @return a one-element list listing every ACL; the user is ignored. */
  @Override
  public List<QueueUserACLInfo> getQueueUserAclInfo(
      UserGroupInformation unused) {
    QueueUserACLInfo aclInfo =
        recordFactory.newRecordInstance(QueueUserACLInfo.class);
    aclInfo.setQueueName(DEFAULT_QUEUE_NAME);
    aclInfo.setUserAcls(Arrays.asList(QueueACL.values()));
    return Collections.singletonList(aclInfo);
  }

  /** Checks the user against the (fully open) ACL registered for acl. */
  @Override
  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
    AccessControlList controlList = getQueueAcls().get(acl);
    return controlList.isUserAllowed(user);
  }

  @Override
  public ActiveUsersManager getActiveUsersManager() {
    return activeUsersManager;
  }

  /**
   * Re-accounts a recovered container: unless it is already COMPLETED its
   * resources are added to the used total, and the attempt headroom and
   * the available-resources metric are refreshed.
   */
  @Override
  public void recoverContainer(Resource clusterResource,
      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
    boolean alreadyCompleted =
        rmContainer.getState().equals(RMContainerState.COMPLETED);
    if (!alreadyCompleted) {
      increaseUsedResources(rmContainer);
      updateAppHeadRoom(schedulerAttempt);
      updateAvailableResourcesMetrics();
    }
  }

  @Override
  public Set<String> getAccessibleNodeLabels() {
    // TODO add implementation for FIFO scheduler
    return null;
  }

  @Override
  public String getDefaultNodeLabelExpression() {
    // TODO add implementation for FIFO scheduler
    return null;
  }

  /** No-op: pending resources are not tracked by this queue. */
  @Override
  public void incPendingResource(String nodeLabel, Resource resourceToInc) {
  }

  /** No-op: pending resources are not tracked by this queue. */
  @Override
  public void decPendingResource(String nodeLabel, Resource resourceToDec) {
  }

  @Override
  public Priority getDefaultApplicationPriority() {
    // TODO add implementation for FIFO scheduler
    return null;
  }

  @Override
  public void incReservedResource(String partition, Resource reservedRes) {
    // TODO add implementation for FIFO scheduler
  }

  @Override
  public void decReservedResource(String partition, Resource reservedRes) {
    // TODO add implementation for FIFO scheduler
  }
};
/**
 * Creates the scheduler, passing this class' fully-qualified name to the
 * superclass constructor.
 */
public FifoScheduler() {
  super(FifoScheduler.class.getName());
}
/**
 * Validates the configuration and (re)creates the scheduler state derived
 * from it: the application map, the minimum/maximum allocation, the
 * node-naming flag, the queue metrics and the active-users manager.
 *
 * @param conf configuration to read the scheduler settings from
 */
private synchronized void initScheduler(Configuration conf) {
  validateConf(conf);

  // A ConcurrentSkipListMap keeps the applications ordered by their id.
  this.applications =
      new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();

  int minMemoryMb = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
  this.minimumAllocation = Resources.createResource(minMemoryMb);

  int maxMemoryMb = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
  int maxVcores = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
  initMaximumResourceCapability(
      Resources.createResource(maxMemoryMb, maxVcores));

  this.usePortForNodeName = conf.getBoolean(
      YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);

  QueueMetrics queueMetrics =
      QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, conf);
  this.metrics = queueMetrics;
  this.activeUsersManager = new ActiveUsersManager(queueMetrics);
}
/**
 * Initializes the scheduler state from the configuration before delegating
 * to the superclass initialization.
 *
 * @param conf service configuration
 * @throws Exception propagated from the superclass
 */
@Override
public void serviceInit(final Configuration conf) throws Exception {
  this.initScheduler(conf);
  super.serviceInit(conf);
}
/**
 * Starts the service; nothing scheduler-specific is done beyond the
 * superclass behaviour.
 *
 * @throws Exception propagated from the superclass
 */
@Override
public void serviceStart() throws Exception {
  super.serviceStart();
}
/**
 * Stops the service; nothing scheduler-specific is done beyond the
 * superclass behaviour.
 *
 * @throws Exception propagated from the superclass
 */
@Override
public void serviceStop() throws Exception {
  super.serviceStop();
}
/**
 * Stores the given configuration; it is not re-read or validated here.
 *
 * @param conf configuration to remember
 */
@Override
public synchronized void setConf(final Configuration conf) {
  this.conf = conf;
}
/**
 * Checks the scheduler memory allocation settings: the minimum must be
 * positive and must not exceed the maximum.
 *
 * @param conf configuration to validate
 * @throws YarnRuntimeException if the min/max memory settings are invalid
 */
private void validateConf(Configuration conf) {
  final int minMem = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
  final int maxMem = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);

  final boolean valid = minMem > 0 && minMem <= maxMem;
  if (valid) {
    return;
  }

  throw new YarnRuntimeException("Invalid resource scheduler memory"
      + " allocation configuration"
      + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
      + "=" + minMem
      + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
      + "=" + maxMem + ", min and max should be greater than 0"
      + ", max should be no smaller than min.");
}
/**
 * @return the configuration last stored through {@link #setConf}
 */
@Override
public synchronized Configuration getConf() {
  return this.conf;
}
/**
 * @return the number of nodes currently known to the node tracker
 */
@Override
public int getNumClusterNodes() {
  final int trackedNodes = nodeTracker.nodeCount();
  return trackedNodes;
}
/**
 * Stores the resource manager context used for event dispatching and
 * recovery checks.
 *
 * @param rmContext context to remember
 */
@Override
public synchronized void setRMContext(final RMContext rmContext) {
  this.rmContext = rmContext;
}
/**
 * Re-initialization only replaces the stored configuration; the supplied
 * context is not used and no scheduler state is rebuilt.
 *
 * @param conf new configuration
 * @param rmContext unused
 * @throws IOException declared by the interface, never thrown here
 */
@Override
public synchronized void reinitialize(final Configuration conf,
    final RMContext rmContext) throws IOException {
  setConf(conf);
}
/**
 * Handles an allocate call from an application attempt: normalizes and
 * records its resource requests, releases the listed containers, updates
 * its blacklist and hands back the containers allocated since the last
 * call together with the current headroom.
 *
 * <p>The increase/decrease request lists are accepted for interface
 * compatibility but are not used by this scheduler.
 *
 * @param applicationAttemptId attempt issuing the call
 * @param ask resource requests to record (normalized in place)
 * @param release containers the attempt gives back
 * @param blacklistAdditions resource names to add to the blacklist
 * @param blacklistRemovals resource names to remove from the blacklist
 * @param increaseRequests ignored
 * @param decreaseRequests ignored
 * @return the new allocation, or {@code EMPTY_ALLOCATION} if the attempt
 *         is unknown or already stopped
 */
@Override
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release,
    List<String> blacklistAdditions, List<String> blacklistRemovals,
    List<ContainerResourceChangeRequest> increaseRequests,
    List<ContainerResourceChangeRequest> decreaseRequests) {
  FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
  if (application == null) {
    LOG.error("Calling allocate on removed " +
        "or non existant application " + applicationAttemptId);
    return EMPTY_ALLOCATION;
  }

  // Sanity check
  SchedulerUtils.normalizeRequests(ask, resourceCalculator,
      getClusterResource(), minimumAllocation,
      getMaximumResourceCapability());

  // Release containers
  releaseContainers(release, application);

  synchronized (application) {
    // make sure we aren't stopping/removing the application
    // when the allocate comes in
    if (application.isStopped()) {
      LOG.info("Calling allocate on a stopped " +
          "application " + applicationAttemptId);
      return EMPTY_ALLOCATION;
    }

    if (!ask.isEmpty()) {
      // Debug messages are only built when debug logging is on, so the
      // regular heartbeat path does not pay for string concatenation.
      if (LOG.isDebugEnabled()) {
        LOG.debug("allocate: pre-update" +
            " applicationId=" + applicationAttemptId +
            " application=" + application);
      }
      application.showRequests();

      // Update application requests
      application.updateResourceRequests(ask);

      if (LOG.isDebugEnabled()) {
        LOG.debug("allocate: post-update" +
            " applicationId=" + applicationAttemptId +
            " application=" + application);
      }
      application.showRequests();

      if (LOG.isDebugEnabled()) {
        LOG.debug("allocate:" +
            " applicationId=" + applicationAttemptId +
            " #ask=" + ask.size());
      }
    }

    application.updateBlacklist(blacklistAdditions, blacklistRemovals);

    Resource headroom = application.getHeadroom();
    application.setApplicationHeadroomForMetrics(headroom);
    return new Allocation(application.pullNewlyAllocatedContainers(),
        headroom, null, null, null, application.pullUpdatedNMTokens());
  }
}
/**
 * Looks up the scheduler-side node for the given id in the node tracker.
 *
 * @param nodeId id of the node
 * @return whatever the node tracker returns for that id
 */
private FiCaSchedulerNode getNode(final NodeId nodeId) {
  final FiCaSchedulerNode tracked = nodeTracker.getNode(nodeId);
  return tracked;
}
/**
 * Registers a new application in the default queue, records the
 * submission in the metrics and, unless the application is being
 * recovered, notifies the RM application with APP_ACCEPTED.
 *
 * @param applicationId id of the application
 * @param queue requested queue name; not used, everything goes to the
 *        default queue
 * @param user submitting user
 * @param isAppRecovering whether the application is being recovered
 */
@VisibleForTesting
public synchronized void addApplication(ApplicationId applicationId,
    String queue, String user, boolean isAppRecovering) {
  applications.put(applicationId,
      new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user));
  metrics.submitApp(user);
  LOG.info("Accepted application " + applicationId + " from user: " + user
      + ", currently num of applications: " + applications.size());

  if (!isAppRecovering) {
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
    return;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
  }
}
/**
 * Creates the scheduler-side attempt for an already registered
 * application, optionally carrying over state from the previous attempt,
 * and, unless the attempt is being recovered, notifies the RM attempt with
 * ATTEMPT_ADDED.
 *
 * <p>If the application is not (or no longer) registered, the call is
 * logged and ignored instead of failing with a NullPointerException.
 *
 * @param appAttemptId id of the new attempt
 * @param transferStateFromPreviousAttempt whether to copy state from the
 *        application's current attempt
 * @param isAttemptRecovering whether the attempt is being recovered
 */
@VisibleForTesting
public synchronized void
    addApplicationAttempt(ApplicationAttemptId appAttemptId,
        boolean transferStateFromPreviousAttempt,
        boolean isAttemptRecovering) {
  SchedulerApplication<FiCaSchedulerApp> application =
      applications.get(appAttemptId.getApplicationId());
  if (application == null) {
    // The application may have been removed before this event arrived.
    LOG.warn("Application " + appAttemptId.getApplicationId()
        + " cannot be found in scheduler.");
    return;
  }
  String user = application.getUser();
  // TODO: Fix store
  FiCaSchedulerApp schedulerApp =
      new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
          activeUsersManager, this.rmContext);

  if (transferStateFromPreviousAttempt) {
    schedulerApp.transferStateFromPreviousAttempt(application
        .getCurrentAppAttempt());
  }
  application.setCurrentAppAttempt(schedulerApp);

  metrics.submitAppAttempt(user);
  LOG.info("Added Application Attempt " + appAttemptId
      + " to scheduler from user " + application.getUser());
  if (isAttemptRecovering) {
    if (LOG.isDebugEnabled()) {
      LOG.debug(appAttemptId
          + " is recovering. Skipping notifying ATTEMPT_ADDED");
    }
  } else {
    rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(appAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
  }
}
/**
 * Removes a finished application: deactivates it in the active-users
 * manager, stops it with the given final state and drops it from the
 * application map. Unknown applications are only logged.
 *
 * @param applicationId id of the finished application
 * @param finalState final RM state of the application
 */
private synchronized void doneApplication(ApplicationId applicationId,
    RMAppState finalState) {
  SchedulerApplication<FiCaSchedulerApp> finished =
      applications.get(applicationId);
  if (finished != null) {
    // Inform the activeUsersManager
    activeUsersManager.deactivateApplication(finished.getUser(),
        applicationId);
    finished.stop(finalState);
    applications.remove(applicationId);
  } else {
    LOG.warn("Couldn't find application " + applicationId);
  }
}
/**
 * Finishes an application attempt: completes (kills) its live containers
 * and stops the attempt. When {@code keepContainers} is set, containers in
 * the RUNNING state are left alone.
 *
 * @param applicationAttemptId id of the finished attempt
 * @param rmAppAttemptFinalState final state handed to the attempt
 * @param keepContainers whether running containers survive the attempt
 * @throws IOException if the application or the attempt is unknown
 */
private synchronized void doneApplicationAttempt(
    ApplicationAttemptId applicationAttemptId,
    RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
    throws IOException {
  FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
  SchedulerApplication<FiCaSchedulerApp> owner =
      applications.get(applicationAttemptId.getApplicationId());
  if (owner == null || attempt == null) {
    throw new IOException("Unknown application " + applicationAttemptId +
        " has completed!");
  }

  // Kill all 'live' containers
  for (RMContainer live : attempt.getLiveContainers()) {
    boolean preserve = keepContainers
        && live.getState().equals(RMContainerState.RUNNING);
    if (preserve) {
      // Work-preserving AM restart: running containers are not killed.
      LOG.info("Skip killing " + live.getContainerId());
    } else {
      ContainerStatus killedStatus =
          SchedulerUtils.createAbnormalContainerStatus(
              live.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION);
      super.completedContainer(live, killedStatus,
          RMContainerEventType.KILL);
    }
  }

  // Clean up pending requests, metrics etc.
  attempt.stop(rmAppAttemptFinalState);
}
/**
 * Heart of the scheduler...
 *
 * <p>Walks the applications in map (FIFO) order and, for each one that has
 * a current attempt and has not blacklisted the node, tries to assign
 * containers per scheduler key. Assignment for an application stops at the
 * first key for which nothing could be assigned, and the whole walk stops
 * once the node has less than the minimum allocation left. Afterwards the
 * headroom of every current attempt is refreshed.
 *
 * @param node node on which resources are available to be allocated
 */
private void assignContainers(FiCaSchedulerNode node) {
  // Guarded: building this message calls applications.size(), which is a
  // linear traversal on a ConcurrentSkipListMap, on every node heartbeat.
  if (LOG.isDebugEnabled()) {
    LOG.debug("assignContainers:" +
        " node=" + node.getRMNode().getNodeAddress() +
        " #applications=" + applications.size());
  }

  // Try to assign containers to applications in fifo order
  for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e
      : applications.entrySet()) {
    FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
    if (application == null) {
      continue;
    }

    LOG.debug("pre-assignContainers");
    application.showRequests();
    synchronized (application) {
      // Check if this resource is on the blacklist
      if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
        continue;
      }

      for (SchedulerRequestKey schedulerKey :
          application.getSchedulerKeys()) {
        int maxContainers =
            getMaxAllocatableContainers(application, schedulerKey, node,
                NodeType.OFF_SWITCH);
        // Ensure the application needs containers of this priority
        if (maxContainers > 0) {
          int assignedContainers =
              assignContainersOnNode(node, application, schedulerKey);
          // Do not assign out of order w.r.t priorities
          if (assignedContainers == 0) {
            break;
          }
        }
      }
    }

    LOG.debug("post-assignContainers");
    application.showRequests();

    // Done
    if (Resources.lessThan(resourceCalculator, getClusterResource(),
        node.getUnallocatedResource(), minimumAllocation)) {
      break;
    }
  }

  // Update the applications' headroom to correctly take into
  // account the containers assigned in this update.
  for (SchedulerApplication<FiCaSchedulerApp> application
      : applications.values()) {
    FiCaSchedulerApp attempt = application.getCurrentAppAttempt();
    if (attempt == null) {
      continue;
    }
    updateAppHeadRoom(attempt);
  }
}
/**
 * Computes an upper bound on the containers that may be allocated for the
 * given scheduler key at the given locality level.
 *
 * <p>The bound starts from the off-switch (ANY) request count, or 0 if
 * there is none. For RACK_LOCAL it is additionally capped by the request
 * for the node's rack, and for NODE_LOCAL by the request for the node
 * itself, when such a request exists.
 *
 * @param application attempt whose requests are inspected
 * @param schedulerKey key (priority) of the requests
 * @param node node being considered
 * @param type locality level
 * @return the maximum number of containers allocatable at that level
 */
private int getMaxAllocatableContainers(FiCaSchedulerApp application,
    SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, NodeType type) {
  int maxContainers = 0;

  ResourceRequest offSwitchRequest =
      application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
  if (offSwitchRequest != null) {
    maxContainers = offSwitchRequest.getNumContainers();
  }

  if (type == NodeType.OFF_SWITCH) {
    return maxContainers;
  }

  if (type == NodeType.RACK_LOCAL) {
    ResourceRequest rackLocalRequest =
        application.getResourceRequest(schedulerKey, node.getRMNode()
            .getRackName());
    if (rackLocalRequest == null) {
      return maxContainers;
    }

    maxContainers = Math.min(maxContainers, rackLocalRequest.getNumContainers());
  }

  if (type == NodeType.NODE_LOCAL) {
    // Look the request up under the scheduler node name, the same key
    // assignNodeLocalContainers() uses, rather than the node address.
    ResourceRequest nodeLocalRequest =
        application.getResourceRequest(schedulerKey, node.getNodeName());
    if (nodeLocalRequest != null) {
      maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
    }
  }

  return maxContainers;
}
/**
 * Tries node-local, then rack-local, then off-switch assignment for one
 * scheduler key on the given node.
 *
 * @param node node to allocate on
 * @param application attempt to allocate for
 * @param schedulerKey key (priority) being served
 * @return the sum of the three per-locality results
 */
private int assignContainersOnNode(FiCaSchedulerNode node,
    FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
  // Data-local first, then rack-local, finally off-switch.
  final int nodeLocal =
      assignNodeLocalContainers(node, application, schedulerKey);
  final int rackLocal =
      assignRackLocalContainers(node, application, schedulerKey);
  final int offSwitch =
      assignOffSwitchContainers(node, application, schedulerKey);
  final int total = nodeLocal + rackLocal + offSwitch;

  LOG.debug("assignContainersOnNode:" +
      " node=" + node.getRMNode().getNodeAddress() +
      " application=" + application.getApplicationId().getId() +
      " priority=" + schedulerKey.getPriority() +
      " #assigned=" + total);

  return total;
}
/**
 * Assigns node-local containers, i.e. containers requested explicitly for
 * this node's name. Nothing is assigned when there is no such request or
 * when the matching rack request is missing or exhausted.
 *
 * @param node node to allocate on
 * @param application attempt to allocate for
 * @param schedulerKey key (priority) being served
 * @return the result of the container assignment, or 0 if skipped
 */
private int assignNodeLocalContainers(FiCaSchedulerNode node,
    FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
  ResourceRequest nodeRequest =
      application.getResourceRequest(schedulerKey, node.getNodeName());
  if (nodeRequest == null) {
    return 0;
  }

  // Don't allocate on this node if we don't need containers on this rack
  ResourceRequest rackRequest = application.getResourceRequest(
      schedulerKey, node.getRMNode().getRackName());
  if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
    return 0;
  }

  int localityLimit = getMaxAllocatableContainers(
      application, schedulerKey, node, NodeType.NODE_LOCAL);
  int assignable = Math.min(localityLimit, nodeRequest.getNumContainers());
  return assignContainer(node, application, schedulerKey,
      assignable, nodeRequest, NodeType.NODE_LOCAL);
}
/**
 * Assigns rack-local containers, i.e. containers requested for the rack
 * this node belongs to. Nothing is assigned when there is no rack request
 * or when the off-switch (ANY) request is missing or exhausted.
 *
 * @param node node to allocate on
 * @param application attempt to allocate for
 * @param schedulerKey key (priority) being served
 * @return the result of the container assignment, or 0 if skipped
 */
private int assignRackLocalContainers(FiCaSchedulerNode node,
    FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
  int assignedContainers = 0;
  ResourceRequest request =
      application.getResourceRequest(schedulerKey, node.getRMNode()
          .getRackName());
  if (request != null) {
    // Don't allocate on this rack if the application doesn't need
    // containers. A missing ANY request is treated like an exhausted one,
    // mirroring the rack check in assignNodeLocalContainers().
    ResourceRequest offSwitchRequest =
        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
    if (offSwitchRequest == null
        || offSwitchRequest.getNumContainers() <= 0) {
      return 0;
    }

    int assignableContainers =
        Math.min(
            getMaxAllocatableContainers(application, schedulerKey, node,
                NodeType.RACK_LOCAL),
            request.getNumContainers());
    assignedContainers =
        assignContainer(node, application, schedulerKey,
            assignableContainers, request, NodeType.RACK_LOCAL);
  }
  return assignedContainers;
}
/**
 * Assigns off-switch containers, i.e. containers requested for ANY
 * location, up to the count of that request.
 *
 * @param node node to allocate on
 * @param application attempt to allocate for
 * @param schedulerKey key (priority) being served
 * @return the result of the container assignment, or 0 without a request
 */
private int assignOffSwitchContainers(FiCaSchedulerNode node,
    FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
  ResourceRequest anyRequest =
      application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
  if (anyRequest == null) {
    return 0;
  }
  return assignContainer(node, application, schedulerKey,
      anyRequest.getNumContainers(), anyRequest, NodeType.OFF_SWITCH);
}
/**
 * Allocates up to {@code assignableContainers} containers of the
 * requested capability on the node, limited by how many fit into the
 * node's unallocated memory. Each allocation is reported to the
 * application and the node and added to the used-resource total.
 *
 * <p>A request whose capability has non-positive memory is skipped with a
 * warning; dividing by it would otherwise throw an ArithmeticException.
 *
 * @param node node to allocate on
 * @param application attempt to allocate for
 * @param schedulerKey key (priority) being served
 * @param assignableContainers upper bound imposed by the requests
 * @param request request being satisfied
 * @param type locality level of the allocation
 * @return the smaller of the assignable and the available container
 *         count (the number allocated when it is positive), or 0 for a
 *         skipped request
 */
private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,
    SchedulerRequestKey schedulerKey, int assignableContainers,
    ResourceRequest request, NodeType type) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("assignContainers:" +
        " node=" + node.getRMNode().getNodeAddress() +
        " application=" + application.getApplicationId().getId() +
        " priority=" + schedulerKey.getPriority().getPriority() +
        " assignableContainers=" + assignableContainers +
        " request=" + request + " type=" + type);
  }
  Resource capability = request.getCapability();

  // Guard the division below against a zero (or negative) memory ask.
  long requestedMemory = capability.getMemorySize();
  if (requestedMemory <= 0) {
    LOG.warn("Skipping request with non-positive memory capability "
        + capability + " from application attempt "
        + application.getApplicationAttemptId());
    return 0;
  }

  int availableContainers =
      (int) (node.getUnallocatedResource().getMemorySize() /
          requestedMemory);
  int assignedContainers =
      Math.min(assignableContainers, availableContainers);

  if (assignedContainers > 0) {
    for (int i = 0; i < assignedContainers; ++i) {

      NodeId nodeId = node.getRMNode().getNodeID();
      ContainerId containerId = BuilderUtils.newContainerId(application
          .getApplicationAttemptId(), application.getNewContainerId());

      // Create the container
      Container container = BuilderUtils.newContainer(containerId, nodeId,
          node.getRMNode().getHttpAddress(), capability,
          schedulerKey.getPriority(), null,
          schedulerKey.getAllocationRequestId());

      // Allocate!

      // Inform the application
      RMContainer rmContainer =
          application.allocate(type, node, schedulerKey, request, container);

      // Inform the node
      node.allocateContainer(rmContainer);

      // Update usage for this container
      increaseUsedResources(rmContainer);
    }
  }

  return assignedContainers;
}
/**
 * Processes a node heartbeat: applies the reported container launches and
 * completions, copies the utilization figures to the scheduler node,
 * shrinks a decommissioning node to its allocated resources and, when the
 * node still has at least the minimum allocation free, tries to assign
 * containers on it.
 *
 * @param rmNode node that sent the heartbeat
 */
private synchronized void nodeUpdate(RMNode rmNode) {
  FiCaSchedulerNode schedulerNode = getNode(rmNode.getNodeID());

  // Flatten all pending updates into one "launched" and one "completed"
  // list so that every launch is handled before any completion.
  List<UpdatedContainerInfo> pendingUpdates = rmNode.pullContainerUpdates();
  List<ContainerStatus> launched = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completed = new ArrayList<ContainerStatus>();
  for (UpdatedContainerInfo update : pendingUpdates) {
    launched.addAll(update.getNewlyLaunchedContainers());
    completed.addAll(update.getCompletedContainers());
  }

  // Processing the newly launched containers
  for (ContainerStatus status : launched) {
    containerLaunchedOnNode(status.getContainerId(), schedulerNode);
  }

  // Process completed containers
  for (ContainerStatus status : completed) {
    ContainerId finishedId = status.getContainerId();
    LOG.debug("Container FINISHED: " + finishedId);
    super.completedContainer(getRMContainer(finishedId),
        status, RMContainerEventType.FINISHED);
  }

  // Updating node resource utilization
  schedulerNode.setAggregatedContainersUtilization(
      rmNode.getAggregatedContainersUtilization());
  schedulerNode.setNodeUtilization(rmNode.getNodeUtilization());

  // If the node is decommissioning, send an update to have the total
  // resource equal to the used resource, so no available resource to
  // schedule.
  if (rmNode.getState() == NodeState.DECOMMISSIONING) {
    ResourceOption allocatedOnly = ResourceOption.newInstance(
        getSchedulerNode(rmNode.getNodeID()).getAllocatedResource(), 0);
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeResourceUpdateEvent(rmNode.getNodeID(), allocatedOnly));
  }

  // No allocation while work-preserving recovery is still in progress.
  boolean recoveryPending = rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers();
  if (recoveryPending) {
    return;
  }

  boolean hasRoom = Resources.greaterThanOrEqual(resourceCalculator,
      getClusterResource(), schedulerNode.getUnallocatedResource(),
      minimumAllocation);
  if (hasRoom) {
    LOG.debug("Node heartbeat " + rmNode.getNodeID() +
        " available resource = " + schedulerNode.getUnallocatedResource());

    assignContainers(schedulerNode);

    LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
        + schedulerNode.getUnallocatedResource());
  }

  updateAvailableResourcesMetrics();
}
/**
 * Adds the container's allocated resource to the used-resource total.
 *
 * @param rmContainer container that was allocated or recovered
 */
private void increaseUsedResources(RMContainer rmContainer) {
  final Resource allocated = rmContainer.getAllocatedResource();
  Resources.addTo(usedResource, allocated);
}
/**
 * Sets the attempt's headroom to the cluster resource minus the used
 * total.
 *
 * @param schedulerAttempt attempt whose headroom is refreshed
 */
private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
  final Resource headroom =
      Resources.subtract(getClusterResource(), usedResource);
  schedulerAttempt.setHeadroom(headroom);
}
/**
 * Publishes the cluster resource minus the used total as the queue's
 * available resources.
 */
private void updateAvailableResourcesMetrics() {
  final Resource available =
      Resources.subtract(getClusterResource(), usedResource);
  metrics.setAvailableResourcesToQueue(available);
}
/**
 * Dispatches a scheduler event to the matching handler method based on
 * its type. Unknown event types are logged and ignored; a failure while
 * removing an application attempt is logged as well.
 *
 * @param event event to process
 */
@Override
public void handle(SchedulerEvent event) {
  switch (event.getType()) {
  case NODE_ADDED: {
    NodeAddedSchedulerEvent added = (NodeAddedSchedulerEvent) event;
    addNode(added.getAddedRMNode());
    recoverContainersOnNode(added.getContainerReports(),
        added.getAddedRMNode());
    break;
  }
  case NODE_REMOVED: {
    NodeRemovedSchedulerEvent removed = (NodeRemovedSchedulerEvent) event;
    removeNode(removed.getRemovedRMNode());
    break;
  }
  case NODE_RESOURCE_UPDATE: {
    NodeResourceUpdateSchedulerEvent resourceUpdate =
        (NodeResourceUpdateSchedulerEvent) event;
    updateNodeResource(resourceUpdate.getRMNode(),
        resourceUpdate.getResourceOption());
    break;
  }
  case NODE_UPDATE: {
    NodeUpdateSchedulerEvent heartbeat = (NodeUpdateSchedulerEvent) event;
    nodeUpdate(heartbeat.getRMNode());
    break;
  }
  case APP_ADDED: {
    AppAddedSchedulerEvent appAdded = (AppAddedSchedulerEvent) event;
    addApplication(appAdded.getApplicationId(),
        appAdded.getQueue(), appAdded.getUser(),
        appAdded.getIsAppRecovering());
    break;
  }
  case APP_REMOVED: {
    AppRemovedSchedulerEvent appRemoved = (AppRemovedSchedulerEvent) event;
    doneApplication(appRemoved.getApplicationID(),
        appRemoved.getFinalState());
    break;
  }
  case APP_ATTEMPT_ADDED: {
    AppAttemptAddedSchedulerEvent attemptAdded =
        (AppAttemptAddedSchedulerEvent) event;
    addApplicationAttempt(attemptAdded.getApplicationAttemptId(),
        attemptAdded.getTransferStateFromPreviousAttempt(),
        attemptAdded.getIsAttemptRecovering());
    break;
  }
  case APP_ATTEMPT_REMOVED: {
    AppAttemptRemovedSchedulerEvent attemptRemoved =
        (AppAttemptRemovedSchedulerEvent) event;
    try {
      doneApplicationAttempt(
          attemptRemoved.getApplicationAttemptID(),
          attemptRemoved.getFinalAttemptState(),
          attemptRemoved.getKeepContainersAcrossAppAttempts());
    } catch (IOException ie) {
      LOG.error("Unable to remove application "
          + attemptRemoved.getApplicationAttemptID(), ie);
    }
    break;
  }
  case CONTAINER_EXPIRED: {
    ContainerExpiredSchedulerEvent expired =
        (ContainerExpiredSchedulerEvent) event;
    ContainerId expiredId = expired.getContainerId();
    ContainerStatus expiredStatus =
        SchedulerUtils.createAbnormalContainerStatus(
            expiredId, SchedulerUtils.EXPIRED_CONTAINER);
    super.completedContainer(getRMContainer(expiredId), expiredStatus,
        RMContainerEventType.EXPIRE);
    break;
  }
  default:
    LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
  }
}
/**
 * Finalizes a completed container: informs the owning attempt and the
 * node, and subtracts the container's resource from the used total. If
 * the owning attempt is unknown the event is only logged.
 *
 * @param rmContainer container that completed
 * @param containerStatus final status of the container
 * @param event event that caused the completion
 */
@Lock(FifoScheduler.class)
@Override
protected synchronized void completedContainerInternal(
    RMContainer rmContainer, ContainerStatus containerStatus,
    RMContainerEventType event) {
  Container finished = rmContainer.getContainer();
  ContainerId finishedId = finished.getId();

  // Get the application for the finished container
  FiCaSchedulerApp owner = getCurrentAttemptForContainer(finishedId);

  // Get the node on which the container was allocated
  FiCaSchedulerNode host = getNode(finished.getNodeId());

  if (owner == null) {
    ApplicationId appId =
        finishedId.getApplicationAttemptId().getApplicationId();
    LOG.info("Unknown application: " + appId +
        " released container " + finished.getId() +
        " on node: " + host +
        " with event: " + event);
    return;
  }

  // Inform the application
  owner.containerCompleted(rmContainer, containerStatus, event,
      RMNodeLabelsManager.NO_LABEL);

  // Inform the node
  host.releaseContainer(finished);

  // Update total usage
  Resources.subtractFrom(usedResource, finished.getResource());

  LOG.info("Application attempt " + owner.getApplicationAttemptId() +
      " released container " + finished.getId() +
      " on node: " + host +
      " with event: " + event);
}
// Running total of the resources held by allocated containers: grown in
// increaseUsedResources() and reduced in completedContainerInternal().
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
/**
 * Removes a node from the scheduler after completing (killing) every
 * container that was running on it. Unknown nodes are ignored.
 *
 * @param nodeInfo node to remove
 */
private synchronized void removeNode(RMNode nodeInfo) {
  NodeId removedId = nodeInfo.getNodeID();
  FiCaSchedulerNode tracked = nodeTracker.getNode(removedId);
  if (tracked != null) {
    // Kill running containers
    for (RMContainer running : tracked.getCopiedListOfRunningContainers()) {
      ContainerStatus lostStatus =
          SchedulerUtils.createAbnormalContainerStatus(
              running.getContainerId(), SchedulerUtils.LOST_CONTAINER);
      super.completedContainer(running, lostStatus,
          RMContainerEventType.KILL);
    }
    nodeTracker.removeNode(nodeInfo.getNodeID());
  }
}
/**
 * Returns the info of the default queue. All three arguments are ignored:
 * the default queue is always queried with both flags set to false.
 *
 * @param queueName ignored
 * @param includeChildQueues ignored
 * @param recursive ignored
 * @return queue info of the default queue
 */
@Override
public QueueInfo getQueueInfo(String queueName,
    boolean includeChildQueues, boolean recursive) {
  final QueueInfo defaultQueueInfo = DEFAULT_QUEUE.getQueueInfo(false, false);
  return defaultQueueInfo;
}
/**
 * @return the ACL info of the default queue, queried without a user
 */
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo() {
  final List<QueueUserACLInfo> aclInfos =
      DEFAULT_QUEUE.getQueueUserAclInfo(null);
  return aclInfos;
}
/**
 * @return the resource calculator used by this scheduler
 */
@Override
public ResourceCalculator getResourceCalculator() {
  return this.resourceCalculator;
}
/**
 * Wraps the node in a scheduler node, using the configured node-naming
 * flag, and registers it with the node tracker.
 *
 * @param nodeManager node that joined the cluster
 */
private synchronized void addNode(RMNode nodeManager) {
  nodeTracker.addNode(
      new FiCaSchedulerNode(nodeManager, usePortForNodeName));
}
/**
 * State recovery is not implemented by this scheduler; the call does
 * nothing.
 *
 * @param state ignored
 */
@Override
public void recover(final RMState state) {
  // NOT IMPLEMENTED
}
/**
 * Resolves a container id to its RM container via the current attempt
 * that owns it.
 *
 * @param containerId id to resolve
 * @return the RM container, or {@code null} if no current attempt is
 *         found for the container
 */
@Override
public RMContainer getRMContainer(ContainerId containerId) {
  FiCaSchedulerApp owner = getCurrentAttemptForContainer(containerId);
  if (owner == null) {
    return null;
  }
  return owner.getRMContainer(containerId);
}
/**
 * @return the metrics of the default queue, which doubles as the root
 *         queue of this scheduler
 */
@Override
public QueueMetrics getRootQueueMetrics() {
  final QueueMetrics rootMetrics = DEFAULT_QUEUE.getMetrics();
  return rootMetrics;
}
/**
 * Checks the caller against the default queue's ACLs; the queue name is
 * not consulted.
 *
 * @param callerUGI user to check
 * @param acl ACL to check
 * @param queueName ignored
 * @return whether the default queue grants the ACL to the caller
 */
@Override
public synchronized boolean checkAccess(UserGroupInformation callerUGI,
    QueueACL acl, String queueName) {
  final boolean allowed = DEFAULT_QUEUE.hasAccess(acl, callerUGI);
  return allowed;
}
/**
 * Lists the current attempts of all applications in the named queue.
 *
 * <p>Applications that have been added but do not have a current attempt
 * yet are skipped rather than causing a NullPointerException, and a
 * {@code null} queue name is treated like any other unknown queue.
 *
 * @param queueName name of the queue to list
 * @return the attempt ids in application order, or {@code null} if the
 *         name does not denote the default queue
 */
@Override
public synchronized List<ApplicationAttemptId>
    getAppsInQueue(String queueName) {
  // Constant-first comparison keeps a null queue name from throwing.
  if (!DEFAULT_QUEUE_NAME.equals(queueName)) {
    return null;
  }
  List<ApplicationAttemptId> attempts =
      new ArrayList<ApplicationAttemptId>(applications.size());
  for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
    FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
    if (attempt == null) {
      // Application registered, attempt not created yet.
      continue;
    }
    attempts.add(attempt.getApplicationAttemptId());
  }
  return attempts;
}
/**
 * @return the live used-resource total maintained by this scheduler (the
 *         internal instance itself, not a copy)
 */
public Resource getUsedResource() {
  return this.usedResource;
}
/**
 * Container decrease is not supported by this scheduler; the call does
 * nothing.
 *
 * @param decreaseRequest ignored
 * @param attempt ignored
 */
@Override
protected void decreaseContainer(
    final SchedContainerChangeRequest decreaseRequest,
    final SchedulerApplicationAttempt attempt) {
  // TODO Auto-generated method stub
}
}