/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Verify.verify;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TreeMultimap;

/** Clients facing {@link FileSystem} utility. */
@Experimental(Kind.FILESYSTEM)
public class FileSystems {

  public static final String DEFAULT_SCHEME = "file";
  private static final Pattern FILE_SCHEME_PATTERN =
      Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):/.*");
  private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]");

  private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
      new AtomicReference<>(ImmutableMap.of(DEFAULT_SCHEME, new LocalFileSystem()));

  /** ******************************** METHODS FOR CLIENT ********************************* */

  /** Checks whether the given spec contains a glob wildcard character. */
  public static boolean hasGlobWildcard(String spec) {
    return GLOB_PATTERN.matcher(spec).find();
  }

  /**
   * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
   * Callers should use {@link #match} to resolve users specs ambiguities before calling other
   * methods.
   *
   * <p>Implementation handles the following ambiguities of a user-provided spec:
   *
   * <ol>
   *   <li>{@code spec} could be a glob or a uri. {@link #match} should be able to tell and choose
   *       efficient implementations.
   *   <li>The user-provided {@code spec} might refer to files or directories. It is common that
   *       users that wish to indicate a directory will omit the trailing path delimiter, such as
   *       {@code "/tmp/dir"} in Linux. The {@link FileSystem} should be able to recognize a
   *       directory with the trailing path delimiter omitted, but should always return a correct
   *       {@link ResourceId} (e.g., {@code "/tmp/dir/"} inside the returned {@link MatchResult}.
   * </ol>
   *
   * <p>All {@link FileSystem} implementations should support glob in the final hierarchical path
   * component of {@link ResourceId}. This allows SDK libraries to construct file system agnostic
   * spec. {@link FileSystem FileSystems} can support additional patterns for user-provided specs.
   *
   * <p>In case the spec schemes don't match any known {@link FileSystem} implementations,
   * FileSystems will attempt to use {@link LocalFileSystem} to resolve a path.
   *
   * <p>Specs that do not match any resources are treated according to {@link
   * EmptyMatchTreatment#DISALLOW}.
   *
   * @return {@code List<MatchResult>} in the same order of the input specs.
   * @throws IllegalArgumentException if specs are invalid -- empty or have different schemes.
   * @throws IOException if all specs failed to match due to issues like: network connection,
   *     authorization. Exception for individual spec is deferred until callers retrieve metadata
   *     with {@link MatchResult#metadata()}.
   */
  public static List<MatchResult> match(List<String> specs) throws IOException {
    return getFileSystemInternal(getOnlyScheme(specs)).match(specs);
  }

  /** Like {@link #match(List)}, but with a configurable {@link EmptyMatchTreatment}. */
  public static List<MatchResult> match(List<String> specs, EmptyMatchTreatment emptyMatchTreatment)
      throws IOException {
    List<MatchResult> matches = getFileSystemInternal(getOnlyScheme(specs)).match(specs);
    List<MatchResult> res = Lists.newArrayListWithExpectedSize(matches.size());
    for (int i = 0; i < matches.size(); i++) {
      res.add(maybeAdjustEmptyMatchResult(specs.get(i), matches.get(i), emptyMatchTreatment));
    }
    return res;
  }

  /**
   * Like {@link #match(List)}, but for a single resource specification.
   *
   * <p>The function {@link #match(List)} is preferred when matching multiple patterns, as it allows
   * for bulk API calls to remote filesystems.
   */
  public static MatchResult match(String spec) throws IOException {
    List<MatchResult> matches = match(Collections.singletonList(spec));
    verify(
        matches.size() == 1,
        "FileSystem implementation for %s did not return exactly one MatchResult: %s",
        spec,
        matches);
    return matches.get(0);
  }

  /** Like {@link #match(String)}, but with a configurable {@link EmptyMatchTreatment}. */
  public static MatchResult match(String spec, EmptyMatchTreatment emptyMatchTreatment)
      throws IOException {
    MatchResult res = match(spec);
    return maybeAdjustEmptyMatchResult(spec, res, emptyMatchTreatment);
  }

  private static MatchResult maybeAdjustEmptyMatchResult(
      String spec, MatchResult res, EmptyMatchTreatment emptyMatchTreatment) throws IOException {
    if (res.status() == Status.NOT_FOUND
        || (res.status() == Status.OK && res.metadata().isEmpty())) {
      boolean notFoundAllowed =
          emptyMatchTreatment == EmptyMatchTreatment.ALLOW
              || (hasGlobWildcard(spec)
                  && emptyMatchTreatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD);
      return notFoundAllowed
          ? MatchResult.create(Status.OK, Collections.emptyList())
          : MatchResult.create(
              Status.NOT_FOUND, new FileNotFoundException("No files matched spec: " + spec));
    }
    return res;
  }

  /**
   * Returns the {@link Metadata} for a single file resource. Expects a resource specification
   * {@code spec} that matches a single result.
   *
   * @param spec a resource specification that matches exactly one result.
   * @return the {@link Metadata} for the specified resource.
   * @throws FileNotFoundException if the file resource is not found.
   * @throws IOException in the event of an error in the inner call to {@link #match}, or if the
   *     given spec does not match exactly 1 result.
   */
  public static Metadata matchSingleFileSpec(String spec) throws IOException {
    List<MatchResult> matches = FileSystems.match(Collections.singletonList(spec));
    MatchResult matchResult = Iterables.getOnlyElement(matches);
    if (matchResult.status() == Status.NOT_FOUND) {
      throw new FileNotFoundException(String.format("File spec %s not found", spec));
    } else if (matchResult.status() != Status.OK) {
      throw new IOException(
          String.format("Error matching file spec %s: status %s", spec, matchResult.status()));
    } else {
      List<Metadata> metadata = matchResult.metadata();
      if (metadata.size() != 1) {
        throw new IOException(
            String.format(
                "Expecting spec %s to match exactly one file, but matched %s: %s",
                spec, metadata.size(), metadata));
      }
      return metadata.get(0);
    }
  }

  /**
   * Returns {@link MatchResult MatchResults} for the given {@link ResourceId resourceIds}.
   *
   * @param resourceIds {@link ResourceId resourceIds} that might be derived from {@link #match},
   *     {@link ResourceId#resolve}, or {@link ResourceId#getCurrentDirectory()}.
   * @throws IOException if all {@code resourceIds} failed to match due to issues like: network
   *     connection, authorization. Exception for individual {@link ResourceId} need to be deferred
   *     until callers retrieve metadata with {@link MatchResult#metadata()}.
   */
  public static List<MatchResult> matchResources(List<ResourceId> resourceIds) throws IOException {
    return match(FluentIterable.from(resourceIds).transform(ResourceId::toString).toList());
  }

  /**
   * Returns a write channel for the given {@link ResourceId}.
   *
   * <p>The resource is not expanded; it is used verbatim.
   *
   * @param resourceId the reference of the file-like resource to create
   * @param mimeType the mine type of the file-like resource to create
   */
  public static WritableByteChannel create(ResourceId resourceId, String mimeType)
      throws IOException {
    return create(resourceId, StandardCreateOptions.builder().setMimeType(mimeType).build());
  }

  /**
   * Returns a write channel for the given {@link ResourceId} with {@link CreateOptions}.
   *
   * <p>The resource is not expanded; it is used verbatim.
   *
   * @param resourceId the reference of the file-like resource to create
   * @param createOptions the configuration of the create operation
   */
  public static WritableByteChannel create(ResourceId resourceId, CreateOptions createOptions)
      throws IOException {
    return getFileSystemInternal(resourceId.getScheme()).create(resourceId, createOptions);
  }

  /**
   * Returns a read channel for the given {@link ResourceId}.
   *
   * <p>The resource is not expanded; it is used verbatim.
   *
   * <p>If seeking is supported, then this returns a {@link java.nio.channels.SeekableByteChannel}.
   *
   * @param resourceId the reference of the file-like resource to open
   */
  public static ReadableByteChannel open(ResourceId resourceId) throws IOException {
    return getFileSystemInternal(resourceId.getScheme()).open(resourceId);
  }

  /**
   * Copies a {@link List} of file-like resources from one location to another.
   *
   * <p>The number of source resources must equal the number of destination resources. Destination
   * resources will be created recursively.
   *
   * <p>{@code srcResourceIds} and {@code destResourceIds} must have the same scheme.
   *
   * <p>It doesn't support copying globs.
   *
   * @param srcResourceIds the references of the source resources
   * @param destResourceIds the references of the destination resources
   */
  public static void copy(
      List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds, MoveOptions... moveOptions)
      throws IOException {
    validateSrcDestLists(srcResourceIds, destResourceIds);
    if (srcResourceIds.isEmpty()) {
      // Short-circuit.
      return;
    }

    List<ResourceId> srcToCopy = srcResourceIds;
    List<ResourceId> destToCopy = destResourceIds;
    if (Sets.newHashSet(moveOptions)
        .contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
      KV<List<ResourceId>, List<ResourceId>> existings =
          filterMissingFiles(srcResourceIds, destResourceIds);
      srcToCopy = existings.getKey();
      destToCopy = existings.getValue();
    }
    if (srcToCopy.isEmpty()) {
      return;
    }
    getFileSystemInternal(srcToCopy.iterator().next().getScheme()).copy(srcToCopy, destToCopy);
  }

  /**
   * Renames a {@link List} of file-like resources from one location to another.
   *
   * <p>The number of source resources must equal the number of destination resources. Destination
   * resources will be created recursively.
   *
   * <p>{@code srcResourceIds} and {@code destResourceIds} must have the same scheme.
   *
   * <p>It doesn't support renaming globs.
   *
   * @param srcResourceIds the references of the source resources
   * @param destResourceIds the references of the destination resources
   */
  public static void rename(
      List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds, MoveOptions... moveOptions)
      throws IOException {
    validateSrcDestLists(srcResourceIds, destResourceIds);
    if (srcResourceIds.isEmpty()) {
      // Short-circuit.
      return;
    }

    List<ResourceId> srcToRename = srcResourceIds;
    List<ResourceId> destToRename = destResourceIds;
    if (Sets.newHashSet(moveOptions)
        .contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
      KV<List<ResourceId>, List<ResourceId>> existings =
          filterMissingFiles(srcResourceIds, destResourceIds);
      srcToRename = existings.getKey();
      destToRename = existings.getValue();
    }
    if (srcToRename.isEmpty()) {
      return;
    }
    getFileSystemInternal(srcToRename.iterator().next().getScheme())
        .rename(srcToRename, destToRename);
  }

  /**
   * Deletes a collection of resources.
   *
   * <p>{@code resourceIds} must have the same scheme.
   *
   * @param resourceIds the references of the resources to delete.
   */
  public static void delete(Collection<ResourceId> resourceIds, MoveOptions... moveOptions)
      throws IOException {
    if (resourceIds.isEmpty()) {
      // Short-circuit.
      return;
    }

    Collection<ResourceId> resourceIdsToDelete;
    if (Sets.newHashSet(moveOptions)
        .contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
      resourceIdsToDelete =
          FluentIterable.from(matchResources(Lists.newArrayList(resourceIds)))
              .filter(matchResult -> !matchResult.status().equals(Status.NOT_FOUND))
              .transformAndConcat(
                  new Function<MatchResult, Iterable<Metadata>>() {
                    @Nonnull
                    @Override
                    public Iterable<Metadata> apply(@Nonnull MatchResult input) {
                      try {
                        return Lists.newArrayList(input.metadata());
                      } catch (IOException e) {
                        throw new RuntimeException(
                            String.format("Failed to get metadata from MatchResult: %s.", input),
                            e);
                      }
                    }
                  })
              .transform(
                  new Function<Metadata, ResourceId>() {
                    @Nonnull
                    @Override
                    public ResourceId apply(@Nonnull Metadata input) {
                      return input.resourceId();
                    }
                  })
              .toList();
    } else {
      resourceIdsToDelete = resourceIds;
    }
    if (resourceIdsToDelete.isEmpty()) {
      return;
    }
    getFileSystemInternal(resourceIdsToDelete.iterator().next().getScheme())
        .delete(resourceIdsToDelete);
  }

  private static KV<List<ResourceId>, List<ResourceId>> filterMissingFiles(
      List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) throws IOException {
    validateSrcDestLists(srcResourceIds, destResourceIds);
    if (srcResourceIds.isEmpty()) {
      // Short-circuit.
      return KV.of(Collections.<ResourceId>emptyList(), Collections.<ResourceId>emptyList());
    }

    List<ResourceId> srcToHandle = new ArrayList<>();
    List<ResourceId> destToHandle = new ArrayList<>();

    List<MatchResult> matchResults = matchResources(srcResourceIds);
    for (int i = 0; i < matchResults.size(); ++i) {
      if (!matchResults.get(i).status().equals(Status.NOT_FOUND)) {
        srcToHandle.add(srcResourceIds.get(i));
        destToHandle.add(destResourceIds.get(i));
      }
    }
    return KV.of(srcToHandle, destToHandle);
  }

  private static void validateSrcDestLists(
      List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) {
    checkArgument(
        srcResourceIds.size() == destResourceIds.size(),
        "Number of source resource ids %s must equal number of destination resource ids %s",
        srcResourceIds.size(),
        destResourceIds.size());

    if (srcResourceIds.isEmpty()) {
      // nothing more to validate.
      return;
    }

    Set<String> schemes =
        FluentIterable.from(srcResourceIds)
            .append(destResourceIds)
            .transform(ResourceId::getScheme)
            .toSet();
    checkArgument(
        schemes.size() == 1,
        String.format(
            "Expect srcResourceIds and destResourceIds have the same scheme, but received %s.",
            Joiner.on(", ").join(schemes)));
  }

  private static String getOnlyScheme(List<String> specs) {
    checkArgument(!specs.isEmpty(), "Expect specs are not empty.");
    Set<String> schemes = FluentIterable.from(specs).transform(FileSystems::parseScheme).toSet();
    return Iterables.getOnlyElement(schemes);
  }

  private static String parseScheme(String spec) {
    // The spec is almost, but not quite, a URI. In particular,
    // the reserved characters '[', ']', and '?' have meanings that differ
    // from their use in the URI spec. ('*' is not reserved).
    // Here, we just need the scheme, which is so circumscribed as to be
    // very easy to extract with a regex.
    Matcher matcher = FILE_SCHEME_PATTERN.matcher(spec);

    if (!matcher.matches()) {
      return DEFAULT_SCHEME;
    } else {
      return matcher.group("scheme").toLowerCase();
    }
  }

  /** Internal method to get {@link FileSystem} for {@code scheme}. */
  @VisibleForTesting
  static FileSystem getFileSystemInternal(String scheme) {
    String lowerCaseScheme = scheme.toLowerCase();
    Map<String, FileSystem> schemeToFileSystem = SCHEME_TO_FILESYSTEM.get();
    FileSystem rval = schemeToFileSystem.get(lowerCaseScheme);
    if (rval == null) {
      throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
    }
    return rval;
  }

  /** ******************************** METHODS FOR REGISTRATION ********************************* */

  /**
   * Sets the default configuration in workers.
   *
   * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
   *
   * <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests.
   */
  @Internal
  public static void setDefaultPipelineOptions(PipelineOptions options) {
    checkNotNull(options, "options");
    Set<FileSystemRegistrar> registrars =
        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
    registrars.addAll(
        Lists.newArrayList(
            ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));

    SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars));
  }

  @VisibleForTesting
  static Map<String, FileSystem> verifySchemesAreUnique(
      PipelineOptions options, Set<FileSystemRegistrar> registrars) {
    Multimap<String, FileSystem> fileSystemsBySchemes =
        TreeMultimap.create(Ordering.<String>natural(), Ordering.arbitrary());

    for (FileSystemRegistrar registrar : registrars) {
      for (FileSystem fileSystem : registrar.fromOptions(options)) {
        fileSystemsBySchemes.put(fileSystem.getScheme(), fileSystem);
      }
    }
    for (Entry<String, Collection<FileSystem>> entry : fileSystemsBySchemes.asMap().entrySet()) {
      if (entry.getValue().size() > 1) {
        String conflictingFileSystems =
            Joiner.on(", ")
                .join(
                    FluentIterable.from(entry.getValue())
                        .transform(input -> input.getClass().getName())
                        .toSortedList(Ordering.natural()));
        throw new IllegalStateException(
            String.format(
                "Scheme: [%s] has conflicting filesystems: [%s]",
                entry.getKey(), conflictingFileSystems));
      }
    }

    ImmutableMap.Builder<String, FileSystem> schemeToFileSystem = ImmutableMap.builder();
    for (Entry<String, FileSystem> entry : fileSystemsBySchemes.entries()) {
      schemeToFileSystem.put(entry.getKey(), entry.getValue());
    }
    return schemeToFileSystem.build();
  }

  /**
   * Returns a new {@link ResourceId} that represents the named resource of a type corresponding to
   * the resource type.
   *
   * <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including any
   * necessary escaping, for the underlying {@link FileSystem}.
   *
   * <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
   * such as when the specified {@code singleResourceSpec} is not a valid resource name.
   */
  public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
    return getFileSystemInternal(parseScheme(singleResourceSpec))
        .matchNewResource(singleResourceSpec, isDirectory);
  }
}
