| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.commons.collections.IteratorUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| /** |
| * This class keeps track of all the consumption of an application. This also |
| * keeps track of current running/completed containers for the application. |
| */ |
| @Private |
| @Unstable |
| public class AppSchedulingInfo { |
| |
| private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); |
| |
| private final ApplicationId applicationId; |
| private final ApplicationAttemptId applicationAttemptId; |
| private final AtomicLong containerIdCounter; |
| private final String user; |
| |
| private Queue queue; |
| private ActiveUsersManager activeUsersManager; |
| // whether accepted/allocated by scheduler |
| private volatile boolean pending = true; |
| private ResourceUsage appResourceUsage; |
| |
| private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false); |
| // Set of places (nodes / racks) blacklisted by the system. Today, this only |
| // has places blacklisted for AM containers. |
| private final Set<String> placesBlacklistedBySystem = new HashSet<>(); |
| private Set<String> placesBlacklistedByApp = new HashSet<>(); |
| |
| private Set<String> requestedPartitions = new HashSet<>(); |
| |
| private final ConcurrentSkipListMap<SchedulerRequestKey, Integer> |
| schedulerKeys = new ConcurrentSkipListMap<>(); |
| final Map<SchedulerRequestKey, Map<String, ResourceRequest>> |
| resourceRequestMap = new ConcurrentHashMap<>(); |
| final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId, |
| SchedContainerChangeRequest>>> containerIncreaseRequestMap = |
| new ConcurrentHashMap<>(); |
| |
| private final ReentrantReadWriteLock.ReadLock readLock; |
| private final ReentrantReadWriteLock.WriteLock writeLock; |
| |
| public AppSchedulingInfo(ApplicationAttemptId appAttemptId, |
| String user, Queue queue, ActiveUsersManager activeUsersManager, |
| long epoch, ResourceUsage appResourceUsage) { |
| this.applicationAttemptId = appAttemptId; |
| this.applicationId = appAttemptId.getApplicationId(); |
| this.queue = queue; |
| this.user = user; |
| this.activeUsersManager = activeUsersManager; |
| this.containerIdCounter = new AtomicLong( |
| epoch << ResourceManager.EPOCH_BIT_SHIFT); |
| this.appResourceUsage = appResourceUsage; |
| |
| ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| readLock = lock.readLock(); |
| writeLock = lock.writeLock(); |
| } |
| |
| public ApplicationId getApplicationId() { |
| return applicationId; |
| } |
| |
| public ApplicationAttemptId getApplicationAttemptId() { |
| return applicationAttemptId; |
| } |
| |
| public String getUser() { |
| return user; |
| } |
| |
| public long getNewContainerId() { |
| return this.containerIdCounter.incrementAndGet(); |
| } |
| |
| public String getQueueName() { |
| try { |
| this.readLock.lock(); |
| return queue.getQueueName(); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| public boolean isPending() { |
| return pending; |
| } |
| |
| public Set<String> getRequestedPartitions() { |
| return requestedPartitions; |
| } |
| |
| /** |
| * Clear any pending requests from this application. |
| */ |
| private void clearRequests() { |
| schedulerKeys.clear(); |
| resourceRequestMap.clear(); |
| LOG.info("Application " + applicationId + " requests cleared"); |
| } |
| |
| public boolean hasIncreaseRequest(NodeId nodeId) { |
| try { |
| this.readLock.lock(); |
| Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> |
| requestsOnNode = containerIncreaseRequestMap.get(nodeId); |
| return requestsOnNode == null ? false : requestsOnNode.size() > 0; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| public Map<ContainerId, SchedContainerChangeRequest> |
| getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) { |
| try { |
| this.readLock.lock(); |
| Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> |
| requestsOnNode = containerIncreaseRequestMap.get(nodeId); |
| return requestsOnNode == null ? null : requestsOnNode.get( |
| schedulerKey); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| /** |
| * return true if any of the existing increase requests are updated, |
| * false if none of them are updated |
| */ |
| public boolean updateIncreaseRequests( |
| List<SchedContainerChangeRequest> increaseRequests) { |
| boolean resourceUpdated = false; |
| |
| try { |
| this.writeLock.lock(); |
| for (SchedContainerChangeRequest r : increaseRequests) { |
| if (r.getRMContainer().getState() != RMContainerState.RUNNING) { |
| LOG.warn("rmContainer's state is not RUNNING, for increase request" |
| + " with container-id=" + r.getContainerId()); |
| continue; |
| } |
| try { |
| RMServerUtils.checkSchedContainerChangeRequest(r, true); |
| } catch (YarnException e) { |
| LOG.warn("Error happens when checking increase request, Ignoring.." |
| + " exception=", e); |
| continue; |
| } |
| NodeId nodeId = r.getRMContainer().getAllocatedNode(); |
| |
| Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> |
| requestsOnNode = containerIncreaseRequestMap.get(nodeId); |
| if (null == requestsOnNode) { |
| requestsOnNode = new TreeMap<>(); |
| containerIncreaseRequestMap.put(nodeId, requestsOnNode); |
| } |
| |
| SchedContainerChangeRequest prevChangeRequest = |
| getIncreaseRequest(nodeId, |
| r.getRMContainer().getAllocatedSchedulerKey(), |
| r.getContainerId()); |
| if (null != prevChangeRequest) { |
| if (Resources.equals(prevChangeRequest.getTargetCapacity(), |
| r.getTargetCapacity())) { |
| // increase request hasn't changed |
| continue; |
| } |
| |
| // remove the old one, as we will use the new one going forward |
| removeIncreaseRequest(nodeId, |
| prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(), |
| prevChangeRequest.getContainerId()); |
| } |
| |
| if (Resources.equals(r.getTargetCapacity(), |
| r.getRMContainer().getAllocatedResource())) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Trying to increase container " + r.getContainerId() |
| + ", target capacity = previous capacity = " + prevChangeRequest |
| + ". Will ignore this increase request."); |
| } |
| continue; |
| } |
| |
| // add the new one |
| resourceUpdated = true; |
| insertIncreaseRequest(r); |
| } |
| return resourceUpdated; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Insert increase request, adding any missing items in the data-structure |
| * hierarchy. |
| */ |
| private void insertIncreaseRequest(SchedContainerChangeRequest request) { |
| NodeId nodeId = request.getNodeId(); |
| SchedulerRequestKey schedulerKey = |
| request.getRMContainer().getAllocatedSchedulerKey(); |
| ContainerId containerId = request.getContainerId(); |
| |
| Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> |
| requestsOnNode = containerIncreaseRequestMap.get(nodeId); |
| if (null == requestsOnNode) { |
| requestsOnNode = new HashMap<>(); |
| containerIncreaseRequestMap.put(nodeId, requestsOnNode); |
| } |
| |
| Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = |
| requestsOnNode.get(schedulerKey); |
| if (null == requestsOnNodeWithPriority) { |
| requestsOnNodeWithPriority = new TreeMap<>(); |
| requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority); |
| incrementSchedulerKeyReference(schedulerKey); |
| } |
| |
| requestsOnNodeWithPriority.put(containerId, request); |
| |
| // update resources |
| String partition = request.getRMContainer().getNodeLabelExpression(); |
| Resource delta = request.getDeltaCapacity(); |
| appResourceUsage.incPending(partition, delta); |
| queue.incPendingResource(partition, delta); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Added increase request:" + request.getContainerId() |
| + " delta=" + delta); |
| } |
| } |
| |
| private void incrementSchedulerKeyReference( |
| SchedulerRequestKey schedulerKey) { |
| Integer schedulerKeyCount = schedulerKeys.get(schedulerKey); |
| if (schedulerKeyCount == null) { |
| schedulerKeys.put(schedulerKey, 1); |
| } else { |
| schedulerKeys.put(schedulerKey, schedulerKeyCount + 1); |
| } |
| } |
| |
| private void decrementSchedulerKeyReference( |
| SchedulerRequestKey schedulerKey) { |
| Integer schedulerKeyCount = schedulerKeys.get(schedulerKey); |
| if (schedulerKeyCount != null) { |
| if (schedulerKeyCount > 1) { |
| schedulerKeys.put(schedulerKey, schedulerKeyCount - 1); |
| } else { |
| schedulerKeys.remove(schedulerKey); |
| } |
| } |
| } |
| |
| public boolean removeIncreaseRequest(NodeId nodeId, |
| SchedulerRequestKey schedulerKey, ContainerId containerId) { |
| try { |
| this.writeLock.lock(); |
| Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> |
| requestsOnNode = containerIncreaseRequestMap.get(nodeId); |
| if (null == requestsOnNode) { |
| return false; |
| } |
| |
| Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = |
| requestsOnNode.get(schedulerKey); |
| if (null == requestsOnNodeWithPriority) { |
| return false; |
| } |
| |
| SchedContainerChangeRequest request = |
| requestsOnNodeWithPriority.remove(containerId); |
| |
| // remove hierarchies if it becomes empty |
| if (requestsOnNodeWithPriority.isEmpty()) { |
| requestsOnNode.remove(schedulerKey); |
| decrementSchedulerKeyReference(schedulerKey); |
| } |
| if (requestsOnNode.isEmpty()) { |
| containerIncreaseRequestMap.remove(nodeId); |
| } |
| |
| if (request == null) { |
| return false; |
| } |
| |
| // update queue's pending resource if request exists |
| String partition = request.getRMContainer().getNodeLabelExpression(); |
| Resource delta = request.getDeltaCapacity(); |
| appResourceUsage.decPending(partition, delta); |
| queue.decPendingResource(partition, delta); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("remove increase request:" + request); |
| } |
| |
| return true; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId, |
| SchedulerRequestKey schedulerKey, ContainerId containerId) { |
| try { |
| this.readLock.lock(); |
| Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> |
| requestsOnNode = containerIncreaseRequestMap.get(nodeId); |
| if (null == requestsOnNode) { |
| return null; |
| } |
| |
| Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = |
| requestsOnNode.get(schedulerKey); |
| return requestsOnNodeWithPriority == null ? null |
| : requestsOnNodeWithPriority.get(containerId); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| /** |
| * The ApplicationMaster is updating resource requirements for the |
| * application, by asking for more resources and releasing resources acquired |
| * by the application. |
| * |
| * @param requests |
| * resources to be acquired |
| * @param recoverPreemptedRequestForAContainer |
| * recover ResourceRequest on preemption |
| * @return true if any resource was updated, false otherwise |
| */ |
| public boolean updateResourceRequests(List<ResourceRequest> requests, |
| boolean recoverPreemptedRequestForAContainer) { |
| // Flag to track if any incoming requests update "ANY" requests |
| boolean anyResourcesUpdated = false; |
| |
| try { |
| this.writeLock.lock(); |
| // Update resource requests |
| for (ResourceRequest request : requests) { |
| SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); |
| String resourceName = request.getResourceName(); |
| |
| // Update node labels if required |
| updateNodeLabels(request); |
| |
| Map<String, ResourceRequest> asks = |
| this.resourceRequestMap.get(schedulerKey); |
| if (asks == null) { |
| asks = new ConcurrentHashMap<>(); |
| this.resourceRequestMap.put(schedulerKey, asks); |
| } |
| |
| // Increment number of containers if recovering preempted resources |
| ResourceRequest lastRequest = asks.get(resourceName); |
| if (recoverPreemptedRequestForAContainer && lastRequest != null) { |
| request.setNumContainers(lastRequest.getNumContainers() + 1); |
| } |
| |
| // Update asks |
| asks.put(resourceName, request); |
| |
| if (resourceName.equals(ResourceRequest.ANY)) { |
| //update the applications requested labels set |
| requestedPartitions.add(request.getNodeLabelExpression() == null |
| ? RMNodeLabelsManager.NO_LABEL : |
| request.getNodeLabelExpression()); |
| |
| anyResourcesUpdated = true; |
| |
| // Update pendingResources |
| updatePendingResources(lastRequest, request, schedulerKey, |
| queue.getMetrics()); |
| } |
| } |
| return anyResourcesUpdated; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| private void updatePendingResources(ResourceRequest lastRequest, |
| ResourceRequest request, SchedulerRequestKey schedulerKey, |
| QueueMetrics metrics) { |
| int lastRequestContainers = |
| (lastRequest != null) ? lastRequest.getNumContainers() : 0; |
| if (request.getNumContainers() <= 0) { |
| if (lastRequestContainers >= 0) { |
| decrementSchedulerKeyReference(schedulerKey); |
| } |
| LOG.info("checking for deactivate of application :" |
| + this.applicationId); |
| checkForDeactivation(); |
| } else { |
| // Activate application. Metrics activation is done here. |
| if (lastRequestContainers <= 0) { |
| incrementSchedulerKeyReference(schedulerKey); |
| activeUsersManager.activateApplication(user, applicationId); |
| } |
| } |
| |
| Resource lastRequestCapability = |
| lastRequest != null ? lastRequest.getCapability() : Resources.none(); |
| metrics.incrPendingResources(user, |
| request.getNumContainers(), request.getCapability()); |
| metrics.decrPendingResources(user, |
| lastRequestContainers, lastRequestCapability); |
| |
| // update queue: |
| Resource increasedResource = |
| Resources.multiply(request.getCapability(), request.getNumContainers()); |
| queue.incPendingResource(request.getNodeLabelExpression(), |
| increasedResource); |
| appResourceUsage.incPending(request.getNodeLabelExpression(), |
| increasedResource); |
| if (lastRequest != null) { |
| Resource decreasedResource = |
| Resources.multiply(lastRequestCapability, lastRequestContainers); |
| queue.decPendingResource(lastRequest.getNodeLabelExpression(), |
| decreasedResource); |
| appResourceUsage.decPending(lastRequest.getNodeLabelExpression(), |
| decreasedResource); |
| } |
| } |
| |
| private void updateNodeLabels(ResourceRequest request) { |
| SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); |
| String resourceName = request.getResourceName(); |
| if (resourceName.equals(ResourceRequest.ANY)) { |
| ResourceRequest previousAnyRequest = |
| getResourceRequest(schedulerKey, resourceName); |
| |
| // When there is change in ANY request label expression, we should |
| // update label for all resource requests already added of same |
| // priority as ANY resource request. |
| if ((null == previousAnyRequest) |
| || hasRequestLabelChanged(previousAnyRequest, request)) { |
| Map<String, ResourceRequest> resourceRequest = |
| getResourceRequests(schedulerKey); |
| if (resourceRequest != null) { |
| for (ResourceRequest r : resourceRequest.values()) { |
| if (!r.getResourceName().equals(ResourceRequest.ANY)) { |
| r.setNodeLabelExpression(request.getNodeLabelExpression()); |
| } |
| } |
| } |
| } |
| } else { |
| ResourceRequest anyRequest = |
| getResourceRequest(schedulerKey, ResourceRequest.ANY); |
| if (anyRequest != null) { |
| request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); |
| } |
| } |
| } |
| |
| private boolean hasRequestLabelChanged(ResourceRequest requestOne, |
| ResourceRequest requestTwo) { |
| String requestOneLabelExp = requestOne.getNodeLabelExpression(); |
| String requestTwoLabelExp = requestTwo.getNodeLabelExpression(); |
| // First request label expression can be null and second request |
| // is not null then we have to consider it as changed. |
| if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) { |
| return true; |
| } |
| // If the label is not matching between both request when |
| // requestOneLabelExp is not null. |
| return ((null != requestOneLabelExp) && !(requestOneLabelExp |
| .equals(requestTwoLabelExp))); |
| } |
| |
| /** |
| * The ApplicationMaster is updating the placesBlacklistedByApp used for |
| * containers other than AMs. |
| * |
| * @param blacklistAdditions |
| * resources to be added to the userBlacklist |
| * @param blacklistRemovals |
| * resources to be removed from the userBlacklist |
| */ |
| public void updatePlacesBlacklistedByApp( |
| List<String> blacklistAdditions, List<String> blacklistRemovals) { |
| if (updateBlacklistedPlaces(placesBlacklistedByApp, blacklistAdditions, |
| blacklistRemovals)) { |
| userBlacklistChanged.set(true); |
| } |
| } |
| |
| /** |
| * Update the list of places that are blacklisted by the system. Today the |
| * system only blacklists places when it sees that AMs failed there |
| * |
| * @param blacklistAdditions |
| * resources to be added to placesBlacklistedBySystem |
| * @param blacklistRemovals |
| * resources to be removed from placesBlacklistedBySystem |
| */ |
| public void updatePlacesBlacklistedBySystem( |
| List<String> blacklistAdditions, List<String> blacklistRemovals) { |
| updateBlacklistedPlaces(placesBlacklistedBySystem, blacklistAdditions, |
| blacklistRemovals); |
| } |
| |
| private static boolean updateBlacklistedPlaces(Set<String> blacklist, |
| List<String> blacklistAdditions, List<String> blacklistRemovals) { |
| boolean changed = false; |
| synchronized (blacklist) { |
| if (blacklistAdditions != null) { |
| changed = blacklist.addAll(blacklistAdditions); |
| } |
| |
| if (blacklistRemovals != null) { |
| changed = blacklist.removeAll(blacklistRemovals) || changed; |
| } |
| } |
| return changed; |
| } |
| |
| public boolean getAndResetBlacklistChanged() { |
| return userBlacklistChanged.getAndSet(false); |
| } |
| |
| public Collection<SchedulerRequestKey> getSchedulerKeys() { |
| return schedulerKeys.keySet(); |
| } |
| |
| public Map<String, ResourceRequest> getResourceRequests( |
| SchedulerRequestKey schedulerKey) { |
| return resourceRequestMap.get(schedulerKey); |
| } |
| |
| public List<ResourceRequest> getAllResourceRequests() { |
| List<ResourceRequest> ret = new ArrayList<>(); |
| try { |
| this.readLock.lock(); |
| for (Map<String, ResourceRequest> r : resourceRequestMap.values()) { |
| ret.addAll(r.values()); |
| } |
| } finally { |
| this.readLock.unlock(); |
| } |
| return ret; |
| } |
| |
| public ResourceRequest getResourceRequest(SchedulerRequestKey schedulerKey, |
| String resourceName) { |
| try { |
| this.readLock.lock(); |
| Map<String, ResourceRequest> nodeRequests = |
| resourceRequestMap.get(schedulerKey); |
| return (nodeRequests == null) ? null : nodeRequests.get(resourceName); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| public Resource getResource(SchedulerRequestKey schedulerKey) { |
| try { |
| this.readLock.lock(); |
| ResourceRequest request = |
| getResourceRequest(schedulerKey, ResourceRequest.ANY); |
| return (request == null) ? null : request.getCapability(); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Method to return the next resource request to be serviced. |
| * |
| * In the initial implementation, we just pick any {@link ResourceRequest} |
| * corresponding to the highest priority. |
| * |
| * @return next {@link ResourceRequest} to allocate resources for. |
| */ |
| @Unstable |
| public synchronized ResourceRequest getNextResourceRequest() { |
| for (ResourceRequest rr: |
| resourceRequestMap.get(schedulerKeys.firstKey()).values()) { |
| return rr; |
| } |
| return null; |
| } |
| |
| /** |
| * Returns if the place (node/rack today) is either blacklisted by the |
| * application (user) or the system |
| * |
| * @param resourceName |
| * the resourcename |
| * @param blacklistedBySystem |
| * true if it should check amBlacklist |
| * @return true if its blacklisted |
| */ |
| public boolean isPlaceBlacklisted(String resourceName, |
| boolean blacklistedBySystem) { |
| if (blacklistedBySystem){ |
| synchronized (placesBlacklistedBySystem) { |
| return placesBlacklistedBySystem.contains(resourceName); |
| } |
| } else { |
| synchronized (placesBlacklistedByApp) { |
| return placesBlacklistedByApp.contains(resourceName); |
| } |
| } |
| } |
| |
| public void increaseContainer(SchedContainerChangeRequest increaseRequest) { |
| NodeId nodeId = increaseRequest.getNodeId(); |
| SchedulerRequestKey schedulerKey = |
| increaseRequest.getRMContainer().getAllocatedSchedulerKey(); |
| ContainerId containerId = increaseRequest.getContainerId(); |
| Resource deltaCapacity = increaseRequest.getDeltaCapacity(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("allocated increase request : applicationId=" + applicationId |
| + " container=" + containerId + " host=" |
| + increaseRequest.getNodeId() + " user=" + user + " resource=" |
| + deltaCapacity); |
| } |
| try { |
| this.writeLock.lock(); |
| // Set queue metrics |
| queue.getMetrics().allocateResources(user, deltaCapacity); |
| // remove the increase request from pending increase request map |
| removeIncreaseRequest(nodeId, schedulerKey, containerId); |
| // update usage |
| appResourceUsage.incUsed(increaseRequest.getNodePartition(), |
| deltaCapacity); |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) { |
| // Delta is negative when it's a decrease request |
| Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity()); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Decrease container : applicationId=" + applicationId |
| + " container=" + decreaseRequest.getContainerId() + " host=" |
| + decreaseRequest.getNodeId() + " user=" + user + " resource=" |
| + absDelta); |
| } |
| |
| try { |
| this.writeLock.lock(); |
| // Set queue metrics |
| queue.getMetrics().releaseResources(user, absDelta); |
| |
| // update usage |
| appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta); |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| public List<ResourceRequest> allocate(NodeType type, |
| SchedulerNode node, SchedulerRequestKey schedulerKey, |
| Container containerAllocated) { |
| try { |
| writeLock.lock(); |
| ResourceRequest request; |
| if (type == NodeType.NODE_LOCAL) { |
| request = resourceRequestMap.get(schedulerKey).get(node.getNodeName()); |
| } else if (type == NodeType.RACK_LOCAL) { |
| request = resourceRequestMap.get(schedulerKey).get(node.getRackName()); |
| } else{ |
| request = resourceRequestMap.get(schedulerKey).get(ResourceRequest.ANY); |
| } |
| return allocate(type, node, schedulerKey, request, containerAllocated); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Resources have been allocated to this application by the resource |
| * scheduler. Track them. |
| * @param type Node Type |
| * @param node SchedulerNode |
| * @param schedulerKey SchedulerRequestKey |
| * @param request ResourceRequest |
| * @param containerAllocated Container Allocated |
| * @return List of ResourceRequests |
| */ |
| public List<ResourceRequest> allocate(NodeType type, |
| SchedulerNode node, SchedulerRequestKey schedulerKey, |
| ResourceRequest request, Container containerAllocated) { |
| try { |
| writeLock.lock(); |
| List<ResourceRequest> resourceRequests = new ArrayList<>(); |
| if (type == NodeType.NODE_LOCAL) { |
| allocateNodeLocal(node, schedulerKey, request, resourceRequests); |
| } else if (type == NodeType.RACK_LOCAL) { |
| allocateRackLocal(node, schedulerKey, request, resourceRequests); |
| } else{ |
| allocateOffSwitch(request, resourceRequests, schedulerKey); |
| } |
| |
| if (null != containerAllocated) { |
| updateMetricsForAllocatedContainer(request, type, containerAllocated); |
| } |
| return resourceRequests; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * The {@link ResourceScheduler} is allocating data-local resources to the |
| * application. |
| */ |
| private void allocateNodeLocal(SchedulerNode node, |
| SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest, |
| List<ResourceRequest> resourceRequests) { |
| // Update future requirements |
| decResourceRequest(node.getNodeName(), schedulerKey, nodeLocalRequest); |
| |
| ResourceRequest rackLocalRequest = resourceRequestMap.get(schedulerKey).get( |
| node.getRackName()); |
| decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest); |
| |
| ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( |
| ResourceRequest.ANY); |
| decrementOutstanding(offRackRequest, schedulerKey); |
| |
| // Update cloned NodeLocal, RackLocal and OffRack requests for recovery |
| resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); |
| resourceRequests.add(cloneResourceRequest(rackLocalRequest)); |
| resourceRequests.add(cloneResourceRequest(offRackRequest)); |
| } |
| |
| private void decResourceRequest(String resourceName, |
| SchedulerRequestKey schedulerKey, ResourceRequest request) { |
| request.setNumContainers(request.getNumContainers() - 1); |
| if (request.getNumContainers() == 0) { |
| resourceRequestMap.get(schedulerKey).remove(resourceName); |
| } |
| } |
| |
| /** |
| * The {@link ResourceScheduler} is allocating data-local resources to the |
| * application. |
| */ |
| private void allocateRackLocal(SchedulerNode node, |
| SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest, |
| List<ResourceRequest> resourceRequests) { |
| // Update future requirements |
| decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest); |
| |
| ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( |
| ResourceRequest.ANY); |
| decrementOutstanding(offRackRequest, schedulerKey); |
| |
| // Update cloned RackLocal and OffRack requests for recovery |
| resourceRequests.add(cloneResourceRequest(rackLocalRequest)); |
| resourceRequests.add(cloneResourceRequest(offRackRequest)); |
| } |
| |
| /** |
| * The {@link ResourceScheduler} is allocating data-local resources to the |
| * application. |
| */ |
| private void allocateOffSwitch(ResourceRequest offSwitchRequest, |
| List<ResourceRequest> resourceRequests, |
| SchedulerRequestKey schedulerKey) { |
| // Update future requirements |
| decrementOutstanding(offSwitchRequest, schedulerKey); |
| // Update cloned OffRack requests for recovery |
| resourceRequests.add(cloneResourceRequest(offSwitchRequest)); |
| } |
| |
| private void decrementOutstanding(ResourceRequest offSwitchRequest, |
| SchedulerRequestKey schedulerKey) { |
| int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; |
| |
| // Do not remove ANY |
| offSwitchRequest.setNumContainers(numOffSwitchContainers); |
| |
| // Do we have any outstanding requests? |
| // If there is nothing, we need to deactivate this application |
| if (numOffSwitchContainers == 0) { |
| decrementSchedulerKeyReference(schedulerKey); |
| checkForDeactivation(); |
| } |
| |
| appResourceUsage.decPending(offSwitchRequest.getNodeLabelExpression(), |
| offSwitchRequest.getCapability()); |
| queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(), |
| offSwitchRequest.getCapability()); |
| } |
| |
| private void checkForDeactivation() { |
| if (schedulerKeys.isEmpty()) { |
| activeUsersManager.deactivateApplication(user, applicationId); |
| } |
| } |
| |
| public void move(Queue newQueue) { |
| try { |
| this.writeLock.lock(); |
| QueueMetrics oldMetrics = queue.getMetrics(); |
| QueueMetrics newMetrics = newQueue.getMetrics(); |
| for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { |
| ResourceRequest request = asks.get(ResourceRequest.ANY); |
| if (request != null) { |
| oldMetrics.decrPendingResources(user, request.getNumContainers(), |
| request.getCapability()); |
| newMetrics.incrPendingResources(user, request.getNumContainers(), |
| request.getCapability()); |
| |
| Resource delta = Resources.multiply(request.getCapability(), |
| request.getNumContainers()); |
| // Update Queue |
| queue.decPendingResource(request.getNodeLabelExpression(), delta); |
| newQueue.incPendingResource(request.getNodeLabelExpression(), delta); |
| } |
| } |
| oldMetrics.moveAppFrom(this); |
| newMetrics.moveAppTo(this); |
| activeUsersManager.deactivateApplication(user, applicationId); |
| activeUsersManager = newQueue.getActiveUsersManager(); |
| activeUsersManager.activateApplication(user, applicationId); |
| this.queue = newQueue; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| public void stop() { |
| // clear pending resources metrics for the application |
| try { |
| this.writeLock.lock(); |
| QueueMetrics metrics = queue.getMetrics(); |
| for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { |
| ResourceRequest request = asks.get(ResourceRequest.ANY); |
| if (request != null) { |
| metrics.decrPendingResources(user, request.getNumContainers(), |
| request.getCapability()); |
| |
| // Update Queue |
| queue.decPendingResource( |
| request.getNodeLabelExpression(), |
| Resources.multiply(request.getCapability(), |
| request.getNumContainers())); |
| } |
| } |
| metrics.finishAppAttempt(applicationId, pending, user); |
| |
| // Clear requests themselves |
| clearRequests(); |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| public void setQueue(Queue queue) { |
| try { |
| this.writeLock.lock(); |
| this.queue = queue; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| private Set<String> getBlackList() { |
| return this.placesBlacklistedByApp; |
| } |
| |
| public Set<String> getBlackListCopy() { |
| synchronized (placesBlacklistedByApp) { |
| return new HashSet<>(this.placesBlacklistedByApp); |
| } |
| } |
| |
| public void transferStateFromPreviousAppSchedulingInfo( |
| AppSchedulingInfo appInfo) { |
| // This should not require locking the placesBlacklistedByApp since it will |
| // not be used by this instance until after setCurrentAppAttempt. |
| this.placesBlacklistedByApp = appInfo.getBlackList(); |
| } |
| |
| public void recoverContainer(RMContainer rmContainer) { |
| try { |
| this.writeLock.lock(); |
| QueueMetrics metrics = queue.getMetrics(); |
| if (pending) { |
| // If there was any container to recover, the application was |
| // running from scheduler's POV. |
| pending = false; |
| metrics.runAppAttempt(applicationId, user); |
| } |
| |
| // Container is completed. Skip recovering resources. |
| if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { |
| return; |
| } |
| |
| metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), |
| false); |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| public ResourceRequest cloneResourceRequest(ResourceRequest request) { |
| ResourceRequest newRequest = ResourceRequest.newBuilder() |
| .priority(request.getPriority()) |
| .resourceName(request.getResourceName()) |
| .capability(request.getCapability()) |
| .numContainers(1) |
| .relaxLocality(request.getRelaxLocality()) |
| .nodeLabelExpression(request.getNodeLabelExpression()).build(); |
| return newRequest; |
| } |
| |
| /* |
| * In async environment, pending resource request could be updated during |
| * scheduling, this method checks pending request before allocating |
| */ |
| public boolean checkAllocation(NodeType type, SchedulerNode node, |
| SchedulerRequestKey schedulerKey) { |
| try { |
| readLock.lock(); |
| ResourceRequest r = resourceRequestMap.get(schedulerKey).get( |
| ResourceRequest.ANY); |
| if (r == null || r.getNumContainers() <= 0) { |
| return false; |
| } |
| if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) { |
| r = resourceRequestMap.get(schedulerKey).get(node.getRackName()); |
| if (r == null || r.getNumContainers() <= 0) { |
| return false; |
| } |
| if (type == NodeType.NODE_LOCAL) { |
| r = resourceRequestMap.get(schedulerKey).get(node.getNodeName()); |
| if (r == null || r.getNumContainers() <= 0) { |
| return false; |
| } |
| } |
| } |
| |
| return true; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public void updateMetricsForAllocatedContainer( |
| ResourceRequest request, NodeType type, Container containerAllocated) { |
| try { |
| writeLock.lock(); |
| QueueMetrics metrics = queue.getMetrics(); |
| if (pending) { |
| // once an allocation is done we assume the application is |
| // running from scheduler's POV. |
| pending = false; |
| metrics.runAppAttempt(applicationId, user); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("allocate: applicationId=" + applicationId + " container=" |
| + containerAllocated.getId() + " host=" + containerAllocated |
| .getNodeId().toString() + " user=" + user + " resource=" + request |
| .getCapability() + " type=" + type); |
| } |
| metrics.allocateResources(user, 1, request.getCapability(), true); |
| metrics.incrNodeTypeAggregations(user, type); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| // Get placement-set by specified schedulerKey |
| // Now simply return all node of the input clusterPlacementSet |
| // TODO, need update this when we support global scheduling |
| public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet( |
| SchedulerRequestKey schedulerkey) { |
| return new SchedulingPlacementSet<N>() { |
| @Override |
| @SuppressWarnings("unchecked") |
| public Iterator<N> getPreferredNodeIterator( |
| PlacementSet<N> clusterPlacementSet) { |
| return IteratorUtils.singletonIterator( |
| clusterPlacementSet.getAllNodes().values().iterator().next()); |
| } |
| |
| @Override |
| public ResourceRequestUpdateResult updateResourceRequests( |
| List<ResourceRequest> requests, |
| boolean recoverPreemptedRequestForAContainer) { |
| return null; |
| } |
| |
| @Override |
| public Map<String, ResourceRequest> getResourceRequests() { |
| return null; |
| } |
| |
| @Override |
| public ResourceRequest getResourceRequest(String resourceName, |
| SchedulerRequestKey requestKey) { |
| return null; |
| } |
| |
| @Override |
| public List<ResourceRequest> allocate(NodeType type, SchedulerNode node, |
| ResourceRequest request) { |
| return null; |
| } |
| |
| @Override |
| public Map<NodeId, N> getAllNodes() { |
| return null; |
| } |
| |
| @Override |
| public long getVersion() { |
| return 0; |
| } |
| |
| @Override |
| public String getPartition() { |
| return null; |
| } |
| }; |
| } |
| } |