MAPREDUCE-2132. A command line option in RaidShell to fix blocks using raid


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1026137 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 79800b7..4ea8e40 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -138,6 +138,8 @@
     MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
     Vadali via schen)
 
+    MAPREDUCE-2132. A command line option in RaidShell to fix blocks using raid
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
new file mode 100644
index 0000000..364ac12
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+
+public abstract class RaidDFSUtil {
+  /**
+   * Returns the corrupt blocks in a file.
+   */
+  public static List<LocatedBlock> corruptBlocksInFile(
+    DistributedFileSystem dfs, String path, long offset, long length)
+  throws IOException {
+    List<LocatedBlock> corrupt = new LinkedList<LocatedBlock>();
+    LocatedBlocks locatedBlocks =
+      getBlockLocations(dfs, path, offset, length);
+    for (LocatedBlock b: locatedBlocks.getLocatedBlocks()) {
+      if (b.isCorrupt() ||
+         (b.getLocations().length == 0 && b.getBlockSize() > 0)) {
+        corrupt.add(b);
+      }
+    }
+    return corrupt;
+  }
+
+  public static LocatedBlocks getBlockLocations(
+    DistributedFileSystem dfs, String path, long offset, long length)
+    throws IOException {
+    return dfs.getClient().namenode.getBlockLocations(path, offset, length);
+  }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
new file mode 100644
index 0000000..2790157
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Reads a block from the disk and sends it to a recipient.
+ */
+public class RaidBlockSender implements java.io.Closeable, FSConstants {
+  public static final Log LOG = DataNode.LOG;
+  static final Log ClientTraceLog = DataNode.ClientTraceLog;
+  
+  private Block block; // the block to read from
+
+  /** the replica to read from */
+  private final Replica replica = null;
+  /** The visible length of a replica. */
+  private final long replicaVisibleLength;
+
+  private InputStream blockIn; // data stream
+  private long blockInPosition = -1; // updated while using transferTo().
+  private DataInputStream checksumIn; // checksum datastream
+  private DataChecksum checksum; // checksum stream
+  private long offset; // starting position to read
+  private long endOffset; // ending position
+  private int bytesPerChecksum; // chunk size
+  private int checksumSize; // checksum size
+  private boolean corruptChecksumOk; // if need to verify checksum
+  private boolean chunkOffsetOK; // if need to send chunk offset
+  private long seqno; // sequence number of packet
+
+  private boolean transferToAllowed = true;
+  private boolean blockReadFully; //set when the whole block is read
+  private boolean verifyChecksum; //if true, check is verified while reading
+  private BlockTransferThrottler throttler;
+  private final String clientTraceFmt; // format of client trace log message
+
+  /**
+   * Minimum buffer used while sending data to clients. Used only if
+   * transferTo() is enabled. 64KB is not that large. It could be larger, but
+   * not sure if there will be much more improvement.
+   */
+  private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+  private volatile ChunkChecksum lastChunkChecksum = null;
+
+  
+  public RaidBlockSender(Block block, long blockLength, long startOffset, long length,
+              boolean corruptChecksumOk, boolean chunkOffsetOK,
+              boolean verifyChecksum, boolean transferToAllowed,
+              DataInputStream metadataIn, InputStreamFactory streamFactory
+              ) throws IOException {
+    this(block, blockLength, startOffset, length,
+        corruptChecksumOk, chunkOffsetOK,
+        verifyChecksum, transferToAllowed,
+        metadataIn, streamFactory, null);
+  }
+
+  public RaidBlockSender(Block block, long blockLength, long startOffset, long length,
+              boolean corruptChecksumOk, boolean chunkOffsetOK,
+              boolean verifyChecksum, boolean transferToAllowed,
+              DataInputStream metadataIn, InputStreamFactory streamFactory,
+              String clientTraceFmt) throws IOException {
+    try {
+      this.block = block;
+      ChunkChecksum chunkChecksum = null;
+      this.chunkOffsetOK = chunkOffsetOK;
+      this.corruptChecksumOk = corruptChecksumOk;
+      this.verifyChecksum = verifyChecksum;
+      this.replicaVisibleLength = blockLength;
+      this.transferToAllowed = transferToAllowed;
+      this.clientTraceFmt = clientTraceFmt;
+
+      if ( !corruptChecksumOk || metadataIn != null) {
+        this.checksumIn = metadataIn;
+
+        // read and handle the common header here. For now just a version
+       BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+       short version = header.getVersion();
+
+        if (version != FSDataset.METADATA_VERSION) {
+          LOG.warn("Wrong version (" + version + ") for metadata file for "
+              + block + " ignoring ...");
+        }
+        checksum = header.getChecksum();
+      } else {
+        LOG.warn("Could not find metadata file for " + block);
+        // This only decides the buffer size. Use BUFFER_SIZE?
+        checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+            16 * 1024);
+      }
+
+      /* If bytesPerChecksum is very large, then the metadata file
+       * is mostly corrupted. For now just truncate bytesPerchecksum to
+       * blockLength.
+       */        
+      bytesPerChecksum = checksum.getBytesPerChecksum();
+      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
+        checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+            Math.max((int)replicaVisibleLength, 10*1024*1024));
+        bytesPerChecksum = checksum.getBytesPerChecksum();        
+      }
+      checksumSize = checksum.getChecksumSize();
+
+      if (length < 0) {
+        length = replicaVisibleLength;
+      }
+
+      // end is either last byte on disk or the length for which we have a 
+      // checksum
+      if (chunkChecksum != null) {
+        endOffset = chunkChecksum.getDataLength();
+      } else {
+        endOffset = blockLength;
+      }
+      
+      if (startOffset < 0 || startOffset > endOffset
+          || (length + startOffset) > endOffset) {
+        String msg = " Offset " + startOffset + " and length " + length
+        + " don't match block " + block + " ( blockLen " + endOffset + " )";
+        LOG.warn("sendBlock() : " + msg);
+        throw new IOException(msg);
+      }
+      
+      offset = (startOffset - (startOffset % bytesPerChecksum));
+      if (length >= 0) {
+        // Make sure endOffset points to end of a checksumed chunk.
+        long tmpLen = startOffset + length;
+        if (tmpLen % bytesPerChecksum != 0) {
+          tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+        }
+        if (tmpLen < endOffset) {
+          // will use on-disk checksum here since the end is a stable chunk
+          endOffset = tmpLen;
+        } else if (chunkChecksum != null) {
+          //in last chunk which is changing. flag that we need to use in-memory 
+          // checksum 
+          this.lastChunkChecksum = chunkChecksum;
+        }
+      }
+
+      // seek to the right offsets
+      if (offset > 0) {
+        long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+        // note blockInStream is seeked when created below
+        if (checksumSkip > 0) {
+          // Should we use seek() for checksum file as well?
+          IOUtils.skipFully(checksumIn, checksumSkip);
+        }
+      }
+      seqno = 0;
+      
+      blockIn = streamFactory.createStream(offset);
+    } catch (IOException ioe) {
+      IOUtils.closeStream(this);
+      IOUtils.closeStream(blockIn);
+      throw ioe;
+    }
+  }
+
+  /**
+   * close opened files.
+   */
+  public void close() throws IOException {
+    IOException ioe = null;
+    // close checksum file
+    if(checksumIn!=null) {
+      try {
+        checksumIn.close();
+      } catch (IOException e) {
+        ioe = e;
+      }
+      checksumIn = null;
+    }
+    // close data file
+    if(blockIn!=null) {
+      try {
+        blockIn.close();
+      } catch (IOException e) {
+        ioe = e;
+      }
+      blockIn = null;
+    }
+    // throw IOException if there is any
+    if(ioe!= null) {
+      throw ioe;
+    }
+  }
+
+  /**
+   * Converts an IOExcpetion (not subclasses) to SocketException.
+   * This is typically done to indicate to upper layers that the error 
+   * was a socket error rather than often more serious exceptions like 
+   * disk errors.
+   */
+  private static IOException ioeToSocketException(IOException ioe) {
+    if (ioe.getClass().equals(IOException.class)) {
+      // "se" could be a new class in stead of SocketException.
+      IOException se = new SocketException("Original Exception : " + ioe);
+      se.initCause(ioe);
+      /* Change the stacktrace so that original trace is not truncated
+       * when printed.*/ 
+      se.setStackTrace(ioe.getStackTrace());
+      return se;
+    }
+    // otherwise just return the same exception.
+    return ioe;
+  }
+
+  /**
+   * Sends upto maxChunks chunks of data.
+   * 
+   * When blockInPosition is >= 0, assumes 'out' is a 
+   * {@link SocketOutputStream} and tries 
+   * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
+   * send data (and updates blockInPosition).
+   */
+  private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
+                         throws IOException {
+    // Sends multiple chunks in one packet with a single write().
+
+    int len = (int) Math.min(endOffset - offset,
+                             (((long) bytesPerChecksum) * ((long) maxChunks)));
+    int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
+    int packetLen = len + numChunks*checksumSize + 4;
+    boolean lastDataPacket = offset + len == endOffset && len > 0;
+    pkt.clear();
+
+
+    PacketHeader header = new PacketHeader(
+      packetLen, offset, seqno, (len == 0), len);
+    header.putInBuffer(pkt);
+
+    int checksumOff = pkt.position();
+    int checksumLen = numChunks * checksumSize;
+    byte[] buf = pkt.array();
+    
+    if (checksumSize > 0 && checksumIn != null) {
+      try {
+        checksumIn.readFully(buf, checksumOff, checksumLen);
+      } catch (IOException e) {
+        LOG.warn(" Could not read or failed to veirfy checksum for data" +
+                 " at offset " + offset + " for block " + block + " got : "
+                 + StringUtils.stringifyException(e));
+        IOUtils.closeStream(checksumIn);
+        checksumIn = null;
+        if (corruptChecksumOk) {
+          if (checksumOff < checksumLen) {
+            // Just fill the array with zeros.
+            Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
+          }
+        } else {
+          throw e;
+        }
+      }
+
+      // write in progress that we need to use to get last checksum
+      if (lastDataPacket && lastChunkChecksum != null) {
+        int start = checksumOff + checksumLen - checksumSize;
+        byte[] updatedChecksum = lastChunkChecksum.getChecksum();
+        
+        if (updatedChecksum != null) {
+          System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
+        }
+      }
+    }
+    
+    int dataOff = checksumOff + checksumLen;
+    
+    if (blockInPosition < 0) {
+      //normal transfer
+      IOUtils.readFully(blockIn, buf, dataOff, len);
+
+      if (verifyChecksum) {
+        int dOff = dataOff;
+        int cOff = checksumOff;
+        int dLeft = len;
+
+        for (int i=0; i<numChunks; i++) {
+          checksum.reset();
+          int dLen = Math.min(dLeft, bytesPerChecksum);
+          checksum.update(buf, dOff, dLen);
+          if (!checksum.compare(buf, cOff)) {
+            long failedPos = offset + len -dLeft;
+            throw new ChecksumException("Checksum failed at " + 
+                                        failedPos, failedPos);
+          }
+          dLeft -= dLen;
+          dOff += dLen;
+          cOff += checksumSize;
+        }
+      }
+      //writing is done below (mainly to handle IOException)
+    }
+    
+    try {
+      if (blockInPosition >= 0) {
+        //use transferTo(). Checks on out and blockIn are already done. 
+
+        SocketOutputStream sockOut = (SocketOutputStream)out;
+        //first write the packet
+        sockOut.write(buf, 0, dataOff);
+        // no need to flush. since we know out is not a buffered stream. 
+
+        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
+                                blockInPosition, len);
+
+        blockInPosition += len;
+      } else {
+        // normal transfer
+        out.write(buf, 0, dataOff + len);
+      }
+      
+    } catch (IOException e) {
+      /* exception while writing to the client (well, with transferTo(),
+       * it could also be while reading from the local file).
+       */
+      throw ioeToSocketException(e);
+    }
+
+    if (throttler != null) { // rebalancing so throttle
+      throttler.throttle(packetLen);
+    }
+
+    return len;
+  }
+
+  /**
+   * sendBlock() is used to read block and its metadata and stream the data to
+   * either a client or to another datanode. 
+   * 
+   * @param out  stream to which the block is written to
+   * @param baseStream optional. if non-null, <code>out</code> is assumed to 
+   *        be a wrapper over this stream. This enables optimizations for
+   *        sending the data, e.g. 
+   *        {@link SocketOutputStream#transferToFully(FileChannel, 
+   *        long, int)}.
+   * @param throttler for sending data.
+   * @return total bytes reads, including crc.
+   */
+  public long sendBlock(DataOutputStream out, OutputStream baseStream, 
+                 BlockTransferThrottler throttler) throws IOException {
+    if( out == null ) {
+      throw new IOException( "out stream is null" );
+    }
+    this.throttler = throttler;
+
+    long initialOffset = offset;
+    long totalRead = 0;
+    OutputStream streamForSendChunks = out;
+    
+    final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+    try {
+      try {
+        checksum.writeHeader(out);
+        if ( chunkOffsetOK ) {
+          out.writeLong( offset );
+        }
+        out.flush();
+      } catch (IOException e) { //socket error
+        throw ioeToSocketException(e);
+      }
+      
+      int maxChunksPerPacket;
+      int pktSize = PacketHeader.PKT_HEADER_LEN;
+      
+      if (transferToAllowed && !verifyChecksum && 
+          baseStream instanceof SocketOutputStream && 
+          blockIn instanceof FileInputStream) {
+        
+        FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+        
+        // blockInPosition also indicates sendChunks() uses transferTo.
+        blockInPosition = fileChannel.position();
+        streamForSendChunks = baseStream;
+        
+        // assure a mininum buffer size.
+        maxChunksPerPacket = (Math.max(BUFFER_SIZE, 
+                                       MIN_BUFFER_WITH_TRANSFERTO)
+                              + bytesPerChecksum - 1)/bytesPerChecksum;
+        
+        // allocate smaller buffer while using transferTo(). 
+        pktSize += checksumSize * maxChunksPerPacket;
+      } else {
+        maxChunksPerPacket = Math.max(1,
+                 (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+      }
+
+      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+
+      while (endOffset > offset) {
+        long len = sendChunks(pktBuf, maxChunksPerPacket, 
+                              streamForSendChunks);
+        offset += len;
+        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
+                            checksumSize);
+        seqno++;
+      }
+      try {
+        // send an empty packet to mark the end of the block
+        sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);        
+        out.flush();
+      } catch (IOException e) { //socket error
+        throw ioeToSocketException(e);
+      }
+    } finally {
+      if (clientTraceFmt != null) {
+        final long endTime = System.nanoTime();
+        ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
+      }
+      close();
+    }
+
+    blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
+
+    return totalRead;
+  }
+  
+  boolean isBlockReadFully() {
+    return blockReadFully;
+  }
+  
+  public static interface InputStreamFactory {
+    public InputStream createStream(long offset) throws IOException; 
+  }
+  
+  private static class BlockInputStreamFactory implements InputStreamFactory {
+    private final Block block;
+    private final FSDatasetInterface data;
+
+    private BlockInputStreamFactory(Block block, FSDatasetInterface data) {
+      this.block = block;
+      this.data = data;
+    }
+
+    @Override
+    public InputStream createStream(long offset) throws IOException {
+      return data.getBlockInputStream(block, offset);
+    }
+  }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
new file mode 100644
index 0000000..bd16ab7
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
@@ -0,0 +1,653 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.Random;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.datanode.RaidBlockSender;
+import org.apache.hadoop.io.Text;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.net.NetUtils;
+
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.RaidUtils;
+
+/**
+ * This class fixes source file blocks using the parity file,
+ * and parity file blocks using the source file.
+ * It periodically fetches the list of corrupt files from the namenode,
+ * and figures out the location of the bad block by reading through
+ * the corrupt file.
+ */
+public class BlockFixer {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.BlockFixer");
+  private java.util.HashMap<String, java.util.Date> history;
+  private int blockFixInterval = 60*1000; // 1min
+  private long numFilesFixed = 0;
+  private Configuration conf;
+  private String xorPrefix;
+  private XOREncoder xorEncoder;
+  private XORDecoder xorDecoder;
+
+  public BlockFixer(Configuration conf) throws IOException {
+    this.conf = conf;
+    history = new java.util.HashMap<String, java.util.Date>();
+    blockFixInterval = conf.getInt("raid.blockfix.interval",
+                                   blockFixInterval);
+    xorPrefix = RaidNode.getDestinationPath(conf).toUri().getPath();
+    int stripeLength = RaidNode.getStripeLength(conf);
+    xorEncoder = new XOREncoder(conf, stripeLength);
+    xorDecoder = new XORDecoder(conf, stripeLength);
+  }
+
+  public long filesFixed() {
+    return numFilesFixed;
+  }
+
+  void fixFile(Path srcPath) throws IOException {
+    if (RaidNode.isParityHarPartFile(srcPath)) {
+      processCorruptParityHarPartFile(srcPath);
+      return;
+    }
+
+    // The corrupted file is a XOR parity file
+    if (isXorParityFile(srcPath)) {
+      processCorruptParityFile(srcPath, xorEncoder);
+      return;
+    }
+
+    // The corrupted file is a source file
+
+    // Do we have a parity file for this file?
+    RaidNode.ParityFilePair ppair = null;
+    Decoder decoder = null;
+    Path destPath = null;
+    try {
+      destPath = RaidNode.getDestinationPath(conf);
+      ppair = RaidNode.getParityFile(destPath, srcPath, conf);
+      if (ppair != null) {
+        decoder = xorDecoder;
+      }
+    } catch (FileNotFoundException e) {
+    }
+    // If we have a parity file, process the file and fix it.
+    if (ppair != null) {
+      processCorruptFile(srcPath, destPath, decoder);
+    }
+  }
+
+  /**
+   * We maintain history of fixed files because a fixed file may appear in
+   * the list of corrupt files if we loop around too quickly.
+   * This function removes the old items in the history so that we can
+   * recognize files that have actually become corrupt since being fixed.
+   */
+  void purgeHistory() {
+    // Default history interval is 1 hour.
+    long historyInterval = conf.getLong(
+                             "raid.blockfix.history.interval", 3600*1000);
+    java.util.Date cutOff = new java.util.Date(
+                                   System.currentTimeMillis()-historyInterval);
+    List<String> toRemove = new java.util.ArrayList<String>();
+
+    for (String key: history.keySet()) {
+      java.util.Date item = history.get(key);
+      if (item.before(cutOff)) {
+        toRemove.add(key);
+      }
+    }
+    for (String key: toRemove) {
+      LOG.info("Removing " + key + " from history");
+      history.remove(key);
+    }
+  }
+
+  /**
+   * @return A list of corrupt files as obtained from the namenode
+   */
+  List<Path> getCorruptFiles() throws IOException {
+    DistributedFileSystem dfs = getDFS(new Path("/"));
+
+    // TODO: need an RPC here.
+    // FileStatus[] files =  dfs.getClient().namenode.getCorruptFiles();
+    FileStatus[] files = new FileStatus[0];
+    List<Path> corruptFiles = new LinkedList<Path>();
+    for (FileStatus f: files) {
+      Path p = f.getPath();
+      if (!history.containsKey(p.toString())) {
+        corruptFiles.add(p);
+      }
+    }
+    RaidUtils.filterTrash(conf, corruptFiles);
+    return corruptFiles;
+  }
+
+  /**
+   * Sorts source files ahead of parity files.
+   */
+  void sortCorruptFiles(List<Path> files) {
+    // TODO: We should first fix the files that lose more blocks
+    Comparator<Path> comp = new Comparator<Path>() {
+      public int compare(Path p1, Path p2) {
+        if (isXorParityFile(p2)) {
+          // If p2 is a parity file, p1 is smaller.
+          return -1;
+        }
+        if (isXorParityFile(p1)) {
+          // If p1 is a parity file, p2 is smaller.
+          return 1;
+        }
+        // If both are source files, they are equal.
+        return 0;
+      }
+    };
+    Collections.sort(files, comp);
+  }
+
+
+  /**
+   * Returns a DistributedFileSystem hosting the path supplied.
+   */
+  private DistributedFileSystem getDFS(Path p) throws IOException {
+    return (DistributedFileSystem) p.getFileSystem(conf);
+  }
+
+  /**
+   * Reads through a corrupt source file fixing corrupt blocks on the way.
+   * @param srcPath Path identifying the corrupt file.
+   * @throws IOException
+   */
+  void processCorruptFile(Path srcPath, Path destPath, Decoder decoder)
+      throws IOException {
+    LOG.info("Processing corrupt file " + srcPath);
+
+    DistributedFileSystem srcFs = getDFS(srcPath);
+    FileStatus srcStat = srcFs.getFileStatus(srcPath);
+    long blockSize = srcStat.getBlockSize();
+    long srcFileSize = srcStat.getLen();
+    String uriPath = srcPath.toUri().getPath();
+
+    int numBlocksFixed = 0;
+    List<LocatedBlock> corrupt =
+      RaidDFSUtil.corruptBlocksInFile(srcFs, uriPath, 0, srcFileSize);
+    for (LocatedBlock lb: corrupt) {
+      Block corruptBlock = lb.getBlock();
+      long corruptOffset = lb.getStartOffset();
+
+      LOG.info("Found corrupt block " + corruptBlock +
+          ", offset " + corruptOffset);
+
+      final long blockContentsSize =
+        Math.min(blockSize, srcFileSize - corruptOffset);
+      File localBlockFile =
+        File.createTempFile(corruptBlock.getBlockName(), ".tmp");
+      localBlockFile.deleteOnExit();
+
+      try {
+        RaidNode.ParityFilePair parityPair = RaidNode.getParityFile(
+            destPath, srcPath, conf);
+
+        decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(),
+          parityPair.getPath(), blockSize, corruptOffset, localBlockFile,
+          blockContentsSize);
+
+        // We have a the contents of the block, send them.
+        DatanodeInfo datanode = chooseDatanode(lb.getLocations());
+        computeMetdataAndSendFixedBlock(
+          datanode, localBlockFile, lb, blockContentsSize);
+        numBlocksFixed++;
+
+        LOG.info("Adding " + srcPath + " to history");
+        history.put(srcPath.toString(), new java.util.Date());
+      } finally {
+        localBlockFile.delete();
+      }
+    }
+    LOG.info("Fixed " + numBlocksFixed + " blocks in " + srcPath);
+    numFilesFixed++;
+  }
+
+  boolean isXorParityFile(Path p) {
+    String pathStr = p.toUri().getPath();
+    if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
+      return false;
+    }
+    return pathStr.startsWith(xorPrefix);
+  }
+
+  /**
+   * Reads through a parity file, fixing corrupt blocks on the way.
+   * This function uses the corresponding source file to regenerate parity
+   * file blocks.
+   */
+  void processCorruptParityFile(Path parityPath, Encoder encoder)
+      throws IOException {
+    LOG.info("Processing corrupt file " + parityPath);
+    Path srcPath = sourcePathFromParityPath(parityPath);
+    if (srcPath == null) {
+      LOG.warn("Unusable parity file " + parityPath);
+      return;
+    }
+
+    DistributedFileSystem parityFs = getDFS(parityPath);
+    FileStatus parityStat = parityFs.getFileStatus(parityPath);
+    long blockSize = parityStat.getBlockSize();
+    long parityFileSize = parityStat.getLen();
+    FileStatus srcStat = getDFS(srcPath).getFileStatus(srcPath);
+    long srcFileSize = srcStat.getLen();
+
+    // Check timestamp.
+    if (srcStat.getModificationTime() != parityStat.getModificationTime()) {
+      LOG.info("Mismatching timestamp for " + srcPath + " and " + parityPath +
+               ", moving on...");
+      return;
+    }
+
+    String uriPath = parityPath.toUri().getPath();
+    int numBlocksFixed = 0;
+    List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
+      parityFs, uriPath, 0, parityFileSize);
+    for (LocatedBlock lb: corrupt) {
+      Block corruptBlock = lb.getBlock();
+      long corruptOffset = lb.getStartOffset();
+
+      LOG.info("Found corrupt block " + corruptBlock +
+          ", offset " + corruptOffset);
+
+      File localBlockFile =
+        File.createTempFile(corruptBlock.getBlockName(), ".tmp");
+      localBlockFile.deleteOnExit();
+
+      try {
+        encoder.recoverParityBlockToFile(parityFs, srcPath, srcFileSize,
+            blockSize, parityPath, corruptOffset, localBlockFile);
+        // We have a the contents of the block, send them.
+        DatanodeInfo datanode = chooseDatanode(lb.getLocations());
+        computeMetdataAndSendFixedBlock(
+          datanode, localBlockFile, lb, blockSize);
+
+        numBlocksFixed++;
+        LOG.info("Adding " + parityPath + " to history");
+        history.put(parityPath.toString(), new java.util.Date());
+      } finally {
+        localBlockFile.delete();
+      }
+    }
+    LOG.info("Fixed " + numBlocksFixed + " blocks in " + parityPath);
+    numFilesFixed++;
+  }
+
+  /**
+   * Reads through a parity HAR part file, fixing corrupt blocks on the way.
+   * A HAR block can contain many file blocks, as long as the HAR part file
+   * block size is a multiple of the file block size.
+   */
+  void processCorruptParityHarPartFile(Path partFile) throws IOException {
+    LOG.info("Processing corrupt file " + partFile);
+    // Get some basic information.
+    DistributedFileSystem dfs = getDFS(partFile);
+    FileStatus partFileStat = dfs.getFileStatus(partFile);
+    long partFileSize = partFileStat.getLen();
+    long partFileBlockSize = partFileStat.getBlockSize();
+    LOG.info(partFile + " has block size " + partFileBlockSize);
+
+    // Find the path to the index file.
+    // Parity file HARs are only one level deep, so the index files is at the
+    // same level as the part file.
+    String harDirectory = partFile.toUri().getPath(); // Temporarily.
+    harDirectory =
+      harDirectory.substring(0, harDirectory.lastIndexOf(Path.SEPARATOR));
+    Path indexFile = new Path(harDirectory + "/" + HarIndex.indexFileName);
+    FileStatus indexStat = dfs.getFileStatus(indexFile);
+    // Parses through the HAR index file.
+    HarIndex harIndex = new HarIndex(dfs.open(indexFile), indexStat.getLen());
+
+    String uriPath = partFile.toUri().getPath();
+    int numBlocksFixed = 0;
+    List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
+      dfs, uriPath, 0, partFileSize);
+    for (LocatedBlock lb: corrupt) {
+      Block corruptBlock = lb.getBlock();
+      long corruptOffset = lb.getStartOffset();
+
+      File localBlockFile =
+        File.createTempFile(corruptBlock.getBlockName(), ".tmp");
+      localBlockFile.deleteOnExit();
+      processCorruptParityHarPartBlock(
+        dfs, partFile, corruptBlock, corruptOffset, partFileStat, harIndex,
+        localBlockFile);
+      // Now we have recovered the part file block locally, send it.
+      try {
+        DatanodeInfo datanode = chooseDatanode(lb.getLocations());
+        computeMetdataAndSendFixedBlock(datanode, localBlockFile,
+          lb, localBlockFile.length());
+        numBlocksFixed++;
+
+        LOG.info("Adding " + partFile + " to history");
+        history.put(partFile.toString(), new java.util.Date());
+      } finally {
+        localBlockFile.delete();
+      }
+    }
+    LOG.info("Fixed " + numBlocksFixed + " blocks in " + partFile);
+    numFilesFixed++;
+  }
+
+  /**
+   * This fixes a single part file block by recovering in sequence each
+   * parity block in the part file block.
+   */
+  private void processCorruptParityHarPartBlock(
+    FileSystem dfs, Path partFile, Block corruptBlock, long corruptOffset,
+    FileStatus partFileStat, HarIndex harIndex, File localBlockFile)
+    throws IOException {
+    String partName = partFile.toUri().getPath(); // Temporarily.
+    partName = partName.substring(1 + partName.lastIndexOf(Path.SEPARATOR));
+
+    OutputStream out = new FileOutputStream(localBlockFile);
+
+    try {
+      // A HAR part file block could map to several parity files. We need to
+      // use all of them to recover this block.
+      final long corruptEnd = Math.min(corruptOffset + partFileStat.getBlockSize(),
+                                      partFileStat.getLen());
+      for (long offset = corruptOffset; offset < corruptEnd; ) {
+        HarIndex.IndexEntry entry = harIndex.findEntry(partName, offset);
+        if (entry == null) {
+          String msg = "Corrupt index file has no matching index entry for " +
+            partName + ":" + offset;
+          LOG.warn(msg);
+          throw new IOException(msg);
+        }
+        Path parityFile = new Path(entry.fileName);
+        Encoder encoder;
+        if (isXorParityFile(parityFile)) {
+          encoder = xorEncoder;
+        } else {
+          String msg = "Could not figure out parity file correctly";
+          LOG.warn(msg);
+          throw new IOException(msg);
+        }
+        Path srcFile = sourcePathFromParityPath(parityFile);
+        FileStatus srcStat = dfs.getFileStatus(srcFile);
+        if (srcStat.getModificationTime() != entry.mtime) {
+          String msg = "Modification times of " + parityFile + " and " + srcFile +
+            " do not match.";
+          LOG.warn(msg);
+          throw new IOException(msg);
+        }
+        long corruptOffsetInParity = offset - entry.startOffset;
+        LOG.info(partFile + ":" + offset + " maps to " +
+                 parityFile + ":" + corruptOffsetInParity +
+                 " and will be recovered from " + srcFile);
+        encoder.recoverParityBlockToStream(dfs, srcFile, srcStat.getLen(),
+          srcStat.getBlockSize(), parityFile, corruptOffsetInParity, out);
+        // Finished recovery of one parity block. Since a parity block has the
+        // same size as a source block, we can move offset by source block size.
+        offset += srcStat.getBlockSize();
+        LOG.info("Recovered " + srcStat.getBlockSize() + " part file bytes ");
+        if (offset > corruptEnd) {
+          String msg =
+            "Recovered block spills across part file blocks. Cannot continue...";
+          throw new IOException(msg);
+        }
+      }
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Choose a datanode (hostname:portnumber). The datanode is chosen at
+   * random from the live datanodes.
+   * @param locationsToAvoid locations to avoid.
+   * @return A string in the format name:port.
+   * @throws IOException
+   */
+  private DatanodeInfo chooseDatanode(DatanodeInfo[] locationsToAvoid)
+    throws IOException {
+    DistributedFileSystem dfs = getDFS(new Path("/"));
+    DatanodeInfo[] live = dfs.getClient().datanodeReport(
+                                                 DatanodeReportType.LIVE);
+    LOG.info("Choosing a datanode from " + live.length +
+      " live nodes while avoiding " + locationsToAvoid.length);
+    Random rand = new Random();
+    DatanodeInfo chosen = null;
+    int maxAttempts = 1000;
+    for (int i = 0; i < maxAttempts && chosen == null; i++) {
+      int idx = rand.nextInt(live.length);
+      chosen = live[idx];
+      for (DatanodeInfo avoid: locationsToAvoid) {
+        if (chosen.name.equals(avoid.name)) {
+          LOG.info("Avoiding " + avoid.name);
+          chosen = null;
+          break;
+        }
+      }
+    }
+    if (chosen == null) {
+      throw new IOException("Could not choose datanode");
+    }
+    LOG.info("Choosing datanode " + chosen.name);
+    return chosen;
+  }
+
+  /**
+   * Reads data from the data stream provided and computes metadata.
+   */
+  static DataInputStream computeMetadata(
+    Configuration conf, InputStream dataStream) throws IOException {
+    ByteArrayOutputStream mdOutBase = new ByteArrayOutputStream(1024*1024);
+    DataOutputStream mdOut = new DataOutputStream(mdOutBase);
+
+    // First, write out the version.
+    mdOut.writeShort(FSDataset.METADATA_VERSION);
+
+    // Create a summer and write out its header.
+    int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
+    DataChecksum sum = DataChecksum.newDataChecksum(
+                        DataChecksum.CHECKSUM_CRC32,
+                        bytesPerChecksum);
+    sum.writeHeader(mdOut);
+
+    // Buffer to read in a chunk of data.
+    byte[] buf = new byte[bytesPerChecksum];
+    // Buffer to store the checksum bytes.
+    byte[] chk = new byte[sum.getChecksumSize()];
+
+    // Read data till we reach the end of the input stream.
+    int bytesSinceFlush = 0;
+    while (true) {
+      // Read some bytes.
+      int bytesRead = dataStream.read(
+        buf, bytesSinceFlush, bytesPerChecksum-bytesSinceFlush);
+      if (bytesRead == -1) {
+        if (bytesSinceFlush > 0) {
+          boolean reset = true;
+          sum.writeValue(chk, 0, reset); // This also resets the sum.
+          // Write the checksum to the stream.
+          mdOut.write(chk, 0, chk.length);
+          bytesSinceFlush = 0;
+        }
+        break;
+      }
+      // Update the checksum.
+      sum.update(buf, bytesSinceFlush, bytesRead);
+      bytesSinceFlush += bytesRead;
+
+      // Flush the checksum if necessary.
+      if (bytesSinceFlush == bytesPerChecksum) {
+        boolean reset = true;
+        sum.writeValue(chk, 0, reset); // This also resets the sum.
+        // Write the checksum to the stream.
+        mdOut.write(chk, 0, chk.length);
+        bytesSinceFlush = 0;
+      }
+    }
+
+    byte[] mdBytes = mdOutBase.toByteArray();
+    return new DataInputStream(new ByteArrayInputStream(mdBytes));
+  }
+
+  private void computeMetdataAndSendFixedBlock(
+    DatanodeInfo datanode,
+    File localBlockFile, LocatedBlock block, long blockSize
+    ) throws IOException {
+
+    LOG.info("Computing metdata");
+    InputStream blockContents = null;
+    DataInputStream blockMetadata = null;
+    try {
+      blockContents = new FileInputStream(localBlockFile);
+      blockMetadata = computeMetadata(conf, blockContents);
+      blockContents.close();
+      // Reopen
+      blockContents = new FileInputStream(localBlockFile);
+      sendFixedBlock(datanode, blockContents, blockMetadata, block, blockSize);
+    } finally {
+      if (blockContents != null) {
+        blockContents.close();
+        blockContents = null;
+      }
+      if (blockMetadata != null) {
+        blockMetadata.close();
+        blockMetadata = null;
+      }
+    }
+  }
+
+  /**
+   * Send a generated block to a datanode.
+   * @param datanode Chosen datanode name in host:port form.
+   * @param blockContents Stream with the block contents.
+   * @param corruptBlock Block identifying the block to be sent.
+   * @param blockSize size of the block.
+   * @throws IOException
+   */
+  private void sendFixedBlock(
+    DatanodeInfo datanode,
+    final InputStream blockContents, DataInputStream metadataIn,
+    LocatedBlock block, long blockSize
+    ) throws IOException {
+    InetSocketAddress target = NetUtils.createSocketAddr(datanode.name);
+    Socket sock = SocketChannel.open().socket();
+
+    int readTimeout =  conf.getInt(
+        "raid.blockfix.read.timeout", HdfsConstants.READ_TIMEOUT);
+    NetUtils.connect(sock, target, readTimeout);
+    sock.setSoTimeout(readTimeout);
+
+    int writeTimeout = conf.getInt(
+        "raid.blockfix.write.timeout", HdfsConstants.WRITE_TIMEOUT);
+
+    OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
+    DataOutputStream out = new DataOutputStream(
+        new BufferedOutputStream(baseStream, FSConstants.SMALL_BUFFER_SIZE));
+
+    boolean corruptChecksumOk = false;
+    boolean chunkOffsetOK = false;
+    boolean verifyChecksum = true;
+    boolean transferToAllowed = false;
+
+    try {
+      LOG.info("Sending block " + block.getBlock() +
+          " from " + sock.getLocalSocketAddress().toString() +
+          " to " + sock.getRemoteSocketAddress().toString() +
+          " " + blockSize + " bytes");
+      RaidBlockSender blockSender = new RaidBlockSender(
+          block.getBlock(), blockSize, 0, blockSize,
+          corruptChecksumOk, chunkOffsetOK, verifyChecksum, transferToAllowed,
+          metadataIn, new RaidBlockSender.InputStreamFactory() {
+          @Override
+          public InputStream createStream(long offset) throws IOException {
+            // we are passing 0 as the offset above, so we can safely ignore
+            // the offset passed
+            return blockContents;
+          }
+        });
+
+      DatanodeInfo[] nodes = new DatanodeInfo[]{datanode};
+      DataTransferProtocol.Sender.opWriteBlock(
+        out, block.getBlock(), 1,
+        DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE,
+        0, blockSize, 0, "", null, nodes, block.getBlockToken());
+      blockSender.sendBlock(out, baseStream, null);
+
+      LOG.info("Sent block " + block.getBlock() + " to " + datanode.name);
+    } finally {
+      out.close();
+    }
+  }
+
+  Path sourcePathFromParityPath(Path parityPath) {
+    String parityPathStr = parityPath.toUri().getPath();
+    if (parityPathStr.startsWith(xorPrefix)) {
+      // Remove the prefix to get the source file.
+      String src = parityPathStr.replaceFirst(xorPrefix, "");
+      return new Path(src);
+    }
+    return null;
+  }
+}
+
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java
new file mode 100644
index 0000000..42233d7
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.List;
+import java.util.LinkedList;
+
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents the contents of a HAR Index file. The HAR is assumed to be
+ * comprising of RAID parity files only and no directories.
+ */
+public class HarIndex {
+  public static final String indexFileName = "_index";
+  private List<IndexEntry> entries = new LinkedList<IndexEntry>();
+
+  /**
+   * Represents information in a single line of the HAR index file.
+   */
+  public static class IndexEntry {
+    String fileName; // Name of the file in the part file.
+    long startOffset; // Start offset within the part file.
+    long length; // Length of this file within the part file.
+    long mtime; // Modification time of the file.
+    String partFileName; // Name of the part file.
+
+    IndexEntry(String fileName, long startOffset, long length,
+                long mtime, String partFileName) {
+      this.fileName = fileName;
+      this.startOffset = startOffset;
+      this.length = length;
+      this.mtime = mtime;
+      this.partFileName = partFileName;
+    }
+
+    public String toString() {
+      return "fileName=" + fileName +
+             ", startOffset=" + startOffset +
+             ", length=" + length +
+             ", mtime=" + mtime +
+             ", partFileName=" + partFileName;
+    }
+  }
+
+  /**
+   * Constructor that reads the contents of the index file.
+   * @param in An input stream to the index file.
+   * @param max The size of the index file.
+   * @throws IOException
+   */
+  public HarIndex(InputStream in, long max) throws IOException {
+    LineReader lineReader = new LineReader(in);
+    Text text = new Text();
+    long nread = 0;
+    while (nread < max) {
+      int n = lineReader.readLine(text);
+      nread += n;
+      String line = text.toString();
+      try {
+        parseLine(line);
+      } catch (UnsupportedEncodingException e) {
+        throw new IOException("UnsupportedEncodingException after reading " +
+                              nread + "bytes");
+      }
+    }
+  }
+
+  /**
+   * Parses each line and extracts relevant information.
+   * @param line
+   * @throws UnsupportedEncodingException
+   */
+  void parseLine(String line) throws UnsupportedEncodingException {
+    String[] splits = line.split(" ");
+
+    boolean isDir = "dir".equals(splits[1]) ? true: false;
+    if (!isDir && splits.length >= 6) {
+      String name = URLDecoder.decode(splits[0], "UTF-8");
+      String partName = URLDecoder.decode(splits[2], "UTF-8");
+      long startIndex = Long.parseLong(splits[3]);
+      long length = Long.parseLong(splits[4]);
+      String[] newsplits = URLDecoder.decode(splits[5],"UTF-8").split(" ");
+      if (newsplits != null && newsplits.length >= 5) {
+        long mtime = Long.parseLong(newsplits[0]);
+        IndexEntry entry = new IndexEntry(
+          name, startIndex, length, mtime, partName);
+        entries.add(entry);
+      }
+    }
+  }
+
+  /**
+   * Finds the index entry corresponding to a HAR partFile at an offset.
+   * @param partName The name of the part file (part-*).
+   * @param partFileOffset The offset into the part file.
+   * @return The entry corresponding to partName:partFileOffset.
+   */
+  public IndexEntry findEntry(String partName, long partFileOffset) {
+    for (IndexEntry e: entries) {
+      boolean nameMatch = partName.equals(e.partFileName);
+      boolean inRange = (partFileOffset >= e.startOffset) &&
+                        (partFileOffset < e.startOffset + e.length);
+      if (nameMatch && inRange) {
+        return e;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Finds the index entry corresponding to a file in the archive
+   */
+  public IndexEntry findEntryByFileName(String fileName) {
+    for (IndexEntry e: entries) {
+      if (fileName.equals(e.fileName)) {
+        return e;
+      }
+    }
+    return null;
+  }
+
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index 90f8f3e..06b2686 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -30,6 +30,8 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.HashSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.lang.Thread;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -79,6 +81,9 @@
   public static final String DEFAULT_RAID_LOCATION = "/raid";
   public static final String RAID_LOCATION_KEY = "hdfs.raid.locations";
   public static final String HAR_SUFFIX = "_raid.har";
+  public static final Pattern PARITY_HAR_PARTFILE_PATTERN =
+    Pattern.compile(".*" + HAR_SUFFIX + "/part-.*");
+
   
   /** RPC server */
   private Server server;
@@ -1149,6 +1154,11 @@
     return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH);
   }
 
+  static boolean isParityHarPartFile(Path p) {
+    Matcher m = PARITY_HAR_PARTFILE_PATTERN.matcher(p.toUri().getPath());
+    return m.matches();
+  }
+
   /**
    * Returns current time.
    */
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
index d93844a..28dbee2 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
@@ -48,9 +48,13 @@
  * A {@link RaidShell} that allows browsing configured raid policies.
  */
 public class RaidShell extends Configured implements Tool {
+  static {
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+  }
   public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.RaidShell");
   public RaidProtocol raidnode;
-  final RaidProtocol rpcRaidnode;
+  RaidProtocol rpcRaidnode;
   private UserGroupInformation ugi;
   volatile boolean clientRunning = true;
   private Configuration conf;
@@ -62,28 +66,21 @@
    * configuration options.
    * @throws IOException
    */
-  public RaidShell() throws IOException {
-    this(new Configuration());
-  }
-
-  /**
-   * The RaidShell connects to the specified RaidNode and performs basic
-   * configuration options.
-   * @param conf The Hadoop configuration
-   * @throws IOException
-   */
   public RaidShell(Configuration conf) throws IOException {
-    this(conf, RaidNode.getAddress(conf));
+    super(conf);
+    this.conf = conf;
   }
 
-  public RaidShell(Configuration conf, InetSocketAddress address) throws IOException {
-    super(conf);
+  void initializeRpc(Configuration conf, InetSocketAddress address) throws IOException {
     this.ugi = UserGroupInformation.getCurrentUser();
-
     this.rpcRaidnode = createRPCRaidnode(address, conf, ugi);
     this.raidnode = createRaidnode(rpcRaidnode);
   }
-  
+
+  void initializeLocal(Configuration conf) throws IOException {
+    this.ugi = UserGroupInformation.getCurrentUser();
+  }
+
   public static RaidProtocol createRaidnode(Configuration conf) throws IOException {
     return createRaidnode(RaidNode.getAddress(conf), conf);
   }
@@ -92,7 +89,6 @@
       Configuration conf) throws IOException {
     return createRaidnode(createRPCRaidnode(raidNodeAddr, conf,
       UserGroupInformation.getCurrentUser()));
-
   }
 
   private static RaidProtocol createRPCRaidnode(InetSocketAddress raidNodeAddr,
@@ -153,13 +149,17 @@
       System.err.println("Usage: java RaidShell" + 
                          " [-showConfig]"); 
     } else if ("-recover".equals(cmd)) {
-      System.err.println("Usage: java CronShell" +
+      System.err.println("Usage: java RaidShell" +
                          " [-recover srcPath1 corruptOffset]");
+    } else if ("-recoverBlocks".equals(cmd)) {
+      System.err.println("Usage: java RaidShell" +
+                         " [-recoverBlocks path1 path2...]");
     } else {
       System.err.println("Usage: java RaidShell");
       System.err.println("           [-showConfig ]");
       System.err.println("           [-help [cmd]]");
       System.err.println("           [-recover srcPath1 corruptOffset]");
+      System.err.println("           [-recoverBlocks path1 path2...]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
     }
@@ -195,9 +195,15 @@
 
     try {
       if ("-showConfig".equals(cmd)) {
+        initializeRpc(conf, RaidNode.getAddress(conf));
         exitCode = showConfig(cmd, argv, i);
       } else if ("-recover".equals(cmd)) {
+        initializeRpc(conf, RaidNode.getAddress(conf));
         exitCode = recoverAndPrint(cmd, argv, i);
+      } else if ("-recoverBlocks".equals(cmd)) {
+        initializeLocal(conf);
+        recoverBlocks(argv, i);
+        exitCode = 0;
       } else {
         exitCode = -1;
         System.err.println(cmd.substring(1) + ": Unknown command");
@@ -278,29 +284,37 @@
     }
     return exitCode;
   }
-  
+
+  public void recoverBlocks(String[] args, int startIndex)
+    throws IOException {
+    LOG.info("Recovering blocks for " + (args.length - startIndex) + " files");
+    BlockFixer fixer = new BlockFixer(conf);
+    for (int i = startIndex; i < args.length; i++) {
+      String path = args[i];
+      fixer.fixFile(new Path(path));
+    }
+  }
+
   /**
    * main() has some simple utility methods
    */
   public static void main(String argv[]) throws Exception {
     RaidShell shell = null;
     try {
-      shell = new RaidShell();
+      shell = new RaidShell(new Configuration());
+      int res = ToolRunner.run(shell, argv);
+      System.exit(res);
     } catch (RPC.VersionMismatch v) {
       System.err.println("Version Mismatch between client and server" +
                          "... command aborted.");
       System.exit(-1);
     } catch (IOException e) {
-      System.err.println("Bad connection to RaidNode. command aborted.");
+      System.err.
+        println("Bad connection to RaidNode or NameNode. command aborted.");
+      System.err.println(e.getMessage());
       System.exit(-1);
-    }
-
-    int res;
-    try {
-      res = ToolRunner.run(shell, argv);
     } finally {
       shell.close();
     }
-    System.exit(res);
   }
 }
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
index 6818168..bde24fe 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
@@ -22,12 +22,32 @@
 import java.io.OutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.IOUtils;
 
 public class RaidUtils {
+  /**
+   * Removes files matching the trash file pattern.
+   */
+  public static void filterTrash(Configuration conf, List<Path> files) {
+    // Remove files under Trash.
+    String trashPattern = conf.get("raid.blockfixer.trash.pattern",
+                                   "^/user/.*/\\.Trash.*");
+    for (Iterator<Path> it = files.iterator(); it.hasNext(); ) {
+      String pathStr = it.next().toString();
+      if (Pattern.matches(trashPattern, pathStr)) {
+        it.remove();
+      }
+    }
+  }
+
   public static void readTillEnd(InputStream in, byte[] buf, boolean eofOK)
     throws IOException {
     int toRead = buf.length;
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
index 3da2f15..5d7a60a 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
@@ -142,7 +142,8 @@
   private LocatedBlocks getBlockLocations(Path file, long length)
     throws IOException {
     DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
-    return dfs.getClient().namenode.getBlockLocations(file.toString(), 0, length);
+    return RaidDFSUtil.getBlockLocations(
+      dfs, file.toUri().getPath(), 0, length);
   }
 
   private LocatedBlocks getBlockLocations(Path file)
@@ -190,8 +191,8 @@
     while (true) {
       LocatedBlocks locations = null;
       DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
-      locations = dfs.getClient().namenode.getBlockLocations(
-                    file.toString(), 0, parityStat.getLen());
+      locations = RaidDFSUtil.getBlockLocations(
+        dfs, file.toUri().getPath(), 0, parityStat.getLen());
       if (!locations.isUnderConstruction()) {
         break;
       }
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestHarIndexParser.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestHarIndexParser.java
new file mode 100644
index 0000000..8ece5a6
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestHarIndexParser.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestHarIndexParser extends TestCase {
+  final static Log LOG = LogFactory.getLog(TestHarIndexParser.class);
+  File indexFile = null;
+
+  protected void setUp() throws FileNotFoundException, IOException {
+    LOG.info("TestHarIndexParser.setUp()");
+    indexFile = File.createTempFile("harindex", ".tmp");
+    indexFile.deleteOnExit();
+    OutputStreamWriter out = new OutputStreamWriter(
+      new FileOutputStream(indexFile),
+      Charset.forName("UTF-8"));
+    out.write("%2F dir 1282018162460+0+493+hadoop+hadoop 0 0 f1 f2 f3 f4\n");
+    out.write("%2Ff1 file part-0 0 1024 1282018141145+1282018140822+420+hadoop+hadoop\n");
+    out.write("%2Ff3 file part-0 2048 1024 1282018148590+1282018148255+420+hadoop+hadoop\n");
+    out.write("%2Ff2 file part-0 1024 1024 1282018144198+1282018143852+420+hadoop+hadoop\n");
+    out.write("%2Ff4 file part-1 0 1024000 1282018162959+1282018162460+420+hadoop+hadoop\n");
+    out.flush();
+    out.close();
+  }
+
+  protected void tearDown() {
+    LOG.info("TestHarIndexParser.tearDown()");
+    if (indexFile != null)
+      indexFile.delete();
+  }
+
+  public void testHarIndexParser()
+    throws UnsupportedEncodingException, IOException {
+    LOG.info("testHarIndexParser started.");
+    InputStream in = new FileInputStream(indexFile);
+    long size = indexFile.length();
+    HarIndex parser = new HarIndex(in, size);
+
+    HarIndex.IndexEntry entry = parser.findEntry("part-0", 2100);
+    assertEquals("/f3", entry.fileName);
+
+    LOG.info("testHarIndexParser finished.");
+  }
+}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
index 1bb197a..e444711 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
@@ -278,18 +278,6 @@
       cnode = RaidNode.createRaidNode(null, conf);
       int times = 10;
 
-      while (times-- > 0) {
-        try {
-          shell = new RaidShell(conf, cnode.getListenerAddress());
-        } catch (Exception e) {
-          LOG.info("doTestPathFilter unable to connect to " + 
-              cnode.getListenerAddress() + " retrying....");
-          Thread.sleep(1000);
-          continue;
-        }
-        break;
-      }
-      LOG.info("doTestPathFilter created RaidShell.");
       FileStatus[] listPaths = null;
 
       // wait till file is raided
@@ -322,6 +310,8 @@
       Thread.sleep(20000); // Without this wait, unit test crashes
 
       // check for error at beginning of file
+      shell = new RaidShell(conf);
+      shell.initializeRpc(conf, cnode.getListenerAddress());
       if (numBlock >= 1) {
         LOG.info("doTestPathFilter Check error at beginning of file.");
         simulateError(shell, fileSys, file1, crc1, 0);
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
new file mode 100644
index 0000000..d75b50b
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.raid.RaidNode;
+
+
+public class TestRaidShell extends TestCase {
+  final static Log LOG = LogFactory.getLog(
+                            "org.apache.hadoop.raid.TestRaidShell");
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/raid/test/data")).getAbsolutePath();
+  final static String CONFIG_FILE = new File(TEST_DIR,
+      "test-raid.xml").getAbsolutePath();
+  final static long RELOAD_INTERVAL = 1000;
+  final static int NUM_DATANODES = 3;
+  Configuration conf;
+  String namenode = null;
+  MiniDFSCluster dfs = null;
+  String hftp = null;
+  FileSystem fileSys = null;
+  RaidNode cnode = null;
+  Random rand = new Random();
+
+  /**
+   * Create a file with three stripes, corrupt a block each in two stripes,
+   * and wait for the the file to be fixed.
+   */
+  public void testBlockFix() throws Exception {
+    LOG.info("Test testBlockFix started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup(stripeLength, -1);
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    Path parityFile = new Path(destPath, "file1");
+    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 7, blockSize);
+    long file1Len = fileSys.getFileStatus(file1).getLen();
+    LOG.info("Test testBlockFix created test files");
+
+    // create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    localConf.setInt("raid.blockfix.interval", 1000);
+    // the RaidNode does the raiding inline (instead of submitting to map/reduce)
+    conf.setBoolean("fs.raidnode.local", true);
+    cnode = RaidNode.createRaidNode(null, localConf);
+
+    try {
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+      cnode.stop();
+      cnode.join();
+      cnode = null;
+
+      FileStatus srcStat = fileSys.getFileStatus(file1);
+      LocatedBlocks locations = RaidDFSUtil.getBlockLocations(
+        (DistributedFileSystem) fileSys, file1.toUri().getPath(),
+        0, srcStat.getLen());
+
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+
+      // Corrupt blocks in different stripes. We can fix them.
+      int[] corruptBlockIdxs = new int[]{0, 4, 6};
+      for (int idx: corruptBlockIdxs) {
+        LOG.info("Corrupting block " + locations.get(idx).getBlock());
+        TestRaidDfs.corruptBlock(file1, locations.get(idx).getBlock(),
+                                  NUM_DATANODES, false); // corrupt block
+        long offset = idx * blockSize;
+        try {
+          readAndDiscard(fileSys, file1, offset, blockSize);
+          fail("Expected checksumexception not thrown");
+        } catch (ChecksumException e) {
+          LOG.info("Block at offset " + offset + " got expected exception");
+        }
+      }
+
+      String fileUriPath = file1.toUri().getPath();
+      waitForCorruptBlocks(corruptBlockIdxs.length, dfs, file1);
+
+      // Create RaidShell and fix the file.
+      RaidShell shell = new RaidShell(conf);
+      String[] args = new String[2];
+      args[0] = "-recoverBlocks";
+      args[1] = file1.toUri().getPath();
+      ToolRunner.run(shell, args);
+
+      waitForCorruptBlocks(0, dfs, file1);
+
+      assertTrue(TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+
+      // Now corrupt and fix the parity file.
+      FileStatus parityStat = fileSys.getFileStatus(parityFile);
+      long parityCrc = getCRC(fileSys, parityFile);
+      locations = RaidDFSUtil.getBlockLocations(
+        dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
+      TestRaidDfs.corruptBlock(parityFile, locations.get(0).getBlock(),
+                                NUM_DATANODES, false); // corrupt block
+      try {
+        readAndDiscard(fileSys, parityFile, 0, blockSize);
+        fail("Expected checksumexception not thrown");
+      } catch (ChecksumException e) {
+        LOG.info("Parity Block at offset 0 got expected exception");
+      }
+      waitForCorruptBlocks(1, dfs, parityFile);
+
+      args[1] = parityFile.toUri().getPath();
+      ToolRunner.run(shell, args);
+
+      waitForCorruptBlocks(0, dfs, file1);
+      assertEquals(parityCrc, getCRC(fileSys, parityFile));
+
+    } catch (Exception e) {
+      LOG.info("Test testBlockFix Exception " + e + StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+    LOG.info("Test testBlockFix completed.");
+  }
+
+  private void waitForCorruptBlocks(
+    int numCorruptBlocks, DistributedFileSystem dfs, Path file)
+    throws Exception {
+    String path = file.toUri().getPath();
+    FileStatus stat = dfs.getFileStatus(file);
+    long start = System.currentTimeMillis();
+    long actual = 0;
+    do {
+      actual = RaidDFSUtil.corruptBlocksInFile(
+          dfs, path, 0, stat.getLen()).size();
+      if (actual == numCorruptBlocks) break;
+      if (System.currentTimeMillis() - start > 120000) break;
+      LOG.info("Waiting for " + numCorruptBlocks + " corrupt blocks in " +
+        path + ", found " + actual);
+      Thread.sleep(1000);
+    } while (true);
+    assertEquals(numCorruptBlocks, actual);
+  }
+
+  private static DistributedFileSystem getDFS(
+        Configuration conf, FileSystem dfs) throws IOException {
+    Configuration clientConf = new Configuration(conf);
+    clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+    clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+    URI dfsUri = dfs.getUri();
+    FileSystem.closeAll();
+    return (DistributedFileSystem) FileSystem.get(dfsUri, clientConf);
+  }
+
+  private void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
+
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    conf = new Configuration();
+
+    conf.set("raid.config.file", CONFIG_FILE);
+    conf.setBoolean("raid.config.reload", true);
+    conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+
+    // scan all policies once every 5 second
+    conf.setLong("raid.policy.rescan.interval", 5000);
+
+    // make all deletions not go through Trash
+    conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+    // do not use map-reduce cluster for Raiding
+    conf.setBoolean("fs.raidnode.local", true);
+    conf.set("raid.server.address", "localhost:0");
+    conf.setInt("hdfs.raid.stripeLength", stripeLength);
+    conf.set("hdfs.raid.locations", "/destraid");
+
+    dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+    dfs.waitActive();
+    fileSys = dfs.getFileSystem();
+    namenode = fileSys.getUri().toString();
+
+    FileSystem.setDefaultUri(conf, namenode);
+    hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
+
+    FileSystem.setDefaultUri(conf, namenode);
+
+    FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+    fileWriter.write("<?xml version=\"1.0\"?>\n");
+    String str = "<configuration> " +
+                   "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
+                     "<policy name = \"RaidTest1\"> " +
+                        "<erasureCode>xor</erasureCode> " +
+                        "<destPath> /destraid</destPath> " +
+                        "<property> " +
+                          "<name>targetReplication</name> " +
+                          "<value>1</value> " +
+                          "<description>after RAIDing, decrease the replication factor of a file to this value." +
+                          "</description> " +
+                        "</property> " +
+                        "<property> " +
+                          "<name>metaReplication</name> " +
+                          "<value>1</value> " +
+                          "<description> replication factor of parity file" +
+                          "</description> " +
+                        "</property> " +
+                        "<property> " +
+                          "<name>modTimePeriod</name> " +
+                          "<value>2000</value> " +
+                          "<description> time (milliseconds) after a file is modified to make it " +
+                                         "a candidate for RAIDing " +
+                          "</description> " +
+                        "</property> ";
+    if (timeBeforeHar >= 0) {
+      str +=
+                        "<property> " +
+                          "<name>time_before_har</name> " +
+                          "<value>" + timeBeforeHar + "</value> " +
+                          "<description> amount of time waited before har'ing parity files" +
+                          "</description> " +
+                        "</property> ";
+    }
+
+    str +=
+                     "</policy>" +
+                   "</srcPath>" +
+                 "</configuration>";
+    fileWriter.write(str);
+    fileWriter.close();
+  }
+
+  private void myTearDown() throws Exception {
+    if (cnode != null) { cnode.stop(); cnode.join(); }
+    if (dfs != null) { dfs.shutdown(); }
+  }
+
+  private long getCRC(FileSystem fs, Path p) throws IOException {
+    CRC32 crc = new CRC32();
+    FSDataInputStream stm = fs.open(p);
+    for (int b = 0; b > 0; b = stm.read()) {
+      crc.update(b);
+    }
+    stm.close();
+    return crc.getValue();
+  }
+
+  private static void readAndDiscard(
+    FileSystem fs, Path p, long offset, long length) throws IOException {
+    FSDataInputStream in = fs.open(p);
+    in.seek(offset);
+    long count = 0;
+    for (int b = 0; b >= 0 && count < length; count++) {
+      b = in.read();
+    }
+  }
+}
+