blob: c633014c1c2f9b5d2013ccd162d10cc88bd4532b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import com.google.common.base.Preconditions;
public class ContainersMonitorImpl extends AbstractService implements
ContainersMonitor {
final static Log LOG = LogFactory
.getLog(ContainersMonitorImpl.class);
private long monitoringInterval;
private MonitoringThread monitoringThread;
final List<ContainerId> containersToBeRemoved;
final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
Map<ContainerId, ProcessTreeInfo> trackingContainers =
new HashMap<ContainerId, ProcessTreeInfo>();
final ContainerExecutor containerExecutor;
private final Dispatcher eventDispatcher;
private final Context context;
private ResourceCalculatorPlugin resourceCalculatorPlugin;
private long maxVmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
private long maxPmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
/**
* A value which if set for memory related configuration options, indicates
* that the options are turned off.
*/
public static final long DISABLED_MEMORY_LIMIT = -1L;
public ContainersMonitorImpl(ContainerExecutor exec,
AsyncDispatcher dispatcher, Context context) {
super("containers-monitor");
this.containerExecutor = exec;
this.eventDispatcher = dispatcher;
this.context = context;
this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
this.containersToBeRemoved = new ArrayList<ContainerId>();
this.monitoringThread = new MonitoringThread();
}
@Override
public synchronized void init(Configuration conf) {
this.monitoringInterval =
conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_MON_INTERVAL_MS);
Class<? extends ResourceCalculatorPlugin> clazz =
conf.getClass(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, null,
ResourceCalculatorPlugin.class);
this.resourceCalculatorPlugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
LOG.info(" Using ResourceCalculatorPlugin : "
+ this.resourceCalculatorPlugin);
long totalPhysicalMemoryOnNM = DISABLED_MEMORY_LIMIT;
if (this.resourceCalculatorPlugin != null) {
totalPhysicalMemoryOnNM =
this.resourceCalculatorPlugin.getPhysicalMemorySize();
if (totalPhysicalMemoryOnNM <= 0) {
LOG.warn("NodeManager's totalPmem could not be calculated. "
+ "Setting it to " + DISABLED_MEMORY_LIMIT);
totalPhysicalMemoryOnNM = DISABLED_MEMORY_LIMIT;
}
}
// ///////// Physical memory configuration //////
this.maxPmemAllottedForContainers =
conf.getLong(YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
this.maxPmemAllottedForContainers =
this.maxPmemAllottedForContainers * 1024 * 1024L; //Normalize to bytes
if (totalPhysicalMemoryOnNM != DISABLED_MEMORY_LIMIT &&
this.maxPmemAllottedForContainers >
totalPhysicalMemoryOnNM * 0.80f) {
LOG.warn("NodeManager configured with " +
StringUtils.humanReadableInt(maxPmemAllottedForContainers) +
" physical memory allocated to containers, which is more than " +
"80% of the total physical memory available (" +
StringUtils.humanReadableInt(totalPhysicalMemoryOnNM) +
"). Thrashing might happen.");
}
// ///////// Virtual memory configuration //////
float vmemRatio = conf.getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
Preconditions.checkArgument(vmemRatio > 0.99f,
YarnConfiguration.NM_VMEM_PMEM_RATIO +
" should be at least 1.0");
this.maxVmemAllottedForContainers =
(long)(vmemRatio * maxPmemAllottedForContainers);
super.init(conf);
}
/**
* Is the total physical memory check enabled?
*
* @return true if total physical memory check is enabled.
*/
boolean isPhysicalMemoryCheckEnabled() {
return !(this.maxPmemAllottedForContainers == DISABLED_MEMORY_LIMIT);
}
/**
* Is the total virtual memory check enabled?
*
* @return true if total virtual memory check is enabled.
*/
boolean isVirtualMemoryCheckEnabled() {
return !(this.maxVmemAllottedForContainers == DISABLED_MEMORY_LIMIT);
}
private boolean isEnabled() {
if (!ProcfsBasedProcessTree.isAvailable()) {
LOG.info("ProcessTree implementation is missing on this system. "
+ this.getClass().getName() + " is disabled.");
return false;
}
if (!(isPhysicalMemoryCheckEnabled() || isVirtualMemoryCheckEnabled())) {
LOG.info("Neither virutal-memory nor physical-memory monitoring is " +
"needed. Not running the monitor-thread");
return false;
}
return true;
}
@Override
public synchronized void start() {
if (this.isEnabled()) {
this.monitoringThread.start();
}
super.start();
}
@Override
public synchronized void stop() {
if (this.isEnabled()) {
this.monitoringThread.interrupt();
try {
this.monitoringThread.join();
} catch (InterruptedException e) {
;
}
}
super.stop();
}
private static class ProcessTreeInfo {
private ContainerId containerId;
private String pid;
private ProcfsBasedProcessTree pTree;
private long vmemLimit;
private long pmemLimit;
public ProcessTreeInfo(ContainerId containerId, String pid,
ProcfsBasedProcessTree pTree, long vmemLimit, long pmemLimit) {
this.containerId = containerId;
this.pid = pid;
this.pTree = pTree;
this.vmemLimit = vmemLimit;
this.pmemLimit = pmemLimit;
}
public ContainerId getContainerId() {
return this.containerId;
}
public String getPID() {
return this.pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public ProcfsBasedProcessTree getProcessTree() {
return this.pTree;
}
public void setProcessTree(ProcfsBasedProcessTree pTree) {
this.pTree = pTree;
}
public long getVmemLimit() {
return this.vmemLimit;
}
/**
* @return Physical memory limit for the process tree in bytes
*/
public long getPmemLimit() {
return this.pmemLimit;
}
}
/**
* Check whether a container's process tree's current memory usage is over
* limit.
*
* When a java process exec's a program, it could momentarily account for
* double the size of it's memory, because the JVM does a fork()+exec()
* which at fork time creates a copy of the parent's memory. If the
* monitoring thread detects the memory used by the container tree at the
* same instance, it could assume it is over limit and kill the tree, for no
* fault of the process itself.
*
* We counter this problem by employing a heuristic check: - if a process
* tree exceeds the memory limit by more than twice, it is killed
* immediately - if a process tree has processes older than the monitoring
* interval exceeding the memory limit by even 1 time, it is killed. Else it
* is given the benefit of doubt to lie around for one more iteration.
*
* @param containerId
* Container Id for the container tree
* @param currentMemUsage
* Memory usage of a container tree
* @param curMemUsageOfAgedProcesses
* Memory usage of processes older than an iteration in a container
* tree
* @param vmemLimit
* The limit specified for the container
* @return true if the memory usage is more than twice the specified limit,
* or if processes in the tree, older than this thread's monitoring
* interval, exceed the memory limit. False, otherwise.
*/
boolean isProcessTreeOverLimit(String containerId,
long currentMemUsage,
long curMemUsageOfAgedProcesses,
long vmemLimit) {
boolean isOverLimit = false;
if (currentMemUsage > (2 * vmemLimit)) {
LOG.warn("Process tree for container: " + containerId
+ " running over twice " + "the configured limit. Limit=" + vmemLimit
+ ", current usage = " + currentMemUsage);
isOverLimit = true;
} else if (curMemUsageOfAgedProcesses > vmemLimit) {
LOG.warn("Process tree for container: " + containerId
+ " has processes older than 1 "
+ "iteration running over the configured limit. Limit=" + vmemLimit
+ ", current usage = " + curMemUsageOfAgedProcesses);
isOverLimit = true;
}
return isOverLimit;
}
// method provided just for easy testing purposes
boolean isProcessTreeOverLimit(ProcfsBasedProcessTree pTree,
String containerId, long limit) {
long currentMemUsage = pTree.getCumulativeVmem();
// as processes begin with an age 1, we want to see if there are processes
// more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
return isProcessTreeOverLimit(containerId, currentMemUsage,
curMemUsageOfAgedProcesses, limit);
}
private class MonitoringThread extends Thread {
public MonitoringThread() {
super("Container Monitor");
}
@Override
public void run() {
while (true) {
// Print the processTrees for debugging.
if (LOG.isDebugEnabled()) {
StringBuilder tmp = new StringBuilder("[ ");
for (ProcessTreeInfo p : trackingContainers.values()) {
tmp.append(p.getPID());
tmp.append(" ");
}
LOG.debug("Current ProcessTree list : "
+ tmp.substring(0, tmp.length()) + "]");
}
// Add new containers
synchronized (containersToBeAdded) {
for (Entry<ContainerId, ProcessTreeInfo> entry : containersToBeAdded
.entrySet()) {
ContainerId containerId = entry.getKey();
ProcessTreeInfo processTreeInfo = entry.getValue();
LOG.info("Starting resource-monitoring for " + containerId);
trackingContainers.put(containerId, processTreeInfo);
}
containersToBeAdded.clear();
}
// Remove finished containers
synchronized (containersToBeRemoved) {
for (ContainerId containerId : containersToBeRemoved) {
trackingContainers.remove(containerId);
LOG.info("Stopping resource-monitoring for " + containerId);
}
containersToBeRemoved.clear();
}
// Now do the monitoring for the trackingContainers
// Check memory usage and kill any overflowing containers
long vmemStillInUsage = 0;
long pmemStillInUsage = 0;
for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
trackingContainers.entrySet().iterator(); it.hasNext();) {
Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
ContainerId containerId = entry.getKey();
ProcessTreeInfo ptInfo = entry.getValue();
try {
String pId = ptInfo.getPID();
// Initialize any uninitialized processTrees
if (pId == null) {
// get pid from ContainerId
pId = containerExecutor.getProcessId(ptInfo.getContainerId());
if (pId != null) {
// pId will be null, either if the container is not spawned yet
// or if the container's pid is removed from ContainerExecutor
LOG.debug("Tracking ProcessTree " + pId
+ " for the first time");
ProcfsBasedProcessTree pt =
new ProcfsBasedProcessTree(pId,
ContainerExecutor.isSetsidAvailable);
ptInfo.setPid(pId);
ptInfo.setProcessTree(pt);
}
}
// End of initializing any uninitialized processTrees
if (pId == null) {
continue; // processTree cannot be tracked
}
LOG.debug("Constructing ProcessTree for : PID = " + pId
+ " ContainerId = " + containerId);
ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
pTree = pTree.getProcessTree(); // get the updated process-tree
ptInfo.setProcessTree(pTree); // update ptInfo with proces-tree of
// updated state
long currentVmemUsage = pTree.getCumulativeVmem();
long currentPmemUsage = pTree.getCumulativeRssmem();
// as processes begin with an age 1, we want to see if there
// are processes more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1);
long vmemLimit = ptInfo.getVmemLimit();
long pmemLimit = ptInfo.getPmemLimit();
LOG.info(String.format(
"Memory usage of ProcessTree %s for container-id %s: ",
pId, containerId.toString()) +
formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
boolean isMemoryOverLimit = false;
String msg = "";
if (isVirtualMemoryCheckEnabled()
&& isProcessTreeOverLimit(containerId.toString(),
currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
// Container (the root process) is still alive and overflowing
// memory.
// Dump the process-tree and then clean it up.
msg = formatErrorMessage("virtual",
currentVmemUsage, vmemLimit,
currentPmemUsage, pmemLimit,
pId, containerId, pTree);
isMemoryOverLimit = true;
} else if (isPhysicalMemoryCheckEnabled()
&& isProcessTreeOverLimit(containerId.toString(),
currentPmemUsage, curRssMemUsageOfAgedProcesses,
pmemLimit)) {
// Container (the root process) is still alive and overflowing
// memory.
// Dump the process-tree and then clean it up.
msg = formatErrorMessage("physical",
currentVmemUsage, vmemLimit,
currentPmemUsage, pmemLimit,
pId, containerId, pTree);
isMemoryOverLimit = true;
}
if (isMemoryOverLimit) {
// Virtual or physical memory over limit. Fail the container and
// remove
// the corresponding process tree
LOG.warn(msg);
// warn if not a leader
if (!pTree.checkPidPgrpidForMatch()) {
LOG.error("Killed container process with PID " + pId
+ " but it is not a process group leader.");
}
// kill the container
eventDispatcher.getEventHandler().handle(
new ContainerKillEvent(containerId, msg));
it.remove();
LOG.info("Removed ProcessTree with root " + pId);
} else {
// Accounting the total memory in usage for all containers that
// are still
// alive and within limits.
vmemStillInUsage += currentVmemUsage;
pmemStillInUsage += currentPmemUsage;
}
} catch (Exception e) {
// Log the exception and proceed to the next container.
LOG.warn("Uncaught exception in ContainerMemoryManager "
+ "while managing memory of " + containerId, e);
}
}
try {
Thread.sleep(monitoringInterval);
} catch (InterruptedException e) {
LOG.warn(ContainersMonitorImpl.class.getName()
+ " is interrupted. Exiting.");
break;
}
}
}
private String formatErrorMessage(String memTypeExceeded,
long currentVmemUsage, long vmemLimit,
long currentPmemUsage, long pmemLimit,
String pId, ContainerId containerId, ProcfsBasedProcessTree pTree) {
return
String.format("Container [pid=%s,containerID=%s] is running beyond %s memory limits. ",
pId, containerId, memTypeExceeded) +
"Current usage: " +
formatUsageString(currentVmemUsage, vmemLimit,
currentPmemUsage, pmemLimit) +
". Killing container.\n" +
"Dump of the process-tree for " + containerId + " :\n" +
pTree.getProcessTreeDump();
}
private String formatUsageString(long currentVmemUsage, long vmemLimit,
long currentPmemUsage, long pmemLimit) {
return String.format("%sb of %sb physical memory used; " +
"%sb of %sb virtual memory used",
StringUtils.humanReadableInt(currentPmemUsage),
StringUtils.humanReadableInt(pmemLimit),
StringUtils.humanReadableInt(currentVmemUsage),
StringUtils.humanReadableInt(vmemLimit));
}
}
@Override
public long getVmemAllocatedForContainers() {
return this.maxVmemAllottedForContainers;
}
@Override
public long getPmemAllocatedForContainers() {
return this.maxPmemAllottedForContainers;
}
@Override
public void handle(ContainersMonitorEvent monitoringEvent) {
if (!isEnabled()) {
return;
}
ContainerId containerId = monitoringEvent.getContainerId();
switch (monitoringEvent.getType()) {
case START_MONITORING_CONTAINER:
ContainerStartMonitoringEvent startEvent =
(ContainerStartMonitoringEvent) monitoringEvent;
synchronized (this.containersToBeAdded) {
ProcessTreeInfo processTreeInfo =
new ProcessTreeInfo(containerId, null, null,
startEvent.getVmemLimit(), startEvent.getPmemLimit());
this.containersToBeAdded.put(containerId, processTreeInfo);
}
break;
case STOP_MONITORING_CONTAINER:
synchronized (this.containersToBeRemoved) {
this.containersToBeRemoved.add(containerId);
}
break;
default:
// TODO: Wrong event.
}
}
}