| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; |
| import org.apache.hadoop.classification.InterfaceStability.Evolving; |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.authorize.AccessControlList; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.QueueState; |
| import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; |
| |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; |
| import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.server.utils.Lock; |
| import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| |
| @LimitedPrivate("yarn") |
| @Evolving |
| @SuppressWarnings("unchecked") |
| public class FifoScheduler extends |
| AbstractYarnScheduler<FifoAppAttempt, FiCaSchedulerNode> implements |
| Configurable { |
| |
  /** Class-level logger, commons-logging to match the rest of the RM. */
  private static final Log LOG = LogFactory.getLog(FifoScheduler.class);

  /** Factory used to build YARN protocol records (QueueInfo, Resource, ...). */
  private static final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  // Scheduler configuration; set via setConf()/reinitialize().
  Configuration conf;

  // Whether node names include the RPC port (RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME).
  private boolean usePortForNodeName;

  // Tracks users with active applications, for headroom/metrics bookkeeping.
  private ActiveUsersManager activeUsersManager;

  // FIFO exposes exactly one queue, named "default".
  private static final String DEFAULT_QUEUE_NAME = "default";
  // Metrics for the single default queue; doubles as the root queue metrics.
  private QueueMetrics metrics;

  // Memory-only resource comparison, consistent with FIFO's memory-based allocation.
  private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
| |
  /**
   * The single queue exposed by the FIFO scheduler, implemented as an
   * anonymous {@link Queue}. Its capacity is always 100% of the cluster and
   * its ACLs allow every user ("*") for every {@link QueueACL}.
   */
  private final Queue DEFAULT_QUEUE = new Queue() {
    @Override
    public String getQueueName() {
      return DEFAULT_QUEUE_NAME;
    }

    @Override
    public QueueMetrics getMetrics() {
      // Delegate to the scheduler-level metrics; there is only one queue.
      return metrics;
    }

    @Override
    public QueueInfo getQueueInfo(
        boolean includeChildQueues, boolean recursive) {
      // includeChildQueues/recursive are ignored: this queue has no children.
      QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
      queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
      queueInfo.setCapacity(1.0f);
      Resource clusterResource = getClusterResource();
      if (clusterResource.getMemorySize() == 0) {
        // Avoid divide-by-zero before any node has registered.
        queueInfo.setCurrentCapacity(0.0f);
      } else {
        // Current capacity is reported as the memory-utilization fraction.
        queueInfo.setCurrentCapacity((float) usedResource.getMemorySize()
            / clusterResource.getMemorySize());
      }
      queueInfo.setMaximumCapacity(1.0f);
      queueInfo.setChildQueues(new ArrayList<QueueInfo>());
      queueInfo.setQueueState(QueueState.RUNNING);
      return queueInfo;
    }

    // Returns a fresh map granting everyone ("*") every queue ACL.
    public Map<QueueACL, AccessControlList> getQueueAcls() {
      Map<QueueACL, AccessControlList> acls =
        new HashMap<QueueACL, AccessControlList>();
      for (QueueACL acl : QueueACL.values()) {
        acls.put(acl, new AccessControlList("*"));
      }
      return acls;
    }

    @Override
    public List<QueueUserACLInfo> getQueueUserAclInfo(
        UserGroupInformation unused) {
      // Every user holds every ACL on the default queue.
      QueueUserACLInfo queueUserAclInfo =
        recordFactory.newRecordInstance(QueueUserACLInfo.class);
      queueUserAclInfo.setQueueName(DEFAULT_QUEUE_NAME);
      queueUserAclInfo.setUserAcls(Arrays.asList(QueueACL.values()));
      return Collections.singletonList(queueUserAclInfo);
    }

    @Override
    public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
      // Always true in practice, since getQueueAcls() grants "*".
      return getQueueAcls().get(acl).isUserAllowed(user);
    }

    @Override
    public ActiveUsersManager getAbstractUsersManager() {
      return activeUsersManager;
    }

    @Override
    public void recoverContainer(Resource clusterResource,
        SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
      // Nothing to account for a container that already finished.
      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
        return;
      }
      // Re-add the recovered container's resources to the cluster-wide usage
      // and refresh headroom/availability derived from it.
      increaseUsedResources(rmContainer);
      updateAppHeadRoom(schedulerAttempt);
      updateAvailableResourcesMetrics();
    }

    @Override
    public Set<String> getAccessibleNodeLabels() {
      // TODO add implementation for FIFO scheduler
      return null;
    }

    @Override
    public String getDefaultNodeLabelExpression() {
      // TODO add implementation for FIFO scheduler
      return null;
    }

    @Override
    public void incPendingResource(String nodeLabel, Resource resourceToInc) {
      // No-op: FIFO does not track per-label pending resources.
    }

    @Override
    public void decPendingResource(String nodeLabel, Resource resourceToDec) {
      // No-op: FIFO does not track per-label pending resources.
    }

    @Override
    public Priority getDefaultApplicationPriority() {
      // TODO add implementation for FIFO scheduler
      return null;
    }

    @Override
    public void incReservedResource(String partition, Resource reservedRes) {
      // TODO add implementation for FIFO scheduler

    }

    @Override
    public void decReservedResource(String partition, Resource reservedRes) {
      // TODO add implementation for FIFO scheduler

    }
  };
