blob: 8268cdc7f03344318101f444a6af3163e8e80439 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""FileIO implementation for reading and writing table files that uses pyarrow.fs
This file contains a FileIO implementation that relies on the filesystem interface provided
by PyArrow. It relies on PyArrow's `from_uri` method that infers the correct filesystem
type to use. Theoretically, this allows the supported storage types to grow naturally
with the pyarrow library.
"""
import os
from typing import Union
from urllib.parse import urlparse
from pyarrow.fs import FileInfo, FileSystem, FileType
from pyiceberg.io.base import (
FileIO,
InputFile,
InputStream,
OutputFile,
OutputStream,
)
class PyArrowFile(InputFile, OutputFile):
    """A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances

    Args:
        location(str): A URI or a path to a local file

    Attributes:
        location(str): The URI or path to a local file for a PyArrowFile instance

    Examples:
        >>> from pyiceberg.io.pyarrow import PyArrowFile
        >>> # input_file = PyArrowFile("s3://foo/bar.txt")
        >>> # Read the contents of the PyArrowFile instance
        >>> # Make sure that you have permissions to read/write
        >>> # file_content = input_file.open().read()
        >>> # output_file = PyArrowFile("s3://baz/qux.txt")
        >>> # Write bytes to a file
        >>> # Make sure that you have permissions to read/write
        >>> # output_file.create().write(b'foobytes')
    """

    def __init__(self, location: str):
        parsed_location = urlparse(location)  # Create a ParseResult from the URI
        if not parsed_location.scheme:
            # No scheme: treat the location as a local path. It is resolved to an absolute
            # path first because FileSystem.from_uri expects absolute paths for local files.
            self._filesystem, self._path = FileSystem.from_uri(os.path.abspath(location))
        else:
            self._filesystem, self._path = FileSystem.from_uri(location)  # Infer the proper filesystem
        super().__init__(location=location)

    def _file_info(self) -> FileInfo:
        """Retrieves a pyarrow.fs.FileInfo object for the location

        Returns:
            FileInfo: The pyarrow file info for `self.location`

        Raises:
            FileNotFoundError: If pyarrow reports that nothing exists at self.location
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15
        """
        try:
            file_info = self._filesystem.get_file_info(self._path)
        except OSError as e:
            # errno 13 is EACCES; remote filesystems report access denial via the AWS error text instead
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot get file info, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error

        # A missing path is reported through the FileInfo type rather than as an exception
        if file_info.type == FileType.NotFound:
            raise FileNotFoundError(f"Cannot get file info, file not found: {self.location}")
        return file_info

    def __len__(self) -> int:
        """Returns the total length of the file, in bytes

        Raises:
            FileNotFoundError: If the file at self.location does not exist
            PermissionError: If the file at self.location cannot be accessed
        """
        file_info = self._file_info()
        return file_info.size

    def exists(self) -> bool:
        """Checks whether the location exists

        Returns:
            bool: True if file info could be retrieved for the location, False if it was not found

        Raises:
            PermissionError: If the file at self.location cannot be accessed (not swallowed)
        """
        try:
            self._file_info()  # raises FileNotFoundError if it does not exist
            return True
        except FileNotFoundError:
            return False

    def open(self) -> InputStream:
        """Opens the location using a PyArrow FileSystem inferred from the location

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`

        Raises:
            FileNotFoundError: If the file at self.location does not exist
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15
        """
        try:
            input_file = self._filesystem.open_input_file(self._path)
        except (FileNotFoundError, PermissionError):
            # Already the right exception type; re-raise untouched instead of re-wrapping below
            raise
        except OSError as e:
            # errno 2 is ENOENT and errno 13 is EACCES; the string checks cover errors that carry no errno
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return input_file

    def create(self, overwrite: bool = False) -> OutputStream:
        """Creates a writable pyarrow.lib.NativeFile for this PyArrowFile's location

        Args:
            overwrite(bool): Whether to overwrite the file if it already exists

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location

        Raises:
            FileExistsError: If the file already exists at `self.location` and `overwrite` is False
            PermissionError: If the location cannot be accessed, either during the existence check or
                when opening the output stream

        Note:
            This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
            a check is first performed to verify that the file does not exist. This is not thread-safe and
            a possibility does exist that the file can be created by a concurrent process after the existence
            check yet before the output stream is created. In such a case, the default pyarrow behavior will
            truncate the contents of the existing file when opening the output stream.
        """
        # Checked before the try below: FileExistsError is itself an OSError subclass, so raising it
        # inside that try would route it through the OSError translation handler by accident.
        if not overwrite and self.exists():
            raise FileExistsError(f"Cannot create file, already exists: {self.location}")

        try:
            output_file = self._filesystem.open_output_stream(self._path)
        except PermissionError:
            raise
        except OSError as e:
            # errno 13 is EACCES; remote filesystems report access denial via the AWS error text instead
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return output_file

    def to_input_file(self) -> "PyArrowFile":
        """Returns this PyArrowFile so it can be used as an InputFile

        This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single
        PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), the same instance is
        returned as-is; no new object or copy is created.
        """
        return self
class PyArrowFileIO(FileIO):
    """A FileIO implementation that reads, writes and deletes files through pyarrow filesystems

    Locations may be URIs or paths to local files; the pyarrow filesystem for each location is
    inferred with `FileSystem.from_uri`.
    """

    def new_input(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to read bytes from the file at the given location

        Args:
            location(str): A URI or a path to a local file

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location
        """
        return PyArrowFile(location)

    def new_output(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to write bytes to the file at the given location

        Args:
            location(str): A URI or a path to a local file

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location
        """
        return PyArrowFile(location)

    def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
        """Delete the file at the given location

        Args:
            location(str, InputFile, OutputFile): The URI or local path to the file--if an InputFile instance or an
                OutputFile instance is provided, the location attribute for that instance is used as the location
                to delete

        Raises:
            FileNotFoundError: When the file at the provided location does not exist
            PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
                an AWS error code 15
        """
        str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location

        if urlparse(str_location).scheme:
            filesystem, path = FileSystem.from_uri(str_location)  # Infer the proper filesystem
        else:
            # No scheme: treat it as a local path and make it absolute first, the same way
            # PyArrowFile does, because from_uri does not accept relative local paths.
            filesystem, path = FileSystem.from_uri(os.path.abspath(str_location))

        try:
            filesystem.delete_file(path)
        except (FileNotFoundError, PermissionError):
            # Already the right exception type; re-raise untouched instead of re-wrapping below
            raise
        except OSError as e:
            # errno 2 is ENOENT and errno 13 is EACCES; the string checks cover errors that carry no errno.
            # Messages use the resolved string so InputFile/OutputFile arguments show their path.
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot delete file, does not exist: {str_location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot delete file, access denied: {str_location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error