MAPREDUCE-2046. Fixes CombineFileInputFormat to allow splits smaller than the DFS block size. Contributed by dhruba borthakur.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@992989 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 596397e..8c8659a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -277,6 +277,9 @@
MAPREDUCE-2031. Fixes test failures TestTaskLauncher and
TestTaskTrackerLocalization. (Ravi Gummadi via amareshwari)
+ MAPREDUCE-2046. Fixes CombineFileInputFormat to allow splits with size
+ less than DFS block size. (dhruba borthakur via amareshwari)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index c2cf39e..0078059 100644
--- a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -268,7 +268,8 @@
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
files[i] = new OneFileInfo(paths[i], conf,
- rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
+ rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes,
+ maxSize);
totLength += files[i].getLength();
}
@@ -467,7 +468,8 @@
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, List<OneBlockInfo>> nodeToBlocks,
- HashMap<String, Set<String>> rackToNodes)
+ HashMap<String, Set<String>> rackToNodes,
+ long maxSize)
throws IOException {
this.fileSize = 0;
@@ -480,44 +482,69 @@
if (locations == null) {
blocks = new OneBlockInfo[0];
} else {
- blocks = new OneBlockInfo[locations.length];
+ ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(locations.length);
for (int i = 0; i < locations.length; i++) {
fileSize += locations[i].getLength();
- OneBlockInfo oneblock = new OneBlockInfo(path,
- locations[i].getOffset(),
- locations[i].getLength(),
+
+ // each split can be a maximum of maxSize
+ long left = locations[i].getLength();
+ long myOffset = locations[i].getOffset();
+ long myLength = 0;
+ while (left > 0) {
+ if (maxSize == 0) {
+ myLength = left;
+ } else {
+ if (left > maxSize && left < 2 * maxSize) {
+ // if remainder is between max and 2*max - then
+ // instead of creating splits of size max, left-max we
+ // create splits of size left/2 and left/2. This is
+ // a heuristic to avoid creating really really small
+ // splits.
+ myLength = left / 2;
+ } else {
+ myLength = Math.min(maxSize, left);
+ }
+ }
+ OneBlockInfo oneblock = new OneBlockInfo(path,
+ myOffset,
+ myLength,
locations[i].getHosts(),
locations[i].getTopologyPaths());
- blocks[i] = oneblock;
+ left -= myLength;
+ myOffset += myLength;
- // add this block to the block --> node locations map
- blockToNodes.put(oneblock, oneblock.hosts);
+ blocksList.add(oneblock);
- // add this block to the rack --> block map
- for (int j = 0; j < oneblock.racks.length; j++) {
- String rack = oneblock.racks[j];
- List<OneBlockInfo> blklist = rackToBlocks.get(rack);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- rackToBlocks.put(rack, blklist);
+ // add this block to the block --> node locations map
+ blockToNodes.put(oneblock, oneblock.hosts);
+
+ // add this block to the rack --> block map
+ for (int j = 0; j < oneblock.racks.length; j++) {
+ String rack = oneblock.racks[j];
+ List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ rackToBlocks.put(rack, blklist);
+ }
+ blklist.add(oneblock);
+ // Add this host to rackToNodes map
+ addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
+ }
+
+ // add this block to the node --> block map
+ for (int j = 0; j < oneblock.hosts.length; j++) {
+ String node = oneblock.hosts[j];
+ List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ nodeToBlocks.put(node, blklist);
+ }
+ blklist.add(oneblock);
}
- blklist.add(oneblock);
- // Add this host to rackToNodes map
- addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
- }
-
- // add this block to the node --> block map
- for (int j = 0; j < oneblock.hosts.length; j++) {
- String node = oneblock.hosts[j];
- List<OneBlockInfo> blklist = nodeToBlocks.get(node);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- nodeToBlocks.put(node, blklist);
- }
- blklist.add(oneblock);
}
}
+ blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
}
}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
index cbe4ff7..c6ebe96 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
@@ -463,7 +463,7 @@
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
- System.out.println("File split(Test1): " + split);
+ System.out.println("File split(Test5): " + split);
}
assertEquals(splits.size(), 4);
fileSplit = (CombineFileSplit) splits.get(0);
@@ -514,7 +514,7 @@
FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
- System.out.println("File split(Test1): " + split);
+ System.out.println("File split(Test6): " + split);
}
assertEquals(splits.size(), 3);
fileSplit = (CombineFileSplit) splits.get(0);
@@ -562,7 +562,7 @@
dir1 + "," + dir2 + "," + dir3 + "," + dir4);
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
- System.out.println("File split(Test1): " + split);
+ System.out.println("File split(Test7): " + split);
}
assertEquals(splits.size(), 2);
fileSplit = (CombineFileSplit) splits.get(0);
@@ -622,6 +622,18 @@
System.out.println("Elapsed time for " + numPools + " pools " +
" and " + numFiles + " files is " +
((end - start)/1000) + " seconds.");
+
+ // This file has three whole blocks. If the maxsplit size is
+ // half the block size, then there should be six splits.
+ inFormat = new DummyInputFormat();
+ inFormat.setMaxSplitSize(BLOCKSIZE/2);
+ FileInputFormat.setInputPaths(job, dir3);
+ splits = inFormat.getSplits(job);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test8): " + split);
+ }
+ assertEquals(6, splits.size());
+
} finally {
if (dfs != null) {
dfs.shutdown();