| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.stream.Collectors; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.RejectionReason; |
| import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; |
| import org.apache.hadoop.yarn.api.records.SchedulingRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator; |
| import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| /** |
| * This class keeps track of all the consumption of an application. This also |
| * keeps track of current running/completed containers for the application. |
| */ |
| @Private |
| @Unstable |
| public class AppSchedulingInfo { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(AppSchedulingInfo.class); |
| |
| private final ApplicationId applicationId; |
| private final ApplicationAttemptId applicationAttemptId; |
| private final AtomicLong containerIdCounter; |
| private final String user; |
| |
| private Queue queue; |
| private AbstractUsersManager abstractUsersManager; |
| // whether accepted/allocated by scheduler |
| private volatile boolean pending = true; |
| private ResourceUsage appResourceUsage; |
| |
| private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false); |
| // Set of places (nodes / racks) blacklisted by the system. Today, this only |
| // has places blacklisted for AM containers. |
| private final Set<String> placesBlacklistedBySystem = new HashSet<>(); |
| private Set<String> placesBlacklistedByApp = new HashSet<>(); |
| |
| private Set<String> requestedPartitions = new HashSet<>(); |
| |
| private final ConcurrentSkipListSet<SchedulerRequestKey> |
| schedulerKeys = new ConcurrentSkipListSet<>(); |
| private final Map<SchedulerRequestKey, AppPlacementAllocator<SchedulerNode>> |
| schedulerKeyToAppPlacementAllocator = new ConcurrentHashMap<>(); |
| |
| private final ReentrantReadWriteLock.ReadLock readLock; |
| private final ReentrantReadWriteLock.WriteLock writeLock; |
| |
| public final ContainerUpdateContext updateContext; |
| private final Map<String, String> applicationSchedulingEnvs = new HashMap<>(); |
| private final RMContext rmContext; |
| private final int retryAttempts; |
| |
| public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, |
| Queue queue, AbstractUsersManager abstractUsersManager, long epoch, |
| ResourceUsage appResourceUsage, |
| Map<String, String> applicationSchedulingEnvs, RMContext rmContext) { |
| this.applicationAttemptId = appAttemptId; |
| this.applicationId = appAttemptId.getApplicationId(); |
| this.queue = queue; |
| this.user = user; |
| this.abstractUsersManager = abstractUsersManager; |
| this.containerIdCounter = new AtomicLong( |
| epoch << ResourceManager.EPOCH_BIT_SHIFT); |
| this.appResourceUsage = appResourceUsage; |
| this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs); |
| this.rmContext = rmContext; |
| this.retryAttempts = rmContext.getYarnConfiguration().getInt( |
| YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS); |
| |
| ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| updateContext = new ContainerUpdateContext(this); |
| readLock = lock.readLock(); |
| writeLock = lock.writeLock(); |
| } |
| |
| public ApplicationId getApplicationId() { |
| return applicationId; |
| } |
| |
| public ApplicationAttemptId getApplicationAttemptId() { |
| return applicationAttemptId; |
| } |
| |
| public String getUser() { |
| return user; |
| } |
| |
| public long getNewContainerId() { |
| return this.containerIdCounter.incrementAndGet(); |
| } |
| |
| public String getQueueName() { |
| this.readLock.lock(); |
| try { |
| return queue.getQueueName(); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| public boolean isPending() { |
| return pending; |
| } |
| |
| public Set<String> getRequestedPartitions() { |
| return requestedPartitions; |
| } |
| |
| /** |
| * Clear any pending requests from this application. |
| */ |
| private void clearRequests() { |
| schedulerKeys.clear(); |
| schedulerKeyToAppPlacementAllocator.clear(); |
| LOG.info("Application " + applicationId + " requests cleared"); |
| } |
| |
| public ContainerUpdateContext getUpdateContext() { |
| return updateContext; |
| } |
| |
| /** |
| * The ApplicationMaster is updating resource requirements for the |
| * application, by asking for more resources and releasing resources acquired |
| * by the application. |
| * |
| * @param resourceRequests resource requests to be allocated |
| * @param recoverPreemptedRequestForAContainer |
| * recover ResourceRequest/SchedulingRequest on preemption |
| * @return true if any resource was updated, false otherwise |
| */ |
| public boolean updateResourceRequests(List<ResourceRequest> resourceRequests, |
| boolean recoverPreemptedRequestForAContainer) { |
| // Flag to track if any incoming requests update "ANY" requests |
| boolean offswitchResourcesUpdated; |
| |
| writeLock.lock(); |
| try { |
| // Update AppPlacementAllocator by requests |
| offswitchResourcesUpdated = internalAddResourceRequests( |
| recoverPreemptedRequestForAContainer, resourceRequests); |
| } finally { |
| writeLock.unlock(); |
| } |
| |
| return offswitchResourcesUpdated; |
| } |
| |
| /** |
| * The ApplicationMaster is updating resource requirements for the |
| * application, by asking for more resources and releasing resources acquired |
| * by the application. |
| * |
| * @param dedupRequests (dedup) resource requests to be allocated |
| * @param recoverPreemptedRequestForAContainer |
| * recover ResourceRequest/SchedulingRequest on preemption |
| * @return true if any resource was updated, false otherwise |
| */ |
| public boolean updateResourceRequests( |
| Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests, |
| boolean recoverPreemptedRequestForAContainer) { |
| // Flag to track if any incoming requests update "ANY" requests |
| boolean offswitchResourcesUpdated; |
| |
| writeLock.lock(); |
| try { |
| // Update AppPlacementAllocator by requests |
| offswitchResourcesUpdated = internalAddResourceRequests( |
| recoverPreemptedRequestForAContainer, dedupRequests); |
| } finally { |
| writeLock.unlock(); |
| } |
| |
| return offswitchResourcesUpdated; |
| } |
| |
| /** |
| * The ApplicationMaster is updating resource requirements for the |
| * application, by asking for more resources and releasing resources acquired |
| * by the application. |
| * |
| * @param schedulingRequests resource requests to be allocated |
| * @param recoverPreemptedRequestForAContainer |
| * recover ResourceRequest/SchedulingRequest on preemption |
| * @return true if any resource was updated, false otherwise |
| */ |
| public boolean updateSchedulingRequests( |
| List<SchedulingRequest> schedulingRequests, |
| boolean recoverPreemptedRequestForAContainer) { |
| // Flag to track if any incoming requests update "ANY" requests |
| boolean offswitchResourcesUpdated; |
| |
| writeLock.lock(); |
| try { |
| // Update AppPlacementAllocator by requests |
| offswitchResourcesUpdated = addSchedulingRequests( |
| recoverPreemptedRequestForAContainer, schedulingRequests); |
| } finally { |
| writeLock.unlock(); |
| } |
| |
| return offswitchResourcesUpdated; |
| } |
| |
| public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) { |
| schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey); |
| } |
| |
| private boolean addSchedulingRequests( |
| boolean recoverPreemptedRequestForAContainer, |
| List<SchedulingRequest> schedulingRequests) { |
| // Do we need to update pending resource for app/queue, etc.? |
| boolean requireUpdatePendingResource = false; |
| |
| for (SchedulingRequest request : schedulingRequests) { |
| SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create( |
| request); |
| |
| AppPlacementAllocator appPlacementAllocator = |
| getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey, |
| SingleConstraintAppPlacementAllocator.class.getCanonicalName()); |
| |
| // Update AppPlacementAllocator |
| PendingAskUpdateResult pendingAmountChanges = |
| appPlacementAllocator.updatePendingAsk(schedulerRequestKey, |
| request, recoverPreemptedRequestForAContainer); |
| |
| if (null != pendingAmountChanges) { |
| updatePendingResources(pendingAmountChanges, schedulerRequestKey, |
| queue.getMetrics()); |
| requireUpdatePendingResource = true; |
| } |
| } |
| |
| return requireUpdatePendingResource; |
| } |
| |
| /** |
| * Get and insert AppPlacementAllocator if it doesn't exist, this should be |
| * protected by write lock. |
| * @param schedulerRequestKey schedulerRequestKey |
| * @param placementTypeClass placementTypeClass |
| * @return AppPlacementAllocator |
| */ |
| private AppPlacementAllocator<SchedulerNode> getAndAddAppPlacementAllocatorIfNotExist( |
| SchedulerRequestKey schedulerRequestKey, String placementTypeClass) { |
| AppPlacementAllocator<SchedulerNode> appPlacementAllocator; |
| if ((appPlacementAllocator = schedulerKeyToAppPlacementAllocator.get( |
| schedulerRequestKey)) == null) { |
| appPlacementAllocator = |
| ApplicationPlacementAllocatorFactory.getAppPlacementAllocator( |
| placementTypeClass, this, schedulerRequestKey, rmContext); |
| schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, |
| appPlacementAllocator); |
| } |
| return appPlacementAllocator; |
| } |
| |
| private boolean internalAddResourceRequests( |
| boolean recoverPreemptedRequestForAContainer, |
| Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) { |
| boolean offswitchResourcesUpdated = false; |
| for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry : |
| dedupRequests.entrySet()) { |
| SchedulerRequestKey schedulerRequestKey = entry.getKey(); |
| AppPlacementAllocator<SchedulerNode> appPlacementAllocator = |
| getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey, |
| applicationSchedulingEnvs.get( |
| ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS)); |
| |
| // Update AppPlacementAllocator |
| PendingAskUpdateResult pendingAmountChanges = |
| appPlacementAllocator.updatePendingAsk(entry.getValue().values(), |
| recoverPreemptedRequestForAContainer); |
| |
| if (null != pendingAmountChanges) { |
| updatePendingResources(pendingAmountChanges, schedulerRequestKey, |
| queue.getMetrics()); |
| offswitchResourcesUpdated = true; |
| } |
| } |
| return offswitchResourcesUpdated; |
| } |
| |
| private boolean internalAddResourceRequests(boolean recoverPreemptedRequestForAContainer, |
| List<ResourceRequest> resourceRequests) { |
| if (null == resourceRequests || resourceRequests.isEmpty()) { |
| return false; |
| } |
| |
| // A map to group resource requests and dedup |
| Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests = |
| new HashMap<>(); |
| |
| // Group resource request by schedulerRequestKey and resourceName |
| for (ResourceRequest request : resourceRequests) { |
| SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); |
| if (!dedupRequests.containsKey(schedulerKey)) { |
| dedupRequests.put(schedulerKey, new HashMap<>()); |
| } |
| dedupRequests.get(schedulerKey).put(request.getResourceName(), request); |
| } |
| |
| return internalAddResourceRequests(recoverPreemptedRequestForAContainer, |
| dedupRequests); |
| } |
| |
| private void updatePendingResources(PendingAskUpdateResult updateResult, |
| SchedulerRequestKey schedulerKey, QueueMetrics metrics) { |
| |
| PendingAsk lastPendingAsk = updateResult.getLastPendingAsk(); |
| PendingAsk newPendingAsk = updateResult.getNewPendingAsk(); |
| String lastNodePartition = updateResult.getLastNodePartition(); |
| String newNodePartition = updateResult.getNewNodePartition(); |
| |
| int lastRequestContainers = |
| (lastPendingAsk != null) ? lastPendingAsk.getCount() : 0; |
| if (newPendingAsk.getCount() <= 0) { |
| if (lastRequestContainers >= 0) { |
| schedulerKeys.remove(schedulerKey); |
| schedulerKeyToAppPlacementAllocator.remove(schedulerKey); |
| } |
| LOG.info("checking for deactivate of application :" |
| + this.applicationId); |
| checkForDeactivation(); |
| } else { |
| // Activate application. Metrics activation is done here. |
| if (lastRequestContainers <= 0) { |
| schedulerKeys.add(schedulerKey); |
| abstractUsersManager.activateApplication(user, applicationId); |
| } |
| } |
| |
| if (lastPendingAsk != null) { |
| // Deduct resources from metrics / pending resources of queue/app. |
| metrics.decrPendingResources(lastNodePartition, user, |
| lastPendingAsk.getCount(), lastPendingAsk.getPerAllocationResource()); |
| Resource decreasedResource = Resources.multiply( |
| lastPendingAsk.getPerAllocationResource(), lastRequestContainers); |
| queue.decPendingResource(lastNodePartition, decreasedResource); |
| appResourceUsage.decPending(lastNodePartition, decreasedResource); |
| } |
| |
| // Increase resources to metrics / pending resources of queue/app. |
| metrics.incrPendingResources(newNodePartition, user, |
| newPendingAsk.getCount(), newPendingAsk.getPerAllocationResource()); |
| Resource increasedResource = Resources.multiply( |
| newPendingAsk.getPerAllocationResource(), newPendingAsk.getCount()); |
| queue.incPendingResource(newNodePartition, increasedResource); |
| appResourceUsage.incPending(newNodePartition, increasedResource); |
| } |
| |
| public void addRequestedPartition(String partition) { |
| requestedPartitions.add(partition); |
| } |
| |
| public void decPendingResource(String partition, Resource toDecrease) { |
| queue.decPendingResource(partition, toDecrease); |
| appResourceUsage.decPending(partition, toDecrease); |
| } |
| |
| /** |
| * The ApplicationMaster is updating the placesBlacklistedByApp used for |
| * containers other than AMs. |
| * |
| * @param blacklistAdditions |
| * resources to be added to the userBlacklist |
| * @param blacklistRemovals |
| * resources to be removed from the userBlacklist |
| */ |
| public void updatePlacesBlacklistedByApp( |
| List<String> blacklistAdditions, List<String> blacklistRemovals) { |
| if (updateBlacklistedPlaces(placesBlacklistedByApp, blacklistAdditions, |
| blacklistRemovals)) { |
| userBlacklistChanged.set(true); |
| } |
| } |
| |
| /** |
| * Update the list of places that are blacklisted by the system. Today the |
| * system only blacklists places when it sees that AMs failed there |
| * |
| * @param blacklistAdditions |
| * resources to be added to placesBlacklistedBySystem |
| * @param blacklistRemovals |
| * resources to be removed from placesBlacklistedBySystem |
| */ |
| public void updatePlacesBlacklistedBySystem( |
| List<String> blacklistAdditions, List<String> blacklistRemovals) { |
| updateBlacklistedPlaces(placesBlacklistedBySystem, blacklistAdditions, |
| blacklistRemovals); |
| } |
| |
| private static boolean updateBlacklistedPlaces(Set<String> blacklist, |
| List<String> blacklistAdditions, List<String> blacklistRemovals) { |
| boolean changed = false; |
| synchronized (blacklist) { |
| if (blacklistAdditions != null) { |
| changed = blacklist.addAll(blacklistAdditions); |
| } |
| |
| if (blacklistRemovals != null) { |
| changed = blacklist.removeAll(blacklistRemovals) || changed; |
| } |
| } |
| return changed; |
| } |
| |
| public boolean getAndResetBlacklistChanged() { |
| return userBlacklistChanged.getAndSet(false); |
| } |
| |
| public Collection<SchedulerRequestKey> getSchedulerKeys() { |
| return schedulerKeys; |
| } |
| |
| /** |
| * Used by REST API to fetch ResourceRequest |
| * @return All pending ResourceRequests. |
| */ |
| public List<ResourceRequest> getAllResourceRequests() { |
| List<ResourceRequest> ret = new ArrayList<>(); |
| this.readLock.lock(); |
| try { |
| for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator |
| .values()) { |
| ret.addAll(ap.getResourceRequests().values()); |
| } |
| } finally { |
| this.readLock.unlock(); |
| } |
| return ret; |
| } |
| |
| /** |
| * Fetch SchedulingRequests. |
| * @return All pending SchedulingRequests. |
| */ |
| public List<SchedulingRequest> getAllSchedulingRequests() { |
| List<SchedulingRequest> ret = new ArrayList<>(); |
| this.readLock.lock(); |
| try { |
| schedulerKeyToAppPlacementAllocator.values().stream() |
| .filter(ap -> ap.getSchedulingRequest() != null) |
| .forEach(ap -> ret.add(ap.getSchedulingRequest())); |
| } finally { |
| this.readLock.unlock(); |
| } |
| return ret; |
| } |
| |
| public List<RejectedSchedulingRequest> getRejectedRequest() { |
| this.readLock.lock(); |
| try { |
| return schedulerKeyToAppPlacementAllocator.values().stream() |
| .filter(ap -> ap.getPlacementAttempt() >= retryAttempts) |
| .map(ap -> RejectedSchedulingRequest.newInstance( |
| RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, |
| ap.getSchedulingRequest())) |
| .collect(Collectors.toList()); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| public PendingAsk getNextPendingAsk() { |
| readLock.lock(); |
| try { |
| if (!schedulerKeys.isEmpty()) { |
| SchedulerRequestKey firstRequestKey = schedulerKeys.first(); |
| return getPendingAsk(firstRequestKey, ResourceRequest.ANY); |
| } else { |
| return null; |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey) { |
| return getPendingAsk(schedulerKey, ResourceRequest.ANY); |
| } |
| |
| public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey, |
| String resourceName) { |
| this.readLock.lock(); |
| try { |
| AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get( |
| schedulerKey); |
| return (ap == null) ? PendingAsk.ZERO : ap.getPendingAsk(resourceName); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Returns if the place (node/rack today) is either blacklisted by the |
| * application (user) or the system. |
| * |
| * @param resourceName |
| * the resourcename |
| * @param blacklistedBySystem |
| * true if it should check amBlacklist |
| * @return true if its blacklisted |
| */ |
| public boolean isPlaceBlacklisted(String resourceName, |
| boolean blacklistedBySystem) { |
| if (blacklistedBySystem){ |
| synchronized (placesBlacklistedBySystem) { |
| return placesBlacklistedBySystem.contains(resourceName); |
| } |
| } else { |
| synchronized (placesBlacklistedByApp) { |
| return placesBlacklistedByApp.contains(resourceName); |
| } |
| } |
| } |
| |
| public ContainerRequest allocate(NodeType type, |
| SchedulerNode node, SchedulerRequestKey schedulerKey, |
| RMContainer containerAllocated) { |
| writeLock.lock(); |
| try { |
| if (null != containerAllocated) { |
| updateMetricsForAllocatedContainer(type, node, containerAllocated); |
| } |
| |
| return schedulerKeyToAppPlacementAllocator.get(schedulerKey).allocate( |
| schedulerKey, type, node); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public void checkForDeactivation() { |
| if (schedulerKeys.isEmpty()) { |
| abstractUsersManager.deactivateApplication(user, applicationId); |
| } |
| } |
| |
| public void move(Queue newQueue) { |
| this.writeLock.lock(); |
| try { |
| QueueMetrics oldMetrics = queue.getMetrics(); |
| QueueMetrics newMetrics = newQueue.getMetrics(); |
| for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator |
| .values()) { |
| PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY); |
| if (ask.getCount() > 0) { |
| oldMetrics.decrPendingResources( |
| ap.getPrimaryRequestedNodePartition(), |
| user, ask.getCount(), ask.getPerAllocationResource()); |
| newMetrics.incrPendingResources( |
| ap.getPrimaryRequestedNodePartition(), |
| user, ask.getCount(), ask.getPerAllocationResource()); |
| |
| Resource delta = Resources.multiply(ask.getPerAllocationResource(), |
| ask.getCount()); |
| // Update Queue |
| queue.decPendingResource( |
| ap.getPrimaryRequestedNodePartition(), delta); |
| newQueue.incPendingResource( |
| ap.getPrimaryRequestedNodePartition(), delta); |
| } |
| } |
| oldMetrics.moveAppFrom(this); |
| newMetrics.moveAppTo(this); |
| abstractUsersManager.deactivateApplication(user, applicationId); |
| abstractUsersManager = newQueue.getAbstractUsersManager(); |
| if (!schedulerKeys.isEmpty()) { |
| abstractUsersManager.activateApplication(user, applicationId); |
| } |
| this.queue = newQueue; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| public void stop() { |
| // clear pending resources metrics for the application |
| this.writeLock.lock(); |
| try { |
| QueueMetrics metrics = queue.getMetrics(); |
| for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator |
| .values()) { |
| PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY); |
| if (ask.getCount() > 0) { |
| metrics.decrPendingResources(ap.getPrimaryRequestedNodePartition(), |
| user, ask.getCount(), ask.getPerAllocationResource()); |
| |
| // Update Queue |
| queue.decPendingResource( |
| ap.getPrimaryRequestedNodePartition(), |
| Resources.multiply(ask.getPerAllocationResource(), |
| ask.getCount())); |
| } |
| } |
| metrics.finishAppAttempt(applicationId, pending, user); |
| |
| // Clear requests themselves |
| clearRequests(); |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| public void setQueue(Queue queue) { |
| this.writeLock.lock(); |
| try { |
| this.queue = queue; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| private Set<String> getBlackList() { |
| return this.placesBlacklistedByApp; |
| } |
| |
| public Set<String> getBlackListCopy() { |
| synchronized (placesBlacklistedByApp) { |
| return new HashSet<>(this.placesBlacklistedByApp); |
| } |
| } |
| |
| public void transferStateFromPreviousAppSchedulingInfo( |
| AppSchedulingInfo appInfo) { |
| // This should not require locking the placesBlacklistedByApp since it will |
| // not be used by this instance until after setCurrentAppAttempt. |
| this.placesBlacklistedByApp = appInfo.getBlackList(); |
| } |
| |
| public void recoverContainer(RMContainer rmContainer, String partition) { |
| if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { |
| return; |
| } |
| this.writeLock.lock(); |
| try { |
| QueueMetrics metrics = queue.getMetrics(); |
| if (pending) { |
| // If there was any container to recover, the application was |
| // running from scheduler's POV. |
| pending = false; |
| metrics.runAppAttempt(applicationId, user); |
| } |
| |
| // Container is completed. Skip recovering resources. |
| if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { |
| return; |
| } |
| |
| metrics.allocateResources(partition, user, 1, |
| rmContainer.getAllocatedResource(), false); |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| /* |
| * In async environment, pending resource request could be updated during |
| * scheduling, this method checks pending request before allocating |
| */ |
| public boolean checkAllocation(NodeType type, SchedulerNode node, |
| SchedulerRequestKey schedulerKey) { |
| readLock.lock(); |
| try { |
| AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get( |
| schedulerKey); |
| if (null == ap) { |
| return false; |
| } |
| return ap.canAllocate(type, node); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private void updateMetricsForAllocatedContainer(NodeType type, |
| SchedulerNode node, RMContainer containerAllocated) { |
| QueueMetrics metrics = queue.getMetrics(); |
| if (pending) { |
| // once an allocation is done we assume the application is |
| // running from scheduler's POV. |
| pending = false; |
| metrics.runAppAttempt(applicationId, user); |
| } |
| |
| updateMetrics(applicationId, type, node, containerAllocated, user, queue); |
| } |
| |
| public static void updateMetrics(ApplicationId applicationId, NodeType type, |
| SchedulerNode node, RMContainer containerAllocated, String user, |
| Queue queue) { |
| LOG.debug("allocate: applicationId={} container={} host={} user={}" |
| + " resource={} type={}", applicationId, |
| containerAllocated.getContainer().getId(), |
| containerAllocated.getNodeId(), user, |
| containerAllocated.getContainer().getResource(), |
| type); |
| if(node != null) { |
| queue.getMetrics().allocateResources(node.getPartition(), user, 1, |
| containerAllocated.getContainer().getResource(), false); |
| queue.getMetrics().decrPendingResources( |
| containerAllocated.getNodeLabelExpression(), user, 1, |
| containerAllocated.getContainer().getResource()); |
| } |
| queue.getMetrics().incrNodeTypeAggregations(user, type); |
| } |
| |
| // Get AppPlacementAllocator by specified schedulerKey |
| public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator( |
| SchedulerRequestKey schedulerkey) { |
| return (AppPlacementAllocator<N>) schedulerKeyToAppPlacementAllocator.get( |
| schedulerkey); |
| } |
| |
| /** |
| * Can delay to next?. |
| * |
| * @param schedulerKey schedulerKey |
| * @param resourceName resourceName |
| * |
| * @return If request exists, return {relaxLocality} |
| * Otherwise, return true. |
| */ |
| public boolean canDelayTo( |
| SchedulerRequestKey schedulerKey, String resourceName) { |
| this.readLock.lock(); |
| try { |
| AppPlacementAllocator ap = |
| schedulerKeyToAppPlacementAllocator.get(schedulerKey); |
| return (ap == null) || ap.canDelayTo(resourceName); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Pre-check node to see if it satisfy the given schedulerKey and |
| * scheduler mode. |
| * |
| * @param schedulerKey schedulerKey |
| * @param schedulerNode schedulerNode |
| * @param schedulingMode schedulingMode |
| * @param dcOpt optional diagnostics collector |
| * @return can use the node or not. |
| */ |
| public boolean precheckNode(SchedulerRequestKey schedulerKey, |
| SchedulerNode schedulerNode, SchedulingMode schedulingMode, |
| Optional<DiagnosticsCollector> dcOpt) { |
| this.readLock.lock(); |
| try { |
| AppPlacementAllocator ap = |
| schedulerKeyToAppPlacementAllocator.get(schedulerKey); |
| return (ap != null) && (ap.getPlacementAttempt() < retryAttempts) && |
| ap.precheckNode(schedulerNode, schedulingMode, dcOpt); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Get scheduling envs configured for this application. |
| * |
| * @return a map of applicationSchedulingEnvs |
| */ |
| public Map<String, String> getApplicationSchedulingEnvs() { |
| return applicationSchedulingEnvs; |
| } |
| |
| /** |
| * Get the defaultNodeLabelExpression for the application's current queue. |
| * |
| * @return defaultNodeLabelExpression |
| */ |
| public String getDefaultNodeLabelExpression() { |
| try { |
| this.readLock.lock(); |
| return queue.getDefaultNodeLabelExpression(); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| public RMContext getRMContext() { |
| return this.rmContext; |
| } |
| } |