Additional CoGBK tests.
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 5951581..2bc36b6 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -287,19 +287,19 @@
if use_global_window:
pcoll = pcoll | WindowInto(window.GlobalWindows())
- keyed_actual = pcoll | "ToVoidKey" >> Map(lambda v: (None, v))
+ keyed_actual = pcoll | 'ToVoidKey' >> Map(lambda v: (None, v))
keyed_actual.is_bounded = True
# This is a CoGroupByKey so that the matcher always runs, even if the
# PCollection is empty.
plain_actual = ((keyed_singleton, keyed_actual)
- | "Group" >> CoGroupByKey()
- | "Unkey" >> Map(lambda k_values: k_values[1][1]))
+ | 'Group' >> CoGroupByKey()
+ | 'Unkey' >> Map(lambda k_values: k_values[1][1]))
if not use_global_window:
- plain_actual = plain_actual | "AddWindow" >> ParDo(AddWindow())
+ plain_actual = plain_actual | 'AddWindow' >> ParDo(AddWindow())
- plain_actual = plain_actual | "Match" >> Map(matcher)
+ plain_actual = plain_actual | 'Match' >> Map(matcher)
def default_label(self):
return label
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 9ce33f5..ce1901f 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -755,7 +755,7 @@
assert_that(even_length, equal_to(['AA', 'CC']), label='assert:even')
assert_that(odd_length, equal_to(['BBB']), label='assert:odd')
- def test_co_group_by_key_on_list(self):
+ def test_co_group_by_key_on_tuple(self):
with TestPipeline() as pipeline:
pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
('b', 3), ('c', 4)])
@@ -773,8 +773,19 @@
('b', 3), ('c', 4)])
pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
('c', 7), ('c', 8)])
- result = iter([pcoll_1, pcoll_2]) | beam.CoGroupByKey()
- result |= _SortLists
+ result = iter([pcoll_1, pcoll_2]) | beam.CoGroupByKey() | _SortLists
+ assert_that(
+ result,
+ equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
+ ('c', ([4], [7, 8]))]))
+
+ def test_co_group_by_key_on_list(self):
+ with TestPipeline() as pipeline:
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+ ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+ ('c', 7), ('c', 8)])
+ result = [pcoll_1, pcoll_2] | beam.CoGroupByKey() | _SortLists
assert_that(
result,
equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
@@ -786,8 +797,7 @@
('b', 3), ('c', 4)])
pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
('c', 7), ('c', 8)])
- result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
- result |= _SortLists
+ result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey() | _SortLists
assert_that(
result,
equal_to([('a', {
@@ -803,8 +813,7 @@
key = ('a', ('b', 'c'))
pcoll_1 = pipeline | 'Start 1' >> beam.Create([(key, 1)])
pcoll_2 = pipeline | 'Start 2' >> beam.Create([(key, 2)])
- result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
- result |= _SortLists
+ result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey() | _SortLists
assert_that(result, equal_to([(key, {'X': [1], 'Y': [2]})]))
def test_co_group_by_key_on_empty(self):
@@ -824,6 +833,43 @@
equal_to([]),
label='AssertEmptyDict')
+ def test_co_group_by_key_on_one(self):
+ with TestPipeline() as pipeline:
+ pcoll = pipeline | beam.Create([('a', 1), ('b', 2)])
+ expected = [('a', ([1], )), ('b', ([2], ))]
+ assert_that((pcoll, ) | 'OneTuple' >> beam.CoGroupByKey(),
+ equal_to(expected),
+ label='AssertOneTuple')
+ assert_that([pcoll] | 'OneList' >> beam.CoGroupByKey(),
+ equal_to(expected),
+ label='AssertOneList')
+ assert_that(
+ iter([pcoll]) | 'OneIterable' >> beam.CoGroupByKey(),
+ equal_to(expected),
+ label='AssertOneIterable')
+ assert_that({'tag': pcoll}
+ | 'OneDict' >> beam.CoGroupByKey()
+ | beam.MapTuple(lambda k, v: (k, (v['tag'], ))),
+ equal_to(expected),
+ label='AssertOneDict')
+
+ def test_co_group_by_key_on_empty(self):
+ with TestPipeline() as pipeline:
+ assert_that(
+ tuple() | 'EmptyTuple' >> beam.CoGroupByKey(pipeline=pipeline),
+ equal_to([]),
+ label='AssertEmptyTuple')
+ assert_that([] | 'EmptyList' >> beam.CoGroupByKey(pipeline=pipeline),
+ equal_to([]),
+ label='AssertEmptyList')
+ assert_that(
+ iter([]) | 'EmptyIterable' >> beam.CoGroupByKey(pipeline=pipeline),
+ equal_to([]),
+ label='AssertEmptyIterable')
+ assert_that({} | 'EmptyDict' >> beam.CoGroupByKey(pipeline=pipeline),
+ equal_to([]),
+ label='AssertEmptyDict')
+
def test_group_by_key_input_must_be_kv_pairs(self):
with self.assertRaises(typehints.TypeCheckError) as e:
with TestPipeline() as pipeline: