Merge -c 1194854 from branch-0.20-security to branch-0.20-security-204 to fix MAPREDUCE-2355.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-204@1194856 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/mapred/mapred-default.xml b/src/mapred/mapred-default.xml
index bf075f7..55f2f86 100644
--- a/src/mapred/mapred-default.xml
+++ b/src/mapred/mapred-default.xml
@@ -242,6 +242,24 @@
 </property>
 
 <property>
+  <name>mapreduce.tasktracker.outofband.heartbeat.damper</name>
+  <value>1000000</value>
+  <description>When out-of-band heartbeats are enabled, provides
+  damping to avoid overwhelming the JobTracker if too many out-of-band
+  heartbeats would occur. The damping is calculated such that the
+  heartbeat interval is divided by (T*D + 1) where T is the number
+  of completed tasks and D is the damper value.
+  
+  Setting this to a high value like the default provides no damping --
+  as soon as any task finishes, a heartbeat will be sent. Setting this
+  parameter to 0 is equivalent to disabling the out-of-band heartbeat feature.
+  A value of 1 would indicate that, after one task has completed, the
+  time to wait before the next heartbeat would be 1/2 the usual time.
+  After two tasks have finished, it would be 1/3 the usual time, etc.
+  </description>
+</property>
+
+<property>
   <name>mapred.jobtracker.restart.recover</name>
   <value>false</value>
   <description>"true" to enable (job) recovery upon restart,
diff --git a/src/mapred/org/apache/hadoop/mapred/TaskTracker.java b/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
index acd0b2f..d72554d 100644
--- a/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
@@ -45,6 +45,7 @@
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import javax.crypto.SecretKey;
@@ -360,9 +361,13 @@
   static final String TT_OUTOFBAND_HEARBEAT =
     "mapreduce.tasktracker.outofband.heartbeat";
   private volatile boolean oobHeartbeatOnTaskCompletion;
+  static final String TT_OUTOFBAND_HEARTBEAT_DAMPER = 
+    "mapreduce.tasktracker.outofband.heartbeat.damper";
+  static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
+  private volatile int oobHeartbeatDamper;
   
   // Track number of completed tasks to send an out-of-band heartbeat
-  private IntWritable finishedCount = new IntWritable(0);
+  private AtomicInteger finishedCount = new AtomicInteger(0);
   
   private MapEventsFetcherThread mapEventsFetcher;
   final int workerThreads;
@@ -854,6 +859,9 @@
     
     oobHeartbeatOnTaskCompletion = 
       fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
+    oobHeartbeatDamper = 
+      fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, 
+          DEFAULT_OOB_HEARTBEAT_DAMPER);
   }
 
   /**
@@ -1555,25 +1563,39 @@
     return recentMapEvents;
   }
 
+  private long getHeartbeatInterval(int numFinishedTasks) {
+    return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
+  }
+  
   /**
    * Main service loop.  Will stay in this loop forever.
    */
   State offerService() throws Exception {
-    long lastHeartbeat = 0;
+    long lastHeartbeat = System.currentTimeMillis();
 
     while (running && !shuttingDown) {
       try {
         long now = System.currentTimeMillis();
-
-        long waitTime = heartbeatInterval - (now - lastHeartbeat);
-        if (waitTime > 0) {
+        
+        // accelerate to account for multiple finished tasks up-front
+        long remaining = 
+          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+        while (remaining > 0) {
           // sleeps for the wait time or 
-          // until there are empty slots to schedule tasks
+          // until there are *enough* empty slots to schedule tasks
           synchronized (finishedCount) {
-            if (finishedCount.get() == 0) {
-              finishedCount.wait(waitTime);
+            finishedCount.wait(remaining);
+            
+            // Recompute
+            now = System.currentTimeMillis();
+            remaining = 
+              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+            
+            if (remaining <= 0) {
+              // Reset count 
+              finishedCount.set(0);
+              break;
             }
-            finishedCount.set(0);
           }
         }
 
@@ -2349,8 +2371,7 @@
   private void notifyTTAboutTaskCompletion() {
     if (oobHeartbeatOnTaskCompletion) {
       synchronized (finishedCount) {
-        int value = finishedCount.get();
-        finishedCount.set(value+1);
+        finishedCount.incrementAndGet();
         finishedCount.notify();
       }
     }