// blob: 27bfe6c41b5c9a79aca1af98f854d2dbbd3f4b48 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
.RMContainerNMDoneChangeResourceEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
@SuppressWarnings("unchecked")
@Private
@Unstable
public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
extends AbstractService implements ResourceScheduler {
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
protected final ClusterNodeTracker<N> nodeTracker =
new ClusterNodeTracker<>();
protected Resource minimumAllocation;
protected volatile RMContext rmContext;
private volatile Priority maxClusterLevelAppPriority;
protected ActivitiesManager activitiesManager;
protected SchedulerHealth schedulerHealth = new SchedulerHealth();
protected volatile long lastNodeUpdateTime;
private volatile Clock clock;
/*
* All schedulers which are inheriting AbstractYarnScheduler should use
* concurrent version of 'applications' map.
*/
protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
protected int nmExpireInterval;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
protected final ReentrantReadWriteLock.ReadLock readLock;
/*
* Use writeLock for any of operations below:
* - queue change (hierarchy / configuration / container allocation)
* - application(add/remove/allocate-container, but not include container
* finish)
* - node (add/remove/change-resource/container-allocation, but not include
* container finish)
*/
protected final ReentrantReadWriteLock.WriteLock writeLock;
/**
 * Creates the scheduler service and prepares its clock and lock pair.
 *
 * @param name service name, passed on to the parent service constructor
 */
public AbstractYarnScheduler(String name) {
  super(name);
  // Both lock views are taken from one ReentrantReadWriteLock instance.
  final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  this.readLock = rwLock.readLock();
  this.writeLock = rwLock.writeLock();
  this.clock = SystemClock.getInstance();
}
/**
 * Reads the scheduler-wide settings from the configuration, arms the
 * release-request cache cleanup and then initializes the parent service.
 *
 * @param conf configuration to read the settings from
 * @throws Exception if the parent service initialization fails
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  this.nmExpireInterval = conf.getInt(
      YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
      YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);

  final long recoverySchedulingWaitMs = conf.getLong(
      YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
      YarnConfiguration
          .DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
  this.nodeTracker.setConfiguredMaxAllocationWaitTime(
      recoverySchedulingWaitMs);

  this.maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);

  // Must run after nmExpireInterval is set: it is used as the timer delay.
  createReleaseCache();
  super.serviceInit(conf);
}
/**
 * Exposes the node tracker used by this scheduler; intended for tests.
 *
 * @return the tracker holding this scheduler's nodes (the live instance,
 *         not a copy)
 */
@VisibleForTesting
public ClusterNodeTracker<N> getNodeTracker() {
  return nodeTracker;
}
/**
 * Collects the live containers of the application's current scheduler
 * attempt, excluding the master (AM) container.
 *
 * <p>YARN-3136 removed the synchronized lock from this method for
 * performance purposes.
 *
 * @param currentAttempt attempt whose application is looked up
 * @return the containers to transfer; empty when the application is unknown
 *         to the RM or to the scheduler, has no current scheduler attempt,
 *         or uses an unmanaged AM
 */
public List<Container> getTransferredContainers(
    ApplicationAttemptId currentAttempt) {
  ApplicationId appId = currentAttempt.getApplicationId();
  SchedulerApplication<T> app = applications.get(appId);
  List<Container> containerList = new ArrayList<Container>();
  RMApp appImpl = this.rmContext.getRMApps().get(appId);
  // Either side may already have dropped the application; nothing to hand
  // over in that case.
  if (appImpl == null || app == null) {
    return containerList;
  }
  if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
    return containerList;
  }
  T schedulerAttempt = app.getCurrentAppAttempt();
  if (schedulerAttempt == null) {
    return containerList;
  }
  // The master container may not be known; with a null id no live container
  // is filtered out below.
  RMAppAttempt rmAttempt = appImpl.getCurrentAppAttempt();
  Container masterContainer =
      (rmAttempt == null) ? null : rmAttempt.getMasterContainer();
  ContainerId amContainerId =
      (masterContainer == null) ? null : masterContainer.getId();
  for (RMContainer rmContainer : schedulerAttempt.getLiveContainers()) {
    if (!rmContainer.getContainerId().equals(amContainerId)) {
      containerList.add(rmContainer.getContainer());
    }
  }
  return containerList;
}
/**
 * Returns the application map kept by this scheduler.
 *
 * @return the {@code applications} field itself, not a copy
 */
public Map<ApplicationId, SchedulerApplication<T>>
    getSchedulerApplications() {
  return this.applications;
}
/**
 * Returns the tracked nodes that are blacklisted for the given attempt.
 * A node is selected when
 * {@code SchedulerAppUtils.isPlaceBlacklisted(app, node, LOG)} is true.
 *
 * @param app application attempt.
 * @return the nodes accepted by the blacklist filter
 */
public List<N> getBlacklistedNodes(final SchedulerApplicationAttempt app) {
  return nodeTracker.getNodes(new NodeFilter() {
    @Override
    public boolean accept(SchedulerNode candidate) {
      return SchedulerAppUtils.isPlaceBlacklisted(app, candidate, LOG);
    }
  });
}
/**
 * Returns the cluster capacity as reported by the node tracker.
 */
@Override
public Resource getClusterResource() {
  final Resource clusterCapacity = this.nodeTracker.getClusterCapacity();
  return clusterCapacity;
}
/**
 * Returns the current value of the {@code minimumAllocation} field.
 */
