MAPREDUCE-2021. Fixes duplicate hostnames in CombineFileInputFormat's split locations.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@991827 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 1a8dc4b..1d87133 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -265,6 +265,9 @@
MAPREDUCE-1668. RaidNode Hars a directory only if all its parity files
have been created. (Ramkumar Vadali via dhruba)
+ MAPREDUCE-2021. Fixes duplicate hostnames in CombineFileInputFormat's
+ split locations. (amareshwari)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index f64a0a3..c2cf39e 100644
--- a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.HashSet;
import java.util.List;
@@ -272,7 +273,7 @@
}
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
- ArrayList<String> nodes = new ArrayList<String>();
+ Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
// process all nodes and create splits that are local
@@ -326,7 +327,7 @@
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
- ArrayList<String> racks = new ArrayList<String>();
+ Set<String> racks = new HashSet<String>();
// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
@@ -431,7 +432,7 @@
* Add this new split into splitList.
*/
private void addCreatedSplit(List<InputSplit> splitList,
- List<String> locations,
+ Collection<String> locations,
ArrayList<OneBlockInfo> validBlocks) {
// create an input split
Path[] fl = new Path[validBlocks.size()];
@@ -577,8 +578,8 @@
hosts.add(host);
}
- private List<String> getHosts(List<String> racks) {
- List<String> hosts = new ArrayList<String>();
+ private Set<String> getHosts(Set<String> racks) {
+ Set<String> hosts = new HashSet<String>();
for (String rack : racks) {
hosts.addAll(rackToNodes.get(rack));
}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
index 5cfd996..cbe4ff7 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
@@ -69,6 +69,7 @@
final Path dir2 = new Path(inDir, "/dir2");
final Path dir3 = new Path(inDir, "/dir3");
final Path dir4 = new Path(inDir, "/dir4");
+ final Path dir5 = new Path(inDir, "/dir5");
static final int BLOCKSIZE = 1024;
static final byte[] databuf = new byte[BLOCKSIZE];
@@ -245,16 +246,16 @@
MiniDFSCluster dfs = null;
FileSystem fileSys = null;
try {
- /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
- * 1) file1, just after starting the datanode on r1, with
+ /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
+ * 1) file1 and file5, just after starting the datanode on r1, with
* a repl factor of 1, and,
* 2) file2, just after starting the datanode on r2, with
* a repl factor of 2, and,
- * 3) file3 after starting the all three datanodes, with a repl
+ * 3) file3, file4 after starting all three datanodes, with a repl
* factor of 3.
- * At the end, file1 will be present on only datanode1, file2 will be
- * present on datanode 1 and datanode2 and
- * file3 will be present on all datanodes.
+ * At the end, file1, file5 will be present on only datanode1, file2 will
+ * be present on datanode 1 and datanode2 and
+ * file3, file4 will be present on all datanodes.
*/
Configuration conf = new Configuration();
conf.setBoolean("dfs.replication.considerLoad", false);
@@ -267,6 +268,30 @@
}
Path file1 = new Path(dir1 + "/file1");
writeFile(conf, file1, (short)1, 1);
+ // create another file on the same datanode
+ Path file5 = new Path(dir5 + "/file5");
+ writeFile(conf, file5, (short)1, 1);
+ // split it using a CombineFile input format
+ DummyInputFormat inFormat = new DummyInputFormat();
+ Job job = Job.getInstance(conf);
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+ List<InputSplit> splits = inFormat.getSplits(job);
+ System.out.println("Made splits(Test0): " + splits.size());
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test0): " + split);
+ }
+ assertEquals(splits.size(), 1);
+ CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
dfs.waitActive();
@@ -275,11 +300,10 @@
writeFile(conf, file2, (short)2, 2);
// split it using a CombinedFile input format
- DummyInputFormat inFormat = new DummyInputFormat();
- Job job = Job.getInstance(conf);
+ inFormat = new DummyInputFormat();
FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
inFormat.setMinSplitSizeRack(BLOCKSIZE);
- List<InputSplit> splits = inFormat.getSplits(job);
+ splits = inFormat.getSplits(job);
System.out.println("Made splits(Test1): " + splits.size());
// make sure that each split has different locations
@@ -287,7 +311,7 @@
System.out.println("File split(Test1): " + split);
}
assertEquals(splits.size(), 2);
- CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+ fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(fileSplit.getNumPaths(), 2);
assertEquals(fileSplit.getLocations().length, 1);
assertEquals(fileSplit.getPath(0).getName(), file2.getName());