[BEAM-8575] Windows idempotency: Applying the same window fn (or wind… (#10071)
* [BEAM-8575] Windows idempotency: Applying the same window fn (or window fn + GBK) to the
input multiple times will have the same effect as applying it once
* Update sdks/python/apache_beam/transforms/window_test.py
Co-Authored-By: Lukasz Cwik <lcwik@google.com>
* Update sdks/python/apache_beam/transforms/window_test.py
Co-Authored-By: Lukasz Cwik <lcwik@google.com>
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index dda651d..0d5e14f 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -22,6 +22,8 @@
import unittest
from builtins import range
+from nose.plugins.attrib import attr
+
import apache_beam as beam
from apache_beam.runners import pipeline_context
from apache_beam.testing.test_pipeline import TestPipeline
@@ -281,6 +283,35 @@
assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
label='assert:mean')
+ @attr('ValidatesRunner')
+ def test_window_assignment_idempotency(self):
+ with TestPipeline() as p:
+ pcoll = self.timestamped_key_values(p, 'key', 0, 1, 2, 3, 4)
+ result = (pcoll
+ | 'window' >> WindowInto(FixedWindows(2))
+ | 'same window' >> WindowInto(FixedWindows(2))
+ | 'same window again' >> WindowInto(FixedWindows(2))
+ | GroupByKey())
+
+ assert_that(result, equal_to([('key', [0, 1]),
+ ('key', [2, 3]),
+ ('key', [4])]))
+
+ @attr('ValidatesRunner')
+ def test_window_assignment_through_multiple_gbk_idempotency(self):
+ with TestPipeline() as p:
+ pcoll = self.timestamped_key_values(p, 'key', 0, 1, 2, 3, 4)
+ result = (pcoll
+ | 'window' >> WindowInto(FixedWindows(2))
+ | 'gbk' >> GroupByKey()
+ | 'same window' >> WindowInto(FixedWindows(2))
+ | 'another gbk' >> GroupByKey()
+ | 'same window again' >> WindowInto(FixedWindows(2))
+ | 'gbk again' >> GroupByKey())
+
+ assert_that(result, equal_to([('key', [[[0, 1]]]),
+ ('key', [[[2, 3]]]),
+ ('key', [[[4]]])]))
class RunnerApiTest(unittest.TestCase):