Work around for BEAM-8598.
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index f0005c9..9d9284c 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -168,6 +168,7 @@
"""
def __init__(self, coder=coders.FastPrimitivesCoder(), events=()):
+ super(TestStream, self).__init__()
assert coder is not None
self.coder = coder
self.current_watermark = timestamp.MIN_TIMESTAMP
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index c68d085..22ecda3 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -696,7 +696,7 @@
self.assertEqual([], output, msg='Unexpected output: %s' % output)
-class TestStreamTranscriptTest(TranscriptTest):
+class BaseTestStreamTranscriptTest(TranscriptTest):
"""A suite of TestStream-based tests based on trigger transcript entries.
"""
@@ -742,12 +742,40 @@
The key is ignored, but all items must be on the same key to share state.
"""
+ def __init__(self, allow_out_of_order=True):
+ # Some runners don't support cross-stage TestStream semantics.
+ self.allow_out_of_order = allow_out_of_order
+
def process(
- self, element, seen=beam.DoFn.StateParam(
+ self,
+ element,
+ seen=beam.DoFn.StateParam(
beam.transforms.userstate.BagStateSpec(
'seen',
+ beam.coders.FastPrimitivesCoder())),
+ expected=beam.DoFn.StateParam(
+ beam.transforms.userstate.BagStateSpec(
+ 'expected',
beam.coders.FastPrimitivesCoder()))):
_, (action, data) = element
+
+ if self.allow_out_of_order:
+ if action == 'expect' and not list(seen.read()):
+ if data:
+ expected.add(data)
+ return
+ elif action == 'actual' and list(expected.read()):
+ seen.add(data)
+ all_data = list(seen.read())
+ all_expected = list(expected.read())
+ if len(all_data) == len(all_expected[0]):
+ expected.clear()
+ for expect in all_expected[1:]:
+ expected.add(expect)
+ action, data = 'expect', all_expected[0]
+ else:
+ return
+
if action == 'actual':
seen.add(data)
@@ -818,7 +846,17 @@
tagged_outputs = (
outputs | beam.MapTuple(lambda key, value: (key, ('actual', value))))
# pylint: disable=expression-not-assigned
- (tagged_expected, tagged_outputs) | beam.Flatten() | beam.ParDo(Check())
+ ([tagged_expected, tagged_outputs]
+ | beam.Flatten()
+ | beam.ParDo(Check(self.allow_out_of_order)))
+
+
+class TestStreamTranscriptTest(BaseTestStreamTranscriptTest):
+ allow_out_of_order = False
+
+
+class WeakTestStreamTranscriptTest(BaseTestStreamTranscriptTest):
+ allow_out_of_order = True
TRANSCRIPT_TEST_FILE = os.path.join(
@@ -827,6 +865,7 @@
if os.path.exists(TRANSCRIPT_TEST_FILE):
TriggerDriverTranscriptTest._create_tests(TRANSCRIPT_TEST_FILE)
TestStreamTranscriptTest._create_tests(TRANSCRIPT_TEST_FILE)
+ WeakTestStreamTranscriptTest._create_tests(TRANSCRIPT_TEST_FILE)
if __name__ == '__main__':
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index 4843c8c..3cb4362 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -92,7 +92,7 @@
&& cd ${pythonRootDir} \\
&& pip install -e .[test] \\
&& python setup.py nosetests \\
- --tests apache_beam.transforms.trigger_test:TestStreamTranscriptTest.test_fixed_default \\
+ --tests apache_beam.transforms.trigger_test:WeakTestStreamTranscriptTest \\
--test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${project(":runners:flink:1.9:job-server:").shadowJar.archivePath}'
"""
}