| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| Predicate pushdown correctness regression suite. |
| |
| Locks in the invariants: |
| |
| - PK tables: manifest stats pruning consults *key_stats only*. A |
| value-only predicate must short-circuit to "keep the file" even if |
| value_stats would say otherwise. |
| - Reader-level value predicate is applied *after* merge — covered rows |
| cannot resurface even when an older file passes manifest pruning. |
| - Append-only tables: value_stats pruning is false-positive-safe (may |
| over-include) but never false-negative (never drops a live row). |
| |
| Three layers: |
| |
| 1. Unit — synthetic ManifestEntry to assert the manifest gate's exact |
| behaviour without relying on full I/O. |
| 2. Round-trip — write real snapshots, read back through the public API, |
| compare to the source-of-truth (post-merge oracle). |
| 3. Property — random datasets + random predicates, asserting that the |
| pushed-down result equals the full-scan-then-filter oracle. |
| (Deterministic random; no hypothesis dependency, keeps the |
| Python 3.6 compatibility contract intact.) |
| """ |
| |
| import os |
| import random |
| import shutil |
| import tempfile |
| import unittest |
| from typing import Any, Dict, List, Optional, Tuple |
| |
| import pyarrow as pa |
| |
| from pypaimon import CatalogFactory, Schema |
| from pypaimon.common.predicate import Predicate |
| from pypaimon.manifest.schema.data_file_meta import DataFileMeta |
| from pypaimon.manifest.schema.manifest_entry import ManifestEntry |
| from pypaimon.manifest.schema.simple_stats import SimpleStats |
| from pypaimon.read.scanner.file_scanner import FileScanner |
| from pypaimon.schema.data_types import AtomicType, DataField |
| from pypaimon.table.row.generic_row import GenericRow |
| |
| |
| # --------------------------------------------------------------------------- |
| # Synthetic-fixture helpers used by the unit-level tests. |
| # --------------------------------------------------------------------------- |
def _stats(min_vals: List[Any], max_vals: List[Any],
           fields: List[DataField],
           null_counts: Optional[List[int]] = None) -> SimpleStats:
    """Wrap plain min/max value lists into a ``SimpleStats``.

    ``min_vals`` and ``max_vals`` are each paired with ``fields`` in a
    ``GenericRow``. When ``null_counts`` is omitted, a zero is supplied for
    every entry of ``min_vals``.
    """
    lower_bounds = GenericRow(min_vals, fields)
    upper_bounds = GenericRow(max_vals, fields)
    counts = [0] * len(min_vals) if null_counts is None else null_counts
    return SimpleStats(
        min_values=lower_bounds,
        max_values=upper_bounds,
        null_counts=counts,
    )
| |
| |
def _make_pk_entry(level: int, key_stats: SimpleStats, value_stats: SimpleStats,
                   row_count: int = 1, schema_id: int = 0,
                   value_stats_cols: Optional[List[str]] = None) -> ManifestEntry:
    """Build a synthetic ``ManifestEntry`` around a fabricated data file.

    The file's min/max key rows are taken from ``key_stats``; size, sequence
    numbers, partition and bucket are fixed placeholder values. The file name
    carries a random suffix drawn from the module-level ``random`` generator.
    """
    suffix = random.randint(0, 1 << 30)
    data_file = DataFileMeta.create(
        file_name=f'data-L{level}-{suffix}.parquet',
        file_size=1024,
        row_count=row_count,
        min_key=key_stats.min_values,
        max_key=key_stats.max_values,
        key_stats=key_stats,
        value_stats=value_stats,
        min_sequence_number=0,
        max_sequence_number=0,
        schema_id=schema_id,
        level=level,
        extra_files=[],
        value_stats_cols=value_stats_cols,
    )
    # Empty partition row, single bucket.
    empty_partition = GenericRow([], [])
    return ManifestEntry(
        kind=0,
        partition=empty_partition,
        bucket=0,
        total_buckets=1,
        file=data_file,
    )
| |
| |
def _build_pk_scanner(table, predicate: Optional[Predicate]) -> FileScanner:
    """Return a ``FileScanner`` for ``table`` whose manifest source is empty.

    The scanner is only constructed here; callers drive its filtering
    methods directly with synthetic entries.
    """
    def _no_manifests():
        # Stub manifest source: always yields an empty list.
        return []

    return FileScanner(table=table, manifest_scanner=_no_manifests, predicate=predicate)
| |
| |
| # --------------------------------------------------------------------------- |
| # Layer 1 — Unit: assert _filter_manifest_entry behaviour on synthetic input. |
| # --------------------------------------------------------------------------- |
class FilterManifestEntryUnitTest(unittest.TestCase):
    """Pin down the contract of FileScanner._filter_manifest_entry.

    Each test hand-builds a ManifestEntry with chosen key/value stats,
    builds a FileScanner for one of the two tables created in
    ``setUpClass`` (no rows are ever written to them) and asserts whether
    the gate keeps (truthy) or drops (falsy) the entry.
    """

    @classmethod
    def setUpClass(cls):
        """Create a throw-away warehouse with one PK and one append-only table."""
        cls.tempdir = tempfile.mkdtemp()
        try:
            cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
            cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
            cls.catalog.create_database('default', False)

            cls._pa_schema = pa.schema([
                pa.field('id', pa.int64(), nullable=False),
                ('val', pa.int64()),
            ])
            # PK table — exercises the stats-only path.
            cls.catalog.create_table(
                'default.pk_plain',
                Schema.from_pyarrow_schema(
                    cls._pa_schema,
                    primary_keys=['id'],
                    options={'bucket': '1', 'file.format': 'parquet'},
                ),
                False,
            )
            # Append-only table — exercises the value_stats path for comparison.
            cls.catalog.create_table(
                'default.append_only',
                Schema.from_pyarrow_schema(
                    cls._pa_schema,
                    options={'file.format': 'parquet', 'metadata.stats-mode': 'full'},
                ),
                False,
            )
        except Exception:
            # unittest does not run tearDownClass when setUpClass raises, so
            # the temp directory must be removed here or it is leaked.
            shutil.rmtree(cls.tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary warehouse directory."""
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    @staticmethod
    def _id_field():
        """Field descriptor for the non-nullable BIGINT ``id`` column."""
        return DataField(0, 'id', AtomicType('BIGINT', nullable=False))

    @staticmethod
    def _val_field():
        """Field descriptor for the nullable BIGINT ``val`` column."""
        return DataField(1, 'val', AtomicType('BIGINT', nullable=True))

    # ---- Small fixture helpers -----------------------------------------
    @staticmethod
    def _predicate_builder(table):
        """Shortcut to the predicate builder of a fresh read builder."""
        return table.new_read_builder().new_predicate_builder()

    def _key_stats(self, lo: int, hi: int) -> SimpleStats:
        """Stats over the single ``id`` column with range [lo, hi]."""
        return _stats([lo], [hi], [self._id_field()])

    def _val_stats(self, lo: int, hi: int) -> SimpleStats:
        """Stats over the single ``val`` column with range [lo, hi]."""
        return _stats([lo], [hi], [self._val_field()])

    # ---- The headline invariant ----------------------------------------
    def test_pk_table_uses_key_stats_only_not_value_stats(self):
        """The whole point: PK manifest pruning must NEVER consult value_stats.

        We construct an entry where:
          * key_stats says id ∈ [10, 20]   ← truthful PK range, includes 15
          * value_stats says val ∈ [99, 99] ← intentionally a lie that, if
            consulted, would let a `val == 0` predicate falsely drop the file.

        The query is on the value column (`val == 0`). On a PK table the
        Python design only projects the predicate onto PK columns first
        (which yields an empty/no-op predicate), so the file MUST be kept.
        If the implementation regressed and consulted value_stats, the file
        would be dropped and the (post-merge) row that actually has val=0
        in some other file would still come back — silently incorrect.
        """
        table = self.catalog.get_table('default.pk_plain')
        predicate = self._predicate_builder(table).equal('val', 0)

        entry = _make_pk_entry(
            level=1,
            key_stats=self._key_stats(10, 20),
            # Lie about val: stats claim 99..99, predicate is val==0.
            value_stats=self._val_stats(99, 99),
        )
        scanner = _build_pk_scanner(table, predicate)
        # primary_key_predicate is None for a value-only predicate on PK
        # table → the gate must short-circuit to True regardless of stats.
        self.assertIsNone(scanner.primary_key_predicate,
                          "value predicate must not project onto PK")
        self.assertTrue(scanner._filter_manifest_entry(entry),
                        "PK table must NOT consult value_stats — entry kept")

    def test_pk_table_uses_key_stats_to_drop_outside_range(self):
        """Symmetric: when the predicate IS on PK and key_stats excludes it, drop."""
        table = self.catalog.get_table('default.pk_plain')
        predicate = self._predicate_builder(table).equal('id', 5)

        entry = _make_pk_entry(
            level=1,
            key_stats=self._key_stats(10, 20),
            value_stats=self._val_stats(0, 0),
        )
        scanner = _build_pk_scanner(table, predicate)
        self.assertFalse(scanner._filter_manifest_entry(entry),
                         "PK predicate outside key_stats range must drop")

    def test_pk_table_uses_key_stats_to_keep_inside_range(self):
        """A PK predicate whose literal lies inside the key_stats range keeps the file."""
        table = self.catalog.get_table('default.pk_plain')
        predicate = self._predicate_builder(table).equal('id', 15)

        entry = _make_pk_entry(
            level=1,
            key_stats=self._key_stats(10, 20),
            value_stats=self._val_stats(0, 0),
        )
        scanner = _build_pk_scanner(table, predicate)
        self.assertTrue(scanner._filter_manifest_entry(entry))

    def test_pk_compound_predicate_only_consults_key_stats(self):
        """Dual to test_pk_table_uses_key_stats_only_*: covers the
        non-early-return branch.

        Predicate is `id == 15 AND val == 0`. The PK projection yields
        `id == 15` which key_stats [10,20] keeps. value_stats deliberately
        lies that val ∈ [99,99], so a (buggy) implementation that consulted
        value_stats anywhere — even AFTER the early return — would drop the
        file (Equal.test_by_stats(99,99,[0]) is False). The correct
        implementation reaches `primary_key_predicate.test_by_simple_stats`
        which only ever touches key_stats, so the assertion is True.
        """
        table = self.catalog.get_table('default.pk_plain')
        pb = self._predicate_builder(table)
        predicate = pb.and_predicates([
            pb.equal('id', 15),
            pb.equal('val', 0),
        ])

        entry = _make_pk_entry(
            level=1,
            key_stats=self._key_stats(10, 20),
            # value_stats lies: claims val ∈ [99,99]. If consulted, val==0
            # is outside the range and the file would be dropped.
            value_stats=self._val_stats(99, 99),
        )
        scanner = _build_pk_scanner(table, predicate)
        # PK predicate is now non-empty (id == 15) — exercises the path
        # that calls primary_key_predicate.test_by_simple_stats(key_stats).
        self.assertIsNotNone(scanner.primary_key_predicate)
        self.assertTrue(scanner._filter_manifest_entry(entry),
                        "value_stats must NEVER be consulted on PK tables — "
                        "even when the PK predicate path runs")

    # ---- Append-only path uses value_stats (and that IS correct) -------
    def _append_entry(self, val_min: int, val_max: int) -> ManifestEntry:
        """Append-only entries: value_stats covers BOTH id and val (the
        projection in SimpleStatsEvolution maps schema indices through
        value_stats_cols).

        Despite its name, ``_make_pk_entry`` is reused here; only the stats
        layout and ``value_stats_cols`` differ from the PK fixtures.
        """
        both_fields = [self._id_field(), self._val_field()]
        return _make_pk_entry(
            level=0,
            key_stats=self._key_stats(0, 0),
            value_stats=_stats([0, val_min], [0, val_max], both_fields),
            value_stats_cols=['id', 'val'],
        )

    def test_append_only_uses_value_stats_to_drop(self):
        """Append-only has no L0/merge complications — value_stats pruning
        is safe and required for performance."""
        table = self.catalog.get_table('default.append_only')
        predicate = self._predicate_builder(table).equal('val', 5)

        # value_stats [10, 20] excludes 5 → must drop.
        scanner = _build_pk_scanner(table, predicate)
        self.assertFalse(scanner._filter_manifest_entry(self._append_entry(10, 20)))

    def test_append_only_uses_value_stats_to_keep(self):
        """value_stats [10, 20] contains 15, so the entry must be kept."""
        table = self.catalog.get_table('default.append_only')
        predicate = self._predicate_builder(table).equal('val', 15)

        scanner = _build_pk_scanner(table, predicate)
        self.assertTrue(scanner._filter_manifest_entry(self._append_entry(10, 20)))
