HDFS-1371. One bad node can incorrectly flag many files as corrupt. Contributed by Tanping Wang.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1125145 13f79535-47bb-0310-9956-ffa450edef68
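At a high level, the patch below stops the DFSClient from reporting a checksum failure to the NameNode the moment a ChecksumException is caught. Instead, the reader accumulates the replicas that failed during a single read and reports them afterwards, and only when at least one other replica could still be read, or when the block has a single replica. A minimal sketch of that decision rule (the method and parameter names here are illustrative, not the patch's exact code, which lives in reportCheckSumFailure() below):

    // failedReplicas: number of datanodes on which a checksum error was seen
    // totalReplicas:  number of datanodes holding a replica of the block
    static boolean shouldReportCorruption(int failedReplicas, int totalReplicas) {
      if (failedReplicas == 0) {
        return false;                 // nothing failed, nothing to report
      }
      if (failedReplicas < totalReplicas) {
        return true;                  // Case I: some other replica was readable
      }
      return totalReplicas == 1;      // Case II: single-replica block
    }

When every replica of a block "fails" for a client, the problem is at least as likely to be the client host itself (the "one bad node" of the summary line), so nothing is reported unless the block has only one replica.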
diff --git a/CHANGES.txt b/CHANGES.txt
index c2c5a59..3237fb0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -610,6 +610,9 @@
 
     HDFS-1922. Fix recurring failure of TestJMXGet (Luke Lu via todd)
 
+    HDFS-1371. One bad node can incorrectly flag many files as corrupt.
+    (Tanping Wang via jitendra)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
index b74b55d..0052d65 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -22,8 +22,13 @@
 import java.net.Socket;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -62,7 +67,7 @@
   private LocatedBlocks locatedBlocks = null;
   private long lastBlockBeingWrittenLength = 0;
   private DatanodeInfo currentNode = null;
-  private ExtendedBlock currentBlock = null;
+  private LocatedBlock currentLocatedBlock = null;
   private long pos = 0;
   private long blockEnd = -1;
 
@@ -204,8 +209,11 @@
   /**
    * Returns the block containing the target position. 
    */
-  public ExtendedBlock getCurrentBlock() {
-    return currentBlock;
+  public synchronized ExtendedBlock getCurrentBlock() {
+    if (currentLocatedBlock == null) {
+      return null;
+    }
+    return currentLocatedBlock.getBlock();
   }
 
   /**
@@ -261,7 +269,7 @@
     if (updatePosition) {
       pos = offset;
       blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
-      currentBlock = blk.getBlock();
+      currentLocatedBlock = blk;
     }
     return blk;
   }
@@ -462,8 +470,9 @@
    * name readBuffer() is chosen to imply similarity to readBuffer() in
    * ChecksuFileSystem
    */ 
-  private synchronized int readBuffer(byte buf[], int off, int len) 
-                                                  throws IOException {
+  private synchronized int readBuffer(byte buf[], int off, int len,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
     IOException ioe;
     
     /* we retry current node only once. So this is set to true only here.
@@ -479,16 +488,19 @@
       try {
         return blockReader.read(buf, off, len);
       } catch ( ChecksumException ce ) {
-        DFSClient.LOG.warn("Found Checksum error for " + currentBlock + " from " +
-                 currentNode.getName() + " at " + ce.getPos());          
-        dfsClient.reportChecksumFailure(src, currentBlock, currentNode);
+        DFSClient.LOG.warn("Found Checksum error for "
+            + getCurrentBlock() + " from " + currentNode.getName()
+            + " at " + ce.getPos());        
         ioe = ce;
         retryCurrentNode = false;
+        // we want to remember which block replicas we have tried
+        addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+            corruptedBlockMap);
       } catch ( IOException e ) {
         if (!retryCurrentNode) {
-          DFSClient.LOG.warn("Exception while reading from " + currentBlock +
-                   " of " + src + " from " + currentNode + ": " +
-                   StringUtils.stringifyException(e));
+          DFSClient.LOG.warn("Exception while reading from "
+              + getCurrentBlock() + " of " + src + " from "
+              + currentNode + ": " + StringUtils.stringifyException(e));
         }
         ioe = e;
       }
@@ -519,6 +531,8 @@
     if (closed) {
       throw new IOException("Stream closed");
     }
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap
+      = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
     failures = 0;
     if (pos < getFileLength()) {
       int retries = 2;
@@ -528,7 +542,7 @@
             currentNode = blockSeekTo(pos);
           }
           int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
-          int result = readBuffer(buf, off, realLen);
+          int result = readBuffer(buf, off, realLen, corruptedBlockMap);
           
           if (result >= 0) {
             pos += result;
@@ -551,12 +565,34 @@
           if (--retries == 0) {
             throw e;
           }
+        } finally {
+          // Check whether we need to report corrupt block replicas, whether
+          // the read was successful or a ChecksumException occurred.
+          reportCheckSumFailure(corruptedBlockMap, 
+              currentLocatedBlock.getLocations().length);
         }
       }
     }
     return -1;
   }
 
+  /**
+   * Add a corrupted block replica to the map.
+   * @param corruptedBlockMap map from corrupted block to the failed data nodes
+   */
+  private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, 
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    Set<DatanodeInfo> dnSet = null;
+    if (corruptedBlockMap.containsKey(blk)) {
+      dnSet = corruptedBlockMap.get(blk);
+    } else {
+      dnSet = new HashSet<DatanodeInfo>();
+    }
+    if (!dnSet.contains(node)) {
+      dnSet.add(node);
+      corruptedBlockMap.put(blk, dnSet);
+    }
+  }
       
   private DNAddrPair chooseDataNode(LocatedBlock block)
     throws IOException {
@@ -605,8 +641,10 @@
     }
   } 
       
-  private void fetchBlockByteRange(LocatedBlock block, long start,
-                                   long end, byte[] buf, int offset) throws IOException {
+  private void fetchBlockByteRange(LocatedBlock block, long start, long end,
+      byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
     //
     // Connect to best DataNode for desired Block, with potential offset
     //
@@ -646,7 +684,8 @@
         DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
                  src + " at " + block.getBlock() + ":" + 
                  e.getPos() + " from " + chosenNode.getName());
-        dfsClient.reportChecksumFailure(src, block.getBlock(), chosenNode);
+        // we want to remember what we have tried
+        addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
       } catch (IOException e) {
         if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will get a new access token and retry, "
@@ -703,11 +742,21 @@
     // corresponding to position and realLen
     List<LocatedBlock> blockRange = getBlockRange(position, realLen);
     int remaining = realLen;
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap
+      = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
     for (LocatedBlock blk : blockRange) {
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
-      fetchBlockByteRange(blk, targetStart, 
-                          targetStart + bytesToRead - 1, buffer, offset);
+      try {
+        fetchBlockByteRange(blk, targetStart, 
+            targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+      } finally {
+        // Check and report if any block replicas are corrupted.
+        // BlockMissingException may be thrown if all block replicas are
+        // corrupted.
+        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
+      }
+
       remaining -= bytesToRead;
       position += bytesToRead;
       offset += bytesToRead;
@@ -718,7 +767,43 @@
     }
     return realLen;
   }
-   
+  
+  /**
+   * DFSInputStream reports checksum failure.
+   * Case I : the client has tried multiple data nodes and at least one of the
+   * attempts has succeeded. We report the other failures as corrupted block
+   * replicas to the namenode.
+   * Case II: the client has tried all the data nodes and all attempts have
+   * failed. We only report if the total number of replicas is 1. We do not
+   * report otherwise, since the failures may be caused by the client itself
+   * being unable to read.
+   * @param corruptedBlockMap map of corrupted blocks
+   * @param dataNodeCount number of data nodes that hold the block replicas
+   */
+  private void reportCheckSumFailure(
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, 
+      int dataNodeCount) {
+    if (corruptedBlockMap.isEmpty()) {
+      return;
+    }
+    Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
+        .entrySet().iterator();
+    Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
+    ExtendedBlock blk = entry.getKey();
+    Set<DatanodeInfo> dnSet = entry.getValue();
+    if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
+        || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
+      DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
+      int i = 0;
+      for (DatanodeInfo dn : dnSet) {
+        locs[i++] = dn;
+      }
+      LocatedBlock[] lblocks = { new LocatedBlock(blk, locs) };
+      dfsClient.reportChecksumFailure(src, lblocks);
+    }
+    corruptedBlockMap.clear();
+  }
+
   @Override
   public long skip(long n) throws IOException {
     if ( n > 0 ) {
@@ -760,9 +845,10 @@
           }
         } catch (IOException e) {//make following read to retry
           if(DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Exception while seek to " + targetPos +
-                " from " + currentBlock +" of " + src + " from " +
-                currentNode + ": " + StringUtils.stringifyException(e));
+            DFSClient.LOG.debug("Exception while seek to " + targetPos
+                + " from " + getCurrentBlock() + " of " + src
+                + " from " + currentNode + ": "
+                + StringUtils.stringifyException(e));
           }
         }
       }
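The DFSInputStream changes above all follow the same accumulate-then-report pattern: each read call builds a per-call map from block to the set of datanodes on which a checksum error was seen (addIntoCorruptedBlockMap), and a finally block hands that map to reportCheckSumFailure once per read attempt. A rough, self-contained sketch of the pattern, with generic type parameters standing in for ExtendedBlock and DatanodeInfo (the class, method, and Reporter callback names are illustrative and not part of the patch):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class CorruptReplicaTracker<B, D> {
      private final Map<B, Set<D>> corrupted = new HashMap<B, Set<D>>();

      /** Called from the catch (ChecksumException) path: remember the failed replica. */
      void remember(B block, D node) {
        Set<D> nodes = corrupted.get(block);
        if (nodes == null) {
          nodes = new HashSet<D>();
          corrupted.put(block, nodes);
        }
        nodes.add(node);
      }

      /** Called once per read, from a finally block: report eligible blocks and reset. */
      void reportAndClear(int totalReplicas, Reporter<B, D> reporter) {
        for (Map.Entry<B, Set<D>> e : corrupted.entrySet()) {
          int failed = e.getValue().size();
          // Report only if some replica was still readable, or the block has one replica.
          if (failed > 0 && (failed < totalReplicas || totalReplicas == 1)) {
            reporter.report(e.getKey(), e.getValue());
          }
        }
        corrupted.clear();
      }

      interface Reporter<K, N> {
        void report(K block, Set<N> nodes);
      }
    }

Doing the report from a finally block matters: the read may still throw (for example BlockMissingException when every replica is bad), and the replicas collected up to that point should still be considered before the exception propagates.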
diff --git a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index a00207d..c5c12f0 100644
--- a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -689,6 +689,9 @@
    * is corrupt but we will report both to the namenode.  In the future,
    * we can consider figuring out exactly which block is corrupt.
    */
+  // We do not see a need for users to report block checksum errors and do
+  // not want to rely on users to report block corruption.
+  @Deprecated
   public boolean reportChecksumFailure(Path f, 
     FSDataInputStream in, long inPos, 
     FSDataInputStream sums, long sumsPos) {
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java
new file mode 100644
index 0000000..97387cc
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.RandomAccessFile;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
+import org.apache.hadoop.hdfs.tools.DFSck;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import junit.framework.Assert;
+
+/**
+ * Tests that the client reports corrupted block replicas to the name node.
+ * The reporting policy is: if the block has more than one replica and all
+ * replicas are corrupted, the client does not report (since the failures may
+ * be caused by the client itself). If only some of the replicas are
+ * corrupted, the client reports the corrupted block replicas. If the block
+ * has only one replica, the client always reports the corrupted replica.
+ */
+public class TestClientReportBadBlock {
+  private static final Log LOG = LogFactory
+      .getLog(TestClientReportBadBlock.class);
+
+  static final long BLOCK_SIZE = 64 * 1024;
+  private static int buffersize;
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem dfs;
+  private static int numDataNodes = 3;
+  private static final Configuration conf = new HdfsConfiguration();
+
+  Random rand = new Random();
+
+  @Before
+  public void startUpCluster() throws IOException {
+    if (System.getProperty("test.build.data") == null) { // to allow test to be
+      // run outside of Ant
+      System.setProperty("test.build.data", "build/test/data");
+    }
+    // disable block scanner
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); 
+    
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
+        .build();
+    cluster.waitActive();
+    dfs = (DistributedFileSystem) cluster.getFileSystem();
+    buffersize = conf.getInt("io.file.buffer.size", 4096);
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    dfs.close();
+    cluster.shutdown();
+  }
+
+  /*
+   * This test creates a file with one block replica, corrupts the block, and
+   * makes the DFSClient read the corrupted file. The corrupted block is
+   * expected to be reported to the name node.
+   */
+  @Test
+  public void testOneBlockReplica() throws Exception {
+    final short repl = 1;
+    final int corruptBlockNumber = 1;
+    for (int i = 0; i < 2; i++) {
+      // create a file
+      String fileName = "/tmp/testClientReportBadBlock/OneBlockReplica" + i;
+      Path filePath = new Path(fileName);
+      createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
+      if (i == 0) {
+        dfsClientReadFile(filePath);
+      } else {
+        dfsClientReadFileFromPosition(filePath);
+      }
+      // The only block replica is corrupted, so the LocatedBlock should be
+      // marked as corrupt. The corrupted replica is still expected to be
+      // returned by Namenode#getBlockLocations() since all (one) replicas
+      // are corrupted.
+      int expectedReplicaCount = 1;
+      verifyCorruptedBlockCount(filePath, expectedReplicaCount);
+      verifyFirstBlockCorrupted(filePath, true);
+      verifyFsckBlockCorrupted();
+      testFsckListCorruptFilesBlocks(filePath, -1);
+    }
+  }
+
+  /**
+   * This test creates a file with three block replicas, corrupts all of the
+   * replicas, and makes the DFS client read the file. No block corruption
+   * should be reported.
+   */
+  @Test
+  public void testCorruptAllOfThreeReplicas() throws Exception {
+    final short repl = 3;
+    final int corruptBlockNumber = 3;
+    for (int i = 0; i < 2; i++) {
+      // create a file
+      String fileName = "/tmp/testClientReportBadBlock/testCorruptAllReplicas"
+          + i;
+      Path filePath = new Path(fileName);
+      createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
+      // ask dfs client to read the file
+      if (i == 0) {
+        dfsClientReadFile(filePath);
+      } else {
+        dfsClientReadFileFromPosition(filePath);
+      }
+      // As all replicas are corrupted, we expect the DFSClient NOT to report
+      // corrupted replicas to the name node.
+      int expectedReplicasReturned = repl;
+      verifyCorruptedBlockCount(filePath, expectedReplicasReturned);
+      // LocatedBlock should not have the block marked as corrupted.
+      verifyFirstBlockCorrupted(filePath, false);
+      verifyFsckHealth("");
+      testFsckListCorruptFilesBlocks(filePath, 0);
+    }
+  }
+
+  /**
+   * This test creates a file with three block replicas, corrupts two of the
+   * replicas, and makes the DFS client read the file. The corrupted blocks
+   * and the data nodes that hold them should be reported to the name node.
+   */
+  @Test
+  public void testCorruptTwoOutOfThreeReplicas() throws Exception {
+    final short repl = 3;
+    final int corruptBlockReplicas = 2;
+    for (int i = 0; i < 2; i++) {
+      String fileName = 
+        "/tmp/testClientReportBadBlock/CorruptTwoOutOfThreeReplicas"+ i;
+      Path filePath = new Path(fileName);
+      createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockReplicas);
+      int replicaCount = 0;
+      /*
+       * The order of data nodes in the LocatedBlock returned by the name
+       * node is sorted by NetworkTopology#pseudoSortByDistance. In the
+       * current MiniDFSCluster that sorting is effectively random, because
+       * the DFS client and the simulated data nodes are considered to be
+       * neither on the same host nor on the same rack. Therefore, even
+       * though we corrupted the first two block replicas in order, it is not
+       * guaranteed which replicas (good or bad) the DFSClient will read
+       * first. So we re-read the file until the expected number of replicas
+       * is returned.
+       */
+      while (replicaCount != repl - corruptBlockReplicas) {
+        if (i == 0) {
+          dfsClientReadFile(filePath);
+        } else {
+          dfsClientReadFileFromPosition(filePath);
+        }
+        LocatedBlocks blocks = dfs.dfs.getNamenode()
+            .getBlockLocations(filePath.toString(), 0, Long.MAX_VALUE);
+        replicaCount = blocks.get(0).getLocations().length;
+      }
+      verifyFirstBlockCorrupted(filePath, false);
+      int expectedReplicaCount = repl - corruptBlockReplicas;
+      verifyCorruptedBlockCount(filePath, expectedReplicaCount);
+      verifyFsckHealth("Target Replicas is 3 but found 1 replica");
+      testFsckListCorruptFilesBlocks(filePath, 0);
+    }
+  }
+
+  /**
+   * Create a file with one block and corrupt some or all of the block replicas.
+   */
+  private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl,
+      int corruptBlockCount) throws IOException, AccessControlException,
+      FileNotFoundException, UnresolvedLinkException {
+    DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0);
+    DFSTestUtil.waitReplication(dfs, filePath, repl);
+    // Locate the file blocks by asking name node
+    final LocatedBlocks locatedblocks = dfs.dfs.getNamenode()
+        .getBlockLocations(filePath.toString(), 0L, BLOCK_SIZE);
+    Assert.assertEquals(repl, locatedblocks.get(0).getLocations().length);
+    // The file only has one block
+    LocatedBlock lblock = locatedblocks.get(0);
+    DatanodeInfo[] datanodeinfos = lblock.getLocations();
+    ExtendedBlock block = lblock.getBlock();
+    // corrupt some or all of the block replicas
+    for (int i = 0; i < corruptBlockCount; i++) {
+      DatanodeInfo dninfo = datanodeinfos[i];
+      final DataNode dn = cluster.getDataNode(dninfo.getIpcPort());
+      corruptBlock(block, dn);
+      LOG.debug("Corrupted block " + block.getBlockName() + " on data node "
+          + dninfo.getName());
+
+    }
+  }
+
+  /**
+   * Verify that the first block of the file is corrupted (for all of its replicas).
+   */
+  private void verifyFirstBlockCorrupted(Path filePath, boolean isCorrupted)
+      throws AccessControlException, FileNotFoundException,
+      UnresolvedLinkException, IOException {
+    final LocatedBlocks locatedBlocks = dfs.dfs.getNamenode()
+        .getBlockLocations(filePath.toUri().getPath(), 0, Long.MAX_VALUE);
+    final LocatedBlock firstLocatedBlock = locatedBlocks.get(0);
+    Assert.assertEquals(isCorrupted, firstLocatedBlock.isCorrupt());
+  }
+
+  /**
+   * Verify the number of block replica locations returned when fetching the
+   * block locations of the file from the name node.
+   */
+  private void verifyCorruptedBlockCount(Path filePath, int expectedReplicas)
+      throws AccessControlException, FileNotFoundException,
+      UnresolvedLinkException, IOException {
+    final LocatedBlocks lBlocks = dfs.dfs.getNamenode().getBlockLocations(
+        filePath.toUri().getPath(), 0, Long.MAX_VALUE);
+    // we expect only the first block of the file is used for this test
+    LocatedBlock firstLocatedBlock = lBlocks.get(0);
+    Assert.assertEquals(expectedReplicas,
+        firstLocatedBlock.getLocations().length);
+  }
+
+  /**
+   * Ask dfs client to read the file
+   */
+  private void dfsClientReadFile(Path corruptedFile) throws IOException,
+      UnresolvedLinkException {
+    DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
+    byte[] buf = new byte[buffersize];
+    int nRead = 0; // number of bytes read by the last read() call
+    
+    try {
+      do {
+        nRead = in.read(buf, 0, buf.length);
+      } while (nRead > 0);
+    } catch (ChecksumException ce) {
+      // a ChecksumException is thrown if all replicas are bad; ignore and continue.
+      LOG.debug("DfsClientReadFile caught ChecksumException.");
+    } catch (BlockMissingException bme) {
+      // caught BlockMissingException, ignore.
+      LOG.debug("DfsClientReadFile caught BlockMissingException.");
+    }
+  }
+
+  /**
+   * DFS client read bytes starting from the specified position.
+   */
+  private void dfsClientReadFileFromPosition(Path corruptedFile)
+      throws UnresolvedLinkException, IOException {
+    DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
+    byte[] buf = new byte[buffersize];
+    int startPosition = 2;
+    int nRead = 0; // number of bytes read by the last read() call
+    try {
+      do {
+        nRead = in.read(startPosition, buf, 0, buf.length);
+        startPosition += buf.length;
+      } while (nRead > 0);
+    } catch (BlockMissingException bme) {
+      LOG.debug("DfsClientReadFile caught BlockMissingException.");
+    }
+  }
+
+  /**
+   * Corrupt a block on a data node. Replace the block file content with the
+   * byte sequence 0, 1, ..., BLOCK_SIZE-1.
+   * 
+   * @param block
+   *          the ExtendedBlock to be corrupted
+   * @param dn
+   *          the data node where the block needs to be corrupted
+   * @throws FileNotFoundException
+   * @throws IOException
+   */
+  private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
+      throws FileNotFoundException, IOException {
+    final FSDataset data = (FSDataset) dn.getFSDataset();
+    final RandomAccessFile raFile = new RandomAccessFile(
+        data.getBlockFile(block), "rw");
+    final byte[] bytes = new byte[(int) BLOCK_SIZE];
+    for (int i = 0; i < BLOCK_SIZE; i++) {
+      bytes[i] = (byte) (i);
+    }
+    raFile.write(bytes);
+    raFile.close();
+  }
+
+  private static void verifyFsckHealth(String expected) throws Exception {
+    // Fsck health has error code 0.
+    // Make sure filesystem is in healthy state
+    String outStr = runFsck(conf, 0, true, "/");
+    LOG.info(outStr);
+    Assert.assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+    if (!expected.equals("")) {
+      Assert.assertTrue(outStr.contains(expected));
+    }
+  }
+
+  private static void verifyFsckBlockCorrupted() throws Exception {
+    String outStr = runFsck(conf, 1, true, "/");
+    LOG.info(outStr);
+    Assert.assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+  }
+  
+  private static void testFsckListCorruptFilesBlocks(Path filePath, int errorCode) throws Exception {
+    String outStr = runFsck(conf, errorCode, true, filePath.toString(), "-list-corruptfileblocks");
+    LOG.info("fsck -list-corruptfileblocks out: " + outStr);
+    if (errorCode != 0) {
+      Assert.assertTrue(outStr.contains("CORRUPT files"));
+    }
+  }
+
+  static String runFsck(Configuration conf, int expectedErrCode,
+      boolean checkErrorCode, String... path) throws Exception {
+    ByteArrayOutputStream bStream = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(bStream, true);
+    int errCode = ToolRunner.run(new DFSck(conf, out), path);
+    if (checkErrorCode)
+      Assert.assertEquals(expectedErrCode, errCode);
+    return bStream.toString();
+  }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index f371daf..ea957d7 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -341,16 +341,17 @@
     int replicaCount = 0;
     Random random = new Random();
     String outStr = null;
+    short factor = 1;
 
     MiniDFSCluster cluster = null;
     try {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
     Path file1 = new Path("/testCorruptBlock");
-    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
+    DFSTestUtil.createFile(fs, file1, 1024, factor, 0);
     // Wait until file replication has completed
-    DFSTestUtil.waitReplication(fs, file1, (short)3);
+    DFSTestUtil.waitReplication(fs, file1, factor);
     ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
 
     // Make sure filesystem is in healthy state
@@ -358,18 +359,16 @@
     System.out.println(outStr);
     assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
     
-    // corrupt replicas 
-    for (int i=0; i < 3; i++) {
-      File blockFile = MiniDFSCluster.getBlockFile(i, block);
-      if (blockFile != null && blockFile.exists()) {
-        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        FileChannel channel = raFile.getChannel();
-        String badString = "BADBAD";
-        int rand = random.nextInt((int)channel.size()/2);
-        raFile.seek(rand);
-        raFile.write(badString.getBytes());
-        raFile.close();
-      }
+    // corrupt replicas
+    File blockFile = MiniDFSCluster.getBlockFile(0, block);
+    if (blockFile != null && blockFile.exists()) {
+      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+      FileChannel channel = raFile.getChannel();
+      String badString = "BADBAD";
+      int rand = random.nextInt((int) channel.size()/2);
+      raFile.seek(rand);
+      raFile.write(badString.getBytes());
+      raFile.close();
     }
     // Read the file to trigger reportBadBlocks
     try {
@@ -384,7 +383,7 @@
     blocks = dfsClient.getNamenode().
                getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
     replicaCount = blocks.get(0).getLocations().length;
-    while (replicaCount != 3) {
+    while (replicaCount != factor) {
       try {
         Thread.sleep(100);
       } catch (InterruptedException ignore) {