blob: db43788b7a95a28a42623c6f68416d89a6ea1660 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Verify.verify;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TreeMultimap;
/** Client-facing {@link FileSystem} utility. */
@Experimental(Kind.FILESYSTEM)
public class FileSystems {
/** Scheme assumed for any spec in which {@code parseScheme} cannot find an explicit scheme. */
public static final String DEFAULT_SCHEME = "file";
/**
 * Recognizes a spec that starts with a scheme: a letter followed by letters, digits, '-', '+' or
 * '.', immediately followed by ":/". The scheme is exposed as the named group {@code scheme}.
 */
private static final Pattern FILE_SCHEME_PATTERN =
Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):/.*");
/** Matches a single glob metacharacter: '*', '?', '{' or '}'. */
private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]");
/**
 * Registry of {@link FileSystem} instances keyed by scheme. It initially holds only a {@link
 * LocalFileSystem} under {@link #DEFAULT_SCHEME}; {@link #setDefaultPipelineOptions} replaces the
 * whole map.
 */
private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
new AtomicReference<>(ImmutableMap.of(DEFAULT_SCHEME, new LocalFileSystem()));
/** ******************************** METHODS FOR CLIENT ********************************* */
/**
 * Tells whether {@code spec} contains at least one glob wildcard character ('*', '?', '{' or
 * '}').
 *
 * @param spec the resource specification to inspect
 * @return {@code true} if a wildcard character occurs anywhere in {@code spec}
 */
public static boolean hasGlobWildcard(String spec) {
  Matcher wildcardMatcher = GLOB_PATTERN.matcher(spec);
  return wildcardMatcher.find();
}
/**
 * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
 * Callers should use {@link #match} to resolve ambiguities in user-provided specs before calling
 * other methods.
 *
 * <p>Implementation handles the following ambiguities of a user-provided spec:
 *
 * <ol>
 *   <li>{@code spec} could be a glob or a uri. {@link #match} should be able to tell and choose
 *       efficient implementations.
 *   <li>The user-provided {@code spec} might refer to files or directories. It is common that
 *       users that wish to indicate a directory will omit the trailing path delimiter, such as
 *       {@code "/tmp/dir"} in Linux. The {@link FileSystem} should be able to recognize a
 *       directory with the trailing path delimiter omitted, but should always return a correct
 *       {@link ResourceId} (e.g., {@code "/tmp/dir/"}) inside the returned {@link MatchResult}.
 * </ol>
 *
 * <p>All {@link FileSystem} implementations should support glob in the final hierarchical path
 * component of {@link ResourceId}. This allows SDK libraries to construct file system agnostic
 * spec. {@link FileSystem FileSystems} can support additional patterns for user-provided specs.
 *
 * <p>A spec without a recognizable scheme is resolved with the {@link #DEFAULT_SCHEME} file
 * system. A spec whose scheme has no registered {@link FileSystem} is rejected with an {@link
 * IllegalArgumentException}.
 *
 * <p>Specs that do not match any resources are treated according to {@link
 * EmptyMatchTreatment#DISALLOW}.
 *
 * @param specs the resource specifications to match; all must share one scheme
 * @return {@code List<MatchResult>} in the same order of the input specs.
 * @throws IllegalArgumentException if specs are invalid -- empty, have different schemes, or use
 *     a scheme for which no {@link FileSystem} is registered.
 * @throws IOException if all specs failed to match due to issues like: network connection,
 *     authorization. Exception for individual spec is deferred until callers retrieve metadata
 *     with {@link MatchResult#metadata()}.
 */
public static List<MatchResult> match(List<String> specs) throws IOException {
  String sharedScheme = getOnlyScheme(specs);
  FileSystem fileSystem = getFileSystemInternal(sharedScheme);
  return fileSystem.match(specs);
}
/**
 * Like {@link #match(List)}, but with a configurable {@link EmptyMatchTreatment}.
 *
 * <p>Each result returned by the file system is paired with the spec at the same position and
 * rewritten according to {@code emptyMatchTreatment} when it matched nothing.
 *
 * @param specs the resource specifications to match; all must share one scheme
 * @param emptyMatchTreatment how to report specs that match no resources
 * @return the adjusted results, in the same order as {@code specs}
 */
