blob: d881c749c714cbec6019ba1c549e5ab01b867e94 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
from cpython.datetime cimport datetime, PyDateTime_DateTime
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport PyDateTime_to_TimePoint
from pyarrow.lib import _detect_compression, frombytes, tobytes
from pyarrow.lib cimport *
from pyarrow.util import _stringify_path
from abc import ABC, abstractmethod
from datetime import datetime, timezone
import os
import pathlib
import sys
cdef _init_ca_paths():
cdef CFileSystemGlobalOptions options
import ssl
paths = ssl.get_default_verify_paths()
if paths.cafile:
options.tls_ca_file_path = os.fsencode(paths.cafile)
if paths.capath:
options.tls_ca_dir_path = os.fsencode(paths.capath)
check_status(CFileSystemsInitialize(options))
if sys.platform == 'linux':
# ARROW-9261: On Linux, we may need to fixup the paths to TLS CA certs
# (especially in manylinux packages) since the values hardcoded at
# compile-time in libcurl may be wrong.
_init_ca_paths()
cdef inline c_string _path_as_bytes(path) except *:
# handle only abstract paths, not bound to any filesystem like pathlib is,
# so we only accept plain strings
if not isinstance(path, (bytes, str)):
raise TypeError('Path must be a string')
# tobytes always uses utf-8, which is more or less ok, at least on Windows
# since the C++ side then decodes from utf-8. On Unix, os.fsencode may be
# better.
return tobytes(path)
cdef object _wrap_file_type(CFileType ty):
return FileType(<int8_t> ty)
cdef CFileType _unwrap_file_type(FileType ty) except *:
if ty == FileType.Unknown:
return CFileType_Unknown
elif ty == FileType.NotFound:
return CFileType_NotFound
elif ty == FileType.File:
return CFileType_File
elif ty == FileType.Directory:
return CFileType_Directory
assert 0
cdef class FileInfo(_Weakrefable):
"""
FileSystem entry info.
Parameters
----------
path : str
The full path to the filesystem entry.
type : FileType
The type of the filesystem entry.
mtime : datetime or float, default None
If given, the modification time of the filesystem entry.
If a float is given, it is the number of seconds since the
Unix epoch.
mtime_ns : int, default None
If given, the modification time of the filesystem entry,
in nanoseconds since the Unix epoch.
`mtime` and `mtime_ns` are mutually exclusive.
size : int, default None
If given, the filesystem entry size in bytes. This should only
be given if `type` is `FileType.File`.
"""
def __init__(self, path, FileType type=FileType.Unknown, *,
mtime=None, mtime_ns=None, size=None):
self.info.set_path(tobytes(path))
self.info.set_type(_unwrap_file_type(type))
if mtime is not None:
if mtime_ns is not None:
raise TypeError("Only one of mtime and mtime_ns "
"can be given")
if isinstance(mtime, datetime):
self.info.set_mtime(PyDateTime_to_TimePoint(
<PyDateTime_DateTime*> mtime))
else:
self.info.set_mtime(TimePoint_from_s(mtime))
elif mtime_ns is not None:
self.info.set_mtime(TimePoint_from_ns(mtime_ns))
if size is not None:
self.info.set_size(size)
@staticmethod
cdef wrap(CFileInfo info):
cdef FileInfo self = FileInfo.__new__(FileInfo)
self.info = move(info)
return self
cdef inline CFileInfo unwrap(self) nogil:
return self.info
@staticmethod
cdef CFileInfo unwrap_safe(obj):
if not isinstance(obj, FileInfo):
raise TypeError("Expected FileInfo instance, got {0}"
.format(type(obj)))
return (<FileInfo> obj).unwrap()
def __repr__(self):
def getvalue(attr):
try:
return getattr(self, attr)
except ValueError:
return ''
s = '<FileInfo for {!r}: type={}'.format(self.path, str(self.type))
if self.is_file:
s += ', size={}'.format(self.size)
s += '>'
return s
@property
def type(self):
"""
Type of the file.
The returned enum values can be the following:
- FileType.NotFound: target does not exist
- FileType.Unknown: target exists but its type is unknown (could be a
special file such as a Unix socket or character device, or
Windows NUL / CON / ...)
- FileType.File: target is a regular file
- FileType.Directory: target is a regular directory
Returns
-------
type : FileType
"""
return _wrap_file_type(self.info.type())
@property
def is_file(self):
"""
"""
return self.type == FileType.File
@property
def path(self):
"""
The full file path in the filesystem.
"""
return frombytes(self.info.path())
@property
def base_name(self):
"""
The file base name.
Component after the last directory separator.
"""
return frombytes(self.info.base_name())
@property
def size(self):
"""
The size in bytes, if available.
Only regular files are guaranteed to have a size.
Returns
-------
size : int or None
"""
cdef int64_t size
size = self.info.size()
return (size if size != -1 else None)
@property
def extension(self):
"""
The file extension.
"""
return frombytes(self.info.extension())
@property
def mtime(self):
"""
The time of last modification, if available.
Returns
-------
mtime : datetime.datetime or None
"""
cdef int64_t nanoseconds
nanoseconds = TimePoint_to_ns(self.info.mtime())
return (datetime.fromtimestamp(nanoseconds / 1.0e9, timezone.utc)
if nanoseconds != -1 else None)
@property
def mtime_ns(self):
"""
The time of last modification, if available, expressed in nanoseconds
since the Unix epoch.
Returns
-------
mtime_ns : int or None
"""
cdef int64_t nanoseconds
nanoseconds = TimePoint_to_ns(self.info.mtime())
return (nanoseconds if nanoseconds != -1 else None)
cdef class FileSelector(_Weakrefable):
"""
File and directory selector.
It contains a set of options that describes how to search for files and
directories.
Parameters
----------
base_dir : str
The directory in which to select files. Relative paths also work, use
'.' for the current directory and '..' for the parent.
allow_not_found : bool, default False
The behavior if `base_dir` doesn't exist in the filesystem.
If false, an error is returned.
If true, an empty selection is returned.
recursive : bool, default False
Whether to recurse into subdirectories.
"""
def __init__(self, base_dir, bint allow_not_found=False,
bint recursive=False):
self.base_dir = base_dir
self.recursive = recursive
self.allow_not_found = allow_not_found
@staticmethod
cdef FileSelector wrap(CFileSelector wrapped):
cdef FileSelector self = FileSelector.__new__(FileSelector)
self.selector = move(wrapped)
return self
cdef inline CFileSelector unwrap(self) nogil:
return self.selector
@property
def base_dir(self):
return frombytes(self.selector.base_dir)
@base_dir.setter
def base_dir(self, base_dir):
self.selector.base_dir = _path_as_bytes(base_dir)
@property
def allow_not_found(self):
return self.selector.allow_not_found
@allow_not_found.setter
def allow_not_found(self, bint allow_not_found):
self.selector.allow_not_found = allow_not_found
@property
def recursive(self):
return self.selector.recursive
@recursive.setter
def recursive(self, bint recursive):
self.selector.recursive = recursive
def __repr__(self):
return ("<FileSelector base_dir={0.base_dir!r} "
"recursive={0.recursive}>".format(self))
cdef class FileSystem(_Weakrefable):
"""
Abstract file system API.
"""
def __init__(self):
raise TypeError("FileSystem is an abstract class, instantiate one of "
"the subclasses instead: LocalFileSystem or "
"SubTreeFileSystem")
@staticmethod
def from_uri(uri):
"""
Create a new FileSystem from URI or Path.
Recognized URI schemes are "file", "mock", "s3fs", "hdfs" and "viewfs".
In addition, the argument can be a pathlib.Path object, or a string
describing an absolute local path.
Parameters
----------
uri : string
URI-based path, for example: file:///some/local/path.
Returns
-------
With (filesystem, path) tuple where path is the abstract path inside
the FileSystem instance.
"""
cdef:
c_string path
CResult[shared_ptr[CFileSystem]] result
if isinstance(uri, pathlib.Path):
# Make absolute
uri = uri.resolve().absolute()
uri = _stringify_path(uri)
result = CFileSystemFromUriOrPath(tobytes(uri), &path)
return FileSystem.wrap(GetResultValue(result)), frombytes(path)
cdef init(self, const shared_ptr[CFileSystem]& wrapped):
self.wrapped = wrapped
self.fs = wrapped.get()
@staticmethod
cdef wrap(const shared_ptr[CFileSystem]& sp):
cdef FileSystem self
typ = frombytes(sp.get().type_name())
if typ == 'local':
self = LocalFileSystem.__new__(LocalFileSystem)
elif typ == 'mock':
self = _MockFileSystem.__new__(_MockFileSystem)
elif typ == 'subtree':
self = SubTreeFileSystem.__new__(SubTreeFileSystem)
elif typ == 's3':
from pyarrow._s3fs import S3FileSystem
self = S3FileSystem.__new__(S3FileSystem)
elif typ == 'hdfs':
from pyarrow._hdfs import HadoopFileSystem
self = HadoopFileSystem.__new__(HadoopFileSystem)
elif typ.startswith('py::'):
self = PyFileSystem.__new__(PyFileSystem)
else:
raise TypeError('Cannot wrap FileSystem pointer')
self.init(sp)
return self
cdef inline shared_ptr[CFileSystem] unwrap(self) nogil:
return self.wrapped
def equals(self, FileSystem other):
return self.fs.Equals(other.unwrap())
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
@property
def type_name(self):
"""
The filesystem's type name.
"""
return frombytes(self.fs.type_name())
def get_file_info(self, paths_or_selector):
"""
Get info for the given files.
Any symlink is automatically dereferenced, recursively. A non-existing
or unreachable file returns a FileStat object and has a FileType of
value NotFound. An exception indicates a truly exceptional condition
(low-level I/O error, etc.).
Parameters
----------
paths_or_selector: FileSelector, path-like or list of path-likes
Either a selector object, a path-like object or a list of
path-like objects. The selector's base directory will not be
part of the results, even if it exists. If it doesn't exist,
use `allow_not_found`.
Returns
-------
FileInfo or list of FileInfo
Single FileInfo object is returned for a single path, otherwise
a list of FileInfo objects is returned.
"""
cdef:
CFileInfo info
c_string path
vector[CFileInfo] infos
vector[c_string] paths
CFileSelector selector
if isinstance(paths_or_selector, FileSelector):
with nogil:
selector = (<FileSelector>paths_or_selector).selector
infos = GetResultValue(self.fs.GetFileInfo(selector))
elif isinstance(paths_or_selector, (list, tuple)):
paths = [_path_as_bytes(s) for s in paths_or_selector]
with nogil:
infos = GetResultValue(self.fs.GetFileInfo(paths))
elif isinstance(paths_or_selector, (bytes, str)):
path =_path_as_bytes(paths_or_selector)
with nogil:
info = GetResultValue(self.fs.GetFileInfo(path))
return FileInfo.wrap(info)
else:
raise TypeError('Must pass either path(s) or a FileSelector')
return [FileInfo.wrap(info) for info in infos]
def create_dir(self, path, *, bint recursive=True):
"""
Create a directory and subdirectories.
This function succeeds if the directory already exists.
Parameters
----------
path : str
The path of the new directory.
recursive: bool, default True
Create nested directories as well.
"""
cdef c_string directory = _path_as_bytes(path)
with nogil:
check_status(self.fs.CreateDir(directory, recursive=recursive))
def delete_dir(self, path):
"""Delete a directory and its contents, recursively.
Parameters
----------
path : str
The path of the directory to be deleted.
"""
cdef c_string directory = _path_as_bytes(path)
with nogil:
check_status(self.fs.DeleteDir(directory))
def delete_dir_contents(self, path, *, bint accept_root_dir=False):
"""Delete a directory's contents, recursively.
Like delete_dir, but doesn't delete the directory itself.
Parameters
----------
path : str
The path of the directory to be deleted.
accept_root_dir : boolean, default False
Allow deleting the root directory's contents
(if path is empty or "/")
"""
cdef c_string directory = _path_as_bytes(path)
if accept_root_dir and directory.strip(b"/") == b"":
with nogil:
check_status(self.fs.DeleteRootDirContents())
else:
with nogil:
check_status(self.fs.DeleteDirContents(directory))
def move(self, src, dest):
"""
Move / rename a file or directory.
If the destination exists:
- if it is a non-empty directory, an error is returned
- otherwise, if it has the same type as the source, it is replaced
- otherwise, behavior is unspecified (implementation-dependent).
Parameters
----------
src : str
The path of the file or the directory to be moved.
dest : str
The destination path where the file or directory is moved to.
"""
cdef:
c_string source = _path_as_bytes(src)
c_string destination = _path_as_bytes(dest)
with nogil:
check_status(self.fs.Move(source, destination))
def copy_file(self, src, dest):
"""
Copy a file.
If the destination exists and is a directory, an error is returned.
Otherwise, it is replaced.
Parameters
----------
src : str
The path of the file to be copied from.
dest : str
The destination path where the file is copied to.
"""
cdef:
c_string source = _path_as_bytes(src)
c_string destination = _path_as_bytes(dest)
with nogil:
check_status(self.fs.CopyFile(source, destination))
def delete_file(self, path):
"""
Delete a file.
Parameters
----------
path : str
The path of the file to be deleted.
"""
cdef c_string file = _path_as_bytes(path)
with nogil:
check_status(self.fs.DeleteFile(file))
def _wrap_input_stream(self, stream, path, compression, buffer_size):
if buffer_size is not None and buffer_size != 0:
stream = BufferedInputStream(stream, buffer_size)
if compression == 'detect':
compression = _detect_compression(path)
if compression is not None:
stream = CompressedInputStream(stream, compression)
return stream
def _wrap_output_stream(self, stream, path, compression, buffer_size):
if buffer_size is not None and buffer_size != 0:
stream = BufferedOutputStream(stream, buffer_size)
if compression == 'detect':
compression = _detect_compression(path)
if compression is not None:
stream = CompressedOutputStream(stream, compression)
return stream
def open_input_file(self, path):
"""
Open an input file for random access reading.
Parameters
----------
path : str
The source to open for reading.
Returns
-------
stram : NativeFile
"""
cdef:
c_string pathstr = _path_as_bytes(path)
NativeFile stream = NativeFile()
shared_ptr[CRandomAccessFile] in_handle
with nogil:
in_handle = GetResultValue(self.fs.OpenInputFile(pathstr))
stream.set_random_access_file(in_handle)
stream.is_readable = True
return stream
def open_input_stream(self, path, compression='detect', buffer_size=None):
"""
Open an input stream for sequential reading.
Parameters
----------
source: str
The source to open for reading.
compression: str optional, default 'detect'
The compression algorithm to use for on-the-fly decompression.
If "detect" and source is a file path, then compression will be
chosen based on the file extension.
If None, no compression will be applied. Otherwise, a well-known
algorithm name must be supplied (e.g. "gzip").
buffer_size: int optional, default None
If None or 0, no buffering will happen. Otherwise the size of the
temporary read buffer.
Returns
-------
stream : NativeFile
"""
cdef:
c_string pathstr = _path_as_bytes(path)
NativeFile stream = NativeFile()
shared_ptr[CInputStream] in_handle
with nogil:
in_handle = GetResultValue(self.fs.OpenInputStream(pathstr))
stream.set_input_stream(in_handle)
stream.is_readable = True
return self._wrap_input_stream(
stream, path=path, compression=compression, buffer_size=buffer_size
)
def open_output_stream(self, path, compression='detect', buffer_size=None):
"""
Open an output stream for sequential writing.
If the target already exists, existing data is truncated.
Parameters
----------
path : str
The source to open for writing.
compression: str optional, default 'detect'
The compression algorithm to use for on-the-fly compression.
If "detect" and source is a file path, then compression will be
chosen based on the file extension.
If None, no compression will be applied. Otherwise, a well-known
algorithm name must be supplied (e.g. "gzip").
buffer_size: int optional, default None
If None or 0, no buffering will happen. Otherwise the size of the
temporary write buffer.
Returns
-------
stream : NativeFile
"""
cdef:
c_string pathstr = _path_as_bytes(path)
NativeFile stream = NativeFile()
shared_ptr[COutputStream] out_handle
with nogil:
out_handle = GetResultValue(self.fs.OpenOutputStream(pathstr))
stream.set_output_stream(out_handle)
stream.is_writable = True
return self._wrap_output_stream(
stream, path=path, compression=compression, buffer_size=buffer_size
)
def open_append_stream(self, path, compression='detect', buffer_size=None):
"""
Open an output stream for appending.
If the target doesn't exist, a new empty file is created.
Parameters
----------
path : str
The source to open for writing.
compression: str optional, default 'detect'
The compression algorithm to use for on-the-fly compression.
If "detect" and source is a file path, then compression will be
chosen based on the file extension.
If None, no compression will be applied. Otherwise, a well-known
algorithm name must be supplied (e.g. "gzip").
buffer_size: int optional, default None
If None or 0, no buffering will happen. Otherwise the size of the
temporary write buffer.
Returns
-------
stream : NativeFile
"""
cdef:
c_string pathstr = _path_as_bytes(path)
NativeFile stream = NativeFile()
shared_ptr[COutputStream] out_handle
with nogil:
out_handle = GetResultValue(self.fs.OpenAppendStream(pathstr))
stream.set_output_stream(out_handle)
stream.is_writable = True
return self._wrap_output_stream(
stream, path=path, compression=compression, buffer_size=buffer_size
)
def normalize_path(self, path):
"""
Normalize filesystem path.
Parameters
----------
path : str
The path to normalize
Returns
-------
normalized_path : str
The normalized path
"""
cdef:
c_string c_path = _path_as_bytes(path)
c_string c_path_normalized
c_path_normalized = GetResultValue(self.fs.NormalizePath(c_path))
return frombytes(c_path_normalized)
cdef class LocalFileSystem(FileSystem):
"""
A FileSystem implementation accessing files on the local machine.
Details such as symlinks are abstracted away (symlinks are always followed,
except when deleting an entry).
Parameters
----------
use_mmap: bool, default False
Whether open_input_stream and open_input_file should return
a mmap'ed file or a regular file.
"""
def __init__(self, *, use_mmap=False):
cdef:
CLocalFileSystemOptions opts
shared_ptr[CLocalFileSystem] fs
opts = CLocalFileSystemOptions.Defaults()
opts.use_mmap = use_mmap
fs = make_shared[CLocalFileSystem](opts)
self.init(<shared_ptr[CFileSystem]> fs)
cdef init(self, const shared_ptr[CFileSystem]& c_fs):
FileSystem.init(self, c_fs)
self.localfs = <CLocalFileSystem*> c_fs.get()
@classmethod
def _reconstruct(cls, kwargs):
# __reduce__ doesn't allow passing named arguments directly to the
# reconstructor, hence this wrapper.
return cls(**kwargs)
def __reduce__(self):
cdef CLocalFileSystemOptions opts = self.localfs.options()
return LocalFileSystem._reconstruct, (dict(
use_mmap=opts.use_mmap),)
cdef class SubTreeFileSystem(FileSystem):
"""
Delegates to another implementation after prepending a fixed base path.
This is useful to expose a logical view of a subtree of a filesystem,
for example a directory in a LocalFileSystem.
Note, that this makes no security guarantee. For example, symlinks may
allow to "escape" the subtree and access other parts of the underlying
filesystem.
Parameters
----------
base_path: str
The root of the subtree.
base_fs: FileSystem
FileSystem object the operations delegated to.
"""
def __init__(self, base_path, FileSystem base_fs):
cdef:
c_string pathstr
shared_ptr[CSubTreeFileSystem] wrapped
pathstr = _path_as_bytes(base_path)
wrapped = make_shared[CSubTreeFileSystem](pathstr, base_fs.wrapped)
self.init(<shared_ptr[CFileSystem]> wrapped)
cdef init(self, const shared_ptr[CFileSystem]& wrapped):
FileSystem.init(self, wrapped)
self.subtreefs = <CSubTreeFileSystem*> wrapped.get()
def __reduce__(self):
return SubTreeFileSystem, (
frombytes(self.subtreefs.base_path()),
FileSystem.wrap(self.subtreefs.base_fs())
)
@property
def base_path(self):
return frombytes(self.subtreefs.base_path())
@property
def base_fs(self):
return FileSystem.wrap(self.subtreefs.base_fs())
cdef class _MockFileSystem(FileSystem):
def __init__(self, datetime current_time=None):
cdef shared_ptr[CMockFileSystem] wrapped
current_time = current_time or datetime.now()
wrapped = make_shared[CMockFileSystem](
PyDateTime_to_TimePoint(<PyDateTime_DateTime*> current_time)
)
self.init(<shared_ptr[CFileSystem]> wrapped)
cdef init(self, const shared_ptr[CFileSystem]& wrapped):
FileSystem.init(self, wrapped)
self.mockfs = <CMockFileSystem*> wrapped.get()
cdef class PyFileSystem(FileSystem):
"""
A FileSystem with behavior implemented in Python.
Parameters
----------
handler : FileSystemHandler
The handler object implementing custom filesystem behavior.
"""
def __init__(self, handler):
cdef:
CPyFileSystemVtable vtable
shared_ptr[CPyFileSystem] wrapped
if not isinstance(handler, FileSystemHandler):
raise TypeError("Expected a FileSystemHandler instance, got {0}"
.format(type(handler)))
vtable.get_type_name = _cb_get_type_name
vtable.equals = _cb_equals
vtable.get_file_info = _cb_get_file_info
vtable.get_file_info_vector = _cb_get_file_info_vector
vtable.get_file_info_selector = _cb_get_file_info_selector
vtable.create_dir = _cb_create_dir
vtable.delete_dir = _cb_delete_dir
vtable.delete_dir_contents = _cb_delete_dir_contents
vtable.delete_root_dir_contents = _cb_delete_root_dir_contents
vtable.delete_file = _cb_delete_file
vtable.move = _cb_move
vtable.copy_file = _cb_copy_file
vtable.open_input_stream = _cb_open_input_stream
vtable.open_input_file = _cb_open_input_file
vtable.open_output_stream = _cb_open_output_stream
vtable.open_append_stream = _cb_open_append_stream
vtable.normalize_path = _cb_normalize_path
wrapped = CPyFileSystem.Make(handler, move(vtable))
self.init(<shared_ptr[CFileSystem]> wrapped)
cdef init(self, const shared_ptr[CFileSystem]& wrapped):
FileSystem.init(self, wrapped)
self.pyfs = <CPyFileSystem*> wrapped.get()
@property
def handler(self):
"""
The filesystem's underlying handler.
Returns
-------
handler : FileSystemHandler
"""
return <object> self.pyfs.handler()
def __reduce__(self):
return PyFileSystem, (self.handler,)
class FileSystemHandler(ABC):
"""
An abstract class exposing methods to implement PyFileSystem's behavior.
"""
@abstractmethod
def get_type_name(self):
"""
Implement PyFileSystem.type_name.
"""
@abstractmethod
def get_file_info(self, paths):
"""
Implement PyFileSystem.get_file_info(paths).
"""
@abstractmethod
def get_file_info_selector(self, selector):
"""
Implement PyFileSystem.get_file_info(selector).
"""
@abstractmethod
def create_dir(self, path, recursive):
"""
Implement PyFileSystem.create_dir(...).
"""
@abstractmethod
def delete_dir(self, path):
"""
Implement PyFileSystem.delete_dir(...).
"""
@abstractmethod
def delete_dir_contents(self, path):
"""
Implement PyFileSystem.delete_dir_contents(...).
"""
@abstractmethod
def delete_root_dir_contents(self):
"""
Implement PyFileSystem.delete_dir_contents("/", accept_root_dir=True).
"""
@abstractmethod
def delete_file(self, path):
"""
Implement PyFileSystem.delete_file(...).
"""
@abstractmethod
def move(self, src, dest):
"""
Implement PyFileSystem.move(...).
"""
@abstractmethod
def copy_file(self, src, dest):
"""
Implement PyFileSystem.copy_file(...).
"""
@abstractmethod
def open_input_stream(self, path):
"""
Implement PyFileSystem.open_input_stream(...).
"""
@abstractmethod
def open_input_file(self, path):
"""
Implement PyFileSystem.open_input_file(...).
"""
@abstractmethod
def open_output_stream(self, path):
"""
Implement PyFileSystem.open_output_stream(...).
"""
@abstractmethod
def open_append_stream(self, path):
"""
Implement PyFileSystem.open_append_stream(...).
"""
@abstractmethod
def normalize_path(self, path):
"""
Implement PyFileSystem.normalize_path(...).
"""
# Callback definitions for CPyFileSystemVtable
cdef void _cb_get_type_name(handler, c_string* out) except *:
out[0] = tobytes("py::" + handler.get_type_name())
cdef c_bool _cb_equals(handler, const CFileSystem& c_other) except False:
if c_other.type_name().startswith(b"py::"):
return <object> (<const CPyFileSystem&> c_other).handler() == handler
return False
cdef void _cb_get_file_info(handler, const c_string& path,
CFileInfo* out) except *:
infos = handler.get_file_info([frombytes(path)])
if not isinstance(infos, list) or len(infos) != 1:
raise TypeError("get_file_info should have returned a 1-element list")
out[0] = FileInfo.unwrap_safe(infos[0])
cdef void _cb_get_file_info_vector(handler, const vector[c_string]& paths,
vector[CFileInfo]* out) except *:
py_paths = [frombytes(paths[i]) for i in range(len(paths))]
infos = handler.get_file_info(py_paths)
if not isinstance(infos, list):
raise TypeError("get_file_info should have returned a list")
out[0].clear()
out[0].reserve(len(infos))
for info in infos:
out[0].push_back(FileInfo.unwrap_safe(info))
cdef void _cb_get_file_info_selector(handler, const CFileSelector& selector,
vector[CFileInfo]* out) except *:
infos = handler.get_file_info_selector(FileSelector.wrap(selector))
if not isinstance(infos, list):
raise TypeError("get_file_info_selector should have returned a list")
out[0].clear()
out[0].reserve(len(infos))
for info in infos:
out[0].push_back(FileInfo.unwrap_safe(info))
cdef void _cb_create_dir(handler, const c_string& path,
c_bool recursive) except *:
handler.create_dir(frombytes(path), recursive)
cdef void _cb_delete_dir(handler, const c_string& path) except *:
handler.delete_dir(frombytes(path))
cdef void _cb_delete_dir_contents(handler, const c_string& path) except *:
handler.delete_dir_contents(frombytes(path))
cdef void _cb_delete_root_dir_contents(handler) except *:
handler.delete_root_dir_contents()
cdef void _cb_delete_file(handler, const c_string& path) except *:
handler.delete_file(frombytes(path))
cdef void _cb_move(handler, const c_string& src,
const c_string& dest) except *:
handler.move(frombytes(src), frombytes(dest))
cdef void _cb_copy_file(handler, const c_string& src,
const c_string& dest) except *:
handler.copy_file(frombytes(src), frombytes(dest))
cdef void _cb_open_input_stream(handler, const c_string& path,
shared_ptr[CInputStream]* out) except *:
stream = handler.open_input_stream(frombytes(path))
if not isinstance(stream, NativeFile):
raise TypeError("open_input_stream should have returned "
"a PyArrow file")
out[0] = (<NativeFile> stream).get_input_stream()
cdef void _cb_open_input_file(handler, const c_string& path,
shared_ptr[CRandomAccessFile]* out) except *:
stream = handler.open_input_file(frombytes(path))
if not isinstance(stream, NativeFile):
raise TypeError("open_input_file should have returned "
"a PyArrow file")
out[0] = (<NativeFile> stream).get_random_access_file()
cdef void _cb_open_output_stream(handler, const c_string& path,
shared_ptr[COutputStream]* out) except *:
stream = handler.open_output_stream(frombytes(path))
if not isinstance(stream, NativeFile):
raise TypeError("open_output_stream should have returned "
"a PyArrow file")
out[0] = (<NativeFile> stream).get_output_stream()
cdef void _cb_open_append_stream(handler, const c_string& path,
shared_ptr[COutputStream]* out) except *:
stream = handler.open_append_stream(frombytes(path))
if not isinstance(stream, NativeFile):
raise TypeError("open_append_stream should have returned "
"a PyArrow file")
out[0] = (<NativeFile> stream).get_output_stream()
cdef void _cb_normalize_path(handler, const c_string& path,
c_string* out) except *:
out[0] = tobytes(handler.normalize_path(frombytes(path)))