| |
  /** Creates the scheduler; real initialization happens in serviceInit(). */
  public FifoScheduler() {
    super(FifoScheduler.class.getName());
  }
| |
  /**
   * One-time scheduler initialization: validates the configuration and sets
   * up the application map, allocation bounds, queue metrics and the active
   * users manager. Called from serviceInit() before the service starts.
   */
  private synchronized void initScheduler(Configuration conf) {
    validateConf(conf);
    //Use ConcurrentSkipListMap because applications need to be ordered
    this.applications =
        new ConcurrentSkipListMap<>();
    this.minimumAllocation = super.getMinimumAllocation();
    initMaximumResourceCapability(super.getMaximumAllocation());
    this.usePortForNodeName = conf.getBoolean(
        YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
    // Single queue, no parent, not tracking per-user metrics.
    this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
        conf);
    this.activeUsersManager = new ActiveUsersManager(metrics);
  }
| |
  /** Initializes FIFO state first, then the common AbstractYarnScheduler. */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    initScheduler(conf);
    super.serviceInit(conf);
  }
| |
  /** No FIFO-specific startup work; defers to AbstractYarnScheduler. */
  @Override
  public void serviceStart() throws Exception {
    super.serviceStart();
  }
| |
  /** No FIFO-specific shutdown work; defers to AbstractYarnScheduler. */
  @Override
  public void serviceStop() throws Exception {
    super.serviceStop();
  }
| |
  /** Stores the configuration (Configurable contract); see also reinitialize(). */
  @Override
  public synchronized void setConf(Configuration conf) {
    this.conf = conf;
  }
| |
| private void validateConf(Configuration conf) { |
| // validate scheduler memory allocation setting |
| int minMem = conf.getInt( |
| YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); |
| int maxMem = conf.getInt( |
| YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); |
| |
| if (minMem <= 0 || minMem > maxMem) { |
| throw new YarnRuntimeException("Invalid resource scheduler memory" |
| + " allocation configuration" |
| + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB |
| + "=" + minMem |
| + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB |
| + "=" + maxMem + ", min and max should be greater than 0" |
| + ", max should be no smaller than min."); |
| } |
| } |
| |
  /** Returns the current configuration (Configurable contract). */
  @Override
  public synchronized Configuration getConf() {
    return conf;
  }
| |
  /** @return the number of nodes currently tracked by the scheduler. */
  @Override
  public int getNumClusterNodes() {
    return nodeTracker.nodeCount();
  }
| |
  /** Injects the RM context used for event dispatch and recovery checks. */
  @Override
  public synchronized void setRMContext(RMContext rmContext) {
    this.rmContext = rmContext;
  }
| |
  /**
   * Refreshes the scheduler configuration. The {@code rmContext} argument is
   * intentionally unused here; it is set once via {@link #setRMContext}.
   */
  @Override
  public synchronized void
      reinitialize(Configuration conf, RMContext rmContext) throws IOException
  {
    setConf(conf);
  }
