| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import os |
| import pickle |
| import tempfile |
| from typing import Any |
| |
| import pytest |
| |
| from pyiceberg.io import ( |
| ARROW_FILE_IO, |
| PY_IO_IMPL, |
| _import_file_io, |
| _infer_file_io_from_scheme, |
| load_file_io, |
| ) |
| from pyiceberg.io.pyarrow import PyArrowFileIO |
| |
| |
def test_custom_local_input_file() -> None:
    """Test initializing an InputFile implementation to read a local file"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        file_location = os.path.join(tmpdirname, "foo.txt")
        with open(file_location, "wb") as write_file:
            write_file.write(b"foo")

        # Confirm that the file initially exists
        assert os.path.exists(file_location)

        # Instantiate the input file
        absolute_file_location = os.path.abspath(file_location)
        input_file = PyArrowFileIO().new_input(location=absolute_file_location)

        # Read through a context manager so the stream is closed before the
        # temporary directory is cleaned up
        with input_file.open() as f:
            data = f.read()

        assert data == b"foo"
        assert len(input_file) == 3
| |
| |
def test_custom_local_output_file() -> None:
    """Test initializing an OutputFile implementation to write to a local file"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        file_location = os.path.join(tmpdirname, "foo.txt")

        # Instantiate the output file
        absolute_file_location = os.path.abspath(file_location)
        output_file = PyArrowFileIO().new_output(location=absolute_file_location)

        # Create the output file and write to it; the context manager closes
        # the stream before the file is read back
        with output_file.create() as output_stream:
            output_stream.write(b"foo")

        # Confirm that bytes were written
        with open(file_location, "rb") as f:
            assert f.read() == b"foo"

        assert len(output_file) == 3
| |
| |
def test_pickled_pyarrow_round_trip() -> None:
    """Test that a PyArrowFileIO restored from a pickle can write, read and delete a local file"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        file_location = os.path.join(tmpdirname, "foo.txt")

        # Round-trip the file-io through pickle (data produced in this test, not external input)
        file_io = PyArrowFileIO()
        serialized_file_io = pickle.dumps(file_io)
        deserialized_file_io = pickle.loads(serialized_file_io)

        absolute_file_location = os.path.abspath(file_location)

        # Write with the deserialized file-io
        output_file = deserialized_file_io.new_output(location=absolute_file_location)
        with output_file.create() as f:
            f.write(b"foo")

        # Read back; close the stream before the file is deleted below
        input_file = deserialized_file_io.new_input(location=absolute_file_location)
        with input_file.open() as f:
            data = f.read()

        assert data == b"foo"
        assert len(input_file) == 3
        deserialized_file_io.delete(location=absolute_file_location)
| |
| |
def test_custom_local_output_file_with_overwrite() -> None:
    """Test initializing an OutputFile implementation to overwrite a local file"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        output_file_location = os.path.join(tmpdirname, "foo.txt")

        # Create a file in the temporary directory
        with open(output_file_location, "wb") as write_file:
            write_file.write(b"foo")

        # Instantiate an output file
        output_file = PyArrowFileIO().new_output(location=output_file_location)

        # Confirm that a FileExistsError is raised when overwrite=False;
        # only the call expected to raise is kept inside the raises block
        with pytest.raises(FileExistsError):
            output_file.create(overwrite=False)

        # Confirm that the file is overwritten with overwrite=True; the
        # context manager closes the stream before the file is read back
        with output_file.create(overwrite=True) as output_stream:
            output_stream.write(b"bar")

        with open(output_file_location, "rb") as f:
            assert f.read() == b"bar"