@Override
public Resource getMinimumResourceCapability() {
  return this.minimumAllocation;
}
/**
 * Returns the maximum allowed allocation as reported by the node tracker.
 */
@Override
public Resource getMaximumResourceCapability() {
  final Resource maxAllowed = this.nodeTracker.getMaxAllowedAllocation();
  return maxAllowed;
}
/**
 * Returns the maximum resource capability for a queue. This implementation
 * ignores the queue name and returns the scheduler-wide maximum.
 *
 * @param queueName queue name (unused here)
 */
@Override
public Resource getMaximumResourceCapability(String queueName) {
  return this.getMaximumResourceCapability();
}
/**
 * Hands the configured maximum allocation to the node tracker.
 *
 * @param maximumAllocation configured maximum allocation
 */
protected void initMaximumResourceCapability(Resource maximumAllocation) {
  this.nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
}
/**
 * Returns the health record maintained by this scheduler.
 */
public SchedulerHealth getSchedulerHealth() {
  return schedulerHealth;
}
/**
 * Records the time of the last node update.
 *
 * @param time value to store in {@code lastNodeUpdateTime}
 */
protected void setLastNodeUpdateTime(long time) {
  lastNodeUpdateTime = time;
}
/**
 * Returns the stored time of the last node update.
 */
public long getLastNodeUpdateTime() {
  return this.lastNodeUpdateTime;
}
/**
 * Notifies the owning application attempt that a container was launched
 * on a node. If no attempt is known for the container, a clean-container
 * event is sent for that node instead.
 *
 * @param containerId id of the launched container
 * @param node scheduler node the container was launched on
 */
protected void containerLaunchedOnNode(
    ContainerId containerId, SchedulerNode node) {
  // Acquire before entering try so that the finally block can never try to
  // release a lock this thread failed to take.
  readLock.lock();
  try {
    // Get the application for the launched container
    SchedulerApplicationAttempt application =
        getCurrentAttemptForContainer(containerId);
    if (application == null) {
      LOG.info("Unknown application " + containerId.getApplicationAttemptId()
          .getApplicationId() + " launched container " + containerId
          + " on node: " + node);
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
      return;
    }
    application.containerLaunchedOnNode(containerId, node.getNodeID());
  } finally {
    readLock.unlock();
  }
}
/**
 * Handles a container whose resource increase was reported by the NM.
 * When either the owning attempt or the RMContainer is unknown, a
 * clean-container event is sent for the node; otherwise the RMContainer is
 * told that the NM finished changing its resource.
 *
 * @param containerId id of the increased container
 * @param node scheduler node that reported the container
 * @param increasedContainerReportedByNM container as reported by the NM;
 *        its resource is forwarded to the RMContainer
 */
protected void containerIncreasedOnNode(ContainerId containerId,
    SchedulerNode node, Container increasedContainerReportedByNM) {
  // This method takes no lock itself; the original note states that it is
  // protected by the scheduler's writeLock.
  final SchedulerApplicationAttempt owner =
      getCurrentAttemptForContainer(containerId);
  if (owner == null) {
    LOG.info("Unknown application " + containerId.getApplicationAttemptId()
        .getApplicationId() + " increased container " + containerId
        + " on node: " + node);
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
    return;
  }

  final RMContainer tracked = getRMContainer(containerId);
  if (tracked != null) {
    tracked.handle(new RMContainerNMDoneChangeResourceEvent(containerId,
        increasedContainerReportedByNM.getResource()));
    return;
  }

  // Some unknown container sneaked into the system. Kill it.
  this.rmContext.getDispatcher().getEventHandler().handle(
      new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
}
/**
 * Looks up the current attempt of the application the given attempt id
 * belongs to.
 *
 * @param applicationAttemptId attempt id; only its application id is used
 * @return the application's current attempt, or {@code null} when the
 *         application is not in the {@code applications} map
 */
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
  final ApplicationId owningAppId = applicationAttemptId.getApplicationId();
  final SchedulerApplication<T> schedulerApp = applications.get(owningAppId);
  if (schedulerApp == null) {
    return null;
  }
  return schedulerApp.getCurrentAppAttempt();
}
/**
 * Builds a scheduler report for the given attempt.
 *
 * @param appAttemptId attempt to report on
 * @return the report, or {@code null} when no attempt is found
 */
@Override
public SchedulerAppReport getSchedulerAppInfo(
    ApplicationAttemptId appAttemptId) {
  final SchedulerApplicationAttempt current =
      getApplicationAttempt(appAttemptId);
  if (current != null) {
    return new SchedulerAppReport(current);
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
  }
  return null;
}
/**
 * Returns the resource usage report of the given attempt.
 *
 * @param appAttemptId attempt to report on
 * @return the attempt's usage report, or {@code null} when no attempt is
 *         found
 */
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
    ApplicationAttemptId appAttemptId) {
  final SchedulerApplicationAttempt current =
      getApplicationAttempt(appAttemptId);
  if (current != null) {
    return current.getResourceUsageReport();
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
  }
  return null;
}
/**
 * Resolves the current application attempt for a container through the
 * attempt id carried by the container id.
 *
 * @param containerId container to resolve
 * @return the result of {@link #getApplicationAttempt}, possibly
 *         {@code null}
 */
public T getCurrentAttemptForContainer(ContainerId containerId) {
  final ApplicationAttemptId owningAttemptId =
      containerId.getApplicationAttemptId();
  return getApplicationAttempt(owningAttemptId);
}
/**
 * Looks up the RMContainer for a container id through its current attempt.
 *
 * @param containerId container to look up
 * @return the attempt's RMContainer, or {@code null} when no attempt is
 *         found
 */
