Merge pull request #22007 from [release-2.40.0] Fix FlatMap numpy array bug
[release-2.40.0] Fix FlatMap numpy array bug
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 9d47c2c..66a7bfc 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -1561,7 +1561,8 @@
A value wrapped in a TaggedOutput object will be unwrapped and
then dispatched to the appropriate indexed output.
"""
- results = results or []
+ if results is None:
+ results = []
# TODO(https://github.com/apache/beam/issues/20404): Verify that the
# results object is a valid iterable type if
@@ -1614,7 +1615,9 @@
A value wrapped in a TaggedOutput object will be unwrapped and
then dispatched to the appropriate indexed output.
"""
- results = results or []
+ if results is None:
+ results = []
+
output_element_count = 0
for result in results:
tag, result = self._handle_tagged_output(result)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 2218f8d..fbc36f3 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -454,6 +454,15 @@
ExpectingSideInputsFn(f'Do{k}'),
*[beam.pvalue.AsList(inputs[s]) for s in range(1, k)]))
+ def test_flatmap_numpy_array(self):
+ with self.create_pipeline() as p:
+ pc = (
+ p
+ | beam.Create([np.array(range(10))])
+ | beam.FlatMap(lambda arr: arr))
+
+ assert_that(pc, equal_to([np.int64(i) for i in range(10)]))
+
@unittest.skip('https://github.com/apache/beam/issues/21228')
def test_pardo_side_input_sparse_dependencies(self):
with self.create_pipeline() as p: