MAPREDUCE-1668. RaidNode Hars a directory only if all its parity files 
have been created. (Ramkumar Vadali via dhruba)
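
Before this change, recurseHar() in RaidNode would har a leaf directory of
parity files as soon as none of its entries were newer than the har cutoff,
even when parity files for some of the source files had not been generated
yet. Archiving such a directory too early could leave source files without
parity coverage. recurseHar() now also receives the policy, the source
FileSystem and the destination prefix, maps the parity directory back to its
source directory, and hars only when every source file already has a parity
file.

A minimal sketch of the new guard, written here as a standalone helper inside
RaidNode for readability (allParityFilesPresent is a hypothetical name; the
committed patch inlines the same loop in recurseHar() and relies on
RaidNode's existing getParityFile() helper and LOG):

    // Sketch only, not the committed code: returns true only when every
    // file under srcDir already has a parity file under destPathPrefix.
    // getParityFile() returns null when no parity file exists yet.
    private boolean allParityFilesPresent(FileSystem srcFs, Path srcDir,
        Path destPathPrefix) throws IOException {
      FileStatus[] statuses = srcFs.listStatus(srcDir);
      if (statuses == null) {
        return true;                 // nothing to cover
      }
      for (FileStatus status : statuses) {
        Path src = status.getPath().makeQualified(srcFs);
        if (getParityFile(destPathPrefix, src) == null) {
          LOG.info("Cannot archive " + srcDir +
                   " because it has no parity file for " + src);
          return false;              // parity still missing, do not har yet
        }
      }
      return true;
    }

TestRaidHar now creates ten source files and asserts that a .har directory
appears only after all ten parity files have been observed; the test tracks
the maximum number of parity files seen, since the purge thread may delete
some of them before the har shows up.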
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@990693 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b161c3..cbd0882 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -262,6 +262,9 @@
     MAPREDUCE-1670. RAID policies should not scan their own destination path.
     (Ramkumar Vadali via dhruba)
 
+    MAPREDUCE-1668. RaidNode Hars a directory only if all its parity files 
+    have been created. (Ramkumar Vadali via dhruba)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index f390305..8b28a71 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -1364,7 +1364,8 @@
                   if (stat != null) {
                     LOG.info("Haring parity files for policy " + 
                         info.getName() + " " + destPath);
-                    recurseHar(destFs, stat, cutoff, tmpHarPath);
+                    recurseHar(info, destFs, stat, destPref.toUri().getPath(),
+                        srcPath.getFileSystem(conf), cutoff, tmpHarPath);
                   }
                 }
               }
@@ -1380,7 +1381,8 @@
     return;
   }
   
-  private void recurseHar(FileSystem destFs, FileStatus dest, long cutoff, String tmpHarPath)
+  private void recurseHar(PolicyInfo info, FileSystem destFs, FileStatus dest,
+    String destPrefix, FileSystem srcFs, long cutoff, String tmpHarPath)
     throws IOException {
 
     if (dest.isFile()) {
@@ -1388,6 +1390,7 @@
     }
     
     Path destPath = dest.getPath(); // pathname, no host:port
+    String destStr = destPath.toUri().getPath();
 
     // Verify if it already contains a HAR directory
     if ( destFs.exists(new Path(destPath, destPath.getName()+HAR_SUFFIX)) ) {
@@ -1401,13 +1404,34 @@
       shouldHar = files.length > 0;
       for (FileStatus one: files) {
         if (one.isDirectory()){
-          recurseHar(destFs, one, cutoff, tmpHarPath);
+          recurseHar(info, destFs, one, destPrefix, srcFs, cutoff, tmpHarPath);
           shouldHar = false;
         } else if (one.getModificationTime() > cutoff ) {
           shouldHar = false;
         }
       }
+
+      if (shouldHar) {
+        String src = destStr.replaceFirst(destPrefix, "");
+        Path srcPath = new Path(src);
+        FileStatus[] statuses = srcFs.listStatus(srcPath);
+        Path destPathPrefix = new Path(destPrefix).makeQualified(destFs);
+        if (statuses != null) {
+          for (FileStatus status : statuses) {
+            if (getParityFile(destPathPrefix, 
+                              status.getPath().makeQualified(srcFs)) == null ) {
+              LOG.info("Cannot archive " + destPath + 
+                  " because it doesn't contain parity file for " +
+                  status.getPath().makeQualified(srcFs) + " on destination " +
+                  destPathPrefix);
+              shouldHar = false;
+              break;
+            }
+          }
+        }
+      }
     }
+
     if ( shouldHar ) {
       singleHar(destFs, dest, tmpHarPath);
     }
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
index f23455c..01b9b95 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
@@ -199,13 +199,15 @@
     mySetup(srcPath, targetReplication, metaReplication, stripeLength);
     RaidShell shell = null;
     Path dir = new Path("/user/test/raidtest/subdir/");
-    Path file1 = new Path(dir + "/file" + iter);
     RaidNode cnode = null;
     try {
       Path destPath = new Path("/destraid/user/test/raidtest/subdir");
       fileSys.delete(dir, true);
       fileSys.delete(destPath, true);
-      TestRaidNode.createOldFile(fileSys, file1, 1, numBlock, blockSize);
+      for (int i = 0; i < 10; i++) {
+        Path file = new Path(dir + "/file" + i);
+        TestRaidNode.createOldFile(fileSys, file, 1, numBlock, blockSize);
+      }
       LOG.info("doTestHar created test files for iteration " + iter);
 
       // create an instance of the RaidNode
@@ -226,15 +228,28 @@
       LOG.info("doTestHar created RaidShell.");
       FileStatus[] listPaths = null;
 
+      int maxFilesFound = 0;
       // wait till file is raided
       while (true) {
         try {
           listPaths = fileSys.listStatus(destPath);
           int count = 0;
+          int filesFound = 0;
           if (listPaths != null) {
             for (FileStatus s : listPaths) {
               LOG.info("doTestHar found path " + s.getPath());
+
+              if (!s.isDir())
+                filesFound++;
+              if (filesFound > maxFilesFound)
+                maxFilesFound = filesFound;
+
               if (s.getPath().toString().endsWith(".har")) {
+                // If a HAR directory is found, ensure that we have seen
+                // 10 parity files. We have to keep track of the max # of
+                // files since some parity files might get deleted by the
+                // purge thread.
+                assertEquals(10, maxFilesFound);
                 count++;
               }
             }
@@ -278,8 +293,6 @@
     } finally {
       shell.close();
       if (cnode != null) { cnode.stop(); cnode.join(); }
-      LOG.info("doTestHar delete file " + file1);
-      fileSys.delete(file1, true);
     }
     LOG.info("doTestHar completed:" + " blockSize=" + blockSize +
              " stripeLength=" + stripeLength);