@Override
public RMContainer getRMContainer(ContainerId containerId) {
  final SchedulerApplicationAttempt owner =
      getCurrentAttemptForContainer(containerId);
  if (owner == null) {
    return null;
  }
  return owner.getRMContainer(containerId);
}
/**
 * Returns the node tracker's report for the given node.
 *
 * @param nodeId node to report on
 */
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
  return this.nodeTracker.getNodeReport(nodeId);
}
/**
 * Not supported by this base class.
 *
 * @throws YarnException always, naming the concrete scheduler class
 */
@Override
public String moveApplication(ApplicationId appId, String newQueue)
    throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(
      schedulerName + " does not support moving apps between queues");
}
/**
 * Not supported by this base class.
 *
 * @throws YarnException always, naming the concrete scheduler class
 */
@Override
public void preValidateMoveApplication(ApplicationId appId,
    String newQueue) throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(schedulerName
      + " does not support pre-validation of moving apps between queues");
}
/**
 * Not supported by this base class.
 *
 * @param queueName queue to remove (unused here)
 * @throws YarnException always, naming the concrete scheduler class
 */
public void removeQueue(String queueName) throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(
      schedulerName + " does not support removing queues");
}
/**
 * Not supported by this base class.
 *
 * @throws YarnException always, naming the concrete scheduler class
 */
@Override
public void addQueue(Queue newQueue) throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(
      schedulerName + " does not support this operation");
}
/**
 * Not supported by this base class.
 *
 * @throws YarnException always, naming the concrete scheduler class
 */
@Override
public void setEntitlement(String queue, QueueEntitlement entitlement)
    throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(
      schedulerName + " does not support this operation");
}
/**
 * Sends a clean-container event for a reported container unless that
 * container is already in the COMPLETE state.
 *
 * @param node node the container was reported from
 * @param container container status reported by the NM
 */
private void killOrphanContainerOnNode(RMNode node,
    NMContainerStatus container) {
  if (container.getContainerState().equals(ContainerState.COMPLETE)) {
    // Already complete: no clean-up event is sent.
    return;
  }
  final RMNodeCleanContainerEvent cleanUp = new RMNodeCleanContainerEvent(
      node.getNodeID(), container.getContainerId());
  this.rmContext.getDispatcher().getEventHandler().handle(cleanUp);
}
/**
 * Recovers the containers reported by a node into the scheduler state.
 * Does nothing when work-preserving recovery is disabled or the report
 * list is null or empty. Containers that belong to an unknown application,
 * an unknown scheduler application, or (when containers are not kept across
 * attempts) a stopped or previous attempt are skipped and handed to
 * {@code killOrphanContainerOnNode}.
 *
 * @param containerReports container statuses reported by the NM
 * @param nm node the containers were reported from
 */
public synchronized void recoverContainersOnNode(
    List<NMContainerStatus> containerReports, RMNode nm) {
  // Acquire before entering try so that the finally block can never try to
  // release a lock this thread failed to take.
  writeLock.lock();
  try {
    if (!rmContext.isWorkPreservingRecoveryEnabled()
        || containerReports == null || containerReports.isEmpty()) {
      return;
    }

    for (NMContainerStatus container : containerReports) {
      ApplicationId appId =
          container.getContainerId().getApplicationAttemptId()
              .getApplicationId();
      RMApp rmApp = rmContext.getRMApps().get(appId);
      if (rmApp == null) {
        LOG.error("Skip recovering container " + container
            + " for unknown application.");
        killOrphanContainerOnNode(nm, container);
        continue;
      }

      SchedulerApplication<T> schedulerApp = applications.get(appId);
      if (schedulerApp == null) {
        LOG.info("Skip recovering container " + container
            + " for unknown SchedulerApplication. "
            + "Application current state is " + rmApp.getState());
        killOrphanContainerOnNode(nm, container);
        continue;
      }

      LOG.info("Recovering container " + container);
      SchedulerApplicationAttempt schedulerAttempt =
          schedulerApp.getCurrentAppAttempt();

      if (!rmApp.getApplicationSubmissionContext()
          .getKeepContainersAcrossApplicationAttempts()) {
        // Do not recover containers for stopped attempt or previous attempt.
        if (schedulerAttempt.isStopped() || !schedulerAttempt
            .getApplicationAttemptId().equals(
                container.getContainerId().getApplicationAttemptId())) {
          LOG.info("Skip recovering container " + container
              + " for already stopped attempt.");
          killOrphanContainerOnNode(nm, container);
          continue;
        }
      }

      // create container
      RMContainer rmContainer = recoverAndCreateContainer(container, nm);

      // recover RMContainer
      rmContainer.handle(
          new RMContainerRecoverEvent(container.getContainerId(), container));

      // recover scheduler node
      SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
      schedulerNode.recoverContainer(rmContainer);

      // recover queue: update headroom etc.
      Queue queue = schedulerAttempt.getQueue();
      queue.recoverContainer(getClusterResource(), schedulerAttempt,
          rmContainer);

      // recover scheduler attempt
      schedulerAttempt.recoverContainer(schedulerNode, rmContainer);

      // set master container for the current running AMContainer for this
      // attempt.
      RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
      if (appAttempt != null) {
        Container masterContainer = appAttempt.getMasterContainer();

        // Mark current running AMContainer's RMContainer based on the master
        // container ID stored in AppAttempt.
        if (masterContainer != null && masterContainer.getId().equals(
            rmContainer.getContainerId())) {
          ((RMContainerImpl) rmContainer).setAMContainer(true);
        }
      }

      // A release request for this container was cached before it was
      // recovered: honor it now.
      if (schedulerAttempt.getPendingRelease().remove(
          container.getContainerId())) {
        // release the container
        rmContainer.handle(
            new RMContainerFinishedEvent(container.getContainerId(),
                SchedulerUtils
                    .createAbnormalContainerStatus(container.getContainerId(),
                        SchedulerUtils.RELEASED_CONTAINER),
                RMContainerEventType.RELEASED));
        LOG.info(container.getContainerId() + " is released by application.");
      }
    }
  } finally {
    writeLock.unlock();
  }
}
/**
 * Rebuilds a Container and its RMContainer from a status reported by an NM.
 * The user is taken from the scheduler application registered for the
 * container's application id.
 *
 * @param status container status reported by the NM
 * @param node node the container was reported from
 * @return the newly created RMContainer
 */
