MAPREDUCE-1892. RaidNode can allow layered policies more efficiently.
(Ramkumar Vadali via schen)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1030162 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index bd94fc1..3dc4b5a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -155,6 +155,9 @@
     MAPREDUCE-2051. Contribute a fair scheduler preemption system test.
     (Todd Lipcon via tomwhite)
 
+    MAPREDUCE-1892. RaidNode can allow layered policies more efficiently.
+    (Ramkumar Vadali via schen)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
index 180d71f..5ced753 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Implements depth-first traversal using a Stack object. The traversal
@@ -46,6 +47,16 @@
   private Stack<Node> stack = new Stack<Node>();
 
   /**
+   * A FileFilter object can be used to choose files during directory traversal.
+   */
+  public interface FileFilter {
+    /**
+     * @return a boolean value indicating if the file passes the filter.
+     */
+    boolean check(FileStatus f) throws IOException;
+  }
+
+  /**
    * Represents a directory node in directory traversal.
    */
   static class Node {
@@ -82,62 +93,21 @@
     pathIdx = 0;
   }
 
-  /**
-   * Choose some files to RAID.
-   * @param conf Configuration to use.
-   * @param raidDestPrefix Prefix of the path to RAID to.
-   * @param modTimePeriod Time gap before RAIDing.
-   * @param limit Limit on the number of files to choose.
-   * @return list of files to RAID.
-   * @throws IOException
-   */
-  public List<FileStatus> selectFilesToRaid(
-      Configuration conf, int targetRepl, Path raidDestPrefix,
-      long modTimePeriod, int limit) throws IOException {
-    List<FileStatus> selected = new LinkedList<FileStatus>();
-    int numSelected = 0;
-
-    long now = System.currentTimeMillis();
-    while (numSelected < limit) {
+  public List<FileStatus> getFilteredFiles(FileFilter filter, int limit)
+      throws IOException {
+    List<FileStatus> filtered = new LinkedList<FileStatus>();
+    int num = 0;
+    while (num < limit) {
       FileStatus next = getNextFile();
       if (next == null) {
         break;
       }
-      // We have the next file, do we want to select it?
-      // If the source file has fewer than or equal to 2 blocks, then skip it.
-      long blockSize = next.getBlockSize();
-      if (2 * blockSize >= next.getLen()) {
-        continue;
-      }
-
-      boolean select = false;
-      try {
-        Object ppair = RaidNode.getParityFile(
-            raidDestPrefix, next.getPath(), conf);
-        // Is there is a valid parity file?
-        if (ppair != null) {
-          // Is the source at the target replication?
-          if (next.getReplication() != targetRepl) {
-            // Select the file so that its replication can be set.
-            select = true;
-          } else {
-            // Nothing to do, don't select the file.
-            select = false;
-          }
-        } else if (next.getModificationTime() + modTimePeriod < now) {
-          // If there isn't a valid parity file, check if the file is too new.
-          select = true;
-        }
-      } catch (java.io.FileNotFoundException e) {
-        select = true; // destination file does not exist
-      }
-      if (select) {
-        selected.add(next);
-        numSelected++;
+      if (filter.check(next)) {
+        num++;
+        filtered.add(next);
       }
     }
-
-    return selected;
+    return filtered;
   }
 
   /**
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java
new file mode 100644
index 0000000..2a660b4
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+
+public class RaidFilter {
+  static class Statistics {
+    long numRaided = 0;
+    long numTooNew = 0;
+    long sizeTooNew = 0;
+    long numTooSmall = 0;
+    long sizeTooSmall = 0;
+
+    public void aggregate(Statistics other) {
+      this.numRaided += other.numRaided;
+      this.numTooNew += other.numTooNew;
+      this.sizeTooNew += other.sizeTooNew;
+      this.numTooSmall += other.numTooSmall;
+      this.sizeTooSmall += other.sizeTooSmall;
+    }
+
+    public String toString() {
+      return "numRaided = " + numRaided +
+             ", numTooNew = " + numTooNew +
+             ", sizeTooNew = " + sizeTooNew +
+             ", numTooSmall = " + numTooSmall +
+             ", sizeTooSmall = " + sizeTooSmall;
+    }
+  }
+
+  static class TimeBasedFilter extends Configured
+    implements DirectoryTraversal.FileFilter {
+    int targetRepl;
+    Path raidDestPrefix;
+    long modTimePeriod;
+    long startTime;
+    Statistics stats = new Statistics();
+    String currentSrcPath = null;
+    long[] modTimePeriods = new long[0];
+    String[] otherSrcPaths = new String[0];
+
+    TimeBasedFilter(Configuration conf, Path destPrefix, int targetRepl,
+      long startTime, long modTimePeriod) {
+      super(conf);
+      this.raidDestPrefix = destPrefix;
+      this.targetRepl = targetRepl;
+      this.startTime = startTime;
+      this.modTimePeriod = modTimePeriod;
+    }
+
+    TimeBasedFilter(Configuration conf,
+      Path destPrefix, PolicyInfo info,
+      List<PolicyInfo> allPolicies, long startTime, Statistics stats) {
+      super(conf);
+      this.raidDestPrefix = destPrefix;
+      this.targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+      this.modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
+      this.startTime = startTime;
+      this.stats = stats;
+      this.currentSrcPath = info.getSrcPath().toUri().getPath();
+      initializeOtherPaths(allPolicies);
+    }
+
+    private void initializeOtherPaths(List<PolicyInfo> allPolicies) {
+      ArrayList<PolicyInfo> tmp = new ArrayList<PolicyInfo>(allPolicies);
+      // Remove all policies where srcPath <= currentSrcPath or
+      // matchingPrefixLength is < length(currentSrcPath)
+      // The policies remaining are the only ones that could better
+      // select a file chosen by the current policy.
+      for (Iterator<PolicyInfo> it = tmp.iterator(); it.hasNext(); ) {
+        String src = it.next().getSrcPath().toUri().getPath();
+        if (src.compareTo(currentSrcPath) <= 0) {
+          it.remove();
+          continue;
+        }
+        int matchLen = matchingPrefixLength(src, currentSrcPath);
+        if (matchLen < currentSrcPath.length()) {
+          it.remove();
+        }
+      }
+      // Sort in reverse lexicographic order.
+      Collections.sort(tmp, new Comparator() {
+        public int compare(Object o1, Object o2) {
+          return 0 -
+            ((PolicyInfo)o1).getSrcPath().toUri().getPath().compareTo(
+              ((PolicyInfo)o1).getSrcPath().toUri().getPath());
+        }
+      });
+      otherSrcPaths = new String[tmp.size()];
+      modTimePeriods = new long[otherSrcPaths.length];
+      for (int i = 0; i < otherSrcPaths.length; i++) {
+        otherSrcPaths[i] = tmp.get(i).getSrcPath().toUri().getPath();
+        modTimePeriods[i] = Long.parseLong(
+          tmp.get(i).getProperty("modTimePeriod"));
+      }
+    }
+
+    public boolean check(FileStatus f) throws IOException {
+      if (!canChooseForCurrentPolicy(f)) {
+        return false;
+      }
+
+      // If the source file has fewer than or equal to 2 blocks, then skip it.
+      long blockSize = f.getBlockSize();
+      if (2 * blockSize >= f.getLen()) {
+        stats.numTooSmall++;
+        stats.sizeTooSmall += f.getLen();
+        return false;
+      }
+
+      boolean select = false;
+      try {
+        Object ppair = RaidNode.getParityFile(
+            raidDestPrefix, f.getPath(), getConf());
+        // Is there is a valid parity file?
+        if (ppair != null) {
+          // Is the source at the target replication?
+          if (f.getReplication() != targetRepl) {
+            // Select the file so that its replication can be set.
+            select = true;
+          } else {
+            stats.numRaided++;
+            // Nothing to do, don't select the file.
+            select = false;
+          }
+        } else {
+          // No parity file.
+          if (f.getModificationTime() + modTimePeriod < startTime) {
+            // If the file is not too new, choose it for raiding.
+            select = true;
+          } else {
+            select = false;
+            stats.numTooNew++;
+            stats.sizeTooNew += f.getLen();
+          }
+        }
+      } catch (java.io.FileNotFoundException e) {
+        select = true; // destination file does not exist
+      } catch (java.io.IOException e) {
+        // If there is a problem with the har path, this will let us continue.
+        DirectoryTraversal.LOG.error(
+          "Error while selecting " + StringUtils.stringifyException(e));
+      }
+      return select;
+    }
+
+    /**
+     * Checks if a file can be chosen for the current policy.
+     */
+    boolean canChooseForCurrentPolicy(FileStatus stat) {
+      boolean choose = true;
+      if (otherSrcPaths.length > 0) {
+        String fileStr = stat.getPath().toUri().getPath();
+
+        // For a given string, find the best matching srcPath.
+        int matchWithCurrent = matchingPrefixLength(fileStr, currentSrcPath);
+        for (int i = 0; i < otherSrcPaths.length; i++) {
+          // If the file is too new, move to the next.
+          if (stat.getModificationTime() > startTime - modTimePeriods[i]) {
+            continue;
+          }
+          int matchLen = matchingPrefixLength(fileStr, otherSrcPaths[i]);
+          if (matchLen > 0 &&
+              fileStr.charAt(matchLen - 1) == Path.SEPARATOR_CHAR) {
+            matchLen--;
+          }
+          if (matchLen > matchWithCurrent) {
+            choose = false;
+            break;
+          }
+        }
+      }
+      return choose;
+    }
+
+    int matchingPrefixLength(final String s1, final String s2) {
+      int len = 0;
+      for (int j = 0; j < s1.length() && j < s2.length(); j++) {
+        if (s1.charAt(j) == s2.charAt(j)) {
+          len++;
+        } else {
+          break;
+        }
+      }
+      return len;
+    }
+  }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index 5507246..ec2f123 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -22,6 +22,7 @@
 import java.io.FileNotFoundException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.LinkedList;
 import java.util.Iterator;
@@ -334,9 +335,19 @@
    */
   class TriggerMonitor implements Runnable {
 
-    private Map<String, Long> scanTimes = new HashMap<String, Long>();
-    private Map<String, DirectoryTraversal> scanState =
-      new HashMap<String, DirectoryTraversal>();
+    class ScanState {
+      long fullScanStartTime;
+      DirectoryTraversal pendingTraversal;
+      RaidFilter.Statistics stats;
+      ScanState() {
+        fullScanStartTime = 0;
+        pendingTraversal = null;
+        stats = new RaidFilter.Statistics();
+      }
+    }
+
+    private Map<String, ScanState> scanStateMap =
+      new HashMap<String, ScanState>();
 
     /**
      */
@@ -357,9 +368,10 @@
      */
     private boolean shouldSelectFiles(PolicyInfo info) {
       String policyName = info.getName();
+      ScanState scanState = scanStateMap.get(policyName);
       int runningJobsCount = jobMonitor.runningJobsCount(policyName);
       // Is there a scan in progress for this policy?
-      if (scanState.containsKey(policyName)) {
+      if (scanState.pendingTraversal != null) {
         int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
 
         // If there is a scan in progress for this policy, we can have
@@ -373,12 +385,8 @@
         }
         // Check the time of the last full traversal before starting a fresh
         // traversal.
-        if (scanTimes.containsKey(policyName)) {
-          long lastScan = scanTimes.get(policyName);
-          return (now() > lastScan + configMgr.getPeriodicity());
-        } else {
-          return true;
-        }
+        long lastScan = scanState.fullScanStartTime;
+        return (now() > lastScan + configMgr.getPeriodicity());
       }
     }
 
@@ -388,33 +396,29 @@
     * traversal.
     * The number of paths returned is limited by raid.distraid.max.jobs.
     */
-    private List<FileStatus> selectFiles(PolicyInfo info) throws IOException {
+    private List<FileStatus> selectFiles(
+      PolicyInfo info, ArrayList<PolicyInfo> allPolicies) throws IOException {
       Path destPrefix = getDestinationPath(conf);
       String policyName = info.getName();
       Path srcPath = info.getSrcPath();
-      long modTimePeriod = 0;
-      String str = info.getProperty("modTimePeriod");
-      if (str != null) {
-         modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
-      }
-      short srcReplication = 0;
-      str = info.getProperty("srcReplication");
-      if (str != null) {
-        srcReplication = Short.parseShort(info.getProperty("srcReplication"));
-      }
+      long modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
 
       // Max number of files returned.
       int selectLimit = configMgr.getMaxFilesPerJob();
       int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
 
+      long selectStartTime = System.currentTimeMillis();
+
+      ScanState scanState = scanStateMap.get(policyName);
       // If we have a pending traversal, resume it.
-      if (scanState.containsKey(policyName)) {
-        DirectoryTraversal dt = scanState.get(policyName);
+      if (scanState.pendingTraversal != null) {
+        DirectoryTraversal dt = scanState.pendingTraversal;
         LOG.info("Resuming traversal for policy " + policyName);
-        List<FileStatus> returnSet = dt.selectFilesToRaid(
-            conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+        DirectoryTraversal.FileFilter filter =
+          filterForPolicy(selectStartTime, info, allPolicies, scanState.stats);
+        List<FileStatus> returnSet = dt.getFilteredFiles(filter, selectLimit);
         if (dt.doneTraversal()) {
-          scanState.remove(policyName);
+          scanState.pendingTraversal = null;
         }
         return returnSet;
       }
@@ -445,12 +449,13 @@
         }
 
         // Set the time for a new traversal.
-        scanTimes.put(policyName, now());
+        scanState.fullScanStartTime = now();
         DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths);
-        returnSet = dt.selectFilesToRaid(
-            conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+        DirectoryTraversal.FileFilter filter =
+          filterForPolicy(selectStartTime, info, allPolicies, scanState.stats);
+        returnSet = dt.getFilteredFiles(filter, selectLimit);
         if (!dt.doneTraversal()) {
-          scanState.put(policyName, dt);
+          scanState.pendingTraversal = dt;
         }
       }
       return returnSet;
@@ -461,73 +466,86 @@
      * If the config file has changed, then reload config file and start afresh.
      */
     private void doProcess() throws IOException, InterruptedException {
-      PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
-
+      ArrayList<PolicyInfo> allPolicies = new ArrayList<PolicyInfo>();
+      for (PolicyList category : configMgr.getAllPolicies()) {
+        for (PolicyInfo info: category.getAll()) {
+          allPolicies.add(info);
+        }
+      }
       while (running) {
         Thread.sleep(SLEEP_TIME);
 
-        configMgr.reloadConfigsIfNecessary();
-
-        // activate all categories
-        Collection<PolicyList> all = configMgr.getAllPolicies();
-        
-        // sort all policies by reverse lexicographical order. This is needed
-        // to make the nearest policy take precedence.
-        PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
-        Arrays.sort(sorted, lexi);
-
-        for (PolicyList category : sorted) {
-          for (PolicyInfo info: category.getAll()) {
-
-            if (!shouldSelectFiles(info)) {
-              continue;
-            }
-
-            LOG.info("Triggering Policy Filter " + info.getName() +
-                     " " + info.getSrcPath());
-            List<FileStatus> filteredPaths = null;
-            try {
-              filteredPaths = selectFiles(info);
-            } catch (Exception e) {
-              LOG.info("Exception while invoking filter on policy " + info.getName() +
-                       " srcPath " + info.getSrcPath() + 
-                       " exception " + StringUtils.stringifyException(e));
-              continue;
-            }
-
-            if (filteredPaths == null || filteredPaths.size() == 0) {
-              LOG.info("No filtered paths for policy " + info.getName());
-               continue;
-            }
-
-            // Apply the action on accepted paths
-            LOG.info("Triggering Policy Action " + info.getName() +
-                     " " + info.getSrcPath());
-            try {
-              if (isRaidLocal){
-                doRaid(conf, info, filteredPaths);
-              }
-              else{
-                // We already checked that no job for this policy is running
-                // So we can start a new job.
-                DistRaid dr = new DistRaid(conf);
-                //add paths for distributed raiding
-                dr.addRaidPaths(info, filteredPaths);
-                boolean started = dr.startDistRaid();
-                if (started) {
-                  jobMonitor.monitorJob(info.getName(), dr);
-                }
-              }
-            } catch (Exception e) {
-              LOG.info("Exception while invoking action on policy " + info.getName() +
-                       " srcPath " + info.getSrcPath() + 
-                       " exception " + StringUtils.stringifyException(e));
-              continue;
+        boolean reloaded = configMgr.reloadConfigsIfNecessary();
+        if (reloaded) {
+          allPolicies.clear();
+          for (PolicyList category : configMgr.getAllPolicies()) {
+            for (PolicyInfo info: category.getAll()) {
+              allPolicies.add(info);
             }
           }
         }
+
+        for (PolicyInfo info: allPolicies) {
+          if (!scanStateMap.containsKey(info.getName())) {
+            scanStateMap.put(info.getName(), new ScanState());
+          }
+
+          if (!shouldSelectFiles(info)) {
+            continue;
+          }
+
+          LOG.info("Triggering Policy Filter " + info.getName() +
+                   " " + info.getSrcPath());
+          List<FileStatus> filteredPaths = null;
+          try {
+            filteredPaths = selectFiles(info, allPolicies);
+          } catch (Exception e) {
+            LOG.info("Exception while invoking filter on policy " + info.getName() +
+                     " srcPath " + info.getSrcPath() + 
+                     " exception " + StringUtils.stringifyException(e));
+            continue;
+          }
+
+          if (filteredPaths == null || filteredPaths.size() == 0) {
+            LOG.info("No filtered paths for policy " + info.getName());
+             continue;
+          }
+
+          // Apply the action on accepted paths
+          LOG.info("Triggering Policy Action " + info.getName() +
+                   " " + info.getSrcPath());
+          try {
+            if (isRaidLocal){
+              doRaid(conf, info, filteredPaths);
+            }
+            else{
+              // We already checked that no job for this policy is running
+              // So we can start a new job.
+              DistRaid dr = new DistRaid(conf);
+              //add paths for distributed raiding
+              dr.addRaidPaths(info, filteredPaths);
+              boolean started = dr.startDistRaid();
+              if (started) {
+                jobMonitor.monitorJob(info.getName(), dr);
+              }
+            }
+          } catch (Exception e) {
+            LOG.info("Exception while invoking action on policy " + info.getName() +
+                     " srcPath " + info.getSrcPath() + 
+                     " exception " + StringUtils.stringifyException(e));
+            continue;
+          }
+        }
       }
     }
+
+    DirectoryTraversal.FileFilter filterForPolicy(
+      long startTime, PolicyInfo info, List<PolicyInfo> allPolicies,
+      RaidFilter.Statistics stats)
+      throws IOException {
+      return new RaidFilter.TimeBasedFilter(conf, getDestinationPath(conf),
+        info, allPolicies, startTime, stats);
+    }
   }
 
   static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) {
@@ -618,9 +636,9 @@
   }
   
   private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException {
-	  
-	  return getParityFile(destPathPrefix, srcPath, conf);
-	  
+    
+    return getParityFile(destPathPrefix, srcPath, conf);
+    
   }
   
 
@@ -825,8 +843,6 @@
      * destination directories.
      */
     private void doPurge() throws IOException, InterruptedException {
-      PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
-
       long prevExec = 0;
       while (running) {
 
@@ -838,16 +854,8 @@
 
         LOG.info("Started purge scan");
         prevExec = now();
-        
-        // fetch all categories
-        Collection<PolicyList> all = configMgr.getAllPolicies();
-        
-        // sort all policies by reverse lexicographical order. This is 
-        // needed to make the nearest policy take precedence.
-        PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
-        Arrays.sort(sorted, lexi);
 
-        for (PolicyList category : sorted) {
+        for (PolicyList category : configMgr.getAllPolicies()) {
           for (PolicyInfo info: category.getAll()) {
 
             try {
@@ -857,7 +865,7 @@
 
               //get srcPaths
               Path[] srcPaths = info.getSrcPathExpanded();
-              
+
               if (srcPaths != null) {
                 for (Path srcPath: srcPaths) {
                   // expand destination prefix
@@ -1017,9 +1025,6 @@
   }
 
   private void doHar() throws IOException, InterruptedException {
-    
-    PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
-
     long prevExec = 0;
     while (running) {
 
@@ -1031,16 +1036,8 @@
 
       LOG.info("Started archive scan");
       prevExec = now();
-      
-      // fetch all categories
-      Collection<PolicyList> all = configMgr.getAllPolicies();
-            
-      // sort all policies by reverse lexicographical order. This is 
-      // needed to make the nearest policy take precedence.
-      PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
-      Arrays.sort(sorted, lexi);
 
-      for (PolicyList category : sorted) {
+      for (PolicyList category : configMgr.getAllPolicies()) {
         for (PolicyInfo info: category.getAll()) {
           String str = info.getProperty("time_before_har");
           String tmpHarPath = info.getProperty("har_tmp_dir");
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
index c58eb8c..90e1ad4 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
@@ -137,7 +137,7 @@
   /**
    * Get the srcPath
    */
-  public Path getSrcPath() throws IOException {
+  public Path getSrcPath() {
     return srcPath;
   }
 
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
index 187ed25..eb3dc6e 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
@@ -24,7 +24,6 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.LinkedList;
-import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -78,18 +77,6 @@
     return category;
   }
 
-  /**
-   * Sort Categries based on their srcPath. reverse lexicographical order.
-   */
-  public static class CompareByPath implements Comparator<PolicyList> {
-    public CompareByPath() throws IOException {
-    }
-    public int compare(PolicyList l1, PolicyList l2) {
-      return 0 - l1.getSrcPath().compareTo(l2.getSrcPath());
-    }
-  }
-  
-  
   //////////////////////////////////////////////////
   // Writable
   //////////////////////////////////////////////////
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
index 8a2805a..d7288ac 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
@@ -239,18 +239,16 @@
       corruptBlock(locs.get(0).getBlock().getBlockName());
       reportCorruptBlocks(dfs, file1, new int[]{0}, blockSize);
 
-      // This should fail.
-      boolean caughtChecksumException = false;
       try {
         Thread.sleep(5*1000);
       } catch (InterruptedException ignore) {
       }
       try {
         TestRaidDfs.validateFile(dfs, file1, file1Len, crc1);
+        fail("Expected exception not thrown");
       } catch (org.apache.hadoop.fs.ChecksumException ce) {
-        caughtChecksumException = true;
+      } catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
       }
-      assertTrue(caughtChecksumException);
     } catch (Exception e) {
       LOG.info("Test testGeneratedBlock Exception " + e + StringUtils.stringifyException(e));
       throw e;
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
index 119cae6..9f7f710 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
 
 public class TestDirectoryTraversal extends TestCase {
   final static Log LOG = LogFactory.getLog(
@@ -104,14 +105,16 @@
       int limit = 2;
       short targetRepl = 1;
       Path raid = new Path("/raid");
-      List<FileStatus> selected = dt.selectFilesToRaid(conf, targetRepl, raid,
-                                                        0, limit);
+      DirectoryTraversal.FileFilter filter =
+        new RaidFilter.TimeBasedFilter(conf,
+          RaidNode.getDestinationPath(conf), 1, System.currentTimeMillis(), 0);
+      List<FileStatus> selected = dt.getFilteredFiles(filter, limit);
       for (FileStatus f: selected) {
         LOG.info(f.getPath());
       }
       assertEquals(limit, selected.size());
 
-      selected = dt.selectFilesToRaid(conf, targetRepl, raid, 0, limit);
+      selected = dt.getFilteredFiles(filter, limit);
       for (FileStatus f: selected) {
         LOG.info(f.getPath());
       }
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
new file mode 100644
index 0000000..9b1cdd9
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+
+public class TestRaidFilter extends TestCase {
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/raid/test/data")).getAbsolutePath();
+  final static Log LOG =
+    LogFactory.getLog("org.apache.hadoop.raid.TestRaidFilter");
+
+  Configuration conf;
+  MiniDFSCluster dfs = null;
+  FileSystem fs = null;
+
+  private void mySetup() throws Exception {
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    conf = new Configuration();
+    dfs = new MiniDFSCluster(conf, 2, true, null);
+    dfs.waitActive();
+    fs = dfs.getFileSystem();
+    String namenode = fs.getUri().toString();
+    FileSystem.setDefaultUri(conf, namenode);
+  }
+
+  private void myTearDown() throws Exception {
+    if (dfs != null) { dfs.shutdown(); }
+  }
+
+  public void testLayeredPolicies() throws Exception {
+    mySetup();
+    Path src1 = new Path("/user/foo");
+    Path src2 = new Path("/user/foo/bar");
+
+    PolicyInfo info1 = new PolicyInfo("p1", conf);
+    info1.setSrcPath(src1.toString());
+    info1.setDescription("test policy");
+    info1.setProperty("targetReplication", "1");
+    info1.setProperty("metaReplication", "1");
+    info1.setProperty("modTimePeriod", "0");
+
+    PolicyInfo info2 = new PolicyInfo("p2", conf);
+    info2.setSrcPath(src2.toString());
+    info2.setDescription("test policy");
+    info2.setProperty("targetReplication", "1");
+    info2.setProperty("metaReplication", "1");
+    info2.setProperty("modTimePeriod", "0");
+
+    ArrayList<PolicyInfo> all = new ArrayList<PolicyInfo>();
+    all.add(info1);
+    all.add(info2);
+
+    try {
+      long blockSize = 1024;
+      byte[] bytes = new byte[(int)blockSize];
+      Path f1 = new Path(src1, "f1");
+      Path f2 = new Path(src2, "f2");
+      FSDataOutputStream stm1 = fs.create(f1, false, 4096, (short)1, blockSize);
+      FSDataOutputStream stm2 = fs.create(f2, false, 4096, (short)1, blockSize);
+      FSDataOutputStream[]  stms = new FSDataOutputStream[]{stm1, stm2};
+      for (FSDataOutputStream stm: stms) {
+        stm.write(bytes);
+        stm.write(bytes);
+        stm.write(bytes);
+        stm.close();
+      }
+
+      Thread.sleep(1000);
+
+      FileStatus stat1 = fs.getFileStatus(f1);
+      FileStatus stat2 = fs.getFileStatus(f2);
+
+      RaidFilter.Statistics stats = new RaidFilter.Statistics();
+      RaidFilter.TimeBasedFilter filter = new RaidFilter.TimeBasedFilter(
+        conf, RaidNode.getDestinationPath(conf), info1, all,
+        System.currentTimeMillis(), stats);
+      System.out.println("Stats " + stats);
+
+      assertTrue(filter.check(stat1));
+      assertFalse(filter.check(stat2));
+
+    } finally {
+      myTearDown();
+    }
+  }
+}