blob: 1a2eee7c782323d32fa6afe322d8b0b4f6a6c96d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runner Validation Test Suite for Cross-language Transforms

As per Beam's Portability Framework design, cross-language transforms
should work out of the box. In spite of this, rough edges may still exist.
They could be caused by an unpolished implementation of any part of the
execution code path, for example:

- Transform expansion [SDK]
- Pipeline construction [SDK]
- Cross-language artifact staging [Runner]
- Language specific serialization/deserialization of PCollection (and
  other data types) [Runner/SDK]

In an effort to improve developer visibility into potential problems,
this test suite validates correct execution of six core Beam transforms when
used as cross-language transforms within the Python SDK from any foreign SDK:

- ParDo
- GroupByKey
- CoGroupByKey
- Combine
- Flatten
- Partition

See the "Runner Validation Test Plan for Cross-language transforms" document
for further details.
"""
import logging
import os
import typing
import unittest
from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
# URNs handed to beam.ExternalTransform by the test pipelines in this module.
# They all share a common namespace; only the final segment differs.
_XLANG_TEST_URN_PREFIX = "beam:transforms:xlang:test:"

TEST_PREFIX_URN = _XLANG_TEST_URN_PREFIX + "prefix"        # ParDo
TEST_MULTI_URN = _XLANG_TEST_URN_PREFIX + "multi"          # multi-in/out ParDo
TEST_GBK_URN = _XLANG_TEST_URN_PREFIX + "gbk"              # GroupByKey
TEST_CGBK_URN = _XLANG_TEST_URN_PREFIX + "cgbk"            # CoGroupByKey
TEST_COMGL_URN = _XLANG_TEST_URN_PREFIX + "comgl"          # Combine globally
TEST_COMPK_URN = _XLANG_TEST_URN_PREFIX + "compk"          # Combine per key
TEST_FLATTEN_URN = _XLANG_TEST_URN_PREFIX + "flatten"      # Flatten
TEST_PARTITION_URN = _XLANG_TEST_URN_PREFIX + "partition"  # Partition
class CrossLanguageTestPipelines(object):
  """Pipelines that apply core Beam transforms as cross-language transforms.

  Every ``run_*`` method builds a small pipeline on the ``pipeline`` object it
  is given. It applies a ``beam.ExternalTransform`` identified by one of the
  module's ``TEST_*_URN`` constants, expanded by ``self.expansion_service``.
  It then registers ``assert_that`` checks on the result inside the
  ``with pipeline`` block.
  """
  def __init__(self, expansion_service=None):
    """Stores the expansion service address used by every scenario.

    Args:
      expansion_service: Address of the expansion service. Any falsy value
        falls back to ``'localhost:<EXPANSION_PORT>'``, with the port read
        from the ``EXPANSION_PORT`` environment variable.
    """
    # NOTE(review): if EXPANSION_PORT is unset this evaluates to
    # 'localhost:None'; presumably expansion then fails later with a
    # connection error rather than here.
    self.expansion_service = expansion_service or (
        'localhost:%s' % os.environ.get('EXPANSION_PORT'))

  def run_prefix(self, pipeline):
    """
    Target transform - ParDo
    Test scenario - Mapping elements from a single input collection to a
    single output collection
    Boundary conditions checked -
      - PCollection<?> to external transforms
      - PCollection<?> from external transforms
    """
    with pipeline as p:
      res = (
          p
          | beam.Create(['a', 'b']).with_output_types(str)
          | beam.ExternalTransform(
              TEST_PREFIX_URN,
              ImplicitSchemaPayloadBuilder({'data': u'0'}),
              self.expansion_service))
      # The payload's 'data' value ('0') is expected to be prepended to
      # every input element.
      assert_that(res, equal_to(['0a', '0b']))

  def run_multi_input_output_with_sideinput(self, pipeline):
    """
    Target transform - ParDo
    Test scenario - Mapping elements from multiple input collections (main
    and side) to multiple output collections (main and side)
    Boundary conditions checked -
      - PCollectionTuple to external transforms
      - PCollectionTuple from external transforms
    """
    with pipeline as p:
      main1 = p | 'Main1' >> beam.Create(
          ['a', 'bb'], reshuffle=False).with_output_types(str)
      main2 = p | 'Main2' >> beam.Create(
          ['x', 'yy', 'zzz'], reshuffle=False).with_output_types(str)
      side = p | 'Side' >> beam.Create(['s']).with_output_types(str)
      # Inputs and outputs are both keyed by tag name.
      res = dict(
          main1=main1, main2=main2, side=side) | beam.ExternalTransform(
              TEST_MULTI_URN, None, self.expansion_service)
      assert_that(res['main'], equal_to(['as', 'bbs', 'xs', 'yys', 'zzzs']))
      # A distinct label keeps this check from colliding with the default
      # label used by the assertion above.
      assert_that(res['side'], equal_to(['ss']), label='CheckSide')

  def run_group_by_key(self, pipeline):
    """
    Target transform - GroupByKey
    Test scenario - Grouping a collection of KV<K,V> to a collection of
    KV<K, Iterable<V>> by key
    Boundary conditions checked -
      - PCollection<KV<?, ?>> to external transforms
      - PCollection<KV<?, Iterable<?>>> from external transforms
    """
    with pipeline as p:
      res = (
          p
          | beam.Create([(0, "1"), (0, "2"),
                         (1, "3")], reshuffle=False).with_output_types(
                             typing.Tuple[int, str])
          | beam.ExternalTransform(TEST_GBK_URN, None, self.expansion_service)
          # Sorting makes the formatted string independent of the order of
          # values within each group.
          | beam.Map(lambda x: "{}:{}".format(x[0], ','.join(sorted(x[1])))))
      assert_that(res, equal_to(['0:1,2', '1:3']))

  def run_cogroup_by_key(self, pipeline):
    """
    Target transform - CoGroupByKey
    Test scenario - Grouping multiple input collections with keys to a
    collection of KV<K, CoGbkResult> by key
    Boundary conditions checked -
      - KeyedPCollectionTuple<?> to external transforms
      - PCollection<KV<?, Iterable<?>>> from external transforms
    """
    with pipeline as p:
      col1 = p | 'create_col1' >> beam.Create(
          [(0, "1"), (0, "2"), (1, "3")], reshuffle=False).with_output_types(
              typing.Tuple[int, str])
      col2 = p | 'create_col2' >> beam.Create(
          [(0, "4"), (1, "5"), (1, "6")], reshuffle=False).with_output_types(
              typing.Tuple[int, str])
      res = (
          dict(col1=col1, col2=col2)
          | beam.ExternalTransform(TEST_CGBK_URN, None, self.expansion_service)
          # Values from both inputs are expected per key; sort them so the
          # check does not depend on their order.
          | beam.Map(lambda x: "{}:{}".format(x[0], ','.join(sorted(x[1])))))
      assert_that(res, equal_to(['0:1,2,4', '1:3,5,6']))

  def run_combine_globally(self, pipeline):
    """
    Target transform - Combine
    Test scenario - Combining elements globally with a predefined simple
    merging function
    Boundary conditions checked -
      - PCollection<?> to external transforms
      - PCollection<?> from external transforms
    """
    with pipeline as p:
      res = (
          p
          | beam.Create([1, 2, 3]).with_output_types(int)
          | beam.ExternalTransform(
              TEST_COMGL_URN, None, self.expansion_service))
      # The expected value (6 == 1 + 2 + 3) implies a summing combiner.
      assert_that(res, equal_to([6]))

  def run_combine_per_key(self, pipeline):
    """
    Target transform - Combine
    Test scenario - Combining elements per key with a predefined simple
    merging function
    Boundary conditions checked -
      - PCollection<?> to external transforms
      - PCollection<?> from external transforms
    """
    with pipeline as p:
      res = (
          p
          | beam.Create([('a', 1), ('a', 2),
                         ('b', 3)]).with_output_types(typing.Tuple[str, int])
          | beam.ExternalTransform(
              TEST_COMPK_URN, None, self.expansion_service))
      assert_that(res, equal_to([('a', 3), ('b', 3)]))

  def run_flatten(self, pipeline):
    """
    Target transform - Flatten
    Test scenario - Merging multiple collections into a single collection
    Boundary conditions checked -
      - PCollectionList<?> to external transforms
      - PCollection<?> from external transforms
    """
    with pipeline as p:
      col1 = p | 'col1' >> beam.Create([1, 2, 3]).with_output_types(int)
      col2 = p | 'col2' >> beam.Create([4, 5, 6]).with_output_types(int)
      res = ((col1, col2)
             | beam.ExternalTransform(
                 TEST_FLATTEN_URN, None, self.expansion_service))
      assert_that(res, equal_to([1, 2, 3, 4, 5, 6]))

  def run_partition(self, pipeline):
    """
    Target transform - Partition
    Test scenario - Splitting a single collection into multiple collections
    with a predefined simple PartitionFn
    Boundary conditions checked -
      - PCollection<?> to external transforms
      - PCollectionList<?> from external transforms
    """
    with pipeline as p:
      res = (
          p
          | beam.Create([1, 2, 3, 4, 5, 6]).with_output_types(int)
          | beam.ExternalTransform(
              TEST_PARTITION_URN, None, self.expansion_service))
      # Outputs are keyed by partition index as a string: '0' holds even
      # elements and '1' holds odd ones.
      assert_that(res['0'], equal_to([2, 4, 6]), label='check_even')
      assert_that(res['1'], equal_to([1, 3, 5]), label='check_odd')
@attr('UsesCrossLanguageTransforms')
@unittest.skipUnless(
    os.environ.get('EXPANSION_PORT'),
    "EXPANSION_PORT environment var is not provided.")
class ValidateRunnerXlangTest(unittest.TestCase):
  """Runs each CrossLanguageTestPipelines scenario as a unit test.

  The whole class is skipped unless the EXPANSION_PORT environment variable
  is set. Every test accepts an optional ``test_pipeline`` so an external
  harness can supply its own pipeline; otherwise a fresh one is created via
  ``create_pipeline``.
  """
  # NOTE(review): presumably read by nose's multiprocess plugin to allow
  # distributing this class's tests across processes — confirm.
  _multiprocess_can_split_ = True

  def create_pipeline(self):
    """Returns a new TestPipeline for running a single scenario."""
    test_pipeline = TestPipeline()
    # NOTE(review): presumably opts out of TestPipeline's runner-API
    # round-trip before running; confirm against TestPipeline.
    test_pipeline.not_use_test_runner_api = True
    return test_pipeline

  def test_prefix(self, test_pipeline=None):
    CrossLanguageTestPipelines().run_prefix(
        test_pipeline or self.create_pipeline())

  def test_multi_input_output_with_sideinput(self, test_pipeline=None):
    CrossLanguageTestPipelines().run_multi_input_output_with_sideinput(
        test_pipeline or self.create_pipeline())

  def test_group_by_key(self, test_pipeline=None):
    CrossLanguageTestPipelines().run_group_by_key(
        test_pipeline or self.create_pipeline())

  def test_cogroup_by_key(self, test_pipeline=None):
    CrossLanguageTestPipelines().run_cogroup_by_key(
        test_pipeline or self.create_pipeline())

  def test_combine_globally(self, test_pipeline=None):
    CrossLanguageTestPipelines().run_combine_globally(
        test_pipeline or self.create_pipeline())

  def test_combine_per_key(self, test_pipeline=None):
    CrossLanguageTestPipelines().run_combine_per_key(
        test_pipeline or self.create_pipeline())

  def test_flatten(self, test_pipeline=None):
    CrossLanguageTestPipelines().run_flatten(
        test_pipeline or self.create_pipeline())

  def test_partition(self, test_pipeline=None):
    CrossLanguageTestPipelines().run_partition(
        test_pipeline or self.create_pipeline())
if __name__ == '__main__':