| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
import java.util.EnumSet;
import java.util.List;
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import com.google.gson.Gson; |
| import com.google.gson.reflect.TypeToken; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerUpdateType; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceOption; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.SchedulingRequest; |
| import org.apache.hadoop.yarn.api.records.UpdateContainerError; |
| import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; |
| import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer |
| .RMContainerNMDoneChangeResourceEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; |
| import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; |
| import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.server.utils.Lock; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceUtils; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.SettableFuture; |
| |
| |
| @SuppressWarnings("unchecked") |
| @Private |
| @Unstable |
| public abstract class AbstractYarnScheduler |
| <T extends SchedulerApplicationAttempt, N extends SchedulerNode> |
| extends AbstractService implements ResourceScheduler { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(AbstractYarnScheduler.class); |
| |
| private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); |
| |
| protected final ClusterNodeTracker<N> nodeTracker = |
| new ClusterNodeTracker<>(); |
| |
| protected Resource minimumAllocation; |
| |
| protected volatile RMContext rmContext; |
| |
| private volatile Priority maxClusterLevelAppPriority; |
| |
| protected ActivitiesManager activitiesManager; |
| protected SchedulerHealth schedulerHealth = new SchedulerHealth(); |
| protected volatile long lastNodeUpdateTime; |
| |
  // timeout for joining the update thread when this service stops
| protected final long THREAD_JOIN_TIMEOUT_MS = 1000; |
| |
| private volatile Clock clock; |
| |
| /** |
| * To enable the update thread, subclasses should set updateInterval to a |
| * positive value during {@link #serviceInit(Configuration)}. |
| */ |
| protected long updateInterval = -1L; |
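
  /*
   * A minimal sketch of how a subclass might enable the update thread
   * (hypothetical subclass; the interval value is illustrative only):
   *
   *   @Override
   *   public void serviceInit(Configuration conf) throws Exception {
   *     updateInterval = 500L;   // causes serviceInit below to create the
   *     super.serviceInit(conf); // thread; serviceStart() then starts it
   *   }
   */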
| @VisibleForTesting |
| Thread updateThread; |
| private final Object updateThreadMonitor = new Object(); |
| private Timer releaseCache; |
| |
| /* |
| * All schedulers which are inheriting AbstractYarnScheduler should use |
| * concurrent version of 'applications' map. |
| */ |
| protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications; |
| protected int nmExpireInterval; |
| protected long nmHeartbeatInterval; |
| private long skipNodeInterval; |
| |
| private final static List<Container> EMPTY_CONTAINER_LIST = |
| new ArrayList<Container>(); |
| protected static final Allocation EMPTY_ALLOCATION = new Allocation( |
| EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); |
| |
| protected final ReentrantReadWriteLock.ReadLock readLock; |
| |
| /* |
| * Use writeLock for any of operations below: |
| * - queue change (hierarchy / configuration / container allocation) |
| * - application(add/remove/allocate-container, but not include container |
| * finish) |
| * - node (add/remove/change-resource/container-allocation, but not include |
| * container finish) |
| */ |
| protected final ReentrantReadWriteLock.WriteLock writeLock; |
| |
| // If set to true, then ALL container updates will be automatically sent to |
| // the NM in the next heartbeat. |
| private boolean autoUpdateContainers = false; |
| |
| protected SchedulingMonitorManager schedulingMonitorManager = |
| new SchedulingMonitorManager(); |
| |
| private boolean migration; |
| |
| /** |
| * Construct the service. |
| * |
| * @param name service name |
| */ |
| public AbstractYarnScheduler(String name) { |
| super(name); |
| clock = SystemClock.getInstance(); |
| ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| readLock = lock.readLock(); |
| writeLock = lock.writeLock(); |
| } |
| |
| @Override |
| public void serviceInit(Configuration conf) throws Exception { |
| migration = |
| conf.getBoolean(FairSchedulerConfiguration.MIGRATION_MODE, false); |
| |
| nmExpireInterval = |
| conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); |
| nmHeartbeatInterval = |
| conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); |
| skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf); |
| long configuredMaximumAllocationWaitTime = |
| conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, |
| YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); |
| nodeTracker.setConfiguredMaxAllocationWaitTime( |
| configuredMaximumAllocationWaitTime); |
| maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); |
| if (!migration) { |
| this.releaseCache = new Timer("Pending Container Clear Timer"); |
| } |
| |
| autoUpdateContainers = |
| conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, |
| YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS); |
| |
| if (updateInterval > 0) { |
| updateThread = new UpdateThread(); |
| updateThread.setName("SchedulerUpdateThread"); |
| updateThread.setUncaughtExceptionHandler( |
| new RMCriticalThreadUncaughtExceptionHandler(rmContext)); |
| updateThread.setDaemon(true); |
| } |
    super.serviceInit(conf);
  }
| |
| @Override |
| protected void serviceStart() throws Exception { |
| if (!migration) { |
| if (updateThread != null) { |
| updateThread.start(); |
| } |
| schedulingMonitorManager.startAll(); |
| createReleaseCache(); |
| } |
| |
| super.serviceStart(); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| if (updateThread != null) { |
| updateThread.interrupt(); |
| updateThread.join(THREAD_JOIN_TIMEOUT_MS); |
| } |
| |
| //Stop Timer |
| if (releaseCache != null) { |
| releaseCache.cancel(); |
| releaseCache = null; |
| } |
| schedulingMonitorManager.stop(); |
| super.serviceStop(); |
| } |
| |
| @VisibleForTesting |
| public ClusterNodeTracker<N> getNodeTracker() { |
| return nodeTracker; |
| } |
| |
| @VisibleForTesting |
| public SchedulingMonitorManager getSchedulingMonitorManager() { |
| return schedulingMonitorManager; |
| } |
| |
| /* |
| * YARN-3136 removed synchronized lock for this method for performance |
| * purposes |
| */ |
| public List<Container> getTransferredContainers( |
| ApplicationAttemptId currentAttempt) { |
| ApplicationId appId = currentAttempt.getApplicationId(); |
| SchedulerApplication<T> app = applications.get(appId); |
| List<Container> containerList = new ArrayList<Container>(); |
| if (app == null) { |
| return containerList; |
| } |
| Collection<RMContainer> liveContainers = app.getCurrentAppAttempt() |
| .pullContainersToTransfer(); |
    RMAppAttempt currentAttempt =
        rmContext.getRMApps().get(appId).getCurrentAppAttempt();
    ContainerId amContainerId = null;
    // For UAM (unmanaged AM), the AM container would be null
    if (currentAttempt.getMasterContainer() != null) {
      amContainerId = currentAttempt.getMasterContainer().getId();
    }
| for (RMContainer rmContainer : liveContainers) { |
| if (!rmContainer.getContainerId().equals(amContainerId)) { |
| containerList.add(rmContainer.getContainer()); |
| } |
| } |
| return containerList; |
| } |
| |
| public Map<ApplicationId, SchedulerApplication<T>> |
| getSchedulerApplications() { |
| return applications; |
| } |
| |
| /** |
| * Add blacklisted NodeIds to the list that is passed. |
| * |
| * @param app application attempt. |
| * @return blacklisted NodeIds. |
| */ |
| public List<N> getBlacklistedNodes(final SchedulerApplicationAttempt app) { |
| |
    return nodeTracker.getNodes(
        node -> SchedulerAppUtils.isPlaceBlacklisted(app, node, LOG));
| } |
| |
| public List<N> getNodes(final NodeFilter filter) { |
| return nodeTracker.getNodes(filter); |
| } |
| |
| public boolean shouldContainersBeAutoUpdated() { |
| return this.autoUpdateContainers; |
| } |
| |
| @Override |
| public Resource getClusterResource() { |
| return nodeTracker.getClusterCapacity(); |
| } |
| |
| @Override |
| public Resource getMinimumResourceCapability() { |
| return minimumAllocation; |
| } |
| |
| @Override |
| public Resource getMaximumResourceCapability() { |
| return nodeTracker.getMaxAllowedAllocation(); |
| } |
| |
| @Override |
| public Resource getMaximumResourceCapability(String queueName) { |
| return getMaximumResourceCapability(); |
| } |
| |
| protected void initMaximumResourceCapability(Resource maximumAllocation) { |
| nodeTracker.setConfiguredMaxAllocation(maximumAllocation); |
| } |
| |
| public SchedulerHealth getSchedulerHealth() { |
| return this.schedulerHealth; |
| } |
| |
| protected void setLastNodeUpdateTime(long time) { |
| this.lastNodeUpdateTime = time; |
| } |
| |
| public long getLastNodeUpdateTime() { |
| return lastNodeUpdateTime; |
| } |
| |
  public long getSkipNodeInterval() {
| return skipNodeInterval; |
| } |
| |
| protected void containerLaunchedOnNode( |
| ContainerId containerId, SchedulerNode node) { |
| readLock.lock(); |
| try { |
      // Get the application for the launched container
| SchedulerApplicationAttempt application = |
| getCurrentAttemptForContainer(containerId); |
| if (application == null) { |
| LOG.info("Unknown application " + containerId.getApplicationAttemptId() |
| .getApplicationId() + " launched container " + containerId |
| + " on node: " + node); |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); |
| return; |
| } |
| |
| application.containerLaunchedOnNode(containerId, node.getNodeID()); |
| node.containerStarted(containerId); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| protected void containerIncreasedOnNode(ContainerId containerId, |
| SchedulerNode node, Container increasedContainerReportedByNM) { |
| /* |
| * No lock is required, as this method is protected by scheduler's writeLock |
| */ |
| // Get the application for the finished container |
| SchedulerApplicationAttempt application = getCurrentAttemptForContainer( |
| containerId); |
| if (application == null) { |
| LOG.info("Unknown application " + containerId.getApplicationAttemptId() |
| .getApplicationId() + " increased container " + containerId |
| + " on node: " + node); |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); |
| return; |
| } |
| |
| RMContainer rmContainer = getRMContainer(containerId); |
| if (rmContainer == null) { |
| // Some unknown container sneaked into the system. Kill it. |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); |
| return; |
| } |
| rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId, |
        increasedContainerReportedByNM.getResource()));
  }
| |
| // TODO: Rename it to getCurrentApplicationAttempt |
| public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { |
| SchedulerApplication<T> app = applications.get( |
| applicationAttemptId.getApplicationId()); |
| return app == null ? null : app.getCurrentAppAttempt(); |
| } |
| |
| @Override |
| public SchedulerAppReport getSchedulerAppInfo( |
| ApplicationAttemptId appAttemptId) { |
| SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); |
| if (attempt == null) { |
| LOG.debug("Request for appInfo of unknown attempt {}", appAttemptId); |
| return null; |
| } |
| return new SchedulerAppReport(attempt); |
| } |
| |
| @Override |
| public ApplicationResourceUsageReport getAppResourceUsageReport( |
| ApplicationAttemptId appAttemptId) { |
| SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); |
| if (attempt == null) { |
| LOG.debug("Request for appInfo of unknown attempt {}", appAttemptId); |
| return null; |
| } |
| return attempt.getResourceUsageReport(); |
| } |
| |
| public T getCurrentAttemptForContainer(ContainerId containerId) { |
| return getApplicationAttempt(containerId.getApplicationAttemptId()); |
| } |
| |
| @Override |
| public RMContainer getRMContainer(ContainerId containerId) { |
| SchedulerApplicationAttempt attempt = |
| getCurrentAttemptForContainer(containerId); |
| return (attempt == null) ? null : attempt.getRMContainer(containerId); |
| } |
| |
| @Override |
| public SchedulerNodeReport getNodeReport(NodeId nodeId) { |
| return nodeTracker.getNodeReport(nodeId); |
| } |
| |
| @Override |
| public String moveApplication(ApplicationId appId, String newQueue) |
| throws YarnException { |
| throw new YarnException(getClass().getSimpleName() |
| + " does not support moving apps between queues"); |
| } |
| |
| @Override |
| public void preValidateMoveApplication(ApplicationId appId, |
| String newQueue) throws YarnException { |
| throw new YarnException(getClass().getSimpleName() |
| + " does not support pre-validation of moving apps between queues"); |
| } |
| |
| public void removeQueue(String queueName) throws YarnException { |
| throw new YarnException(getClass().getSimpleName() |
| + " does not support removing queues"); |
| } |
| |
| @Override |
| public void addQueue(Queue newQueue) throws YarnException, IOException { |
| throw new YarnException(getClass().getSimpleName() |
| + " does not support this operation"); |
| } |
| |
| @Override |
| public void setEntitlement(String queue, QueueEntitlement entitlement) |
| throws YarnException { |
| throw new YarnException(getClass().getSimpleName() |
| + " does not support this operation"); |
| } |
| |
| private void killOrphanContainerOnNode(RMNode node, |
| NMContainerStatus container) { |
| if (!container.getContainerState().equals(ContainerState.COMPLETE)) { |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new RMNodeCleanContainerEvent(node.getNodeID(), |
| container.getContainerId())); |
| } |
| } |
| |
| public void recoverContainersOnNode(List<NMContainerStatus> containerReports, |
| RMNode nm) { |
| writeLock.lock(); |
| try { |
      if (!rmContext.isWorkPreservingRecoveryEnabled()
          || containerReports == null || containerReports.isEmpty()) {
        return;
      }
| |
| for (NMContainerStatus container : containerReports) { |
| ApplicationId appId = |
| container.getContainerId().getApplicationAttemptId() |
| .getApplicationId(); |
| RMApp rmApp = rmContext.getRMApps().get(appId); |
| if (rmApp == null) { |
| LOG.error("Skip recovering container " + container |
| + " for unknown application."); |
| killOrphanContainerOnNode(nm, container); |
| continue; |
| } |
| |
| SchedulerApplication<T> schedulerApp = applications.get(appId); |
| if (schedulerApp == null) { |
| LOG.info("Skip recovering container " + container |
| + " for unknown SchedulerApplication. " |
| + "Application current state is " + rmApp.getState()); |
| killOrphanContainerOnNode(nm, container); |
| continue; |
| } |
| |
| LOG.info("Recovering container " + container); |
| SchedulerApplicationAttempt schedulerAttempt = |
| schedulerApp.getCurrentAppAttempt(); |
| |
| if (!rmApp.getApplicationSubmissionContext() |
| .getKeepContainersAcrossApplicationAttempts()) { |
| // Do not recover containers for stopped attempt or previous attempt. |
| if (schedulerAttempt.isStopped() || !schedulerAttempt |
| .getApplicationAttemptId().equals( |
| container.getContainerId().getApplicationAttemptId())) { |
| LOG.info("Skip recovering container " + container |
| + " for already stopped attempt."); |
| killOrphanContainerOnNode(nm, container); |
| continue; |
| } |
| } |
| |
| Queue queue = schedulerApp.getQueue(); |
| //To make sure we don't face ambiguity, CS queues should be referenced |
| //by their full queue names |
| String queueName = queue instanceof CSQueue ? |
| ((CSQueue)queue).getQueuePath() : queue.getQueueName(); |
| |
| // create container |
| RMContainer rmContainer = recoverAndCreateContainer(container, nm, |
| queueName); |
| |
| // recover RMContainer |
| rmContainer.handle( |
| new RMContainerRecoverEvent(container.getContainerId(), container)); |
| |
| // recover scheduler node |
| SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID()); |
| schedulerNode.recoverContainer(rmContainer); |
| |
| // recover queue: update headroom etc. |
| Queue queueToRecover = schedulerAttempt.getQueue(); |
| queueToRecover.recoverContainer(getClusterResource(), schedulerAttempt, |
| rmContainer); |
| |
| // recover scheduler attempt |
| final boolean recovered = schedulerAttempt.recoverContainer( |
| schedulerNode, rmContainer); |
| |
| if (recovered && rmContainer.getExecutionType() == |
| ExecutionType.OPPORTUNISTIC) { |
| OpportunisticSchedulerMetrics.getMetrics() |
| .incrAllocatedOppContainers(1); |
| } |
| // set master container for the current running AMContainer for this |
| // attempt. |
| RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt(); |
| if (appAttempt != null) { |
| Container masterContainer = appAttempt.getMasterContainer(); |
| |
| // Mark current running AMContainer's RMContainer based on the master |
| // container ID stored in AppAttempt. |
| if (masterContainer != null && masterContainer.getId().equals( |
| rmContainer.getContainerId())) { |
| ((RMContainerImpl) rmContainer).setAMContainer(true); |
| } |
| } |
| |
| if (schedulerAttempt.getPendingRelease().remove( |
| container.getContainerId())) { |
| // release the container |
| rmContainer.handle( |
| new RMContainerFinishedEvent(container.getContainerId(), |
| SchedulerUtils |
| .createAbnormalContainerStatus(container.getContainerId(), |
| SchedulerUtils.RELEASED_CONTAINER), |
| RMContainerEventType.RELEASED)); |
| LOG.info(container.getContainerId() + " is released by application."); |
| } |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| private RMContainer recoverAndCreateContainer(NMContainerStatus status, |
| RMNode node, String queueName) { |
| Container container = |
| Container.newInstance(status.getContainerId(), node.getNodeID(), |
| node.getHttpAddress(), status.getAllocatedResource(), |
| status.getPriority(), null); |
| container.setVersion(status.getVersion()); |
| container.setExecutionType(status.getExecutionType()); |
| container.setAllocationRequestId(status.getAllocationRequestId()); |
| container.setAllocationTags(status.getAllocationTags()); |
| ApplicationAttemptId attemptId = |
| container.getId().getApplicationAttemptId(); |
| RMContainer rmContainer = new RMContainerImpl(container, |
| SchedulerRequestKey.extractFrom(container), attemptId, node.getNodeID(), |
| applications.get(attemptId.getApplicationId()).getUser(), rmContext, |
| status.getCreationTime(), status.getNodeLabelExpression()); |
| ((RMContainerImpl) rmContainer).setQueueName(queueName); |
| return rmContainer; |
| } |
| |
| /** |
| * Recover resource request back from RMContainer when a container is |
| * preempted before AM pulled the same. If container is pulled by |
| * AM, then RMContainer will not have resource request to recover. |
| * @param rmContainer rmContainer |
| */ |
| private void recoverResourceRequestForContainer(RMContainer rmContainer) { |
| ContainerRequest containerRequest = rmContainer.getContainerRequest(); |
| |
    // If the container state has moved to ACQUIRED, the request will be null.
| if (containerRequest == null) { |
| return; |
| } |
| |
| // Add resource request back to Scheduler ApplicationAttempt. |
| |
| // We lookup the application-attempt here again using |
| // getCurrentApplicationAttempt() because there is only one app-attempt at |
| // any point in the scheduler. But in corner cases, AMs can crash, |
| // corresponding containers get killed and recovered to the same-attempt, |
| // but because the app-attempt is extinguished right after, the recovered |
| // requests don't serve any purpose, but that's okay. |
| SchedulerApplicationAttempt schedulerAttempt = |
| getCurrentAttemptForContainer(rmContainer.getContainerId()); |
| if (schedulerAttempt != null) { |
| schedulerAttempt.recoverResourceRequestsForContainer(containerRequest); |
| } |
| } |
| |
| protected void createReleaseCache() { |
    // Clean up the cache once, after the NM expiry interval.
| releaseCache.schedule(new TimerTask() { |
| @Override |
| public void run() { |
| clearPendingContainerCache(); |
| LOG.info("Release request cache is cleaned up"); |
| } |
| }, nmExpireInterval); |
| } |
| |
| @VisibleForTesting |
| public void clearPendingContainerCache() { |
| for (SchedulerApplication<T> app : applications.values()) { |
| T attempt = app.getCurrentAppAttempt(); |
| if (attempt != null) { |
| for (ContainerId containerId : attempt.getPendingRelease()) { |
| RMAuditLogger.logFailure(app.getUser(), |
| AuditConstants.RELEASE_CONTAINER, |
| "Unauthorized access or invalid container", "Scheduler", |
| "Trying to release container not owned by app " |
| + "or with invalid id.", attempt.getApplicationId(), |
| containerId, null); |
| } |
| attempt.getPendingRelease().clear(); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| @Private |
| // clean up a completed container |
| public void completedContainer(RMContainer rmContainer, |
| ContainerStatus containerStatus, RMContainerEventType event) { |
| |
| if (rmContainer == null) { |
| LOG.info("Container " + containerStatus.getContainerId() |
| + " completed with event " + event |
| + ", but corresponding RMContainer doesn't exist."); |
| return; |
| } |
| |
| if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { |
| completedContainerInternal(rmContainer, containerStatus, event); |
      completeOutstandingUpdatesWhichAreReserved(
          rmContainer, containerStatus, event);
| } else { |
| ContainerId containerId = rmContainer.getContainerId(); |
| // Inform the container |
| rmContainer.handle( |
| new RMContainerFinishedEvent(containerId, containerStatus, event)); |
| SchedulerApplicationAttempt schedulerAttempt = |
| getCurrentAttemptForContainer(containerId); |
| if (schedulerAttempt != null) { |
| if (schedulerAttempt.removeRMContainer(containerId)) { |
| OpportunisticSchedulerMetrics.getMetrics() |
| .incrReleasedOppContainers(1); |
| } |
| } |
| LOG.debug("Completed container: {} in state: {} event:{}", |
| rmContainer.getContainerId(), rmContainer.getState(), event); |
| |
| SchedulerNode node = getSchedulerNode(rmContainer.getNodeId()); |
| if (node != null) { |
| node.releaseContainer(rmContainer.getContainerId(), false); |
| } |
| } |
| |
    // If the container is getting killed in ACQUIRED state, the requester (AM
    // for regular containers and the RM itself for the AM container) will not
    // know what happened. Simply add the ResourceRequest back again so that
    // the requester doesn't need to do anything conditionally.
| recoverResourceRequestForContainer(rmContainer); |
| } |
| |
| // Optimization: |
| // Check if there are in-flight container updates and complete the |
| // associated temp containers. These are removed when the app completes, |
| // but removing them when the actual container completes would allow the |
| // scheduler to reallocate those resources sooner. |
  private void completeOutstandingUpdatesWhichAreReserved(
| RMContainer rmContainer, ContainerStatus containerStatus, |
| RMContainerEventType event) { |
| N schedulerNode = getSchedulerNode(rmContainer.getNodeId()); |
| if (schedulerNode != null && |
| schedulerNode.getReservedContainer() != null) { |
| RMContainer resContainer = schedulerNode.getReservedContainer(); |
| if (resContainer.getReservedSchedulerKey() != null) { |
| ContainerId containerToUpdate = resContainer |
| .getReservedSchedulerKey().getContainerToUpdate(); |
| if (containerToUpdate != null && |
| containerToUpdate.equals(containerStatus.getContainerId())) { |
| completedContainerInternal(resContainer, |
| ContainerStatus.newInstance(resContainer.getContainerId(), |
| containerStatus.getState(), containerStatus |
| .getDiagnostics(), |
| containerStatus.getExitStatus()), event); |
| } |
| } |
| } |
| } |
| |
| // clean up a completed container |
| protected abstract void completedContainerInternal(RMContainer rmContainer, |
| ContainerStatus containerStatus, RMContainerEventType event); |
| |
| protected void releaseContainers(List<ContainerId> containers, |
| SchedulerApplicationAttempt attempt) { |
| for (ContainerId containerId : containers) { |
| RMContainer rmContainer = getRMContainer(containerId); |
| if (rmContainer == null) { |
| if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() |
| < nmExpireInterval) { |
          LOG.info(containerId + " doesn't exist. Add the container"
              + " to the release request cache as it may be on recovery.");
| attempt.getPendingRelease().add(containerId); |
| } else { |
| RMAuditLogger.logFailure(attempt.getUser(), |
| AuditConstants.RELEASE_CONTAINER, |
| "Unauthorized access or invalid container", "Scheduler", |
| "Trying to release container not owned by app or with invalid id.", |
| attempt.getApplicationId(), containerId, null); |
| } |
| } |
| completedContainer(rmContainer, |
| SchedulerUtils.createAbnormalContainerStatus(containerId, |
| SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); |
| } |
| } |
| |
| @Override |
| public N getSchedulerNode(NodeId nodeId) { |
| return nodeTracker.getNode(nodeId); |
| } |
| |
| @Override |
| public void moveAllApps(String sourceQueue, String destQueue) |
| throws YarnException { |
| writeLock.lock(); |
| try { |
| // check if destination queue is a valid leaf queue |
| try { |
| getQueueInfo(destQueue, false, false); |
| } catch (IOException e) { |
| LOG.warn(e.toString()); |
| throw new YarnException(e); |
| } |
| |
| // generate move events for each pending/running app |
| for (ApplicationAttemptId appAttemptId : getAppsFromQueue(sourceQueue)) { |
| this.rmContext.getDispatcher().getEventHandler() |
| .handle(new RMAppManagerEvent(appAttemptId.getApplicationId(), |
| destQueue, RMAppManagerEventType.APP_MOVE)); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void killAllAppsInQueue(String queueName) |
| throws YarnException { |
| writeLock.lock(); |
| try { |
| // generate kill events for each pending/running app |
| for (ApplicationAttemptId app : getAppsFromQueue(queueName)) { |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL, |
| "Application killed due to expiry of reservation queue " |
| + queueName + ".")); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Process resource update on a node. |
| * |
| * @param nm RMNode. |
| * @param resourceOption resourceOption. |
| */ |
| public void updateNodeResource(RMNode nm, |
| ResourceOption resourceOption) { |
| writeLock.lock(); |
| try { |
| SchedulerNode node = getSchedulerNode(nm.getNodeID()); |
| if (node == null) { |
| LOG.info("Node: " + nm.getNodeID() + " has already been taken out of " + |
| "scheduling. Skip updating its resource"); |
| return; |
| } |
| Resource newResource = resourceOption.getResource(); |
| final int timeout = resourceOption.getOverCommitTimeout(); |
| Resource oldResource = node.getTotalResource(); |
| if (!oldResource.equals(newResource)) { |
| // Notify NodeLabelsManager about this change |
| rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(), |
| newResource); |
| |
| // Log resource change |
| LOG.info("Update resource on node: {} from: {}, to: {} in {} ms", |
| node.getNodeName(), oldResource, newResource, timeout); |
| |
| nodeTracker.removeNode(nm.getNodeID()); |
| |
| // update resource to node |
| node.updateTotalResource(newResource); |
| node.setOvercommitTimeOut(timeout); |
| signalContainersIfOvercommitted(node, timeout == 0); |
| |
| nodeTracker.addNode((N) node); |
      } else {
        // Resource is unchanged on the node
        LOG.warn("Update resource on node: " + node.getNodeName()
            + " with the same resource: " + newResource);
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() { |
| return EnumSet.of(SchedulerResourceTypes.MEMORY); |
| } |
| |
| @Override |
| public Set<String> getPlanQueues() throws YarnException { |
| throw new YarnException(getClass().getSimpleName() |
| + " does not support reservations"); |
| } |
| |
| /** |
| * By default placement constraint is disabled. Schedulers which support |
| * placement constraint can override this value. |
| * @return enabled or not |
| */ |
| public boolean placementConstraintEnabled() { |
| return false; |
| } |
| |
| protected void refreshMaximumAllocation(Resource newMaxAlloc) { |
| nodeTracker.setConfiguredMaxAllocation(newMaxAlloc); |
| } |
| |
| @Override |
| public List<ResourceRequest> getPendingResourceRequestsForAttempt( |
| ApplicationAttemptId attemptId) { |
| SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId); |
| if (attempt != null) { |
| return attempt.getAppSchedulingInfo().getAllResourceRequests(); |
| } |
| return null; |
| } |
| |
| @Override |
| public List<SchedulingRequest> getPendingSchedulingRequestsForAttempt( |
| ApplicationAttemptId attemptId) { |
| SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId); |
| if (attempt != null) { |
| return attempt.getAppSchedulingInfo().getAllSchedulingRequests(); |
| } |
| return null; |
| } |
| |
| @Override |
| public Priority checkAndGetApplicationPriority( |
| Priority priorityRequestedByApp, UserGroupInformation user, |
| String queuePath, ApplicationId applicationId) throws YarnException { |
| // Dummy Implementation till Application Priority changes are done in |
| // specific scheduler. |
| return Priority.newInstance(0); |
| } |
| |
| @Override |
| public Priority updateApplicationPriority(Priority newPriority, |
| ApplicationId applicationId, SettableFuture<Object> future, |
| UserGroupInformation user) |
| throws YarnException { |
| // Dummy Implementation till Application Priority changes are done in |
| // specific scheduler. |
| return Priority.newInstance(0); |
| } |
| |
| @Override |
| public Priority getMaxClusterLevelAppPriority() { |
| return maxClusterLevelAppPriority; |
| } |
| |
| private Priority getMaxPriorityFromConf(Configuration conf) { |
| return Priority.newInstance(conf.getInt( |
| YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, |
| YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY)); |
| } |
| |
| @Override |
| public void setClusterMaxPriority(Configuration conf) |
| throws YarnException { |
| try { |
| maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); |
| } catch (NumberFormatException e) { |
| throw new YarnException(e); |
| } |
| LOG.info("Updated the cluste max priority to maxClusterLevelAppPriority = " |
| + maxClusterLevelAppPriority); |
| } |
| |
| /** |
| * Sanity check increase/decrease request, and return |
| * SchedulerContainerResourceChangeRequest according to given |
| * UpdateContainerRequest. |
| * |
| * <pre> |
| * - Returns non-null value means validation succeeded |
| * - Throw exception when any other error happens |
| * </pre> |
| */ |
| private SchedContainerChangeRequest createSchedContainerChangeRequest( |
| UpdateContainerRequest request, boolean increase) |
| throws YarnException { |
| ContainerId containerId = request.getContainerId(); |
| RMContainer rmContainer = getRMContainer(containerId); |
| if (null == rmContainer) { |
| String msg = |
| "Failed to get rmContainer for " |
| + (increase ? "increase" : "decrease") |
| + " request, with container-id=" + containerId; |
| throw new InvalidResourceRequestException(msg); |
| } |
| SchedulerNode schedulerNode = |
| getSchedulerNode(rmContainer.getAllocatedNode()); |
| return new SchedContainerChangeRequest( |
| this.rmContext, schedulerNode, rmContainer, request.getCapability()); |
| } |
| |
| protected List<SchedContainerChangeRequest> |
| createSchedContainerChangeRequests( |
| List<UpdateContainerRequest> changeRequests, |
| boolean increase) { |
| List<SchedContainerChangeRequest> schedulerChangeRequests = |
| new ArrayList<SchedContainerChangeRequest>(); |
| for (UpdateContainerRequest r : changeRequests) { |
| SchedContainerChangeRequest sr = null; |
| try { |
| sr = createSchedContainerChangeRequest(r, increase); |
| } catch (YarnException e) { |
| LOG.warn("Error happens when checking increase request, Ignoring.." |
| + " exception=", e); |
| continue; |
| } |
| schedulerChangeRequests.add(sr); |
| } |
| return schedulerChangeRequests; |
| } |
| |
| public ActivitiesManager getActivitiesManager() { |
| return this.activitiesManager; |
| } |
| |
| public Clock getClock() { |
| return clock; |
| } |
| |
| @VisibleForTesting |
| public void setClock(Clock clock) { |
| this.clock = clock; |
| } |
| |
| @Lock(Lock.NoLock.class) |
| public SchedulerNode getNode(NodeId nodeId) { |
| return nodeTracker.getNode(nodeId); |
| } |
| |
| /** |
| * Get lists of new containers from NodeManager and process them. |
| * @param nm The RMNode corresponding to the NodeManager |
| * @param schedulerNode schedulerNode |
| * @return list of completed containers |
| */ |
| private List<ContainerStatus> updateNewContainerInfo(RMNode nm, |
| SchedulerNode schedulerNode) { |
| List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); |
| List<ContainerStatus> newlyLaunchedContainers = |
| new ArrayList<>(); |
| List<ContainerStatus> completedContainers = |
| new ArrayList<>(); |
| List<Map.Entry<ApplicationId, ContainerStatus>> updateExistContainers = |
| new ArrayList<>(); |
| |
| for(UpdatedContainerInfo containerInfo : containerInfoList) { |
| newlyLaunchedContainers |
| .addAll(containerInfo.getNewlyLaunchedContainers()); |
| completedContainers.addAll(containerInfo.getCompletedContainers()); |
| updateExistContainers.addAll(containerInfo.getUpdateContainers()); |
| } |
| |
| // Processing the newly launched containers |
| for (ContainerStatus launchedContainer : newlyLaunchedContainers) { |
| containerLaunchedOnNode(launchedContainer.getContainerId(), |
| schedulerNode); |
| } |
| |
| // Processing the newly increased containers |
| List<Container> newlyIncreasedContainers = |
| nm.pullNewlyIncreasedContainers(); |
| for (Container container : newlyIncreasedContainers) { |
| containerIncreasedOnNode(container.getId(), schedulerNode, container); |
| } |
| |
    // Process updates to existing containers
| for (Map.Entry<ApplicationId, ContainerStatus> c : updateExistContainers) { |
| SchedulerApplication<T> app = applications.get(c.getKey()); |
| ContainerId containerId = c.getValue().getContainerId(); |
| if (app == null || app.getCurrentAppAttempt() == null) { |
| continue; |
| } |
| RMContainer rmContainer |
| = app.getCurrentAppAttempt().getRMContainer(containerId); |
| if (rmContainer == null) { |
| continue; |
| } |
| // exposed ports are already set for the container, skip |
| if (rmContainer.getExposedPorts() != null && |
| rmContainer.getExposedPorts().size() > 0) { |
| continue; |
| } |
| |
| String strExposedPorts = c.getValue().getExposedPorts(); |
| if (null != strExposedPorts && !strExposedPorts.isEmpty()) { |
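        // The NM reports exposed ports as a JSON string. Assumed shape
        // (illustrative; matches Docker's port-mapping format):
        //   {"8080/tcp": [{"HostIp": "0.0.0.0", "HostPort": "32768"}]}
        // which deserializes into Map<String, List<Map<String, String>>>.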
| Gson gson = new Gson(); |
| Map<String, List<Map<String, String>>> exposedPorts = |
| gson.fromJson(strExposedPorts, |
| new TypeToken<Map<String, List<Map<String, String>>>>() |
| {}.getType()); |
| LOG.info("update exist container " + containerId.getContainerId() |
| + ", strExposedPorts = " + strExposedPorts); |
| rmContainer.setExposedPorts(exposedPorts); |
| } |
| } |
| |
| return completedContainers; |
| } |
| |
| /** |
| * Process completed container list. |
| * @param completedContainers Extracted list of completed containers |
| * @param releasedResources Reference resource object for completed containers |
| * @param nodeId NodeId corresponding to the NodeManager |
| * @param schedulerNode schedulerNode |
| * @return The total number of released containers |
| */ |
| private int updateCompletedContainers(List<ContainerStatus> completedContainers, |
| Resource releasedResources, NodeId nodeId, SchedulerNode schedulerNode) { |
| int releasedContainers = 0; |
| List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>(); |
| for (ContainerStatus completedContainer : completedContainers) { |
| ContainerId containerId = completedContainer.getContainerId(); |
| LOG.debug("Container FINISHED: {}", containerId); |
| RMContainer container = getRMContainer(containerId); |
| completedContainer(container, |
| completedContainer, RMContainerEventType.FINISHED); |
| if (schedulerNode != null) { |
| schedulerNode.releaseContainer(containerId, true); |
| } |
| |
| if (container != null) { |
| releasedContainers++; |
| Resource ars = container.getAllocatedResource(); |
| if (ars != null) { |
| Resources.addTo(releasedResources, ars); |
| } |
| Resource rrs = container.getReservedResource(); |
| if (rrs != null) { |
| Resources.addTo(releasedResources, rrs); |
| } |
| } else { |
| // Add containers which are untracked by RM. |
| untrackedContainerIdList.add(containerId); |
| } |
| } |
| |
| // Acknowledge NM to remove RM-untracked-containers from NM context. |
| if (!untrackedContainerIdList.isEmpty()) { |
| this.rmContext.getDispatcher().getEventHandler() |
| .handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, |
| untrackedContainerIdList)); |
| } |
| |
| return releasedContainers; |
| } |
| |
| /** |
| * Update schedulerHealth information. |
| * @param releasedResources Reference resource object for completed containers |
| * @param releasedContainers Count of released containers |
| */ |
| protected void updateSchedulerHealthInformation(Resource releasedResources, |
| int releasedContainers) { |
| |
| schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(), |
| releasedResources); |
| schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); |
| } |
| |
| /** |
| * Update container and utilization information on the NodeManager. |
| * @param nm The NodeManager to update |
| * @param schedulerNode schedulerNode |
| */ |
| protected void updateNodeResourceUtilization(RMNode nm, |
| SchedulerNode schedulerNode) { |
| // Updating node resource utilization |
| schedulerNode.setAggregatedContainersUtilization( |
| nm.getAggregatedContainersUtilization()); |
| schedulerNode.setNodeUtilization(nm.getNodeUtilization()); |
| } |
| |
| /** |
| * Process a heartbeat update from a node. |
| * @param nm The RMNode corresponding to the NodeManager |
| */ |
| protected void nodeUpdate(RMNode nm) { |
| LOG.debug("nodeUpdate: {} cluster capacity: {}", |
| nm, getClusterResource()); |
| |
    // Process new container information.
    // NOTE: the node may not be found if it was decommissioned at the same
    // time. Updates are skipped if the node is null.
| SchedulerNode schedulerNode = getNode(nm.getNodeID()); |
| List<ContainerStatus> completedContainers = updateNewContainerInfo(nm, |
| schedulerNode); |
| |
| // Notify Scheduler Node updated. |
| if (schedulerNode != null) { |
| schedulerNode.notifyNodeUpdate(); |
| } |
| |
| // Process completed containers |
| Resource releasedResources = Resource.newInstance(0, 0); |
| int releasedContainers = updateCompletedContainers(completedContainers, |
| releasedResources, nm.getNodeID(), schedulerNode); |
| |
| // If the node is decommissioning, send an update to have the total |
| // resource equal to the used resource, so no available resource to |
| // schedule. |
| if (nm.getState() == NodeState.DECOMMISSIONING && schedulerNode != null |
| && schedulerNode.getTotalResource().compareTo( |
| schedulerNode.getAllocatedResource()) != 0) { |
| this.rmContext |
| .getDispatcher() |
| .getEventHandler() |
| .handle( |
| new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption |
| .newInstance(schedulerNode.getAllocatedResource(), 0))); |
| } |
| |
    updateSchedulerHealthInformation(releasedResources, releasedContainers);
    if (schedulerNode != null) {
      updateNodeResourceUtilization(nm, schedulerNode);
      signalContainersIfOvercommitted(schedulerNode, true);
    }
| |
| // Now node data structures are up-to-date and ready for scheduling. |
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "Node being considered for scheduling " + nm + " availableResource: "
              + (schedulerNode == null ? "unknown (decommissioned)"
                  : schedulerNode.getUnallocatedResource()));
    }
| } |
| |
| /** |
| * Check if the node is overcommitted and needs to remove containers. If |
| * it is overcommitted, it will kill or preempt (notify the AM to stop them) |
| * containers. It also takes into account the overcommit timeout. It only |
| * notifies the application to preempt a container if the timeout hasn't |
| * passed. If the timeout has passed, it tries to kill the containers. If |
| * there is no timeout, it doesn't do anything and just prevents new |
| * allocations. |
| * |
   * This action is taken when the node's resources change (preempting
   * containers, or killing them if specified) or when the node heartbeats
   * (killing only).
| * |
| * @param schedulerNode The node to check whether is overcommitted. |
| * @param kill If the container should be killed or just notify the AM. |
| */ |
| private void signalContainersIfOvercommitted( |
| SchedulerNode schedulerNode, boolean kill) { |
| |
| // If there is no time out, we don't do anything |
| if (!schedulerNode.isOvercommitTimeOutSet()) { |
| return; |
| } |
| |
| SchedulerEventType eventType = |
| SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; |
| if (kill) { |
| eventType = SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; |
| |
| // If it hasn't timed out yet, don't kill |
| if (!schedulerNode.isOvercommitTimedOut()) { |
| return; |
| } |
| } |
| |
| // Check if the node is overcommitted (negative resources) |
| ResourceCalculator rc = getResourceCalculator(); |
| Resource unallocated = Resource.newInstance( |
| schedulerNode.getUnallocatedResource()); |
| if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) { |
| return; |
| } |
| |
| LOG.info("{} is overcommitted ({}), preempt/kill containers", |
| schedulerNode.getNodeID(), unallocated); |
| for (RMContainer container : schedulerNode.getContainersToKill()) { |
| LOG.info("Send {} to {} to free up {}", eventType, |
| container.getContainerId(), container.getAllocatedResource()); |
| ApplicationAttemptId appId = container.getApplicationAttemptId(); |
| ContainerPreemptEvent event = |
| new ContainerPreemptEvent(appId, container, eventType); |
| this.rmContext.getDispatcher().getEventHandler().handle(event); |
| Resources.addTo(unallocated, container.getAllocatedResource()); |
| |
| if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) { |
| LOG.debug("Enough unallocated resources {}", unallocated); |
| break; |
| } |
| } |
| } |
| |
| @Override |
| public Resource getNormalizedResource(Resource requestedResource, |
| Resource maxResourceCapability) { |
| return SchedulerUtils.getNormalizedResource(requestedResource, |
| getResourceCalculator(), |
| getMinimumResourceCapability(), |
| maxResourceCapability, |
| getMinimumResourceCapability()); |
| } |
| |
| /** |
| * Normalize a list of resource requests. |
| * |
| * @param asks resource requests |
| */ |
| protected void normalizeResourceRequests(List<ResourceRequest> asks) { |
| normalizeResourceRequests(asks, null); |
| } |
| |
| /** |
| * Normalize a list of resource requests |
| * using queue maximum resource allocations. |
| * @param asks resource requests |
| * @param queueName queue Name. |
| */ |
| protected void normalizeResourceRequests(List<ResourceRequest> asks, |
| String queueName) { |
| Resource maxAllocation = getMaximumResourceCapability(queueName); |
| for (ResourceRequest ask : asks) { |
| ask.setCapability( |
| getNormalizedResource(ask.getCapability(), maxAllocation)); |
| } |
| } |
| |
| protected void handleContainerUpdates( |
| SchedulerApplicationAttempt appAttempt, ContainerUpdates updates) { |
| List<UpdateContainerRequest> promotionRequests = |
| updates.getPromotionRequests(); |
| if (promotionRequests != null && !promotionRequests.isEmpty()) { |
| LOG.info("Promotion Update requests : " + promotionRequests); |
| // Promotion is technically an increase request from |
| // 0 resources to target resources. |
| handleIncreaseRequests(appAttempt, promotionRequests); |
| } |
| List<UpdateContainerRequest> increaseRequests = |
| updates.getIncreaseRequests(); |
| if (increaseRequests != null && !increaseRequests.isEmpty()) { |
| LOG.info("Resource increase requests : " + increaseRequests); |
| handleIncreaseRequests(appAttempt, increaseRequests); |
| } |
| List<UpdateContainerRequest> demotionRequests = |
| updates.getDemotionRequests(); |
| if (demotionRequests != null && !demotionRequests.isEmpty()) { |
| LOG.info("Demotion Update requests : " + demotionRequests); |
| // Demotion is technically a decrease request from initial |
| // to 0 resources |
| handleDecreaseRequests(appAttempt, demotionRequests); |
| } |
| List<UpdateContainerRequest> decreaseRequests = |
| updates.getDecreaseRequests(); |
| if (decreaseRequests != null && !decreaseRequests.isEmpty()) { |
| LOG.info("Resource decrease requests : " + decreaseRequests); |
| handleDecreaseRequests(appAttempt, decreaseRequests); |
| } |
| } |
| |
| private void handleIncreaseRequests( |
| SchedulerApplicationAttempt applicationAttempt, |
| List<UpdateContainerRequest> updateContainerRequests) { |
| for (UpdateContainerRequest uReq : updateContainerRequests) { |
| RMContainer rmContainer = |
| rmContext.getScheduler().getRMContainer(uReq.getContainerId()); |
| // Check if this is a container update |
| // And not in the middle of a Demotion |
| if (rmContainer != null) { |
| // Check if this is an executionType change request |
| // If so, fix the rr to make it look like a normal rr |
| // with relaxLocality=false and numContainers=1 |
| SchedulerNode schedulerNode = rmContext.getScheduler() |
| .getSchedulerNode(rmContainer.getContainer().getNodeId()); |
| |
        // Add only if no outstanding increase/promote request exists.
| if (!applicationAttempt.getUpdateContext() |
| .checkAndAddToOutstandingIncreases( |
| rmContainer, schedulerNode, uReq)) { |
| applicationAttempt.addToUpdateContainerErrors( |
| UpdateContainerError.newInstance( |
| RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); |
| } |
| } else { |
| LOG.warn("Cannot promote non-existent (or completed) Container [" |
| + uReq.getContainerId() + "]"); |
| } |
| } |
| } |
| |
| private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt, |
| List<UpdateContainerRequest> demotionRequests) { |
| OpportunisticContainerContext oppCntxt = |
| appAttempt.getOpportunisticContainerContext(); |
| for (UpdateContainerRequest uReq : demotionRequests) { |
| RMContainer rmContainer = |
| rmContext.getScheduler().getRMContainer(uReq.getContainerId()); |
| if (rmContainer != null) { |
| SchedulerNode schedulerNode = rmContext.getScheduler() |
| .getSchedulerNode(rmContainer.getContainer().getNodeId()); |
| if (appAttempt.getUpdateContext() |
| .checkAndAddToOutstandingDecreases(uReq, schedulerNode, |
| rmContainer.getContainer())) { |
| if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == |
| uReq.getContainerUpdateType()) { |
| RMContainer demotedRMContainer = |
| createDemotedRMContainer(appAttempt, oppCntxt, rmContainer); |
| if (demotedRMContainer != null) { |
| OpportunisticSchedulerMetrics.getMetrics() |
| .incrAllocatedOppContainers(1); |
| appAttempt.addToNewlyDemotedContainers( |
| uReq.getContainerId(), demotedRMContainer); |
| } |
| } else { |
| RMContainer demotedRMContainer = createDecreasedRMContainer( |
| appAttempt, uReq, rmContainer); |
| appAttempt.addToNewlyDecreasedContainers( |
| uReq.getContainerId(), demotedRMContainer); |
| } |
| } else { |
| appAttempt.addToUpdateContainerErrors( |
| UpdateContainerError.newInstance( |
| RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); |
| } |
| } else { |
| LOG.warn("Cannot demote/decrease non-existent (or completed) " + |
| "Container [" + uReq.getContainerId() + "]"); |
| } |
| } |
| } |
| |
| private RMContainer createDecreasedRMContainer( |
| SchedulerApplicationAttempt appAttempt, UpdateContainerRequest uReq, |
| RMContainer rmContainer) { |
| SchedulerRequestKey sk = |
| SchedulerRequestKey.extractFrom(rmContainer.getContainer()); |
| Container decreasedContainer = BuilderUtils.newContainer( |
| ContainerId.newContainerId(appAttempt.getApplicationAttemptId(), |
| appAttempt.getNewContainerId()), |
| rmContainer.getContainer().getNodeId(), |
| rmContainer.getContainer().getNodeHttpAddress(), |
| Resources.none(), |
| sk.getPriority(), null, rmContainer.getExecutionType(), |
| sk.getAllocationRequestId()); |
| decreasedContainer.setVersion(rmContainer.getContainer().getVersion()); |
| RMContainer newRmContainer = new RMContainerImpl(decreasedContainer, |
| sk, appAttempt.getApplicationAttemptId(), |
| decreasedContainer.getNodeId(), appAttempt.getUser(), rmContext, |
| rmContainer.isRemotelyAllocated()); |
| appAttempt.addRMContainer(decreasedContainer.getId(), rmContainer); |
| ((AbstractYarnScheduler) rmContext.getScheduler()).getNode( |
| decreasedContainer.getNodeId()).allocateContainer(newRmContainer); |
| return newRmContainer; |
| } |
| |
| private RMContainer createDemotedRMContainer( |
| SchedulerApplicationAttempt appAttempt, |
| OpportunisticContainerContext oppCntxt, |
| RMContainer rmContainer) { |
| SchedulerRequestKey sk = |
| SchedulerRequestKey.extractFrom(rmContainer.getContainer()); |
| Container demotedContainer = BuilderUtils.newContainer( |
| ContainerId.newContainerId(appAttempt.getApplicationAttemptId(), |
| oppCntxt.getContainerIdGenerator().generateContainerId()), |
| rmContainer.getContainer().getNodeId(), |
| rmContainer.getContainer().getNodeHttpAddress(), |
| rmContainer.getContainer().getResource(), |
| sk.getPriority(), null, ExecutionType.OPPORTUNISTIC, |
| sk.getAllocationRequestId()); |
| demotedContainer.setVersion(rmContainer.getContainer().getVersion()); |
| return SchedulerUtils.createOpportunisticRmContainer( |
| rmContext, demotedContainer, false); |
| } |
| |
| /** |
| * Rollback container update after expiry. |
| * @param containerId ContainerId. |
| */ |
| protected void rollbackContainerUpdate( |
| ContainerId containerId) { |
| RMContainer rmContainer = getRMContainer(containerId); |
| if (rmContainer == null) { |
| LOG.info("Cannot rollback resource for container " + containerId |
| + ". The container does not exist."); |
| return; |
| } |
    T app = getCurrentAttemptForContainer(containerId);
    if (app == null) {
| LOG.info("Cannot rollback resource for container " + containerId |
| + ". The application that the container " |
| + "belongs to does not exist."); |
| return; |
| } |
| |
| if (Resources.fitsIn(rmContainer.getLastConfirmedResource(), |
| rmContainer.getContainer().getResource())) { |
| LOG.info("Roll back resource for container " + containerId); |
| handleDecreaseRequests(app, Arrays.asList( |
| UpdateContainerRequest.newInstance( |
| rmContainer.getContainer().getVersion(), |
| rmContainer.getContainerId(), |
| ContainerUpdateType.DECREASE_RESOURCE, |
| rmContainer.getLastConfirmedResource(), null))); |
| } |
| } |
| |
| @Override |
| public List<NodeId> getNodeIds(String resourceName) { |
| return nodeTracker.getNodeIdsByResourceName(resourceName); |
| } |
| |
| /** |
| * To be used to release a container via a Scheduler Event rather than |
| * in the same thread. |
| * @param container Container. |
| */ |
| public void asyncContainerRelease(RMContainer container) { |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new ReleaseContainerEvent(container)); |
| } |
| |
| /* |
| * Get a Resource object with for the minimum allocation possible. |
| * |
| * @return a Resource object with the minimum allocation for the scheduler |
| */ |
| public Resource getMinimumAllocation() { |
| Resource ret = ResourceUtils.getResourceTypesMinimumAllocation(); |
| LOG.info("Minimum allocation = " + ret); |
| return ret; |
| } |
| |
| /** |
| * Get a Resource object with for the maximum allocation possible. |
| * |
| * @return a Resource object with the maximum allocation for the scheduler |
| */ |
| |
| public Resource getMaximumAllocation() { |
| Resource ret = ResourceUtils.getResourceTypesMaximumAllocation(); |
| LOG.info("Maximum allocation = " + ret); |
| return ret; |
| } |
| |
| @Override |
| public long checkAndGetApplicationLifetime(String queueName, long lifetime) { |
| // Lifetime is the application lifetime by default. |
| return lifetime; |
| } |
| |
| @Override |
| public long getMaximumApplicationLifetime(String queueName) { |
| return -1; |
| } |
| |
| /** |
   * Kill an RMContainer. This is meant to be called in tests only to simulate
| * AM container failures. |
| * @param container the container to kill |
| */ |
| @VisibleForTesting |
| public abstract void killContainer(RMContainer container); |
| |
| /** |
| * Update internal state of the scheduler. This can be useful for scheduler |
| * implementations that maintain some state that needs to be periodically |
| * updated; for example, metrics or queue resources. It will be called by the |
| * {@link UpdateThread} every {@link #updateInterval}. By default, it will |
| * not run; subclasses should set {@link #updateInterval} to a |
| * positive value during {@link #serviceInit(Configuration)} if they want to |
| * enable the thread. |
| */ |
| @VisibleForTesting |
| public void update() { |
| // do nothing by default |
| } |
| |
| /** |
| * Thread which calls {@link #update()} every |
| * <code>updateInterval</code> milliseconds. |
| */ |
| private class UpdateThread extends Thread { |
| @Override |
| public void run() { |
| while (!Thread.currentThread().isInterrupted()) { |
| try { |
| synchronized (updateThreadMonitor) { |
| updateThreadMonitor.wait(updateInterval); |
| } |
| update(); |
| } catch (InterruptedException ie) { |
| LOG.warn("Scheduler UpdateThread interrupted. Exiting."); |
| return; |
| } catch (Exception e) { |
| LOG.error("Exception in scheduler UpdateThread", e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Allows {@link UpdateThread} to start processing without waiting till |
| * {@link #updateInterval}. |
| */ |
| protected void triggerUpdate() { |
| synchronized (updateThreadMonitor) { |
| updateThreadMonitor.notify(); |
| } |
| } |
| |
| @Override |
| public void reinitialize(Configuration conf, RMContext rmContext) |
| throws IOException { |
| try { |
| LOG.info("Reinitializing SchedulingMonitorManager ..."); |
| schedulingMonitorManager.reinitialize(rmContext, conf); |
| } catch (YarnException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| /** |
| * Default implementation. Always returns false. |
| * @param appAttempt ApplicationAttempt. |
| * @param schedulingRequest SchedulingRequest. |
| * @param schedulerNode SchedulerNode. |
| * @return Success or not. |
| */ |
| @Override |
| public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt, |
| SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) { |
| return false; |
| } |
| |
| @Override |
| public void resetSchedulerMetrics() { |
| // reset scheduler metrics |
| } |
| |
| /** |
| * Gets the apps from a given queue. |
| * |
| * Mechanics: |
| * 1. Get all {@link ApplicationAttemptId}s in the given queue by |
| * {@link #getAppsInQueue(String)} method. |
| * 2. Always need to check validity for the given queue by the returned |
| * values. |
| * |
| * @param queueName queue name |
| * @return a collection of app attempt ids in the given queue, it maybe empty. |
| * @throws YarnException if {@link #getAppsInQueue(String)} return null, will |
| * throw this exception. |
| */ |
| private List<ApplicationAttemptId> getAppsFromQueue(String queueName) |
| throws YarnException { |
| List<ApplicationAttemptId> apps = getAppsInQueue(queueName); |
| if (apps == null) { |
| throw new YarnException("The specified queue: " + queueName |
| + " doesn't exist"); |
| } |
| return apps; |
| } |
| } |