public static List<MatchResult> match(List<String> specs, EmptyMatchTreatment emptyMatchTreatment)
    throws IOException {
  List<MatchResult> rawResults = match(specs);
  List<MatchResult> adjusted = Lists.newArrayListWithExpectedSize(rawResults.size());
  int position = 0;
  for (MatchResult rawResult : rawResults) {
    String spec = specs.get(position);
    adjusted.add(maybeAdjustEmptyMatchResult(spec, rawResult, emptyMatchTreatment));
    position++;
  }
  return adjusted;
}
/**
 * Like {@link #match(List)}, but for a single resource specification.
 *
 * <p>The function {@link #match(List)} is preferred when matching multiple patterns, as it allows
 * for bulk API calls to remote filesystems.
 *
 * @param spec the single resource specification to match
 * @return the one {@link MatchResult} produced for {@code spec}
 */
public static MatchResult match(String spec) throws IOException {
  List<String> singleSpec = Collections.singletonList(spec);
  List<MatchResult> results = match(singleSpec);
  // A well-behaved FileSystem returns one result per input spec.
  boolean exactlyOneResult = results.size() == 1;
  verify(
      exactlyOneResult,
      "FileSystem implementation for %s did not return exactly one MatchResult: %s",
      spec,
      results);
  return results.get(0);
}
/**
 * Like {@link #match(String)}, but with a configurable {@link EmptyMatchTreatment}.
 *
 * @param spec the single resource specification to match
 * @param emptyMatchTreatment how to report a spec that matches no resources
 * @return the result for {@code spec}, adjusted according to {@code emptyMatchTreatment}
 */
public static MatchResult match(String spec, EmptyMatchTreatment emptyMatchTreatment)
    throws IOException {
  return maybeAdjustEmptyMatchResult(spec, match(spec), emptyMatchTreatment);
}
/**
 * Normalizes a result that matched nothing according to {@code emptyMatchTreatment}.
 *
 * <p>A result counts as empty when its status is {@code NOT_FOUND}, or when its status is {@code
 * OK} and its metadata list is empty. Empty results become an {@code OK} result with no metadata
 * if the treatment is {@code ALLOW}, or if it is {@code ALLOW_IF_WILDCARD} and {@code spec}
 * contains a glob wildcard; otherwise they become a {@code NOT_FOUND} result carrying a {@link
 * FileNotFoundException}. Non-empty results are returned unchanged.
 *
 * @param spec the spec that produced {@code res}
 * @param res the result returned by the file system
 * @param emptyMatchTreatment how an empty match should be reported
 * @throws IOException if reading the metadata of an {@code OK} result fails
 */
private static MatchResult maybeAdjustEmptyMatchResult(
    String spec, MatchResult res, EmptyMatchTreatment emptyMatchTreatment) throws IOException {
  Status status = res.status();
  boolean matchedNothing = status == Status.NOT_FOUND;
  if (!matchedNothing && status == Status.OK) {
    // Metadata is consulted only for OK results.
    matchedNothing = res.metadata().isEmpty();
  }
  if (!matchedNothing) {
    return res;
  }

  boolean emptyIsAcceptable;
  if (emptyMatchTreatment == EmptyMatchTreatment.ALLOW) {
    emptyIsAcceptable = true;
  } else {
    emptyIsAcceptable =
        hasGlobWildcard(spec) && emptyMatchTreatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD;
  }

  if (emptyIsAcceptable) {
    return MatchResult.create(Status.OK, Collections.emptyList());
  }
  return MatchResult.create(
      Status.NOT_FOUND, new FileNotFoundException("No files matched spec: " + spec));
}
/**
 * Looks up the {@link Metadata} of the one file that {@code spec} is expected to denote.
 *
 * @param spec a resource specification that matches exactly one result.
 * @return the {@link Metadata} for the specified resource.
 * @throws FileNotFoundException if the file resource is not found.
 * @throws IOException in the event of an error in the inner call to {@link #match}, or if the
 *     given spec does not match exactly 1 result.
 */
