MAPREDUCE-2395. TestBlockFixer timing out on trunk. Contributed by Ramkumar Vadali.
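
The dist-mode variants (testBlockFixDist, testGeneratedBlockDist,
testGeneratedLastBlockDist, testParityBlockFixDist,
testParityHarBlockFixDist) and the DistBlockFixer concurrency tests
(testConcurrentJobs, testMaxPendingFiles) move out of TestBlockFixer
into subclasses, so the remaining local-mode tests and each group of
dist-mode tests run as separate JUnit classes and no single class hits
the test timeout. The shared helpers (implBlockFix,
generatedBlockTestCommon, implParityBlockFix, implParityHarBlockFix,
getDFS, mySetup, myTearDown) become protected so the subclasses can
reuse them. A split class can then be run on its own, for example
(target and property names assumed from the usual contrib build
conventions, not verified against this checkout):

  cd src/contrib/raid
  ant test -Dtestcase=TestBlockFixerDistConcurrency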

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1089686 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 257ebe9..ccfa229 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -121,6 +121,9 @@
     MAPREDUCE-2348. Disable mumak tests on trunk since they currently time out
     (todd)
 
+    MAPREDUCE-2395. TestBlockFixer timing out on trunk. (Ramkumar Vadali via
+    todd)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
index fac0b22..16fadde 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
@@ -135,11 +135,6 @@
   }
 
   @Test
-  public void testBlockFixDist() throws Exception {
-    implBlockFix(false);
-  }
-
-  @Test
   public void testBlockFixLocal() throws Exception {
     implBlockFix(true);
   }
@@ -148,7 +143,7 @@
    * Create a file with three stripes, corrupt a block each in two stripes,
   * and wait for the file to be fixed.
    */
-  private void implBlockFix(boolean local) throws Exception {
+  protected void implBlockFix(boolean local) throws Exception {
     LOG.info("Test testBlockFix started.");
     long blockSize = 8192L;
     int stripeLength = 3;
@@ -231,7 +226,7 @@
    * regenerated. Now stop RaidNode and corrupt the generated block.
    * Test that corruption in the generated block can be detected by clients.
    */
-  private void generatedBlockTestCommon(String testName, int blockToCorrupt,
+  protected void generatedBlockTestCommon(String testName, int blockToCorrupt,
                                         boolean local) throws Exception {
     LOG.info("Test " + testName + " started.");
     long blockSize = 8192L;
@@ -330,17 +325,6 @@
    * Test that corruption in the generated block can be detected by clients.
    */
   @Test
-  public void testGeneratedBlockDist() throws Exception {
-    generatedBlockTestCommon("testGeneratedBlock", 3, false);
-  }
-
-  /**
-   * Tests integrity of generated block.
-   * Create a file and delete a block entirely. Wait for the block to be
-   * regenerated. Now stop RaidNode and corrupt the generated block.
-   * Test that corruption in the generated block can be detected by clients.
-   */
-  @Test
   public void testGeneratedBlockLocal() throws Exception {
     generatedBlockTestCommon("testGeneratedBlock", 3, true);
   }
@@ -352,27 +336,11 @@
    * Test that corruption in the generated block can be detected by clients.
    */
   @Test
-  public void testGeneratedLastBlockDist() throws Exception {
-    generatedBlockTestCommon("testGeneratedLastBlock", 6, false);
-  }
-
-  /**
-   * Tests integrity of generated last block.
-   * Create a file and delete a block entirely. Wait for the block to be
-   * regenerated. Now stop RaidNode and corrupt the generated block.
-   * Test that corruption in the generated block can be detected by clients.
-   */
-  @Test
   public void testGeneratedLastBlockLocal() throws Exception {
     generatedBlockTestCommon("testGeneratedLastBlock", 6, true);
   }
 
   @Test
-  public void testParityBlockFixDist() throws Exception {
-    implParityBlockFix("testParityBlockFixDist", false);
-  }
-
-  @Test
   public void testParityBlockFixLocal() throws Exception {
     implParityBlockFix("testParityBlockFixLocal", true);
   }
@@ -380,7 +348,7 @@
   /**
    * Corrupt a parity file and wait for it to get fixed.
    */
-  private void implParityBlockFix(String testName, boolean local)
+  protected void implParityBlockFix(String testName, boolean local)
     throws Exception {
     LOG.info("Test " + testName + " started.");
     long blockSize = 8192L;
@@ -462,16 +430,11 @@
   }
 
   @Test
-  public void testParityHarBlockFixDist() throws Exception {
-    implParityHarBlockFix("testParityHarBlockFixDist", false);
-  }
-
-  @Test
   public void testParityHarBlockFixLocal() throws Exception {
     implParityHarBlockFix("testParityHarBlockFixLocal", true);
   }
 
-  private void implParityHarBlockFix(String testName, boolean local)
+  protected void implParityHarBlockFix(String testName, boolean local)
     throws Exception {
     LOG.info("Test " + testName + " started.");
     long blockSize = 8192L;
@@ -567,218 +530,7 @@
   }
 
 
-  /**
-   * tests that we can have 2 concurrent jobs fixing files 
-   * (dist block fixer)
-   */
-  @Test
-  public void testConcurrentJobs() throws Exception {
-    LOG.info("Test testConcurrentJobs started.");
-    long blockSize = 8192L;
-    int stripeLength = 3;
-    mySetup(stripeLength, -1); // never har
-    Path file1 = new Path("/user/dhruba/raidtest/file1");
-    Path file2 = new Path("/user/dhruba/raidtest/file2");
-    Path destPath = new Path("/destraid/user/dhruba/raidtest");
-    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
-                                                          1, 20, blockSize);
-    long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
-                                                          1, 20, blockSize);
-    long file1Len = fileSys.getFileStatus(file1).getLen();
-    long file2Len = fileSys.getFileStatus(file2).getLen();
-    LOG.info("Test testConcurrentJobs created test files");
-
-    // create an instance of the RaidNode
-    Configuration localConf = new Configuration(conf);
-    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
-    localConf.setInt("raid.blockfix.interval", 1000);
-    localConf.set("raid.blockfix.classname", 
-                  "org.apache.hadoop.raid.DistBlockFixer");
-    localConf.setLong("raid.blockfix.filespertask", 2L);
-
-    try {
-      cnode = RaidNode.createRaidNode(null, localConf);
-      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
-      TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
-      cnode.stop(); cnode.join();
-
-      FileStatus file1Stat = fileSys.getFileStatus(file1);
-      FileStatus file2Stat = fileSys.getFileStatus(file2);
-      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
-      LocatedBlocks file1Loc =
-        RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
-                                      0, file1Stat.getLen());
-      LocatedBlocks file2Loc =
-        RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
-                                      0, file2Stat.getLen());
-      
-      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
-      assertEquals("no corrupt files expected", 0, corruptFiles.length);
-      assertEquals("filesFixed() should return 0 before fixing files",
-                   0, cnode.blockFixer.filesFixed());
-
-      // corrupt file1
-      int[] corruptBlockIdxs = new int[]{0, 4, 6};
-      for (int idx: corruptBlockIdxs)
-        corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
-      reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
-
-      cnode = RaidNode.createRaidNode(null, localConf);
-      DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
-      long start = System.currentTimeMillis();
-
-      while (blockFixer.jobsRunning() < 1 &&
-             System.currentTimeMillis() - start < 240000) {
-        LOG.info("Test testBlockFix waiting for fixing job 1 to start");
-        Thread.sleep(10);
-      }
-      assertEquals("job 1 not running", 1, blockFixer.jobsRunning());
-
-      // corrupt file2
-      for (int idx: corruptBlockIdxs)
-        corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
-      reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
-      
-      while (blockFixer.jobsRunning() < 2 &&
-             System.currentTimeMillis() - start < 240000) {
-        LOG.info("Test testBlockFix waiting for fixing job 2 to start");
-        Thread.sleep(10);
-      }
-      assertEquals("2 jobs not running", 2, blockFixer.jobsRunning());
-
-      while (blockFixer.filesFixed() < 2 &&
-             System.currentTimeMillis() - start < 240000) {
-        LOG.info("Test testBlockFix waiting for files to be fixed.");
-        Thread.sleep(10);
-      }
-      assertEquals("files not fixed", 2, blockFixer.filesFixed());
-
-      dfs = getDFS(conf, dfs);
-      
-      try {
-        Thread.sleep(5*1000);
-      } catch (InterruptedException ignore) {
-      }
-      assertTrue("file not fixed",
-                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
-      assertTrue("file not fixed",
-                 TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
-    } catch (Exception e) {
-      LOG.info("Test testConcurrentJobs exception " + e +
-               StringUtils.stringifyException(e));
-      throw e;
-    } finally {
-      myTearDown();
-    }
-
-  }
-
-  /**
-   * tests that the distributed block fixer obeys
-   * the limit on how many files to fix simultaneously
-   */
-  @Test
-  public void testMaxPendingFiles() throws Exception {
-    LOG.info("Test testMaxPendingFiles started.");
-    long blockSize = 8192L;
-    int stripeLength = 3;
-    mySetup(stripeLength, -1); // never har
-    Path file1 = new Path("/user/dhruba/raidtest/file1");
-    Path file2 = new Path("/user/dhruba/raidtest/file2");
-    Path destPath = new Path("/destraid/user/dhruba/raidtest");
-    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
-                                                          1, 20, blockSize);
-    long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
-                                                          1, 20, blockSize);
-    long file1Len = fileSys.getFileStatus(file1).getLen();
-    long file2Len = fileSys.getFileStatus(file2).getLen();
-    LOG.info("Test testMaxPendingFiles created test files");
-
-    // create an instance of the RaidNode
-    Configuration localConf = new Configuration(conf);
-    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
-    localConf.setInt("raid.blockfix.interval", 1000);
-    localConf.set("raid.blockfix.classname", 
-                  "org.apache.hadoop.raid.DistBlockFixer");
-    localConf.setLong("raid.blockfix.filespertask", 2L);
-    localConf.setLong("raid.blockfix.maxpendingfiles", 1L);
-
-    try {
-      cnode = RaidNode.createRaidNode(null, localConf);
-      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
-      TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
-      cnode.stop(); cnode.join();
-
-      FileStatus file1Stat = fileSys.getFileStatus(file1);
-      FileStatus file2Stat = fileSys.getFileStatus(file2);
-      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
-      LocatedBlocks file1Loc =
-        RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
-                                      0, file1Stat.getLen());
-      LocatedBlocks file2Loc =
-        RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
-                                      0, file2Stat.getLen());
-
-      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
-      assertEquals("no corrupt files expected", 0, corruptFiles.length);
-      assertEquals("filesFixed() should return 0 before fixing files",
-                   0, cnode.blockFixer.filesFixed());
-
-      // corrupt file1
-      int[] corruptBlockIdxs = new int[]{0, 4, 6};
-      for (int idx: corruptBlockIdxs)
-        corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
-      reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
-      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
-
-      cnode = RaidNode.createRaidNode(null, localConf);
-      DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
-      long start = System.currentTimeMillis();
-
-      while (blockFixer.jobsRunning() < 1 &&
-             System.currentTimeMillis() - start < 240000) {
-        LOG.info("Test testBlockFix waiting for fixing job 1 to start");
-        Thread.sleep(10);
-      }
-      assertEquals("job not running", 1, blockFixer.jobsRunning());
-
-      // corrupt file2
-      for (int idx: corruptBlockIdxs)
-        corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
-      reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
-      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
-      
-      // wait until both files are fixed
-      while (blockFixer.filesFixed() < 2 &&
-             System.currentTimeMillis() - start < 240000) {
-        // make sure the block fixer does not start a second job while
-        // the first one is still running
-        assertTrue("too many jobs running", blockFixer.jobsRunning() <= 1);
-        Thread.sleep(10);
-      }
-      assertEquals("files not fixed", 2, blockFixer.filesFixed());
-
-      dfs = getDFS(conf, dfs);
-      
-      try {
-        Thread.sleep(5*1000);
-      } catch (InterruptedException ignore) {
-      }
-      assertTrue("file not fixed",
-                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
-      assertTrue("file not fixed",
-                 TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
-    } catch (Exception e) {
-      LOG.info("Test testMaxPendingFiles exception " + e +
-               StringUtils.stringifyException(e));
-      throw e;
-    } finally {
-      myTearDown();
-    }
-
-  }
-
-  private static DistributedFileSystem getDFS(
+  protected static DistributedFileSystem getDFS(
         Configuration conf, FileSystem dfs) throws IOException {
     Configuration clientConf = new Configuration(conf);
     clientConf.set("fs.hdfs.impl",
@@ -789,7 +541,7 @@
     return (DistributedFileSystem) FileSystem.get(dfsUri, clientConf);
   }
 
-  private void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
+  protected void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
 
     new File(TEST_DIR).mkdirs(); // Make sure data directory exists
     conf = new Configuration();
@@ -869,7 +621,7 @@
     fileWriter.close();
   }
 
-  private void myTearDown() throws Exception {
+  protected void myTearDown() throws Exception {
     if (cnode != null) { cnode.stop(); cnode.join(); }
     if (mr != null) { mr.shutdown(); }
     if (dfs != null) { dfs.shutdown(); }
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java
new file mode 100644
index 0000000..40ee951
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import org.junit.Test;
+
+public class TestBlockFixerBlockFixDist extends TestBlockFixer {
+  @Test
+  public void testBlockFixDist() throws Exception {
+    implBlockFix(false);
+  }
+}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
new file mode 100644
index 0000000..36c087b
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.util.StringUtils;
+
+public class TestBlockFixerDistConcurrency extends TestBlockFixer {
+  /**
+   * Tests that we can have two concurrent jobs fixing files
+   * (dist block fixer).
+   */
+  @Test
+  public void testConcurrentJobs() throws Exception {
+    LOG.info("Test testConcurrentJobs started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup(stripeLength, -1); // never har
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path file2 = new Path("/user/dhruba/raidtest/file2");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 20, blockSize);
+    long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
+                                                          1, 20, blockSize);
+    long file1Len = fileSys.getFileStatus(file1).getLen();
+    long file2Len = fileSys.getFileStatus(file2).getLen();
+    LOG.info("Test testConcurrentJobs created test files");
+
+    // create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    localConf.setInt("raid.blockfix.interval", 1000);
+    localConf.set("raid.blockfix.classname", 
+                  "org.apache.hadoop.raid.DistBlockFixer");
+    localConf.setLong("raid.blockfix.filespertask", 2L);
+
+    try {
+      cnode = RaidNode.createRaidNode(null, localConf);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
+      cnode.stop(); cnode.join();
+
+      FileStatus file1Stat = fileSys.getFileStatus(file1);
+      FileStatus file2Stat = fileSys.getFileStatus(file2);
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+      LocatedBlocks file1Loc =
+        RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
+                                      0, file1Stat.getLen());
+      LocatedBlocks file2Loc =
+        RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
+                                      0, file2Stat.getLen());
+      
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("no corrupt files expected", 0, corruptFiles.length);
+      assertEquals("filesFixed() should return 0 before fixing files",
+                   0, cnode.blockFixer.filesFixed());
+
+      // corrupt file1
+      int[] corruptBlockIdxs = new int[]{0, 4, 6};
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
+
+      cnode = RaidNode.createRaidNode(null, localConf);
+      DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
+      long start = System.currentTimeMillis();
+
+      while (blockFixer.jobsRunning() < 1 &&
+             System.currentTimeMillis() - start < 240000) {
+        LOG.info("Test testBlockFix waiting for fixing job 1 to start");
+        Thread.sleep(10);
+      }
+      assertEquals("job 1 not running", 1, blockFixer.jobsRunning());
+
+      // corrupt file2
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
+      
+      while (blockFixer.jobsRunning() < 2 &&
+             System.currentTimeMillis() - start < 240000) {
+        LOG.info("Test testBlockFix waiting for fixing job 2 to start");
+        Thread.sleep(10);
+      }
+      assertEquals("2 jobs not running", 2, blockFixer.jobsRunning());
+
+      while (blockFixer.filesFixed() < 2 &&
+             System.currentTimeMillis() - start < 240000) {
+        LOG.info("Test testBlockFix waiting for files to be fixed.");
+        Thread.sleep(10);
+      }
+      assertEquals("files not fixed", 2, blockFixer.filesFixed());
+
+      dfs = getDFS(conf, dfs);
+      
+      try {
+        Thread.sleep(5*1000);
+      } catch (InterruptedException ignore) {
+      }
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
+    } catch (Exception e) {
+      LOG.info("Test testConcurrentJobs exception " + e +
+               StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+
+  }
+
+  /**
+   * Tests that the distributed block fixer obeys
+   * the limit on how many files to fix simultaneously.
+   */
+  @Test
+  public void testMaxPendingFiles() throws Exception {
+    LOG.info("Test testMaxPendingFiles started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup(stripeLength, -1); // never har
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path file2 = new Path("/user/dhruba/raidtest/file2");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 20, blockSize);
+    long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
+                                                          1, 20, blockSize);
+    long file1Len = fileSys.getFileStatus(file1).getLen();
+    long file2Len = fileSys.getFileStatus(file2).getLen();
+    LOG.info("Test testMaxPendingFiles created test files");
+
+    // create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    localConf.setInt("raid.blockfix.interval", 1000);
+    localConf.set("raid.blockfix.classname", 
+                  "org.apache.hadoop.raid.DistBlockFixer");
+    localConf.setLong("raid.blockfix.filespertask", 2L);
+    localConf.setLong("raid.blockfix.maxpendingfiles", 1L);
+
+    try {
+      cnode = RaidNode.createRaidNode(null, localConf);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
+      cnode.stop(); cnode.join();
+
+      FileStatus file1Stat = fileSys.getFileStatus(file1);
+      FileStatus file2Stat = fileSys.getFileStatus(file2);
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+      LocatedBlocks file1Loc =
+        RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
+                                      0, file1Stat.getLen());
+      LocatedBlocks file2Loc =
+        RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
+                                      0, file2Stat.getLen());
+
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("no corrupt files expected", 0, corruptFiles.length);
+      assertEquals("filesFixed() should return 0 before fixing files",
+                   0, cnode.blockFixer.filesFixed());
+
+      // corrupt file1
+      int[] corruptBlockIdxs = new int[]{0, 4, 6};
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
+      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+
+      cnode = RaidNode.createRaidNode(null, localConf);
+      DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
+      long start = System.currentTimeMillis();
+
+      while (blockFixer.jobsRunning() < 1 &&
+             System.currentTimeMillis() - start < 240000) {
+        LOG.info("Test testBlockFix waiting for fixing job 1 to start");
+        Thread.sleep(10);
+      }
+      assertEquals("job not running", 1, blockFixer.jobsRunning());
+
+      // corrupt file2
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
+      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      
+      // wait until both files are fixed
+      while (blockFixer.filesFixed() < 2 &&
+             System.currentTimeMillis() - start < 240000) {
+        // make sure the block fixer does not start a second job while
+        // the first one is still running
+        assertTrue("too many jobs running", blockFixer.jobsRunning() <= 1);
+        Thread.sleep(10);
+      }
+      assertEquals("files not fixed", 2, blockFixer.filesFixed());
+
+      dfs = getDFS(conf, dfs);
+      
+      try {
+        Thread.sleep(5*1000);
+      } catch (InterruptedException ignore) {
+      }
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
+    } catch (Exception e) {
+      LOG.info("Test testMaxPendingFiles exception " + e +
+               StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+
+  }
+}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java
new file mode 100644
index 0000000..c2f42b9
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import org.junit.Test;
+
+public class TestBlockFixerGeneratedBlockDist extends TestBlockFixer {
+  /**
+   * Tests integrity of generated block.
+   * Create a file and delete a block entirely. Wait for the block to be
+   * regenerated. Now stop RaidNode and corrupt the generated block.
+   * Test that corruption in the generated block can be detected by clients.
+   */
+  @Test
+  public void testGeneratedBlockDist() throws Exception {
+    generatedBlockTestCommon("testGeneratedBlock", 3, false);
+  }
+
+  /**
+   * Tests integrity of generated last block.
+   * Create a file and delete a block entirely. Wait for the block to be
+   * regenerated. Now stop RaidNode and corrupt the generated block.
+   * Test that corruption in the generated block can be detected by clients.
+   */
+  @Test
+  public void testGeneratedLastBlockDist() throws Exception {
+    generatedBlockTestCommon("testGeneratedLastBlock", 6, false);
+  }
+
+}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java
new file mode 100644
index 0000000..a51bc0a
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import org.junit.Test;
+
+public class TestBlockFixerParityBlockFixDist extends TestBlockFixer {
+  @Test
+  public void testParityBlockFixDist() throws Exception {
+    implParityBlockFix("testParityBlockFixDist", false);
+  }
+
+  @Test
+  public void testParityHarBlockFixDist() throws Exception {
+    implParityHarBlockFix("testParityHarBlockFixDist", false);
+  }
+}