| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # pylint:disable=redefined-outer-name |
| |
| import math |
| from datetime import date, datetime |
| |
| import pyarrow as pa |
| import pytest |
| import pytz |
| from pyspark.sql import DataFrame, SparkSession |
| |
| from pyiceberg.catalog import Catalog |
| from pyiceberg.exceptions import NoSuchTableError |
| from pyiceberg.schema import Schema |
| from pyiceberg.table import Table |
| from pyiceberg.typedef import Properties |
| from pyiceberg.types import ( |
| BinaryType, |
| BooleanType, |
| DateType, |
| DoubleType, |
| FixedType, |
| FloatType, |
| IntegerType, |
| LongType, |
| NestedField, |
| StringType, |
| TimestampType, |
| TimestamptzType, |
| ) |
| |
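| # Schema shared by all tables created in these tests: one nullable field per primitive type under test |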
| TABLE_SCHEMA = Schema( |
| NestedField(field_id=1, name="bool", field_type=BooleanType(), required=False), |
| NestedField(field_id=2, name="string", field_type=StringType(), required=False), |
| NestedField(field_id=3, name="string_long", field_type=StringType(), required=False), |
| NestedField(field_id=4, name="int", field_type=IntegerType(), required=False), |
| NestedField(field_id=5, name="long", field_type=LongType(), required=False), |
| NestedField(field_id=6, name="float", field_type=FloatType(), required=False), |
| NestedField(field_id=7, name="double", field_type=DoubleType(), required=False), |
| NestedField(field_id=8, name="timestamp", field_type=TimestampType(), required=False), |
| NestedField(field_id=9, name="timestamptz", field_type=TimestamptzType(), required=False), |
| NestedField(field_id=10, name="date", field_type=DateType(), required=False), |
| # NestedField(field_id=11, name="time", field_type=TimeType(), required=False), |
| # NestedField(field_id=12, name="uuid", field_type=UuidType(), required=False), |
| NestedField(field_id=12, name="binary", field_type=BinaryType(), required=False), |
| NestedField(field_id=13, name="fixed", field_type=FixedType(16), required=False), |
| ) |
| |
| |
| def _create_table(session_catalog: Catalog, identifier: str, properties: Properties) -> Table: |
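| """Drop the table if it already exists, then create a fresh one with the shared test schema.""" |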
| try: |
| session_catalog.drop_table(identifier=identifier) |
| except NoSuchTableError: |
| pass |
| |
| return session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties) |
| |
| |
| def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None: |
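| """Assert that a PyIceberg files metadata table has the expected columns and matches the corresponding Spark table.""" |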
| from pandas.testing import assert_frame_equal |
| |
| assert df.column_names == [ |
| "content", |
| "file_path", |
| "file_format", |
| "spec_id", |
| "partition", |
| "record_count", |
| "file_size_in_bytes", |
| "column_sizes", |
| "value_counts", |
| "null_value_counts", |
| "nan_value_counts", |
| "lower_bounds", |
| "upper_bounds", |
| "key_metadata", |
| "split_offsets", |
| "equality_ids", |
| "sort_order_id", |
| "readable_metrics", |
| ] |
| |
| # make sure the non-nullable fields are filled |
| for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]: |
| for value in df[int_column]: |
| assert isinstance(value.as_py(), int) |
| |
| for split_offsets in df["split_offsets"]: |
| if split_offsets.as_py() is not None: |
| assert isinstance(split_offsets.as_py(), list) |
| |
| for file_path in df["file_path"]: |
| assert file_path.as_py().startswith("s3://") |
| |
| # sort both dataframes by content and file_path before comparing, |
| # since row order is not guaranteed for the all_files tables |
| lhs = df.to_pandas().sort_values(by=["content", "file_path"]).reset_index(drop=True) |
| rhs = spark_df.toPandas().sort_values(by=["content", "file_path"]).reset_index(drop=True) |
| |
| lhs_subset = lhs[ |
| [ |
| "content", |
| "file_path", |
| "file_format", |
| "spec_id", |
| "record_count", |
| "file_size_in_bytes", |
| "split_offsets", |
| "equality_ids", |
| "sort_order_id", |
| ] |
| ] |
| rhs_subset = rhs[ |
| [ |
| "content", |
| "file_path", |
| "file_format", |
| "spec_id", |
| "record_count", |
| "file_size_in_bytes", |
| "split_offsets", |
| "equality_ids", |
| "sort_order_id", |
| ] |
| ] |
| |
| assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False) |
| |
| for column in df.column_names: |
| if column == "partition": |
| # Spark leaves out the partition if the table is unpartitioned |
| continue |
| for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): |
| if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): |
| # NaN != NaN in Python |
| continue |
| if column in [ |
| "column_sizes", |
| "value_counts", |
| "null_value_counts", |
| "nan_value_counts", |
| "lower_bounds", |
| "upper_bounds", |
| ]: |
| if isinstance(right, dict): |
| left = dict(left) |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
| elif column == "readable_metrics": |
| assert list(left.keys()) == [ |
| "bool", |
| "string", |
| "string_long", |
| "int", |
| "long", |
| "float", |
| "double", |
| "timestamp", |
| "timestamptz", |
| "date", |
| "binary", |
| "fixed", |
| ] |
| assert left.keys() == right.keys() |
| |
| for rm_column in left.keys(): |
| rm_lhs = left[rm_column] |
| rm_rhs = right[rm_column] |
| |
| assert rm_lhs["column_size"] == rm_rhs["column_size"] |
| assert rm_lhs["value_count"] == rm_rhs["value_count"] |
| assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"] |
| assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"] |
| |
| if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]: |
| # PySpark returns the timestamptz bounds without a timezone, so attach UTC before comparing |
| rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc) |
| rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc) |
| |
| assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"] |
| assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"] |
| else: |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_snapshots( |
| spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int |
| ) -> None: |
| identifier = "default.table_metadata_snapshots" |
| tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) |
| |
| tbl.append(arrow_table_with_null) |
| # overwrite should produce a DELETE snapshot followed by an APPEND snapshot |
| tbl.overwrite(arrow_table_with_null) |
| # Since we don't rewrite, this should produce a new manifest with an ADDED entry |
| tbl.append(arrow_table_with_null) |
| |
| df = tbl.inspect.snapshots() |
| |
| assert df.column_names == [ |
| "committed_at", |
| "snapshot_id", |
| "parent_id", |
| "operation", |
| "manifest_list", |
| "summary", |
| ] |
| |
| for committed_at in df["committed_at"]: |
| assert isinstance(committed_at.as_py(), datetime) |
| |
| for snapshot_id in df["snapshot_id"]: |
| assert isinstance(snapshot_id.as_py(), int) |
| |
| assert df["parent_id"][0].as_py() is None |
| assert df["parent_id"][1:].to_pylist() == df["snapshot_id"][:-1].to_pylist() |
| |
| assert [operation.as_py() for operation in df["operation"]] == ["append", "delete", "append", "append"] |
| |
| for manifest_list in df["manifest_list"]: |
| assert manifest_list.as_py().startswith("s3://") |
| |
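| # The snapshot summary is exposed as a list of (key, value) string pairs; pull out the added file size |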
| file_size = int(next(value for key, value in df["summary"][0].as_py() if key == "added-files-size")) |
| assert file_size > 0 |
| |
| # Append |
| assert df["summary"][0].as_py() == [ |
| ("added-files-size", str(file_size)), |
| ("added-data-files", "1"), |
| ("added-records", "3"), |
| ("total-data-files", "1"), |
| ("total-delete-files", "0"), |
| ("total-records", "3"), |
| ("total-files-size", str(file_size)), |
| ("total-position-deletes", "0"), |
| ("total-equality-deletes", "0"), |
| ] |
| |
| # Delete |
| assert df["summary"][1].as_py() == [ |
| ("removed-files-size", str(file_size)), |
| ("deleted-data-files", "1"), |
| ("deleted-records", "3"), |
| ("total-data-files", "0"), |
| ("total-delete-files", "0"), |
| ("total-records", "0"), |
| ("total-files-size", "0"), |
| ("total-position-deletes", "0"), |
| ("total-equality-deletes", "0"), |
| ] |
| |
| lhs = spark.table(f"{identifier}.snapshots").toPandas() |
| rhs = df.to_pandas() |
| for column in df.column_names: |
| for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): |
| if column == "summary": |
| # Arrow returns a list of tuples, instead of a dict |
| right = dict(right) |
| |
| if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): |
| # NaN != NaN in Python |
| continue |
| |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_entries( |
| spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int |
| ) -> None: |
| identifier = "default.table_metadata_entries" |
| tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) |
| |
| # Write some data |
| tbl.append(arrow_table_with_null) |
| # Generate a DELETE entry |
| tbl.overwrite(arrow_table_with_null) |
| |
| def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None: |
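| """Compare the PyIceberg entries metadata table for a snapshot against Spark's entries table for the same snapshot.""" |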
| assert df.column_names == [ |
| "status", |
| "snapshot_id", |
| "sequence_number", |
| "file_sequence_number", |
| "data_file", |
| "readable_metrics", |
| ] |
| |
| # Make sure the non-nullable fields are filled properly |
| for int_column in ["status", "snapshot_id", "sequence_number", "file_sequence_number"]: |
| for value in df[int_column]: |
| assert isinstance(value.as_py(), int) |
| |
| for snapshot_id in df["snapshot_id"]: |
| assert isinstance(snapshot_id.as_py(), int) |
| |
| lhs = df.to_pandas() |
| rhs = spark_df.toPandas() |
| assert len(lhs) == len(rhs) |
| |
| for column in df.column_names: |
| for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): |
| if column == "data_file": |
| for df_column in left.keys(): |
| if df_column == "partition": |
| # Spark leaves out the partition if the table is unpartitioned |
| continue |
| |
| df_lhs = left[df_column] |
| df_rhs = right[df_column] |
| if isinstance(df_rhs, dict): |
| # Arrow turns dicts into lists of tuples |
| df_lhs = dict(df_lhs) |
| |
| assert df_lhs == df_rhs, f"Difference in data_file column {df_column}: {df_lhs} != {df_rhs}" |
| elif column == "readable_metrics": |
| assert list(left.keys()) == [ |
| "bool", |
| "string", |
| "string_long", |
| "int", |
| "long", |
| "float", |
| "double", |
| "timestamp", |
| "timestamptz", |
| "date", |
| "binary", |
| "fixed", |
| ] |
| |
| assert left.keys() == right.keys() |
| |
| for rm_column in left.keys(): |
| rm_lhs = left[rm_column] |
| rm_rhs = right[rm_column] |
| |
| assert rm_lhs["column_size"] == rm_rhs["column_size"] |
| assert rm_lhs["value_count"] == rm_rhs["value_count"] |
| assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"] |
| assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"] |
| |
| if rm_column == "timestamptz": |
| # PySpark returns the timestamptz bounds without a timezone, so attach UTC before comparing |
| rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc) |
| rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc) |
| |
| assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"] |
| assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"] |
| else: |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
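| # Validate the entries table at every snapshot against Spark's VERSION AS OF query |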
| for snapshot in tbl.metadata.snapshots: |
| df = tbl.inspect.entries(snapshot_id=snapshot.snapshot_id) |
| spark_df = spark.sql(f"SELECT * FROM {identifier}.entries VERSION AS OF {snapshot.snapshot_id}") |
| check_pyiceberg_df_equals_spark_df(df, spark_df) |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| identifier = "default.table_metadata_entries_partitioned" |
| try: |
| session_catalog.drop_table(identifier=identifier) |
| except NoSuchTableError: |
| pass |
| |
| spark.sql( |
| f""" |
| CREATE TABLE {identifier} ( |
| dt date |
| ) |
| PARTITIONED BY (months(dt)) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES (CAST('2021-01-01' AS date)) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| ALTER TABLE {identifier} |
| REPLACE PARTITION FIELD dt_month WITH days(dt) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES (CAST('2021-02-01' AS date)) |
| """ |
| ) |
| |
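| # The first entry comes from the day-partitioned insert, the second from the month-partitioned one (612 months after 1970-01 is 2021-01) |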
| df = session_catalog.load_table(identifier).inspect.entries() |
| |
| assert df.to_pydict()["data_file"][0]["partition"] == {"dt_day": date(2021, 2, 1), "dt_month": None} |
| assert df.to_pydict()["data_file"][1]["partition"] == {"dt_day": None, "dt_month": 612} |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_refs( |
| spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int |
| ) -> None: |
| identifier = "default.table_metadata_refs" |
| tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) |
| |
| # write data to create snapshot |
| tbl.append(arrow_table_with_null) |
| |
| # create a test branch |
| spark.sql( |
| f""" |
| ALTER TABLE {identifier} CREATE BRANCH IF NOT EXISTS testBranch RETAIN 7 DAYS WITH SNAPSHOT RETENTION 2 SNAPSHOTS |
| """ |
| ) |
| |
| # create a test tag against current snapshot |
| current_snapshot = tbl.current_snapshot() |
| assert current_snapshot is not None |
| current_snapshot_id = current_snapshot.snapshot_id |
| |
| spark.sql( |
| f""" |
| ALTER TABLE {identifier} CREATE TAG testTag AS OF VERSION {current_snapshot_id} RETAIN 180 DAYS |
| """ |
| ) |
| |
| df = tbl.refresh().inspect.refs() |
| |
| assert df.column_names == [ |
| "name", |
| "type", |
| "snapshot_id", |
| "max_reference_age_in_ms", |
| "min_snapshots_to_keep", |
| "max_snapshot_age_in_ms", |
| ] |
| |
| assert [name.as_py() for name in df["name"]] == ["testBranch", "main", "testTag"] |
| assert [ref_type.as_py() for ref_type in df["type"]] == ["BRANCH", "BRANCH", "TAG"] |
| |
| for snapshot_id in df["snapshot_id"]: |
| assert isinstance(snapshot_id.as_py(), int) |
| |
| for int_column in ["max_reference_age_in_ms", "min_snapshots_to_keep", "max_snapshot_age_in_ms"]: |
| for value in df[int_column]: |
| assert isinstance(value.as_py(), int) or not value.as_py() |
| |
| lhs = spark.table(f"{identifier}.refs").toPandas() |
| rhs = df.to_pandas() |
| for column in df.column_names: |
| for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): |
| if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): |
| # NaN != NaN in Python |
| continue |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_partitions_unpartitioned( |
| spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int |
| ) -> None: |
| identifier = "default.table_metadata_partitions_unpartitioned" |
| tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) |
| |
| # Write some data through multiple commits |
| tbl.append(arrow_table_with_null) |
| tbl.append(arrow_table_with_null) |
| |
| df = tbl.inspect.partitions() |
| assert df.column_names == [ |
| "record_count", |
| "file_count", |
| "total_data_file_size_in_bytes", |
| "position_delete_record_count", |
| "position_delete_file_count", |
| "equality_delete_record_count", |
| "equality_delete_file_count", |
| "last_updated_at", |
| "last_updated_snapshot_id", |
| ] |
| for last_updated_at in df["last_updated_at"]: |
| assert isinstance(last_updated_at.as_py(), datetime) |
| |
| int_cols = [ |
| "record_count", |
| "file_count", |
| "total_data_file_size_in_bytes", |
| "position_delete_record_count", |
| "position_delete_file_count", |
| "equality_delete_record_count", |
| "equality_delete_file_count", |
| "last_updated_snapshot_id", |
| ] |
| for column in int_cols: |
| for value in df[column]: |
| assert isinstance(value.as_py(), int) |
| lhs = df.to_pandas() |
| rhs = spark.table(f"{identifier}.partitions").toPandas() |
| for column in df.column_names: |
| for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| identifier = "default.table_metadata_partitions_partitioned" |
| try: |
| session_catalog.drop_table(identifier=identifier) |
| except NoSuchTableError: |
| pass |
| |
| spark.sql( |
| f""" |
| CREATE TABLE {identifier} ( |
| name string, |
| dt date |
| ) |
| PARTITIONED BY (months(dt)) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES ('John', CAST('2021-01-01' AS date)) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES ('Doe', CAST('2021-01-05' AS date)) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| ALTER TABLE {identifier} |
| REPLACE PARTITION FIELD dt_month WITH days(dt) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES ('Jenny', CAST('2021-02-01' AS date)) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| ALTER TABLE {identifier} |
| DROP PARTITION FIELD dt_day |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES ('James', CAST('2021-02-01' AS date)) |
| """ |
| ) |
| |
| def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None: |
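| """Compare the PyIceberg partitions metadata table against Spark's, sorting by spec_id since row order is not guaranteed.""" |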
| lhs = df.to_pandas().sort_values("spec_id") |
| rhs = spark_df.toPandas().sort_values("spec_id") |
| for column in df.column_names: |
| for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
| tbl = session_catalog.load_table(identifier) |
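| # Validate the partitions table at every snapshot against Spark's VERSION AS OF query |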
| for snapshot in tbl.metadata.snapshots: |
| df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id) |
| spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}") |
| check_pyiceberg_df_equals_spark_df(df, spark_df) |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| identifier = "default.table_metadata_manifests" |
| try: |
| session_catalog.drop_table(identifier=identifier) |
| except NoSuchTableError: |
| pass |
| |
| spark.sql( |
| f""" |
| CREATE TABLE {identifier} ( |
| id int, |
| data string |
| ) |
| PARTITIONED BY (data) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES (1, "a") |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES (2, "b") |
| """ |
| ) |
| |
| df = session_catalog.load_table(identifier).inspect.manifests() |
| |
| assert df.column_names == [ |
| "content", |
| "path", |
| "length", |
| "partition_spec_id", |
| "added_snapshot_id", |
| "added_data_files_count", |
| "existing_data_files_count", |
| "deleted_data_files_count", |
| "added_delete_files_count", |
| "existing_delete_files_count", |
| "deleted_delete_files_count", |
| "partition_summaries", |
| ] |
| |
| int_cols = [ |
| "content", |
| "length", |
| "partition_spec_id", |
| "added_snapshot_id", |
| "added_data_files_count", |
| "existing_data_files_count", |
| "deleted_data_files_count", |
| "added_delete_files_count", |
| "existing_delete_files_count", |
| "deleted_delete_files_count", |
| ] |
| |
| for column in int_cols: |
| for value in df[column]: |
| assert isinstance(value.as_py(), int) |
| |
| for value in df["path"]: |
| assert isinstance(value.as_py(), str) |
| |
| for value in df["partition_summaries"]: |
| assert isinstance(value.as_py(), list) |
| for row in value: |
| assert isinstance(row["contains_null"].as_py(), bool) |
| assert isinstance(row["contains_nan"].as_py(), (bool, type(None))) |
| assert isinstance(row["lower_bound"].as_py(), (str, type(None))) |
| assert isinstance(row["upper_bound"].as_py(), (str, type(None))) |
| |
| lhs = spark.table(f"{identifier}.manifests").toPandas() |
| rhs = df.to_pandas() |
| for column in df.column_names: |
| for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_metadata_log_entries( |
| spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int |
| ) -> None: |
| from pandas.testing import assert_frame_equal |
| |
| identifier = "default.table_metadata_log_entries" |
| try: |
| session_catalog.drop_table(identifier=identifier) |
| except NoSuchTableError: |
| pass |
| |
| tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) |
| |
| # Write some data |
| tbl.append(arrow_table_with_null) |
| tbl.append(arrow_table_with_null) |
| tbl.append(arrow_table_with_null) |
| |
| df = tbl.inspect.metadata_log_entries() |
| spark_df = spark.sql(f"SELECT * FROM {identifier}.metadata_log_entries") |
| lhs = df.to_pandas() |
| rhs = spark_df.toPandas() |
| |
| # The timestamp in the last row of the `metadata_log_entries` table is based on when the table was read, |
| # so the last-row timestamps of the PyIceberg and Spark dataframes will differ |
| left_before_last, left_last = lhs[:-1], lhs[-1:] |
| right_before_last, right_last = rhs[:-1], rhs[-1:] |
| |
| # compare all rows except for the last row |
| assert_frame_equal(left_before_last, right_before_last, check_dtype=False) |
| # compare the last row, except for the timestamp |
| for column in df.column_names: |
| for left, right in zip(left_last[column], right_last[column]): |
| if column == "timestamp": |
| continue |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| identifier = "default.table_history" |
| |
| try: |
| session_catalog.drop_table(identifier=identifier) |
| except NoSuchTableError: |
| pass |
| |
| spark.sql( |
| f""" |
| CREATE TABLE {identifier} ( |
| id int, |
| data string |
| ) |
| PARTITIONED BY (data) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES (1, "a") |
| """ |
| ) |
| |
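| # Capture the current snapshot id so the table can be rolled back to it later |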
| table = session_catalog.load_table(identifier) |
| first_snapshot = table.current_snapshot() |
| snapshot_id = None if not first_snapshot else first_snapshot.snapshot_id |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES (2, "b") |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| CALL integration.system.rollback_to_snapshot('{identifier}', {snapshot_id}) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES (3, "c") |
| """ |
| ) |
| |
| table.refresh() |
| |
| df = table.inspect.history() |
| |
| assert df.column_names == [ |
| "made_current_at", |
| "snapshot_id", |
| "parent_id", |
| "is_current_ancestor", |
| ] |
| |
| lhs = spark.table(f"{identifier}.history").toPandas() |
| rhs = df.to_pandas() |
| for column in df.column_names: |
| for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): |
| if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): |
| # NaN != NaN in Python |
| continue |
| assert left == right, f"Difference in column {column}: {left} != {right}" |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_files( |
| spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int |
| ) -> None: |
| identifier = "default.table_metadata_files" |
| |
| tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) |
| |
| tbl.append(arrow_table_with_null) |
| |
| # append more data |
| tbl.append(arrow_table_with_null) |
| |
| # for format v2, switch deletes to merge-on-read and delete a row so delete files are produced |
| if format_version == 2: |
| with tbl.transaction() as txn: |
| txn.set_properties({"write.delete.mode": "merge-on-read"}) |
| spark.sql(f"DELETE FROM {identifier} WHERE int = 1") |
| |
| files_df = tbl.refresh().inspect.files() |
| |
| data_files_df = tbl.inspect.data_files() |
| |
| delete_files_df = tbl.inspect.delete_files() |
| |
| _inspect_files_asserts(files_df, spark.table(f"{identifier}.files")) |
| _inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files")) |
| _inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files")) |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| identifier = "default.table_metadata_files" |
| |
| tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) |
| |
| files_df = tbl.refresh().inspect.files() |
| data_files_df = tbl.inspect.data_files() |
| delete_files_df = tbl.inspect.delete_files() |
| all_files_df = tbl.inspect.all_files() |
| all_data_files_df = tbl.inspect.all_data_files() |
| all_delete_files_df = tbl.inspect.all_delete_files() |
| |
| def inspect_files_asserts(df: pa.Table) -> None: |
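| """Check that a files metadata table has the expected columns and no rows when the table has no snapshots.""" |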
| assert df.column_names == [ |
| "content", |
| "file_path", |
| "file_format", |
| "spec_id", |
| "partition", |
| "record_count", |
| "file_size_in_bytes", |
| "column_sizes", |
| "value_counts", |
| "null_value_counts", |
| "nan_value_counts", |
| "lower_bounds", |
| "upper_bounds", |
| "key_metadata", |
| "split_offsets", |
| "equality_ids", |
| "sort_order_id", |
| "readable_metrics", |
| ] |
| |
| assert df.to_pandas().empty is True |
| |
| inspect_files_asserts(files_df) |
| inspect_files_asserts(data_files_df) |
| inspect_files_asserts(delete_files_df) |
| inspect_files_asserts(all_files_df) |
| inspect_files_asserts(all_data_files_df) |
| inspect_files_asserts(all_delete_files_df) |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| from pandas.testing import assert_frame_equal |
| |
| identifier = "default.table_metadata_all_manifests" |
| try: |
| session_catalog.drop_table(identifier=identifier) |
| except NoSuchTableError: |
| pass |
| |
| spark.sql( |
| f""" |
| CREATE TABLE {identifier} ( |
| id int, |
| data string |
| ) |
| PARTITIONED BY (data) |
| TBLPROPERTIES ('write.update.mode'='merge-on-read', |
| 'write.delete.mode'='merge-on-read') |
| """ |
| ) |
| tbl = session_catalog.load_table(identifier) |
| |
| # check all_manifests when there are no snapshots |
| lhs = tbl.inspect.all_manifests().to_pandas() |
| rhs = spark.table(f"{identifier}.all_manifests").toPandas() |
| assert_frame_equal(lhs, rhs, check_dtype=False) |
| |
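| # Run inserts, an update, a delete, and an overwrite so the table accumulates multiple manifests |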
| spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") |
| |
| spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") |
| |
| spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1") |
| |
| spark.sql(f"DELETE FROM {identifier} WHERE id = 2") |
| |
| spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')") |
| |
| tbl.refresh() |
| df = tbl.inspect.all_manifests() |
| |
| assert df.column_names == [ |
| "content", |
| "path", |
| "length", |
| "partition_spec_id", |
| "added_snapshot_id", |
| "added_data_files_count", |
| "existing_data_files_count", |
| "deleted_data_files_count", |
| "added_delete_files_count", |
| "existing_delete_files_count", |
| "deleted_delete_files_count", |
| "partition_summaries", |
| "reference_snapshot_id", |
| ] |
| |
| int_cols = [ |
| "content", |
| "length", |
| "partition_spec_id", |
| "added_snapshot_id", |
| "added_data_files_count", |
| "existing_data_files_count", |
| "deleted_data_files_count", |
| "added_delete_files_count", |
| "existing_delete_files_count", |
| "deleted_delete_files_count", |
| "reference_snapshot_id", |
| ] |
| |
| for column in int_cols: |
| for value in df[column]: |
| assert isinstance(value.as_py(), int) |
| |
| for value in df["path"]: |
| assert isinstance(value.as_py(), str) |
| |
| for value in df["partition_summaries"]: |
| assert isinstance(value.as_py(), list) |
| for row in value: |
| assert isinstance(row["contains_null"].as_py(), bool) |
| assert isinstance(row["contains_nan"].as_py(), (bool, type(None))) |
| assert isinstance(row["lower_bound"].as_py(), (str, type(None))) |
| assert isinstance(row["upper_bound"].as_py(), (str, type(None))) |
| |
| lhs = spark.table(f"{identifier}.all_manifests").toPandas() |
| rhs = df.to_pandas() |
| assert_frame_equal(lhs, rhs, check_dtype=False) |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2]) |
| def test_inspect_all_files( |
| spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int |
| ) -> None: |
| identifier = "default.table_metadata_files" |
| |
| tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) |
| |
| # append three times |
| for _ in range(3): |
| tbl.append(arrow_table_with_null) |
| |
| # for format v2, enable merge-on-read deletes/updates and run row-level DELETE and UPDATE so delete files show up |
| if format_version == 2: |
| with tbl.transaction() as txn: |
| txn.set_properties({"write.delete.mode": "merge-on-read"}) |
| txn.set_properties({"write.update.mode": "merge-on-read"}) |
| spark.sql(f"DELETE FROM {identifier} WHERE int = 1") |
| tbl.refresh() |
| tbl.append(arrow_table_with_null) |
| spark.sql(f"UPDATE {identifier} SET string = 'b' WHERE int = 9") |
| spark.sql(f"DELETE FROM {identifier} WHERE int = 1") |
| tbl.refresh() |
| |
| all_files_df = tbl.inspect.all_files() |
| all_data_files_df = tbl.inspect.all_data_files() |
| all_delete_files_df = tbl.inspect.all_delete_files() |
| |
| _inspect_files_asserts(all_files_df, spark.table(f"{identifier}.all_files")) |
| _inspect_files_asserts(all_data_files_df, spark.table(f"{identifier}.all_data_files")) |
| _inspect_files_asserts(all_delete_files_df, spark.table(f"{identifier}.all_delete_files")) |
| |
| |
| @pytest.mark.integration |
| def test_inspect_files_format_version_3(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: |
| identifier = "default.table_metadata_files" |
| |
| tbl = _create_table( |
| session_catalog, |
| identifier, |
| properties={ |
| "format-version": "3", |
| "write.delete.mode": "merge-on-read", |
| "write.update.mode": "merge-on-read", |
| "write.merge.mode": "merge-on-read", |
| }, |
| ) |
| |
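| # One fully populated row, one all-NULL row, and a second populated row |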
| insert_data_sql = f"""INSERT INTO {identifier} VALUES |
| (false, 'a', 'aaaaaaaaaaaaaaaaaaaaaa', 1, 1, 0.0, 0.0, TIMESTAMP('2023-01-01 19:25:00'), TIMESTAMP('2023-01-01 19:25:00+00:00'), DATE('2023-01-01'), X'01', X'00000000000000000000000000000000'), |
| (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL), |
| (true, 'z', 'zzzzzzzzzzzzzzzzzzzzzz', 9, 9, 0.9, 0.9, TIMESTAMP('2023-03-01 19:25:00'), TIMESTAMP('2023-03-01 19:25:00+00:00'), DATE('2023-03-01'), X'12', X'11111111111111111111111111111111'); |
| """ |
| |
| spark.sql(insert_data_sql) |
| spark.sql(insert_data_sql) |
| spark.sql(f"UPDATE {identifier} SET int = 2 WHERE int = 1") |
| spark.sql(f"DELETE FROM {identifier} WHERE int = 9") |
| |
| tbl.refresh() |
| |
| files_df = tbl.inspect.files() |
| data_files_df = tbl.inspect.data_files() |
| delete_files_df = tbl.inspect.delete_files() |
| |
| all_files_df = tbl.inspect.all_files() |
| all_data_files_df = tbl.inspect.all_data_files() |
| all_delete_files_df = tbl.inspect.all_delete_files() |
| |
| _inspect_files_asserts(files_df, spark.table(f"{identifier}.files")) |
| _inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files")) |
| _inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files")) |
| |
| _inspect_files_asserts(all_files_df, spark.table(f"{identifier}.all_files")) |
| _inspect_files_asserts(all_data_files_df, spark.table(f"{identifier}.all_data_files")) |
| _inspect_files_asserts(all_delete_files_df, spark.table(f"{identifier}.all_delete_files")) |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize("format_version", [1, 2, 3]) |
| def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| from pandas.testing import assert_frame_equal |
| |
| identifier = "default.table_metadata_files_partitioned" |
| try: |
| session_catalog.drop_table(identifier=identifier) |
| except NoSuchTableError: |
| pass |
| |
| spark.sql( |
| f""" |
| CREATE TABLE {identifier} ( |
| dt date, |
| int_data int |
| ) |
| PARTITIONED BY (months(dt)) |
| TBLPROPERTIES ('format-version'='{format_version}') |
| """ |
| ) |
| |
| if format_version > 1: |
| spark.sql( |
| f""" |
| ALTER TABLE {identifier} SET TBLPROPERTIES( |
| 'write.update.mode' = 'merge-on-read', |
| 'write.delete.mode' = 'merge-on-read', |
| 'write.merge.mode' = 'merge-on-read') |
| """ |
| ) |
| |
| spark.sql(f""" |
| INSERT INTO {identifier} VALUES (CAST('2025-01-01' AS date), 1), (CAST('2025-01-01' AS date), 2) |
| """) |
| |
| spark.sql( |
| f""" |
| ALTER TABLE {identifier} |
| REPLACE PARTITION FIELD dt_month WITH days(dt) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| INSERT INTO {identifier} VALUES (CAST('2025-01-02' AS date), 2) |
| """ |
| ) |
| |
| spark.sql( |
| f""" |
| DELETE FROM {identifier} WHERE int_data = 1 |
| """ |
| ) |
| |
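| # Compare only file_path and partition against Spark, sorted by file_path |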
| tbl = session_catalog.load_table(identifier) |
| files_df = tbl.inspect.files() |
| lhs = files_df.to_pandas()[["file_path", "partition"]].sort_values("file_path", ignore_index=True).reset_index() |
| rhs = ( |
| spark.table(f"{identifier}.files") |
| .select(["file_path", "partition"]) |
| .toPandas() |
| .sort_values("file_path", ignore_index=True) |
| .reset_index() |
| ) |
| assert_frame_equal(lhs, rhs, check_dtype=False) |