| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # pylint: disable=protected-access,unused-argument,redefined-outer-name |
| |
| import os |
| import tempfile |
| from datetime import date |
| from typing import Any, List, Optional |
| from unittest.mock import MagicMock, patch |
| from uuid import uuid4 |
| |
| import pyarrow as pa |
| import pyarrow.parquet as pq |
| import pytest |
| from pyarrow.fs import FileType, LocalFileSystem |
| |
| from pyiceberg.avro.resolver import ResolveError |
| from pyiceberg.catalog.noop import NoopCatalog |
| from pyiceberg.expressions import ( |
| AlwaysFalse, |
| AlwaysTrue, |
| And, |
| BooleanExpression, |
| BoundEqualTo, |
| BoundGreaterThan, |
| BoundGreaterThanOrEqual, |
| BoundIn, |
| BoundIsNaN, |
| BoundIsNull, |
| BoundLessThan, |
| BoundLessThanOrEqual, |
| BoundNotEqualTo, |
| BoundNotIn, |
| BoundNotNaN, |
| BoundNotNull, |
| BoundNotStartsWith, |
| BoundReference, |
| BoundStartsWith, |
| GreaterThan, |
| Not, |
| Or, |
| literal, |
| ) |
| from pyiceberg.io import InputStream, OutputStream, load_file_io |
| from pyiceberg.io.pyarrow import ( |
| ICEBERG_SCHEMA, |
| PyArrowFile, |
| PyArrowFileIO, |
| StatsAggregator, |
| _ConvertToArrowSchema, |
| _primitive_to_physical, |
| _read_deletes, |
| expression_to_pyarrow, |
| project_table, |
| schema_to_pyarrow, |
| ) |
| from pyiceberg.manifest import DataFile, DataFileContent, FileFormat |
| from pyiceberg.partitioning import PartitionSpec |
| from pyiceberg.schema import Schema, make_compatible_name, visit |
| from pyiceberg.table import FileScanTask, Table |
| from pyiceberg.table.metadata import TableMetadataV2 |
| from pyiceberg.typedef import UTF8 |
| from pyiceberg.types import ( |
| BinaryType, |
| BooleanType, |
| DateType, |
| DecimalType, |
| DoubleType, |
| FixedType, |
| FloatType, |
| IntegerType, |
| ListType, |
| LongType, |
| MapType, |
| NestedField, |
| PrimitiveType, |
| StringType, |
| StructType, |
| TimestampType, |
| TimestamptzType, |
| TimeType, |
| ) |
| |
| |
| def test_pyarrow_infer_local_fs_from_path() -> None: |
| """Test path with `file` scheme and no scheme both use LocalFileSystem""" |
| assert isinstance(PyArrowFileIO().new_output("file://tmp/warehouse")._filesystem, LocalFileSystem) |
| assert isinstance(PyArrowFileIO().new_output("/tmp/warehouse")._filesystem, LocalFileSystem) |
| |
| |
| def test_pyarrow_local_fs_can_create_path_without_parent_dir() -> None: |
| """Test LocalFileSystem can create path without first creating the parent directories""" |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| file_path = f"{tmpdirname}/foo/bar/baz.txt" |
| output_file = PyArrowFileIO().new_output(file_path) |
| parent_path = os.path.dirname(file_path) |
| assert output_file._filesystem.get_file_info(parent_path).type == FileType.NotFound |
| try: |
| with output_file.create() as f: |
| f.write(b"foo") |
| except Exception: |
| pytest.fail("Failed to write to file without parent directory") |
| |
| |
| def test_pyarrow_input_file() -> None: |
| """Test reading a file using PyArrowFile""" |
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| file_location = os.path.join(tmpdirname, "foo.txt") |
| with open(file_location, "wb") as f: |
| f.write(b"foo") |
| |
| # Confirm that the file initially exists |
| assert os.path.exists(file_location) |
| |
| # Instantiate the input file |
| absolute_file_location = os.path.abspath(file_location) |
| input_file = PyArrowFileIO().new_input(location=f"{absolute_file_location}") |
| |
| # Test opening and reading the file |
| r = input_file.open(seekable=False) |
| assert isinstance(r, InputStream) # Test that the file object abides by the InputStream protocol |
| data = r.read() |
| assert data == b"foo" |
| assert len(input_file) == 3 |
| with pytest.raises(OSError) as exc_info: |
| r.seek(0, 0) |
| assert "only valid on seekable files" in str(exc_info.value) |
| |
| |
| def test_pyarrow_input_file_seekable() -> None: |
| """Test reading a file using PyArrowFile""" |
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| file_location = os.path.join(tmpdirname, "foo.txt") |
| with open(file_location, "wb") as f: |
| f.write(b"foo") |
| |
| # Confirm that the file initially exists |
| assert os.path.exists(file_location) |
| |
| # Instantiate the input file |
| absolute_file_location = os.path.abspath(file_location) |
| input_file = PyArrowFileIO().new_input(location=f"{absolute_file_location}") |
| |
| # Test opening and reading the file |
| r = input_file.open(seekable=True) |
| assert isinstance(r, InputStream) # Test that the file object abides by the InputStream protocol |
| data = r.read() |
| assert data == b"foo" |
| assert len(input_file) == 3 |
| r.seek(0, 0) |
| data = r.read() |
| assert data == b"foo" |
| assert len(input_file) == 3 |
| |
| |
| def test_pyarrow_output_file() -> None: |
| """Test writing a file using PyArrowFile""" |
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| file_location = os.path.join(tmpdirname, "foo.txt") |
| |
| # Instantiate the output file |
| absolute_file_location = os.path.abspath(file_location) |
| output_file = PyArrowFileIO().new_output(location=f"{absolute_file_location}") |
| |
| # Create the output file and write to it |
| f = output_file.create() |
| assert isinstance(f, OutputStream) # Test that the file object abides by the OutputStream protocol |
| f.write(b"foo") |
| |
| # Confirm that bytes were written |
| with open(file_location, "rb") as f: |
| assert f.read() == b"foo" |
| |
| assert len(output_file) == 3 |
| |
| |
| def test_pyarrow_invalid_scheme() -> None: |
| """Test that a ValueError is raised if a location is provided with an invalid scheme""" |
| |
| with pytest.raises(ValueError) as exc_info: |
| PyArrowFileIO().new_input("foo://bar/baz.txt") |
| |
| assert "Unrecognized filesystem type in URI" in str(exc_info.value) |
| |
| with pytest.raises(ValueError) as exc_info: |
| PyArrowFileIO().new_output("foo://bar/baz.txt") |
| |
| assert "Unrecognized filesystem type in URI" in str(exc_info.value) |
| |
| |
| def test_pyarrow_violating_input_stream_protocol() -> None: |
| """Test that a TypeError is raised if an input file is provided that violates the InputStream protocol""" |
| |
| # Missing seek, tell, closed, and close |
| input_file_mock = MagicMock(spec=["read"]) |
| |
| # Create a mocked filesystem that returns input_file_mock |
| filesystem_mock = MagicMock() |
| filesystem_mock.open_input_file.return_value = input_file_mock |
| |
| input_file = PyArrowFile("foo.txt", path="foo.txt", fs=filesystem_mock) |
| |
| f = input_file.open() |
| assert not isinstance(f, InputStream) |
| |
| |
| def test_pyarrow_violating_output_stream_protocol() -> None: |
| """Test that a TypeError is raised if an output stream is provided that violates the OutputStream protocol""" |
| |
| # Missing closed, and close |
| output_file_mock = MagicMock(spec=["write", "exists"]) |
| output_file_mock.exists.return_value = False |
| |
| file_info_mock = MagicMock() |
| file_info_mock.type = FileType.NotFound |
| |
| # Create a mocked filesystem that returns output_file_mock |
| filesystem_mock = MagicMock() |
| filesystem_mock.open_output_stream.return_value = output_file_mock |
| filesystem_mock.get_file_info.return_value = file_info_mock |
| |
| output_file = PyArrowFile("foo.txt", path="foo.txt", fs=filesystem_mock) |
| |
| f = output_file.create() |
| |
| assert not isinstance(f, OutputStream) |
| |
| |
| def test_raise_on_opening_a_local_file_not_found() -> None: |
| """Test that a PyArrowFile raises appropriately when a local file is not found""" |
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| file_location = os.path.join(tmpdirname, "foo.txt") |
| f = PyArrowFileIO().new_input(file_location) |
| |
| with pytest.raises(FileNotFoundError) as exc_info: |
| f.open() |
| |
| assert "[Errno 2] Failed to open local file" in str(exc_info.value) |
| |
| |
| def test_raise_on_opening_an_s3_file_no_permission() -> None: |
| """Test that opening a PyArrowFile raises a PermissionError when the pyarrow error includes 'AWS Error [code 15]'""" |
| |
| s3fs_mock = MagicMock() |
| s3fs_mock.open_input_file.side_effect = OSError("AWS Error [code 15]") |
| |
| f = PyArrowFile("s3://foo/bar.txt", path="foo/bar.txt", fs=s3fs_mock) |
| |
| with pytest.raises(PermissionError) as exc_info: |
| f.open() |
| |
| assert "Cannot open file, access denied:" in str(exc_info.value) |
| |
| |
| def test_raise_on_opening_an_s3_file_not_found() -> None: |
| """Test that a PyArrowFile raises a FileNotFoundError when the pyarrow error includes 'Path does not exist'""" |
| |
| s3fs_mock = MagicMock() |
| s3fs_mock.open_input_file.side_effect = OSError("Path does not exist") |
| |
| f = PyArrowFile("s3://foo/bar.txt", path="foo/bar.txt", fs=s3fs_mock) |
| |
| with pytest.raises(FileNotFoundError) as exc_info: |
| f.open() |
| |
| assert "Cannot open file, does not exist:" in str(exc_info.value) |
| |
| |
| @patch("pyiceberg.io.pyarrow.PyArrowFile.exists", return_value=False) |
| def test_raise_on_creating_an_s3_file_no_permission(_: Any) -> None: |
| """Test that creating a PyArrowFile raises a PermissionError when the pyarrow error includes 'AWS Error [code 15]'""" |
| |
| s3fs_mock = MagicMock() |
| s3fs_mock.open_output_stream.side_effect = OSError("AWS Error [code 15]") |
| |
| f = PyArrowFile("s3://foo/bar.txt", path="foo/bar.txt", fs=s3fs_mock) |
| |
| with pytest.raises(PermissionError) as exc_info: |
| f.create() |
| |
| assert "Cannot create file, access denied:" in str(exc_info.value) |
| |
| |
| def test_deleting_s3_file_no_permission() -> None: |
| """Test that a PyArrowFile raises a PermissionError when the pyarrow OSError includes 'AWS Error [code 15]'""" |
| |
| s3fs_mock = MagicMock() |
| s3fs_mock.delete_file.side_effect = OSError("AWS Error [code 15]") |
| |
| with patch.object(PyArrowFileIO, "_initialize_fs") as submocked: |
| submocked.return_value = s3fs_mock |
| |
| with pytest.raises(PermissionError) as exc_info: |
| PyArrowFileIO().delete("s3://foo/bar.txt") |
| |
| assert "Cannot delete file, access denied:" in str(exc_info.value) |
| |
| |
| def test_deleting_s3_file_not_found() -> None: |
| """Test that a PyArrowFile raises a PermissionError when the pyarrow error includes 'AWS Error [code 15]'""" |
| |
| s3fs_mock = MagicMock() |
| s3fs_mock.delete_file.side_effect = OSError("Path does not exist") |
| |
| with patch.object(PyArrowFileIO, "_initialize_fs") as submocked: |
| submocked.return_value = s3fs_mock |
| |
| with pytest.raises(FileNotFoundError) as exc_info: |
| PyArrowFileIO().delete("s3://foo/bar.txt") |
| |
| assert "Cannot delete file, does not exist:" in str(exc_info.value) |
| |
| |
| def test_deleting_hdfs_file_not_found() -> None: |
| """Test that a PyArrowFile raises a PermissionError when the pyarrow error includes 'No such file or directory'""" |
| |
| hdfs_mock = MagicMock() |
| hdfs_mock.delete_file.side_effect = OSError("Path does not exist") |
| |
| with patch.object(PyArrowFileIO, "_initialize_fs") as submocked: |
| submocked.return_value = hdfs_mock |
| |
| with pytest.raises(FileNotFoundError) as exc_info: |
| PyArrowFileIO().delete("hdfs://foo/bar.txt") |
| |
| assert "Cannot delete file, does not exist:" in str(exc_info.value) |
| |
| |
| def test_schema_to_pyarrow_schema(table_schema_nested: Schema) -> None: |
| actual = schema_to_pyarrow(table_schema_nested) |
| expected = """foo: string |
| -- field metadata -- |
| PARQUET:field_id: '1' |
| bar: int32 not null |
| -- field metadata -- |
| PARQUET:field_id: '2' |
| baz: bool |
| -- field metadata -- |
| PARQUET:field_id: '3' |
| qux: list<element: string not null> not null |
| child 0, element: string not null |
| -- field metadata -- |
| PARQUET:field_id: '5' |
| -- field metadata -- |
| PARQUET:field_id: '4' |
| quux: map<string, map<string, int32>> not null |
| child 0, entries: struct<key: string not null, value: map<string, int32> not null> not null |
| child 0, key: string not null |
| -- field metadata -- |
| PARQUET:field_id: '7' |
| child 1, value: map<string, int32> not null |
| child 0, entries: struct<key: string not null, value: int32 not null> not null |
| child 0, key: string not null |
| -- field metadata -- |
| PARQUET:field_id: '9' |
| child 1, value: int32 not null |
| -- field metadata -- |
| PARQUET:field_id: '10' |
| -- field metadata -- |
| PARQUET:field_id: '8' |
| -- field metadata -- |
| PARQUET:field_id: '6' |
| location: list<element: struct<latitude: float, longitude: float> not null> not null |
| child 0, element: struct<latitude: float, longitude: float> not null |
| child 0, latitude: float |
| -- field metadata -- |
| PARQUET:field_id: '13' |
| child 1, longitude: float |
| -- field metadata -- |
| PARQUET:field_id: '14' |
| -- field metadata -- |
| PARQUET:field_id: '12' |
| -- field metadata -- |
| PARQUET:field_id: '11' |
| person: struct<name: string, age: int32 not null> |
| child 0, name: string |
| -- field metadata -- |
| PARQUET:field_id: '16' |
| child 1, age: int32 not null |
| -- field metadata -- |
| PARQUET:field_id: '17' |
| -- field metadata -- |
| PARQUET:field_id: '15'""" |
| assert repr(actual) == expected |
| |
| |
| def test_fixed_type_to_pyarrow() -> None: |
| length = 22 |
| iceberg_type = FixedType(length) |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.binary(length) |
| |
| |
| def test_decimal_type_to_pyarrow() -> None: |
| precision = 25 |
| scale = 19 |
| iceberg_type = DecimalType(precision, scale) |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.decimal128(precision, scale) |
| |
| |
| def test_boolean_type_to_pyarrow() -> None: |
| iceberg_type = BooleanType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.bool_() |
| |
| |
| def test_integer_type_to_pyarrow() -> None: |
| iceberg_type = IntegerType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.int32() |
| |
| |
| def test_long_type_to_pyarrow() -> None: |
| iceberg_type = LongType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.int64() |
| |
| |
| def test_float_type_to_pyarrow() -> None: |
| iceberg_type = FloatType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.float32() |
| |
| |
| def test_double_type_to_pyarrow() -> None: |
| iceberg_type = DoubleType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.float64() |
| |
| |
| def test_date_type_to_pyarrow() -> None: |
| iceberg_type = DateType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.date32() |
| |
| |
| def test_time_type_to_pyarrow() -> None: |
| iceberg_type = TimeType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.time64("us") |
| |
| |
| def test_timestamp_type_to_pyarrow() -> None: |
| iceberg_type = TimestampType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us") |
| |
| |
| def test_timestamptz_type_to_pyarrow() -> None: |
| iceberg_type = TimestamptzType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="UTC") |
| |
| |
| def test_string_type_to_pyarrow() -> None: |
| iceberg_type = StringType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.string() |
| |
| |
| def test_binary_type_to_pyarrow() -> None: |
| iceberg_type = BinaryType() |
| assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.large_binary() |
| |
| |
| def test_struct_type_to_pyarrow(table_schema_simple: Schema) -> None: |
| expected = pa.struct([ |
| pa.field("foo", pa.string(), nullable=True, metadata={"field_id": "1"}), |
| pa.field("bar", pa.int32(), nullable=False, metadata={"field_id": "2"}), |
| pa.field("baz", pa.bool_(), nullable=True, metadata={"field_id": "3"}), |
| ]) |
| assert visit(table_schema_simple.as_struct(), _ConvertToArrowSchema()) == expected |
| |
| |
| def test_map_type_to_pyarrow() -> None: |
| iceberg_map = MapType( |
| key_id=1, |
| key_type=IntegerType(), |
| value_id=2, |
| value_type=StringType(), |
| value_required=True, |
| ) |
| assert visit(iceberg_map, _ConvertToArrowSchema()) == pa.map_( |
| pa.field("key", pa.int32(), nullable=False, metadata={"field_id": "1"}), |
| pa.field("value", pa.string(), nullable=False, metadata={"field_id": "2"}), |
| ) |
| |
| |
| def test_list_type_to_pyarrow() -> None: |
    iceberg_list = ListType(
| element_id=1, |
| element_type=IntegerType(), |
| element_required=True, |
| ) |
    assert visit(iceberg_list, _ConvertToArrowSchema()) == pa.list_(
| pa.field("element", pa.int32(), nullable=False, metadata={"field_id": "1"}) |
| ) |
| |
| |
| @pytest.fixture |
| def bound_reference(table_schema_simple: Schema) -> BoundReference[str]: |
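    """Bound reference to the string field `foo` (field id 1) of the simple test schema."""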
| return BoundReference(table_schema_simple.find_field(1), table_schema_simple.accessor_for_field(1)) |
| |
| |
| @pytest.fixture |
| def bound_double_reference() -> BoundReference[float]: |
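    """Bound reference to a double field `foo`, used for the NaN-related expression tests."""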
| schema = Schema( |
| NestedField(field_id=1, name="foo", field_type=DoubleType(), required=False), |
| schema_id=1, |
| identifier_field_ids=[], |
| ) |
| return BoundReference(schema.find_field(1), schema.accessor_for_field(1)) |
| |
| |
| def test_expr_is_null_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(BoundIsNull(bound_reference))) |
| == "<pyarrow.compute.Expression is_null(foo, {nan_is_null=false})>" |
| ) |
| |
| |
| def test_expr_not_null_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert repr(expression_to_pyarrow(BoundNotNull(bound_reference))) == "<pyarrow.compute.Expression is_valid(foo)>" |
| |
| |
def test_expr_is_nan_to_pyarrow(bound_double_reference: BoundReference[float]) -> None:
| assert repr(expression_to_pyarrow(BoundIsNaN(bound_double_reference))) == "<pyarrow.compute.Expression is_nan(foo)>" |
| |
| |
def test_expr_not_nan_to_pyarrow(bound_double_reference: BoundReference[float]) -> None:
| assert repr(expression_to_pyarrow(BoundNotNaN(bound_double_reference))) == "<pyarrow.compute.Expression invert(is_nan(foo))>" |
| |
| |
| def test_expr_equal_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(BoundEqualTo(bound_reference, literal("hello")))) |
| == '<pyarrow.compute.Expression (foo == "hello")>' |
| ) |
| |
| |
| def test_expr_not_equal_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(BoundNotEqualTo(bound_reference, literal("hello")))) |
| == '<pyarrow.compute.Expression (foo != "hello")>' |
| ) |
| |
| |
def test_expr_greater_than_or_equal_to_pyarrow(bound_reference: BoundReference[str]) -> None:
| assert ( |
| repr(expression_to_pyarrow(BoundGreaterThanOrEqual(bound_reference, literal("hello")))) |
| == '<pyarrow.compute.Expression (foo >= "hello")>' |
| ) |
| |
| |
| def test_expr_greater_than_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(BoundGreaterThan(bound_reference, literal("hello")))) |
| == '<pyarrow.compute.Expression (foo > "hello")>' |
| ) |
| |
| |
| def test_expr_less_than_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(BoundLessThan(bound_reference, literal("hello")))) |
| == '<pyarrow.compute.Expression (foo < "hello")>' |
| ) |
| |
| |
| def test_expr_less_than_or_equal_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(BoundLessThanOrEqual(bound_reference, literal("hello")))) |
| == '<pyarrow.compute.Expression (foo <= "hello")>' |
| ) |
| |
| |
| def test_expr_in_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert repr(expression_to_pyarrow(BoundIn(bound_reference, {literal("hello"), literal("world")}))) in ( |
| """<pyarrow.compute.Expression is_in(foo, {value_set=string:[ |
| "hello", |
| "world" |
| ], null_matching_behavior=MATCH})>""", |
| """<pyarrow.compute.Expression is_in(foo, {value_set=string:[ |
| "world", |
| "hello" |
| ], null_matching_behavior=MATCH})>""", |
| ) |
| |
| |
| def test_expr_not_in_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert repr(expression_to_pyarrow(BoundNotIn(bound_reference, {literal("hello"), literal("world")}))) in ( |
| """<pyarrow.compute.Expression invert(is_in(foo, {value_set=string:[ |
| "hello", |
| "world" |
| ], null_matching_behavior=MATCH}))>""", |
| """<pyarrow.compute.Expression invert(is_in(foo, {value_set=string:[ |
| "world", |
| "hello" |
| ], null_matching_behavior=MATCH}))>""", |
| ) |
| |
| |
| def test_expr_starts_with_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(BoundStartsWith(bound_reference, literal("he")))) |
| == '<pyarrow.compute.Expression starts_with(foo, {pattern="he", ignore_case=false})>' |
| ) |
| |
| |
| def test_expr_not_starts_with_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(BoundNotStartsWith(bound_reference, literal("he")))) |
| == '<pyarrow.compute.Expression invert(starts_with(foo, {pattern="he", ignore_case=false}))>' |
| ) |
| |
| |
| def test_and_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(And(BoundEqualTo(bound_reference, literal("hello")), BoundIsNull(bound_reference)))) |
| == '<pyarrow.compute.Expression ((foo == "hello") and is_null(foo, {nan_is_null=false}))>' |
| ) |
| |
| |
| def test_or_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(Or(BoundEqualTo(bound_reference, literal("hello")), BoundIsNull(bound_reference)))) |
| == '<pyarrow.compute.Expression ((foo == "hello") or is_null(foo, {nan_is_null=false}))>' |
| ) |
| |
| |
| def test_not_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert ( |
| repr(expression_to_pyarrow(Not(BoundEqualTo(bound_reference, literal("hello"))))) |
| == '<pyarrow.compute.Expression invert((foo == "hello"))>' |
| ) |
| |
| |
| def test_always_true_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert repr(expression_to_pyarrow(AlwaysTrue())) == "<pyarrow.compute.Expression true>" |
| |
| |
| def test_always_false_to_pyarrow(bound_reference: BoundReference[str]) -> None: |
| assert repr(expression_to_pyarrow(AlwaysFalse())) == "<pyarrow.compute.Expression false>" |
| |
| |
| @pytest.fixture |
| def schema_int() -> Schema: |
| return Schema(NestedField(1, "id", IntegerType(), required=False)) |
| |
| |
| @pytest.fixture |
| def schema_int_str() -> Schema: |
| return Schema(NestedField(1, "id", IntegerType(), required=False), NestedField(2, "data", StringType(), required=False)) |
| |
| |
| @pytest.fixture |
| def schema_str() -> Schema: |
| return Schema(NestedField(2, "data", StringType(), required=False)) |
| |
| |
| @pytest.fixture |
| def schema_long() -> Schema: |
| return Schema(NestedField(3, "id", LongType(), required=False)) |
| |
| |
| @pytest.fixture |
| def schema_struct() -> Schema: |
| return Schema( |
| NestedField( |
| 4, |
| "location", |
| StructType( |
| NestedField(41, "lat", DoubleType()), |
| NestedField(42, "long", DoubleType()), |
| ), |
| ) |
| ) |
| |
| |
| @pytest.fixture |
| def schema_list() -> Schema: |
| return Schema( |
| NestedField(5, "ids", ListType(51, IntegerType(), element_required=False), required=False), |
| ) |
| |
| |
| @pytest.fixture |
| def schema_list_of_structs() -> Schema: |
| return Schema( |
| NestedField( |
| 5, |
| "locations", |
| ListType( |
| 51, |
| StructType(NestedField(511, "lat", DoubleType()), NestedField(512, "long", DoubleType())), |
| element_required=False, |
| ), |
| required=False, |
| ), |
| ) |
| |
| |
| @pytest.fixture |
| def schema_map_of_structs() -> Schema: |
| return Schema( |
| NestedField( |
| 5, |
| "locations", |
| MapType( |
| key_id=51, |
| value_id=52, |
| key_type=StringType(), |
| value_type=StructType(NestedField(511, "lat", DoubleType()), NestedField(512, "long", DoubleType())), |
| element_required=False, |
| ), |
| required=False, |
| ), |
| ) |
| |
| |
| @pytest.fixture |
| def schema_map() -> Schema: |
| return Schema( |
| NestedField( |
| 5, |
| "properties", |
| MapType( |
| key_id=51, |
| key_type=StringType(), |
| value_id=52, |
| value_type=StringType(), |
| value_required=True, |
| ), |
| required=False, |
| ), |
| ) |
| |
| |
| def _write_table_to_file(filepath: str, schema: pa.Schema, table: pa.Table) -> str: |
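    """Write the given Arrow table to a Parquet file and return the file path."""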
| with pq.ParquetWriter(filepath, schema) as writer: |
| writer.write_table(table) |
| return filepath |
| |
| |
| @pytest.fixture |
| def file_int(schema_int: Schema, tmpdir: str) -> str: |
| pyarrow_schema = schema_to_pyarrow(schema_int, metadata={ICEBERG_SCHEMA: bytes(schema_int.model_dump_json(), UTF8)}) |
| return _write_table_to_file( |
| f"file:{tmpdir}/a.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array([0, 1, 2])], schema=pyarrow_schema) |
| ) |
| |
| |
| @pytest.fixture |
| def file_int_str(schema_int_str: Schema, tmpdir: str) -> str: |
| pyarrow_schema = schema_to_pyarrow(schema_int_str, metadata={ICEBERG_SCHEMA: bytes(schema_int_str.model_dump_json(), UTF8)}) |
| return _write_table_to_file( |
| f"file:{tmpdir}/a.parquet", |
| pyarrow_schema, |
| pa.Table.from_arrays([pa.array([0, 1, 2]), pa.array(["0", "1", "2"])], schema=pyarrow_schema), |
| ) |
| |
| |
| @pytest.fixture |
| def file_string(schema_str: Schema, tmpdir: str) -> str: |
| pyarrow_schema = schema_to_pyarrow(schema_str, metadata={ICEBERG_SCHEMA: bytes(schema_str.model_dump_json(), UTF8)}) |
| return _write_table_to_file( |
| f"file:{tmpdir}/b.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array(["0", "1", "2"])], schema=pyarrow_schema) |
| ) |
| |
| |
| @pytest.fixture |
| def file_long(schema_long: Schema, tmpdir: str) -> str: |
| pyarrow_schema = schema_to_pyarrow(schema_long, metadata={ICEBERG_SCHEMA: bytes(schema_long.model_dump_json(), UTF8)}) |
| return _write_table_to_file( |
| f"file:{tmpdir}/c.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array([0, 1, 2])], schema=pyarrow_schema) |
| ) |
| |
| |
| @pytest.fixture |
| def file_struct(schema_struct: Schema, tmpdir: str) -> str: |
| pyarrow_schema = schema_to_pyarrow(schema_struct, metadata={ICEBERG_SCHEMA: bytes(schema_struct.model_dump_json(), UTF8)}) |
| return _write_table_to_file( |
| f"file:{tmpdir}/d.parquet", |
| pyarrow_schema, |
| pa.Table.from_pylist( |
| [ |
| {"location": {"lat": 52.371807, "long": 4.896029}}, |
| {"location": {"lat": 52.387386, "long": 4.646219}}, |
| {"location": {"lat": 52.078663, "long": 4.288788}}, |
| ], |
| schema=pyarrow_schema, |
| ), |
| ) |
| |
| |
| @pytest.fixture |
| def file_list(schema_list: Schema, tmpdir: str) -> str: |
| pyarrow_schema = schema_to_pyarrow(schema_list, metadata={ICEBERG_SCHEMA: bytes(schema_list.model_dump_json(), UTF8)}) |
| return _write_table_to_file( |
| f"file:{tmpdir}/e.parquet", |
| pyarrow_schema, |
| pa.Table.from_pylist( |
| [ |
| {"ids": list(range(1, 10))}, |
| {"ids": list(range(2, 20))}, |
| {"ids": list(range(3, 30))}, |
| ], |
| schema=pyarrow_schema, |
| ), |
| ) |
| |
| |
| @pytest.fixture |
| def file_list_of_structs(schema_list_of_structs: Schema, tmpdir: str) -> str: |
| pyarrow_schema = schema_to_pyarrow( |
| schema_list_of_structs, metadata={ICEBERG_SCHEMA: bytes(schema_list_of_structs.model_dump_json(), UTF8)} |
| ) |
| return _write_table_to_file( |
| f"file:{tmpdir}/e.parquet", |
| pyarrow_schema, |
| pa.Table.from_pylist( |
| [ |
| {"locations": [{"lat": 52.371807, "long": 4.896029}, {"lat": 52.387386, "long": 4.646219}]}, |
| {"locations": []}, |
| {"locations": [{"lat": 52.078663, "long": 4.288788}, {"lat": 52.387386, "long": 4.646219}]}, |
| ], |
| schema=pyarrow_schema, |
| ), |
| ) |
| |
| |
| @pytest.fixture |
| def file_map_of_structs(schema_map_of_structs: Schema, tmpdir: str) -> str: |
| pyarrow_schema = schema_to_pyarrow( |
| schema_map_of_structs, metadata={ICEBERG_SCHEMA: bytes(schema_map_of_structs.model_dump_json(), UTF8)} |
| ) |
| return _write_table_to_file( |
| f"file:{tmpdir}/e.parquet", |
| pyarrow_schema, |
| pa.Table.from_pylist( |
| [ |
| {"locations": {"1": {"lat": 52.371807, "long": 4.896029}, "2": {"lat": 52.387386, "long": 4.646219}}}, |
| {"locations": {}}, |
| {"locations": {"3": {"lat": 52.078663, "long": 4.288788}, "4": {"lat": 52.387386, "long": 4.646219}}}, |
| ], |
| schema=pyarrow_schema, |
| ), |
| ) |
| |
| |
| @pytest.fixture |
| def file_map(schema_map: Schema, tmpdir: str) -> str: |
| pyarrow_schema = schema_to_pyarrow(schema_map, metadata={ICEBERG_SCHEMA: bytes(schema_map.model_dump_json(), UTF8)}) |
| return _write_table_to_file( |
| f"file:{tmpdir}/e.parquet", |
| pyarrow_schema, |
| pa.Table.from_pylist( |
| [ |
| {"properties": [("a", "b")]}, |
| {"properties": [("c", "d")]}, |
| {"properties": [("e", "f"), ("g", "h")]}, |
| ], |
| schema=pyarrow_schema, |
| ), |
| ) |
| |
| |
| def project( |
| schema: Schema, files: List[str], expr: Optional[BooleanExpression] = None, table_schema: Optional[Schema] = None |
| ) -> pa.Table: |
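    """Read and project the given Parquet files through a minimal in-memory Iceberg table.

    The table is backed by a stub TableMetadataV2 and a NoopCatalog; only the schema, partition spec,
    and file IO matter here, so the projection logic in `project_table` can be exercised without a real catalog.
    """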
| return project_table( |
| [ |
| FileScanTask( |
| DataFile( |
| content=DataFileContent.DATA, |
| file_path=file, |
| file_format=FileFormat.PARQUET, |
| partition={}, |
| record_count=3, |
| file_size_in_bytes=3, |
| ) |
| ) |
| for file in files |
| ], |
| Table( |
| ("namespace", "table"), |
| metadata=TableMetadataV2( |
| location="file://a/b/", |
| last_column_id=1, |
| format_version=2, |
| schemas=[table_schema or schema], |
| partition_specs=[PartitionSpec()], |
| ), |
| metadata_location="file://a/b/c.json", |
| io=PyArrowFileIO(), |
| catalog=NoopCatalog("NoopCatalog"), |
| ), |
| expr or AlwaysTrue(), |
| schema, |
| case_sensitive=True, |
| ) |
| |
| |
| def test_projection_add_column(file_int: str) -> None: |
| schema = Schema( |
| # All new IDs |
| NestedField(10, "id", IntegerType(), required=False), |
| NestedField(20, "list", ListType(21, IntegerType(), element_required=False), required=False), |
| NestedField( |
| 30, |
| "map", |
| MapType(key_id=31, key_type=IntegerType(), value_id=32, value_type=StringType(), value_required=False), |
| required=False, |
| ), |
| NestedField( |
| 40, |
| "location", |
| StructType( |
| NestedField(41, "lat", DoubleType(), required=False), NestedField(42, "lon", DoubleType(), required=False) |
| ), |
| required=False, |
| ), |
| ) |
| result_table = project(schema, [file_int]) |
| |
| for col in result_table.columns: |
| assert len(col) == 3 |
| |
| for actual, expected in zip(result_table.columns[0], [None, None, None]): |
| assert actual.as_py() == expected |
| |
| for actual, expected in zip(result_table.columns[1], [None, None, None]): |
| assert actual.as_py() == expected |
| |
| for actual, expected in zip(result_table.columns[2], [None, None, None]): |
| assert actual.as_py() == expected |
| |
| for actual, expected in zip(result_table.columns[3], [None, None, None]): |
| assert actual.as_py() == expected |
| assert ( |
| repr(result_table.schema) |
| == """id: int32 |
| list: list<element: int32> |
| child 0, element: int32 |
| -- field metadata -- |
| PARQUET:field_id: '21' |
| map: map<int32, string> |
| child 0, entries: struct<key: int32 not null, value: string> not null |
| child 0, key: int32 not null |
| -- field metadata -- |
| PARQUET:field_id: '31' |
| child 1, value: string |
| -- field metadata -- |
| PARQUET:field_id: '32' |
| location: struct<lat: double, lon: double> |
| child 0, lat: double |
| -- field metadata -- |
| PARQUET:field_id: '41' |
| child 1, lon: double |
| -- field metadata -- |
| PARQUET:field_id: '42'""" |
| ) |
| |
| |
| def test_read_list(schema_list: Schema, file_list: str) -> None: |
| result_table = project(schema_list, [file_list]) |
| |
| assert len(result_table.columns[0]) == 3 |
| for actual, expected in zip(result_table.columns[0], [list(range(1, 10)), list(range(2, 20)), list(range(3, 30))]): |
| assert actual.as_py() == expected |
| |
| assert ( |
| repr(result_table.schema) |
| == """ids: list<element: int32> |
| child 0, element: int32""" |
| ) |
| |
| |
| def test_read_map(schema_map: Schema, file_map: str) -> None: |
| result_table = project(schema_map, [file_map]) |
| |
| assert len(result_table.columns[0]) == 3 |
| for actual, expected in zip(result_table.columns[0], [[("a", "b")], [("c", "d")], [("e", "f"), ("g", "h")]]): |
| assert actual.as_py() == expected |
| |
| assert ( |
| repr(result_table.schema) |
| == """properties: map<string, string> |
| child 0, entries: struct<key: string not null, value: string not null> not null |
| child 0, key: string not null |
| child 1, value: string not null""" |
| ) |
| |
| |
| def test_projection_add_column_struct(schema_int: Schema, file_int: str) -> None: |
| schema = Schema( |
| # A new ID |
| NestedField( |
| 2, |
| "id", |
| MapType(key_id=3, key_type=IntegerType(), value_id=4, value_type=StringType(), value_required=False), |
| required=False, |
| ) |
| ) |
| result_table = project(schema, [file_int]) |
| # Everything should be None |
| for r in result_table.columns[0]: |
| assert r.as_py() is None |
| assert ( |
| repr(result_table.schema) |
| == """id: map<int32, string> |
| child 0, entries: struct<key: int32 not null, value: string> not null |
| child 0, key: int32 not null |
| -- field metadata -- |
| PARQUET:field_id: '3' |
| child 1, value: string |
| -- field metadata -- |
| PARQUET:field_id: '4'""" |
| ) |
| |
| |
| def test_projection_add_column_struct_required(file_int: str) -> None: |
| schema = Schema( |
| # A new ID |
| NestedField( |
| 2, |
| "other_id", |
| IntegerType(), |
| required=True, |
| ) |
| ) |
| with pytest.raises(ResolveError) as exc_info: |
| _ = project(schema, [file_int]) |
| assert "Field is required, and could not be found in the file: 2: other_id: required int" in str(exc_info.value) |
| |
| |
| def test_projection_rename_column(schema_int: Schema, file_int: str) -> None: |
| schema = Schema( |
| # Reuses the id 1 |
| NestedField(1, "other_name", IntegerType()) |
| ) |
| result_table = project(schema, [file_int]) |
| assert len(result_table.columns[0]) == 3 |
| for actual, expected in zip(result_table.columns[0], [0, 1, 2]): |
| assert actual.as_py() == expected |
| |
| assert repr(result_table.schema) == "other_name: int32 not null" |
| |
| |
| def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: |
| result_table = project(schema_int, [file_int, file_int]) |
| |
| for actual, expected in zip(result_table.columns[0], [0, 1, 2, 0, 1, 2]): |
| assert actual.as_py() == expected |
| assert len(result_table.columns[0]) == 6 |
| assert repr(result_table.schema) == "id: int32" |
| |
| |
| def test_projection_filter(schema_int: Schema, file_int: str) -> None: |
| result_table = project(schema_int, [file_int], GreaterThan("id", 4)) |
| assert len(result_table.columns[0]) == 0 |
| assert ( |
| repr(result_table.schema) |
| == """id: int32 |
| -- field metadata -- |
| PARQUET:field_id: '1'""" |
| ) |
| |
| |
| def test_projection_filter_renamed_column(file_int: str) -> None: |
| schema = Schema( |
| # Reuses the id 1 |
| NestedField(1, "other_id", IntegerType()) |
| ) |
| result_table = project(schema, [file_int], GreaterThan("other_id", 1)) |
| assert len(result_table.columns[0]) == 1 |
| assert repr(result_table.schema) == "other_id: int32 not null" |
| |
| |
| def test_projection_filter_add_column(schema_int: Schema, file_int: str, file_string: str) -> None: |
| """We have one file that has the column, and the other one doesn't""" |
| result_table = project(schema_int, [file_int, file_string]) |
| |
| for actual, expected in zip(result_table.columns[0], [0, 1, 2, None, None, None]): |
| assert actual.as_py() == expected |
| assert len(result_table.columns[0]) == 6 |
| assert repr(result_table.schema) == "id: int32" |
| |
| |
| def test_projection_filter_add_column_promote(file_int: str) -> None: |
| schema_long = Schema(NestedField(1, "id", LongType())) |
| result_table = project(schema_long, [file_int]) |
| |
| for actual, expected in zip(result_table.columns[0], [0, 1, 2]): |
| assert actual.as_py() == expected |
| assert len(result_table.columns[0]) == 3 |
| assert repr(result_table.schema) == "id: int64 not null" |
| |
| |
| def test_projection_filter_add_column_demote(file_long: str) -> None: |
| schema_int = Schema(NestedField(3, "id", IntegerType())) |
| with pytest.raises(ResolveError) as exc_info: |
| _ = project(schema_int, [file_long]) |
| assert "Cannot promote long to int" in str(exc_info.value) |
| |
| |
| def test_projection_nested_struct_subset(file_struct: str) -> None: |
| schema = Schema( |
| NestedField( |
| 4, |
| "location", |
| StructType( |
| NestedField(41, "lat", DoubleType()), |
| # long is missing! |
| ), |
| ) |
| ) |
| |
| result_table = project(schema, [file_struct]) |
| |
| for actual, expected in zip(result_table.columns[0], [52.371807, 52.387386, 52.078663]): |
| assert actual.as_py() == {"lat": expected} |
| |
| assert len(result_table.columns[0]) == 3 |
| assert ( |
| repr(result_table.schema) |
| == """location: struct<lat: double not null> not null |
| child 0, lat: double not null""" |
| ) |
| |
| |
| def test_projection_nested_new_field(file_struct: str) -> None: |
| schema = Schema( |
| NestedField( |
| 4, |
| "location", |
| StructType( |
| NestedField(43, "null", DoubleType(), required=False), # Whoa, this column doesn't exist in the file |
| ), |
| ) |
| ) |
| |
| result_table = project(schema, [file_struct]) |
| |
| for actual, expected in zip(result_table.columns[0], [None, None, None]): |
| assert actual.as_py() == {"null": expected} |
| assert len(result_table.columns[0]) == 3 |
| assert ( |
| repr(result_table.schema) |
| == """location: struct<null: double> not null |
| child 0, null: double""" |
| ) |
| |
| |
| def test_projection_nested_struct(schema_struct: Schema, file_struct: str) -> None: |
| schema = Schema( |
| NestedField( |
| 4, |
| "location", |
| StructType( |
| NestedField(41, "lat", DoubleType(), required=False), |
| NestedField(43, "null", DoubleType(), required=False), |
| NestedField(42, "long", DoubleType(), required=False), |
| ), |
| ) |
| ) |
| |
| result_table = project(schema, [file_struct]) |
| for actual, expected in zip( |
| result_table.columns[0], |
| [ |
| {"lat": 52.371807, "long": 4.896029, "null": None}, |
| {"lat": 52.387386, "long": 4.646219, "null": None}, |
| {"lat": 52.078663, "long": 4.288788, "null": None}, |
| ], |
| ): |
| assert actual.as_py() == expected |
| assert len(result_table.columns[0]) == 3 |
| assert ( |
| repr(result_table.schema) |
| == """location: struct<lat: double, null: double, long: double> not null |
| child 0, lat: double |
| child 1, null: double |
| child 2, long: double""" |
| ) |
| |
| |
| def test_projection_list_of_structs(schema_list_of_structs: Schema, file_list_of_structs: str) -> None: |
| schema = Schema( |
| NestedField( |
| 5, |
| "locations", |
| ListType( |
| 51, |
| StructType( |
| NestedField(511, "latitude", DoubleType()), |
| NestedField(512, "longitude", DoubleType()), |
| NestedField(513, "altitude", DoubleType(), required=False), |
| ), |
| element_required=False, |
| ), |
| required=False, |
| ), |
| ) |
| |
| result_table = project(schema, [file_list_of_structs]) |
| assert len(result_table.columns) == 1 |
| assert len(result_table.columns[0]) == 3 |
| results = [row.as_py() for row in result_table.columns[0]] |
| assert results == [ |
| [ |
| {'latitude': 52.371807, 'longitude': 4.896029, 'altitude': None}, |
| {'latitude': 52.387386, 'longitude': 4.646219, 'altitude': None}, |
| ], |
| [], |
| [ |
| {'latitude': 52.078663, 'longitude': 4.288788, 'altitude': None}, |
| {'latitude': 52.387386, 'longitude': 4.646219, 'altitude': None}, |
| ], |
| ] |
| assert ( |
| repr(result_table.schema) |
| == """locations: list<element: struct<latitude: double not null, longitude: double not null, altitude: double>> |
| child 0, element: struct<latitude: double not null, longitude: double not null, altitude: double> |
| child 0, latitude: double not null |
| child 1, longitude: double not null |
| child 2, altitude: double""" |
| ) |
| |
| |
| def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_structs: str) -> None: |
| schema = Schema( |
| NestedField( |
| 5, |
| "locations", |
| MapType( |
| key_id=51, |
| value_id=52, |
| key_type=StringType(), |
| value_type=StructType( |
| NestedField(511, "latitude", DoubleType()), |
| NestedField(512, "longitude", DoubleType()), |
| NestedField(513, "altitude", DoubleType(), required=False), |
| ), |
| element_required=False, |
| ), |
| required=False, |
| ), |
| ) |
| |
| result_table = project(schema, [file_map_of_structs]) |
| assert len(result_table.columns) == 1 |
| assert len(result_table.columns[0]) == 3 |
| for actual, expected in zip( |
| result_table.columns[0], |
| [ |
| [ |
| ("1", {"latitude": 52.371807, "longitude": 4.896029, "altitude": None}), |
| ("2", {"latitude": 52.387386, "longitude": 4.646219, "altitude": None}), |
| ], |
| [], |
| [ |
| ("3", {"latitude": 52.078663, "longitude": 4.288788, "altitude": None}), |
| ("4", {"latitude": 52.387386, "longitude": 4.646219, "altitude": None}), |
| ], |
| ], |
| ): |
| assert actual.as_py() == expected |
| assert ( |
| repr(result_table.schema) |
| == """locations: map<string, struct<latitude: double not null, longitude: double not null, altitude: double>> |
| child 0, entries: struct<key: string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null |
| child 0, key: string not null |
| child 1, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null |
| child 0, latitude: double not null |
| child 1, longitude: double not null |
| child 2, altitude: double""" |
| ) |
| |
| |
| def test_projection_nested_struct_different_parent_id(file_struct: str) -> None: |
| schema = Schema( |
| NestedField( |
| 5, # 😱 this is 4 in the file, this will be fixed when projecting the file schema |
| "location", |
| StructType( |
| NestedField(41, "lat", DoubleType(), required=False), NestedField(42, "long", DoubleType(), required=False) |
| ), |
| required=False, |
| ) |
| ) |
| |
| result_table = project(schema, [file_struct]) |
| for actual, expected in zip(result_table.columns[0], [None, None, None]): |
| assert actual.as_py() == expected |
| assert len(result_table.columns[0]) == 3 |
| assert ( |
| repr(result_table.schema) |
| == """location: struct<lat: double, long: double> |
| child 0, lat: double |
| -- field metadata -- |
| PARQUET:field_id: '41' |
| child 1, long: double |
| -- field metadata -- |
| PARQUET:field_id: '42'""" |
| ) |
| |
| |
| def test_projection_filter_on_unprojected_field(schema_int_str: Schema, file_int_str: str) -> None: |
| schema = Schema(NestedField(1, "id", IntegerType())) |
| |
| result_table = project(schema, [file_int_str], GreaterThan("data", "1"), schema_int_str) |
| |
| for actual, expected in zip( |
| result_table.columns[0], |
| [2], |
| ): |
| assert actual.as_py() == expected |
| assert len(result_table.columns[0]) == 1 |
| assert repr(result_table.schema) == "id: int32 not null" |
| |
| |
| def test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str: str) -> None: |
| schema = Schema(NestedField(1, "id", IntegerType())) |
| |
| with pytest.raises(ValueError) as exc_info: |
| _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), schema_int_str) |
| |
| assert "Could not find field with name unknown_field, case_sensitive=True" in str(exc_info.value) |
| |
| |
| @pytest.fixture |
| def deletes_file(tmp_path: str, example_task: FileScanTask) -> str: |
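    """Write a positional-deletes Parquet file marking positions 1, 3, and 5 of the example data file."""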
| path = example_task.file.file_path |
| table = pa.table({"file_path": [path, path, path], "pos": [1, 3, 5]}) |
| |
| deletes_file_path = f"{tmp_path}/deletes.parquet" |
| pq.write_table(table, deletes_file_path) |
| |
| return deletes_file_path |
| |
| |
| def test_read_deletes(deletes_file: str, example_task: FileScanTask) -> None: |
| deletes = _read_deletes(LocalFileSystem(), DataFile(file_path=deletes_file, file_format=FileFormat.PARQUET)) |
| assert set(deletes.keys()) == {example_task.file.file_path} |
| assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]]) |
| |
| |
| def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simple: Schema) -> None: |
| metadata_location = "file://a/b/c.json" |
| example_task_with_delete = FileScanTask( |
| data_file=example_task.file, |
| delete_files={DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET)}, |
| ) |
| |
| with_deletes = project_table( |
| tasks=[example_task_with_delete], |
| table=Table( |
| ("namespace", "table"), |
| metadata=TableMetadataV2( |
| location=metadata_location, |
| last_column_id=1, |
| format_version=2, |
| current_schema_id=1, |
| schemas=[table_schema_simple], |
| partition_specs=[PartitionSpec()], |
| ), |
| metadata_location=metadata_location, |
| io=load_file_io(), |
| catalog=NoopCatalog("noop"), |
| ), |
| row_filter=AlwaysTrue(), |
| projected_schema=table_schema_simple, |
| ) |
| |
| assert ( |
| str(with_deletes) |
| == """pyarrow.Table |
| foo: string |
| bar: int64 not null |
| baz: bool |
| ---- |
| foo: [["a","c"]] |
| bar: [[1,3]] |
| baz: [[true,null]]""" |
| ) |
| |
| |
| def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_schema_simple: Schema) -> None: |
| metadata_location = "file://a/b/c.json" |
| example_task_with_delete = FileScanTask( |
| data_file=example_task.file, |
| delete_files={ |
| DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), |
| DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), |
| }, |
| ) |
| |
| with_deletes = project_table( |
| tasks=[example_task_with_delete], |
| table=Table( |
| ("namespace", "table"), |
| metadata=TableMetadataV2( |
| location=metadata_location, |
| last_column_id=1, |
| format_version=2, |
| current_schema_id=1, |
| schemas=[table_schema_simple], |
| partition_specs=[PartitionSpec()], |
| ), |
| metadata_location=metadata_location, |
| io=load_file_io(), |
| catalog=NoopCatalog("noop"), |
| ), |
| row_filter=AlwaysTrue(), |
| projected_schema=table_schema_simple, |
| ) |
| |
| assert ( |
| str(with_deletes) |
| == """pyarrow.Table |
| foo: string |
| bar: int64 not null |
| baz: bool |
| ---- |
| foo: [["a","c"]] |
| bar: [[1,3]] |
| baz: [[true,null]]""" |
| ) |
| |
| |
| def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Schema) -> None: |
| metadata_location = "file://a/b/c.json" |
| projection = project_table( |
| [example_task], |
| Table( |
| ("namespace", "table"), |
| metadata=TableMetadataV2( |
| location=metadata_location, |
| last_column_id=1, |
| format_version=2, |
| current_schema_id=1, |
| schemas=[table_schema_simple], |
| partition_specs=[PartitionSpec()], |
| ), |
| metadata_location=metadata_location, |
| io=load_file_io(properties={"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}, location=metadata_location), |
| catalog=NoopCatalog("NoopCatalog"), |
| ), |
| case_sensitive=True, |
| projected_schema=table_schema_simple, |
| row_filter=AlwaysTrue(), |
| ) |
| |
| assert ( |
| str(projection) |
| == """pyarrow.Table |
| foo: string |
| bar: int64 not null |
| baz: bool |
| ---- |
| foo: [["a","b","c"]] |
| bar: [[1,2,3]] |
| baz: [[true,false,null]]""" |
| ) |
| |
| |
| @pytest.mark.gcs |
| def test_new_input_file_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
| """Test creating a new input file from a fsspec file-io""" |
| filename = str(uuid4()) |
| |
| input_file = pyarrow_fileio_gcs.new_input(f"gs://warehouse/{filename}") |
| |
| assert isinstance(input_file, PyArrowFile) |
| assert input_file.location == f"gs://warehouse/{filename}" |
| |
| |
| @pytest.mark.gcs |
| def test_new_output_file_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
| """Test creating a new output file from an fsspec file-io""" |
| filename = str(uuid4()) |
| |
| output_file = pyarrow_fileio_gcs.new_output(f"gs://warehouse/{filename}") |
| |
| assert isinstance(output_file, PyArrowFile) |
| assert output_file.location == f"gs://warehouse/{filename}" |
| |
| |
| @pytest.mark.gcs |
| @pytest.mark.skip(reason="Open issue on Arrow: https://github.com/apache/arrow/issues/36993") |
| def test_write_and_read_file_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
| """Test writing and reading a file using FsspecInputFile and FsspecOutputFile""" |
| location = f"gs://warehouse/{uuid4()}.txt" |
| output_file = pyarrow_fileio_gcs.new_output(location=location) |
| with output_file.create() as f: |
| assert f.write(b"foo") == 3 |
| |
| assert output_file.exists() |
| |
| input_file = pyarrow_fileio_gcs.new_input(location=location) |
| with input_file.open() as f: |
| assert f.read() == b"foo" |
| |
| pyarrow_fileio_gcs.delete(input_file) |
| |
| |
| @pytest.mark.gcs |
| def test_getting_length_of_file_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
| """Test getting the length of an FsspecInputFile and FsspecOutputFile""" |
| filename = str(uuid4()) |
| |
| output_file = pyarrow_fileio_gcs.new_output(location=f"gs://warehouse/{filename}") |
| with output_file.create() as f: |
| f.write(b"foobar") |
| |
| assert len(output_file) == 6 |
| |
| input_file = pyarrow_fileio_gcs.new_input(location=f"gs://warehouse/{filename}") |
| assert len(input_file) == 6 |
| |
| pyarrow_fileio_gcs.delete(output_file) |
| |
| |
| @pytest.mark.gcs |
| @pytest.mark.skip(reason="Open issue on Arrow: https://github.com/apache/arrow/issues/36993") |
| def test_file_tell_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
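    """Test seeking and reporting the position with tell() on a GCS input file."""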
| location = f"gs://warehouse/{uuid4()}" |
| |
| output_file = pyarrow_fileio_gcs.new_output(location=location) |
| with output_file.create() as write_file: |
| write_file.write(b"foobar") |
| |
| input_file = pyarrow_fileio_gcs.new_input(location=location) |
| with input_file.open() as f: |
| f.seek(0) |
| assert f.tell() == 0 |
| f.seek(1) |
| assert f.tell() == 1 |
| f.seek(3) |
| assert f.tell() == 3 |
| f.seek(0) |
| assert f.tell() == 0 |
| |
| |
| @pytest.mark.gcs |
| @pytest.mark.skip(reason="Open issue on Arrow: https://github.com/apache/arrow/issues/36993") |
| def test_read_specified_bytes_for_file_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
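    """Test reading specific byte ranges from a GCS input file."""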
| location = f"gs://warehouse/{uuid4()}" |
| |
| output_file = pyarrow_fileio_gcs.new_output(location=location) |
| with output_file.create() as write_file: |
| write_file.write(b"foo") |
| |
| input_file = pyarrow_fileio_gcs.new_input(location=location) |
| with input_file.open() as f: |
| f.seek(0) |
| assert b"f" == f.read(1) |
| f.seek(0) |
| assert b"fo" == f.read(2) |
| f.seek(1) |
| assert b"o" == f.read(1) |
| f.seek(1) |
| assert b"oo" == f.read(2) |
| f.seek(0) |
| assert b"foo" == f.read(999) # test reading amount larger than entire content length |
| |
| pyarrow_fileio_gcs.delete(input_file) |
| |
| |
| @pytest.mark.gcs |
| @pytest.mark.skip(reason="Open issue on Arrow: https://github.com/apache/arrow/issues/36993") |
| def test_raise_on_opening_file_not_found_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
| """Test that an fsspec input file raises appropriately when the gcs file is not found""" |
| |
| filename = str(uuid4()) |
| input_file = pyarrow_fileio_gcs.new_input(location=f"gs://warehouse/{filename}") |
| with pytest.raises(FileNotFoundError) as exc_info: |
| input_file.open().read() |
| |
| assert filename in str(exc_info.value) |
| |
| |
| @pytest.mark.gcs |
| def test_checking_if_a_file_exists_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
| """Test checking if a file exists""" |
| non_existent_file = pyarrow_fileio_gcs.new_input(location="gs://warehouse/does-not-exist.txt") |
| assert not non_existent_file.exists() |
| |
| location = f"gs://warehouse/{uuid4()}" |
| output_file = pyarrow_fileio_gcs.new_output(location=location) |
| assert not output_file.exists() |
| with output_file.create() as f: |
| f.write(b"foo") |
| |
| existing_input_file = pyarrow_fileio_gcs.new_input(location=location) |
| assert existing_input_file.exists() |
| |
| existing_output_file = pyarrow_fileio_gcs.new_output(location=location) |
| assert existing_output_file.exists() |
| |
| pyarrow_fileio_gcs.delete(existing_output_file) |
| |
| |
| @pytest.mark.gcs |
| @pytest.mark.skip(reason="Open issue on Arrow: https://github.com/apache/arrow/issues/36993") |
| def test_closing_a_file_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
| """Test closing an output file and input file""" |
| filename = str(uuid4()) |
| output_file = pyarrow_fileio_gcs.new_output(location=f"gs://warehouse/{filename}") |
| with output_file.create() as write_file: |
| write_file.write(b"foo") |
| assert not write_file.closed # type: ignore |
| assert write_file.closed # type: ignore |
| |
| input_file = pyarrow_fileio_gcs.new_input(location=f"gs://warehouse/{filename}") |
| with input_file.open() as f: |
| assert not f.closed # type: ignore |
| assert f.closed # type: ignore |
| |
| pyarrow_fileio_gcs.delete(f"gs://warehouse/{filename}") |
| |
| |
| @pytest.mark.gcs |
| def test_converting_an_outputfile_to_an_inputfile_gcs(pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
| """Test converting an output file to an input file""" |
| filename = str(uuid4()) |
| output_file = pyarrow_fileio_gcs.new_output(location=f"gs://warehouse/{filename}") |
| input_file = output_file.to_input_file() |
| assert input_file.location == output_file.location |
| |
| |
| @pytest.mark.gcs |
| @pytest.mark.skip(reason="Open issue on Arrow: https://github.com/apache/arrow/issues/36993") |
| def test_writing_avro_file_gcs(generated_manifest_entry_file: str, pyarrow_fileio_gcs: PyArrowFileIO) -> None: |
| """Test that bytes match when reading a local avro file, writing it using fsspec file-io, and then reading it again""" |
| filename = str(uuid4()) |
| with PyArrowFileIO().new_input(location=generated_manifest_entry_file).open() as f: |
| b1 = f.read() |
| with pyarrow_fileio_gcs.new_output(location=f"gs://warehouse/{filename}").create() as out_f: |
| out_f.write(b1) |
| with pyarrow_fileio_gcs.new_input(location=f"gs://warehouse/{filename}").open() as in_f: |
| b2 = in_f.read() |
            assert b1 == b2  # Check that the bytes read from the local avro file match the bytes written to GCS
| |
| pyarrow_fileio_gcs.delete(f"gs://warehouse/{filename}") |
| |
| |
| def test_parse_location() -> None: |
    def check_results(location: str, expected_scheme: str, expected_netloc: str, expected_uri: str) -> None:
        scheme, netloc, uri = PyArrowFileIO.parse_location(location)
        assert scheme == expected_scheme
        assert netloc == expected_netloc
        assert uri == expected_uri
| |
| check_results("hdfs://127.0.0.1:9000/root/foo.txt", "hdfs", "127.0.0.1:9000", "hdfs://127.0.0.1:9000/root/foo.txt") |
| check_results("hdfs://127.0.0.1/root/foo.txt", "hdfs", "127.0.0.1", "hdfs://127.0.0.1/root/foo.txt") |
| check_results("hdfs://clusterA/root/foo.txt", "hdfs", "clusterA", "hdfs://clusterA/root/foo.txt") |
| |
| check_results("/root/foo.txt", "file", "", "/root/foo.txt") |
| check_results("/root/tmp/foo.txt", "file", "", "/root/tmp/foo.txt") |
| |
| |
| def test_make_compatible_name() -> None: |
| assert make_compatible_name("label/abc") == "label_x2Fabc" |
| assert make_compatible_name("label?abc") == "label_x3Fabc" |
| |
| |
| @pytest.mark.parametrize( |
| "vals, primitive_type, expected_result", |
| [ |
| ([None, 2, 1], IntegerType(), 1), |
| ([1, None, 2], IntegerType(), 1), |
| ([None, None, None], IntegerType(), None), |
| ([None, date(2024, 2, 4), date(2024, 1, 2)], DateType(), date(2024, 1, 2)), |
| ([date(2024, 1, 2), None, date(2024, 2, 4)], DateType(), date(2024, 1, 2)), |
| ([None, None, None], DateType(), None), |
| ], |
| ) |
| def test_stats_aggregator_update_min(vals: List[Any], primitive_type: PrimitiveType, expected_result: Any) -> None: |
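    # The aggregator should skip None values while tracking the running minimum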
| stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type)) |
| |
| for val in vals: |
| stats.update_min(val) |
| |
| assert stats.current_min == expected_result |
| |
| |
| @pytest.mark.parametrize( |
| "vals, primitive_type, expected_result", |
| [ |
| ([None, 2, 1], IntegerType(), 2), |
| ([1, None, 2], IntegerType(), 2), |
| ([None, None, None], IntegerType(), None), |
| ([None, date(2024, 2, 4), date(2024, 1, 2)], DateType(), date(2024, 2, 4)), |
| ([date(2024, 1, 2), None, date(2024, 2, 4)], DateType(), date(2024, 2, 4)), |
| ([None, None, None], DateType(), None), |
| ], |
| ) |
| def test_stats_aggregator_update_max(vals: List[Any], primitive_type: PrimitiveType, expected_result: Any) -> None: |
| stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type)) |
| |
| for val in vals: |
| stats.update_max(val) |
| |
| assert stats.current_max == expected_result |