| |
| |
def test_custom_file_exists() -> None:
    """Test that exists() reports True for a file on disk and False for a missing one"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        present_path = os.path.join(tmpdirname, "foo.txt")
        with open(present_path, "wb") as handle:
            handle.write(b"foo")

        missing_path = os.path.join(tmpdirname, "bar.txt")

        # Sanity check: the file written above is on disk
        assert os.path.exists(present_path)

        # Resolve both locations to absolute paths
        present_abs = os.path.abspath(present_path)
        missing_abs = os.path.abspath(missing_path)

        # InputFile instances: one for the existing file, one for the missing file
        existing_input = PyArrowFileIO().new_input(location=f"{present_abs}")
        missing_input = PyArrowFileIO().new_input(location=f"{missing_abs}")

        # exists() on the input files
        assert existing_input.exists()
        assert not missing_input.exists()

        # OutputFile instances for the same two locations
        existing_output = PyArrowFileIO().new_output(location=f"{present_abs}")
        missing_output = PyArrowFileIO().new_output(location=f"{missing_abs}")

        # exists() on the output files
        assert existing_output.exists()
        assert not missing_output.exists()
| |
| |
def test_output_file_to_input_file() -> None:
    """Test that `to_input_file()` on an OutputFile yields an InputFile that reads back the written bytes"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        target_path = os.path.join(tmpdirname, "foo.txt")

        # Build the output file and write the payload through its stream
        writer = PyArrowFileIO().new_output(location=f"{target_path}")
        with writer.create() as sink:
            sink.write(b"foo")

        # Convert to an input file and read the payload back
        reader = writer.to_input_file()
        with reader.open() as source:
            assert source.read() == b"foo"
| |
| |
@pytest.mark.parametrize(
    "string_uri",
    [
        "foo/bar/baz.parquet",
        "file:/foo/bar/baz.parquet",
    ],
)
def test_custom_file_io_locations(string_uri: str) -> None:
    """Test that the location property is maintained as the value of the location argument"""
    # Instantiate the file-io and create a new input and output file
    file_io = PyArrowFileIO()

    input_file = file_io.new_input(location=string_uri)
    assert input_file.location == string_uri

    output_file = file_io.new_output(location=string_uri)
    assert output_file.location == string_uri
| |
| |
def test_deleting_local_file_using_file_io() -> None:
    """Test that FileIO.delete(...) removes a local file given its path"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Put a small file into the temporary directory
        target_path = os.path.join(tmpdirname, "foo.txt")
        with open(target_path, "wb") as handle:
            handle.write(b"foo")

        io_impl = PyArrowFileIO()

        # The file is present before the delete ...
        assert os.path.exists(target_path)

        io_impl.delete(target_path)

        # ... and gone afterwards
        assert not os.path.exists(target_path)
| |
| |
def test_raise_file_not_found_error_for_fileio_delete() -> None:
    """Test that FileIO.delete(...) raises FileNotFoundError for a path that was never created"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Location inside the temporary directory; nothing is written to it
        missing_path = os.path.join(tmpdirname, "foo.txt")

        io_impl = PyArrowFileIO()

        # Deleting the missing file must raise
        with pytest.raises(FileNotFoundError) as raised:
            io_impl.delete(missing_path)

        error_text = str(raised.value)
        assert "Cannot delete file" in error_text

        # The file still does not exist after the failed delete
        assert not os.path.exists(missing_path)
