MAPREDUCE-1668. RaidNode HARs a parity directory only if a parity file exists
for every entry of the corresponding source directory. (Ramkumar Vadali via dhruba)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@990693 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b161c3..cbd0882 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -262,6 +262,9 @@
MAPREDUCE-1670. RAID policies should not scan their own destination path.
(Ramkumar Vadali via dhruba)
+ MAPREDUCE-1668. RaidNode Hars a directory only if all its parity files
+ have been created. (Ramkumar Vadali via dhruba)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index f390305..8b28a71 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -1364,7 +1364,8 @@
if (stat != null) {
LOG.info("Haring parity files for policy " +
info.getName() + " " + destPath);
- recurseHar(destFs, stat, cutoff, tmpHarPath);
+ recurseHar(info, destFs, stat, destPref.toUri().getPath(),
+ srcPath.getFileSystem(conf), cutoff, tmpHarPath);
}
}
}
@@ -1380,7 +1381,8 @@
return;
}
- private void recurseHar(FileSystem destFs, FileStatus dest, long cutoff, String tmpHarPath)
+ private void recurseHar(PolicyInfo info, FileSystem destFs, FileStatus dest,
+ String destPrefix, FileSystem srcFs, long cutoff, String tmpHarPath)
throws IOException {
if (dest.isFile()) {
@@ -1388,6 +1390,7 @@
}
Path destPath = dest.getPath(); // pathname, no host:port
+ String destStr = destPath.toUri().getPath();
// Verify if it already contains a HAR directory
if ( destFs.exists(new Path(destPath, destPath.getName()+HAR_SUFFIX)) ) {
@@ -1401,13 +1404,34 @@
shouldHar = files.length > 0;
for (FileStatus one: files) {
if (one.isDirectory()){
- recurseHar(destFs, one, cutoff, tmpHarPath);
+ recurseHar(info, destFs, one, destPrefix, srcFs, cutoff, tmpHarPath);
shouldHar = false;
} else if (one.getModificationTime() > cutoff ) {
shouldHar = false;
}
}
+
+ if (shouldHar) {
+ String src = destStr.replaceFirst(destPrefix, "");
+ Path srcPath = new Path(src);
+ FileStatus[] statuses = srcFs.listStatus(srcPath);
+ Path destPathPrefix = new Path(destPrefix).makeQualified(destFs);
+ if (statuses != null) {
+ for (FileStatus status : statuses) {
+ if (getParityFile(destPathPrefix,
+ status.getPath().makeQualified(srcFs)) == null ) {
+ LOG.info("Cannot archive " + destPath +
+ " because it doesn't contain parity file for " +
+ status.getPath().makeQualified(srcFs) + " on destination " +
+ destPathPrefix);
+ shouldHar = false;
+ break;
+ }
+ }
+ }
+ }
}
+
if ( shouldHar ) {
singleHar(destFs, dest, tmpHarPath);
}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
index f23455c..01b9b95 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
@@ -199,13 +199,15 @@
mySetup(srcPath, targetReplication, metaReplication, stripeLength);
RaidShell shell = null;
Path dir = new Path("/user/test/raidtest/subdir/");
- Path file1 = new Path(dir + "/file" + iter);
RaidNode cnode = null;
try {
Path destPath = new Path("/destraid/user/test/raidtest/subdir");
fileSys.delete(dir, true);
fileSys.delete(destPath, true);
- TestRaidNode.createOldFile(fileSys, file1, 1, numBlock, blockSize);
+ for (int i = 0; i < 10; i++) {
+ Path file = new Path(dir + "/file" + i);
+ TestRaidNode.createOldFile(fileSys, file, 1, numBlock, blockSize);
+ }
LOG.info("doTestHar created test files for iteration " + iter);
// create an instance of the RaidNode
@@ -226,15 +228,28 @@
LOG.info("doTestHar created RaidShell.");
FileStatus[] listPaths = null;
+ int maxFilesFound = 0;
// wait till file is raided
while (true) {
try {
listPaths = fileSys.listStatus(destPath);
int count = 0;
+ int filesFound = 0;
if (listPaths != null) {
for (FileStatus s : listPaths) {
LOG.info("doTestHar found path " + s.getPath());
+
+ if (!s.isDir())
+ filesFound++;
+ if (filesFound > maxFilesFound)
+ maxFilesFound = filesFound;
+
if (s.getPath().toString().endsWith(".har")) {
+ // If a HAR directory is found, ensure that all 10 parity files were
+ // seen at some point. We track the maximum number of files seen
+ // across polling iterations, since the purge thread might already
+ // have deleted some parity files by the time the HAR is listed.
+ assertEquals(10, maxFilesFound);
count++;
}
}
@@ -278,8 +293,6 @@
} finally {
shell.close();
if (cnode != null) { cnode.stop(); cnode.join(); }
- LOG.info("doTestHar delete file " + file1);
- fileSys.delete(file1, true);
}
LOG.info("doTestHar completed:" + " blockSize=" + blockSize +
" stripeLength=" + stripeLength);