| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /* |
| * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache |
| * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for |
| * additional information regarding copyright ownership. |
| */ |
| |
| package org.apache.flink.core.fs; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.annotation.Public; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.CoreOptions; |
| import org.apache.flink.configuration.IllegalConfigurationException; |
| import org.apache.flink.core.fs.local.LocalFileSystem; |
| import org.apache.flink.core.fs.local.LocalFileSystemFactory; |
| import org.apache.flink.util.ExceptionUtils; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nullable; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.ServiceLoader; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * Abstract base class of all file systems used by Flink. This class may be extended to implement |
| * distributed file systems, or local file systems. The abstraction by this file system is very simple, |
| * and the set of available operations quite limited, to support the common denominator of a wide |
| * range of file systems. For example, appending to or mutating existing files is not supported. |
| * |
| * <p>Flink implements and supports some file system types directly (for example the default |
| * machine-local file system). Other file system types are accessed by an implementation that bridges |
| * to the suite of file systems supported by Hadoop (such as for example HDFS). |
| * |
| * <h2>Scope and Purpose</h2> |
| * |
| * <p>The purpose of this abstraction is to expose a common and well-defined interface for |
| * access to files. This abstraction is used both by Flink's fault tolerance mechanism (storing |
| * state and recovery data) and by reusable built-in connectors (file sources / sinks). |
| * |
| * <p>The purpose of this abstraction is <b>not</b> to give user programs an abstraction with |
| * extreme flexibility and control across all possible file systems. That mission would be a folly, |
| * as the differences in characteristics of even the most common file systems are already quite |
| * large. It is expected that user programs that need specialized functionality of certain file systems |
| * in their functions, operations, sources, or sinks instantiate the specialized file system adapters |
| * directly. |
| * |
| * <h2>Data Persistence Contract</h2> |
| * |
| * <p>The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data, |
| * both for results of streaming applications and for fault tolerance and recovery. It is therefore |
| * crucial that the persistence semantics of these streams are well defined. |
| * |
| * <h3>Definition of Persistence Guarantees</h3> |
| * |
| * <p>Data written to an output stream is considered persistent, if two requirements are met: |
| * |
| * <ol> |
| * <li><b>Visibility Requirement:</b> It must be guaranteed that all other processes, machines, |
| * virtual machines, containers, etc. that are able to access the file see the data consistently |
| * when given the absolute file path. This requirement is similar to the <i>close-to-open</i> |
| * semantics defined by POSIX, but restricted to the file itself (by its absolute path).</li> |
| * |
| * <li><b>Durability Requirement:</b> The file system's specific durability/persistence requirements |
| * must be met. These are specific to the particular file system. For example the |
| * {@link LocalFileSystem} does not provide any durability guarantees for crashes of both |
| * hardware and operating system, while replicated distributed file systems (like HDFS) |
| * typically guarantee durability in the presence of at most <i>n</i> concurrent node failures, |
| * where <i>n</i> is the replication factor.</li> |
| * </ol> |
| * |
| * <p>Updates to the file's parent directory (such that the file shows up when |
| * listing the directory contents) are not required to be complete for the data in the file stream |
| * to be considered persistent. This relaxation is important for file systems where updates to |
| * directory contents are only eventually consistent. |
| * |
| * <p>The {@link FSDataOutputStream} has to guarantee data persistence for the written bytes |
| * once the call to {@link FSDataOutputStream#close()} returns. |
| * |
| * <h3>Examples</h3> |
| * |
| * <ul> |
| * <li>For <b>fault-tolerant distributed file systems</b>, data is considered persistent once |
| * it has been received and acknowledged by the file system, typically by having been replicated |
| * to a quorum of machines (<i>durability requirement</i>). In addition the absolute file path |
| * must be visible to all other machines that will potentially access the file (<i>visibility |
| * requirement</i>). |
| * |
| * <p>Whether data has hit non-volatile storage on the storage nodes depends on the specific |
| * guarantees of the particular file system. |
| * |
| * <p>The metadata updates to the file's parent directory are not required to have reached |
| * a consistent state. It is permissible that some machines see the file when listing the parent |
| * directory's contents while others do not, as long as access to the file by its absolute path |
| * is possible on all nodes.</li> |
| * |
| * <li>A <b>local file system</b> must support the POSIX <i>close-to-open</i> semantics. |
| * Because the local file system does not have any fault tolerance guarantees, no further |
| * requirements exist. |
| * |
| * <p>The above implies specifically that data may still be in the OS cache when considered |
| * persistent from the local file system's perspective. Crashes that cause the OS cache to lose |
| * data are considered fatal to the local machine and are not covered by the local file system's |
| * guarantees as defined by Flink. |
| * |
| * <p>That means that computed results, checkpoints, and savepoints that are written only to |
| * the local filesystem are not guaranteed to be recoverable from the local machine's failure, |
| * making local file systems unsuitable for production setups.</li> |
| * </ul> |
| * |
| * <h2>Updating File Contents</h2> |
| * |
| * <p>Many file systems either do not support overwriting contents of existing files at all, or do |
| * not support consistent visibility of the updated contents in that case. For that reason, |
| * Flink's FileSystem does not support appending to existing files, or seeking within output streams |
| * so that previously written data could be overwritten. |
| * |
| * <h2>Overwriting Files</h2> |
| * |
| * <p>Overwriting files is in general possible. A file is overwritten by deleting it and creating |
| * a new file. However, certain filesystems cannot make that change synchronously visible |
| * to all parties that have access to the file. |
| * For example <a href="https://aws.amazon.com/documentation/s3/">Amazon S3</a> guarantees only |
| * <i>eventual consistency</i> in the visibility of the file replacement: Some machines may see |
| * the old file, some machines may see the new file. |
| * |
| * <p>To avoid these consistency issues, the implementations of failure/recovery mechanisms in |
| * Flink strictly avoid writing to the same file path more than once. |
| * |
| * <h2>Thread Safety</h2> |
| * |
| * <p>Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem |
| * is frequently shared across multiple threads in Flink and must be able to concurrently |
| * create input/output streams and list file metadata. |
| * |
| * <p>The {@link FSDataInputStream} and {@link FSDataOutputStream} implementations are strictly |
| * <b>not thread-safe</b>. Instances of the streams should also not be passed between threads |
| * in between read or write operations, because there are no guarantees about the visibility of |
| * operations across threads (many operations do not create memory fences). |
| * |
| * <h2>Streams Safety Net</h2> |
| * |
| * <p>When application code obtains a FileSystem (via {@link FileSystem#get(URI)} or via |
| * {@link Path#getFileSystem()}), the FileSystem instantiates a safety net for that FileSystem. |
| * The safety net ensures that all streams created from the FileSystem are closed when the |
| * application task finishes (or is canceled or failed). That way, the task's threads do not |
| * leak connections. |
| * |
| * <p>Internal runtime code can explicitly obtain a FileSystem that does not use the safety |
| * net via {@link FileSystem#getUnguardedFileSystem(URI)}. |
| * |
| * @see FSDataInputStream |
| * @see FSDataOutputStream |
| */ |
| @Public |
| public abstract class FileSystem { |
| |
| /** |
| * The possible write modes. The write mode decides what happens if a file should be created, |
| * but already exists. |
| */ |
/**
 * Behavior for creating a file at a path where a file or directory may
 * already exist.
 */
public enum WriteMode {

	/**
	 * Only creates the target file if nothing exists at the target path yet;
	 * never replaces existing files or directories.
	 */
	NO_OVERWRITE,

	/**
	 * Always creates the target file; anything already present at the target
	 * path (files as well as directories, recursively) is deleted first.
	 */
	OVERWRITE
}
| |
// ------------------------------------------------------------------------

/** Logger for all FileSystem work. */
private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);

/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
 * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */
private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);

/** Lock guarding the process-wide file system state: the instance cache, the factory
 * registry, and the default scheme. Held by {@link #initialize(Configuration)} and
 * {@link #getUnguardedFileSystem(URI)}. */
private static final ReentrantLock LOCK = new ReentrantLock(true);

/** Cache for file systems, by scheme + authority. Guarded by {@link #LOCK}. */
private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();

/** All available file system factories, discovered once during class initialization. */
private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();

/** Mapping of file system schemes to the corresponding factories,
 * populated in {@link FileSystem#initialize(Configuration)}. Guarded by {@link #LOCK}. */
private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();

/** The default factory that is used when no scheme matches. */
private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();

/** The default filesystem scheme to be used, configured during process-wide initialization.
 * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}.
 * May be null until {@link #initialize(Configuration)} sets it; see {@link #getDefaultFsUri()}. */
//CHECKSTYLE.OFF: StaticVariableName
private static URI DEFAULT_SCHEME;
//CHECKSTYLE.ON: StaticVariableName
| |
| // ------------------------------------------------------------------------ |
| // Initialization |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Initializes the shared file system settings. |
| * |
| * <p>The given configuration is passed to each file system factory to initialize the respective |
| * file systems. Because the configuration of file systems may be different subsequent to the call |
| * of this method, this method clears the file system instance cache. |
| * |
| * <p>This method also reads the default file system URI from the configuration key |
| * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where |
| * the URI has no scheme will be interpreted as relative to that URI. |
| * As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}. |
| * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as |
| * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}. |
| * |
| * @param config the configuration from where to fetch the parameter. |
| */ |
| public static void initialize(Configuration config) throws IOException, IllegalConfigurationException { |
| LOCK.lock(); |
| try { |
| // make sure file systems are re-instantiated after re-configuration |
| CACHE.clear(); |
| FS_FACTORIES.clear(); |
| |
| // configure all file system factories |
| for (FileSystemFactory factory : RAW_FACTORIES) { |
| factory.configure(config); |
| String scheme = factory.getScheme(); |
| |
| FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config); |
| FS_FACTORIES.put(scheme, fsf); |
| } |
| |
| // configure the default (fallback) factory |
| FALLBACK_FACTORY.configure(config); |
| |
| // also read the default file system scheme |
| final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null); |
| if (stringifiedUri == null) { |
| DEFAULT_SCHEME = null; |
| } |
| else { |
| try { |
| DEFAULT_SCHEME = new URI(stringifiedUri); |
| } |
| catch (URISyntaxException e) { |
| throw new IllegalConfigurationException("The default file system scheme ('" + |
| CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e); |
| } |
| } |
| } |
| finally { |
| LOCK.unlock(); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Obtaining File System Instances |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Returns a reference to the {@link FileSystem} instance for accessing the local file system. |
| * |
| * @return a reference to the {@link FileSystem} instance for accessing the local file system. |
| */ |
| public static FileSystem getLocalFileSystem() { |
| return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LocalFileSystem.getSharedInstance()); |
| } |
| |
| /** |
| * Returns a reference to the {@link FileSystem} instance for accessing the |
| * file system identified by the given {@link URI}. |
| * |
| * @param uri |
| * the {@link URI} identifying the file system |
| * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given |
| * {@link URI}. |
| * @throws IOException |
| * thrown if a reference to the file system instance could not be obtained |
| */ |
| public static FileSystem get(URI uri) throws IOException { |
| return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri)); |
| } |
| |
| @Internal |
| public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException { |
| checkNotNull(fsUri, "file system URI"); |
| |
| LOCK.lock(); |
| try { |
| final URI uri; |
| |
| if (fsUri.getScheme() != null) { |
| uri = fsUri; |
| } |
| else { |
| // Apply the default fs scheme |
| final URI defaultUri = getDefaultFsUri(); |
| URI rewrittenUri = null; |
| |
| try { |
| rewrittenUri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(), |
| defaultUri.getPort(), fsUri.getPath(), null, null); |
| } |
| catch (URISyntaxException e) { |
| // for local URIs, we make one more try to repair the path by making it absolute |
| if (defaultUri.getScheme().equals("file")) { |
| try { |
| rewrittenUri = new URI( |
| "file", null, |
| new Path(new File(fsUri.getPath()).getAbsolutePath()).toUri().getPath(), |
| null); |
| } catch (URISyntaxException ignored) { |
| // could not help it... |
| } |
| } |
| } |
| |
| if (rewrittenUri != null) { |
| uri = rewrittenUri; |
| } |
| else { |
| throw new IOException("The file system URI '" + fsUri + |
| "' declares no scheme and cannot be interpreted relative to the default file system URI (" |
| + defaultUri + ")."); |
| } |
| } |
| |
| // print a helpful pointer for malformed local URIs (happens a lot to new users) |
| if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) { |
| String supposedUri = "file:///" + uri.getAuthority() + uri.getPath(); |
| |
| throw new IOException("Found local file path with authority '" + uri.getAuthority() + "' in path '" |
| + uri.toString() + "'. Hint: Did you forget a slash? (correct path would be '" + supposedUri + "')"); |
| } |
| |
| final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority()); |
| |
| // See if there is a file system object in the cache |
| { |
| FileSystem cached = CACHE.get(key); |
| if (cached != null) { |
| return cached; |
| } |
| } |
| |
| // this "default" initialization makes sure that the FileSystem class works |
| // even when not configured with an explicit Flink configuration, like on |
| // JobManager or TaskManager setup |
| if (FS_FACTORIES.isEmpty()) { |
| initialize(new Configuration()); |
| } |
| |
| // Try to create a new file system |
| final FileSystem fs; |
| final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme()); |
| |
| if (factory != null) { |
| fs = factory.create(uri); |
| } |
| else { |
| try { |
| fs = FALLBACK_FACTORY.create(uri); |
| } |
| catch (UnsupportedFileSystemSchemeException e) { |
| throw new UnsupportedFileSystemSchemeException( |
| "Could not find a file system implementation for scheme '" + uri.getScheme() + |
| "'. The scheme is not directly supported by Flink and no Hadoop file " + |
| "system to support this scheme could be loaded.", e); |
| } |
| } |
| |
| CACHE.put(key, fs); |
| return fs; |
| } |
| finally { |
| LOCK.unlock(); |
| } |
| } |
| |
| /** |
| * Gets the default file system URI that is used for paths and file systems |
| * that do not specify and explicit scheme. |
| * |
| * <p>As an example, assume the default file system URI is set to {@code 'hdfs://someserver:9000/'}. |
| * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as |
| * {@code 'hdfs://someserver:9000/user/USERNAME/in.txt'}. |
| * |
| * @return The default file system URI |
| */ |
| public static URI getDefaultFsUri() { |
| return DEFAULT_SCHEME != null ? DEFAULT_SCHEME : LocalFileSystem.getLocalFsURI(); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // File System Methods |
| // ------------------------------------------------------------------------ |
| |
/**
 * Returns the path of the file system's current working directory.
 *
 * @return the path of the file system's current working directory
 */
public abstract Path getWorkingDirectory();

/**
 * Returns the path of the user's home directory in this file system.
 *
 * @return the path of the user's home directory in this file system.
 */
public abstract Path getHomeDirectory();

/**
 * Returns a URI whose scheme and authority identify this file system.
 *
 * @return a URI whose scheme and authority identify this file system
 */
public abstract URI getUri();

/**
 * Return a file status object that represents the path.
 *
 * @param f
 *        The path we want information from
 * @return a FileStatus object
 * @throws FileNotFoundException
 *         when the path does not exist
 * @throws IOException
 *         see specific implementation
 */
public abstract FileStatus getFileStatus(Path f) throws IOException;

/**
 * Return an array containing hostnames, offset and size of
 * portions of the given file. For a nonexistent
 * file or regions, null will be returned.
 * This call is most helpful with DFS, where it returns
 * hostnames of machines that contain the given file.
 * Non-distributed file systems may simply return a single element containing 'localhost'.
 *
 * @param file the file whose block locations are requested
 * @param start the start offset (in bytes) of the file region of interest
 * @param len the length (in bytes) of the file region of interest
 * @return the block locations covering the requested region, or null for nonexistent files or regions
 * @throws IOException thrown if the block locations could not be determined
 */
public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;

/**
 * Opens an FSDataInputStream at the indicated Path.
 *
 * @param f
 *        the file name to open
 * @param bufferSize
 *        the size of the buffer to be used.
 * @return a stream reading from the file
 * @throws IOException thrown if the stream could not be opened
 */
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;

/**
 * Opens an FSDataInputStream at the indicated Path.
 *
 * @param f
 *        the file to open
 * @return a stream reading from the file
 * @throws IOException thrown if the stream could not be opened
 */
public abstract FSDataInputStream open(Path f) throws IOException;
| |
| /** |
| * Return the number of bytes that large input files should be optimally be split into to minimize I/O time. |
| * |
| * @return the number of bytes that large input files should be optimally be split into to minimize I/O time |
| * |
| * @deprecated This value is no longer used and is meaningless. |
| */ |
| @Deprecated |
| public long getDefaultBlockSize() { |
| return 32 * 1024 * 1024; // 32 MB; |
| } |
| |
/**
 * List the statuses of the files/directories in the given path if the path is
 * a directory.
 *
 * @param f
 *        given path
 * @return the statuses of the files/directories in the given path
 * @throws IOException thrown if the directory listing could not be obtained
 */
public abstract FileStatus[] listStatus(Path f) throws IOException;
| |
| /** |
| * List the statuses with location of the files/directories in the given path if the path is |
| * a directory. |
| * |
| * @param f |
| * given path |
| * @return the statuses of the files/directories in the given path |
| * @throws IOException |
| */ |
| public LocatedFileStatus[] listLocatedStatus(Path f) throws IOException { |
| FileStatus[] fileStatuses = listStatus(f); |
| if (fileStatuses == null) { |
| return null; |
| } |
| LocatedFileStatus[] result = new LocatedFileStatus[fileStatuses.length]; |
| for (int i = 0; i < fileStatuses.length; i++) { |
| result[i] = new DefaultLocatedFileStatus( |
| fileStatuses[i], |
| getFileBlockLocations(fileStatuses[i], 0, fileStatuses[i].getLen())); |
| } |
| return result; |
| } |
| |
| static class DefaultLocatedFileStatus implements LocatedFileStatus { |
| |
| FileStatus fileStatus; |
| BlockLocation[] locations; |
| |
| public DefaultLocatedFileStatus(FileStatus fileStatus, BlockLocation[] blockLocations) { |
| this.fileStatus = fileStatus; |
| this.locations = blockLocations; |
| } |
| |
| @Override |
| public BlockLocation[] getBlockLocation() { |
| return locations; |
| } |
| |
| @Override |
| public long getLen() { |
| return fileStatus.getLen(); |
| } |
| |
| @Override |
| public long getBlockSize() { |
| return fileStatus.getBlockSize(); |
| } |
| |
| @Override |
| public short getReplication() { |
| return fileStatus.getReplication(); |
| } |
| |
| @Override |
| public long getModificationTime() { |
| return fileStatus.getModificationTime(); |
| } |
| |
| @Override |
| public long getAccessTime() { |
| return fileStatus.getAccessTime(); |
| } |
| |
| @Override |
| public boolean isDir() { |
| return fileStatus.isDir(); |
| } |
| |
| @Override |
| public Path getPath() { |
| return fileStatus.getPath(); |
| } |
| } |
| |
| /** |
| * Check if exists. |
| * |
| * @param f |
| * source file |
| */ |
| public boolean exists(final Path f) throws IOException { |
| try { |
| return (getFileStatus(f) != null); |
| } catch (FileNotFoundException e) { |
| return false; |
| } |
| } |
| |
/**
 * Delete a file.
 *
 * @param f
 *        the path to delete
 * @param recursive
 *        if path is a directory and set to <code>true</code>, the directory is deleted else throws an exception. In
 *        case of a file the recursive can be set to either <code>true</code> or <code>false</code>
 * @return <code>true</code> if delete is successful, <code>false</code> otherwise
 * @throws IOException thrown if an I/O error occurs during the deletion
 */
public abstract boolean delete(Path f, boolean recursive) throws IOException;

/**
 * Make the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'.
 * Existence of the directory hierarchy is not an error.
 *
 * @param f
 *        the directory/directories to be created
 * @return <code>true</code> if at least one new directory has been created, <code>false</code> otherwise
 * @throws IOException
 *         thrown if an I/O error occurs while creating the directory
 */
public abstract boolean mkdirs(Path f) throws IOException;
| |
| /** |
| * Opens an FSDataOutputStream at the indicated Path. |
| * |
| * <p>This method is deprecated, because most of its parameters are ignored by most file systems. |
| * To control for example the replication factor and block size in the Hadoop Distributed File system, |
| * make sure that the respective Hadoop configuration file is either linked from the Flink configuration, |
| * or in the classpath of either Flink or the user code. |
| * |
| * @param f |
| * the file name to open |
| * @param overwrite |
| * if a file with this name already exists, then if true, |
| * the file will be overwritten, and if false an error will be thrown. |
| * @param bufferSize |
| * the size of the buffer to be used. |
| * @param replication |
| * required block replication for the file. |
| * @param blockSize |
| * the size of the file blocks |
| * |
| * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because |
| * a file already exists at that path and the write mode indicates to not |
| * overwrite the file. |
| * |
| * @deprecated Deprecated because not well supported across types of file systems. |
| * Control the behavior of specific file systems via configurations instead. |
| */ |
| @Deprecated |
| public FSDataOutputStream create( |
| Path f, |
| boolean overwrite, |
| int bufferSize, |
| short replication, |
| long blockSize) throws IOException { |
| |
| return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE); |
| } |
| |
| /** |
| * Opens an FSDataOutputStream at the indicated Path. |
| * |
| * @param f |
| * the file name to open |
| * @param overwrite |
| * if a file with this name already exists, then if true, |
| * the file will be overwritten, and if false an error will be thrown. |
| * |
| * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because |
| * a file already exists at that path and the write mode indicates to not |
| * overwrite the file. |
| * |
| * @deprecated Use {@link #create(Path, WriteMode)} instead. |
| */ |
| @Deprecated |
| public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { |
| return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE); |
| } |
| |
/**
 * Opens an FSDataOutputStream to a new file at the given path.
 *
 * <p>If the file already exists, the behavior depends on the given {@code WriteMode}.
 * If the mode is set to {@link WriteMode#NO_OVERWRITE}, then this method fails with an
 * exception.
 *
 * @param f The file path to write to
 * @param overwriteMode The action to take if a file or directory already exists at the given path.
 * @return The stream to the new file at the target path.
 *
 * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
 *                     a file already exists at that path and the write mode indicates to not
 *                     overwrite the file.
 */