| |
| |
| # --------------------------------------------------------------------------- |
| # Layer 2 — Round-trip integration: write real snapshots, read via public API. |
| # --------------------------------------------------------------------------- |
class PushdownRoundTripIntegrationTest(unittest.TestCase):
    """Write multi-snapshot tables and verify pushdown vs full-scan oracle."""

    # Suppress compaction so L0 deterministically persists across snapshots.
    _SUPPRESS = {
        'bucket': '1',
        'num-levels': '3',
        'num-sorted-run.compaction-trigger': '999',
        'num-sorted-run.stop-trigger': '999',
        'compaction.max-size-amplification-percent': '999',
    }

    @classmethod
    def setUpClass(cls):
        """Create a throw-away warehouse shared by all tests of this class."""
        cls.tempdir = tempfile.mkdtemp()
        try:
            cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
            cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
            cls.catalog.create_database('default', False)
        except Exception:
            # unittest does not run tearDownClass when setUpClass raises, so
            # the temp directory must be removed here or it is leaked.
            shutil.rmtree(cls.tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary warehouse directory."""
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _create_pk_table(self, name: str) -> Any:
        """Create and return ``default.<name>``: PK table (id, val) keyed on ``id``."""
        pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('val', pa.int64()),
        ])
        schema = Schema.from_pyarrow_schema(
            # Copy so the shared class-level option dict is never handed out.
            pa_schema, primary_keys=['id'], options=dict(self._SUPPRESS),
        )
        full = f'default.{name}'
        self.catalog.create_table(full, schema, False)
        return self.catalog.get_table(full)

    def _create_append_table(self, name: str) -> Any:
        """Create and return ``default.<name>``: append-only table (id, val, grp)."""
        pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('val', pa.int64()),
            ('grp', pa.string()),
        ])
        schema = Schema.from_pyarrow_schema(
            pa_schema, options={'file.format': 'parquet', 'metadata.stats-mode': 'full'},
        )
        full = f'default.{name}'
        self.catalog.create_table(full, schema, False)
        return self.catalog.get_table(full)

    def _write(self, table, batches: List[List[Dict]]):
        """Write every element of ``batches`` with its own write + commit cycle."""
        to_arrow_schema = getattr(table.table_schema, 'to_arrow_schema', None)
        # schema=None lets pyarrow infer the schema from the rows, which is
        # the fallback when the table schema offers no Arrow conversion.
        pa_schema = to_arrow_schema() if to_arrow_schema is not None else None
        for rows in batches:
            builder = table.new_batch_write_builder()
            writer = builder.new_write()
            committer = builder.new_commit()
            try:
                writer.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema))
                committer.commit(writer.prepare_commit())
            finally:
                # Close the committer even when closing the writer raises.
                try:
                    writer.close()
                finally:
                    committer.close()

    def _read_all(self, table, predicate=None) -> List[Dict]:
        """Read ``table`` through the public read API, optionally filtered.

        Returns a list of row dicts; an empty plan yields ``[]``.
        """
        rb = table.new_read_builder()
        if predicate is not None:
            rb = rb.with_filter(predicate)
        splits = rb.new_scan().plan().splits()
        if not splits:
            return []
        return rb.new_read().to_arrow(splits).to_pylist()

    def _assert_pushdown_matches(self, table, source_rows: List[Dict],
                                 cases: List[Tuple[Any, Any]],
                                 msg: Optional[str] = None):
        """Check each (predicate, oracle) pair in its own subTest.

        The filtered read of ``table`` must equal the rows of
        ``source_rows`` accepted by ``oracle``; both sides are ordered by
        ``id`` before comparison.
        """
        for predicate, oracle in cases:
            with self.subTest(predicate=predicate.method):
                got = sorted(self._read_all(table, predicate=predicate),
                             key=lambda r: r['id'])
                want = sorted((r for r in source_rows if oracle(r)),
                              key=lambda r: r['id'])
                self.assertEqual(got, want, msg)

    # -------------------------------------------------------------------
    # Append-only table, predicates on non-key columns: the pushed-down
    # read must never lose a row that a Python-side filter over the
    # written rows would keep.
    # NOTE(review): an earlier comment described this as partition-stats
    # pruning, but the table is created without partition keys — confirm
    # whether a partitioned variant is wanted.
    # -------------------------------------------------------------------
    def test_append_value_predicate_matches_oracle(self):
        """Append-only table: value predicates agree with the full-scan oracle."""
        table = self._create_append_table('rt_append_val')
        rows = [{'id': i, 'val': i * 10, 'grp': 'a' if i % 2 else 'b'}
                for i in range(100)]
        # Two separate commits, so the read spans more than one snapshot's
        # worth of files (presumably also more than one manifest).
        self._write(table, [rows[:50], rows[50:]])

        pb = table.new_read_builder().new_predicate_builder()
        self._assert_pushdown_matches(
            table, rows,
            [
                (pb.equal('val', 250), lambda r: r['val'] == 250),
                (pb.greater_than('val', 800), lambda r: r['val'] > 800),
                (pb.between('val', 100, 300), lambda r: 100 <= r['val'] <= 300),
                (pb.is_in('grp', ['a']), lambda r: r['grp'] == 'a'),
            ],
            "pushed-down predicate result must equal full-scan oracle",
        )

    def test_pk_pk_predicate_matches_oracle(self):
        """PK predicate on PK table: exact, exercises key_stats path."""
        table = self._create_pk_table('rt_pk_pk_pred')
        rows = [{'id': i, 'val': i * 7} for i in range(50)]
        self._write(table, [rows[:25], rows[25:]])

        pb = table.new_read_builder().new_predicate_builder()
        self._assert_pushdown_matches(
            table, rows,
            [
                (pb.equal('id', 13), lambda r: r['id'] == 13),
                (pb.is_in('id', [1, 5, 10, 99]),
                 lambda r: r['id'] in {1, 5, 10, 99}),
                (pb.greater_or_equal('id', 40), lambda r: r['id'] >= 40),
            ],
        )

    def test_pk_value_predicate_matches_post_merge_oracle(self):
        """A value predicate on a PK table: post-merge oracle == reader output.

        This is the 'value filter applied after merge' contract translated
        into a directly-checkable property: the answer is whatever you'd
        get by (a) merging snapshots into latest-per-PK, (b) applying the
        predicate to that merged set in Python.

        Note: this asserts end-to-end semantics only (predicate is applied
        post-merge, not per-file). The "PK manifest gate must not consult
        value_stats" invariant is locked down in the unit tests
        (``test_pk_table_uses_key_stats_only_*``); this round-trip case
        does not by itself catch a regression where the gate misuses
        value_stats, since the file-level effect is masked by the
        post-merge filter.
        """
        table = self._create_pk_table('rt_pk_val_pred')
        snapshots = [
            [{'id': i, 'val': i} for i in range(20)],
            [{'id': i, 'val': i + 1000} for i in range(0, 20, 2)],  # update evens
        ]
        self._write(table, snapshots)

        # Oracle: latest write per PK wins. Derived from the very batches
        # that were written so the two can never drift apart.
        latest: Dict[int, Dict] = {}
        for snapshot in snapshots:
            for row in snapshot:
                latest[row['id']] = row
        merged_rows = list(latest.values())

        pb = table.new_read_builder().new_predicate_builder()
        self._assert_pushdown_matches(
            table, merged_rows,
            [
                (pb.greater_than('val', 500), lambda r: r['val'] > 500),
                (pb.less_or_equal('val', 5), lambda r: r['val'] <= 5),
                (pb.between('val', 1000, 1010),
                 lambda r: 1000 <= r['val'] <= 1010),
            ],
            "pushed-down value filter must agree with post-merge oracle",
        )
| |
| |
| # --------------------------------------------------------------------------- |
| # Layer 3 — Property: random datasets + random predicates. |
| # --------------------------------------------------------------------------- |
class PushdownPropertyTest(unittest.TestCase):
    """For N random (table, predicate) pairs, push-down result must equal
    the full-scan-then-Python-filter result.

    No hypothesis dependency — keeps Python 3.6 compat. Seeded for repro.
    """

    SEED = 0x70D0  # stable across runs; bump to flush flaky regressions.
    APPEND_TRIALS = 40
    PK_TRIALS = 30

    # Order matters: rnd.choice() indexes into this sequence, so reordering
    # it changes which predicate every seeded trial draws.
    _OPERATORS = (
        'equal', 'not_equal',
        'less_than', 'less_or_equal', 'greater_than', 'greater_or_equal',
        'between', 'not_between',
        'is_in', 'is_not_in',
        'is_null', 'is_not_null',
    )

    # Single-literal operators: predicate-builder method name → Python oracle.
    _COMPARISONS = {
        'equal': lambda x, y: x == y,
        'not_equal': lambda x, y: x != y,
        'less_than': lambda x, y: x < y,
        'less_or_equal': lambda x, y: x <= y,
        'greater_than': lambda x, y: x > y,
        'greater_or_equal': lambda x, y: x >= y,
    }

    @classmethod
    def setUpClass(cls):
        """Create a throw-away warehouse shared by all tests of this class."""
        cls.tempdir = tempfile.mkdtemp()
        try:
            cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
            cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
            cls.catalog.create_database('default', False)
        except Exception:
            # unittest does not run tearDownClass when setUpClass raises, so
            # the temp directory must be removed here or it is leaked.
            shutil.rmtree(cls.tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary warehouse directory."""
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def setUp(self):
        # Fresh per-test RNG keeps each test method's dataset/predicate
        # sequence stable regardless of method execution order or future
        # additions in this class.
        self.rnd = random.Random(self.SEED)

    def _create_kv_table(self, name: str, **schema_kwargs):
        """Create and return table ``name`` with columns (k BIGINT NOT NULL, v BIGINT).

        ``schema_kwargs`` are forwarded to ``Schema.from_pyarrow_schema``.
        """
        pa_schema = pa.schema([
            pa.field('k', pa.int64(), nullable=False),
            ('v', pa.int64()),
        ])
        schema = Schema.from_pyarrow_schema(pa_schema, **schema_kwargs)
        self.catalog.create_table(name, schema, False)
        return self.catalog.get_table(name)

    def _make_append_table(self, idx: int):
        """Create the append-only table for trial ``idx``."""
        return self._create_kv_table(
            f'default.prop_append_{idx}',
            options={'file.format': 'parquet', 'metadata.stats-mode': 'full'},
        )

    def _make_pk_table(self, idx: int):
        """Create the primary-key table (keyed on ``k``) for trial ``idx``."""
        return self._create_kv_table(
            f'default.prop_pk_{idx}',
            primary_keys=['k'],
            options={'bucket': '1', 'file.format': 'parquet'},
        )

    def _write_one_snapshot(self, table, rows: List[Dict]):
        """Write ``rows`` and commit them in a single write + commit cycle."""
        to_arrow_schema = getattr(table.table_schema, 'to_arrow_schema', None)
        # schema=None lets pyarrow infer the schema from the rows.
        pa_schema = to_arrow_schema() if to_arrow_schema is not None else None
        builder = table.new_batch_write_builder()
        writer = builder.new_write()
        committer = builder.new_commit()
        try:
            writer.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema))
            committer.commit(writer.prepare_commit())
        finally:
            # Close the committer even when closing the writer raises.
            try:
                writer.close()
            finally:
                committer.close()

    def _read_with(self, table, predicate=None):
        """Read ``table`` through the public API; returns a list of row dicts."""
        rb = table.new_read_builder()
        if predicate is not None:
            rb = rb.with_filter(predicate)
        splits = rb.new_scan().plan().splits()
        if not splits:
            return []
        return rb.new_read().to_arrow(splits).to_pylist()

    def _pick_bounds(self, sample_values: List[int]) -> Tuple[int, int]:
        """Return an ordered (low, high) pair drawn from ``sample_values``.

        With a single distinct value there is nothing to sample a pair
        from, so the degenerate range (v, v) is returned instead of letting
        ``random.sample`` raise ``ValueError``. No random numbers are
        consumed in that case.
        """
        if len(sample_values) < 2:
            only = sample_values[0]
            return only, only
        low, high = sorted(self.rnd.sample(sample_values, 2))
        return low, high

    def _gen_predicate(self, pb, field: str,
                       sample_values: List[int]) -> Tuple[Predicate, Any]:
        """Pick a random predicate on `field`. Returns (predicate, oracle).

        ``oracle`` is a callable taking a row dict and returning whether
        the row should match. Literals are drawn from ``sample_values``,
        which must be non-empty for every operator that needs a literal.

        Operator set covers every method the property layer can sensibly
        exercise on numeric columns: equal/not_equal, full ordering family,
        between/not_between, in/not_in, is_null/is_not_null. The
        is_null path in particular re-covers a historical bug where
        missing null_counts in stats caused isNull to drop every file.
        """
        op = self.rnd.choice(self._OPERATORS)

        if op in self._COMPARISONS:
            literal = self.rnd.choice(sample_values)
            compare = self._COMPARISONS[op]
            # The builder method shares its name with the operator key.
            return (getattr(pb, op)(field, literal),
                    lambda r: compare(r[field], literal))

        if op in ('between', 'not_between'):
            low, high = self._pick_bounds(sample_values)
            if op == 'between':
                return (pb.between(field, low, high),
                        lambda r: low <= r[field] <= high)
            return (pb.not_between(field, low, high),
                    lambda r: not (low <= r[field] <= high))

        if op in ('is_in', 'is_not_in'):
            count = self.rnd.randint(1, min(5, len(sample_values)))
            members = self.rnd.sample(sample_values, count)
            member_set = set(members)
            if op == 'is_in':
                return (pb.is_in(field, members),
                        lambda r: r[field] in member_set)
            return (pb.is_not_in(field, members),
                    lambda r: r[field] not in member_set)

        if op == 'is_null':
            # Our test data has no nulls, so the oracle is "no row matches"
            # — but the pushdown must arrive at the same answer without
            # incorrectly dropping non-null files (the historical bug).
            return pb.is_null(field), lambda r: False
        # is_not_null
        return pb.is_not_null(field), lambda r: True

    # -------------------------------------------------------------------
    # Append-only: oracle = identity over rows; push-down filter result
    # must equal Python-side filter over all rows.
    # -------------------------------------------------------------------
    def test_property_append_random(self):
        """Random append-only datasets: pushed-down read equals Python filter."""
        for trial in range(self.APPEND_TRIALS):
            table = self._make_append_table(trial)
            n = self.rnd.randint(20, 200)
            rows = [{'k': i, 'v': self.rnd.randint(-50, 50)} for i in range(n)]
            # 1-3 snapshots so the data is spread over several commits.
            n_snaps = self.rnd.randint(1, 3)
            for chunk in self._chunk(rows, n_snaps):
                if chunk:
                    self._write_one_snapshot(table, chunk)

            pb = table.new_read_builder().new_predicate_builder()
            field = self.rnd.choice(['k', 'v'])
            sample = sorted({r[field] for r in rows})
            pred, oracle = self._gen_predicate(pb, field, sample)

            got = sorted(self._read_with(table, pred), key=lambda r: r['k'])
            want = sorted((r for r in rows if oracle(r)), key=lambda r: r['k'])
            self.assertEqual(got, want,
                             f"trial {trial} field={field} method={pred.method} mismatch")

    # -------------------------------------------------------------------
    # PK: oracle = post-merge state (latest write per PK wins). Random
    # multi-snapshot writes can update the same PK.
    # -------------------------------------------------------------------
    def test_property_pk_random(self):
        """Random PK datasets: pushed-down read equals filtered post-merge state."""
        for trial in range(self.PK_TRIALS):
            table = self._make_pk_table(trial)
            n_snaps = self.rnd.randint(1, 3)
            merged: Dict[int, int] = {}
            for _ in range(n_snaps):
                m = self.rnd.randint(5, 20)
                # Within a single snapshot batch, Paimon's merge resolution
                # for duplicate PKs is not the "last-in-iteration" rule we
                # use as oracle. Avoid the ambiguity by guaranteeing PK
                # uniqueness inside each batch — the cross-snapshot update
                # semantics (the path we actually want to exercise) are
                # unaffected.
                keys = self.rnd.sample(range(40), min(m, 40))
                rows = []
                for k in keys:
                    v = self.rnd.randint(-50, 50)
                    rows.append({'k': k, 'v': v})
                    merged[k] = v  # latest snapshot wins
                self._write_one_snapshot(table, rows)

            pb = table.new_read_builder().new_predicate_builder()
            # PK predicates hit key_stats path; value predicates hit
            # post-merge filter path. Mix both.
            field = self.rnd.choice(['k', 'v'])
            if not merged:
                continue
            sample_values = sorted(merged.keys() if field == 'k'
                                   else set(merged.values()))
            if not sample_values:
                continue
            pred, oracle = self._gen_predicate(pb, field, sample_values)

            merged_rows = [{'k': k, 'v': v} for k, v in merged.items()]
            got = sorted(self._read_with(table, pred), key=lambda r: r['k'])
            want = sorted((r for r in merged_rows if oracle(r)),
                          key=lambda r: r['k'])
            self.assertEqual(got, want,
                             f"trial {trial} field={field} method={pred.method} mismatch "
                             f"merged={merged}")

    @staticmethod
    def _chunk(items: List, n: int) -> List[List]:
        """Split ``items`` into at most ``n`` consecutive, roughly-equal chunks.

        The chunk size is ``ceil(len(items) / n)`` (at least 1), so fewer
        than ``n`` chunks may come back; an empty ``items`` yields ``[]``.
        Values of ``n`` below 1 are treated as 1.
        """
        n = max(1, n)
        size = max(1, (len(items) + n - 1) // n)
        return [items[start:start + size] for start in range(0, len(items), size)]
| |
| |
# Allow running this suite directly as a script.
if __name__ == '__main__':
    unittest.main()