| |
  /**
   * Main AM-facing entry point: records new resource requests, releases the
   * given containers, updates the blacklist, and hands back any containers
   * allocated since the last heartbeat together with the current headroom.
   * Returns {@code EMPTY_ALLOCATION} for unknown, outdated or stopped
   * attempts rather than throwing.
   */
  @Override
  public Allocation allocate(ApplicationAttemptId applicationAttemptId,
      List<ResourceRequest> ask, List<ContainerId> release,
      List<String> blacklistAdditions, List<String> blacklistRemovals,
      ContainerUpdates updateRequests) {
    FifoAppAttempt application = getApplicationAttempt(applicationAttemptId);
    if (application == null) {
      LOG.error("Calling allocate on removed or non existent application " +
          applicationAttemptId.getApplicationId());
      return EMPTY_ALLOCATION;
    }

    // The allocate may be the leftover from previous attempt, and it will
    // impact current attempt, such as confuse the request and allocation for
    // current attempt's AM container.
    // Note outside precondition check for the attempt id may be
    // outdated here, so double check it here is necessary.
    if (!application.getApplicationAttemptId().equals(applicationAttemptId)) {
      LOG.error("Calling allocate on previous or removed " +
          "or non existent application attempt " + applicationAttemptId);
      return EMPTY_ALLOCATION;
    }

    // Sanity check: normalize asks up to the configured minimum allocation.
    normalizeRequests(ask);

    // Release containers (done outside the application lock).
    releaseContainers(release, application);

    synchronized (application) {

      // make sure we aren't stopping/removing the application
      // when the allocate comes in
      if (application.isStopped()) {
        LOG.info("Calling allocate on a stopped " +
            "application " + applicationAttemptId);
        return EMPTY_ALLOCATION;
      }

      if (!ask.isEmpty()) {
        LOG.debug("allocate: pre-update" +
            " applicationId=" + applicationAttemptId +
            " application=" + application);
        application.showRequests();

        // Update application requests
        application.updateResourceRequests(ask);

        LOG.debug("allocate: post-update" +
            " applicationId=" + applicationAttemptId +
            " application=" + application);
        application.showRequests();

        LOG.debug("allocate:" +
            " applicationId=" + applicationAttemptId +
            " #ask=" + ask.size());
      }

      application.updateBlacklist(blacklistAdditions, blacklistRemovals);

      // Headroom is cluster resource minus total usage (see updateAppHeadRoom).
      Resource headroom = application.getHeadroom();
      application.setApplicationHeadroomForMetrics(headroom);
      return new Allocation(application.pullNewlyAllocatedContainers(),
          headroom, null, null, null, application.pullUpdatedNMTokens());
    }
  }
| |
  /**
   * Registers a newly submitted application. The {@code queue} argument is
   * ignored: FIFO has a single default queue. Unless the app is recovering,
   * an APP_ACCEPTED event is sent back to the RM app state machine.
   */
  @VisibleForTesting
  public synchronized void addApplication(ApplicationId applicationId,
      String queue, String user, boolean isAppRecovering) {
    SchedulerApplication<FifoAppAttempt> application =
        new SchedulerApplication<>(DEFAULT_QUEUE, user);
    applications.put(applicationId, application);
    metrics.submitApp(user);
    LOG.info("Accepted application " + applicationId + " from user: " + user
        + ", currently num of applications: " + applications.size());
    if (isAppRecovering) {
      // During recovery the app is already past ACCEPTED; don't re-notify.
      if (LOG.isDebugEnabled()) {
        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
      }
    } else {
      rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
    }
  }
| |
  /**
   * Creates and registers a new attempt for an already-added application,
   * optionally transferring live-container state from the previous attempt
   * (work-preserving AM restart). Unless the attempt is recovering, an
   * ATTEMPT_ADDED event is sent back to the attempt state machine.
   */
  @VisibleForTesting
  public synchronized void
      addApplicationAttempt(ApplicationAttemptId appAttemptId,
          boolean transferStateFromPreviousAttempt,
          boolean isAttemptRecovering) {
    SchedulerApplication<FifoAppAttempt> application =
        applications.get(appAttemptId.getApplicationId());
    String user = application.getUser();
    // TODO: Fix store
    FifoAppAttempt schedulerApp =
        new FifoAppAttempt(appAttemptId, user, DEFAULT_QUEUE,
            activeUsersManager, this.rmContext);

    if (transferStateFromPreviousAttempt) {
      // Keep running containers owned by the previous attempt alive.
      schedulerApp.transferStateFromPreviousAttempt(application
          .getCurrentAppAttempt());
    }
    application.setCurrentAppAttempt(schedulerApp);

    metrics.submitAppAttempt(user);
    LOG.info("Added Application Attempt " + appAttemptId
        + " to scheduler from user " + application.getUser());
    if (isAttemptRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(appAttemptId
            + " is recovering. Skipping notifying ATTEMPT_ADDED");
      }
    } else {
      rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptEvent(appAttemptId,
              RMAppAttemptEventType.ATTEMPT_ADDED));
    }
  }
