blob: 2927f35b7bb8c27bac8e49ce22281cc476c9bbf2 [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Unit tests for the FieldAggregator registry contract.
Drives :func:`register_aggregator` / :func:`create_field_aggregator`
without touching the read pipeline so the wiring is pinned down before
any concrete aggregators land in :mod:`aggregators`.
"""
import unittest
from pypaimon.read.reader.aggregate import (
create_field_aggregator,
register_aggregator,
)
from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
from pypaimon.schema.data_types import AtomicType
class _DummyAgg(FieldAggregator):
"""Minimal concrete subclass used only by these tests."""
def agg(self, accumulator, input_field):
return input_field
class FieldAggregatorRegistryTest(unittest.TestCase):
def test_register_and_create_returns_instance(self):
register_aggregator(
"_dummy_for_registry_test",
lambda field_type, field_name, options: _DummyAgg(
"_dummy_for_registry_test", field_type
),
)
agg = create_field_aggregator(
AtomicType("INT"),
"field0",
"_dummy_for_registry_test",
options=None,
)
self.assertIsInstance(agg, _DummyAgg)
self.assertEqual(agg.name, "_dummy_for_registry_test")
self.assertEqual(agg.field_type, AtomicType("INT"))
def test_re_register_replaces_existing_factory(self):
register_aggregator(
"_dummy_replaceable",
lambda ft, fn, opts: _DummyAgg("first", ft),
)
register_aggregator(
"_dummy_replaceable",
lambda ft, fn, opts: _DummyAgg("second", ft),
)
agg = create_field_aggregator(
AtomicType("BIGINT"), "f", "_dummy_replaceable", options=None
)
self.assertEqual(agg.name, "second")
def test_unknown_identifier_raises_value_error(self):
with self.assertRaises(ValueError) as ctx:
create_field_aggregator(
AtomicType("INT"),
"field0",
"this_aggregator_does_not_exist",
options=None,
)
msg = str(ctx.exception)
self.assertIn("unsupported aggregation", msg)
self.assertIn("this_aggregator_does_not_exist", msg)
def test_default_retract_raises_not_implemented(self):
agg = _DummyAgg("dummy", AtomicType("INT"))
with self.assertRaises(NotImplementedError) as ctx:
agg.retract(1, 2)
self.assertIn("does not support retract", str(ctx.exception))
self.assertIn("dummy", str(ctx.exception))
def test_default_reset_is_noop(self):
# Base-class reset() must not raise so subclasses without
# per-group state can skip overriding it.
agg = _DummyAgg("dummy", AtomicType("INT"))
agg.reset() # no exception expected
if __name__ == '__main__':
unittest.main()