/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestLeaseRecovery2 {
  
  public static final Log LOG = LogFactory.getLog(TestLeaseRecovery2.class);
  
  {
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
  }

  static final private long BLOCK_SIZE = 1024;
  static final private int FILE_SIZE = (int)BLOCK_SIZE*2;
  static final short REPLICATION_NUM = (short)3;
  static byte[] buffer = new byte[FILE_SIZE];
  
  static private String fakeUsername = "fakeUser1";
  static private String fakeGroup = "supergroup";

  static private MiniDFSCluster cluster;
  static private DistributedFileSystem dfs;
  final static private Configuration conf = new HdfsConfiguration();
  final static private int BUF_SIZE = conf.getInt(
      CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
  
  final static private long SHORT_LEASE_PERIOD = 1000L;
  final static private long LONG_LEASE_PERIOD = 60*60*SHORT_LEASE_PERIOD;
  
  /** start a dfs cluster
   * 
   * @throws IOException
   */
  @BeforeClass
  public static void startUp() throws IOException {
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
    cluster.waitActive();
    dfs = cluster.getFileSystem();
  }
  
  /**
   * stop the cluster
   * @throws IOException
   */
  @AfterClass
  public static void tearDown() throws IOException {
    IOUtils.closeStream(dfs);
    if (cluster != null) {cluster.shutdown();}
  }

  /**
   * Test the NameNode's revoke lease on current lease holder function.
   * @throws Exception
   */
  @Test
  public void testImmediateRecoveryOfLease() throws Exception {
    //create a file
    // write bytes into the file.
    byte [] actual = new byte[FILE_SIZE];
    int size = AppendTestUtil.nextInt(FILE_SIZE);
    Path filepath = createFile("/immediateRecoverLease-shortlease", size, true);
    // set the soft limit to be 1 second so that the
    // namenode triggers lease recovery on next attempt to write-for-open.
    cluster.setLeasePeriod(SHORT_LEASE_PERIOD, LONG_LEASE_PERIOD);

    recoverLeaseUsingCreate(filepath);
    verifyFile(dfs, filepath, actual, size);

    //test recoverLease
    // set the soft limit to be 1 hour but recoverLease should
    // close the file immediately
    cluster.setLeasePeriod(LONG_LEASE_PERIOD, LONG_LEASE_PERIOD);
    size = AppendTestUtil.nextInt(FILE_SIZE);
    filepath = createFile("/immediateRecoverLease-longlease", size, false);

    // test recoverLese from a different client
    recoverLease(filepath, null);
    verifyFile(dfs, filepath, actual, size);

    // test recoverlease from the same client
    size = AppendTestUtil.nextInt(FILE_SIZE);
    filepath = createFile("/immediateRecoverLease-sameclient", size, false);

    // create another file using the same client
    Path filepath1 = new Path(filepath.toString() + AppendTestUtil.nextInt());
    FSDataOutputStream stm = dfs.create(filepath1, true, BUF_SIZE,
      REPLICATION_NUM, BLOCK_SIZE);

    // recover the first file
    recoverLease(filepath, dfs);
    verifyFile(dfs, filepath, actual, size);

    // continue to write to the second file
    stm.write(buffer, 0, size);
    stm.close();
    verifyFile(dfs, filepath1, actual, size);
  }

  private Path createFile(final String filestr, final int size,
      final boolean triggerLeaseRenewerInterrupt)
  throws IOException, InterruptedException {
    AppendTestUtil.LOG.info("filestr=" + filestr);
    Path filepath = new Path(filestr);
    FSDataOutputStream stm = dfs.create(filepath, true, BUF_SIZE,
      REPLICATION_NUM, BLOCK_SIZE);
    assertTrue(dfs.dfs.exists(filestr));

    AppendTestUtil.LOG.info("size=" + size);
    stm.write(buffer, 0, size);

    // hflush file
    AppendTestUtil.LOG.info("hflush");
    stm.hflush();

    if (triggerLeaseRenewerInterrupt) {
      AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
      dfs.dfs.getLeaseRenewer().interruptAndJoin();
    }
    return filepath;
  }

  private void recoverLease(Path filepath, DistributedFileSystem dfs)
  throws Exception {
    if (dfs == null) {
      dfs = (DistributedFileSystem)getFSAsAnotherUser(conf);
    }

    while (!dfs.recoverLease(filepath)) {
      AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
      Thread.sleep(5000);
    }
  }

  private FileSystem getFSAsAnotherUser(final Configuration c)
  throws IOException, InterruptedException {
    return FileSystem.get(FileSystem.getDefaultUri(c), c,
      UserGroupInformation.createUserForTesting(fakeUsername, 
          new String [] {fakeGroup}).getUserName());
  }

  private void recoverLeaseUsingCreate(Path filepath)
  throws IOException, InterruptedException {
    FileSystem dfs2 = getFSAsAnotherUser(conf);

    boolean done = false;
    for(int i = 0; i < 10 && !done; i++) {
      AppendTestUtil.LOG.info("i=" + i);
      try {
        dfs2.create(filepath, false, BUF_SIZE, (short)1, BLOCK_SIZE);
        fail("Creation of an existing file should never succeed.");
      } catch (IOException ioe) {
        final String message = ioe.getMessage();
        if (message.contains("file exists")) {
          AppendTestUtil.LOG.info("done", ioe);
          done = true;
        }
        else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
          AppendTestUtil.LOG.info("GOOD! got " + message);
        }
        else {
          AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
        }
      }

      if (!done) {
        AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
        try {Thread.sleep(5000);} catch (InterruptedException e) {}
      }
    }
    assertTrue(done);
  }

  private void verifyFile(FileSystem dfs, Path filepath, byte[] actual,
      int size) throws IOException {
    AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
        + "Validating its contents now...");

    // verify that file-size matches
    assertTrue("File should be " + size + " bytes, but is actually " +
               " found to be " + dfs.getFileStatus(filepath).getLen() +
               " bytes",
               dfs.getFileStatus(filepath).getLen() == size);

    // verify that there is enough data to read.
    System.out.println("File size is good. Now validating sizes from datanodes...");
    FSDataInputStream stmin = dfs.open(filepath);
    stmin.readFully(0, actual, 0, size);
    stmin.close();
  }

  /**
   * This test makes the client does not renew its lease and also
   * set the hard lease expiration period to be short 1s. Thus triggering
   * lease expiration to happen while the client is still alive.
   * 
   * The test makes sure that the lease recovery completes and the client
   * fails if it continues to write to the file.
   * 
   * @throws Exception
   */
  @Test
  public void testHardLeaseRecovery() throws Exception {
    //create a file
    String filestr = "/hardLeaseRecovery";
    AppendTestUtil.LOG.info("filestr=" + filestr);
    Path filepath = new Path(filestr);
    FSDataOutputStream stm = dfs.create(filepath, true,
        BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
    assertTrue(dfs.dfs.exists(filestr));

    // write bytes into the file.
    int size = AppendTestUtil.nextInt(FILE_SIZE);
    AppendTestUtil.LOG.info("size=" + size);
    stm.write(buffer, 0, size);

    // hflush file
    AppendTestUtil.LOG.info("hflush");
    stm.hflush();
    
    // kill the lease renewal thread
    AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
    dfs.dfs.getLeaseRenewer().interruptAndJoin();

    // set the hard limit to be 1 second 
    cluster.setLeasePeriod(LONG_LEASE_PERIOD, SHORT_LEASE_PERIOD);
    
    // wait for lease recovery to complete
    LocatedBlocks locatedBlocks;
    do {
      Thread.sleep(SHORT_LEASE_PERIOD);
      locatedBlocks = dfs.dfs.getLocatedBlocks(filestr, 0L, size);
    } while (locatedBlocks.isUnderConstruction());
    assertEquals(size, locatedBlocks.getFileLength());

    // make sure that the writer thread gets killed
    try {
      stm.write('b');
      stm.close();
      fail("Writer thread should have been killed");
    } catch (IOException e) {
      e.printStackTrace();
    }      

    // verify data
    AppendTestUtil.LOG.info(
        "File size is good. Now validating sizes from datanodes...");
    AppendTestUtil.checkFullFile(dfs, filepath, size, buffer, filestr);
  }
  
  /**
   * This test makes the client does not renew its lease and also
   * set the soft lease expiration period to be short 1s. Thus triggering
   * soft lease expiration to happen immediately by having another client
   * trying to create the same file.
   * 
   * The test makes sure that the lease recovery completes.
   * 
   * @throws Exception
   */
  @Test
  public void testSoftLeaseRecovery() throws Exception {
    Map<String, String []> u2g_map = new HashMap<String, String []>(1);
    u2g_map.put(fakeUsername, new String[] {fakeGroup});
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);

    // Reset default lease periods
    cluster.setLeasePeriod(HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
        HdfsConstants.LEASE_HARDLIMIT_PERIOD);
    //create a file
    // create a random file name
    String filestr = "/foo" + AppendTestUtil.nextInt();
    AppendTestUtil.LOG.info("filestr=" + filestr);
    Path filepath = new Path(filestr);
    FSDataOutputStream stm = dfs.create(filepath, true,
        BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
    assertTrue(dfs.dfs.exists(filestr));

    // write random number of bytes into it.
    int size = AppendTestUtil.nextInt(FILE_SIZE);
    AppendTestUtil.LOG.info("size=" + size);
    stm.write(buffer, 0, size);

    // hflush file
    AppendTestUtil.LOG.info("hflush");
    stm.hflush();
    AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
    dfs.dfs.getLeaseRenewer().interruptAndJoin();

    // set the soft limit to be 1 second so that the
    // namenode triggers lease recovery on next attempt to write-for-open.
    cluster.setLeasePeriod(SHORT_LEASE_PERIOD, LONG_LEASE_PERIOD);

    // try to re-open the file before closing the previous handle. This
    // should fail but will trigger lease recovery.
    {
      UserGroupInformation ugi = 
        UserGroupInformation.createUserForTesting(fakeUsername, 
            new String [] { fakeGroup});

      FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);

      boolean done = false;
      for(int i = 0; i < 10 && !done; i++) {
        AppendTestUtil.LOG.info("i=" + i);
        try {
          dfs2.create(filepath, false, BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
          fail("Creation of an existing file should never succeed.");
        } catch (FileAlreadyExistsException ex) {
          done = true;
        } catch (AlreadyBeingCreatedException ex) {
          AppendTestUtil.LOG.info("GOOD! got " + ex.getMessage());
        } catch (IOException ioe) {
          AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
        }

        if (!done) {
          AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
          try {Thread.sleep(5000);} catch (InterruptedException e) {}
        }
      }
      assertTrue(done);
    }

    AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
        + "Validating its contents now...");

    // verify that file-size matches
    long fileSize = dfs.getFileStatus(filepath).getLen();
    assertTrue("File should be " + size + " bytes, but is actually " +
        " found to be " + fileSize + " bytes", fileSize == size);

    // verify data
    AppendTestUtil.LOG.info("File size is good. " +
                     "Now validating data and sizes from datanodes...");
    AppendTestUtil.checkFullFile(dfs, filepath, size, buffer, filestr);
  }
  
  /**
   * This test makes it so the client does not renew its lease and also
   * set the hard lease expiration period to be short, thus triggering
   * lease expiration to happen while the client is still alive. The test
   * also causes the NN to restart after lease recovery has begun, but before
   * the DNs have completed the blocks. This test verifies that when the NN
   * comes back up, the client no longer holds the lease.
   * 
   * The test makes sure that the lease recovery completes and the client
   * fails if it continues to write to the file, even after NN restart.
   * 
   * @throws Exception
   */
  @Test
  public void testHardLeaseRecoveryAfterNameNodeRestart() throws Exception {
    hardLeaseRecoveryRestartHelper(false, -1);
  }

  @Test
  public void testHardLeaseRecoveryAfterNameNodeRestart2() throws Exception {
    hardLeaseRecoveryRestartHelper(false, 1535);
  }

  @Test
  public void testHardLeaseRecoveryWithRenameAfterNameNodeRestart()
      throws Exception {
    hardLeaseRecoveryRestartHelper(true, -1);
  }
  
  public void hardLeaseRecoveryRestartHelper(boolean doRename, int size)
      throws Exception {
    if (size < 0) {
      size =  AppendTestUtil.nextInt(FILE_SIZE + 1);
    }

    //create a file
    String fileStr = "/hardLeaseRecovery";
    AppendTestUtil.LOG.info("filestr=" + fileStr);
    Path filePath = new Path(fileStr);
    FSDataOutputStream stm = dfs.create(filePath, true,
        BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
    assertTrue(dfs.dfs.exists(fileStr));

    // write bytes into the file.
    AppendTestUtil.LOG.info("size=" + size);
    stm.write(buffer, 0, size);
    
    String originalLeaseHolder = NameNodeAdapter.getLeaseHolderForPath(
        cluster.getNameNode(), fileStr);
    
    assertFalse("original lease holder should not be the NN",
        originalLeaseHolder.equals(HdfsServerConstants.NAMENODE_LEASE_HOLDER));

    // hflush file
    AppendTestUtil.LOG.info("hflush");
    stm.hflush();
    
    // check visible length
    final HdfsDataInputStream in = (HdfsDataInputStream)dfs.open(filePath);
    Assert.assertEquals(size, in.getVisibleLength());
    in.close();
    
    if (doRename) {
      fileStr += ".renamed";
      Path renamedPath = new Path(fileStr);
      assertTrue(dfs.rename(filePath, renamedPath));
      filePath = renamedPath;
    }
    
    // kill the lease renewal thread
    AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
    dfs.dfs.getLeaseRenewer().interruptAndJoin();
    
    // Make sure the DNs don't send a heartbeat for a while, so the blocks
    // won't actually get completed during lease recovery.
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
    }
    
    // set the hard limit to be 1 second 
    cluster.setLeasePeriod(LONG_LEASE_PERIOD, SHORT_LEASE_PERIOD);
    
    // Make sure lease recovery begins.
    Thread.sleep(HdfsServerConstants.NAMENODE_LEASE_RECHECK_INTERVAL * 2);
    
    checkLease(fileStr, size);
    
    cluster.restartNameNode(false);
    
    checkLease(fileStr, size);
    
    // Let the DNs send heartbeats again.
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
    }

    cluster.waitActive();

    // set the hard limit to be 1 second, to initiate lease recovery. 
    cluster.setLeasePeriod(LONG_LEASE_PERIOD, SHORT_LEASE_PERIOD);
    
    // wait for lease recovery to complete
    LocatedBlocks locatedBlocks;
    do {
      Thread.sleep(SHORT_LEASE_PERIOD);
      locatedBlocks = dfs.dfs.getLocatedBlocks(fileStr, 0L, size);
    } while (locatedBlocks.isUnderConstruction());
    assertEquals(size, locatedBlocks.getFileLength());

    // make sure that the client can't write data anymore.
    try {
      stm.write('b');
      stm.hflush();
      fail("Should not be able to flush after we've lost the lease");
    } catch (IOException e) {
      LOG.info("Expceted exception on write/hflush", e);
    }
    
    try {
      stm.close();
      fail("Should not be able to close after we've lost the lease");
    } catch (IOException e) {
      LOG.info("Expected exception on close", e);
    }

    // verify data
    AppendTestUtil.LOG.info(
        "File size is good. Now validating sizes from datanodes...");
    AppendTestUtil.checkFullFile(dfs, filePath, size, buffer, fileStr);
  }
  
  static void checkLease(String f, int size) {
    final String holder = NameNodeAdapter.getLeaseHolderForPath(
        cluster.getNameNode(), f); 
    if (size == 0) {
      assertEquals("lease holder should null, file is closed", null, holder);
    } else {
      assertEquals("lease holder should now be the NN",
          HdfsServerConstants.NAMENODE_LEASE_HOLDER, holder);
    }
    
  }
}
