MAPREDUCE-2185. Fix infinite loop at creating splits using CombineFileInputFormat. (Ramkumar Vadali via schen)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1132807 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 4d88302..8b5967d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -844,6 +844,9 @@
MAPREDUCE-2487. ChainReducer uses MAPPER_BY_VALUE instead of
REDUCER_BY_VALUE. (Devaraj K via todd)
+ MAPREDUCE-2185. Fix infinite loop at creating splits using
+ CombineFileInputFormat. (Ramkumar Vadali via schen)
+
Release 0.21.1 - Unreleased
NEW FEATURES
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index ef63ea2..d213f59 100644
--- a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -545,17 +545,28 @@
// add this block to the block --> node locations map
blockToNodes.put(oneblock, oneblock.hosts);
+ // For blocks that do not have host/rack information,
+ // assign to default rack.
+ String[] racks = null;
+ if (oneblock.hosts.length == 0) {
+ racks = new String[]{NetworkTopology.DEFAULT_RACK};
+ } else {
+ racks = oneblock.racks;
+ }
+
// add this block to the rack --> block map
- for (int j = 0; j < oneblock.racks.length; j++) {
- String rack = oneblock.racks[j];
+ for (int j = 0; j < racks.length; j++) {
+ String rack = racks[j];
List<OneBlockInfo> blklist = rackToBlocks.get(rack);
if (blklist == null) {
blklist = new ArrayList<OneBlockInfo>();
rackToBlocks.put(rack, blklist);
}
blklist.add(oneblock);
- // Add this host to rackToNodes map
- addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
+ if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+ // Add this host to rackToNodes map
+ addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+ }
}
// add this block to the node --> block map
@@ -619,6 +630,11 @@
}
}
+ protected BlockLocation[] getFileBlockLocations(
+ FileSystem fs, FileStatus stat) throws IOException {
+ return fs.getFileBlockLocations(stat, 0, stat.getLen());
+ }
+
private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
String rack, String host) {
Set<String> hosts = rackToNodes.get(rack);
@@ -632,7 +648,9 @@
private Set<String> getHosts(Set<String> racks) {
Set<String> hosts = new HashSet<String>();
for (String rack : racks) {
- hosts.addAll(rackToNodes.get(rack));
+ if (rackToNodes.containsKey(rack)) {
+ hosts.addAll(rackToNodes.get(rack));
+ }
}
return hosts;
}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
index af0ecc9..4915be5 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
@@ -19,16 +19,14 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.net.URI;
import java.util.List;
import java.util.ArrayList;
import java.util.zip.GZIPOutputStream;
import junit.framework.TestCase;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -103,6 +101,41 @@
}
}
+ /** Dummy file system extending DistributedFileSystem. It allows
+ * testing with files having missing blocks without actually removing replicas.
+ */
+ public static class MissingBlockFileSystem extends DistributedFileSystem {
+ String fileWithMissingBlocks;
+
+ @Override
+ public void initialize(URI name, Configuration conf) throws IOException {
+ fileWithMissingBlocks = "";
+ super.initialize(name, conf);
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(
+ FileStatus stat, long start, long len) throws IOException {
+ if (stat.isDir()) {
+ return null;
+ }
+ System.out.println("File " + stat.getPath());
+ String name = stat.getPath().toUri().getPath();
+ BlockLocation[] locs =
+ super.getFileBlockLocations(stat, start, len);
+ if (name.equals(fileWithMissingBlocks)) {
+ System.out.println("Returning missing blocks for " + fileWithMissingBlocks);
+ locs[0] = new BlockLocation(new String[0], new String[0],
+ locs[0].getOffset(), locs[0].getLength());
+ }
+ return locs;
+ }
+
+ public void setFileWithMissingBlocks(String f) {
+ fileWithMissingBlocks = f;
+ }
+ }
+
private static final String DUMMY_KEY = "dummy.rr.key";
private static class DummyRecordReader extends RecordReader<Text, Text> {
@@ -1021,6 +1054,64 @@
}
}
+ /**
+ * Test that CFIF can handle missing blocks.
+ */
+ public void testMissingBlocks() throws IOException {
+ String namenode = null;
+ MiniDFSCluster dfs = null;
+ FileSystem fileSys = null;
+ String testName = "testMissingBlocks";
+ try {
+ Configuration conf = new Configuration();
+ conf.set("fs.hdfs.impl", MissingBlockFileSystem.class.getName());
+ conf.setBoolean("dfs.replication.considerLoad", false);
+ dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+ dfs.waitActive();
+
+ namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
+ (dfs.getFileSystem()).getUri().getPort();
+
+ fileSys = dfs.getFileSystem();
+ if (!fileSys.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+
+ Path file1 = new Path(dir1 + "/file1");
+ writeFile(conf, file1, (short)1, 1);
+ // create another file on the same datanode
+ Path file5 = new Path(dir5 + "/file5");
+ writeFile(conf, file5, (short)1, 1);
+
+ ((MissingBlockFileSystem)fileSys).setFileWithMissingBlocks(file1.toUri().getPath());
+ // split it using a CombinedFile input format
+ DummyInputFormat inFormat = new DummyInputFormat();
+ Job job = Job.getInstance(conf);
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+ List<InputSplit> splits = inFormat.getSplits(job);
+ System.out.println("Made splits(Test0): " + splits.size());
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test0): " + split);
+ }
+ assertEquals(splits.size(), 1);
+ CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+ } finally {
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+ }
+
static class TestFilter implements PathFilter {
private Path p;