blob: 9507d056eb954de114207edbfc692339ead24a7e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for best-effort deterministic pickling of sets."""
import enum
import unittest
from apache_beam.internal.set_pickler import sort_if_possible
class A:
def __init__(self, x, y):
self.x = x
self.y = y
class B:
pass
class C:
pass
class SortIfPossibleTest(unittest.TestCase):
def test_order_by_type(self):
a = A(1, 2)
self.assertEqual(
sort_if_possible([123, "abc", True, a]),
# A, bool, int, str
[a, True, 123, "abc"],
)
def test_sorts_ints(self):
self.assertEqual(sort_if_possible({5, 2, 3, 1, 4}), [1, 2, 3, 4, 5])
def test_sorts_booleans(self):
self.assertEqual(sort_if_possible({False, True}), [False, True])
def test_sorts_floats(self):
self.assertEqual(sort_if_possible({-18.0, 3.14, 2.71}), [-18.0, 2.71, 3.14])
def test_sorts_strings(self):
self.assertEqual(
sort_if_possible({"when", "in", "the", "course", "of"}),
["course", "in", "of", "the", "when"],
)
def test_sorts_bytes(self):
self.assertEqual(
sort_if_possible({b"when", b"in", b"the", b"course", b"of"}),
[b"course", b"in", b"of", b"the", b"when"],
)
def test_sorts_bytearrays(self):
f = bytearray
self.assertEqual(
sort_if_possible(
[f(b"when"), f(b"in"), f(b"the"), f(b"course"), f(b"of")]),
[f(b"course"), f(b"in"), f(b"of"), f(b"the"), f(b"when")],
)
def test_sort_tuples_by_length(self):
self.assertEqual(
sort_if_possible({(1, 1, 1), (1, 1), (1, )}), [(1, ), (1, 1),
(1, 1, 1)])
def test_sort_tuples_by_element_values(self):
self.assertEqual(
sort_if_possible({(0, 0), (1, 1), (0, 1), (1, 0)}),
[(0, 0), (0, 1), (1, 0), (1, 1)],
)
def test_sort_nested_tuples(self):
self.assertEqual(
sort_if_possible({(1, (4, )), (1, (1, )), (1, (3, )), (1, (2, ))}),
[(1, (1, )), (1, (2, )), (1, (3, )), (1, (4, ))],
)
def test_sort_lists_by_length(self):
self.assertEqual(
sort_if_possible([[1, 1, 1], [1, 1], [
1,
]]), [[
1,
], [1, 1], [1, 1, 1]])
def test_sort_lists_by_element_values(self):
self.assertEqual(
sort_if_possible([[0, 0], [1, 1], [0, 1], [1, 0]]),
[[0, 0], [0, 1], [1, 0], [1, 1]],
)
def test_sort_frozenset_like_sorted_tuple(self):
self.assertEqual(
sort_if_possible(
{frozenset([1, 2, 3]), frozenset([1]), frozenset([1, 2, 4])}),
[frozenset([1]), frozenset([1, 2, 3]), frozenset([1, 2, 4])],
)
def test_sort_set_like_sorted_tuple(self):
self.assertEqual(
sort_if_possible([set([1, 2, 3]), set([1]), set([1, 2, 4])]),
[set([1]), set([1, 2, 3]), set([1, 2, 4])],
)
def test_order_objects_by_class_name(self):
a = A(1, 2)
b = B()
c = C()
self.assertEqual(sort_if_possible({b, c, a}), [a, b, c])
def test_order_objects_by_number_of_fields(self):
o1 = C()
o2 = C()
setattr(o2, "f1", 1)
o3 = C()
setattr(o3, "f1", 1)
setattr(o3, "f2", 2)
self.assertEqual(sort_if_possible({o3, o2, o1}), [o1, o2, o3])
def test_order_objects_by_field_name(self):
o1 = C()
setattr(o1, "aaa", 1)
o2 = C()
setattr(o2, "bbb", 1)
o3 = C()
setattr(o3, "ccc", 1)
self.assertEqual(sort_if_possible({o3, o2, o1}), [o1, o2, o3])
def test_order_objects_by_field_value(self):
a1_1 = A(1, 1)
a1_2 = A(1, 2)
a2_1 = A(2, 1)
a2_2 = A(2, 2)
self.assertEqual(
sort_if_possible({a2_1, a1_1, a2_2, a1_2}), [a1_1, a1_2, a2_1, a2_2])
def test_cyclic_data(self):
def create_tuple_with_cycles():
o = C()
t = (o, )
setattr(t[0], "t", t)
return t
t1 = create_tuple_with_cycles()
t2 = create_tuple_with_cycles()
t3 = create_tuple_with_cycles()
actual = {hash(t) for t in sort_if_possible({t1, t2, t3})}
expected = {hash(t1), hash(t2), hash(t3)}
self.assertEqual(actual, expected)
def test_order_dict_by_length(self):
self.assertEqual(
sort_if_possible([{
'a': 1, 'b': 2
}, {
'a': 1
}, {
'a': 1, 'b': 2, 'c': 3
}]), [{
'a': 1
}, {
'a': 1, 'b': 2
}, {
'a': 1, 'b': 2, 'c': 3
}])
def test_order_dict_by_key(self):
self.assertEqual(
sort_if_possible([{
'b': 1
}, {
'a': 1
}, {
'c': 1
}]), [{
'a': 1
}, {
'b': 1
}, {
'c': 1
}])
def test_order_dict_by_value(self):
self.assertEqual(
sort_if_possible([{
'a': 2
}, {
'a': 1
}, {
'a': 3
}]), [{
'a': 1
}, {
'a': 2
}, {
'a': 3
}])
def test_dict_keys_do_not_have_lt(self):
self.assertEqual(
sort_if_possible([{(1, 1): 1}, {(1, ): 1}, {(1, 1, 1): 1}]),
[{(1, ): 1}, {(1, 1): 1}, {(1, 1, 1): 1}])
def test_dict_values_do_not_have_lt(self):
self.assertEqual(
sort_if_possible([{
'a': (1, 1)
}, {
'a': (1, )
}, {
'a': (1, 1, 1)
}]), [{
'a': (1, )
}, {
'a': (1, 1)
}, {
'a': (1, 1, 1)
}])
def test_order_enums_by_name(self):
class CardinalDirection(enum.Enum):
NORTH = 1
EAST = 2
SOUTH = 3
WEST = 4
self.assertEqual(
sort_if_possible({
CardinalDirection.NORTH,
CardinalDirection.SOUTH,
CardinalDirection.EAST,
CardinalDirection.WEST,
}),
[
CardinalDirection.EAST,
CardinalDirection.NORTH,
CardinalDirection.SOUTH,
CardinalDirection.WEST,
])
def test_enum_with_many_values(self):
MyEnum = enum.Enum('MyEnum', ' '.join(f'N{i}' for i in range(10000)))
self.assertEqual(
sort_if_possible({
MyEnum.N789,
MyEnum.N123,
MyEnum.N456,
}), [
MyEnum.N123,
MyEnum.N456,
MyEnum.N789,
])
if __name__ == "__main__":
unittest.main()