/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io;

import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.Writer;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link FileIO}. */
@RunWith(JUnit4.class)
public class FileIOTest implements Serializable {
  @Rule public transient TestPipeline p = TestPipeline.create();

  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();

  @Rule public transient ExpectedException thrown = ExpectedException.none();

  @Test
  @Category(NeedsRunner.class)
  public void testMatchAndMatchAll() throws IOException {
    Path firstPath = tmpFolder.newFile("first").toPath();
    Path secondPath = tmpFolder.newFile("second").toPath();
    int firstSize = 37;
    int secondSize = 42;
    long firstModified = 1541097000L;
    long secondModified = 1541098000L;
    Files.write(firstPath, new byte[firstSize]);
    Files.write(secondPath, new byte[secondSize]);
    Files.setLastModifiedTime(firstPath, FileTime.fromMillis(firstModified));
    Files.setLastModifiedTime(secondPath, FileTime.fromMillis(secondModified));
    MatchResult.Metadata firstMetadata = metadata(firstPath, firstSize, firstModified);
    MatchResult.Metadata secondMetadata = metadata(secondPath, secondSize, secondModified);

    PAssert.that(
            p.apply(
                "Match existing",
                FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")))
        .containsInAnyOrder(firstMetadata, secondMetadata);
    PAssert.that(
            p.apply(
                "Match existing with provider",
                FileIO.match()
                    .filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*"))))
        .containsInAnyOrder(firstMetadata, secondMetadata);
    PAssert.that(
            p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*"))
                .apply("MatchAll existing", FileIO.matchAll()))
        .containsInAnyOrder(firstMetadata, secondMetadata);

    PAssert.that(
            p.apply(
                "Match non-existing ALLOW",
                FileIO.match()
                    .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah")
                    .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)))
        .containsInAnyOrder();
    PAssert.that(
            p.apply(
                    "Create non-existing",
                    Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah"))
                .apply(
                    "MatchAll non-existing ALLOW",
                    FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)))
        .containsInAnyOrder();

    PAssert.that(
            p.apply(
                "Match non-existing ALLOW_IF_WILDCARD",
                FileIO.match()
                    .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah*")
                    .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)))
        .containsInAnyOrder();
    PAssert.that(
            p.apply(
                    "Create non-existing wildcard + explicit",
                    Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*"))
                .apply(
                    "MatchAll non-existing ALLOW_IF_WILDCARD",
                    FileIO.matchAll()
                        .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)))
        .containsInAnyOrder();
    PAssert.that(
            p.apply(
                    "Create non-existing wildcard + default",
                    Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*"))
                .apply("MatchAll non-existing default", FileIO.matchAll()))
        .containsInAnyOrder();

    p.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testMatchDisallowEmptyDefault() throws IOException {
    p.apply("Match", FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*"));

    thrown.expectCause(isA(FileNotFoundException.class));
    p.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testMatchDisallowEmptyExplicit() throws IOException {
    p.apply(
        FileIO.match()
            .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")
            .withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW));

    thrown.expectCause(isA(FileNotFoundException.class));
    p.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testMatchDisallowEmptyNonWildcard() throws IOException {
    p.apply(
        FileIO.match()
            .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah")
            .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD));

    thrown.expectCause(isA(FileNotFoundException.class));
    p.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testMatchAllDisallowEmptyExplicit() throws IOException {
    p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*"))
        .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW));
    thrown.expectCause(isA(FileNotFoundException.class));
    p.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testMatchAllDisallowEmptyNonWildcard() throws IOException {
    p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah"))
        .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD));
    thrown.expectCause(isA(FileNotFoundException.class));
    p.run();
  }

  @Test
  @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
  public void testMatchWatchForNewFiles() throws IOException, InterruptedException {
    // Write some files to a "source" directory.
    final Path sourcePath = tmpFolder.getRoot().toPath().resolve("source");
    sourcePath.toFile().mkdir();
    Files.write(sourcePath.resolve("first"), new byte[42]);
    Files.write(sourcePath.resolve("second"), new byte[37]);
    Files.write(sourcePath.resolve("third"), new byte[99]);

    // Create a "watch" directory that the pipeline will copy files into.
    final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch");
    watchPath.toFile().mkdir();
    PCollection<MatchResult.Metadata> matchMetadata =
        p.apply(
            FileIO.match()
                .filepattern(watchPath.resolve("*").toString())
                .continuously(
                    Duration.millis(100),
                    Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3))));
    PCollection<MatchResult.Metadata> matchAllMetadata =
        p.apply(Create.of(watchPath.resolve("*").toString()))
            .apply(
                FileIO.matchAll()
                    .continuously(
                        Duration.millis(100),
                        Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3))));
    assertEquals(PCollection.IsBounded.UNBOUNDED, matchMetadata.isBounded());
    assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded());

    // Copy the files to the "watch" directory, preserving the lastModifiedTime;
    // the COPY_ATTRIBUTES option ensures that we will at a minimum copy lastModifiedTime.
    CopyOption[] copyOptions = {StandardCopyOption.COPY_ATTRIBUTES};
    Thread writer =
        new Thread(
            () -> {
              try {
                Thread.sleep(1000);
                Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), copyOptions);
                Thread.sleep(300);
                Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), copyOptions);
                Thread.sleep(300);
                Files.copy(sourcePath.resolve("third"), watchPath.resolve("third"), copyOptions);
              } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
              }
            });
    writer.start();

    // We fetch lastModifiedTime from the files in the "source" directory to avoid a race condition
    // with the writer thread.
    List<MatchResult.Metadata> expected =
        Arrays.asList(
            metadata(
                watchPath.resolve("first"), 42, lastModifiedMillis(sourcePath.resolve("first"))),
            metadata(
                watchPath.resolve("second"), 37, lastModifiedMillis(sourcePath.resolve("second"))),
            metadata(
                watchPath.resolve("third"), 99, lastModifiedMillis(sourcePath.resolve("third"))));
    PAssert.that(matchMetadata).containsInAnyOrder(expected);
    PAssert.that(matchAllMetadata).containsInAnyOrder(expected);
    p.run();

    writer.join();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testRead() throws IOException {
    final String path = tmpFolder.newFile("file").getAbsolutePath();
    final String pathGZ = tmpFolder.newFile("file.gz").getAbsolutePath();
    Files.write(new File(path).toPath(), "Hello world".getBytes(Charsets.UTF_8));
    try (Writer writer =
        new OutputStreamWriter(
            new GZIPOutputStream(new FileOutputStream(pathGZ)), Charsets.UTF_8)) {
      writer.write("Hello world");
    }

    PCollection<MatchResult.Metadata> matches = p.apply("Match", FileIO.match().filepattern(path));
    PCollection<FileIO.ReadableFile> decompressedAuto =
        matches.apply("Read AUTO", FileIO.readMatches().withCompression(Compression.AUTO));
    PCollection<FileIO.ReadableFile> decompressedDefault =
        matches.apply("Read default", FileIO.readMatches());
    PCollection<FileIO.ReadableFile> decompressedUncompressed =
        matches.apply(
            "Read UNCOMPRESSED", FileIO.readMatches().withCompression(Compression.UNCOMPRESSED));
    for (PCollection<FileIO.ReadableFile> c :
        Arrays.asList(decompressedAuto, decompressedDefault, decompressedUncompressed)) {
      PAssert.thatSingleton(c)
          .satisfies(
              input -> {
                assertEquals(path, input.getMetadata().resourceId().toString());
                assertEquals("Hello world".length(), input.getMetadata().sizeBytes());
                assertEquals(Compression.UNCOMPRESSED, input.getCompression());
                assertTrue(input.getMetadata().isReadSeekEfficient());
                try {
                  assertEquals("Hello world", input.readFullyAsUTF8String());
                } catch (IOException e) {
                  throw new RuntimeException(e);
                }
                return null;
              });
    }

    PCollection<MatchResult.Metadata> matchesGZ =
        p.apply("Match GZ", FileIO.match().filepattern(pathGZ));
    PCollection<FileIO.ReadableFile> compressionAuto =
        matchesGZ.apply("Read GZ AUTO", FileIO.readMatches().withCompression(Compression.AUTO));
    PCollection<FileIO.ReadableFile> compressionDefault =
        matchesGZ.apply("Read GZ default", FileIO.readMatches());
    PCollection<FileIO.ReadableFile> compressionGzip =
        matchesGZ.apply("Read GZ GZIP", FileIO.readMatches().withCompression(Compression.GZIP));
    for (PCollection<FileIO.ReadableFile> c :
        Arrays.asList(compressionAuto, compressionDefault, compressionGzip)) {
      PAssert.thatSingleton(c)
          .satisfies(
              input -> {
                assertEquals(pathGZ, input.getMetadata().resourceId().toString());
                assertFalse(input.getMetadata().sizeBytes() == "Hello world".length());
                assertEquals(Compression.GZIP, input.getCompression());
                assertFalse(input.getMetadata().isReadSeekEfficient());
                try {
                  assertEquals("Hello world", input.readFullyAsUTF8String());
                } catch (IOException e) {
                  throw new RuntimeException(e);
                }
                return null;
              });
    }

    p.run();
  }

  private static MatchResult.Metadata metadata(Path path, int size, long lastModifiedMillis) {
    return MatchResult.Metadata.builder()
        .setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */))
        .setIsReadSeekEfficient(true)
        .setSizeBytes(size)
        .setLastModifiedMillis(lastModifiedMillis)
        .build();
  }

  private static long lastModifiedMillis(Path path) throws IOException {
    return Files.getLastModifiedTime(path).toMillis();
  }

  private static FileIO.Write.FileNaming resolveFileNaming(FileIO.Write<?, ?> write)
      throws Exception {
    return write.resolveFileNamingFn().getClosure().apply(null, null);
  }

  private static String getDefaultFileName(FileIO.Write<?, ?> write) throws Exception {
    return resolveFileNaming(write).getFilename(null, null, 0, 0, null);
  }

  @Test
  public void testFilenameFnResolution() throws Exception {
    FileIO.Write.FileNaming foo = (window, pane, numShards, shardIndex, compression) -> "foo";

    String expected =
        FileSystems.matchNewResource("test", true).resolve("foo", RESOLVE_FILE).toString();
    assertEquals(
        "Filenames should be resolved within a relative directory if '.to' is invoked",
        expected,
        getDefaultFileName(FileIO.writeDynamic().to("test").withNaming(o -> foo)));
    assertEquals(
        "Filenames should be resolved within a relative directory if '.to' is invoked",
        expected,
        getDefaultFileName(FileIO.write().to("test").withNaming(foo)));

    assertEquals(
        "Filenames should be resolved as the direct result of the filenaming function if '.to' "
            + "is not invoked",
        "foo",
        getDefaultFileName(FileIO.writeDynamic().withNaming(o -> foo)));
    assertEquals(
        "Filenames should be resolved as the direct result of the filenaming function if '.to' "
            + "is not invoked",
        "foo",
        getDefaultFileName(FileIO.write().withNaming(foo)));

    assertEquals(
        "Default to the defaultNaming if a filenaming isn't provided for a non-dynamic write",
        "output-00000-of-00000",
        resolveFileNaming(FileIO.write())
            .getFilename(
                GlobalWindow.INSTANCE,
                PaneInfo.ON_TIME_AND_ONLY_FIRING,
                0,
                0,
                Compression.UNCOMPRESSED));

    assertEquals(
        "Default Naming should take prefix and suffix into account if provided",
        "foo-00000-of-00000.bar",
        resolveFileNaming(FileIO.write().withPrefix("foo").withSuffix(".bar"))
            .getFilename(
                GlobalWindow.INSTANCE,
                PaneInfo.ON_TIME_AND_ONLY_FIRING,
                0,
                0,
                Compression.UNCOMPRESSED));

    assertEquals(
        "Filenames should be resolved within a relative directory if '.to' is invoked, "
            + "even with default naming",
        FileSystems.matchNewResource("test", true)
            .resolve("output-00000-of-00000", RESOLVE_FILE)
            .toString(),
        resolveFileNaming(FileIO.write().to("test"))
            .getFilename(
                GlobalWindow.INSTANCE,
                PaneInfo.ON_TIME_AND_ONLY_FIRING,
                0,
                0,
                Compression.UNCOMPRESSED));
  }

  @Test
  @Category(NeedsRunner.class)
  public void testFileIoDynamicNaming() throws IOException {
    // Test for BEAM-6407.

    String outputFileName = tmpFolder.newFile().getAbsolutePath();
    PCollectionView<String> outputFileNameView =
        p.apply("outputFileName", Create.of(outputFileName)).apply(View.asSingleton());

    Contextful.Fn<String, FileIO.Write.FileNaming> fileNaming =
        (element, c) ->
            (window, pane, numShards, shardIndex, compression) ->
                c.sideInput(outputFileNameView) + "-" + shardIndex;

    p.apply(Create.of(""))
        .apply(
            "WriteDynamicFilename",
            FileIO.<String, String>writeDynamic()
                .by(SerializableFunctions.constant(""))
                .withDestinationCoder(StringUtf8Coder.of())
                .via(TextIO.sink())
                .withTempDirectory(tmpFolder.newFolder().getAbsolutePath())
                .withNaming(
                    Contextful.of(
                        fileNaming, Requirements.requiresSideInputs(outputFileNameView))));

    // We need to run the TestPipeline with the default options.
    p.run(PipelineOptionsFactory.create()).waitUntilFinish();
    assertTrue(
        "Output file shard 0 exists after pipeline completes",
        new File(outputFileName + "-0").exists());
  }
}
