blob: 58776848b871c0b5f30b474c9d40bdf94b94e820 [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""End-to-end tests for the ``first-row`` merge engine.
Each test creates a PK table with ``merge-engine`` set to ``first-row``,
writes one or more batches, and reads back. The first-row engine keeps
only the earliest row per primary key.
"""
import os
import shutil
import tempfile
import unittest
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
class FirstRowMergeEngineE2ETest(unittest.TestCase):
    """End-to-end tests for PK tables using ``merge-engine: first-row``.

    All tests share one temporary warehouse created in ``setUpClass``; each
    test creates its own uniquely named table in the ``default`` database,
    so tests never read each other's data.
    """

    @classmethod
    def setUpClass(cls):
        """Create a temp warehouse, a catalog on it, and the shared schema."""
        cls.tempdir = tempfile.mkdtemp()
        # unittest does not run tearDownClass when setUpClass raises, so the
        # temp dir must be removed here if any of the setup below fails.
        try:
            cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
            cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
            cls.catalog.create_database('default', True)
            # Row layout used by every test: a non-nullable int64 ``id``
            # (the primary key, see _create_pk_table) plus two string
            # payload columns.
            cls.pa_schema = pa.schema([
                pa.field('id', pa.int64(), nullable=False),
                ('a', pa.string()),
                ('b', pa.string()),
            ])
        except BaseException:
            shutil.rmtree(cls.tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Delete the temporary warehouse directory and everything in it."""
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _create_pk_table(self, table_name, extra_options=None):
        """Create ``default.<table_name>`` as a first-row PK table and load it.

        The table has primary key ``id``, a single bucket and
        ``merge-engine=first-row``. If ``extra_options`` is given, it is
        merged on top of those options and may override them.

        Returns:
            The table object fetched back from the catalog.
        """
        options = {
            'bucket': '1',
            'merge-engine': 'first-row',
        }
        if extra_options:
            options.update(extra_options)
        schema = Schema.from_pyarrow_schema(
            self.pa_schema,
            primary_keys=['id'],
            options=options,
        )
        full_name = 'default.{}'.format(table_name)
        self.catalog.create_table(full_name, schema, False)
        return self.catalog.get_table(full_name)

    def _write(self, table, rows):
        """Write ``rows`` (a list of dicts) to ``table`` in a single commit."""
        self._write_batches(table, [rows])

    def _write_batches(self, table, batches):
        """Write several row lists with separate ``write_arrow`` calls, then
        commit them all at once.

        Args:
            table: table returned by ``_create_pk_table``.
            batches: iterable of row lists (lists of dicts matching
                ``pa_schema``). Each list becomes one ``write_arrow`` call,
                and all of them are committed together.
        """
        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        try:
            table_commit = write_builder.new_commit()
        except BaseException:
            # Don't leak the writer if the committer cannot be created.
            table_write.close()
            raise
        try:
            for rows in batches:
                table_write.write_arrow(
                    pa.Table.from_pylist(rows, schema=self.pa_schema))
            table_commit.commit(table_write.prepare_commit())
        finally:
            # Close the writer, then the committer. The committer is closed
            # even if closing the writer raises.
            try:
                table_write.close()
            finally:
                table_commit.close()

    def _read(self, table):
        """Scan the whole table and return its rows as dicts sorted by ``id``.

        Returns an empty list when the scan plans no splits.
        """
        read_builder = table.new_read_builder()
        splits = read_builder.new_scan().plan().splits()
        if not splits:
            return []
        return sorted(
            read_builder.new_read().to_arrow(splits).to_pylist(),
            key=lambda row: row['id'],
        )

    def test_first_row_keeps_earliest(self):
        """Two writes with the same PK — first-row keeps the first one."""
        table = self._create_pk_table('first_row_basic')
        self._write(table, [{'id': 1, 'a': 'first', 'b': 'B1'}])
        self._write(table, [{'id': 1, 'a': 'second', 'b': 'B2'}])
        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'first', 'b': 'B1'}],
        )

    def test_first_row_multiple_keys(self):
        """Multiple PKs across two writes — each key keeps its first row."""
        table = self._create_pk_table('first_row_multi_key')
        self._write(table, [
            {'id': 1, 'a': 'A1', 'b': 'B1'},
            {'id': 2, 'a': 'A2', 'b': 'B2'},
        ])
        self._write(table, [
            {'id': 1, 'a': 'A1-new', 'b': 'B1-new'},
            {'id': 3, 'a': 'A3', 'b': 'B3'},
        ])
        self.assertEqual(
            self._read(table),
            [
                {'id': 1, 'a': 'A1', 'b': 'B1'},
                {'id': 2, 'a': 'A2', 'b': 'B2'},
                {'id': 3, 'a': 'A3', 'b': 'B3'},
            ],
        )

    def test_first_row_three_writes(self):
        """Three writes for the same PK — the first one always wins, even
        when it carries a null column."""
        table = self._create_pk_table('first_row_three')
        self._write(table, [{'id': 1, 'a': 'first', 'b': None}])
        self._write(table, [{'id': 1, 'a': 'second', 'b': 'B'}])
        self._write(table, [{'id': 1, 'a': 'third', 'b': 'C'}])
        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'first', 'b': None}],
        )

    def test_first_row_single_write(self):
        """A single write should read back unchanged."""
        table = self._create_pk_table('first_row_single')
        self._write(table, [
            {'id': 1, 'a': 'A', 'b': 'B'},
            {'id': 2, 'a': 'C', 'b': 'D'},
        ])
        self.assertEqual(
            self._read(table),
            [
                {'id': 1, 'a': 'A', 'b': 'B'},
                {'id': 2, 'a': 'C', 'b': 'D'},
            ],
        )

    def test_first_row_intra_batch_duplicate(self):
        """A single write whose batch already contains duplicate PKs.

        The whole batch is folded in one flush, so this exercises the
        write-side fold rather than the cross-commit read merge. first-row
        must keep the first occurrence of each PK.
        """
        table = self._create_pk_table('first_row_intra_batch')
        self._write(table, [
            {'id': 1, 'a': 'first', 'b': 'B1'},
            {'id': 1, 'a': 'second', 'b': 'B2'},
            {'id': 1, 'a': 'third', 'b': 'B3'},
            {'id': 2, 'a': 'only', 'b': 'B'},
        ])
        self.assertEqual(
            self._read(table),
            [
                {'id': 1, 'a': 'first', 'b': 'B1'},
                {'id': 2, 'a': 'only', 'b': 'B'},
            ],
        )

    def test_first_row_multiple_writes_one_commit(self):
        """Several write_arrow calls committed once: the same PK across
        those writes folds in a single flush. first-row keeps the first.
        """
        table = self._create_pk_table('first_row_multi_write_one_commit')
        self._write_batches(table, [
            [{'id': 1, 'a': 'first', 'b': 'B1'}],
            [{'id': 1, 'a': 'second', 'b': 'B2'}],
        ])
        self.assertEqual(
            self._read(table),
            [{'id': 1, 'a': 'first', 'b': 'B1'}],
        )
if __name__ == '__main__':
    # Allow running this test module directly as a script.
    unittest.main()