public static Metadata matchSingleFileSpec(String spec) throws IOException {
  MatchResult onlyResult =
      Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(spec)));
  Status status = onlyResult.status();

  if (status == Status.NOT_FOUND) {
    throw new FileNotFoundException(String.format("File spec %s not found", spec));
  }
  if (status != Status.OK) {
    throw new IOException(
        String.format("Error matching file spec %s: status %s", spec, status));
  }

  List<Metadata> found = onlyResult.metadata();
  if (found.size() == 1) {
    return found.get(0);
  }
  throw new IOException(
      String.format(
          "Expecting spec %s to match exactly one file, but matched %s: %s",
          spec, found.size(), found));
}
/**
 * Returns {@link MatchResult MatchResults} for the given {@link ResourceId resourceIds}.
 *
 * <p>Each {@link ResourceId} is converted to its string form and handed to {@link #match(List)}.
 *
 * @param resourceIds {@link ResourceId resourceIds} that might be derived from {@link #match},
 *     {@link ResourceId#resolve}, or {@link ResourceId#getCurrentDirectory()}.
 * @throws IOException if all {@code resourceIds} failed to match due to issues like: network
 *     connection, authorization. Exception for individual {@link ResourceId} need to be deferred
 *     until callers retrieve metadata with {@link MatchResult#metadata()}.
 */
public static List<MatchResult> matchResources(List<ResourceId> resourceIds) throws IOException {
  List<String> specs =
      FluentIterable.from(resourceIds).transform(ResourceId::toString).toList();
  return match(specs);
}
/**
 * Returns a write channel for the given {@link ResourceId}.
 *
 * <p>The resource is not expanded; it is used verbatim.
 *
 * @param resourceId the reference of the file-like resource to create
 * @param mimeType the MIME type of the file-like resource to create
 */
public static WritableByteChannel create(ResourceId resourceId, String mimeType)
    throws IOException {
  StandardCreateOptions options =
      StandardCreateOptions.builder().setMimeType(mimeType).build();
  return create(resourceId, options);
}
/**
 * Returns a write channel for the given {@link ResourceId} with {@link CreateOptions}.
 *
 * <p>The resource is not expanded; it is used verbatim. The call is delegated to the {@link
 * FileSystem} registered for the resource's scheme.
 *
 * @param resourceId the reference of the file-like resource to create
 * @param createOptions the configuration of the create operation
 */
public static WritableByteChannel create(ResourceId resourceId, CreateOptions createOptions)
    throws IOException {
  FileSystem fileSystem = getFileSystemInternal(resourceId.getScheme());
  return fileSystem.create(resourceId, createOptions);
}
/**
 * Returns a read channel for the given {@link ResourceId}.
 *
 * <p>The resource is not expanded; it is used verbatim. The call is delegated to the {@link
 * FileSystem} registered for the resource's scheme.
 *
 * <p>If seeking is supported, then this returns a {@link java.nio.channels.SeekableByteChannel}.
 *
 * @param resourceId the reference of the file-like resource to open
 */
public static ReadableByteChannel open(ResourceId resourceId) throws IOException {
  FileSystem fileSystem = getFileSystemInternal(resourceId.getScheme());
  return fileSystem.open(resourceId);
}
/**
 * Copies a {@link List} of file-like resources from one location to another.
 *
 * <p>The number of source resources must equal the number of destination resources. Destination
 * resources will be created recursively.
 *
 * <p>{@code srcResourceIds} and {@code destResourceIds} must have the same scheme.
 *
 * <p>It doesn't support copying globs.
 *
 * <p>When {@code moveOptions} contains {@code IGNORE_MISSING_FILES}, source/destination pairs
 * whose source is reported as not found are dropped before copying.
 *
 * @param srcResourceIds the references of the source resources
 * @param destResourceIds the references of the destination resources
 * @param moveOptions optional flags controlling the copy
 */
