blob: 5a3633049d92814311a4717b7c039fa9e4f6455d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the type-hint objects and decorators with Python 3 syntax not
supported by 2.7."""
# pytype: skip-file
from __future__ import absolute_import
from __future__ import print_function
import typing
import unittest
import apache_beam.typehints.typehints as typehints
from apache_beam import Map
from apache_beam import PTransform
from apache_beam.pvalue import PBegin
from apache_beam.pvalue import PCollection
from apache_beam.pvalue import PDone
from apache_beam.transforms.core import DoFn
from apache_beam.typehints import KV
from apache_beam.typehints import Iterable
from apache_beam.typehints.typehints import Any
class TestParDoAnnotations(unittest.TestCase):
def test_with_side_input(self):
class MyDoFn(DoFn):
def process(self, element: float, side_input: str) -> \
Iterable[KV[str, float]]:
pass
th = MyDoFn().get_type_hints()
self.assertEqual(th.input_types, ((float, str), {}))
self.assertEqual(th.output_types, ((KV[str, float], ), {}))
def test_pep484_annotations(self):
class MyDoFn(DoFn):
def process(self, element: int) -> Iterable[str]:
pass
th = MyDoFn().get_type_hints()
self.assertEqual(th.input_types, ((int, ), {}))
self.assertEqual(th.output_types, ((str, ), {}))
class TestPTransformAnnotations(unittest.TestCase):
def test_pep484_annotations(self):
class MyPTransform(PTransform):
def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((int, ), {}))
self.assertEqual(th.output_types, ((str, ), {}))
def test_annotations_without_input_pcollection_wrapper(self):
class MyPTransform(PTransform):
def expand(self, pcoll: int) -> PCollection[str]:
return pcoll | Map(lambda num: str(num))
error_str = (
r'This input type hint will be ignored and not used for '
r'type-checking purposes. Typically, input type hints for a '
r'PTransform are single (or nested) types wrapped by a '
r'PCollection, or PBegin. Got: {} instead.'.format(int))
with self.assertLogs(level='WARN') as log:
MyPTransform().get_type_hints()
self.assertIn(error_str, log.output[0])
def test_annotations_without_output_pcollection_wrapper(self):
class MyPTransform(PTransform):
def expand(self, pcoll: PCollection[int]) -> str:
return pcoll | Map(lambda num: str(num))
error_str = (
r'This output type hint will be ignored and not used for '
r'type-checking purposes. Typically, output type hints for a '
r'PTransform are single (or nested) types wrapped by a '
r'PCollection, PDone, or None. Got: {} instead.'.format(str))
with self.assertLogs(level='WARN') as log:
th = MyPTransform().get_type_hints()
self.assertIn(error_str, log.output[0])
self.assertEqual(th.input_types, ((int, ), {}))
self.assertEqual(th.output_types, None)
def test_annotations_without_input_internal_type(self):
class MyPTransform(PTransform):
def expand(self, pcoll: PCollection) -> PCollection[str]:
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((Any, ), {}))
self.assertEqual(th.output_types, ((str, ), {}))
def test_annotations_without_output_internal_type(self):
class MyPTransform(PTransform):
def expand(self, pcoll: PCollection[int]) -> PCollection:
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((int, ), {}))
self.assertEqual(th.output_types, ((Any, ), {}))
def test_annotations_without_any_internal_type(self):
class MyPTransform(PTransform):
def expand(self, pcoll: PCollection) -> PCollection:
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((Any, ), {}))
self.assertEqual(th.output_types, ((Any, ), {}))
def test_annotations_without_input_typehint(self):
class MyPTransform(PTransform):
def expand(self, pcoll) -> PCollection[str]:
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((Any, ), {}))
self.assertEqual(th.output_types, ((str, ), {}))
def test_annotations_without_output_typehint(self):
class MyPTransform(PTransform):
def expand(self, pcoll: PCollection[int]):
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((int, ), {}))
self.assertEqual(th.output_types, ((Any, ), {}))
def test_annotations_without_any_typehints(self):
class MyPTransform(PTransform):
def expand(self, pcoll):
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, None)
self.assertEqual(th.output_types, None)
def test_annotations_with_pbegin(self):
class MyPTransform(PTransform):
def expand(self, pcoll: PBegin):
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((Any, ), {}))
self.assertEqual(th.output_types, ((Any, ), {}))
def test_annotations_with_pdone(self):
class MyPTransform(PTransform):
def expand(self, pcoll) -> PDone:
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((Any, ), {}))
self.assertEqual(th.output_types, ((Any, ), {}))
def test_annotations_with_none_input(self):
class MyPTransform(PTransform):
def expand(self, pcoll: None) -> PCollection[str]:
return pcoll | Map(lambda num: str(num))
error_str = (
r'This input type hint will be ignored and not used for '
r'type-checking purposes. Typically, input type hints for a '
r'PTransform are single (or nested) types wrapped by a '
r'PCollection, or PBegin. Got: {} instead.'.format(None))
with self.assertLogs(level='WARN') as log:
th = MyPTransform().get_type_hints()
self.assertIn(error_str, log.output[0])
self.assertEqual(th.input_types, None)
self.assertEqual(th.output_types, ((str, ), {}))
def test_annotations_with_none_output(self):
class MyPTransform(PTransform):
def expand(self, pcoll) -> None:
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((Any, ), {}))
self.assertEqual(th.output_types, ((Any, ), {}))
def test_annotations_with_arbitrary_output(self):
class MyPTransform(PTransform):
def expand(self, pcoll) -> str:
return pcoll | Map(lambda num: str(num))
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((Any, ), {}))
self.assertEqual(th.output_types, None)
def test_annotations_with_arbitrary_input_and_output(self):
class MyPTransform(PTransform):
def expand(self, pcoll: int) -> str:
return pcoll | Map(lambda num: str(num))
input_error_str = (
r'This input type hint will be ignored and not used for '
r'type-checking purposes. Typically, input type hints for a '
r'PTransform are single (or nested) types wrapped by a '
r'PCollection, or PBegin. Got: {} instead.'.format(int))
output_error_str = (
r'This output type hint will be ignored and not used for '
r'type-checking purposes. Typically, output type hints for a '
r'PTransform are single (or nested) types wrapped by a '
r'PCollection, PDone, or None. Got: {} instead.'.format(str))
with self.assertLogs(level='WARN') as log:
th = MyPTransform().get_type_hints()
self.assertIn(input_error_str, log.output[0])
self.assertIn(output_error_str, log.output[1])
self.assertEqual(th.input_types, None)
self.assertEqual(th.output_types, None)
def test_typing_module_annotations_are_converted_to_beam_annotations(self):
class MyPTransform(PTransform):
def expand(
self, pcoll: PCollection[typing.Dict[str, str]]
) -> PCollection[typing.Dict[str, str]]:
return pcoll
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
def test_nested_typing_annotations_are_converted_to_beam_annotations(self):
class MyPTransform(PTransform):
def expand(self, pcoll:
PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]) \
-> PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]:
return pcoll
th = MyPTransform().get_type_hints()
self.assertEqual(
th.input_types,
((typehints.Union[int, typehints.Any, typehints.Dict[str,
float]], ), {}))
self.assertEqual(
th.input_types,
((typehints.Union[int, typehints.Any, typehints.Dict[str,
float]], ), {}))
def test_mixed_annotations_are_converted_to_beam_annotations(self):
class MyPTransform(PTransform):
def expand(self, pcoll: typing.Any) -> typehints.Any:
return pcoll
th = MyPTransform().get_type_hints()
self.assertEqual(th.input_types, ((typehints.Any, ), {}))
self.assertEqual(th.input_types, ((typehints.Any, ), {}))
if __name__ == '__main__':
unittest.main()