| |
  /**
   * Finalizes an application: deactivates its user, stops the scheduler-side
   * application object with the given final state, and removes it from the
   * FIFO ordering map. Unknown applications are logged and ignored.
   */
  private synchronized void doneApplication(ApplicationId applicationId,
      RMAppState finalState) {
    SchedulerApplication<FifoAppAttempt> application =
        applications.get(applicationId);
    if (application == null){
      LOG.warn("Couldn't find application " + applicationId);
      return;
    }

    // Inform the activeUsersManager
    activeUsersManager.deactivateApplication(application.getUser(),
        applicationId);
    application.stop(finalState);
    applications.remove(applicationId);
  }
| |
  /**
   * Finalizes an application attempt: kills its live containers (unless they
   * are kept for work-preserving AM restart) and stops the attempt, cleaning
   * up pending requests and metrics.
   *
   * @throws IOException if the attempt or its application is unknown
   */
  private synchronized void doneApplicationAttempt(
      ApplicationAttemptId applicationAttemptId,
      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
      throws IOException {
    FifoAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
    SchedulerApplication<FifoAppAttempt> application =
        applications.get(applicationAttemptId.getApplicationId());
    if (application == null || attempt == null) {
      throw new IOException("Unknown application " + applicationAttemptId +
          " has completed!");
    }

    // Kill all 'live' containers
    for (RMContainer container : attempt.getLiveContainers()) {
      if (keepContainers
          && container.getState().equals(RMContainerState.RUNNING)) {
        // do not kill the running container in the case of work-preserving AM
        // restart.
        LOG.info("Skip killing " + container.getContainerId());
        continue;
      }
      super.completedContainer(container,
          SchedulerUtils.createAbnormalContainerStatus(
              container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
          RMContainerEventType.KILL);
    }

    // Clean up pending requests, metrics etc.
    attempt.stop(rmAppAttemptFinalState);
  }
| |
| /** |
| * Heart of the scheduler... |
| * |
| * @param node node on which resources are available to be allocated |
| */ |
| private void assignContainers(FiCaSchedulerNode node) { |
| LOG.debug("assignContainers:" + |
| " node=" + node.getRMNode().getNodeAddress() + |
| " #applications=" + applications.size()); |
| |
| // Try to assign containers to applications in fifo order |
| for (Map.Entry<ApplicationId, SchedulerApplication<FifoAppAttempt>> e : applications |
| .entrySet()) { |
| FifoAppAttempt application = e.getValue().getCurrentAppAttempt(); |
| if (application == null) { |
| continue; |
| } |
| |
| LOG.debug("pre-assignContainers"); |
| application.showRequests(); |
| synchronized (application) { |
| // Check if this resource is on the blacklist |
| if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) { |
| continue; |
| } |
| |
| for (SchedulerRequestKey schedulerKey : |
| application.getSchedulerKeys()) { |
| int maxContainers = |
| getMaxAllocatableContainers(application, schedulerKey, node, |
| NodeType.OFF_SWITCH); |
| // Ensure the application needs containers of this priority |
| if (maxContainers > 0) { |
| int assignedContainers = |
| assignContainersOnNode(node, application, schedulerKey); |
| // Do not assign out of order w.r.t priorities |
| if (assignedContainers == 0) { |
| break; |
| } |
| } |
| } |
| } |
| |
| LOG.debug("post-assignContainers"); |
| application.showRequests(); |
| |
| // Done |
| if (Resources.lessThan(resourceCalculator, getClusterResource(), |
| node.getUnallocatedResource(), minimumAllocation)) { |
| break; |
| } |
| } |
| |
| // Update the applications' headroom to correctly take into |
| // account the containers assigned in this update. |
| for (SchedulerApplication<FifoAppAttempt> application : applications.values()) { |
| FifoAppAttempt attempt = |
| (FifoAppAttempt) application.getCurrentAppAttempt(); |
| if (attempt == null) { |
| continue; |
| } |
| updateAppHeadRoom(attempt); |
| } |
| } |
| |
| private int getMaxAllocatableContainers(FifoAppAttempt application, |
| SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, NodeType type) { |
| PendingAsk offswitchAsk = application.getPendingAsk(schedulerKey, |
| ResourceRequest.ANY); |
| int maxContainers = offswitchAsk.getCount(); |
| |
| if (type == NodeType.OFF_SWITCH) { |
| return maxContainers; |
| } |
| |
| if (type == NodeType.RACK_LOCAL) { |
| PendingAsk rackLocalAsk = application.getPendingAsk(schedulerKey, |
| node.getRackName()); |
| if (rackLocalAsk.getCount() <= 0) { |
| return maxContainers; |
| } |
| |
| maxContainers = Math.min(maxContainers, |
| rackLocalAsk.getCount()); |
| } |
| |
| if (type == NodeType.NODE_LOCAL) { |
| PendingAsk nodeLocalAsk = application.getPendingAsk(schedulerKey, |
| node.getRMNode().getHostName()); |
| |
| if (nodeLocalAsk.getCount() > 0) { |
| maxContainers = Math.min(maxContainers, |
| nodeLocalAsk.getCount()); |
| } |
| } |
| |
| return maxContainers; |
| } |
| |
| |
| private int assignContainersOnNode(FiCaSchedulerNode node, |
| FifoAppAttempt application, SchedulerRequestKey schedulerKey |
| ) { |
| // Data-local |
| int nodeLocalContainers = |
| assignNodeLocalContainers(node, application, schedulerKey); |
| |
| // Rack-local |
| int rackLocalContainers = |
| assignRackLocalContainers(node, application, schedulerKey); |
| |
| // Off-switch |
| int offSwitchContainers = |
| assignOffSwitchContainers(node, application, schedulerKey); |
| |
| |
| LOG.debug("assignContainersOnNode:" + |
| " node=" + node.getRMNode().getNodeAddress() + |
| " application=" + application.getApplicationId().getId() + |
| " priority=" + schedulerKey.getPriority() + |
| " #assigned=" + |
| (nodeLocalContainers + rackLocalContainers + offSwitchContainers)); |
| |
| |
| return (nodeLocalContainers + rackLocalContainers + offSwitchContainers); |
| } |
| |
  /**
   * Assigns containers for asks that name this specific node.
   * NOTE(review): the ask is looked up by node.getNodeName() here, while
   * getMaxAllocatableContainers uses node.getRMNode().getHostName() — these
   * should agree unless ports are included in node names; confirm.
   *
   * @return the number of node-local containers assigned
   */
  private int assignNodeLocalContainers(FiCaSchedulerNode node,
      FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
    int assignedContainers = 0;
    PendingAsk nodeLocalAsk = application.getPendingAsk(schedulerKey,
        node.getNodeName());
    if (nodeLocalAsk.getCount() > 0) {
      // Don't allocate on this node if we don't need containers on this rack
      if (application.getOutstandingAsksCount(schedulerKey,
          node.getRackName()) <= 0) {
        return 0;
      }

      int assignableContainers = Math.min(
          getMaxAllocatableContainers(application, schedulerKey, node,
              NodeType.NODE_LOCAL), nodeLocalAsk.getCount());
      assignedContainers =
          assignContainer(node, application, schedulerKey, assignableContainers,
              nodeLocalAsk.getPerAllocationResource(), NodeType.NODE_LOCAL);
    }
    return assignedContainers;
  }
| |
  /**
   * Assigns containers for asks that name this node's rack.
   *
   * @return the number of rack-local containers assigned
   */
  private int assignRackLocalContainers(FiCaSchedulerNode node,
      FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
    int assignedContainers = 0;
    PendingAsk rackAsk = application.getPendingAsk(schedulerKey,
        node.getRMNode().getRackName());
    if (rackAsk.getCount() > 0) {
      // Don't allocate on this rack if the application doesn't need containers
      if (application.getOutstandingAsksCount(schedulerKey,
          ResourceRequest.ANY) <= 0) {
        return 0;
      }

      int assignableContainers =
          Math.min(getMaxAllocatableContainers(application, schedulerKey, node,
              NodeType.RACK_LOCAL), rackAsk.getCount());
      assignedContainers =
          assignContainer(node, application, schedulerKey, assignableContainers,
              rackAsk.getPerAllocationResource(), NodeType.RACK_LOCAL);
    }
    return assignedContainers;
  }
| |
| private int assignOffSwitchContainers(FiCaSchedulerNode node, |
| FifoAppAttempt application, SchedulerRequestKey schedulerKey) { |
| int assignedContainers = 0; |
| PendingAsk offswitchAsk = application.getPendingAsk(schedulerKey, |
| ResourceRequest.ANY); |
| if (offswitchAsk.getCount() > 0) { |
| assignedContainers = |
| assignContainer(node, application, schedulerKey, |
| offswitchAsk.getCount(), |
| offswitchAsk.getPerAllocationResource(), NodeType.OFF_SWITCH); |
| } |
| return assignedContainers; |
| } |
| |
| private int assignContainer(FiCaSchedulerNode node, FifoAppAttempt application, |
| SchedulerRequestKey schedulerKey, int assignableContainers, |
| Resource capability, NodeType type) { |
| LOG.debug("assignContainers:" + |
| " node=" + node.getRMNode().getNodeAddress() + |
| " application=" + application.getApplicationId().getId() + |
| " priority=" + schedulerKey.getPriority().getPriority() + |
| " assignableContainers=" + assignableContainers + |
| " capability=" + capability + " type=" + type); |
| |
| // TODO: A buggy application with this zero would crash the scheduler. |
| int availableContainers = |
| (int) (node.getUnallocatedResource().getMemorySize() / |
| capability.getMemorySize()); |
| int assignedContainers = |
| Math.min(assignableContainers, availableContainers); |
| |
| if (assignedContainers > 0) { |
| for (int i=0; i < assignedContainers; ++i) { |
| |
| NodeId nodeId = node.getRMNode().getNodeID(); |
| ContainerId containerId = BuilderUtils.newContainerId(application |
| .getApplicationAttemptId(), application.getNewContainerId()); |
| |
| // Create the container |
| Container container = BuilderUtils.newContainer(containerId, nodeId, |
| node.getRMNode().getHttpAddress(), capability, |
| schedulerKey.getPriority(), null, |
| schedulerKey.getAllocationRequestId()); |
| |
| // Allocate! |
| |
| // Inform the application |
| RMContainer rmContainer = application.allocate(type, node, schedulerKey, |
| container); |
| |
| // Inform the node |
| node.allocateContainer(rmContainer); |
| |
| // Update usage for this container |
| increaseUsedResources(rmContainer); |
| } |
| |
| } |
| |
| return assignedContainers; |
| } |
| |
  /** Adds the container's allocation to the cluster-wide usage total. */
  private void increaseUsedResources(RMContainer rmContainer) {
    Resources.addTo(usedResource, rmContainer.getAllocatedResource());
  }
| |
  /** Sets the attempt's headroom to (cluster resource - total usage). */
  private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
    schedulerAttempt.setHeadroom(Resources.subtract(getClusterResource(),
        usedResource));
  }
