/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
/**
* Manages the memory usage of tasks running under this TaskTracker. Kills any
* task process-trees that overstep their configured memory limits.
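*
* A minimal usage sketch (assuming the TaskTracker enables memory management
* and starts this thread itself; the local variable names below are
* illustrative only):
*
* <pre>
* TaskMemoryManagerThread tmm = new TaskMemoryManagerThread(taskTracker);
* tmm.setDaemon(true);
* tmm.start();
* // as task attempts start and finish:
* tmm.addTask(attemptId, vmemLimit);
* tmm.removeTask(attemptId);
* </pre>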
*/
class TaskMemoryManagerThread extends Thread {
private static Log LOG = LogFactory.getLog(TaskMemoryManagerThread.class);
private TaskTracker taskTracker;
private long monitoringInterval;
private long sleepTimeBeforeSigKill;
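// Process-trees currently being monitored, plus staging collections so that
// addTask()/removeTask() only lock the small staging structures instead of
// the whole monitoring map.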
private Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
private List<TaskAttemptID> tasksToBeRemoved;
public TaskMemoryManagerThread(TaskTracker taskTracker) {
this.taskTracker = taskTracker;
setName(this.getClass().getName());
processTreeInfoMap = new HashMap<TaskAttemptID, ProcessTreeInfo>();
tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
tasksToBeRemoved = new ArrayList<TaskAttemptID>();
monitoringInterval = taskTracker.getJobConf().getLong(
"mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
sleepTimeBeforeSigKill = taskTracker.getJobConf().getLong(
"mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
}
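/**
* Start tracking the process-tree of a task attempt. The entry is staged in
* tasksToBeAdded and folded into the monitoring map by the next cycle of the
* monitoring loop.
*
* @param tid the task attempt to monitor
* @param memLimit the task's virtual memory limit; negative values disable
*                 the limit
*/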
public void addTask(TaskAttemptID tid, long memLimit) {
synchronized (tasksToBeAdded) {
LOG.debug("Tracking ProcessTree " + tid + " for the first time");
// TODO: Negative values must have been checked in JobConf.
memLimit = (memLimit < 0 ? JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
: memLimit);
ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
sleepTimeBeforeSigKill);
tasksToBeAdded.put(tid, ptInfo);
}
}
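/**
* Stop tracking the process-tree of a finished task attempt. The removal is
* staged in tasksToBeRemoved and applied by the next monitoring cycle.
*
* @param tid the task attempt to stop monitoring
*/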
public void removeTask(TaskAttemptID tid) {
synchronized (tasksToBeRemoved) {
tasksToBeRemoved.add(tid);
}
}
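/**
* Per-task bookkeeping: the task attempt id, the pid of its root process
* (null until the pid-file appears), the ProcfsBasedProcessTree rooted at
* that pid, and the configured memory limit.
*/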
private static class ProcessTreeInfo {
private TaskAttemptID tid;
private String pid;
private ProcfsBasedProcessTree pTree;
private long memLimit;
public ProcessTreeInfo(TaskAttemptID tid, String pid,
ProcfsBasedProcessTree pTree, long memLimit, long sleepTimeBeforeSigKill) {
this.tid = tid;
this.pid = pid;
this.pTree = pTree;
if (this.pTree != null) {
this.pTree.setSigKillInterval(sleepTimeBeforeSigKill);
}
this.memLimit = memLimit;
}
public TaskAttemptID getTID() {
return tid;
}
public String getPID() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public ProcfsBasedProcessTree getProcessTree() {
return pTree;
}
public void setProcessTree(ProcfsBasedProcessTree pTree) {
this.pTree = pTree;
}
public long getMemLimit() {
return memLimit;
}
}
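/**
* The monitoring loop: fold in staged additions and removals, refresh each
* process-tree from procfs, kill any tree that has grown beyond its memory
* limit, then sleep for monitoringInterval milliseconds and repeat.
*/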
@Override
public void run() {
LOG.info("Starting thread: " + this.getClass());
while (true) {
// Print the processTrees for debugging.
if (LOG.isDebugEnabled()) {
StringBuffer tmp = new StringBuffer("[ ");
for (ProcessTreeInfo p : processTreeInfoMap.values()) {
tmp.append(p.getPID());
tmp.append(" ");
}
LOG.debug("Current ProcessTree list : "
+ tmp.substring(0, tmp.length()) + "]");
}
//Add new Tasks
synchronized (tasksToBeAdded) {
processTreeInfoMap.putAll(tasksToBeAdded);
tasksToBeAdded.clear();
}
//Remove finished Tasks
synchronized (tasksToBeRemoved) {
for (TaskAttemptID tid : tasksToBeRemoved) {
processTreeInfoMap.remove(tid);
}
tasksToBeRemoved.clear();
}
// Now, check memory usage and kill any overflowing tasks
for (Iterator<Map.Entry<TaskAttemptID, ProcessTreeInfo>> it = processTreeInfoMap
.entrySet().iterator(); it.hasNext();) {
Map.Entry<TaskAttemptID, ProcessTreeInfo> entry = it.next();
TaskAttemptID tid = entry.getKey();
ProcessTreeInfo ptInfo = entry.getValue();
String pId = ptInfo.getPID();
// Initialize any uninitialized processTrees
if (pId == null) {
pId = getPid(tid); // get pid from pid-file
// The pid stays null either if the pid-file is yet to be created, or if
// the TIP is finished and we removed the pid-file while the TIP itself is
// still retained in runningTasks until it is successfully reported to the
// JT.
if (pId != null) {
// create process tree object
ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId);
LOG.debug("Tracking ProcessTree " + pId + " for the first time");
ptInfo.setPid(pId);
ptInfo.setProcessTree(pt);
processTreeInfoMap.put(tid, ptInfo);
}
}
// End of initializing any uninitialized processTrees
if (pId == null) {
continue; // processTree cannot be tracked
}
LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = "
+ tid);
ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
pTree = pTree.getProcessTree(); // get the updated process-tree
ptInfo.setProcessTree(pTree); // update ptInfo with the refreshed process-tree
long currentMemUsage = pTree.getCumulativeVmem();
long limit = ptInfo.getMemLimit();
LOG.info("Memory usage of ProcessTree " + pId + " :" + currentMemUsage
+ "kB. Limit : " + limit + "kB");
if (limit != JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
&& currentMemUsage > limit) {
// Task (the root process) is still alive and overflowing memory.
// Clean up.
String msg = "TaskTree [pid=" + pId + ",tipID=" + tid
+ "] is running beyond memory-limits. Current usage : "
+ currentMemUsage + "kB. Limit : " + limit + "kB. Killing task.";
LOG.warn(msg);
taskTracker.cleanUpOverMemoryTask(tid, msg);
// Now destroy the ProcessTree, remove it from monitoring map.
pTree.destroy();
it.remove();
LOG.info("Removed ProcessTree with root " + pId);
}
}
// Sleep for some time before beginning next cycle
try {
LOG.debug(this.getClass() + " : Sleeping for " + monitoringInterval
+ " ms");
Thread.sleep(monitoringInterval);
} catch (InterruptedException ie) {
LOG.warn(this.getClass()
+ " interrupted. Finishing the thread and returning.");
return;
}
}
}
/**
* Load the pid of the task from its pid-file.
*
* @param tipID the task attempt whose pid-file should be read
* @return the pid of the task process, or null if the pid-file cannot be
*         located.
*/
private String getPid(TaskAttemptID tipID) {
Path pidFileName = getPidFilePath(tipID, taskTracker.getJobConf());
if (pidFileName == null) {
return null;
}
return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
}
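// Used only to locate pid-files under the configured mapred.local.dir entries.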
private static LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
/**
* Get the pid-file path of a task.
*
* @param tipID the task attempt whose pid-file is wanted
* @param conf configuration used to resolve the file under mapred.local.dir
* @return the pid-file's Path, or null if it cannot be located.
*/
public static Path getPidFilePath(TaskAttemptID tipID, JobConf conf) {
Path pidFileName = null;
try {
// Pid-files are tiny, so a LocalDirAllocator is not strictly needed here;
// it is only used to locate the file under the mapred.local.dir entries.
pidFileName = lDirAlloc.getLocalPathToRead(
(TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
conf);
} catch (IOException i) {
// PID file is not there
LOG.debug("Failed to get pidFile name for " + tipID);
}
return pidFileName;
}
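/**
* Delete the pid-file of a task attempt from the local filesystem, if the
* task memory manager is enabled and the file can be located.
*
* @param tid the task attempt whose pid-file should be removed
*/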
public void removePidFile(TaskAttemptID tid) {
if (taskTracker.isTaskMemoryManagerEnabled()) {
Path pidFilePath = getPidFilePath(tid, taskTracker.getJobConf());
if (pidFilePath != null) {
try {
FileSystem.getLocal(taskTracker.getJobConf()).delete(pidFilePath, false);
} catch (IOException ie) {
// Best-effort deletion; just log if the pid-file could not be removed.
LOG.warn("Failed to delete pid-file " + pidFilePath + " for " + tid, ie);
}
}
}
}
}