Merge pull request #14798 from ihji/BEAM-12324
[BEAM-12324] TranslationsTest.test_run_packable_combine_* failing on PostCommit_Py_VR_Dataflow
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
index e0a4efe..73e8648 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
@@ -236,21 +236,21 @@
# These CombinePerKey stages will be packed if and only if
# translations.pack_combiners is enabled in the TestPipeline runner.
assert_that(
- pcoll | 'mean-perkey' >> combiners.Mean.PerKey(),
- equal_to([('a', 3.4)]),
- label='assert-mean-perkey')
+ pcoll | 'min-perkey' >> core.CombinePerKey(min),
+ equal_to([('a', -1)]),
+ label='assert-min-perkey')
assert_that(
pcoll | 'count-perkey' >> combiners.Count.PerKey(),
equal_to([('a', 10)]),
label='assert-count-perkey')
assert_that(
pcoll
- | 'largest-perkey' >> core.CombinePerKey(combiners.Largest(1)),
- equal_to([('a', [9])]),
+ | 'largest-perkey' >> combiners.Top.LargestPerKey(2),
+ equal_to([('a', [9, 6])]),
label='assert-largest-perkey')
with TestPipeline() as pipeline:
- vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
+ vals = [6, 3, 1, -1, 9, 1, 5, 2, 0, 6]
_ = (
pipeline
| Create([('a', x) for x in vals])
@@ -267,21 +267,21 @@
# translations.eliminate_common_key_with_void and
# translations.pack_combiners are enabled in the TestPipeline runner.
assert_that(
- pcoll | 'mean-globally' >> combiners.Mean.Globally(),
- equal_to([3.4]),
- label='assert-mean-globally')
+ pcoll | 'min-globally' >> core.CombineGlobally(min),
+ equal_to([-1]),
+ label='assert-min-globally')
assert_that(
pcoll | 'count-globally' >> combiners.Count.Globally(),
equal_to([10]),
label='assert-count-globally')
assert_that(
pcoll
- | 'largest-globally' >> core.CombineGlobally(combiners.Largest(1)),
- equal_to([[9]]),
+ | 'largest-globally' >> combiners.Top.Largest(2),
+ equal_to([[9, 6]]),
label='assert-largest-globally')
with TestPipeline() as pipeline:
- vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
+ vals = [6, 3, 1, -1, 9, 1, 5, 2, 0, 6]
_ = pipeline | Create(vals) | 'multiple-combines' >> MultipleCombines()
def test_conditionally_packed_combiners(self):