private RMContainer recoverAndCreateContainer(NMContainerStatus status,
    RMNode node) {
  final Container recovered = Container.newInstance(
      status.getContainerId(),
      node.getNodeID(),
      node.getHttpAddress(),
      status.getAllocatedResource(),
      status.getPriority(),
      null);
  recovered.setVersion(status.getVersion());

  final ApplicationAttemptId owningAttemptId =
      recovered.getId().getApplicationAttemptId();
  return new RMContainerImpl(recovered, owningAttemptId, node.getNodeID(),
      applications.get(owningAttemptId.getApplicationId()).getUser(),
      rmContext, status.getCreationTime(), status.getNodeLabelExpression());
}
/**
 * Gives the resource requests stored on an RMContainer back to the current
 * application attempt. This covers a container that is preempted before
 * the AM pulled it; once the AM has pulled the container, the RMContainer
 * no longer carries requests and nothing is recovered.
 *
 * @param rmContainer rmContainer
 */
private void recoverResourceRequestForContainer(RMContainer rmContainer) {
  final List<ResourceRequest> storedAsks = rmContainer.getResourceRequests();
  // No requests stored (container already moved to ACQUIRED): nothing to do.
  if (storedAsks != null) {
    // The attempt is looked up again through the container id because the
    // scheduler only tracks one attempt per application at a time. If that
    // attempt goes away right afterwards, the recovered requests are simply
    // useless, which is harmless.
    final SchedulerApplicationAttempt currentAttempt =
        getCurrentAttemptForContainer(rmContainer.getContainerId());
    if (currentAttempt != null) {
      currentAttempt.recoverResourceRequestsForContainer(storedAsks);
    }
  }
}
/**
 * Schedules a one-shot task that clears the pending release-request cache
 * once {@code nmExpireInterval} milliseconds have passed.
 *
 * <p>The timer thread is named and marked as a daemon, so a pending
 * cleanup can be identified in thread dumps and does not keep the JVM
 * alive.
 */
protected void createReleaseCache() {
  // Cleanup the cache after nm expire interval.
  new Timer("Pending Container Clear Timer", true).schedule(new TimerTask() {
    @Override
    public void run() {
      clearPendingContainerCache();
      LOG.info("Release request cache is cleaned up");
    }
  }, nmExpireInterval);
}
/**
 * Empties the pending-release set of every application's current attempt.
 * Each container id still pending is first written to the audit log as a
 * failed release.
 */
@VisibleForTesting
public void clearPendingContainerCache() {
  for (SchedulerApplication<T> schedulerApp : applications.values()) {
    final T current = schedulerApp.getCurrentAppAttempt();
    if (current == null) {
      continue;
    }
    for (ContainerId stillPending : current.getPendingRelease()) {
      RMAuditLogger.logFailure(schedulerApp.getUser(),
          AuditConstants.RELEASE_CONTAINER,
          "Unauthorized access or invalid container", "Scheduler",
          "Trying to release container not owned by app "
              + "or with invalid id.", current.getApplicationId(),
          stillPending, null);
    }
    current.getPendingRelease().clear();
  }
}
/**
 * Cleans up a completed container.
 *
 * <p>GUARANTEED containers are delegated to
 * {@link #completedContainerInternal}. Any other execution type is handled
 * here: the RMContainer is informed, the container is removed from its
 * current attempt and released on its scheduler node. In both cases the
 * container's resource requests are recovered afterwards.
 *
 * @param rmContainer completed container; when {@code null} the call is
 *        only logged
 * @param containerStatus status of the completed container
 * @param event event type describing how the container finished
 */
