blob: 0846f591214ce93a6b8c3ecc99055caf5913a469 [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Tests for Daft-side Paimon scan explain diagnostics."""
from __future__ import annotations
import pyarrow as pa
import pytest
pypaimon = pytest.importorskip("pypaimon")
daft = pytest.importorskip("daft")
from daft import col
from pypaimon.daft import explain_paimon_scan
from pypaimon.daft.daft_catalog import PaimonTable
from pypaimon.daft.daft_explain import (
READER_MODE_NATIVE_PARQUET,
READER_MODE_PYPAIMON_FALLBACK,
)
from pypaimon.daft.daft_compat import has_file_range_reads
from pypaimon.daft.daft_datasource import PaimonDataSource
from pypaimon.daft.daft_paimon import _explain_table
from pypaimon.read.explain import ExplainResult, ExplainSplitInfo
# Skip marker for tests that need BLOB support: the decorated test is skipped
# whenever ``has_file_range_reads()`` returns a falsy value.
requires_blob = pytest.mark.skipif(not has_file_range_reads(), reason="BLOB support requires daft >= 0.7.11")
@pytest.fixture
def catalog_options(tmp_path):
    """Return catalog options whose warehouse is ``tmp_path``.

    A catalog is built from the options once so that the ``test_db`` database
    exists before the test body runs; only the options dict is returned.
    """
    warehouse_options = {"warehouse": str(tmp_path)}
    # Creating the database is idempotent thanks to ``ignore_if_exists=True``.
    pypaimon.CatalogFactory.create(warehouse_options).create_database(
        "test_db",
        ignore_if_exists=True,
    )
    return warehouse_options
def _create_table(
    catalog_options,
    table_name: str,
    pa_schema: pa.Schema,
    *,
    partition_keys: list[str] | None = None,
    primary_keys: list[str] | None = None,
    options: dict[str, str] | None = None,
):
    """Create ``test_db.<table_name>`` and return ``(identifier, table)``.

    Args:
        catalog_options: Options passed to ``pypaimon.CatalogFactory.create``.
        table_name: Table name; it is prefixed with ``test_db.``.
        pa_schema: PyArrow schema converted into a Paimon schema.
        partition_keys: Forwarded to ``Schema.from_pyarrow_schema``.
        primary_keys: Forwarded to ``Schema.from_pyarrow_schema``.
        options: Table options forwarded to ``Schema.from_pyarrow_schema``.

    Returns:
        A tuple of the qualified identifier string and the table object
        fetched back from the catalog.
    """
    qualified_name = f"test_db.{table_name}"
    paimon_catalog = pypaimon.CatalogFactory.create(catalog_options)
    schema_kwargs = {
        "partition_keys": partition_keys,
        "primary_keys": primary_keys,
        "options": options,
    }
    paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema, **schema_kwargs)
    # ``ignore_if_exists=False``: the table is created with an explicit
    # request not to ignore an already-existing table of the same name.
    paimon_catalog.create_table(qualified_name, paimon_schema, ignore_if_exists=False)
    created_table = paimon_catalog.get_table(qualified_name)
    return qualified_name, created_table
def _write_arrow(table, data: pa.Table) -> None:
    """Write ``data`` to ``table`` as one batch commit and release both handles.

    Args:
        table: Object exposing ``new_batch_write_builder()``.
        data: Arrow data handed to ``write_arrow``.

    The writer is closed before the committer. Both are closed even when
    writing, committing, creating the committer, or closing the writer raises;
    the error still propagates to the caller.
    """
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = None
    try:
        # Created inside the guarded region so a failure here still closes
        # the writer that was already opened.
        table_commit = write_builder.new_commit()
        table_write.write_arrow(data)
        table_commit.commit(table_write.prepare_commit())
    finally:
        try:
            table_write.close()
        finally:
            # Runs even if ``table_write.close()`` raised, so the committer
            # is never leaked.
            if table_commit is not None:
                table_commit.close()
def _single_split_explain(
    *,
    table_identifier: str,
    raw_convertible: bool,
    has_deletion_vectors: bool,
) -> ExplainResult:
    """Build a synthetic ``ExplainResult`` that contains exactly one split.

    Args:
        table_identifier: Identifier stored on the result.
        raw_convertible: Sets the split's ``raw_convertible`` flag and the
            ``splits_raw_convertible`` counter (1 or 0).
        has_deletion_vectors: Sets the split's deletion-vector flag, the
            deletion file count (1 or 0), ``deletion_vectors_enabled`` and
            ``splits_with_deletion_vectors``.

    Returns:
        An ``ExplainResult`` whose aggregate size, row and histogram fields
        are read back from the single fake split.
    """
    dv_count = 1 if has_deletion_vectors else 0
    raw_count = 1 if raw_convertible else 0

    only_split = ExplainSplitInfo(
        partition={},
        bucket=0,
        file_count=1,
        row_count=4,
        merged_row_count=None,
        file_size=128,
        raw_convertible=raw_convertible,
        has_deletion_vectors=has_deletion_vectors,
        level_histogram={0: 1},
        deletion_file_count=dv_count,
        file_paths=["/tmp/fake.parquet"],
    )

    # With a single split, every size statistic equals that split's size.
    split_bytes = only_split.file_size

    return ExplainResult(
        table_identifier=table_identifier,
        is_primary_key_table=False,
        bucket_mode="unaware",
        deletion_vectors_enabled=has_deletion_vectors,
        data_evolution_enabled=False,
        snapshot_id=1,
        schema_id=0,
        file_count=1,
        total_file_size=split_bytes,
        estimated_row_count=only_split.row_count,
        deletion_file_count=only_split.deletion_file_count,
        level_histogram=only_split.level_histogram,
        split_count=1,
        splits_raw_convertible=raw_count,
        splits_with_deletion_vectors=dv_count,
        files_per_split_min=1,
        files_per_split_max=1,
        files_per_split_avg=1.0,
        split_size_min=split_bytes,
        split_size_max=split_bytes,
        split_size_avg=float(split_bytes),
        split_size_p50=split_bytes,
        split_size_p95=split_bytes,
        splits=[only_split],
    )
def test_explain_paimon_scan_reports_native_parquet_routing(catalog_options):
    """Every split of a parquet table without primary keys is read natively.

    Also checks that the ``id`` filter, the ``name`` projection and the limit
    are reported as pushed, and that the rendered text has both plan headers.
    """
    arrow_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    table_id, paimon_table = _create_table(
        catalog_options,
        "explain_native",
        arrow_schema,
        options={"bucket": "-1", "file.format": "parquet"},
    )
    rows = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]}, schema=arrow_schema)
    _write_arrow(paimon_table, rows)

    explained = explain_paimon_scan(
        table_id,
        catalog_options,
        filters=col("id") == 2,
        columns=["name"],
        limit=1,
        verbose=True,
    )

    # Routing: every split is native parquet, none falls back.
    assert explained.native_parquet_split_count == explained.paimon_scan.split_count
    assert explained.native_parquet_split_count > 0
    assert explained.pypaimon_fallback_split_count == 0
    assert explained.fallback_reasons == {}

    # Pushdowns: projection, limit and filter.
    assert explained.requested_columns == ["name"]
    assert explained.requested_limit == 1
    assert explained.source_limit == 1
    assert explained.limit_pushed is True
    assert any("id" in pushed_expr for pushed_expr in explained.pushed_filters)
    assert explained.remaining_filters == []

    # Verbose output carries per-split detail.
    assert explained.splits is not None
    assert all(
        split_info.reader_mode == READER_MODE_NATIVE_PARQUET
        for split_info in explained.splits
    )

    rendered = str(explained)
    assert "Daft Paimon Scan" in rendered
    assert "PyPaimon Scan Plan" in rendered
def test_explain_scan_keeps_limit_above_remaining_filters(catalog_options):
    """A negated conjunction is not pushed, so the limit is not pushed either.

    Without ``verbose`` no per-split detail is expected on either result.
    """
    arrow_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    _, paimon_table = _create_table(
        catalog_options,
        "explain_remaining_filter",
        arrow_schema,
        options={"bucket": "-1", "file.format": "parquet"},
    )
    rows = pa.table({"id": [1, 2], "name": ["a", "b"]}, schema=arrow_schema)
    _write_arrow(paimon_table, rows)

    conjunction = (col("id") == 1) & (col("name") == "a")
    daft_table = PaimonTable(paimon_table, catalog_options=catalog_options)
    explained = daft_table.explain_scan(filters=~conjunction, limit=1)

    assert explained.native_parquet_split_count == explained.paimon_scan.split_count
    assert explained.pypaimon_fallback_split_count == 0

    # The whole predicate stays on the Daft side.
    assert explained.pushed_filters == []
    assert any("id" in leftover for leftover in explained.remaining_filters)
    assert explained.source_limit is None
    assert explained.limit_pushed is False

    assert explained.splits is None
    assert explained.paimon_scan.splits is None
def test_explain_scan_pushes_supported_not_and_limit(catalog_options):
    """A negated equality is pushed down in full, which lets the limit push too."""
    arrow_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    _, paimon_table = _create_table(
        catalog_options,
        "explain_not_filter",
        arrow_schema,
        options={"bucket": "-1", "file.format": "parquet"},
    )
    rows = pa.table({"id": [1, 2], "name": ["a", "b"]}, schema=arrow_schema)
    _write_arrow(paimon_table, rows)

    daft_table = PaimonTable(paimon_table, catalog_options=catalog_options)
    explained = daft_table.explain_scan(filters=~(col("id") == 1), limit=1)

    assert explained.native_parquet_split_count == explained.paimon_scan.split_count
    assert explained.pypaimon_fallback_split_count == 0

    # The pushed predicate may be rendered either as ``!=`` or with ``not``.
    assert any(
        "!=" in pushed_expr or "not" in pushed_expr
        for pushed_expr in explained.pushed_filters
    )
    assert explained.remaining_filters == []
    assert explained.source_limit == 1
    assert explained.limit_pushed is True

    assert explained.splits is None
    assert explained.paimon_scan.splits is None
def test_explain_scan_partially_pushes_conjuncts_and_keeps_limit(catalog_options):
    """Only the ``id`` conjunct is pushed; ``contains`` remains, so no limit push."""
    arrow_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    _, paimon_table = _create_table(
        catalog_options,
        "explain_partial_conjunct",
        arrow_schema,
        options={"bucket": "-1", "file.format": "parquet"},
    )
    rows = pa.table({"id": [1, 2], "name": ["alpha", "bravo"]}, schema=arrow_schema)
    _write_arrow(paimon_table, rows)

    id_conjunct = col("id") == 1
    contains_conjunct = col("name").contains(col("id"))
    daft_table = PaimonTable(paimon_table, catalog_options=catalog_options)
    explained = daft_table.explain_scan(
        filters=id_conjunct & contains_conjunct,
        limit=1,
    )

    assert any("id" in pushed_expr for pushed_expr in explained.pushed_filters)
    assert any("contains" in leftover for leftover in explained.remaining_filters)
    assert explained.source_limit is None
    assert explained.limit_pushed is False
def test_explain_scan_applies_partition_filters_to_reader_counts(catalog_options):
    """A partition filter narrows the reader-side counts to the matching split.

    The underlying Paimon plan is still expected to report both splits.
    """
    arrow_schema = pa.schema([
        ("id", pa.int64()),
        ("name", pa.string()),
        ("dt", pa.string()),
    ])
    table_id, paimon_table = _create_table(
        catalog_options,
        "explain_partition_filter",
        arrow_schema,
        partition_keys=["dt"],
        options={"bucket": "1", "file.format": "parquet"},
    )
    # One commit per partition value.
    for row_id, row_name, row_dt in ((1, "a", "2024-01-01"), (2, "b", "2024-01-02")):
        _write_arrow(
            paimon_table,
            pa.table(
                {"id": [row_id], "name": [row_name], "dt": [row_dt]},
                schema=arrow_schema,
            ),
        )

    explained = explain_paimon_scan(
        table_id,
        catalog_options,
        partition_filters=col("dt") == "2024-01-02",
        verbose=True,
    )

    assert explained.paimon_scan.split_count == 2
    assert explained.native_parquet_split_count == 1
    assert explained.pypaimon_fallback_split_count == 0
    assert any("dt" in rendered_filter for rendered_filter in explained.partition_filters)

    split_details = explained.splits
    assert split_details is not None
    assert len(split_details) == 1
    assert split_details[0].partition == {"dt": "2024-01-02"}
def test_explain_scan_reports_pk_lsm_fallback(catalog_options):
    """A primary-key table written twice falls back with "LSM merge required".

    The fallback read columns must contain both the projected ``name`` and
    the filtered ``id``.
    """
    arrow_schema = pa.schema([
        ("id", pa.int64()),
        ("name", pa.string()),
        ("dt", pa.string()),
    ])
    _, paimon_table = _create_table(
        catalog_options,
        "explain_pk_fallback",
        arrow_schema,
        partition_keys=["dt"],
        primary_keys=["id", "dt"],
        options={"bucket": "1", "file.format": "parquet"},
    )

    first_commit = pa.table(
        {"id": [1, 2], "name": ["old-a", "old-b"], "dt": ["2024-01-01", "2024-01-01"]},
        schema=arrow_schema,
    )
    _write_arrow(paimon_table, first_commit)
    # Second commit rewrites key (1, "2024-01-01").
    second_commit = pa.table(
        {"id": [1], "name": ["new-a"], "dt": ["2024-01-01"]},
        schema=arrow_schema,
    )
    _write_arrow(paimon_table, second_commit)

    explained = _explain_table(
        paimon_table,
        catalog_options=catalog_options,
        filters=col("id") == 1,
        columns=["name"],
        limit=1,
        verbose=True,
    )

    assert explained.pypaimon_fallback_split_count > 0
    assert explained.native_parquet_split_count == 0
    assert (
        explained.fallback_reasons["LSM merge required"]
        == explained.pypaimon_fallback_split_count
    )

    assert explained.fallback_read_columns is not None
    assert "name" in explained.fallback_read_columns
    assert "id" in explained.fallback_read_columns

    assert explained.splits is not None
    assert all(
        split_info.reader_mode == READER_MODE_PYPAIMON_FALLBACK
        for split_info in explained.splits
    )
    assert all(
        split_info.fallback_reason == "LSM merge required"
        for split_info in explained.splits
    )
def test_explain_scan_reports_non_parquet_fallback(catalog_options):
    """A table with ``file.format=avro`` reports "non-parquet format" on every split."""
    arrow_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    _, paimon_table = _create_table(
        catalog_options,
        "explain_avro_fallback",
        arrow_schema,
        options={"bucket": "-1", "file.format": "avro"},
    )
    rows = pa.table({"id": [1], "name": ["a"]}, schema=arrow_schema)
    _write_arrow(paimon_table, rows)

    explained = _explain_table(
        paimon_table,
        catalog_options=catalog_options,
        verbose=True,
    )

    assert explained.pypaimon_fallback_split_count == explained.paimon_scan.split_count
    assert explained.pypaimon_fallback_split_count > 0
    assert explained.native_parquet_split_count == 0
    assert (
        explained.fallback_reasons["non-parquet format"]
        == explained.pypaimon_fallback_split_count
    )

    assert explained.splits is not None
    assert all(
        split_info.fallback_reason == "non-parquet format"
        for split_info in explained.splits
    )
@requires_blob
def test_explain_scan_reports_blob_fallback(catalog_options):
    """A table with a ``large_binary`` column reports "blob columns present"."""
    arrow_schema = pa.schema([("id", pa.int64()), ("payload", pa.large_binary())])
    table_options = {
        "bucket": "-1",
        "file.format": "parquet",
        "row-tracking.enabled": "true",
        "data-evolution.enabled": "true",
    }
    _, paimon_table = _create_table(
        catalog_options,
        "explain_blob_fallback",
        arrow_schema,
        options=table_options,
    )
    rows = pa.table({"id": [1], "payload": [b"hello"]}, schema=arrow_schema)
    _write_arrow(paimon_table, rows)

    explained = _explain_table(
        paimon_table,
        catalog_options=catalog_options,
        verbose=True,
    )

    assert explained.pypaimon_fallback_split_count == explained.paimon_scan.split_count
    assert explained.pypaimon_fallback_split_count > 0
    assert explained.native_parquet_split_count == 0
    assert (
        explained.fallback_reasons["blob columns present"]
        == explained.pypaimon_fallback_split_count
    )

    assert explained.splits is not None
    assert all(
        split_info.reader_mode == READER_MODE_PYPAIMON_FALLBACK
        for split_info in explained.splits
    )
    assert all(
        split_info.fallback_reason == "blob columns present"
        for split_info in explained.splits
    )
def test_explain_scan_reports_deletion_vector_fallback(catalog_options, monkeypatch):
    """A split flagged with deletion vectors reports "deletion vectors present".

    The Paimon-side plan is stubbed: ``PaimonDataSource._scan_read_builder``
    is patched to return a builder whose ``explain`` yields one synthetic
    split that is raw-convertible but has deletion vectors.
    """
    arrow_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    _, paimon_table = _create_table(
        catalog_options,
        "explain_deletion_vector_fallback",
        arrow_schema,
        options={"bucket": "-1", "file.format": "parquet"},
    )

    class StubReadBuilder:
        """Read-builder stand-in that returns a canned explain result."""

        def explain(self, verbose: bool = False) -> ExplainResult:
            # The caller under test is expected to request verbose output.
            assert verbose is True
            return _single_split_explain(
                table_identifier="test_db.explain_deletion_vector_fallback",
                raw_convertible=True,
                has_deletion_vectors=True,
            )

    def stub_scan_read_builder(self, table, read_pushdowns):
        return StubReadBuilder()

    monkeypatch.setattr(PaimonDataSource, "_scan_read_builder", stub_scan_read_builder)

    explained = _explain_table(
        paimon_table,
        catalog_options=catalog_options,
        verbose=True,
    )

    assert explained.pypaimon_fallback_split_count == 1
    assert explained.native_parquet_split_count == 0
    assert explained.fallback_reasons == {"deletion vectors present": 1}

    assert explained.splits is not None
    assert len(explained.splits) == 1
    lone_split = explained.splits[0]
    assert lone_split.reader_mode == READER_MODE_PYPAIMON_FALLBACK
    assert lone_split.fallback_reason == "deletion vectors present"