| |
  /** Publishes (cluster resource - total usage) to the queue metrics. */
  private void updateAvailableResourcesMetrics() {
    metrics.setAvailableResourcesToQueue(
        Resources.subtract(getClusterResource(), usedResource));
  }
| |
  /**
   * Central scheduler event dispatcher: routes node lifecycle, application
   * lifecycle, attempt lifecycle and container events to the corresponding
   * handlers. Unknown event types are logged and ignored.
   */
  @Override
  public void handle(SchedulerEvent event) {
    switch(event.getType()) {
    case NODE_ADDED:
    {
      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
      addNode(nodeAddedEvent.getAddedRMNode());
      // Recover any containers the NM reported as still running (RM restart).
      recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
          nodeAddedEvent.getAddedRMNode());

    }
    break;
    case NODE_REMOVED:
    {
      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
      removeNode(nodeRemovedEvent.getRemovedRMNode());
    }
    break;
    case NODE_RESOURCE_UPDATE:
    {
      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
          (NodeResourceUpdateSchedulerEvent)event;
      updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
          nodeResourceUpdatedEvent.getResourceOption());
    }
    break;
    case NODE_UPDATE:
    {
      // Node heartbeat: this is what drives container assignment.
      NodeUpdateSchedulerEvent nodeUpdatedEvent =
          (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
    }
    break;
    case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      addApplication(appAddedEvent.getApplicationId(),
          appAddedEvent.getQueue(), appAddedEvent.getUser(),
          appAddedEvent.getIsAppRecovering());
    }
    break;
    case APP_REMOVED:
    {
      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
      doneApplication(appRemovedEvent.getApplicationID(),
          appRemovedEvent.getFinalState());
    }
    break;
    case APP_ATTEMPT_ADDED:
    {
      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
          (AppAttemptAddedSchedulerEvent) event;
      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
          appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
          appAttemptAddedEvent.getIsAttemptRecovering());
    }
    break;
    case APP_ATTEMPT_REMOVED:
    {
      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
          (AppAttemptRemovedSchedulerEvent) event;
      try {
        doneApplicationAttempt(
            appAttemptRemovedEvent.getApplicationAttemptID(),
            appAttemptRemovedEvent.getFinalAttemptState(),
            appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
      } catch(IOException ie) {
        // Unknown attempt; log and continue handling other events.
        LOG.error("Unable to remove application "
            + appAttemptRemovedEvent.getApplicationAttemptID(), ie);
      }
    }
    break;
    case CONTAINER_EXPIRED:
    {
      // Allocated container was never launched before its token expired.
      ContainerExpiredSchedulerEvent containerExpiredEvent =
          (ContainerExpiredSchedulerEvent) event;
      ContainerId containerid = containerExpiredEvent.getContainerId();
      super.completedContainer(getRMContainer(containerid),
          SchedulerUtils.createAbnormalContainerStatus(
              containerid,
              SchedulerUtils.EXPIRED_CONTAINER),
          RMContainerEventType.EXPIRE);
    }
    break;
    case RELEASE_CONTAINER: {
      if (!(event instanceof ReleaseContainerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      RMContainer container = ((ReleaseContainerEvent) event).getContainer();
      completedContainer(container,
          SchedulerUtils.createAbnormalContainerStatus(
              container.getContainerId(),
              SchedulerUtils.RELEASED_CONTAINER),
          RMContainerEventType.RELEASED);
    }
    break;
    default:
      LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
    }
  }
| |
  /**
   * Bookkeeping for a finished container: informs the owning application
   * attempt and the node, and subtracts the container's resources from the
   * cluster-wide usage. Containers whose application is gone are logged and
   * skipped.
   */
  @Lock(FifoScheduler.class)
  @Override
  protected synchronized void completedContainerInternal(
      RMContainer rmContainer, ContainerStatus containerStatus,
      RMContainerEventType event) {

    // Get the application for the finished container
    Container container = rmContainer.getContainer();
    FifoAppAttempt application =
        getCurrentAttemptForContainer(container.getId());
    ApplicationId appId =
        container.getId().getApplicationAttemptId().getApplicationId();

    // Get the node on which the container was allocated
    // NOTE(review): getNode() may return null if the node was already
    // removed; node.releaseContainer() below would then NPE — confirm
    // whether a completed container can outlive its node on this path.
    FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(container.getNodeId());

    if (application == null) {
      LOG.info("Unknown application: " + appId +
          " released container " + container.getId() +
          " on node: " + node +
          " with event: " + event);
      return;
    }

    // Inform the application
    application.containerCompleted(rmContainer, containerStatus, event,
        RMNodeLabelsManager.NO_LABEL);

    // Inform the node
    node.releaseContainer(rmContainer.getContainerId(), false);

    // Update total usage
    Resources.subtractFrom(usedResource, container.getResource());

    LOG.info("Application attempt " + application.getApplicationAttemptId() +
        " released container " + container.getId() +
        " on node: " + node +
        " with event: " + event);

  }
| |
  // Aggregate resource currently allocated across the cluster. Mutable and
  // exposed via getUsedResource(); updated by increaseUsedResources() and
  // completedContainerInternal().
  private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
| |
  /**
   * Removes a node from scheduling, first completing all of its running
   * containers as LOST. Unknown nodes are ignored.
   */
  private synchronized void removeNode(RMNode nodeInfo) {
    FiCaSchedulerNode node = nodeTracker.getNode(nodeInfo.getNodeID());
    if (node == null) {
      return;
    }
    // Kill running containers
    for(RMContainer container : node.getCopiedListOfRunningContainers()) {
      super.completedContainer(container,
          SchedulerUtils.createAbnormalContainerStatus(
              container.getContainerId(),
              SchedulerUtils.LOST_CONTAINER),
          RMContainerEventType.KILL);
    }
    nodeTracker.removeNode(nodeInfo.getNodeID());
  }
| |
  /**
   * Returns info for the single default queue regardless of {@code queueName};
   * child/recursive flags are irrelevant since the queue has no children.
   */
  @Override
  public QueueInfo getQueueInfo(String queueName,
      boolean includeChildQueues, boolean recursive) {
    return DEFAULT_QUEUE.getQueueInfo(false, false);
  }
| |
  /** Returns the default queue's ACL info; the user argument is unused there. */
  @Override
  public List<QueueUserACLInfo> getQueueUserAclInfo() {
    return DEFAULT_QUEUE.getQueueUserAclInfo(null);
  }
| |
  /** @return the memory-only {@link DefaultResourceCalculator} used by FIFO. */
  @Override
  public ResourceCalculator getResourceCalculator() {
    return resourceCalculator;
  }
| |
  /** Wraps the RMNode in a scheduler node and registers it with the tracker. */
  private synchronized void addNode(RMNode nodeManager) {
    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
        usePortForNodeName);
    nodeTracker.addNode(schedulerNode);
  }
