| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| |
"""End-to-end tests for merge-engine behaviour, centred on ``partial-update``.

Each test creates a PK table with ``merge-engine`` set to a particular
value, writes one or more batches, and reads back. Partial-update reads
must merge non-null fields across batches; ``deduplicate`` must keep
the latest row only; ``first-row`` must keep the earliest row.
``aggregation`` has its own engine-specific e2e coverage in
:mod:`test_aggregation_e2e`; the only aggregation case here checks the
write-side fallback warning.
"""
| |
| import os |
| import shutil |
| import tempfile |
| import unittest |
| |
| import pyarrow as pa |
| |
| from pypaimon import CatalogFactory, Schema |
| |
| |
class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
    """End-to-end merge-engine checks against a temporary local warehouse.

    One catalog, rooted in a temporary directory, is shared by the whole
    class. Every test creates its own uniquely named primary-key table in
    the ``default`` database, writes one or more batches and reads the
    table back.
    """

    @classmethod
    def setUpClass(cls):
        """Create the shared Arrow schema, temp warehouse and catalog."""
        cls.pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('a', pa.string()),
            ('b', pa.string()),
            ('c', pa.string()),
        ])

        cls.tempdir = tempfile.mkdtemp()
        try:
            cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
            cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
            # NOTE(review): second argument is presumably
            # ``ignore_if_exists`` -- confirm against the catalog API.
            cls.catalog.create_database('default', True)
        except Exception:
            # unittest does not run tearDownClass when setUpClass raises,
            # so remove the temp directory here instead of leaking it.
            shutil.rmtree(cls.tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary warehouse directory (best effort)."""
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _create_pk_table(self, table_name, merge_engine='partial-update',
                         extra_options=None):
        """Create ``default.<table_name>`` keyed on ``id`` and return it.

        ``merge_engine`` is stored as the ``merge-engine`` table option;
        ``extra_options`` (optional dict) is layered on top of the base
        options and may override them.
        """
        # bucket=1 so all rows for any PK land in the same bucket; this is
        # what forces the read path through SortMergeReader instead of the
        # raw_convertible / single-file fast path. partial-update merging
        # only happens inside SortMergeReader.
        options = {
            'bucket': '1',
            'merge-engine': merge_engine,
        }
        if extra_options:
            options.update(extra_options)
        schema = Schema.from_pyarrow_schema(
            self.pa_schema,
            primary_keys=['id'],
            options=options,
        )
        full = 'default.{}'.format(table_name)
        # NOTE(review): third argument is presumably ``ignore_if_exists``
        # -- confirm against the catalog API.
        self.catalog.create_table(full, schema, False)
        return self.catalog.get_table(full)

    def _write(self, table, rows):
        """Write ``rows`` with a single ``write_arrow`` call and commit."""
        self._write_many(table, [rows])

    def _write_many(self, table, batches):
        """Multiple ``write_arrow`` calls inside a single ``prepare_commit``.

        Each element of ``batches`` is a list of row dicts and is written
        with its own ``write_arrow`` call; all of them are committed
        together. Covers the case where rows that land in the same
        underlying data file must still go through the merge-engine
        dispatch; in-writer merging cannot silently degrade to dedupe.
        """
        builder = table.new_batch_write_builder()
        writer = builder.new_write()
        committer = None
        try:
            committer = builder.new_commit()
            for rows in batches:
                arrow_table = pa.Table.from_pylist(rows, schema=self.pa_schema)
                writer.write_arrow(arrow_table)
            committer.commit(writer.prepare_commit())
        finally:
            # Close the writer first, then the committer, and make sure a
            # failure in the first close cannot skip the second.
            try:
                writer.close()
            finally:
                if committer is not None:
                    committer.close()

    def _read(self, table):
        """Return all rows of ``table`` as dicts, sorted by ``id``.

        Returns an empty list when the scan plan has no splits.
        """
        rb = table.new_read_builder()
        splits = rb.new_scan().plan().splits()
        if not splits:
            return []
        return sorted(
            rb.new_read().to_arrow(splits).to_pylist(),
            key=lambda r: r['id'],
        )

    # -- partial-update happy path ---------------------------------------

    def test_partial_update_two_writes_merges_non_null(self):
        """Two writes against the same PK with disjoint non-null columns
        must merge into a single row that has both columns populated.
        """
        table = self._create_pk_table('two_writes')
        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
        )

    def test_partial_update_three_writes_merges_left_to_right(self):
        """Three overlapping writes — each filling in a different column —
        compose into the union of non-null fields.
        """
        table = self._create_pk_table('three_writes')
        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
        self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': 'C'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
        )

    def test_partial_update_disjoint_keys_unaffected(self):
        """Three rows with disjoint PKs must all appear unchanged in the
        output — partial-update only merges rows that share a PK.
        """
        table = self._create_pk_table('disjoint_keys')
        self._write(table, [
            {'id': 1, 'a': 'A1', 'b': None, 'c': None},
            {'id': 2, 'a': None, 'b': 'B2', 'c': None},
            {'id': 3, 'a': None, 'b': None, 'c': 'C3'},
        ])

        self.assertEqual(
            self._read(table),
            [
                {'id': 1, 'a': 'A1', 'b': None, 'c': None},
                {'id': 2, 'a': None, 'b': 'B2', 'c': None},
                {'id': 3, 'a': None, 'b': None, 'c': 'C3'},
            ],
        )

    def test_partial_update_later_value_wins_over_earlier_non_null(self):
        """When two writes both supply a non-null value for the same
        column, the later value wins (latest non-null per field).
        """
        table = self._create_pk_table('later_wins')
        self._write(table, [{'id': 1, 'a': 'old', 'b': 'keep', 'c': None}])
        self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': 'fill'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'new', 'b': 'keep', 'c': 'fill'}],
        )

    def test_partial_update_later_null_does_not_clobber_earlier_value(self):
        """A later write with NULL for a column does NOT overwrite an
        earlier non-null value for that column.
        """
        table = self._create_pk_table('null_no_clobber')
        self._write(table, [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}])
        self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': None}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
        )

    # -- single-commit, multiple write_arrow calls -----------------------
    #
    # The in-memory merge buffer added to ``KeyValueDataWriter`` runs
    # the merge function on flush, so rows from multiple ``write_arrow``
    # calls that share a primary key are folded into a single row before
    # the data file is written. The flushed file therefore satisfies the
    # LSM "PK unique within a file" invariant the read-side
    # ``raw_convertible`` fast path relies on.

    def test_partial_update_two_write_arrows_single_commit(self):
        """Two ``write_arrow`` calls + one ``prepare_commit``: each
        carries a disjoint non-null field; result is the per-field merge.
        """
        table = self._create_pk_table('two_writes_single_commit')
        self._write_many(table, [
            [{'id': 1, 'a': 'A', 'b': None, 'c': None}],
            [{'id': 1, 'a': None, 'b': 'B', 'c': None}],
        ])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
        )

    def test_partial_update_three_write_arrows_single_commit(self):
        """Three ``write_arrow`` calls in a single commit compose into
        the union of non-null fields.
        """
        table = self._create_pk_table('three_writes_single_commit')
        self._write_many(table, [
            [{'id': 1, 'a': 'A', 'b': None, 'c': None}],
            [{'id': 1, 'a': None, 'b': 'B', 'c': None}],
            [{'id': 1, 'a': None, 'b': None, 'c': 'C'}],
        ])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
        )

    # -- deduplicate (regression) ----------------------------------------

    def test_deduplicate_engine_unchanged(self):
        """The default ``deduplicate`` engine must keep the latest row
        intact, including its NULLs — exactly the pre-PR behaviour.
        """
        table = self._create_pk_table('dedupe', merge_engine='deduplicate')
        self._write(table, [{'id': 1, 'a': 'old', 'b': 'old-b', 'c': 'old-c'}])
        self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': None}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'new', 'b': None, 'c': None}],
        )

    def test_deduplicate_two_write_arrows_single_commit(self):
        """Pre-PR master silently returned both rows because the
        flushed file held two records sharing a primary key. With the
        in-memory merge buffer in place, ``deduplicate`` collapses
        same-PK rows in a single commit too -- LSM "PK unique within a
        file" invariant restored.
        """
        table = self._create_pk_table(
            'dedupe_two_writes_single_commit',
            merge_engine='deduplicate',
        )
        self._write_many(table, [
            [{'id': 1, 'a': 'first', 'b': 'old', 'c': None}],
            [{'id': 1, 'a': 'second', 'b': 'new', 'c': None}],
        ])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'second', 'b': 'new', 'c': None}],
        )

    # -- other supported engines (smoke) ---------------------------------

    def test_first_row_engine_keeps_first(self):
        """The ``first-row`` engine must keep the earliest row per PK.

        Both the writer-side merge buffer and the reader-side merge
        function go through ``merge_engine_dispatch``, so first-row is
        a real supported engine (no dedupe fallback / no NotImplemented
        raise) on both sides.
        """
        table = self._create_pk_table('first_row_supported',
                                      merge_engine='first-row')
        self._write(table, [{'id': 1, 'a': 'first', 'b': None, 'c': None}])
        self._write(table, [{'id': 1, 'a': 'second', 'b': 'B', 'c': 'C'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'first', 'b': None, 'c': None}],
        )

    def test_aggregation_engine_write_logs_fallback_warning(self):
        """The write-side fallback to deduplicate for unsupported engines
        is silent in terms of return value -- a ``logging.warning`` is
        the only signal that file contents will not match the table's
        declared semantics. Important when the same table is read back
        by a reader that honours the declared engine; the pypaimon
        read-side raise wouldn't fire there.
        """
        table = self._create_pk_table('agg_warning',
                                      merge_engine='aggregation')
        with self.assertLogs(
                'pypaimon.write.file_store_write', level='WARNING') as cm:
            self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
        combined = '\n'.join(cm.output)
        self.assertIn('aggregation', combined)
        self.assertIn('deduplicate', combined)

    # -- partial-update + out-of-scope option combinations ---------------
    #
    # When a user pairs ``merge-engine: partial-update`` with any option
    # this port doesn't implement (sequence-group, per-field aggregator
    # override, ignore-delete, partial-update.remove-record-on-*), we
    # must raise rather than silently run the simple last-non-null merge
    # — otherwise we'd reproduce the same silent-corruption pattern this
    # PR exists to close.

    def _assert_partial_update_unsupported(self, table_name, extra_options,
                                           expected_keys):
        """Assert that writing to a partial-update table created with
        ``extra_options`` raises ``NotImplementedError`` whose message
        names ``partial-update`` and every key in ``expected_keys``.
        """
        # Shared dispatch runs at write time too, so the unsupported-
        # option error surfaces inside the first ``write_arrow`` call
        # (when ``FileStoreWrite._create_data_writer`` first runs)
        # rather than waiting for read.
        table = self._create_pk_table(
            table_name, extra_options=extra_options)
        with self.assertRaises(NotImplementedError) as cm:
            self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
        msg = str(cm.exception)
        self.assertIn("partial-update", msg)
        for key in expected_keys:
            self.assertIn(
                key, msg,
                "expected option key '{}' in error: {}".format(key, msg))

    def test_partial_update_with_sequence_group_raises(self):
        """``fields.<name>.sequence-group`` is rejected."""
        self._assert_partial_update_unsupported(
            'pu_seq_group',
            {'fields.b.sequence-group': 'a'},
            ['fields.b.sequence-group'],
        )

    def test_partial_update_with_field_aggregate_function_raises(self):
        """A per-field ``aggregate-function`` override is rejected."""
        self._assert_partial_update_unsupported(
            'pu_field_agg',
            {'fields.a.aggregate-function': 'last_non_null_value'},
            ['fields.a.aggregate-function'],
        )

    def test_partial_update_with_default_aggregate_function_raises(self):
        """``fields.default-aggregate-function`` is rejected."""
        self._assert_partial_update_unsupported(
            'pu_default_agg',
            {'fields.default-aggregate-function': 'last_non_null_value'},
            ['fields.default-aggregate-function'],
        )

    def test_partial_update_with_ignore_delete_raises(self):
        """``ignore-delete=true`` is rejected."""
        self._assert_partial_update_unsupported(
            'pu_ignore_delete',
            {'ignore-delete': 'true'},
            ['ignore-delete'],
        )

    def test_partial_update_with_remove_record_on_delete_raises(self):
        """``partial-update.remove-record-on-delete`` is rejected."""
        self._assert_partial_update_unsupported(
            'pu_rrod',
            {'partial-update.remove-record-on-delete': 'true'},
            ['partial-update.remove-record-on-delete'],
        )

    def test_partial_update_with_remove_record_on_sequence_group_raises(self):
        """``partial-update.remove-record-on-sequence-group`` is rejected."""
        self._assert_partial_update_unsupported(
            'pu_rrosg',
            {'partial-update.remove-record-on-sequence-group': 'true'},
            ['partial-update.remove-record-on-sequence-group'],
        )

    def test_partial_update_unsupported_options_guard_covers_raw_convertible(self):
        """The read-side guard at ``TableRead.__init__`` must fire even
        when the scan would dispatch every split through
        ``RawFileSplitRead`` (single-snapshot, non-overlapping rows).

        Before the guard moved to ``TableRead.__init__`` this case
        silently bypassed validation because raw-convertible splits skip
        ``MergeFileSplitRead`` entirely -- the read path's
        ``_build_merge_function`` never ran, so an option like
        ``partial-update.remove-record-on-delete`` was ignored on read.

        The shared dispatch now also fires on the write path's first
        flush (see ``_assert_partial_update_unsupported``), so we skip
        ``_write`` here: the read-side guard runs at ``new_read()``
        construction time regardless of whether data exists.
        """
        table = self._create_pk_table(
            'pu_rrod_raw_convertible',
            extra_options={'partial-update.remove-record-on-delete': 'true'},
        )
        rb = table.new_read_builder()
        with self.assertRaises(NotImplementedError) as cm:
            rb.new_read()
        msg = str(cm.exception)
        self.assertIn('partial-update', msg)
        self.assertIn('partial-update.remove-record-on-delete', msg)

    def test_partial_update_with_explicit_ignore_delete_false_does_not_raise(self):
        """Explicitly setting ignore-delete=false is equivalent to leaving
        it unset and must not trip the guard."""
        table = self._create_pk_table(
            'pu_ignore_delete_false',
            extra_options={'ignore-delete': 'false'},
        )
        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
        )
| |
| |
# Run the suite through unittest's CLI runner when executed as a script.
if __name__ == "__main__":
    unittest.main()