| |
| |
def test_deleting_local_file_using_file_io_input_file() -> None:
    """Test that FileIO.delete(...) removes a local file when given an InputFile instance"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Put a small file into the temporary directory
        target_path = os.path.join(tmpdirname, "foo.txt")
        with open(target_path, "wb") as handle:
            handle.write(b"foo")

        io_impl = PyArrowFileIO()

        # The file is present before the delete
        assert os.path.exists(target_path)

        # The InputFile comes from a separate file-io instance
        target_input = PyArrowFileIO().new_input(location=f"{target_path}")

        # Delete by passing the InputFile rather than a path string
        io_impl.delete(target_input)

        # The file is gone afterwards
        assert not os.path.exists(target_path)
| |
| |
def test_deleting_local_file_using_file_io_output_file() -> None:
    """Test that FileIO.delete(...) removes a local file when given an OutputFile instance"""
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Put a small file into the temporary directory
        target_path = os.path.join(tmpdirname, "foo.txt")
        with open(target_path, "wb") as handle:
            handle.write(b"foo")

        io_impl = PyArrowFileIO()

        # The file is present before the delete
        assert os.path.exists(target_path)

        # The OutputFile comes from a separate file-io instance
        target_output = PyArrowFileIO().new_output(location=f"{target_path}")

        # Delete by passing the OutputFile rather than a path string
        io_impl.delete(target_output)

        # The file is gone afterwards
        assert not os.path.exists(target_path)
| |
| |
def test_import_file_io() -> None:
    """Test that `_import_file_io` returns a PyArrowFileIO for `ARROW_FILE_IO`"""
    loaded = _import_file_io(ARROW_FILE_IO, {})
    assert isinstance(loaded, PyArrowFileIO)
| |
| |
def test_import_file_io_does_not_exist(caplog: Any) -> None:
    """Test that importing an unknown FileIO path returns None and logs the ModuleNotFoundError"""
    loaded = _import_file_io("pyiceberg.does.not.exist.FileIO", {})
    assert loaded is None

    captured_log = caplog.text
    assert "ModuleNotFoundError: No module named 'pyiceberg.does'" in captured_log
| |
| |
def test_load_file() -> None:
    """Test that `load_file_io` returns a PyArrowFileIO when `PY_IO_IMPL` is set to `ARROW_FILE_IO`"""
    properties = {PY_IO_IMPL: ARROW_FILE_IO}
    loaded = load_file_io(properties)
    assert isinstance(loaded, PyArrowFileIO)
| |
| |
def test_load_file_io_no_arguments() -> None:
    """Test that `load_file_io` returns a PyArrowFileIO when given empty properties"""
    loaded = load_file_io({})
    assert isinstance(loaded, PyArrowFileIO)
| |
| |
def test_load_file_io_does_not_exist() -> None:
    """Test that `load_file_io` raises a ValueError when `PY_IO_IMPL` names an unknown implementation"""
    properties = {PY_IO_IMPL: "pyiceberg.does.not.exist.FileIO"}
    with pytest.raises(ValueError) as raised:
        load_file_io(properties)

    error_text = str(raised.value)
    assert "Could not initialize FileIO: pyiceberg.does.not.exist.FileIO" in error_text
| |
| |
def test_load_file_io_warehouse() -> None:
    """Test that an s3 `warehouse` property makes `load_file_io` return a PyArrowFileIO"""
    properties = {"warehouse": "s3://some-path/"}
    loaded = load_file_io(properties)
    assert isinstance(loaded, PyArrowFileIO)
| |
| |
def test_load_file_io_location() -> None:
    """Test that an s3 `location` property makes `load_file_io` return a PyArrowFileIO"""
    properties = {"location": "s3://some-path/"}
    loaded = load_file_io(properties)
    assert isinstance(loaded, PyArrowFileIO)
| |
| |
def test_load_file_io_location_no_schema() -> None:
    """Test that a `location` property without a URI scheme makes `load_file_io` return a PyArrowFileIO"""
    properties = {"location": "/no-schema/"}
    loaded = load_file_io(properties)
    assert isinstance(loaded, PyArrowFileIO)
| |
| |
@pytest.mark.filterwarnings("ignore")
def test_mock_warehouse_location_file_io() -> None:
    """Test the selection logic: the `warehouse` property is kept on the loaded file-io"""
    properties = {"warehouse": "test://some-path/"}
    loaded = load_file_io(properties)
    assert loaded.properties["warehouse"] == "test://some-path/"
| |
| |
@pytest.mark.filterwarnings("ignore")
def test_mock_table_location_file_io() -> None:
    """Test the selection logic: a location passed as second argument leaves the file-io properties empty"""
    loaded = load_file_io({}, "test://some-path/")
    expected_properties: dict = {}
    assert loaded.properties == expected_properties
| |
| |
def test_gibberish_table_location_file_io() -> None:
    """Test the selection logic: a location without a scheme makes `load_file_io` return a PyArrowFileIO"""
    loaded = load_file_io({}, "gibberish")
    assert isinstance(loaded, PyArrowFileIO)
| |
| |
def test_infer_file_io_from_schema_unknown() -> None:
    """Test that inferring a file-io for an unknown scheme emits a UserWarning naming the scheme"""
    # An unknown scheme should be reported to the user
    with pytest.warns(UserWarning) as recorded:
        _infer_file_io_from_scheme("unknown://bucket/path/", {})

    first_warning = str(recorded[0].message)
    assert first_warning == "No preferred file implementation for scheme: unknown"