MAPREDUCE-2156. Raid-aware FSCK. (Patrick Kling via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1041702 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 07a96a0..0a5e56a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@
 
     MAPREDUCE-2172. Added test-patch.properties required by test-patch.sh (nigel)
 
+    MAPREDUCE-2156. Raid-aware FSCK. (Patrick Kling via dhruba)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/src/contrib/raid/README b/src/contrib/raid/README
index d8d00f1..d445319 100644
--- a/src/contrib/raid/README
+++ b/src/contrib/raid/README
@@ -165,8 +165,12 @@
 start-raidnode-remote.sh (and do the equivalent thing for stop-mapred.sh and 
 stop-raidnode-remote.sh).
 
-Run fsckraid periodically (being developed as part of another JIRA). This valudates parity
-blocsk of a file.
+To validate the integrity of a raided file system, run the RaidShell fsck
+command as follows:
+$HADOOP_HOME/bin/hadoop org.apache.hadoop.raid.RaidShell -fsck [path]
+
+This will print a list of corrupt files (i.e., files which have lost too many
+blocks and can no longer be fixed by Raid).
+
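+For example, to check only the files under a particular directory (the path
+below is just an illustration, not part of the default setup):
+
+$HADOOP_HOME/bin/hadoop org.apache.hadoop.raid.RaidShell -fsck /user/foo/raidtest
+
+Each corrupt file is printed on its own line, and the command's return value
+is the number of corrupt files found (0 if the file system is healthy).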
 
 --------------------------------------------------------------------------------
 
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
index 80108c6..eadcb1f 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
@@ -19,9 +19,14 @@
 package org.apache.hadoop.raid;
 
 import java.io.IOException;
+import java.io.FileNotFoundException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.LinkedHashMap;
+import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.net.InetSocketAddress;
 import javax.security.auth.login.LoginException;
@@ -39,6 +44,14 @@
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.HarFileSystem;
+
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
 
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
@@ -160,6 +173,7 @@
       System.err.println("           [-help [cmd]]");
       System.err.println("           [-recover srcPath1 corruptOffset]");
       System.err.println("           [-recoverBlocks path1 path2...]");
+      System.err.println("           [-fsck [path]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
     }
@@ -191,6 +205,11 @@
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-fsck".equals(cmd)) {
+      if ((argv.length < 1) || (argv.length > 2)) {
+        printUsage(cmd);
+        return exitCode;
+      }
     }
 
     try {
@@ -204,6 +223,15 @@
         initializeLocal(conf);
         recoverBlocks(argv, i);
         exitCode = 0;
+      } else if ("-fsck".equals(cmd)) {
+        if (argv.length == 1) {
+          // if there are no args, check the whole file system
+          exitCode = fsck("/");
+        } else {
+          // argv.length == 2
+          // otherwise, check the path passed
+          exitCode = fsck(argv[1]);
+        }
       } else {
         exitCode = -1;
         System.err.println(cmd.substring(1) + ": Unknown command");
@@ -296,6 +324,325 @@
   }
 
   /**
+   * checks whether a file has more than the allowable number of
+   * corrupt blocks and must therefore be considered corrupt
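+   * (a stripe is considered unrecoverable if the number of corrupt blocks
+   * in it, data plus parity, exceeds the number of parity blocks per
+   * stripe: 1 for XOR, rsParityLength for RS)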
+   */
+  private boolean isFileCorrupt(final DistributedFileSystem dfs,
+                                final Path filePath)
+    throws IOException {
+    // corruptBlocksPerStripe:
+    // map stripe # -> # of corrupt blocks in that stripe (data + parity)
+    HashMap<Integer, Integer> corruptBlocksPerStripe =
+      new LinkedHashMap<Integer, Integer>();
+
+    // read conf
+    final int stripeBlocks = RaidNode.getStripeLength(conf);
+
+    // figure out which blocks are missing/corrupted
+    final FileStatus fileStatus = dfs.getFileStatus(filePath);
+    final long blockSize = fileStatus.getBlockSize();
+    final long fileLength = fileStatus.getLen();
+    final long fileLengthInBlocks = (fileLength / blockSize) +
+      (((fileLength % blockSize) == 0) ? 0L : 1L);
+    final long fileStripes = (fileLengthInBlocks / stripeBlocks) +
+      (((fileLengthInBlocks % stripeBlocks) == 0) ? 0L : 1L);
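+    // e.g., with blockSize = 8192 and fileLength = 20000 this gives
+    // fileLengthInBlocks = 3; with stripeBlocks = 3, fileStripes = 1
+    // (both divisions round up)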
+    final BlockLocation[] fileBlocks =
+      dfs.getFileBlockLocations(fileStatus, 0, fileLength);
+
+    // figure out which stripes these corrupted blocks belong to
+    for (BlockLocation fileBlock: fileBlocks) {
+      int blockNo = (int) (fileBlock.getOffset() / blockSize);
+      final int stripe = (int) (blockNo / stripeBlocks);
+      if (fileBlock.isCorrupt() ||
+          (fileBlock.getNames().length == 0 && fileBlock.getLength() > 0)) {
+        if (corruptBlocksPerStripe.get(stripe) == null) {
+          corruptBlocksPerStripe.put(stripe, 1);
+        } else {
+          corruptBlocksPerStripe.put(stripe, corruptBlocksPerStripe.
+                                     get(stripe) + 1);
+        }
+        LOG.info("file " + filePath.toString() + " corrupt in block " +
+                 blockNo + "/" + fileLengthInBlocks + ", stripe " + stripe +
+                 "/" + fileStripes);
+      } else {
+        LOG.info("file " + filePath.toString() + " OK in block " + blockNo +
+                 "/" + fileLengthInBlocks + ", stripe " + stripe + "/" +
+                 fileStripes);
+      }
+    }
+
+    RaidInfo raidInfo = getFileRaidInfo(dfs, filePath);
+
+    // now check parity blocks
+    if (raidInfo.raidType != RaidType.NONE) {
+      checkParityBlocks(filePath, corruptBlocksPerStripe, blockSize,
+                        fileStripes, raidInfo);
+    }
+
+    final int maxCorruptBlocksPerStripe = raidInfo.parityBlocksPerStripe;
+
+    for (int corruptBlocksInStripe: corruptBlocksPerStripe.values()) {
+      if (corruptBlocksInStripe > maxCorruptBlocksPerStripe) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * holds the type of raid used for a particular file
+   */
+  private enum RaidType {
+    XOR,
+    RS,
+    NONE
+  }
+
+  /**
+   * holds raid type and parity file pair
+   */
+  private class RaidInfo {
+    public RaidInfo(final RaidType raidType,
+                    final RaidNode.ParityFilePair parityPair,
+                    final int parityBlocksPerStripe) {
+      this.raidType = raidType;
+      this.parityPair = parityPair;
+      this.parityBlocksPerStripe = parityBlocksPerStripe;
+    }
+    public final RaidType raidType;
+    public final RaidNode.ParityFilePair parityPair;
+    public final int parityBlocksPerStripe;
+  }
+
+  /**
+   * returns the raid info (type and parity file pair) for a given file
+   */
+  private RaidInfo getFileRaidInfo(final DistributedFileSystem dfs,
+                                   final Path filePath)
+    throws IOException {
+    // now look for the parity file
+    Path destPath = null;
+    RaidNode.ParityFilePair ppair = null;
+    try {
+      // look for xor parity file first
+      destPath = RaidNode.xorDestinationPath(conf);
+      ppair = RaidNode.getParityFile(destPath, filePath, conf);
+    } catch (FileNotFoundException ignore) {
+    }
+    if (ppair != null) {
+      return new RaidInfo(RaidType.XOR, ppair, 1);
+    } else {
+      // failing that, look for rs parity file
+      try {
+        destPath = RaidNode.rsDestinationPath(conf);
+        ppair = RaidNode.getParityFile(destPath, filePath, conf);
+      } catch (FileNotFoundException ignore) {
+      }
+      if (ppair != null) {
+        return new RaidInfo(RaidType.RS, ppair, RaidNode.rsParityLength(conf));
+      } else {
+        return new RaidInfo(RaidType.NONE, null, 0);
+      }
+    }
+  }
+
+  /**
+   * gets the parity blocks corresponding to a file:
+   * returns the parity blocks themselves in the case of DFS,
+   * and the part-file blocks containing the parity blocks
+   * in the case of HAR FS
+   */
+  private BlockLocation[] getParityBlocks(final Path filePath,
+                                          final long blockSize,
+                                          final long fileStripes,
+                                          final RaidInfo raidInfo)
+    throws IOException {
+
+
+    final String parityPathStr = raidInfo.parityPair.getPath().toUri().
+      getPath();
+    FileSystem parityFS = raidInfo.parityPair.getFileSystem();
+
+    // get parity file metadata
+    FileStatus parityFileStatus = parityFS.
+      getFileStatus(new Path(parityPathStr));
+    long parityFileLength = parityFileStatus.getLen();
+
+    if (parityFileLength != fileStripes * raidInfo.parityBlocksPerStripe *
+        blockSize) {
+      throw new IOException("expected parity file of length " +
+                            (fileStripes * raidInfo.parityBlocksPerStripe *
+                             blockSize) +
+                            " but got parity file of length " +
+                            parityFileLength);
+    }
+
+    BlockLocation[] parityBlocks =
+      parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);
+
+    if (parityFS instanceof DistributedFileSystem ||
+        parityFS instanceof DistributedRaidFileSystem) {
+      long parityBlockSize = parityFileStatus.getBlockSize();
+      if (parityBlockSize != blockSize) {
+        throw new IOException("file block size is " + blockSize +
+                              " but parity file block size is " +
+                              parityBlockSize);
+      }
+    } else if (parityFS instanceof HarFileSystem) {
+      LOG.info("HAR FS found");
+    } else {
+      LOG.warn("parity file system is not of a supported type");
+    }
+
+    return parityBlocks;
+  }
+
+  /**
+   * checks the parity blocks for a given file and modifies
+   * corruptBlocksPerStripe accordingly
+   */
+  private void checkParityBlocks(final Path filePath,
+                                 final HashMap<Integer, Integer>
+                                 corruptBlocksPerStripe,
+                                 final long blockSize,
+                                 final long fileStripes,
+                                 final RaidInfo raidInfo)
+    throws IOException {
+
+    // get the blocks of the parity file
+    // because of har, multiple blocks may be returned as one container block
+    BlockLocation[] containerBlocks = getParityBlocks(filePath, blockSize,
+                                                      fileStripes, raidInfo);
+
+    long parityStripeLength = blockSize *
+      ((long) raidInfo.parityBlocksPerStripe);
+
+    long parityFileLength = parityStripeLength * fileStripes;
+
+    long parityBlocksFound = 0L;
+
+    for (BlockLocation cb: containerBlocks) {
+      if (cb.getLength() % blockSize != 0) {
+        throw new IOException("container block size is not a " +
+                              "multiple of parity block size");
+      }
+      int blocksInContainer = (int) (cb.getLength() / blockSize);
+      LOG.info("found container with offset " + cb.getOffset() +
+               ", length " + cb.getLength());
+
+      for (long offset = cb.getOffset();
+           offset < cb.getOffset() + cb.getLength();
+           offset += blockSize) {
+        long block = offset / blockSize;
+
+        int stripe = (int) (offset / parityStripeLength);
+
+        if (stripe < 0) {
+          // before the beginning of the parity file
+          continue;
+        }
+        if (stripe >= fileStripes) {
+          // past the end of the parity file
+          break;
+        }
+
+        parityBlocksFound++;
+
+        if (cb.isCorrupt() ||
+            (cb.getNames().length == 0 && cb.getLength() > 0)) {
+          LOG.info("parity file for " + filePath.toString() +
+                   " corrupt in block " + block +
+                   ", stripe " + stripe + "/" + fileStripes);
+
+          if (corruptBlocksPerStripe.get(stripe) == null) {
+            corruptBlocksPerStripe.put(stripe, 1);
+          } else {
+            corruptBlocksPerStripe.put(stripe,
+                                       corruptBlocksPerStripe.get(stripe) +
+                                       1);
+          }
+        } else {
+          LOG.info("parity file for " + filePath.toString() +
+                   " OK in block " + block +
+                   ", stripe " + stripe + "/" + fileStripes);
+        }
+      }
+    }
+
+    long parityBlocksExpected = raidInfo.parityBlocksPerStripe * fileStripes;
+    if (parityBlocksFound != parityBlocksExpected) {
+      throw new IOException("expected " + parityBlocksExpected +
+                            " parity blocks but got " + parityBlocksFound);
+    }
+  }
+
+
+  /**
+   * checks the raided file system, prints a list of corrupt files to
+   * System.out and returns the number of corrupt files
+   */
+  public int fsck(final String path) throws IOException {
+
+    FileSystem fs = (new Path(path)).getFileSystem(conf);
+
+    // if we got a raid fs, get the underlying fs
+    if (fs instanceof DistributedRaidFileSystem) {
+      fs = ((DistributedRaidFileSystem) fs).getFileSystem();
+    }
+
+    // check that we have a distributed fs
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new IOException("expected DistributedFileSystem but got " +
+                fs.getClass().getName());
+    }
+    final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+
+    // get conf settings
+    String xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
+    String rsPrefix = RaidNode.rsDestinationPath(conf).toUri().getPath();
+    if (!xorPrefix.endsWith("/")) {
+      xorPrefix = xorPrefix + "/";
+    }
+    if (!rsPrefix.endsWith("/")) {
+      rsPrefix = rsPrefix + "/";
+    }
+    LOG.info("prefixes: " + xorPrefix + ", " + rsPrefix);
+
+    // get a list of corrupted files (not considering parity blocks just yet)
+    // from the name node
+    // these are the only files we need to consider:
+    // if a file has no corrupted data blocks, it is OK even if some
+    // of its parity blocks are corrupted, so no further checking is
+    // necessary
+    final String[] files = RaidDFSUtil.getCorruptFiles(dfs);
+    final List<Path> corruptFileCandidates = new LinkedList<Path>();
+    for (final String f: files) {
+      final Path p = new Path(f);
+      // if this file is a parity file
+      // or if it does not start with the specified path,
+      // ignore it
+      if (!p.toString().startsWith(xorPrefix) &&
+          !p.toString().startsWith(rsPrefix) &&
+          p.toString().startsWith(path)) {
+        corruptFileCandidates.add(p);
+      }
+    }
+    // filter files marked for deletion
+    RaidUtils.filterTrash(conf, corruptFileCandidates);
+
+    int numberOfCorruptFiles = 0;
+
+    for (final Path corruptFileCandidate: corruptFileCandidates) {
+      if (isFileCorrupt(dfs, corruptFileCandidate)) {
+        System.out.println(corruptFileCandidate.toString());
+        numberOfCorruptFiles++;
+      }
+    }
+
+    return numberOfCorruptFiles;
+  }
+
+  /**
    * main() has some simple utility methods
    */
   public static void main(String argv[]) throws Exception {
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
new file mode 100644
index 0000000..973f6df
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
@@ -0,0 +1,767 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import org.junit.Test;
+import org.junit.After;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.HarIndex;
+
+
+public class TestRaidShellFsck {
+  final static Log LOG =
+    LogFactory.getLog("org.apache.hadoop.raid.TestRaidShellFsck");
+  final static String TEST_DIR = 
+    new File(System.
+             getProperty("test.build.data", "build/contrib/raid/test/data")).
+    getAbsolutePath();
+  final static String CONFIG_FILE = new File(TEST_DIR, "test-raid.xml").
+    getAbsolutePath();
+  final static long RELOAD_INTERVAL = 1000;
+  final static int NUM_DATANODES = 4;
+  final static int STRIPE_BLOCKS = 3; // number of blocks per stripe
+  final static int FILE_BLOCKS = 6; // number of blocks that file consists of
+  final static short REPL = 1; // replication factor before raiding
+  final static long BLOCK_SIZE = 8192L; // size of block in bytes
+  final static String DIR_PATH = "/user/pkling/raidtest";
+  final static Path FILE_PATH0 =
+    new Path("/user/pkling/raidtest/raidfsck.test");
+  final static Path FILE_PATH1 =
+    new Path("/user/pkling/raidtest/raidfsck2.test");
+  final static Path RAID_PATH = new Path("/destraid/user/pkling/raidtest");
+  final static String HAR_NAME = "raidtest_raid.har";
+  final static String RAID_DIR = "/destraid";
+
+  Configuration conf = null;
+  Configuration raidConf = null;
+  Configuration clientConf = null;
+  MiniDFSCluster cluster = null;
+  DistributedFileSystem dfs = null;
+  RaidNode rnode = null;
+  
+  
+  RaidShell shell = null;
+  String[] args = null;
+
+
+  /**
+   * creates a MiniDFS instance with raided files in it
+   */
+  private void setUp(boolean doHar) throws IOException, ClassNotFoundException {
+
+    final int timeBeforeHar;
+    if (doHar) {
+      timeBeforeHar = 0;
+    } else {
+      timeBeforeHar = -1;
+    }
+
+
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    conf = new Configuration();
+
+    conf.set("raid.config.file", CONFIG_FILE);
+    conf.setBoolean("raid.config.reload", true);
+    conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+
+    // scan all policies once every 5 seconds
+    conf.setLong("raid.policy.rescan.interval", 5000);
+
+    // make all deletions not go through Trash
+    conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+    // do not use map-reduce cluster for Raiding
+    conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
+    // use local block fixer
+    conf.set("raid.blockfix.classname", 
+             "org.apache.hadoop.raid.LocalBlockFixer");
+
+    conf.set("raid.server.address", "localhost:0");
+    conf.setInt("hdfs.raid.stripeLength", STRIPE_BLOCKS);
+    conf.set("hdfs.raid.locations", RAID_DIR);
+
+    conf.setInt("dfs.corruptfilesreturned.max", 500);
+
+    conf.setBoolean("dfs.permissions", false);
+
+    cluster = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+    cluster.waitActive();
+    dfs = (DistributedFileSystem) cluster.getFileSystem();
+    String namenode = dfs.getUri().toString();
+
+    FileSystem.setDefaultUri(conf, namenode);
+
+    FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+    fileWriter.write("<?xml version=\"1.0\"?>\n");
+    String str =
+      "<configuration> " +
+      "  <srcPath prefix=\"" + DIR_PATH + "\"> " +
+      "    <policy name = \"RaidTest1\"> " +
+      "      <erasureCode>xor</erasureCode> " +
+      "      <destPath> " + RAID_DIR + " </destPath> " +
+      "      <property> " +
+      "        <name>targetReplication</name> " +
+      "        <value>1</value> " +
+      "        <description>after RAIDing, decrease the replication " +
+      "factor of a file to this value.</description> " +
+      "      </property> " +
+      "      <property> " +
+      "        <name>metaReplication</name> " +
+      "        <value>1</value> " +
+      "        <description> replication factor of parity file</description> " +
+      "      </property> " +
+      "      <property> " +
+      "        <name>modTimePeriod</name> " +
+      "        <value>2000</value> " +
+      "        <description>time (milliseconds) after a file is modified " + 
+      "to make it a candidate for RAIDing</description> " +
+      "      </property> ";
+   
+    if (timeBeforeHar >= 0) {
+      str +=
+        "      <property> " +
+        "        <name>time_before_har</name> " +
+        "        <value>" + timeBeforeHar + "</value> " +
+        "        <description> amount of time waited before har'ing parity " +
+        "files</description> " +
+        "     </property> ";
+    }
+
+    str +=
+      "    </policy>" +
+      "  </srcPath>" +
+      "</configuration>";
+
+    fileWriter.write(str);
+    fileWriter.close();
+
+    createTestFile(FILE_PATH0);
+    createTestFile(FILE_PATH1);
+
+    Path[] filePaths = { FILE_PATH0, FILE_PATH1 };
+    raidTestFiles(RAID_PATH, filePaths, doHar);
+
+    clientConf = new Configuration(raidConf);
+    clientConf.set("fs.hdfs.impl",
+                   "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
+    clientConf.set("fs.raid.underlyingfs.impl",
+                   "org.apache.hadoop.hdfs.DistributedFileSystem");
+
+    // prepare shell and arguments
+    shell = new RaidShell(clientConf);
+    args = new String[2];
+    args[0] = "-fsck";
+    args[1] = DIR_PATH;
+
+  }
+  
+  /**
+   * Creates a test file consisting of random data
+   */
+  private void createTestFile(Path filePath) throws IOException {
+    Random rand = new Random();
+    FSDataOutputStream stm = dfs.create(filePath, true, 
+                                        conf.getInt("io.file.buffer.size",
+                                                    4096), REPL, BLOCK_SIZE);
+
+    final byte[] b = new byte[(int) BLOCK_SIZE];
+    for (int i = 0; i < FILE_BLOCKS; i++) {
+      rand.nextBytes(b);
+      stm.write(b);
+    }
+    stm.close();
+    LOG.info("test file created");
+
+  }
+
+  /**
+   * raids the test files
+   */
+  private void raidTestFiles(Path raidPath, Path[] filePaths, boolean doHar)
+    throws IOException, ClassNotFoundException {
+    // create RaidNode
+    raidConf = new Configuration(conf);
+    raidConf.set(RaidNode.RAID_LOCATION_KEY, RAID_DIR);
+    raidConf.setInt("raid.blockfix.interval", 1000);
+    raidConf.setLong("har.block.size", BLOCK_SIZE * 3);
+    // the RaidNode does the raiding inline (instead of submitting to MR node)
+    conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
+    rnode = RaidNode.createRaidNode(null, raidConf);
+
+    for (Path filePath: filePaths) {
+      long waitStart = System.currentTimeMillis();
+      boolean raided = false;
+      
+      Path parityFilePath = new Path(RAID_DIR, 
+                                     filePath.toString().substring(1));
+
+      while (!raided) {
+        try {
+          FileStatus[] listPaths = dfs.listStatus(raidPath);
+          if (listPaths != null) {
+            if (doHar) {
+              // case with HAR
+              for (FileStatus f: listPaths) {
+                if (f.getPath().toString().endsWith(".har")) {
+                  // check if the parity file is in the index
+                  final Path indexPath = new Path(f.getPath(), "_index");
+                  final FileStatus indexFileStatus = 
+                    dfs.getFileStatus(indexPath);
+                  final HarIndex harIndex = 
+                    new HarIndex(dfs.open(indexPath), indexFileStatus.getLen());
+                  final HarIndex.IndexEntry indexEntry =
+                    harIndex.findEntryByFileName(parityFilePath.toString());
+                  if (indexEntry != null) {
+                    LOG.info("raid file " + parityFilePath.toString() + 
+                             " found in Har archive: " + 
+                             f.getPath().toString() +
+                             " ts=" + indexEntry.mtime);
+                    raided = true;
+                    break;
+                  }
+                }
+              }
+              
+            } else {
+              // case without HAR
+              for (FileStatus f : listPaths) {
+                Path found = new Path(f.getPath().toUri().getPath());
+                if (parityFilePath.equals(found)) {
+                  LOG.info("raid file found: " + f.getPath().toString());
+                  raided = true;
+                  break;
+                }
+              }
+            }
+          }
+        } catch (FileNotFoundException ignore) {
+        }
+        if (!raided) {
+          if (System.currentTimeMillis() > waitStart + 40000L) {
+            LOG.error("parity file not created after 40s");
+            throw new IOException("parity file not created after 40s");
+          } else {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+          }
+        }
+      }
+    }
+
+    rnode.stop();
+    rnode.join();
+    rnode = null;
+    LOG.info("test files raided");
+  }
+
+  /**
+   * sleeps for up to 20s until the number of corrupt files 
+   * in the file system is equal to the number specified
+   */
+  private void waitUntilCorruptFileCount(DistributedFileSystem dfs,
+                                         int corruptFiles)
+    throws IOException {
+    int initialCorruptFiles = RaidDFSUtil.getCorruptFiles(dfs).length;
+    long waitStart = System.currentTimeMillis();
+    while (RaidDFSUtil.getCorruptFiles(dfs).length != corruptFiles) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+        
+      }
+
+      if (System.currentTimeMillis() > waitStart + 20000L) {
+        break;
+      }
+    }
+    
+    long waited = System.currentTimeMillis() - waitStart;
+
+    int corruptFilesFound = RaidDFSUtil.getCorruptFiles(dfs).length;
+    if (corruptFilesFound != corruptFiles) {
+      throw new IOException("expected " + corruptFiles + 
+                            " corrupt files but got " +
+                            corruptFilesFound);
+    }
+  }
+
+  /**
+   * removes a specified block from MiniDFS storage and reports it as corrupt
+   */
+  private void removeAndReportBlock(DistributedFileSystem blockDfs,
+                                    Path filePath,
+                                    LocatedBlock block) 
+    throws IOException {
+    TestRaidDfs.corruptBlock(filePath, block.getBlock(), NUM_DATANODES, true);
+   
+    // report deleted block to the name node
+    LocatedBlock[] toReport = { block };
+    blockDfs.getClient().getNamenode().reportBadBlocks(toReport);
+
+  }
+
+
+  /**
+   * removes a file block in the specified stripe
+   */
+  private void removeFileBlock(Path filePath, int stripe, int blockInStripe)
+    throws IOException {
+    LocatedBlocks fileBlocks = dfs.getClient().getNamenode().
+      getBlockLocations(filePath.toString(), 0, FILE_BLOCKS * BLOCK_SIZE);
+    if (fileBlocks.locatedBlockCount() != FILE_BLOCKS) {
+      throw new IOException("expected " + FILE_BLOCKS + 
+                            " file blocks but found " + 
+                            fileBlocks.locatedBlockCount());
+    }
+    if (blockInStripe >= STRIPE_BLOCKS) {
+      throw new IOException("blockInStripe is " + blockInStripe +
+                            " but must be smaller than " + STRIPE_BLOCKS);
+    }
+    LocatedBlock block = fileBlocks.get(stripe * STRIPE_BLOCKS + blockInStripe);
+    removeAndReportBlock(dfs, filePath, block);
+    LOG.info("removed file " + filePath.toString() + " block " +
+             (stripe * STRIPE_BLOCKS + blockInStripe) + " in stripe " + stripe);
+  }
+
+  /**
+   * removes a parity block in the specified stripe
+   */
+  private void removeParityBlock(Path filePath, int stripe) throws IOException {
+    // find parity file
+    Path destPath = new Path(RAID_DIR);
+    RaidNode.ParityFilePair ppair = null;
+
+    ppair = RaidNode.getParityFile(destPath, filePath, conf);
+    String parityPathStr = ppair.getPath().toUri().getPath();
+    LOG.info("parity path: " + parityPathStr);
+    FileSystem parityFS = ppair.getFileSystem();
+    if (!(parityFS instanceof DistributedFileSystem)) {
+      throw new IOException("parity file is not on distributed file system");
+    }
+    DistributedFileSystem parityDFS = (DistributedFileSystem) parityFS;
+
+    
+    // now corrupt the block corresponding to the stripe selected
+    FileStatus parityFileStatus =
+      parityDFS.getFileStatus(new Path(parityPathStr));
+    long parityBlockSize = parityFileStatus.getBlockSize();
+    long parityFileLength = parityFileStatus.getLen();
+    long parityFileLengthInBlocks = (parityFileLength / parityBlockSize) + 
+      (((parityFileLength % parityBlockSize) == 0) ? 0L : 1L);
+    if (parityFileLengthInBlocks <= stripe) {
+      throw new IOException("selected stripe " + stripe + 
+                            " but parity file only has " + 
+                            parityFileLengthInBlocks + " blocks");
+    }
+    if (parityBlockSize != BLOCK_SIZE) {
+      throw new IOException("file block size is " + BLOCK_SIZE + 
+                            " but parity file block size is " + 
+                            parityBlockSize);
+    }
+    LocatedBlocks parityFileBlocks = parityDFS.getClient().getNamenode().
+      getBlockLocations(parityPathStr, 0, parityFileLength);
+    if (parityFileBlocks.locatedBlockCount() != parityFileLengthInBlocks) {
+      throw new IOException("expected " + parityFileLengthInBlocks + 
+                            " parity file blocks but got " + 
+                            parityFileBlocks.locatedBlockCount() + 
+                            " blocks");
+    }
+    LocatedBlock parityFileBlock = parityFileBlocks.get(stripe);
+    removeAndReportBlock(parityDFS, new Path(parityPathStr), parityFileBlock);
+    LOG.info("removed parity file block/stripe " + stripe +
+             " for " + filePath.toString());
+
+  }
+
+  /**
+   * removes a block from the har part file
+   */
+  private void removeHarParityBlock(int block) throws IOException {
+    Path harPath = new Path(RAID_PATH, HAR_NAME);
+    FileStatus [] listPaths = dfs.listStatus(harPath);
+    
+    boolean deleted = false;
+    
+    for (FileStatus f: listPaths) {
+      if (f.getPath().getName().startsWith("part-")) {
+        final Path partPath = new Path(f.getPath().toUri().getPath());
+        final LocatedBlocks partBlocks  = dfs.getClient().getNamenode().
+          getBlockLocations(partPath.toString(),
+                            0,
+                            f.getLen());
+        
+        if (partBlocks.locatedBlockCount() <= block) {
+          throw new IOException("invalid har block " + block);
+        }
+
+        final LocatedBlock partBlock = partBlocks.get(block);
+        removeAndReportBlock(dfs, partPath, partBlock);
+        LOG.info("removed block " + block + "/" + 
+                 partBlocks.locatedBlockCount() +
+                 " of file " + partPath.toString() +
+                 " block size " + partBlock.getBlockSize());
+        deleted = true;
+        break;
+      }
+    }
+
+    if (!deleted) {
+      throw new IOException("cannot find part file in " + harPath.toString());
+    }
+  }
+
+
+  /**
+   * returns the data directories for a data node
+   */
+  private File[] getDataDirs(int datanode) throws IOException{
+    File data_dir = new File(System.getProperty("test.build.data"), 
+                             "dfs/data/");
+    File dir1 = new File(data_dir, "data"+(2 * datanode + 1));
+    File dir2 = new File(data_dir, "data"+(2 * datanode + 2));
+    if (!(dir1.isDirectory() && dir2.isDirectory())) {
+      throw new IOException("data directories not found for data node " + 
+                            datanode + ": " + dir1.toString() + " " + 
+                            dir2.toString());
+    }
+
+    File[] dirs = new File[2];
+    dirs[0] = new File(dir1, "current");
+    dirs[1] = new File(dir2, "current");
+    return dirs;
+  }
+
+
+  /**
+   * checks fsck with no missing blocks
+   */
+  @Test
+  public void testClean() throws Exception {
+    LOG.info("testClean");
+    setUp(false);
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 0, but returns " +
+               Integer.toString(result), result == 0);
+  }
+
+
+  /**
+   * checks fsck with a missing file block but no missing parity block
+   */
+  @Test
+  public void testFileBlockMissing() throws Exception {
+    LOG.info("testFileBlockMissing");
+    setUp(false);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeFileBlock(FILE_PATH0, 0, 0);
+    waitUntilCorruptFileCount(dfs, 1);
+
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 0, but returns " + 
+               Integer.toString(result), result == 0);
+  }
+
+  /**
+   * checks fsck with a missing parity block but no missing file block
+   */
+  @Test
+  public void testParityBlockMissing() throws Exception {
+    LOG.info("testParityBlockMissing");
+    setUp(false);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeParityBlock(FILE_PATH0, 0);
+    waitUntilCorruptFileCount(dfs, 1);
+
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 0, but returns " +
+               Integer.toString(result), result == 0);
+  }
+
+  /**
+   * checks fsck with a missing file block and a missing parity block
+   * in different stripes
+   */
+  @Test
+  public void testFileBlockAndParityBlockMissingInDifferentStripes() 
+    throws Exception {
+    LOG.info("testFileBlockAndParityBlockMissingInDifferentStripes");
+    setUp(false);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeFileBlock(FILE_PATH0, 0, 0);
+    waitUntilCorruptFileCount(dfs, 1);
+    removeParityBlock(FILE_PATH0, 1);
+    waitUntilCorruptFileCount(dfs, 2);
+
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 0, but returns " + 
+               Integer.toString(result), result == 0);
+  }
+
+  /**
+   * checks fsck with a missing file block and a missing parity block
+   * in the same stripe
+   */
+  @Test
+  public void testFileBlockAndParityBlockMissingInSameStripe() 
+    throws Exception {
+    LOG.info("testFileBlockAndParityBlockMissingInSameStripe");
+    setUp(false);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeParityBlock(FILE_PATH0, 1);
+    waitUntilCorruptFileCount(dfs, 1);
+    removeFileBlock(FILE_PATH0, 1, 0);
+    waitUntilCorruptFileCount(dfs, 2);
+
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 1, but returns " + 
+               Integer.toString(result), result == 1);
+  }
+
+  /**
+   * checks fsck with two missing file blocks in same stripe
+   */
+  @Test
+  public void test2FileBlocksMissingInSameStripe() 
+    throws Exception {
+    LOG.info("test2FileBlocksMissingInSameStripe");
+    setUp(false);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeFileBlock(FILE_PATH0, 1, 1);
+    waitUntilCorruptFileCount(dfs, 1);
+    removeFileBlock(FILE_PATH0, 1, 0);
+    waitUntilCorruptFileCount(dfs, 1);
+
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 1, but returns " + 
+               Integer.toString(result), result == 1);
+  }
+
+  /**
+   * checks fsck with two missing file blocks in different stripes
+   */
+  @Test
+  public void test2FileBlocksMissingInDifferentStripes() 
+    throws Exception {
+    LOG.info("test2FileBlocksMissingInDifferentStripes");
+    setUp(false);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeFileBlock(FILE_PATH0, 1, 1);
+    waitUntilCorruptFileCount(dfs, 1);
+    removeFileBlock(FILE_PATH0, 0, 0);
+    waitUntilCorruptFileCount(dfs, 1);
+
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 0, but returns " +
+               Integer.toString(result), result == 0);
+  }
+
+  /**
+   * checks fsck with file block missing (HAR)
+   * use 2 files to verify HAR offset logic in RaidShell fsck
+   * both files have one corrupt block, parity blocks are clean
+   *
+   * parity blocks in har (file.stripe):
+   * +-----+-----+-----+  +-----+
+   * | 0.0 | 0.1 | 1.0 |  | 1.1 |
+   * +-----+-----+-----+  +-----+
+   *  0                    1
+   *
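+   * (raidTestFiles() sets har.block.size to 3 * BLOCK_SIZE and each file has
+   * 2 parity blocks for its 2 stripes, so the first har container block holds
+   * 0.0, 0.1 and 1.0, and the second holds 1.1)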
+   */
+  @Test
+  public void testFileBlockMissingHar() 
+    throws Exception {
+    LOG.info("testFileBlockMissingHar");
+    setUp(true);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeFileBlock(FILE_PATH0, 1, 1);
+    removeFileBlock(FILE_PATH1, 1, 1);
+    waitUntilCorruptFileCount(dfs, 2);
+
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 0, but returns " +
+               Integer.toString(result), result == 0);
+  }
+
+  /**
+   * checks fsck with file block missing (HAR)
+   * use 2 files to verify HAR offset logic in RaidShell fsck
+   *
+   * parity blocks in har (file.stripe):
+   * +-----+-----+-----+  +-----+
+   * | 0.0 | 0.1 | 1.0 |  | 1.1 |
+   * +-----+-----+-----+  +-----+
+   *  0                    1
+   *
+   * corrupt file 0, stripe 0 file block 0
+   * corrupt file 0, stripe 1 file block 0
+   * corrupt file 1, stripe 0 file block 0
+   * corrupt file 1, stripe 1 file block 0
+   * corrupt har block 0
+   * both files should be corrupt
+   */
+  @Test
+  public void testFileBlockAndParityBlockMissingHar1() 
+    throws Exception {
+    LOG.info("testFileBlockAndParityBlockMissingHar1");
+    setUp(true);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeFileBlock(FILE_PATH0, 0, 0);
+    removeFileBlock(FILE_PATH0, 1, 0);
+    removeFileBlock(FILE_PATH1, 0, 0);
+    removeFileBlock(FILE_PATH1, 1, 0);
+    removeHarParityBlock(0);
+    waitUntilCorruptFileCount(dfs, 3);
+
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 2, but returns " +
+               Integer.toString(result), result == 2);
+  }
+
+  /**
+   * checks fsck with file block missing (HAR)
+   * use 2 files to verify HAR offset logic in RaidShell fsck
+   *
+   * parity blocks in har (file.stripe):
+   * +-----+-----+-----+  +-----+
+   * | 0.0 | 0.1 | 1.0 |  | 1.1 |
+   * +-----+-----+-----+  +-----+
+   *  0                    1
+   *
+   * corrupt file 0, stripe 0 file block 0
+   * corrupt file 0, stripe 1 file block 0
+   * corrupt file 1, stripe 0 file block 0
+   * corrupt file 1, stripe 1 file block 0
+   * corrupt har block 1
+   * only file 1 (FILE_PATH1) should be corrupt
+   */
+  @Test
+  public void testFileBlockAndParityBlockMissingHar2() 
+    throws Exception {
+    LOG.info("testFileBlockAndParityBlockMissingHar2");
+    setUp(true);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeFileBlock(FILE_PATH0, 0, 0);
+    removeFileBlock(FILE_PATH0, 1, 0);
+    removeFileBlock(FILE_PATH1, 0, 0);
+    removeFileBlock(FILE_PATH1, 1, 0);
+    removeHarParityBlock(1);
+    waitUntilCorruptFileCount(dfs, 3);
+
+    int result = ToolRunner.run(shell, args);
+
+    assertTrue("fsck should return 1, but returns " +
+               Integer.toString(result), result == 1);
+  }
+
+  /**
+   * checks that fsck does not report corrupt file that is not in
+   * the specified path
+   */
+  @Test
+  public void testPathFilter() 
+    throws Exception {
+    LOG.info("testPathFilter");
+    setUp(false);
+    waitUntilCorruptFileCount(dfs, 0);
+    removeParityBlock(FILE_PATH0, 1);
+    waitUntilCorruptFileCount(dfs, 1);
+    removeFileBlock(FILE_PATH0, 1, 0);
+    waitUntilCorruptFileCount(dfs, 2);
+
+    String[] otherArgs = new String[2];
+    otherArgs[0] = "-fsck";
+    otherArgs[1] = "/user/pkling/other";
+    int result = ToolRunner.run(shell, otherArgs);
+
+    assertTrue("fsck should return 0, but returns " + 
+               Integer.toString(result), result == 0);
+  }
+
+
+  @After
+  public void tearDown() throws Exception {
+    if (rnode != null) {
+      rnode.stop();
+      rnode.join();
+      rnode = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+    
+    dfs = null;
+
+    LOG.info("Test cluster shut down");
+  }
+  
+
+}
+