blob: 8bc19fd08445ce5e10b1b891eff1c4c1669932d2 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from pypaimon.write.row_key_extractor import SimpleHashBucketAssigner
class SimpleHashBucketAssignerTest(unittest.TestCase):
    """Checks the bucket ids handed out by ``SimpleHashBucketAssigner``.

    NOTE(review): the positional constructor arguments are presumably
    (number of assigners, this assigner's id, target rows per bucket,
    maximum bucket count, with -1 meaning "no limit") -- confirm against
    the ``SimpleHashBucketAssigner`` signature.
    """

    def test_assign(self):
        """Without a bucket limit, each run of 100 new hashes gets the next bucket.

        The expected ids are 0, 2 and 4 for the assigner built with
        ``(2, 0, 100, -1)``.
        """
        bucket_assigner = SimpleHashBucketAssigner(2, 0, 100, -1)
        empty_partition = ()
        assigned = [
            bucket_assigner.assign(empty_partition, hash_value)
            for hash_value in range(301)
        ]

        # (slice start, slice stop, bucket every entry in the slice must have)
        expectations = ((0, 100, 0), (100, 200, 2))
        for start, stop, expected_bucket in expectations:
            for actual_bucket in assigned[start:stop]:
                self.assertEqual(actual_bucket, expected_bucket)

        self.assertEqual(assigned[200], 4)

    def test_assign_with_upper_bound(self):
        """With a limit of 3, hashes past the third bucket reuse buckets 0-2."""
        bucket_assigner = SimpleHashBucketAssigner(1, 0, 100, 3)
        empty_partition = ()
        assigned = [
            bucket_assigner.assign(empty_partition, hash_value)
            for hash_value in range(400)
        ]

        # The first 300 distinct hashes fill buckets 0, 1 and 2 in order.
        expectations = ((0, 100, 0), (100, 200, 1), (200, 300, 2))
        for start, stop, expected_bucket in expectations:
            for actual_bucket in assigned[start:stop]:
                self.assertEqual(actual_bucket, expected_bucket)

        # Anything beyond that must land in one of the existing buckets.
        for actual_bucket in assigned[300:]:
            self.assertIn(actual_bucket, [0, 1, 2])

    def test_assign_with_same_hash(self):
        """Re-submitting already seen hashes yields bucket 0 for every limit tried."""
        for limit in [-1, 1, 2]:
            with self.subTest(max_buckets=limit):
                bucket_assigner = SimpleHashBucketAssigner(1, 0, 100, limit)
                empty_partition = ()
                # The same 100 hashes, fed through the assigner twice.
                repeated_hashes = list(range(100)) * 2
                assigned = [
                    bucket_assigner.assign(empty_partition, hash_value)
                    for hash_value in repeated_hashes
                ]
                # Only the second pass is checked here.
                for actual_bucket in assigned[100:]:
                    self.assertEqual(actual_bucket, 0)
# Run the test cases in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()