@VisibleForTesting
@Private
public void completedContainer(RMContainer rmContainer,
    ContainerStatus containerStatus, RMContainerEventType event) {
  if (rmContainer == null) {
    LOG.info("Container " + containerStatus.getContainerId()
        + " completed with event " + event
        + ", but corresponding RMContainer doesn't exist.");
    return;
  }

  if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
    completedContainerInternal(rmContainer, containerStatus, event);
  } else {
    ContainerId containerId = rmContainer.getContainerId();
    // Inform the container
    rmContainer.handle(
        new RMContainerFinishedEvent(containerId, containerStatus, event));
    SchedulerApplicationAttempt schedulerAttempt =
        getCurrentAttemptForContainer(containerId);
    if (schedulerAttempt != null) {
      schedulerAttempt.removeRMContainer(containerId);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Completed container: " + rmContainer.getContainerId() +
          " in state: " + rmContainer.getState() + " event:" + event);
    }
    // The node may already be gone from the tracker; in that case there is
    // nothing left to release on it.
    SchedulerNode node = getSchedulerNode(rmContainer.getNodeId());
    if (node != null) {
      node.releaseContainer(rmContainer.getContainer());
    } else if (LOG.isDebugEnabled()) {
      LOG.debug("Node " + rmContainer.getNodeId()
          + " is no longer tracked; skip releasing container "
          + containerId + " on it.");
    }
  }

  // If the container is getting killed in ACQUIRED state, the requester (AM
  // for regular containers and RM itself for AM container) will not know what
  // happened. Simply add the ResourceRequest back again so that requester
  // doesn't need to do anything conditionally.
  recoverResourceRequestForContainer(rmContainer);
}
/**
 * Scheduler-specific clean-up of a completed container.
 * {@code completedContainer} calls this for containers whose execution
 * type is GUARANTEED.
 *
 * @param rmContainer the completed container
 * @param containerStatus status of the completed container
 * @param event event type describing how the container finished
 */
protected abstract void completedContainerInternal(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event);
/**
 * Releases the given containers on behalf of an application attempt.
 *
 * <p>For a container id with no RMContainer: while less than
 * {@code nmExpireInterval} milliseconds have passed since the cluster
 * timestamp, the id is added to the attempt's pending-release set;
 * afterwards the request is written to the audit log as a failure.
 * {@link #completedContainer} is invoked for every id in either case.
 *
 * @param containers ids of the containers to release
 * @param attempt attempt requesting the release
 */
protected void releaseContainers(List<ContainerId> containers,
    SchedulerApplicationAttempt attempt) {
  for (ContainerId toRelease : containers) {
    final RMContainer known = getRMContainer(toRelease);
    if (known == null) {
      final long sinceClusterStart =
          System.currentTimeMillis() - ResourceManager.getClusterTimeStamp();
      if (sinceClusterStart >= nmExpireInterval) {
        RMAuditLogger.logFailure(attempt.getUser(),
            AuditConstants.RELEASE_CONTAINER,
            "Unauthorized access or invalid container", "Scheduler",
            "Trying to release container not owned by app or with invalid id.",
            attempt.getApplicationId(), toRelease, null);
      } else {
        LOG.info(toRelease + " doesn't exist. Add the container"
            + " to the release request cache as it maybe on recovery.");
        attempt.getPendingRelease().add(toRelease);
      }
    }
    final ContainerStatus releasedStatus =
        SchedulerUtils.createAbnormalContainerStatus(toRelease,
            SchedulerUtils.RELEASED_CONTAINER);
    completedContainer(known, releasedStatus, RMContainerEventType.RELEASED);
  }
}
/**
 * Converts the given decrease requests into scheduler change requests and
 * applies each of them through {@link #decreaseContainer}.
 *
 * @param decreaseRequests requests to process; {@code null} or empty is a
 *        no-op
 * @param attempt attempt the requests belong to
 */
protected void decreaseContainers(
    List<UpdateContainerRequest> decreaseRequests,
    SchedulerApplicationAttempt attempt) {
  if (decreaseRequests == null || decreaseRequests.isEmpty()) {
    return;
  }
  // Pre-processing happens once, before any request is applied.
  for (SchedContainerChangeRequest changeRequest
      : createSchedContainerChangeRequests(decreaseRequests, false)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Processing decrease request:" + changeRequest);
    }
    decreaseContainer(changeRequest, attempt);
  }
}
/**
 * Applies a single container decrease request.
 * {@code decreaseContainers} calls this once per pre-processed request.
 *
 * @param decreaseRequest the change request to apply
 * @param attempt attempt the request belongs to
 */
protected abstract void decreaseContainer(
SchedContainerChangeRequest decreaseRequest,
SchedulerApplicationAttempt attempt);
/**
 * Returns the node tracker's entry for the given node id.
 *
 * @param nodeId node to look up
 */
@Override
public N getSchedulerNode(NodeId nodeId) {
  final N tracked = this.nodeTracker.getNode(nodeId);
  return tracked;
}
/**
 * Requests that every application of the source queue be moved to the
 * destination queue, by dispatching one APP_MOVE event per application.
 *
 * @param sourceQueue queue whose applications are moved
 * @param destQueue queue the applications are moved to
 * @throws YarnException if looking up the destination queue fails with an
 *         {@link IOException}, or if no application list is available for
 *         the source queue
 */
@Override
public void moveAllApps(String sourceQueue, String destQueue)
    throws YarnException {
  // Acquire before entering try so that the finally block can never try to
  // release a lock this thread failed to take.
  writeLock.lock();
  try {
    // check if destination queue is a valid leaf queue
    try {
      getQueueInfo(destQueue, false, false);
    } catch (IOException e) {
      LOG.warn(e);
      throw new YarnException(e);
    }
    // check if source queue is a valid
    List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
    if (apps == null) {
      String errMsg =
          "The specified Queue: " + sourceQueue + " doesn't exist";
      LOG.warn(errMsg);
      throw new YarnException(errMsg);
    }
    // generate move events for each pending/running app
    for (ApplicationAttemptId appAttemptId : apps) {
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppManagerEvent(appAttemptId.getApplicationId(),
              destQueue, RMAppManagerEventType.APP_MOVE));
    }
  } finally {
    writeLock.unlock();
  }
}
/**
 * Requests that every application in the given queue be killed, by
 * dispatching one KILL event per application.
 *
 * @param queueName queue whose applications are killed
 * @throws YarnException if no application list is available for the queue
 */