public static void copy(
    List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds, MoveOptions... moveOptions)
    throws IOException {
  validateSrcDestLists(srcResourceIds, destResourceIds);
  if (srcResourceIds.isEmpty()) {
    // Nothing to copy.
    return;
  }

  boolean ignoreMissing =
      Sets.newHashSet(moveOptions).contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
  List<ResourceId> sources = srcResourceIds;
  List<ResourceId> destinations = destResourceIds;
  if (ignoreMissing) {
    KV<List<ResourceId>, List<ResourceId>> present =
        filterMissingFiles(srcResourceIds, destResourceIds);
    sources = present.getKey();
    destinations = present.getValue();
  }
  if (sources.isEmpty()) {
    // Every source was missing.
    return;
  }

  FileSystem fileSystem = getFileSystemInternal(sources.get(0).getScheme());
  fileSystem.copy(sources, destinations);
}
/**
 * Renames a {@link List} of file-like resources from one location to another.
 *
 * <p>The number of source resources must equal the number of destination resources. Destination
 * resources will be created recursively.
 *
 * <p>{@code srcResourceIds} and {@code destResourceIds} must have the same scheme.
 *
 * <p>It doesn't support renaming globs.
 *
 * <p>When {@code moveOptions} contains {@code IGNORE_MISSING_FILES}, source/destination pairs
 * whose source is reported as not found are dropped before renaming.
 *
 * @param srcResourceIds the references of the source resources
 * @param destResourceIds the references of the destination resources
 * @param moveOptions optional flags controlling the rename
 */
public static void rename(
    List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds, MoveOptions... moveOptions)
    throws IOException {
  validateSrcDestLists(srcResourceIds, destResourceIds);
  if (srcResourceIds.isEmpty()) {
    // Nothing to rename.
    return;
  }

  boolean ignoreMissing =
      Sets.newHashSet(moveOptions).contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
  List<ResourceId> sources = srcResourceIds;
  List<ResourceId> destinations = destResourceIds;
  if (ignoreMissing) {
    KV<List<ResourceId>, List<ResourceId>> present =
        filterMissingFiles(srcResourceIds, destResourceIds);
    sources = present.getKey();
    destinations = present.getValue();
  }
  if (sources.isEmpty()) {
    // Every source was missing.
    return;
  }

  FileSystem fileSystem = getFileSystemInternal(sources.get(0).getScheme());
  fileSystem.rename(sources, destinations);
}
/**
 * Deletes a collection of resources.
 *
 * <p>{@code resourceIds} must have the same scheme.
 *
 * <p>When {@code moveOptions} contains {@code IGNORE_MISSING_FILES}, the resources are matched
 * first and only those reported by results whose status is not {@code NOT_FOUND} are deleted. A
 * failure to read the metadata of such a result surfaces as a {@link RuntimeException}.
 *
 * @param resourceIds the references of the resources to delete.
 * @param moveOptions optional flags controlling the delete
 */
public static void delete(Collection<ResourceId> resourceIds, MoveOptions... moveOptions)
    throws IOException {
  if (resourceIds.isEmpty()) {
    // Nothing to delete.
    return;
  }

  boolean ignoreMissing =
      Sets.newHashSet(moveOptions).contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
  Collection<ResourceId> targets = resourceIds;
  if (ignoreMissing) {
    // Expands a match result into its metadata; the checked IOException cannot cross the
    // Function boundary, so it is rethrown unchecked.
    Function<MatchResult, Iterable<Metadata>> toMetadata =
        new Function<MatchResult, Iterable<Metadata>>() {
          @Nonnull
          @Override
          public Iterable<Metadata> apply(@Nonnull MatchResult input) {
            try {
              return Lists.newArrayList(input.metadata());
            } catch (IOException e) {
              throw new RuntimeException(
                  String.format("Failed to get metadata from MatchResult: %s.", input), e);
            }
          }
        };
    List<MatchResult> matchResults = matchResources(Lists.newArrayList(resourceIds));
    targets =
        FluentIterable.from(matchResults)
            .filter(matchResult -> !matchResult.status().equals(Status.NOT_FOUND))
            .transformAndConcat(toMetadata)
            .transform(Metadata::resourceId)
            .toList();
  }
  if (targets.isEmpty()) {
    // Every resource was missing.
    return;
  }

  FileSystem fileSystem = getFileSystemInternal(targets.iterator().next().getScheme());
  fileSystem.delete(targets);
}
/**
 * Drops every source/destination pair whose source is reported as {@code NOT_FOUND}.
 *
 * <p>The sources are matched in one call; a pair is kept when the match result at the same index
 * has any status other than {@code NOT_FOUND}.
 *
 * @param srcResourceIds the source resources to probe
 * @param destResourceIds the destinations, paired with the sources by index
 * @return the surviving sources (key) and their destinations (value), in the original order
 */
