MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
Vadali via schen)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1021873 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cfdf74..56aa6ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -133,6 +133,9 @@
     MAPREDUCE-1517. Supports streaming job to run in the background. (Bochun Bai
     via amareshwari)
 
+    MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
+    Vadali via schen)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and
@@ -325,7 +328,7 @@
     MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. (Ranjit
     Mathew via amareshwari)
 
-    MAPREDUCE-1980. DistributedRaidFileSystem now handles ChecksumException
+    MAPREDUCE-1908. DistributedRaidFileSystem now handles ChecksumException
     correctly. (Ramkumar Vadali via schen)
 
 Release 0.21.1 - Unreleased
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
index 49defb0..fd1c33e 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
@@ -35,7 +35,9 @@
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.raid.Decoder;
 import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.XORDecoder;
 import org.apache.hadoop.hdfs.BlockMissingException;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
@@ -94,7 +96,7 @@
     }
 
     // find stripe length configured
-    stripeLength = conf.getInt("hdfs.raid.stripeLength", RaidNode.DEFAULT_STRIPE_LENGTH);
+    stripeLength = RaidNode.getStripeLength(conf);
     if (stripeLength == 0) {
       LOG.info("dfs.raid.stripeLength is incorrectly defined to be " + 
                stripeLength + " Ignoring...");
@@ -343,9 +345,10 @@
             clientConf.set("fs.hdfs.impl", clazz.getName());
             // Disable caching so that a previously cached RaidDfs is not used.
             clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
-            Path npath = RaidNode.unRaid(clientConf, path,
-                         alternates[idx], stripeLength,
-                         corruptOffset);
+            Decoder decoder =
+              new XORDecoder(clientConf, RaidNode.getStripeLength(clientConf));
+            Path npath = RaidNode.unRaid(clientConf, path, alternates[idx],
+                              decoder, stripeLength, corruptOffset);
             FileSystem fs1 = getUnderlyingFileSystem(conf);
             fs1.initialize(npath.toUri(), conf);
             LOG.info("Opening alternate path " + npath + " at offset " + curpos);
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
index ecf63c4..deb9865 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
@@ -56,6 +56,10 @@
 
   public static final long HAR_PARTFILE_SIZE = 10 * 1024 * 1024 * 1024l;
   
+  public static final int DISTRAID_MAX_JOBS = 10;
+
+  public static final int DISTRAID_MAX_FILES = 10000;
+
   /**
    * Time to wait after the config file has been modified before reloading it
    * (this is done to prevent loading a file that hasn't been fully written).
@@ -71,6 +75,9 @@
   private long reloadInterval = RELOAD_INTERVAL;
   private long periodicity; // time between runs of all policies
   private long harPartfileSize;
+  private int maxJobsPerPolicy; // Max no. of jobs running simultaneously for
+                                // a job.
+  private int maxFilesPerJob; // Max no. of files raided by a job.
 
   // Reload the configuration
   private boolean doReload;
@@ -88,6 +95,10 @@
     this.reloadInterval = conf.getLong("raid.config.reload.interval", RELOAD_INTERVAL);
     this.periodicity = conf.getLong("raid.policy.rescan.interval",  RESCAN_INTERVAL);
     this.harPartfileSize = conf.getLong("raid.har.partfile.size", HAR_PARTFILE_SIZE);
+    this.maxJobsPerPolicy = conf.getInt("raid.distraid.max.jobs",
+                                        DISTRAID_MAX_JOBS);
+    this.maxFilesPerJob = conf.getInt("raid.distraid.max.files",
+                                      DISTRAID_MAX_FILES);
     if (configFileName == null) {
       String msg = "No raid.config.file given in conf - " +
                    "the Hadoop Raid utility cannot run. Aborting....";
@@ -306,6 +317,14 @@
     return harPartfileSize;
   }
   
+  public synchronized int getMaxJobsPerPolicy() {
+    return maxJobsPerPolicy;
+  }
+
+  public synchronized int getMaxFilesPerJob() {
+    return maxFilesPerJob;
+  }
+
   /**
    * Get a collection of all policies
    */
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java
new file mode 100644
index 0000000..bdb062a
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Represents a generic decoder that can be used to read a file with
+ * corrupt blocks by using the parity file.
+ * This is an abstract class, concrete subclasses need to implement
+ * fixErasedBlock.
+ */
+public abstract class Decoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.Decoder");
+  protected Configuration conf;
+  protected int stripeSize;
+  protected int paritySize;
+  protected Random rand;
+  protected int bufSize;
+  protected byte[][] readBufs;
+  protected byte[][] writeBufs;
+
+  Decoder(Configuration conf, int stripeSize, int paritySize) {
+    this.conf = conf;
+    this.stripeSize = stripeSize;
+    this.paritySize = paritySize;
+    this.rand = new Random();
+    this.bufSize = conf.getInt("raid.decoder.bufsize", 1024 * 1024);
+    this.readBufs = new byte[stripeSize + paritySize][];
+    this.writeBufs = new byte[paritySize][];
+    allocateBuffers();
+  }
+
+  private void allocateBuffers() {
+    for (int i = 0; i < stripeSize + paritySize; i++) {
+      readBufs[i] = new byte[bufSize];
+    }
+    for (int i = 0; i < paritySize; i++) {
+      writeBufs[i] = new byte[bufSize];
+    }
+  }
+
+  private void configureBuffers(long blockSize) {
+    if ((long)bufSize > blockSize) {
+      bufSize = (int)blockSize;
+      allocateBuffers();
+    } else if (blockSize % bufSize != 0) {
+      bufSize = (int)(blockSize / 256L); // heuristic.
+      if (bufSize == 0) {
+        bufSize = 1024;
+      }
+      bufSize = Math.min(bufSize, 1024 * 1024);
+      allocateBuffers();
+    }
+  }
+
+  /**
+   * The interface to generate a decoded file using the good portion of the
+   * source file and the parity file.
+   * @param fs The filesystem containing the source file.
+   * @param srcFile The damaged source file.
+   * @param parityFs The filesystem containing the parity file. This could be
+   *        different from fs in case the parity file is part of a HAR archive.
+   * @param parityFile The parity file.
+   * @param errorOffset Known location of error in the source file. There could
+   *        be additional errors in the source file that are discovered during
+   *        the decode process.
+   * @param decodedFile The decoded file. This will have the exact same contents
+   *        as the source file on success.
+   */
+  public void decodeFile(
+    FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+    long errorOffset, Path decodedFile) throws IOException {
+
+    LOG.info("Create " + decodedFile + " for error at " +
+            srcFile + ":" + errorOffset);
+    FileStatus srcStat = fs.getFileStatus(srcFile);
+    long blockSize = srcStat.getBlockSize();
+    configureBuffers(blockSize);
+    // Move the offset to the start of the block.
+    errorOffset = (errorOffset / blockSize) * blockSize;
+
+    // Create the decoded file.
+    FSDataOutputStream out = fs.create(
+      decodedFile, false, conf.getInt("io.file.buffer.size", 64 * 1024),
+      srcStat.getReplication(), srcStat.getBlockSize());
+
+    // Open the source file.
+    FSDataInputStream in = fs.open(
+      srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+
+    // Start copying data block-by-block.
+    for (long offset = 0; offset < srcStat.getLen(); offset += blockSize) {
+      long limit = Math.min(blockSize, srcStat.getLen() - offset);
+      long bytesAlreadyCopied = 0;
+      if (offset != errorOffset) {
+        try {
+          in = fs.open(
+            srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+          in.seek(offset);
+          RaidUtils.copyBytes(in, out, readBufs[0], limit);
+          assert(out.getPos() == offset +limit);
+          LOG.info("Copied till " + out.getPos() + " from " + srcFile);
+          continue;
+        } catch (BlockMissingException e) {
+          LOG.info("Encountered BME at " + srcFile + ":" + offset);
+          bytesAlreadyCopied = out.getPos() - offset;
+        } catch (ChecksumException e) {
+          LOG.info("Encountered CE at " + srcFile + ":" + offset);
+          bytesAlreadyCopied = out.getPos() - offset;
+        }
+      }
+      // If we are here offset == errorOffset or we got an exception.
+      // Recover the block starting at offset.
+      fixErasedBlock(fs, srcFile, parityFs, parityFile, blockSize, offset,
+        bytesAlreadyCopied, limit, out);
+    }
+    out.close();
+
+    try {
+      fs.setOwner(decodedFile, srcStat.getOwner(), srcStat.getGroup());
+      fs.setPermission(decodedFile, srcStat.getPermission());
+      fs.setTimes(decodedFile, srcStat.getModificationTime(),
+                  srcStat.getAccessTime());
+    } catch (Exception exc) {
+      LOG.info("Didn't manage to copy meta information because of " + exc +
+               " Ignoring...");
+    }
+
+  }
+
+  /**
+   * Recovers a corrupt block to local file.
+   *
+   * @param srcFs The filesystem containing the source file.
+   * @param srcPath The damaged source file.
+   * @param parityPath The filesystem containing the parity file. This could be
+   *        different from fs in case the parity file is part of a HAR archive.
+   * @param parityFile The parity file.
+   * @param blockSize The block size of the file.
+   * @param blockOffset Known location of error in the source file. There could
+   *        be additional errors in the source file that are discovered during
+   *        the decode process.
+   * @param localBlockFile The file to write the block to.
+   * @param limit The maximum number of bytes to be written out.
+   *              This is to prevent writing beyond the end of the file.
+   */
+  public void recoverBlockToFile(
+    FileSystem srcFs, Path srcPath, FileSystem parityFs, Path parityPath,
+    long blockSize, long blockOffset, File localBlockFile, long limit)
+    throws IOException {
+    OutputStream out = new FileOutputStream(localBlockFile);
+    fixErasedBlock(srcFs, srcPath, parityFs, parityPath,
+                  blockSize, blockOffset, 0, limit, out);
+    out.close();
+  }
+
+  /**
+   * Implementation-specific mechanism of writing a fixed block.
+   * @param fs The filesystem containing the source file.
+   * @param srcFile The damaged source file.
+   * @param parityFs The filesystem containing the parity file. This could be
+   *        different from fs in case the parity file is part of a HAR archive.
+   * @param parityFile The parity file.
+   * @param blockSize The maximum size of a block.
+   * @param errorOffset Known location of error in the source file. There could
+   *        be additional errors in the source file that are discovered during
+   *        the decode process.
+   * @param bytesToSkip After the block is generated, these many bytes should be
+   *       skipped before writing to the output. This is needed because the
+   *       output may have a portion of the block written from the source file
+   *       before a new corruption is discovered in the block.
+   * @param limit The maximum number of bytes to be written out, including
+   *       bytesToSkip. This is to prevent writing beyond the end of the file.
+   * @param out The output.
+   */
+  protected abstract void fixErasedBlock(
+      FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+      long blockSize, long errorOffset, long bytesToSkip, long limit,
+      OutputStream out) throws IOException;
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
new file mode 100644
index 0000000..180d71f
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Implements depth-first traversal using a Stack object. The traversal
+ * can be stopped at any time and the state of traversal is saved.
+ */
+public class DirectoryTraversal {
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.raid.DirectoryTraversal");
+
+  private FileSystem fs;
+  private List<FileStatus> paths;
+  private int pathIdx = 0;  // Next path to process.
+  private Stack<Node> stack = new Stack<Node>();
+
+  /**
+   * Represents a directory node in directory traversal.
+   */
+  static class Node {
+    private FileStatus path;  // Path that this node represents.
+    private FileStatus[] elements;  // Elements in the node.
+    private int idx = 0;
+
+    public Node(FileStatus path, FileStatus[] elements) {
+      this.path = path;
+      this.elements = elements;
+    }
+
+    public boolean hasNext() {
+      return idx < elements.length;
+    }
+
+    public FileStatus next() {
+      return elements[idx++];
+    }
+
+    public FileStatus path() {
+      return this.path;
+    }
+  }
+
+  /**
+   * Constructor.
+   * @param fs The filesystem to use.
+   * @param startPaths A list of paths that need to be traversed
+   */
+  public DirectoryTraversal(FileSystem fs, List<FileStatus> startPaths) {
+    this.fs = fs;
+    paths = startPaths;
+    pathIdx = 0;
+  }
+
+  /**
+   * Choose some files to RAID.
+   * @param conf Configuration to use.
+   * @param raidDestPrefix Prefix of the path to RAID to.
+   * @param modTimePeriod Time gap before RAIDing.
+   * @param limit Limit on the number of files to choose.
+   * @return list of files to RAID.
+   * @throws IOException
+   */
+  public List<FileStatus> selectFilesToRaid(
+      Configuration conf, int targetRepl, Path raidDestPrefix,
+      long modTimePeriod, int limit) throws IOException {
+    List<FileStatus> selected = new LinkedList<FileStatus>();
+    int numSelected = 0;
+
+    long now = System.currentTimeMillis();
+    while (numSelected < limit) {
+      FileStatus next = getNextFile();
+      if (next == null) {
+        break;
+      }
+      // We have the next file, do we want to select it?
+      // If the source file has fewer than or equal to 2 blocks, then skip it.
+      long blockSize = next.getBlockSize();
+      if (2 * blockSize >= next.getLen()) {
+        continue;
+      }
+
+      boolean select = false;
+      try {
+        Object ppair = RaidNode.getParityFile(
+            raidDestPrefix, next.getPath(), conf);
+        // Is there is a valid parity file?
+        if (ppair != null) {
+          // Is the source at the target replication?
+          if (next.getReplication() != targetRepl) {
+            // Select the file so that its replication can be set.
+            select = true;
+          } else {
+            // Nothing to do, don't select the file.
+            select = false;
+          }
+        } else if (next.getModificationTime() + modTimePeriod < now) {
+          // If there isn't a valid parity file, check if the file is too new.
+          select = true;
+        }
+      } catch (java.io.FileNotFoundException e) {
+        select = true; // destination file does not exist
+      }
+      if (select) {
+        selected.add(next);
+        numSelected++;
+      }
+    }
+
+    return selected;
+  }
+
+  /**
+   * Return the next file.
+   * @throws IOException
+   */
+  public FileStatus getNextFile() throws IOException {
+    // Check if traversal is done.
+    while (!doneTraversal()) {
+      // If traversal is not done, check if the stack is not empty.
+      while (!stack.isEmpty()) {
+        // If the stack is not empty, look at the top node.
+        Node node = stack.peek();
+        // Check if the top node has an element.
+        if (node.hasNext()) {
+          FileStatus element = node.next();
+          // Is the next element a directory.
+          if (!element.isDir()) {
+            // It is a file, return it.
+            return element;
+          }
+          // Next element is a directory, push it on to the stack and
+          // continue
+          try {
+            pushNewNode(element);
+          } catch (FileNotFoundException e) {
+            // Ignore and move to the next element.
+          }
+          continue;
+        } else {
+          // Top node has no next element, pop it and continue.
+          stack.pop();
+          continue;
+        }
+      }
+      // If the stack is empty, do we have more paths?
+      while (!paths.isEmpty()) {
+        FileStatus next = paths.remove(0);
+        pathIdx++;
+        if (!next.isDir()) {
+          return next;
+        }
+        try {
+          pushNewNode(next);
+        } catch (FileNotFoundException e) {
+          continue;
+        }
+        break;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gets the next directory in the tree. The algorithm returns deeper directories
+   * first.
+   * @return A FileStatus representing the directory.
+   * @throws IOException
+   */
+  public FileStatus getNextDirectory() throws IOException {
+    // Check if traversal is done.
+    while (!doneTraversal()) {
+      // If traversal is not done, check if the stack is not empty.
+      while (!stack.isEmpty()) {
+        // If the stack is not empty, look at the top node.
+        Node node = stack.peek();
+        // Check if the top node has an element.
+        if (node.hasNext()) {
+          FileStatus element = node.next();
+          // Is the next element a directory.
+          if (element.isDir()) {
+            // Next element is a directory, push it on to the stack and
+            // continue
+            try {
+              pushNewNode(element);
+            } catch (FileNotFoundException e) {
+              // Ignore and move to the next element.
+            }
+            continue;
+          }
+        } else {
+          stack.pop();
+          return node.path;
+        }
+      }
+      // If the stack is empty, do we have more paths?
+      while (!paths.isEmpty()) {
+        FileStatus next = paths.remove(0);
+        pathIdx++;
+        if (next.isDir()) {
+          try {
+            pushNewNode(next);
+          } catch (FileNotFoundException e) {
+            continue;
+          }
+          break;
+        }
+      }
+    }
+    return null;
+  }
+
+  private void pushNewNode(FileStatus stat) throws IOException {
+    if (!stat.isDir()) {
+      return;
+    }
+    Path p = stat.getPath();
+    LOG.info("Traversing to directory " + p);
+    FileStatus[] elements = fs.listStatus(p);
+    Node newNode = new Node(stat, (elements == null? new FileStatus[0]: elements));
+    stack.push(newNode);
+  }
+
+  public boolean doneTraversal() {
+    return paths.isEmpty() && stack.isEmpty();
+  }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
index 7130e7a..85f4aca 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
@@ -34,17 +34,21 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.raid.RaidNode.Statistics;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.util.StringUtils;
@@ -111,6 +115,11 @@
 
   List<RaidPolicyPathPair> raidPolicyPathPairList = new ArrayList<RaidPolicyPathPair>();
 
+  private JobClient jobClient;
+  private RunningJob runningJob;
+  private int jobEventCounter = 0;
+  private String lastReport = null;
+
   /** Responsible for generating splits of the src file list. */
   static class DistRaidInputFormat implements InputFormat<Text, PolicyInfo> {
     /** Do nothing. */
@@ -184,6 +193,7 @@
     private int failcount = 0;
     private int succeedcount = 0;
     private Statistics st = null;
+    private Reporter reporter = null;
 
     private String getCountString() {
       return "Succeeded: " + succeedcount + " Failed: " + failcount;
@@ -200,6 +210,7 @@
     public void map(Text key, PolicyInfo policy,
         OutputCollector<WritableComparable, Text> out, Reporter reporter)
         throws IOException {
+      this.reporter = reporter;
       try {
         LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
         Path p = new Path(key.toString());
@@ -268,30 +279,71 @@
   private static int getMapCount(int srcCount, int numNodes) {
     int numMaps = (int) (srcCount / OP_PER_MAP);
     numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
-    return Math.max(numMaps, 1);
+    return Math.max(numMaps, MAX_MAPS_PER_NODE);
   }
 
-  /** invokes mapred job do parallel raiding */
-  public void doDistRaid() throws IOException {
-    if (raidPolicyPathPairList.size() == 0) {
-      LOG.info("DistRaid has no paths to raid.");
-      return;
+  /** Invokes a map-reduce job do parallel raiding.
+   *  @return true if the job was started, false otherwise
+   */
+  public boolean startDistRaid() throws IOException {
+    assert(raidPolicyPathPairList.size() > 0);
+    if (setup()) {
+      this.jobClient = new JobClient(jobconf);
+      this.runningJob = this.jobClient.submitJob(jobconf);
+      LOG.info("Job Started: " + runningJob.getID());
+      return true;
     }
-    try {
-      if (setup()) {
-        JobClient.runJob(jobconf);
-      }
-    } finally {
-      // delete job directory
-      final String jobdir = jobconf.get(JOB_DIR_LABEL);
-      if (jobdir != null) {
-        final Path jobpath = new Path(jobdir);
-        jobpath.getFileSystem(jobconf).delete(jobpath, true);
-      }
-    }
-    raidPolicyPathPairList.clear();
+    return false;
   }
 
+   /** Checks if the map-reduce job has completed.
+    *
+    * @return true if the job completed, false otherwise.
+    * @throws IOException
+    */
+   public boolean checkComplete() throws IOException {
+     JobID jobID = runningJob.getID();
+     if (runningJob.isComplete()) {
+       // delete job directory
+       final String jobdir = jobconf.get(JOB_DIR_LABEL);
+       if (jobdir != null) {
+         final Path jobpath = new Path(jobdir);
+         jobpath.getFileSystem(jobconf).delete(jobpath, true);
+       }
+       if (runningJob.isSuccessful()) {
+         LOG.info("Job Complete(Succeeded): " + jobID);
+       } else {
+         LOG.info("Job Complete(Failed): " + jobID);
+       }
+       raidPolicyPathPairList.clear();
+       Counters ctrs = runningJob.getCounters();
+       long filesRaided = ctrs.findCounter(Counter.FILES_SUCCEEDED).getValue();
+       long filesFailed = ctrs.findCounter(Counter.FILES_FAILED).getValue();
+       return true;
+     } else {
+       String report =  (" job " + jobID +
+         " map " + StringUtils.formatPercent(runningJob.mapProgress(), 0)+
+         " reduce " + StringUtils.formatPercent(runningJob.reduceProgress(), 0));
+       if (!report.equals(lastReport)) {
+         LOG.info(report);
+         lastReport = report;
+       }
+       TaskCompletionEvent[] events =
+         runningJob.getTaskCompletionEvents(jobEventCounter);
+       jobEventCounter += events.length;
+       for(TaskCompletionEvent event : events) {
+         if (event.getTaskStatus() ==  TaskCompletionEvent.Status.FAILED) {
+           LOG.info(" Job " + jobID + " " + event.toString());
+         }
+       }
+       return false;
+     }
+   }
+
+   public boolean successful() throws IOException {
+     return runningJob.isSuccessful();
+   }
+
   /**
    * set up input file which has the list of input files.
    * 
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
new file mode 100644
index 0000000..e3d91b0
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Represents a generic encoder that can generate a parity file for a source
+ * file.
+ * This is an abstract class, concrete subclasses need to implement
+ * encodeFileImpl.
+ */
+public abstract class Encoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.Encoder");
+  protected Configuration conf;
+  protected int stripeSize;
+  protected int paritySize;
+  protected Random rand;
+  protected int bufSize;
+  protected byte[][] readBufs;
+  protected byte[][] writeBufs;
+
+  /**
+   * A class that acts as a sink for data, similar to /dev/null.
+   */
+  static class NullOutputStream extends OutputStream {
+    public void write(byte[] b) throws IOException {}
+    public void write(int b) throws IOException {}
+    public void write(byte[] b, int off, int len) throws IOException {}
+  }
+
+  Encoder(
+    Configuration conf, int stripeSize, int paritySize) {
+    this.conf = conf;
+    this.stripeSize = stripeSize;
+    this.paritySize = paritySize;
+    this.rand = new Random();
+    this.bufSize = conf.getInt("raid.encoder.bufsize", 1024 * 1024);
+    this.readBufs = new byte[stripeSize][];
+    this.writeBufs = new byte[paritySize][];
+    allocateBuffers();
+  }
+
+  private void allocateBuffers() {
+    for (int i = 0; i < stripeSize; i++) {
+      readBufs[i] = new byte[bufSize];
+    }
+    for (int i = 0; i < paritySize; i++) {
+      writeBufs[i] = new byte[bufSize];
+    }
+  }
+
+  private void configureBuffers(long blockSize) {
+    if ((long)bufSize > blockSize) {
+      bufSize = (int)blockSize;
+      allocateBuffers();
+    } else if (blockSize % bufSize != 0) {
+      bufSize = (int)(blockSize / 256L); // heuristic.
+      if (bufSize == 0) {
+        bufSize = 1024;
+      }
+      bufSize = Math.min(bufSize, 1024 * 1024);
+      allocateBuffers();
+    }
+  }
+
+  /**
+   * The interface to use to generate a parity file.
+   * This method can be called multiple times with the same Encoder object,
+   * thus allowing reuse of the buffers allocated by the Encoder object.
+   *
+   * @param fs The filesystem containing the source file.
+   * @param srcFile The source file.
+   * @param parityFile The parity file to be generated.
+   */
+  public void encodeFile(FileSystem fs, Path srcFile, Path parityFile,
+    short parityRepl, Progressable reporter) throws IOException {
+    FileStatus srcStat = fs.getFileStatus(srcFile);
+    long srcSize = srcStat.getLen();
+    long blockSize = srcStat.getBlockSize();
+
+    configureBuffers(blockSize);
+
+    // Create a tmp file to which we will write first.
+    Path parityTmp = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") +
+                              parityFile.toUri().getPath() +
+                              "." + rand.nextLong() + ".tmp");
+    FSDataOutputStream out = fs.create(
+                               parityTmp,
+                               true,
+                               conf.getInt("io.file.buffer.size", 64 * 1024),
+                               parityRepl,
+                               blockSize);
+
+    try {
+      encodeFileToStream(fs, srcFile, srcSize, blockSize, out, reporter);
+      out.close();
+      out = null;
+      LOG.info("Wrote temp parity file " + parityTmp);
+
+      // delete destination if exists
+      if (fs.exists(parityFile)){
+        fs.delete(parityFile, false);
+      }
+      fs.mkdirs(parityFile.getParent());
+      if (!fs.rename(parityTmp, parityFile)) {
+        String msg = "Unable to rename file " + parityTmp + " to " + parityFile;
+        throw new IOException (msg);
+      }
+      LOG.info("Wrote parity file " + parityFile);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+      fs.delete(parityTmp, false);
+    }
+  }
+
+  /**
+   * Recovers a corrupt block in a parity file to a local file.
+   *
+   * The encoder generates paritySize parity blocks for a source file stripe.
+   * Since we want only one of the parity blocks, this function creates
+   * null outputs for the blocks to be discarded.
+   *
+   * @param fs The filesystem in which both srcFile and parityFile reside.
+   * @param srcFile The source file.
+   * @param srcSize The size of the source file.
+   * @param blockSize The block size for the source/parity files.
+   * @param corruptOffset The location of corruption in the parity file.
+   * @param localBlockFile The destination for the reovered block.
+   */
+  public void recoverParityBlockToFile(
+    FileSystem fs,
+    Path srcFile, long srcSize, long blockSize,
+    Path parityFile, long corruptOffset,
+    File localBlockFile) throws IOException {
+    OutputStream out = new FileOutputStream(localBlockFile);
+    try {
+      recoverParityBlockToStream(fs, srcFile, srcSize, blockSize, parityFile,
+        corruptOffset, out);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Recovers a corrupt block in a parity file to a local file.
+   *
+   * The encoder generates paritySize parity blocks for a source file stripe.
+   * Since we want only one of the parity blocks, this function creates
+   * null outputs for the blocks to be discarded.
+   *
+   * @param fs The filesystem in which both srcFile and parityFile reside.
+   * @param srcFile The source file.
+   * @param srcSize The size of the source file.
+   * @param blockSize The block size for the source/parity files.
+   * @param corruptOffset The location of corruption in the parity file.
+   * @param out The destination for the reovered block.
+   */
+  public void recoverParityBlockToStream(
+    FileSystem fs,
+    Path srcFile, long srcSize, long blockSize,
+    Path parityFile, long corruptOffset,
+    OutputStream out) throws IOException {
+    LOG.info("Recovering parity block" + parityFile + ":" + corruptOffset);
+    // Get the start offset of the corrupt block.
+    corruptOffset = (corruptOffset / blockSize) * blockSize;
+    // Output streams to each block in the parity file stripe.
+    OutputStream[] outs = new OutputStream[paritySize];
+    long indexOfCorruptBlockInParityStripe =
+      (corruptOffset / blockSize) % paritySize;
+    LOG.info("Index of corrupt block in parity stripe: " +
+              indexOfCorruptBlockInParityStripe);
+    // Create a real output stream for the block we want to recover,
+    // and create null streams for the rest.
+    for (int i = 0; i < paritySize; i++) {
+      if (indexOfCorruptBlockInParityStripe == i) {
+        outs[i] = out;
+      } else {
+        outs[i] = new NullOutputStream();
+      }
+    }
+    // Get the stripe index and start offset of stripe.
+    long stripeIdx = corruptOffset / (paritySize * blockSize);
+    long stripeStart = stripeIdx * blockSize * stripeSize;
+
+    // Get input streams to each block in the source file stripe.
+    InputStream[] blocks = stripeInputs(fs, srcFile, stripeStart,
+        srcSize, blockSize);
+    LOG.info("Starting recovery by using source stripe " +
+              srcFile + ":" + stripeStart);
+    // Read the data from the blocks and write to the parity file.
+    encodeStripe(blocks, stripeStart, blockSize, outs, Reporter.NULL);
+  }
+
+  /**
+   * Recovers a corrupt block in a parity file to an output stream.
+   *
+   * The encoder generates paritySize parity blocks for a source file stripe.
+   * Since there is only one output provided, some blocks are written out to
+   * files before being written out to the output.
+   *
+   * @param fs The filesystem in which both srcFile and parityFile reside.
+   * @param srcFile The source file.
+   * @param srcSize The size of the source file.
+   * @param blockSize The block size for the source/parity files.
+   * @param out The destination for the reovered block.
+   */
+  private void encodeFileToStream(FileSystem fs, Path srcFile, long srcSize,
+    long blockSize, OutputStream out, Progressable reporter) throws IOException {
+    OutputStream[] tmpOuts = new OutputStream[paritySize];
+    // One parity block can be written directly to out, rest to local files.
+    tmpOuts[0] = out;
+    File[] tmpFiles = new File[paritySize - 1];
+    for (int i = 0; i < paritySize - 1; i++) {
+      tmpFiles[i] = File.createTempFile("parity", "_" + i);
+      LOG.info("Created tmp file " + tmpFiles[i]);
+      tmpFiles[i].deleteOnExit();
+    }
+    try {
+      // Loop over stripes in the file.
+      for (long stripeStart = 0; stripeStart < srcSize;
+          stripeStart += blockSize * stripeSize) {
+        reporter.progress();
+        LOG.info("Starting encoding of stripe " + srcFile + ":" + stripeStart);
+        // Create input streams for blocks in the stripe.
+        InputStream[] blocks = stripeInputs(fs, srcFile, stripeStart,
+          srcSize, blockSize);
+        // Create output streams to the temp files.
+        for (int i = 0; i < paritySize - 1; i++) {
+          tmpOuts[i + 1] = new FileOutputStream(tmpFiles[i]);
+        }
+        // Call the implementation of encoding.
+        encodeStripe(blocks, stripeStart, blockSize, tmpOuts, reporter);
+        // Close output streams to the temp files and write the temp files
+        // to the output provided.
+        for (int i = 0; i < paritySize - 1; i++) {
+          tmpOuts[i + 1].close();
+          tmpOuts[i + 1] = null;
+          InputStream in  = new FileInputStream(tmpFiles[i]);
+          RaidUtils.copyBytes(in, out, writeBufs[i], blockSize);
+          reporter.progress();
+        }
+      }
+    } finally {
+      for (int i = 0; i < paritySize - 1; i++) {
+        if (tmpOuts[i + 1] != null) {
+          tmpOuts[i + 1].close();
+        }
+        tmpFiles[i].delete();
+        LOG.info("Deleted tmp file " + tmpFiles[i]);
+      }
+    }
+  }
+
+  /**
+   * Return input streams for each block in a source file's stripe.
+   * @param fs The filesystem where the file resides.
+   * @param srcFile The source file.
+   * @param stripeStartOffset The start offset of the stripe.
+   * @param srcSize The size of the source file.
+   * @param blockSize The block size for the source file.
+   */
+  protected InputStream[] stripeInputs(
+    FileSystem fs,
+    Path srcFile,
+    long stripeStartOffset,
+    long srcSize,
+    long blockSize
+    ) throws IOException {
+    InputStream[] blocks = new InputStream[stripeSize];
+    for (int i = 0; i < stripeSize; i++) {
+      long seekOffset = stripeStartOffset + i * blockSize;
+      if (seekOffset < srcSize) {
+        FSDataInputStream in = fs.open(
+                   srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+        in.seek(seekOffset);
+        LOG.info("Opening stream at " + srcFile + ":" + seekOffset);
+        blocks[i] = in;
+      } else {
+        LOG.info("Using zeros at offset " + seekOffset);
+        // We have no src data at this offset.
+        blocks[i] = new RaidUtils.ZeroInputStream(
+                          seekOffset + blockSize);
+      }
+    }
+    return blocks;
+  }
+
+  /**
+   * The implementation of generating parity data for a stripe.
+   *
+   * @param blocks The streams to blocks in the stripe.
+   * @param srcFile The source file.
+   * @param stripeStartOffset The start offset of the stripe
+   * @param blockSize The maximum size of a block.
+   * @param outs output streams to the parity blocks.
+   * @param reporter progress indicator.
+   */
+  protected abstract void encodeStripe(
+    InputStream[] blocks,
+    long stripeStartOffset,
+    long blockSize,
+    OutputStream[] outs,
+    Progressable reporter) throws IOException;
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
new file mode 100644
index 0000000..e01fcba
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Periodically monitors the status of jobs registered with it.
+ *
+ * Jobs that are submitted for the same policy name are kept in the same list,
+ * and the list itself is kept in a map that has the policy name as the key and
+ * the list as value.
+ */
+class JobMonitor implements Runnable {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.JobMonitor");
+
+  volatile boolean running = true;
+
+  private Map<String, List<DistRaid>> jobs;
+  private long jobMonitorInterval;
+  private volatile long jobsMonitored = 0;
+  private volatile long jobsSucceeded = 0;
+
+  public JobMonitor(Configuration conf) {
+    jobMonitorInterval = conf.getLong("raid.jobmonitor.interval", 60000);
+    jobs = new java.util.HashMap<String, List<DistRaid>>();
+  }
+
+  public void run() {
+    while (running) {
+      try {
+        LOG.info("JobMonitor thread continuing to run...");
+        doMonitor();
+      } catch (Throwable e) {
+        LOG.error("JobMonitor encountered exception " +
+          StringUtils.stringifyException(e));
+        // All expected exceptions are caught by doMonitor(). It is better
+        // to exit now, this will prevent RaidNode from submitting more jobs
+        // since the number of running jobs will never decrease.
+        return;
+      }
+    }
+  }
+
+  /**
+   * Periodically checks status of running map-reduce jobs.
+   */
+  public void doMonitor() {
+    while (running) {
+      String[] keys = null;
+      // Make a copy of the names of the current jobs.
+      synchronized(jobs) {
+        keys = jobs.keySet().toArray(new String[0]);
+      }
+
+      // Check all the jobs. We do not want to block access to `jobs`
+      // because that will prevent new jobs from being added.
+      // This is safe because JobMonitor.run is the only code that can
+      // remove a job from `jobs`. Thus all elements in `keys` will have
+      // valid values.
+      Map<String, List<DistRaid>> finishedJobs =
+        new HashMap<String, List<DistRaid>>();
+
+      for (String key: keys) {
+        // For each policy being monitored, get the list of jobs running.
+        DistRaid[] jobListCopy = null;
+        synchronized(jobs) {
+          List<DistRaid> jobList = jobs.get(key);
+          synchronized(jobList) {
+            jobListCopy = jobList.toArray(new DistRaid[jobList.size()]);
+          }
+        }
+        // The code that actually contacts the JobTracker is not synchronized,
+        // it uses copies of the list of jobs.
+        for (DistRaid job: jobListCopy) {
+          // Check each running job.
+          try {
+            boolean complete = job.checkComplete();
+            if (complete) {
+              addJob(finishedJobs, key, job);
+              if (job.successful()) {
+                jobsSucceeded++;
+              }
+            }
+          } catch (IOException ioe) {
+            // If there was an error, consider the job finished.
+            addJob(finishedJobs, key, job);
+          }
+        }
+      }
+
+      if (finishedJobs.size() > 0) {
+        for (String key: finishedJobs.keySet()) {
+          List<DistRaid> finishedJobList = finishedJobs.get(key);
+          // Iterate through finished jobs and remove from jobs.
+          // removeJob takes care of locking.
+          for (DistRaid job: finishedJobList) {
+            removeJob(jobs, key, job);
+          }
+        }
+      }
+
+      try {
+        Thread.sleep(jobMonitorInterval);
+      } catch (InterruptedException ie) {
+      }
+    }
+  }
+
+  public int runningJobsCount(String key) {
+    int count = 0;
+    synchronized(jobs) {
+      if (jobs.containsKey(key)) {
+        List<DistRaid> jobList = jobs.get(key);
+        synchronized(jobList) {
+          count = jobList.size();
+        }
+      }
+    }
+    return count;
+  }
+
+  public void monitorJob(String key, DistRaid job) {
+    addJob(jobs, key, job);
+    jobsMonitored++;
+  }
+
+  public long jobsMonitored() {
+    return this.jobsMonitored;
+  }
+
+  public long jobsSucceeded() {
+    return this.jobsSucceeded;
+  }
+
+  private static void addJob(Map<String, List<DistRaid>> jobsMap,
+                              String jobName, DistRaid job) {
+    synchronized(jobsMap) {
+      List<DistRaid> list = null;
+      if (jobsMap.containsKey(jobName)) {
+        list = jobsMap.get(jobName);
+      } else {
+        list = new LinkedList<DistRaid>();
+        jobsMap.put(jobName, list);
+      }
+      synchronized(list) {
+        list.add(job);
+      }
+    }
+  }
+
+  private static void removeJob(Map<String, List<DistRaid>> jobsMap,
+                                  String jobName, DistRaid job) {
+    synchronized(jobsMap) {
+      if (jobsMap.containsKey(jobName)) {
+        List<DistRaid> list = jobsMap.get(jobName);
+        synchronized(list) {
+          for (Iterator<DistRaid> it = list.iterator(); it.hasNext(); ) {
+            DistRaid val = it.next();
+            if (val == job) {
+              it.remove();
+            }
+          }
+          if (list.size() == 0) {
+            jobsMap.remove(jobName);
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java
new file mode 100644
index 0000000..b30cfcf
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Wraps over multiple input streams and provides an input stream that is
+ * an XOR of the streams.
+ */
+class ParityInputStream extends InputStream {
+  private static final int DEFAULT_BUFSIZE = 5*1024*1024;
+  private InputStream[] streams;
+  private byte[] xor;
+  private byte[] buf;
+  private int bufSize;
+  private long remaining;
+  private int available = 0;
+  private int readPos = 0;
+
+  public ParityInputStream(
+      InputStream[] streams, long parityBlockSize, byte[] buf, byte[] xor) {
+    assert buf.length == xor.length;
+    bufSize = buf.length;
+    this.streams = streams;
+    remaining = parityBlockSize;
+    this.buf = buf;
+    this.xor = xor;
+  }
+  
+  @Override
+  public int read() throws IOException {
+    makeAvailable();
+    if (available == 0) {
+      return -1;
+    }
+    int ret = xor[readPos];
+    readPos++;
+    available--;
+    return ret;
+  }
+  
+  @Override
+  public int read(byte b[], int off, int len) throws IOException {
+    makeAvailable();
+    if (available == 0) {
+      return -1;
+    }
+    int ret = Math.min(len, available);
+    for (int i = 0; i < ret; ++i) {
+      b[off+i] = xor[readPos+i];
+    }
+    readPos += ret;
+    available -= ret;
+    return ret;
+  }
+
+  public void close() throws IOException {
+    for (InputStream i: streams) {
+      i.close();
+    }
+  }
+  
+  /**
+   * Send the contents of the stream to the sink.
+   * @param sink
+   * @param reporter
+   * @throws IOException
+   */
+  public void drain(OutputStream sink, Progressable reporter)
+          throws IOException {
+    
+    while (true) {
+      makeAvailable();
+      if (available == 0) {
+        break;
+      }
+      sink.write(xor, readPos, available);
+      available = 0;
+      if (reporter != null) {
+        reporter.progress();
+      }
+    }
+  }
+
+  /**
+   * Make some bytes available for reading in the internal buffer.
+   * @throws IOException
+   */
+  private void makeAvailable() throws IOException {
+    if (available > 0 || remaining <= 0) {
+      return;
+    }
+    // Read some bytes from the first stream.
+    int xorlen = (int)Math.min(remaining, bufSize);
+    readExact(streams[0], xor, xorlen);
+
+    // Read bytes from all the other streams and xor them.
+    for (int i = 1; i < streams.length; i++) {
+      readExact(streams[i], buf, xorlen);
+
+      for (int j = 0; j < xorlen; j++) {
+        xor[j] ^= buf[j];
+      }
+    }
+    
+    remaining -= xorlen;
+    available = xorlen;
+    readPos = 0;
+    readPos = 0;
+  }
+
+  private static void readExact(InputStream in, byte[] bufs, int toRead)
+  throws IOException {
+    int tread = 0;
+    while (tread < toRead) {
+      int read = in.read(bufs, tread, toRead - tread);
+      if (read == -1) {
+        // If the stream ends, fill in zeros.
+        Arrays.fill(bufs, tread, toRead, (byte)0);
+        tread = toRead;
+      } else {
+        tread += read;
+      }
+    }
+    assert tread == toRead;
+  }
+
+}
+
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index af226a5..90f8f3e 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -21,10 +21,12 @@
 import java.io.IOException;
 import java.io.FileNotFoundException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.LinkedList;
 import java.util.Iterator;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.HashSet;
@@ -73,7 +75,9 @@
   public static final long SLEEP_TIME = 10000L; // 10 seconds
   public static final int DEFAULT_PORT = 60000;
   public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
+  public static final String STRIPE_LENGTH_KEY = "hdfs.raid.stripeLength";
   public static final String DEFAULT_RAID_LOCATION = "/raid";
+  public static final String RAID_LOCATION_KEY = "hdfs.raid.locations";
   public static final String HAR_SUFFIX = "_raid.har";
   
   /** RPC server */
@@ -101,6 +105,10 @@
   /** Deamon thread to har raid directories */
   Daemon harThread = null;
 
+  /** Daemon thread to monitor distributed raid job progress */
+  JobMonitor jobMonitor = null;
+  Daemon jobMonitorThread = null;
+
   /** Do do distributed raiding */
   boolean isRaidLocal = false;
   
@@ -168,6 +176,7 @@
     try {
       initialize(conf);
     } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
       this.stop();
       throw e;
     } catch (Exception e) {
@@ -193,6 +202,7 @@
     try {
       if (server != null) server.join();
       if (triggerThread != null) triggerThread.join();
+      if (jobMonitorThread != null) jobMonitorThread.join();
       if (purgeThread != null) purgeThread.join();
     } catch (InterruptedException ie) {
       // do nothing
@@ -210,6 +220,8 @@
     running = false;
     if (server != null) server.stop();
     if (triggerThread != null) triggerThread.interrupt();
+    if (jobMonitor != null) jobMonitor.running = false;
+    if (jobMonitorThread != null) jobMonitorThread.interrupt();
     if (purgeThread != null) purgeThread.interrupt();
   }
 
@@ -252,6 +264,10 @@
     running = true;
     this.server.start(); // start RPC server
 
+    this.jobMonitor = new JobMonitor(conf);
+    this.jobMonitorThread = new Daemon(this.jobMonitor);
+    this.jobMonitorThread.start();
+
     // start the deamon thread to fire polcies appropriately
     this.triggerThread = new Daemon(new TriggerMonitor());
     this.triggerThread.start();
@@ -282,22 +298,15 @@
     LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset);
     Path inputPath = new Path(inStr);
     Path srcPath = inputPath.makeQualified(inputPath.getFileSystem(conf));
-    PolicyInfo info = findMatchingPolicy(srcPath);
-    if (info != null) {
+    // find stripe length from config
+    int stripeLength = getStripeLength(conf);
 
-      // find stripe length from config
-      int stripeLength = getStripeLength(conf, info);
-
-      // create destination path prefix
-      String destPrefix = getDestinationPath(conf, info);
-      Path destPath = new Path(destPrefix.trim());
-      FileSystem fs = FileSystem.get(destPath.toUri(), conf);
-      destPath = destPath.makeQualified(fs);
-
-      Path unraided = unRaid(conf, srcPath, destPath, stripeLength, corruptOffset);
-      if (unraided != null) {
-        return unraided.toString();
-      }
+    Path destPref = getDestinationPath(conf);
+    Decoder decoder = new XORDecoder(conf, RaidNode.getStripeLength(conf));
+    Path unraided = unRaid(conf, srcPath, destPref, decoder,
+        stripeLength, corruptOffset);
+    if (unraided != null) {
+      return unraided.toString();
     }
     return null;
   }
@@ -306,6 +315,11 @@
    * Periodically checks to see which policies should be fired.
    */
   class TriggerMonitor implements Runnable {
+
+    private Map<String, Long> scanTimes = new HashMap<String, Long>();
+    private Map<String, DirectoryTraversal> scanState =
+      new HashMap<String, DirectoryTraversal>();
+
     /**
      */
     public void run() {
@@ -320,6 +334,109 @@
       }
     }
 
+    /**
+     * Should we select more files for a policy.
+     */
+    private boolean shouldSelectFiles(PolicyInfo info) {
+      String policyName = info.getName();
+      int runningJobsCount = jobMonitor.runningJobsCount(policyName);
+      // Is there a scan in progress for this policy?
+      if (scanState.containsKey(policyName)) {
+        int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
+
+        // If there is a scan in progress for this policy, we can have
+        // upto maxJobsPerPolicy running jobs.
+        return (runningJobsCount < maxJobsPerPolicy);
+      } else {
+        // If there isn't a scan in progress for this policy, we don't
+        // want to start a fresh scan if there is even one running job.
+        if (runningJobsCount >= 1) {
+          return false;
+        }
+        // Check the time of the last full traversal before starting a fresh
+        // traversal.
+        if (scanTimes.containsKey(policyName)) {
+          long lastScan = scanTimes.get(policyName);
+          return (now() > lastScan + configMgr.getPeriodicity());
+        } else {
+          return true;
+        }
+      }
+    }
+
+   /**
+    * Returns a list of pathnames that needs raiding.
+    * The list of paths could be obtained by resuming a previously suspended
+    * traversal.
+    * The number of paths returned is limited by raid.distraid.max.jobs.
+    */
+    private List<FileStatus> selectFiles(PolicyInfo info) throws IOException {
+      Path destPrefix = getDestinationPath(conf);
+      String policyName = info.getName();
+      Path srcPath = info.getSrcPath();
+      long modTimePeriod = 0;
+      String str = info.getProperty("modTimePeriod");
+      if (str != null) {
+         modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
+      }
+      short srcReplication = 0;
+      str = info.getProperty("srcReplication");
+      if (str != null) {
+        srcReplication = Short.parseShort(info.getProperty("srcReplication"));
+      }
+
+      // Max number of files returned.
+      int selectLimit = configMgr.getMaxFilesPerJob();
+      int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+
+      // If we have a pending traversal, resume it.
+      if (scanState.containsKey(policyName)) {
+        DirectoryTraversal dt = scanState.get(policyName);
+        LOG.info("Resuming traversal for policy " + policyName);
+        List<FileStatus> returnSet = dt.selectFilesToRaid(
+            conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+        if (dt.doneTraversal()) {
+          scanState.remove(policyName);
+        }
+        return returnSet;
+      }
+
+      // Expand destination prefix path.
+      String destpstr = destPrefix.toString();
+      if (!destpstr.endsWith(Path.SEPARATOR)) {
+        destpstr += Path.SEPARATOR;
+      }
+
+      List<FileStatus> returnSet = new LinkedList<FileStatus>();
+
+      FileSystem fs = srcPath.getFileSystem(conf);
+      FileStatus[] gpaths = fs.globStatus(srcPath);
+      if (gpaths != null) {
+        List<FileStatus> selectedPaths = new LinkedList<FileStatus>();
+        for (FileStatus onepath: gpaths) {
+          String pathstr = onepath.getPath().makeQualified(fs).toString();
+          if (!pathstr.endsWith(Path.SEPARATOR)) {
+            pathstr += Path.SEPARATOR;
+          }
+          if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) {
+            LOG.info("Skipping source " + pathstr +
+                     " because it conflicts with raid directory " + destpstr);
+          } else {
+            selectedPaths.add(onepath);
+          }
+        }
+
+        // Set the time for a new traversal.
+        scanTimes.put(policyName, now());
+        DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths);
+        returnSet = dt.selectFilesToRaid(
+            conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+        if (!dt.doneTraversal()) {
+          scanState.put(policyName, dt);
+        }
+      }
+      return returnSet;
+    }
 
     /**
      * Keep processing policies.
@@ -328,18 +445,11 @@
     private void doProcess() throws IOException, InterruptedException {
       PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
 
-      long prevExec = 0;
-      DistRaid dr = null;
       while (running) {
+        Thread.sleep(SLEEP_TIME);
 
-        boolean reload = configMgr.reloadConfigsIfNecessary();
-        while(!reload && now() < prevExec + configMgr.getPeriodicity()){
-          Thread.sleep(SLEEP_TIME);
-          reload = configMgr.reloadConfigsIfNecessary();
-        }
+        configMgr.reloadConfigsIfNecessary();
 
-        prevExec = now();
-        
         // activate all categories
         Collection<PolicyList> all = configMgr.getAllPolicies();
         
@@ -348,35 +458,18 @@
         PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
         Arrays.sort(sorted, lexi);
 
-        if (!isRaidLocal) {
-          dr = new DistRaid(conf);
-        }
-        // paths we have processed so far
-        List<String> processed = new LinkedList<String>();
-        
         for (PolicyList category : sorted) {
           for (PolicyInfo info: category.getAll()) {
 
-            long modTimePeriod = 0;
-            short srcReplication = 0;
-            String str = info.getProperty("modTimePeriod");
-            if (str != null) {
-               modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod")); 
-            }
-            str = info.getProperty("srcReplication");
-            if (str != null) {
-               srcReplication = Short.parseShort(info.getProperty("srcReplication")); 
+            if (!shouldSelectFiles(info)) {
+              continue;
             }
 
             LOG.info("Triggering Policy Filter " + info.getName() +
                      " " + info.getSrcPath());
             List<FileStatus> filteredPaths = null;
-            try { 
-              filteredPaths = selectFiles(conf, info.getSrcPath(), 
-                                          getDestinationPath(conf, info),
-                                          modTimePeriod,
-                                          srcReplication,
-                                          prevExec);
+            try {
+              filteredPaths = selectFiles(info);
             } catch (Exception e) {
               LOG.info("Exception while invoking filter on policy " + info.getName() +
                        " srcPath " + info.getSrcPath() + 
@@ -389,95 +482,41 @@
                continue;
             }
 
-            // If any of the filtered path has already been accepted 
-            // by a previous policy, then skip it.
-            for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
-              String fs = iter.next().getPath().toString() + "/";
-              for (String p : processed) {
-                if (p.startsWith(fs)) {
-                  iter.remove();
-                  break;
+            // Apply the action on accepted paths
+            LOG.info("Triggering Policy Action " + info.getName() +
+                     " " + info.getSrcPath());
+            try {
+              if (isRaidLocal){
+                doRaid(conf, info, filteredPaths);
+              }
+              else{
+                // We already checked that no job for this policy is running
+                // So we can start a new job.
+                DistRaid dr = new DistRaid(conf);
+                //add paths for distributed raiding
+                dr.addRaidPaths(info, filteredPaths);
+                boolean started = dr.startDistRaid();
+                if (started) {
+                  jobMonitor.monitorJob(info.getName(), dr);
                 }
               }
-            }
-
-            // Apply the action on accepted paths
-            LOG.info("Triggering Policy Action " + info.getName());
-            try {
-            	if (isRaidLocal){
-            	  doRaid(conf, info, filteredPaths);
-            	}
-            	else{
-            	  //add paths for distributed raiding
-            	  dr.addRaidPaths(info, filteredPaths);
-            	}
             } catch (Exception e) {
               LOG.info("Exception while invoking action on policy " + info.getName() +
                        " srcPath " + info.getSrcPath() + 
                        " exception " + StringUtils.stringifyException(e));
               continue;
             }
-
-            // add these paths to processed paths
-            for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
-              String p = iter.next().getPath().toString() + "/";
-              processed.add(p);
-            }
-          }
-        }
-        processed.clear(); // free up memory references before yielding
-
-        //do the distributed raiding
-        if (!isRaidLocal) {
-          dr.doDistRaid();
-        } 
-      }
-    }
-  }
-
-  /**
-   * Returns the policy that matches the specified path.
-   * The method below finds the first policy that matches an input path. Since different 
-   * policies with different purposes and destinations might be associated with the same input
-   * path, we should be skeptical about the places using the method and we should try to change
-   * the code to avoid it.
-   */
-  private PolicyInfo findMatchingPolicy(Path inpath) throws IOException {
-    PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
-    Collection<PolicyList> all = configMgr.getAllPolicies();
-        
-    // sort all policies by reverse lexicographical order. This is needed
-    // to make the nearest policy take precedence.
-    PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
-    Arrays.sort(sorted, lexi);
-
-    // loop through all categories of policies.
-    for (PolicyList category : sorted) {
-      PolicyInfo first = category.getAll().iterator().next();
-      if (first != null) {
-        Path[] srcPaths = first.getSrcPathExpanded(); // input src paths unglobbed
-        if (srcPaths == null) {
-          continue;
-        }
-
-        for (Path src: srcPaths) {
-          if (inpath.toString().startsWith(src.toString())) {
-            // if the srcpath is a prefix of the specified path
-            // we have a match! 
-            return first;
           }
         }
       }
     }
-    return null; // no matching policies
   }
 
-  
   static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) {
     return new Path(destPathPrefix, makeRelative(srcPath));
   }
   
-  private static class ParityFilePair {
+  static class ParityFilePair {
     private Path path;
     private FileSystem fs;
     
@@ -506,11 +545,19 @@
    * @return Path object representing the parity file of the source
    * @throws IOException
    */
-  static private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
+  static ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
     Path srcParent = srcPath.getParent();
 
     FileSystem fsDest = destPathPrefix.getFileSystem(conf);
-
+    FileSystem fsSrc = srcPath.getFileSystem(conf);
+    
+    FileStatus srcStatus = null;
+    try {
+      srcStatus = fsSrc.getFileStatus(srcPath);
+    } catch (java.io.FileNotFoundException e) {
+      return null;
+    }
+    
     Path outDir = destPathPrefix;
     if (srcParent != null) {
       if (srcParent.getParent() == null) {
@@ -520,36 +567,36 @@
       }
     }
 
+    
+    //CASE 1: CHECK HAR - Must be checked first because har is created after
+    // parity file and returning the parity file could result in error while
+    // reading it.
+    Path outPath =  getOriginalParityFile(destPathPrefix, srcPath);
     String harDirName = srcParent.getName() + HAR_SUFFIX; 
     Path HarPath = new Path(outDir,harDirName);
-    Path outPath =  getOriginalParityFile(destPathPrefix, srcPath);
-
-    if (!fsDest.exists(HarPath)) {  // case 1: no HAR file
-      return new ParityFilePair(outPath,fsDest);
+    if (fsDest.exists(HarPath)) {  
+      URI HarPathUri = HarPath.toUri();
+      Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
+      FileSystem fsHar = new HarFileSystem(fsDest);
+      fsHar.initialize(inHarPath.toUri(), conf);
+      if (fsHar.exists(inHarPath)) {
+        FileStatus inHar = fsHar.getFileStatus(inHarPath);
+        if (inHar.getModificationTime() == srcStatus.getModificationTime()) {
+          return new ParityFilePair(inHarPath,fsHar);
+        }
+      }
+    }
+    
+    //CASE 2: CHECK PARITY
+    try {
+      FileStatus outHar = fsDest.getFileStatus(outPath);
+      if (outHar.getModificationTime() == srcStatus.getModificationTime()) {
+        return new ParityFilePair(outPath,fsDest);
+      }
+    } catch (java.io.FileNotFoundException e) {
     }
 
-    URI HarPathUri = HarPath.toUri();
-    Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
-    FileSystem fsHar = new HarFileSystem(fsDest);
-    fsHar.initialize(inHarPath.toUri(), conf);
-
-    if (!fsHar.exists(inHarPath)) { // case 2: no file inside HAR
-      return new ParityFilePair(outPath,fsDest);
-    }
-
-    if (! fsDest.exists(outPath)) { // case 3: only inside HAR
-      return new ParityFilePair(inHarPath,fsHar);
-    }
-
-    // both inside and outside HAR. Should return most recent
-    FileStatus inHar = fsHar.getFileStatus(inHarPath);
-    FileStatus outHar = fsDest.getFileStatus(outPath);
-
-    if (inHar.getModificationTime() >= outHar.getModificationTime()) {
-      return new ParityFilePair(inHarPath,fsHar);
-    }
-
-    return new ParityFilePair(outPath,fsDest);
+    return null; // NULL if no parity file
   }
   
   private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException {
@@ -558,108 +605,6 @@
 	  
   }
   
- /**
-  * Returns a list of pathnames that needs raiding.
-  */
-  List<FileStatus> selectFiles(Configuration conf, Path p, String destPrefix,
-                                       long modTimePeriod, short srcReplication, long now) throws IOException {
-
-    List<FileStatus> returnSet = new LinkedList<FileStatus>();
-
-    // expand destination prefix path
-    Path destp = new Path(destPrefix.trim());
-    FileSystem fs = FileSystem.get(destp.toUri(), conf);
-    destp = destp.makeQualified(fs);
-
-    // Expand destination prefix path.
-    String destpstr = destp.toString();
-    if (!destpstr.endsWith(Path.SEPARATOR)) {
-      destpstr += Path.SEPARATOR;
-    }
-
-    fs = p.getFileSystem(conf);
-    FileStatus[] gpaths = fs.globStatus(p);
-    if (gpaths != null) {
-      for (FileStatus onepath: gpaths) {
-        String pathstr = onepath.getPath().makeQualified(fs).toString();
-        if (!pathstr.endsWith(Path.SEPARATOR)) {
-          pathstr += Path.SEPARATOR;
-        }
-        if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) {
-          LOG.info("Skipping source " + pathstr +
-                   " because it conflicts with raid directory " + destpstr);
-        } else {
-         recurse(fs, conf, destp, onepath, returnSet, modTimePeriod, srcReplication, now);
-        }
-      }
-    }
-    return returnSet;
-  }
-
-  /**
-   * Pick files that need to be RAIDed.
-   */
-  private void recurse(FileSystem srcFs,
-                       Configuration conf,
-                       Path destPathPrefix,
-                       FileStatus src,
-                       List<FileStatus> accept,
-                       long modTimePeriod, 
-                       short srcReplication, 
-                       long now) throws IOException {
-    Path path = src.getPath();
-    FileStatus[] files = null;
-    try {
-      files = srcFs.listStatus(path);
-    } catch (java.io.FileNotFoundException e) {
-      // ignore error because the file could have been deleted by an user
-      LOG.info("FileNotFound " + path + " " + StringUtils.stringifyException(e));
-    } catch (IOException e) {
-      throw e;
-    }
-
-    // If the modTime of the raid file is later than the modtime of the
-    // src file and the src file has not been modified
-    // recently, then that file is a candidate for RAID.
-
-    if (src.isFile()) {
-
-      // if the source file has fewer than or equal to 2 blocks, then no need to RAID
-      long blockSize = src.getBlockSize();
-      if (2 * blockSize >= src.getLen()) {
-        return;
-      }
-
-      // check if destination path already exists. If it does and it's modification time
-      // does not match the modTime of the source file, then recalculate RAID
-      boolean add = false;
-      try {
-        ParityFilePair ppair = getParityFile(destPathPrefix, path);
-        Path outpath =  ppair.getPath();
-        FileSystem outFs = ppair.getFileSystem();
-        FileStatus ostat = outFs.getFileStatus(outpath);
-        if (ostat.getModificationTime() != src.getModificationTime() &&
-            src.getModificationTime() + modTimePeriod < now) {
-          add = true;
-         }
-      } catch (java.io.FileNotFoundException e) {
-        add = true; // destination file does not exist
-      }
-
-      if (add) {
-        accept.add(src);
-      }
-      return;
-
-    } else if (files != null) {
-      for (FileStatus one:files) {
-        if (!one.getPath().getName().endsWith(HAR_SUFFIX)){
-          recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
-        }
-      }
-    }
-  }
-
 
   /**
    * RAID a list of files.
@@ -668,8 +613,8 @@
       throws IOException {
     int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
     int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
-    int stripeLength = getStripeLength(conf, info);
-    String destPrefix = getDestinationPath(conf, info);
+    int stripeLength = getStripeLength(conf);
+    Path destPref = getDestinationPath(conf);
     String simulate = info.getProperty("simulate");
     boolean doSimulate = simulate == null ? false : Boolean
         .parseBoolean(simulate);
@@ -677,13 +622,9 @@
     Statistics statistics = new Statistics();
     int count = 0;
 
-    Path p = new Path(destPrefix.trim());
-    FileSystem fs = FileSystem.get(p.toUri(), conf);
-    p = p.makeQualified(fs);
-
     for (FileStatus s : paths) {
-      doRaid(conf, s, p, statistics, null, doSimulate, targetRepl, metaRepl,
-          stripeLength);
+      doRaid(conf, s, destPref, statistics, Reporter.NULL, doSimulate, targetRepl,
+             metaRepl, stripeLength);
       if (count % 1000 == 0) {
         LOG.info("RAID statistics " + statistics.toString());
       }
@@ -701,23 +642,16 @@
       FileStatus src, Statistics statistics, Reporter reporter) throws IOException {
     int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
     int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
-    int stripeLength = getStripeLength(conf, info);
-    String destPrefix = getDestinationPath(conf, info);
+    int stripeLength = getStripeLength(conf);
+    Path destPref = getDestinationPath(conf);
     String simulate = info.getProperty("simulate");
     boolean doSimulate = simulate == null ? false : Boolean
         .parseBoolean(simulate);
 
-    int count = 0;
-
-    Path p = new Path(destPrefix.trim());
-    FileSystem fs = FileSystem.get(p.toUri(), conf);
-    p = p.makeQualified(fs);
-
-    doRaid(conf, src, p, statistics, reporter, doSimulate, targetRepl, metaRepl,
-        stripeLength);
+    doRaid(conf, src, destPref, statistics, reporter, doSimulate,
+           targetRepl, metaRepl, stripeLength);
   }
-  
-  
+
   /**
    * RAID an individual file
    */
@@ -784,25 +718,11 @@
                                   Path destPathPrefix, BlockLocation[] locations,
                                   int metaRepl, int stripeLength) throws IOException {
 
-    // two buffers for generating parity
-    Random rand = new Random();
-    int bufSize = 5 * 1024 * 1024; // 5 MB
-    byte[] bufs = new byte[bufSize];
-    byte[] xor = new byte[bufSize];
-
     Path inpath = stat.getPath();
-    long blockSize = stat.getBlockSize();
-    long fileSize = stat.getLen();
-
-    // create output tmp path
     Path outpath =  getOriginalParityFile(destPathPrefix, inpath);
     FileSystem outFs = outpath.getFileSystem(conf);
-   
-    Path tmppath =  new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + 
-                             outpath.toUri().getPath() + "." + 
-                             rand.nextLong() + ".tmp");
 
-    // if the parity file is already upto-date, then nothing to do
+    // If the parity file is already upto-date, then nothing to do
     try {
       FileStatus stmp = outFs.getFileStatus(outpath);
       if (stmp.getModificationTime() == stat.getModificationTime()) {
@@ -812,66 +732,11 @@
       }
     } catch (IOException e) {
       // ignore errors because the raid file might not exist yet.
-    } 
-
-    LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath);
-    FSDataOutputStream out = outFs.create(tmppath, 
-                                          true, 
-                                          conf.getInt("io.file.buffer.size", 64 * 1024), 
-                                          (short)metaRepl, 
-                                          blockSize);
-
-    try {
-
-      // loop once for every stripe length
-      for (int startBlock = 0; startBlock < locations.length;) {
-
-        // report progress to Map-reduce framework
-        if (reporter != null) {
-          reporter.progress();
-        }
-        int blocksLeft = locations.length - startBlock;
-        int stripe = Math.min(stripeLength, blocksLeft);
-        LOG.info(" startBlock " + startBlock + " stripe " + stripe);
-
-        // open a new file descriptor for each block in this stripe.
-        // make each fd point to the beginning of each block in this stripe.
-        FSDataInputStream[] ins = new FSDataInputStream[stripe];
-        for (int i = 0; i < stripe; i++) {
-          ins[i] = inFs.open(inpath, bufSize);
-          ins[i].seek(blockSize * (startBlock + i));
-        }
-
-        generateParity(ins,out,blockSize,bufs,xor, reporter);
-        
-        // close input file handles
-        for (int i = 0; i < ins.length; i++) {
-          ins[i].close();
-        }
-
-        // increment startBlock to point to the first block to be processed
-        // in the next iteration
-        startBlock += stripe;
-      }
-      out.close();
-      out = null;
-
-      // delete destination if exists
-      if (outFs.exists(outpath)){
-        outFs.delete(outpath, false);
-      }
-      // rename tmppath to the real parity filename
-      outFs.mkdirs(outpath.getParent());
-      if (!outFs.rename(tmppath, outpath)) {
-        String msg = "Unable to rename tmp file " + tmppath + " to " + outpath;
-        LOG.warn(msg);
-        throw new IOException (msg);
-      }
-    } finally {
-      // remove the tmp file if it still exists
-      outFs.delete(tmppath, false);  
     }
 
+    XOREncoder encoder = new XOREncoder(conf, stripeLength);
+    encoder.encodeFile(inFs, inpath, outpath, (short)metaRepl, reporter);
+
     // set the modification time of the RAID file. This is done so that the modTime of the
     // RAID file reflects that contents of the source file that it has RAIDed. This should
     // also work for files that are being appended to. This is necessary because the time on
@@ -880,255 +745,39 @@
     outFs.setTimes(outpath, stat.getModificationTime(), -1);
 
     FileStatus outstat = outFs.getFileStatus(outpath);
-    LOG.info("Source file " + inpath + " of size " + fileSize +
+    FileStatus inStat = inFs.getFileStatus(inpath);
+    LOG.info("Source file " + inpath + " of size " + inStat.getLen() +
              " Parity file " + outpath + " of size " + outstat.getLen() +
              " src mtime " + stat.getModificationTime()  +
              " parity mtime " + outstat.getModificationTime());
   }
 
-  private static int readInputUntilEnd(FSDataInputStream ins, byte[] bufs, int toRead) 
-      throws IOException {
-
-    int tread = 0;
-    
-    while (tread < toRead) {
-      int read = ins.read(bufs, tread, toRead - tread);
-      if (read == -1) {
-        return tread;
-      } else {
-        tread += read;
-      }
-    }
-    
-    return tread;
-  }
-  
-  private static void generateParity(FSDataInputStream[] ins, FSDataOutputStream fout, 
-      long parityBlockSize, byte[] bufs, byte[] xor, Reporter reporter) throws IOException {
-    
-    int bufSize;
-    if ((bufs == null) || (bufs.length == 0)){
-      bufSize = 5 * 1024 * 1024; // 5 MB
-      bufs = new byte[bufSize];
-    } else {
-      bufSize = bufs.length;
-    }
-    if ((xor == null) || (xor.length != bufs.length)){
-      xor = new byte[bufSize];
-    }
-
-    int xorlen = 0;
-      
-    // this loop processes all good blocks in selected stripe
-    long remaining = parityBlockSize;
-    
-    while (remaining > 0) {
-      int toRead = (int)Math.min(remaining, bufSize);
-
-      if (ins.length > 0) {
-        xorlen = readInputUntilEnd(ins[0], xor, toRead);
-      }
-
-      // read all remaining blocks and xor them into the buffer
-      for (int i = 1; i < ins.length; i++) {
-
-        // report progress to Map-reduce framework
-        if (reporter != null) {
-          reporter.progress();
-        }
-        
-        int actualRead = readInputUntilEnd(ins[i], bufs, toRead);
-        
-        int j;
-        int xorlimit = (int) Math.min(xorlen,actualRead);
-        for (j = 0; j < xorlimit; j++) {
-          xor[j] ^= bufs[j];
-        }
-        if ( actualRead > xorlen ){
-          for (; j < actualRead; j++) {
-            xor[j] = bufs[j];
-          }
-          xorlen = actualRead;
-        }
-        
-      }
-
-      if (xorlen < toRead) {
-        Arrays.fill(bufs, xorlen, toRead, (byte) 0);
-      }
-      
-      // write this to the tmp file
-      fout.write(xor, 0, toRead);
-      remaining -= toRead;
-    }
-  
-  }
-  
   /**
-   * Extract a good block from the parity block. This assumes that the corruption
-   * is in the main file and the parity file is always good.
+   * Extract a good block from the parity block. This assumes that the
+   * corruption is in the main file and the parity file is always good.
    */
-  public static Path unRaid(Configuration conf, Path srcPath, Path destPathPrefix, 
-                            int stripeLength, long corruptOffset) throws IOException {
+  public static Path unRaid(Configuration conf, Path srcPath,
+      Path destPathPrefix, Decoder decoder, int stripeLength,
+      long corruptOffset) throws IOException {
 
-    // extract block locations, size etc from source file
-    Random rand = new Random();
-    FileSystem srcFs = srcPath.getFileSystem(conf);
-    FileStatus srcStat = srcFs.getFileStatus(srcPath);
-    long blockSize = srcStat.getBlockSize();
-    long fileSize = srcStat.getLen();
-
-    // find the stripe number where the corrupted offset lies
-    long snum = corruptOffset / (stripeLength * blockSize);
-    long startOffset = snum * stripeLength * blockSize;
-    long corruptBlockInStripe = (corruptOffset - startOffset)/blockSize;
-    long corruptBlockSize = Math.min(blockSize, fileSize - startOffset);
-
-    LOG.info("Start offset of relevent stripe = " + startOffset +
-             " corruptBlockInStripe " + corruptBlockInStripe);
-
-    // open file descriptors to read all good blocks of the file
-    FSDataInputStream[] instmp = new FSDataInputStream[stripeLength];
-    int  numLength = 0;
-    for (int i = 0; i < stripeLength; i++) {
-      if (i == corruptBlockInStripe) {
-        continue;  // do not open corrupt block
-      }
-      if (startOffset + i * blockSize >= fileSize) {
-        LOG.info("Stop offset of relevent stripe = " + 
-                  startOffset + i * blockSize);
-        break;
-      }
-      instmp[numLength] = srcFs.open(srcPath);
-      instmp[numLength].seek(startOffset + i * blockSize);
-      numLength++;
+    // Test if parity file exists
+    ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
+    if (ppair == null) {
+      return null;
     }
 
-    // create array of inputstream, allocate one extra slot for 
-    // parity file. numLength could be smaller than stripeLength
-    // if we are processing the last partial stripe on a file.
-    numLength += 1;
-    FSDataInputStream[] ins = new FSDataInputStream[numLength];
-    for (int i = 0; i < numLength-1; i++) {
-      ins[i] = instmp[i];
-    }
-    LOG.info("Decompose a total of " + numLength + " blocks.");
-
-    // open and seek to the appropriate offset in parity file.
-    ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf); 
-    Path parityFile = ppair.getPath();
-    FileSystem parityFs = ppair.getFileSystem();
-    LOG.info("Parity file for " + srcPath + " is " + parityFile);
-    ins[numLength-1] = parityFs.open(parityFile);
-    ins[numLength-1].seek(snum * blockSize);
-    LOG.info("Parity file " + parityFile +
-             " seeking to relevent block at offset " + 
-             ins[numLength-1].getPos());
-
-    // create a temporary filename in the source filesystem
-    // do not overwrite an existing tmp file. Make it fail for now.
-    // We need to generate a unique name for this tmp file later on.
-    Path tmpFile = null;
-    FSDataOutputStream fout = null;
-    FileSystem destFs = destPathPrefix.getFileSystem(conf);
-    int retry = 5;
-    try {
-      tmpFile = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + "/" + 
-          rand.nextInt());
-      fout = destFs.create(tmpFile, false);
-    } catch (IOException e) {
-      if (retry-- <= 0) {
-        LOG.info("Unable to create temporary file " + tmpFile +
-                 " Aborting....");
-        throw e; 
-      }
-      LOG.info("Unable to create temporary file " + tmpFile +
-               "Retrying....");
-    }
-    LOG.info("Created recovered block file " + tmpFile);
-
-    // buffers for generating parity bits
-    int bufSize = 5 * 1024 * 1024; // 5 MB
-    byte[] bufs = new byte[bufSize];
-    byte[] xor = new byte[bufSize];
-   
-    generateParity(ins,fout,corruptBlockSize,bufs,xor,null);
-    
-    // close all files
-    fout.close();
-    for (int i = 0; i < ins.length; i++) {
-      ins[i].close();
-    }
-
-    // Now, reopen the source file and the recovered block file
-    // and copy all relevant data to new file
     final Path recoveryDestination = 
       new Path(conf.get("fs.raid.tmpdir", "/tmp/raid"));
+    FileSystem destFs = recoveryDestination.getFileSystem(conf);
     final Path recoveredPrefix = 
       destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
     final Path recoveredPath = 
-      new Path(recoveredPrefix + "." + rand.nextLong() + ".recovered");
+      new Path(recoveredPrefix + "." + new Random().nextLong() + ".recovered");
     LOG.info("Creating recovered file " + recoveredPath);
 
-    FSDataInputStream sin = srcFs.open(srcPath);
-    FSDataOutputStream out = destFs.create(recoveredPath, false, 
-                                             conf.getInt("io.file.buffer.size", 64 * 1024),
-                                             srcStat.getReplication(), 
-                                             srcStat.getBlockSize());
-
-    FSDataInputStream bin = destFs.open(tmpFile);
-    long recoveredSize = 0;
-
-    // copy all the good blocks (upto the corruption)
-    // from source file to output file
-    long remaining = corruptOffset / blockSize * blockSize;
-    while (remaining > 0) {
-      int toRead = (int)Math.min(remaining, bufSize);
-      sin.readFully(bufs, 0, toRead);
-      out.write(bufs, 0, toRead);
-      remaining -= toRead;
-      recoveredSize += toRead;
-    }
-    LOG.info("Copied upto " + recoveredSize + " from src file. ");
-
-    // copy recovered block to output file
-    remaining = corruptBlockSize;
-    while (recoveredSize < fileSize &&
-           remaining > 0) {
-      int toRead = (int)Math.min(remaining, bufSize);
-      bin.readFully(bufs, 0, toRead);
-      out.write(bufs, 0, toRead);
-      remaining -= toRead;
-      recoveredSize += toRead;
-    }
-    LOG.info("Copied upto " + recoveredSize + " from recovered-block file. ");
-
-    // skip bad block in src file
-    if (recoveredSize < fileSize) {
-      sin.seek(sin.getPos() + corruptBlockSize); 
-    }
-
-    // copy remaining good data from src file to output file
-    while (recoveredSize < fileSize) {
-      int toRead = (int)Math.min(fileSize - recoveredSize, bufSize);
-      sin.readFully(bufs, 0, toRead);
-      out.write(bufs, 0, toRead);
-      recoveredSize += toRead;
-    }
-    out.close();
-    LOG.info("Completed writing " + recoveredSize + " bytes into " +
-             recoveredPath);
-              
-    sin.close();
-    bin.close();
-
-    // delete the temporary block file that was created.
-    destFs.delete(tmpFile, false);
-    LOG.info("Deleted temporary file " + tmpFile);
-
-    // copy the meta information from source path to the newly created
-    // recovered path
-    copyMetaInformation(destFs, srcStat, recoveredPath);
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    decoder.decodeFile(srcFs, srcPath, ppair.getFileSystem(),
+        ppair.getPath(), corruptOffset, recoveredPath);
 
     return recoveredPath;
   }
@@ -1179,35 +828,22 @@
         PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
         Arrays.sort(sorted, lexi);
 
-        // paths we have processed so far
-        Set<Path> processed = new HashSet<Path>();
-        
         for (PolicyList category : sorted) {
           for (PolicyInfo info: category.getAll()) {
 
             try {
               // expand destination prefix path
-              String destinationPrefix = getDestinationPath(conf, info);
-              Path destPref = new Path(destinationPrefix.trim());
-              FileSystem destFs = FileSystem.get(destPref.toUri(), conf);
-              destPref = destFs.makeQualified(destPref);
+              Path destPref = getDestinationPath(conf);
+              FileSystem destFs = destPref.getFileSystem(conf);
 
               //get srcPaths
               Path[] srcPaths = info.getSrcPathExpanded();
               
-              if ( srcPaths != null ){
+              if (srcPaths != null) {
                 for (Path srcPath: srcPaths) {
                   // expand destination prefix
                   Path destPath = getOriginalParityFile(destPref, srcPath);
 
-                  // if this destination path has already been processed as part
-                  // of another policy, then nothing more to do
-                  if (processed.contains(destPath)) {
-                    LOG.info("Obsolete parity files for policy " + 
-                            info.getName() + " has already been procesed.");
-                    continue;
-                  }
-
                   FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
                   FileStatus stat = null;
                   try {
@@ -1221,12 +857,8 @@
                     recursePurge(srcFs, destFs, destPref.toUri().getPath(), stat);
                   }
 
-                  // this destination path has already been processed
-                  processed.add(destPath);
-
                 }
               }
-
             } catch (Exception e) {
               LOG.warn("Ignoring Exception while processing policy " + 
                        info.getName() + " " + 
@@ -1342,10 +974,8 @@
             try {
               long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
 
-              String destinationPrefix = getDestinationPath(conf, info);
-              Path destPref = new Path(destinationPrefix.trim());
+              Path destPref = getDestinationPath(conf);
               FileSystem destFs = destPref.getFileSystem(conf); 
-              destPref = destFs.makeQualified(destPref);
 
               //get srcPaths
               Path[] srcPaths = info.getSrcPathExpanded();
@@ -1407,7 +1037,11 @@
           recurseHar(info, destFs, one, destPrefix, srcFs, cutoff, tmpHarPath);
           shouldHar = false;
         } else if (one.getModificationTime() > cutoff ) {
-          shouldHar = false;
+          if (shouldHar) {
+            LOG.info("Cannot archive " + destPath + 
+                   " because " + one.getPath() + " was modified after cutoff");
+            shouldHar = false;
+          }
         }
       }
 
@@ -1433,6 +1067,7 @@
     }
 
     if ( shouldHar ) {
+      LOG.info("Archiving " + dest.getPath() + " to " + tmpHarPath );
       singleHar(destFs, dest, tmpHarPath);
     }
   } 
@@ -1493,56 +1128,25 @@
       LOG.info("Leaving Har thread.");
     }
     
-
-  }  
-  
-  /**
-   * If the config file has an entry for hdfs.raid.locations, then that overrides
-   * destination path specified in the raid policy file
-   */
-  static private String getDestinationPath(Configuration conf, PolicyInfo info) {
-    String locs = conf.get("hdfs.raid.locations"); 
-    if (locs != null) {
-      return locs;
-    }
-    locs = info.getDestinationPath();
-    if (locs == null) {
-      return DEFAULT_RAID_LOCATION;
-    }
-    return locs;
   }
 
   /**
-   * If the config file has an entry for hdfs.raid.stripeLength, then use that
-   * specified in the raid policy file
+   * Return the path prefix that stores the parity files
    */
-  static private int getStripeLength(Configuration conf, PolicyInfo info)
-    throws IOException {
-    int len = conf.getInt("hdfs.raid.stripeLength", 0); 
-    if (len != 0) {
-      return len;
-    }
-    String str = info.getProperty("stripeLength");
-    if (str == null) {
-      String msg = "hdfs.raid.stripeLength is not defined." +
-                   " Using a default " + DEFAULT_STRIPE_LENGTH;
-      LOG.info(msg);
-      return DEFAULT_STRIPE_LENGTH;
-    }
-    return Integer.parseInt(str);
+  static Path getDestinationPath(Configuration conf)
+      throws IOException {
+    String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
+    Path p = new Path(loc.trim());
+    FileSystem fs = FileSystem.get(p.toUri(), conf);
+    p = p.makeQualified(fs);
+    return p;
   }
 
   /**
-   * Copy the file owner, modtime, etc from srcPath to the recovered Path.
-   * It is possiible that we might have to retrieve file persmissions,
-   * quotas, etc too in future.
+   * Obtain stripe length from configuration
    */
-  static private void copyMetaInformation(FileSystem fs, FileStatus stat, 
-                                          Path recoveredPath) 
-    throws IOException {
-    fs.setOwner(recoveredPath, stat.getOwner(), stat.getGroup());
-    fs.setPermission(recoveredPath, stat.getPermission());
-    fs.setTimes(recoveredPath, stat.getModificationTime(), stat.getAccessTime());
+  public static int getStripeLength(Configuration conf) {
+    return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH);
   }
 
   /**
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
new file mode 100644
index 0000000..6818168
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.IOUtils;
+
+public class RaidUtils {
+  public static void readTillEnd(InputStream in, byte[] buf, boolean eofOK)
+    throws IOException {
+    int toRead = buf.length;
+    int numRead = 0;
+    while (numRead < toRead) {
+      int nread = in.read(buf, numRead, toRead - numRead);
+      if (nread < 0) {
+        if (eofOK) {
+          // EOF hit, fill with zeros
+          Arrays.fill(buf, numRead, toRead, (byte)0);
+          numRead = toRead;
+        } else {
+          // EOF hit, throw.
+          throw new IOException("Premature EOF");
+        }
+      } else {
+        numRead += nread;
+      }
+    }
+  }
+
+  public static void copyBytes(
+    InputStream in, OutputStream out, byte[] buf, long count)
+    throws IOException {
+    for (long bytesRead = 0; bytesRead < count; ) {
+      int toRead = Math.min(buf.length, (int)(count - bytesRead));
+      IOUtils.readFully(in, buf, 0, toRead);
+      bytesRead += toRead;
+      out.write(buf, 0, toRead);
+    }
+  }
+
+  public static class ZeroInputStream extends InputStream
+	    implements Seekable, PositionedReadable {
+    private long endOffset;
+    private long pos;
+
+    public ZeroInputStream(long endOffset) {
+      this.endOffset = endOffset;
+      this.pos = 0;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (pos < endOffset) {
+        pos++;
+        return 0;
+      }
+      return -1;
+    }
+
+    @Override
+    public int available() throws IOException {
+      return (int)(endOffset - pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public void seek(long seekOffset) throws IOException {
+      if (seekOffset < endOffset) {
+        pos = seekOffset;
+      } else {
+        throw new IOException("Illegal Offset" + pos);
+      }
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      int count = 0;
+      for (; position < endOffset && count < length; position++) {
+        buffer[offset + count] = 0;
+        count++;
+      }
+      return count;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      int count = 0;
+      for (; position < endOffset && count < length; position++) {
+        buffer[offset + count] = 0;
+        count++;
+      }
+      if (count < length) {
+        throw new IOException("Premature EOF");
+      }
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      readFully(position, buffer, 0, buffer.length);
+    }
+  }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java
new file mode 100644
index 0000000..3dfe592
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+public class XORDecoder extends Decoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.XORDecoder");
+
+  public XORDecoder(
+    Configuration conf, int stripeSize) {
+    super(conf, stripeSize, 1);
+  }
+
+  @Override
+  protected void fixErasedBlock(
+      FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+      long blockSize, long errorOffset, long bytesToSkip, long limit,
+      OutputStream out) throws IOException {
+    LOG.info("Fixing block at " + srcFile + ":" + errorOffset +
+             ", skipping " + bytesToSkip + ", limit " + limit);
+    FileStatus srcStat = fs.getFileStatus(srcFile);
+    ArrayList<FSDataInputStream> xorinputs = new ArrayList<FSDataInputStream>();
+
+    FSDataInputStream parityFileIn = parityFs.open(parityFile);
+    parityFileIn.seek(parityOffset(errorOffset, blockSize));
+    xorinputs.add(parityFileIn);
+
+    long errorBlockOffset = (errorOffset / blockSize) * blockSize;
+    long[] srcOffsets = stripeOffsets(errorOffset, blockSize);
+    for (int i = 0; i < srcOffsets.length; i++) {
+      if (srcOffsets[i] == errorBlockOffset) {
+        LOG.info("Skipping block at " + srcFile + ":" + errorBlockOffset);
+        continue;
+      }
+      if (srcOffsets[i] < srcStat.getLen()) {
+        FSDataInputStream in = fs.open(srcFile);
+        in.seek(srcOffsets[i]);
+        xorinputs.add(in);
+      }
+    }
+    FSDataInputStream[] inputs = xorinputs.toArray(
+                                    new FSDataInputStream[]{null});
+    ParityInputStream recovered =
+      new ParityInputStream(inputs, limit, readBufs[0], writeBufs[0]);
+    recovered.skip(bytesToSkip);
+    recovered.drain(out, null);
+  }
+
+  protected long[] stripeOffsets(long errorOffset, long blockSize) {
+    long[] offsets = new long[stripeSize];
+    long stripeIdx = errorOffset / (blockSize * stripeSize);
+    long startOffsetOfStripe = stripeIdx * stripeSize * blockSize;
+    for (int i = 0; i < stripeSize; i++) {
+      offsets[i] = startOffsetOfStripe + i * blockSize;
+    }
+    return offsets;
+  }
+
+  protected long parityOffset(long errorOffset, long blockSize) {
+    long stripeIdx = errorOffset / (blockSize * stripeSize);
+    return stripeIdx * blockSize;
+  }
+
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
new file mode 100644
index 0000000..3d113e6
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+public class XOREncoder extends Encoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.XOREncoder");
+  public XOREncoder(
+    Configuration conf, int stripeSize) {
+    super(conf, stripeSize, 1);
+  }
+
+  @Override
+  protected void encodeStripe(
+    InputStream[] blocks,
+    long stripeStartOffset,
+    long blockSize,
+    OutputStream[] outs,
+    Progressable reporter) throws IOException {
+    LOG.info("Peforming XOR ");
+    ParityInputStream parityIn =
+      new ParityInputStream(blocks, blockSize, readBufs[0], writeBufs[0]);
+    try {
+      parityIn.drain(outs[0], reporter);
+    } finally {
+      parityIn.close();
+    }
+  }
+}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
index f6e3aa6..3da2f15 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
@@ -47,11 +47,14 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
 import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
 
 public class TestRaidDfs extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -59,8 +62,7 @@
   final static String CONFIG_FILE = new File(TEST_DIR, 
       "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
-  final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidNode");
-  final Random rand = new Random();
+  final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs");
   final static int NUM_DATANODES = 3;
 
   Configuration conf;
@@ -83,6 +85,9 @@
     // scan all policies once every 5 second
     conf.setLong("raid.policy.rescan.interval", 5000);
 
+    // make all deletions not go through Trash
+    conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
     // do not use map-reduce cluster for Raiding
     conf.setBoolean("fs.raidnode.local", true);
     conf.set("raid.server.address", "localhost:0");
@@ -133,80 +138,148 @@
     if (cnode != null) { cnode.stop(); cnode.join(); }
     if (dfs != null) { dfs.shutdown(); }
   }
+  
+  private LocatedBlocks getBlockLocations(Path file, long length)
+    throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+    return dfs.getClient().namenode.getBlockLocations(file.toString(), 0, length);
+  }
+
+  private LocatedBlocks getBlockLocations(Path file)
+    throws IOException {
+    FileStatus stat = fileSys.getFileStatus(file);
+    return getBlockLocations(file, stat.getLen());
+  }
+
+  private DistributedRaidFileSystem getRaidFS() throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+    Configuration clientConf = new Configuration(conf);
+    clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
+    clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+    clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+    URI dfsUri = dfs.getUri();
+    return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
+  }
+
+  public static void waitForFileRaided(
+    Log logger, FileSystem fileSys, Path file, Path destPath)
+  throws IOException, InterruptedException {
+    FileStatus parityStat = null;
+    String fileName = file.getName().toString();
+    // wait till file is raided
+    while (parityStat == null) {
+      logger.info("Waiting for files to be raided.");
+      try {
+        FileStatus[] listPaths = fileSys.listStatus(destPath);
+        if (listPaths != null) {
+          for (FileStatus f : listPaths) {
+            logger.info("File raided so far : " + f.getPath());
+            String found = f.getPath().getName().toString();
+            if (fileName.equals(found)) {
+              parityStat = f;
+              break;
+            }
+          }
+        }
+      } catch (FileNotFoundException e) {
+        //ignore
+      }
+      Thread.sleep(1000);                  // keep waiting
+    }
+
+    while (true) {
+      LocatedBlocks locations = null;
+      DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+      locations = dfs.getClient().namenode.getBlockLocations(
+                    file.toString(), 0, parityStat.getLen());
+      if (!locations.isUnderConstruction()) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    while (true) {
+      FileStatus stat = fileSys.getFileStatus(file);
+      if (stat.getReplication() == 1) break;
+      Thread.sleep(1000);
+    }
+  }
+
+  private void corruptBlockAndValidate(Path srcFile, Path destPath,
+    int[] listBlockNumToCorrupt, long blockSize, int numBlocks)
+  throws IOException, InterruptedException {
+    int repl = 1;
+    long crc = createTestFilePartialLastBlock(fileSys, srcFile, repl,
+                  numBlocks, blockSize);
+    long length = fileSys.getFileStatus(srcFile).getLen();
+
+    waitForFileRaided(LOG, fileSys, srcFile, destPath);
+
+    // Delete first block of file
+    for (int blockNumToCorrupt : listBlockNumToCorrupt) {
+      LOG.info("Corrupt block " + blockNumToCorrupt + " of file " + srcFile);
+      LocatedBlocks locations = getBlockLocations(srcFile);
+      corruptBlock(srcFile, locations.get(blockNumToCorrupt).getBlock(),
+            NUM_DATANODES, true);
+    }
+
+    // Validate
+    DistributedRaidFileSystem raidfs = getRaidFS();
+    assertTrue(validateFile(raidfs, srcFile, length, crc));
+  }
 
   /**
-   * Test DFS Raid
+   * Create a file, corrupt a block in it and ensure that the file can be
+   * read through DistributedRaidFileSystem.
    */
   public void testRaidDfs() throws Exception {
     LOG.info("Test testRaidDfs started.");
+
     long blockSize = 8192L;
-    int stripeLength = 3;
+    int numBlocks = 8;
+    int repl = 1;
     mySetup();
-    Path file1 = new Path("/user/dhruba/raidtest/file1");
+
+    // Create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    cnode = RaidNode.createRaidNode(null, localConf);
+
+    Path file = new Path("/user/dhruba/raidtest/file");
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
-    long crc1 = createOldFile(fileSys, file1, 1, 7, blockSize);
-    LOG.info("Test testPathFilter created test files");
-
-    // create an instance of the RaidNode
-    cnode = RaidNode.createRaidNode(null, conf);
-    
+    int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
     try {
-      FileStatus[] listPaths = null;
+      long crc = createTestFilePartialLastBlock(fileSys, file, repl,
+                  numBlocks, blockSize);
+      long length = fileSys.getFileStatus(file).getLen();
+      waitForFileRaided(LOG, fileSys, file, destPath);
+      LocatedBlocks locations = getBlockLocations(file);
 
-      // wait till file is raided
-      while (listPaths == null || listPaths.length != 1) {
-        LOG.info("Test testPathFilter waiting for files to be raided.");
-        try {
-          listPaths = fileSys.listStatus(destPath);
-        } catch (FileNotFoundException e) {
-          //ignore
-        }
-        Thread.sleep(1000);                  // keep waiting
+      for (int i = 0; i < corrupt.length; i++) {
+        int blockNumToCorrupt = corrupt[i][0];
+        LOG.info("Corrupt block " + blockNumToCorrupt + " of file");
+        corruptBlock(file, locations.get(blockNumToCorrupt).getBlock(),
+          NUM_DATANODES, false);
+        validateFile(getRaidFS(), file, length, crc);
       }
-      assertEquals(listPaths.length, 1); // all files raided
-      LOG.info("Files raided so far : " + listPaths[0].getPath());
-
-      // extract block locations from File system. Wait till file is closed.
-      LocatedBlocks locations = null;
-      DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
-      while (true) {
-        locations = dfs.getClient().getNamenode().getBlockLocations(file1.toString(),
-                                                               0, listPaths[0].getLen());
-        if (!locations.isUnderConstruction()) {
-          break;
-        }
-        Thread.sleep(1000);
-      }
-
-      // filter all filesystem calls from client
-      Configuration clientConf = new Configuration(conf);
-      clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
-      clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-      URI dfsUri = dfs.getUri();
-      FileSystem.closeAll();
-      FileSystem raidfs = FileSystem.get(dfsUri, clientConf);
-      
-      assertTrue("raidfs not an instance of DistributedRaidFileSystem",raidfs instanceof DistributedRaidFileSystem);
-      
-      LOG.info("Corrupt first block of file");
-      corruptBlock(file1, locations.get(0).getBlock(), NUM_DATANODES, false);
-      validateFile(raidfs, file1, file1, crc1);
 
       // Corrupt one more block. This is expected to fail.
-      LOG.info("Corrupt second block of file");
-      corruptBlock(file1, locations.get(1).getBlock(), NUM_DATANODES, false);
+      LOG.info("Corrupt one more block of file");
+      corruptBlock(file, locations.get(1).getBlock(), NUM_DATANODES, false);
       try {
-        validateFile(raidfs, file1, file1, crc1);
+        validateFile(getRaidFS(), file, length, crc);
         fail("Expected exception ChecksumException not thrown!");
       } catch (org.apache.hadoop.fs.ChecksumException e) {
       }
     } catch (Exception e) {
-      LOG.info("testPathFilter Exception " + e + StringUtils.stringifyException(e));
+      LOG.info("testRaidDfs Exception " + e +
+                StringUtils.stringifyException(e));
       throw e;
     } finally {
+      if (cnode != null) { cnode.stop(); cnode.join(); }
       myTearDown();
     }
-    LOG.info("Test testPathFilter completed.");
+    LOG.info("Test testRaidDfs completed.");
   }
 
   /**
@@ -217,7 +290,7 @@
 
     try {
       Path file = new Path("/user/raid/raidtest/file1");
-      createOldFile(fileSys, file, 1, 7, 8192L);
+      createTestFile(fileSys, file, 1, 7, 8192L);
 
       // filter all filesystem calls from client
       Configuration clientConf = new Configuration(conf);
@@ -242,13 +315,15 @@
       myTearDown();
     }
   }
-  
+
   //
   // creates a file and populate it with random data. Returns its crc.
   //
-  private long createOldFile(FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
+  public static long createTestFile(FileSystem fileSys, Path name, int repl,
+                        int numBlocks, long blocksize)
     throws IOException {
     CRC32 crc = new CRC32();
+    Random rand = new Random();
     FSDataOutputStream stm = fileSys.create(name, true,
                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
                                             (short)repl, blocksize);
@@ -264,19 +339,43 @@
   }
 
   //
+  // Creates a file with partially full last block. Populate it with random
+  // data. Returns its crc.
+  //
+  public static long createTestFilePartialLastBlock(
+      FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
+    throws IOException {
+    CRC32 crc = new CRC32();
+    Random rand = new Random();
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, blocksize);
+    // Write whole blocks.
+    byte[] b = new byte[(int)blocksize];
+    for (int i = 1; i < numBlocks; i++) {
+      rand.nextBytes(b);
+      stm.write(b);
+      crc.update(b);
+    }
+    // Write partial block.
+    b = new byte[(int)blocksize/2 - 1];
+    rand.nextBytes(b);
+    stm.write(b);
+    crc.update(b);
+
+    stm.close();
+    return crc.getValue();
+  }
+  //
   // validates that file matches the crc.
   //
-  private void validateFile(FileSystem fileSys, Path name1, Path name2, long crc) 
+  public static boolean validateFile(FileSystem fileSys, Path name, long length,
+                                  long crc) 
     throws IOException {
 
-    FileStatus stat1 = fileSys.getFileStatus(name1);
-    FileStatus stat2 = fileSys.getFileStatus(name2);
-    assertTrue(" Length of file " + name1 + " is " + stat1.getLen() + 
-               " is different from length of file " + name1 + " " + stat2.getLen(),
-               stat1.getLen() == stat2.getLen());
-
+    long numRead = 0;
     CRC32 newcrc = new CRC32();
-    FSDataInputStream stm = fileSys.open(name2);
+    FSDataInputStream stm = fileSys.open(name);
     final byte[] b = new byte[4192];
     int num = 0;
     while (num >= 0) {
@@ -284,19 +383,28 @@
       if (num < 0) {
         break;
       }
+      numRead += num;
       newcrc.update(b, 0, num);
     }
     stm.close();
+
+    if (numRead != length) {
+      LOG.info("Number of bytes read " + numRead +
+               " does not match file size " + length);
+      return false;
+    }
+
     LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
     if (newcrc.getValue() != crc) {
-      fail("CRC mismatch of files " + name1 + " with file " + name2);
+      LOG.info("CRC mismatch of file " + name + ": " + newcrc + " vs. " + crc);
     }
+    return true;
   }
 
   /*
    * The Data directories for a datanode
    */
-  static private File[] getDataNodeDirs(int i) throws IOException {
+  private static File[] getDataNodeDirs(int i) throws IOException {
     File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
     File data_dir = new File(base_dir, "data");
     File dir1 = new File(data_dir, "data"+(2*i+1));
@@ -353,10 +461,32 @@
               (numCorrupted + numDeleted) > 0);
   }
 
-  //
-  // Corrupt specified block of file
-  //
-  void corruptBlock(Path file, Block blockNum) throws IOException {
-    corruptBlock(file, blockNum, NUM_DATANODES, true);
+  public static void corruptBlock(Path file, Block blockNum,
+                    int numDataNodes, long offset) throws IOException {
+    long id = blockNum.getBlockId();
+
+    // Now deliberately remove/truncate data blocks from the block.
+    //
+    for (int i = 0; i < numDataNodes; i++) {
+      File[] dirs = getDataNodeDirs(i);
+      
+      for (int j = 0; j < dirs.length; j++) {
+        File[] blocks = dirs[j].listFiles();
+        assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
+        for (int idx = 0; idx < blocks.length; idx++) {
+          if (blocks[idx].getName().startsWith("blk_" + id) &&
+              !blocks[idx].getName().endsWith(".meta")) {
+            // Corrupt
+            File f = blocks[idx];
+            RandomAccessFile raf = new RandomAccessFile(f, "rw");
+            raf.seek(offset);
+            int data = raf.readInt();
+            raf.seek(offset);
+            raf.writeInt(data+1);
+            LOG.info("Corrupted block " + blocks[idx]);
+          }
+        }
+      }
+    }
   }
 }
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
new file mode 100644
index 0000000..119cae6
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public class TestDirectoryTraversal extends TestCase {
+  final static Log LOG = LogFactory.getLog(
+                            "org.apache.hadoop.raid.TestDirectoryTraversal");
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/raid/test/data")).getAbsolutePath();
+
+  MiniDFSCluster dfs = null;
+  FileSystem fs = null;
+  Configuration conf = null;
+
+  /**
+   * Test basic enumeration.
+   */
+  public void testEnumeration() throws IOException {
+    mySetup();
+
+    try {
+      Path topDir = new Path(TEST_DIR + "/testenumeration");
+
+      createTestTree(topDir);
+
+      LOG.info("Enumerating files");
+      List<FileStatus> startPaths = new LinkedList<FileStatus>();
+      startPaths.add(fs.getFileStatus(topDir));
+      DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+
+      List<FileStatus> selected = new LinkedList<FileStatus>();
+      while (true) {
+        FileStatus f = dt.getNextFile();
+        if (f == null) break;
+        assertEquals(false, f.isDir());
+        LOG.info(f.getPath());
+        selected.add(f);
+      }
+      assertEquals(5, selected.size());
+
+      LOG.info("Enumerating directories");
+      startPaths.clear();
+      startPaths.add(fs.getFileStatus(topDir));
+      dt = new DirectoryTraversal(fs, startPaths);
+      selected.clear();
+      while (true) {
+        FileStatus dir = dt.getNextDirectory();
+        if (dir == null) break;
+        assertEquals(true, dir.isDir());
+        LOG.info(dir.getPath());
+        selected.add(dir);
+      }
+      assertEquals(4, selected.size());
+    } finally {
+      myTearDown();
+    }
+  }
+
+  public void testSuspension() throws IOException {
+    mySetup();
+
+    try {
+      Path topDir = new Path(TEST_DIR + "/testenumeration");
+
+      createTestTree(topDir);
+
+      String top = topDir.toString();
+      List<FileStatus> startPaths = new LinkedList<FileStatus>();
+      startPaths.add(fs.getFileStatus(new Path(top + "/a")));
+      startPaths.add(fs.getFileStatus(new Path(top + "/b")));
+      DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+
+      int limit = 2;
+      short targetRepl = 1;
+      Path raid = new Path("/raid");
+      List<FileStatus> selected = dt.selectFilesToRaid(conf, targetRepl, raid,
+                                                        0, limit);
+      for (FileStatus f: selected) {
+        LOG.info(f.getPath());
+      }
+      assertEquals(limit, selected.size());
+
+      selected = dt.selectFilesToRaid(conf, targetRepl, raid, 0, limit);
+      for (FileStatus f: selected) {
+        LOG.info(f.getPath());
+      }
+      assertEquals(limit, selected.size());
+    } finally {
+      myTearDown();
+    }
+  }
+
+  /**
+   * Creates a test directory tree.
+   *            top
+   *           / | \
+   *          /  |  f5
+   *         a   b___
+   *        / \  |\  \
+   *       f1 f2 f3f4 c
+   */
+  private void createTestTree(Path topDir) throws IOException {
+    String top = topDir.toString();
+    fs.delete(topDir, true);
+
+    fs.mkdirs(topDir);
+    fs.create(new Path(top + "/f5")).close();
+
+    fs.mkdirs(new Path(top + "/a"));
+    createTestFile(new Path(top + "/a/f1"));
+    createTestFile(new Path(top + "/a/f2"));
+
+    fs.mkdirs(new Path(top + "/b"));
+    fs.mkdirs(new Path(top + "/b/c"));
+    createTestFile(new Path(top + "/b/f3"));
+    createTestFile(new Path(top + "/b/f4"));
+  }
+
+  private void createTestFile(Path file) throws IOException {
+    long blockSize = 8192;
+    byte[] bytes = new byte[(int)blockSize];
+    FSDataOutputStream stm = fs.create(file, false, 4096, (short)1, blockSize);
+    stm.write(bytes);
+    stm.write(bytes);
+    stm.write(bytes);
+    stm.close();
+    FileStatus stat = fs.getFileStatus(file);
+    assertEquals(blockSize, stat.getBlockSize());
+  }
+
+  private void mySetup() throws IOException {
+    conf = new Configuration();
+    dfs = new MiniDFSCluster(conf, 6, true, null);
+    dfs.waitActive();
+    fs = dfs.getFileSystem();
+  }
+
+  private void myTearDown() {
+    if (dfs != null) { dfs.shutdown(); }
+  }
+}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
index 01b9b95..09fe785 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
@@ -82,6 +82,7 @@
     conf.setBoolean("fs.raidnode.local", local);
 
     conf.set("raid.server.address", "localhost:0");
+    conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
 
     // create a dfs and map-reduce cluster
     final int taskTrackers = 4;
@@ -101,12 +102,12 @@
   /**
    * create raid.xml file for RaidNode
    */
-  private void mySetup(String srcPath, long targetReplication,
-                long metaReplication, long stripeLength ) throws Exception {
+  private void mySetup(long targetReplication,
+                long metaReplication, long stripeLength) throws Exception {
     FileWriter fileWriter = new FileWriter(CONFIG_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     String str = "<configuration> " +
-                   "<srcPath prefix=\"" + srcPath + "\"> " +
+                   "<srcPath prefix=\"/user/test/raidtest\"> " +
                      "<policy name = \"RaidTest1\"> " +
                         "<destPath> /destraid</destPath> " +
                         "<property> " +
@@ -162,7 +163,6 @@
   public void testRaidHar() throws Exception {
     LOG.info("Test testRaidHar  started.");
 
-    String srcPaths    []  = { "/user/test/raidtest", "/user/test/raid*" };
     long blockSizes    []  = {1024L};
     long stripeLengths []  = {5};
     long targetReplication = 1;
@@ -172,13 +172,11 @@
 
     createClusters(true);
     try {
-      for (String srcPath : srcPaths) {
-        for (long blockSize : blockSizes) {
-          for (long stripeLength : stripeLengths) {
-            doTestHar(iter, srcPath, targetReplication, metaReplication,
-                         stripeLength, blockSize, numBlock);
-            iter++;
-          }
+      for (long blockSize : blockSizes) {
+        for (long stripeLength : stripeLengths) {
+           doTestHar(iter, targetReplication, metaReplication,
+                       stripeLength, blockSize, numBlock);
+           iter++;
         }
       }
     } finally {
@@ -191,14 +189,14 @@
    * Create parity file, delete original file and then validate that
    * parity file is automatically deleted.
    */
-  private void doTestHar(int iter, String srcPath, long targetReplication,
+  private void doTestHar(int iter, long targetReplication,
                           long metaReplication, long stripeLength,
                           long blockSize, int numBlock) throws Exception {
     LOG.info("doTestHar started---------------------------:" +  " iter " + iter +
              " blockSize=" + blockSize + " stripeLength=" + stripeLength);
-    mySetup(srcPath, targetReplication, metaReplication, stripeLength);
-    RaidShell shell = null;
+    mySetup(targetReplication, metaReplication, stripeLength);
     Path dir = new Path("/user/test/raidtest/subdir/");
+    Path file1 = new Path(dir + "/file" + iter);
     RaidNode cnode = null;
     try {
       Path destPath = new Path("/destraid/user/test/raidtest/subdir");
@@ -211,21 +209,9 @@
       LOG.info("doTestHar created test files for iteration " + iter);
 
       // create an instance of the RaidNode
-      cnode = RaidNode.createRaidNode(null, conf);
-      int times = 10;
-
-      while (times-- > 0) {
-        try {
-          shell = new RaidShell(conf, cnode.getListenerAddress());
-        } catch (Exception e) {
-          LOG.info("doTestHar unable to connect to " + 
-              cnode.getListenerAddress() + " retrying....");
-          Thread.sleep(1000);
-          continue;
-        }
-        break;
-      }
-      LOG.info("doTestHar created RaidShell.");
+      Configuration localConf = new Configuration(conf);
+      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+      cnode = RaidNode.createRaidNode(null, localConf);
       FileStatus[] listPaths = null;
 
       int maxFilesFound = 0;
@@ -234,6 +220,7 @@
         try {
           listPaths = fileSys.listStatus(destPath);
           int count = 0;
+          Path harPath = null;
           int filesFound = 0;
           if (listPaths != null) {
             for (FileStatus s : listPaths) {
@@ -250,6 +237,7 @@
                 // files since some parity files might get deleted by the
                 // purge thread.
                 assertEquals(10, maxFilesFound);
+                harPath = s.getPath();
                 count++;
               }
             }
@@ -260,11 +248,12 @@
         } catch (FileNotFoundException e) {
           //ignore
         }
-        LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " + 
+        LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " +
                  (listPaths == null ? "none" : listPaths.length));
         Thread.sleep(1000);                  // keep waiting
+
       }
-      
+
       fileSys.delete(dir, true);
       // wait till raid file is deleted
       int count = 1;
@@ -291,7 +280,6 @@
                                           StringUtils.stringifyException(e));
       throw e;
     } finally {
-      shell.close();
       if (cnode != null) { cnode.stop(); cnode.join(); }
     }
     LOG.info("doTestHar completed:" + " blockSize=" + blockSize +
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
index 6613da0..1bb197a 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.GregorianCalendar;
 import java.util.Iterator;
@@ -64,8 +65,11 @@
 
   Configuration conf;
   String namenode = null;
+  String hftp = null;
   MiniDFSCluster dfs = null;
+  MiniMRCluster mr = null;
   FileSystem fileSys = null;
+  String jobTrackerName = null;
 
   /**
    * create mapreduce and dfs clusters
@@ -75,6 +79,7 @@
     new File(TEST_DIR).mkdirs(); // Make sure data directory exists
     conf = new Configuration();
     conf.set("raid.config.file", CONFIG_FILE);
+    conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
     conf.setBoolean("raid.config.reload", true);
     conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
     conf.setBoolean("dfs.permissions.enabled", true);
@@ -82,76 +87,138 @@
     // scan all policies once every 5 second
     conf.setLong("raid.policy.rescan.interval", 5000);
 
+    // make all deletions not go through Trash
+    conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
     // the RaidNode does the raiding inline (instead of submitting to map/reduce)
     conf.setBoolean("fs.raidnode.local", local);
 
     conf.set("raid.server.address", "localhost:0");
-    
+
     // create a dfs and map-reduce cluster
     final int taskTrackers = 4;
     final int jobTrackerPort = 60050;
 
-    dfs = new MiniDFSCluster(conf, 3, true, null);
+    dfs = new MiniDFSCluster(conf, 6, true, null);
     dfs.waitActive();
     fileSys = dfs.getFileSystem();
     namenode = fileSys.getUri().toString();
-  }
-    
-  /**
-   * create raid.xml file for RaidNode
-   */
-  private void mySetup(String path, short srcReplication, long targetReplication,
-                long metaReplication, long stripeLength) throws Exception {
-    FileWriter fileWriter = new FileWriter(CONFIG_FILE);
-    fileWriter.write("<?xml version=\"1.0\"?>\n");
-    String str = "<configuration> " +
-                   "<srcPath prefix=\"" + path + "\"> " +
-                     "<policy name = \"RaidTest1\"> " +
-                        "<destPath> /destraid</destPath> " +
-                        "<property> " +
-                          "<name>srcReplication</name> " +
-                          "<value>" + srcReplication + "</value> " +
-                          "<description> pick only files whole replFactor is greater than or equal to " +
-                          "</description> " + 
-                        "</property> " +
-                        "<property> " +
-                          "<name>targetReplication</name> " +
-                          "<value>" + targetReplication + "</value> " +
-                          "<description>after RAIDing, decrease the replication factor of a file to this value." +
-                          "</description> " + 
-                        "</property> " +
-                        "<property> " +
-                          "<name>metaReplication</name> " +
-                          "<value>" + metaReplication + "</value> " +
-                          "<description> replication factor of parity file" +
-                          "</description> " + 
-                        "</property> " +
-                        "<property> " +
-                          "<name>stripeLength</name> " +
-                          "<value>" + stripeLength + "</value> " +
-                          "<description> the max number of blocks in a file to RAID together " +
-                          "</description> " + 
-                        "</property> " +
-                        "<property> " +
-                          "<name>modTimePeriod</name> " +
-                          "<value>2000</value> " + 
-                          "<description> time (milliseconds) after a file is modified to make it " +
-                                         "a candidate for RAIDing " +
-                          "</description> " + 
-                        "</property> " +
-                     "</policy>" +
-                   "</srcPath>" +
-                 "</configuration>";
-    fileWriter.write(str);
-    fileWriter.close();
+    mr = new MiniMRCluster(taskTrackers, namenode, 3);
+    jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+    hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
 
-    
+    FileSystem.setDefaultUri(conf, namenode);
+    conf.set("mapred.job.tracker", jobTrackerName);
   }
 
+  class ConfigBuilder {
+    private List<String> policies;
+
+    public ConfigBuilder() {
+      policies = new java.util.ArrayList<String>();
+    }
+
+    public void addPolicy(String name, short srcReplication,
+                          long targetReplication, long metaReplication, long stripeLength) {
+      String str =
+        "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
+          "<policy name = \"" + name + "\"> " +
+             "<destPath> /destraid</destPath> " +
+             "<property> " +
+               "<name>srcReplication</name> " +
+               "<value>" + srcReplication + "</value> " +
+               "<description> pick only files whole replFactor is greater than or equal to " +
+               "</description> " +
+             "</property> " +
+             "<property> " +
+               "<name>targetReplication</name> " +
+               "<value>" + targetReplication + "</value> " +
+               "<description>after RAIDing, decrease the replication factor of a file to this value." +
+               "</description> " +
+             "</property> " +
+             "<property> " +
+               "<name>metaReplication</name> " +
+               "<value>" + metaReplication + "</value> " +
+               "<description> replication factor of parity file" +
+               "</description> " +
+             "</property> " +
+             "<property> " +
+               "<name>stripeLength</name> " +
+               "<value>" + stripeLength + "</value> " +
+               "<description> the max number of blocks in a file to RAID together " +
+               "</description> " +
+             "</property> " +
+             "<property> " +
+               "<name>modTimePeriod</name> " +
+               "<value>2000</value> " +
+               "<description> time (milliseconds) after a file is modified to make it " +
+                              "a candidate for RAIDing " +
+               "</description> " +
+             "</property> " +
+          "</policy>" +
+        "</srcPath>";
+      policies.add(str);
+    }
+
+    public void addPolicy(String name, String path, short srcReplication,
+                          long targetReplication, long metaReplication, long stripeLength) {
+      String str =
+        "<srcPath prefix=\"" + path + "\"> " +
+          "<policy name = \"" + name + "\"> " +
+             "<destPath> /destraid</destPath> " +
+             "<property> " +
+               "<name>srcReplication</name> " +
+               "<value>" + srcReplication + "</value> " +
+               "<description> pick only files whole replFactor is greater than or equal to " +
+               "</description> " + 
+             "</property> " +
+             "<property> " +
+               "<name>targetReplication</name> " +
+               "<value>" + targetReplication + "</value> " +
+               "<description>after RAIDing, decrease the replication factor of a file to this value." +
+               "</description> " + 
+             "</property> " +
+             "<property> " +
+               "<name>metaReplication</name> " +
+               "<value>" + metaReplication + "</value> " +
+               "<description> replication factor of parity file" +
+               "</description> " + 
+             "</property> " +
+             "<property> " +
+               "<name>stripeLength</name> " +
+               "<value>" + stripeLength + "</value> " +
+               "<description> the max number of blocks in a file to RAID together " +
+               "</description> " + 
+             "</property> " +
+             "<property> " +
+               "<name>modTimePeriod</name> " +
+               "<value>2000</value> " + 
+               "<description> time (milliseconds) after a file is modified to make it " +
+                              "a candidate for RAIDing " +
+               "</description> " + 
+             "</property> " +
+          "</policy>" +
+        "</srcPath>";
+      policies.add(str);
+    }
+
+    public void persist() throws IOException {
+      FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+      fileWriter.write("<?xml version=\"1.0\"?>\n");
+      fileWriter.write("<configuration>");
+      for (String policy: policies) {
+        fileWriter.write(policy);
+      }
+      fileWriter.write("</configuration>");
+      fileWriter.close();
+    }
+  }
+    
   /**
    * stop clusters created earlier
    */
   private void stopClusters() throws Exception {
+    if (mr != null) { mr.shutdown(); }
     if (dfs != null) { dfs.shutdown(); }
   }
 
@@ -168,8 +235,8 @@
     int  numBlock          = 11;
     int  iter = 0;
 
+    createClusters(true);
     try {
-      createClusters(true);
       for (long blockSize : blockSizes) {
         for (long stripeLength : stripeLengths) {
            doTestPathFilter(iter, targetReplication, metaReplication,
@@ -192,7 +259,10 @@
                           long blockSize, int numBlock) throws Exception {
     LOG.info("doTestPathFilter started---------------------------:" +  " iter " + iter +
              " blockSize=" + blockSize + " stripeLength=" + stripeLength);
-    mySetup("/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+    ConfigBuilder cb = new ConfigBuilder();
+    cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+    cb.persist();
+
     RaidShell shell = null;
     Path dir = new Path("/user/dhruba/raidtest/");
     Path file1 = new Path(dir + "/file" + iter);
@@ -230,7 +300,9 @@
           if (listPaths != null && listPaths.length == 1) {
             for (FileStatus s : listPaths) {
               LOG.info("doTestPathFilter found path " + s.getPath());
-              if (!s.getPath().toString().endsWith(".tmp")) {
+              if (!s.getPath().toString().endsWith(".tmp") &&
+                  fileSys.getFileStatus(file1).getReplication() ==
+                  targetReplication) {
                 count++;
               }
             }
@@ -247,28 +319,29 @@
       }
       // assertEquals(listPaths.length, 1); // all files raided
       LOG.info("doTestPathFilter all files found in Raid.");
+      Thread.sleep(20000); // Without this wait, unit test crashes
 
       // check for error at beginning of file
       if (numBlock >= 1) {
-        LOG.info("Check error at beginning of file.");
+        LOG.info("doTestPathFilter Check error at beginning of file.");
         simulateError(shell, fileSys, file1, crc1, 0);
       }
 
       // check for error at the beginning of second block
       if (numBlock >= 2) {
-        LOG.info("Check error at beginning of second block.");
+        LOG.info("doTestPathFilter Check error at beginning of second block.");
         simulateError(shell, fileSys, file1, crc1, blockSize + 1);
       }
 
       // check for error at the middle of third block
       if (numBlock >= 3) {
-        LOG.info("Check error at middle of third block.");
+        LOG.info("doTestPathFilter Check error at middle of third block.");
         simulateError(shell, fileSys, file1, crc1, 2 * blockSize + 10);
       }
 
       // check for error at the middle of second stripe
       if (numBlock >= stripeLength + 1) {
-        LOG.info("Check error at middle of second stripe.");
+        LOG.info("doTestPathFilter Check error at middle of second stripe.");
         simulateError(shell, fileSys, file1, crc1,
                                             stripeLength * blockSize + 100);
       }
@@ -297,8 +370,9 @@
     long stripeLength = 2;
     long blockSize = 1024;
     int numBlock = 3;
-    mySetup("/user/dhruba/policytest", srcReplication, targetReplication, metaReplication, stripeLength);
-    RaidShell shell = null;
+    ConfigBuilder cb = new ConfigBuilder();
+    cb.addPolicy("policy1", "/user/dhruba/policytest", (short)1, targetReplication, metaReplication, stripeLength);
+    cb.persist();
     Path dir = new Path("/user/dhruba/policytest/");
     Path file1 = new Path(dir + "/file1");
     Path file2 = new Path(dir + "/file2");
@@ -309,21 +383,9 @@
       fileSys.delete(destPath, true);
 
       // create an instance of the RaidNode
-      cnode = RaidNode.createRaidNode(null, conf);
-      int times = 10;
-
-      while (times-- > 0) {
-        try {
-          shell = new RaidShell(conf, cnode.getListenerAddress());
-        } catch (Exception e) {
-          LOG.info("doCheckPolicy unable to connect to " + 
-              cnode.getListenerAddress() + " retrying....");
-          Thread.sleep(1000);
-          continue;
-        }
-        break;
-      }
-      LOG.info("doCheckPolicy created RaidShell.");
+      Configuration localConf = new Configuration(conf);
+      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+      cnode = RaidNode.createRaidNode(null, localConf);
 
       // this file should be picked up RaidNode
       long crc2 = createOldFile(fileSys, file2, 2, numBlock, blockSize);
@@ -338,7 +400,9 @@
         if (listPaths != null && listPaths.length == 1) {
           for (FileStatus s : listPaths) {
             LOG.info("doCheckPolicy found path " + s.getPath());
-            if (!s.getPath().toString().endsWith(".tmp")) {
+            if (!s.getPath().toString().endsWith(".tmp") &&
+                fileSys.getFileStatus(file2).getReplication() ==
+                targetReplication) {
               count++;
               firstmodtime = s.getModificationTime();
             }
@@ -369,7 +433,9 @@
           for (FileStatus s : listPaths) {
             LOG.info("doCheckPolicy found path " + s.getPath() + " " + s.getModificationTime());
             if (!s.getPath().toString().endsWith(".tmp") &&
-                s.getModificationTime() > firstmodtime) {
+                s.getModificationTime() > firstmodtime &&
+                fileSys.getFileStatus(file2).getReplication() ==
+                targetReplication) {
               count++;
             }
           }
@@ -389,7 +455,6 @@
                                           StringUtils.stringifyException(e));
       throw e;
     } finally {
-      shell.close();
       if (cnode != null) { cnode.stop(); cnode.join(); }
       LOG.info("doTestPathFilter delete file " + file1);
       fileSys.delete(file1, false);
@@ -397,112 +462,93 @@
     LOG.info("doCheckPolicy completed:");
   }
 
+  private void createTestFiles(String path, String destpath) throws IOException {
+    long blockSize         = 1024L;
+    Path dir = new Path(path);
+    Path destPath = new Path(destpath);
+    fileSys.delete(dir, true);
+    fileSys.delete(destPath, true);
+   
+    for(int i = 0 ; i < 10; i++){
+      Path file = new Path(path + "file" + i);
+      createOldFile(fileSys, file, 1, 7, blockSize);
+    }
+  }
+
   /**
    * Test dist Raid
    */
   public void testDistRaid() throws Exception {
     LOG.info("Test testDistRaid started.");
-    long blockSize         = 1024L;
     long targetReplication = 2;
     long metaReplication   = 2;
     long stripeLength      = 3;
     short srcReplication = 1;
 
+    createClusters(false);
+    ConfigBuilder cb = new ConfigBuilder();
+    cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+    cb.addPolicy("policy2", "/user/dhruba/raidtest2", (short)1, targetReplication, metaReplication, stripeLength);
+    cb.persist();
+
+    RaidNode cnode = null;
     try {
-      createClusters(false);
-      mySetup("/user/dhruba/raidtest", srcReplication, targetReplication, metaReplication, stripeLength);
+      createTestFiles("/user/dhruba/raidtest/", "/destraid/user/dhruba/raidtest");
+      createTestFiles("/user/dhruba/raidtest2/", "/destraid/user/dhruba/raidtest2");
       LOG.info("Test testDistRaid created test files");
 
-      Path dir = new Path("/user/dhruba/raidtest/");
-      Path destPath = new Path("/destraid/user/dhruba/raidtest");
-      fileSys.delete(dir, true);
-      fileSys.delete(destPath, true);
-     
-      ConfigManager configMgr  = new ConfigManager(conf);
-      configMgr.reloadConfigsIfNecessary();
-      LOG.info(" testDistRaid ConfigFile Loaded");
-
-      // activate all categories
-      Collection<PolicyList> all = configMgr.getAllPolicies();   
-      PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
-      Iterator<PolicyInfo> pi = sorted[0].getAll().iterator();
-      PolicyInfo p = pi.next();
-      List<FileStatus> ll = new ArrayList<FileStatus>();
-
-      for(int i = 0 ; i < 10; i++){
-        Path file = new Path("/user/dhruba/raidtest/file"+i);
-        createOldFile(fileSys, file, 1, 7, blockSize);
-        FileStatus st = fileSys.getFileStatus(file);
-        ll.add(st);
+      Configuration localConf = new Configuration(conf);
+      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+      cnode = RaidNode.createRaidNode(null, localConf);
+      // Verify the policies are parsed correctly
+      for (PolicyList policyList : cnode.getAllPolicies()) {
+        for (PolicyInfo p : policyList.getAll()) {
+          if (p.getName().equals("policy1")) {
+            Path srcPath = new Path("/user/dhruba/raidtest");
+            assertTrue(p.getSrcPath().equals(
+                srcPath.makeQualified(srcPath.getFileSystem(conf))));
+          } else {
+            assertTrue(p.getName().equals("policy2"));
+            Path srcPath = new Path("/user/dhruba/raidtest2");
+            assertTrue(p.getSrcPath().equals(
+                srcPath.makeQualified(srcPath.getFileSystem(conf))));
+          }
+          assertEquals(targetReplication,
+                       Integer.parseInt(p.getProperty("targetReplication")));
+          assertEquals(metaReplication,
+                       Integer.parseInt(p.getProperty("metaReplication")));
+          assertEquals(stripeLength,
+                       Integer.parseInt(p.getProperty("stripeLength")));
+        }
       }
-      
-      DistRaid dr = new DistRaid(conf);      
-      dr.addRaidPaths(p, ll);
-      dr.doDistRaid();
+
+      long start = System.currentTimeMillis();
+      final int MAX_WAITTIME = 300000;
+      while (cnode.jobMonitor.jobsMonitored() < 2 &&
+             System.currentTimeMillis() - start < MAX_WAITTIME) {
+        Thread.sleep(1000);
+      }
+      this.assertEquals(cnode.jobMonitor.jobsMonitored(), 2);
+
+      start = System.currentTimeMillis();
+      while (cnode.jobMonitor.jobsSucceeded() < 2 &&
+             System.currentTimeMillis() - start < MAX_WAITTIME) {
+        Thread.sleep(1000);
+      }
+      this.assertEquals(cnode.jobMonitor.jobsSucceeded(), 2);
+
       LOG.info("Test testDistRaid successful.");
       
     } catch (Exception e) {
       LOG.info("testDistRaid Exception " + e + StringUtils.stringifyException(e));
       throw e;
     } finally {
+      if (cnode != null) { cnode.stop(); cnode.join(); }
       stopClusters();
     }
     LOG.info("Test testDistRaid completed.");
   }
   
-  /**
-   * Test the case where the source and destination paths conflict.
-   * @throws Exception
-   */
-  public void testConflictingPaths() throws Exception {
-    LOG.info("Test testConflictingPaths started");
-    long targetReplication = 2;
-    long metaReplication   = 2;
-    long stripeLength      = 3;
-    short srcReplication = 1;
-    long modTimePeriod = 0;
-    try {
-      createClusters(false);
-      mySetup("/user/d/raidtest", srcReplication, targetReplication,
-          metaReplication, stripeLength);
-      // We dont need this to run, just need the object.
-      RaidNode cnode = RaidNode.createRaidNode(null, conf);
-      cnode.stop();
-      cnode.join();
-
-      createOldFile(fileSys, new Path("/user/d/raidtest/f1"), 2, 7, 8192L);
-      LOG.info("Test testConflictingPaths created test files");
-
-      long now = System.currentTimeMillis();
-
-      // Test the regular case.
-      LOG.info("Test testConflictingPaths testing the regular case");
-      List<FileStatus> selected = cnode.selectFiles(conf,
-          new Path("/user/d/raidtest*"), "/raid",
-          modTimePeriod, srcReplication, now);
-      assertTrue(selected.size() > 0);
-
-      // Test the conflicting case: src under dest.
-      LOG.info("Test testConflictingPaths testing src under dest");
-      selected = cnode.selectFiles(conf,
-          new Path("/user/d/raidtest*"), "/user/d",
-          modTimePeriod, srcReplication, now);
-      assertEquals(0, selected.size());
-
-      // Test the conflicting case: dest under src.
-      LOG.info("Test testConflictingPaths testing dest under src");
-      selected = cnode.selectFiles(conf,
-          new Path("/user/d*"), "/user/d/raidtest",
-          modTimePeriod, srcReplication, now);
-      assertEquals(0, selected.size());
-
-      LOG.info("Test testConflictingPaths succeeded.");
-    } finally {
-      stopClusters();
-    }
-    LOG.info("Test testConflictingPaths completed.");
-  }
-
   //
   // simulate a corruption at specified offset and verify that eveyrthing is good
   //
@@ -573,4 +619,50 @@
       fail("CRC mismatch of files " + name1 + " with file " + name2);
     }
   }
+
+  public void testSuspendTraversal() throws Exception {
+    LOG.info("Test testSuspendTraversal started.");
+    long targetReplication = 2;
+    long metaReplication   = 2;
+    long stripeLength      = 3;
+    short srcReplication = 1;
+
+    createClusters(false);
+    ConfigBuilder cb = new ConfigBuilder();
+    cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+    cb.persist();
+
+    RaidNode cnode = null;
+    try {
+      createTestFiles("/user/dhruba/raidtest/", "/destraid/user/dhruba/raidtest");
+      LOG.info("Test testSuspendTraversal created test files");
+
+      Configuration localConf = new Configuration(conf);
+      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+      localConf.setInt("raid.distraid.max.files", 3);
+      final int numJobsExpected = 4; // 10 test files: 4 jobs with 3 files each.
+      cnode = RaidNode.createRaidNode(null, localConf);
+
+      long start = System.currentTimeMillis();
+      final int MAX_WAITTIME = 300000;
+
+      start = System.currentTimeMillis();
+      while (cnode.jobMonitor.jobsSucceeded() < numJobsExpected &&
+             System.currentTimeMillis() - start < MAX_WAITTIME) {
+        Thread.sleep(1000);
+      }
+      this.assertEquals(cnode.jobMonitor.jobsMonitored(), numJobsExpected);
+      this.assertEquals(cnode.jobMonitor.jobsSucceeded(), numJobsExpected);
+
+      LOG.info("Test testSuspendTraversal successful.");
+
+    } catch (Exception e) {
+      LOG.info("testSuspendTraversal Exception " + e + StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      if (cnode != null) { cnode.stop(); cnode.join(); }
+      stopClusters();
+    }
+    LOG.info("Test testSuspendTraversal completed.");
+  }
 }
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
index 311cbba..c2dcfdf 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
@@ -195,7 +195,6 @@
     LOG.info("doTestPurge started---------------------------:" +  " iter " + iter +
              " blockSize=" + blockSize + " stripeLength=" + stripeLength);
     mySetup(srcPath, targetReplication, metaReplication, stripeLength);
-    RaidShell shell = null;
     Path dir = new Path("/user/dhruba/raidtest/");
     Path file1 = new Path(dir + "/file" + iter);
     RaidNode cnode = null;
@@ -207,21 +206,9 @@
       LOG.info("doTestPurge created test files for iteration " + iter);
 
       // create an instance of the RaidNode
-      cnode = RaidNode.createRaidNode(null, conf);
-      int times = 10;
-
-      while (times-- > 0) {
-        try {
-          shell = new RaidShell(conf, cnode.getListenerAddress());
-        } catch (Exception e) {
-          LOG.info("doTestPurge unable to connect to " + 
-              cnode.getListenerAddress() + " retrying....");
-          Thread.sleep(1000);
-          continue;
-        }
-        break;
-      }
-      LOG.info("doTestPurge created RaidShell.");
+      Configuration localConf = new Configuration(conf);
+      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+      cnode = RaidNode.createRaidNode(null, localConf);
       FileStatus[] listPaths = null;
 
       // wait till file is raided
@@ -266,7 +253,6 @@
                                           StringUtils.stringifyException(e));
       throw e;
     } finally {
-      shell.close();
       if (cnode != null) { cnode.stop(); cnode.join(); }
       LOG.info("doTestPurge delete file " + file1);
       fileSys.delete(file1, true);