Mixing zero-length and non-zero-length map split inputs can cause an MR1 job to hang. (zxu via rkanter)
diff --git a/CHANGES.txt b/CHANGES.txt
index 57f1cf4..93a0a6b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -282,6 +282,9 @@
HDFS-6649. Documentation for setrep is wrong. (aajisaka)
+ Mixing zero-length and non-zero-length map split inputs can cause an
+ MR1 job to hang. (zxu via rkanter)
+
Release 1.2.2 - unreleased
INCOMPATIBLE CHANGES
diff --git a/src/mapred/org/apache/hadoop/mapred/JobInProgress.java b/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
index 0861584..f134553 100644
--- a/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
+++ b/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
@@ -938,7 +938,17 @@
long getInputLength() {
return inputLength;
}
-
+
+ /**
+ * Sets the total input length recorded for this job.
+ * ResourceEstimator calls this to add the output size of completed maps
+ * whose map input size is 0.
+ * @param length the new total input length
+ */
+ void setInputLength(long length) {
+ inputLength = length;
+ }
+
boolean isCleanupLaunched() {
return launchedCleanup;
}
diff --git a/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java b/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
index e2f8fc6..ec0e64d 100644
--- a/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
+++ b/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
@@ -52,8 +52,16 @@
//-1 indicates error, which we don't average in.
if(tip.isMapTask() && ts.getOutputSize() != -1) {
completedMapsUpdates++;
-
- completedMapsInputSize+=(tip.getMapInputSize()+1);
+ long inputSize = tip.getMapInputSize();
+ if (inputSize == 0) {
+ // A 0-input map adds its full output but only 1 to the input total,
+ // which can hang the job; count its output size as its input instead.
+ inputSize = ts.getOutputSize();
+ // Keep JobInProgress.inputLength consistent with the adjusted input size.
+ long length = job.getInputLength() + inputSize;
+ job.setInputLength(length);
+ }
+ completedMapsInputSize+=(inputSize+1);
completedMapsOutputSize+=ts.getOutputSize();
if(LOG.isDebugEnabled()) {
diff --git a/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java b/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
index 6a16b72..fed722f 100644
--- a/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
+++ b/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
@@ -55,12 +55,13 @@
//unfortunately, we can't set job input size from here.
ResourceEstimator re = new ResourceEstimator(jip);
- for(int i = 0; i < maps / 10 ; ++i) {
-
- long estOutSize = re.getEstimatedMapOutputSize();
- System.out.println(estOutSize);
- assertEquals(0, estOutSize);
-
+ for(int i = 0; i < maps; ++i) {
+ if (i < maps / 10) {
+ // re.thresholdToUse is maps / 10
+ long estOutSize = re.getEstimatedMapOutputSize();
+ System.out.println(estOutSize);
+ assertEquals(0, estOutSize);
+ }
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
JobSplit.TaskSplitMetaInfo split =
@@ -120,9 +121,10 @@
TaskInProgress tip =
new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
-
+ // for 0 input size, use output size as input size for calculation
long expectedTotalMapOutSize = (singleMapOutputSize*11) *
- ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+1);
+ ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+
+ singleMapOutputSize+1);
assertEquals(2* expectedTotalMapOutSize/maps, re.getEstimatedMapOutputSize());
}