| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| |
| """End-to-end tests for the ``sequence.field`` option on the read path. |
| |
| ``sequence.field`` lets the user pick an explicit column (or columns) |
| whose value -- not the file-level sequence number -- decides which record |
| is the "latest" for a primary key. The tricky case is when the |
| write/file order disagrees with the ``sequence.field`` order: a row |
| written *later* (higher file sequence number) carrying a *lower* |
| ``sequence.field`` value must lose to the earlier-written row. The Java |
| merge path applies a ``UserDefinedSeqComparator`` on the value row before |
| falling back to the file sequence number; pypaimon mirrors that via |
| ``builtin_seq_comparator`` wired into ``SortMergeReaderWithMinHeap``. |
| """ |
| |
| import os |
| import shutil |
| import tempfile |
| import unittest |
| |
| import pyarrow as pa |
| |
| from pypaimon import CatalogFactory, Schema |
| |
| |
class SequenceFieldReadE2ETest(unittest.TestCase):
    """Read-path end-to-end tests for tables using ``sequence.field``.

    All tests share one temporary warehouse created in ``setUpClass``. Each
    test creates its own primary-key table (keyed on ``id``), issues one
    commit per ``_write`` call, and reads the merged result back through
    the table's read builder.
    """

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
        cls.catalog.create_database('default', True)

        # Default row layout used by every test that does not bring its
        # own schema.
        cls.pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('ts', pa.int64()),
            ('ts2', pa.int64()),
            ('val', pa.string()),
        ])

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _create_pk_table(self, table_name, merge_engine='deduplicate',
                         extra_options=None, partition_keys=None):
        """Create ``default.<table_name>`` with primary key ``id`` and
        return the table handle.

        ``extra_options`` is merged over the base options, so it may
        override ``bucket`` or ``merge-engine``. ``partition_keys``
        defaults to no partitioning.
        """
        # bucket=1 forces all rows for a PK into one bucket so the read
        # goes through SortMergeReader (where sequence ordering matters)
        # instead of the raw-convertible fast path.
        options = {
            'bucket': '1',
            'merge-engine': merge_engine,
        }
        if extra_options:
            options.update(extra_options)
        schema = Schema.from_pyarrow_schema(
            self.pa_schema,
            primary_keys=['id'],
            partition_keys=partition_keys or [],
            options=options,
        )
        full = 'default.{}'.format(table_name)
        self.catalog.create_table(full, schema, False)
        return self.catalog.get_table(full)

    def _write(self, table, rows, schema=None):
        """Write ``rows`` (a list of dicts) to ``table`` as a single commit.

        ``schema`` is the pyarrow schema used to build the Arrow table; it
        defaults to the class-wide ``pa_schema`` so existing callers are
        unaffected.
        """
        arrow_schema = self.pa_schema if schema is None else schema
        wb = table.new_batch_write_builder()
        w = wb.new_write()
        c = wb.new_commit()
        try:
            w.write_arrow(pa.Table.from_pylist(rows, schema=arrow_schema))
            c.commit(w.prepare_commit())
        finally:
            # Close the committer even when closing the writer raises;
            # otherwise a failing close would leak the committer.
            try:
                w.close()
            finally:
                c.close()

    def _read(self, table, projection=None, predicate=None):
        """Read ``table`` and return its rows as dicts sorted by ``id``.

        ``projection`` and ``predicate`` are applied to the read builder
        only when given. An empty plan yields ``[]``.
        """
        rb = table.new_read_builder()
        if projection is not None:
            rb = rb.with_projection(projection)
        if predicate is not None:
            rb = rb.with_filter(predicate)
        splits = rb.new_scan().plan().splits()
        if not splits:
            return []
        return sorted(
            rb.new_read().to_arrow(splits).to_pylist(),
            key=lambda r: r['id'],
        )

    # -- basic ordering --------------------------------------------------

    def test_later_write_with_lower_sequence_field_loses(self):
        """The row written second has a higher file sequence number but a
        lower ``sequence.field`` value, so the earlier (higher-ts) row
        must win.
        """
        table = self._create_pk_table(
            'seq_basic', extra_options={'sequence.field': 'ts'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
        )

    def test_later_write_with_higher_sequence_field_wins(self):
        """Sanity check the non-inverted case still works."""
        table = self._create_pk_table(
            'seq_basic_fwd', extra_options={'sequence.field': 'ts'})
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
        )

    # -- multiple sequence fields ----------------------------------------

    def test_multi_sequence_field_left_to_right(self):
        """When the first sequence field ties, the second breaks it."""
        table = self._create_pk_table(
            'seq_multi', extra_options={'sequence.field': 'ts,ts2'})
        # Same ts; ts2 decides. Write the ts2-winner first so file order
        # disagrees with the sequence-field order.
        self._write(table, [{'id': 1, 'ts': 10, 'ts2': 99, 'val': 'win'}])
        self._write(table, [{'id': 1, 'ts': 10, 'ts2': 1, 'val': 'lose'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'ts': 10, 'ts2': 99, 'val': 'win'}],
        )

    # -- sort order ------------------------------------------------------

    def test_descending_sort_order_lowest_wins(self):
        """With descending sort order, the lowest ``sequence.field`` value
        is considered the latest.
        """
        table = self._create_pk_table(
            'seq_desc',
            extra_options={'sequence.field': 'ts',
                           'sequence.field.sort-order': 'descending'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}],
        )

    def test_descending_sort_order_null_sequence_sorts_first(self):
        """Null ordering must stay independent of sort order: Java builds
        the sequence comparator with ``nullIsLast=false`` and applies
        descending only to non-null value comparisons, so a null
        ``sequence.field`` value always sorts first (loses) -- even under
        descending order. A non-null row must therefore beat a null-seq
        row regardless of write order.
        """
        table = self._create_pk_table(
            'seq_desc_null',
            extra_options={'sequence.field': 'ts',
                           'sequence.field.sort-order': 'descending'})
        # null-seq row written second (higher file sequence number). With
        # nulls-first ordering it still loses to the earlier non-null row.
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}])
        self._write(table, [{'id': 1, 'ts': None, 'ts2': 0, 'val': 'null'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}],
        )

    def test_ascending_sort_order_null_sequence_sorts_first(self):
        """Mirror of the descending case under the default ascending order:
        a null ``sequence.field`` value sorts first (loses) to a non-null
        row written earlier.
        """
        table = self._create_pk_table(
            'seq_asc_null', extra_options={'sequence.field': 'ts'})
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}])
        self._write(table, [{'id': 1, 'ts': None, 'ts2': 0, 'val': 'null'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}],
        )

    # -- projection drops the sequence field -----------------------------

    def test_projection_dropping_sequence_field(self):
        """Projecting columns that exclude the sequence field must still
        return the sequence-field-correct row, and the output schema must
        contain exactly the requested columns (no leaked ``ts``).
        """
        table = self._create_pk_table(
            'seq_proj', extra_options={'sequence.field': 'ts'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])

        rows = self._read(table, projection=['id', 'val'])
        self.assertEqual(rows, [{'id': 1, 'val': 'high'}])
        # No injected sequence column leaks into the output.
        self.assertEqual(set(rows[0].keys()), {'id', 'val'})

    def test_projection_dropping_sequence_field_with_predicate(self):
        """Projection drops the seq field AND a predicate filters on a
        kept column -- predicate coordinates must stay correct against the
        widened (seq-injected) read type.
        """
        table = self._create_pk_table(
            'seq_proj_pred', extra_options={'sequence.field': 'ts'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'keep'}])
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'drop'}])
        self._write(table, [{'id': 2, 'ts': 5, 'ts2': 0, 'val': 'other'}])

        # The predicate builder is taken from a projected read builder;
        # ``_read`` then builds its own builder with the same projection.
        rb = table.new_read_builder().with_projection(['id', 'val'])
        pb = rb.new_predicate_builder()
        rows = self._read(table, projection=['id', 'val'],
                          predicate=pb.equal('val', 'keep'))
        self.assertEqual(rows, [{'id': 1, 'val': 'keep'}])

    # -- per merge engine ------------------------------------------------

    def test_partial_update_respects_sequence_field(self):
        """partial-update folds non-null fields in sequence-field order, so
        a later-written but lower-ts row must not overwrite a field set by
        the higher-ts row.
        """
        table = self._create_pk_table(
            'seq_pu', merge_engine='partial-update',
            extra_options={'sequence.field': 'ts'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
        )

    def test_first_row_with_sequence_field_rejected(self):
        """sequence.field on the first-row merge engine is an invalid
        configuration that Java rejects at schema validation
        (SchemaValidation.validateSequenceField). pypaimon has no
        schema-creation validation, so the read-builder guard must reject
        it rather than silently apply a sequence ordering first-row never
        honors on write.
        """
        table = self._create_pk_table(
            'seq_fr', merge_engine='first-row',
            extra_options={'sequence.field': 'ts'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
        with self.assertRaises(ValueError) as ctx:
            table.new_read_builder().new_read()
        self.assertIn('FIRST_ROW', str(ctx.exception))

    def test_aggregation_last_value_respects_sequence_field(self):
        """``last_value`` must pick the value from the highest-sequence-field
        row, even when that row was written first.
        """
        table = self._create_pk_table(
            'seq_agg', merge_engine='aggregation',
            extra_options={
                'sequence.field': 'ts',
                'fields.val.aggregate-function': 'last_value',
                'fields.ts2.aggregate-function': 'last_value',
            })
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
        )

    # -- invalid configurations and unsupported sub-features rejected ----

    def test_missing_sequence_field_rejected(self):
        """A sequence.field naming a column absent from the schema is
        invalid (Java SchemaValidation). The guard must reject it with a
        clear message before any read execution.
        """
        table = self._create_pk_table(
            'seq_missing', extra_options={'sequence.field': 'nope'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
        with self.assertRaises(ValueError) as ctx:
            table.new_read_builder().new_read()
        self.assertIn('nope', str(ctx.exception))

    def test_duplicate_sequence_field_rejected(self):
        """A sequence.field listing the same column twice is invalid
        (Java SchemaValidation rejects repeated sequence fields).
        """
        table = self._create_pk_table(
            'seq_dup', extra_options={'sequence.field': 'ts,ts'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
        with self.assertRaises(ValueError) as ctx:
            table.new_read_builder().new_read()
        # NOTE(review): 'ts' is a very short substring and would match many
        # unrelated messages; consider asserting a more specific fragment.
        self.assertIn('ts', str(ctx.exception))

    def test_empty_segment_sequence_field_rejected(self):
        """A malformed ``sequence.field`` with an empty segment (e.g.
        ``'ts,,ts2'``) leaves an empty field name after trimming -- matching
        Java ``CoreOptions.sequenceField()``, which trims but does not drop
        empty segments -- and must be rejected by validation rather than
        silently accepted as ``['ts', 'ts2']``.
        """
        table = self._create_pk_table(
            'seq_empty_seg', extra_options={'sequence.field': 'ts,,ts2'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
        with self.assertRaises(ValueError) as ctx:
            table.new_read_builder().new_read()
        # The empty field name is the one that can't be found in the schema.
        self.assertIn('can not be found', str(ctx.exception))

    def test_cross_partition_update_with_sequence_field_rejected(self):
        """sequence.field is invalid under cross-partition update (the PK
        does not include all partition fields), matching Java
        SchemaValidation.
        """
        table = self._create_pk_table(
            'seq_xpart', extra_options={'sequence.field': 'ts'},
            partition_keys=['ts2'])
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
        with self.assertRaises(ValueError) as ctx:
            table.new_read_builder().new_read()
        self.assertIn('cross partition', str(ctx.exception).lower())

    def test_aggregate_function_on_sequence_field_rejected(self):
        """Defining an aggregator on the sequence column is invalid: Java
        rejects fields.<seq>.aggregate-function outright in
        SchemaValidation.validateSequenceField. The read-builder guard
        must reject it rather than silently override the user's
        aggregator with last_value.
        """
        table = self._create_pk_table(
            'seq_agg_on_seq', merge_engine='aggregation',
            extra_options={'sequence.field': 'ts',
                           'fields.ts.aggregate-function': 'sum'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
        with self.assertRaises(ValueError) as ctx:
            table.new_read_builder().new_read()
        self.assertIn('fields.ts.aggregate-function', str(ctx.exception))

    def test_sequence_group_still_rejected(self):
        """Top-level sequence.field is supported, but per-field
        sequence-group is not -- it must still be rejected. The shared
        merge-engine dispatch now rejects this combination fail-fast on
        the write path, so the write (not the read) is what raises.
        """
        table = self._create_pk_table(
            'seq_group', merge_engine='partial-update',
            extra_options={'sequence.field': 'ts',
                           'fields.ts2.sequence-group': 'val'})
        with self.assertRaises(NotImplementedError):
            self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])

    def test_nested_sequence_field_rejected(self):
        """nested-sequence-field is unimplemented and must be rejected
        rather than silently ignored by the top-level comparator.
        """
        table = self._create_pk_table(
            'seq_nested', merge_engine='deduplicate',
            extra_options={'sequence.field': 'ts',
                           'fields.val.nested-sequence-field': 'ts2'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
        with self.assertRaises(NotImplementedError):
            self._read(table)

    # -- tolerated option spellings --------------------------------------

    def test_trailing_comma_sequence_field_tolerated(self):
        """A trailing comma (``'ts,'``) must be tolerated, matching Java
        ``String.split(',')`` which drops trailing empty segments. It
        behaves exactly like ``'ts'`` -- not rejected as an empty field.
        """
        table = self._create_pk_table(
            'seq_trailing', extra_options={'sequence.field': 'ts,'})
        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])

        self.assertEqual(
            self._read(table),
            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
        )

    # -- unsupported sequence field types --------------------------------

    def test_complex_type_sequence_field_rejected(self):
        """A complex (non-atomic) sequence field is valid in Java (handled
        via RecordComparator) but unimplemented in pypaimon's atomic-only
        comparator. It must be rejected with a clear NotImplementedError
        rather than failing later with an obscure attribute error.
        """
        # This test needs a list-typed column, so it builds its own schema
        # and table instead of going through ``_create_pk_table``.
        pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('seq', pa.list_(pa.int64())),
            ('val', pa.string()),
        ])
        schema = Schema.from_pyarrow_schema(
            pa_schema, primary_keys=['id'],
            options={'bucket': '1', 'merge-engine': 'deduplicate',
                     'sequence.field': 'seq'})
        self.catalog.create_table('default.seq_complex', schema, False)
        table = self.catalog.get_table('default.seq_complex')
        self._write(table, [{'id': 1, 'seq': [1, 2], 'val': 'x'}],
                    schema=pa_schema)
        with self.assertRaises(NotImplementedError) as ctx:
            table.new_read_builder().new_read()
        self.assertIn('seq', str(ctx.exception))
| |
| |
class SequenceFieldComparabilityUnitTest(unittest.TestCase):
    """Unit tests for ``is_comparable_seq_field``, the predicate the
    read-builder guard relies on.

    The VARIANT case is the interesting one: it is built here as an
    ``AtomicType`` yet is expected to be reported as non-comparable, just
    like the complex types, so an ``isinstance(AtomicType)`` check alone
    would not be enough.
    """

    @staticmethod
    def _comparable(data_type):
        """Wrap ``data_type`` in a field named ``seq`` (id 0) and return
        what ``is_comparable_seq_field`` reports for it.
        """
        from pypaimon.read.reader.sort_merge_reader import (
            is_comparable_seq_field)
        from pypaimon.schema.data_types import DataField

        seq_field = DataField(0, 'seq', data_type)
        return is_comparable_seq_field(seq_field)

    def test_variant_sequence_field_not_comparable(self):
        from pypaimon.schema.data_types import AtomicType

        verdict = self._comparable(AtomicType('VARIANT'))
        self.assertFalse(verdict)

    def test_atomic_types_are_comparable(self):
        from pypaimon.schema.data_types import AtomicType

        # Plain, parameterized and NOT NULL spellings are all covered.
        type_strings = (
            'BIGINT',
            'INT',
            'TIMESTAMP(6)',
            'DECIMAL(10, 2)',
            'STRING',
            'BIGINT NOT NULL',
        )
        for type_str in type_strings:
            failure_message = '{} should be comparable'.format(type_str)
            verdict = self._comparable(AtomicType(type_str))
            self.assertTrue(verdict, failure_message)

    def test_complex_types_not_comparable(self):
        from pypaimon.schema.data_types import ArrayType, AtomicType

        element_type = AtomicType('INT')
        verdict = self._comparable(ArrayType(True, element_type))
        self.assertFalse(verdict)
| |
| |
class SequenceFieldParameterizedTypeTest(unittest.TestCase):
    """The comparability check must accept parameterized atomic types
    (TIMESTAMP(p), DECIMAL(p, s), TIME(p)) as sequence fields -- their
    type string carries ``(...)`` which must not be mistaken for a
    non-comparable type.

    Only the TIMESTAMP and DECIMAL cases are exercised end to end here.
    """

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        cls.catalog = CatalogFactory.create(
            {'warehouse': os.path.join(cls.tempdir, 'warehouse')})
        cls.catalog.create_database('default', True)

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _run(self, table_name, pa_schema, rows_first, rows_second, expected):
        """Create ``default.<table_name>`` as a deduplicate table keyed on
        ``id`` with ``sequence.field`` set to ``seq``, commit
        ``rows_first`` and then ``rows_second`` as two separate commits,
        and assert that the full read equals ``expected``.
        """
        schema = Schema.from_pyarrow_schema(
            pa_schema, primary_keys=['id'],
            options={'bucket': '1', 'merge-engine': 'deduplicate',
                     'sequence.field': 'seq'})
        full = 'default.{}'.format(table_name)
        self.catalog.create_table(full, schema, False)
        table = self.catalog.get_table(full)
        for batch in (rows_first, rows_second):
            wb = table.new_batch_write_builder()
            w = wb.new_write()
            c = wb.new_commit()
            try:
                w.write_arrow(pa.Table.from_pylist(batch, schema=pa_schema))
                c.commit(w.prepare_commit())
            finally:
                # Close the committer even when closing the writer raises;
                # otherwise a failing close would leak the committer.
                try:
                    w.close()
                finally:
                    c.close()
        rb = table.new_read_builder()
        splits = rb.new_scan().plan().splits()
        rows = rb.new_read().to_arrow(splits).to_pylist()
        self.assertEqual(rows, expected)

    def test_timestamp_sequence_field(self):
        import datetime
        pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('seq', pa.timestamp('us')),
            ('val', pa.string()),
        ])
        hi = datetime.datetime(2020, 1, 2)
        lo = datetime.datetime(2020, 1, 1)
        # Later write has the lower timestamp -> earlier (higher-ts) wins.
        self._run('seq_ts',
                  pa_schema,
                  [{'id': 1, 'seq': hi, 'val': 'high'}],
                  [{'id': 1, 'seq': lo, 'val': 'low'}],
                  [{'id': 1, 'seq': hi, 'val': 'high'}])

    def test_decimal_sequence_field(self):
        from decimal import Decimal
        pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('seq', pa.decimal128(10, 2)),
            ('val', pa.string()),
        ])
        # Later write has the lower decimal -> earlier (higher) row wins.
        self._run('seq_dec',
                  pa_schema,
                  [{'id': 1, 'seq': Decimal('100.50'), 'val': 'high'}],
                  [{'id': 1, 'seq': Decimal('50.25'), 'val': 'low'}],
                  [{'id': 1, 'seq': Decimal('100.50'), 'val': 'high'}])
| |
| |
if __name__ == '__main__':
    # Executed as a script: hand control to the unittest runner.
    unittest.main()