blob: 5e565619846fb31df15190e74e33e80531bbc5ec [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Direct unit tests for ``FirstRowMergeFunction``.
Drives the merge function with synthetic ``KeyValue`` instances so the
contract is pinned down without going through the full read pipeline.
"""
import unittest
from pypaimon.read.reader.first_row_merge_function import \
FirstRowMergeFunction
from pypaimon.table.row.key_value import KeyValue
from pypaimon.table.row.row_kind import RowKind
def _kv(key, seq, row_kind, value):
    """Build a ``KeyValue`` loaded with one synthetic row.

    The backing tuple passed to ``replace`` is laid out as: the key fields,
    then ``seq``, then ``row_kind.value``, then the value fields. The key
    and value arities are taken from the lengths of ``key`` and ``value``.
    """
    record = KeyValue(key_arity=len(key), value_arity=len(value))
    flat_row = (*key, seq, row_kind.value, *value)
    record.replace(flat_row)
    return record
def _result_value(kv):
return tuple(kv.value.get_field(i) for i in range(kv.value_arity))
def _result_key(kv):
return tuple(kv.key.get_field(i) for i in range(kv.key_arity))
class FirstRowMergeFunctionTest(unittest.TestCase):
    """Exercises ``FirstRowMergeFunction`` directly with synthetic rows.

    Every test starts from a freshly constructed and reset merge function,
    feeds it rows built by ``_kv`` (or a hand-managed ``KeyValue``), and
    checks what ``get_result`` hands back.
    """

    @staticmethod
    def _fresh(**ctor_kwargs):
        # Construct the merge function with the given keyword arguments
        # and reset it, which is how every test begins.
        merger = FirstRowMergeFunction(**ctor_kwargs)
        merger.reset()
        return merger

    def test_single_insert_returns_value(self):
        # One INSERT in, the same key and value out.
        merger = self._fresh()
        merger.add(_kv((1,), 1, RowKind.INSERT, (10, "a")))

        merged = merger.get_result()

        self.assertIsNotNone(merged)
        self.assertEqual(_result_key(merged), (1,))
        self.assertEqual(_result_value(merged), (10, "a"))

    def test_keeps_first_row_not_latest(self):
        # Three rows for the same key: the earliest one must be returned.
        merger = self._fresh()
        rows = (
            (1, RowKind.INSERT, (10, "first")),
            (2, RowKind.INSERT, (20, "second")),
            (3, RowKind.UPDATE_AFTER, (30, "third")),
        )
        for seq, kind, value in rows:
            merger.add(_kv((1,), seq, kind, value))

        merged = merger.get_result()

        self.assertEqual(_result_value(merged), (10, "first"))

    def test_keeps_first_row_when_kv_is_pooled(self):
        # The writer's fold (KeyValueDataWriter._merge_pending_by_pk) reuses
        # one KeyValue and replace()s it for each row. add() therefore has
        # to snapshot the first row; if it only kept a reference, get_result
        # would follow the pooled kv's last replace() and return the LAST
        # row, silently turning first-row into last-row. The tests that
        # build a new KeyValue per row via _kv() cannot catch this.
        merger = self._fresh()
        shared_kv = KeyValue(key_arity=1, value_arity=2)
        backing_rows = (
            (1, 1, RowKind.INSERT.value, 10, "first"),
            (1, 2, RowKind.INSERT.value, 20, "second"),
        )
        for backing_row in backing_rows:
            shared_kv.replace(backing_row)
            merger.add(shared_kv)

        merged = merger.get_result()

        self.assertEqual(_result_value(merged), (10, "first"))

    def test_reset_clears_state(self):
        merger = self._fresh()

        # First round: a row is held after add().
        merger.add(_kv((1,), 1, RowKind.INSERT, (10,)))
        self.assertIsNotNone(merger.get_result())

        # reset() must drop it again.
        merger.reset()
        self.assertIsNone(merger.get_result())

        # Second round: the new key is treated as a first row.
        merger.add(_kv((2,), 2, RowKind.INSERT, (20,)))
        merged = merger.get_result()
        self.assertEqual(_result_key(merged), (2,))
        self.assertEqual(_result_value(merged), (20,))

    def test_empty_returns_none(self):
        # Nothing added after reset() means there is no result.
        merger = self._fresh()
        self.assertIsNone(merger.get_result())

    def test_delete_raises_by_default(self):
        # With ignore_delete=False a DELETE row is rejected.
        merger = self._fresh(ignore_delete=False)
        with self.assertRaises(ValueError):
            merger.add(_kv((1,), 1, RowKind.DELETE, (10,)))

    def test_update_before_raises_by_default(self):
        # With ignore_delete=False an UPDATE_BEFORE row is rejected.
        merger = self._fresh(ignore_delete=False)
        with self.assertRaises(ValueError):
            merger.add(_kv((1,), 1, RowKind.UPDATE_BEFORE, (10,)))

    def test_ignore_delete_skips_retract(self):
        # With ignore_delete=True the DELETE is skipped, so the INSERT
        # that follows is the one returned.
        merger = self._fresh(ignore_delete=True)
        rows = (
            (1, RowKind.DELETE, (10,)),
            (2, RowKind.INSERT, (20,)),
        )
        for seq, kind, value in rows:
            merger.add(_kv((1,), seq, kind, value))

        merged = merger.get_result()

        self.assertIsNotNone(merged)
        self.assertEqual(_result_value(merged), (20,))

    def test_ignore_delete_skips_update_before(self):
        # A lone UPDATE_BEFORE is skipped, leaving no result.
        merger = self._fresh(ignore_delete=True)
        merger.add(_kv((1,), 1, RowKind.UPDATE_BEFORE, (10,)))
        self.assertIsNone(merger.get_result())

    def test_ignore_delete_only_retract_returns_none(self):
        # Only retract rows were added, so nothing is left to return.
        merger = self._fresh(ignore_delete=True)
        rows = (
            (1, RowKind.DELETE, (10,)),
            (2, RowKind.UPDATE_BEFORE, (20,)),
        )
        for seq, kind, value in rows:
            merger.add(_kv((1,), seq, kind, value))

        self.assertIsNone(merger.get_result())

    def test_update_after_accepted_as_first(self):
        # UPDATE_AFTER as the very first row is kept like an INSERT.
        merger = self._fresh()
        merger.add(_kv((1,), 1, RowKind.UPDATE_AFTER, (10,)))

        merged = merger.get_result()

        self.assertIsNotNone(merged)
        self.assertEqual(_result_value(merged), (10,))
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()