@Override
public void killAllAppsInQueue(String queueName)
    throws YarnException {
  // Acquire before entering try so that the finally block can never try to
  // release a lock this thread failed to take.
  writeLock.lock();
  try {
    // check if queue is a valid
    List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
    if (apps == null) {
      String errMsg = "The specified Queue: " + queueName + " doesn't exist";
      LOG.warn(errMsg);
      throw new YarnException(errMsg);
    }
    // generate kill events for each pending/running app
    for (ApplicationAttemptId app : apps) {
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
              "Application killed due to expiry of reservation queue "
                  + queueName + "."));
    }
  } finally {
    writeLock.unlock();
  }
}
/**
 * Process resource update on a node.
 *
 * <p>When the new total differs from the current one, the node label
 * manager is notified and the node is removed from the tracker, updated
 * and added back. A node that is no longer tracked is skipped.
 *
 * @param nm node whose resource is updated
 * @param resourceOption carries the new total resource
 */
public void updateNodeResource(RMNode nm,
    ResourceOption resourceOption) {
  // Acquire before entering try so that the finally block can never try to
  // release a lock this thread failed to take.
  writeLock.lock();
  try {
    SchedulerNode node = getSchedulerNode(nm.getNodeID());
    if (node == null) {
      // The node may have been removed before this update is processed.
      LOG.info("Node: " + nm.getNodeID() + " has already been taken out of "
          + "scheduling. Skip updating its resource");
      return;
    }
    Resource newResource = resourceOption.getResource();
    Resource oldResource = node.getTotalResource();
    if (!oldResource.equals(newResource)) {
      // Notify NodeLabelsManager about this change
      rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
          newResource);

      // Log resource change
      LOG.info("Update resource on node: " + node.getNodeName() + " from: "
          + oldResource + ", to: " + newResource);

      // Remove, update, then re-add the node in the tracker.
      nodeTracker.removeNode(nm.getNodeID());

      // update resource to node
      node.updateTotalResource(newResource);

      nodeTracker.addNode((N) node);
    } else {
      // Nothing changed: only report the redundant update.
      LOG.warn("Update resource on node: " + node.getNodeName()
          + " with the same resource: " + newResource);
    }
  } finally {
    writeLock.unlock();
  }
}
/**
 * {@inheritDoc}
 *
 * <p>This implementation reports MEMORY only.
 */
@Override
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
  final EnumSet<SchedulerResourceTypes> memoryOnly =
      EnumSet.of(SchedulerResourceTypes.MEMORY);
  return memoryOnly;
}
/**
 * Not supported by this base class.
 *
 * @throws YarnException always, naming the concrete scheduler class
 */
@Override
public Set<String> getPlanQueues() throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(schedulerName + " does not support reservations");
}
/**
 * Replaces the configured maximum allocation held by the node tracker.
 *
 * @param newMaxAlloc new configured maximum allocation
 */
protected void refreshMaximumAllocation(Resource newMaxAlloc) {
  this.nodeTracker.setConfiguredMaxAllocation(newMaxAlloc);
}
/**
 * Returns all resource requests held in the scheduling info of the given
 * attempt.
 *
 * @param attemptId attempt to look up
 * @return the attempt's resource requests, or {@code null} when no attempt
 *         is found
 */
@Override
public List<ResourceRequest> getPendingResourceRequestsForAttempt(
    ApplicationAttemptId attemptId) {
  final SchedulerApplicationAttempt current =
      getApplicationAttempt(attemptId);
  if (current == null) {
    return null;
  }
  return current.getAppSchedulingInfo().getAllResourceRequests();
}
/**
 * Placeholder implementation: ignores all arguments and returns a new
 * priority of 0. Per the original note, it stands in until application
 * priority is handled by the specific scheduler.
 */
@Override
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
    String user, String queueName, ApplicationId applicationId)
    throws YarnException {
  final Priority placeholder = Priority.newInstance(0);
  return placeholder;
}
/**
 * Placeholder implementation: ignores all arguments and returns a new
 * priority of 0. Per the original note, it stands in until application
 * priority is handled by the specific scheduler.
 */
@Override
public Priority updateApplicationPriority(Priority newPriority,
    ApplicationId applicationId, SettableFuture<Object> future)
    throws YarnException {
  final Priority placeholder = Priority.newInstance(0);
  return placeholder;
}
/**
 * Returns the stored cluster-level maximum application priority.
 */
@Override
public Priority getMaxClusterLevelAppPriority() {
  return this.maxClusterLevelAppPriority;
}
/**
 * Reads the cluster-level maximum application priority from the
 * configuration, using the default constant when the key is not set.
 *
 * @param conf configuration to read from
 * @return a new priority holding the configured value
 */
private Priority getMaxPriorityFromConf(Configuration conf) {
  final int configuredMax = conf.getInt(
      YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
      YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY);
  return Priority.newInstance(configuredMax);
}
/**
 * Re-reads the cluster-level maximum application priority from the given
 * configuration and stores it.
 *
 * @param conf configuration to read the new maximum from
 * @throws YarnException wrapping a {@link NumberFormatException} raised
 *         while reading the value
 */