| |
  /** State-store recovery is intentionally a no-op for the FIFO scheduler. */
  @Override
  public void recover(RMState state) {
    // NOT IMPLEMENTED
  }
| |
| @Override |
| public RMContainer getRMContainer(ContainerId containerId) { |
| FifoAppAttempt attempt = getCurrentAttemptForContainer(containerId); |
| return (attempt == null) ? null : attempt.getRMContainer(containerId); |
| } |
| |
  /** The default queue's metrics double as the root queue metrics. */
  @Override
  public QueueMetrics getRootQueueMetrics() {
    return DEFAULT_QUEUE.getMetrics();
  }
| |
  /** Delegates to the default queue's (allow-all) ACLs; queueName is unused. */
  @Override
  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
      QueueACL acl, String queueName) {
    return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
  }
| |
| @Override |
| public synchronized List<ApplicationAttemptId> |
| getAppsInQueue(String queueName) { |
| if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { |
| List<ApplicationAttemptId> attempts = |
| new ArrayList<ApplicationAttemptId>(applications.size()); |
| for (SchedulerApplication<FifoAppAttempt> app : applications.values()) { |
| attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId()); |
| } |
| return attempts; |
| } else { |
| return null; |
| } |
| } |
| |
  /**
   * @return the total resource currently in use across the cluster.
   *     NOTE(review): this is the live internal object, not a copy —
   *     callers must not mutate it.
   */
  public Resource getUsedResource() {
    return usedResource;
  }
