MAPREDUCE-1980. DistributedRaidFileSystem now handles ChecksumException
correctly. (Ramkumar Vadali via schen)
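
A minimal sketch of the recovery pattern the fix relies on (not the patch itself):
on a ChecksumException the corrupt offset is taken from the stream's current
position rather than from the exception, and the read is retried against an
alternate, un-raided copy. ChecksumFallbackSketch, readWithFallback and the
"alternate" path are hypothetical illustrations, not RaidNode APIs.

    import java.io.IOException;
    import org.apache.hadoop.fs.ChecksumException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ChecksumFallbackSketch {
      // Read one int, retrying against an alternate (recovered) copy when the
      // primary read hits a ChecksumException.
      static int readWithFallback(FSDataInputStream in, FileSystem fs, Path alternate)
          throws IOException {
        try {
          return in.readInt();
        } catch (ChecksumException e) {
          // The stream position marks where corruption was hit; the patch below
          // uses underLyingStream.getPos() the same way instead of e.getPos().
          long corruptOffset = in.getPos();
          FSDataInputStream alt = fs.open(alternate); // hypothetical recovered copy
          try {
            alt.seek(corruptOffset);
            return alt.readInt();
          } finally {
            alt.close();
          }
        }
      }
    }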



git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1021524 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 8666fef..5cfdf74 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -325,6 +325,9 @@
     MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. (Ranjit
     Mathew via amareshwari)
 
+    MAPREDUCE-1980. DistributedRaidFileSystem now handles ChecksumException
+    correctly. (Ramkumar Vadali via schen)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
index 89f404b..49defb0 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
@@ -334,14 +334,18 @@
         while (alternates != null && nextLocation < alternates.length) {
           try {
             int idx = nextLocation++;
-            long corruptOffset = -1;
-            if (curexp instanceof BlockMissingException) {
-              corruptOffset = ((BlockMissingException)curexp).getOffset();
-            } else if (curexp instanceof ChecksumException) {
-              corruptOffset = ((ChecksumException)curexp).getPos();
-            }
-            Path npath = RaidNode.unRaid(conf, path, alternates[idx], stripeLength, 
-                                         corruptOffset);
+            long corruptOffset = underLyingStream.getPos();
+
+            // Make sure we use DFS and not DistributedRaidFileSystem for unRaid.
+            Configuration clientConf = new Configuration(conf);
+            Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl",
+                                                DistributedFileSystem.class);
+            clientConf.set("fs.hdfs.impl", clazz.getName());
+            // Disable caching so that a previously cached RaidDfs is not used.
+            clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+            Path npath = RaidNode.unRaid(clientConf, path,
+                         alternates[idx], stripeLength,
+                         corruptOffset);
             FileSystem fs1 = getUnderlyingFileSystem(conf);
             fs1.initialize(npath.toUri(), conf);
             LOG.info("Opening alternate path " + npath + " at offset " + curpos);
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
index c788a1e..f6e3aa6 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
@@ -22,6 +22,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.io.RandomAccessFile;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -187,11 +188,18 @@
       
       assertTrue("raidfs not an instance of DistributedRaidFileSystem",raidfs instanceof DistributedRaidFileSystem);
       
-      // corrupt first block of file
       LOG.info("Corrupt first block of file");
-      corruptBlock(file1, locations.get(0).getBlock());
+      corruptBlock(file1, locations.get(0).getBlock(), NUM_DATANODES, false);
       validateFile(raidfs, file1, file1, crc1);
 
+      // Corrupt one more block. Reading the file is now expected to fail.
+      LOG.info("Corrupt second block of file");
+      corruptBlock(file1, locations.get(1).getBlock(), NUM_DATANODES, false);
+      try {
+        validateFile(raidfs, file1, file1, crc1);
+        fail("Expected exception ChecksumException not thrown!");
+      } catch (org.apache.hadoop.fs.ChecksumException e) {
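+        // Expected: with two corrupt blocks the file cannot be recovered.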
+      }
     } catch (Exception e) {
       LOG.info("testPathFilter Exception " + e + StringUtils.stringifyException(e));
       throw e;
@@ -288,43 +296,67 @@
   /*
    * The Data directories for a datanode
    */
-  private File[] getDataNodeDirs(int i) throws IOException {
+  static private File[] getDataNodeDirs(int i) throws IOException {
     File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
     File data_dir = new File(base_dir, "data");
     File dir1 = new File(data_dir, "data"+(2*i+1));
     File dir2 = new File(data_dir, "data"+(2*i+2));
     if (dir1.isDirectory() && dir2.isDirectory()) {
       File[] dir = new File[2];
-      dir[0] = new File(dir1, "current");
-      dir[1] = new File(dir2, "current"); 
+      dir[0] = new File(dir1, "current/finalized");
+      dir[1] = new File(dir2, "current/finalized"); 
       return dir;
     }
     return new File[0];
   }
 
   //
-  // Corrupt specified block of file
+  // Delete/Corrupt specified block of file
   //
-  void corruptBlock(Path file, Block blockNum) throws IOException {
+  public static void corruptBlock(Path file, Block blockNum,
+                    int numDataNodes, boolean delete) throws IOException {
     long id = blockNum.getBlockId();
 
     // Now deliberately remove/truncate data blocks from the block.
-    //
-    for (int i = 0; i < NUM_DATANODES; i++) {
+    int numDeleted = 0;
+    int numCorrupted = 0;
+    for (int i = 0; i < numDataNodes; i++) {
       File[] dirs = getDataNodeDirs(i);
-      
+
       for (int j = 0; j < dirs.length; j++) {
         File[] blocks = dirs[j].listFiles();
         assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
         for (int idx = 0; idx < blocks.length; idx++) {
           if (blocks[idx].getName().startsWith("blk_" + id) &&
               !blocks[idx].getName().endsWith(".meta")) {
-            blocks[idx].delete();
-            LOG.info("Deleted block " + blocks[idx]);
+            if (delete) {
+              blocks[idx].delete();
+              LOG.info("Deleted block " + blocks[idx]);
+              numDeleted++;
+            } else {
+              // Corrupt
+              File f = blocks[idx];
+              long seekPos = f.length()/2;
+              RandomAccessFile raf = new RandomAccessFile(f, "rw");
+              raf.seek(seekPos);
+              int data = raf.readInt();
+              raf.seek(seekPos);
+              raf.writeInt(data+1);
+              raf.close();
+              LOG.info("Corrupted block " + blocks[idx]);
+              numCorrupted++;
+            }
           }
         }
       }
     }
+    assertTrue("Nothing corrupted or deleted",
+              (numCorrupted + numDeleted) > 0);
   }
 
+  //
+  // Corrupt specified block of file
+  //
+  void corruptBlock(Path file, Block blockNum) throws IOException {
+    corruptBlock(file, blockNum, NUM_DATANODES, true);
+  }
 }