@Override
public void setClusterMaxPriority(Configuration conf)
    throws YarnException {
  try {
    this.maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
  } catch (NumberFormatException badNumber) {
    throw new YarnException(badNumber);
  }
  final String summary =
      "Updated the cluste max priority to maxClusterLevelAppPriority = "
          + maxClusterLevelAppPriority;
  LOG.info(summary);
}
/**
 * Builds the scheduler-side change request for one
 * {@link UpdateContainerRequest}.
 *
 * @param request the update request; its container id and capability are
 *        used
 * @param increase only selects the wording ("increase" or "decrease") of
 *        the error message
 * @return the change request, never {@code null}
 * @throws YarnException ({@link InvalidResourceRequestException}) when no
 *         RMContainer is found for the request's container id
 */
private SchedContainerChangeRequest createSchedContainerChangeRequest(
    UpdateContainerRequest request, boolean increase)
    throws YarnException {
  final ContainerId targetId = request.getContainerId();
  final RMContainer target = getRMContainer(targetId);
  if (target != null) {
    final SchedulerNode hostNode =
        getSchedulerNode(target.getAllocatedNode());
    return new SchedContainerChangeRequest(
        this.rmContext, hostNode, target, request.getCapability());
  }
  final String direction = increase ? "increase" : "decrease";
  throw new InvalidResourceRequestException(
      "Failed to get rmContainer for "
          + direction
          + " request, with container-id=" + targetId);
}
/**
 * Converts update requests into scheduler change requests. A request that
 * fails validation is logged and left out of the result.
 *
 * @param changeRequests update requests to convert
 * @param increase {@code true} when the requests are increases,
 *        {@code false} for decreases; used for log and error wording
 * @return the change requests that passed validation, in input order
 */
protected List<SchedContainerChangeRequest>
    createSchedContainerChangeRequests(
        List<UpdateContainerRequest> changeRequests,
        boolean increase) {
  List<SchedContainerChangeRequest> schedulerChangeRequests =
      new ArrayList<SchedContainerChangeRequest>();
  for (UpdateContainerRequest r : changeRequests) {
    SchedContainerChangeRequest sr = null;
    try {
      sr = createSchedContainerChangeRequest(r, increase);
    } catch (YarnException e) {
      // Name the actual direction; this method also serves decreases.
      LOG.warn("Error happens when checking "
          + (increase ? "increase" : "decrease")
          + " request, Ignoring.." + " exception=", e);
      continue;
    }
    schedulerChangeRequests.add(sr);
  }
  return schedulerChangeRequests;
}
/**
 * Returns the activities manager held by this scheduler.
 */
public ActivitiesManager getActivitiesManager() {
  return activitiesManager;
}
/**
 * Returns the clock currently used by this scheduler.
 */
public Clock getClock() {
  return this.clock;
}
/**
 * Replaces the clock used by this scheduler; intended for tests.
 *
 * @param clock clock to use from now on
 */
@VisibleForTesting
public void setClock(Clock clock) {
  final Clock replacement = clock;
  this.clock = replacement;
}
/**
 * Returns the node tracker's entry for the given node id.
 *
 * @param nodeId node to look up
 */
@Lock(Lock.NoLock.class)
public SchedulerNode getNode(NodeId nodeId) {
  final SchedulerNode tracked = this.nodeTracker.getNode(nodeId);
  return tracked;
}
/**
 * Pulls the pending container updates from a node and processes the newly
 * launched and newly increased containers among them.
 *
 * @param nm The RMNode corresponding to the NodeManager
 * @return list of completed containers
 */
protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
  final SchedulerNode schedulerNode = getNode(nm.getNodeID());

  // Split the pulled updates into launched and completed statuses.
  final List<ContainerStatus> launched = new ArrayList<>();
  final List<ContainerStatus> completed = new ArrayList<>();
  for (UpdatedContainerInfo update : nm.pullContainerUpdates()) {
    launched.addAll(update.getNewlyLaunchedContainers());
    completed.addAll(update.getCompletedContainers());
  }

  // Processing the newly launched containers
  for (ContainerStatus status : launched) {
    containerLaunchedOnNode(status.getContainerId(), schedulerNode);
  }

  // Processing the newly increased containers
  for (Container increased : nm.pullNewlyIncreasedContainers()) {
    containerIncreasedOnNode(increased.getId(), schedulerNode, increased);
  }

  return completed;
}
/**
 * Process completed container list.
 *
 * <p>Every status is passed to {@code completedContainer}. For containers
 * known to the scheduler, their allocated and reserved resources are added
 * to {@code releasedResources}. Containers without an RMContainer are
 * reported back to the node in a single event.
 *
 * @param completedContainers Extracted list of completed containers
 * @param releasedResources Reference resource object for completed
 *        containers; updated in place
 * @param nodeId NodeId corresponding to the NodeManager
 * @return The total number of released containers
 */
