/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ChangeMonitoringContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* The ContainerScheduler manages a collection of runnable containers. It
* ensures that a container is launched only if all its launch criteria are
* met. It also ensures that OPPORTUNISTIC containers are killed to make
* room for GUARANTEED containers.
*/
public class ContainerScheduler extends AbstractService implements
EventHandler<ContainerSchedulerEvent> {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerScheduler.class);
private final Context context;
private final int maxOppQueueLength;
// Queue of Guaranteed Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedGuaranteedContainers = new LinkedHashMap<>();
// Queue of Opportunistic Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedOpportunisticContainers = new LinkedHashMap<>();
// Used to keep track of containers that have been marked to be killed
// or paused to make room for a guaranteed container.
private final Map<ContainerId, Container> oppContainersToKill =
new HashMap<>();
// Containers launched by the Scheduler will take a while to actually
// move to the RUNNING state, but should still be fair game for killing
// by the scheduler to make room for guaranteed containers. This holds
// containers that are in RUNNING as well as those in SCHEDULED state that
// have been marked to run, but not yet RUNNING.
private final LinkedHashMap<ContainerId, Container> runningContainers =
new LinkedHashMap<>();
private final ContainerQueuingLimit queuingLimit =
ContainerQueuingLimit.newInstance();
private final OpportunisticContainersStatus opportunisticContainersStatus;
  // Resource utilization tracker that decides how the utilization of this
  // node increases / decreases based on container start / finish.
private ResourceUtilizationTracker utilizationTracker;
private final AsyncDispatcher dispatcher;
private final NodeManagerMetrics metrics;
  private boolean usePauseEventForPreemption = false;
/**
* Instantiate a Container Scheduler.
* @param context NodeManager Context.
* @param dispatcher AsyncDispatcher.
* @param metrics NodeManagerMetrics.
*/
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics) {
this(context, dispatcher, metrics, context.getConf().getInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
YarnConfiguration.
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
}
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
this.usePauseEventForPreemption =
conf.getBoolean(
YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
}
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics, int qLength) {
super(ContainerScheduler.class.getName());
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
this.utilizationTracker =
new AllocationBasedResourceUtilizationTracker(this);
this.opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
}
/**
* Handle ContainerSchedulerEvents.
* @param event ContainerSchedulerEvent.
*/
@Override
public void handle(ContainerSchedulerEvent event) {
switch (event.getType()) {
case SCHEDULE_CONTAINER:
scheduleContainer(event.getContainer());
break;
// NOTE: Is sent only after container state has changed to PAUSED...
case CONTAINER_PAUSED:
// NOTE: Is sent only after container state has changed to DONE...
case CONTAINER_COMPLETED:
onResourcesReclaimed(event.getContainer());
break;
case UPDATE_CONTAINER:
if (event instanceof UpdateContainerSchedulerEvent) {
onUpdateContainer((UpdateContainerSchedulerEvent) event);
} else {
LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
}
break;
case SHED_QUEUED_CONTAINERS:
shedQueuedOpportunisticContainers();
break;
    case RECOVERY_COMPLETED:
      startPendingContainers(maxOppQueueLength <= 0);
      break;
    default:
LOG.error("Unknown event arrived at ContainerScheduler: "
+ event.toString());
}
}
/**
* We assume that the ContainerManager has already figured out what kind
* of update this is.
*/
private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
ContainerId containerId = updateEvent.getContainer().getContainerId();
if (updateEvent.isResourceChange()) {
if (runningContainers.containsKey(containerId)) {
this.utilizationTracker.subtractContainerResource(
new ContainerImpl(getConfig(), null, null, null, null,
updateEvent.getOriginalToken(), context));
this.utilizationTracker.addContainerResources(
updateEvent.getContainer());
getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent(containerId,
updateEvent.getUpdatedToken().getResource()));
}
}
if (updateEvent.isExecTypeUpdate()) {
// Promotion or not (Increase signifies either a promotion
// or container size increase)
if (updateEvent.isIncrease()) {
        // Promotion of a queued container.
if (queuedOpportunisticContainers.remove(containerId) != null) {
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
          // Kill/pause opportunistic containers, if any, to make room for
          // the promotion request.
reclaimOpportunisticContainerResources(updateEvent.getContainer());
}
} else {
        // Demotion of a queued container. This should be rare, since there
        // should not be many queued guaranteed containers.
if (queuedGuaranteedContainers.remove(containerId) != null) {
queuedOpportunisticContainers.put(containerId,
updateEvent.getContainer());
}
}
startPendingContainers(maxOppQueueLength <= 0);
}
}
/**
* Populates auxiliary data structures used by the ContainerScheduler on
* recovery.
* @param container container recovered
* @param rcs Recovered Container status
*/
public void recoverActiveContainer(Container container,
RecoveredContainerState rcs) {
ExecutionType execType =
container.getContainerTokenIdentifier().getExecutionType();
if (rcs.getStatus() == RecoveredContainerStatus.QUEUED
|| rcs.getStatus() == RecoveredContainerStatus.PAUSED) {
if (execType == ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
} else if (execType == ExecutionType.OPPORTUNISTIC) {
queuedOpportunisticContainers
.put(container.getContainerId(), container);
} else {
        LOG.error("Unknown execution type received for container "
            + container.getContainerId() + ", execType " + execType);
}
} else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
runningContainers.put(container.getContainerId(), container);
utilizationTracker.addContainerResources(container);
}
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED
&& rcs.getCapability() != null) {
metrics.launchedContainer();
metrics.allocateContainer(rcs.getCapability());
}
}
/**
* Return number of queued containers.
* @return Number of queued containers.
*/
public int getNumQueuedContainers() {
return this.queuedGuaranteedContainers.size()
+ this.queuedOpportunisticContainers.size();
}
@VisibleForTesting
public int getNumQueuedGuaranteedContainers() {
return this.queuedGuaranteedContainers.size();
}
@VisibleForTesting
public int getNumQueuedOpportunisticContainers() {
return this.queuedOpportunisticContainers.size();
}
@VisibleForTesting
public int getNumRunningContainers() {
return this.runningContainers.size();
}
@VisibleForTesting
public void setUsePauseEventForPreemption(
boolean usePauseEventForPreemption) {
this.usePauseEventForPreemption = usePauseEventForPreemption;
}
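  /**
   * Refresh and return the cached snapshot of the opportunistic containers
   * status: the number of queued opportunistic containers, the total wait
   * queue length, and the memory, vcores and container count currently used
   * by running opportunistic containers.
   * @return the current opportunistic containers status.
   */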
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
this.opportunisticContainersStatus.setQueuedOpportContainers(
getNumQueuedOpportunisticContainers());
this.opportunisticContainersStatus.setWaitQueueLength(
getNumQueuedContainers());
this.opportunisticContainersStatus.setOpportMemoryUsed(
metrics.getAllocatedOpportunisticGB());
this.opportunisticContainersStatus.setOpportCoresUsed(
metrics.getAllocatedOpportunisticVCores());
this.opportunisticContainersStatus.setRunningOpportContainers(
metrics.getRunningOpportunisticContainers());
return this.opportunisticContainersStatus;
}
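  /**
   * Perform the bookkeeping needed when a container's resources are
   * reclaimed (on completion or pause): remove the container from the
   * kill/pause map and the queues, re-queue it if it is PAUSED, release its
   * resources if it was running, and try to start pending containers with
   * the freed capacity.
   * @param container the container whose resources are reclaimed.
   */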
private void onResourcesReclaimed(Container container) {
oppContainersToKill.remove(container.getContainerId());
    // The container may have been killed externally, e.g. by the
    // ContainerManager, in which case it might still be queued.
Container queued =
queuedOpportunisticContainers.remove(container.getContainerId());
if (queued == null) {
queuedGuaranteedContainers.remove(container.getContainerId());
}
// Requeue PAUSED containers
if (container.getContainerState() == ContainerState.PAUSED) {
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
} else {
queuedOpportunisticContainers.put(
container.getContainerId(), container);
}
}
// decrement only if it was a running container
Container completedContainer = runningContainers.remove(container
.getContainerId());
// only a running container releases resources upon completion
boolean resourceReleased = completedContainer != null;
if (resourceReleased) {
this.utilizationTracker.subtractContainerResource(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.completeOpportunisticContainer(container.getResource());
}
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
}
}
  /**
   * Start pending containers in the queues.
   * @param forceStartGuaranteedContainers when true, start guaranteed
   *        containers without checking available resources.
   */
  private void startPendingContainers(boolean forceStartGuaranteedContainers) {
    // Start queued guaranteed containers, if resources are available.
    boolean resourcesAvailable = startContainers(
        queuedGuaranteedContainers.values(), forceStartGuaranteedContainers);
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainers(queuedOpportunisticContainers.values(), false);
}
}
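  /**
   * Attempt to start the given containers in order, removing the successfully
   * started ones from the collection.
   * @param containersToBeStarted the containers to attempt to start.
   * @param force if true, start containers without checking available
   *              resources.
   * @return false if a container could not be started due to insufficient
   *         resources, true otherwise.
   */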
private boolean startContainers(
Collection<Container> containersToBeStarted, boolean force) {
Iterator<Container> cIter = containersToBeStarted.iterator();
boolean resourcesAvailable = true;
while (cIter.hasNext() && resourcesAvailable) {
Container container = cIter.next();
if (tryStartContainer(container, force)) {
cIter.remove();
} else {
resourcesAvailable = false;
}
}
return resourcesAvailable;
}
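  /**
   * Try to start a single container.
   * @param container the container to start.
   * @param force if true, start the container without checking resource
   *              availability.
   * @return true if the container was started.
   */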
private boolean tryStartContainer(Container container, boolean force) {
boolean containerStarted = false;
    // Call startContainer without checking available resources when force is
    // true.
    if (force || resourceAvailableToStartContainer(container)) {
startContainer(container);
containerStarted = true;
}
return containerStarted;
}
/**
* Check if there is resource available to start a given container
* immediately. (This can be extended to include overallocated resources)
* @param container the container to start
* @return true if container can be launched directly
*/
private boolean resourceAvailableToStartContainer(Container container) {
return this.utilizationTracker.hasResourcesAvailable(container);
}
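  /**
   * Queue the given container. Guaranteed containers are always queued, while
   * opportunistic containers are queued only if the opportunistic queue is
   * below its maximum length and are killed otherwise. Queued containers are
   * recorded in the NM state store.
   * @param container the container to queue.
   * @return true if the container was queued.
   */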
private boolean enqueueContainer(Container container) {
boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
getExecutionType() == ExecutionType.GUARANTEED;
boolean isQueued;
if (isGuaranteedContainer) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
isQueued = true;
} else {
if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
LOG.info("Opportunistic container {} will be queued at the NM.",
container.getContainerId());
queuedOpportunisticContainers.put(
container.getContainerId(), container);
isQueued = true;
} else {
LOG.info("Opportunistic container [{}] will not be queued at the NM" +
"since max queue length [{}] has been reached",
container.getContainerId(), maxOppQueueLength);
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Opportunistic container queue is full.");
isQueued = false;
}
}
if (isQueued) {
try {
this.context.getNMStateStore().storeContainerQueued(
container.getContainerId());
} catch (IOException e) {
LOG.warn("Could not store container [" + container.getContainerId()
+ "] state. The Container has been queued.", e);
}
}
return isQueued;
}
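  /**
   * Schedule a container: queue it, try to start pending containers and, if
   * a guaranteed container remains queued, reclaim resources from running
   * opportunistic containers.
   * @param container the container to schedule.
   */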
@VisibleForTesting
protected void scheduleContainer(Container container) {
boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
getExecutionType() == ExecutionType.GUARANTEED;
// Given a guaranteed container, we enqueue it first and then try to start
// as many queuing guaranteed containers as possible followed by queuing
// opportunistic containers based on remaining resources available. If the
    // container still stays in the queue afterwards, we need to preempt just
    // enough opportunistic containers to make room for it.
if (isGuaranteedContainer) {
enqueueContainer(container);
      // When opportunistic containers are not allowed (i.e. the max queue
      // length for pending opportunistic containers is <= 0), start
      // guaranteed containers without looking at available resources.
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
      // If the guaranteed container is still queued, we need to preempt
      // opportunistic containers to make room for it.
if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
reclaimOpportunisticContainerResources(container);
}
} else {
      // Given an opportunistic container, we first try to start as many queued
      // guaranteed containers as possible, followed by queued opportunistic
      // containers based on the remaining resources available, and then
      // enqueue the opportunistic container. If the container is enqueued, we
      // do another pass to try to start the newly enqueued container.
startPendingContainers(false);
boolean containerQueued = enqueueContainer(container);
      // The container may not get queued if the max opportunistic container
      // queue length has been reached. If so, there is no point in doing
      // another pass.
if (containerQueued) {
startPendingContainers(false);
}
}
}
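  /**
   * Pause or kill (depending on the configured preemption policy) enough
   * running opportunistic containers to make room for the given guaranteed
   * container.
   * @param container the guaranteed container that needs resources.
   */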
@SuppressWarnings("unchecked")
private void reclaimOpportunisticContainerResources(Container container) {
List<Container> extraOppContainersToReclaim =
pickOpportunisticContainersToReclaimResources(
container.getContainerId());
// Kill the opportunistic containers that were chosen.
for (Container contToReclaim : extraOppContainersToReclaim) {
      String preemptionAction = usePauseEventForPreemption ? "paused"
          : "killed";
LOG.info(
"Container {} will be {} to start the "
+ "execution of guaranteed container {}.",
contToReclaim.getContainerId(), preemptionAction,
container.getContainerId());
if (usePauseEventForPreemption) {
contToReclaim.sendPauseEvent(
"Container Paused to make room for Guaranteed Container");
} else {
contToReclaim.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Container Killed to make room for Guaranteed Container.");
}
oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
}
}
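  /**
   * Launch the given container: record it as running, account for its
   * resources, update opportunistic metrics if applicable, and send the
   * launch event.
   * @param container the container to launch.
   */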
private void startContainer(Container container) {
LOG.info("Starting container [" + container.getContainerId()+ "]");
runningContainers.put(container.getContainerId(), container);
this.utilizationTracker.addContainerResources(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.startOpportunisticContainer(container.getResource());
}
container.sendLaunchEvent();
}
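  /**
   * Pick running opportunistic containers, most recently started first, until
   * enough resources would be freed for the given container to start.
   * Containers that have already been marked to be killed or paused are
   * skipped.
   * @param containerToStartId the container waiting to start.
   * @return the opportunistic containers chosen to be killed or paused.
   */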
private List<Container> pickOpportunisticContainersToReclaimResources(
ContainerId containerToStartId) {
// The opportunistic containers that need to be killed for the
// given container to start.
List<Container> extraOpportContainersToKill = new ArrayList<>();
// Track resources that need to be freed.
ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
containerToStartId);
// Go over the running opportunistic containers.
// Use a descending iterator to kill more recently started containers.
Iterator<Container> lifoIterator = new LinkedList<>(
runningContainers.values()).descendingIterator();
while(lifoIterator.hasNext() &&
!hasSufficientResources(resourcesToFreeUp)) {
Container runningCont = lifoIterator.next();
if (runningCont.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
if (oppContainersToKill.containsKey(
runningCont.getContainerId())) {
          // These containers have already been marked to be killed or
          // paused, so skip them.
continue;
}
extraOpportContainersToKill.add(runningCont);
ContainersMonitor.ContainerManagerUtils.decreaseResourceUtilization(
getContainersMonitor(), resourcesToFreeUp,
runningCont.getResource());
}
}
if (!hasSufficientResources(resourcesToFreeUp)) {
LOG.warn("There are no sufficient resources to start guaranteed [{}]" +
"at the moment. Opportunistic containers are in the process of" +
"being killed to make room.", containerToStartId);
}
return extraOpportContainersToKill;
}
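  /**
   * Check whether enough resources have been freed.
   * @param resourcesToFreeUp the resources that still need to be freed.
   * @return true if all components of the utilization to free are zero or
   *         negative, i.e. nothing more needs to be freed.
   */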
private boolean hasSufficientResources(
ResourceUtilization resourcesToFreeUp) {
return resourcesToFreeUp.getPhysicalMemory() <= 0 &&
resourcesToFreeUp.getVirtualMemory() <= 0 &&
// Convert the number of cores to nearest integral number, due to
// imprecision of direct float comparison.
Math.round(resourcesToFreeUp.getCPU()
* getContainersMonitor().getVCoresAllocatedForContainers()) <= 0;
}
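  /**
   * Compute how much utilization still has to be freed before the given
   * container can start: the current utilization, plus the allocation of
   * queued guaranteed containers up to and including the given container,
   * minus the resources of containers already marked to be killed or paused,
   * minus the overall node resources.
   * @param containerToStartId the container waiting to start.
   * @return the utilization to free up; non-positive values mean nothing
   *         needs to be freed.
   */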
private ResourceUtilization resourcesToFreeUp(
ContainerId containerToStartId) {
// Get allocation of currently allocated containers.
ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
.newInstance(this.utilizationTracker.getCurrentUtilization());
    // Add the allocation of the pending guaranteed containers that will start
    // before the current container is started.
for (Container container : queuedGuaranteedContainers.values()) {
ContainersMonitor.ContainerManagerUtils.increaseResourceUtilization(
getContainersMonitor(), resourceAllocationToFreeUp,
container.getResource());
if (container.getContainerId().equals(containerToStartId)) {
break;
}
}
    // Subtract the resources of the containers that are already being freed,
    // likely at the behest of another guaranteed container.
for (Container container : oppContainersToKill.values()) {
ContainersMonitor.ContainerManagerUtils.decreaseResourceUtilization(
getContainersMonitor(), resourceAllocationToFreeUp,
container.getResource());
}
// Subtract the overall node resources.
getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
resourceAllocationToFreeUp);
return resourceAllocationToFreeUp;
}
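  /**
   * Update the maximum number of opportunistic containers that may be queued
   * on this node and, if the current queue exceeds the new limit, trigger a
   * SHED_QUEUED_CONTAINERS event to trim it.
   * @param limit the new container queuing limit.
   */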
@SuppressWarnings("unchecked")
public void updateQueuingLimit(ContainerQueuingLimit limit) {
this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
// YARN-2886 should add support for wait-times. Include wait time as
// well once it is implemented
if ((queuingLimit.getMaxQueueLength() > -1) &&
(queuingLimit.getMaxQueueLength() <
queuedOpportunisticContainers.size())) {
dispatcher.getEventHandler().handle(
new ContainerSchedulerEvent(null,
ContainerSchedulerEventType.SHED_QUEUED_CONTAINERS));
}
}
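  /**
   * Kill and de-queue queued opportunistic containers that exceed the current
   * queuing limit. PAUSED containers are never shed and do not count against
   * the limit.
   */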
private void shedQueuedOpportunisticContainers() {
int numAllowed = this.queuingLimit.getMaxQueueLength();
Iterator<Container> containerIter =
queuedOpportunisticContainers.values().iterator();
while (containerIter.hasNext()) {
Container container = containerIter.next();
// Do not shed PAUSED containers
if (container.getContainerState() != ContainerState.PAUSED) {
if (numAllowed <= 0) {
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Container De-queued to meet NM queuing limits.");
containerIter.remove();
LOG.info(
"Opportunistic container {} will be killed to meet NM queuing" +
" limits.", container.getContainerId());
}
numAllowed--;
}
}
}
public ContainersMonitor getContainersMonitor() {
return this.context.getContainerManager().getContainersMonitor();
}
@VisibleForTesting
public ResourceUtilization getCurrentUtilization() {
return this.utilizationTracker.getCurrentUtilization();
}
}