HDFS-1948 Forward port 'hdfs-1520 lightweight namenode operation to trigger lease reccovery'

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1133108 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index a71b891..cba750c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -944,6 +944,9 @@
 
     HDFS-1619. Remove AC_TYPE* from the libhdfs. (Roman Shaposhnik via eli)
 
+    HDFS-1948  Forward port 'hdfs-1520 lightweight namenode operation to
+    trigger lease recovery' (stack)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
diff --git a/src/java/org/apache/hadoop/hdfs/DFSClient.java b/src/java/org/apache/hadoop/hdfs/DFSClient.java
index 3b03674..17a9829 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -539,6 +539,23 @@
   }
 
   /**
+   * Recover a file's lease
+   * @param src a file's path
+   * @return true if the file is already closed
+   * @throws IOException
+   */
+  boolean recoverLease(String src) throws IOException {
+    checkOpen();
+
+    try {
+      return namenode.recoverLease(src, clientName);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(FileNotFoundException.class,
+                                     AccessControlException.class);
+    }
+  }
+
+  /**
    * Get block location info about file
    * 
    * getBlockLocations() returns a list of hostnames that store 
diff --git a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 9d54954..c4e916e 100644
--- a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -218,6 +218,17 @@
     this.verifyChecksum = verifyChecksum;
   }
 
+  /** 
+   * Start the lease recovery of a file
+   *
+   * @param f a file
+   * @return true if the file is already closed
+   * @throws IOException if an error occurs
+   */
+  public boolean recoverLease(Path f) throws IOException {
+    return dfs.recoverLease(getPathName(f));
+  }
+
   @SuppressWarnings("deprecation")
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 9db6fa4..1520668 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -537,6 +537,17 @@
   public void renewLease(String clientName) throws AccessControlException,
       IOException;
 
