MAPREDUCE-2185. Fix infinite loop at creating splits using CombineFileInputFormat. (Ramkumar Vadali via schen)
    


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1132807 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 4d88302..8b5967d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -844,6 +844,9 @@
     MAPREDUCE-2487. ChainReducer uses MAPPER_BY_VALUE instead of
     REDUCER_BY_VALUE. (Devaraj K via todd)
 
+    MAPREDUCE-2185. Fix infinite loop at creating splits using
+    CombineFileInputFormat. (Ramkumar Vadali via schen)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index ef63ea2..d213f59 100644
--- a/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -545,17 +545,28 @@
           // add this block to the block --> node locations map
           blockToNodes.put(oneblock, oneblock.hosts);
 
+          // For blocks that do not have host/rack information,
+          // assign to default  rack.
+          String[] racks = null;
+          if (oneblock.hosts.length == 0) {
+            racks = new String[]{NetworkTopology.DEFAULT_RACK};
+          } else {
+            racks = oneblock.racks;
+          }
+
           // add this block to the rack --> block map
-          for (int j = 0; j < oneblock.racks.length; j++) {
-            String rack = oneblock.racks[j];
+          for (int j = 0; j < racks.length; j++) {
+            String rack = racks[j];
             List<OneBlockInfo> blklist = rackToBlocks.get(rack);
             if (blklist == null) {
               blklist = new ArrayList<OneBlockInfo>();
               rackToBlocks.put(rack, blklist);
             }
             blklist.add(oneblock);
-            // Add this host to rackToNodes map
-            addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
+            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+              // Add this host to rackToNodes map
+              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+            }
           }
 
           // add this block to the node --> block map
@@ -619,6 +630,11 @@
     }
   }
 
+  protected BlockLocation[] getFileBlockLocations(
+    FileSystem fs, FileStatus stat) throws IOException {
+    return fs.getFileBlockLocations(stat, 0, stat.getLen());
+  }
+
   private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
                                     String rack, String host) {
     Set<String> hosts = rackToNodes.get(rack);
@@ -632,7 +648,9 @@
   private Set<String> getHosts(Set<String> racks) {
     Set<String> hosts = new HashSet<String>();
     for (String rack : racks) {
-      hosts.addAll(rackToNodes.get(rack));
+      if (rackToNodes.containsKey(rack)) {
+        hosts.addAll(rackToNodes.get(rack));
+      }
     }
     return hosts;
   }
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
index af0ecc9..4915be5 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
@@ -19,16 +19,14 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.URI;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.zip.GZIPOutputStream;
 
 import junit.framework.TestCase;
 
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -103,6 +101,41 @@
     }
   }
 
+  /** Dummy class to extend CombineFileInputFormat. It allows
+   * testing with files having missing blocks without actually removing replicas.
+   */
+  public static class MissingBlockFileSystem extends DistributedFileSystem {
+    String fileWithMissingBlocks;
+
+    @Override
+    public void initialize(URI name, Configuration conf) throws IOException {
+      fileWithMissingBlocks = "";
+      super.initialize(name, conf);
+    }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(
+        FileStatus stat, long start, long len) throws IOException {
+      if (stat.isDir()) {
+        return null;
+      }
+      System.out.println("File " + stat.getPath());
+      String name = stat.getPath().toUri().getPath();
+      BlockLocation[] locs =
+        super.getFileBlockLocations(stat, start, len);
+      if (name.equals(fileWithMissingBlocks)) {
+        System.out.println("Returing missing blocks for " + fileWithMissingBlocks);
+        locs[0] = new BlockLocation(new String[0], new String[0],
+            locs[0].getOffset(), locs[0].getLength());
+      }
+      return locs;
+    }
+
+    public void setFileWithMissingBlocks(String f) {
+      fileWithMissingBlocks = f;
+    }
+  }
+
   private static final String DUMMY_KEY = "dummy.rr.key";
 
   private static class DummyRecordReader extends RecordReader<Text, Text> {
@@ -1021,6 +1054,64 @@
     }
   }
 
+  /**
+   * Test that CFIF can handle missing blocks.
+   */
+  public void testMissingBlocks() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    String testName = "testMissingBlocks";
+    try {
+      Configuration conf = new Configuration();
+      conf.set("fs.hdfs.impl", MissingBlockFileSystem.class.getName());
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
+                 (dfs.getFileSystem()).getUri().getPort();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+
+      Path file1 = new Path(dir1 + "/file1");
+      writeFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5");
+      writeFile(conf, file5, (short)1, 1);
+
+      ((MissingBlockFileSystem)fileSys).setFileWithMissingBlocks(file1.toUri().getPath());
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = Job.getInstance(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      List<InputSplit> splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test0): " + splits.size());
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      assertEquals(splits.size(), 1);
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
   static class TestFilter implements PathFilter {
     private Path p;