/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.junit.Assume;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;

/**
 * Test S3Guard list consistency feature by injecting delayed listObjects()
 * visibility via {@link InconsistentAmazonS3Client}.
 *
 * Tests here generally:
 * 1. Use the inconsistency injection mentioned above.
 * 2. Only run when S3Guard is enabled.
 */
public class ITestS3GuardListConsistency extends AbstractS3ATestBase {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
        S3ClientFactory.class);
    // Other configs would break test assumptions
    conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
    conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC);
    return new S3AContract(conf);
  }

  /**
   * Helper function for other test cases: does a single rename operation and
   * validates the aftermath.
   * @param mkdirs Directories to create
   * @param srcdirs Source paths for rename operation
   * @param dstdirs Destination paths for rename operation
   * @param yesdirs Files that must exist post-rename (e.g. srcdirs children)
   * @param nodirs Files that must not exist post-rename (e.g. dstdirs children)
   * @throws Exception
   */
  private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs,
      Path[] dstdirs, Path[] yesdirs, Path[] nodirs) throws Exception {
    S3AFileSystem fs = getFileSystem();
    Assume.assumeTrue(fs.hasMetadataStore());

    if (mkdirs != null) {
      for (Path mkdir : mkdirs) {
        assertTrue(fs.mkdirs(mkdir));
      }
      clearInconsistency(fs);
    }

    assertTrue("srcdirs and dstdirs must have equal length",
        srcdirs.length == dstdirs.length);
    for (int i = 0; i < srcdirs.length; i++) {
      assertTrue("Rename returned false: " + srcdirs[i] + " -> " + dstdirs[i],
          fs.rename(srcdirs[i], dstdirs[i]));
    }

    for (Path yesdir : yesdirs) {
      assertTrue("Path was supposed to exist: " + yesdir, fs.exists(yesdir));
    }
    for (Path nodir : nodirs) {
      assertFalse("Path is not supposed to exist: " + nodir, fs.exists(nodir));
    }
  }

  /**
   * Tests that after renaming a directory, the original directory and its
   * contents are indeed missing and the corresponding new paths are visible.
   * @throws Exception
   */
  @Test
  public void testConsistentListAfterRename() throws Exception {
    Path[] mkdirs = {
      path("d1/f"),
      path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING)
    };
    Path[] srcdirs = {path("d1")};
    Path[] dstdirs = {path("d2")};
    Path[] yesdirs = {path("d2"), path("d2/f"),
        path("d2/f" + DEFAULT_DELAY_KEY_SUBSTRING)};
    Path[] nodirs = {path("d1"), path("d1/f"),
        path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING)};
    doTestRenameSequence(mkdirs, srcdirs, dstdirs, yesdirs, nodirs);
    getFileSystem().delete(path("d1"), true);
    getFileSystem().delete(path("d2"), true);
  }

  /**
   * Tests a circular sequence of renames to verify that overwriting recently
   * deleted files and reading recently created files from rename operations
   * works as expected.
   * @throws Exception
   */
  @Test
  public void testRollingRenames() throws Exception {
    Path[] dir0 = {path("rolling/1")};
    Path[] dir1 = {path("rolling/2")};
    Path[] dir2 = {path("rolling/3")};
    // These sets have to be in reverse order compared to the movement
    Path[] setA = {dir1[0], dir0[0]};
    Path[] setB = {dir2[0], dir1[0]};
    Path[] setC = {dir0[0], dir2[0]};

    for(int i = 0; i < 2; i++) {
      Path[] firstSet = i == 0 ? setA : null;
      doTestRenameSequence(firstSet, setA, setB, setB, dir0);
      doTestRenameSequence(null, setB, setC, setC, dir1);
      doTestRenameSequence(null, setC, setA, setA, dir2);
    }

    S3AFileSystem fs = getFileSystem();
    assertFalse("Renaming deleted file should have failed",
        fs.rename(dir2[0], dir1[0]));
    assertTrue("Renaming over existing file should have succeeded",
        fs.rename(dir1[0], dir0[0]));
  }

  /**
   * Tests that deleted files immediately stop manifesting in list operations
   * even when the effect in S3 is delayed.
   * @throws Exception
   */
  @Test
  public void testConsistentListAfterDelete() throws Exception {
    S3AFileSystem fs = getFileSystem();
    // test will fail if NullMetadataStore (the default) is configured: skip it.
    Assume.assumeTrue(fs.hasMetadataStore());

    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
    // in listObjects() results via InconsistentS3Client
    Path inconsistentPath =
        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);

    Path[] testDirs = {path("a/b/dir1"),
        path("a/b/dir2"),
        inconsistentPath};

    for (Path path : testDirs) {
      assertTrue(fs.mkdirs(path));
    }
    clearInconsistency(fs);
    for (Path path : testDirs) {
      assertTrue(fs.delete(path, false));
    }

    FileStatus[] paths = fs.listStatus(path("a/b/"));
    List<Path> list = new ArrayList<>();
    for (FileStatus fileState : paths) {
      list.add(fileState.getPath());
    }
    assertFalse(list.contains(path("a/b/dir1")));
    assertFalse(list.contains(path("a/b/dir2")));
    // This should fail without S3Guard, and succeed with it.
    assertFalse(list.contains(inconsistentPath));
  }

  /**
   * Tests that rename immediately after files in the source directory are
   * deleted results in exactly the correct set of destination files and none
   * of the source files.
   * @throws Exception
   */
  @Test
  public void testConsistentRenameAfterDelete() throws Exception {
    S3AFileSystem fs = getFileSystem();
    // test will fail if NullMetadataStore (the default) is configured: skip it.
    Assume.assumeTrue(fs.hasMetadataStore());

    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
    // in listObjects() results via InconsistentS3Client
    Path inconsistentPath =
        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);

    Path[] testDirs = {path("a/b/dir1"),
        path("a/b/dir2"),
        inconsistentPath};

    for (Path path : testDirs) {
      assertTrue(fs.mkdirs(path));
    }
    clearInconsistency(fs);
    assertTrue(fs.delete(testDirs[1], false));
    assertTrue(fs.delete(testDirs[2], false));

    fs.rename(path("a"), path("a3"));
    FileStatus[] paths = fs.listStatus(path("a3/b"));
    List<Path> list = new ArrayList<>();
    for (FileStatus fileState : paths) {
      list.add(fileState.getPath());
    }
    assertTrue(list.contains(path("a3/b/dir1")));
    assertFalse(list.contains(path("a3/b/dir2")));
    // This should fail without S3Guard, and succeed with it.
    assertFalse(list.contains(path("a3/b/dir3-" +
        DEFAULT_DELAY_KEY_SUBSTRING)));

    try {
      RemoteIterator<LocatedFileStatus> old = fs.listFilesAndEmptyDirectories(
          path("a"), true);
      fail("Recently renamed dir should not be visible");
    } catch(FileNotFoundException e) {
      // expected
    }
  }

  @Test
  public void testConsistentListStatusAfterPut() throws Exception {

    S3AFileSystem fs = getFileSystem();

    // This test will fail if NullMetadataStore (the default) is configured:
    // skip it.
    Assume.assumeTrue(fs.hasMetadataStore());

    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
    // in listObjects() results via InconsistentS3Client
    Path inconsistentPath =
        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);

    Path[] testDirs = {path("a/b/dir1"),
        path("a/b/dir2"),
        inconsistentPath};

    for (Path path : testDirs) {
      assertTrue(fs.mkdirs(path));
    }

    FileStatus[] paths = fs.listStatus(path("a/b/"));
    List<Path> list = new ArrayList<>();
    for (FileStatus fileState : paths) {
      list.add(fileState.getPath());
    }
    assertTrue(list.contains(path("a/b/dir1")));
    assertTrue(list.contains(path("a/b/dir2")));
    // This should fail without S3Guard, and succeed with it.
    assertTrue(list.contains(inconsistentPath));
  }

  /**
   * Similar to {@link #testConsistentListStatusAfterPut()}, this tests that the
   * FS listLocatedStatus() call will return consistent list.
   */
  @Test
  public void testConsistentListLocatedStatusAfterPut() throws Exception {
    final S3AFileSystem fs = getFileSystem();
    // This test will fail if NullMetadataStore (the default) is configured:
    // skip it.
    Assume.assumeTrue(fs.hasMetadataStore());
    String rootDir = "doTestConsistentListLocatedStatusAfterPut";
    fs.mkdirs(path(rootDir));

    final int[] numOfPaths = {0, 1, 5};
    for (int normalPathNum : numOfPaths) {
      for (int delayedPathNum : new int[] {0, 2}) {
        LOG.info("Testing with normalPathNum={}, delayedPathNum={}",
            normalPathNum, delayedPathNum);
        doTestConsistentListLocatedStatusAfterPut(fs, rootDir, normalPathNum,
            delayedPathNum);
      }
    }
  }

  /**
   * Helper method to implement the tests of consistent listLocatedStatus().
   * @param fs The S3 file system from contract
   * @param normalPathNum number paths listed directly from S3 without delaying
   * @param delayedPathNum number paths listed with delaying
   * @throws Exception
   */
  private void doTestConsistentListLocatedStatusAfterPut(S3AFileSystem fs,
      String rootDir, int normalPathNum, int delayedPathNum) throws Exception {
    final List<Path> testDirs = new ArrayList<>(normalPathNum + delayedPathNum);
    int index = 0;
    for (; index < normalPathNum; index++) {
      testDirs.add(path(rootDir + "/dir-" +
          index));
    }
    for (; index < normalPathNum + delayedPathNum; index++) {
      // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
      // in listObjects() results via InconsistentS3Client
      testDirs.add(path(rootDir + "/dir-" + index +
          DEFAULT_DELAY_KEY_SUBSTRING));
    }

    for (Path path : testDirs) {
      // delete the old test path (if any) so that when we call mkdirs() later,
      // the to delay directories will be tracked via putObject() request.
      fs.delete(path, true);
      assertTrue(fs.mkdirs(path));
    }

    // this should return the union data from S3 and MetadataStore
    final RemoteIterator<LocatedFileStatus> statusIterator =
        fs.listLocatedStatus(path(rootDir + "/"));
    List<Path> list = new ArrayList<>();
    for (; statusIterator.hasNext();) {
      list.add(statusIterator.next().getPath());
    }

    // This should fail without S3Guard, and succeed with it because part of the
    // children under test path are delaying visibility
    for (Path path : testDirs) {
      assertTrue("listLocatedStatus should list " + path, list.contains(path));
    }
  }

  /**
   * Tests that the S3AFS listFiles() call will return consistent file list.
   */
  @Test
  public void testConsistentListFiles() throws Exception {
    final S3AFileSystem fs = getFileSystem();
    // This test will fail if NullMetadataStore (the default) is configured:
    // skip it.
    Assume.assumeTrue(fs.hasMetadataStore());

    final int[] numOfPaths = {0, 2};
    for (int dirNum : numOfPaths) {
      for (int normalFile : numOfPaths) {
        for (int delayedFile : new int[] {0, 1}) {
          for (boolean recursive : new boolean[] {true, false}) {
            doTestListFiles(fs, dirNum, normalFile, delayedFile, recursive);
          }
        }
      }
    }
  }

  /**
   * Helper method to implement the tests of consistent listFiles().
   *
   * The file structure has dirNum subdirectories, and each directory (including
   * the test base directory itself) has normalFileNum normal files and
   * delayedFileNum delayed files.
   *
   * @param fs The S3 file system from contract
   * @param dirNum number of subdirectories
   * @param normalFileNum number files in each directory without delay to list
   * @param delayedFileNum number files in each directory with delay to list
   * @param recursive listFiles recursively if true
   * @throws Exception if any unexpected error
   */
  private void doTestListFiles(S3AFileSystem fs, int dirNum, int normalFileNum,
      int delayedFileNum, boolean recursive) throws Exception {
    describe("Testing dirNum=%d, normalFile=%d, delayedFile=%d, "
        + "recursive=%s", dirNum, normalFileNum, delayedFileNum, recursive);
    final Path baseTestDir = path("doTestListFiles-" + dirNum + "-"
        + normalFileNum + "-" + delayedFileNum + "-" + recursive);
    // delete the old test path (if any) so that when we call mkdirs() later,
    // the to delay sub directories will be tracked via putObject() request.
    fs.delete(baseTestDir, true);

    // make subdirectories (if any)
    final List<Path> testDirs = new ArrayList<>(dirNum + 1);
    assertTrue(fs.mkdirs(baseTestDir));
    testDirs.add(baseTestDir);
    for (int i = 0; i < dirNum; i++) {
      final Path subdir = path(baseTestDir + "/dir-" + i);
      assertTrue(fs.mkdirs(subdir));
      testDirs.add(subdir);
    }

    final Collection<String> fileNames
        = new ArrayList<>(normalFileNum + delayedFileNum);
    int index = 0;
    for (; index < normalFileNum; index++) {
      fileNames.add("file-" + index);
    }
    for (; index < normalFileNum + delayedFileNum; index++) {
      // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
      // in listObjects() results via InconsistentS3Client
      fileNames.add("file-" + index + "-" + DEFAULT_DELAY_KEY_SUBSTRING);
    }

    int filesAndEmptyDirectories = 0;

    // create files under each test directory
    for (Path dir : testDirs) {
      for (String fileName : fileNames) {
        writeTextFile(fs, new Path(dir, fileName), "I, " + fileName, false);
        filesAndEmptyDirectories++;
      }
    }

    // this should return the union data from S3 and MetadataStore
    final RemoteIterator<LocatedFileStatus> statusIterator
        = fs.listFiles(baseTestDir, recursive);
    final Collection<Path> listedFiles = new HashSet<>();
    for (; statusIterator.hasNext();) {
      final FileStatus status = statusIterator.next();
      assertTrue("FileStatus " + status + " is not a file!", status.isFile());
      listedFiles.add(status.getPath());
    }
    LOG.info("S3AFileSystem::listFiles('{}', {}) -> {}",
        baseTestDir, recursive, listedFiles);

    // This should fail without S3Guard, and succeed with it because part of the
    // files to list are delaying visibility
    if (!recursive) {
      // in this case only the top level files are listed
      verifyFileIsListed(listedFiles, baseTestDir, fileNames);
      assertEquals("Unexpected number of files returned by listFiles() call",
          normalFileNum + delayedFileNum, listedFiles.size());
    } else {
      for (Path dir : testDirs) {
        verifyFileIsListed(listedFiles, dir, fileNames);
      }
      assertEquals("Unexpected number of files returned by listFiles() call",
          filesAndEmptyDirectories,
          listedFiles.size());
    }
  }

  private static void verifyFileIsListed(Collection<Path> listedFiles,
      Path currentDir, Collection<String> fileNames) {
    for (String fileName : fileNames) {
      final Path file = new Path(currentDir, fileName);
      assertTrue(file + " should have been listed", listedFiles.contains(file));
    }
  }

  @Test
  public void testCommitByRenameOperations() throws Throwable {
    S3AFileSystem fs = getFileSystem();
    Assume.assumeTrue(fs.hasMetadataStore());
    Path work = path("test-commit-by-rename-" + DEFAULT_DELAY_KEY_SUBSTRING);
    Path task00 = new Path(work, "task00");
    fs.mkdirs(task00);
    String name = "part-00";
    try (FSDataOutputStream out =
             fs.create(new Path(task00, name), false)) {
      out.writeChars("hello");
    }
    for (FileStatus stat : fs.listStatus(task00)) {
      fs.rename(stat.getPath(), work);
    }
    List<FileStatus> files = new ArrayList<>(2);
    for (FileStatus stat : fs.listStatus(work)) {
      if (stat.isFile()) {
        files.add(stat);
      }
    }
    assertFalse("renamed file " + name + " not found in " + work,
        files.isEmpty());
    assertEquals("more files found than expected in " + work
        + " " + ls(work), 1, files.size());
    FileStatus status = files.get(0);
    assertEquals("Wrong filename in " + status,
        name, status.getPath().getName());
  }

  @Test
  public void testInconsistentS3ClientDeletes() throws Throwable {
    S3AFileSystem fs = getFileSystem();
    Path root = path("testInconsistentClient" + DEFAULT_DELAY_KEY_SUBSTRING);
    for (int i = 0; i < 3; i++) {
      fs.mkdirs(new Path(root, "dir" + i));
      touch(fs, new Path(root, "file" + i));
      for (int j = 0; j < 3; j++) {
        touch(fs, new Path(new Path(root, "dir" + i), "file" + i + "-" + j));
      }
    }
    clearInconsistency(fs);

    AmazonS3 client = fs.getAmazonS3Client();
    String key = fs.pathToKey(root) + "/";

    ObjectListing preDeleteDelimited = client.listObjects(
        fs.createListObjectsRequest(key, "/"));
    ObjectListing preDeleteUndelimited = client.listObjects(
        fs.createListObjectsRequest(key, null));

    fs.delete(root, true);

    ObjectListing postDeleteDelimited = client.listObjects(
        fs.createListObjectsRequest(key, "/"));
    ObjectListing postDeleteUndelimited = client.listObjects(
        fs.createListObjectsRequest(key, null));

    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
            "in a non-recursive listing",
        preDeleteDelimited.getObjectSummaries().size(),
        postDeleteDelimited.getObjectSummaries().size()
    );
    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
            "in a non-recursive listing",
        preDeleteDelimited.getCommonPrefixes().size(),
        postDeleteDelimited.getCommonPrefixes().size()
    );
    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
            "in a recursive listing",
        preDeleteUndelimited.getObjectSummaries().size(),
        postDeleteUndelimited.getObjectSummaries().size()
    );
    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
            "in a recursive listing",
        preDeleteUndelimited.getCommonPrefixes().size(),
        postDeleteUndelimited.getCommonPrefixes().size()
    );
  }

  private static void clearInconsistency(S3AFileSystem fs) throws Exception {
    AmazonS3 s3 = fs.getAmazonS3Client();
    InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
    ic.clearInconsistency();
  }
}
