# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
from datetime import datetime
from typing import List
import pyarrow as pa
import pytest
from pyspark.sql import SparkSession
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import AlwaysTrue, EqualTo
from pyiceberg.manifest import ManifestEntryStatus
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Operation, Summary
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, TimestampType
def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
for sql in sqls:
spark.sql(sql)
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
identifier = "default.table_partitioned_delete"
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
TBLPROPERTIES('format-version' = {format_version})
""",
f"""
INSERT INTO {identifier} VALUES (10, 20), (10, 30)
""",
f"""
INSERT INTO {identifier} VALUES (11, 20), (11, 30)
""",
],
)
tbl = session_catalog.load_table(identifier)
tbl.delete(EqualTo("number_partitioned", 10))
    # The whole partition is dropped, so only a delete snapshot is committed (no overwrite)
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "append", "delete"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 11], "number": [20, 30]}
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
identifier = "default.table_partitioned_delete"
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
TBLPROPERTIES('format-version' = {format_version})
""",
f"""
INSERT INTO {identifier} VALUES (10, 20), (10, 30)
""",
f"""
INSERT INTO {identifier} VALUES (11, 20), (11, 30)
""",
],
)
tbl = session_catalog.load_table(identifier)
tbl.delete(EqualTo("number", 20))
    # We don't delete a whole partition, so the affected data files are rewritten and an overwrite snapshot is committed
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "append", "overwrite"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 10], "number": [30, 30]}
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_rewrite_partitioned_table_with_null(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
identifier = "default.table_partitioned_delete"
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
TBLPROPERTIES('format-version' = {format_version})
""",
f"""
INSERT INTO {identifier} VALUES (10, 20), (10, 30)
""",
f"""
INSERT INTO {identifier} VALUES (11, 20), (11, NULL)
""",
],
)
tbl = session_catalog.load_table(identifier)
tbl.delete(EqualTo("number", 20))
    # We don't delete a whole partition, so the affected data files are rewritten and an overwrite snapshot is committed
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "append", "overwrite"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 10], "number": [None, 30]}
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.filterwarnings("ignore:Delete operation did not match any records")
def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
identifier = "default.table_partitioned_delete"
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
TBLPROPERTIES('format-version' = {format_version})
""",
f"""
INSERT INTO {identifier} VALUES (10, 20), (10, 30)
""",
],
)
tbl = session_catalog.load_table(identifier)
tbl.delete(EqualTo("number_partitioned", 22)) # Does not affect any data
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10], "number": [20, 30]}
@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_delete_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
identifier = "default.table_partitioned_delete"
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
TBLPROPERTIES(
'format-version' = 2,
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
)
""",
f"""
INSERT INTO {identifier} VALUES (10, 20), (10, 30), (10, 40)
""",
# Generate a positional delete
f"""
DELETE FROM {identifier} WHERE number = 30
""",
],
)
tbl = session_catalog.load_table(identifier)
    # Assert that there is just a single data file, with one merge-on-read (positional) delete file attached
files = list(tbl.scan().plan_files())
assert len(files) == 1
assert len(files[0].delete_files) == 1
# Will rewrite a data file without the positional delete
tbl.delete(EqualTo("number", 40))
# One positional delete has been added, but an OVERWRITE status is set
# https://github.com/apache/iceberg/issues/10122
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]}
@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSession, session_catalog: RestCatalog) -> None:
identifier = "default.test_delete_partitioned_table_positional_deletes_empty_batch"
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
TBLPROPERTIES(
'format-version' = 2,
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read',
'write.parquet.row-group-limit'=1
)
""",
],
)
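    # 'write.parquet.row-group-limit' = 1 puts each row in its own row group, so the scans below produce many
    # tiny record batches; after the positional delete is applied, some of those batches come back empty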
tbl = session_catalog.load_table(identifier)
arrow_table = pa.Table.from_arrays(
[
pa.array([10, 10, 10]),
pa.array([1, 2, 3]),
],
schema=pa.schema([pa.field("number_partitioned", pa.int32()), pa.field("number", pa.int32())]),
)
tbl.append(arrow_table)
assert len(tbl.scan().to_arrow()) == 3
run_spark_commands(
spark,
[
# Generate a positional delete
f"""
DELETE FROM {identifier} WHERE number = 1
""",
],
)
    # Assert that there is just a single data file, with one merge-on-read (positional) delete file attached
tbl = tbl.refresh()
files = list(tbl.scan().plan_files())
assert len(files) == 1
assert len(files[0].delete_files) == 1
assert len(tbl.scan().to_arrow()) == 2
assert len(tbl.scan(row_filter="number_partitioned == 10").to_arrow()) == 2
assert len(tbl.scan(row_filter="number_partitioned == 1").to_arrow()) == 0
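    # The batch reader must also handle the case where the filter matches no rows at all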
reader = tbl.scan(row_filter="number_partitioned == 1").to_arrow_batch_reader()
assert isinstance(reader, pa.RecordBatchReader)
assert len(reader.read_all()) == 0
@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number int
)
USING iceberg
TBLPROPERTIES(
'format-version' = 2,
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
)
""",
],
)
tbl = session_catalog.load_table(identifier)
arrow_table = pa.Table.from_arrays(
[
pa.array(list(range(1, 1001)) * 100),
],
schema=pa.schema([pa.field("number", pa.int32())]),
)
tbl.append(arrow_table)
run_spark_commands(
spark,
[
f"""
DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
""",
],
)
tbl.refresh()
reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
assert isinstance(reader, pa.RecordBatchReader)
pyiceberg_count = len(reader.read_all())
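    # After deleting 1-4, the numbers 5 through 50 remain under the filter, each repeated 100 times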
expected_count = 46 * 100
assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"
@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestCatalog) -> None:
identifier = "default.table_partitioned_delete"
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
TBLPROPERTIES(
'format-version' = 2,
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
)
""",
f"""
INSERT INTO {identifier} VALUES (10, 1), (10, 2), (20, 3)
""",
],
)
tbl = session_catalog.load_table(identifier)
files = list(tbl.scan().plan_files())
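    # One data file per partition (10 and 20)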
assert len(files) == 2
arrow_schema = pa.schema([pa.field("number_partitioned", pa.int32()), pa.field("number", pa.int32())])
arrow_tbl = pa.Table.from_pylist(
[
{"number_partitioned": 10, "number": 4},
{"number_partitioned": 10, "number": 5},
],
schema=arrow_schema,
)
    # Overwrite the rows in partition 10 with the new Arrow data
    tbl.overwrite(arrow_tbl, "number_partitioned == 10")
    # Because the whole partition is replaced, the overwrite is committed as a delete snapshot followed by an append
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "append"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10, 20], "number": [4, 5, 3]}
@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, session_catalog: RestCatalog) -> None:
identifier = "default.table_partitioned_delete_sequence_number"
    # This test case is a bit more complex. We run a merge-on-read delete on a file, make sure that
    # the manifest gets rewritten (but not the data file, since this is MoR), and check that the positional
    # delete is still applied, to ensure that the sequence numbers are maintained
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
TBLPROPERTIES(
'format-version' = 2,
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
)
""",
f"""
INSERT INTO {identifier} VALUES (10, 100), (10, 101), (20, 200), (20, 201), (20, 202)
""",
# Generate a positional delete
f"""
DELETE FROM {identifier} WHERE number = 101
""",
],
)
tbl = session_catalog.load_table(identifier)
files = list(tbl.scan().plan_files())
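    # One scan task per partition (10 and 20); the positional delete from Spark attaches to the partition-10 file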
assert len(files) == 2
# Will rewrite a data file without a positional delete
tbl.delete(EqualTo("number", 201))
# One positional delete has been added, but an OVERWRITE status is set
# https://github.com/apache/iceberg/issues/10122
snapshots = tbl.snapshots()
assert len(snapshots) == 3
# Snapshots produced by Spark
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "overwrite"]
# Will rewrite one parquet file
assert snapshots[2].summary == Summary(
Operation.OVERWRITE,
**{
"added-files-size": snapshots[2].summary["total-files-size"],
"added-data-files": "1",
"added-records": "2",
"changed-partition-count": "1",
"total-files-size": snapshots[2].summary["total-files-size"],
"total-delete-files": "0",
"total-data-files": "1",
"total-position-deletes": "0",
"total-records": "2",
"total-equality-deletes": "0",
"deleted-data-files": "2",
"removed-delete-files": "1",
"deleted-records": "5",
"removed-files-size": snapshots[2].summary["removed-files-size"],
"removed-position-deletes": "1",
},
)
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [20, 20, 10], "number": [200, 202, 100]}
@pytest.mark.integration
def test_delete_no_match(session_catalog: RestCatalog) -> None:
arrow_schema = pa.schema([pa.field("ints", pa.int32())])
arrow_tbl = pa.Table.from_pylist(
[
{"ints": 1},
{"ints": 3},
],
schema=arrow_schema,
)
iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
tbl_identifier = "default.test_delete_no_match"
try:
session_catalog.drop_table(tbl_identifier)
except NoSuchTableError:
pass
tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
tbl.append(arrow_tbl)
assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]
    tbl.delete("ints == 2")  # Only 1 and 3 are in the file, but 2 falls between the lower and upper bounds
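    # Nothing matched, so no new snapshot is committed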
assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]
@pytest.mark.integration
def test_delete_overwrite(session_catalog: RestCatalog) -> None:
arrow_schema = pa.schema([pa.field("ints", pa.int32())])
arrow_tbl = pa.Table.from_pylist(
[
{"ints": 1},
{"ints": 2},
],
schema=arrow_schema,
)
iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
tbl_identifier = "default.test_delete_overwrite"
try:
session_catalog.drop_table(tbl_identifier)
except NoSuchTableError:
pass
tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
tbl.append(arrow_tbl)
assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]
arrow_tbl_overwrite = pa.Table.from_pylist(
[
{"ints": 3},
{"ints": 4},
],
schema=arrow_schema,
)
tbl.overwrite(arrow_tbl_overwrite, "ints == 2") # Should rewrite one file
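    # overwrite() rewrites the file containing ints == 2 (OVERWRITE snapshot) and then appends the new rows (APPEND snapshot)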
assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [
Operation.APPEND,
Operation.OVERWRITE,
Operation.APPEND,
]
assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1]
@pytest.mark.integration
def test_delete_truncate(session_catalog: RestCatalog) -> None:
arrow_schema = pa.schema([pa.field("ints", pa.int32())])
arrow_tbl = pa.Table.from_pylist(
[
{"ints": 1},
],
schema=arrow_schema,
)
iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
    tbl_identifier = "default.test_delete_truncate"
try:
session_catalog.drop_table(tbl_identifier)
except NoSuchTableError:
pass
tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
tbl.append(arrow_tbl)
# Effectively a truncate
tbl.delete(delete_filter=AlwaysTrue())
manifests = tbl.current_snapshot().manifests(tbl.io)
assert len(manifests) == 1
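    # With discard_deleted=False the DELETED entry for the removed data file is still returned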
entries = manifests[0].fetch_manifest_entry(tbl.io, discard_deleted=False)
assert len(entries) == 1
assert entries[0].status == ManifestEntryStatus.DELETED
@pytest.mark.integration
def test_delete_overwrite_table_with_null(session_catalog: RestCatalog) -> None:
arrow_schema = pa.schema([pa.field("ints", pa.int32())])
arrow_tbl = pa.Table.from_pylist(
[{"ints": 1}, {"ints": 2}, {"ints": None}],
schema=arrow_schema,
)
iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
tbl_identifier = "default.test_delete_overwrite_with_null"
try:
session_catalog.drop_table(tbl_identifier)
except NoSuchTableError:
pass
tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
tbl.append(arrow_tbl)
assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]
arrow_tbl_overwrite = pa.Table.from_pylist(
[
{"ints": 3},
{"ints": 4},
],
schema=arrow_schema,
)
tbl.overwrite(arrow_tbl_overwrite, "ints == 2") # Should rewrite one file
assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [
Operation.APPEND,
Operation.OVERWRITE,
Operation.APPEND,
]
assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1, None]
@pytest.mark.integration
def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None:
arrow_schema = pa.schema([pa.field("floats", pa.float32())])
# Create Arrow Table with NaN values
data = [pa.array([1.0, float("nan"), 2.0], type=pa.float32())]
arrow_tbl = pa.Table.from_arrays(
data,
schema=arrow_schema,
)
iceberg_schema = Schema(NestedField(1, "floats", FloatType()))
tbl_identifier = "default.test_delete_overwrite_with_nan"
try:
session_catalog.drop_table(tbl_identifier)
except NoSuchTableError:
pass
tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
tbl.append(arrow_tbl)
assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]
arrow_tbl_overwrite = pa.Table.from_pylist(
[
{"floats": 3.0},
{"floats": 4.0},
],
schema=arrow_schema,
)
"""
We want to test the _expression_to_complementary_pyarrow function can generate a correct complimentary filter
for selecting records to remain in the new overwritten file.
Compared with test_delete_overwrite_table_with_null which tests rows with null cells,
nan testing is faced with a more tricky issue:
A filter of (field == value) will not include cells of nan but col != val will.
(Interestingly, neither == or != will include null)
This means if we set the test case as floats == 2.0 (equal predicate as in test_delete_overwrite_table_with_null),
test will pass even without the logic under test
in _NullNaNUnmentionedTermsCollector (a helper of _expression_to_complementary_pyarrow
to handle revert of iceberg expression of is_null/not_null/is_nan/not_nan).
Instead, we test the filter of !=, so that the revert is == which exposes the issue.
"""
tbl.overwrite(arrow_tbl_overwrite, "floats != 2.0") # Should rewrite one file
assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [
Operation.APPEND,
Operation.OVERWRITE,
Operation.APPEND,
]
result = tbl.scan().to_arrow()["floats"].to_pylist()
from math import isnan
assert any(isnan(e) for e in result)
assert 2.0 in result
assert 3.0 in result
assert 4.0 in result
@pytest.mark.integration
def test_delete_after_partition_evolution_from_unpartitioned(session_catalog: RestCatalog) -> None:
identifier = "default.test_delete_after_partition_evolution_from_unpartitioned"
arrow_table = pa.Table.from_arrays(
[
pa.array([2, 3, 4, 5, 6]),
],
names=["idx"],
)
try:
session_catalog.drop_table(identifier)
except NoSuchTableError:
pass
tbl = session_catalog.create_table(
identifier,
schema=Schema(
NestedField(1, "idx", LongType()),
),
)
tbl.append(arrow_table)
with tbl.transaction() as tx:
with tx.update_schema() as schema:
schema.rename_column("idx", "id")
with tx.update_spec() as spec:
spec.add_field("id", IdentityTransform())
# Append one more time to create data files with two partition specs
tbl.append(arrow_table.rename_columns(["id"]))
tbl.delete("id == 4")
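    # The delete has to drop matching rows from files written under both the unpartitioned and the evolved spec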
    # Expect 8 records: the 10 appended records minus the 2 rows where id == 4
assert len(tbl.scan().to_arrow()) == 8
@pytest.mark.integration
def test_delete_after_partition_evolution_from_partitioned(session_catalog: RestCatalog) -> None:
identifier = "default.test_delete_after_partition_evolution_from_partitioned"
arrow_table = pa.Table.from_arrays(
[
pa.array([2, 3, 4, 5, 6]),
pa.array([
datetime(2021, 5, 19),
datetime(2022, 7, 25),
datetime(2023, 3, 22),
datetime(2024, 7, 17),
datetime(2025, 2, 22),
]),
],
names=["idx", "ts"],
)
try:
session_catalog.drop_table(identifier)
except NoSuchTableError:
pass
tbl = session_catalog.create_table(
identifier,
schema=Schema(NestedField(1, "idx", LongType()), NestedField(2, "ts", TimestampType())),
partition_spec=PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")),
)
tbl.append(arrow_table)
with tbl.transaction() as tx:
with tx.update_schema() as schema:
schema.rename_column("idx", "id")
with tx.update_spec() as spec:
spec.add_field("id", IdentityTransform())
# Append one more time to create data files with two partition specs
tbl.append(arrow_table.rename_columns(["id", "ts"]))
tbl.delete("id == 4")
    # Expect 8 records: the 10 appended records minus the 2 rows where id == 4
assert len(tbl.scan().to_arrow()) == 8