MAPREDUCE-2167. Faster directory traversal for raid node. (Ramkumar Vadali via schen)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1034216 13f79535-47bb-0310-9956-ffa450edef68
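
The heart of the change: DirectoryTraversal now filters files on a fixed
thread pool whose in-flight work is capped by a counting semaphore (see
the comment in getFilteredFiles() below). A minimal standalone sketch of
that pattern, with a placeholder work body and item count:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class BoundedPoolSketch {
      public static void main(String[] args) throws InterruptedException {
        final int numThreads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        // newFixedThreadPool() caps threads but queues tasks without bound;
        // taking a permit per submission also caps the queue, hence memory.
        final Semaphore slots = new Semaphore(numThreads);

        for (int i = 0; i < 100; i++) {
          slots.acquire();  // blocks while numThreads items are in flight
          executor.execute(new Runnable() {
            public void run() {
              try {
                // ... process one work item ...
              } finally {
                slots.release();
              }
            }
          });
        }
        slots.acquire(numThreads);  // barrier: all submitted work is done
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);
      }
    }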
diff --git a/CHANGES.txt b/CHANGES.txt
index dfcdf67..f238271 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -168,6 +168,9 @@
 
     MAPREDUCE-2093. Herriot JT and TT clients should vend statistics. (cos)
 
+    MAPREDUCE-2167. Faster directory traversal for raid node. (Ramkumar Vadali
+    via schen)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
index 5ced753..4c955df 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
@@ -20,9 +20,16 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Stack;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +52,9 @@
   private List<FileStatus> paths;
   private int pathIdx = 0;  // Next path to process.
   private Stack<Node> stack = new Stack<Node>();
+  private ExecutorService executor;
+
+  private int numThreads;
 
   /**
    * A FileFilter object can be used to choose files during directory traversal.
@@ -88,26 +98,96 @@
    * @param startPaths A list of paths that need to be traversed
    */
   public DirectoryTraversal(FileSystem fs, List<FileStatus> startPaths) {
+    this(fs, startPaths, 1);
+  }
+
+  public DirectoryTraversal(
+    FileSystem fs, List<FileStatus> startPaths, int numThreads) {
     this.fs = fs;
     paths = startPaths;
     pathIdx = 0;
+    this.numThreads = numThreads;
+    executor = Executors.newFixedThreadPool(numThreads);
   }
 
-  public List<FileStatus> getFilteredFiles(FileFilter filter, int limit)
-      throws IOException {
-    List<FileStatus> filtered = new LinkedList<FileStatus>();
-    int num = 0;
-    while (num < limit) {
-      FileStatus next = getNextFile();
-      if (next == null) {
+  public List<FileStatus> getFilteredFiles(FileFilter filter, int limit) {
+    List<FileStatus> filtered = new ArrayList<FileStatus>();
+
+    // This semaphore blocks submission once the number of in-flight work
+    // items equals the number of threads. A fixed thread pool caps the
+    // thread count but not its queue length, so the semaphore is what
+    // bounds memory usage.
+    Semaphore slots = new Semaphore(numThreads);
+
+    while (true) {
+      synchronized(filtered) {
+        if (filtered.size() >= limit) break;
+      }
+      FilterFileWorkItem work = null;
+      try {
+        Node next = getNextDirectoryNode();
+        if (next == null) {
+          break;
+        }
+        work = new FilterFileWorkItem(filter, next, filtered, slots);
+        slots.acquire();
+      } catch (InterruptedException ie) {
+        break;
+      } catch (IOException e) {
         break;
       }
-      if (filter.check(next)) {
-        num++;
-        filtered.add(next);
+      executor.execute(work);
+    }
+
+    try {
+      // Wait for all submitted items to finish.
+      slots.acquire(numThreads);
+      // If the traversal is finished, shut down the executor.
+      if (doneTraversal()) {
+        executor.shutdown();
+        executor.awaitTermination(1, TimeUnit.HOURS);
+      }
+    } catch (InterruptedException ie) {  // Interrupted: return what we have.
+    }
+
+    return filtered;
+  }
+
+  class FilterFileWorkItem implements Runnable {
+    FileFilter filter;
+    Node dir;
+    List<FileStatus> filtered;
+    Semaphore slots;
+
+    FilterFileWorkItem(FileFilter filter, Node dir, List<FileStatus> filtered,
+      Semaphore slots) {
+      this.slots = slots;
+      this.filter = filter;
+      this.dir = dir;
+      this.filtered = filtered;
+    }
+
+    @SuppressWarnings("deprecation")
+    public void run() {
+      try {
+        LOG.info("Initiating file filtering for " + dir.path.getPath());
+        for (FileStatus f: dir.elements) {
+          if (!f.isFile()) {
+            continue;
+          }
+          if (filter.check(f)) {
+            synchronized(filtered) {
+              filtered.add(f);
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Error in directory traversal: "
+          + StringUtils.stringifyException(e));
+      } finally {
+        slots.release();
       }
     }
-    return filtered;
   }
 
   /**
@@ -168,6 +248,15 @@
    * @throws IOException
    */
   public FileStatus getNextDirectory() throws IOException {
+    Node dirNode = getNextDirectoryNode();
+    if (dirNode != null) {
+      return dirNode.path;
+    }
+    return null;
+  }
+
+  private Node getNextDirectoryNode() throws IOException {
+
     // Check if traversal is done.
     while (!doneTraversal()) {
       // If traversal is not done, check if the stack is not empty.
@@ -190,7 +279,7 @@
           }
         } else {
           stack.pop();
-          return node.path;
+          return node;
         }
       }
       // If the stack is empty, do we have more paths?
@@ -215,7 +304,6 @@
       return;
     }
     Path p = stat.getPath();
-    LOG.info("Traversing to directory " + p);
     FileStatus[] elements = fs.listStatus(p);
     Node newNode = new Node(stat, (elements == null? new FileStatus[0]: elements));
     stack.push(newNode);
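
For orientation, a hedged sketch of driving the widened API from inside a
method with an initialized FileSystem fs in scope; the path and predicate
are made up, and FileFilter.check(FileStatus) is assumed to declare
IOException, as the catch blocks above imply:

    List<FileStatus> startPaths = new LinkedList<FileStatus>();
    startPaths.add(fs.getFileStatus(new Path("/user/foo")));  // illustrative

    // Traverse with 4 worker threads; collect up to 1000 matching files.
    DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths, 4);
    DirectoryTraversal.FileFilter filter = new DirectoryTraversal.FileFilter() {
      public boolean check(FileStatus f) throws IOException {
        return f.getReplication() < 3;  // example predicate only
      }
    };
    List<FileStatus> selected = dt.getFilteredFiles(filter, 1000);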
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index 6f736e8..76cd389 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -445,7 +445,8 @@
 
         // Set the time for a new traversal.
         scanState.fullScanStartTime = now();
-        DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths);
+        DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths,
+          conf.getInt("raid.directorytraversal.threads", 4));
         DirectoryTraversal.FileFilter filter =
           filterForPolicy(selectStartTime, info, allPolicies, scanState.stats);
         returnSet = dt.getFilteredFiles(filter, selectLimit);
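
The parallelism is controlled by the key raid.directorytraversal.threads,
defaulting to 4 as above. To widen it before starting a RaidNode (the
value 8 is only an example):

    Configuration raidConf = new Configuration(conf);
    raidConf.setInt("raid.directorytraversal.threads", 8);  // default: 4
    RaidNode node = RaidNode.createRaidNode(null, raidConf);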
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
index 9f7f710..386cb6c 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
@@ -58,7 +58,7 @@
       LOG.info("Enumerating files");
       List<FileStatus> startPaths = new LinkedList<FileStatus>();
       startPaths.add(fs.getFileStatus(topDir));
-      DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+      DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths, 2);
 
       List<FileStatus> selected = new LinkedList<FileStatus>();
       while (true) {
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
index 9b96124..1336b41 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
@@ -632,13 +632,25 @@
 
     RaidNode cnode = null;
     try {
-      createTestFiles("/user/dhruba/raidtest/", "/destraid/user/dhruba/raidtest");
+      createTestFiles(
+        "/user/dhruba/raidtest/1/", "/destraid/user/dhruba/raidtest/1");
+      createTestFiles(
+        "/user/dhruba/raidtest/2/", "/destraid/user/dhruba/raidtest/2");
+      createTestFiles(
+        "/user/dhruba/raidtest/3/", "/destraid/user/dhruba/raidtest/3");
+      createTestFiles(
+        "/user/dhruba/raidtest/4/", "/destraid/user/dhruba/raidtest/4");
       LOG.info("Test testSuspendTraversal created test files");
 
       Configuration localConf = new Configuration(conf);
       localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
       localConf.setInt("raid.distraid.max.files", 3);
-      final int numJobsExpected = 4; // 10 test files: 4 jobs with 3 files each.
+      localConf.setInt("raid.directorytraversal.threads", 1);
+      // Note: this expectation is tightly coupled to the implementation of
+      // getFilteredFiles(). Worker threads append to the result list while
+      // a different thread checks the limit, so two directories can be
+      // traversed before the check sees enough files.
+      final int numJobsExpected = 2;
       cnode = RaidNode.createRaidNode(null, localConf);
 
       long start = System.currentTimeMillis();
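
The fragility the new test comment describes is a check-then-act race: the
limit check and the list insertions are each synchronized, but not atomic
together, so submitted work can push the result past the limit after a
check. A distilled, runnable sketch of the effect (all names illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class LimitRaceSketch {
      public static void main(String[] args) throws InterruptedException {
        final List<Integer> filtered = new ArrayList<Integer>();
        final int limit = 3;
        List<Thread> workers = new ArrayList<Thread>();

        for (int dir = 0; dir < 4; dir++) {        // four "directories"
          synchronized (filtered) {
            if (filtered.size() >= limit) break;   // check happens here...
          }
          Thread worker = new Thread(new Runnable() {
            public void run() {                    // ...insertions happen
              for (int i = 0; i < 5; i++) {        // later, on another thread
                synchronized (filtered) { filtered.add(i); }
              }
            }
          });
          worker.start();
          workers.add(worker);
        }
        for (Thread t : workers) t.join();
        // Usually prints well above 3: several directories were submitted
        // before any check observed enough files.
        System.out.println("selected = " + filtered.size());
      }
    }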