Zero map split input length combine with none zero map split input length will cause MR1 job hung. (zxu via rkanter)
diff --git a/CHANGES.txt b/CHANGES.txt
index 57f1cf4..93a0a6b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -282,6 +282,9 @@
 
     HDFS-6649. Documentation for setrep is wrong. (aajisaka)
 
+    Zero map split input length combine with none zero map split input
+    length will cause MR1 job hung. (zxu via rkanter)
+
 Release 1.2.2 - unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/mapred/org/apache/hadoop/mapred/JobInProgress.java b/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
index 0861584..f134553 100644
--- a/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
+++ b/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
@@ -938,7 +938,11 @@
   long getInputLength() {
     return inputLength;
   }
- 
+
+  void setInputLength(long length) {
+    inputLength = length;
+  }
+
   boolean isCleanupLaunched() {
     return launchedCleanup;
   }
diff --git a/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java b/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
index e2f8fc6..ec0e64d 100644
--- a/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
+++ b/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
@@ -52,8 +52,16 @@
     //-1 indicates error, which we don't average in.
     if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
       completedMapsUpdates++;
-
-      completedMapsInputSize+=(tip.getMapInputSize()+1);
+      long inputSize = tip.getMapInputSize();
+      if (inputSize == 0) {
+        // if map input size is 0, use map output size as input size
+        // to avoid job hung.
+        inputSize = ts.getOutputSize();
+        // map input size is changed, update JobInProgress.inputLength.
+        long length = job.getInputLength() + inputSize;
+        job.setInputLength(length);
+      }
+      completedMapsInputSize+=(inputSize+1);
       completedMapsOutputSize+=ts.getOutputSize();
 
       if(LOG.isDebugEnabled()) {
diff --git a/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java b/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
index 6a16b72..fed722f 100644
--- a/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
+++ b/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
@@ -55,12 +55,13 @@
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
-    for(int i = 0; i < maps / 10 ; ++i) {
-
-      long estOutSize = re.getEstimatedMapOutputSize();
-      System.out.println(estOutSize);
-      assertEquals(0, estOutSize);
-      
+    for(int i = 0; i < maps; ++i) {
+      if (i < maps / 10) {
+        // re.thresholdToUse is maps / 10
+        long estOutSize = re.getEstimatedMapOutputSize();
+        System.out.println(estOutSize);
+        assertEquals(0, estOutSize);
+      }
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
       JobSplit.TaskSplitMetaInfo split =
@@ -120,9 +121,10 @@
     TaskInProgress tip = 
       new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1);
     re.updateWithCompletedTask(ts, tip);
-    
+    // for 0 input size, use output size as input size for calculation
     long expectedTotalMapOutSize = (singleMapOutputSize*11) * 
-      ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+1);
+      ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+
+      singleMapOutputSize+1);
     assertEquals(2* expectedTotalMapOutSize/maps, re.getEstimatedMapOutputSize());
   }