blob: 34f434a2b102f28339afca9d8b49d080876142e3 [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Tests for _write_table() — writes to a local Paimon filesystem catalog.
All tests run without Docker or external services. Data written via Daft is
verified by reading back with both _read_table() and pypaimon's native
reader to ensure correctness.
"""
from __future__ import annotations
import pyarrow as pa
import pytest
pypaimon = pytest.importorskip("pypaimon")
daft = pytest.importorskip("daft")
from pypaimon.daft.daft_compat import file_range_position_field, has_file_range_reads
from pypaimon.daft.daft_catalog import PaimonTable
from pypaimon.daft.daft_datasink import PaimonDataSink
from pypaimon.daft.daft_paimon import _read_table, _write_table
requires_blob = pytest.mark.skipif(not has_file_range_reads(), reason="BLOB support requires daft >= 0.7.11")
# ---------------------------------------------------------------------------
# Helpers & Fixtures
# ---------------------------------------------------------------------------
def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None):
write_builder = table.new_batch_write_builder()
if mode == "overwrite":
write_builder.overwrite(overwrite_partition or {})
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
try:
table_write.write_arrow(arrow_table)
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)
finally:
table_write.close()
table_commit.close()
def _create_id_dt_table(catalog, table_name: str):
schema = pypaimon.Schema.from_pyarrow_schema(
pa.schema([
pa.field("id", pa.int64()),
pa.field("dt", pa.string()),
]),
options={"bucket": "1", "file.format": "parquet", "bucket-key": "id"},
)
catalog.create_table(table_name, schema, ignore_if_exists=False)
return catalog.get_table(table_name)
@pytest.fixture(scope="function")
def local_paimon_catalog(tmp_path):
catalog = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
catalog.create_database("test_db", ignore_if_exists=True)
return catalog, tmp_path
@pytest.fixture
def append_only_table(local_paimon_catalog):
catalog, tmp_path = local_paimon_catalog
schema = pypaimon.Schema.from_pyarrow_schema(
pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
pa.field("value", pa.float64()),
pa.field("dt", pa.string()),
]),
partition_keys=["dt"],
options={"bucket": "1", "file.format": "parquet"},
)
catalog.create_table("test_db.append_table", schema, ignore_if_exists=False)
return catalog.get_table("test_db.append_table"), tmp_path
@pytest.fixture
def append_only_table_no_partition(local_paimon_catalog):
catalog, tmp_path = local_paimon_catalog
schema = pypaimon.Schema.from_pyarrow_schema(
pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
]),
options={"bucket": "1", "file.format": "parquet"},
)
catalog.create_table("test_db.append_no_part", schema, ignore_if_exists=False)
return catalog.get_table("test_db.append_no_part"), tmp_path
@pytest.fixture
def pk_table(local_paimon_catalog):
catalog, tmp_path = local_paimon_catalog
schema = pypaimon.Schema.from_pyarrow_schema(
pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
pa.field("dt", pa.string()),
]),
partition_keys=["dt"],
primary_keys=["id", "dt"],
options={"bucket": "1", "file.format": "parquet"},
)
catalog.create_table("test_db.pk_table", schema, ignore_if_exists=False)
return catalog.get_table("test_db.pk_table"), tmp_path
# ---------------------------------------------------------------------------
# Basic append
# ---------------------------------------------------------------------------
def test_write_paimon_append_basic(append_only_table):
"""_write_table with mode='append' should persist data readable by _read_table."""
table, _ = append_only_table
df = daft.from_pydict(
{
"id": [1, 2, 3],
"name": ["alice", "bob", "charlie"],
"value": [1.1, 2.2, 3.3],
"dt": ["2024-01-01", "2024-01-01", "2024-01-01"],
}
)
_write_table(df, table)
result = _read_table(table).sort("id").to_arrow()
assert result.num_rows == 3
assert result.column("id").to_pylist() == [1, 2, 3]
assert result.column("name").to_pylist() == ["alice", "bob", "charlie"]
def test_write_paimon_append_returns_summary(append_only_table):
"""_write_table should return a DataFrame with operation metadata columns."""
table, _ = append_only_table
df = daft.from_pydict(
{
"id": [10, 20],
"name": ["x", "y"],
"value": [5.0, 6.0],
"dt": ["2024-02-01", "2024-02-01"],
}
)
result = _write_table(df, table)
result_dict = result.to_pydict()
assert "operation" in result_dict
assert "rows" in result_dict
assert "file_size" in result_dict
assert "file_name" in result_dict
assert all(op == "ADD" for op in result_dict["operation"])
assert sum(result_dict["rows"]) == 2
assert all(s > 0 for s in result_dict["file_size"])
assert all(len(fn) > 0 for fn in result_dict["file_name"])
def test_write_paimon_append_multiple_times(append_only_table):
"""Multiple append writes should accumulate rows."""
table, _ = append_only_table
df1 = daft.from_pydict({"id": [1], "name": ["a"], "value": [1.0], "dt": ["2024-01-01"]})
df2 = daft.from_pydict({"id": [2], "name": ["b"], "value": [2.0], "dt": ["2024-01-02"]})
_write_table(df1, table)
_write_table(df2, table)
result = _read_table(table).sort("id").to_arrow()
assert result.num_rows == 2
assert result.column("id").to_pylist() == [1, 2]
def test_write_paimon_roundtrip_native_verify(append_only_table):
"""Data written by Daft should also be readable via pypaimon's native reader."""
table, _ = append_only_table
df = daft.from_pydict(
{
"id": [7, 8, 9],
"name": ["p", "q", "r"],
"value": [7.0, 8.0, 9.0],
"dt": ["2024-05-01", "2024-05-01", "2024-05-01"],
}
)
_write_table(df, table)
# Verify via pypaimon native reader
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
arrow_table = table_read.to_arrow(splits)
assert arrow_table.num_rows == 3
ids = sorted(arrow_table.column("id").to_pylist())
assert ids == [7, 8, 9]
def test_write_paimon_aligns_columns_by_name(local_paimon_catalog):
"""Input column order should not affect the values written to Paimon."""
catalog, _ = local_paimon_catalog
table = _create_id_dt_table(catalog, "test_db.column_order")
df = daft.from_pydict(
{
"dt": ["101", "202"],
"id": [1, 2],
}
)
_write_table(df, table)
result = _read_table(table).sort("id").to_pydict()
assert result == {
"id": [1, 2],
"dt": ["101", "202"],
}
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
arrow_table = table_read.to_arrow(splits)
native_rows = sorted(
zip(
arrow_table.column("id").to_pylist(),
arrow_table.column("dt").to_pylist(),
)
)
assert native_rows == [(1, "101"), (2, "202")]
# ---------------------------------------------------------------------------
# Overwrite
# ---------------------------------------------------------------------------
def test_write_paimon_overwrite_full_unpartitioned(append_only_table_no_partition):
"""mode='overwrite' on an unpartitioned table should replace all existing data."""
table, _ = append_only_table_no_partition
initial = daft.from_pydict({"id": [1, 2], "name": ["a", "b"]})
_write_table(initial, table)
replacement = daft.from_pydict({"id": [100], "name": ["z"]})
result = _write_table(replacement, table, mode="overwrite")
result_dict = result.to_pydict()
assert all(op == "OVERWRITE" for op in result_dict["operation"])
final = _read_table(table).to_arrow()
assert final.num_rows == 1
assert final.column("id").to_pylist() == [100]
def test_write_paimon_overwrite_dynamic_partition(append_only_table):
"""mode='overwrite' on a partitioned table should only replace touched partitions."""
table, _ = append_only_table
initial = daft.from_pydict(
{
"id": [1, 2, 3, 4],
"name": ["a", "b", "c", "d"],
"value": [1.0, 2.0, 3.0, 4.0],
"dt": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"],
}
)
_write_table(initial, table)
replacement = daft.from_pydict({"id": [10], "name": ["x"], "value": [10.0], "dt": ["2024-01-01"]})
result = _write_table(replacement, table, mode="overwrite")
result_dict = result.to_pydict()
assert all(op == "OVERWRITE" for op in result_dict["operation"])
final = _read_table(table).sort("id").to_pydict()
assert final["id"] == [3, 4, 10]
assert final["dt"] == ["2024-01-02", "2024-01-02", "2024-01-01"]
# ---------------------------------------------------------------------------
# Error handling
# ---------------------------------------------------------------------------
def test_write_paimon_invalid_mode(append_only_table):
"""An unsupported mode should raise a ValueError."""
table, _ = append_only_table
df = daft.from_pydict({"id": [1], "name": ["a"], "value": [1.0], "dt": ["2024-01-01"]})
with pytest.raises(ValueError, match="Only 'append' or 'overwrite' mode is supported"):
_write_table(df, table, mode="upsert")
def test_write_paimon_sink_serializes_without_file_io(append_only_table):
"""PaimonDataSink should not pickle table FileIO objects."""
from daft.pickle import dumps, loads
class Unpicklable:
def __reduce__(self):
raise TypeError("file io marker should not be serialized")
table, _ = append_only_table
table.file_io._unpicklable_marker = Unpicklable()
sink = PaimonDataSink(table, mode="overwrite")
commit_user = sink._write_builder.commit_user
restored = loads(dumps(sink))
assert restored.name() == sink.name()
assert restored._mode == "overwrite"
assert restored._write_builder.commit_user == commit_user
assert restored._write_builder.static_partition == {}
assert restored._table.identifier.get_full_name() == table.identifier.get_full_name()
def test_write_paimon_rejects_extra_columns(local_paimon_catalog):
"""Extra input columns should fail instead of being silently dropped."""
catalog, _ = local_paimon_catalog
table = _create_id_dt_table(catalog, "test_db.extra_columns")
df = daft.from_pydict(
{"id": [1], "dt": ["2024-01-01"], "extra": ["unused"]}
)
with pytest.raises(RuntimeError, match="Paimon write schema mismatch"):
_write_table(df, table)
def test_write_paimon_pk_table(pk_table):
"""Writing to a PK table should work and be readable back."""
table, _ = pk_table
df = daft.from_pydict(
{
"id": [1, 2, 3],
"name": ["x", "y", "z"],
"dt": ["2024-01-01", "2024-01-01", "2024-01-01"],
}
)
_write_table(df, table)
result = _read_table(table).sort("id").to_arrow()
assert result.num_rows == 3
assert result.column("id").to_pylist() == [1, 2, 3]
# ---------------------------------------------------------------------------
# Schema conversion tests
# ---------------------------------------------------------------------------
class TestSchemaConversion:
"""Tests for schema conversion utilities."""
def test_align_batch_to_target_schema_by_name(self):
"""Record batches should be reordered by field name before casting."""
sink = PaimonDataSink.__new__(PaimonDataSink)
sink._target_schema = pa.schema([("id", pa.int64()), ("dt", pa.string())])
batch = pa.record_batch(
[
pa.array(["101", "202"], type=pa.large_string()),
pa.array([1, 2], type=pa.int64()),
],
names=["dt", "id"],
)
sink._validate_input_schema(batch.schema)
aligned = sink._align_batch_to_target_schema(batch)
assert aligned.schema == sink._target_schema
assert aligned.to_pydict() == {
"id": [1, 2],
"dt": ["101", "202"],
}
def test_validate_input_schema_rejects_mismatch(self):
"""Schema validation should fail fast on missing or extra fields."""
sink = PaimonDataSink.__new__(PaimonDataSink)
sink._target_schema = pa.schema([("id", pa.int64()), ("dt", pa.string())])
input_schema = pa.schema([("id", pa.int64()), ("extra", pa.string())])
with pytest.raises(
ValueError,
match="missing fields: \\['dt'\\]; extra fields: \\['extra'\\]",
):
sink._validate_input_schema(input_schema)
def test_write_large_string_conversion(self, local_paimon_catalog):
"""Test that large_string columns are converted to string for pypaimon."""
catalog, tmp_path = local_paimon_catalog
pa_schema = pa.schema([("id", pa.int64()), ("text", pa.string())])
paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
catalog.create_table("test_db.large_str", paimon_schema, ignore_if_exists=True)
table = catalog.get_table("test_db.large_str")
df = daft.from_pydict({"id": [1, 2, 3], "text": ["a", "b", "c"]})
_write_table(df, table, mode="append")
result = _read_table(table).to_pydict()
assert result["id"] == [1, 2, 3]
assert result["text"] == ["a", "b", "c"]
def test_write_large_binary_conversion(self, local_paimon_catalog):
"""Test that large_binary columns are converted to binary for pypaimon."""
catalog, tmp_path = local_paimon_catalog
pa_schema = pa.schema([("id", pa.int64()), ("data", pa.binary())])
paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
catalog.create_table("test_db.large_bin", paimon_schema, ignore_if_exists=True)
table = catalog.get_table("test_db.large_bin")
df = daft.from_pydict({"id": [1, 2], "data": [b"abc", b"def"]})
_write_table(df, table, mode="append")
result = _read_table(table).to_pydict()
assert result["id"] == [1, 2]
# ---------------------------------------------------------------------------
# Complex type tests
# ---------------------------------------------------------------------------
class TestComplexTypes:
"""Tests for writing complex data types."""
def test_write_nested_list(self, local_paimon_catalog):
"""Test writing Paimon table with list type."""
catalog, tmp_path = local_paimon_catalog
pa_schema = pa.schema([("id", pa.int64()), ("list_col", pa.list_(pa.int64()))])
paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
catalog.create_table("test_db.write_list", paimon_schema, ignore_if_exists=True)
table = catalog.get_table("test_db.write_list")
df = daft.from_pydict({"id": [1, 2], "list_col": [[1, 2, 3], [4, 5]]})
_write_table(df, table, mode="append")
result = _read_table(table).to_pydict()
assert result["id"] == [1, 2]
@requires_blob
class TestBlobType:
"""Tests for BLOB type support (pypaimon 1.4+)."""
def test_write_read_blob_type(self, local_paimon_catalog):
"""Test that BLOB columns are returned as FileReference objects."""
catalog, tmp_path = local_paimon_catalog
pa_schema = pa.schema([("id", pa.int64()), ("blob_data", pa.large_binary())])
paimon_schema = pypaimon.Schema.from_pyarrow_schema(
pa_schema,
options={
"bucket": "1",
"file.format": "parquet",
"row-tracking.enabled": "true",
"data-evolution.enabled": "true",
},
)
catalog.create_table("test_db.blob_table", paimon_schema, ignore_if_exists=True)
table = catalog.get_table("test_db.blob_table")
df = daft.from_pydict({"id": [1, 2], "blob_data": [b"hello", b"world"]})
_write_table(df, table, mode="append")
result_df = _read_table(table).sort("id")
assert str(result_df.schema()["blob_data"].dtype) == "File[Unknown]"
result = result_df.to_pydict()
assert result["id"] == [1, 2]
blob_refs = result["blob_data"]
assert len(blob_refs) == 2
for ref in blob_refs:
assert isinstance(ref, daft.File)
assert isinstance(ref.path, str)
assert ".blob" in ref.path
assert getattr(ref, file_range_position_field()) is not None
file_size = ref.size() if callable(getattr(ref, "size", None)) else ref.length
assert file_size is not None
# ---------------------------------------------------------------------------
# Truncate tests
# ---------------------------------------------------------------------------
class TestTruncate:
"""Tests for table truncate operations (pypaimon 1.4+)."""
def test_truncate_table(self, append_only_table):
"""truncate() should remove all data from the table."""
table, _ = append_only_table
df = daft.from_pydict(
{"id": [1, 2, 3], "name": ["a", "b", "c"], "value": [1.0, 2.0, 3.0], "dt": ["2024-01-01"] * 3}
)
_write_table(df, table)
assert _read_table(table).count_rows() == 3
paimon_table = PaimonTable(table)
paimon_table.truncate()
assert _read_table(table).count_rows() == 0
def test_truncate_partitions(self, append_only_table):
"""truncate_partitions() should remove only the specified partition data."""
table, _ = append_only_table
df = daft.from_pydict(
{
"id": [1, 2, 3, 4],
"name": ["a", "b", "c", "d"],
"value": [1.0, 2.0, 3.0, 4.0],
"dt": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"],
}
)
_write_table(df, table)
assert _read_table(table).count_rows() == 4
paimon_table = PaimonTable(table)
paimon_table.truncate_partitions([{"dt": "2024-01-01"}])
result = _read_table(table).sort("id").to_pydict()
assert result["id"] == [3, 4]
assert result["dt"] == ["2024-01-02", "2024-01-02"]