| |
| @Override |
| protected synchronized void nodeUpdate(RMNode nm) { |
| super.nodeUpdate(nm); |
| |
| FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(nm.getNodeID()); |
| if (rmContext.isWorkPreservingRecoveryEnabled() |
| && !rmContext.isSchedulerReadyForAllocatingContainers()) { |
| return; |
| } |
| |
| // A decommissioned node might be removed before we get here |
| if (node != null && |
| Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), |
| node.getUnallocatedResource(), minimumAllocation)) { |
| LOG.debug("Node heartbeat " + nm.getNodeID() + |
| " available resource = " + node.getUnallocatedResource()); |
| |
| assignContainers(node); |
| |
| LOG.debug("Node after allocation " + nm.getNodeID() + " resource = " |
| + node.getUnallocatedResource()); |
| } |
| |
| updateAvailableResourcesMetrics(); |
| } |
| |
  /**
   * Forcibly completes a container with a KILLED status. Used by tests to
   * simulate an AM container failure.
   */
  @VisibleForTesting
  @Override
  public void killContainer(RMContainer container) {
    ContainerStatus status = SchedulerUtils.createKilledContainerStatus(
        container.getContainerId(),
        "Killed by RM to simulate an AM container failure");
    LOG.info("Killing container " + container);
    completedContainer(container, status, RMContainerEventType.KILL);
  }
| |
  /**
   * Recovers NM-reported containers after RM restart; synchronized here to
   * serialize recovery with the rest of the scheduler's state changes.
   */
  @Override
  public synchronized void recoverContainersOnNode(
      List<NMContainerStatus> containerReports, RMNode nm) {
    super.recoverContainersOnNode(containerReports, nm);
  }
| } |