| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.commons.lang.time.DateUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Stable; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.LogAggregationContext; |
| import org.apache.hadoop.yarn.api.records.NMToken; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.server.api.ContainerType; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; |
| import org.apache.hadoop.yarn.state.InvalidStateTransitionException; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.HashMultiset; |
| import com.google.common.collect.Multiset; |
| |
| /** |
| * Represents an application attempt from the viewpoint of the scheduler. |
| * Each running app attempt in the RM corresponds to one instance |
| * of this class. |
| */ |
| @Private |
| @Unstable |
| public class SchedulerApplicationAttempt implements SchedulableEntity { |
| |
| private static final Log LOG = LogFactory |
| .getLog(SchedulerApplicationAttempt.class); |
| |
| private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000; |
| protected long lastMemoryAggregateAllocationUpdateTime = 0; |
| private long lastMemorySeconds = 0; |
| private long lastVcoreSeconds = 0; |
| |
| protected final AppSchedulingInfo appSchedulingInfo; |
| protected ApplicationAttemptId attemptId; |
| protected Map<ContainerId, RMContainer> liveContainers = |
| new HashMap<ContainerId, RMContainer>(); |
| protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = |
| new HashMap<Priority, Map<NodeId, RMContainer>>(); |
| |
| private final Multiset<Priority> reReservations = HashMultiset.create(); |
| |
| private Resource resourceLimit = Resource.newInstance(0, 0); |
| private boolean unmanagedAM = true; |
| private boolean amRunning = false; |
| private LogAggregationContext logAggregationContext; |
| |
| private volatile Priority appPriority = null; |
| |
| protected ResourceUsage attemptResourceUsage = new ResourceUsage(); |
| private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); |
| private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); |
| |
| protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>(); |
| protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>(); |
| protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>(); |
| protected Set<NMToken> updatedNMTokens = new HashSet<>(); |
| |
| // This pendingRelease is used in work-preserving recovery scenario to keep |
| // track of the AM's outstanding release requests. RM on recovery could |
| // receive the release request form AM before it receives the container status |
| // from NM for recovery. In this case, the to-be-recovered containers reported |
| // by NM should not be recovered. |
| private Set<ContainerId> pendingRelease = null; |
| |
| /** |
| * Count how many times the application has been given an opportunity to |
| * schedule a task at each priority. Each time the scheduler asks the |
| * application for a task at this priority, it is incremented, and each time |
| * the application successfully schedules a task (at rack or node local), it |
| * is reset to 0. |
| */ |
| Multiset<Priority> schedulingOpportunities = HashMultiset.create(); |
| |
| /** |
| * Count how many times the application has been given an opportunity to |
| * schedule a non-partitioned resource request at each priority. Each time the |
| * scheduler asks the application for a task at this priority, it is |
| * incremented, and each time the application successfully schedules a task, |
| * it is reset to 0 when schedule any task at corresponding priority. |
| */ |
| Multiset<Priority> missedNonPartitionedRequestSchedulingOpportunity = |
| HashMultiset.create(); |
| |
| // Time of the last container scheduled at the current allowed level |
| protected Map<Priority, Long> lastScheduledContainer = |
| new HashMap<Priority, Long>(); |
| |
| protected Queue queue; |
| protected boolean isStopped = false; |
| |
| protected final RMContext rmContext; |
| |
| public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, |
| String user, Queue queue, ActiveUsersManager activeUsersManager, |
| RMContext rmContext) { |
| Preconditions.checkNotNull(rmContext, "RMContext should not be null"); |
| this.rmContext = rmContext; |
| this.appSchedulingInfo = |
| new AppSchedulingInfo(applicationAttemptId, user, queue, |
| activeUsersManager, rmContext.getEpoch(), attemptResourceUsage); |
| this.queue = queue; |
| this.pendingRelease = new HashSet<ContainerId>(); |
| this.attemptId = applicationAttemptId; |
| if (rmContext.getRMApps() != null && |
| rmContext.getRMApps() |
| .containsKey(applicationAttemptId.getApplicationId())) { |
| ApplicationSubmissionContext appSubmissionContext = |
| rmContext.getRMApps().get(applicationAttemptId.getApplicationId()) |
| .getApplicationSubmissionContext(); |
| if (appSubmissionContext != null) { |
| unmanagedAM = appSubmissionContext.getUnmanagedAM(); |
| this.logAggregationContext = |
| appSubmissionContext.getLogAggregationContext(); |
| } |
| } |
| } |
| |
| /** |
| * Get the live containers of the application. |
| * @return live containers of the application |
| */ |
| public synchronized Collection<RMContainer> getLiveContainers() { |
| return new ArrayList<RMContainer>(liveContainers.values()); |
| } |
| |
| public AppSchedulingInfo getAppSchedulingInfo() { |
| return this.appSchedulingInfo; |
| } |
| |
| /** |
| * Is this application pending? |
| * @return true if it is else false. |
| */ |
| public boolean isPending() { |
| return appSchedulingInfo.isPending(); |
| } |
| |
| /** |
| * Get {@link ApplicationAttemptId} of the application master. |
| * @return <code>ApplicationAttemptId</code> of the application master |
| */ |
| public ApplicationAttemptId getApplicationAttemptId() { |
| return appSchedulingInfo.getApplicationAttemptId(); |
| } |
| |
| public ApplicationId getApplicationId() { |
| return appSchedulingInfo.getApplicationId(); |
| } |
| |
| public String getUser() { |
| return appSchedulingInfo.getUser(); |
| } |
| |
| public Map<String, ResourceRequest> getResourceRequests(Priority priority) { |
| return appSchedulingInfo.getResourceRequests(priority); |
| } |
| |
| public Set<ContainerId> getPendingRelease() { |
| return this.pendingRelease; |
| } |
| |
| public long getNewContainerId() { |
| return appSchedulingInfo.getNewContainerId(); |
| } |
| |
| public Collection<Priority> getPriorities() { |
| return appSchedulingInfo.getPriorities(); |
| } |
| |
| public synchronized ResourceRequest getResourceRequest(Priority priority, |
| String resourceName) { |
| return this.appSchedulingInfo.getResourceRequest(priority, resourceName); |
| } |
| |
| public synchronized int getTotalRequiredResources(Priority priority) { |
| return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers(); |
| } |
| |
| public synchronized Resource getResource(Priority priority) { |
| return appSchedulingInfo.getResource(priority); |
| } |
| |
| public String getQueueName() { |
| return appSchedulingInfo.getQueueName(); |
| } |
| |
| public Resource getAMResource() { |
| return attemptResourceUsage.getAMUsed(); |
| } |
| |
| public void setAMResource(Resource amResource) { |
| attemptResourceUsage.setAMUsed(amResource); |
| } |
| |
| public boolean isAmRunning() { |
| return amRunning; |
| } |
| |
| public void setAmRunning(boolean bool) { |
| amRunning = bool; |
| } |
| |
| public boolean getUnmanagedAM() { |
| return unmanagedAM; |
| } |
| |
| public synchronized RMContainer getRMContainer(ContainerId id) { |
| return liveContainers.get(id); |
| } |
| |
| protected synchronized void resetReReservations(Priority priority) { |
| reReservations.setCount(priority, 0); |
| } |
| |
| protected synchronized void addReReservation(Priority priority) { |
| reReservations.add(priority); |
| } |
| |
| public synchronized int getReReservations(Priority priority) { |
| return reReservations.count(priority); |
| } |
| |
| /** |
| * Get total current reservations. |
| * Used only by unit tests |
| * @return total current reservations |
| */ |
| @Stable |
| @Private |
| public synchronized Resource getCurrentReservation() { |
| return attemptResourceUsage.getReserved(); |
| } |
| |
| public Queue getQueue() { |
| return queue; |
| } |
| |
| public synchronized boolean updateResourceRequests( |
| List<ResourceRequest> requests) { |
| if (!isStopped) { |
| return appSchedulingInfo.updateResourceRequests(requests, false); |
| } |
| return false; |
| } |
| |
| public synchronized void recoverResourceRequests( |
| List<ResourceRequest> requests) { |
| if (!isStopped) { |
| appSchedulingInfo.updateResourceRequests(requests, true); |
| } |
| } |
| |
| public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { |
| // Cleanup all scheduling information |
| isStopped = true; |
| appSchedulingInfo.stop(rmAppAttemptFinalState); |
| } |
| |
| public synchronized boolean isStopped() { |
| return isStopped; |
| } |
| |
| /** |
| * Get the list of reserved containers |
| * @return All of the reserved containers. |
| */ |
| public synchronized List<RMContainer> getReservedContainers() { |
| List<RMContainer> reservedContainers = new ArrayList<RMContainer>(); |
| for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : |
| this.reservedContainers.entrySet()) { |
| reservedContainers.addAll(e.getValue().values()); |
| } |
| return reservedContainers; |
| } |
| |
| public synchronized boolean reserveIncreasedContainer(SchedulerNode node, |
| Priority priority, RMContainer rmContainer, Resource reservedResource) { |
| if (commonReserve(node, priority, rmContainer, reservedResource)) { |
| attemptResourceUsage.incReserved(node.getPartition(), |
| reservedResource); |
| // succeeded |
| return true; |
| } |
| |
| return false; |
| } |
| |
| private synchronized boolean commonReserve(SchedulerNode node, |
| Priority priority, RMContainer rmContainer, Resource reservedResource) { |
| try { |
| rmContainer.handle(new RMContainerReservedEvent(rmContainer |
| .getContainerId(), reservedResource, node.getNodeID(), priority)); |
| } catch (InvalidStateTransitionException e) { |
| // We reach here could be caused by container already finished, return |
| // false indicate it fails |
| return false; |
| } |
| |
| Map<NodeId, RMContainer> reservedContainers = |
| this.reservedContainers.get(priority); |
| if (reservedContainers == null) { |
| reservedContainers = new HashMap<NodeId, RMContainer>(); |
| this.reservedContainers.put(priority, reservedContainers); |
| } |
| reservedContainers.put(node.getNodeID(), rmContainer); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Application attempt " + getApplicationAttemptId() |
| + " reserved container " + rmContainer + " on node " + node |
| + ". This attempt currently has " + reservedContainers.size() |
| + " reserved containers at priority " + priority |
| + "; currentReservation " + reservedResource); |
| } |
| |
| return true; |
| } |
| |
| public synchronized RMContainer reserve(SchedulerNode node, |
| Priority priority, RMContainer rmContainer, Container container) { |
| // Create RMContainer if necessary |
| if (rmContainer == null) { |
| rmContainer = |
| new RMContainerImpl(container, getApplicationAttemptId(), |
| node.getNodeID(), appSchedulingInfo.getUser(), rmContext); |
| attemptResourceUsage.incReserved(node.getPartition(), |
| container.getResource()); |
| |
| // Reset the re-reservation count |
| resetReReservations(priority); |
| } else { |
| // Note down the re-reservation |
| addReReservation(priority); |
| } |
| |
| commonReserve(node, priority, rmContainer, container.getResource()); |
| |
| return rmContainer; |
| } |
| |
| /** |
| * Has the application reserved the given <code>node</code> at the |
| * given <code>priority</code>? |
| * @param node node to be checked |
| * @param priority priority of reserved container |
| * @return true is reserved, false if not |
| */ |
| public synchronized boolean isReserved(SchedulerNode node, Priority priority) { |
| Map<NodeId, RMContainer> reservedContainers = |
| this.reservedContainers.get(priority); |
| if (reservedContainers != null) { |
| return reservedContainers.containsKey(node.getNodeID()); |
| } |
| return false; |
| } |
| |
| public synchronized void setHeadroom(Resource globalLimit) { |
| this.resourceLimit = globalLimit; |
| } |
| |
| /** |
| * Get available headroom in terms of resources for the application's user. |
| * @return available resource headroom |
| */ |
| public synchronized Resource getHeadroom() { |
| // Corner case to deal with applications being slightly over-limit |
| if (resourceLimit.getMemory() < 0) { |
| resourceLimit.setMemory(0); |
| } |
| |
| return resourceLimit; |
| } |
| |
| public synchronized int getNumReservedContainers(Priority priority) { |
| Map<NodeId, RMContainer> reservedContainers = |
| this.reservedContainers.get(priority); |
| return (reservedContainers == null) ? 0 : reservedContainers.size(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public synchronized void containerLaunchedOnNode(ContainerId containerId, |
| NodeId nodeId) { |
| // Inform the container |
| RMContainer rmContainer = getRMContainer(containerId); |
| if (rmContainer == null) { |
| // Some unknown container sneaked into the system. Kill it. |
| rmContext.getDispatcher().getEventHandler() |
| .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); |
| return; |
| } |
| |
| rmContainer.handle(new RMContainerEvent(containerId, |
| RMContainerEventType.LAUNCHED)); |
| } |
| |
| public synchronized void showRequests() { |
| if (LOG.isDebugEnabled()) { |
| for (Priority priority : getPriorities()) { |
| Map<String, ResourceRequest> requests = getResourceRequests(priority); |
| if (requests != null) { |
| LOG.debug("showRequests:" + " application=" + getApplicationId() |
| + " headRoom=" + getHeadroom() + " currentConsumption=" |
| + attemptResourceUsage.getUsed().getMemory()); |
| for (ResourceRequest request : requests.values()) { |
| LOG.debug("showRequests:" + " application=" + getApplicationId() |
| + " request=" + request); |
| } |
| } |
| } |
| } |
| } |
| |
| public Resource getCurrentConsumption() { |
| return attemptResourceUsage.getUsed(); |
| } |
| |
| private Container updateContainerAndNMToken(RMContainer rmContainer, |
| boolean newContainer, boolean increasedContainer) { |
| Container container = rmContainer.getContainer(); |
| ContainerType containerType = ContainerType.TASK; |
| // The working knowledge is that masterContainer for AM is null as it |
| // itself is the master container. |
| if (isWaitingForAMContainer(getApplicationId())) { |
| containerType = ContainerType.APPLICATION_MASTER; |
| } |
| try { |
| // create container token and NMToken altogether. |
| container.setContainerToken(rmContext.getContainerTokenSecretManager() |
| .createContainerToken(container.getId(), container.getNodeId(), |
| getUser(), container.getResource(), container.getPriority(), |
| rmContainer.getCreationTime(), this.logAggregationContext, |
| rmContainer.getNodeLabelExpression(), containerType)); |
| NMToken nmToken = |
| rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), |
| getApplicationAttemptId(), container); |
| if (nmToken != null) { |
| updatedNMTokens.add(nmToken); |
| } |
| } catch (IllegalArgumentException e) { |
| // DNS might be down, skip returning this container. |
| LOG.error("Error trying to assign container token and NM token to" |
| + " an updated container " + container.getId(), e); |
| return null; |
| } |
| |
| if (newContainer) { |
| rmContainer.handle(new RMContainerEvent( |
| rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)); |
| } else { |
| rmContainer.handle(new RMContainerUpdatesAcquiredEvent( |
| rmContainer.getContainerId(), increasedContainer)); |
| } |
| return container; |
| } |
| |
| // Create container token and update NMToken altogether, if either of them fails for |
| // some reason like DNS unavailable, do not return this container and keep it |
| // in the newlyAllocatedContainers waiting to be refetched. |
| public synchronized List<Container> pullNewlyAllocatedContainers() { |
| List<Container> returnContainerList = |
| new ArrayList<Container>(newlyAllocatedContainers.size()); |
| for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i |
| .hasNext();) { |
| RMContainer rmContainer = i.next(); |
| Container updatedContainer = |
| updateContainerAndNMToken(rmContainer, true, false); |
| // Only add container to return list when it's not null. updatedContainer |
| // could be null when generate token failed, it can be caused by DNS |
| // resolving failed. |
| if (updatedContainer != null) { |
| returnContainerList.add(updatedContainer); |
| i.remove(); |
| } |
| } |
| return returnContainerList; |
| } |
| |
| private synchronized List<Container> pullNewlyUpdatedContainers( |
| Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) { |
| List<Container> returnContainerList = |
| new ArrayList<Container>(updatedContainerMap.size()); |
| for (Iterator<Entry<ContainerId, RMContainer>> i = |
| updatedContainerMap.entrySet().iterator(); i.hasNext();) { |
| RMContainer rmContainer = i.next().getValue(); |
| Container updatedContainer = |
| updateContainerAndNMToken(rmContainer, false, increase); |
| if (updatedContainer != null) { |
| returnContainerList.add(updatedContainer); |
| i.remove(); |
| } |
| } |
| return returnContainerList; |
| } |
| |
| public synchronized List<Container> pullNewlyIncreasedContainers() { |
| return pullNewlyUpdatedContainers(newlyIncreasedContainers, true); |
| } |
| |
| public synchronized List<Container> pullNewlyDecreasedContainers() { |
| return pullNewlyUpdatedContainers(newlyDecreasedContainers, false); |
| } |
| |
| public synchronized List<NMToken> pullUpdatedNMTokens() { |
| List<NMToken> returnList = new ArrayList<NMToken>(updatedNMTokens); |
| updatedNMTokens.clear(); |
| return returnList; |
| } |
| |
| public boolean isWaitingForAMContainer(ApplicationId applicationId) { |
| // The working knowledge is that masterContainer for AM is null as it |
| // itself is the master container. |
| RMAppAttempt appAttempt = |
| rmContext.getRMApps().get(applicationId).getCurrentAppAttempt(); |
| return (appAttempt != null && appAttempt.getMasterContainer() == null |
| && appAttempt.getSubmissionContext().getUnmanagedAM() == false); |
| } |
| |
| // Blacklist used for user containers |
| public synchronized void updateBlacklist( |
| List<String> blacklistAdditions, List<String> blacklistRemovals) { |
| if (!isStopped) { |
| this.appSchedulingInfo.updateBlacklist( |
| blacklistAdditions, blacklistRemovals); |
| } |
| } |
| |
| // Blacklist used for AM containers |
| public synchronized void updateAMBlacklist( |
| List<String> blacklistAdditions, List<String> blacklistRemovals) { |
| if (!isStopped) { |
| this.appSchedulingInfo.updateAMBlacklist( |
| blacklistAdditions, blacklistRemovals); |
| } |
| } |
| |
| public boolean isBlacklisted(String resourceName) { |
| boolean useAMBlacklist = isWaitingForAMContainer(getApplicationId()); |
| return this.appSchedulingInfo.isBlacklisted(resourceName, useAMBlacklist); |
| } |
| |
| public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity( |
| Priority priority) { |
| missedNonPartitionedRequestSchedulingOpportunity.add(priority); |
| return missedNonPartitionedRequestSchedulingOpportunity.count(priority); |
| } |
| |
| public synchronized void |
| resetMissedNonPartitionedRequestSchedulingOpportunity(Priority priority) { |
| missedNonPartitionedRequestSchedulingOpportunity.setCount(priority, 0); |
| } |
| |
| |
| public synchronized void addSchedulingOpportunity(Priority priority) { |
| schedulingOpportunities.setCount(priority, |
| schedulingOpportunities.count(priority) + 1); |
| } |
| |
| public synchronized void subtractSchedulingOpportunity(Priority priority) { |
| int count = schedulingOpportunities.count(priority) - 1; |
| this.schedulingOpportunities.setCount(priority, Math.max(count, 0)); |
| } |
| |
| /** |
| * Return the number of times the application has been given an opportunity |
| * to schedule a task at the given priority since the last time it |
| * successfully did so. |
| */ |
| public synchronized int getSchedulingOpportunities(Priority priority) { |
| return schedulingOpportunities.count(priority); |
| } |
| |
| /** |
| * Should be called when an application has successfully scheduled a container, |
| * or when the scheduling locality threshold is relaxed. |
| * Reset various internal counters which affect delay scheduling |
| * |
| * @param priority The priority of the container scheduled. |
| */ |
| public synchronized void resetSchedulingOpportunities(Priority priority) { |
| resetSchedulingOpportunities(priority, System.currentTimeMillis()); |
| } |
| |
| // used for continuous scheduling |
| public synchronized void resetSchedulingOpportunities(Priority priority, |
| long currentTimeMs) { |
| lastScheduledContainer.put(priority, currentTimeMs); |
| schedulingOpportunities.setCount(priority, 0); |
| } |
| |
| synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { |
| long currentTimeMillis = System.currentTimeMillis(); |
| // Don't walk the whole container list if the resources were computed |
| // recently. |
| if ((currentTimeMillis - lastMemoryAggregateAllocationUpdateTime) |
| > MEM_AGGREGATE_ALLOCATION_CACHE_MSECS) { |
| long memorySeconds = 0; |
| long vcoreSeconds = 0; |
| for (RMContainer rmContainer : this.liveContainers.values()) { |
| long usedMillis = currentTimeMillis - rmContainer.getCreationTime(); |
| Resource resource = rmContainer.getContainer().getResource(); |
| memorySeconds += resource.getMemory() * usedMillis / |
| DateUtils.MILLIS_PER_SECOND; |
| vcoreSeconds += resource.getVirtualCores() * usedMillis |
| / DateUtils.MILLIS_PER_SECOND; |
| } |
| |
| lastMemoryAggregateAllocationUpdateTime = currentTimeMillis; |
| lastMemorySeconds = memorySeconds; |
| lastVcoreSeconds = vcoreSeconds; |
| } |
| return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds); |
| } |
| |
| public synchronized ApplicationResourceUsageReport getResourceUsageReport() { |
| AggregateAppResourceUsage runningResourceUsage = |
| getRunningAggregateAppResourceUsage(); |
| Resource usedResourceClone = |
| Resources.clone(attemptResourceUsage.getAllUsed()); |
| Resource reservedResourceClone = |
| Resources.clone(attemptResourceUsage.getReserved()); |
| return ApplicationResourceUsageReport.newInstance(liveContainers.size(), |
| reservedContainers.size(), usedResourceClone, reservedResourceClone, |
| Resources.add(usedResourceClone, reservedResourceClone), |
| runningResourceUsage.getMemorySeconds(), |
| runningResourceUsage.getVcoreSeconds()); |
| } |
| |
| public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() { |
| return this.liveContainers; |
| } |
| |
| public synchronized Resource getResourceLimit() { |
| return this.resourceLimit; |
| } |
| |
| public synchronized Map<Priority, Long> getLastScheduledContainer() { |
| return this.lastScheduledContainer; |
| } |
| |
| public synchronized void transferStateFromPreviousAttempt( |
| SchedulerApplicationAttempt appAttempt) { |
| this.liveContainers = appAttempt.getLiveContainersMap(); |
| // this.reReservations = appAttempt.reReservations; |
| this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage); |
| this.resourceLimit = appAttempt.getResourceLimit(); |
| // this.currentReservation = appAttempt.currentReservation; |
| // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers; |
| // this.schedulingOpportunities = appAttempt.schedulingOpportunities; |
| this.lastScheduledContainer = appAttempt.getLastScheduledContainer(); |
| this.appSchedulingInfo |
| .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo); |
| } |
| |
| public synchronized void move(Queue newQueue) { |
| QueueMetrics oldMetrics = queue.getMetrics(); |
| QueueMetrics newMetrics = newQueue.getMetrics(); |
| String user = getUser(); |
| for (RMContainer liveContainer : liveContainers.values()) { |
| Resource resource = liveContainer.getContainer().getResource(); |
| oldMetrics.releaseResources(user, 1, resource); |
| newMetrics.allocateResources(user, 1, resource, false); |
| } |
| for (Map<NodeId, RMContainer> map : reservedContainers.values()) { |
| for (RMContainer reservedContainer : map.values()) { |
| Resource resource = reservedContainer.getReservedResource(); |
| oldMetrics.unreserveResource(user, resource); |
| newMetrics.reserveResource(user, resource); |
| } |
| } |
| |
| appSchedulingInfo.move(newQueue); |
| this.queue = newQueue; |
| } |
| |
| public synchronized void recoverContainer(SchedulerNode node, |
| RMContainer rmContainer) { |
| // recover app scheduling info |
| appSchedulingInfo.recoverContainer(rmContainer); |
| |
| if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { |
| return; |
| } |
| LOG.info("SchedulerAttempt " + getApplicationAttemptId() |
| + " is recovering container " + rmContainer.getContainerId()); |
| liveContainers.put(rmContainer.getContainerId(), rmContainer); |
| attemptResourceUsage.incUsed(node.getPartition(), rmContainer |
| .getContainer().getResource()); |
| |
| // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource |
| // is called. |
| // newlyAllocatedContainers.add(rmContainer); |
| // schedulingOpportunities |
| // lastScheduledContainer |
| } |
| |
| public void incNumAllocatedContainers(NodeType containerType, |
| NodeType requestType) { |
| RMAppAttempt attempt = |
| rmContext.getRMApps().get(attemptId.getApplicationId()) |
| .getCurrentAppAttempt(); |
| if (attempt != null) { |
| attempt.getRMAppAttemptMetrics().incNumAllocatedContainers(containerType, |
| requestType); |
| } |
| } |
| |
| public void setApplicationHeadroomForMetrics(Resource headroom) { |
| RMAppAttempt attempt = |
| rmContext.getRMApps().get(attemptId.getApplicationId()) |
| .getCurrentAppAttempt(); |
| if (attempt != null) { |
| attempt.getRMAppAttemptMetrics().setApplicationAttemptHeadRoom( |
| Resources.clone(headroom)); |
| } |
| } |
| |
| public void recordContainerRequestTime(long value) { |
| firstAllocationRequestSentTime.compareAndSet(0, value); |
| } |
| |
| public void recordContainerAllocationTime(long value) { |
| if (firstContainerAllocatedTime.compareAndSet(0, value)) { |
| long timediff = firstContainerAllocatedTime.longValue() - |
| firstAllocationRequestSentTime.longValue(); |
| if (timediff > 0) { |
| queue.getMetrics().addAppAttemptFirstContainerAllocationDelay(timediff); |
| } |
| } |
| } |
| |
| public Set<String> getBlacklistedNodes() { |
| return this.appSchedulingInfo.getBlackListCopy(); |
| } |
| |
| @Private |
| public boolean hasPendingResourceRequest(ResourceCalculator rc, |
| String nodePartition, Resource cluster, |
| SchedulingMode schedulingMode) { |
| return SchedulerUtils.hasPendingResourceRequest(rc, |
| this.attemptResourceUsage, nodePartition, cluster, |
| schedulingMode); |
| } |
| |
| @VisibleForTesting |
| public ResourceUsage getAppAttemptResourceUsage() { |
| return this.attemptResourceUsage; |
| } |
| |
| @Override |
| public Priority getPriority() { |
| return appPriority; |
| } |
| |
| public void setPriority(Priority appPriority) { |
| this.appPriority = appPriority; |
| } |
| |
| @Override |
| public String getId() { |
| return getApplicationId().toString(); |
| } |
| |
| @Override |
| public int compareInputOrderTo(SchedulableEntity other) { |
| if (other instanceof SchedulerApplicationAttempt) { |
| return getApplicationId().compareTo( |
| ((SchedulerApplicationAttempt)other).getApplicationId()); |
| } |
| return 1;//let other types go before this, if any |
| } |
| |
| @Override |
| public synchronized ResourceUsage getSchedulingResourceUsage() { |
| return attemptResourceUsage; |
| } |
| |
| public synchronized boolean removeIncreaseRequest(NodeId nodeId, |
| Priority priority, ContainerId containerId) { |
| return appSchedulingInfo.removeIncreaseRequest(nodeId, priority, |
| containerId); |
| } |
| |
| public synchronized boolean updateIncreaseRequests( |
| List<SchedContainerChangeRequest> increaseRequests) { |
| return appSchedulingInfo.updateIncreaseRequests(increaseRequests); |
| } |
| |
| private synchronized void changeContainerResource( |
| SchedContainerChangeRequest changeRequest, boolean increase) { |
| if (increase) { |
| appSchedulingInfo.increaseContainer(changeRequest); |
| } else { |
| appSchedulingInfo.decreaseContainer(changeRequest); |
| } |
| |
| RMContainer changedRMContainer = changeRequest.getRMContainer(); |
| changedRMContainer.handle( |
| new RMContainerChangeResourceEvent(changeRequest.getContainerId(), |
| changeRequest.getTargetCapacity(), increase)); |
| |
| // remove pending and not pulled by AM newly-increased/decreased-containers |
| // and add the new one |
| if (increase) { |
| newlyDecreasedContainers.remove(changeRequest.getContainerId()); |
| newlyIncreasedContainers.put(changeRequest.getContainerId(), |
| changedRMContainer); |
| } else { |
| newlyIncreasedContainers.remove(changeRequest.getContainerId()); |
| newlyDecreasedContainers.put(changeRequest.getContainerId(), |
| changedRMContainer); |
| } |
| } |
| |
| public synchronized void decreaseContainer( |
| SchedContainerChangeRequest decreaseRequest) { |
| changeContainerResource(decreaseRequest, false); |
| } |
| |
| public synchronized void increaseContainer( |
| SchedContainerChangeRequest increaseRequest) { |
| changeContainerResource(increaseRequest, true); |
| } |
| } |