blob: 19854f7af6fef55c06e9890fedf81f617d396905 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from pypaimon.schema.data_types import AtomicType, DataField
from pypaimon.schema.table_schema import TableSchema
def _bigint_field(idx: int, name: str) -> DataField:
return DataField(idx, name, AtomicType('BIGINT', nullable=False))
def _string_field(idx: int, name: str) -> DataField:
return DataField(idx, name, AtomicType('STRING'))
class TableSchemaBucketKeysTest(unittest.TestCase):
"""Cover the ``bucket-key`` resolution lifted onto TableSchema.
Mirrors Java ``TableSchema.bucketKeys()`` / ``logicalBucketKeyType()``.
"""
def _schema(self, primary_keys=None, partition_keys=None, options=None):
fields = [
_bigint_field(0, 'id'),
_string_field(1, 'region'),
_bigint_field(2, 'val'),
]
return TableSchema(
id=0,
fields=fields,
partition_keys=partition_keys or [],
primary_keys=primary_keys or [],
options=options or {},
)
def test_explicit_bucket_key_option_returns_those_columns(self):
schema = self._schema(
primary_keys=['id'],
options={'bucket-key': 'region,val'},
)
self.assertEqual(schema.bucket_keys, ['region', 'val'])
fields = schema.logical_bucket_key_fields
self.assertEqual([f.name for f in fields], ['region', 'val'])
def test_no_bucket_key_falls_back_to_trimmed_primary_keys(self):
# PK includes a partition column; trimmed bucket keys drop it.
schema = self._schema(
primary_keys=['region', 'id'],
partition_keys=['region'],
)
self.assertEqual(schema.bucket_keys, ['id'])
self.assertEqual(
[f.name for f in schema.logical_bucket_key_fields], ['id'])
def test_no_bucket_key_no_primary_keys_returns_empty(self):
schema = self._schema()
self.assertEqual(schema.bucket_keys, [])
self.assertEqual(schema.logical_bucket_key_fields, [])
def test_unknown_bucket_key_column_raises(self):
schema = self._schema(options={'bucket-key': 'nope'})
with self.assertRaises(ValueError):
_ = schema.bucket_keys
def test_whitespace_only_option_falls_back(self):
# Whitespace-only ``bucket-key`` mirrors an unset option.
schema = self._schema(
primary_keys=['id'],
options={'bucket-key': ' '},
)
self.assertEqual(schema.bucket_keys, ['id'])
if __name__ == '__main__':
unittest.main()