private static KV<List<ResourceId>, List<ResourceId>> filterMissingFiles(
    List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) throws IOException {
  validateSrcDestLists(srcResourceIds, destResourceIds);
  if (srcResourceIds.isEmpty()) {
    // Nothing to filter.
    return KV.of(Collections.<ResourceId>emptyList(), Collections.<ResourceId>emptyList());
  }

  List<MatchResult> matchResults = matchResources(srcResourceIds);
  List<ResourceId> presentSources = new ArrayList<>();
  List<ResourceId> presentDestinations = new ArrayList<>();
  int index = 0;
  for (MatchResult matchResult : matchResults) {
    boolean missing = matchResult.status().equals(Status.NOT_FOUND);
    if (!missing) {
      presentSources.add(srcResourceIds.get(index));
      presentDestinations.add(destResourceIds.get(index));
    }
    index++;
  }
  return KV.of(presentSources, presentDestinations);
}
/**
 * Checks that sources and destinations pair up and all live under a single scheme.
 *
 * @param srcResourceIds the source resources
 * @param destResourceIds the destination resources
 * @throws IllegalArgumentException if the two lists differ in size, or if the resources do not
 *     all share one scheme
 */
private static void validateSrcDestLists(
    List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) {
  int sourceCount = srcResourceIds.size();
  int destinationCount = destResourceIds.size();
  checkArgument(
      sourceCount == destinationCount,
      "Number of source resource ids %s must equal number of destination resource ids %s",
      sourceCount,
      destinationCount);
  if (sourceCount == 0) {
    // Two empty lists: no schemes to compare.
    return;
  }

  Set<String> schemes =
      FluentIterable.from(srcResourceIds)
          .append(destResourceIds)
          .transform(ResourceId::getScheme)
          .toSet();
  if (schemes.size() != 1) {
    throw new IllegalArgumentException(
        String.format(
            "Expect srcResourceIds and destResourceIds have the same scheme, but received %s.",
            Joiner.on(", ").join(schemes)));
  }
}
/**
 * Returns the single scheme shared by all {@code specs}.
 *
 * @param specs the resource specifications to inspect
 * @return the scheme common to every spec
 * @throws IllegalArgumentException if {@code specs} is empty or the specs use more than one
 *     scheme
 */
private static String getOnlyScheme(List<String> specs) {
  if (specs.isEmpty()) {
    throw new IllegalArgumentException("Expect specs are not empty.");
  }
  FluentIterable<String> parsedSchemes =
      FluentIterable.from(specs).transform(FileSystems::parseScheme);
  Set<String> distinctSchemes = parsedSchemes.toSet();
  return Iterables.getOnlyElement(distinctSchemes);
}
/**
 * Extracts the lower-cased scheme from {@code spec}.
 *
 * @param spec a user-provided resource specification
 * @return the scheme of {@code spec} in lower case, or {@link #DEFAULT_SCHEME} if {@code spec}
 *     does not start with a scheme followed by ":/"
 */
private static String parseScheme(String spec) {
  // The spec is almost, but not quite, a URI. In particular,
  // the reserved characters '[', ']', and '?' have meanings that differ
  // from their use in the URI spec. ('*' is not reserved).
  // Here, we just need the scheme, which is so circumscribed as to be
  // very easy to extract with a regex.
  Matcher matcher = FILE_SCHEME_PATTERN.matcher(spec);
  if (!matcher.matches()) {
    return DEFAULT_SCHEME;
  }
  // Lower-case with Locale.ROOT: the default locale may map ASCII letters to non-ASCII ones
  // (e.g. Turkish 'I' -> dotless 'ı'), which would break the scheme lookup.
  return matcher.group("scheme").toLowerCase(Locale.ROOT);
}
/**
 * Internal method to get {@link FileSystem} for {@code scheme}.
 *
 * <p>The lookup is case-insensitive on the requested scheme: it is lower-cased before the
 * registry is consulted.
 *
 * @param scheme the scheme to look up
 * @return the {@link FileSystem} registered for {@code scheme}
 * @throws IllegalArgumentException if no {@link FileSystem} is registered for {@code scheme}
 */
