# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
import math
import os
import random
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any, Dict
from urllib.parse import urlparse
import pandas as pd
import pandas.testing
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pytest
import pytz
from pyarrow.fs import S3FileSystem
from pydantic_core import ValidationError
from pyspark.sql import SparkSession
from pytest_mock.plugin import MockerFixture
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
from pyiceberg.io.pyarrow import _dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.sorting import SortDirection, SortField, SortOrder
from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
from pyiceberg.types import (
DateType,
DoubleType,
IntegerType,
ListType,
LongType,
NestedField,
StringType,
)
from utils import _create_table
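# The session-scoped fixtures below create the tables that the Spark read tests query by name.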
@pytest.fixture(scope="session", autouse=True)
def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v1_with_null"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
@pytest.fixture(scope="session", autouse=True)
def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
identifier = "default.arrow_table_v1_without_data"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data])
assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
@pytest.fixture(scope="session", autouse=True)
def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
identifier = "default.arrow_table_v1_with_only_nulls"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_only_nulls])
assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
@pytest.fixture(scope="session", autouse=True)
def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v1_appended_with_null"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null])
assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
@pytest.fixture(scope="session", autouse=True)
def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v2_with_null"
tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
@pytest.fixture(scope="session", autouse=True)
def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
identifier = "default.arrow_table_v2_without_data"
tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_without_data])
assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
@pytest.fixture(scope="session", autouse=True)
def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
identifier = "default.arrow_table_v2_with_only_nulls"
tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_only_nulls])
assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
@pytest.fixture(scope="session", autouse=True)
def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v2_appended_with_null"
tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null])
assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
@pytest.fixture(scope="session", autouse=True)
def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v1_v2_appended_with_null"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
with tbl.transaction() as tx:
tx.upgrade_table_version(format_version=2)
tbl.append(arrow_table_with_null)
assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_query_count(spark: SparkSession, format_version: int) -> None:
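# arrow_table_with_null has exactly 3 rows, appended once by the fixture.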
df = spark.table(f"default.arrow_table_v{format_version}_with_null")
assert df.count() == 3, "Expected 3 rows"
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_query_filter_null(spark: SparkSession, arrow_table_with_null: pa.Table, format_version: int) -> None:
identifier = f"default.arrow_table_v{format_version}_with_null"
df = spark.table(identifier)
for col in arrow_table_with_null.column_names:
assert df.where(f"{col} is null").count() == 1, f"Expected 1 row for {col}"
assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows for {col}"
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_query_filter_without_data(spark: SparkSession, arrow_table_with_null: pa.Table, format_version: int) -> None:
identifier = f"default.arrow_table_v{format_version}_without_data"
df = spark.table(identifier)
for col in arrow_table_with_null.column_names:
assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for {col}"
assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for {col}"
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_query_filter_only_nulls(spark: SparkSession, arrow_table_with_null: pa.Table, format_version: int) -> None:
identifier = f"default.arrow_table_v{format_version}_with_only_nulls"
df = spark.table(identifier)
for col in arrow_table_with_null.column_names:
assert df.where(f"{col} is null").count() == 2, f"Expected 2 rows for {col}"
assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for {col}"
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_query_filter_appended_null(spark: SparkSession, arrow_table_with_null: pa.Table, format_version: int) -> None:
identifier = f"default.arrow_table_v{format_version}_appended_with_null"
df = spark.table(identifier)
for col in arrow_table_with_null.column_names:
assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}"
assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for {col}"
@pytest.mark.integration
def test_query_filter_v1_v2_append_null(
spark: SparkSession,
arrow_table_with_null: pa.Table,
) -> None:
identifier = "default.arrow_table_v1_v2_appended_with_null"
df = spark.table(identifier)
for col in arrow_table_with_null.column_names:
assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}"
assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for {col}"
@pytest.mark.integration
def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_summaries"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null])
tbl.overwrite(arrow_table_with_null)
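# _create_table appended the table twice; the overwrite adds a delete snapshot followed by an append, for four snapshots in total.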
rows = spark.sql(
f"""
SELECT operation, summary
FROM {identifier}.snapshots
ORDER BY committed_at ASC
"""
).collect()
operations = [row.operation for row in rows]
assert operations == ["append", "append", "delete", "append"]
summaries = [row.summary for row in rows]
file_size = int(summaries[0]["added-files-size"])
assert file_size > 0
# Append
assert summaries[0] == {
"added-data-files": "1",
"added-files-size": str(file_size),
"added-records": "3",
"total-data-files": "1",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": str(file_size),
"total-position-deletes": "0",
"total-records": "3",
}
# Append
assert summaries[1] == {
"added-data-files": "1",
"added-files-size": str(file_size),
"added-records": "3",
"total-data-files": "2",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": str(file_size * 2),
"total-position-deletes": "0",
"total-records": "6",
}
# Delete
assert summaries[2] == {
"deleted-data-files": "2",
"deleted-records": "6",
"removed-files-size": str(file_size * 2),
"total-data-files": "0",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "0",
"total-position-deletes": "0",
"total-records": "0",
}
# Append
assert summaries[3] == {
"added-data-files": "1",
"added-files-size": str(file_size),
"added-records": "3",
"total-data-files": "1",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": str(file_size),
"total-position-deletes": "0",
"total-records": "3",
}
@pytest.mark.integration
def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catalog) -> None:
identifier = "default.test_summaries_partial_overwrite"
TEST_DATA = {
"id": [1, 2, 3, 1, 1],
"name": ["AB", "CD", "EF", "CD", "EF"],
}
pa_schema = pa.schema(
[
pa.field("id", pa.int32()),
pa.field("name", pa.string()),
]
)
arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema)
tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=pa_schema)
with tbl.update_spec() as txn:
txn.add_identity("id")
tbl.append(arrow_table)
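# With the identity partition on 'id', the 5 rows span 3 partition values (1, 2, 3) and therefore 3 data files.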
assert len(tbl.inspect.data_files()) == 3
tbl.delete(delete_filter="id == 1 and name = 'AB'") # partial overwrite: rewrites the data of 1 data file
rows = spark.sql(
f"""
SELECT operation, summary
FROM {identifier}.snapshots
ORDER BY committed_at ASC
"""
).collect()
operations = [row.operation for row in rows]
assert operations == ["append", "overwrite"]
summaries = [row.summary for row in rows]
file_size = int(summaries[0]["added-files-size"])
assert file_size > 0
# APPEND
assert summaries[0] == {
"added-data-files": "3",
"added-files-size": "2570",
"added-records": "5",
"changed-partition-count": "3",
"total-data-files": "3",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "2570",
"total-position-deletes": "0",
"total-records": "5",
}
# Java produces:
# {
# "added-data-files": "1",
# "added-files-size": "707",
# "added-records": "2",
# "app-id": "local-1743678304626",
# "changed-partition-count": "1",
# "deleted-data-files": "1",
# "deleted-records": "3",
# "engine-name": "spark",
# "engine-version": "3.5.5",
# "iceberg-version": "Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)",
# "removed-files-size": "693",
# "spark.app.id": "local-1743678304626",
# "total-data-files": "3",
# "total-delete-files": "0",
# "total-equality-deletes": "0",
# "total-files-size": "1993",
# "total-position-deletes": "0",
# "total-records": "4"
# }
files = tbl.inspect.data_files()
assert len(files) == 3
assert summaries[1] == {
"added-data-files": "1",
"added-files-size": "859",
"added-records": "2",
"changed-partition-count": "1",
"deleted-data-files": "1",
"deleted-records": "3",
"removed-files-size": "866",
"total-data-files": "3",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "2563",
"total-position-deletes": "0",
"total-records": "4",
}
assert len(tbl.scan().to_pandas()) == 4
@pytest.mark.integration
def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_data_files"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [])
tbl.append(arrow_table_with_null)
# should produce a DELETE entry
tbl.overwrite(arrow_table_with_null)
# Since we don't rewrite, this should produce a new manifest with an ADDED entry
tbl.append(arrow_table_with_null)
rows = spark.sql(
f"""
SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
FROM {identifier}.all_manifests
"""
).collect()
assert [row.added_data_files_count for row in rows] == [1, 0, 1, 1, 1]
assert [row.existing_data_files_count for row in rows] == [0, 0, 0, 0, 0]
assert [row.deleted_data_files_count for row in rows] == [0, 1, 0, 0, 0]
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_object_storage_data_files(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
tbl = _create_table(
session_catalog=session_catalog,
identifier="default.object_stored",
properties={"format-version": format_version, TableProperties.OBJECT_STORE_ENABLED: True},
data=[arrow_table_with_null],
)
tbl.append(arrow_table_with_null)
paths = tbl.inspect.data_files().to_pydict()["file_path"]
assert len(paths) == 2
for location in paths:
assert location.startswith("s3://warehouse/default/object_stored/data/")
parts = location.split("/")
assert len(parts) == 11
# Entropy binary directories should have been injected
for dir_name in parts[6:10]:
assert dir_name
assert all(c in "01" for c in dir_name)
@pytest.mark.integration
def test_python_writes_with_spark_snapshot_reads(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table
) -> None:
identifier = "default.python_writes_with_spark_snapshot_reads"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [])
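# Helper that fetches the most recent snapshot id through Spark's snapshots metadata table.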
def get_current_snapshot_id(identifier: str) -> int:
return (
spark.sql(f"SELECT snapshot_id FROM {identifier}.snapshots order by committed_at desc limit 1")
.collect()[0]
.snapshot_id
)
tbl.append(arrow_table_with_null)
assert tbl.current_snapshot().snapshot_id == get_current_snapshot_id(identifier) # type: ignore
tbl.overwrite(arrow_table_with_null)
assert tbl.current_snapshot().snapshot_id == get_current_snapshot_id(identifier) # type: ignore
tbl.append(arrow_table_with_null)
assert tbl.current_snapshot().snapshot_id == get_current_snapshot_id(identifier) # type: ignore
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_python_writes_special_character_column_with_spark_reads(
spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
identifier = "default.python_writes_special_character_column_with_spark_reads"
column_name_with_special_character = "letter/abc"
TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN = {
column_name_with_special_character: ["a", None, "z"],
"id": [1, 2, 3],
"name": ["AB", "CD", "EF"],
"address": [
{"street": "123", "city": "SFO", "zip": 12345, column_name_with_special_character: "a"},
{"street": "456", "city": "SW", "zip": 67890, column_name_with_special_character: "b"},
{"street": "789", "city": "Random", "zip": 10112, column_name_with_special_character: "c"},
],
}
pa_schema = pa.schema(
[
pa.field(column_name_with_special_character, pa.string()),
pa.field("id", pa.int32()),
pa.field("name", pa.string()),
pa.field(
"address",
pa.struct(
[
pa.field("street", pa.string()),
pa.field("city", pa.string()),
pa.field("zip", pa.int32()),
pa.field(column_name_with_special_character, pa.string()),
]
),
),
]
)
arrow_table_with_special_character_column = pa.Table.from_pydict(TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN, schema=pa_schema)
tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, schema=pa_schema)
tbl.append(arrow_table_with_special_character_column)
spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
pyiceberg_df = tbl.scan().to_pandas()
assert spark_df.equals(pyiceberg_df)
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_python_writes_dictionary_encoded_column_with_spark_reads(
spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
identifier = "default.python_writes_dictionary_encoded_column_with_spark_reads"
TEST_DATA = {
"id": [1, 2, 3, 1, 1],
"name": ["AB", "CD", "EF", "CD", "EF"],
}
pa_schema = pa.schema(
[
pa.field("id", pa.dictionary(pa.int32(), pa.int32(), False)),
pa.field("name", pa.dictionary(pa.int32(), pa.string(), False)),
]
)
arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema)
tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, schema=pa_schema)
tbl.append(arrow_table)
spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
pyiceberg_df = tbl.scan().to_pandas()
# We're only interested in the content; PyIceberg turns the dictionary-encoded column into a pandas Categorical, which fails a strict dtype comparison:
# E AssertionError: Attributes of DataFrame.iloc[:, 1] (column name="name") are different
# E
# E Attribute "dtype" are different
# E [left]: object
# E [right]: CategoricalDtype(categories=['AB', 'CD', 'EF'], ordered=False, categories_dtype=object)
pandas.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False, check_categorical=False)
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_python_writes_with_small_and_large_types_spark_reads(
spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
identifier = "default.python_writes_with_small_and_large_types_spark_reads"
TEST_DATA = {
"foo": ["a", None, "z"],
"id": [1, 2, 3],
"name": ["AB", "CD", "EF"],
"address": [
{"street": "123", "city": "SFO", "zip": 12345, "bar": "a"},
{"street": "456", "city": "SW", "zip": 67890, "bar": "b"},
{"street": "789", "city": "Random", "zip": 10112, "bar": "c"},
],
}
pa_schema = pa.schema(
[
pa.field("foo", pa.string()),
pa.field("id", pa.int32()),
pa.field("name", pa.string()),
pa.field(
"address",
pa.struct(
[
pa.field("street", pa.string()),
pa.field("city", pa.string()),
pa.field("zip", pa.int32()),
pa.field("bar", pa.string()),
]
),
),
]
)
arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema)
tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, schema=pa_schema)
tbl.append(arrow_table)
spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
pyiceberg_df = tbl.scan().to_pandas()
assert spark_df.equals(pyiceberg_df)
arrow_table_on_read = tbl.scan().to_arrow()
assert arrow_table_on_read.schema == pa.schema(
[
pa.field("foo", pa.string()),
pa.field("id", pa.int32()),
pa.field("name", pa.string()),
pa.field(
"address",
pa.struct(
[
pa.field("street", pa.string()),
pa.field("city", pa.string()),
pa.field("zip", pa.int32()),
pa.field("bar", pa.string()),
]
),
),
]
)
@pytest.mark.integration
def test_write_bin_pack_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.write_bin_pack_data_files"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [])
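# Helper that counts the table's data files through Spark's files metadata table.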
def get_data_files_count(identifier: str) -> int:
return spark.sql(
f"""
SELECT *
FROM {identifier}.files
"""
).count()
# writes 1 data file since the table is smaller than the default target file size
assert arrow_table_with_null.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
tbl.append(arrow_table_with_null)
assert get_data_files_count(identifier) == 1
# writes 1 data file as long as the table is smaller than the default target file size
bigger_arrow_tbl = pa.concat_tables([arrow_table_with_null] * 10)
assert bigger_arrow_tbl.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
tbl.overwrite(bigger_arrow_tbl)
assert get_data_files_count(identifier) == 1
# writes multiple data files once target file size is overridden
target_file_size = arrow_table_with_null.nbytes
tbl = tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: target_file_size}).commit_transaction()
assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
assert target_file_size < bigger_arrow_tbl.nbytes
tbl.overwrite(bigger_arrow_tbl)
assert get_data_files_count(identifier) == 10
# writes half the number of data files when target file size doubles
target_file_size = arrow_table_with_null.nbytes * 2
tbl = tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: target_file_size}).commit_transaction()
assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
assert target_file_size < bigger_arrow_tbl.nbytes
tbl.overwrite(bigger_arrow_tbl)
assert get_data_files_count(identifier) == 5
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize(
"properties, expected_compression_name",
[
# REST catalog uses Zstandard by default: https://github.com/apache/iceberg/pull/8593
({}, "ZSTD"),
({"write.parquet.compression-codec": "uncompressed"}, "UNCOMPRESSED"),
({"write.parquet.compression-codec": "gzip", "write.parquet.compression-level": "1"}, "GZIP"),
({"write.parquet.compression-codec": "zstd", "write.parquet.compression-level": "1"}, "ZSTD"),
({"write.parquet.compression-codec": "snappy"}, "SNAPPY"),
],
)
def test_write_parquet_compression_properties(
spark: SparkSession,
session_catalog: Catalog,
arrow_table_with_null: pa.Table,
format_version: int,
properties: Dict[str, Any],
expected_compression_name: str,
) -> None:
identifier = "default.write_parquet_compression_properties"
tbl = _create_table(session_catalog, identifier, {"format-version": format_version, **properties}, [arrow_table_with_null])
data_file_paths = [task.file.file_path for task in tbl.scan().plan_files()]
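# Open the first data file via PyArrow's S3 filesystem and check the codec recorded in the Parquet column metadata.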
fs = S3FileSystem(
endpoint_override=session_catalog.properties["s3.endpoint"],
access_key=session_catalog.properties["s3.access-key-id"],
secret_key=session_catalog.properties["s3.secret-access-key"],
)
uri = urlparse(data_file_paths[0])
with fs.open_input_file(f"{uri.netloc}{uri.path}") as f:
parquet_metadata = pq.read_metadata(f)
compression = parquet_metadata.row_group(0).column(0).compression
assert compression == expected_compression_name
@pytest.mark.integration
@pytest.mark.parametrize(
"properties, expected_kwargs",
[
({"write.parquet.page-size-bytes": "42"}, {"data_page_size": 42}),
({"write.parquet.dict-size-bytes": "42"}, {"dictionary_pagesize_limit": 42}),
],
)
def test_write_parquet_other_properties(
mocker: MockerFixture,
spark: SparkSession,
session_catalog: Catalog,
arrow_table_with_null: pa.Table,
properties: Dict[str, Any],
expected_kwargs: Dict[str, Any],
) -> None:
identifier = "default.test_write_parquet_other_properties"
# The properties we test cannot be checked on the resulting Parquet file, so we spy on the ParquetWriter call instead
ParquetWriter = mocker.spy(pq, "ParquetWriter")
_create_table(session_catalog, identifier, properties, [arrow_table_with_null])
call_kwargs = ParquetWriter.call_args[1]
for key, value in expected_kwargs.items():
assert call_kwargs.get(key) == value
@pytest.mark.integration
@pytest.mark.parametrize(
"properties",
[
{"write.parquet.row-group-size-bytes": "42"},
{"write.parquet.bloom-filter-enabled.column.bool": "42"},
{"write.parquet.bloom-filter-max-bytes": "42"},
],
)
def test_write_parquet_unsupported_properties(
spark: SparkSession,
session_catalog: Catalog,
arrow_table_with_null: pa.Table,
properties: Dict[str, str],
) -> None:
identifier = "default.write_parquet_unsupported_properties"
tbl = _create_table(session_catalog, identifier, properties, [])
with pytest.warns(UserWarning, match=r"Parquet writer option.*"):
tbl.append(arrow_table_with_null)
@pytest.mark.integration
def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_data_files"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [])
with pytest.raises(ValueError, match="Expected PyArrow table, got: not a df"):
tbl.overwrite("not a df")
with pytest.raises(ValueError, match="Expected PyArrow table, got: not a df"):
tbl.append("not a df")
@pytest.mark.integration
def test_summaries_with_only_nulls(
spark: SparkSession, session_catalog: Catalog, arrow_table_without_data: pa.Table, arrow_table_with_only_nulls: pa.Table
) -> None:
identifier = "default.arrow_table_summaries_with_only_nulls"
tbl = _create_table(
session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data, arrow_table_with_only_nulls]
)
tbl.overwrite(arrow_table_without_data)
rows = spark.sql(
f"""
SELECT operation, summary
FROM {identifier}.snapshots
ORDER BY committed_at ASC
"""
).collect()
operations = [row.operation for row in rows]
assert operations == ["append", "append", "delete", "append"]
summaries = [row.summary for row in rows]
file_size = int(summaries[1]["added-files-size"])
assert file_size > 0
assert summaries[0] == {
"total-data-files": "0",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "0",
"total-position-deletes": "0",
"total-records": "0",
}
assert summaries[1] == {
"added-data-files": "1",
"added-files-size": str(file_size),
"added-records": "2",
"total-data-files": "1",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": str(file_size),
"total-position-deletes": "0",
"total-records": "2",
}
assert summaries[2] == {
"deleted-data-files": "1",
"deleted-records": "2",
"removed-files-size": str(file_size),
"total-data-files": "0",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "0",
"total-position-deletes": "0",
"total-records": "0",
}
assert summaries[3] == {
"total-data-files": "0",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "0",
"total-position-deletes": "0",
"total-records": "0",
}
@pytest.mark.integration
def test_duckdb_url_import(warehouse: Path, arrow_table_with_null: pa.Table) -> None:
os.environ["TZ"] = "Etc/UTC"
time.tzset()
tz = pytz.timezone(os.environ["TZ"])
catalog = SqlCatalog("test_sql_catalog", uri="sqlite:///:memory:", warehouse=f"/{warehouse}")
catalog.create_namespace("default")
identifier = "default.arrow_table_v1_with_null"
tbl = _create_table(catalog, identifier, {}, [arrow_table_with_null])
location = tbl.metadata_location
import duckdb
duckdb.sql("INSTALL iceberg; LOAD iceberg;")
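# Scan the Iceberg table directly from its metadata file location using DuckDB's iceberg extension.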
result = duckdb.sql(
f"""
SELECT *
FROM iceberg_scan('{location}')
"""
).fetchall()
assert result == [
(
False,
"a",
"aaaaaaaaaaaaaaaaaaaaaa",
1,
1,
0.0,
0.0,
datetime(2023, 1, 1, 19, 25),
datetime(2023, 1, 1, 19, 25, tzinfo=tz),
date(2023, 1, 1),
b"\x01",
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
),
(None, None, None, None, None, None, None, None, None, None, None, None),
(
True,
"z",
"zzzzzzzzzzzzzzzzzzzzzz",
9,
9,
0.8999999761581421,
0.9,
datetime(2023, 3, 1, 19, 25),
datetime(2023, 3, 1, 19, 25, tzinfo=tz),
date(2023, 3, 1),
b"\x12",
b"\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11",
),
]
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None:
identifier = f"default.arrow_write_data_and_evolve_schema_v{format_version}"
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass
pa_table = pa.Table.from_pydict(
{
"foo": ["a", None, "z"],
},
schema=pa.schema([pa.field("foo", pa.string(), nullable=True)]),
)
tbl = session_catalog.create_table(
identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)}
)
pa_table_with_column = pa.Table.from_pydict(
{
"foo": ["a", None, "z"],
"bar": [19, None, 25],
},
schema=pa.schema(
[
pa.field("foo", pa.string(), nullable=True),
pa.field("bar", pa.int32(), nullable=True),
]
),
)
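# Within a single transaction: widen the schema by name, then append, overwrite, and delete against the new column.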
with tbl.transaction() as txn:
with txn.update_schema() as schema_txn:
schema_txn.union_by_name(pa_table_with_column.schema)
txn.append(pa_table_with_column)
txn.overwrite(pa_table_with_column)
txn.delete("foo = 'a'")
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_create_table_transaction(catalog: Catalog, format_version: int) -> None:
if format_version == 1 and isinstance(catalog, RestCatalog):
pytest.skip(
"There is a bug in the REST catalog image (https://github.com/apache/iceberg/issues/8756) that prevents create and commit a staged version 1 table"
)
identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
try:
catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass
pa_table = pa.Table.from_pydict(
{
"foo": ["a", None, "z"],
},
schema=pa.schema([pa.field("foo", pa.string(), nullable=True)]),
)
pa_table_with_column = pa.Table.from_pydict(
{
"foo": ["a", None, "z"],
"bar": [19, None, 25],
},
schema=pa.schema(
[
pa.field("foo", pa.string(), nullable=True),
pa.field("bar", pa.int32(), nullable=True),
]
),
)
with catalog.create_table_transaction(
identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)}
) as txn:
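# Stage two fast-append snapshots and a schema update inside the create-table transaction; nothing is visible until the transaction commits.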
with txn.update_snapshot().fast_append() as snapshot_update:
for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table, io=txn._table.io):
snapshot_update.append_data_file(data_file)
with txn.update_schema() as schema_txn:
schema_txn.union_by_name(pa_table_with_column.schema)
with txn.update_snapshot().fast_append() as snapshot_update:
for data_file in _dataframe_to_data_files(
table_metadata=txn.table_metadata, df=pa_table_with_column, io=txn._table.io
):
snapshot_update.append_data_file(data_file)
tbl = catalog.load_table(identifier=identifier)
assert tbl.format_version == format_version
assert len(tbl.scan().to_arrow()) == 6
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_create_table_with_non_default_values(catalog: Catalog, table_schema_with_all_types: Schema, format_version: int) -> None:
if format_version == 1 and isinstance(catalog, RestCatalog):
pytest.skip(
"There is a bug in the REST catalog image (https://github.com/apache/iceberg/issues/8756) that prevents create and commit a staged version 1 table"
)
identifier = f"default.arrow_create_table_transaction_with_non_default_values_{catalog.name}_{format_version}"
identifier_ref = f"default.arrow_create_table_transaction_with_non_default_values_ref_{catalog.name}_{format_version}"
try:
catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass
try:
catalog.drop_table(identifier=identifier_ref)
except NoSuchTableError:
pass
iceberg_spec = PartitionSpec(
*[PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="integer_partition")]
)
sort_order = SortOrder(*[SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC)])
txn = catalog.create_table_transaction(
identifier=identifier,
schema=table_schema_with_all_types,
partition_spec=iceberg_spec,
sort_order=sort_order,
properties={"format-version": format_version},
)
txn.commit_transaction()
tbl = catalog.load_table(identifier)
tbl_ref = catalog.create_table(
identifier=identifier_ref,
schema=table_schema_with_all_types,
partition_spec=iceberg_spec,
sort_order=sort_order,
properties={"format-version": format_version},
)
assert tbl.format_version == tbl_ref.format_version
assert tbl.schema() == tbl_ref.schema()
assert tbl.schemas() == tbl_ref.schemas()
assert tbl.spec() == tbl_ref.spec()
assert tbl.specs() == tbl_ref.specs()
assert tbl.sort_order() == tbl_ref.sort_order()
assert tbl.sort_orders() == tbl_ref.sort_orders()
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_properties_int_value(
session_catalog: Catalog,
arrow_table_with_null: pa.Table,
format_version: int,
) -> None:
# table properties can be set to an int, but are still serialized to a string
property_with_int = {"property_name": 42}
identifier = "default.test_table_properties_int_value"
tbl = _create_table(
session_catalog, identifier, {"format-version": format_version, **property_with_int}, [arrow_table_with_null]
)
assert isinstance(tbl.properties["property_name"], str)
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_properties_raise_for_none_value(
session_catalog: Catalog,
arrow_table_with_null: pa.Table,
format_version: int,
) -> None:
property_with_none = {"property_name": None}
identifier = "default.test_table_properties_raise_for_none_value"
with pytest.raises(ValidationError) as exc_info:
_ = _create_table(
session_catalog, identifier, {"format-version": format_version, **property_with_none}, [arrow_table_with_null]
)
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_snapshots(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
identifier = "default.table_metadata_snapshots"
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
tbl.append(arrow_table_with_null)
# should produce a DELETE entry
tbl.overwrite(arrow_table_with_null)
# Since we don't rewrite, this should produce a new manifest with an ADDED entry
tbl.append(arrow_table_with_null)
df = tbl.inspect.snapshots()
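# inspect.snapshots() returns an Arrow table mirroring Spark's snapshots metadata table.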
assert df.column_names == [
"committed_at",
"snapshot_id",
"parent_id",
"operation",
"manifest_list",
"summary",
]
for committed_at in df["committed_at"]:
assert isinstance(committed_at.as_py(), datetime)
for snapshot_id in df["snapshot_id"]:
assert isinstance(snapshot_id.as_py(), int)
assert df["parent_id"][0].as_py() is None
assert df["parent_id"][1:].to_pylist() == df["snapshot_id"][:-1].to_pylist()
assert [operation.as_py() for operation in df["operation"]] == ["append", "delete", "append", "append"]
for manifest_list in df["manifest_list"]:
assert manifest_list.as_py().startswith("s3://")
file_size = int(next(value for key, value in df["summary"][0].as_py() if key == "added-files-size"))
assert file_size > 0
# Append
assert df["summary"][0].as_py() == [
("added-files-size", str(file_size)),
("added-data-files", "1"),
("added-records", "3"),
("total-data-files", "1"),
("total-delete-files", "0"),
("total-records", "3"),
("total-files-size", str(file_size)),
("total-position-deletes", "0"),
("total-equality-deletes", "0"),
]
# Delete
assert df["summary"][1].as_py() == [
("removed-files-size", str(file_size)),
("deleted-data-files", "1"),
("deleted-records", "3"),
("total-data-files", "0"),
("total-delete-files", "0"),
("total-records", "0"),
("total-files-size", "0"),
("total-position-deletes", "0"),
("total-equality-deletes", "0"),
]
lhs = spark.table(f"{identifier}.snapshots").toPandas()
rhs = df.to_pandas()
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
if column == "summary":
# Arrow returns a list of tuples, instead of a dict
right = dict(right)
if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
# NaN != NaN in Python
continue
assert left == right, f"Difference in column {column}: {left} != {right}"
@pytest.mark.integration
def test_write_within_transaction(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.write_in_open_transaction"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [])
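# Helper that counts entries in Spark's metadata_log_entries metadata table.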
def get_metadata_entries_count(identifier: str) -> int:
return spark.sql(
f"""
SELECT *
FROM {identifier}.metadata_log_entries
"""
).count()
# one metadata entry from table creation
assert get_metadata_entries_count(identifier) == 1
# one more metadata entry from transaction
with tbl.transaction() as tx:
tx.set_properties({"test": "1"})
tx.append(arrow_table_with_null)
assert get_metadata_entries_count(identifier) == 2
# two more metadata entries added from two separate transactions
tbl.transaction().set_properties({"test": "2"}).commit_transaction()
tbl.append(arrow_table_with_null)
assert get_metadata_entries_count(identifier) == 4
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_hive_catalog_storage_descriptor(
session_catalog_hive: HiveCatalog,
pa_schema: pa.Schema,
arrow_table_with_null: pa.Table,
spark: SparkSession,
format_version: int,
) -> None:
tbl = _create_table(
session_catalog_hive, "default.test_storage_descriptor", {"format-version": format_version}, [arrow_table_with_null]
)
# check if pyiceberg can read the table
assert len(tbl.scan().to_arrow()) == 3
# check if spark can read the table
assert spark.sql("SELECT * FROM hive.default.test_storage_descriptor").count() == 3
@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_sanitize_character_partitioned(catalog: Catalog) -> None:
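# Field and partition names containing a special character ('.') should still round-trip through a partitioned write and scan.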
table_name = "default.test_table_partitioned_sanitized_character"
try:
catalog.drop_table(table_name)
except NoSuchTableError:
pass
tbl = _create_table(
session_catalog=catalog,
identifier=table_name,
schema=Schema(NestedField(field_id=1, name="some.id", type=IntegerType(), required=True)),
partition_spec=PartitionSpec(
PartitionField(source_id=1, field_id=1000, name="some.id_identity", transform=IdentityTransform())
),
data=[pa.Table.from_arrays([range(22)], schema=pa.schema([pa.field("some.id", pa.int32(), nullable=False)]))],
)
assert len(tbl.scan().to_arrow()) == 22
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
identifier = "default.test_table_write_subset_of_schema"
tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null])
arrow_table_without_some_columns = arrow_table_with_null.combine_chunks().drop(arrow_table_with_null.column_names[0])
assert len(arrow_table_without_some_columns.columns) < len(arrow_table_with_null.columns)
tbl.overwrite(arrow_table_without_some_columns)
tbl.append(arrow_table_without_some_columns)
# overwrite and then append should produce twice the data
assert len(tbl.scan().to_arrow()) == len(arrow_table_without_some_columns) * 2
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.filterwarnings("ignore:Delete operation did not match any records")
def test_table_write_out_of_order_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
identifier = "default.test_table_write_out_of_order_schema"
# rotate the schema fields by 1
fields = list(arrow_table_with_null.schema)
rotated_fields = fields[1:] + fields[:1]
rotated_schema = pa.schema(rotated_fields)
assert arrow_table_with_null.schema != rotated_schema
tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, schema=rotated_schema)
tbl.overwrite(arrow_table_with_null)
tbl.append(arrow_table_with_null)
# overwrite and then append should produce twice the data
assert len(tbl.scan().to_arrow()) == len(arrow_table_with_null) * 2
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_write_schema_with_valid_nullability_diff(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
identifier = "default.test_table_write_with_valid_nullability_diff"
table_schema = Schema(
NestedField(field_id=1, name="long", field_type=LongType(), required=False),
)
other_schema = pa.schema(
(
pa.field("long", pa.int64(), nullable=False), # can support writing required pyarrow field to optional Iceberg field
)
)
arrow_table = pa.Table.from_pydict(
{
"long": [1, 9],
},
schema=other_schema,
)
tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table], schema=table_schema)
# the table's long field should be cast to optional on read
written_arrow_table = tbl.scan().to_arrow()
assert written_arrow_table == arrow_table.cast(pa.schema((pa.field("long", pa.int64(), nullable=True),)))
lhs = spark.table(f"{identifier}").toPandas()
rhs = written_arrow_table.to_pandas()
for column in written_arrow_table.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
assert left == right
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_write_schema_with_valid_upcast(
spark: SparkSession,
session_catalog: Catalog,
format_version: int,
table_schema_with_promoted_types: Schema,
pyarrow_schema_with_promoted_types: pa.Schema,
pyarrow_table_with_promoted_types: pa.Table,
) -> None:
identifier = "default.test_table_write_with_valid_upcast"
tbl = _create_table(
session_catalog,
identifier,
{"format-version": format_version},
[pyarrow_table_with_promoted_types],
schema=table_schema_with_promoted_types,
)
# the table's long field should be cast to long on read
written_arrow_table = tbl.scan().to_arrow()
assert written_arrow_table == pyarrow_table_with_promoted_types.cast(
pa.schema(
(
pa.field("long", pa.int64(), nullable=True),
pa.field("list", pa.list_(pa.int64()), nullable=False),
pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
pa.field("double", pa.float64(), nullable=True), # can support upcasting float to double
pa.field("uuid", pa.binary(length=16), nullable=True), # can UUID is read as fixed length binary of length 16
)
)
)
lhs = spark.table(f"{identifier}").toPandas()
rhs = written_arrow_table.to_pandas()
for column in written_arrow_table.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
if column == "map":
# Arrow returns a list of tuples, instead of a dict
right = dict(right)
if column == "list":
# Arrow returns an array, convert to list for equality check
left, right = list(left), list(right)
if column == "uuid":
# Spark Iceberg represents UUID as hex string like '715a78ef-4e53-4089-9bf9-3ad0ee9bf545'
# whereas PyIceberg represents UUID as bytes on read
left, right = left.replace("-", ""), right.hex()
assert left == right
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_write_all_timestamp_precision(
mocker: MockerFixture,
spark: SparkSession,
session_catalog: Catalog,
format_version: int,
arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
arrow_table_with_all_timestamp_precisions: pa.Table,
arrow_table_schema_with_all_microseconds_timestamp_precisions: pa.Schema,
) -> None:
identifier = "default.table_all_timestamp_precision"
mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"})
tbl = _create_table(
session_catalog,
identifier,
{"format-version": format_version},
data=[arrow_table_with_all_timestamp_precisions],
schema=arrow_table_schema_with_all_timestamp_precisions,
)
tbl.overwrite(arrow_table_with_all_timestamp_precisions)
written_arrow_table = tbl.scan().to_arrow()
assert written_arrow_table.schema == arrow_table_schema_with_all_microseconds_timestamp_precisions
assert written_arrow_table == arrow_table_with_all_timestamp_precisions.cast(
arrow_table_schema_with_all_microseconds_timestamp_precisions, safe=False
)
lhs = spark.table(f"{identifier}").toPandas()
rhs = written_arrow_table.to_pandas()
for column in written_arrow_table.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
if pd.isnull(left):
assert pd.isnull(right)
else:
# Check only up to microsecond precision, since the Spark-loaded dtype is timezone-unaware
# and supports up to microsecond precision
assert left.timestamp() == right.timestamp(), f"Difference in column {column}: {left} != {right}"
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
tbl_a = _create_table(
session_catalog,
"default.merge_manifest_a",
{"commit.manifest-merge.enabled": "true", "commit.manifest.min-count-to-merge": "1", "format-version": format_version},
[],
)
tbl_b = _create_table(
session_catalog,
"default.merge_manifest_b",
{
"commit.manifest-merge.enabled": "true",
"commit.manifest.min-count-to-merge": "1",
"commit.manifest.target-size-bytes": "1",
"format-version": format_version,
},
[],
)
tbl_c = _create_table(
session_catalog,
"default.merge_manifest_c",
{"commit.manifest.min-count-to-merge": "1", "format-version": format_version},
[],
)
# tbl_a should merge all manifests into 1
tbl_a.append(arrow_table_with_null)
tbl_a.append(arrow_table_with_null)
tbl_a.append(arrow_table_with_null)
# tbl_b should not merge any manifests because the target size is too small
tbl_b.append(arrow_table_with_null)
tbl_b.append(arrow_table_with_null)
tbl_b.append(arrow_table_with_null)
# tbl_c should not merge any manifests because merging is disabled
tbl_c.append(arrow_table_with_null)
tbl_c.append(arrow_table_with_null)
tbl_c.append(arrow_table_with_null)
assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1 # type: ignore
assert len(tbl_b.current_snapshot().manifests(tbl_b.io)) == 3 # type: ignore
assert len(tbl_c.current_snapshot().manifests(tbl_c.io)) == 3 # type: ignore
# tbl_a and tbl_c should contain the same data
assert tbl_a.scan().to_arrow().equals(tbl_c.scan().to_arrow())
# tbl_b and tbl_c should contain the same data
assert tbl_b.scan().to_arrow().equals(tbl_c.scan().to_arrow())
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_merge_manifests_file_content(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
tbl_a = _create_table(
session_catalog,
"default.merge_manifest_a",
{"commit.manifest-merge.enabled": "true", "commit.manifest.min-count-to-merge": "1", "format-version": format_version},
[],
)
# tbl_a should merge all manifests into 1
tbl_a.append(arrow_table_with_null)
tbl_a_first_entries = tbl_a.inspect.entries().to_pydict()
first_snapshot_id = tbl_a_first_entries["snapshot_id"][0]
first_data_file_path = tbl_a_first_entries["data_file"][0]["file_path"]
tbl_a.append(arrow_table_with_null)
tbl_a.append(arrow_table_with_null)
assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1 # type: ignore
# verify the sequence number of tbl_a's only manifest file
tbl_a_manifest = tbl_a.current_snapshot().manifests(tbl_a.io)[0] # type: ignore
assert tbl_a_manifest.sequence_number == (3 if format_version == 2 else 0)
assert tbl_a_manifest.min_sequence_number == (1 if format_version == 2 else 0)
# verify the manifest entries of tbl_a after the manifests have been merged
tbl_a_entries = tbl_a.inspect.entries().to_pydict()
assert tbl_a_entries["status"] == [1, 0, 0]
assert tbl_a_entries["sequence_number"] == [3, 2, 1] if format_version == 2 else [0, 0, 0]
assert tbl_a_entries["file_sequence_number"] == [3, 2, 1] if format_version == 2 else [0, 0, 0]
for i in range(3):
tbl_a_data_file = tbl_a_entries["data_file"][i]
assert tbl_a_data_file["column_sizes"] == [
(1, 49),
(2, 78),
(3, 128),
(4, 94),
(5, 118),
(6, 94),
(7, 118),
(8, 118),
(9, 118),
(10, 94),
(11, 78),
(12, 109),
]
assert tbl_a_data_file["content"] == 0
assert tbl_a_data_file["equality_ids"] is None
assert tbl_a_data_file["file_format"] == "PARQUET"
assert tbl_a_data_file["file_path"].startswith("s3://warehouse/default/merge_manifest_a/data/")
if tbl_a_data_file["file_path"] == first_data_file_path:
# verify that the recorded snapshot id is the one of the snapshot that added the file
assert tbl_a_entries["snapshot_id"][i] == first_snapshot_id
assert tbl_a_data_file["key_metadata"] is None
assert tbl_a_data_file["lower_bounds"] == [
(1, b"\x00"),
(2, b"a"),
(3, b"aaaaaaaaaaaaaaaa"),
(4, b"\x01\x00\x00\x00"),
(5, b"\x01\x00\x00\x00\x00\x00\x00\x00"),
(6, b"\x00\x00\x00\x80"),
(7, b"\x00\x00\x00\x00\x00\x00\x00\x80"),
(8, b"\x00\x9bj\xca8\xf1\x05\x00"),
(9, b"\x00\x9bj\xca8\xf1\x05\x00"),
(10, b"\x9eK\x00\x00"),
(11, b"\x01"),
(12, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"),
]
assert tbl_a_data_file["nan_value_counts"] == []
assert tbl_a_data_file["null_value_counts"] == [
(1, 1),
(2, 1),
(3, 1),
(4, 1),
(5, 1),
(6, 1),
(7, 1),
(8, 1),
(9, 1),
(10, 1),
(11, 1),
(12, 1),
]
assert tbl_a_data_file["partition"] == {}
assert tbl_a_data_file["record_count"] == 3
assert tbl_a_data_file["sort_order_id"] is None
assert tbl_a_data_file["split_offsets"] == [4]
assert tbl_a_data_file["upper_bounds"] == [
(1, b"\x01"),
(2, b"z"),
(3, b"zzzzzzzzzzzzzzz{"),
(4, b"\t\x00\x00\x00"),
(5, b"\t\x00\x00\x00\x00\x00\x00\x00"),
(6, b"fff?"),
(7, b"\xcd\xcc\xcc\xcc\xcc\xcc\xec?"),
(8, b"\x00\xbb\r\xab\xdb\xf5\x05\x00"),
(9, b"\x00\xbb\r\xab\xdb\xf5\x05\x00"),
(10, b"\xd9K\x00\x00"),
(11, b"\x12"),
(12, b"\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11"),
]
assert tbl_a_data_file["value_counts"] == [
(1, 3),
(2, 3),
(3, 3),
(4, 3),
(5, 3),
(6, 3),
(7, 3),
(8, 3),
(9, 3),
(10, 3),
(11, 3),
(12, 3),
]
@pytest.mark.integration
def test_rest_catalog_with_empty_catalog_name_append_data(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.test_rest_append"
test_catalog = load_catalog(
"", # intentionally empty
**session_catalog.properties,
)
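# Even with an intentionally empty catalog name, table creation and appends should work.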
tbl = _create_table(test_catalog, identifier, data=[])
tbl.append(arrow_table_with_null)
@pytest.mark.integration
def test_table_v1_with_null_nested_namespace(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.lower.table_v1_with_null_nested_namespace"
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
assert session_catalog.load_table(identifier) is not None
assert session_catalog.table_exists(identifier)
# We expect no error here
session_catalog.drop_table(identifier)
@pytest.mark.integration
def test_view_exists(
spark: SparkSession,
session_catalog: Catalog,
) -> None:
identifier = "default.some_view"
spark.sql(
f"""
CREATE VIEW {identifier}
AS
(SELECT 1 as some_col)
"""
).collect()
assert session_catalog.view_exists(identifier)
session_catalog.drop_view(identifier) # clean up
@pytest.mark.integration
def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:
schema = Schema(
NestedField(1, "id", StringType(), required=True),
NestedField(2, "name", StringType(), required=False),
identifier_field_ids=[1],
)
data = pa.Table.from_pylist(
[
{"id": "1", "name": "Amsterdam"},
{"id": "2", "name": "San Francisco"},
{"id": "3", "name": "Drachten"},
],
schema=schema.as_arrow(),
)
identifier = "default.test_overwrite_all_data_with_filter"
tbl = _create_table(session_catalog, identifier, data=[data], schema=schema)
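# The overwrite filter matches every existing row, so only the freshly written 3 rows should remain.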
tbl.overwrite(data, In("id", ["1", "2", "3"]))
assert len(tbl.scan().to_arrow()) == 3
@pytest.mark.integration
def test_delete_threshold(session_catalog: Catalog) -> None:
schema = Schema(
NestedField(field_id=101, name="id", field_type=LongType(), required=True),
NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
NestedField(field_id=104, name="relevancy_score", field_type=DoubleType(), required=False),
)
partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))
try:
session_catalog.drop_table(
identifier="default.scores",
)
except NoSuchTableError:
pass
session_catalog.create_table(
identifier="default.scores",
schema=schema,
partition_spec=partition_spec,
)
# Parameters
num_rows = 100 # Number of rows in the dataframe
id_min, id_max = 1, 10000
date_start, date_end = date(2024, 1, 1), date(2024, 2, 1)
# Generate the 'id' column
id_column = [random.randint(id_min, id_max) for _ in range(num_rows)]
# Generate the 'created_at' column as dates only
date_range = pd.date_range(start=date_start, end=date_end, freq="D").to_list() # Daily frequency for dates
created_at_column = [random.choice(date_range) for _ in range(num_rows)] # Pick a random date from the range for each row
# Generate the 'relevancy_score' column with a peak around 0.1
relevancy_score_column = [random.betavariate(2, 20) for _ in range(num_rows)] # Adjusting parameters to peak around 0.1
# Create the dataframe
df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})
iceberg_table = session_catalog.load_table("default.scores")
# Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
arrow_schema = iceberg_table.schema().as_arrow()
docs_table = pa.Table.from_pandas(df, schema=arrow_schema)
# Append the data to the Iceberg table
iceberg_table.append(docs_table)
delete_condition = GreaterThanOrEqual("relevancy_score", 0.1)
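# After deleting rows with relevancy_score >= 0.1, only the rows matching the inverse filter should remain.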
lower_before = len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow())
assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
iceberg_table.delete(delete_condition)
assert len(iceberg_table.scan().to_arrow()) == lower_before
@pytest.mark.integration
def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) -> None:
random.seed(876)
N = 1440
d = {
"timestamp": pa.array([datetime(2023, 1, 1, 0, 0, 0) + timedelta(minutes=i) for i in range(N)]),
"category": pa.array([random.choice(["A", "B", "C"]) for _ in range(N)]),
"value": pa.array([random.gauss(0, 1) for _ in range(N)]),
}
data = pa.Table.from_pydict(d)
try:
session_catalog.drop_table(
identifier="default.test_error_table",
)
except NoSuchTableError:
pass
table = session_catalog.create_table(
"default.test_error_table",
schema=data.schema,
)
with table.update_spec() as update:
update.add_field("timestamp", transform=HourTransform())
table.append(data)
with table.update_spec() as update:
update.add_field("category", transform=IdentityTransform())
data_ = data.filter(
(pc.field("category") == "A")
& (pc.field("timestamp") >= datetime(2023, 1, 1, 0))
& (pc.field("timestamp") < datetime(2023, 1, 1, 1))
)
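# Overwrite the category-A slice of the first hour; the existing data was written under the earlier hour-only partition spec.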
table.overwrite(
df=data_,
overwrite_filter=And(
And(
GreaterThanOrEqual("timestamp", datetime(2023, 1, 1, 0).isoformat()),
LessThan("timestamp", datetime(2023, 1, 1, 1).isoformat()),
),
EqualTo("category", "A"),
),
)
@pytest.mark.integration
def test_writing_null_structs(session_catalog: Catalog) -> None:
import pyarrow as pa
schema = pa.schema(
[
pa.field(
"struct_field_1",
pa.struct(
[
pa.field("string_nested_1", pa.string()),
pa.field("int_item_2", pa.int32()),
pa.field("float_item_2", pa.float32()),
]
),
),
]
)
records = [
{
"struct_field_1": {
"string_nested_1": "nest_1",
"int_item_2": 1234,
"float_item_2": 1.234,
},
},
{},
]
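# The second record omits struct_field_1 entirely, so it should round-trip as a null struct.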
try:
session_catalog.drop_table(
identifier="default.test_writing_null_structs",
)
except NoSuchTableError:
pass
table = session_catalog.create_table("default.test_writing_null_structs", schema)
pyarrow_table: pa.Table = pa.Table.from_pylist(records, schema=schema)
table.append(pyarrow_table)
assert pyarrow_table.to_pandas()["struct_field_1"].tolist() == table.scan().to_pandas()["struct_field_1"].tolist()
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_abort_table_transaction_on_exception(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
identifier = "default.table_test_abort_table_transaction_on_exception"
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
# Pre-populate some data
tbl.append(arrow_table_with_null)
table_size = len(arrow_table_with_null)
assert len(tbl.scan().to_pandas()) == table_size
# try to commit a transaction that raises an exception partway through
with pytest.raises(ValueError):
with tbl.transaction() as txn:
txn.append(arrow_table_with_null)
raise ValueError
txn.append(arrow_table_with_null) # type: ignore
# Validate the transaction is aborted and no partial update is applied
assert len(tbl.scan().to_pandas()) == table_size # type: ignore
@pytest.mark.integration
def test_write_optional_list(session_catalog: Catalog) -> None:
identifier = "default.test_write_optional_list"
schema = Schema(
NestedField(field_id=1, name="name", field_type=StringType(), required=False),
NestedField(
field_id=3,
name="my_list",
field_type=ListType(element_id=45, element=StringType(), element_required=False),
required=False,
),
)
session_catalog.create_table_if_not_exists(identifier, schema)
df_1 = pa.Table.from_pylist(
[
{"name": "one", "my_list": ["test"]},
{"name": "another", "my_list": ["test"]},
]
)
session_catalog.load_table(identifier).append(df_1)
assert len(session_catalog.load_table(identifier).scan().to_arrow()) == 2
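# A second append without the optional my_list column should also succeed.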
df_2 = pa.Table.from_pylist(
[
{"name": "one"},
{"name": "another"},
]
)
session_catalog.load_table(identifier).append(df_2)
assert len(session_catalog.load_table(identifier).scan().to_arrow()) == 4
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_evolve_and_write(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
identifier = "default.test_evolve_and_write"
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}, schema=Schema())
other_table = session_catalog.load_table(identifier)
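# other_table is loaded before the schema evolution below and only picks up the new column after refresh().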
numbers = pa.array([1, 2, 3, 4], type=pa.int32())
with tbl.update_schema() as upd:
# This is not known by other_table
upd.add_column("id", IntegerType())
with other_table.transaction() as tx:
# Refresh the underlying metadata and the schema
other_table.refresh()
tx.append(
pa.Table.from_arrays(
[
numbers,
],
schema=pa.schema(
[
pa.field("id", pa.int32(), nullable=True),
]
),
)
)
assert session_catalog.load_table(identifier).scan().to_arrow().column(0).combine_chunks() == numbers