MAPREDUCE-1906. Lower minimum heartbeat interval for TaskTracker. Contributed by Scott Carey and Todd Lipcon

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1054154 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 98d02d1..3470c4b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,9 @@
     MAPREDUCE-1831. BlockPlacement policy for HDFS-RAID.
     (Scott Chen via dhruba)
 
+    MAPREDUCE-1906. Lower minimum heartbeat interval for TaskTracker
+    (Scott Carey and Todd Lipcon via todd)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java b/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
index cf77cb7..32defa9 100644
--- a/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
+++ b/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
@@ -42,6 +42,7 @@
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -677,7 +678,7 @@
     }
 
     public int getNextHeartbeatInterval() {
-      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+      return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
     }
 
     /**
diff --git a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
index 22c2c05..d5dec88 100644
--- a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
+++ b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.net.Node;
@@ -406,7 +407,7 @@
     
     @Override
     public int getNextHeartbeatInterval() {
-      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+      return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
     }
 
     @Override
diff --git a/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java b/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
index 0d80918..08245b3 100644
--- a/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
+++ b/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
  * The response sent by the {@link JobTracker} to the hearbeat sent
@@ -43,7 +44,7 @@
   HeartbeatResponse(short responseId, TaskTrackerAction[] actions) {
     this.responseId = responseId;
     this.actions = actions;
-    this.heartbeatInterval = MRConstants.HEARTBEAT_INTERVAL_MIN;
+    this.heartbeatInterval = JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
   }
   
   public void setResponseId(short responseId) {
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index 06238f6..fd32303 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -159,6 +159,9 @@
   private float HEARTBEATS_SCALING_FACTOR;
   private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
   private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+
+  // Minimum interval for heartbeats regardless of cluster size.
+  private int HEARTBEAT_INTERVAL_MIN;
   
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
@@ -1422,6 +1425,9 @@
       HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
     }
 
+    HEARTBEAT_INTERVAL_MIN = conf.getInt(JT_HEARTBEAT_INTERVAL_MIN,
+                                         JT_HEARTBEAT_INTERVAL_MIN_DEFAULT);
+
     //This configuration is there solely for tuning purposes and 
     //once this feature has been tested in real clusters and an appropriate
     //value for the threshold has been found, this config might be taken out.
@@ -2520,7 +2526,7 @@
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(
                                 (int)(1000 * HEARTBEATS_SCALING_FACTOR *
-                                      Math.ceil((double)clusterSize / 
+                                      ((double)clusterSize / 
                                                 NUM_HEARTBEATS_IN_SECOND)),
                                 HEARTBEAT_INTERVAL_MIN) ;
     return heartbeatInterval;
diff --git a/src/java/org/apache/hadoop/mapred/MRConstants.java b/src/java/org/apache/hadoop/mapred/MRConstants.java
index fc60649..e2c16fb 100644
--- a/src/java/org/apache/hadoop/mapred/MRConstants.java
+++ b/src/java/org/apache/hadoop/mapred/MRConstants.java
@@ -25,8 +25,6 @@
   //
   // Timeouts, constants
   //
-  public static final int HEARTBEAT_INTERVAL_MIN = 3 * 1000;
-  
   public static final long COUNTER_UPDATE_INTERVAL = 60 * 1000;
 
   //
diff --git a/src/java/org/apache/hadoop/mapred/TaskTracker.java b/src/java/org/apache/hadoop/mapred/TaskTracker.java
index d24b626..dadc49f 100644
--- a/src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -88,6 +88,7 @@
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -283,7 +284,8 @@
   /**
    * the minimum interval between jobtracker polls
    */
-  private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
+  private volatile int heartbeatInterval =
+    JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
   /**
    * Number of maptask completion events locations to poll for at one time
    */  
diff --git a/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java b/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
index 95929ff..dfe7e72 100644
--- a/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
+++ b/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
@@ -47,6 +47,9 @@
     "mapreduce.jobtracker.heartbeats.in.second";
   public static final String JT_HEARTBEATS_SCALING_FACTOR = 
     "mapreduce.jobtracker.heartbeats.scaling.factor";
+  public static final String JT_HEARTBEAT_INTERVAL_MIN =
+    "mapreduce.jobtracker.heartbeat.interval.min";
+  public static final int JT_HEARTBEAT_INTERVAL_MIN_DEFAULT = 300;
   public static final String JT_PERSIST_JOBSTATUS = 
     "mapreduce.jobtracker.persist.jobstatus.active";
   public static final String JT_PERSIST_JOBSTATUS_HOURS = 
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
index d126abe..3d1ed8d 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 
@@ -190,7 +191,7 @@
     
     @Override
     public int getNextHeartbeatInterval() {
-      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+      return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
     }
 
     @Override
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java b/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
index 041ec5e..1b2d8f8 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
@@ -43,7 +43,7 @@
       while(jc.getClusterStatus().getTaskTrackers() != taskTrackers) {
         UtilsForTests.waitFor(100);
       }
-      assertEquals(MRConstants.HEARTBEAT_INTERVAL_MIN, 
+      assertEquals(JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT,
         mr.getJobTrackerRunner().getJobTracker().getNextHeartbeatInterval());
       mr.shutdown(); 
       
@@ -62,14 +62,14 @@
       
       // test configured heartbeat interval is capped with min value
       taskTrackers = 5;
-      conf.setInt(JTConfig.JT_HEARTBEATS_IN_SECOND, 10);
+      conf.setInt(JTConfig.JT_HEARTBEATS_IN_SECOND, 100);
       mr = new MiniMRCluster(taskTrackers, "file:///", 3, 
           null, null, conf);
       jc = new JobClient(mr.createJobConf());
       while(jc.getClusterStatus().getTaskTrackers() != taskTrackers) {
         UtilsForTests.waitFor(100);
       }
-      assertEquals(MRConstants.HEARTBEAT_INTERVAL_MIN, 
+      assertEquals(JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT,
         mr.getJobTrackerRunner().getJobTracker().getNextHeartbeatInterval());
     } finally {
       if (mr != null) { mr.shutdown(); }
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java b/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
index 6b97fc9..546af87 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 public class TestParallelInitialization extends TestCase {
   
@@ -131,7 +132,7 @@
     }
     
     public int getNextHeartbeatInterval() {
-      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+      return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
     }
 
     public void killJob(JobID jobid) {