@VisibleForTesting
static FileSystem getFileSystemInternal(String scheme) {
  // Lower-case with Locale.ROOT so the key does not depend on the JVM's default locale
  // (e.g. Turkish maps 'I' to dotless 'ı').
  String lowerCaseScheme = scheme.toLowerCase(Locale.ROOT);
  Map<String, FileSystem> schemeToFileSystem = SCHEME_TO_FILESYSTEM.get();
  FileSystem rval = schemeToFileSystem.get(lowerCaseScheme);
  if (rval == null) {
    throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
  }
  return rval;
}
/** ******************************** METHODS FOR REGISTRATION ********************************* */
/**
 * Sets the default configuration in workers.
 *
 * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes: every
 * registrar found through {@link ServiceLoader} is asked for its file systems, and the resulting
 * scheme-to-file-system map replaces the current registry.
 *
 * <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests.
 *
 * @param options the pipeline options handed to each registrar; must not be {@code null}
 */
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
  checkNotNull(options, "options");

  // Sorted with the class-based comparator.
  Set<FileSystemRegistrar> registrars =
      Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
  ServiceLoader<FileSystemRegistrar> loader =
      ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader());
  for (FileSystemRegistrar registrar : loader) {
    registrars.add(registrar);
  }

  Map<String, FileSystem> schemeToFileSystem = verifySchemesAreUnique(options, registrars);
  SCHEME_TO_FILESYSTEM.set(schemeToFileSystem);
}
/**
 * Builds the scheme-to-file-system map from the given registrars, rejecting duplicate schemes.
 *
 * @param options the pipeline options passed to each registrar
 * @param registrars the registrars that supply {@link FileSystem} instances
 * @return an immutable map with exactly one {@link FileSystem} per scheme
 * @throws IllegalStateException if more than one {@link FileSystem} claims the same scheme
 */
@VisibleForTesting
static Map<String, FileSystem> verifySchemesAreUnique(
    PipelineOptions options, Set<FileSystemRegistrar> registrars) {
  // Group all file systems by scheme; keys are kept in natural order.
  Multimap<String, FileSystem> fileSystemsBySchemes =
      TreeMultimap.create(Ordering.<String>natural(), Ordering.arbitrary());
  for (FileSystemRegistrar registrar : registrars) {
    Iterable<FileSystem> provided = registrar.fromOptions(options);
    for (FileSystem fileSystem : provided) {
      fileSystemsBySchemes.put(fileSystem.getScheme(), fileSystem);
    }
  }

  ImmutableMap.Builder<String, FileSystem> schemeToFileSystem = ImmutableMap.builder();
  for (Entry<String, Collection<FileSystem>> entry : fileSystemsBySchemes.asMap().entrySet()) {
    String scheme = entry.getKey();
    Collection<FileSystem> candidates = entry.getValue();
    if (candidates.size() > 1) {
      // Report the conflicting implementations by class name, sorted.
      List<String> classNames =
          FluentIterable.from(candidates)
              .transform(input -> input.getClass().getName())
              .toSortedList(Ordering.natural());
      String conflictingFileSystems = Joiner.on(", ").join(classNames);
      throw new IllegalStateException(
          String.format(
              "Scheme: [%s] has conflicting filesystems: [%s]", scheme, conflictingFileSystems));
    }
    schemeToFileSystem.put(scheme, Iterables.getOnlyElement(candidates));
  }
  return schemeToFileSystem.build();
}
/**
 * Returns a new {@link ResourceId} that represents the named resource of a type corresponding to
 * the resource type.
 *
 * <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including any
 * necessary escaping, for the underlying {@link FileSystem}.
 *
 * <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
 * such as when the specified {@code singleResourceSpec} is not a valid resource name.
 *
 * @param singleResourceSpec the spec naming exactly one resource
 * @param isDirectory whether the named resource is a directory
 */
public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
  String scheme = parseScheme(singleResourceSpec);
  FileSystem fileSystem = getFileSystemInternal(scheme);
  return fileSystem.matchNewResource(singleResourceSpec, isDirectory);
}
}