[BEAM-10121][release-2.22.0] Python RowCoder doesn't support nested structs (#11863)
* Rename TEST_CASE
* Add support for nested structs
diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
index 0311497..8c41891 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -103,6 +103,8 @@
elif type_info == "array_type":
return IterableCoder(
RowCoder.coder_from_type(field_type.array_type.element_type))
+ elif type_info == "row_type":
+ return RowCoder(field_type.row_type.schema)
# The Java SDK supports several more types, but the coders are not yet
# standard, and are not implemented in Python.
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
index 0ffd983..8eb7ee2 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -49,9 +49,9 @@
class RowCoderTest(unittest.TestCase):
- TEST_CASE = Person("Jon Snow", 23, None, ["crow", "wildling"])
- TEST_CASES = [
- TEST_CASE,
+ JON_SNOW = Person("Jon Snow", 23, None, ["crow", "wildling"])
+ PEOPLE = [
+ JON_SNOW,
Person("Daenerys Targaryen", 25, "Westeros", ["Mother of Dragons"]),
Person("Michael Bluth", 30, None, [])
]
@@ -60,7 +60,7 @@
expected_coder = RowCoder(typing_to_runner_api(Person).row_type.schema)
real_coder = coders_registry.get_coder(Person)
- for test_case in self.TEST_CASES:
+ for test_case in self.PEOPLE:
self.assertEqual(
expected_coder.encode(test_case), real_coder.encode(test_case))
@@ -90,7 +90,7 @@
])
coder = RowCoder(schema)
- for test_case in self.TEST_CASES:
+ for test_case in self.PEOPLE:
self.assertEqual(test_case, coder.decode(coder.encode(test_case)))
@unittest.skip(
@@ -182,9 +182,17 @@
with TestPipeline() as p:
res = (
p
- | beam.Create(self.TEST_CASES)
+ | beam.Create(self.PEOPLE)
| beam.Filter(lambda person: person.name == "Jon Snow"))
- assert_that(res, equal_to([self.TEST_CASE]))
+ assert_that(res, equal_to([self.JON_SNOW]))
+
+ def test_row_coder_nested_struct(self):
+ Pair = typing.NamedTuple('Pair', [('left', Person), ('right', Person)])
+
+ value = Pair(self.PEOPLE[0], self.PEOPLE[1])
+ coder = RowCoder(typing_to_runner_api(Pair).row_type.schema)
+
+ self.assertEqual(value, coder.decode(coder.encode(value)))
if __name__ == "__main__":