public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;

/**
 * Renames the file/directory src to dst.
 *
 * @param src
 *        the file/directory to rename
 * @param dst
 *        the new name of the file/directory
 * @return <code>true</code> if the renaming was successful, <code>false</code> otherwise
 * @throws IOException thrown if an I/O error occurs during the rename
 */
public abstract boolean rename(Path src, Path dst) throws IOException;

/**
 * Returns true if this is a distributed file system. A distributed file system here means
 * that the file system is shared among all Flink processes that participate in a cluster or
 * job and that all these processes can see the same files.
 *
 * @return True, if this is a distributed file system, false otherwise.
 */
public abstract boolean isDistributedFS();

/**
 * Gets a description of the characteristics of this file system.
 *
 * @return the kind of this file system, describing its characteristics
 */
public abstract FileSystemKind getKind();
| |
| // ------------------------------------------------------------------------ |
| // output directory initialization |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Initializes output directories on local file systems according to the given write mode. |
| * |
| * <ul> |
| * <li>WriteMode.NO_OVERWRITE & parallel output: |
| * <ul> |
| * <li>A directory is created if the output path does not exist.</li> |
| * <li>An existing directory is reused, files contained in the directory are NOT deleted.</li> |
| * <li>An existing file raises an exception.</li> |
| * </ul> |
| * </li> |
| * |
| * <li>WriteMode.NO_OVERWRITE & NONE parallel output: |
| * <ul> |
| * <li>An existing file or directory raises an exception.</li> |
| * </ul> |
| * </li> |
| * |
| * <li>WriteMode.OVERWRITE & parallel output: |
| * <ul> |
| * <li>A directory is created if the output path does not exist.</li> |
| * <li>An existing directory is reused, files contained in the directory are NOT deleted.</li> |
| * <li>An existing file is deleted and replaced by a new directory.</li> |
| * </ul> |
| * </li> |
| * |
| * <li>WriteMode.OVERWRITE & NONE parallel output: |
| * <ul> |
| * <li>An existing file or directory (and all its content) is deleted</li> |
| * </ul> |
| * </li> |
| * </ul> |
| * |
| * <p>Files contained in an existing directory are not deleted, because multiple instances of a |
| * DataSinkTask might call this function at the same time and hence might perform concurrent |
| * delete operations on the file system (possibly deleting output files of concurrently running tasks). |
| * Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create |
| * operations would be difficult. |
| * |
| * @param outPath Output path that should be prepared. |
| * @param writeMode Write mode to consider. |
| * @param createDirectory True, to initialize a directory at the given path, false to prepare space for a file. |
| * |
| * @return True, if the path was successfully prepared, false otherwise. |
| * @throws IOException Thrown, if any of the file system access operations failed. |
| */ |
| public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { |
| if (isDistributedFS()) { |
| return false; |
| } |
| |
| // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that |
| // concurrently work in this method (multiple output formats writing locally) might end |
| // up deleting each other's directories and leave non-retrievable files, without necessarily |
| // causing an exception. That results in very subtle issues, like output files looking as if |
| // they are not getting created. |
| |
| // we acquire the lock interruptibly here, to make sure that concurrent threads waiting |
| // here can cancel faster |
| try { |
| OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly(); |
| } |
| catch (InterruptedException e) { |
| // restore the interruption state |
| Thread.currentThread().interrupt(); |
| |
| // leave the method - we don't have the lock anyways |
| throw new IOException("The thread was interrupted while trying to initialize the output directory"); |
| } |
| |
| try { |
| FileStatus status; |
| try { |
| status = getFileStatus(outPath); |
| } |
| catch (FileNotFoundException e) { |
| // okay, the file is not there |
| status = null; |
| } |
| |
| // check if path exists |
| if (status != null) { |
| // path exists, check write mode |
| switch (writeMode) { |
| |
| case NO_OVERWRITE: |
| if (status.isDir() && createDirectory) { |
| return true; |
| } else { |
| // file may not be overwritten |
| throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " + |
| "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " + |
| WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories."); |
| } |
| |
| case OVERWRITE: |
| if (status.isDir()) { |
| if (createDirectory) { |
| // directory exists and does not need to be created |
| return true; |
| } else { |
| // we will write in a single file, delete directory |
| try { |
| delete(outPath, true); |
| } |
| catch (IOException e) { |
| throw new IOException("Could not remove existing directory '" + outPath + |
| "' to allow overwrite by result file", e); |
| } |
| } |
| } |
| else { |
| // delete file |
| try { |
| delete(outPath, false); |
| } |
| catch (IOException e) { |
| throw new IOException("Could not remove existing file '" + outPath + |
| "' to allow overwrite by result file/directory", e); |
| } |
| } |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Invalid write mode: " + writeMode); |
| } |
| } |
| |
| if (createDirectory) { |
| // Output directory needs to be created |
| if (!exists(outPath)) { |
| mkdirs(outPath); |
| } |
| |
| // double check that the output directory exists |
| try { |
| return getFileStatus(outPath).isDir(); |
| } |
| catch (FileNotFoundException e) { |
| return false; |
| } |
| } |
| else { |
| // check that the output path does not exist and an output file |
| // can be created by the output format. |
| return !exists(outPath); |
| } |
| } |
| finally { |
| OUTPUT_DIRECTORY_INIT_LOCK.unlock(); |
| } |
| } |
| |
| /** |
| * Initializes output directories on distributed file systems according to the given write mode. |
| * |
| * <p>WriteMode.NO_OVERWRITE & parallel output: |
| * - A directory is created if the output path does not exist. |
| * - An existing file or directory raises an exception. |
| * |
| * <p>WriteMode.NO_OVERWRITE & NONE parallel output: |
| * - An existing file or directory raises an exception. |
| * |
| * <p>WriteMode.OVERWRITE & parallel output: |
| * - A directory is created if the output path does not exist. |
| * - An existing directory and its content is deleted and a new directory is created. |
| * - An existing file is deleted and replaced by a new directory. |
| * |
| * <p>WriteMode.OVERWRITE & NONE parallel output: |
| * - An existing file or directory is deleted and replaced by a new directory. |
| * |
| * @param outPath Output path that should be prepared. |
| * @param writeMode Write mode to consider. |
| * @param createDirectory True, to initialize a directory at the given path, false otherwise. |
| * |
| * @return True, if the path was successfully prepared, false otherwise. |
| * |
| * @throws IOException Thrown, if any of the file system access operations failed. |
| */ |
| public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { |
| if (!isDistributedFS()) { |
| return false; |
| } |
| |
| // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that |
| // concurrently work in this method (multiple output formats writing locally) might end |
| // up deleting each other's directories and leave non-retrievable files, without necessarily |
| // causing an exception. That results in very subtle issues, like output files looking as if |
| // they are not getting created. |
| |
| // we acquire the lock interruptibly here, to make sure that concurrent threads waiting |
| // here can cancel faster |
| try { |
| OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly(); |
| } |
| catch (InterruptedException e) { |
| // restore the interruption state |
| Thread.currentThread().interrupt(); |
| |
| // leave the method - we don't have the lock anyways |
| throw new IOException("The thread was interrupted while trying to initialize the output directory"); |
| } |
| |
| try { |
| // check if path exists |
| if (exists(outPath)) { |
| // path exists, check write mode |
| switch(writeMode) { |
| |
| case NO_OVERWRITE: |
| // file or directory may not be overwritten |
| throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + |
| WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + |
| " mode to overwrite existing files and directories."); |
| |
| case OVERWRITE: |
| // output path exists. We delete it and all contained files in case of a directory. |
| try { |
| delete(outPath, true); |
| } catch (IOException e) { |
| // Some other thread might already have deleted the path. |
| // If - for some other reason - the path could not be deleted, |
| // this will be handled later. |
| } |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Invalid write mode: " + writeMode); |
| } |
| } |
| |
| if (createDirectory) { |
| // Output directory needs to be created |
| try { |
| if (!exists(outPath)) { |
| mkdirs(outPath); |
| } |
| } catch (IOException ioe) { |
| // Some other thread might already have created the directory. |
| // If - for some other reason - the directory could not be created |
| // and the path does not exist, this will be handled later. |
| } |
| |
| // double check that the output directory exists |
| return exists(outPath) && getFileStatus(outPath).isDir(); |
| } |
| else { |
| // single file case: check that the output path does not exist and |
| // an output file can be created by the output format. |
| return !exists(outPath); |
| } |
| } |
| finally { |
| OUTPUT_DIRECTORY_INIT_LOCK.unlock(); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Loads the factories for the file systems directly supported by Flink. |
| * Aside from the {@link LocalFileSystem}, these file systems are loaded |
| * via Java's service framework. |
| * |
| * @return A map from the file system scheme to corresponding file system factory. |
| */ |
| private static List<FileSystemFactory> loadFileSystems() { |
| final ArrayList<FileSystemFactory> list = new ArrayList<>(); |
| |
| // by default, we always have the local file system factory |
| list.add(new LocalFileSystemFactory()); |
| |
| LOG.debug("Loading extension file systems via services"); |
| |
| try { |
| ServiceLoader<FileSystemFactory> serviceLoader = ServiceLoader.load(FileSystemFactory.class); |
| Iterator<FileSystemFactory> iter = serviceLoader.iterator(); |
| |
| // we explicitly use an iterator here (rather than for-each) because that way |
| // we can catch errors in individual service instantiations |
| |
| //noinspection WhileLoopReplaceableByForEach |
| while (iter.hasNext()) { |
| try { |
| FileSystemFactory factory = iter.next(); |
| list.add(factory); |
| LOG.debug("Added file system {}:{}", factory.getScheme(), factory.getClass().getName()); |
| } |
| catch (Throwable t) { |
| // catching Throwable here to handle various forms of class loading |
| // and initialization errors |
| ExceptionUtils.rethrowIfFatalErrorOrOOM(t); |
| LOG.error("Failed to load a file system via services", t); |
| } |
| } |
| } |
| catch (Throwable t) { |
| // catching Throwable here to handle various forms of class loading |
| // and initialization errors |
| ExceptionUtils.rethrowIfFatalErrorOrOOM(t); |
| LOG.error("Failed to load additional file systems via services", t); |
| } |
| |
| return Collections.unmodifiableList(list); |
| } |
| |
| /** |
| * Utility loader for the Hadoop file system factory. |
| * We treat the Hadoop FS factory in a special way, because we use it as a catch |
| * all for file systems schemes not supported directly in Flink. |
| * |
| * <p>This method does a set of eager checks for availability of certain classes, to |
| * be able to give better error messages. |
| */ |
| private static FileSystemFactory loadHadoopFsFactory() { |
| final ClassLoader cl = FileSystem.class.getClassLoader(); |
| |
| // first, see if the Flink runtime classes are available |
| final Class<? extends FileSystemFactory> factoryClass; |
| try { |
| factoryClass = Class |
| .forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl) |
| .asSubclass(FileSystemFactory.class); |
| } |
| catch (ClassNotFoundException e) { |
| LOG.info("No Flink runtime dependency present. " + |
| "The extended set of supported File Systems via Hadoop is not available."); |
| return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies."); |
| } |
| catch (Exception | LinkageError e) { |
| LOG.warn("Flink's Hadoop file system factory could not be loaded", e); |
| return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e); |
| } |
| |
| // check (for eager and better exception messages) if the Hadoop classes are available here |
| try { |
| Class.forName("org.apache.hadoop.conf.Configuration", false, cl); |
| Class.forName("org.apache.hadoop.fs.FileSystem", false, cl); |
| } |
| catch (ClassNotFoundException e) { |
| LOG.info("Hadoop is not in the classpath/dependencies. " + |
| "The extended set of supported File Systems via Hadoop is not available."); |
| return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies."); |
| } |
| |
| // Create the factory. |
| try { |
| return factoryClass.newInstance(); |
| } |
| catch (Exception | LinkageError e) { |
| LOG.warn("Flink's Hadoop file system factory could not be created", e); |
| return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be created", e); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * An identifier of a file system, via its scheme and its authority. |
| */ |
| private static final class FSKey { |
| |
| /** The scheme of the file system. */ |
| private final String scheme; |
| |
| /** The authority of the file system. */ |
| @Nullable |
| private final String authority; |
| |
| /** |
| * Creates a file system key from a given scheme and an authority. |
| * |
| * @param scheme The scheme of the file system |
| * @param authority The authority of the file system |
| */ |
| public FSKey(String scheme, @Nullable String authority) { |
| this.scheme = checkNotNull(scheme, "scheme"); |
| this.authority = authority; |
| } |
| |
| @Override |
| public boolean equals(final Object obj) { |
| if (obj == this) { |
| return true; |
| } |
| else if (obj != null && obj.getClass() == FSKey.class) { |
| final FSKey that = (FSKey) obj; |
| return this.scheme.equals(that.scheme) && |
| (this.authority == null ? that.authority == null : |
| (that.authority != null && this.authority.equals(that.authority))); |
| } |
| else { |
| return false; |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| return 31 * scheme.hashCode() + |
| (authority == null ? 17 : authority.hashCode()); |
| } |
| |
| @Override |
| public String toString() { |
| return scheme + "://" + (authority != null ? authority : ""); |
| } |
| } |
| } |