Merge -c 1194854 from branch-0.20-security to branch-0.20-security-204 to fix MAPREDUCE-2355.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-204@1194856 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/mapred/mapred-default.xml b/src/mapred/mapred-default.xml
index bf075f7..55f2f86 100644
--- a/src/mapred/mapred-default.xml
+++ b/src/mapred/mapred-default.xml
@@ -242,6 +242,24 @@
</property>
<property>
+ <name>mapreduce.tasktracker.outofband.heartbeat.damper</name>
+ <value>1000000</value>
+ <description>When out-of-band heartbeats are enabled, provides
+ damping to avoid overwhelming the JobTracker if too many out-of-band
+ heartbeats would occur. The damping is calculated such that the
+ heartbeat interval is divided by (T*D + 1) where T is the number
+ of completed tasks and D is the damper value.
+
+ Setting this to a high value like the default provides no damping --
+ as soon as any task finishes, a heartbeat will be sent. Setting this
+ parameter to 0 is equivalent to disabling the out-of-band heartbeat feature.
+ A value of 1 would indicate that, after one task has completed, the
+ time to wait before the next heartbeat would be 1/2 the usual time.
+ After two tasks have finished, it would be 1/3 the usual time, etc.
+ </description>
+</property>
+
+<property>
<name>mapred.jobtracker.restart.recover</name>
<value>false</value>
<description>"true" to enable (job) recovery upon restart,
diff --git a/src/mapred/org/apache/hadoop/mapred/TaskTracker.java b/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
index acd0b2f..d72554d 100644
--- a/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
@@ -45,6 +45,7 @@
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
@@ -360,9 +361,13 @@
static final String TT_OUTOFBAND_HEARBEAT =
"mapreduce.tasktracker.outofband.heartbeat";
private volatile boolean oobHeartbeatOnTaskCompletion;
+ static final String TT_OUTOFBAND_HEARTBEAT_DAMPER =
+ "mapreduce.tasktracker.outofband.heartbeat.damper";
+ static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
+ private volatile int oobHeartbeatDamper;
// Track number of completed tasks to send an out-of-band heartbeat
- private IntWritable finishedCount = new IntWritable(0);
+ private AtomicInteger finishedCount = new AtomicInteger(0);
private MapEventsFetcherThread mapEventsFetcher;
final int workerThreads;
@@ -854,6 +859,9 @@
oobHeartbeatOnTaskCompletion =
fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
+ oobHeartbeatDamper =
+ fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER,
+ DEFAULT_OOB_HEARTBEAT_DAMPER);
}
/**
@@ -1555,25 +1563,39 @@
return recentMapEvents;
}
+ private long getHeartbeatInterval(int numFinishedTasks) {
+ return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
+ }
+
/**
* Main service loop. Will stay in this loop forever.
*/
State offerService() throws Exception {
- long lastHeartbeat = 0;
+ long lastHeartbeat = System.currentTimeMillis();
while (running && !shuttingDown) {
try {
long now = System.currentTimeMillis();
-
- long waitTime = heartbeatInterval - (now - lastHeartbeat);
- if (waitTime > 0) {
+
+ // accelerate to account for multiple finished tasks up-front
+ long remaining =
+ (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+ while (remaining > 0) {
// sleeps for the wait time or
- // until there are empty slots to schedule tasks
+ // until there are *enough* empty slots to schedule tasks
synchronized (finishedCount) {
- if (finishedCount.get() == 0) {
- finishedCount.wait(waitTime);
+ finishedCount.wait(remaining);
+
+ // Recompute
+ now = System.currentTimeMillis();
+ remaining =
+ (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+
+ if (remaining <= 0) {
+ // Reset count
+ finishedCount.set(0);
+ break;
}
- finishedCount.set(0);
}
}
@@ -2349,8 +2371,7 @@
private void notifyTTAboutTaskCompletion() {
if (oobHeartbeatOnTaskCompletion) {
synchronized (finishedCount) {
- int value = finishedCount.get();
- finishedCount.set(value+1);
+ finishedCount.incrementAndGet();
finishedCount.notify();
}
}