MAPREDUCE-2167. Faster directory traversal for raid node. (Ramkumar Vadali via schen)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1034216 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index dfcdf67..f238271 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -168,6 +168,9 @@
MAPREDUCE-2093. Herriot JT and TT clients should vend statistics. (cos)
+ MAPREDUCE-2167. Faster directory traversal for raid node. (Ramkumar Vadali
+ via schen)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
index 5ced753..4c955df 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
@@ -20,9 +20,16 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Stack;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +52,9 @@
private List<FileStatus> paths;
private int pathIdx = 0; // Next path to process.
private Stack<Node> stack = new Stack<Node>();
+ private ExecutorService executor;
+
+ private int numThreads;
/**
* A FileFilter object can be used to choose files during directory traversal.
@@ -88,26 +98,96 @@
* @param startPaths A list of paths that need to be traversed
*/
public DirectoryTraversal(FileSystem fs, List<FileStatus> startPaths) {
+ this(fs, startPaths, 1);
+ }
+
+ public DirectoryTraversal(
+ FileSystem fs, List<FileStatus> startPaths, int numThreads) {
this.fs = fs;
paths = startPaths;
pathIdx = 0;
+ this.numThreads = numThreads;
+ executor = Executors.newFixedThreadPool(numThreads);
}
- public List<FileStatus> getFilteredFiles(FileFilter filter, int limit)
- throws IOException {
- List<FileStatus> filtered = new LinkedList<FileStatus>();
- int num = 0;
- while (num < limit) {
- FileStatus next = getNextFile();
- if (next == null) {
+ public List<FileStatus> getFilteredFiles(FileFilter filter, int limit) {
+ List<FileStatus> filtered = new ArrayList<FileStatus>();
+
+ // We need this semaphore to block when the number of running work items
+ // equals the number of threads. A FixedThreadPool limits the number of
+ // threads but not the queue size, so acquiring a permit per submitted
+ // item bounds the memory usage.
+ Semaphore slots = new Semaphore(numThreads);
+
+ while (true) {
+ synchronized(filtered) {
+ if (filtered.size() >= limit) break;
+ }
+ FilterFileWorkItem work = null;
+ try {
+ Node next = getNextDirectoryNode();
+ if (next == null) {
+ break;
+ }
+ work = new FilterFileWorkItem(filter, next, filtered, slots);
+ slots.acquire();
+ } catch (InterruptedException ie) {
+ break;
+ } catch (IOException e) {
break;
}
- if (filter.check(next)) {
- num++;
- filtered.add(next);
+ executor.execute(work);
+ }
+
+ try {
+ // Wait for all submitted items to finish.
+ slots.acquire(numThreads);
+ // If this traversal is finished, shutdown the executor.
+ if (doneTraversal()) {
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.HOURS);
+ }
+ } catch (InterruptedException ie) {
+ }
+
+ return filtered;
+ }
+
+ class FilterFileWorkItem implements Runnable {
+ FileFilter filter;
+ Node dir;
+ List<FileStatus> filtered;
+ Semaphore slots;
+
+ FilterFileWorkItem(FileFilter filter, Node dir, List<FileStatus> filtered,
+ Semaphore slots) {
+ this.slots = slots;
+ this.filter = filter;
+ this.dir = dir;
+ this.filtered = filtered;
+ }
+
+ @SuppressWarnings("deprecation")
+ public void run() {
+ try {
+ LOG.info("Initiating file filtering for " + dir.path.getPath());
+ for (FileStatus f: dir.elements) {
+ if (!f.isFile()) {
+ continue;
+ }
+ if (filter.check(f)) {
+ synchronized(filtered) {
+ filtered.add(f);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error in directory traversal: "
+ + StringUtils.stringifyException(e));
+ } finally {
+ slots.release();
}
}
- return filtered;
}
/**
@@ -168,6 +248,15 @@
* @throws IOException
*/
public FileStatus getNextDirectory() throws IOException {
+ Node dirNode = getNextDirectoryNode();
+ if (dirNode != null) {
+ return dirNode.path;
+ }
+ return null;
+ }
+
+ private Node getNextDirectoryNode() throws IOException {
+
// Check if traversal is done.
while (!doneTraversal()) {
// If traversal is not done, check if the stack is not empty.
@@ -190,7 +279,7 @@
}
} else {
stack.pop();
- return node.path;
+ return node;
}
}
// If the stack is empty, do we have more paths?
@@ -215,7 +304,6 @@
return;
}
Path p = stat.getPath();
- LOG.info("Traversing to directory " + p);
FileStatus[] elements = fs.listStatus(p);
Node newNode = new Node(stat, (elements == null? new FileStatus[0]: elements));
stack.push(newNode);
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index 6f736e8..76cd389 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -445,7 +445,8 @@
// Set the time for a new traversal.
scanState.fullScanStartTime = now();
- DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths);
+ DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths,
+ conf.getInt("raid.directorytraversal.threads", 4));
DirectoryTraversal.FileFilter filter =
filterForPolicy(selectStartTime, info, allPolicies, scanState.stats);
returnSet = dt.getFilteredFiles(filter, selectLimit);
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
index 9f7f710..386cb6c 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
@@ -58,7 +58,7 @@
LOG.info("Enumerating files");
List<FileStatus> startPaths = new LinkedList<FileStatus>();
startPaths.add(fs.getFileStatus(topDir));
- DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+ DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths, 2);
List<FileStatus> selected = new LinkedList<FileStatus>();
while (true) {
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
index 9b96124..1336b41 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
@@ -632,13 +632,25 @@
RaidNode cnode = null;
try {
- createTestFiles("/user/dhruba/raidtest/", "/destraid/user/dhruba/raidtest");
+ createTestFiles(
+ "/user/dhruba/raidtest/1/", "/destraid/user/dhruba/raidtest/1");
+ createTestFiles(
+ "/user/dhruba/raidtest/2/", "/destraid/user/dhruba/raidtest/2");
+ createTestFiles(
+ "/user/dhruba/raidtest/3/", "/destraid/user/dhruba/raidtest/3");
+ createTestFiles(
+ "/user/dhruba/raidtest/4/", "/destraid/user/dhruba/raidtest/4");
LOG.info("Test testSuspendTraversal created test files");
Configuration localConf = new Configuration(conf);
localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
localConf.setInt("raid.distraid.max.files", 3);
- final int numJobsExpected = 4; // 10 test files: 4 jobs with 3 files each.
+ localConf.setInt("raid.directorytraversal.threads", 1);
+ // This expectation is tightly coupled to the implementation of
+ // getFilteredFiles(): two directories may be traversed before returning
+ // because the file list is appended to by worker threads separate from
+ // the thread that checks whether enough files have been collected.
+ final int numJobsExpected = 2;
cnode = RaidNode.createRaidNode(null, localConf);
long start = System.currentTimeMillis();