+  /**
+   * Start lease recovery.
+   * Lightweight NameNode operation to trigger lease recovery
+   * 
+   * @param src path of the file to start lease recovery
+   * @param clientName name of the current client
+   * @return true if the file is already closed
+   * @throws IOException
+   */
+  public boolean recoverLease(String src, String clientName) throws IOException;
+
   public int GET_STATS_CAPACITY_IDX = 0;
   public int GET_STATS_USED_IDX = 1;
   public int GET_STATS_REMAINING_IDX = 2;
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d34fa47..d0a2afe 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1395,62 +1395,7 @@
 
     try {
       INode myFile = dir.getFileINode(src);
-      if (myFile != null && myFile.isUnderConstruction()) {
-        INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
-        //
-        // If the file is under construction , then it must be in our
-        // leases. Find the appropriate lease record.
-        //
-        Lease lease = leaseManager.getLeaseByPath(src);
-        if (lease == null) {
-          throw new AlreadyBeingCreatedException(
-              "failed to create file " + src + " for " + holder +
-              " on client " + clientMachine + 
-              " because pendingCreates is non-null but no leases found.");
-        }
-        //
-        // We found the lease for this file. And surprisingly the original
-        // holder is trying to recreate this file. This should never occur.
-        //
-        if (lease.getHolder().equals(holder)) {
-          throw new AlreadyBeingCreatedException(
-              "failed to create file " + src + " for " + holder +
-              " on client " + clientMachine + 
-              " because current leaseholder is trying to recreate file.");
-        }
-        assert lease.getHolder().equals(pendingFile.getClientName()) :
-          "Current lease holder " + lease.getHolder() +
-          " does not match file creator " + pendingFile.getClientName();
-        //
-        // Current lease holder is different from the requester.
-        // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then start lease recovery, otherwise fail.
-        //
-        if (lease.expiredSoftLimit()) {
-          LOG.info("startFile: recover lease " + lease + ", src=" + src);
-          boolean isClosed = internalReleaseLease(lease, src, null);
-          if(!isClosed)
-            throw new RecoveryInProgressException(
-                "Failed to close file " + src +
-                ". Lease recovery is in progress. Try again later.");
-
-        } else {
-          BlockInfoUnderConstruction lastBlock=pendingFile.getLastBlock();
-          if(lastBlock != null && lastBlock.getBlockUCState() ==
-            BlockUCState.UNDER_RECOVERY) {
-            throw new RecoveryInProgressException(
-              "Recovery in progress, file [" + src + "], " +
-              "lease owner [" + lease.getHolder() + "]");
-            } else {
-              throw new AlreadyBeingCreatedException(
-                "Failed to create file [" + src + "] for [" + holder +
-                "] on client [" + clientMachine +
-                "], because this file is already being created by [" +
-                pendingFile.getClientName() + "] on [" +
-                pendingFile.getClientMachine() + "]");
-            }
-         }
-      }
+      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
 
       try {
         blockManager.verifyReplication(src, replication, clientMachine);
@@ -1537,6 +1482,120 @@
   }
 
   /**
+   * Recover lease;
+   * Immediately revoke the lease of the current lease holder and start lease
+   * recovery so that the file can be forced to be closed.
+   * 
+   * @param src the path of the file to start lease recovery
+   * @param holder the lease holder's name
+   * @param clientMachine the client machine's name
+   * @return true if the file is already closed
+   * @throws IOException
+   */
+  synchronized boolean recoverLease(String src, String holder, String clientMachine)
+  throws IOException {
+    if (isInSafeMode()) {
+      throw new SafeModeException(
+          "Cannot recover the lease of " + src, safeMode);
+    }
+    if (!DFSUtil.isValidName(src)) {
+      throw new IOException("Invalid file name: " + src);
+    }
+
+    INode inode = dir.getFileINode(src);
+    if (inode == null) {
+      throw new FileNotFoundException("File not found " + src);
+    }
+
+    if (!inode.isUnderConstruction()) {
+      return true;
+    }
+    if (isPermissionEnabled) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+
+    recoverLeaseInternal(inode, src, holder, clientMachine, true);
+    return false;
+  }
+
+  private void recoverLeaseInternal(INode fileInode, 
+      String src, String holder, String clientMachine, boolean force)
+  throws IOException {
+    if (fileInode != null && fileInode.isUnderConstruction()) {
+      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
+      //
+      // If the file is under construction , then it must be in our
+      // leases. Find the appropriate lease record.
+      //
+      Lease lease = leaseManager.getLease(holder);
+      //
+      // We found the lease for this file. And surprisingly the original
+      // holder is trying to recreate this file. This should never occur.
+      //
+      if (!force && lease != null) {
+        Lease leaseFile = leaseManager.getLeaseByPath(src);
+        if ((leaseFile != null && leaseFile.equals(lease)) ||
+            lease.getHolder().equals(holder)) { 
+          throw new AlreadyBeingCreatedException(
+            "failed to create file " + src + " for " + holder +
+            " on client " + clientMachine + 
+            " because current leaseholder is trying to recreate file.");
+        }
+      }
+      //
+      // Find the original holder.
+      //
+      lease = leaseManager.getLease(pendingFile.getClientName());
+      if (lease == null) {
+        throw new AlreadyBeingCreatedException(
+          "failed to create file " + src + " for " + holder +
+          " on client " + clientMachine + 
+          " because pendingCreates is non-null but no leases found.");
+      }
+      if (force) {
+        // close now: no need to wait for soft lease expiration and 
+        // close only the file src
+        LOG.info("recoverLease: recover lease " + lease + ", src=" + src +
+          " from client " + pendingFile.getClientName());
+        internalReleaseLease(lease, src, holder);
+      } else {
+        assert lease.getHolder().equals(pendingFile.getClientName()) :
+          "Current lease holder " + lease.getHolder() +
+          " does not match file creator " + pendingFile.getClientName();
+        //
+        // If the original holder has not renewed in the last SOFTLIMIT 
+        // period, then start lease recovery.
+        //
+        if (lease.expiredSoftLimit()) {
+          LOG.info("startFile: recover lease " + lease + ", src=" + src +
+              " from client " + pendingFile.getClientName());
+          boolean isClosed = internalReleaseLease(lease, src, null);
+          if(!isClosed)
+            throw new RecoveryInProgressException(
+                "Failed to close file " + src +
+                ". Lease recovery is in progress. Try again later.");
+        } else {
+          BlockInfoUnderConstruction lastBlock=pendingFile.getLastBlock();
+          if(lastBlock != null && lastBlock.getBlockUCState() ==
+            BlockUCState.UNDER_RECOVERY) {
+            throw new RecoveryInProgressException(
+              "Recovery in progress, file [" + src + "], " +
+              "lease owner [" + lease.getHolder() + "]");
+            } else {
+              throw new AlreadyBeingCreatedException(
+                "Failed to create file [" + src + "] for [" + holder +
+                "] on client [" + clientMachine +
+                "], because this file is already being created by [" +
+                pendingFile.getClientName() + "] on [" +
+                pendingFile.getClientMachine() + "]");
+            }
+         }
+      }
+    }
+
+  }
+
+  /**
    * Append to an existing file in the namespace.
    */
   LocatedBlock appendFile(String src, String holder, String clientMachine)
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 108ac6e..8a62810 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -836,6 +836,12 @@
   }
 
   /** {@inheritDoc} */
+  public boolean recoverLease(String src, String clientName) throws IOException {
+    String clientMachine = getClientMachine();
+    return namesystem.recoverLease(src, clientName, clientMachine);
+  }
+
+  /** {@inheritDoc} */
   public boolean setReplication(String src, short replication) 
     throws IOException {  
     return namesystem.setReplication(src, replication);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
index 532023c..0902b49 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -27,6 +29,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
@@ -95,7 +98,147 @@
     IOUtils.closeStream(dfs);
     if (cluster != null) {cluster.shutdown();}
   }
