MAPREDUCE-1906. Lower minimum heartbeat interval for TaskTracker. Contributed by Scott Carey and Todd Lipcon
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1054154 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 98d02d1..3470c4b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,9 @@
MAPREDUCE-1831. BlockPlacement policy for HDFS-RAID.
(Scott Chen via dhruba)
+ MAPREDUCE-1906. Lower minimum heartbeat interval for TaskTracker.
+ (Scott Carey and Todd Lipcon via todd)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java b/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
index cf77cb7..32defa9 100644
--- a/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
+++ b/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
@@ -42,6 +42,7 @@
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -677,7 +678,7 @@
}
public int getNextHeartbeatInterval() {
- return MRConstants.HEARTBEAT_INTERVAL_MIN;
+ return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
}
/**
diff --git a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
index 22c2c05..d5dec88 100644
--- a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
+++ b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.net.Node;
@@ -406,7 +407,7 @@
@Override
public int getNextHeartbeatInterval() {
- return MRConstants.HEARTBEAT_INTERVAL_MIN;
+ return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
}
@Override
diff --git a/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java b/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
index 0d80918..08245b3 100644
--- a/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
+++ b/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
/**
* The response sent by the {@link JobTracker} to the hearbeat sent
@@ -43,7 +44,7 @@
HeartbeatResponse(short responseId, TaskTrackerAction[] actions) {
this.responseId = responseId;
this.actions = actions;
- this.heartbeatInterval = MRConstants.HEARTBEAT_INTERVAL_MIN;
+ this.heartbeatInterval = JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
}
public void setResponseId(short responseId) {
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index 06238f6..fd32303 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -159,6 +159,9 @@
private float HEARTBEATS_SCALING_FACTOR;
private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+
+ // Minimum interval for heartbeats, in milliseconds, regardless of cluster size.
+ private int HEARTBEAT_INTERVAL_MIN;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -1422,6 +1425,9 @@
HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
}
+ HEARTBEAT_INTERVAL_MIN = conf.getInt(JT_HEARTBEAT_INTERVAL_MIN,
+ JT_HEARTBEAT_INTERVAL_MIN_DEFAULT);
+
//This configuration is there solely for tuning purposes and
//once this feature has been tested in real clusters and an appropriate
//value for the threshold has been found, this config might be taken out.
@@ -2520,7 +2526,7 @@
int clusterSize = getClusterStatus().getTaskTrackers();
int heartbeatInterval = Math.max(
(int)(1000 * HEARTBEATS_SCALING_FACTOR *
- Math.ceil((double)clusterSize /
+ ((double)clusterSize /
NUM_HEARTBEATS_IN_SECOND)),
HEARTBEAT_INTERVAL_MIN) ;
return heartbeatInterval;
diff --git a/src/java/org/apache/hadoop/mapred/MRConstants.java b/src/java/org/apache/hadoop/mapred/MRConstants.java
index fc60649..e2c16fb 100644
--- a/src/java/org/apache/hadoop/mapred/MRConstants.java
+++ b/src/java/org/apache/hadoop/mapred/MRConstants.java
@@ -25,8 +25,6 @@
//
// Timeouts, constants
//
- public static final int HEARTBEAT_INTERVAL_MIN = 3 * 1000;
-
public static final long COUNTER_UPDATE_INTERVAL = 60 * 1000;
//
diff --git a/src/java/org/apache/hadoop/mapred/TaskTracker.java b/src/java/org/apache/hadoop/mapred/TaskTracker.java
index d24b626..dadc49f 100644
--- a/src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -88,6 +88,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -283,7 +284,8 @@
/**
* the minimum interval between jobtracker polls
*/
- private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
+ private volatile int heartbeatInterval =
+ JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
/**
* Number of maptask completion events locations to poll for at one time
*/
diff --git a/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java b/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
index 95929ff..dfe7e72 100644
--- a/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
+++ b/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
@@ -47,6 +47,9 @@
"mapreduce.jobtracker.heartbeats.in.second";
public static final String JT_HEARTBEATS_SCALING_FACTOR =
"mapreduce.jobtracker.heartbeats.scaling.factor";
+ public static final String JT_HEARTBEAT_INTERVAL_MIN =
+ "mapreduce.jobtracker.heartbeat.interval.min";
+ public static final int JT_HEARTBEAT_INTERVAL_MIN_DEFAULT = 300;
public static final String JT_PERSIST_JOBSTATUS =
"mapreduce.jobtracker.persist.jobstatus.active";
public static final String JT_PERSIST_JOBSTATUS_HOURS =
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
index d126abe..3d1ed8d 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
@@ -190,7 +191,7 @@
@Override
public int getNextHeartbeatInterval() {
- return MRConstants.HEARTBEAT_INTERVAL_MIN;
+ return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
}
@Override
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java b/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
index 041ec5e..1b2d8f8 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
@@ -43,7 +43,7 @@
while(jc.getClusterStatus().getTaskTrackers() != taskTrackers) {
UtilsForTests.waitFor(100);
}
- assertEquals(MRConstants.HEARTBEAT_INTERVAL_MIN,
+ assertEquals(JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT,
mr.getJobTrackerRunner().getJobTracker().getNextHeartbeatInterval());
mr.shutdown();
@@ -62,14 +62,14 @@
// test configured heartbeat interval is capped with min value
taskTrackers = 5;
- conf.setInt(JTConfig.JT_HEARTBEATS_IN_SECOND, 10);
+ conf.setInt(JTConfig.JT_HEARTBEATS_IN_SECOND, 100);
mr = new MiniMRCluster(taskTrackers, "file:///", 3,
null, null, conf);
jc = new JobClient(mr.createJobConf());
while(jc.getClusterStatus().getTaskTrackers() != taskTrackers) {
UtilsForTests.waitFor(100);
}
- assertEquals(MRConstants.HEARTBEAT_INTERVAL_MIN,
+ assertEquals(JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT,
mr.getJobTrackerRunner().getJobTracker().getNextHeartbeatInterval());
} finally {
if (mr != null) { mr.shutdown(); }
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java b/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
index 6b97fc9..546af87 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
public class TestParallelInitialization extends TestCase {
@@ -131,7 +132,7 @@
}
public int getNextHeartbeatInterval() {
- return MRConstants.HEARTBEAT_INTERVAL_MIN;
+ return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
}
public void killJob(JobID jobid) {