/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;

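/**
 * Represents an application attempt from the viewpoint of the Fair Scheduler.
 * Tracks the attempt's live and reserved containers, outstanding resource
 * requests, current consumption, and the bookkeeping used for delay
 * scheduling (scheduling opportunities and allowed locality levels).
 */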
public class FSSchedulerApp extends SchedulerApplication {

  private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);

  private final RecordFactory recordFactory = RecordFactoryProvider
      .getRecordFactory(null);

  private final AppSchedulingInfo appSchedulingInfo;
  private AppSchedulable appSchedulable;
  private final Queue queue;

  private final Resource currentConsumption = recordFactory
      .newRecordInstance(Resource.class);
  private Resource resourceLimit = recordFactory
      .newRecordInstance(Resource.class);

  private Map<ContainerId, RMContainer> liveContainers =
      new HashMap<ContainerId, RMContainer>();
  private List<RMContainer> newlyAllocatedContainers =
      new ArrayList<RMContainer>();

  final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
      new HashMap<Priority, Map<NodeId, RMContainer>>();

  /**
   * Count how many times the application has been given an opportunity
   * to schedule a task at each priority. Each time the scheduler
   * asks the application for a task at this priority, it is incremented,
   * and each time the application successfully schedules a task, it
   * is reset to 0.
   */
  Multiset<Priority> schedulingOpportunities = HashMultiset.create();

  Multiset<Priority> reReservations = HashMultiset.create();

  Resource currentReservation = recordFactory
      .newRecordInstance(Resource.class);

  private final RMContext rmContext;
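
  /**
   * Creates the scheduler-side representation of an application attempt.
   *
   * @param applicationAttemptId id of the attempt being tracked
   * @param user user who submitted the application
   * @param queue queue the application was submitted to
   * @param activeUsersManager tracker of users with outstanding requests
   * @param rmContext context of the enclosing ResourceManager
   */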
  public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
      String user, Queue queue, ActiveUsersManager activeUsersManager,
      RMContext rmContext) {
    this.rmContext = rmContext;
    this.appSchedulingInfo =
        new AppSchedulingInfo(applicationAttemptId, user, queue,
            activeUsersManager);
    this.queue = queue;
  }

  public ApplicationId getApplicationId() {
    return appSchedulingInfo.getApplicationId();
  }

  @Override
  public ApplicationAttemptId getApplicationAttemptId() {
    return appSchedulingInfo.getApplicationAttemptId();
  }

  public void setAppSchedulable(AppSchedulable appSchedulable) {
    this.appSchedulable = appSchedulable;
  }

  public AppSchedulable getAppSchedulable() {
    return appSchedulable;
  }

  public String getUser() {
    return appSchedulingInfo.getUser();
  }

  public synchronized void updateResourceRequests(
      List<ResourceRequest> requests) {
    this.appSchedulingInfo.updateResourceRequests(requests);
  }

  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
    return appSchedulingInfo.getResourceRequests(priority);
  }

  public int getNewContainerId() {
    return appSchedulingInfo.getNewContainerId();
  }

  public Collection<Priority> getPriorities() {
    return appSchedulingInfo.getPriorities();
  }

  public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) {
    return appSchedulingInfo.getResourceRequest(priority, nodeAddress);
  }

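  /**
   * Get the number of containers still asked for at the given priority,
   * as recorded in the attempt's ResourceRequest for {@link RMNode#ANY}.
   */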
  public synchronized int getTotalRequiredResources(Priority priority) {
    return getResourceRequest(priority, RMNode.ANY).getNumContainers();
  }

  public Resource getResource(Priority priority) {
    return appSchedulingInfo.getResource(priority);
  }

  /**
   * Is this application pending?
   * @return true if the application is pending, false otherwise
   */
  @Override
  public boolean isPending() {
    return appSchedulingInfo.isPending();
  }

  public String getQueueName() {
    return appSchedulingInfo.getQueueName();
  }

  /**
   * Get the list of live containers.
   * @return all of the live containers
   */
  @Override
  public synchronized Collection<RMContainer> getLiveContainers() {
    return new ArrayList<RMContainer>(liveContainers.values());
  }

  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
    // Clean up all scheduling information
    appSchedulingInfo.stop(rmAppAttemptFinalState);
  }

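  /**
   * Notify this application that one of its containers has been launched on
   * a node. If the container is unknown to the scheduler, the node is told
   * to clean it up; otherwise the container is moved to the LAUNCHED state.
   */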
  @SuppressWarnings("unchecked")
  public synchronized void containerLaunchedOnNode(ContainerId containerId,
      NodeId nodeId) {
    // Inform the container
    RMContainer rmContainer = getRMContainer(containerId);
    if (rmContainer == null) {
      // Some unknown container sneaked into the system. Kill it.
      rmContext.getDispatcher().getEventHandler()
          .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
      return;
    }

    rmContainer.handle(new RMContainerEvent(containerId,
        RMContainerEventType.LAUNCHED));
  }

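  /**
   * Handle the completion of a container: forward the finished event to the
   * container, drop it from the live-container list, and release its
   * resources back to the queue's usage metrics.
   */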
  public synchronized void containerCompleted(RMContainer rmContainer,
      ContainerStatus containerStatus, RMContainerEventType event) {

    Container container = rmContainer.getContainer();
    ContainerId containerId = container.getId();

    // Inform the container
    rmContainer.handle(
        new RMContainerFinishedEvent(containerId, containerStatus, event));
    LOG.info("Completed container: " + rmContainer.getContainerId() +
        " in state: " + rmContainer.getState() + " event: " + event);

    // Remove from the list of containers
    liveContainers.remove(rmContainer.getContainerId());

    RMAuditLogger.logSuccess(getUser(),
        AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
        getApplicationId(), containerId);

    // Update usage metrics
    Resource containerResource = rmContainer.getContainer().getResource();
    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
    Resources.subtractFrom(currentConsumption, containerResource);
  }

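  /**
   * Pull the containers allocated since the last call, marking each as
   * ACQUIRED so that it can be handed to the application master. The list
   * of newly allocated containers is cleared as a side effect.
   */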
  public synchronized List<Container> pullNewlyAllocatedContainers() {
    List<Container> returnContainerList = new ArrayList<Container>(
        newlyAllocatedContainers.size());
    for (RMContainer rmContainer : newlyAllocatedContainers) {
      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
          RMContainerEventType.ACQUIRED));
      returnContainerList.add(rmContainer.getContainer());
    }
    newlyAllocatedContainers.clear();
    return returnContainerList;
  }

  public Resource getCurrentConsumption() {
    return this.currentConsumption;
  }

  public synchronized void showRequests() {
    if (LOG.isDebugEnabled()) {
      for (Priority priority : getPriorities()) {
        Map<String, ResourceRequest> requests = getResourceRequests(priority);
        if (requests != null) {
          LOG.debug("showRequests:" + " application=" + getApplicationId() +
              " headRoom=" + getHeadroom() +
              " currentConsumption=" + currentConsumption.getMemory());
          for (ResourceRequest request : requests.values()) {
            LOG.debug("showRequests:" + " application=" + getApplicationId()
                + " request=" + request);
          }
        }
      }
    }
  }

  public synchronized RMContainer getRMContainer(ContainerId id) {
    return liveContainers.get(id);
  }

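  /**
   * Record that the scheduler offered this application a chance to start a
   * container at the given priority, for use by delay scheduling.
   */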
  public synchronized void addSchedulingOpportunity(Priority priority) {
    schedulingOpportunities.setCount(priority,
        schedulingOpportunities.count(priority) + 1);
  }

  /**
   * Return the number of times the application has been given an opportunity
   * to schedule a task at the given priority since the last time it
   * successfully did so.
   */
  public synchronized int getSchedulingOpportunities(Priority priority) {
    return schedulingOpportunities.count(priority);
  }

  synchronized void resetReReservations(Priority priority) {
    reReservations.setCount(priority, 0);
  }

  synchronized void addReReservation(Priority priority) {
    reReservations.add(priority);
  }

  public synchronized int getReReservations(Priority priority) {
    return reReservations.count(priority);
  }

  public synchronized int getNumReservedContainers(Priority priority) {
    Map<NodeId, RMContainer> reservedContainers =
        this.reservedContainers.get(priority);
    return (reservedContainers == null) ? 0 : reservedContainers.size();
  }

  /**
   * Get total current reservations.
   * Used only by unit tests.
   * @return total current reservations
   */
  @Stable
  @Private
  public synchronized Resource getCurrentReservation() {
    return currentReservation;
  }

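  /**
   * Reserve the given node at the given priority for this application. On
   * the first call for a container this creates the RMContainer and adds its
   * resource to the current reservation; subsequent calls for the same
   * container are counted as re-reservations.
   */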
  public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority,
      RMContainer rmContainer, Container container) {
    // Create RMContainer if necessary
    if (rmContainer == null) {
      rmContainer =
          new RMContainerImpl(container, getApplicationAttemptId(),
              node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
              rmContext.getContainerAllocationExpirer());

      Resources.addTo(currentReservation, container.getResource());

      // Reset the re-reservation count
      resetReReservations(priority);
    } else {
      // Note down the re-reservation
      addReReservation(priority);
    }
    rmContainer.handle(new RMContainerReservedEvent(container.getId(),
        container.getResource(), node.getNodeID(), priority));

    Map<NodeId, RMContainer> reservedContainers =
        this.reservedContainers.get(priority);
    if (reservedContainers == null) {
      reservedContainers = new HashMap<NodeId, RMContainer>();
      this.reservedContainers.put(priority, reservedContainers);
    }
    reservedContainers.put(node.getNodeID(), rmContainer);

    LOG.info("Application " + getApplicationId()
        + " reserved container " + rmContainer
        + " on node " + node + ", currently has " + reservedContainers.size()
        + " at priority " + priority
        + "; currentReservation " + currentReservation.getMemory());

    return rmContainer;
  }

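  /**
   * Remove this application's reservation on the given node at the given
   * priority, resetting the re-reservation count and subtracting the
   * container's resource from the current reservation.
   */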
  public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
    Map<NodeId, RMContainer> reservedContainers =
        this.reservedContainers.get(priority);
    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
    if (reservedContainers.isEmpty()) {
      this.reservedContainers.remove(priority);
    }

    // Reset the re-reservation count
    resetReReservations(priority);

    Resource resource = reservedContainer.getContainer().getResource();
    Resources.subtractFrom(currentReservation, resource);

    LOG.info("Application " + getApplicationId() + " unreserved container on node "
        + node + ", currently has " + reservedContainers.size() + " at priority "
        + priority + "; currentReservation " + currentReservation);
  }

  /**
   * Has the application reserved the given <code>node</code> at the
   * given <code>priority</code>?
   * @param node node to be checked
   * @param priority priority of reserved container
   * @return true if reserved, false if not
   */
  public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) {
    Map<NodeId, RMContainer> reservedContainers =
        this.reservedContainers.get(priority);
    if (reservedContainers != null) {
      return reservedContainers.containsKey(node.getNodeID());
    }
    return false;
  }

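  /**
   * Estimate the fraction of scheduling opportunities that should be passed
   * up before relaxing locality, based on how many distinct resource names
   * (hosts and racks, excluding the mandatory ANY entry) are requested at
   * this priority. For example, a request spread over 3 hosts and 2 racks
   * yields 6 entries including ANY, so requiredResources is 5; on a 10-node
   * cluster the wait factor is 0.5.
   */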
  public synchronized float getLocalityWaitFactor(
      Priority priority, int clusterNodes) {
    // Estimate: Required unique resources (i.e. hosts + racks)
    int requiredResources =
        Math.max(this.getResourceRequests(priority).size() - 1, 0);

    // waitFactor can't be more than '1'
    // i.e. no point skipping more than clustersize opportunities
    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
  }

  /**
   * Get the list of reserved containers.
   * @return all of the reserved containers
   */
  @Override
  public synchronized List<RMContainer> getReservedContainers() {
    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
        this.reservedContainers.entrySet()) {
      reservedContainers.addAll(e.getValue().values());
    }
    return reservedContainers;
  }

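  /**
   * Set the currently available headroom for this application, as computed
   * by the scheduler from the user's and queue's limits.
   */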
  public synchronized void setHeadroom(Resource globalLimit) {
    this.resourceLimit = globalLimit;
  }

  /**
   * Get available headroom in terms of resources for the application's user.
   * @return available resource headroom
   */
  public synchronized Resource getHeadroom() {
    // Corner case to deal with applications being slightly over-limit
    if (resourceLimit.getMemory() < 0) {
      resourceLimit.setMemory(0);
    }

    return resourceLimit;
  }

  public Queue getQueue() {
    return queue;
  }

  /**
   * Delay scheduling: We often want to prioritize scheduling of node-local
   * containers over rack-local or off-switch containers. To achieve this
   * we first only allow node-local assignments for a given priority level,
   * then relax the locality threshold once we've had a long enough period
   * without successfully scheduling. We measure both the number of "missed"
   * scheduling opportunities since the last container was scheduled
   * at the current allowed level and the time since the last container
   * was scheduled. Currently we use only the former.
   */
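
  // Illustration (assuming, say, a 100-node cluster and locality thresholds
  // of 0.5): once more than 50 node-local scheduling opportunities at a given
  // priority have been missed, getAllowedLocalityLevel() relaxes that priority
  // from NODE_LOCAL to RACK_LOCAL, and after a further 50 misses from
  // RACK_LOCAL to OFF_SWITCH.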

  // Current locality threshold
  final Map<Priority, NodeType> allowedLocalityLevel =
      new HashMap<Priority, NodeType>();

  // Time of the last container scheduled at the current allowed level
  Map<Priority, Long> lastScheduledContainer = new HashMap<Priority, Long>();

  /**
   * Should be called when an application has successfully scheduled a
   * container, or when the scheduling locality threshold is relaxed.
   * Resets the various internal counters which affect delay scheduling.
   *
   * @param priority the priority of the container scheduled
   */
  public synchronized void resetSchedulingOpportunities(Priority priority) {
    lastScheduledContainer.put(priority, System.currentTimeMillis());
    schedulingOpportunities.setCount(priority, 0);
  }

  /**
   * Return the level at which we are allowed to schedule containers, given the
   * current size of the cluster and thresholds indicating how many nodes to
   * fail at (as a fraction of cluster size) before relaxing scheduling
   * constraints.
   */
  public synchronized NodeType getAllowedLocalityLevel(Priority priority,
      int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
    // Upper limit on the thresholds
    if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
    if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }

    // If delay scheduling is not being used, can schedule anywhere
    if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
      return NodeType.OFF_SWITCH;
    }

    // Default level is NODE_LOCAL
    if (!allowedLocalityLevel.containsKey(priority)) {
      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
      return NodeType.NODE_LOCAL;
    }

    NodeType allowed = allowedLocalityLevel.get(priority);

    // If level is already the most liberal, we're done
    if (allowed.equals(NodeType.OFF_SWITCH)) {
      return NodeType.OFF_SWITCH;
    }

    double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
        nodeLocalityThreshold : rackLocalityThreshold;

    // Relax locality constraints once we've surpassed the threshold
    if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
      if (allowed.equals(NodeType.NODE_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
        resetSchedulingOpportunities(priority);
      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
        resetSchedulingOpportunities(priority);
      }
    }
    return allowedLocalityLevel.get(priority);
  }

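  /**
   * Called when a container is allocated to this application. Creates the
   * RMContainer, tracks it as live and newly allocated, updates the
   * application's consumption, and raises the allowed locality level if the
   * allocation was more local than the current threshold.
   */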
  public synchronized RMContainer allocate(NodeType type, FSSchedulerNode node,
      Priority priority, ResourceRequest request,
      Container container) {
    // Update the allowed locality level
    NodeType allowed = allowedLocalityLevel.get(priority);
    if (allowed != null) {
      if (allowed.equals(NodeType.OFF_SWITCH) &&
          (type.equals(NodeType.NODE_LOCAL) ||
              type.equals(NodeType.RACK_LOCAL))) {
        this.resetAllowedLocalityLevel(priority, type);
      } else if (allowed.equals(NodeType.RACK_LOCAL) &&
          type.equals(NodeType.NODE_LOCAL)) {
        this.resetAllowedLocalityLevel(priority, type);
      }
    }

    // Required sanity check - the AM can call 'allocate' to update resource
    // requests without locking the scheduler, hence we need to re-check that
    // resources are still required
    if (getTotalRequiredResources(priority) <= 0) {
      return null;
    }

    // Create the RMContainer
    RMContainer rmContainer = new RMContainerImpl(container,
        getApplicationAttemptId(), node.getNodeID(), rmContext
            .getDispatcher().getEventHandler(), rmContext
            .getContainerAllocationExpirer());

    // Add it to the lists of newly allocated and live containers
    newlyAllocatedContainers.add(rmContainer);
    liveContainers.put(container.getId(), rmContainer);

    // Update consumption and track allocations
    appSchedulingInfo.allocate(type, node, priority, request, container);
    Resources.addTo(currentConsumption, container.getResource());

    // Inform the container
    rmContainer.handle(
        new RMContainerEvent(container.getId(), RMContainerEventType.START));

    if (LOG.isDebugEnabled()) {
      LOG.debug("allocate: applicationAttemptId="
          + container.getId().getApplicationAttemptId()
          + " container=" + container.getId() + " host="
          + container.getNodeId().getHost() + " type=" + type);
    }
    RMAuditLogger.logSuccess(getUser(),
        AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
        getApplicationId(), container.getId());

    return rmContainer;
  }

  /**
   * Should be called when the scheduler assigns a container at a higher
   * degree of locality than the current threshold. Resets the allowed
   * locality level to the higher degree of locality.
   */
  public synchronized void resetAllowedLocalityLevel(Priority priority,
      NodeType level) {
    NodeType old = allowedLocalityLevel.get(priority);
    LOG.info("Raising locality level from " + old + " to " + level +
        " at priority " + priority);
    allowedLocalityLevel.put(priority, level);
  }
}