MAPREDUCE-1892. RaidNode can allow layered policies more efficiently.
(Ramkumar Vadali via schen)
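
DirectoryTraversal now exposes a generic FileFilter interface and a
getFilteredFiles() method, and the RAID-specific selection logic moves into
RaidFilter.TimeBasedFilter. Each filter is built with the full list of
policies, so a file under overlapping srcPaths is claimed only by the policy
with the longest matching srcPath prefix; this removes the need to sort
policies with PolicyList.CompareByPath. TriggerMonitor keeps per-policy scan
state (last full scan time, pending traversal, filter statistics) in a single
ScanState object.

A rough sketch of the new selection call path (destPrefix, info, allPolicies,
stats and limit as set up by the caller):

  DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths);
  DirectoryTraversal.FileFilter filter = new RaidFilter.TimeBasedFilter(
      conf, destPrefix, info, allPolicies, System.currentTimeMillis(), stats);
  List<FileStatus> selected = dt.getFilteredFiles(filter, limit);
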
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1030162 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index bd94fc1..3dc4b5a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -155,6 +155,9 @@
MAPREDUCE-2051. Contribute a fair scheduler preemption system test.
(Todd Lipcon via tomwhite)
+ MAPREDUCE-1892. RaidNode can allow layered policies more efficiently.
+ (Ramkumar Vadali via schen)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
index 180d71f..5ced753 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
/**
* Implements depth-first traversal using a Stack object. The traversal
@@ -46,6 +47,16 @@
private Stack<Node> stack = new Stack<Node>();
/**
+ * A FileFilter object can be used to choose files during directory traversal.
+ */
+ public interface FileFilter {
+ /**
+ * @return a boolean value indicating if the file passes the filter.
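+ * @param f the file to check.
+ * @throws IOException if the filter cannot evaluate the file.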
+ */
+ boolean check(FileStatus f) throws IOException;
+ }
+
+ /**
* Represents a directory node in directory traversal.
*/
static class Node {
@@ -82,62 +93,21 @@
pathIdx = 0;
}
- /**
- * Choose some files to RAID.
- * @param conf Configuration to use.
- * @param raidDestPrefix Prefix of the path to RAID to.
- * @param modTimePeriod Time gap before RAIDing.
- * @param limit Limit on the number of files to choose.
- * @return list of files to RAID.
- * @throws IOException
- */
- public List<FileStatus> selectFilesToRaid(
- Configuration conf, int targetRepl, Path raidDestPrefix,
- long modTimePeriod, int limit) throws IOException {
- List<FileStatus> selected = new LinkedList<FileStatus>();
- int numSelected = 0;
-
- long now = System.currentTimeMillis();
- while (numSelected < limit) {
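+ /**
+ * Returns up to <code>limit</code> files accepted by the given filter,
+ * continuing the traversal from where it last stopped.
+ * @param filter chooses which files to return.
+ * @param limit maximum number of files to return.
+ * @return list of files that pass the filter.
+ * @throws IOException
+ */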
+ public List<FileStatus> getFilteredFiles(FileFilter filter, int limit)
+ throws IOException {
+ List<FileStatus> filtered = new LinkedList<FileStatus>();
+ int num = 0;
+ while (num < limit) {
FileStatus next = getNextFile();
if (next == null) {
break;
}
- // We have the next file, do we want to select it?
- // If the source file has fewer than or equal to 2 blocks, then skip it.
- long blockSize = next.getBlockSize();
- if (2 * blockSize >= next.getLen()) {
- continue;
- }
-
- boolean select = false;
- try {
- Object ppair = RaidNode.getParityFile(
- raidDestPrefix, next.getPath(), conf);
- // Is there is a valid parity file?
- if (ppair != null) {
- // Is the source at the target replication?
- if (next.getReplication() != targetRepl) {
- // Select the file so that its replication can be set.
- select = true;
- } else {
- // Nothing to do, don't select the file.
- select = false;
- }
- } else if (next.getModificationTime() + modTimePeriod < now) {
- // If there isn't a valid parity file, check if the file is too new.
- select = true;
- }
- } catch (java.io.FileNotFoundException e) {
- select = true; // destination file does not exist
- }
- if (select) {
- selected.add(next);
- numSelected++;
+ if (filter.check(next)) {
+ num++;
+ filtered.add(next);
}
}
-
- return selected;
+ return filtered;
}
/**
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java
new file mode 100644
index 0000000..2a660b4
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+
+public class RaidFilter {
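+ /** Counts of files skipped or already raided, gathered while filtering. */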
+ static class Statistics {
+ long numRaided = 0;
+ long numTooNew = 0;
+ long sizeTooNew = 0;
+ long numTooSmall = 0;
+ long sizeTooSmall = 0;
+
+ public void aggregate(Statistics other) {
+ this.numRaided += other.numRaided;
+ this.numTooNew += other.numTooNew;
+ this.sizeTooNew += other.sizeTooNew;
+ this.numTooSmall += other.numTooSmall;
+ this.sizeTooSmall += other.sizeTooSmall;
+ }
+
+ public String toString() {
+ return "numRaided = " + numRaided +
+ ", numTooNew = " + numTooNew +
+ ", sizeTooNew = " + sizeTooNew +
+ ", numTooSmall = " + numTooSmall +
+ ", sizeTooSmall = " + sizeTooSmall;
+ }
+ }
+
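+ /**
+ * Chooses files for raiding based on modification time and replication,
+ * and defers to any policy whose srcPath matches a file more specifically.
+ */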
+ static class TimeBasedFilter extends Configured
+ implements DirectoryTraversal.FileFilter {
+ int targetRepl;
+ Path raidDestPrefix;
+ long modTimePeriod;
+ long startTime;
+ Statistics stats = new Statistics();
+ String currentSrcPath = null;
+ long[] modTimePeriods = new long[0];
+ String[] otherSrcPaths = new String[0];
+
+ TimeBasedFilter(Configuration conf, Path destPrefix, int targetRepl,
+ long startTime, long modTimePeriod) {
+ super(conf);
+ this.raidDestPrefix = destPrefix;
+ this.targetRepl = targetRepl;
+ this.startTime = startTime;
+ this.modTimePeriod = modTimePeriod;
+ }
+
+ TimeBasedFilter(Configuration conf,
+ Path destPrefix, PolicyInfo info,
+ List<PolicyInfo> allPolicies, long startTime, Statistics stats) {
+ super(conf);
+ this.raidDestPrefix = destPrefix;
+ this.targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+ this.modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
+ this.startTime = startTime;
+ this.stats = stats;
+ this.currentSrcPath = info.getSrcPath().toUri().getPath();
+ initializeOtherPaths(allPolicies);
+ }
+
+ private void initializeOtherPaths(List<PolicyInfo> allPolicies) {
+ ArrayList<PolicyInfo> tmp = new ArrayList<PolicyInfo>(allPolicies);
+ // Remove policies whose srcPath is lexicographically <= currentSrcPath,
+ // or whose srcPath does not have currentSrcPath as a prefix.
+ // The remaining policies are the only ones that could select a file
+ // instead of the current policy.
+ for (Iterator<PolicyInfo> it = tmp.iterator(); it.hasNext(); ) {
+ String src = it.next().getSrcPath().toUri().getPath();
+ if (src.compareTo(currentSrcPath) <= 0) {
+ it.remove();
+ continue;
+ }
+ int matchLen = matchingPrefixLength(src, currentSrcPath);
+ if (matchLen < currentSrcPath.length()) {
+ it.remove();
+ }
+ }
+ // Sort in reverse lexicographic order.
+ Collections.sort(tmp, new Comparator() {
+ public int compare(Object o1, Object o2) {
+ return 0 -
+ ((PolicyInfo)o1).getSrcPath().toUri().getPath().compareTo(
+ ((PolicyInfo)o2).getSrcPath().toUri().getPath());
+ }
+ });
+ otherSrcPaths = new String[tmp.size()];
+ modTimePeriods = new long[otherSrcPaths.length];
+ for (int i = 0; i < otherSrcPaths.length; i++) {
+ otherSrcPaths[i] = tmp.get(i).getSrcPath().toUri().getPath();
+ modTimePeriods[i] = Long.parseLong(
+ tmp.get(i).getProperty("modTimePeriod"));
+ }
+ }
+
+ public boolean check(FileStatus f) throws IOException {
+ if (!canChooseForCurrentPolicy(f)) {
+ return false;
+ }
+
+ // If the source file is no larger than two blocks, skip it.
+ long blockSize = f.getBlockSize();
+ if (2 * blockSize >= f.getLen()) {
+ stats.numTooSmall++;
+ stats.sizeTooSmall += f.getLen();
+ return false;
+ }
+
+ boolean select = false;
+ try {
+ Object ppair = RaidNode.getParityFile(
+ raidDestPrefix, f.getPath(), getConf());
+ // Is there a valid parity file?
+ if (ppair != null) {
+ // Is the source at the target replication?
+ if (f.getReplication() != targetRepl) {
+ // Select the file so that its replication can be set.
+ select = true;
+ } else {
+ stats.numRaided++;
+ // Nothing to do, don't select the file.
+ select = false;
+ }
+ } else {
+ // No parity file.
+ if (f.getModificationTime() + modTimePeriod < startTime) {
+ // If the file is not too new, choose it for raiding.
+ select = true;
+ } else {
+ select = false;
+ stats.numTooNew++;
+ stats.sizeTooNew += f.getLen();
+ }
+ }
+ } catch (java.io.FileNotFoundException e) {
+ select = true; // destination file does not exist
+ } catch (java.io.IOException e) {
+ // If there is a problem with the har path, this will let us continue.
+ DirectoryTraversal.LOG.error(
+ "Error while selecting " + StringUtils.stringifyException(e));
+ }
+ return select;
+ }
+
+ /**
+ * Checks if a file can be chosen by the current policy, i.e., no other
+ * policy with a more specific (longer matching) srcPath would select it.
+ */
+ boolean canChooseForCurrentPolicy(FileStatus stat) {
+ boolean choose = true;
+ if (otherSrcPaths.length > 0) {
+ String fileStr = stat.getPath().toUri().getPath();
+
+ // For a given string, find the best matching srcPath.
+ int matchWithCurrent = matchingPrefixLength(fileStr, currentSrcPath);
+ for (int i = 0; i < otherSrcPaths.length; i++) {
+ // If the file is too new for this policy, it will not claim the file.
+ if (stat.getModificationTime() > startTime - modTimePeriods[i]) {
+ continue;
+ }
+ int matchLen = matchingPrefixLength(fileStr, otherSrcPaths[i]);
+ if (matchLen > 0 &&
+ fileStr.charAt(matchLen - 1) == Path.SEPARATOR_CHAR) {
+ matchLen--;
+ }
+ if (matchLen > matchWithCurrent) {
+ choose = false;
+ break;
+ }
+ }
+ }
+ return choose;
+ }
+
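+ /** Returns the length of the common prefix of the two strings. */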
+ int matchingPrefixLength(final String s1, final String s2) {
+ int len = 0;
+ for (int j = 0; j < s1.length() && j < s2.length(); j++) {
+ if (s1.charAt(j) == s2.charAt(j)) {
+ len++;
+ } else {
+ break;
+ }
+ }
+ return len;
+ }
+ }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index 5507246..ec2f123 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -22,6 +22,7 @@
import java.io.FileNotFoundException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
import java.util.LinkedList;
import java.util.Iterator;
@@ -334,9 +335,19 @@
*/
class TriggerMonitor implements Runnable {
- private Map<String, Long> scanTimes = new HashMap<String, Long>();
- private Map<String, DirectoryTraversal> scanState =
- new HashMap<String, DirectoryTraversal>();
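+ /**
+ * Per-policy scan state: start time of the last full scan, any
+ * in-progress traversal, and statistics from the file filter.
+ */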
+ class ScanState {
+ long fullScanStartTime;
+ DirectoryTraversal pendingTraversal;
+ RaidFilter.Statistics stats;
+ ScanState() {
+ fullScanStartTime = 0;
+ pendingTraversal = null;
+ stats = new RaidFilter.Statistics();
+ }
+ }
+
+ private Map<String, ScanState> scanStateMap =
+ new HashMap<String, ScanState>();
/**
*/
@@ -357,9 +368,10 @@
*/
private boolean shouldSelectFiles(PolicyInfo info) {
String policyName = info.getName();
+ ScanState scanState = scanStateMap.get(policyName);
int runningJobsCount = jobMonitor.runningJobsCount(policyName);
// Is there a scan in progress for this policy?
- if (scanState.containsKey(policyName)) {
+ if (scanState.pendingTraversal != null) {
int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
// If there is a scan in progress for this policy, we can have
@@ -373,12 +385,8 @@
}
// Check the time of the last full traversal before starting a fresh
// traversal.
- if (scanTimes.containsKey(policyName)) {
- long lastScan = scanTimes.get(policyName);
- return (now() > lastScan + configMgr.getPeriodicity());
- } else {
- return true;
- }
+ long lastScan = scanState.fullScanStartTime;
+ return (now() > lastScan + configMgr.getPeriodicity());
}
}
@@ -388,33 +396,29 @@
* traversal.
* The number of paths returned is limited by raid.distraid.max.jobs.
*/
- private List<FileStatus> selectFiles(PolicyInfo info) throws IOException {
+ private List<FileStatus> selectFiles(
+ PolicyInfo info, ArrayList<PolicyInfo> allPolicies) throws IOException {
Path destPrefix = getDestinationPath(conf);
String policyName = info.getName();
Path srcPath = info.getSrcPath();
- long modTimePeriod = 0;
- String str = info.getProperty("modTimePeriod");
- if (str != null) {
- modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
- }
- short srcReplication = 0;
- str = info.getProperty("srcReplication");
- if (str != null) {
- srcReplication = Short.parseShort(info.getProperty("srcReplication"));
- }
+ long modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
// Max number of files returned.
int selectLimit = configMgr.getMaxFilesPerJob();
int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+ long selectStartTime = System.currentTimeMillis();
+
+ ScanState scanState = scanStateMap.get(policyName);
// If we have a pending traversal, resume it.
- if (scanState.containsKey(policyName)) {
- DirectoryTraversal dt = scanState.get(policyName);
+ if (scanState.pendingTraversal != null) {
+ DirectoryTraversal dt = scanState.pendingTraversal;
LOG.info("Resuming traversal for policy " + policyName);
- List<FileStatus> returnSet = dt.selectFilesToRaid(
- conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+ DirectoryTraversal.FileFilter filter =
+ filterForPolicy(selectStartTime, info, allPolicies, scanState.stats);
+ List<FileStatus> returnSet = dt.getFilteredFiles(filter, selectLimit);
if (dt.doneTraversal()) {
- scanState.remove(policyName);
+ scanState.pendingTraversal = null;
}
return returnSet;
}
@@ -445,12 +449,13 @@
}
// Set the time for a new traversal.
- scanTimes.put(policyName, now());
+ scanState.fullScanStartTime = now();
DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths);
- returnSet = dt.selectFilesToRaid(
- conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+ DirectoryTraversal.FileFilter filter =
+ filterForPolicy(selectStartTime, info, allPolicies, scanState.stats);
+ returnSet = dt.getFilteredFiles(filter, selectLimit);
if (!dt.doneTraversal()) {
- scanState.put(policyName, dt);
+ scanState.pendingTraversal = dt;
}
}
return returnSet;
@@ -461,73 +466,86 @@
* If the config file has changed, then reload config file and start afresh.
*/
private void doProcess() throws IOException, InterruptedException {
- PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
-
+ ArrayList<PolicyInfo> allPolicies = new ArrayList<PolicyInfo>();
+ for (PolicyList category : configMgr.getAllPolicies()) {
+ for (PolicyInfo info: category.getAll()) {
+ allPolicies.add(info);
+ }
+ }
while (running) {
Thread.sleep(SLEEP_TIME);
- configMgr.reloadConfigsIfNecessary();
-
- // activate all categories
- Collection<PolicyList> all = configMgr.getAllPolicies();
-
- // sort all policies by reverse lexicographical order. This is needed
- // to make the nearest policy take precedence.
- PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
- Arrays.sort(sorted, lexi);
-
- for (PolicyList category : sorted) {
- for (PolicyInfo info: category.getAll()) {
-
- if (!shouldSelectFiles(info)) {
- continue;
- }
-
- LOG.info("Triggering Policy Filter " + info.getName() +
- " " + info.getSrcPath());
- List<FileStatus> filteredPaths = null;
- try {
- filteredPaths = selectFiles(info);
- } catch (Exception e) {
- LOG.info("Exception while invoking filter on policy " + info.getName() +
- " srcPath " + info.getSrcPath() +
- " exception " + StringUtils.stringifyException(e));
- continue;
- }
-
- if (filteredPaths == null || filteredPaths.size() == 0) {
- LOG.info("No filtered paths for policy " + info.getName());
- continue;
- }
-
- // Apply the action on accepted paths
- LOG.info("Triggering Policy Action " + info.getName() +
- " " + info.getSrcPath());
- try {
- if (isRaidLocal){
- doRaid(conf, info, filteredPaths);
- }
- else{
- // We already checked that no job for this policy is running
- // So we can start a new job.
- DistRaid dr = new DistRaid(conf);
- //add paths for distributed raiding
- dr.addRaidPaths(info, filteredPaths);
- boolean started = dr.startDistRaid();
- if (started) {
- jobMonitor.monitorJob(info.getName(), dr);
- }
- }
- } catch (Exception e) {
- LOG.info("Exception while invoking action on policy " + info.getName() +
- " srcPath " + info.getSrcPath() +
- " exception " + StringUtils.stringifyException(e));
- continue;
+ boolean reloaded = configMgr.reloadConfigsIfNecessary();
+ if (reloaded) {
+ allPolicies.clear();
+ for (PolicyList category : configMgr.getAllPolicies()) {
+ for (PolicyInfo info: category.getAll()) {
+ allPolicies.add(info);
}
}
}
+
+ for (PolicyInfo info: allPolicies) {
+ if (!scanStateMap.containsKey(info.getName())) {
+ scanStateMap.put(info.getName(), new ScanState());
+ }
+
+ if (!shouldSelectFiles(info)) {
+ continue;
+ }
+
+ LOG.info("Triggering Policy Filter " + info.getName() +
+ " " + info.getSrcPath());
+ List<FileStatus> filteredPaths = null;
+ try {
+ filteredPaths = selectFiles(info, allPolicies);
+ } catch (Exception e) {
+ LOG.info("Exception while invoking filter on policy " + info.getName() +
+ " srcPath " + info.getSrcPath() +
+ " exception " + StringUtils.stringifyException(e));
+ continue;
+ }
+
+ if (filteredPaths == null || filteredPaths.size() == 0) {
+ LOG.info("No filtered paths for policy " + info.getName());
+ continue;
+ }
+
+ // Apply the action on accepted paths
+ LOG.info("Triggering Policy Action " + info.getName() +
+ " " + info.getSrcPath());
+ try {
+ if (isRaidLocal){
+ doRaid(conf, info, filteredPaths);
+ }
+ else{
+ // We already checked that no job for this policy is running
+ // So we can start a new job.
+ DistRaid dr = new DistRaid(conf);
+ //add paths for distributed raiding
+ dr.addRaidPaths(info, filteredPaths);
+ boolean started = dr.startDistRaid();
+ if (started) {
+ jobMonitor.monitorJob(info.getName(), dr);
+ }
+ }
+ } catch (Exception e) {
+ LOG.info("Exception while invoking action on policy " + info.getName() +
+ " srcPath " + info.getSrcPath() +
+ " exception " + StringUtils.stringifyException(e));
+ continue;
+ }
+ }
}
}
+
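+ /** Builds the filter used to select files for a policy, aware of all other policies. */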
+ DirectoryTraversal.FileFilter filterForPolicy(
+ long startTime, PolicyInfo info, List<PolicyInfo> allPolicies,
+ RaidFilter.Statistics stats)
+ throws IOException {
+ return new RaidFilter.TimeBasedFilter(conf, getDestinationPath(conf),
+ info, allPolicies, startTime, stats);
+ }
}
static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) {
@@ -618,9 +636,9 @@
}
private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException {
-
- return getParityFile(destPathPrefix, srcPath, conf);
-
+
+ return getParityFile(destPathPrefix, srcPath, conf);
+
}
@@ -825,8 +843,6 @@
* destination directories.
*/
private void doPurge() throws IOException, InterruptedException {
- PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
-
long prevExec = 0;
while (running) {
@@ -838,16 +854,8 @@
LOG.info("Started purge scan");
prevExec = now();
-
- // fetch all categories
- Collection<PolicyList> all = configMgr.getAllPolicies();
-
- // sort all policies by reverse lexicographical order. This is
- // needed to make the nearest policy take precedence.
- PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
- Arrays.sort(sorted, lexi);
- for (PolicyList category : sorted) {
+ for (PolicyList category : configMgr.getAllPolicies()) {
for (PolicyInfo info: category.getAll()) {
try {
@@ -857,7 +865,7 @@
//get srcPaths
Path[] srcPaths = info.getSrcPathExpanded();
-
+
if (srcPaths != null) {
for (Path srcPath: srcPaths) {
// expand destination prefix
@@ -1017,9 +1025,6 @@
}
private void doHar() throws IOException, InterruptedException {
-
- PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
-
long prevExec = 0;
while (running) {
@@ -1031,16 +1036,8 @@
LOG.info("Started archive scan");
prevExec = now();
-
- // fetch all categories
- Collection<PolicyList> all = configMgr.getAllPolicies();
-
- // sort all policies by reverse lexicographical order. This is
- // needed to make the nearest policy take precedence.
- PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
- Arrays.sort(sorted, lexi);
- for (PolicyList category : sorted) {
+ for (PolicyList category : configMgr.getAllPolicies()) {
for (PolicyInfo info: category.getAll()) {
String str = info.getProperty("time_before_har");
String tmpHarPath = info.getProperty("har_tmp_dir");
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
index c58eb8c..90e1ad4 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
@@ -137,7 +137,7 @@
/**
* Get the srcPath
*/
- public Path getSrcPath() throws IOException {
+ public Path getSrcPath() {
return srcPath;
}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
index 187ed25..eb3dc6e 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
@@ -24,7 +24,6 @@
import java.util.Collection;
import java.util.List;
import java.util.LinkedList;
-import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -78,18 +77,6 @@
return category;
}
- /**
- * Sort Categries based on their srcPath. reverse lexicographical order.
- */
- public static class CompareByPath implements Comparator<PolicyList> {
- public CompareByPath() throws IOException {
- }
- public int compare(PolicyList l1, PolicyList l2) {
- return 0 - l1.getSrcPath().compareTo(l2.getSrcPath());
- }
- }
-
-
//////////////////////////////////////////////////
// Writable
//////////////////////////////////////////////////
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
index 8a2805a..d7288ac 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
@@ -239,18 +239,16 @@
corruptBlock(locs.get(0).getBlock().getBlockName());
reportCorruptBlocks(dfs, file1, new int[]{0}, blockSize);
- // This should fail.
- boolean caughtChecksumException = false;
try {
Thread.sleep(5*1000);
} catch (InterruptedException ignore) {
}
try {
TestRaidDfs.validateFile(dfs, file1, file1Len, crc1);
+ fail("Expected exception not thrown");
} catch (org.apache.hadoop.fs.ChecksumException ce) {
- caughtChecksumException = true;
+ } catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
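+ // A missing block is also an acceptable outcome when reading the corrupted file.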
}
- assertTrue(caughtChecksumException);
} catch (Exception e) {
LOG.info("Test testGeneratedBlock Exception " + e + StringUtils.stringifyException(e));
throw e;
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
index 119cae6..9f7f710 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
public class TestDirectoryTraversal extends TestCase {
final static Log LOG = LogFactory.getLog(
@@ -104,14 +105,16 @@
int limit = 2;
short targetRepl = 1;
Path raid = new Path("/raid");
- List<FileStatus> selected = dt.selectFilesToRaid(conf, targetRepl, raid,
- 0, limit);
+ DirectoryTraversal.FileFilter filter =
+ new RaidFilter.TimeBasedFilter(conf,
+ RaidNode.getDestinationPath(conf), 1, System.currentTimeMillis(), 0);
+ List<FileStatus> selected = dt.getFilteredFiles(filter, limit);
for (FileStatus f: selected) {
LOG.info(f.getPath());
}
assertEquals(limit, selected.size());
- selected = dt.selectFilesToRaid(conf, targetRepl, raid, 0, limit);
+ selected = dt.getFilteredFiles(filter, limit);
for (FileStatus f: selected) {
LOG.info(f.getPath());
}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
new file mode 100644
index 0000000..9b1cdd9
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+
+public class TestRaidFilter extends TestCase {
+ final static String TEST_DIR = new File(System.getProperty("test.build.data",
+ "build/contrib/raid/test/data")).getAbsolutePath();
+ final static Log LOG =
+ LogFactory.getLog("org.apache.hadoop.raid.TestRaidFilter");
+
+ Configuration conf;
+ MiniDFSCluster dfs = null;
+ FileSystem fs = null;
+
+ private void mySetup() throws Exception {
+ new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+ conf = new Configuration();
+ dfs = new MiniDFSCluster(conf, 2, true, null);
+ dfs.waitActive();
+ fs = dfs.getFileSystem();
+ String namenode = fs.getUri().toString();
+ FileSystem.setDefaultUri(conf, namenode);
+ }
+
+ private void myTearDown() throws Exception {
+ if (dfs != null) { dfs.shutdown(); }
+ }
+
+ public void testLayeredPolicies() throws Exception {
+ mySetup();
+ Path src1 = new Path("/user/foo");
+ Path src2 = new Path("/user/foo/bar");
+
+ PolicyInfo info1 = new PolicyInfo("p1", conf);
+ info1.setSrcPath(src1.toString());
+ info1.setDescription("test policy");
+ info1.setProperty("targetReplication", "1");
+ info1.setProperty("metaReplication", "1");
+ info1.setProperty("modTimePeriod", "0");
+
+ PolicyInfo info2 = new PolicyInfo("p2", conf);
+ info2.setSrcPath(src2.toString());
+ info2.setDescription("test policy");
+ info2.setProperty("targetReplication", "1");
+ info2.setProperty("metaReplication", "1");
+ info2.setProperty("modTimePeriod", "0");
+
+ ArrayList<PolicyInfo> all = new ArrayList<PolicyInfo>();
+ all.add(info1);
+ all.add(info2);
+
+ try {
+ long blockSize = 1024;
+ byte[] bytes = new byte[(int)blockSize];
+ Path f1 = new Path(src1, "f1");
+ Path f2 = new Path(src2, "f2");
+ FSDataOutputStream stm1 = fs.create(f1, false, 4096, (short)1, blockSize);
+ FSDataOutputStream stm2 = fs.create(f2, false, 4096, (short)1, blockSize);
+ FSDataOutputStream[] stms = new FSDataOutputStream[]{stm1, stm2};
+ for (FSDataOutputStream stm: stms) {
+ stm.write(bytes);
+ stm.write(bytes);
+ stm.write(bytes);
+ stm.close();
+ }
+
+ Thread.sleep(1000);
+
+ FileStatus stat1 = fs.getFileStatus(f1);
+ FileStatus stat2 = fs.getFileStatus(f2);
+
+ RaidFilter.Statistics stats = new RaidFilter.Statistics();
+ RaidFilter.TimeBasedFilter filter = new RaidFilter.TimeBasedFilter(
+ conf, RaidNode.getDestinationPath(conf), info1, all,
+ System.currentTimeMillis(), stats);
+ System.out.println("Stats " + stats);
+
+ assertTrue(filter.check(stat1));
+ assertFalse(filter.check(stat2));
+
+ } finally {
+ myTearDown();
+ }
+ }
+}