# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
FileSystem abstraction to interact with various local and remote filesystems.
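
Examples
--------
A minimal sketch (illustrative only; the path is hypothetical):

>>> from pyarrow import fs
>>> local, path = fs.FileSystem.from_uri("file:///tmp")  # doctest: +SKIP
>>> local.get_file_info(fs.FileSelector(path))  # doctest: +SKIP
[...]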
"""

from pyarrow.util import _is_path_like, _stringify_path

from pyarrow._fs import (  # noqa
    FileSelector,
    FileType,
    FileInfo,
    FileSystem,
    LocalFileSystem,
    SubTreeFileSystem,
    _MockFileSystem,
    FileSystemHandler,
    PyFileSystem,
    _copy_files,
    _copy_files_selector,
)

# For backward compatibility.
FileStats = FileInfo

_not_imported = []

try:
    from pyarrow._hdfs import HadoopFileSystem  # noqa
except ImportError:
    _not_imported.append("HadoopFileSystem")

try:
    from pyarrow._gcsfs import GcsFileSystem  # noqa
except ImportError:
    _not_imported.append("GcsFileSystem")

try:
    from pyarrow._s3fs import (  # noqa
        AwsDefaultS3RetryStrategy, AwsStandardS3RetryStrategy,
        S3FileSystem, S3LogLevel, S3RetryStrategy, ensure_s3_initialized,
        finalize_s3, ensure_s3_finalized, initialize_s3, resolve_s3_region)
except ImportError:
    _not_imported.append("S3FileSystem")
else:
    # GH-38364: we don't initialize S3 eagerly as that could lead
    # to crashes at shutdown even when S3 isn't used.
    # Instead, S3 is initialized lazily using `ensure_s3_initialized`
    # in assorted places.
    import atexit

    atexit.register(ensure_s3_finalized)


def __getattr__(name):
    if name in _not_imported:
        raise ImportError(
            "The pyarrow installation is not built with support for "
            "'{0}'".format(name)
        )

    raise AttributeError(
        "module 'pyarrow.fs' has no attribute '{0}'".format(name)
    )
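
# Example of the resulting behavior on a build without S3 support
# (hypothetical session; the actual build configuration varies):
#
#   >>> import pyarrow.fs  # doctest: +SKIP
#   >>> pyarrow.fs.S3FileSystem  # doctest: +SKIP
#   Traceback (most recent call last):
#     ...
#   ImportError: The pyarrow installation is not built with support
#   for 'S3FileSystem'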


def _filesystem_from_str(uri):
    # instantiate the filesystem from a URI; if the URI has a path
    # component then it will be treated as a path prefix
    filesystem, prefix = FileSystem.from_uri(uri)
    prefix = filesystem.normalize_path(prefix)
    if prefix:
        # validate that the prefix is pointing to a directory
        prefix_info = filesystem.get_file_info([prefix])[0]
        if prefix_info.type != FileType.Directory:
            raise ValueError(
                "The path component of the filesystem URI must point to a "
                "directory but it has a type: `{}`. The path component "
                "is `{}` and the given filesystem URI is `{}`".format(
                    prefix_info.type.name, prefix_info.path, uri
                )
            )
        filesystem = SubTreeFileSystem(prefix, filesystem)
    return filesystem
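
# Illustrative example (hypothetical bucket; requires an S3-enabled build):
# the path component "my-bucket/prefix" must exist as a directory and
# becomes the root of a SubTreeFileSystem wrapping the underlying
# S3FileSystem.
#
#   >>> _filesystem_from_str("s3://my-bucket/prefix")  # doctest: +SKIP
#   <pyarrow._fs.SubTreeFileSystem ...>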


def _ensure_filesystem(
    filesystem, use_mmap=False, allow_legacy_filesystem=False
):
    if isinstance(filesystem, FileSystem):
        return filesystem
    elif isinstance(filesystem, str):
        if use_mmap:
            raise ValueError(
                "Specifying to use memory mapping not supported for "
                "filesystem specified as a URI string"
            )
        return _filesystem_from_str(filesystem)

    # handle fsspec-compatible filesystems
    try:
        import fsspec
    except ImportError:
        pass
    else:
        if isinstance(filesystem, fsspec.AbstractFileSystem):
            if type(filesystem).__name__ == 'LocalFileSystem':
                # In case it's a simple LocalFileSystem, use the native
                # Arrow one
                return LocalFileSystem(use_mmap=use_mmap)
            return PyFileSystem(FSSpecHandler(filesystem))

    # map old filesystems to new ones
    import pyarrow.filesystem as legacyfs

    if isinstance(filesystem, legacyfs.LocalFileSystem):
        return LocalFileSystem(use_mmap=use_mmap)
    # TODO handle HDFS?
    if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem):
        return filesystem

    raise TypeError(
        "Unrecognized filesystem: {}. `filesystem` argument must be a "
        "FileSystem instance or a valid file system URI".format(
            type(filesystem))
    )
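
# Illustrative sketch of the dispatch above (the second example assumes
# the optional fsspec package is installed):
#
#   >>> _ensure_filesystem("file:///tmp")  # doctest: +SKIP
#   <pyarrow._fs.SubTreeFileSystem ...>
#   >>> import fsspec  # doctest: +SKIP
#   >>> _ensure_filesystem(fsspec.filesystem("memory"))  # doctest: +SKIP
#   <pyarrow._fs.PyFileSystem ...>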


def _resolve_filesystem_and_path(
    path, filesystem=None, allow_legacy_filesystem=False, memory_map=False
):
"""
Return filesystem/path from path which could be an URI or a plain
filesystem path.
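
    Examples
    --------
    Illustrative only (representations abbreviated):

    >>> _resolve_filesystem_and_path("file:///x.parquet")  # doctest: +SKIP
    (<pyarrow._fs.LocalFileSystem ...>, '/x.parquet')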
"""
    if not _is_path_like(path):
        if filesystem is not None:
            raise ValueError(
                "'filesystem' passed but the specified path is file-like, so"
                " there is nothing to open with 'filesystem'."
            )
        return filesystem, path

    if filesystem is not None:
        filesystem = _ensure_filesystem(
            filesystem, use_mmap=memory_map,
            allow_legacy_filesystem=allow_legacy_filesystem
        )
        if isinstance(filesystem, LocalFileSystem):
            path = _stringify_path(path)
        elif not isinstance(path, str):
            raise TypeError(
                "Expected string path; path-like objects are only allowed "
                "with a local filesystem"
            )
        if not allow_legacy_filesystem:
            path = filesystem.normalize_path(path)
        return filesystem, path

    path = _stringify_path(path)

    # if filesystem is not given, try to automatically determine one:
    # first check if the file exists as a local (relative) file path,
    # and if not, try to parse the path as a URI
    filesystem = LocalFileSystem(use_mmap=memory_map)

    try:
        file_info = filesystem.get_file_info(path)
    except ValueError:  # ValueError means path is likely a URI
        file_info = None
        exists_locally = False
    else:
        exists_locally = (file_info.type != FileType.NotFound)

    # if the file or directory doesn't exist locally, then assume that
    # the path is a URI describing the file system as well
    if not exists_locally:
        try:
            filesystem, path = FileSystem.from_uri(path)
        except ValueError as e:
            # neither a URI nor a locally existing path, so assume that a
            # local path was given and propagate a nicer file-not-found
            # error instead of a more confusing scheme-parsing error
            if "empty scheme" not in str(e) \
                    and "Cannot parse URI" not in str(e):
                raise
    else:
        path = filesystem.normalize_path(path)

    return filesystem, path


def copy_files(source, destination,
               source_filesystem=None, destination_filesystem=None,
               *, chunk_size=1024*1024, use_threads=True):
"""
Copy files between FileSystems.
This functions allows you to recursively copy directories of files from
one file system to another, such as from S3 to your local machine.
Parameters
----------
source : string
Source file path or URI to a single file or directory.
If a directory, files will be copied recursively from this path.
destination : string
Destination file path or URI. If `source` is a file, `destination`
is also interpreted as the destination file (not directory).
Directories will be created as necessary.
source_filesystem : FileSystem, optional
Source filesystem, needs to be specified if `source` is not a URI,
otherwise inferred.
destination_filesystem : FileSystem, optional
Destination filesystem, needs to be specified if `destination` is not
a URI, otherwise inferred.
chunk_size : int, default 1MB
The maximum size of block to read before flushing to the
destination file. A larger chunk_size will use more memory while
copying but may help accommodate high latency FileSystems.
use_threads : bool, default True
Whether to use multiple threads to accelerate copying.
Examples
--------
Inspect an S3 bucket's files:
>>> s3, path = fs.FileSystem.from_uri(
... "s3://registry.opendata.aws/roda/ndjson/")
>>> selector = fs.FileSelector(path)
>>> s3.get_file_info(selector)
[<FileInfo for 'registry.opendata.aws/roda/ndjson/index.ndjson':...]
Copy one file from S3 bucket to a local directory:
>>> fs.copy_files("s3://registry.opendata.aws/roda/ndjson/index.ndjson",
... "file:///{}/index_copy.ndjson".format(local_path))
>>> fs.LocalFileSystem().get_file_info(str(local_path)+
... '/index_copy.ndjson')
<FileInfo for '.../index_copy.ndjson': type=FileType.File, size=...>
Copy file using a FileSystem object:
>>> fs.copy_files("registry.opendata.aws/roda/ndjson/index.ndjson",
... "file:///{}/index_copy.ndjson".format(local_path),
... source_filesystem=fs.S3FileSystem())
"""
    source_fs, source_path = _resolve_filesystem_and_path(
        source, source_filesystem
    )
    destination_fs, destination_path = _resolve_filesystem_and_path(
        destination, destination_filesystem
    )

    file_info = source_fs.get_file_info(source_path)
    if file_info.type == FileType.Directory:
        source_sel = FileSelector(source_path, recursive=True)
        _copy_files_selector(source_fs, source_sel,
                             destination_fs, destination_path,
                             chunk_size, use_threads)
    else:
        _copy_files(source_fs, source_path,
                    destination_fs, destination_path,
                    chunk_size, use_threads)


class FSSpecHandler(FileSystemHandler):
    """
    Handler for fsspec-based Python filesystems.

    https://filesystem-spec.readthedocs.io/en/latest/index.html

    Parameters
    ----------
    fs : FSSpec-compliant filesystem instance

    Examples
    --------
    >>> PyFileSystem(FSSpecHandler(fsspec_fs))  # doctest: +SKIP
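
    A concrete sketch with fsspec's in-memory filesystem (assumes the
    optional fsspec package is installed):

    >>> import fsspec  # doctest: +SKIP
    >>> handler = FSSpecHandler(fsspec.filesystem("memory"))  # doctest: +SKIP
    >>> filesystem = PyFileSystem(handler)  # doctest: +SKIP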
"""

    def __init__(self, fs):
        self.fs = fs

    def __eq__(self, other):
        if isinstance(other, FSSpecHandler):
            return self.fs == other.fs
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, FSSpecHandler):
            return self.fs != other.fs
        return NotImplemented

    def get_type_name(self):
        protocol = self.fs.protocol
        if isinstance(protocol, list):
            protocol = protocol[0]
        return "fsspec+{0}".format(protocol)

    def normalize_path(self, path):
        return path

    @staticmethod
    def _create_file_info(path, info):
        size = info["size"]
        if info["type"] == "file":
            ftype = FileType.File
        elif info["type"] == "directory":
            ftype = FileType.Directory
            # some fsspec filesystems include a file size for directories
            size = None
        else:
            ftype = FileType.Unknown
        return FileInfo(path, ftype, size=size, mtime=info.get("mtime", None))

    def get_file_info(self, paths):
        infos = []
        for path in paths:
            try:
                info = self.fs.info(path)
            except FileNotFoundError:
                infos.append(FileInfo(path, FileType.NotFound))
            else:
                infos.append(self._create_file_info(path, info))
        return infos

    def get_file_info_selector(self, selector):
        if not self.fs.isdir(selector.base_dir):
            if self.fs.exists(selector.base_dir):
                raise NotADirectoryError(selector.base_dir)
            elif selector.allow_not_found:
                return []
            else:
                raise FileNotFoundError(selector.base_dir)

        if selector.recursive:
            maxdepth = None
        else:
            maxdepth = 1

        infos = []
        selected_files = self.fs.find(
            selector.base_dir, maxdepth=maxdepth, withdirs=True, detail=True
        )
        for path, info in selected_files.items():
            _path = path.strip("/")
            base_dir = selector.base_dir.strip("/")
            # need to exclude the base directory from the selected files
            # if present (fsspec filesystems, see GH-37555)
            if _path != base_dir:
                infos.append(self._create_file_info(path, info))

        return infos

    def create_dir(self, path, recursive):
        # mkdir also raises FileNotFoundError when the base directory is
        # not found
        try:
            self.fs.mkdir(path, create_parents=recursive)
        except FileExistsError:
            pass

    def delete_dir(self, path):
        self.fs.rm(path, recursive=True)

    def _delete_dir_contents(self, path, missing_dir_ok=False):
        try:
            subpaths = self.fs.listdir(path, detail=False)
        except FileNotFoundError:
            if missing_dir_ok:
                return
            raise
        for subpath in subpaths:
            if self.fs.isdir(subpath):
                self.fs.rm(subpath, recursive=True)
            elif self.fs.isfile(subpath):
                self.fs.rm(subpath)

    def delete_dir_contents(self, path, missing_dir_ok):
        if path.strip("/") == "":
            raise ValueError(
                "delete_dir_contents called on path '{}'".format(path))
        self._delete_dir_contents(path, missing_dir_ok)

    def delete_root_dir_contents(self):
        self._delete_dir_contents("/")

    def delete_file(self, path):
        # fs.rm correctly raises IsADirectoryError when `path` is a directory
        # instead of a file and `recursive` is not set to True
        if not self.fs.exists(path):
            raise FileNotFoundError(path)
        self.fs.rm(path)

    def move(self, src, dest):
        self.fs.mv(src, dest, recursive=True)

    def copy_file(self, src, dest):
        # fs.copy correctly raises IsADirectoryError when `src` is a directory
        # instead of a file
        self.fs.copy(src, dest)

    # TODO can we read/pass metadata (e.g. Content-Type) in the methods below?

    def open_input_stream(self, path):
        from pyarrow import PythonFile

        if not self.fs.isfile(path):
            raise FileNotFoundError(path)

        return PythonFile(self.fs.open(path, mode="rb"), mode="r")

    def open_input_file(self, path):
        from pyarrow import PythonFile

        if not self.fs.isfile(path):
            raise FileNotFoundError(path)

        return PythonFile(self.fs.open(path, mode="rb"), mode="r")

    def open_output_stream(self, path, metadata):
        from pyarrow import PythonFile

        return PythonFile(self.fs.open(path, mode="wb"), mode="w")

    def open_append_stream(self, path, metadata):
        from pyarrow import PythonFile

        return PythonFile(self.fs.open(path, mode="ab"), mode="w")