protected int updateCompletedContainers(List<ContainerStatus>
    completedContainers, Resource releasedResources, NodeId nodeId) {
  int releasedContainers = 0;
  List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    // Guarded like the other debug statements in this class, so the message
    // is only built when it will be logged.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Container FINISHED: " + containerId);
    }
    RMContainer container = getRMContainer(containerId);
    completedContainer(container,
        completedContainer, RMContainerEventType.FINISHED);
    if (container != null) {
      releasedContainers++;
      Resource ars = container.getAllocatedResource();
      if (ars != null) {
        Resources.addTo(releasedResources, ars);
      }
      Resource rrs = container.getReservedResource();
      if (rrs != null) {
        Resources.addTo(releasedResources, rrs);
      }
    } else {
      // Add containers which are untracked by RM.
      untrackedContainerIdList.add(containerId);
    }
  }

  // Acknowledge NM to remove RM-untracked-containers from NM context.
  if (!untrackedContainerIdList.isEmpty()) {
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
            untrackedContainerIdList));
  }

  return releasedContainers;
}
/**
 * Records release details and release counts in the scheduler health
 * record, stamped with the last node update time.
 *
 * @param releasedResources Reference resource object for completed containers
 * @param releasedContainers Count of released containers
 */
protected void updateSchedulerHealthInformation(Resource releasedResources,
    int releasedContainers) {
  final long lastUpdate = getLastNodeUpdateTime();
  this.schedulerHealth.updateSchedulerReleaseDetails(lastUpdate,
      releasedResources);
  this.schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
}
/**
 * Copies the utilization figures reported by the NodeManager onto its
 * scheduler node: aggregated containers utilization first, then node
 * utilization.
 *
 * @param nm The NodeManager to update
 */
protected void updateNodeResourceUtilization(RMNode nm) {
  final SchedulerNode schedulerNode = getNode(nm.getNodeID());
  schedulerNode.setAggregatedContainersUtilization(
      nm.getAggregatedContainersUtilization());
  schedulerNode.setNodeUtilization(nm.getNodeUtilization());
}
/**
 * Process a heartbeat update from a node: handle new and completed
 * containers, send a resource update for a decommissioning node, then
 * refresh scheduler health and node utilization.
 *
 * @param nm The RMNode corresponding to the NodeManager
 */
protected synchronized void nodeUpdate(RMNode nm) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm +
        " cluster capacity: " + getClusterResource());
  }

  // New container information first; it yields the completed containers.
  final List<ContainerStatus> finished = updateNewContainerInfo(nm);

  // Completed containers are folded into a fresh resource accumulator.
  final Resource freed = Resource.newInstance(0, 0);
  final int freedCount =
      updateCompletedContainers(finished, freed, nm.getNodeID());

  // A decommissioning node gets its total resource set to what is already
  // allocated, so nothing is left available to schedule.
  // TODO YARN-5128: Fix possible race-condition when request comes in before
  // update is propagated
  if (nm.getState() == NodeState.DECOMMISSIONING) {
    final ResourceOption allocatedOnly = ResourceOption.newInstance(
        getSchedulerNode(nm.getNodeID()).getAllocatedResource(), 0);
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeResourceUpdateEvent(nm.getNodeID(), allocatedOnly));
  }

  updateSchedulerHealthInformation(freed, freedCount);
  updateNodeResourceUtilization(nm);

  // Now node data structures are up-to-date and ready for scheduling.
  if (LOG.isDebugEnabled()) {
    final SchedulerNode schedulerNode = getNode(nm.getNodeID());
    LOG.debug("Node being looked for scheduling " + nm +
        " availableResource: " + schedulerNode.getUnallocatedResource());
  }
}
/**
 * Normalizes one resource request through {@code SchedulerUtils}, using
 * this scheduler's resource calculator and its minimum and maximum
 * resource capabilities. The minimum capability is passed a second time as
 * the last argument (presumably the increment step; confirm against
 * {@code SchedulerUtils.normalizeRequest}).
 *
 * @param ask request to normalize
 */
@Override
public void normalizeRequest(AbstractResourceRequest ask) {
  SchedulerUtils.normalizeRequest(
      ask,
      this.getResourceCalculator(),
      this.getMinimumResourceCapability(),
      this.getMaximumResourceCapability(),
      this.getMinimumResourceCapability());
}
/**
 * Normalizes every request of the list, in list order, through
 * {@link #normalizeRequest}.
 *
 * @param asks resource requests
 */
protected void normalizeRequests(List<ResourceRequest> asks) {
  for (ResourceRequest pending : asks) {
    this.normalizeRequest(pending);
  }
}
/**
 * Returns the minimum possible allocation. With resource profiles enabled
 * this is the 'minimum' profile of the resource profiles manager;
 * otherwise it is the minimum allocation from {@code ResourceUtils}. The
 * value is logged at info level on every call.
 *
 * @return a Resource object with the minimum allocation for the scheduler
 */
public Resource getMinimumAllocation() {
  final boolean useProfiles = getConfig().getBoolean(
      YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
      YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
  final Resource minimum = useProfiles
      ? rmContext.getResourceProfilesManager().getMinimumProfile()
      : ResourceUtils.getResourceTypesMinimumAllocation();
  LOG.info("Minimum allocation = " + minimum);
  return minimum;
}
/**
 * Returns the maximum possible allocation. With resource profiles enabled
 * this is the 'maximum' profile of the resource profiles manager;
 * otherwise it is the maximum allocation from {@code ResourceUtils}. The
 * value is logged at info level on every call.
 *
 * @return a Resource object with the maximum allocation for the scheduler
 */
public Resource getMaximumAllocation() {
  final boolean useProfiles = getConfig().getBoolean(
      YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
      YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
  final Resource maximum = useProfiles
      ? rmContext.getResourceProfilesManager().getMaximumProfile()
      : ResourceUtils.getResourceTypesMaximumAllocation();
  LOG.info("Maximum allocation = " + maximum);
  return maximum;
}
}