MAPREDUCE-2046. Fixes CombineFileInputFormat to allow splits with size less than DFS block size. Contributed by dhruba borthakur

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@992989 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 596397e..8c8659a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -277,6 +277,9 @@
     MAPREDUCE-2031. Fixes test failures TestTaskLauncher and
     TestTaskTrackerLocalization. (Ravi Gummadi via amareshwari)
 
+    MAPREDUCE-2046. Fixes CombineFileInputFormat to allow splits with size
+    less than DFS block size. (dhruba borthakur via amareshwari)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index c2cf39e..0078059 100644
--- a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -268,7 +268,8 @@
     long totLength = 0;
     for (int i = 0; i < paths.length; i++) {
       files[i] = new OneFileInfo(paths[i], conf, 
-                                 rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
+                                 rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes,
+                                 maxSize);
       totLength += files[i].getLength();
     }
 
@@ -467,7 +468,8 @@
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
                 HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                HashMap<String, Set<String>> rackToNodes)
+                HashMap<String, Set<String>> rackToNodes,
+                long maxSize)
                 throws IOException {
       this.fileSize = 0;
 
@@ -480,44 +482,69 @@
       if (locations == null) {
         blocks = new OneBlockInfo[0];
       } else {
-        blocks = new OneBlockInfo[locations.length];
+        ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(locations.length);
         for (int i = 0; i < locations.length; i++) {
            
           fileSize += locations[i].getLength();
-          OneBlockInfo oneblock =  new OneBlockInfo(path, 
-                                       locations[i].getOffset(), 
-                                       locations[i].getLength(),
+
+          // each split can be a maximum of maxSize
+          long left = locations[i].getLength();
+          long myOffset = locations[i].getOffset();
+          long myLength = 0;
+          while (left > 0) {
+            if (maxSize == 0) {
+              myLength = left;
+            } else {
+              if (left > maxSize && left < 2 * maxSize) {
+                // if remainder is between max and 2*max - then 
+                // instead of creating splits of size max, left-max we
+                // create splits of size left/2 and left/2. This is
+                // a heuristic to avoid creating really really small
+                // splits.
+                myLength = left / 2;
+              } else {
+                myLength = Math.min(maxSize, left);
+              }
+            }
+            OneBlockInfo oneblock =  new OneBlockInfo(path, 
+                                       myOffset,
+                                       myLength,
                                        locations[i].getHosts(),
                                        locations[i].getTopologyPaths()); 
-          blocks[i] = oneblock;
+            left -= myLength;
+            myOffset += myLength;
 
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
+            blocksList.add(oneblock);
 
-          // add this block to the rack --> block map
-          for (int j = 0; j < oneblock.racks.length; j++) {
-            String rack = oneblock.racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
+            // add this block to the block --> node locations map
+            blockToNodes.put(oneblock, oneblock.hosts);
+
+            // add this block to the rack --> block map
+            for (int j = 0; j < oneblock.racks.length; j++) {
+              String rack = oneblock.racks[j];
+              List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+              if (blklist == null) {
+                blklist = new ArrayList<OneBlockInfo>();
+                rackToBlocks.put(rack, blklist);
+              }
+              blklist.add(oneblock);
+              // Add this host to rackToNodes map
+              addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
+           }
+
+            // add this block to the node --> block map
+            for (int j = 0; j < oneblock.hosts.length; j++) {
+              String node = oneblock.hosts[j];
+              List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+              if (blklist == null) {
+                blklist = new ArrayList<OneBlockInfo>();
+                nodeToBlocks.put(node, blklist);
+              }
+              blklist.add(oneblock);
             }
-            blklist.add(oneblock);
-            // Add this host to rackToNodes map
-            addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
-         }
-
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
           }
         }
+        blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
       }
     }
 
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
index cbe4ff7..c6ebe96 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
@@ -463,7 +463,7 @@
         dir1 + "," + dir2 + "," + dir3 + "," + dir4);
       splits = inFormat.getSplits(job);
       for (InputSplit split : splits) {
-        System.out.println("File split(Test1): " + split);
+        System.out.println("File split(Test5): " + split);
       }
       assertEquals(splits.size(), 4);
       fileSplit = (CombineFileSplit) splits.get(0);
@@ -514,7 +514,7 @@
       FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
       splits = inFormat.getSplits(job);
       for (InputSplit split : splits) {
-        System.out.println("File split(Test1): " + split);
+        System.out.println("File split(Test6): " + split);
       }
       assertEquals(splits.size(), 3);
       fileSplit = (CombineFileSplit) splits.get(0);
@@ -562,7 +562,7 @@
         dir1 + "," + dir2 + "," + dir3 + "," + dir4);
       splits = inFormat.getSplits(job);
       for (InputSplit split : splits) {
-        System.out.println("File split(Test1): " + split);
+        System.out.println("File split(Test7): " + split);
       }
       assertEquals(splits.size(), 2);
       fileSplit = (CombineFileSplit) splits.get(0);
@@ -622,6 +622,18 @@
       System.out.println("Elapsed time for " + numPools + " pools " +
                          " and " + numFiles + " files is " + 
                          ((end - start)/1000) + " seconds.");
+
+      // This file has three whole blocks. If the maxsplit size is
+      // half the block size, then there should be six splits.
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(BLOCKSIZE/2);
+      FileInputFormat.setInputPaths(job, dir3);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test8): " + split);
+      }
+      assertEquals(6, splits.size());
+
     } finally {
       if (dfs != null) {
         dfs.shutdown();