HDFS-2041. OP_CONCAT_DELETE doesn't properly restore modification time of the concatenated file when edit logs are replayed. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1134397 13f79535-47bb-0310-9956-ffa450edef68
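The root cause is that FSDirectory.unprotectedConcat() computed the modification
time with now() both when the concat was first applied and again when the edit
log was replayed, so a restarted NameNode assigned the target file a fresh mtime
instead of the original one. The fix threads an explicit timestamp through the
call: concat() picks the time once, applies it, and logs that same value, and
FSEditLogLoader replays the timestamp recorded in the ConcatDeleteOp.

A minimal sketch of the pattern (illustrative only, not the HDFS sources; names
such as applyConcat/replayConcat are hypothetical):

    // Apply path: pick the timestamp once and persist the value actually used.
    long timestamp = now();
    applyConcat(target, srcs, timestamp);       // mutate the namespace
    editLog.logConcat(target, srcs, timestamp); // record the same value

    // Replay path: reuse the logged value instead of calling now() again,
    // so replay is deterministic and the mtime survives a restart.
    replayConcat(op.trg, op.srcs, op.timestamp);
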
diff --git a/CHANGES.txt b/CHANGES.txt
index 87e82ab..8bb8ef8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -716,6 +716,9 @@
     HDFS-1998. Federation: Make refresh-namenodes.sh refresh all the
     namenode. (Tanping Wang via suresh)
 
+    HDFS-2041. OP_CONCAT_DELETE doesn't properly restore modification time
+    of the concatenated file when edit logs are replayed. (todd)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 5e80c64..38b285a 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -930,10 +930,10 @@
     try {
       // actual move
       waitForReady();
-
-      unprotectedConcat(target, srcs);
+      long timestamp = now();
+      unprotectedConcat(target, srcs, timestamp);
       // do the commit
-      fsImage.getEditLog().logConcat(target, srcs, now());
+      fsImage.getEditLog().logConcat(target, srcs, timestamp);
     } finally {
       writeUnlock();
     }
@@ -948,7 +948,7 @@
    * Must be public because also called from EditLogs
    * NOTE: - it does not update quota (not needed for concat)
    */
-  public void unprotectedConcat(String target, String [] srcs) 
+  public void unprotectedConcat(String target, String [] srcs, long timestamp) 
       throws UnresolvedLinkException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
@@ -979,9 +979,8 @@
       count++;
     }
     
-    long now = now();
-    trgInode.setModificationTimeForce(now);
-    trgParent.setModificationTime(now);
+    trgInode.setModificationTimeForce(timestamp);
+    trgParent.setModificationTime(timestamp);
     // update quota on the parent directory ('count' files removed, 0 space)
     unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);
   }
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 4796ebe..0c344bd 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -238,7 +238,8 @@
             numOpConcatDelete++;
 
             ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
-            fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs);
+            fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
+                concatDeleteOp.timestamp);
             break;
           }
           case OP_RENAME_OLD: {
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index db99d11..9583b15 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -257,7 +257,6 @@
     int length;
     String path;
     long timestamp;
-    long atime;
     PermissionStatus permissions;
 
     private MkdirOp() {
@@ -280,9 +279,7 @@
       // However, currently this is not being updated/used because of
       // performance reasons.
       if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-        this.atime = readLong(in);
-      } else {
-        this.atime = 0;
+        /*unused this.atime = */readLong(in);
       }
 
       if (logVersion <= -11) {
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
index 9dc26d2..ebd4a48 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -86,16 +87,6 @@
     }
   }
   
-  private void runCommand(DFSAdmin admin, String args[], boolean expectEror)
-  throws Exception {
-    int val = admin.run(args);
-    if (expectEror) {
-      assertEquals(val, -1);
-    } else {
-      assertTrue(val>=0);
-    }
-  }
-
   /**
    * Concatenates 10 files into one
    * Verifies the final size, deletion of the file, number of blocks
@@ -221,6 +212,46 @@
     assertEquals(trgLen, totalLen+sFileLen);
     
   }
+  
+  /**
+   * Test that the concat operation is properly persisted in the
+   * edit log, and properly replayed on restart.
+   */
+  @Test
+  public void testConcatInEditLog() throws Exception {
+    final Path TEST_DIR = new Path("/testConcatInEditLog");
+    final long FILE_LEN = blockSize;
+    
+    // 1. Concat some files
+    Path[] srcFiles = new Path[3];
+    for (int i = 0; i < srcFiles.length; i++) {
+      Path path = new Path(TEST_DIR, "src-" + i);
+      DFSTestUtil.createFile(dfs, path, FILE_LEN, REPL_FACTOR, 1);
+      srcFiles[i] = path;
+    }    
+    Path targetFile = new Path(TEST_DIR, "target");
+    DFSTestUtil.createFile(dfs, targetFile, FILE_LEN, REPL_FACTOR, 1);
+    
+    dfs.concat(targetFile, srcFiles);
+    
+    // 2. Verify the concat operation basically worked, and record
+    // file status.
+    assertTrue(dfs.exists(targetFile));
+    FileStatus origStatus = dfs.getFileStatus(targetFile);
+
+    // 3. Restart NN to force replay from edit log
+    cluster.restartNameNode(true);
+    
+    // 4. Verify concat operation was replayed correctly and file status
+    // did not change.
+    assertTrue(dfs.exists(targetFile));
+    assertFalse(dfs.exists(srcFiles[0]));
+
+    FileStatus statusAfterRestart = dfs.getFileStatus(targetFile);
+
+    assertEquals(origStatus.getModificationTime(),
+        statusAfterRestart.getModificationTime());
+  }
 
   // compare content
   private void checkFileContent(byte[] concat, byte[][] bytes ) {