/*
 * Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.hadoop.fs.contract;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

import org.junit.Test;
import org.junit.AssumptionViolatedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;

import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.getFileStatusEventually;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;

/**
 * Test creating files, overwrite options etc.
 */
public abstract class AbstractContractCreateTest extends
    AbstractFSContractTestBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractContractCreateTest.class);

  /**
   * How long to wait for a path to become visible.
   */
  public static final int CREATE_TIMEOUT = 15000;

  protected Path path(String filepath, boolean useBuilder) throws IOException {
    return super.path(filepath  + (useBuilder ? "" : "-builder"));
  }

  private void testCreateNewFile(boolean useBuilder) throws Throwable {
    describe("Foundational 'create a file' test, using builder API=" +
        useBuilder);
    Path path = path("testCreateNewFile", useBuilder);
    byte[] data = dataset(256, 'a', 'z');
    writeDataset(getFileSystem(), path, data, data.length, 1024 * 1024, false,
        useBuilder);
    ContractTestUtils.verifyFileContents(getFileSystem(), path, data);
  }

  @Test
  public void testCreateNewFile() throws Throwable {
    testCreateNewFile(true);
    testCreateNewFile(false);
  }

  private void testCreateFileOverExistingFileNoOverwrite(boolean useBuilder)
      throws Throwable {
    describe("Verify overwriting an existing file fails, using builder API=" +
        useBuilder);
    Path path = path("testCreateFileOverExistingFileNoOverwrite", useBuilder);
    byte[] data = dataset(256, 'a', 'z');
    writeDataset(getFileSystem(), path, data, data.length, 1024, false);
    byte[] data2 = dataset(10 * 1024, 'A', 'Z');
    try {
      writeDataset(getFileSystem(), path, data2, data2.length, 1024, false,
          useBuilder);
      fail("writing without overwrite unexpectedly succeeded");
    } catch (FileAlreadyExistsException expected) {
      //expected
      handleExpectedException(expected);
    } catch (IOException relaxed) {
      handleRelaxedException("Creating a file over a file with overwrite==false",
                             "FileAlreadyExistsException",
                             relaxed);
    }
  }

  @Test
  public void testCreateFileOverExistingFileNoOverwrite() throws Throwable {
    testCreateFileOverExistingFileNoOverwrite(false);
    testCreateFileOverExistingFileNoOverwrite(true);
  }

  private void testOverwriteExistingFile(boolean useBuilder) throws Throwable {
    describe("Overwrite an existing file and verify the new data is there, " +
        "use builder API=" + useBuilder);
    Path path = path("testOverwriteExistingFile", useBuilder);
    byte[] data = dataset(256, 'a', 'z');
    writeDataset(getFileSystem(), path, data, data.length, 1024, false,
        useBuilder);
    ContractTestUtils.verifyFileContents(getFileSystem(), path, data);
    byte[] data2 = dataset(10 * 1024, 'A', 'Z');
    writeDataset(getFileSystem(), path, data2, data2.length, 1024, true,
        useBuilder);
    ContractTestUtils.verifyFileContents(getFileSystem(), path, data2);
  }

  /**
   * This test catches some eventual consistency problems that blobstores exhibit,
   * as we are implicitly verifying that updates are consistent. This
   * is why different file lengths and datasets are used
   */
  @Test
  public void testOverwriteExistingFile() throws Throwable {
    testOverwriteExistingFile(false);
    testOverwriteExistingFile(true);
  }

  private void testOverwriteEmptyDirectory(boolean useBuilder)
      throws Throwable {
    describe("verify trying to create a file over an empty dir fails, " +
        "use builder API=" + useBuilder);
    Path path = path("testOverwriteEmptyDirectory");
    mkdirs(path);
    assertIsDirectory(path);
    byte[] data = dataset(256, 'a', 'z');
    try {
      writeDataset(getFileSystem(), path, data, data.length, 1024, true,
          useBuilder);
      assertIsDirectory(path);
      fail("write of file over empty dir succeeded");
    } catch (FileAlreadyExistsException expected) {
      //expected
      handleExpectedException(expected);
    } catch (IOException e) {
      handleRelaxedException("overwriting a dir with a file ",
                             "FileAlreadyExistsException",
                             e);
    }
    assertIsDirectory(path);
  }

  @Test
  public void testOverwriteEmptyDirectory() throws Throwable {
    testOverwriteEmptyDirectory(false);
    testOverwriteEmptyDirectory(true);
  }

  private void testOverwriteNonEmptyDirectory(boolean useBuilder)
      throws Throwable {
    describe("verify trying to create a file over a non-empty dir fails, " +
        "use builder API=" + useBuilder);
    Path path = path("testOverwriteNonEmptyDirectory");
    mkdirs(path);
    try {
      assertIsDirectory(path);
    } catch (AssertionError failure) {
      if (isSupported(CREATE_OVERWRITES_DIRECTORY)) {
        // file/directory hack surfaces here
        throw new AssumptionViolatedException(failure.toString(), failure);
      }
      // else: rethrow
      throw failure;
    }
    Path child = new Path(path, "child");
    writeTextFile(getFileSystem(), child, "child file", true);
    byte[] data = dataset(256, 'a', 'z');
    try {
      writeDataset(getFileSystem(), path, data, data.length, 1024,
                   true, useBuilder);
      FileStatus status = getFileSystem().getFileStatus(path);

      boolean isDir = status.isDirectory();
      if (!isDir && isSupported(CREATE_OVERWRITES_DIRECTORY)) {
        // For some file systems, downgrade to a skip so that the failure is
        // visible in test results.
        skip("This Filesystem allows a file to overwrite a directory");
      }
      fail("write of file over dir succeeded");
    } catch (FileAlreadyExistsException expected) {
      //expected
      handleExpectedException(expected);
    } catch (IOException e) {
      handleRelaxedException("overwriting a dir with a file ",
                             "FileAlreadyExistsException",
                             e);
    }
    assertIsDirectory(path);
    assertIsFile(child);
  }

  @Test
  public void testOverwriteNonEmptyDirectory() throws Throwable {
    testOverwriteNonEmptyDirectory(false);
    testOverwriteNonEmptyDirectory(true);
  }

  @Test
  public void testCreatedFileIsImmediatelyVisible() throws Throwable {
    describe("verify that a newly created file exists as soon as open returns");
    Path path = path("testCreatedFileIsImmediatelyVisible");
    try(FSDataOutputStream out = getFileSystem().create(path,
                                   false,
                                   4096,
                                   (short) 1,
                                   1024)) {
      if (!getFileSystem().exists(path)) {

        if (isSupported(CREATE_VISIBILITY_DELAYED)) {
          // For some file systems, downgrade to a skip so that the failure is
          // visible in test results.
          skip("This Filesystem delays visibility of newly created files");
        }
        assertPathExists("expected path to be visible before anything written",
                         path);
      }
    }
  }

  @Test
  public void testCreatedFileIsVisibleOnFlush() throws Throwable {
    describe("verify that a newly created file exists once a flush has taken "
        + "place");
    Path path = path("testCreatedFileIsVisibleOnFlush");
    FileSystem fs = getFileSystem();
    try(FSDataOutputStream out = fs.create(path,
          false,
          4096,
          (short) 1,
          1024)) {
      out.write('a');
      out.flush();
      if (!fs.exists(path)) {
        if (isSupported(IS_BLOBSTORE) ||
            isSupported(CREATE_VISIBILITY_DELAYED)) {
          // object store or some file systems: downgrade to a skip so that the
          // failure is visible in test results
          skip("For object store or some file systems, newly created files are"
              + " not immediately visible");
        }
        assertPathExists("expected path to be visible before file closed",
            path);
      }
    }
  }

  @Test
  public void testCreatedFileIsEventuallyVisible() throws Throwable {
    describe("verify a written to file is visible after the stream is closed");
    Path path = path("testCreatedFileIsEventuallyVisible");
    FileSystem fs = getFileSystem();
    try(FSDataOutputStream out = fs.create(path, false, 4096, (short) 1,
        1024)) {
      out.write(0x01);
      out.close();
      getFileStatusEventually(fs, path, CREATE_TIMEOUT);
    }
  }

  @Test
  public void testFileStatusBlocksizeNonEmptyFile() throws Throwable {
    describe("validate the block size of a filesystem and files within it");
    FileSystem fs = getFileSystem();

    long rootPath = fs.getDefaultBlockSize(path("/"));
    assertTrue("Root block size is invalid " + rootPath,
        rootPath > 0);

    Path path = path("testFileStatusBlocksizeNonEmptyFile");
    byte[] data = dataset(256, 'a', 'z');

    writeDataset(fs, path, data, data.length, 1024 * 1024, false);

    validateBlockSize(fs, path, 1);
  }

  @Test
  public void testFileStatusBlocksizeEmptyFile() throws Throwable {
    describe("check that an empty file may return a 0-byte blocksize");
    FileSystem fs = getFileSystem();
    Path path = path("testFileStatusBlocksizeEmptyFile");
    ContractTestUtils.touch(fs, path);
    validateBlockSize(fs, path, 0);
  }

  private void validateBlockSize(FileSystem fs, Path path, int minValue)
      throws IOException, InterruptedException {
    FileStatus status =
        getFileStatusEventually(fs, path, CREATE_TIMEOUT);
    String statusDetails = status.toString();
    assertTrue("File status block size too low:  " + statusDetails
            + " min value: " + minValue,
        status.getBlockSize() >= minValue);
    long defaultBlockSize = fs.getDefaultBlockSize(path);
    assertTrue("fs.getDefaultBlockSize(" + path + ") size " +
            defaultBlockSize + " is below the minimum of " + minValue,
        defaultBlockSize >= minValue);
  }

  @Test
  public void testCreateMakesParentDirs() throws Throwable {
    describe("check that after creating a file its parent directories exist");
    FileSystem fs = getFileSystem();
    Path grandparent = path("testCreateCreatesAndPopulatesParents");
    Path parent = new Path(grandparent, "parent");
    Path child = new Path(parent, "child");
    touch(fs, child);
    assertEquals("List status of parent should include the 1 child file",
        1, fs.listStatus(parent).length);
    assertTrue("Parent directory does not appear to be a directory",
        fs.getFileStatus(parent).isDirectory());
    assertEquals("List status of grandparent should include the 1 parent dir",
        1, fs.listStatus(grandparent).length);
    assertTrue("Grandparent directory does not appear to be a directory",
        fs.getFileStatus(grandparent).isDirectory());
  }

  @Test
  public void testCreateFileUnderFile() throws Throwable {
    describe("Verify that it is forbidden to create file/file");
    if (isSupported(CREATE_FILE_UNDER_FILE_ALLOWED)) {
      // object store or some file systems: downgrade to a skip so that the
      // failure is visible in test results
      skip("This filesystem supports creating files under files");
    }
    Path grandparent = methodPath();
    Path parent = new Path(grandparent, "parent");
    expectCreateUnderFileFails(
        "creating a file under a file",
        grandparent,
        parent);
  }

  @Test
  public void testCreateUnderFileSubdir() throws Throwable {
    describe("Verify that it is forbidden to create file/dir/file");
    if (isSupported(CREATE_FILE_UNDER_FILE_ALLOWED)) {
      // object store or some file systems: downgrade to a skip so that the
      // failure is visible in test results
      skip("This filesystem supports creating files under files");
    }
    Path grandparent = methodPath();
    Path parent = new Path(grandparent, "parent");
    Path child = new Path(parent, "child");
    expectCreateUnderFileFails(
        "creating a file under a subdirectory of a file",
        grandparent,
        child);
  }


  @Test
  public void testMkdirUnderFile() throws Throwable {
    describe("Verify that it is forbidden to create file/dir");
    Path grandparent = methodPath();
    Path parent = new Path(grandparent, "parent");
    expectMkdirsUnderFileFails("mkdirs() under a file",
        grandparent, parent);
  }

  @Test
  public void testMkdirUnderFileSubdir() throws Throwable {
    describe("Verify that it is forbidden to create file/dir/dir");
    Path grandparent = methodPath();
    Path parent = new Path(grandparent, "parent");
    Path child = new Path(parent, "child");
    expectMkdirsUnderFileFails("mkdirs() file/dir",
        grandparent, child);

    try {
      // create the child
      mkdirs(child);
    } catch (FileAlreadyExistsException | ParentNotDirectoryException ex) {
      // either of these may be raised.
      handleExpectedException(ex);
    } catch (IOException e) {
      handleRelaxedException("creating a file under a subdirectory of a file ",
          "FileAlreadyExistsException",
          e);
    }
  }

  /**
   * Expect that touch() will fail because the parent is a file.
   * @param action action for message
   * @param file filename to create
   * @param descendant path under file
   * @throws Exception failure
   */
  protected void expectCreateUnderFileFails(String action,
      Path file, Path descendant)
      throws Exception {
    createFile(file);
    try {
      // create the child
      createFile(descendant);
    } catch (FileAlreadyExistsException | ParentNotDirectoryException ex) {
      //expected
      handleExpectedException(ex);
    } catch (IOException e) {
      handleRelaxedException(action,
          "ParentNotDirectoryException",
          e);
    }
  }

  protected void expectMkdirsUnderFileFails(String action,
      Path file, Path descendant)
      throws Exception {
    createFile(file);
    try {
      // now mkdirs
      mkdirs(descendant);
    } catch (FileAlreadyExistsException | ParentNotDirectoryException ex) {
      //expected
      handleExpectedException(ex);
    } catch (IOException e) {
      handleRelaxedException(action,
          "ParentNotDirectoryException",
          e);
    }
  }

  private void createFile(Path path) throws IOException {
    byte[] data = dataset(256, 'a', 'z');
    FileSystem fs = getFileSystem();
    writeDataset(fs, path, data, data.length, 1024 * 1024,
        true);
  }

  @Test
  public void testSyncable() throws Throwable {
    describe("test declared and actual Syncable behaviors");
    FileSystem fs = getFileSystem();
    boolean supportsFlush = isSupported(SUPPORTS_HFLUSH);
    boolean supportsSync = isSupported(SUPPORTS_HSYNC);
    boolean metadataUpdatedOnHSync = isSupported(METADATA_UPDATED_ON_HSYNC);

    validateSyncableSemantics(fs,
        supportsSync,
        supportsFlush,
        metadataUpdatedOnHSync);
  }

  /**
   * Validate the semantics of syncable.
   * @param fs filesystem
   * @param supportsSync sync is present
   * @param supportsFlush flush is present.
   * @param metadataUpdatedOnHSync  Is the metadata updated after an hsync?
   * @throws IOException failure
   */
  protected void validateSyncableSemantics(final FileSystem fs,
      final boolean supportsSync,
      final boolean supportsFlush,
      final boolean metadataUpdatedOnHSync)
      throws IOException {
    Path path = methodPath();
    LOG.info("Expecting files under {} to have supportsSync={}"
            + " and supportsFlush={}; metadataUpdatedOnHSync={}",
        path, supportsSync, supportsFlush, metadataUpdatedOnHSync);

    try (FSDataOutputStream out = fs.create(path, true)) {
      LOG.info("Created output stream {}", out);

      // probe stream for support for flush/sync, whose capabilities
      // of supports/does not support must match what is expected
      String[] hflushCapabilities = {
          StreamCapabilities.HFLUSH
      };
      String[] hsyncCapabilities = {
          StreamCapabilities.HSYNC
      };
      if (supportsFlush) {
        assertCapabilities(out, hflushCapabilities, null);
      } else {
        assertCapabilities(out, null, hflushCapabilities);
      }
      if (supportsSync) {
        assertCapabilities(out, hsyncCapabilities, null);
      } else {
        assertCapabilities(out, null, hsyncCapabilities);
      }

      // write one byte, then hflush it
      out.write('a');
      try {
        out.hflush();
        if (!supportsFlush) {
          // FSDataOutputStream silently downgrades to flush() here.
          // This is not good, but if changed some applications
          // break writing to some stores.
          LOG.warn("FS doesn't support Syncable.hflush(),"
              + " but doesn't reject it either.");
        }
      } catch (UnsupportedOperationException e) {
        if (supportsFlush) {
          throw new AssertionError("hflush not supported", e);
        }
      }

      // write a second byte, then hsync it.
      out.write('b');
      try {
        out.hsync();
      } catch (UnsupportedOperationException e) {
        if (supportsSync) {
          throw new AssertionError("HSync not supported", e);
        }
      }

      if (supportsSync) {
        // if sync really worked, data MUST be visible here

        // first the metadata which MUST be present
        final FileStatus st = fs.getFileStatus(path);
        if (metadataUpdatedOnHSync) {
          // not all stores reliably update it, HDFS/webHDFS in particular
          assertEquals("Metadata not updated during write " + st,
              2, st.getLen());
        }

        // there's no way to verify durability, but we can
        // at least verify a new file input stream reads
        // the data
        try (FSDataInputStream in = fs.open(path)) {
          assertEquals('a', in.read());
          assertEquals('b', in.read());
          assertEquals(-1, in.read());
          LOG.info("Successfully read synced data on a new reader {}", in);
        }
      } else {
        // no sync. Let's do a flush and see what happens.
        out.flush();
        // Now look at the filesystem.
        try (FSDataInputStream in = fs.open(path)) {
          int c = in.read();
          if (c == -1) {
            // nothing was synced; sync and flush really aren't there.
            LOG.info("sync and flush are declared unsupported"
                + " -flushed changes were not saved");

          } else {
            LOG.info("sync and flush are declared unsupported"
                + " - but the stream does offer some sync/flush semantics");
          }
          // close outside a finally as we do want to see any exception raised.
          in.close();

        } catch (FileNotFoundException e) {
          // that's OK if it's an object store or some file systems that newly created files
          // are not immediately visible, but not if its a real FS
          if (!isSupported(IS_BLOBSTORE) && !isSupported(CREATE_VISIBILITY_DELAYED)) {
            throw e;
          } else {
            LOG.warn(
                "Output file was not created; this is an object store or "
                    + "a file system with different visibility semantics");
          }
        }
      }
      // close the output stream
      out.close();

      final String stats = ioStatisticsSourceToString(out);
      if (!stats.isEmpty()) {
        LOG.info("IOStatistics {}", stats);
      }
    }
  }
}