-  
+
+  /**
+   * Test the NameNode's revoke lease on current lease holder function.
+   * @throws Exception
+   */
+  @Test
+  public void testImmediateRecoveryOfLease() throws Exception {
+    //create a file
+    // write bytes into the file.
+    byte [] actual = new byte[FILE_SIZE];
+    int size = AppendTestUtil.nextInt(FILE_SIZE);
+    Path filepath = createFile("/immediateRecoverLease-shortlease", size, true);
+    // set the soft limit to be 1 second so that the
+    // namenode triggers lease recovery on next attempt to write-for-open.
+    cluster.setLeasePeriod(SHORT_LEASE_PERIOD, LONG_LEASE_PERIOD);
+
+    recoverLeaseUsingCreate(filepath);
+    verifyFile(dfs, filepath, actual, size);
+
+    //test recoverLease
+    // set the soft limit to be 1 hour but recoverLease should
+    // close the file immediately
+    cluster.setLeasePeriod(LONG_LEASE_PERIOD, LONG_LEASE_PERIOD);
+    size = AppendTestUtil.nextInt(FILE_SIZE);
+    filepath = createFile("/immediateRecoverLease-longlease", size, false);
+
+    // test recoverLese from a different client
+    recoverLease(filepath, null);
+    verifyFile(dfs, filepath, actual, size);
+
+    // test recoverlease from the same client
+    size = AppendTestUtil.nextInt(FILE_SIZE);
+    filepath = createFile("/immediateRecoverLease-sameclient", size, false);
+
+    // create another file using the same client
+    Path filepath1 = new Path(filepath.toString() + AppendTestUtil.nextInt());
+    FSDataOutputStream stm = dfs.create(filepath1, true, BUF_SIZE,
+      REPLICATION_NUM, BLOCK_SIZE);
+
+    // recover the first file
+    recoverLease(filepath, dfs);
+    verifyFile(dfs, filepath, actual, size);
+
+    // continue to write to the second file
+    stm.write(buffer, 0, size);
+    stm.close();
+    verifyFile(dfs, filepath1, actual, size);
+  }
+
+  private Path createFile(final String filestr, final int size,
+      final boolean triggerLeaseRenewerInterrupt)
+  throws IOException, InterruptedException {
+    AppendTestUtil.LOG.info("filestr=" + filestr);
+    Path filepath = new Path(filestr);
+    FSDataOutputStream stm = dfs.create(filepath, true, BUF_SIZE,
+      REPLICATION_NUM, BLOCK_SIZE);
+    assertTrue(dfs.dfs.exists(filestr));
+
+    AppendTestUtil.LOG.info("size=" + size);
+    stm.write(buffer, 0, size);
+
+    // hflush file
+    AppendTestUtil.LOG.info("hflush");
+    stm.hflush();
+
+    if (triggerLeaseRenewerInterrupt) {
+      AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+      dfs.dfs.leaserenewer.interruptAndJoin();
+    }
+    return filepath;
+  }
+
+  private void recoverLease(Path filepath, DistributedFileSystem dfs)
+  throws Exception {
+    if (dfs == null) {
+      dfs = (DistributedFileSystem)getFSAsAnotherUser(conf);
+    }
+
+    while (!dfs.recoverLease(filepath)) {
+      AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+      Thread.sleep(5000);
+    }
+  }
+
+  private FileSystem getFSAsAnotherUser(final Configuration c)
+  throws IOException, InterruptedException {
+    return FileSystem.get(FileSystem.getDefaultUri(c), c,
+      UserGroupInformation.createUserForTesting(fakeUsername, 
+          new String [] {fakeGroup}).getUserName());
+  }
+
+  private void recoverLeaseUsingCreate(Path filepath)
+  throws IOException, InterruptedException {
+    FileSystem dfs2 = getFSAsAnotherUser(conf);
+
+    boolean done = false;
+    for(int i = 0; i < 10 && !done; i++) {
+      AppendTestUtil.LOG.info("i=" + i);
+      try {
+        dfs2.create(filepath, false, BUF_SIZE, (short)1, BLOCK_SIZE);
+        fail("Creation of an existing file should never succeed.");
+      } catch (IOException ioe) {
+        final String message = ioe.getMessage();
+        if (message.contains("file exists")) {
+          AppendTestUtil.LOG.info("done", ioe);
+          done = true;
+        }
+        else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
+          AppendTestUtil.LOG.info("GOOD! got " + message);
+        }
+        else {
+          AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
+        }
+      }
+
+      if (!done) {
+        AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+        try {Thread.sleep(5000);} catch (InterruptedException e) {}
+      }
+    }
+    assertTrue(done);
+  }
+
+  private void verifyFile(FileSystem dfs, Path filepath, byte[] actual,
+      int size) throws IOException {
+    AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
+        + "Validating its contents now...");
+
+    // verify that file-size matches
+    assertTrue("File should be " + size + " bytes, but is actually " +
+               " found to be " + dfs.getFileStatus(filepath).getLen() +
+               " bytes",
+               dfs.getFileStatus(filepath).getLen() == size);
+
+    // verify that there is enough data to read.
+    System.out.println("File size is good. Now validating sizes from datanodes...");
+    FSDataInputStream stmin = dfs.open(filepath);
+    stmin.readFully(0, actual, 0, size);
+    stmin.close();
+  }
+
   /**
    * This test makes the client does not renew its lease and also
    * set the hard lease expiration period to be short 1s. Thus triggering